This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a01e4a243 [hotfix][engine] fix task operation retry timeout error (#3009)
a01e4a243 is described below
commit a01e4a2431124d791cfa1192dd41aff29dcf81e2
Author: Eric <[email protected]>
AuthorDate: Fri Oct 7 00:01:53 2022 +0800
[hotfix][engine] fix task operation retry timeout error (#3009)
---
.../seatunnel/engine/server/SeaTunnelServer.java | 18 ++++++++++++++++++
.../operation/CheckpointBarrierTriggerOperation.java | 3 ++-
.../operation/CheckpointFinishedOperation.java | 3 ++-
.../operation/NotifyTaskRestoreOperation.java | 3 ++-
.../checkpoint/operation/NotifyTaskStartOperation.java | 3 ++-
.../operation/checkpoint/CloseRequestOperation.java | 3 ++-
.../task/operation/source/AssignSplitOperation.java | 3 ++-
.../task/operation/source/RequestSplitOperation.java | 3 ++-
.../task/operation/source/RestoredSplitOperation.java | 6 ++++--
.../operation/source/SourceNoMoreElementOperation.java | 3 ++-
.../task/operation/source/SourceRegisterOperation.java | 3 ++-
11 files changed, 40 insertions(+), 11 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 8c1f1ebae..ead141dac 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -19,6 +19,8 @@ package org.apache.seatunnel.engine.server;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.server.execution.ExecutionState;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.service.slot.DefaultSlotService;
import org.apache.seatunnel.engine.server.service.slot.SlotService;
@@ -30,6 +32,7 @@ import com.hazelcast.internal.services.MembershipServiceEvent;
import com.hazelcast.jet.impl.LiveOperationRegistry;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
+import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.operationservice.LiveOperations;
@@ -172,6 +175,21 @@ public class SeaTunnelServer implements ManagedService,
MembershipAwareService,
return taskExecutionService;
}
+    /**
+     * Returns whether the task group at the given location has reached an end state.
+     * @param taskGroupLocation location of the task group whose state is looked up
+     * @return true only if an ExecutionState is recorded for it and that state is an end state
+     */
+    public boolean taskIsEnded(@NonNull TaskGroupLocation taskGroupLocation) {
+        // HazelcastInstance#getMap never returns null, so the map itself needs no null check
+        IMap<Object, Object> runningJobState = nodeEngine.getHazelcastInstance().getMap("runningJobState");
+        Object taskState = runningJobState.get(taskGroupLocation);
+        if (!(taskState instanceof ExecutionState)) {
+            return false;
+        }
+        return ((ExecutionState) taskState).isEndState();
+    }
+
private void printExecutionInfo() {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor)
executorService;
int activeCount = threadPoolExecutor.getActiveCount();
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointBarrierTriggerOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointBarrierTriggerOperation.java
index 52fefe1cd..9383e725a 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointBarrierTriggerOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointBarrierTriggerOperation.java
@@ -81,6 +81,7 @@ public class CheckpointBarrierTriggerOperation extends
TaskOperation {
}
return null;
}, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
- exception -> exception instanceof NullPointerException,
Constant.OPERATION_RETRY_SLEEP));
+ exception -> exception instanceof NullPointerException &&
+ !server.taskIsEnded(taskLocation.getTaskGroupLocation()),
Constant.OPERATION_RETRY_SLEEP));
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
index 00e82aa95..422190a7d 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/CheckpointFinishedOperation.java
@@ -89,6 +89,7 @@ public class CheckpointFinishedOperation extends
TaskOperation {
}
return null;
}, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
- exception -> exception instanceof NullPointerException,
Constant.OPERATION_RETRY_SLEEP));
+ exception -> exception instanceof NullPointerException &&
+ !server.taskIsEnded(taskLocation.getTaskGroupLocation()),
Constant.OPERATION_RETRY_SLEEP));
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskRestoreOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskRestoreOperation.java
index faad04120..9bed053ff 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskRestoreOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskRestoreOperation.java
@@ -88,6 +88,7 @@ public class NotifyTaskRestoreOperation extends TaskOperation
{
}
return null;
}, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
- exception -> exception instanceof NullPointerException,
Constant.OPERATION_RETRY_SLEEP));
+ exception -> exception instanceof NullPointerException &&
+ !server.taskIsEnded(taskLocation.getTaskGroupLocation()),
Constant.OPERATION_RETRY_SLEEP));
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskStartOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskStartOperation.java
index 662226f9c..ace26fb92 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskStartOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/operation/NotifyTaskStartOperation.java
@@ -53,6 +53,7 @@ public class NotifyTaskStartOperation extends TaskOperation {
task.startCall();
return null;
}, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
- exception -> exception instanceof NullPointerException,
Constant.OPERATION_RETRY_SLEEP));
+ exception -> exception instanceof NullPointerException &&
+ !server.taskIsEnded(taskLocation.getTaskGroupLocation()),
Constant.OPERATION_RETRY_SLEEP));
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/CloseRequestOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/CloseRequestOperation.java
index 3f005648e..859403b66 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/CloseRequestOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/checkpoint/CloseRequestOperation.java
@@ -50,7 +50,8 @@ public class CloseRequestOperation extends Operation
implements IdentifiedDataSe
task.close();
return null;
}, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
- exception -> exception instanceof NullPointerException,
Constant.OPERATION_RETRY_SLEEP));
+ exception -> exception instanceof NullPointerException &&
+ !server.taskIsEnded(readerLocation.getTaskGroupLocation()),
Constant.OPERATION_RETRY_SLEEP));
}
@Override
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
index 9bcd5bda9..3eb20042b 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
@@ -58,7 +58,8 @@ public class AssignSplitOperation<SplitT extends SourceSplit>
extends Operation
task.receivedSourceSplit(Arrays.stream(o).map(i -> (SplitT)
i).collect(Collectors.toList()));
return null;
}, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
- exception -> exception instanceof NullPointerException,
Constant.OPERATION_RETRY_SLEEP));
+ exception -> exception instanceof NullPointerException &&
+ !server.taskIsEnded(taskID.getTaskGroupLocation()),
Constant.OPERATION_RETRY_SLEEP));
}
@Override
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
index ba853833f..13536215d 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
@@ -54,7 +54,8 @@ public class RequestSplitOperation extends Operation
implements IdentifiedDataSe
task.requestSplit(taskID.getTaskID());
return null;
}, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
- exception -> exception instanceof NullPointerException,
Constant.OPERATION_RETRY_SLEEP));
+ exception -> exception instanceof NullPointerException &&
+ !server.taskIsEnded(enumeratorTaskID.getTaskGroupLocation()),
Constant.OPERATION_RETRY_SLEEP));
}
@Override
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
index 5dd330ca3..30df27151 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
@@ -75,7 +75,8 @@ public class RestoredSplitOperation extends TaskOperation {
@Override
public void run() throws Exception {
- TaskExecutionService taskExecutionService = ((SeaTunnelServer)
getService()).getTaskExecutionService();
+ SeaTunnelServer server = getService();
+ TaskExecutionService taskExecutionService =
server.getTaskExecutionService();
ClassLoader classLoader =
taskExecutionService.getExecutionContext(taskLocation.getTaskGroupLocation()).getClassLoader();
List<SourceSplit> deserialize =
Arrays.asList(SerializationUtils.deserialize(splits, classLoader));
RetryUtils.retryWithException(() -> {
@@ -83,6 +84,7 @@ public class RestoredSplitOperation extends TaskOperation {
task.addSplitsBack(deserialize, subtaskIndex);
return null;
}, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
- exception -> exception instanceof NullPointerException,
Constant.OPERATION_RETRY_SLEEP));
+ exception -> exception instanceof NullPointerException &&
+ !server.taskIsEnded(taskLocation.getTaskGroupLocation()),
Constant.OPERATION_RETRY_SLEEP));
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
index 0cba1c1f7..1886d7be4 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
@@ -53,7 +53,8 @@ public class SourceNoMoreElementOperation extends Operation
implements Identifie
task.readerFinished(currentTaskID.getTaskID());
return null;
}, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
- exception -> exception instanceof NullPointerException,
Constant.OPERATION_RETRY_SLEEP));
+ exception -> exception instanceof NullPointerException &&
+ !server.taskIsEnded(enumeratorTaskID.getTaskGroupLocation()),
Constant.OPERATION_RETRY_SLEEP));
}
@Override
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
index f3c88c25b..005cda41e 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
@@ -61,7 +61,8 @@ public class SourceRegisterOperation extends Operation
implements IdentifiedData
task.receivedReader(readerTaskID, readerAddress);
return null;
}, new RetryUtils.RetryMaterial(RETRY_TIME, true,
- exception -> exception instanceof NullPointerException,
RETRY_TIME_OUT));
+ exception -> exception instanceof NullPointerException &&
+ !server.taskIsEnded(enumeratorTaskID.getTaskGroupLocation()),
RETRY_TIME_OUT));
}
@Override