This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 068bbf70c2 [Improve] Add custom job id arg in client (#6943)
068bbf70c2 is described below

commit 068bbf70c2849dad724a101e8bcc36af4e08dcb8
Author: Jia Fan <[email protected]>
AuthorDate: Thu Jun 6 10:14:32 2024 +0800

    [Improve] Add custom job id arg in client (#6943)
---
 .../starter/seatunnel/args/ClientCommandArgs.java  |  5 +++
 .../seatunnel/command/ClientExecuteCommand.java    |  5 ++-
 .../seatunnel/args/ClientCommandArgsTest.java      | 18 +++++++++-
 .../seatunnel/engine/client/SeaTunnelClient.java   | 13 +++++++-
 .../engine/client/SeaTunnelClientInstance.java     |  7 ++++
 .../client/job/ClientJobExecutionEnvironment.java  | 14 +++++---
 .../engine/client/SeaTunnelClientTest.java         | 39 ++++++++++++++++++++++
 7 files changed, 94 insertions(+), 7 deletions(-)

diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
index 9504c9cb2c..67293ba2a3 100644
--- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
@@ -78,6 +78,11 @@ public class ClientCommandArgs extends AbstractCommandArgs {
             description = "Get job metrics by JobId")
     private String metricsJobId;
 
+    @Parameter(
+            names = {"--set-job-id"},
+            description = "Set custom job id for job")
+    private String customJobId;
+
     @Parameter(
             names = {"--get_running_job_metrics"},
             description = "Gets metrics for running jobs")
diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
index d1e8b78009..4527d98d66 100644
--- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
@@ -144,7 +144,10 @@ public class ClientExecuteCommand implements Command<ClientCommandArgs> {
                                     configFile.toString(),
                                     clientCommandArgs.getVariables(),
                                     jobConfig,
-                                    seaTunnelConfig);
+                                    seaTunnelConfig,
+                                    clientCommandArgs.getCustomJobId() != null
+                                            ? Long.parseLong(clientCommandArgs.getCustomJobId())
+                                            : null);
                 }
 
                 // get job start time
diff --git a/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgsTest.java b/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgsTest.java
index 87d4aa5b72..b40cc6ad3b 100644
--- a/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgsTest.java
+++ b/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgsTest.java
@@ -41,6 +41,15 @@ public class ClientCommandArgsTest {
         Assertions.assertDoesNotThrow(() -> SeaTunnel.run(clientCommandArgs.buildCommand()));
     }
 
+    @Test
+    public void testSetJobId() throws FileNotFoundException, URISyntaxException {
+        String configurePath = "/config/fake_to_inmemory.json";
+        String configFile = MultiTableSinkTest.getTestConfigFile(configurePath);
+        long jobId = 999;
+        ClientCommandArgs clientCommandArgs = buildClientCommandArgs(configFile, jobId);
+        Assertions.assertDoesNotThrow(() -> SeaTunnel.run(clientCommandArgs.buildCommand()));
+    }
+
     @Test
     public void testExecuteClientCommandArgsWithoutPluginName()
             throws FileNotFoundException, URISyntaxException {
@@ -58,12 +67,19 @@ public class ClientCommandArgsTest {
                 commandExecuteException.getCause().getMessage());
     }
 
