This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 068bbf70c2 [Improve] Add custom job id arg in client (#6943)
068bbf70c2 is described below
commit 068bbf70c2849dad724a101e8bcc36af4e08dcb8
Author: Jia Fan <[email protected]>
AuthorDate: Thu Jun 6 10:14:32 2024 +0800
[Improve] Add custom job id arg in client (#6943)
---
.../starter/seatunnel/args/ClientCommandArgs.java | 5 +++
.../seatunnel/command/ClientExecuteCommand.java | 5 ++-
.../seatunnel/args/ClientCommandArgsTest.java | 18 +++++++++-
.../seatunnel/engine/client/SeaTunnelClient.java | 13 +++++++-
.../engine/client/SeaTunnelClientInstance.java | 7 ++++
.../client/job/ClientJobExecutionEnvironment.java | 14 +++++---
.../engine/client/SeaTunnelClientTest.java | 39 ++++++++++++++++++++++
7 files changed, 94 insertions(+), 7 deletions(-)
diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
index 9504c9cb2c..67293ba2a3 100644
--- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
@@ -78,6 +78,11 @@ public class ClientCommandArgs extends AbstractCommandArgs {
description = "Get job metrics by JobId")
private String metricsJobId;
+ @Parameter(
+ names = {"--set-job-id"},
+ description = "Set custom job id for job")
+ private String customJobId;
+
@Parameter(
names = {"--get_running_job_metrics"},
description = "Gets metrics for running jobs")
diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
index d1e8b78009..4527d98d66 100644
--- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
@@ -144,7 +144,10 @@ public class ClientExecuteCommand implements Command<ClientCommandArgs> {
configFile.toString(),
clientCommandArgs.getVariables(),
jobConfig,
- seaTunnelConfig);
+ seaTunnelConfig,
+ clientCommandArgs.getCustomJobId() != null
+ ?
Long.parseLong(clientCommandArgs.getCustomJobId())
+ : null);
}
// get job start time
diff --git a/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgsTest.java b/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgsTest.java
index 87d4aa5b72..b40cc6ad3b 100644
--- a/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgsTest.java
+++ b/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgsTest.java
@@ -41,6 +41,15 @@ public class ClientCommandArgsTest {
Assertions.assertDoesNotThrow(() ->
SeaTunnel.run(clientCommandArgs.buildCommand()));
}
+ @Test
+ public void testSetJobId() throws FileNotFoundException,
URISyntaxException {
+ String configurePath = "/config/fake_to_inmemory.json";
+ String configFile =
MultiTableSinkTest.getTestConfigFile(configurePath);
+ long jobId = 999;
+ ClientCommandArgs clientCommandArgs =
buildClientCommandArgs(configFile, jobId);
+ Assertions.assertDoesNotThrow(() ->
SeaTunnel.run(clientCommandArgs.buildCommand()));
+ }
+
@Test
public void testExecuteClientCommandArgsWithoutPluginName()
throws FileNotFoundException, URISyntaxException {
@@ -58,12 +67,19 @@ public class ClientCommandArgsTest {
commandExecuteException.getCause().getMessage());
}
- private static ClientCommandArgs buildClientCommandArgs(String configFile)
{
+ private static ClientCommandArgs buildClientCommandArgs(String configFile,
Long jobId) {
ClientCommandArgs clientCommandArgs = new ClientCommandArgs();
clientCommandArgs.setVariables(new ArrayList<>());
clientCommandArgs.setConfigFile(configFile);
clientCommandArgs.setMasterType(MasterType.LOCAL);
clientCommandArgs.setCheckConfig(false);
+ if (jobId != null) {
+ clientCommandArgs.setCustomJobId(String.valueOf(jobId));
+ }
return clientCommandArgs;
}
+
+ private static ClientCommandArgs buildClientCommandArgs(String configFile)
{
+ return buildClientCommandArgs(configFile, null);
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
index 2d7508ee2d..0e89171073 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -64,7 +64,18 @@ public class SeaTunnelClient implements SeaTunnelClientInstance, AutoCloseable {
@NonNull JobConfig jobConfig,
@NonNull SeaTunnelConfig seaTunnelConfig) {
return new ClientJobExecutionEnvironment(
- jobConfig, filePath, variables, hazelcastClient,
seaTunnelConfig);
+ jobConfig, filePath, variables, hazelcastClient,
seaTunnelConfig, null);
+ }
+
+ @Override
+ public ClientJobExecutionEnvironment createExecutionContext(
+ @NonNull String filePath,
+ List<String> variables,
+ @NonNull JobConfig jobConfig,
+ @NonNull SeaTunnelConfig seaTunnelConfig,
+ Long jobId) {
+ return new ClientJobExecutionEnvironment(
+ jobConfig, filePath, variables, hazelcastClient,
seaTunnelConfig, jobId);
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
index 36a3f2e36e..a275f3cab7 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java
@@ -39,6 +39,13 @@ public interface SeaTunnelClientInstance {
@NonNull JobConfig config,
@NonNull SeaTunnelConfig seaTunnelConfig);
+ ClientJobExecutionEnvironment createExecutionContext(
+ @NonNull String filePath,
+ List<String> variables,
+ @NonNull JobConfig config,
+ @NonNull SeaTunnelConfig seaTunnelConfig,
+ Long jobId);
+
ClientJobExecutionEnvironment restoreExecutionContext(
@NonNull String filePath,
@NonNull JobConfig config,
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java
index 18f1a7376f..6e33354351 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java
@@ -67,8 +67,13 @@ public class ClientJobExecutionEnvironment extends AbstractJobEnvironment {
this.seaTunnelHazelcastClient = seaTunnelHazelcastClient;
this.jobClient = new JobClient(seaTunnelHazelcastClient);
this.seaTunnelConfig = seaTunnelConfig;
- this.jobConfig.setJobContext(
- new JobContext(isStartWithSavePoint ? jobId :
jobClient.getNewJobId()));
+ Long finalJobId;
+ if (isStartWithSavePoint || jobId != null) {
+ finalJobId = jobId;
+ } else {
+ finalJobId = jobClient.getNewJobId();
+ }
+ this.jobConfig.setJobContext(new JobContext(finalJobId));
this.connectorPackageClient = new
ConnectorPackageClient(seaTunnelHazelcastClient);
}
@@ -77,7 +82,8 @@ public class ClientJobExecutionEnvironment extends AbstractJobEnvironment {
String jobFilePath,
List<String> variables,
SeaTunnelHazelcastClient seaTunnelHazelcastClient,
- SeaTunnelConfig seaTunnelConfig) {
+ SeaTunnelConfig seaTunnelConfig,
+ Long jobId) {
this(
jobConfig,
jobFilePath,
@@ -85,7 +91,7 @@ public class ClientJobExecutionEnvironment extends AbstractJobEnvironment {
seaTunnelHazelcastClient,
seaTunnelConfig,
false,
- null);
+ jobId);
}
/** Search all jars in SEATUNNEL_HOME/plugins */
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index 0fbbe80572..f16f61a7f0 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -47,6 +47,7 @@ import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.instance.impl.HazelcastInstanceFactory;
import lombok.extern.slf4j.Slf4j;
+import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -312,6 +313,44 @@ public class SeaTunnelClientTest {
}
}
+ @Test
+ public void testSetJobId() throws ExecutionException, InterruptedException
{
+ Common.setDeployMode(DeployMode.CLIENT);
+ String filePath =
TestUtils.getResource("/streaming_fake_to_console.conf");
+ JobConfig jobConfig = new JobConfig();
+ jobConfig.setName("testSetJobId");
+ long jobId = 12345;
+ SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
+ JobClient jobClient = seaTunnelClient.getJobClient();
+ try {
+ ClientJobExecutionEnvironment jobExecutionEnv =
+ seaTunnelClient.createExecutionContext(
+ filePath, new ArrayList<>(), jobConfig,
SEATUNNEL_CONFIG, jobId);
+
+ final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+
+ Assertions.assertEquals(jobId, clientJobProxy.getJobId());
+
+ await().atMost(30000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ "RUNNING",
jobClient.getJobStatus(jobId)));
+
+ jobClient.cancelJob(jobId);
+
+ await().atMost(30000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ "CANCELED",
jobClient.getJobStatus(jobId)));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ seaTunnelClient.close();
+ }
+ }
+
@Test
public void testGetJobInfo() {
Common.setDeployMode(DeployMode.CLIENT);