This is an automated email from the ASF dual-hosted git repository.

xiaochenzhou pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e400c2ca62 [Fix][Zeta] make the job failed when triggering checkpoint 
fails (apache#10442) (#10448)
e400c2ca62 is described below

commit e400c2ca62c3e7ad2f4919f50b0c2e1912ba1ea2
Author: Sephiroth <[email protected]>
AuthorDate: Mon Mar 23 20:13:05 2026 +0800

    [Fix][Zeta] make the job failed when triggering checkpoint fails 
(apache#10442) (#10448)
---
 .../server/checkpoint/CheckpointCoordinator.java   |  11 +-
 .../CheckpointBarrierTriggerErrorTest.java         | 125 +++++++++++++++++++++
 ...o_console_checkpoint_barrier_trigger_error.conf |  53 +++++++++
 3 files changed, 187 insertions(+), 2 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 17985fc9c9..726b7c9d97 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -683,9 +683,16 @@ public class CheckpointCoordinator {
                     try {
                         CompletableFuture.allOf(completableFutureArray).get();
                     } catch (InterruptedException e) {
-                        throw new RuntimeException(e);
+                        handleCoordinatorError(
+                                "triggering checkpoint barrier has been 
interrupted",
+                                e,
+                                CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR);
+                        return;
                     } catch (Exception e) {
-                        LOG.error(ExceptionUtils.getMessage(e));
+                        handleCoordinatorError(
+                                "triggering checkpoint barrier failed",
+                                e,
+                                CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR);
                         return;
                     }
                     if (coordinatorConfig.isCheckpointEnable()) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointBarrierTriggerErrorTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointBarrierTriggerErrorTest.java
new file mode 100644
index 0000000000..ebcf282724
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointBarrierTriggerErrorTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+import org.apache.seatunnel.engine.common.job.JobStatus;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
+import org.apache.seatunnel.engine.server.TestUtils;
+import 
org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointBarrierTriggerOperation;
+import org.apache.seatunnel.engine.server.master.JobMaster;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+
+import com.hazelcast.internal.serialization.Data;
+
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.awaitility.Awaitility.await;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+
+/**
+ * Verifies that a streaming job ends in {@link JobStatus#FAILED} when sending a {@link
+ * CheckpointBarrierTriggerOperation} to a member node throws.
+ */
+public class CheckpointBarrierTriggerErrorTest extends AbstractSeaTunnelServerTest {
+
+    private static final String CONF_PATH =
+            "stream_fake_to_console_checkpoint_barrier_trigger_error.conf";
+
+    /**
+     * Id of the pipeline whose checkpoint coordinator receives the spied manager. The test job is
+     * expected to consist of a single pipeline with id 1.
+     */
+    private static final int PIPELINE_ID = 1;
+
+    /** Number of barrier-trigger sends intercepted by the spy; only the first one fails. */
+    private final AtomicInteger barrierTriggerSends = new AtomicInteger(0);
+
+    @Test
+    public void testCheckpointBarrierTriggerError()
+            throws NoSuchFieldException, IllegalAccessException {
+        long jobId = System.currentTimeMillis();
+        startJob(jobId, CONF_PATH);
+        awaitJobStatus(jobId, JobStatus.RUNNING);
+
+        // Reset so that exactly one send fails even if this test runs repeatedly in one JVM.
+        barrierTriggerSends.set(0);
+
+        // Finish stubbing the spy BEFORE handing it to the coordinator: Mockito stubbing is not
+        // safe while another thread (the checkpoint coordinator) is invoking the same mock.
+        CheckpointManager spiedCheckpointManager = spy(getCheckpointManager(jobId));
+        doAnswer(this::failFirstBarrierTrigger)
+                .when(spiedCheckpointManager)
+                .sendOperationToMemberNode(Mockito.any(CheckpointBarrierTriggerOperation.class));
+        setCheckpointManager(spiedCheckpointManager);
+
+        awaitJobStatus(jobId, JobStatus.FAILED);
+    }
+
+    /** Polls the coordinator for up to two minutes until the job reports {@code expected}. */
+    private void awaitJobStatus(long jobId, JobStatus expected) {
+        await().atMost(2, TimeUnit.MINUTES)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertEquals(
+                                        expected,
+                                        server.getCoordinatorService().getJobStatus(jobId)));
+    }
+
+    /**
+     * Builds a logical DAG from the config at {@code path}, submits it under {@code jobId} and
+     * blocks until the submit future completes.
+     */
+    private void startJob(long jobId, String path) {
+        LogicalDag testLogicalDag =
+                TestUtils.createTestLogicalPlan(path, String.valueOf(jobId), jobId);
+
+        JobImmutableInformation jobImmutableInformation =
+                new JobImmutableInformation(
+                        jobId,
+                        "Test",
+                        false,
+                        nodeEngine.getSerializationService(),
+                        testLogicalDag,
+                        Collections.emptyList(),
+                        Collections.emptyList());
+
+        Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation);
+
+        PassiveCompletableFuture<Void> submitFuture =
+                server.getCoordinatorService()
+                        .submitJob(jobId, data, jobImmutableInformation.isStartWithSavePoint());
+        submitFuture.join();
+    }
+
+    /**
+     * Mockito answer that throws on the first intercepted barrier-trigger send and delegates every
+     * later call to the real {@code sendOperationToMemberNode} implementation.
+     */
+    private Object failFirstBarrierTrigger(InvocationOnMock invocation) throws Throwable {
+        if (barrierTriggerSends.incrementAndGet() == 1) {
+            throw new RuntimeException(
+                    "An exception occurred while sending CheckpointBarrierTriggerOperation.");
+        }
+        return invocation.callRealMethod();
+    }
+
+    /** Reads the private {@code checkpointManager} field of the job's {@link JobMaster}. */
+    private CheckpointManager getCheckpointManager(long jobId)
+            throws NoSuchFieldException, IllegalAccessException {
+        JobMaster jobMaster = server.getCoordinatorService().getJobMaster(jobId);
+        Field checkpointManagerField = JobMaster.class.getDeclaredField("checkpointManager");
+        checkpointManagerField.setAccessible(true);
+        return (CheckpointManager) checkpointManagerField.get(jobMaster);
+    }
+
+    /**
+     * Overwrites the private {@code checkpointManager} field of the {@link CheckpointCoordinator}
+     * for {@link #PIPELINE_ID}, so later calls the coordinator makes on its manager go to {@code
+     * checkpointManager}.
+     */
+    private void setCheckpointManager(CheckpointManager checkpointManager)
+            throws NoSuchFieldException, IllegalAccessException {
+        CheckpointCoordinator checkpointCoordinator =
+                checkpointManager.getCheckpointCoordinator(PIPELINE_ID);
+        Field checkpointManagerField =
+                CheckpointCoordinator.class.getDeclaredField("checkpointManager");
+        checkpointManagerField.setAccessible(true);
+        checkpointManagerField.set(checkpointCoordinator, checkpointManager);
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_checkpoint_barrier_trigger_error.conf
 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_checkpoint_barrier_trigger_error.conf
new file mode 100644
index 0000000000..b9f00bbaf4
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_checkpoint_barrier_trigger_error.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file demonstrates streaming processing in SeaTunnel; it is used by CheckpointBarrierTriggerErrorTest.
+######
+
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+  job.retry.times = 0
+  checkpoint.interval = 1000
+  checkpoint.timeout = 60000
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+    FakeSource {
+      plugin_output = "fake1"
+       row.num = 1000
+       split.num = 100
+       split.read-interval = 3000
+       parallelism = 1
+       schema = {
+        fields {
+          name = "string"
+          age = "int"
+        }
+      }
+    }
+}
+
+transform {
+}
+
+sink {
+  console {
+
+  }
+}

Reply via email to