This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 13bd46b511 [Improve][E2E] modify the method of obtaining JobId (#7880)
13bd46b511 is described below

commit 13bd46b511e1d8e9bb5fe4da5ba740ba4b41c577
Author: zhangdonghao <[email protected]>
AuthorDate: Wed Oct 23 14:23:54 2024 +0800

    [Improve][E2E] modify the method of obtaining JobId (#7880)
---
 .../connectors/seatunnel/cdc/mysql/MysqlCDCIT.java | 24 ++++-------
 .../seatunnel/cdc/postgres/OpengaussCDCIT.java     | 42 ++++++-------------
 .../seatunnel/cdc/oracle/OracleCDCIT.java          | 22 ++++------
 .../seatunnel/cdc/postgres/PostgresCDCIT.java      | 43 ++++++-------------
 .../seatunnel/e2e/connector/tidb/TiDBCDCIT.java    | 22 +++-------
 .../e2e/connector/paimon/PaimonSinkCDCIT.java      | 11 +++--
 .../seatunnel/e2e/common/util/JobIdGenerator.java  |  4 +-
 .../seatunnel/engine/e2e/CheckpointEnableIT.java   | 48 +++++-----------------
 8 files changed, 64 insertions(+), 152 deletions(-)

diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
index f5057a4fd0..bf7e8d8fe7 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
 import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
 
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
@@ -49,8 +50,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 import java.util.stream.Stream;
 
 import static org.awaitility.Awaitility.await;
@@ -329,11 +328,13 @@ public class MysqlCDCIT extends TestSuiteBase implements 
TestResource {
         clearTable(MYSQL_DATABASE2, SOURCE_TABLE_1);
         clearTable(MYSQL_DATABASE2, SOURCE_TABLE_2);
 
+        Long jobId = JobIdGenerator.newJobId();
         CompletableFuture.supplyAsync(
                 () -> {
                     try {
                         return container.executeJob(
-                                
"/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf");
+                                
"/mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf",
+                                String.valueOf(jobId));
                     } catch (Exception e) {
                         log.error("Commit task exception :" + e.getMessage());
                         throw new RuntimeException(e);
@@ -365,26 +366,15 @@ public class MysqlCDCIT extends TestSuiteBase implements 
TestResource {
                 .pollInterval(1000, TimeUnit.MILLISECONDS)
                 .until(() -> getConnectionStatus("st_user_sink").size() == 1);
 
-        Pattern jobIdPattern =
-                Pattern.compile(
-                        ".*Init JobMaster for Job 
mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf \\(([0-9]*)\\).*",
-                        Pattern.DOTALL);
-        Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
-        String jobId;
-        if (matcher.matches()) {
-            jobId = matcher.group(1);
-        } else {
-            throw new RuntimeException("Can not find jobId");
-        }
-
-        Assertions.assertEquals(0, 
container.savepointJob(jobId).getExitCode());
+        Assertions.assertEquals(0, 
container.savepointJob(String.valueOf(jobId)).getExitCode());
 
         // Restore job with add a new table
         CompletableFuture.supplyAsync(
                 () -> {
                     try {
                         container.restoreJob(
-                                
"/mysqlcdc_to_mysql_with_multi_table_mode_two_table.conf", jobId);
+                                
"/mysqlcdc_to_mysql_with_multi_table_mode_two_table.conf",
+                                String.valueOf(jobId));
                     } catch (Exception e) {
                         log.error("Commit task exception :" + e.getMessage());
                         throw new RuntimeException(e);
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java
index 5529c82396..ed3fdd74b4 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
 import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
 
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
@@ -288,12 +289,14 @@ public class OpengaussCDCIT extends TestSuiteBase 
implements TestResource {
             disabledReason = "Currently SPARK and FLINK do not support 
restore")
     public void testMultiTableWithRestore(TestContainer container)
             throws IOException, InterruptedException {
+        Long jobId = JobIdGenerator.newJobId();
         try {
             CompletableFuture.supplyAsync(
                     () -> {
                         try {
                             return container.executeJob(
-                                    
"/opengausscdc_to_opengauss_with_multi_table_mode_one_table.conf");
+                                    
"/opengausscdc_to_opengauss_with_multi_table_mode_one_table.conf",
+                                    String.valueOf(jobId));
                         } catch (Exception e) {
                             log.error("Commit task exception :" + 
e.getMessage());
                             throw new RuntimeException(e);
@@ -319,19 +322,7 @@ public class OpengaussCDCIT extends TestSuiteBase 
implements TestResource {
                                                                             
OPENGAUSS_SCHEMA,
                                                                             
SINK_TABLE_1)))));
 
-            Pattern jobIdPattern =
-                    Pattern.compile(
-                            ".*Init JobMaster for Job 
opengausscdc_to_opengauss_with_multi_table_mode_one_table.conf 
\\(([0-9]*)\\).*",
-                            Pattern.DOTALL);
-            Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
-            String jobId;
-            if (matcher.matches()) {
-                jobId = matcher.group(1);
-            } else {
-                throw new RuntimeException("Can not find jobId");
-            }
-
-            Assertions.assertEquals(0, 
container.savepointJob(jobId).getExitCode());
+            Assertions.assertEquals(0, 
container.savepointJob(String.valueOf(jobId)).getExitCode());
 
             // Restore job with add a new table
             CompletableFuture.supplyAsync(
@@ -339,7 +330,7 @@ public class OpengaussCDCIT extends TestSuiteBase 
implements TestResource {
                         try {
                             container.restoreJob(
                                     
"/opengausscdc_to_opengauss_with_multi_table_mode_two_table.conf",
-                                    jobId);
+                                    String.valueOf(jobId));
                         } catch (Exception e) {
                             log.error("Commit task exception :" + 
e.getMessage());
                             throw new RuntimeException(e);
@@ -397,12 +388,14 @@ public class OpengaussCDCIT extends TestSuiteBase 
implements TestResource {
             disabledReason = "Currently SPARK and FLINK do not support 
restore")
     public void testAddFiledWithRestore(TestContainer container)
             throws IOException, InterruptedException {
+        Long jobId = JobIdGenerator.newJobId();
         try {
             CompletableFuture.supplyAsync(
                     () -> {
                         try {
                             return container.executeJob(
-                                    
"/opengausscdc_to_opengauss_test_add_Filed.conf");
+                                    
"/opengausscdc_to_opengauss_test_add_Filed.conf",
+                                    String.valueOf(jobId));
                         } catch (Exception e) {
                             log.error("Commit task exception :" + 
e.getMessage());
                             throw new RuntimeException(e);
@@ -425,19 +418,7 @@ public class OpengaussCDCIT extends TestSuiteBase 
implements TestResource {
                                                                             
OPENGAUSS_SCHEMA,
                                                                             
SINK_TABLE_3)))));
 
-            Pattern jobIdPattern =
-                    Pattern.compile(
-                            ".*Init JobMaster for Job 
opengausscdc_to_opengauss_test_add_Filed.conf \\(([0-9]*)\\).*",
-                            Pattern.DOTALL);
-            Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
-            String jobId;
-            if (matcher.matches()) {
-                jobId = matcher.group(1);
-            } else {
-                throw new RuntimeException("Can not find jobId");
-            }
-
-            Assertions.assertEquals(0, 
container.savepointJob(jobId).getExitCode());
+            Assertions.assertEquals(0, 
container.savepointJob(String.valueOf(jobId)).getExitCode());
 
             // add filed add insert source table data
             addFieldsForTable(OPENGAUSS_SCHEMA, SOURCE_TABLE_3);
@@ -449,7 +430,8 @@ public class OpengaussCDCIT extends TestSuiteBase 
implements TestResource {
                     () -> {
                         try {
                             container.restoreJob(
-                                    
"/opengausscdc_to_opengauss_test_add_Filed.conf", jobId);
+                                    
"/opengausscdc_to_opengauss_test_add_Filed.conf",
+                                    String.valueOf(jobId));
                         } catch (Exception e) {
                             log.error("Commit task exception :" + 
e.getMessage());
                             throw new RuntimeException(e);
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
index 03cd2039b0..86282b7ff0 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
 import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
 
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
@@ -390,11 +391,13 @@ public class OracleCDCIT extends TestSuiteBase implements 
TestResource {
         insertSourceTable(DATABASE, SOURCE_TABLE1);
         insertSourceTable(DATABASE, SOURCE_TABLE2);
 
+        Long jobId = JobIdGenerator.newJobId();
         CompletableFuture.supplyAsync(
                 () -> {
                     try {
                         return container.executeJob(
-                                
"/oraclecdc_to_oracle_with_multi_table_mode_one_table.conf");
+                                
"/oraclecdc_to_oracle_with_multi_table_mode_one_table.conf",
+                                String.valueOf(jobId));
                     } catch (Exception e) {
                         log.error("Commit task exception :" + e.getMessage());
                         throw new RuntimeException(e);
@@ -432,26 +435,15 @@ public class OracleCDCIT extends TestSuiteBase implements 
TestResource {
                                                                 
getSourceQuerySQL(
                                                                         
DATABASE, SINK_TABLE1)))));
 
-        Pattern jobIdPattern =
-                Pattern.compile(
-                        ".*Init JobMaster for Job 
oraclecdc_to_oracle_with_multi_table_mode_one_table.conf \\(([0-9]*)\\).*",
-                        Pattern.DOTALL);
-        Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
-        String jobId;
-        if (matcher.matches()) {
-            jobId = matcher.group(1);
-        } else {
-            throw new RuntimeException("Can not find jobId");
-        }
-
-        Assertions.assertEquals(0, 
container.savepointJob(jobId).getExitCode());
+        Assertions.assertEquals(0, 
container.savepointJob(String.valueOf(jobId)).getExitCode());
 
         // Restore job with add a new table
         CompletableFuture.supplyAsync(
                 () -> {
                     try {
                         container.restoreJob(
-                                
"/oraclecdc_to_oracle_with_multi_table_mode_two_table.conf", jobId);
+                                
"/oraclecdc_to_oracle_with_multi_table_mode_two_table.conf",
+                                String.valueOf(jobId));
                     } catch (Exception e) {
                         log.error("Commit task exception :" + e.getMessage());
                         throw new RuntimeException(e);
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
index 5b6d810de7..3abca057fb 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
 import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
 
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
@@ -274,12 +275,14 @@ public class PostgresCDCIT extends TestSuiteBase 
implements TestResource {
             disabledReason = "Currently SPARK and FLINK do not support 
restore")
     public void testMultiTableWithRestore(TestContainer container)
             throws IOException, InterruptedException {
+        Long jobId = JobIdGenerator.newJobId();
         try {
             CompletableFuture.supplyAsync(
                     () -> {
                         try {
                             return container.executeJob(
-                                    
"/pgcdc_to_pg_with_multi_table_mode_one_table.conf");
+                                    
"/pgcdc_to_pg_with_multi_table_mode_one_table.conf",
+                                    String.valueOf(jobId));
                         } catch (Exception e) {
                             log.error("Commit task exception :" + 
e.getMessage());
                             throw new RuntimeException(e);
@@ -305,26 +308,15 @@ public class PostgresCDCIT extends TestSuiteBase 
implements TestResource {
                                                                             
POSTGRESQL_SCHEMA,
                                                                             
SINK_TABLE_1)))));
 
-            Pattern jobIdPattern =
-                    Pattern.compile(
-                            ".*Init JobMaster for Job 
pgcdc_to_pg_with_multi_table_mode_one_table.conf \\(([0-9]*)\\).*",
-                            Pattern.DOTALL);
-            Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
-            String jobId;
-            if (matcher.matches()) {
-                jobId = matcher.group(1);
-            } else {
-                throw new RuntimeException("Can not find jobId");
-            }
-
-            Assertions.assertEquals(0, 
container.savepointJob(jobId).getExitCode());
+            Assertions.assertEquals(0, 
container.savepointJob(String.valueOf(jobId)).getExitCode());
 
             // Restore job with add a new table
             CompletableFuture.supplyAsync(
                     () -> {
                         try {
                             container.restoreJob(
-                                    
"/pgcdc_to_pg_with_multi_table_mode_two_table.conf", jobId);
+                                    
"/pgcdc_to_pg_with_multi_table_mode_two_table.conf",
+                                    String.valueOf(jobId));
                         } catch (Exception e) {
                             log.error("Commit task exception :" + 
e.getMessage());
                             throw new RuntimeException(e);
@@ -382,12 +374,14 @@ public class PostgresCDCIT extends TestSuiteBase 
implements TestResource {
             disabledReason = "Currently SPARK and FLINK do not support 
restore")
     public void testAddFiledWithRestore(TestContainer container)
             throws IOException, InterruptedException {
+        Long jobId = JobIdGenerator.newJobId();
         try {
             CompletableFuture.supplyAsync(
                     () -> {
                         try {
                             return container.executeJob(
-                                    
"/postgrescdc_to_postgres_test_add_Filed.conf");
+                                    
"/postgrescdc_to_postgres_test_add_Filed.conf",
+                                    String.valueOf(jobId));
                         } catch (Exception e) {
                             log.error("Commit task exception :" + 
e.getMessage());
                             throw new RuntimeException(e);
@@ -410,19 +404,7 @@ public class PostgresCDCIT extends TestSuiteBase 
implements TestResource {
                                                                             
POSTGRESQL_SCHEMA,
                                                                             
SINK_TABLE_3)))));
 
-            Pattern jobIdPattern =
-                    Pattern.compile(
-                            ".*Init JobMaster for Job 
postgrescdc_to_postgres_test_add_Filed.conf \\(([0-9]*)\\).*",
-                            Pattern.DOTALL);
-            Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
-            String jobId;
-            if (matcher.matches()) {
-                jobId = matcher.group(1);
-            } else {
-                throw new RuntimeException("Can not find jobId");
-            }
-
-            Assertions.assertEquals(0, 
container.savepointJob(jobId).getExitCode());
+            Assertions.assertEquals(0, 
container.savepointJob(String.valueOf(jobId)).getExitCode());
 
             // add filed add insert source table data
             addFieldsForTable(POSTGRESQL_SCHEMA, SOURCE_TABLE_3);
@@ -434,7 +416,8 @@ public class PostgresCDCIT extends TestSuiteBase implements 
TestResource {
                     () -> {
                         try {
                             container.restoreJob(
-                                    
"/postgrescdc_to_postgres_test_add_Filed.conf", jobId);
+                                    
"/postgrescdc_to_postgres_test_add_Filed.conf",
+                                    String.valueOf(jobId));
                         } catch (Exception e) {
                             log.error("Commit task exception :" + 
e.getMessage());
                             throw new RuntimeException(e);
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBCDCIT.java
index f132264532..78c7077940 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-tidb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tidb/TiDBCDCIT.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
 import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
 
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
@@ -41,8 +42,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import static org.awaitility.Awaitility.await;
 
@@ -167,11 +166,11 @@ public class TiDBCDCIT extends TiDBTestBase implements 
TestResource {
         // Clear related content to ensure that multiple operations are not 
affected
         clearTable(TIDB_DATABASE, SOURCE_TABLE);
         clearTable(TIDB_DATABASE, SINK_TABLE);
-
+        Long jobId = JobIdGenerator.newJobId();
         CompletableFuture.supplyAsync(
                 () -> {
                     try {
-                        container.executeJob("/tidb/tidbcdc_to_tidb.conf");
+                        container.executeJob("/tidb/tidbcdc_to_tidb.conf", 
String.valueOf(jobId));
                     } catch (Exception e) {
                         log.error("Commit task exception :" + e.getMessage());
                         throw new RuntimeException(e);
@@ -192,24 +191,13 @@ public class TiDBCDCIT extends TiDBTestBase implements 
TestResource {
                                             
query(getSinkQuerySQL(TIDB_DATABASE, SINK_TABLE))));
                         });
 
-        Pattern jobIdPattern =
-                Pattern.compile(
-                        ".*Init JobMaster for Job tidbcdc_to_tidb.conf 
\\(([0-9]*)\\).*",
-                        Pattern.DOTALL);
-        Matcher matcher = jobIdPattern.matcher(container.getServerLogs());
-        String jobId;
-        if (matcher.matches()) {
-            jobId = matcher.group(1);
-        } else {
-            throw new RuntimeException("Can not find jobId");
-        }
-        Assertions.assertEquals(0, 
container.savepointJob(jobId).getExitCode());
+        Assertions.assertEquals(0, 
container.savepointJob(String.valueOf(jobId)).getExitCode());
 
         // Restore job
         CompletableFuture.supplyAsync(
                 () -> {
                     try {
-                        container.restoreJob("/tidb/tidbcdc_to_tidb.conf", 
jobId);
+                        container.restoreJob("/tidb/tidbcdc_to_tidb.conf", 
String.valueOf(jobId));
                     } catch (Exception e) {
                         log.error("Commit task exception :" + e.getMessage());
                         throw new RuntimeException(e);
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
index 293cf6c76e..05d6467931 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java
@@ -559,7 +559,9 @@ public class PaimonSinkCDCIT extends TestSuiteBase 
implements TestResource {
         TimeUnit.SECONDS.sleep(20);
         String[] jobIds =
                 new String[] {
-                    JobIdGenerator.newJobId(), JobIdGenerator.newJobId(), 
JobIdGenerator.newJobId()
+                    String.valueOf(JobIdGenerator.newJobId()),
+                    String.valueOf(JobIdGenerator.newJobId()),
+                    String.valueOf(JobIdGenerator.newJobId())
                 };
         log.info("jobIds: {}", Arrays.toString(jobIds));
         List<CompletableFuture<Void>> futures = new ArrayList<>();
@@ -641,14 +643,15 @@ public class PaimonSinkCDCIT extends TestSuiteBase 
implements TestResource {
 
     @TestTemplate
     public void testChangelogFullCompaction(TestContainer container) throws 
Exception {
-        String jobId = JobIdGenerator.newJobId();
+        Long jobId = JobIdGenerator.newJobId();
         log.info("jobId: {}", jobId);
         CompletableFuture<Void> voidCompletableFuture =
                 CompletableFuture.runAsync(
                         () -> {
                             try {
                                 container.executeJob(
-                                        
"/changelog_fake_cdc_sink_paimon_case2.conf", jobId);
+                                        
"/changelog_fake_cdc_sink_paimon_case2.conf",
+                                        String.valueOf(jobId));
                             } catch (Exception e) {
                                 throw new SeaTunnelException(e);
                             }
@@ -657,7 +660,7 @@ public class PaimonSinkCDCIT extends TestSuiteBase 
implements TestResource {
         TimeUnit.SECONDS.sleep(20);
         changeLogEnabled = true;
         // cancel stream job
-        container.cancelJob(jobId);
+        container.cancelJob(String.valueOf(jobId));
         TimeUnit.SECONDS.sleep(5);
         // copy paimon to local
         container.executeExtraCommands(containerExtendedFactory);
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/JobIdGenerator.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/JobIdGenerator.java
index 6904593b24..08fe26893f 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/JobIdGenerator.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/JobIdGenerator.java
@@ -21,7 +21,7 @@ import java.util.concurrent.ThreadLocalRandom;
 
 public class JobIdGenerator {
 
-    public static String newJobId() {
-        return String.valueOf(Math.abs(ThreadLocalRandom.current().nextLong()));
+    public static Long newJobId() {
+        return Math.abs(ThreadLocalRandom.current().nextLong());
     }
 }
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
index 5ee0a1e7bd..f415082941 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
@@ -24,6 +24,7 @@ import 
org.apache.seatunnel.e2e.common.container.TestContainer;
 import org.apache.seatunnel.e2e.common.container.TestContainerId;
 import 
org.apache.seatunnel.e2e.common.container.flink.AbstractTestFlinkContainer;
 import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.TestTemplate;
@@ -96,12 +97,14 @@ public class CheckpointEnableIT extends TestSuiteBase {
     public void testZetaStreamingCheckpointInterval(TestContainer container)
             throws IOException, InterruptedException, ExecutionException {
         // start job
+        Long jobId = JobIdGenerator.newJobId();
         CompletableFuture<Container.ExecResult> startFuture =
                 CompletableFuture.supplyAsync(
                         () -> {
                             try {
                                 return container.executeJob(
-                                        
"/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf");
+                                        
"/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf",
+                                        String.valueOf(jobId));
                             } catch (Exception e) {
                                 log.error("Commit task exception :" + 
e.getMessage());
                                 throw new RuntimeException(e);
@@ -109,24 +112,9 @@ public class CheckpointEnableIT extends TestSuiteBase {
                         });
 
         // wait obtain job id
-        AtomicReference<String> jobId = new AtomicReference<>();
-        await().atMost(60000, TimeUnit.MILLISECONDS)
-                .untilAsserted(
-                        () -> {
-                            Pattern jobIdPattern =
-                                    Pattern.compile(
-                                            ".*Init JobMaster for Job 
stream_fakesource_to_localfile_interval.conf \\(([0-9]*)\\).*",
-                                            Pattern.DOTALL);
-                            Matcher matcher = 
jobIdPattern.matcher(container.getServerLogs());
-                            if (matcher.matches()) {
-                                jobId.set(matcher.group(1));
-                            }
-                            Assertions.assertNotNull(jobId.get());
-                        });
-
         Thread.sleep(15000);
         Assertions.assertTrue(container.getServerLogs().contains("checkpoint 
is enabled"));
-        Assertions.assertEquals(0, 
container.savepointJob(jobId.get()).getExitCode());
+        Assertions.assertEquals(0, 
container.savepointJob(String.valueOf(jobId)).getExitCode());
         Assertions.assertEquals(0, startFuture.get().getExitCode());
         // restore job
         CompletableFuture.supplyAsync(
@@ -134,7 +122,7 @@ public class CheckpointEnableIT extends TestSuiteBase {
                     try {
                         return container.restoreJob(
                                 
"/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf",
-                                jobId.get());
+                                String.valueOf(jobId));
                     } catch (Exception e) {
                         log.error("Commit task exception :" + e.getMessage());
                         throw new RuntimeException(e);
@@ -164,36 +152,22 @@ public class CheckpointEnableIT extends TestSuiteBase {
     public void testZetaStreamingCheckpointNoInterval(TestContainer container)
             throws IOException, InterruptedException {
         // start job
+        Long jobId = JobIdGenerator.newJobId();
         CompletableFuture.supplyAsync(
                 () -> {
                     try {
                         return container.executeJob(
-                                
"/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile.conf");
+                                
"/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile.conf",
+                                String.valueOf(jobId));
                     } catch (Exception e) {
                         log.error("Commit task exception :" + e.getMessage());
                         throw new RuntimeException(e);
                     }
                 });
 
-        // wait obtain job id
-        AtomicReference<String> jobId = new AtomicReference<>();
-        await().atMost(60000, TimeUnit.MILLISECONDS)
-                .untilAsserted(
-                        () -> {
-                            Pattern jobIdPattern =
-                                    Pattern.compile(
-                                            ".*Init JobMaster for Job 
stream_fakesource_to_localfile.conf \\(([0-9]*)\\).*",
-                                            Pattern.DOTALL);
-                            Matcher matcher = 
jobIdPattern.matcher(container.getServerLogs());
-                            if (matcher.matches()) {
-                                jobId.set(matcher.group(1));
-                            }
-                            Assertions.assertNotNull(jobId.get());
-                        });
-
         Thread.sleep(15000);
         Assertions.assertTrue(container.getServerLogs().contains("checkpoint 
is enabled"));
-        Assertions.assertEquals(0, 
container.savepointJob(jobId.get()).getExitCode());
+        Assertions.assertEquals(0, 
container.savepointJob(String.valueOf(jobId)).getExitCode());
 
         // restore job
         CompletableFuture.supplyAsync(
@@ -202,7 +176,7 @@ public class CheckpointEnableIT extends TestSuiteBase {
                         return container
                                 .restoreJob(
                                         
"/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile.conf",
-                                        jobId.get())
+                                        String.valueOf(jobId))
                                 .getExitCode();
                     } catch (Exception e) {
                         log.error("Commit task exception :" + e.getMessage());


Reply via email to