This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 12ed49133d [Feature][ZETA REST API] Can submit job with rest api to
Zeta master node automaticly (#5950)
12ed49133d is described below
commit 12ed49133dc8d4a50b60eb5c9f4c66dcfb09c254
Author: Guangdong Liu <[email protected]>
AuthorDate: Wed Dec 13 12:30:58 2023 +0800
[Feature][ZETA REST API] Can submit job with rest api to Zeta master node
automaticly (#5950)
---
.../org/apache/seatunnel/engine/e2e/RestApiIT.java | 486 ++++++++++++---------
.../server/rest/RestHttpGetCommandProcessor.java | 23 +-
.../server/rest/RestHttpPostCommandProcessor.java | 63 ++-
3 files changed, 357 insertions(+), 215 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index fedfa4ce25..2f53e9a475 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -36,10 +36,14 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import com.hazelcast.instance.impl.HazelcastInstanceProxy;
import io.restassured.response.Response;
import lombok.extern.slf4j.Slf4j;
+import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import static io.restassured.RestAssured.given;
@@ -53,14 +57,19 @@ public class RestApiIT {
private static ClientJobProxy clientJobProxy;
- private static HazelcastInstanceImpl hazelcastInstance;
+ private static HazelcastInstanceImpl node1;
+
+ private static HazelcastInstanceImpl node2;
@BeforeEach
void beforeClass() throws Exception {
String testClusterName = TestUtils.getClusterName("RestApiIT");
SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
seaTunnelConfig.getHazelcastConfig().setClusterName(testClusterName);
- hazelcastInstance =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+ node1 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+
+ node2 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+
String filePath =
TestUtils.getResource("stream_fakesource_to_file.conf");
JobConfig jobConfig = new JobConfig();
jobConfig.setName("fake_to_file");
@@ -83,242 +92,317 @@ public class RestApiIT {
@Test
public void testGetRunningJobById() {
- given().get(
- HOST
- + hazelcastInstance
- .getCluster()
- .getLocalMember()
- .getAddress()
- .getPort()
- + RestConstant.RUNNING_JOB_URL
- + "/"
- + clientJobProxy.getJobId())
- .then()
- .statusCode(200)
- .body("jobName", equalTo("fake_to_file"))
- .body("jobStatus", equalTo("RUNNING"));
+ Arrays.asList(node2, node1)
+ .forEach(
+ instance -> {
+ given().get(
+ HOST
+ + instance.getCluster()
+ .getLocalMember()
+ .getAddress()
+ .getPort()
+ +
RestConstant.RUNNING_JOB_URL
+ + "/"
+ +
clientJobProxy.getJobId())
+ .then()
+ .statusCode(200)
+ .body("jobName", equalTo("fake_to_file"))
+ .body("jobStatus", equalTo("RUNNING"));
+ });
}
@Test
public void testGetRunningJobs() {
- given().get(
- HOST
- + hazelcastInstance
- .getCluster()
- .getLocalMember()
- .getAddress()
- .getPort()
- + RestConstant.RUNNING_JOBS_URL)
- .then()
- .statusCode(200)
- .body("[0].jobName", equalTo("fake_to_file"))
- .body("[0].jobStatus", equalTo("RUNNING"));
+ Arrays.asList(node2, node1)
+ .forEach(
+ instance -> {
+ given().get(
+ HOST
+ + instance.getCluster()
+ .getLocalMember()
+ .getAddress()
+ .getPort()
+ +
RestConstant.RUNNING_JOBS_URL)
+ .then()
+ .statusCode(200)
+ .body("[0].jobName",
equalTo("fake_to_file"))
+ .body("[0].jobStatus", equalTo("RUNNING"));
+ });
}
@Test
public void testSystemMonitoringInformation() {
- given().get(
- HOST
- + hazelcastInstance
- .getCluster()
- .getLocalMember()
- .getAddress()
- .getPort()
- + RestConstant.SYSTEM_MONITORING_INFORMATION)
- .then()
- .assertThat()
- .time(lessThan(5000L))
- .statusCode(200);
+ Arrays.asList(node2, node1)
+ .forEach(
+ instance -> {
+ given().get(
+ HOST
+ + instance.getCluster()
+ .getLocalMember()
+ .getAddress()
+ .getPort()
+ +
RestConstant.SYSTEM_MONITORING_INFORMATION)
+ .then()
+ .assertThat()
+ .time(lessThan(5000L))
+ .statusCode(200);
+ });
}
@Test
public void testSubmitJob() {
- Response response = submitJob("BATCH");
- response.then().statusCode(200).body("jobName", equalTo("test测试"));
- String jobId = response.getBody().jsonPath().getString("jobId");
- SeaTunnelServer seaTunnelServer =
- (SeaTunnelServer)
- hazelcastInstance
- .node
- .getNodeExtension()
- .createExtensionServices()
- .get(Constant.SEATUNNEL_SERVICE_NAME);
- Awaitility.await()
- .atMost(2, TimeUnit.MINUTES)
- .untilAsserted(
- () ->
- Assertions.assertEquals(
- JobStatus.RUNNING,
- seaTunnelServer
- .getCoordinatorService()
-
.getJobStatus(Long.parseLong(jobId))));
- Awaitility.await()
- .atMost(2, TimeUnit.MINUTES)
- .untilAsserted(
- () ->
- Assertions.assertEquals(
- JobStatus.FINISHED,
- seaTunnelServer
- .getCoordinatorService()
-
.getJobStatus(Long.parseLong(jobId))));
+ Arrays.asList(node2, node1)
+ .forEach(
+ instance -> {
+ Response response = submitJob(instance, "BATCH");
+ response.then().statusCode(200).body("jobName",
equalTo("test测试"));
+ String jobId =
response.getBody().jsonPath().getString("jobId");
+ SeaTunnelServer seaTunnelServer = null;
+
+ for (HazelcastInstance hazelcastInstance :
+ Hazelcast.getAllHazelcastInstances()) {
+ SeaTunnelServer server =
+ (SeaTunnelServer)
+ ((HazelcastInstanceProxy)
hazelcastInstance)
+ .getOriginal()
+ .node
+ .getNodeExtension()
+
.createExtensionServices()
+
.get(Constant.SEATUNNEL_SERVICE_NAME);
+
+ if (server.isMasterNode()) {
+ seaTunnelServer = server;
+ }
+ }
+
+ SeaTunnelServer finalSeaTunnelServer =
seaTunnelServer;
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ JobStatus.FINISHED,
+
finalSeaTunnelServer
+
.getCoordinatorService()
+
.getJobStatus(
+
Long.parseLong(
+
jobId))));
+ });
}
@Test
public void testStopJob() {
- String jobId =
submitJob("STREAMING").getBody().jsonPath().getString("jobId");
- SeaTunnelServer seaTunnelServer =
- (SeaTunnelServer)
- hazelcastInstance
- .node
- .getNodeExtension()
- .createExtensionServices()
- .get(Constant.SEATUNNEL_SERVICE_NAME);
- Awaitility.await()
- .atMost(2, TimeUnit.MINUTES)
- .untilAsserted(
- () ->
- Assertions.assertEquals(
- JobStatus.RUNNING,
- seaTunnelServer
- .getCoordinatorService()
-
.getJobStatus(Long.parseLong(jobId))));
-
- String parameters = "{" + "\"jobId\":" + jobId + "," +
"\"isStopWithSavePoint\":true}";
-
- given().body(parameters)
- .post(
- HOST
- + hazelcastInstance
- .getCluster()
- .getLocalMember()
- .getAddress()
- .getPort()
- + RestConstant.STOP_JOB_URL)
- .then()
- .statusCode(200)
- .body("jobId", equalTo(jobId));
- Awaitility.await()
- .atMost(6, TimeUnit.MINUTES)
- .untilAsserted(
- () ->
- Assertions.assertEquals(
- JobStatus.SAVEPOINT_DONE,
- seaTunnelServer
- .getCoordinatorService()
-
.getJobStatus(Long.parseLong(jobId))));
+ Arrays.asList(node2, node1)
+ .forEach(
+ instance -> {
+ String jobId =
+ submitJob(instance, "STREAMING")
+ .getBody()
+ .jsonPath()
+ .getString("jobId");
+ SeaTunnelServer seaTunnelServer = null;
- String jobId2 =
submitJob("STREAMING").getBody().jsonPath().getString("jobId");
+ for (HazelcastInstance hazelcastInstance :
+ Hazelcast.getAllHazelcastInstances()) {
+ SeaTunnelServer server =
+ (SeaTunnelServer)
+ ((HazelcastInstanceProxy)
hazelcastInstance)
+ .getOriginal()
+ .node
+ .getNodeExtension()
+
.createExtensionServices()
+
.get(Constant.SEATUNNEL_SERVICE_NAME);
- Awaitility.await()
- .atMost(2, TimeUnit.MINUTES)
- .untilAsserted(
- () ->
- Assertions.assertEquals(
- JobStatus.RUNNING,
- seaTunnelServer
- .getCoordinatorService()
-
.getJobStatus(Long.parseLong(jobId2))));
- parameters = "{" + "\"jobId\":" + jobId2 + "," +
"\"isStopWithSavePoint\":false}";
-
- given().body(parameters)
- .post(
- HOST
- + hazelcastInstance
- .getCluster()
- .getLocalMember()
- .getAddress()
- .getPort()
- + RestConstant.STOP_JOB_URL)
- .then()
- .statusCode(200)
- .body("jobId", equalTo(jobId2));
+ if (server.isMasterNode()) {
+ seaTunnelServer = server;
+ }
+ }
- Awaitility.await()
- .atMost(2, TimeUnit.MINUTES)
- .untilAsserted(
- () ->
- Assertions.assertEquals(
- JobStatus.CANCELED,
- seaTunnelServer
- .getCoordinatorService()
-
.getJobStatus(Long.parseLong(jobId2))));
+ SeaTunnelServer finalSeaTunnelServer =
seaTunnelServer;
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ JobStatus.RUNNING,
+
finalSeaTunnelServer
+
.getCoordinatorService()
+
.getJobStatus(
+
Long.parseLong(
+
jobId))));
+
+ String parameters =
+ "{"
+ + "\"jobId\":"
+ + jobId
+ + ","
+ + "\"isStopWithSavePoint\":true}";
+
+ given().body(parameters)
+ .post(
+ HOST
+ + instance.getCluster()
+ .getLocalMember()
+ .getAddress()
+ .getPort()
+ +
RestConstant.STOP_JOB_URL)
+ .then()
+ .statusCode(200)
+ .body("jobId", equalTo(jobId));
+
+ Awaitility.await()
+ .atMost(6, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+
JobStatus.SAVEPOINT_DONE,
+
finalSeaTunnelServer
+
.getCoordinatorService()
+
.getJobStatus(
+
Long.parseLong(
+
jobId))));
+
+ String jobId2 =
+ submitJob(instance, "STREAMING")
+ .getBody()
+ .jsonPath()
+ .getString("jobId");
+
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ JobStatus.RUNNING,
+
finalSeaTunnelServer
+
.getCoordinatorService()
+
.getJobStatus(
+
Long.parseLong(
+
jobId2))));
+ parameters =
+ "{"
+ + "\"jobId\":"
+ + jobId2
+ + ","
+ + "\"isStopWithSavePoint\":false}";
+
+ given().body(parameters)
+ .post(
+ HOST
+ + instance.getCluster()
+ .getLocalMember()
+ .getAddress()
+ .getPort()
+ +
RestConstant.STOP_JOB_URL)
+ .then()
+ .statusCode(200)
+ .body("jobId", equalTo(jobId2));
+
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ JobStatus.CANCELED,
+
finalSeaTunnelServer
+
.getCoordinatorService()
+
.getJobStatus(
+
Long.parseLong(
+
jobId2))));
+ });
}
@Test
public void testStartWithSavePointWithoutJobId() {
- Response response = submitJob("BATCH", true);
- response.then()
- .statusCode(400)
- .body("message", equalTo("Please provide jobId when start with
save point."));
+ Arrays.asList(node2, node1)
+ .forEach(
+ instance -> {
+ Response response = submitJob("BATCH", instance,
true);
+ response.then()
+ .statusCode(400)
+ .body(
+ "message",
+ equalTo(
+ "Please provide jobId when
start with save point."));
+ });
}
@Test
public void testEncryptConfig() {
- String config =
- "{\n"
- + " \"env\": {\n"
- + " \"execution.parallelism\": 1,\n"
- + " \"shade.identifier\":\"base64\"\n"
- + " },\n"
- + " \"source\": [\n"
- + " {\n"
- + " \"plugin_name\": \"MySQL-CDC\",\n"
- + " \"schema\" : {\n"
- + " \"fields\": {\n"
- + " \"name\": \"string\",\n"
- + " \"age\": \"int\"\n"
- + " }\n"
- + " },\n"
- + " \"result_table_name\": \"fake\",\n"
- + " \"parallelism\": 1,\n"
- + " \"hostname\": \"127.0.0.1\",\n"
- + " \"username\": \"seatunnel\",\n"
- + " \"password\": \"seatunnel_password\",\n"
- + " \"table-name\": \"inventory_vwyw0n\"\n"
- + " }\n"
- + " ],\n"
- + " \"transform\": [\n"
- + " ],\n"
- + " \"sink\": [\n"
- + " {\n"
- + " \"plugin_name\": \"Clickhouse\",\n"
- + " \"host\": \"localhost:8123\",\n"
- + " \"database\": \"default\",\n"
- + " \"table\": \"fake_all\",\n"
- + " \"username\": \"seatunnel\",\n"
- + " \"password\": \"seatunnel_password\"\n"
- + " }\n"
- + " ]\n"
- + "}";
- given().body(config)
- .post(
- HOST
- + hazelcastInstance
- .getCluster()
- .getLocalMember()
- .getAddress()
- .getPort()
- + RestConstant.ENCRYPT_CONFIG)
- .then()
- .statusCode(200)
- .body("source[0].result_table_name", equalTo("fake"))
- .body("source[0].username", equalTo("c2VhdHVubmVs"))
- .body("source[0].password",
equalTo("c2VhdHVubmVsX3Bhc3N3b3Jk"));
+ Arrays.asList(node2, node1)
+ .forEach(
+ instance -> {
+ String config =
+ "{\n"
+ + " \"env\": {\n"
+ + "
\"execution.parallelism\": 1,\n"
+ + "
\"shade.identifier\":\"base64\"\n"
+ + " },\n"
+ + " \"source\": [\n"
+ + " {\n"
+ + " \"plugin_name\":
\"MySQL-CDC\",\n"
+ + " \"schema\" : {\n"
+ + " \"fields\": {\n"
+ + " \"name\":
\"string\",\n"
+ + " \"age\":
\"int\"\n"
+ + " }\n"
+ + " },\n"
+ + "
\"result_table_name\": \"fake\",\n"
+ + " \"parallelism\":
1,\n"
+ + " \"hostname\":
\"127.0.0.1\",\n"
+ + " \"username\":
\"seatunnel\",\n"
+ + " \"password\":
\"seatunnel_password\",\n"
+ + " \"table-name\":
\"inventory_vwyw0n\"\n"
+ + " }\n"
+ + " ],\n"
+ + " \"transform\": [\n"
+ + " ],\n"
+ + " \"sink\": [\n"
+ + " {\n"
+ + " \"plugin_name\":
\"Clickhouse\",\n"
+ + " \"host\":
\"localhost:8123\",\n"
+ + " \"database\":
\"default\",\n"
+ + " \"table\":
\"fake_all\",\n"
+ + " \"username\":
\"seatunnel\",\n"
+ + " \"password\":
\"seatunnel_password\"\n"
+ + " }\n"
+ + " ]\n"
+ + "}";
+ given().body(config)
+ .post(
+ HOST
+ + instance.getCluster()
+ .getLocalMember()
+ .getAddress()
+ .getPort()
+ +
RestConstant.ENCRYPT_CONFIG)
+ .then()
+ .statusCode(200)
+ .body("source[0].result_table_name",
equalTo("fake"))
+ .body("source[0].username",
equalTo("c2VhdHVubmVs"))
+ .body(
+ "source[0].password",
+
equalTo("c2VhdHVubmVsX3Bhc3N3b3Jk"));
+ });
}
@AfterEach
void afterClass() {
- if (hazelcastInstance != null) {
- hazelcastInstance.shutdown();
+ if (node1 != null) {
+ node1.shutdown();
+ }
+ if (node2 != null) {
+ node2.shutdown();
}
}
- private Response submitJob(String jobMode) {
- return submitJob(jobMode, false);
+ private Response submitJob(HazelcastInstanceImpl hazelcastInstance, String
jobMode) {
+ return submitJob(jobMode, hazelcastInstance, false);
}
- private Response submitJob(String jobMode, boolean isStartWithSavePoint) {
+ private Response submitJob(
+ String jobMode, HazelcastInstanceImpl hazelcastInstance, boolean
isStartWithSavePoint) {
String requestBody =
"{\n"
+ " \"env\": {\n"
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
index 484482322c..6ee5b46e83 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
@@ -35,6 +35,9 @@ import
org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import com.hazelcast.cluster.Address;
import com.hazelcast.cluster.Cluster;
import com.hazelcast.cluster.Member;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.instance.impl.HazelcastInstanceProxy;
import com.hazelcast.internal.ascii.TextCommandService;
import com.hazelcast.internal.ascii.rest.HttpCommandProcessor;
import com.hazelcast.internal.ascii.rest.HttpGetCommand;
@@ -211,7 +214,25 @@ public class RestHttpGetCommandProcessor extends
HttpCommandProcessor<HttpGetCom
private SeaTunnelServer getSeaTunnelServer() {
Map<String, Object> extensionServices =
this.textCommandService.getNode().getNodeExtension().createExtensionServices();
- return (SeaTunnelServer)
extensionServices.get(Constant.SEATUNNEL_SERVICE_NAME);
+ SeaTunnelServer seaTunnelServer =
+ (SeaTunnelServer)
extensionServices.get(Constant.SEATUNNEL_SERVICE_NAME);
+ if (!seaTunnelServer.isMasterNode()) {
+ for (HazelcastInstance hazelcastInstance :
Hazelcast.getAllHazelcastInstances()) {
+ SeaTunnelServer seaTunnelServer1 =
+ (SeaTunnelServer)
+ ((HazelcastInstanceProxy) hazelcastInstance)
+ .getOriginal()
+ .node
+ .getNodeExtension()
+ .createExtensionServices()
+ .get(Constant.SEATUNNEL_SERVICE_NAME);
+
+ if (seaTunnelServer1.isMasterNode()) {
+ return seaTunnelServer1;
+ }
+ }
+ }
+ return seaTunnelServer;
}
private JsonObject convertToJson(JobInfo jobInfo, long jobId) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
index 02158bbf34..9aa1e8c2e7 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
@@ -32,6 +32,9 @@ import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.log.Log4j2HttpPostCommandProcessor;
import org.apache.seatunnel.engine.server.utils.RestUtil;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.instance.impl.HazelcastInstanceProxy;
import com.hazelcast.internal.ascii.TextCommandService;
import com.hazelcast.internal.ascii.rest.HttpCommandProcessor;
import com.hazelcast.internal.ascii.rest.HttpPostCommand;
@@ -92,7 +95,25 @@ public class RestHttpPostCommandProcessor extends
HttpCommandProcessor<HttpPostC
private SeaTunnelServer getSeaTunnelServer() {
Map<String, Object> extensionServices =
this.textCommandService.getNode().getNodeExtension().createExtensionServices();
- return (SeaTunnelServer)
extensionServices.get(Constant.SEATUNNEL_SERVICE_NAME);
+ SeaTunnelServer seaTunnelServer =
+ (SeaTunnelServer)
extensionServices.get(Constant.SEATUNNEL_SERVICE_NAME);
+ if (!seaTunnelServer.isMasterNode()) {
+ for (HazelcastInstance hazelcastInstance :
Hazelcast.getAllHazelcastInstances()) {
+ seaTunnelServer =
+ (SeaTunnelServer)
+ ((HazelcastInstanceProxy) hazelcastInstance)
+ .getOriginal()
+ .node
+ .getNodeExtension()
+ .createExtensionServices()
+ .get(Constant.SEATUNNEL_SERVICE_NAME);
+
+ if (seaTunnelServer.isMasterNode()) {
+ return seaTunnelServer;
+ }
+ }
+ }
+ return seaTunnelServer;
}
private void handleSubmitJob(HttpPostCommand httpPostCommand, String uri)
@@ -115,19 +136,15 @@ public class RestHttpPostCommandProcessor extends
HttpCommandProcessor<HttpPostC
?
Long.parseLong(requestParams.get(RestConstant.JOB_ID))
: null);
JobImmutableInformation jobImmutableInformation =
restJobExecutionEnvironment.build();
- CoordinatorService coordinatorService =
getSeaTunnelServer().getCoordinatorService();
- Data data =
- textCommandService
- .getNode()
- .nodeEngine
- .getSerializationService()
- .toData(jobImmutableInformation);
- PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
- coordinatorService.submitJob(
- Long.parseLong(jobConfig.getJobContext().getJobId()),
data);
- voidPassiveCompletableFuture.join();
- Long jobId = restJobExecutionEnvironment.getJobId();
+ SeaTunnelServer seaTunnelServer = getSeaTunnelServer();
+ Long jobId =
+ submitJob(
+ seaTunnelServer,
+ jobImmutableInformation,
+ jobConfig,
+ restJobExecutionEnvironment);
+
this.prepareResponse(
httpPostCommand,
new JsonObject()
@@ -187,4 +204,24 @@ public class RestHttpPostCommandProcessor extends
HttpCommandProcessor<HttpPostC
}
return requestBodyJsonNode;
}
+
+ private Long submitJob(
+ SeaTunnelServer seaTunnelServer,
+ JobImmutableInformation jobImmutableInformation,
+ JobConfig jobConfig,
+ RestJobExecutionEnvironment restJobExecutionEnvironment) {
+ CoordinatorService coordinatorService =
seaTunnelServer.getCoordinatorService();
+ Data data =
+ textCommandService
+ .getNode()
+ .nodeEngine
+ .getSerializationService()
+ .toData(jobImmutableInformation);
+ PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
+ coordinatorService.submitJob(
+ Long.parseLong(jobConfig.getJobContext().getJobId()),
data);
+ voidPassiveCompletableFuture.join();
+
+ return restJobExecutionEnvironment.getJobId();
+ }
}