This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a09d3bbcfd [Fix][Rest-API] Submit or stop job from an inactive master
node (#6217)
a09d3bbcfd is described below
commit a09d3bbcfdeb2c13fa3fdb05f14b0b85f0b01f29
Author: Guangdong Liu <[email protected]>
AuthorDate: Wed Jan 17 22:54:02 2024 +0800
[Fix][Rest-API] Submit or stop job from an inactive master node (#6217)
---
.../engine/e2e/ClusterSeaTunnelContainer.java | 464 +++++++++++++++++++++
.../org/apache/seatunnel/engine/e2e/RestApiIT.java | 330 ---------------
.../src/test/resources/cluster/hazelcast.yaml | 44 ++
.../server/rest/RestHttpGetCommandProcessor.java | 67 +--
.../server/rest/RestHttpPostCommandProcessor.java | 72 ++--
5 files changed, 589 insertions(+), 388 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java
new file mode 100644
index 0000000000..a349a9f812
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java
@@ -0,0 +1,464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e;
+
+import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+import org.apache.seatunnel.engine.server.rest.RestConstant;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
+
+import io.restassured.response.Response;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static io.restassured.RestAssured.given;
+import static
org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH;
+import static org.hamcrest.Matchers.equalTo;
+
+public class ClusterSeaTunnelContainer extends SeaTunnelContainer {
+ private static final String JDK_DOCKER_IMAGE = "openjdk:8";
+ private static final String SERVER_SHELL = "seatunnel-cluster.sh";
+
+ private GenericContainer<?> secondServer;
+
+ private final Network NETWORK = Network.newNetwork();
+
+ private static final String jobName = "test测试";
+ private static final String paramJobName = "param_test测试";
+
+ private static final String http = "http://";
+
+ private static final String colon = ":";
+
+ private static final String confFile = "/fakesource_to_console.conf";
+
+ private static final Path binPath = Paths.get(SEATUNNEL_HOME, "bin",
SERVER_SHELL);
+ private static final Path config = Paths.get(SEATUNNEL_HOME, "config");
+ private static final Path hadoopJar =
+ Paths.get(SEATUNNEL_HOME, "lib/seatunnel-hadoop3-3.1.4-uber.jar");
+
+ @Override
+ @BeforeEach
+ public void startUp() throws Exception {
+
+ server = createServer("server");
+ secondServer = createServer("secondServer");
+
+ // check cluster
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () -> {
+ Response response =
+ given().get(
+ http
+ + server.getHost()
+ + colon
+ +
server.getFirstMappedPort()
+ +
"/hazelcast/rest/cluster");
+ response.then().statusCode(200);
+ Assertions.assertEquals(
+ 2,
response.jsonPath().getList("members").size());
+ });
+ }
+
+ @Override
+ @AfterEach
+ public void tearDown() throws Exception {
+ super.tearDown();
+ if (secondServer != null) {
+ secondServer.close();
+ }
+ }
+
+ @Test
+ public void testSubmitJob() {
+ AtomicInteger i = new AtomicInteger();
+
+ Arrays.asList(server, secondServer)
+ .forEach(
+ container -> {
+ Response response =
+ i.get() == 0
+ ? submitJob(container, "BATCH",
jobName, paramJobName)
+ : submitJob(container, "BATCH",
jobName, null);
+ if (i.get() == 0) {
+ response.then()
+ .statusCode(200)
+ .body("jobName",
equalTo(paramJobName));
+ } else {
+
response.then().statusCode(200).body("jobName", equalTo(jobName));
+ }
+ String jobId =
response.getBody().jsonPath().getString("jobId");
+
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () -> {
+ given().get(
+ http
+ +
container.getHost()
+ + colon
+ +
container
+
.getFirstMappedPort()
+ +
RestConstant
+
.FINISHED_JOBS_INFO
+ +
"/FINISHED")
+ .then()
+ .statusCode(200)
+ .body(
+ "[" + i.get()
+ "].jobName",
+ equalTo(
+
i.get() == 0
+
? paramJobName
+
: jobName))
+ .body(
+ "[" + i.get()
+ "].errorMsg",
+ equalTo(null))
+ .body(
+ "[" + i.get()
+ "].jobDag.jobId",
+
equalTo(Long.parseLong(jobId)))
+ .body(
+ "["
+ +
i.get()
+ +
"].metrics.SourceReceivedCount",
+ equalTo("100"))
+ .body(
+ "["
+ +
i.get()
+ +
"].metrics.SinkWriteCount",
+ equalTo("100"))
+ .body(
+ "[" + i.get()
+ "].jobStatus",
+
equalTo("FINISHED"));
+
+ // test for without status
parameter.
+ given().get(
+ http
+ +
container.getHost()
+ + colon
+ +
container
+
.getFirstMappedPort()
+ +
RestConstant
+
.FINISHED_JOBS_INFO)
+ .then()
+ .statusCode(200)
+ .body(
+ "[" + i.get()
+ "].jobName",
+ equalTo(
+
i.get() == 0
+
? paramJobName
+
: jobName))
+ .body(
+ "[" + i.get()
+ "].errorMsg",
+ equalTo(null))
+ .body(
+ "[" + i.get()
+ "].jobDag.jobId",
+
equalTo(Long.parseLong(jobId)))
+ .body(
+ "["
+ +
i.get()
+ +
"].metrics.SourceReceivedCount",
+ equalTo("100"))
+ .body(
+ "["
+ +
i.get()
+ +
"].metrics.SinkWriteCount",
+ equalTo("100"))
+ .body(
+ "[" + i.get()
+ "].jobStatus",
+
equalTo("FINISHED"));
+ });
+
+ i.getAndIncrement();
+ });
+ }
+
+ @Test
+ public void testStartWithSavePointWithoutJobId() {
+ Arrays.asList(server, secondServer)
+ .forEach(
+ container -> {
+ Response response =
+ submitJob("BATCH", container, true,
jobName, paramJobName);
+ response.then()
+ .statusCode(400)
+ .body(
+ "message",
+ equalTo(
+ "Please provide jobId when
start with save point."));
+ });
+ }
+
+ @Test
+ public void testStopJob() {
+
+ Arrays.asList(server, secondServer)
+ .forEach(
+ container -> {
+ String jobId =
+ submitJob(container, "STREAMING", jobName,
paramJobName)
+ .getBody()
+ .jsonPath()
+ .getString("jobId");
+
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ given().get(
+ http
+ +
container.getHost()
+ +
colon
+ +
container
+
.getFirstMappedPort()
+ +
RestConstant
+
.RUNNING_JOB_URL
+ +
"/"
+ +
jobId)
+ .then()
+ .statusCode(200)
+ .body("jobStatus",
equalTo("RUNNING")));
+
+ String parameters =
+ "{"
+ + "\"jobId\":"
+ + jobId
+ + ","
+ + "\"isStopWithSavePoint\":true}";
+
+ given().body(parameters)
+ .post(
+ http
+ + container.getHost()
+ + colon
+ +
container.getFirstMappedPort()
+ +
RestConstant.STOP_JOB_URL)
+ .then()
+ .statusCode(200)
+ .body("jobId", equalTo(jobId));
+
+ Awaitility.await()
+ .atMost(6, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ given().get(
+ http
+ +
container.getHost()
+ +
colon
+ +
container
+
.getFirstMappedPort()
+ +
RestConstant
+
.FINISHED_JOBS_INFO
+ +
"/SAVEPOINT_DONE")
+ .then()
+ .statusCode(200)
+ .body("[0].jobId",
equalTo(jobId)));
+
+ String jobId2 =
+ submitJob(container, "STREAMING", jobName,
paramJobName)
+ .getBody()
+ .jsonPath()
+ .getString("jobId");
+
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ given().get(
+ http
+ +
container.getHost()
+ +
colon
+ +
container
+
.getFirstMappedPort()
+ +
RestConstant
+
.RUNNING_JOB_URL
+ +
"/"
+ +
jobId2)
+ .then()
+ .statusCode(200)
+ .body("jobStatus",
equalTo("RUNNING")));
+ parameters =
+ "{"
+ + "\"jobId\":"
+ + jobId2
+ + ","
+ + "\"isStopWithSavePoint\":false}";
+
+ given().body(parameters)
+ .post(
+ http
+ + container.getHost()
+ + colon
+ +
container.getFirstMappedPort()
+ +
RestConstant.STOP_JOB_URL)
+ .then()
+ .statusCode(200)
+ .body("jobId", equalTo(jobId2));
+
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ given().get(
+ http
+ +
container.getHost()
+ +
colon
+ +
container
+
.getFirstMappedPort()
+ +
RestConstant
+
.FINISHED_JOBS_INFO
+ +
"/CANCELED")
+ .then()
+ .statusCode(200)
+ .body("[0].jobId",
equalTo(jobId2)));
+ });
+ }
+
+ private Response submitJob(
+ GenericContainer<?> container, String jobMode, String jobName,
String paramJobName) {
+ return submitJob(jobMode, container, false, jobName, paramJobName);
+ }
+
+ private Response submitJob(
+ String jobMode,
+ GenericContainer<?> container,
+ boolean isStartWithSavePoint,
+ String jobName,
+ String paramJobName) {
+ String requestBody =
+ "{\n"
+ + " \"env\": {\n"
+ + " \"job.name\": \""
+ + jobName
+ + "\",\n"
+ + " \"job.mode\": \""
+ + jobMode
+ + "\"\n"
+ + " },\n"
+ + " \"source\": [\n"
+ + " {\n"
+ + " \"plugin_name\": \"FakeSource\",\n"
+ + " \"result_table_name\": \"fake\",\n"
+ + " \"row.num\": 100,\n"
+ + " \"schema\": {\n"
+ + " \"fields\": {\n"
+ + " \"name\": \"string\",\n"
+ + " \"age\": \"int\",\n"
+ + " \"card\": \"int\"\n"
+ + " }\n"
+ + " }\n"
+ + " }\n"
+ + " ],\n"
+ + " \"transform\": [\n"
+ + " ],\n"
+ + " \"sink\": [\n"
+ + " {\n"
+ + " \"plugin_name\": \"Console\",\n"
+ + " \"source_table_name\": [\"fake\"]\n"
+ + " }\n"
+ + " ]\n"
+ + "}";
+ String parameters = null;
+ if (paramJobName != null) {
+ parameters = "jobName=" + paramJobName;
+ }
+ if (isStartWithSavePoint) {
+ parameters = parameters + "&isStartWithSavePoint=true";
+ }
+ Response response =
+ given().body(requestBody)
+ .header("Content-Type", "application/json;
charset=utf-8")
+ .post(
+ parameters == null
+ ? http
+ + container.getHost()
+ + colon
+ +
container.getFirstMappedPort()
+ + RestConstant.SUBMIT_JOB_URL
+ : http
+ + container.getHost()
+ + colon
+ +
container.getFirstMappedPort()
+ + RestConstant.SUBMIT_JOB_URL
+ + "?"
+ + parameters);
+ return response;
+ }
+
+ private GenericContainer<?> createServer(String networkAlias)
+ throws IOException, InterruptedException {
+ GenericContainer<?> server =
+ new GenericContainer<>(getDockerImage())
+ .withNetwork(NETWORK)
+ .withEnv("TZ", "UTC")
+
.withCommand(ContainerUtil.adaptPathForWin(binPath.toString()))
+ .withNetworkAliases(networkAlias)
+ .withExposedPorts()
+ .withLogConsumer(
+ new Slf4jLogConsumer(
+ DockerLoggerFactory.getLogger(
+ "seatunnel-engine:" +
JDK_DOCKER_IMAGE)))
+ .waitingFor(Wait.forListeningPort());
+ copySeaTunnelStarterToContainer(server);
+ server.setExposedPorts(Collections.singletonList(5801));
+ server.withCopyFileToContainer(
+ MountableFile.forHostPath(
+ PROJECT_ROOT_PATH
+ +
"/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/"),
+ config.toString());
+ server.withCopyFileToContainer(
+ MountableFile.forHostPath(
+ PROJECT_ROOT_PATH
+ +
"/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster/"),
+ config.toString());
+ server.withCopyFileToContainer(
+ MountableFile.forHostPath(
+ PROJECT_ROOT_PATH
+ +
"/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/target/seatunnel-hadoop3-3.1.4-uber.jar"),
+ hadoopJar.toString());
+ server.start();
+ // execute extra commands
+ executeExtraCommands(server);
+ ContainerUtil.copyConnectorJarToContainer(
+ server,
+ confFile,
+ getConnectorModulePath(),
+ getConnectorNamePrefix(),
+ getConnectorType(),
+ SEATUNNEL_HOME);
+
+ return server;
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index e64243be79..2d2fc5d96e 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -20,12 +20,10 @@ package org.apache.seatunnel.engine.e2e;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
-import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.core.job.JobStatus;
-import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
import org.apache.seatunnel.engine.server.rest.RestConstant;
@@ -36,16 +34,11 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import com.hazelcast.client.config.ClientConfig;
-import com.hazelcast.core.Hazelcast;
-import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
-import com.hazelcast.instance.impl.HazelcastInstanceProxy;
-import io.restassured.response.Response;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import static io.restassured.RestAssured.given;
import static org.hamcrest.Matchers.equalTo;
@@ -64,9 +57,6 @@ public class RestApiIT {
private static SeaTunnelClient engineClient;
- private static final String jobName = "test测试";
- private static final String paramJobName = "param_test测试";
-
@BeforeEach
void beforeClass() throws Exception {
String testClusterName = TestUtils.getClusterName("RestApiIT");
@@ -155,248 +145,6 @@ public class RestApiIT {
});
}
- @Test
- public void testSubmitJob() {
- AtomicInteger i = new AtomicInteger();
-
- Arrays.asList(node2, node1)
- .forEach(
- instance -> {
- Response response =
- i.get() == 0
- ? submitJob(instance, "BATCH",
jobName, paramJobName)
- : submitJob(instance, "BATCH",
jobName, null);
- if (i.get() == 0) {
- response.then()
- .statusCode(200)
- .body("jobName",
equalTo(paramJobName));
- } else {
-
response.then().statusCode(200).body("jobName", equalTo(jobName));
- }
- String jobId =
response.getBody().jsonPath().getString("jobId");
- SeaTunnelServer seaTunnelServer = null;
-
- for (HazelcastInstance hazelcastInstance :
- Hazelcast.getAllHazelcastInstances()) {
- SeaTunnelServer server =
- (SeaTunnelServer)
- ((HazelcastInstanceProxy)
hazelcastInstance)
- .getOriginal()
- .node
- .getNodeExtension()
-
.createExtensionServices()
-
.get(Constant.SEATUNNEL_SERVICE_NAME);
-
- if (server.isMasterNode()) {
- seaTunnelServer = server;
- }
- }
-
- SeaTunnelServer finalSeaTunnelServer =
seaTunnelServer;
- Awaitility.await()
- .atMost(2, TimeUnit.MINUTES)
- .untilAsserted(
- () ->
- Assertions.assertEquals(
- JobStatus.FINISHED,
-
finalSeaTunnelServer
-
.getCoordinatorService()
-
.getJobStatus(
-
Long.parseLong(
-
jobId))));
-
- given().get(
- HOST
- + instance.getCluster()
- .getLocalMember()
- .getAddress()
- .getPort()
- +
RestConstant.FINISHED_JOBS_INFO
- + "/FINISHED")
- .then()
- .statusCode(200)
- .body(
- "[" + i.get() + "].jobName",
- equalTo(i.get() == 0 ?
paramJobName : jobName))
- .body("[" + i.get() + "].errorMsg",
equalTo(null))
- .body(
- "[" + i.get() + "].jobDag.jobId",
- equalTo(Long.parseLong(jobId)))
- .body(
- "[" + i.get() +
"].metrics.SourceReceivedCount",
- equalTo("100"))
- .body(
- "[" + i.get() +
"].metrics.SinkWriteCount",
- equalTo("100"))
- .body("[" + i.get() + "].jobStatus",
equalTo("FINISHED"));
-
- // test for without status parameter.
- given().get(
- HOST
- + instance.getCluster()
- .getLocalMember()
- .getAddress()
- .getPort()
- +
RestConstant.FINISHED_JOBS_INFO)
- .then()
- .statusCode(200)
- .body(
- "[" + i.get() + "].jobName",
- equalTo(i.get() == 0 ?
paramJobName : jobName))
- .body("[" + i.get() + "].errorMsg",
equalTo(null))
- .body(
- "[" + i.get() + "].jobDag.jobId",
- equalTo(Long.parseLong(jobId)))
- .body(
- "[" + i.get() +
"].metrics.SourceReceivedCount",
- equalTo("100"))
- .body(
- "[" + i.get() +
"].metrics.SinkWriteCount",
- equalTo("100"))
- .body("[" + i.get() + "].jobStatus",
equalTo("FINISHED"));
- i.getAndIncrement();
- });
- }
-
- @Test
- public void testStopJob() {
-
- Arrays.asList(node2, node1)
- .forEach(
- instance -> {
- String jobId =
- submitJob(instance, "STREAMING", jobName,
paramJobName)
- .getBody()
- .jsonPath()
- .getString("jobId");
- SeaTunnelServer seaTunnelServer = null;
-
- for (HazelcastInstance hazelcastInstance :
- Hazelcast.getAllHazelcastInstances()) {
- SeaTunnelServer server =
- (SeaTunnelServer)
- ((HazelcastInstanceProxy)
hazelcastInstance)
- .getOriginal()
- .node
- .getNodeExtension()
-
.createExtensionServices()
-
.get(Constant.SEATUNNEL_SERVICE_NAME);
-
- if (server.isMasterNode()) {
- seaTunnelServer = server;
- }
- }
-
- SeaTunnelServer finalSeaTunnelServer =
seaTunnelServer;
- Awaitility.await()
- .atMost(2, TimeUnit.MINUTES)
- .untilAsserted(
- () ->
- Assertions.assertEquals(
- JobStatus.RUNNING,
-
finalSeaTunnelServer
-
.getCoordinatorService()
-
.getJobStatus(
-
Long.parseLong(
-
jobId))));
-
- String parameters =
- "{"
- + "\"jobId\":"
- + jobId
- + ","
- + "\"isStopWithSavePoint\":true}";
-
- given().body(parameters)
- .post(
- HOST
- + instance.getCluster()
- .getLocalMember()
- .getAddress()
- .getPort()
- +
RestConstant.STOP_JOB_URL)
- .then()
- .statusCode(200)
- .body("jobId", equalTo(jobId));
-
- Awaitility.await()
- .atMost(6, TimeUnit.MINUTES)
- .untilAsserted(
- () ->
- Assertions.assertEquals(
-
JobStatus.SAVEPOINT_DONE,
-
finalSeaTunnelServer
-
.getCoordinatorService()
-
.getJobStatus(
-
Long.parseLong(
-
jobId))));
-
- String jobId2 =
- submitJob(instance, "STREAMING", jobName,
paramJobName)
- .getBody()
- .jsonPath()
- .getString("jobId");
-
- Awaitility.await()
- .atMost(2, TimeUnit.MINUTES)
- .untilAsserted(
- () ->
- Assertions.assertEquals(
- JobStatus.RUNNING,
-
finalSeaTunnelServer
-
.getCoordinatorService()
-
.getJobStatus(
-
Long.parseLong(
-
jobId2))));
- parameters =
- "{"
- + "\"jobId\":"
- + jobId2
- + ","
- + "\"isStopWithSavePoint\":false}";
-
- given().body(parameters)
- .post(
- HOST
- + instance.getCluster()
- .getLocalMember()
- .getAddress()
- .getPort()
- +
RestConstant.STOP_JOB_URL)
- .then()
- .statusCode(200)
- .body("jobId", equalTo(jobId2));
-
- Awaitility.await()
- .atMost(2, TimeUnit.MINUTES)
- .untilAsserted(
- () ->
- Assertions.assertEquals(
- JobStatus.CANCELED,
-
finalSeaTunnelServer
-
.getCoordinatorService()
-
.getJobStatus(
-
Long.parseLong(
-
jobId2))));
- });
- }
-
- @Test
- public void testStartWithSavePointWithoutJobId() {
- Arrays.asList(node2, node1)
- .forEach(
- instance -> {
- Response response =
- submitJob("BATCH", instance, true,
jobName, paramJobName);
- response.then()
- .statusCode(400)
- .body(
- "message",
- equalTo(
- "Please provide jobId when
start with save point."));
- });
- }
-
@Test
public void testEncryptConfig() {
Arrays.asList(node2, node1)
@@ -469,82 +217,4 @@ public class RestApiIT {
node2.shutdown();
}
}
-
- private Response submitJob(
- HazelcastInstanceImpl hazelcastInstance,
- String jobMode,
- String jobName,
- String paramJobName) {
- return submitJob(jobMode, hazelcastInstance, false, jobName,
paramJobName);
- }
-
- private Response submitJob(
- String jobMode,
- HazelcastInstanceImpl hazelcastInstance,
- boolean isStartWithSavePoint,
- String jobName,
- String paramJobName) {
- String requestBody =
- "{\n"
- + " \"env\": {\n"
- + " \"job.name\": \""
- + jobName
- + "\",\n"
- + " \"job.mode\": \""
- + jobMode
- + "\"\n"
- + " },\n"
- + " \"source\": [\n"
- + " {\n"
- + " \"plugin_name\": \"FakeSource\",\n"
- + " \"result_table_name\": \"fake\",\n"
- + " \"row.num\": 100,\n"
- + " \"schema\": {\n"
- + " \"fields\": {\n"
- + " \"name\": \"string\",\n"
- + " \"age\": \"int\",\n"
- + " \"card\": \"int\"\n"
- + " }\n"
- + " }\n"
- + " }\n"
- + " ],\n"
- + " \"transform\": [\n"
- + " ],\n"
- + " \"sink\": [\n"
- + " {\n"
- + " \"plugin_name\": \"Console\",\n"
- + " \"source_table_name\": [\"fake\"]\n"
- + " }\n"
- + " ]\n"
- + "}";
- String parameters = null;
- if (paramJobName != null) {
- parameters = "jobName=" + paramJobName;
- }
- if (isStartWithSavePoint) {
- parameters = parameters + "&isStartWithSavePoint=true";
- }
- Response response =
- given().body(requestBody)
- .header("Content-Type", "application/json;
charset=utf-8")
- .post(
- parameters == null
- ? HOST
- + hazelcastInstance
- .getCluster()
- .getLocalMember()
- .getAddress()
- .getPort()
- + RestConstant.SUBMIT_JOB_URL
- : HOST
- + hazelcastInstance
- .getCluster()
- .getLocalMember()
- .getAddress()
- .getPort()
- + RestConstant.SUBMIT_JOB_URL
- + "?"
- + parameters);
- return response;
- }
}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster/hazelcast.yaml
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster/hazelcast.yaml
new file mode 100644
index 0000000000..45879e2fd6
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster/hazelcast.yaml
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+hazelcast:
+ cluster-name: seatunnel
+ network:
+ rest-api:
+ enabled: true
+ endpoint-groups:
+ CLUSTER_WRITE:
+ enabled: true
+ DATA:
+ enabled: true
+ join:
+ tcp-ip:
+ enabled: true
+ member-list:
+ - secondServer
+ - server
+ port:
+ auto-increment: true
+ port-count: 100
+ port: 5801
+ properties:
+ hazelcast.invocation.max.retry.count: 100
+ hazelcast.invocation.retry.pause.millis: 1000
+ hazelcast.tcp.join.port.try.count: 30
+ hazelcast.slow.operation.detector.stacktrace.logging.enabled: true
+ hazelcast.logging.type: log4j2
+ hazelcast.operation.generic.thread.count: 200
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
index 370dd71953..6d9619abbf 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
@@ -34,14 +34,13 @@ import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.log.Log4j2HttpGetCommandProcessor;
import org.apache.seatunnel.engine.server.master.JobHistoryService.JobState;
import
org.apache.seatunnel.engine.server.operation.GetClusterHealthMetricsOperation;
+import org.apache.seatunnel.engine.server.operation.GetJobMetricsOperation;
+import org.apache.seatunnel.engine.server.operation.GetJobStatusOperation;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import com.hazelcast.cluster.Address;
import com.hazelcast.cluster.Cluster;
import com.hazelcast.cluster.Member;
-import com.hazelcast.core.Hazelcast;
-import com.hazelcast.core.HazelcastInstance;
-import com.hazelcast.instance.impl.HazelcastInstanceProxy;
import com.hazelcast.internal.ascii.TextCommandService;
import com.hazelcast.internal.ascii.rest.HttpCommandProcessor;
import com.hazelcast.internal.ascii.rest.HttpGetCommand;
@@ -224,11 +223,24 @@ public class RestHttpGetCommandProcessor extends
HttpCommandProcessor<HttpGetCom
.map(
jobState -> {
Long jobId = jobState.getJobId();
- String jobMetrics =
- getSeaTunnelServer()
- .getCoordinatorService()
- .getJobMetrics(jobId)
- .toJsonString();
+ SeaTunnelServer seaTunnelServer =
getSeaTunnelServer();
+ String jobMetrics;
+ if (seaTunnelServer == null) {
+ jobMetrics =
+ (String)
+
NodeEngineUtil.sendOperationToMasterNode(
+
getNode().nodeEngine,
+ new
GetJobMetricsOperation(
+
jobId))
+ .join();
+ } else {
+ jobMetrics =
+ seaTunnelServer
+
.getCoordinatorService()
+ .getJobMetrics(jobId)
+ .toJsonString();
+ }
+
JobDAGInfo jobDAGInfo =
finishedJobDAGInfo.get(jobId);
return convertToJson(
@@ -293,20 +305,7 @@ public class RestHttpGetCommandProcessor extends
HttpCommandProcessor<HttpGetCom
SeaTunnelServer seaTunnelServer =
(SeaTunnelServer)
extensionServices.get(Constant.SEATUNNEL_SERVICE_NAME);
if (!seaTunnelServer.isMasterNode()) {
- for (HazelcastInstance hazelcastInstance :
Hazelcast.getAllHazelcastInstances()) {
- SeaTunnelServer seaTunnelServer1 =
- (SeaTunnelServer)
- ((HazelcastInstanceProxy) hazelcastInstance)
- .getOriginal()
- .node
- .getNodeExtension()
- .createExtensionServices()
- .get(Constant.SEATUNNEL_SERVICE_NAME);
-
- if (seaTunnelServer1.isMasterNode()) {
- return seaTunnelServer1;
- }
- }
+ return null;
}
return seaTunnelServer;
}
@@ -334,9 +333,27 @@ public class RestHttpGetCommandProcessor extends
HttpCommandProcessor<HttpGetCom
classLoader,
jobImmutableInformation.getLogicalDag());
- String jobMetrics =
-
getSeaTunnelServer().getCoordinatorService().getJobMetrics(jobId).toJsonString();
- JobStatus jobStatus =
getSeaTunnelServer().getCoordinatorService().getJobStatus(jobId);
+ SeaTunnelServer seaTunnelServer = getSeaTunnelServer();
+ String jobMetrics;
+ JobStatus jobStatus;
+ if (seaTunnelServer == null) {
+ jobMetrics =
+ (String)
+ NodeEngineUtil.sendOperationToMasterNode(
+ getNode().nodeEngine, new
GetJobMetricsOperation(jobId))
+ .join();
+ jobStatus =
+ JobStatus.values()[
+ (int)
+ NodeEngineUtil.sendOperationToMasterNode(
+ getNode().nodeEngine,
+ new
GetJobStatusOperation(jobId))
+ .join()];
+ } else {
+ jobMetrics =
+
seaTunnelServer.getCoordinatorService().getJobMetrics(jobId).toJsonString();
+ jobStatus =
seaTunnelServer.getCoordinatorService().getJobStatus(jobId);
+ }
jobInfoJson
.add(RestConstant.JOB_ID, String.valueOf(jobId))
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
index 465942c6f4..6f09e0aadf 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
@@ -32,13 +32,14 @@ import
org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.server.CoordinatorService;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.log.Log4j2HttpPostCommandProcessor;
+import org.apache.seatunnel.engine.server.operation.CancelJobOperation;
+import org.apache.seatunnel.engine.server.operation.SavePointJobOperation;
+import org.apache.seatunnel.engine.server.operation.SubmitJobOperation;
+import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import org.apache.seatunnel.engine.server.utils.RestUtil;
import org.apache.commons.lang.StringUtils;
-import com.hazelcast.core.Hazelcast;
-import com.hazelcast.core.HazelcastInstance;
-import com.hazelcast.instance.impl.HazelcastInstanceProxy;
import com.hazelcast.internal.ascii.TextCommandService;
import com.hazelcast.internal.ascii.rest.HttpCommandProcessor;
import com.hazelcast.internal.ascii.rest.HttpPostCommand;
@@ -102,20 +103,7 @@ public class RestHttpPostCommandProcessor extends
HttpCommandProcessor<HttpPostC
SeaTunnelServer seaTunnelServer =
(SeaTunnelServer)
extensionServices.get(Constant.SEATUNNEL_SERVICE_NAME);
if (!seaTunnelServer.isMasterNode()) {
- for (HazelcastInstance hazelcastInstance :
Hazelcast.getAllHazelcastInstances()) {
- seaTunnelServer =
- (SeaTunnelServer)
- ((HazelcastInstanceProxy) hazelcastInstance)
- .getOriginal()
- .node
- .getNodeExtension()
- .createExtensionServices()
- .get(Constant.SEATUNNEL_SERVICE_NAME);
-
- if (seaTunnelServer.isMasterNode()) {
- return seaTunnelServer;
- }
- }
+ return null;
}
return seaTunnelServer;
}
@@ -146,14 +134,21 @@ public class RestHttpPostCommandProcessor extends
HttpCommandProcessor<HttpPostC
?
Long.parseLong(requestParams.get(RestConstant.JOB_ID))
: null);
JobImmutableInformation jobImmutableInformation =
restJobExecutionEnvironment.build();
-
+ Long jobId = jobImmutableInformation.getJobId();
SeaTunnelServer seaTunnelServer = getSeaTunnelServer();
- Long jobId =
- submitJob(
- seaTunnelServer,
- jobImmutableInformation,
- jobConfig,
- restJobExecutionEnvironment);
+ if (seaTunnelServer == null) {
+
+ NodeEngineUtil.sendOperationToMasterNode(
+ getNode().nodeEngine,
+ new SubmitJobOperation(
+ jobImmutableInformation.getJobId(),
+
getNode().nodeEngine.toData(jobImmutableInformation)))
+ .join();
+
+ } else {
+
+ submitJob(seaTunnelServer, jobImmutableInformation, jobConfig);
+ }
this.prepareResponse(
httpPostCommand,
@@ -174,12 +169,26 @@ public class RestHttpPostCommandProcessor extends
HttpCommandProcessor<HttpPostC
Boolean.parseBoolean(map.get(RestConstant.IS_STOP_WITH_SAVE_POINT).toString());
}
- CoordinatorService coordinatorService =
getSeaTunnelServer().getCoordinatorService();
+ SeaTunnelServer seaTunnelServer = getSeaTunnelServer();
+ if (seaTunnelServer == null) {
+ if (isStopWithSavePoint) {
+ NodeEngineUtil.sendOperationToMasterNode(
+ getNode().nodeEngine, new
SavePointJobOperation(jobId))
+ .join();
+ } else {
+ NodeEngineUtil.sendOperationToMasterNode(
+ getNode().nodeEngine, new
CancelJobOperation(jobId))
+ .join();
+ }
- if (isStopWithSavePoint) {
- coordinatorService.savePoint(jobId);
} else {
- coordinatorService.cancelJob(jobId);
+ CoordinatorService coordinatorService =
getSeaTunnelServer().getCoordinatorService();
+
+ if (isStopWithSavePoint) {
+ coordinatorService.savePoint(jobId);
+ } else {
+ coordinatorService.cancelJob(jobId);
+ }
}
this.prepareResponse(
@@ -215,11 +224,10 @@ public class RestHttpPostCommandProcessor extends
HttpCommandProcessor<HttpPostC
return requestBodyJsonNode;
}
- private Long submitJob(
+ private void submitJob(
SeaTunnelServer seaTunnelServer,
JobImmutableInformation jobImmutableInformation,
- JobConfig jobConfig,
- RestJobExecutionEnvironment restJobExecutionEnvironment) {
+ JobConfig jobConfig) {
CoordinatorService coordinatorService =
seaTunnelServer.getCoordinatorService();
Data data =
textCommandService
@@ -231,7 +239,5 @@ public class RestHttpPostCommandProcessor extends
HttpCommandProcessor<HttpPostC
coordinatorService.submitJob(
Long.parseLong(jobConfig.getJobContext().getJobId()),
data);
voidPassiveCompletableFuture.join();
-
- return restJobExecutionEnvironment.getJobId();
}
}