This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 57e5627e89 [Hotfix][Zeta] Fix that batch tasks cannot be stopped after retrying (#7083)
57e5627e89 is described below

commit 57e5627e89e870f26c1b0774cc225eaabe7f3c9a
Author: hailin0 <[email protected]>
AuthorDate: Wed Jul 10 11:50:14 2024 +0800

    [Hotfix][Zeta] Fix that batch tasks cannot be stopped after retrying (#7083)
---
 .../sink/inmemory/InMemoryAggregatedCommitter.java |  9 ++++
 .../seatunnel/e2e/sink/inmemory/InMemorySink.java  |  2 +-
 .../e2e/sink/inmemory/InMemorySinkFactory.java     |  9 +++-
 .../server/checkpoint/CheckpointCoordinator.java   |  3 ++
 .../server/checkpoint/CompletedCheckpoint.java     |  2 +-
 .../checkpoint/CheckpointErrorRestoreEndTest.java  | 63 ++++++++++++++++++++++
 ...h_fakesource_to_inmemory_with_commit_error.conf | 52 ++++++++++++++++++
 7 files changed, 137 insertions(+), 3 deletions(-)

diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemoryAggregatedCommitter.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemoryAggregatedCommitter.java
index adc4c1cf2f..e63175c4f6 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemoryAggregatedCommitter.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemoryAggregatedCommitter.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.e2e.sink.inmemory;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.MultiTableResourceManager;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.api.sink.SupportMultiTableSinkAggregatedCommitter;
@@ -32,6 +33,11 @@ public class InMemoryAggregatedCommitter
     private static final List<String> events = new ArrayList<>();
     private static final List<InMemoryMultiTableResourceManager> resourceManagers =
             new ArrayList<>();
+    private ReadonlyConfig config;
+
+    public InMemoryAggregatedCommitter(ReadonlyConfig config) {
+        this.config = config;
+    }
 
     public static List<String> getEvents() {
         return events;
@@ -62,6 +68,9 @@ public class InMemoryAggregatedCommitter
     @Override
     public List<InMemoryAggregatedCommitInfo> commit(
             List<InMemoryAggregatedCommitInfo> aggregatedCommitInfo) throws IOException {
+        if (config.get(InMemorySinkFactory.THROW_EXCEPTION_OF_COMMITTER)) {
+            throw new IOException("commit failed");
+        }
         return new ArrayList<>();
     }
 
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java
index 8f1eba9af4..9e3852fb3c 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java
@@ -68,7 +68,7 @@ public class InMemorySink
     @Override
     public Optional<SinkAggregatedCommitter<InMemoryCommitInfo, InMemoryAggregatedCommitInfo>>
             createAggregatedCommitter() throws IOException {
-        return Optional.of(new InMemoryAggregatedCommitter());
+        return Optional.of(new InMemoryAggregatedCommitter(config));
     }
 
     @Override
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
index 7b06ec99d9..1ab973652f 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
@@ -41,6 +41,9 @@ public class InMemorySinkFactory
     public static final Option<Boolean> CHECKPOINT_SLEEP =
             Options.key("checkpoint_sleep").booleanType().defaultValue(false);
 
+    public static final Option<Boolean> THROW_EXCEPTION_OF_COMMITTER =
+            Options.key("throw_exception_of_committer").booleanType().defaultValue(false);
+
     @Override
     public String factoryIdentifier() {
         return "InMemory";
@@ -49,7 +52,11 @@ public class InMemorySinkFactory
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .optional(THROW_EXCEPTION, THROW_OUT_OF_MEMORY, CHECKPOINT_SLEEP)
+                .optional(
+                        THROW_EXCEPTION,
+                        THROW_OUT_OF_MEMORY,
+                        CHECKPOINT_SLEEP,
+                        THROW_EXCEPTION_OF_COMMITTER)
                 .build();
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 2b7498f485..8735048eac 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -293,6 +293,9 @@ public class CheckpointCoordinator {
     private void restoreTaskState(TaskLocation taskLocation) {
         List<ActionSubtaskState> states = new ArrayList<>();
         if (latestCompletedCheckpoint != null) {
+            if (!latestCompletedCheckpoint.isRestored()) {
+                latestCompletedCheckpoint.setRestored(true);
+            }
             final Integer currentParallelism = pipelineTasks.get(taskLocation.getTaskVertexId());
             plan.getSubtaskActions()
                     .get(taskLocation)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java
index 74be795205..7865b9c4dc 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java
@@ -46,7 +46,7 @@ public class CompletedCheckpoint implements Checkpoint, Serializable {
 
     private final Map<Long, TaskStatistics> taskStatistics;
 
-    @Getter @Setter private boolean isRestored = false;
+    @Getter @Setter private volatile boolean isRestored = false;
 
     public CompletedCheckpoint(
             long jobId,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointErrorRestoreEndTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointErrorRestoreEndTest.java
new file mode 100644
index 0000000000..4893bd2c2b
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointErrorRestoreEndTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
+import org.apache.seatunnel.engine.server.master.JobMaster;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+
+@DisabledOnOs(OS.WINDOWS)
+public class CheckpointErrorRestoreEndTest
+        extends AbstractSeaTunnelServerTest<CheckpointErrorRestoreEndTest> {
+    public static String STREAM_CONF_WITH_ERROR_PATH =
+            "batch_fakesource_to_inmemory_with_commit_error.conf";
+
+    @Test
+    public void testCheckpointRestoreToFailEnd() {
+        long jobId = System.currentTimeMillis();
+        startJob(jobId, STREAM_CONF_WITH_ERROR_PATH, false);
+
+        JobMaster jobMaster = server.getCoordinatorService().getJobMaster(jobId);
+        Assertions.assertEquals(1, jobMaster.getPhysicalPlan().getPipelineList().size());
+        await().atMost(120, TimeUnit.SECONDS)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertEquals(
+                                        3,
+                                        jobMaster
+                                                .getPhysicalPlan()
+                                                .getPipelineList()
+                                                .get(0)
+                                                .getPipelineRestoreNum()));
+        await().atMost(120, TimeUnit.SECONDS)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertEquals(
+                                        server.getCoordinatorService().getJobStatus(jobId),
+                                        JobStatus.FAILED));
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_inmemory_with_commit_error.conf b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_inmemory_with_commit_error.conf
new file mode 100644
index 0000000000..b89ee138e2
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_inmemory_with_commit_error.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  parallelism = 2
+  job.mode = "BATCH"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the feature source plugin**
+    FakeSource {
+      result_table_name = "fake"
+       row.num = 100
+       split.num = 5
+       split.read-interval = 3000
+       parallelism = 1
+      schema = {
+        fields {
+          name = "string"
+          age = "int"
+        }
+      }
+      parallelism = 1
+    }
+}
+
+transform {
+}
+
+sink {
+  InMemory {
+    source_table_name="fake"
+    throw_exception_of_committer=true
+  }
+}
\ No newline at end of file

Reply via email to