This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ddab43897 [HotFix][Zeta] Fix clean TaskGroupContext Error when target node is offline (#4086)
ddab43897 is described below
commit ddab438970546de4e7ff5ef650c708c163d37a04
Author: ic4y <[email protected]>
AuthorDate: Wed Feb 8 16:44:20 2023 +0800
[HotFix][Zeta] Fix clean TaskGroupContext Error when target node is offline (#4086)
---
.../seatunnel/engine/server/master/JobMaster.java | 33 ++++++++++++++++------
1 file changed, 25 insertions(+), 8 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index c9352461f..06af3d84f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -62,6 +62,7 @@ import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import com.google.common.collect.Lists;
import com.hazelcast.cluster.Address;
+import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.flakeidgen.FlakeIdGenerator;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.jet.datamodel.Tuple2;
@@ -330,11 +331,20 @@ public class JobMaster {
groupLocation.forEach((taskGroupLocation, slotProfile) -> {
if (taskGroupLocation.getJobId() ==
this.getJobImmutableInformation().getJobId()) {
try {
- RawJobMetrics rawJobMetrics =
- (RawJobMetrics)
NodeEngineUtil.sendOperationToMemberNode(nodeEngine,
- new
GetTaskGroupMetricsOperation(taskGroupLocation), slotProfile.getWorker()).get();
- metrics.add(rawJobMetrics);
- } catch (Exception e) {
+ if
(nodeEngine.getClusterService().getMember(slotProfile.getWorker()) != null) {
+ RawJobMetrics rawJobMetrics =
+ (RawJobMetrics)
NodeEngineUtil.sendOperationToMemberNode(nodeEngine,
+ new
GetTaskGroupMetricsOperation(taskGroupLocation), slotProfile.getWorker()).get();
+ metrics.add(rawJobMetrics);
+ }
+ }
+ // HazelcastInstanceNotActiveException. It means that the
node is offline, so waiting for the taskGroup to restore can be successful
+ catch (HazelcastInstanceNotActiveException e) {
+ LOGGER.warning(
+ String.format("%s get current job metrics with
exception: %s.", taskGroupLocation,
+ ExceptionUtils.getMessage(e)));
+ }
+ catch (Exception e) {
throw new SeaTunnelException(e.getMessage());
}
}
@@ -369,9 +379,16 @@ public class JobMaster {
private void cleanTaskGroupContext(PipelineLocation pipelineLocation) {
ownedSlotProfilesIMap.get(pipelineLocation).forEach((taskGroupLocation,
slotProfile) -> {
try {
- NodeEngineUtil.sendOperationToMemberNode(nodeEngine,
- new CleanTaskGroupContextOperation(taskGroupLocation),
slotProfile.getWorker()).get();
- } catch (Exception e) {
+ if
(nodeEngine.getClusterService().getMember(slotProfile.getWorker()) != null) {
+ NodeEngineUtil.sendOperationToMemberNode(nodeEngine,
+ new CleanTaskGroupContextOperation(taskGroupLocation),
slotProfile.getWorker()).get();
+ }
+ }
+ catch (HazelcastInstanceNotActiveException e) {
+ LOGGER.warning(String.format("%s clean TaskGroupContext with
exception: %s.", taskGroupLocation,
+ ExceptionUtils.getMessage(e)));
+ }
+ catch (Exception e) {
throw new SeaTunnelException(e.getMessage());
}
});