This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3536a7dad fix https://github.com/apache/incubator-seatunnel/issues/4042 (#4045)
3536a7dad is described below

commit 3536a7dad9c2f0b7814c3f970a8948897d118588
Author: Eric <[email protected]>
AuthorDate: Fri Feb 3 15:50:46 2023 +0800

    fix https://github.com/apache/incubator-seatunnel/issues/4042 (#4045)
---
 .../engine/server/CoordinatorService.java          |  7 +++--
 .../engine/server/dag/physical/PhysicalPlan.java   |  7 +++--
 .../engine/server/dag/physical/SubPlan.java        |  4 ++-
 .../seatunnel/engine/server/master/JobMaster.java  | 35 ++++++++++++++--------
 4 files changed, 36 insertions(+), 17 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index f5e502201..5e68e661b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -214,10 +214,12 @@ public class CoordinatorService {
                 ownedSlotProfilesIMap,
                 engineConfig);
 
+        jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp());
         try {
-            jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp());
+            jobMaster.initCheckPointManager();
         } catch (Exception e) {
-            throw new SeaTunnelEngineException(String.format("Job id %s init JobMaster failed", jobId), e);
+            jobMaster.cancelJob();
+            throw new SeaTunnelEngineException(String.format("Job id %s init CheckPointManager failed", jobId), e);
         }
 
         String jobFullName = jobMaster.getPhysicalPlan().getJobFullName();
@@ -339,6 +341,7 @@ public class CoordinatorService {
                 runningJobInfoIMap.put(jobId, new JobInfo(System.currentTimeMillis(), jobImmutableInformation));
                 runningJobMasterMap.put(jobId, jobMaster);
                 jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp());
