This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new da8a37f0c [Improve][Zeta] job clean before JobMaster future complete 
(#4087)
da8a37f0c is described below

commit da8a37f0c3c361152559ce80b1fedb96ad559436
Author: Eric <[email protected]>
AuthorDate: Wed Feb 8 19:16:59 2023 +0800

    [Improve][Zeta] job clean before JobMaster future complete (#4087)
    
    * improve job clean code
    
    * Improve clean job
---
 .../engine/server/CoordinatorService.java          | 58 ++++++++--------------
 .../seatunnel/engine/server/master/JobMaster.java  | 39 +++++++++++++--
 2 files changed, 55 insertions(+), 42 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 5e68e661b..9957b2437 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -212,6 +212,7 @@ public class CoordinatorService {
                 runningJobStateIMap,
                 runningJobStateTimestampsIMap,
                 ownedSlotProfilesIMap,
+                runningJobInfoIMap,
                 engineConfig);
 
         
jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp());
@@ -227,7 +228,7 @@ public class CoordinatorService {
             logger.info(String.format(
                 "The restore %s is in an end state %s, store the job info to 
JobHistory and clear the job running time info",
                 jobFullName, jobStatus));
-            removeJobIMap(jobMaster);
+            jobMaster.cleanJob();
             return;
         }
 
@@ -250,8 +251,11 @@ public class CoordinatorService {
                     jobMaster.cancelJob();
                     jobMaster.run();
                 } finally {
-                    // storage job state info to HistoryStorage
-                    onJobDone(jobMaster, jobId);
+                    // voidCompletableFuture will be cancelled when the zeta master node shuts down to simulate master failure,
+                    // don't update runningJobMasterMap in this case.
+                    if (!jobMaster.getJobMasterCompleteFuture().isCancelled()) 
{
+                        runningJobMasterMap.remove(jobId);
+                    }
                 }
             });
             return;
@@ -265,7 +269,11 @@ public class CoordinatorService {
                     
jobMaster.getPhysicalPlan().getPipelineList().forEach(SubPlan::restorePipelineState);
                     jobMaster.run();
                 } finally {
-                    onJobDone(jobMaster, jobId);
+                    // voidCompletableFuture will be cancelled when the zeta master node shuts down to simulate master failure,
+                    // don't update runningJobMasterMap in this case.
+                    if (!jobMaster.getJobMasterCompleteFuture().isCancelled()) 
{
+                        runningJobMasterMap.remove(jobId);
+                    }
                 }
             });
         }
