This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 57e5627e89 [Hotfix][Zeta] Fix that batch tasks cannot be stopped after
retrying (#7083)
57e5627e89 is described below
commit 57e5627e89e870f26c1b0774cc225eaabe7f3c9a
Author: hailin0 <[email protected]>
AuthorDate: Wed Jul 10 11:50:14 2024 +0800
[Hotfix][Zeta] Fix that batch tasks cannot be stopped after retrying (#7083)
---
.../sink/inmemory/InMemoryAggregatedCommitter.java | 9 ++++
.../seatunnel/e2e/sink/inmemory/InMemorySink.java | 2 +-
.../e2e/sink/inmemory/InMemorySinkFactory.java | 9 +++-
.../server/checkpoint/CheckpointCoordinator.java | 3 ++
.../server/checkpoint/CompletedCheckpoint.java | 2 +-
.../checkpoint/CheckpointErrorRestoreEndTest.java | 63 ++++++++++++++++++++++
...h_fakesource_to_inmemory_with_commit_error.conf | 52 ++++++++++++++++++
7 files changed, 137 insertions(+), 3 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemoryAggregatedCommitter.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemoryAggregatedCommitter.java
index adc4c1cf2f..e63175c4f6 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemoryAggregatedCommitter.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemoryAggregatedCommitter.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.e2e.sink.inmemory;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.MultiTableResourceManager;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkAggregatedCommitter;
@@ -32,6 +33,11 @@ public class InMemoryAggregatedCommitter
private static final List<String> events = new ArrayList<>();
private static final List<InMemoryMultiTableResourceManager>
resourceManagers =
new ArrayList<>();
+ private ReadonlyConfig config;
+
+ public InMemoryAggregatedCommitter(ReadonlyConfig config) {
+ this.config = config;
+ }
public static List<String> getEvents() {
return events;
@@ -62,6 +68,9 @@ public class InMemoryAggregatedCommitter
@Override
public List<InMemoryAggregatedCommitInfo> commit(
List<InMemoryAggregatedCommitInfo> aggregatedCommitInfo) throws
IOException {
+ if (config.get(InMemorySinkFactory.THROW_EXCEPTION_OF_COMMITTER)) {
+ throw new IOException("commit failed");
+ }
return new ArrayList<>();
}
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java
index 8f1eba9af4..9e3852fb3c 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySink.java
@@ -68,7 +68,7 @@ public class InMemorySink
@Override
public Optional<SinkAggregatedCommitter<InMemoryCommitInfo,
InMemoryAggregatedCommitInfo>>
createAggregatedCommitter() throws IOException {
- return Optional.of(new InMemoryAggregatedCommitter());
+ return Optional.of(new InMemoryAggregatedCommitter(config));
}
@Override
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
index 7b06ec99d9..1ab973652f 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
@@ -41,6 +41,9 @@ public class InMemorySinkFactory
public static final Option<Boolean> CHECKPOINT_SLEEP =
Options.key("checkpoint_sleep").booleanType().defaultValue(false);
+ public static final Option<Boolean> THROW_EXCEPTION_OF_COMMITTER =
+
Options.key("throw_exception_of_committer").booleanType().defaultValue(false);
+
@Override
public String factoryIdentifier() {
return "InMemory";
@@ -49,7 +52,11 @@ public class InMemorySinkFactory
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .optional(THROW_EXCEPTION, THROW_OUT_OF_MEMORY,
CHECKPOINT_SLEEP)
+ .optional(
+ THROW_EXCEPTION,
+ THROW_OUT_OF_MEMORY,
+ CHECKPOINT_SLEEP,
+ THROW_EXCEPTION_OF_COMMITTER)
.build();
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 2b7498f485..8735048eac 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -293,6 +293,9 @@ public class CheckpointCoordinator {
private void restoreTaskState(TaskLocation taskLocation) {
List<ActionSubtaskState> states = new ArrayList<>();
if (latestCompletedCheckpoint != null) {
+ if (!latestCompletedCheckpoint.isRestored()) {
+ latestCompletedCheckpoint.setRestored(true);
+ }
final Integer currentParallelism =
pipelineTasks.get(taskLocation.getTaskVertexId());
plan.getSubtaskActions()
.get(taskLocation)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java
index 74be795205..7865b9c4dc 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java
@@ -46,7 +46,7 @@ public class CompletedCheckpoint implements Checkpoint,
Serializable {
private final Map<Long, TaskStatistics> taskStatistics;
- @Getter @Setter private boolean isRestored = false;
+ @Getter @Setter private volatile boolean isRestored = false;
public CompletedCheckpoint(
long jobId,
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointErrorRestoreEndTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointErrorRestoreEndTest.java
new file mode 100644
index 0000000000..4893bd2c2b
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointErrorRestoreEndTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
+import org.apache.seatunnel.engine.server.master.JobMaster;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+
+/**
+ * Verifies that a batch job whose InMemory sink committer always throws is restored a bounded
+ * number of times and then ends in {@link JobStatus#FAILED}, rather than never stopping after the
+ * retries.
+ */
+@DisabledOnOs(OS.WINDOWS)
+public class CheckpointErrorRestoreEndTest
+        extends AbstractSeaTunnelServerTest<CheckpointErrorRestoreEndTest> {
+    /**
+     * Test-resource config whose InMemory sink sets {@code throw_exception_of_committer=true}.
+     * Despite the field name, the referenced config runs with {@code job.mode = "BATCH"}.
+     */
+    public static final String STREAM_CONF_WITH_ERROR_PATH =
+            "batch_fakesource_to_inmemory_with_commit_error.conf";
+
+    @Test
+    public void testCheckpointRestoreToFailEnd() {
+        long jobId = System.currentTimeMillis();
+        startJob(jobId, STREAM_CONF_WITH_ERROR_PATH, false);
+
+        JobMaster jobMaster = server.getCoordinatorService().getJobMaster(jobId);
+        Assertions.assertEquals(1, jobMaster.getPhysicalPlan().getPipelineList().size());
+        // The always-failing committer forces pipeline restores; wait for the count to reach 3.
+        await().atMost(120, TimeUnit.SECONDS)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertEquals(
+                                        3,
+                                        jobMaster
+                                                .getPhysicalPlan()
+                                                .getPipelineList()
+                                                .get(0)
+                                                .getPipelineRestoreNum()));
+        // After the restores the job must reach the terminal FAILED state instead of hanging.
+        // JUnit's contract is assertEquals(expected, actual).
+        await().atMost(120, TimeUnit.SECONDS)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertEquals(
+                                        JobStatus.FAILED,
+                                        server.getCoordinatorService().getJobStatus(jobId)));
+    }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_inmemory_with_commit_error.conf
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_inmemory_with_commit_error.conf
new file mode 100644
index 0000000000..b89ee138e2
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_inmemory_with_commit_error.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  parallelism = 2
+  job.mode = "BATCH"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+  FakeSource {
+    result_table_name = "fake"
+    row.num = 100
+    split.num = 5
+    split.read-interval = 3000
+    # Declared once; the original repeated `parallelism = 1` later in this same object.
+    parallelism = 1
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  InMemory {
+    source_table_name="fake"
+    # Makes InMemoryAggregatedCommitter#commit throw, forcing restores until the job fails.
+    throw_exception_of_committer=true
+  }
+}
\ No newline at end of file