This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d66e4f2a2f [Fix] [Zeta] Fix REST submit-job and start-with-savepoint
on worker and return 400 when checkpoint missing (#10509)
d66e4f2a2f is described below
commit d66e4f2a2f2d78a239dbc673da3cca98ede11a14
Author: yzeng1618 <[email protected]>
AuthorDate: Mon Mar 2 22:51:15 2026 +0800
[Fix] [Zeta] Fix REST submit-job and start-with-savepoint on worker and
return 400 when checkpoint missing (#10509)
Co-authored-by: zengyi <[email protected]>
---
.../server/rest/RestJobExecutionEnvironment.java | 38 +-
.../RestApiSubmitJobStartWithSavePointTest.java | 449 +++++++++++++++++++++
2 files changed, 483 insertions(+), 4 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java
index 76b0487c14..54977563ef 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestJobExecutionEnvironment.java
@@ -31,6 +31,8 @@ import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobPipelineCheckpointData;
import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.operation.GetJobCheckpointOperation;
+import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.spi.impl.NodeEngineImpl;
@@ -99,10 +101,13 @@ public class RestJobExecutionEnvironment extends AbstractJobEnvironment {
List<JobPipelineCheckpointData> pipelineCheckpoints =
Collections.emptyList();
if (isStartWithSavePoint) {
LOGGER.info("Start with savepoint, get checkpoint state from
server");
- pipelineCheckpoints =
- seaTunnelServer
- .getCheckpointService()
-
.getLatestCheckpointData(jobConfig.getJobContext().getJobId());
+ pipelineCheckpoints = loadPipelineCheckpointsFromMasterNode();
+ if (pipelineCheckpoints == null || pipelineCheckpoints.isEmpty()) {
+ throw new IllegalArgumentException(
+ "No checkpoint found for jobId="
+ + jobConfig.getJobContext().getJobId()
+ + ", cannot start with save point.");
+ }
}
return new MultipleTableJobConfigParser(
seaTunnelJobConfig,
@@ -113,6 +118,31 @@ public class RestJobExecutionEnvironment extends AbstractJobEnvironment {
pipelineCheckpoints);
}
+    /**
+     * Loads the latest checkpoint data of the job being submitted, for a start-with-savepoint
+     * request.
+     *
+     * <p>When this member is the master and has a checkpoint service, the data is read locally.
+     * Otherwise (e.g. the REST request landed on a worker, whose checkpoint service is null) the
+     * lookup is delegated to the master node through {@link GetJobCheckpointOperation}.
+     *
+     * @return the pipeline checkpoints; an empty list when the master answered with nothing
+     * @throws IllegalStateException if sending the operation to the master node fails or its
+     *     response cannot be deserialized
+     */
+    private List<JobPipelineCheckpointData> loadPipelineCheckpointsFromMasterNode() {
+        String jobIdText = jobConfig.getJobContext().getJobId();
+        if (seaTunnelServer.isMasterNode() && seaTunnelServer.getCheckpointService() != null) {
+            return seaTunnelServer.getCheckpointService().getLatestCheckpointData(jobIdText);
+        }
+
+        try {
+            Object response =
+                    NodeEngineUtil.sendOperationToMasterNode(
+                                    nodeEngine, new GetJobCheckpointOperation(jobId))
+                            .join();
+            if (response == null) {
+                return Collections.emptyList();
+            }
+            // GetJobCheckpointOperation is expected to answer with a
+            // List<JobPipelineCheckpointData>; generic element types are erased at runtime, so
+            // this cast cannot be checked and the warning is suppressed on this local only.
+            @SuppressWarnings("unchecked")
+            List<JobPipelineCheckpointData> checkpoints =
+                    (List<JobPipelineCheckpointData>)
+                            nodeEngine.getSerializationService().toObject(response);
+            return checkpoints;
+        } catch (Exception e) {
+            throw new IllegalStateException(
+                    "Failed to get checkpoint data from master node, jobId=" + jobIdText, e);
+        }
+    }
+
public JobImmutableInformation build() {
return new JobImmutableInformation(
Long.parseLong(jobConfig.getJobContext().getJobId()),
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiSubmitJobStartWithSavePointTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiSubmitJobStartWithSavePointTest.java
new file mode 100644
index 0000000000..4a36a4812a
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiSubmitJobStartWithSavePointTest.java
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.rest;
+
+import org.apache.seatunnel.shade.org.eclipse.jetty.server.Connector;
+import org.apache.seatunnel.shade.org.eclipse.jetty.server.ServerConnector;
+
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.common.utils.FileUtils;
+import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
+import
org.apache.seatunnel.engine.checkpoint.storage.constants.StorageConstants;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.config.server.HttpConfig;
+import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
+import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.parse.JobConfigParser;
+import org.apache.seatunnel.engine.serializer.protobuf.ProtoStuffSerializer;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+import org.apache.seatunnel.engine.server.TestUtils;
+import org.apache.seatunnel.engine.server.checkpoint.ActionState;
+import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
+import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
+import org.apache.seatunnel.engine.server.checkpoint.CompletedCheckpoint;
+import org.apache.seatunnel.engine.server.utils.RestUtil;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class RestApiSubmitJobStartWithSavePointTest {
+
+    private static final String SOURCE_FACTORY_ID = "FakeSource";
+    private static final String TEST_JOB_NAME = "test";
+
+    /**
+     * Source of unique job ids. Every test draws its own id, so a checkpoint stored by one test
+     * can never be picked up by another test that happens to start in the same millisecond.
+     */
+    private static final java.util.concurrent.atomic.AtomicLong JOB_ID_SEQUENCE =
+            new java.util.concurrent.atomic.AtomicLong(System.currentTimeMillis());
+
+    private HazelcastInstanceImpl masterInstance;
+    private HazelcastInstanceImpl workerInstance;
+    private SeaTunnelServer masterServer;
+    private SeaTunnelServer workerServer;
+    private Path checkpointDir;
+    private int workerRestPort;
+
+    /**
+     * Starts a two-member cluster: a master with REST disabled and a worker with REST enabled.
+     * Both use local-file checkpoint storage under a fresh temporary directory. Waits until both
+     * members see each other and the worker's REST endpoint answers.
+     */
+    @BeforeAll
+    public void setUp() throws Exception {
+        String clusterName =
+                TestUtils.getClusterName(
+                        "RestApiSubmitJobStartWithSavePointTest_" + System.nanoTime());
+        checkpointDir = Files.createTempDirectory(clusterName + "_checkpoint_");
+
+        SeaTunnelConfig masterConfig = createSeaTunnelConfig(clusterName, 20000, false);
+        SeaTunnelConfig workerConfig = createSeaTunnelConfig(clusterName, 23000, true);
+
+        masterInstance = SeaTunnelServerStarter.createMasterHazelcastInstance(masterConfig);
+        workerInstance = SeaTunnelServerStarter.createWorkerHazelcastInstance(workerConfig);
+
+        masterServer = masterInstance.node.nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
+        workerServer = workerInstance.node.nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
+
+        Awaitility.await()
+                .atMost(30, TimeUnit.SECONDS)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertEquals(
+                                    2, masterInstance.getCluster().getMembers().size());
+                            Assertions.assertEquals(
+                                    2, workerInstance.getCluster().getMembers().size());
+                        });
+
+        workerRestPort = getHttpPort(workerServer);
+        awaitRestReady(workerRestPort);
+    }
+
+    /**
+     * Shuts the cluster down and removes temporary files. Each step runs on its own, so a failure
+     * in one step (e.g. the worker failing to stop) does not leave the master running or the
+     * checkpoint directory behind.
+     */
+    @AfterAll
+    public void tearDown() {
+        runCleanupStep(
+                () -> {
+                    if (workerServer != null) {
+                        workerServer.shutdown(true);
+                    }
+                });
+        runCleanupStep(
+                () -> {
+                    if (masterServer != null) {
+                        masterServer.shutdown(true);
+                    }
+                });
+        runCleanupStep(
+                () -> {
+                    if (workerInstance != null) {
+                        workerInstance.shutdown();
+                    }
+                });
+        runCleanupStep(
+                () -> {
+                    if (masterInstance != null) {
+                        masterInstance.shutdown();
+                    }
+                });
+        runCleanupStep(
+                () -> {
+                    if (checkpointDir != null) {
+                        FileUtils.deleteFile(checkpointDir.toString());
+                    }
+                });
+        runCleanupStep(() -> FileUtils.deleteFile(Paths.get("logs").toString()));
+    }
+
+    @Test
+    public void testSubmitJobStartWithSavePointNoCheckpointOnWorkerReturns400() throws Exception {
+        long jobId = nextJobId();
+        String requestUrl =
+                "http://localhost:"
+                        + workerRestPort
+                        + "/submit-job?format=json&jobId="
+                        + jobId
+                        + "&jobName="
+                        + TEST_JOB_NAME
+                        + "&isStartWithSavePoint=true";
+
+        HttpResponse response = postJson(requestUrl, getRequestBody());
+        Assertions.assertEquals(400, response.code, () -> "responseBody=" + response.body);
+        Assertions.assertTrue(response.body.contains("\"status\":\"fail\""));
+        Assertions.assertTrue(response.body.contains("No checkpoint found for jobId=" + jobId));
+    }
+
+    @Test
+    public void testBuildJobStartWithSavePointOnWorkerWhenCheckpointExists() throws Exception {
+        Assertions.assertNotNull(masterServer);
+        Assertions.assertNotNull(masterServer.getCheckpointService());
+        Assertions.assertNotNull(workerServer);
+        // The worker has no checkpoint service, so building must fetch the data from the master.
+        Assertions.assertNull(workerServer.getCheckpointService());
+
+        long jobId = nextJobId();
+        storeFakeSourceCheckpoint(jobId);
+
+        JobImmutableInformation jobImmutableInformation =
+                buildJobFromStoredCheckpoint(workerServer, workerInstance, jobId);
+        Assertions.assertEquals(jobId, jobImmutableInformation.getJobId());
+        Assertions.assertTrue(jobImmutableInformation.isStartWithSavePoint());
+    }
+
+    @Test
+    public void testBuildJobStartWithSavePointOnMasterWhenCheckpointExists() throws Exception {
+        Assertions.assertNotNull(masterServer);
+        Assertions.assertNotNull(masterServer.getCheckpointService());
+
+        long jobId = nextJobId();
+        storeFakeSourceCheckpoint(jobId);
+
+        JobImmutableInformation jobImmutableInformation =
+                buildJobFromStoredCheckpoint(masterServer, masterInstance, jobId);
+        Assertions.assertEquals(jobId, jobImmutableInformation.getJobId());
+        Assertions.assertTrue(jobImmutableInformation.isStartWithSavePoint());
+    }
+
+    /** Returns a job id that no other test in this class has used. */
+    private static long nextJobId() {
+        return JOB_ID_SEQUENCE.incrementAndGet();
+    }
+
+    /**
+     * Builds the immutable job information for the request body's job on {@code server}, with
+     * start-with-savepoint enabled for {@code jobId}.
+     */
+    private JobImmutableInformation buildJobFromStoredCheckpoint(
+            SeaTunnelServer server, HazelcastInstanceImpl instance, long jobId) throws Exception {
+        JobConfig jobConfig = new JobConfig();
+        jobConfig.setName(TEST_JOB_NAME);
+        org.apache.seatunnel.shade.com.typesafe.config.Config seaTunnelJobConfig =
+                buildSeaTunnelJobConfigFromJsonRequest();
+
+        RestJobExecutionEnvironment restJobExecutionEnvironment =
+                new RestJobExecutionEnvironment(
+                        server, jobConfig, seaTunnelJobConfig, instance.node, true, jobId);
+        return restJobExecutionEnvironment.build();
+    }
+
+    /**
+     * Reads the port the server's Jetty connector actually bound to. Dynamic port allocation is
+     * enabled, so the configured port is only a starting point. The Jetty server is reached
+     * through reflection on the private {@code jettyService} and {@code server} fields.
+     */
+    private int getHttpPort(SeaTunnelServer seaTunnelServer) throws Exception {
+        Field jettyServiceField = SeaTunnelServer.class.getDeclaredField("jettyService");
+        jettyServiceField.setAccessible(true);
+        Awaitility.await()
+                .atMost(30, TimeUnit.SECONDS)
+                .until(() -> jettyServiceField.get(seaTunnelServer) != null);
+        Object jettyService = jettyServiceField.get(seaTunnelServer);
+
+        Field serverField = jettyService.getClass().getDeclaredField("server");
+        serverField.setAccessible(true);
+        org.apache.seatunnel.shade.org.eclipse.jetty.server.Server server =
+                (org.apache.seatunnel.shade.org.eclipse.jetty.server.Server)
+                        serverField.get(jettyService);
+
+        return Awaitility.await()
+                .atMost(30, TimeUnit.SECONDS)
+                .until(
+                        () -> {
+                            for (Connector connector : server.getConnectors()) {
+                                if (connector instanceof ServerConnector) {
+                                    int port = ((ServerConnector) connector).getLocalPort();
+                                    if (port > 0) {
+                                        return port;
+                                    }
+                                }
+                            }
+                            return -1;
+                        },
+                        port -> port > 0);
+    }
+
+    /**
+     * Writes a single-pipeline SAVEPOINT checkpoint for {@code jobId} into the master's
+     * checkpoint storage. The checkpoint holds state for the FakeSource action: a coordinator
+     * entry (subtask -1) and an empty subtask 0.
+     */
+    private void storeFakeSourceCheckpoint(long jobId) throws Exception {
+        Assertions.assertNotNull(masterServer);
+        Assertions.assertNotNull(masterServer.getCheckpointService());
+
+        String sourceActionName = JobConfigParser.createSourceActionName(0, SOURCE_FACTORY_ID);
+        ActionStateKey actionStateKey = new ActionStateKey("ActionStateKey - " + sourceActionName);
+
+        ActionState actionState = new ActionState(actionStateKey, 1);
+        actionState.reportState(
+                -1,
+                new ActionSubtaskState(
+                        actionStateKey,
+                        -1,
+                        Collections.singletonList("coordinator".getBytes(StandardCharsets.UTF_8))));
+        actionState.reportState(
+                0, new ActionSubtaskState(actionStateKey, 0, Collections.emptyList()));
+
+        Map<ActionStateKey, ActionState> taskStates = new HashMap<>();
+        taskStates.put(actionStateKey, actionState);
+
+        long checkpointId = 1L;
+        int pipelineId = 1;
+        long now = System.currentTimeMillis();
+        CompletedCheckpoint completedCheckpoint =
+                new CompletedCheckpoint(
+                        jobId,
+                        pipelineId,
+                        checkpointId,
+                        now,
+                        CheckpointType.SAVEPOINT_TYPE,
+                        now,
+                        taskStates,
+                        Collections.emptyMap());
+
+        ProtoStuffSerializer serializer = new ProtoStuffSerializer();
+        byte[] checkpointBytes = serializer.serialize(completedCheckpoint);
+
+        PipelineState pipelineState =
+                PipelineState.builder()
+                        .jobId(String.valueOf(jobId))
+                        .pipelineId(pipelineId)
+                        .checkpointId(checkpointId)
+                        .states(checkpointBytes)
+                        .build();
+
+        masterServer.getCheckpointService().getCheckpointStorage().storeCheckPoint(pipelineState);
+    }
+
+    /** Parses {@link #getRequestBody()} the same way the REST submit-job handler does. */
+    private org.apache.seatunnel.shade.com.typesafe.config.Config
+            buildSeaTunnelJobConfigFromJsonRequest() throws IOException {
+        return RestUtil.buildConfig(
+                RestUtil.convertByteToJsonNode(getRequestBody().getBytes(StandardCharsets.UTF_8)),
+                false);
+    }
+
+    /** A minimal batch job: one FakeSource row piped into a Console sink. */
+    private String getRequestBody() {
+        return "{\n"
+                + "  \"env\": {\n"
+                + "    \"job.mode\": \"BATCH\",\n"
+                + "    \"job.name\": \"rest_api_test\"\n"
+                + "  },\n"
+                + "  \"source\": [\n"
+                + "    {\n"
+                + "      \"plugin_name\": \"FakeSource\",\n"
+                + "      \"plugin_output\": \"fake\",\n"
+                + "      \"row.num\": 1,\n"
+                + "      \"schema\": {\n"
+                + "        \"fields\": {\n"
+                + "          \"name\": \"string\"\n"
+                + "        }\n"
+                + "      }\n"
+                + "    }\n"
+                + "  ],\n"
+                + "  \"transform\": [],\n"
+                + "  \"sink\": [\n"
+                + "    {\n"
+                + "      \"plugin_name\": \"Console\",\n"
+                + "      \"plugin_input\": [\"fake\"]\n"
+                + "    }\n"
+                + "  ]\n"
+                + "}\n";
+    }
+
+    /**
+     * Builds a LOCAL-mode engine config for one cluster member. When {@code enableRest} is true
+     * the HTTP endpoint starts at {@code httpPort} with dynamic port allocation. If
+     * {@link #checkpointDir} is already set, checkpoints go to local files under it.
+     */
+    private SeaTunnelConfig createSeaTunnelConfig(
+            String clusterName, int httpPort, boolean enableRest) {
+        Config hazelcastConfig = Config.loadFromString(getHazelcastConfig());
+        hazelcastConfig.setClusterName(clusterName);
+
+        SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
+        seaTunnelConfig.setHazelcastConfig(hazelcastConfig);
+        seaTunnelConfig.getEngineConfig().setMode(ExecutionMode.LOCAL);
+
+        HttpConfig httpConfig = seaTunnelConfig.getEngineConfig().getHttpConfig();
+        httpConfig.setEnabled(enableRest);
+        httpConfig.setEnableHttps(false);
+        if (enableRest) {
+            httpConfig.setPort(httpPort);
+            httpConfig.setEnableDynamicPort(true);
+            httpConfig.setPortRange(2000);
+        }
+
+        if (checkpointDir != null) {
+            seaTunnelConfig
+                    .getEngineConfig()
+                    .getCheckpointConfig()
+                    .getStorage()
+                    .setStorage("localfile");
+            seaTunnelConfig
+                    .getEngineConfig()
+                    .getCheckpointConfig()
+                    .getStorage()
+                    .getStoragePluginConfig()
+                    .put(StorageConstants.STORAGE_NAME_SPACE, checkpointDir.toString());
+        }
+        return seaTunnelConfig;
+    }
+
+    /** Polls {@code GET /overview} on {@code port} until it answers 200 (max 30s). */
+    private void awaitRestReady(int port) {
+        Awaitility.await()
+                .atMost(30, TimeUnit.SECONDS)
+                .pollInterval(200, TimeUnit.MILLISECONDS)
+                .until(() -> isRestEndpointReady(port));
+    }
+
+    /** Single readiness probe; the connection is always released, even when the call fails. */
+    private static boolean isRestEndpointReady(int port) {
+        HttpURLConnection conn = null;
+        try {
+            conn =
+                    (HttpURLConnection)
+                            new URL("http://localhost:" + port + "/overview").openConnection();
+            conn.setRequestMethod("GET");
+            conn.setConnectTimeout(2000);
+            conn.setReadTimeout(2000);
+            return conn.getResponseCode() == 200;
+        } catch (Exception e) {
+            // Endpoint not up yet; Awaitility will poll again.
+            return false;
+        } finally {
+            if (conn != null) {
+                conn.disconnect();
+            }
+        }
+    }
+
+    /**
+     * POSTs {@code body} as UTF-8 JSON and returns the status code with the response (or error)
+     * body. The connection is released even if writing the request fails.
+     */
+    private HttpResponse postJson(String requestUrl, String body) throws IOException {
+        HttpURLConnection conn = (HttpURLConnection) new URL(requestUrl).openConnection();
+        try {
+            conn.setRequestMethod("POST");
+            conn.setRequestProperty("Content-Type", "application/json; charset=UTF-8");
+            conn.setConnectTimeout(5000);
+            conn.setReadTimeout(30000);
+            conn.setDoOutput(true);
+            try (OutputStream os = conn.getOutputStream()) {
+                os.write(body.getBytes(StandardCharsets.UTF_8));
+            }
+
+            int code = conn.getResponseCode();
+            java.io.InputStream responseStream =
+                    code >= 200 && code < 300 ? conn.getInputStream() : conn.getErrorStream();
+            return new HttpResponse(code, readFully(responseStream));
+        } finally {
+            conn.disconnect();
+        }
+    }
+
+    /**
+     * Reads {@code stream} as UTF-8 text with line breaks removed. A {@code null} stream yields
+     * "" because {@link HttpURLConnection#getErrorStream()} returns null when the server sent no
+     * error body.
+     */
+    private static String readFully(java.io.InputStream stream) throws IOException {
+        if (stream == null) {
+            return "";
+        }
+        try (BufferedReader in =
+                new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))) {
+            return in.lines().collect(Collectors.joining());
+        }
+    }
+
+    /** Runs {@code step}, reporting but not propagating a failure so later steps still run. */
+    private static void runCleanupStep(CleanupStep step) {
+        try {
+            step.run();
+        } catch (Exception e) {
+            // Best-effort cleanup; avoid masking test assertion failures.
+            System.err.println(ExceptionUtils.getMessage(e));
+        }
+    }
+
+    /** A cleanup action that is allowed to throw. */
+    @FunctionalInterface
+    private interface CleanupStep {
+        void run() throws Exception;
+    }
+
+    /**
+     * Hazelcast member config in YAML. YAML is indentation-sensitive, so the nesting below must
+     * be kept (e.g. {@code rest-api}, {@code join} and {@code port} all live under
+     * {@code network}).
+     */
+    private static String getHazelcastConfig() {
+        return "hazelcast:\n"
+                + "  cluster-name: seatunnel\n"
+                + "  network:\n"
+                + "    rest-api:\n"
+                + "      enabled: true\n"
+                + "      endpoint-groups:\n"
+                + "        CLUSTER_WRITE:\n"
+                + "          enabled: true\n"
+                + "    join:\n"
+                + "      tcp-ip:\n"
+                + "        enabled: true\n"
+                + "        member-list:\n"
+                + "          - localhost\n"
+                + "    port:\n"
+                + "      auto-increment: true\n"
+                + "      port-count: 100\n"
+                + "      port: 5801\n"
+                + "\n"
+                + "  properties:\n"
+                + "    hazelcast.invocation.max.retry.count: 200\n"
+                + "    hazelcast.tcp.join.port.try.count: 30\n"
+                + "    hazelcast.invocation.retry.pause.millis: 2000\n"
+                + "    hazelcast.slow.operation.detector.stacktrace.logging.enabled: true\n"
+                + "    hazelcast.logging.type: log4j2\n"
+                + "    hazelcast.operation.generic.thread.count: 200\n";
+    }
+
+    /** Status code and body of an HTTP exchange. */
+    private static class HttpResponse {
+        private final int code;
+        private final String body;
+
+        private HttpResponse(int code, String body) {
+            this.code = code;
+            this.body = body;
+        }
+    }
+}