This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 556cb35ca [hotfix][engine] Fix local can not exit while job complete 
(#3068)
556cb35ca is described below

commit 556cb35ca89f95a3d5efbca43b6c150d3cd9b35f
Author: hailin0 <[email protected]>
AuthorDate: Wed Oct 12 14:21:50 2022 +0800

    [hotfix][engine] Fix local can not exit while job complete (#3068)
---
 .../org/apache/seatunnel/engine/server/CoordinatorService.java     | 7 ++++---
 .../java/org/apache/seatunnel/engine/server/SeaTunnelServer.java   | 6 ++++--
 2 files changed, 8 insertions(+), 5 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 5af9d47c9..2b4ef575a 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -111,15 +111,15 @@ public class CoordinatorService {
     private volatile boolean isActive = false;
 
     private final ExecutorService executorService;
+    private final ScheduledExecutorService monitorService;
 
     @SuppressWarnings("checkstyle:MagicNumber")
     public CoordinatorService(@NonNull NodeEngineImpl nodeEngine, @NonNull 
ExecutorService executorService) {
         this.nodeEngine = nodeEngine;
         this.logger = nodeEngine.getLogger(getClass());
         this.executorService = executorService;
-
-        ScheduledExecutorService masterActiveListener = 
Executors.newSingleThreadScheduledExecutor();
-        masterActiveListener.scheduleAtFixedRate(() -> checkNewActiveMaster(), 
0, 100, TimeUnit.MILLISECONDS);
+        this.monitorService = Executors.newSingleThreadScheduledExecutor();
+        monitorService.scheduleAtFixedRate(() -> checkNewActiveMaster(), 0, 
100, TimeUnit.MILLISECONDS);
     }
 
     public JobMaster getJobMaster(Long jobId) {
@@ -390,6 +390,7 @@ public class CoordinatorService {
         if (resourceManager != null) {
             resourceManager.close();
         }
+        monitorService.shutdown();
     }
 
     /**
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index fe7ce55e0..90a87a770 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -58,6 +58,7 @@ public class SeaTunnelServer implements ManagedService, 
MembershipAwareService,
     private volatile SlotService slotService;
     private TaskExecutionService taskExecutionService;
     private CoordinatorService coordinatorService;
+    private ScheduledExecutorService monitorService;
 
     private final ExecutorService executorService;
 
@@ -100,8 +101,8 @@ public class SeaTunnelServer implements ManagedService, 
MembershipAwareService,
         taskExecutionService.start();
         getSlotService();
         coordinatorService = new CoordinatorService(nodeEngine, 
executorService);
-        ScheduledExecutorService service = 
Executors.newSingleThreadScheduledExecutor();
-        service.scheduleAtFixedRate(() -> printExecutionInfo(), 0, 60, 
TimeUnit.SECONDS);
+        monitorService = Executors.newSingleThreadScheduledExecutor();
+        monitorService.scheduleAtFixedRate(() -> printExecutionInfo(), 0, 60, 
TimeUnit.SECONDS);
     }
 
     @Override
@@ -119,6 +120,7 @@ public class SeaTunnelServer implements ManagedService, 
MembershipAwareService,
         }
         executorService.shutdown();
         taskExecutionService.shutdown();
+        monitorService.shutdown();
     }
 
     @Override

Reply via email to