+                jobMaster.initCheckPointManager();
             } catch (Throwable e) {
                 logger.severe(String.format("submit job %s error %s ", jobId, ExceptionUtils.getMessage(e)));
                 voidCompletableFuture.completeExceptionally(e);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index dee54f555..29b5328a2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -144,8 +144,11 @@ public class PhysicalPlan {
         future.thenAcceptAsync(pipelineState -> {
             try {
                 // Notify checkpoint manager when the pipeline end, Whether the pipeline will be restarted or not
-                jobMaster.getCheckpointManager()
-                    .listenPipelineRetry(subPlan.getPipelineLocation().getPipelineId(), subPlan.getPipelineState()).join();
+                if (jobMaster.getCheckpointManager() != null) {
+                    jobMaster.getCheckpointManager()
+                        .listenPipelineRetry(subPlan.getPipelineLocation().getPipelineId(), subPlan.getPipelineState())
+                        .join();
+                }
                 if (PipelineStatus.CANCELED.equals(pipelineState.getPipelineStatus())) {
                     if (canRestorePipeline(subPlan)) {
                         subPlan.restorePipeline();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index b88224458..f21d49c26 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -262,7 +262,9 @@ public class SubPlan {
     }
 
     private void cancelCheckpointCoordinator() {
-        jobMaster.getCheckpointManager().listenPipelineRetry(pipelineId, PipelineStatus.CANCELING).join();
+        if (jobMaster.getCheckpointManager() != null) {
+            jobMaster.getCheckpointManager().listenPipelineRetry(pipelineId, PipelineStatus.CANCELING).join();
+        }
     }
 
     private void cancelPipelineTasks() {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 0e967a2bc..c9352461f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
+import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.EngineConfig;
 import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
@@ -130,6 +131,8 @@ public class JobMaster {
 
     private boolean isRunning = true;
 
+    private Map<Integer, CheckpointPlan> checkpointPlanMap;
+
     public JobMaster(@NonNull Data jobImmutableInformationData,
                      @NonNull NodeEngine nodeEngine,
                      @NonNull ExecutorService executorService,
@@ -151,7 +154,7 @@ public class JobMaster {
         this.engineConfig = engineConfig;
     }
 
-    public void init(long initializationTimestamp) throws Exception {
+    public void init(long initializationTimestamp) {
         jobImmutableInformation = nodeEngine.getSerializationService().toObject(
             jobImmutableInformationData);
         LOGGER.info(String.format("Init JobMaster for Job %s (%s) ", jobImmutableInformation.getJobConfig().getName(),
@@ -162,8 +165,6 @@ public class JobMaster {
         classLoader = new SeaTunnelChildFirstClassLoader(jobImmutableInformation.getPluginJarsUrls());
         logicalDag = CustomClassLoadedObject.deserializeWithCustomClassLoader(nodeEngine.getSerializationService(),
             classLoader, jobImmutableInformation.getLogicalDag());
-        CheckpointConfig checkpointConfig = mergeEnvAndEngineConfig(engineConfig.getCheckpointConfig(),
-            jobImmutableInformation.getJobConfig().getEnvOptions());
 
         final Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> planTuple = PlanUtils.fromLogicalDAG(logicalDag,
             nodeEngine,
@@ -176,14 +177,20 @@ public class JobMaster {
             engineConfig.getQueueType());
         this.physicalPlan = planTuple.f0();
         this.physicalPlan.setJobMaster(this);
+        this.checkpointPlanMap = planTuple.f1();
+        this.initStateFuture();
+    }
+
+    public void initCheckPointManager() throws CheckpointStorageException {
+        CheckpointConfig checkpointConfig = mergeEnvAndEngineConfig(engineConfig.getCheckpointConfig(),
+            jobImmutableInformation.getJobConfig().getEnvOptions());
         this.checkpointManager = new CheckpointManager(
             jobImmutableInformation.getJobId(),
             jobImmutableInformation.isStartWithSavePoint(),
             nodeEngine,
             this,
-            planTuple.f1(),
+            checkpointPlanMap,
             checkpointConfig);
-        this.initStateFuture();
     }
 
     // TODO replace it after ReadableConfig Support parse yaml format, then use only one config to read engine and env config.
@@ -323,8 +330,9 @@ public class JobMaster {
             groupLocation.forEach((taskGroupLocation, slotProfile) -> {
                 if (taskGroupLocation.getJobId() == this.getJobImmutableInformation().getJobId()) {
                     try {
-                        RawJobMetrics rawJobMetrics = (RawJobMetrics) NodeEngineUtil.sendOperationToMemberNode(nodeEngine,
-                            new GetTaskGroupMetricsOperation(taskGroupLocation), slotProfile.getWorker()).get();
+                        RawJobMetrics rawJobMetrics =
+                            (RawJobMetrics) NodeEngineUtil.sendOperationToMemberNode(nodeEngine,
+                                new GetTaskGroupMetricsOperation(taskGroupLocation), slotProfile.getWorker()).get();
                         metrics.add(rawJobMetrics);
                     } catch (Exception e) {
                         throw new SeaTunnelException(e.getMessage());
@@ -336,7 +344,8 @@ public class JobMaster {
     }
 
     public void savePipelineMetricsToHistory(PipelineLocation pipelineLocation) {
-        List<RawJobMetrics> currJobMetrics = this.getCurrJobMetrics(Collections.singleton(this.getOwnedSlotProfiles(pipelineLocation)));
+        List<RawJobMetrics> currJobMetrics =
+            this.getCurrJobMetrics(Collections.singleton(this.getOwnedSlotProfiles(pipelineLocation)));
         JobMetrics jobMetrics = JobMetricsUtil.toJobMetrics(currJobMetrics);
         long jobId = this.getJobImmutableInformation().getJobId();
         synchronized (this) {
@@ -346,8 +355,9 @@ public class JobMaster {
         this.cleanTaskGroupContext(pipelineLocation);
     }
 
-    public void removeMetricsContext(PipelineLocation pipelineLocation, PipelineStatus pipelineStatus){
-        if (pipelineStatus.equals(PipelineStatus.FINISHED) && !checkpointManager.isSavePointEnd() || pipelineStatus.equals(PipelineStatus.CANCELED)){
+    public void removeMetricsContext(PipelineLocation pipelineLocation, PipelineStatus pipelineStatus) {
+        if (pipelineStatus.equals(PipelineStatus.FINISHED) && !checkpointManager.isSavePointEnd() ||
+            pipelineStatus.equals(PipelineStatus.CANCELED)) {
             IMap<TaskLocation, MetricsContext> map =
                 nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
             map.keySet().stream().filter(taskLocation -> {
@@ -399,7 +409,7 @@ public class JobMaster {
     /**
      * Execute savePoint, which will cause the job to end.
      */
-    public CompletableFuture<Void> savePoint(){
+    public CompletableFuture<Void> savePoint() {
         PassiveCompletableFuture<CompletedCheckpoint>[] passiveCompletableFutures =
             checkpointManager.triggerSavepoints();
         return CompletableFuture.allOf(passiveCompletableFutures);
@@ -414,7 +424,8 @@ public class JobMaster {
                                      @NonNull Map<TaskGroupLocation, SlotProfile> pipelineOwnedSlotProfiles) {
         ownedSlotProfilesIMap.put(pipelineLocation, pipelineOwnedSlotProfiles);
         try {
-            RetryUtils.retryWithException(() -> pipelineOwnedSlotProfiles.equals(ownedSlotProfilesIMap.get(pipelineLocation)),
+            RetryUtils.retryWithException(
+                () -> pipelineOwnedSlotProfiles.equals(ownedSlotProfilesIMap.get(pipelineLocation)),
                 new RetryUtils.RetryMaterial(20, true,
                     exception -> exception instanceof NullPointerException && isRunning, 1000));
         } catch (Exception e) {

Reply via email to