This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 13bd46b511 [Improve][E2E] modify the method of obtaining JobId (#7880)
13bd46b511 is described below
commit 13bd46b511e1d8e9bb5fe4da5ba740ba4b41c577
Author: zhangdonghao <[email protected]>
AuthorDate: Wed Oct 23 14:23:54 2024 +0800
[Improve][E2E] modify the method of obtaining JobId (#7880)
---
.../connectors/seatunnel/cdc/mysql/MysqlCDCIT.java | 24 ++++-------
.../seatunnel/cdc/postgres/OpengaussCDCIT.java | 42 ++++++-------------
.../seatunnel/cdc/oracle/OracleCDCIT.java | 22 ++++------
.../seatunnel/cdc/postgres/PostgresCDCIT.java | 43 ++++++-------------
.../seatunnel/e2e/connector/tidb/TiDBCDCIT.java | 22 +++-------
.../e2e/connector/paimon/PaimonSinkCDCIT.java | 11 +++--
.../seatunnel/e2e/common/util/JobIdGenerator.java | 4 +-
.../seatunnel/engine/e2e/CheckpointEnableIT.java | 48 +++++-----------------
8 files changed, 64 insertions(+), 152 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
index f5057a4fd0..bf7e8d8fe7 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
@@ -49,8 +50,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import java.util.stream.Stream;
import static org.awaitility.Awaitility.await;
@@ -329,11 +328,13 @@ public class MysqlCDCIT extends TestSuiteBase implements
TestResource {
clearTable(MYSQL_DATABASE2, SOURCE_TABLE_1);
clearTable(MYSQL_DATABASE2, SOURCE_TABLE_2);
+ Long jobId = JobIdGenerator.newJobId();
CompletableFuture.supplyAsync(
() -> {
try {
return container.executeJob(
-
"/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf");
+
"/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf",
+ String.valueOf(jobId));
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
@@ -365,26 +366,15 @@ public class MysqlCDCIT extends TestSuiteBase implements
TestResource {
.pollInterval(1000, TimeUnit.MILLISECONDS)
.until(() -> getConnectionStatus("st_user_sink").size() == 1);
- Pattern jobIdPattern =
- Pattern.compile(
- ".*Init JobMaster for Job mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf \\(([0-9]*)\\).*",
- Pattern.DOTALL);
- Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
- String jobId;
- if (matcher.matches()) {
- jobId = matcher.group(1);
- } else {
- throw new RuntimeException("Can not find jobId");
- }
-
- Assertions.assertEquals(0,
container.savepointJob(jobId).getExitCode());
+ Assertions.assertEquals(0,
container.savepointJob(String.valueOf(jobId)).getExitCode());
// Restore job with add a new table
CompletableFuture.supplyAsync(
() -> {
try {
container.restoreJob(
-
"/mysqlcdc_to_mysql_with_multi_table_mode_two_table.conf", jobId);
+
"/mysqlcdc_to_mysql_with_multi_table_mode_two_table.conf",
+ String.valueOf(jobId));
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java
index 5529c82396..ed3fdd74b4 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
@@ -288,12 +289,14 @@ public class OpengaussCDCIT extends TestSuiteBase
implements TestResource {
disabledReason = "Currently SPARK and FLINK do not support restore")
public void testMultiTableWithRestore(TestContainer container)
throws IOException, InterruptedException {
+ Long jobId = JobIdGenerator.newJobId();
try {
CompletableFuture.supplyAsync(
() -> {
try {
return container.executeJob(
-
"/opengausscdc_to_opengauss_with_multi_table_mode_one_table.conf");
+
"/opengausscdc_to_opengauss_with_multi_table_mode_one_table.conf",
+ String.valueOf(jobId));
} catch (Exception e) {
log.error("Commit task exception :" +
e.getMessage());
throw new RuntimeException(e);
@@ -319,19 +322,7 @@ public class OpengaussCDCIT extends TestSuiteBase
implements TestResource {
OPENGAUSS_SCHEMA,
SINK_TABLE_1)))));
- Pattern jobIdPattern =
- Pattern.compile(
- ".*Init JobMaster for Job opengausscdc_to_opengauss_with_multi_table_mode_one_table.conf \\(([0-9]*)\\).*",
- Pattern.DOTALL);
- Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
- String jobId;
- if (matcher.matches()) {
- jobId = matcher.group(1);
- } else {
- throw new RuntimeException("Can not find jobId");
- }
-
- Assertions.assertEquals(0,
container.savepointJob(jobId).getExitCode());
+ Assertions.assertEquals(0,
container.savepointJob(String.valueOf(jobId)).getExitCode());
// Restore job with add a new table
CompletableFuture.supplyAsync(
@@ -339,7 +330,7 @@ public class OpengaussCDCIT extends TestSuiteBase
implements TestResource {
try {
container.restoreJob(
"/opengausscdc_to_opengauss_with_multi_table_mode_two_table.conf",
- jobId);
+ String.valueOf(jobId));
} catch (Exception e) {
log.error("Commit task exception :" +
e.getMessage());
throw new RuntimeException(e);
@@ -397,12 +388,14 @@ public class OpengaussCDCIT extends TestSuiteBase
implements TestResource {
disabledReason = "Currently SPARK and FLINK do not support restore")
public void testAddFiledWithRestore(TestContainer container)
throws IOException, InterruptedException {
+ Long jobId = JobIdGenerator.newJobId();
try {
CompletableFuture.supplyAsync(
() -> {
try {
return container.executeJob(
-
"/opengausscdc_to_opengauss_test_add_Filed.conf");
+
"/opengausscdc_to_opengauss_test_add_Filed.conf",
+ String.valueOf(jobId));
} catch (Exception e) {
log.error("Commit task exception :" +
e.getMessage());
throw new RuntimeException(e);
@@ -425,19 +418,7 @@ public class OpengaussCDCIT extends TestSuiteBase
implements TestResource {
OPENGAUSS_SCHEMA,
SINK_TABLE_3)))));
- Pattern jobIdPattern =
- Pattern.compile(
- ".*Init JobMaster for Job opengausscdc_to_opengauss_test_add_Filed.conf \\(([0-9]*)\\).*",
- Pattern.DOTALL);
- Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
- String jobId;
- if (matcher.matches()) {
- jobId = matcher.group(1);
- } else {
- throw new RuntimeException("Can not find jobId");
- }
-
- Assertions.assertEquals(0,
container.savepointJob(jobId).getExitCode());
+ Assertions.assertEquals(0,
container.savepointJob(String.valueOf(jobId)).getExitCode());
// add filed add insert source table data
addFieldsForTable(OPENGAUSS_SCHEMA, SOURCE_TABLE_3);
@@ -449,7 +430,8 @@ public class OpengaussCDCIT extends TestSuiteBase
implements TestResource {
() -> {
try {
container.restoreJob(
-
"/opengausscdc_to_opengauss_test_add_Filed.conf", jobId);
+
"/opengausscdc_to_opengauss_test_add_Filed.conf",
+ String.valueOf(jobId));
} catch (Exception e) {
log.error("Commit task exception :" +
e.getMessage());
throw new RuntimeException(e);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
index 03cd2039b0..86282b7ff0 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
@@ -390,11 +391,13 @@ public class OracleCDCIT extends TestSuiteBase implements
TestResource {
insertSourceTable(DATABASE, SOURCE_TABLE1);
insertSourceTable(DATABASE, SOURCE_TABLE2);
+ Long jobId = JobIdGenerator.newJobId();
CompletableFuture.supplyAsync(
() -> {
try {
return container.executeJob(
-
"/oraclecdc_to_oracle_with_multi_table_mode_one_table.conf");
+
"/oraclecdc_to_oracle_with_multi_table_mode_one_table.conf",
+ String.valueOf(jobId));
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
@@ -432,26 +435,15 @@ public class OracleCDCIT extends TestSuiteBase implements
TestResource {
getSourceQuerySQL(
DATABASE, SINK_TABLE1)))));
- Pattern jobIdPattern =
- Pattern.compile(
- ".*Init JobMaster for Job oraclecdc_to_oracle_with_multi_table_mode_one_table.conf \\(([0-9]*)\\).*",
- Pattern.DOTALL);
- Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
- String jobId;
- if (matcher.matches()) {
- jobId = matcher.group(1);
- } else {
- throw new RuntimeException("Can not find jobId");
- }
-
- Assertions.assertEquals(0,
container.savepointJob(jobId).getExitCode());
+ Assertions.assertEquals(0,
container.savepointJob(String.valueOf(jobId)).getExitCode());
// Restore job with add a new table
CompletableFuture.supplyAsync(
() -> {
try {
container.restoreJob(
-
"/oraclecdc_to_oracle_with_multi_table_mode_two_table.conf", jobId);
+
"/oraclecdc_to_oracle_with_multi_table_mode_two_table.conf",
+ String.valueOf(jobId));
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
index 5b6d810de7..3abca057fb 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
@@ -274,12 +275,14 @@ public class PostgresCDCIT extends TestSuiteBase
implements TestResource {
disabledReason = "Currently SPARK and FLINK do not support restore")
public void testMultiTableWithRestore(TestContainer container)
throws IOException, InterruptedException {
+ Long jobId = JobIdGenerator.newJobId();
try {
CompletableFuture.supplyAsync(
() -> {
try {
return container.executeJob(
-
"/pgcdc_to_pg_with_multi_table_mode_one_table.conf");
+
"/pgcdc_to_pg_with_multi_table_mode_one_table.conf",
+ String.valueOf(jobId));
} catch (Exception e) {
log.error("Commit task exception :" +
e.getMessage());
throw new RuntimeException(e);
@@ -305,26 +308,15 @@ public class PostgresCDCIT extends TestSuiteBase
implements TestResource {
POSTGRESQL_SCHEMA,
SINK_TABLE_1)))));
- Pattern jobIdPattern =
- Pattern.compile(
- ".*Init JobMaster for Job pgcdc_to_pg_with_multi_table_mode_one_table.conf \\(([0-9]*)\\).*",
- Pattern.DOTALL);
- Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
- String jobId;
- if (matcher.matches()) {
- jobId = matcher.group(1);
- } else {
- throw new RuntimeException("Can not find jobId");
- }
-
- Assertions.assertEquals(0,
container.savepointJob(jobId).getExitCode());
+ Assertions.assertEquals(0,
container.savepointJob(String.valueOf(jobId)).getExitCode());
// Restore job with add a new table
CompletableFuture.supplyAsync(
() -> {
try {
container.restoreJob(
-
"/pgcdc_to_pg_with_multi_table_mode_two_table.conf", jobId);
+
"/pgcdc_to_pg_with_multi_table_mode_two_table.conf",
+ String.valueOf(jobId));
} catch (Exception e) {
log.error("Commit task exception :" +
e.getMessage());
throw new RuntimeException(e);
@@ -382,12 +374,14 @@ public class PostgresCDCIT extends TestSuiteBase
implements TestResource {
disabledReason = "Currently SPARK and FLINK do not support restore")
public void testAddFiledWithRestore(TestContainer container)
throws IOException, InterruptedException {
+ Long jobId = JobIdGenerator.newJobId();
try {
CompletableFuture.supplyAsync(
() -> {
try {
return container.executeJob(
-
"/postgrescdc_to_postgres_test_add_Filed.conf");
+
"/postgrescdc_to_postgres_test_add_Filed.conf",
+ String.valueOf(jobId));
} catch (Exception e) {
log.error("Commit task exception :" +
e.getMessage());
throw new RuntimeException(e);
@@ -410,19 +404,7 @@ public class PostgresCDCIT extends TestSuiteBase
implements TestResource {
POSTGRESQL_SCHEMA,
SINK_TABLE_3)))));
- Pattern jobIdPattern =
- Pattern.compile(
- ".*Init JobMaster for Job postgrescdc_to_postgres_test_add_Filed.conf \\(([0-9]*)\\).*",
- Pattern.DOTALL);
- Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
- String jobId;
- if (matcher.matches()) {
- jobId = matcher.group(1);
- } else {
- throw new RuntimeException("Can not find jobId");
- }
-
- Assertions.assertEquals(0,
container.savepointJob(jobId).getExitCode());
+ Assertions.assertEquals(0,
container.savepointJob(String.valueOf(jobId)).getExitCode());
// add filed add insert source table data
addFieldsForTable(POSTGRESQL_SCHEMA, SOURCE_TABLE_3);
@@ -434,7 +416,8 @@ public class PostgresCDCIT extends TestSuiteBase implements
TestResource {
() -> {
try {
container.restoreJob(
-
"/postgrescdc_to_postgres_test_add_Filed.conf", jobId);
+
"/postgrescdc_to_postgres_test_add_Filed.conf",
+ String.valueOf(jobId));
} catch (Exception e) {
log.error("Commit task exception :" +
e.getMessage());
throw new RuntimeException(e);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBCDCIT.java
index f132264532..78c7077940 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBCDCIT.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
@@ -41,8 +42,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import static org.awaitility.Awaitility.await;
@@ -167,11 +166,11 @@ public class TiDBCDCIT extends TiDBTestBase implements
TestResource {
// Clear related content to ensure that multiple operations are not affected
clearTable(TIDB_DATABASE, SOURCE_TABLE);
clearTable(TIDB_DATABASE, SINK_TABLE);
-
+ Long jobId = JobIdGenerator.newJobId();
CompletableFuture.supplyAsync(
() -> {
try {
- container.executeJob("/tidb/tidbcdc_to_tidb.conf");
+ container.executeJob("/tidb/tidbcdc_to_tidb.conf",
String.valueOf(jobId));
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
@@ -192,24 +191,13 @@ public class TiDBCDCIT extends TiDBTestBase implements
TestResource {
query(getSinkQuerySQL(TIDB_DATABASE, SINK_TABLE))));
});
- Pattern jobIdPattern =
- Pattern.compile(
- ".*Init JobMaster for Job tidbcdc_to_tidb.conf \\(([0-9]*)\\).*",
- Pattern.DOTALL);
- Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
- String jobId;
- if (matcher.matches()) {
- jobId = matcher.group(1);
- } else {
- throw new RuntimeException("Can not find jobId");
- }
- Assertions.assertEquals(0,
container.savepointJob(jobId).getExitCode());
+ Assertions.assertEquals(0,
container.savepointJob(String.valueOf(jobId)).getExitCode());
// Restore job
CompletableFuture.supplyAsync(
() -> {
try {
- container.restoreJob("/tidb/tidbcdc_to_tidb.conf",
jobId);
+ container.restoreJob("/tidb/tidbcdc_to_tidb.conf",
String.valueOf(jobId));
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
index 293cf6c76e..05d6467931 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
@@ -559,7 +559,9 @@ public class PaimonSinkCDCIT extends TestSuiteBase
implements TestResource {
TimeUnit.SECONDS.sleep(20);
String[] jobIds =
new String[] {
- JobIdGenerator.newJobId(), JobIdGenerator.newJobId(),
JobIdGenerator.newJobId()
+ String.valueOf(JobIdGenerator.newJobId()),
+ String.valueOf(JobIdGenerator.newJobId()),
+ String.valueOf(JobIdGenerator.newJobId())
};
log.info("jobIds: {}", Arrays.toString(jobIds));
List<CompletableFuture<Void>> futures = new ArrayList<>();
@@ -641,14 +643,15 @@ public class PaimonSinkCDCIT extends TestSuiteBase
implements TestResource {
@TestTemplate
public void testChangelogFullCompaction(TestContainer container) throws
Exception {
- String jobId = JobIdGenerator.newJobId();
+ Long jobId = JobIdGenerator.newJobId();
log.info("jobId: {}", jobId);
CompletableFuture<Void> voidCompletableFuture =
CompletableFuture.runAsync(
() -> {
try {
container.executeJob(
-
"/changelog_fake_cdc_sink_paimon_case2.conf", jobId);
+
"/changelog_fake_cdc_sink_paimon_case2.conf",
+ String.valueOf(jobId));
} catch (Exception e) {
throw new SeaTunnelException(e);
}
@@ -657,7 +660,7 @@ public class PaimonSinkCDCIT extends TestSuiteBase
implements TestResource {
TimeUnit.SECONDS.sleep(20);
changeLogEnabled = true;
// cancel stream job
- container.cancelJob(jobId);
+ container.cancelJob(String.valueOf(jobId));
TimeUnit.SECONDS.sleep(5);
// copy paimon to local
container.executeExtraCommands(containerExtendedFactory);
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/JobIdGenerator.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/JobIdGenerator.java
index 6904593b24..08fe26893f 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/JobIdGenerator.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/JobIdGenerator.java
@@ -21,7 +21,7 @@ import java.util.concurrent.ThreadLocalRandom;
public class JobIdGenerator {
- public static String newJobId() {
- return
String.valueOf(Math.abs(ThreadLocalRandom.current().nextLong()));
+ public static Long newJobId() {
+ return Math.abs(ThreadLocalRandom.current().nextLong());
}
}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
index 5ee0a1e7bd..f415082941 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
@@ -24,6 +24,7 @@ import
org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.container.TestContainerId;
import
org.apache.seatunnel.e2e.common.container.flink.AbstractTestFlinkContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
@@ -96,12 +97,14 @@ public class CheckpointEnableIT extends TestSuiteBase {
public void testZetaStreamingCheckpointInterval(TestContainer container)
throws IOException, InterruptedException, ExecutionException {
// start job
+ Long jobId = JobIdGenerator.newJobId();
CompletableFuture<Container.ExecResult> startFuture =
CompletableFuture.supplyAsync(
() -> {
try {
return container.executeJob(
-
"/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf");
+
"/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf",
+ String.valueOf(jobId));
} catch (Exception e) {
log.error("Commit task exception :" +
e.getMessage());
throw new RuntimeException(e);
@@ -109,24 +112,9 @@ public class CheckpointEnableIT extends TestSuiteBase {
});
// wait obtain job id
- AtomicReference<String> jobId = new AtomicReference<>();
- await().atMost(60000, TimeUnit.MILLISECONDS)
- .untilAsserted(
- () -> {
- Pattern jobIdPattern =
- Pattern.compile(
- ".*Init JobMaster for Job stream_fakesource_to_localfile_interval.conf \\(([0-9]*)\\).*",
- Pattern.DOTALL);
- Matcher matcher =
jobIdPattern.matcher(container.getServerLogs());
- if (matcher.matches()) {
- jobId.set(matcher.group(1));
- }
- Assertions.assertNotNull(jobId.get());
- });
-
Thread.sleep(15000);
Assertions.assertTrue(container.getServerLogs().contains("checkpoint is enabled"));
- Assertions.assertEquals(0,
container.savepointJob(jobId.get()).getExitCode());
+ Assertions.assertEquals(0,
container.savepointJob(String.valueOf(jobId)).getExitCode());
Assertions.assertEquals(0, startFuture.get().getExitCode());
// restore job
CompletableFuture.supplyAsync(
@@ -134,7 +122,7 @@ public class CheckpointEnableIT extends TestSuiteBase {
try {
return container.restoreJob(
"/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf",
- jobId.get());
+ String.valueOf(jobId));
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
@@ -164,36 +152,22 @@ public class CheckpointEnableIT extends TestSuiteBase {
public void testZetaStreamingCheckpointNoInterval(TestContainer container)
throws IOException, InterruptedException {
// start job
+ Long jobId = JobIdGenerator.newJobId();
CompletableFuture.supplyAsync(
() -> {
try {
return container.executeJob(
-
"/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile.conf");
+
"/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile.conf",
+ String.valueOf(jobId));
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
}
});
- // wait obtain job id
- AtomicReference<String> jobId = new AtomicReference<>();
- await().atMost(60000, TimeUnit.MILLISECONDS)
- .untilAsserted(
- () -> {
- Pattern jobIdPattern =
- Pattern.compile(
- ".*Init JobMaster for Job stream_fakesource_to_localfile.conf \\(([0-9]*)\\).*",
- Pattern.DOTALL);
- Matcher matcher =
jobIdPattern.matcher(container.getServerLogs());
- if (matcher.matches()) {
- jobId.set(matcher.group(1));
- }
- Assertions.assertNotNull(jobId.get());
- });
-
Thread.sleep(15000);
Assertions.assertTrue(container.getServerLogs().contains("checkpoint is enabled"));
- Assertions.assertEquals(0,
container.savepointJob(jobId.get()).getExitCode());
+ Assertions.assertEquals(0,
container.savepointJob(String.valueOf(jobId)).getExitCode());
// restore job
CompletableFuture.supplyAsync(
@@ -202,7 +176,7 @@ public class CheckpointEnableIT extends TestSuiteBase {
return container
.restoreJob(
"/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile.conf",
- jobId.get())
+ String.valueOf(jobId))
.getExitCode();
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());