-    private static ClientCommandArgs buildClientCommandArgs(String configFile) {
+    private static ClientCommandArgs buildClientCommandArgs(String configFile, Long jobId) {
         ClientCommandArgs clientCommandArgs = new ClientCommandArgs();
         clientCommandArgs.setVariables(new ArrayList<>());
         clientCommandArgs.setConfigFile(configFile);
         clientCommandArgs.setMasterType(MasterType.LOCAL);
         clientCommandArgs.setCheckConfig(false);
+        if (jobId != null) {
+            clientCommandArgs.setCustomJobId(String.valueOf(jobId));
+        }
         return clientCommandArgs;
     }
+
+    private static ClientCommandArgs buildClientCommandArgs(String configFile) {
+        return buildClientCommandArgs(configFile, null);
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
index 2d7508ee2d..0e89171073 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -64,7 +64,18 @@ public class SeaTunnelClient implements SeaTunnelClientInstance, AutoCloseable {
             @NonNull JobConfig jobConfig,
             @NonNull SeaTunnelConfig seaTunnelConfig) {
         return new ClientJobExecutionEnvironment(
-                jobConfig, filePath, variables, hazelcastClient, seaTunnelConfig);
+                jobConfig, filePath, variables, hazelcastClient, seaTunnelConfig, null);
+    }
+
+    @Override
+    public ClientJobExecutionEnvironment createExecutionContext(
+            @NonNull String filePath,
+            List<String> variables,
+            @NonNull JobConfig jobConfig,
+            @NonNull SeaTunnelConfig seaTunnelConfig,
+            Long jobId) {
+        return new ClientJobExecutionEnvironment(
+                jobConfig, filePath, variables, hazelcastClient, seaTunnelConfig, jobId);
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
index 36a3f2e36e..a275f3cab7 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
@@ -39,6 +39,13 @@ public interface SeaTunnelClientInstance {
             @NonNull JobConfig config,
             @NonNull SeaTunnelConfig seaTunnelConfig);
 
+    ClientJobExecutionEnvironment createExecutionContext(
+            @NonNull String filePath,
+            List<String> variables,
+            @NonNull JobConfig config,
+            @NonNull SeaTunnelConfig seaTunnelConfig,
+            Long jobId);
+
     ClientJobExecutionEnvironment restoreExecutionContext(
             @NonNull String filePath,
             @NonNull JobConfig config,
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java
index 18f1a7376f..6e33354351 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java
@@ -67,8 +67,13 @@ public class ClientJobExecutionEnvironment extends AbstractJobEnvironment {
         this.seaTunnelHazelcastClient = seaTunnelHazelcastClient;
         this.jobClient = new JobClient(seaTunnelHazelcastClient);
         this.seaTunnelConfig = seaTunnelConfig;
-        this.jobConfig.setJobContext(
-                new JobContext(isStartWithSavePoint ? jobId : jobClient.getNewJobId()));
+        Long finalJobId;
+        if (isStartWithSavePoint || jobId != null) {
+            finalJobId = jobId;
+        } else {
+            finalJobId = jobClient.getNewJobId();
+        }
+        this.jobConfig.setJobContext(new JobContext(finalJobId));
         this.connectorPackageClient = new ConnectorPackageClient(seaTunnelHazelcastClient);
     }
 
@@ -77,7 +82,8 @@ public class ClientJobExecutionEnvironment extends AbstractJobEnvironment {
             String jobFilePath,
             List<String> variables,
             SeaTunnelHazelcastClient seaTunnelHazelcastClient,
-            SeaTunnelConfig seaTunnelConfig) {
+            SeaTunnelConfig seaTunnelConfig,
+            Long jobId) {
         this(
                 jobConfig,
                 jobFilePath,
@@ -85,7 +91,7 @@ public class ClientJobExecutionEnvironment extends AbstractJobEnvironment {
                 seaTunnelHazelcastClient,
                 seaTunnelConfig,
                 false,
-                null);
+                jobId);
     }
 
     /** Search all jars in SEATUNNEL_HOME/plugins */
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index 0fbbe80572..f16f61a7f0 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -47,6 +47,7 @@ import com.hazelcast.core.HazelcastInstance;
 import com.hazelcast.instance.impl.HazelcastInstanceFactory;
 import lombok.extern.slf4j.Slf4j;
 
+import java.util.ArrayList;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -312,6 +313,44 @@ public class SeaTunnelClientTest {
         }
     }
 
+    @Test
+    public void testSetJobId() throws ExecutionException, InterruptedException {
+        Common.setDeployMode(DeployMode.CLIENT);
+        String filePath = TestUtils.getResource("/streaming_fake_to_console.conf");
+        JobConfig jobConfig = new JobConfig();
+        jobConfig.setName("testSetJobId");
+        long jobId = 12345;
+        SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
+        JobClient jobClient = seaTunnelClient.getJobClient();
+        try {
+            ClientJobExecutionEnvironment jobExecutionEnv =
+                    seaTunnelClient.createExecutionContext(
+                            filePath, new ArrayList<>(), jobConfig, SEATUNNEL_CONFIG, jobId);
+
+            final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+
+            Assertions.assertEquals(jobId, clientJobProxy.getJobId());
+
+            await().atMost(30000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertEquals(
+                                            "RUNNING", jobClient.getJobStatus(jobId)));
+
+            jobClient.cancelJob(jobId);
+
+            await().atMost(30000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertEquals(
+                                            "CANCELED", jobClient.getJobStatus(jobId)));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        } finally {
+            seaTunnelClient.close();
+        }
+    }
+
     @Test
     public void testGetJobInfo() {
         Common.setDeployMode(DeployMode.CLIENT);

Reply via email to