This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 8c8122d2fe [Fix][Zeta] Fix job restore failure after master switch
when IMap still loading from S3 (#10562)
8c8122d2fe is described below
commit 8c8122d2fe5976e332c3c314929b74faccb0f543
Author: Ricky Makhija <[email protected]>
AuthorDate: Tue Mar 17 19:16:36 2026 +0530
[Fix][Zeta] Fix job restore failure after master switch when IMap still
loading from S3 (#10562)
---
.../engine/common/utils/ExceptionUtil.java | 26 +++++++++++++-
.../engine/common/utils/ExceptionUtilTest.java | 24 +++++++++++++
.../engine/server/CoordinatorService.java | 42 +++++++++++++++++++---
3 files changed, 86 insertions(+), 6 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
index 5148c10843..c812ba54f4 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
@@ -31,6 +31,7 @@ import
com.hazelcast.client.impl.protocol.ClientProtocolErrorCodes;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.OperationTimeoutException;
import com.hazelcast.instance.impl.OutOfMemoryErrorDispatcher;
+import com.hazelcast.spi.exception.RetryableHazelcastException;
import lombok.NonNull;
import java.lang.reflect.InvocationTargetException;
@@ -151,10 +152,33 @@ public final class ExceptionUtil {
throw new RuntimeException("Never throw here.");
}
+ /**
+ * Check if an exception indicates an operation that should be retried.
+ *
+ * <p>This method is used by {@link org.apache.seatunnel.common.utils.RetryUtils} to determine
+ * if a failed operation should be retried. It extracts the root cause of the exception chain
+ * and checks if it matches known transient exception types.
+ *
+ * <p>The following exception types are considered retryable:
+ *
+ * <ul>
+ * <li>{@link HazelcastInstanceNotActiveException} - Hazelcast instance is shutting down
+ * <li>{@link InterruptedException} - Operation was interrupted
+ * <li>{@link OperationTimeoutException} - Operation timed out waiting for a response
+ * <li>{@link RetryableHazelcastException} - Hazelcast explicitly marks the operation as
+ * retryable, e.g., when an IMap partition is still loading data from external storage
+ * (MapStore) during cluster startup or master switch
+ * </ul>
+ *
+ * @param e the exception to check (may be wrapped in CompletionException / ExecutionException)
+ * @return {@code true} if the root cause is a transient, retryable exception; {@code false}
+ * otherwise
+ */
public static boolean isOperationNeedRetryException(@NonNull Throwable e) {
Throwable exception = ExceptionUtils.getRootException(e);
return exception instanceof HazelcastInstanceNotActiveException
|| exception instanceof InterruptedException
- || exception instanceof OperationTimeoutException;
+ || exception instanceof OperationTimeoutException
+ || exception instanceof RetryableHazelcastException;
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/utils/ExceptionUtilTest.java
b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/utils/ExceptionUtilTest.java
index ee2e03a503..6df194a8c0 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/utils/ExceptionUtilTest.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/utils/ExceptionUtilTest.java
@@ -19,7 +19,11 @@ package org.apache.seatunnel.engine.common.utils;
import org.junit.jupiter.api.Test;
+import com.hazelcast.spi.exception.RetryableHazelcastException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class ExceptionUtilTest {
@@ -45,4 +49,24 @@ public class ExceptionUtilTest {
void throwsNullPointerExceptionWhenNull() {
assertThrows(NullPointerException.class, () ->
ExceptionUtil.sneakyThrow(null));
}
+
+ @Test
+ void testIsOperationNeedRetryException_withRetryableHazelcastException() {
+ RetryableHazelcastException exception = new
RetryableHazelcastException("IMap loading");
+ assertTrue(ExceptionUtil.isOperationNeedRetryException(exception));
+ }
+
+ @Test
+ void
testIsOperationNeedRetryException_withWrappedRetryableHazelcastException() {
+ Throwable exception =
+ new Exception(
+ new RuntimeException(new
RetryableHazelcastException("IMap loading")));
+ assertTrue(ExceptionUtil.isOperationNeedRetryException(exception));
+ }
+
+ @Test
+ void testIsOperationNeedRetryException_withNonRetryableException() {
+ Exception exception = new Exception("Non-retryable error");
+ assertFalse(ExceptionUtil.isOperationNeedRetryException(exception));
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index d32ded069e..1053a7a144 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.api.event.EventProcessor;
import org.apache.seatunnel.api.tracing.MDCExecutorService;
import org.apache.seatunnel.api.tracing.MDCTracer;
import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.common.utils.StringFormatUtils;
import org.apache.seatunnel.engine.common.Constant;
@@ -39,6 +40,7 @@ import
org.apache.seatunnel.engine.common.exception.SavePointFailedException;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.job.JobResult;
import org.apache.seatunnel.engine.common.job.JobStatus;
+import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
@@ -447,10 +449,26 @@ public class CoordinatorService {
}
private void restoreAllRunningJobFromMasterNodeSwitch() {
- List<Map.Entry<Long, JobInfo>> needRestoreFromMasterNodeSwitchJobs =
- runningJobInfoIMap.entrySet().stream()
- .filter(entry ->
!runningJobMasterMap.containsKey(entry.getKey()))
- .collect(Collectors.toList());
+ List<Map.Entry<Long, JobInfo>> needRestoreFromMasterNodeSwitchJobs;
+ try {
+ needRestoreFromMasterNodeSwitchJobs =
+ RetryUtils.retryWithException(
+ () ->
+ runningJobInfoIMap.entrySet().stream()
+ .filter(
+ entry ->
+
!runningJobMasterMap.containsKey(
+
entry.getKey()))
+ .collect(Collectors.toList()),
+ new RetryUtils.RetryMaterial(
+ Constant.OPERATION_RETRY_TIME,
+ true,
+
ExceptionUtil::isOperationNeedRetryException,
+ Constant.OPERATION_RETRY_SLEEP));
+ } catch (Exception e) {
+ throw new SeaTunnelEngineException(
+ "Failed to fetch running jobs from IMap during master
switch restore", e);
+ }
if (needRestoreFromMasterNodeSwitchJobs.isEmpty()) {
return;
}
@@ -504,7 +522,21 @@ public class CoordinatorService {
}
private void restoreJobFromMasterActiveSwitch(@NonNull Long jobId,
@NonNull JobInfo jobInfo) {
- if (runningJobStateIMap.get(jobId) == null) {
+ Object jobState;
+ try {
+ jobState =
+ RetryUtils.retryWithException(
+ () -> runningJobStateIMap.get(jobId),
+ new RetryUtils.RetryMaterial(
+ Constant.OPERATION_RETRY_TIME,
+ true,
+
ExceptionUtil::isOperationNeedRetryException,
+ Constant.OPERATION_RETRY_SLEEP));
+ } catch (Exception e) {
+ throw new SeaTunnelEngineException(
+ String.format("Job id %s restore failed, can not get job
state", jobId), e);
+ }
+ if (jobState == null) {
runningJobInfoIMap.remove(jobId);
return;
}