@@ -297,7 +305,6 @@ public class CoordinatorService {
 
         try {
             executorService.awaitTermination(20, TimeUnit.SECONDS);
-            runningJobMasterMap = new ConcurrentHashMap<>();
         } catch (InterruptedException e) {
             throw new SeaTunnelEngineException("wait clean executor service 
error", e);
         }
@@ -335,7 +342,9 @@ public class CoordinatorService {
             getJobHistoryService(),
             runningJobStateIMap,
             runningJobStateTimestampsIMap,
-            ownedSlotProfilesIMap, engineConfig);
+            ownedSlotProfilesIMap,
+            runningJobInfoIMap,
+            engineConfig);
         executorService.submit(() -> {
             try {
                 runningJobInfoIMap.put(jobId, new 
JobInfo(System.currentTimeMillis(), jobImmutableInformation));
@@ -353,7 +362,11 @@ public class CoordinatorService {
             try {
                 jobMaster.run();
             } finally {
-                onJobDone(jobMaster, jobId);
+                // voidCompletableFuture will be cancelled when the zeta master node shuts down to simulate master failure,
+                // don't update runningJobMasterMap in this case.
+                if (!jobMaster.getJobMasterCompleteFuture().isCancelled()) {
+                    runningJobMasterMap.remove(jobId);
+                }
             }
         });
         return new PassiveCompletableFuture<>(voidCompletableFuture);
@@ -372,37 +385,6 @@ public class CoordinatorService {
         return new PassiveCompletableFuture<>(voidCompletableFuture);
     }
 
-    private void onJobDone(JobMaster jobMaster, long jobId) {
-        // storage job state and metrics to HistoryStorage
-        jobHistoryService.storeJobInfo(jobId, 
runningJobMasterMap.get(jobId).getJobDAGInfo());
-        jobHistoryService.storeFinishedJobState(jobMaster);
-        removeJobIMap(jobMaster);
-        runningJobMasterMap.remove(jobId);
-    }
-
-    private void removeJobIMap(JobMaster jobMaster) {
-        Long jobId = jobMaster.getJobImmutableInformation().getJobId();
-        runningJobStateTimestampsIMap.remove(jobId);
-
-        jobMaster.getPhysicalPlan().getPipelineList().forEach(pipeline -> {
-            runningJobStateIMap.remove(pipeline.getPipelineLocation());
-            
runningJobStateTimestampsIMap.remove(pipeline.getPipelineLocation());
-            pipeline.getCoordinatorVertexList().forEach(coordinator -> {
-                runningJobStateIMap.remove(coordinator.getTaskGroupLocation());
-                
runningJobStateTimestampsIMap.remove(coordinator.getTaskGroupLocation());
-            });
-
-            pipeline.getPhysicalVertexList().forEach(task -> {
-                runningJobStateIMap.remove(task.getTaskGroupLocation());
-                
runningJobStateTimestampsIMap.remove(task.getTaskGroupLocation());
-            });
-        });
-
-        // These should be deleted at the end.
-        runningJobStateIMap.remove(jobId);
-        runningJobInfoIMap.remove(jobId);
-    }
-
     public PassiveCompletableFuture<JobResult> waitForJobComplete(long jobId) {
         JobMaster runningJobMaster = runningJobMasterMap.get(jobId);
         if (runningJobMaster == null) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 06af3d84f..a5e9b78ab 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -36,6 +36,7 @@ import 
org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
 import org.apache.seatunnel.engine.core.job.JobDAGInfo;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.job.JobInfo;
 import org.apache.seatunnel.engine.core.job.JobResult;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.core.job.PipelineStatus;
@@ -134,6 +135,8 @@ public class JobMaster {
 
     private Map<Integer, CheckpointPlan> checkpointPlanMap;
 
+    private final IMap<Long, JobInfo> runningJobInfoIMap;
+
     public JobMaster(@NonNull Data jobImmutableInformationData,
                      @NonNull NodeEngine nodeEngine,
                      @NonNull ExecutorService executorService,
@@ -141,7 +144,9 @@ public class JobMaster {
                      @NonNull JobHistoryService jobHistoryService,
                      @NonNull IMap runningJobStateIMap,
                      @NonNull IMap runningJobStateTimestampsIMap,
-                     @NonNull IMap ownedSlotProfilesIMap, EngineConfig 
engineConfig) {
+                     @NonNull IMap ownedSlotProfilesIMap,
+                     @NonNull IMap<Long, JobInfo> runningJobInfoIMap,
+                     EngineConfig engineConfig) {
         this.jobImmutableInformationData = jobImmutableInformationData;
         this.nodeEngine = nodeEngine;
         this.executorService = executorService;
@@ -152,6 +157,7 @@ public class JobMaster {
         this.jobHistoryService = jobHistoryService;
         this.runningJobStateIMap = runningJobStateIMap;
         this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
+        this.runningJobInfoIMap = runningJobInfoIMap;
         this.engineConfig = engineConfig;
     }
 
@@ -217,10 +223,11 @@ public class JobMaster {
         jobStatusFuture.whenComplete(withTryCatch(LOGGER, (v, t) -> {
             // We need not handle t, Because we will not return t from 
physicalPlan
             if (JobStatus.FAILING.equals(v.getStatus())) {
-                cleanJob();
                 physicalPlan.updateJobState(JobStatus.FAILING, 
JobStatus.FAILED);
             }
-            jobMasterCompleteFuture.complete(new 
JobResult(physicalPlan.getJobStatus(), v.getError()));
+            JobResult jobResult = new JobResult(physicalPlan.getJobStatus(), 
v.getError());
+            cleanJob();
+            jobMasterCompleteFuture.complete(jobResult);
         }));
     }
 
@@ -256,6 +263,28 @@ public class JobMaster {
         });
     }
 
+    private void removeJobIMap() {
+        Long jobId = getJobImmutableInformation().getJobId();
+        runningJobStateTimestampsIMap.remove(jobId);
+
+        getPhysicalPlan().getPipelineList().forEach(pipeline -> {
+            runningJobStateIMap.remove(pipeline.getPipelineLocation());
+            
runningJobStateTimestampsIMap.remove(pipeline.getPipelineLocation());
+            pipeline.getCoordinatorVertexList().forEach(coordinator -> {
+                runningJobStateIMap.remove(coordinator.getTaskGroupLocation());
+                
runningJobStateTimestampsIMap.remove(coordinator.getTaskGroupLocation());
+            });
+
+            pipeline.getPhysicalVertexList().forEach(task -> {
+                runningJobStateIMap.remove(task.getTaskGroupLocation());
+                
runningJobStateTimestampsIMap.remove(task.getTaskGroupLocation());
+            });
+        });
+
+        runningJobStateIMap.remove(jobId);
+        runningJobInfoIMap.remove(jobId);
+    }
+
     public JobDAGInfo getJobDAGInfo() {
         if (jobDAGInfo == null) {
             jobDAGInfo = DAGUtils.getJobDAGInfo(logicalDag, 
jobImmutableInformation, isPhysicalDAGIInfo);
@@ -277,7 +306,9 @@ public class JobMaster {
     }
 
     public void cleanJob() {
-        // TODO Add some job clean operation
+        jobHistoryService.storeJobInfo(jobImmutableInformation.getJobId(), 
getJobDAGInfo());
+        jobHistoryService.storeFinishedJobState(this);
+        removeJobIMap();
     }
 
     public Address queryTaskGroupAddress(long taskGroupId) {

Reply via email to