This is an automated email from the ASF dual-hosted git repository.

liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 73ddfc4f2 [Improve][Zeta] Improve Zeta operation max count and ignore NPE (#4787)
73ddfc4f2 is described below

commit 73ddfc4f28423b35856625f59c39404afe1ec6d1
Author: Jia Fan <[email protected]>
AuthorDate: Sun May 21 21:36:16 2023 +0800

    [Improve][Zeta] Improve Zeta operation max count and ignore NPE (#4787)
    
    * [Improve][Zeta] Improve Zeta operation max count and ignore NPE
    
    * [Improve][Zeta] Improve Zeta operation max count and ignore NPE
---
 config/hazelcast.yaml                              |  2 +-
 .../seatunnel/engine/server/master/JobMaster.java  | 50 +++++++++++-----------
 2 files changed, 26 insertions(+), 26 deletions(-)

diff --git a/config/hazelcast.yaml b/config/hazelcast.yaml
index 84a6e2457..87f607960 100644
--- a/config/hazelcast.yaml
+++ b/config/hazelcast.yaml
@@ -37,5 +37,5 @@ hazelcast:
     hazelcast.invocation.max.retry.count: 20
     hazelcast.tcp.join.port.try.count: 30
     hazelcast.logging.type: log4j2
-    hazelcast.operation.generic.thread.count: 100
+    hazelcast.operation.generic.thread.count: 1000
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index d13fcbbd1..de9da0a1b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -553,31 +553,31 @@ public class JobMaster {
     }
 
     private void cleanTaskGroupContext(PipelineLocation pipelineLocation) {
-        ownedSlotProfilesIMap
-                .get(pipelineLocation)
-                .forEach(
-                        (taskGroupLocation, slotProfile) -> {
-                            try {
-                                if (nodeEngine
-                                                .getClusterService()
-                                                .getMember(slotProfile.getWorker())
-                                        != null) {
-                                    NodeEngineUtil.sendOperationToMemberNode(
-                                                    nodeEngine,
-                                                    new CleanTaskGroupContextOperation(
-                                                            taskGroupLocation),
-                                                    slotProfile.getWorker())
-                                            .get();
-                                }
-                            } catch (HazelcastInstanceNotActiveException e) {
-                                LOGGER.warning(
-                                        String.format(
-                                                "%s clean TaskGroupContext with exception: %s.",
-                                                taskGroupLocation, ExceptionUtils.getMessage(e)));
-                            } catch (Exception e) {
-                                throw new SeaTunnelException(e.getMessage());
-                            }
-                        });
+        Map<TaskGroupLocation, SlotProfile> slotProfileMap =
+                ownedSlotProfilesIMap.get(pipelineLocation);
+        if (slotProfileMap == null) {
+            return;
+        }
+        slotProfileMap.forEach(
+                (taskGroupLocation, slotProfile) -> {
+                    try {
+                        if (nodeEngine.getClusterService().getMember(slotProfile.getWorker())
+                                != null) {
+                            NodeEngineUtil.sendOperationToMemberNode(
+                                            nodeEngine,
+                                            new CleanTaskGroupContextOperation(taskGroupLocation),
+                                            slotProfile.getWorker())
+                                    .get();
+                        }
+                    } catch (HazelcastInstanceNotActiveException e) {
+                        LOGGER.warning(
+                                String.format(
+                                        "%s clean TaskGroupContext with exception: %s.",
+                                        taskGroupLocation, ExceptionUtils.getMessage(e)));
+                    } catch (Exception e) {
+                        throw new SeaTunnelException(e.getMessage());
+                    }
+                });
     }
 
     public PhysicalPlan getPhysicalPlan() {

Reply via email to