This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 59f60b9f73 [Hotfix][Zeta] Fix job can not restore when last checkpoint 
failed (#6193)
59f60b9f73 is described below

commit 59f60b9f737ecbe40c7c477a07abd11810b6a186
Author: Eric <[email protected]>
AuthorDate: Wed Jan 17 18:38:45 2024 +0800

    [Hotfix][Zeta] Fix job can not restore when last checkpoint failed (#6193)
---
 .../MultiTableSinkAggregatedCommitter.java         | 15 +++-
 .../seatunnel/engine/e2e/JobExecutionIT.java       | 27 +++++++
 .../org/apache/seatunnel/engine/e2e/RestApiIT.java |  8 +-
 .../resources/batch_last_checkpoint_error.conf     | 91 ++++++++++++++++++++++
 .../server/checkpoint/CheckpointCoordinator.java   | 26 ++++++-
 .../server/checkpoint/CheckpointManager.java       | 10 +--
 .../engine/server/dag/physical/SubPlan.java        |  4 +-
 7 files changed, 164 insertions(+), 17 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkAggregatedCommitter.java
 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkAggregatedCommitter.java
index cd86a44d92..31dd91f1ee 100644
--- 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkAggregatedCommitter.java
+++ 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkAggregatedCommitter.java
@@ -73,6 +73,7 @@ public class MultiTableSinkAggregatedCommitter
     @Override
     public List<MultiTableAggregatedCommitInfo> commit(
             List<MultiTableAggregatedCommitInfo> aggregatedCommitInfo) throws 
IOException {
+        List<MultiTableAggregatedCommitInfo> errorList = new ArrayList<>();
         for (String sinkIdentifier : aggCommitters.keySet()) {
             SinkAggregatedCommitter<?, ?> sinkCommitter = 
aggCommitters.get(sinkIdentifier);
             if (sinkCommitter != null) {
@@ -85,10 +86,20 @@ public class MultiTableSinkAggregatedCommitter
                                                         .get(sinkIdentifier))
                                 .filter(Objects::nonNull)
                                 .collect(Collectors.toList());
-                sinkCommitter.commit(commitInfo);
+                List errCommitList = sinkCommitter.commit(commitInfo);
+                if (errCommitList.size() == 0) {
+                    continue;
+                }
+
+                for (int i = 0; i < errCommitList.size(); i++) {
+                    if (errorList.size() < i + 1) {
+                        errorList.add(i, new 
MultiTableAggregatedCommitInfo(new HashMap<>()));
+                    }
+                    errorList.get(i).getCommitInfo().put(sinkIdentifier, 
errCommitList.get(i));
+                }
             }
         }
-        return new ArrayList<>();
+        return errorList;
     }
 
     @Override
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
index 26d021992a..0dc4d7ba12 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
@@ -201,4 +201,31 @@ public class JobExecutionIT {
             hazelcastInstance.shutdown();
         }
     }
