This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new ddab43897 [HotFix][Zeta] Fix clean TaskGroupContext Error when target node is offline (#4086)
ddab43897 is described below

commit ddab438970546de4e7ff5ef650c708c163d37a04
Author: ic4y <[email protected]>
AuthorDate: Wed Feb 8 16:44:20 2023 +0800

    [HotFix][Zeta] Fix clean TaskGroupContext Error when target node is offline (#4086)
---
 .../seatunnel/engine/server/master/JobMaster.java  | 33 ++++++++++++++++------
 1 file changed, 25 insertions(+), 8 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index c9352461f..06af3d84f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -62,6 +62,7 @@ import 
org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 
 import com.google.common.collect.Lists;
 import com.hazelcast.cluster.Address;
+import com.hazelcast.core.HazelcastInstanceNotActiveException;
 import com.hazelcast.flakeidgen.FlakeIdGenerator;
 import com.hazelcast.internal.serialization.Data;
 import com.hazelcast.jet.datamodel.Tuple2;
@@ -330,11 +331,20 @@ public class JobMaster {
             groupLocation.forEach((taskGroupLocation, slotProfile) -> {
                 if (taskGroupLocation.getJobId() == 
this.getJobImmutableInformation().getJobId()) {
                     try {
-                        RawJobMetrics rawJobMetrics =
-                            (RawJobMetrics) 
NodeEngineUtil.sendOperationToMemberNode(nodeEngine,
-                                new 
GetTaskGroupMetricsOperation(taskGroupLocation), slotProfile.getWorker()).get();
-                        metrics.add(rawJobMetrics);
-                    } catch (Exception e) {
+                        if 
(nodeEngine.getClusterService().getMember(slotProfile.getWorker()) != null) {
+                            RawJobMetrics rawJobMetrics =
+                                (RawJobMetrics) 
NodeEngineUtil.sendOperationToMemberNode(nodeEngine,
+                                    new 
GetTaskGroupMetricsOperation(taskGroupLocation), slotProfile.getWorker()).get();
+                            metrics.add(rawJobMetrics);
+                        }
+                    }
+                    // HazelcastInstanceNotActiveException. It means that the 
node is offline, so waiting for the taskGroup to restore can be successful
+                    catch (HazelcastInstanceNotActiveException e) {
+                        LOGGER.warning(
+                            String.format("%s get current job metrics with 
exception: %s.", taskGroupLocation,
+                                ExceptionUtils.getMessage(e)));
+                    }
+                    catch (Exception e) {
                         throw new SeaTunnelException(e.getMessage());
                     }
                 }
@@ -369,9 +379,16 @@ public class JobMaster {
     private void cleanTaskGroupContext(PipelineLocation pipelineLocation) {
         
ownedSlotProfilesIMap.get(pipelineLocation).forEach((taskGroupLocation, 
slotProfile) -> {
             try {
-                NodeEngineUtil.sendOperationToMemberNode(nodeEngine,
-                    new CleanTaskGroupContextOperation(taskGroupLocation), 
slotProfile.getWorker()).get();
-            } catch (Exception e) {
+                if 
(nodeEngine.getClusterService().getMember(slotProfile.getWorker()) != null) {
+                    NodeEngineUtil.sendOperationToMemberNode(nodeEngine,
+                        new CleanTaskGroupContextOperation(taskGroupLocation), 
slotProfile.getWorker()).get();
+                }
+            }
+            catch (HazelcastInstanceNotActiveException  e) {
+                LOGGER.warning(String.format("%s clean TaskGroupContext with 
exception: %s.", taskGroupLocation,
+                    ExceptionUtils.getMessage(e)));
+            }
+            catch (Exception e) {
                 throw new SeaTunnelException(e.getMessage());
             }
         });

Reply via email to