This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 59f60b9f73 [Hotfix][Zeta] Fix job can not restore when last checkpoint
failed (#6193)
59f60b9f73 is described below
commit 59f60b9f737ecbe40c7c477a07abd11810b6a186
Author: Eric <[email protected]>
AuthorDate: Wed Jan 17 18:38:45 2024 +0800
[Hotfix][Zeta] Fix job can not restore when last checkpoint failed (#6193)
---
.../MultiTableSinkAggregatedCommitter.java | 15 +++-
.../seatunnel/engine/e2e/JobExecutionIT.java | 27 +++++++
.../org/apache/seatunnel/engine/e2e/RestApiIT.java | 8 +-
.../resources/batch_last_checkpoint_error.conf | 91 ++++++++++++++++++++++
.../server/checkpoint/CheckpointCoordinator.java | 26 ++++++-
.../server/checkpoint/CheckpointManager.java | 10 +--
.../engine/server/dag/physical/SubPlan.java | 4 +-
7 files changed, 164 insertions(+), 17 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkAggregatedCommitter.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkAggregatedCommitter.java
index cd86a44d92..31dd91f1ee 100644
---
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkAggregatedCommitter.java
+++
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkAggregatedCommitter.java
@@ -73,6 +73,7 @@ public class MultiTableSinkAggregatedCommitter
@Override
public List<MultiTableAggregatedCommitInfo> commit(
List<MultiTableAggregatedCommitInfo> aggregatedCommitInfo) throws
IOException {
+ List<MultiTableAggregatedCommitInfo> errorList = new ArrayList<>();
for (String sinkIdentifier : aggCommitters.keySet()) {
SinkAggregatedCommitter<?, ?> sinkCommitter =
aggCommitters.get(sinkIdentifier);
if (sinkCommitter != null) {
@@ -85,10 +86,20 @@ public class MultiTableSinkAggregatedCommitter
.get(sinkIdentifier))
.filter(Objects::nonNull)
.collect(Collectors.toList());
- sinkCommitter.commit(commitInfo);
+ List errCommitList = sinkCommitter.commit(commitInfo);
+ if (errCommitList.size() == 0) {
+ continue;
+ }
+
+ for (int i = 0; i < errCommitList.size(); i++) {
+ if (errorList.size() < i + 1) {
+ errorList.add(i, new
MultiTableAggregatedCommitInfo(new HashMap<>()));
+ }
+ errorList.get(i).getCommitInfo().put(sinkIdentifier,
errCommitList.get(i));
+ }
}
}
- return new ArrayList<>();
+ return errorList;
}
@Override
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
index 26d021992a..0dc4d7ba12 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
@@ -201,4 +201,31 @@ public class JobExecutionIT {
hazelcastInstance.shutdown();
}
}
+
+ @Test
+ public void testLastCheckpointErrorJob() throws Exception {
+ Common.setDeployMode(DeployMode.CLIENT);
+ String filePath =
TestUtils.getResource("batch_last_checkpoint_error.conf");
+ JobConfig jobConfig = new JobConfig();
+ jobConfig.setName("batch_last_checkpoint_error");
+
+ ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
+ SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
+ ClientJobExecutionEnvironment jobExecutionEnv =
+ engineClient.createExecutionContext(filePath, jobConfig,
SEATUNNEL_CONFIG);
+
+ final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+
+ CompletableFuture<JobStatus> objectCompletableFuture =
+
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
+
+ await().atMost(600000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertTrue(
+ objectCompletableFuture.isDone()
+ && JobStatus.FAILED.equals(
+
objectCompletableFuture.get())));
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index c3e9c55849..e64243be79 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -62,6 +62,8 @@ public class RestApiIT {
private static HazelcastInstanceImpl node2;
+ private static SeaTunnelClient engineClient;
+
private static final String jobName = "test测试";
private static final String paramJobName = "param_test测试";
@@ -80,7 +82,7 @@ public class RestApiIT {
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(testClusterName);
- SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
+ engineClient = new SeaTunnelClient(clientConfig);
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(filePath, jobConfig,
seaTunnelConfig);
@@ -456,6 +458,10 @@ public class RestApiIT {
@AfterEach
void afterClass() {
+ if (engineClient != null) {
+ engineClient.close();
+ }
+
if (node1 != null) {
node1.shutdown();
}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_last_checkpoint_error.conf
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_last_checkpoint_error.conf
new file mode 100644
index 0000000000..84356210ea
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_last_checkpoint_error.conf
@@ -0,0 +1,91 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in
seatunnel config
+######
+
+env {
+ job.mode = "BATCH"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the
source plugin feature**
+ FakeSource {
+ row.num = 10
+ map.size = 10
+ array.size = 10
+ bytes.length = 10
+ string.length = 10
+ parallelism = 1
+ result_table_name = "fake"
+ schema = {
+ fields {
+ c_map = "map<string, array<int>>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_null = "null"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map<string, map<string, string>>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_null = "null"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ }
+ }
+}
+
+transform {
+}
+
+sink {
+ LocalFile {
+ path = "/hive/warehouse/test1"
+ field_delimiter = "\t"
+ row_delimiter = "\n"
+ partition_by = ["c_string"]
+ partition_dir_expression = "${k0}=${v0}"
+ is_partition_field_write_in_file = true
+ file_name_expression = "${transactionId}_${now}"
+ file_format_type = "text"
+ filename_time_format = "yyyy.MM.dd"
+ is_enable_transaction = true
+ save_mode = "error"
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 6a88f169fc..a27bed8102 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -470,18 +470,24 @@ public class CheckpointCoordinator {
.collect(Collectors.toMap(Map.Entry::getKey, entry ->
entry.getValue().size()));
}
+ @SneakyThrows
public PassiveCompletableFuture<CompletedCheckpoint> startSavepoint() {
LOG.info(String.format("Start save point for Job (%s)", jobId));
if (!isAllTaskReady) {
- CompletableFuture savepointFuture = new CompletableFuture();
+ CompletableFuture<CompletedCheckpoint> savepointFuture = new
CompletableFuture<>();
savepointFuture.completeExceptionally(
new CheckpointException(
CheckpointCloseReason.TASK_NOT_ALL_READY_WHEN_SAVEPOINT));
return new PassiveCompletableFuture<>(savepointFuture);
}
- CompletableFuture<PendingCheckpoint> savepoint =
- createPendingCheckpoint(Instant.now().toEpochMilli(),
SAVEPOINT_TYPE);
- startTriggerPendingCheckpoint(savepoint);
+ CompletableFuture<PendingCheckpoint> savepoint;
+ synchronized (lock) {
+ while (pendingCounter.get() > 0) {
+ Thread.sleep(500);
+ }
+ savepoint = createPendingCheckpoint(Instant.now().toEpochMilli(),
SAVEPOINT_TYPE);
+ startTriggerPendingCheckpoint(savepoint);
+ }
PendingCheckpoint savepointPendingCheckpoint = savepoint.join();
LOG.info(
String.format(
@@ -827,6 +833,18 @@ public class CheckpointCoordinator {
&& !latestCompletedCheckpoint.isRestored();
}
+ public boolean isNoErrorCompleted() {
+ if (latestCompletedCheckpoint == null) {
+ return false;
+ }
+ CheckpointCoordinatorStatus status =
+ (CheckpointCoordinatorStatus)
runningJobStateIMap.get(checkpointStateImapKey);
+ return
latestCompletedCheckpoint.getCheckpointType().isFinalCheckpoint()
+ && (status.equals(CheckpointCoordinatorStatus.FINISHED)
+ || status.equals(CheckpointCoordinatorStatus.SUSPEND))
+ && !latestCompletedCheckpoint.isRestored();
+ }
+
public boolean isEndOfSavePoint() {
if (latestCompletedCheckpoint == null) {
return false;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index 7f2b25b955..9ef2e6623b 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -164,14 +164,6 @@ public class CheckpointManager {
.toArray(PassiveCompletableFuture[]::new);
}
- /**
- * Called by the JobMaster, actually triggered by the user. <br>
- * After the savepoint is triggered, it will cause the pipeline to stop
automatically.
- */
- public PassiveCompletableFuture<CompletedCheckpoint> triggerSavepoint(int
pipelineId) {
- return getCheckpointCoordinator(pipelineId).startSavepoint();
- }
-
public void reportedPipelineRunning(int pipelineId, boolean
alreadyStarted) {
log.info(
"reported pipeline running stack: "
@@ -253,7 +245,7 @@ public class CheckpointManager {
* the pipeline has been completed;
*/
public boolean isCompletedPipeline(int pipelineId) {
- return getCheckpointCoordinator(pipelineId).isCompleted();
+ return getCheckpointCoordinator(pipelineId).isNoErrorCompleted();
}
/**
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index c1e7f975c4..a2623d5b2c 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -560,7 +560,9 @@ public class SubPlan {
log.warn(
String.format(
"%s checkpoint have error, cancel the pipeline",
getPipelineFullName()));
- this.cancelPipeline();
+ if (!getPipelineState().isEndState()) {
+ updatePipelineState(PipelineStatus.CANCELING);
+ }
}
public void startSubPlanStateProcess() {