+
+    @Test
+    public void testLastCheckpointErrorJob() throws Exception {
+        Common.setDeployMode(DeployMode.CLIENT);
+        String filePath = 
TestUtils.getResource("batch_last_checkpoint_error.conf");
+        JobConfig jobConfig = new JobConfig();
+        jobConfig.setName("batch_last_checkpoint_error");
+
+        ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+        
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
+        SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
+        ClientJobExecutionEnvironment jobExecutionEnv =
+                engineClient.createExecutionContext(filePath, jobConfig, 
SEATUNNEL_CONFIG);
+
+        final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+
+        CompletableFuture<JobStatus> objectCompletableFuture =
+                
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
+
+        await().atMost(600000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertTrue(
+                                        objectCompletableFuture.isDone()
+                                                && JobStatus.FAILED.equals(
+                                                        
objectCompletableFuture.get())));
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index c3e9c55849..e64243be79 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -62,6 +62,8 @@ public class RestApiIT {
 
     private static HazelcastInstanceImpl node2;
 
+    private static SeaTunnelClient engineClient;
+
     private static final String jobName = "test测试";
     private static final String paramJobName = "param_test测试";
 
@@ -80,7 +82,7 @@ public class RestApiIT {
 
         ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
         clientConfig.setClusterName(testClusterName);
-        SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
+        engineClient = new SeaTunnelClient(clientConfig);
         ClientJobExecutionEnvironment jobExecutionEnv =
                 engineClient.createExecutionContext(filePath, jobConfig, 
seaTunnelConfig);
 
@@ -456,6 +458,10 @@ public class RestApiIT {
 
     @AfterEach
     void afterClass() {
+        if (engineClient != null) {
+            engineClient.close();
+        }
+
         if (node1 != null) {
             node1.shutdown();
         }
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_last_checkpoint_error.conf
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_last_checkpoint_error.conf
new file mode 100644
index 0000000000..84356210ea
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_last_checkpoint_error.conf
@@ -0,0 +1,91 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in 
seatunnel config
+######
+
+env {
+  job.mode = "BATCH"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the 
source plugin feature**
+  FakeSource {
+    row.num = 10
+    map.size = 10
+    array.size = 10
+    bytes.length = 10
+    string.length = 10
+    parallelism = 1
+    result_table_name = "fake"
+    schema = {
+      fields {
+        c_map = "map<string, array<int>>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_null = "null"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, map<string, string>>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_decimal = "decimal(30, 8)"
+          c_null = "null"
+          c_bytes = bytes
+          c_date = date
+          c_timestamp = timestamp
+        }
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  LocalFile {
+    path = "/hive/warehouse/test1"
+    field_delimiter = "\t"
+    row_delimiter = "\n"
+    partition_by = ["c_string"]
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format_type = "text"
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
+    save_mode = "error"
+  }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 6a88f169fc..a27bed8102 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -470,18 +470,24 @@ public class CheckpointCoordinator {
                 .collect(Collectors.toMap(Map.Entry::getKey, entry -> 
entry.getValue().size()));
     }
 
+    @SneakyThrows
     public PassiveCompletableFuture<CompletedCheckpoint> startSavepoint() {
         LOG.info(String.format("Start save point for Job (%s)", jobId));
         if (!isAllTaskReady) {
-            CompletableFuture savepointFuture = new CompletableFuture();
+            CompletableFuture<CompletedCheckpoint> savepointFuture = new 
CompletableFuture<>();
             savepointFuture.completeExceptionally(
                     new CheckpointException(
                             
CheckpointCloseReason.TASK_NOT_ALL_READY_WHEN_SAVEPOINT));
             return new PassiveCompletableFuture<>(savepointFuture);
         }
-        CompletableFuture<PendingCheckpoint> savepoint =
-                createPendingCheckpoint(Instant.now().toEpochMilli(), 
SAVEPOINT_TYPE);
-        startTriggerPendingCheckpoint(savepoint);
+        CompletableFuture<PendingCheckpoint> savepoint;
+        synchronized (lock) {
+            while (pendingCounter.get() > 0) {
+                Thread.sleep(500);
+            }
+            savepoint = createPendingCheckpoint(Instant.now().toEpochMilli(), 
SAVEPOINT_TYPE);
+            startTriggerPendingCheckpoint(savepoint);
+        }
         PendingCheckpoint savepointPendingCheckpoint = savepoint.join();
         LOG.info(
                 String.format(
@@ -827,6 +833,18 @@ public class CheckpointCoordinator {
                 && !latestCompletedCheckpoint.isRestored();
     }
 
+    public boolean isNoErrorCompleted() {
+        if (latestCompletedCheckpoint == null) {
+            return false;
+        }
+        CheckpointCoordinatorStatus status =
+                (CheckpointCoordinatorStatus) 
runningJobStateIMap.get(checkpointStateImapKey);
+        return 
latestCompletedCheckpoint.getCheckpointType().isFinalCheckpoint()
+                && (status.equals(CheckpointCoordinatorStatus.FINISHED)
+                        || status.equals(CheckpointCoordinatorStatus.SUSPEND))
+                && !latestCompletedCheckpoint.isRestored();
+    }
+
     public boolean isEndOfSavePoint() {
         if (latestCompletedCheckpoint == null) {
             return false;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index 7f2b25b955..9ef2e6623b 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -164,14 +164,6 @@ public class CheckpointManager {
                 .toArray(PassiveCompletableFuture[]::new);
     }
 
-    /**
-     * Called by the JobMaster, actually triggered by the user. <br>
-     * After the savepoint is triggered, it will cause the pipeline to stop 
automatically.
-     */
-    public PassiveCompletableFuture<CompletedCheckpoint> triggerSavepoint(int 
pipelineId) {
-        return getCheckpointCoordinator(pipelineId).startSavepoint();
-    }
-
     public void reportedPipelineRunning(int pipelineId, boolean 
alreadyStarted) {
         log.info(
                 "reported pipeline running stack: "
@@ -253,7 +245,7 @@ public class CheckpointManager {
      * the pipeline has been completed;
      */
     public boolean isCompletedPipeline(int pipelineId) {
-        return getCheckpointCoordinator(pipelineId).isCompleted();
+        return getCheckpointCoordinator(pipelineId).isNoErrorCompleted();
     }
 
     /**
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index c1e7f975c4..a2623d5b2c 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -560,7 +560,9 @@ public class SubPlan {
         log.warn(
                 String.format(
                         "%s checkpoint have error, cancel the pipeline", 
getPipelineFullName()));
-        this.cancelPipeline();
+        if (!getPipelineState().isEndState()) {
+            updatePipelineState(PipelineStatus.CANCELING);
+        }
     }
 
     public void startSubPlanStateProcess() {

Reply via email to