This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new f445f8d2b [Feature][ST-Engine] Add initFuture to PhysicalPlan (#2560)
f445f8d2b is described below

commit f445f8d2bc8ec3fdfe599d84c175b82c53c6344f
Author: Eric <[email protected]>
AuthorDate: Tue Aug 30 09:22:48 2022 +0800

    [Feature][ST-Engine] Add initFuture to PhysicalPlan (#2560)
    
    * Add initFuture to PhysicalPlan
    
    * fix review error
    
    * fix CI error
---
 .github/workflows/backend.yml                      |  2 +-
 .github/workflows/engine_backend.yml               |  2 +-
 .../engine/e2e/engine/JobExecutionIT.java          | 16 ++--
 .../engine/client/SeaTunnelClientTest.java         | 11 ++-
 .../seatunnel/engine/server/SeaTunnelServer.java   |  1 +
 .../engine/server/dag/physical/PhysicalPlan.java   | 30 ++++----
 .../server/dag/physical/PhysicalPlanGenerator.java | 53 ++++---------
 .../engine/server/dag/physical/PhysicalVertex.java |  7 +-
 .../engine/server/dag/physical/SubPlan.java        | 87 +++++++++++-----------
 .../seatunnel/engine/server/master/JobMaster.java  |  4 +
 .../seatunnel/engine/server/dag/TaskTest.java      |  8 +-
 11 files changed, 106 insertions(+), 115 deletions(-)

diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 26e9b43eb..c3a460bb9 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -24,7 +24,7 @@ on:
       - '**/*.md'
       - 'seatunnel-ui/**'
       - 'seatunnel-engine/**'
-      - 'seatunnel-engine/seatunnel-engine-e2e/**'
+      - 'seatunnel-e2e/seatunnel-engine-e2e/**'
       - 'seatunnel-examples/seatunnel-engine-examples/**'
       - 'seatunnel-core/seatunnel-seatunnel-starter/**'
       - 'generate_client_protocol.sh'
diff --git a/.github/workflows/engine_backend.yml 
b/.github/workflows/engine_backend.yml
index a57f6d9ac..1c9ed07ac 100644
--- a/.github/workflows/engine_backend.yml
+++ b/.github/workflows/engine_backend.yml
@@ -21,7 +21,7 @@ on:
   pull_request:
     paths:
       - 'seatunnel-engine/**'
-      - 'seatunnel-engine/seatunnel-engine-e2e/**'
+      - 'seatunnel-e2e/seatunnel-engine-e2e/**'
       - 'seatunnel-examples/seatunnel-engine-examples/**'
       - 'seatunnel-core/seatunnel-seatunnel-starter/**'
       - 'generate_client_protocol.sh'
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java
index 8b286c482..f67042748 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java
@@ -18,7 +18,6 @@
 package org.apache.seatunnel.engine.e2e.engine;
 
 import static org.awaitility.Awaitility.await;
-import static org.junit.Assert.assertEquals;
 
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
@@ -84,8 +83,15 @@ public class JobExecutionIT {
 
         try {
             final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
-            await().atMost(10000, TimeUnit.MILLISECONDS)
-                .untilAsserted(() -> assertEquals(JobStatus.FINISHED, 
clientJobProxy.waitForJobComplete()));
+
+            CompletableFuture<Object> objectCompletableFuture = 
CompletableFuture.supplyAsync(() -> {
+                JobStatus jobStatus = clientJobProxy.waitForJobComplete();
+                Assert.assertEquals(JobStatus.FINISHED, jobStatus);
+                return null;
+            });
+
+            await().atMost(20000, TimeUnit.MILLISECONDS)
+                .untilAsserted(() -> 
Assert.assertTrue(objectCompletableFuture.isDone()));
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -118,9 +124,7 @@ public class JobExecutionIT {
             clientJobProxy.cancelJob();
 
             await().atMost(10000, TimeUnit.MILLISECONDS)
-                .untilAsserted(() -> {
-                    Assert.assertTrue(objectCompletableFuture.isDone());
-                });
+                .untilAsserted(() -> 
Assert.assertTrue(objectCompletableFuture.isDone()));
 
         } catch (Exception e) {
             throw new RuntimeException(e);
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index a4745687c..540ff2074 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -18,7 +18,6 @@
 package org.apache.seatunnel.engine.client;
 
 import static org.awaitility.Awaitility.await;
-import static org.junit.Assert.assertEquals;
 
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
@@ -40,6 +39,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
@@ -83,8 +83,15 @@ public class SeaTunnelClientTest {
 
         try {
             final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+            CompletableFuture<Object> objectCompletableFuture = 
CompletableFuture.supplyAsync(() -> {
+                JobStatus jobStatus = clientJobProxy.waitForJobComplete();
+                Assert.assertEquals(JobStatus.FINISHED, jobStatus);
+                return null;
+            });
+
             await().atMost(10000, TimeUnit.MILLISECONDS)
-                .untilAsserted(() -> assertEquals(JobStatus.FINISHED, 
clientJobProxy.waitForJobComplete()));
+                .untilAsserted(() -> 
Assert.assertTrue(objectCompletableFuture.isDone()));
+
         } catch (ExecutionException | InterruptedException e) {
             throw new RuntimeException(e);
         }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 38eb6205f..08560c7db 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -138,6 +138,7 @@ public class SeaTunnelServer implements ManagedService, 
MembershipAwareService,
         executorService.submit(() -> {
             try {
                 jobMaster.init();
+                jobMaster.getPhysicalPlan().initStateFuture();
                 runningJobMasterMap.put(jobId, jobMaster);
             } catch (Throwable e) {
                 LOGGER.severe(String.format("submit job %s error %s ", jobId, 
ExceptionUtils.getMessage(e)));
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index dc79fceec..5a1daecf4 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -27,7 +27,6 @@ import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
 import lombok.NonNull;
 
-import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -64,13 +63,6 @@ public class PhysicalPlan {
      * in {@link org.apache.seatunnel.engine.server.scheduler.JobScheduler} 
whenComplete method will be called.
      */
     private final CompletableFuture<JobStatus> jobEndFuture;
-    private final PassiveCompletableFuture<JobStatus> passiveCompletableFuture;
-
-    /**
-     * This future only can completion by the {@link SubPlan } subPlanFuture.
-     * When subPlanFuture completed, this NonCompletableFuture's whenComplete 
method will be called.
-     */
-    private final PassiveCompletableFuture<PipelineState>[] 
waitForCompleteBySubPlan;
 
     private final ExecutorService executorService;
 
@@ -79,8 +71,7 @@ public class PhysicalPlan {
     public PhysicalPlan(@NonNull List<SubPlan> pipelineList,
                         @NonNull ExecutorService executorService,
                         @NonNull JobImmutableInformation 
jobImmutableInformation,
-                        long initializationTimestamp,
-                        @NonNull PassiveCompletableFuture<PipelineState>[] 
waitForCompleteBySubPlan) {
+                        long initializationTimestamp) {
         this.executorService = executorService;
         this.jobImmutableInformation = jobImmutableInformation;
         stateTimestamps = new long[JobStatus.values().length];
@@ -88,15 +79,18 @@ public class PhysicalPlan {
         this.jobStatus.set(JobStatus.CREATED);
         this.stateTimestamps[JobStatus.CREATED.ordinal()] = 
System.currentTimeMillis();
         this.jobEndFuture = new CompletableFuture<>();
-        this.passiveCompletableFuture = new 
PassiveCompletableFuture<>(jobEndFuture);
-        this.waitForCompleteBySubPlan = waitForCompleteBySubPlan;
         this.pipelineList = pipelineList;
         if (pipelineList.isEmpty()) {
             throw new UnknownPhysicalPlanException("The physical plan didn't 
have any can execute pipeline");
         }
-        this.jobFullName = String.format("Job %s (%s)", 
jobImmutableInformation.getJobConfig().getName(), 
jobImmutableInformation.getJobId());
-        Arrays.stream(this.waitForCompleteBySubPlan).forEach(x -> {
-            x.whenComplete((v, t) -> {
+        this.jobFullName = String.format("Job %s (%s)", 
jobImmutableInformation.getJobConfig().getName(),
+            jobImmutableInformation.getJobId());
+    }
+
+    public void initStateFuture() {
+        pipelineList.stream().forEach(subPlan -> {
+            PassiveCompletableFuture<PipelineState> future = 
subPlan.initStateFuture();
+            future.whenComplete((v, t) -> {
                 // We need not handle t, Because we will not return t from 
Pipeline
                 if (PipelineState.CANCELED.equals(v)) {
                     canceledPipelineNum.incrementAndGet();
@@ -108,6 +102,7 @@ public class PhysicalPlan {
                     LOGGER.severe(
                         "Pipeline Failed with Unknown PipelineState, Begin to 
cancel other pipelines in this job.");
                     failedPipelineNum.incrementAndGet();
+                    cancelJob();
                 }
 
                 if (finishedPipelineNum.incrementAndGet() == 
this.pipelineList.size()) {
@@ -130,7 +125,8 @@ public class PhysicalPlan {
             if (updateJobState(JobStatus.RUNNING, JobStatus.CANCELLING)) {
                 cancelRunningJob();
             } else {
-                LOGGER.info(String.format("%s in a non cancellable state: %s, 
skip cancel", jobFullName, jobStatus.get()));
+                LOGGER.info(
+                    String.format("%s in a non cancellable state: %s, skip 
cancel", jobFullName, jobStatus.get()));
             }
         }
     }
@@ -212,7 +208,7 @@ public class PhysicalPlan {
     }
 
     public PassiveCompletableFuture<JobStatus> getJobEndCompletableFuture() {
-        return this.passiveCompletableFuture;
+        return new PassiveCompletableFuture<>(jobEndFuture);
     }
 
     public JobImmutableInformation getJobImmutableInformation() {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index e07d85f8a..18a187913 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -41,7 +41,6 @@ import 
org.apache.seatunnel.engine.server.dag.physical.flow.IntermediateExecutio
 import 
org.apache.seatunnel.engine.server.dag.physical.flow.PhysicalExecutionFlow;
 import 
org.apache.seatunnel.engine.server.dag.physical.flow.UnknownFlowException;
 import org.apache.seatunnel.engine.server.execution.Task;
-import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
@@ -124,20 +123,18 @@ public class PhysicalPlanGenerator {
             final int pipelineId = pipeline.getId();
             final List<ExecutionEdge> edges = pipeline.getEdges();
 
-            CopyOnWriteArrayList<PassiveCompletableFuture<TaskExecutionState>> 
waitForCompleteByPhysicalVertexList =
-                new CopyOnWriteArrayList<>();
             List<SourceAction<?, ?, ?>> sources = findSourceAction(edges);
 
             List<PhysicalVertex> coordinatorVertexList =
-                getEnumeratorTask(sources, pipelineId, totalPipelineNum, 
waitForCompleteByPhysicalVertexList);
+                getEnumeratorTask(sources, pipelineId, totalPipelineNum);
             coordinatorVertexList.addAll(
-                getCommitterTask(edges, pipelineId, totalPipelineNum, 
waitForCompleteByPhysicalVertexList));
+                getCommitterTask(edges, pipelineId, totalPipelineNum));
 
             List<PhysicalVertex> physicalVertexList =
-                getSourceTask(edges, sources, pipelineId, totalPipelineNum, 
waitForCompleteByPhysicalVertexList);
+                getSourceTask(edges, sources, pipelineId, totalPipelineNum);
 
             physicalVertexList.addAll(
-                getPartitionTask(edges, pipelineId, totalPipelineNum, 
waitForCompleteByPhysicalVertexList));
+                getPartitionTask(edges, pipelineId, totalPipelineNum));
 
             CompletableFuture<PipelineState> pipelineFuture = new 
CompletableFuture<>();
             waitForCompleteBySubPlanList.add(new 
PassiveCompletableFuture<>(pipelineFuture));
@@ -147,17 +144,13 @@ public class PhysicalPlanGenerator {
                 initializationTimestamp,
                 physicalVertexList,
                 coordinatorVertexList,
-                pipelineFuture,
-                waitForCompleteByPhysicalVertexList.toArray(
-                    new 
PassiveCompletableFuture[waitForCompleteByPhysicalVertexList.size()]),
                 jobImmutableInformation);
         });
 
         PhysicalPlan physicalPlan = new 
PhysicalPlan(subPlanStream.collect(Collectors.toList()),
             executorService,
             jobImmutableInformation,
-            initializationTimestamp,
-            waitForCompleteBySubPlanList.toArray(new 
PassiveCompletableFuture[waitForCompleteBySubPlanList.size()]));
+            initializationTimestamp);
         return Tuple2.tuple2(physicalPlan, null);
     }
 
@@ -169,8 +162,7 @@ public class PhysicalPlanGenerator {
 
     private List<PhysicalVertex> getCommitterTask(List<ExecutionEdge> edges,
                                                   int pipelineIndex,
-                                                  int totalPipelineNum,
-                                                  
CopyOnWriteArrayList<PassiveCompletableFuture<TaskExecutionState>> 
waitForCompleteByPhysicalVertexList) {
+                                                  int totalPipelineNum) {
         AtomicInteger atomicInteger = new AtomicInteger(-1);
         List<ExecutionEdge> collect = edges.stream().filter(s -> 
s.getRightVertex().getAction() instanceof SinkAction)
             .collect(Collectors.toList());
@@ -191,8 +183,6 @@ public class PhysicalPlanGenerator {
                             new TaskLocation(taskGroupID, 
mixIDPrefixAndIndex(idGenerator.getNextId(), 0)), s,
                             sinkAggregatedCommitter.get());
                     committerTaskIDMap.put(s, new TaskLocation(taskGroupID, 
t.getTaskID()));
-                    CompletableFuture<TaskExecutionState> taskFuture = new 
CompletableFuture<>();
-                    waitForCompleteByPhysicalVertexList.add(new 
PassiveCompletableFuture<>(taskFuture));
 
                     return new PhysicalVertex(idGenerator.getNextId(),
                         atomicInteger.incrementAndGet(),
@@ -200,7 +190,6 @@ public class PhysicalPlanGenerator {
                         collect.size(),
                         new TaskGroupDefaultImpl(taskGroupID, s.getName() + 
"-AggregatedCommitterTask",
                             Lists.newArrayList(t)),
-                        taskFuture,
                         flakeIdGenerator,
                         pipelineIndex,
                         totalPipelineNum,
@@ -216,8 +205,7 @@ public class PhysicalPlanGenerator {
 
     private List<PhysicalVertex> getPartitionTask(List<ExecutionEdge> edges,
                                                   int pipelineIndex,
-                                                  int totalPipelineNum,
-                                                  
CopyOnWriteArrayList<PassiveCompletableFuture<TaskExecutionState>> 
waitForCompleteByPhysicalVertexList) {
+                                                  int totalPipelineNum) {
         return edges.stream().filter(s -> s.getLeftVertex().getAction() 
instanceof PartitionTransformAction)
             .map(q -> (PartitionTransformAction) q.getLeftVertex().getAction())
             .map(q -> new PhysicalExecutionFlow(q, getNextWrapper(edges, q)))
@@ -231,17 +219,13 @@ public class PhysicalPlanGenerator {
                     SeaTunnelTask seaTunnelTask = new 
TransformSeaTunnelTask(jobImmutableInformation.getJobId(),
                         new TaskLocation(taskGroupID, 
mixIDPrefixAndIndex(taskIDPrefix, i)), i, flow);
 
-                    CompletableFuture<TaskExecutionState> taskFuture = new 
CompletableFuture<>();
-                    waitForCompleteByPhysicalVertexList.add(new 
PassiveCompletableFuture<>(taskFuture));
-
                     t.add(new PhysicalVertex(idGenerator.getNextId(),
                         i,
                         executorService,
                         flow.getAction().getParallelism(),
                         new TaskGroupDefaultImpl(taskGroupID, 
flow.getAction().getName() +
-                                "-PartitionTransformTask",
+                            "-PartitionTransformTask",
                             Lists.newArrayList(seaTunnelTask)),
-                        taskFuture,
                         flakeIdGenerator,
                         pipelineIndex,
                         totalPipelineNum,
@@ -256,8 +240,7 @@ public class PhysicalPlanGenerator {
 
     private List<PhysicalVertex> getEnumeratorTask(List<SourceAction<?, ?, ?>> 
sources,
                                                    int pipelineIndex,
-                                                   int totalPipelineNum,
-                                                   
CopyOnWriteArrayList<PassiveCompletableFuture<TaskExecutionState>> 
waitForCompleteByPhysicalVertexList) {
+                                                   int totalPipelineNum) {
         AtomicInteger atomicInteger = new AtomicInteger(-1);
 
         return sources.stream().map(s -> {
@@ -265,16 +248,13 @@ public class PhysicalPlanGenerator {
             SourceSplitEnumeratorTask<?> t = new 
SourceSplitEnumeratorTask<>(jobImmutableInformation.getJobId(),
                 new TaskLocation(taskGroupID, 
mixIDPrefixAndIndex(idGenerator.getNextId(), 0)), s);
             enumeratorTaskIDMap.put(s, new TaskLocation(taskGroupID, 
t.getTaskID()));
-            CompletableFuture<TaskExecutionState> taskFuture = new 
CompletableFuture<>();
-            waitForCompleteByPhysicalVertexList.add(new 
PassiveCompletableFuture<>(taskFuture));
 
             return new PhysicalVertex(idGenerator.getNextId(),
                 atomicInteger.incrementAndGet(),
                 executorService,
                 sources.size(),
                 new TaskGroupDefaultImpl(taskGroupID, s.getName() + 
"-SplitEnumerator",
-                        Lists.newArrayList(t)),
-                taskFuture,
+                    Lists.newArrayList(t)),
                 flakeIdGenerator,
                 pipelineIndex,
                 totalPipelineNum,
@@ -288,8 +268,7 @@ public class PhysicalPlanGenerator {
     private List<PhysicalVertex> getSourceTask(List<ExecutionEdge> edges,
                                                List<SourceAction<?, ?, ?>> 
sources,
                                                int pipelineIndex,
-                                               int totalPipelineNum,
-                                               
CopyOnWriteArrayList<PassiveCompletableFuture<TaskExecutionState>> 
waitForCompleteByPhysicalVertexList) {
+                                               int totalPipelineNum) {
         return sources.stream()
             .map(s -> new PhysicalExecutionFlow(s, getNextWrapper(edges, s)))
             .flatMap(flow -> {
@@ -323,10 +302,6 @@ public class PhysicalPlanGenerator {
                     Set<URL> jars =
                         taskList.stream().flatMap(task -> 
task.getJarsUrl().stream()).collect(Collectors.toSet());
 
-                    CompletableFuture<TaskExecutionState> taskFuture = new 
CompletableFuture<>();
-                    waitForCompleteByPhysicalVertexList.add(new 
PassiveCompletableFuture<>(taskFuture));
-
-                    // TODO We need give every task a appropriate name
                     if 
(taskList.stream().anyMatch(TransformSeaTunnelTask.class::isInstance)) {
                         // contains IntermediateExecutionFlow in task group
                         t.add(new PhysicalVertex(idGenerator.getNextId(),
@@ -334,9 +309,8 @@ public class PhysicalPlanGenerator {
                             executorService,
                             flow.getAction().getParallelism(),
                             new TaskGroupWithIntermediateQueue(taskGroupID, 
flow.getAction().getName() +
-                                    "-SourceTask",
+                                "-SourceTask",
                                 taskList.stream().map(task -> (Task) 
task).collect(Collectors.toList())),
-                            taskFuture,
                             flakeIdGenerator,
                             pipelineIndex,
                             totalPipelineNum,
@@ -350,9 +324,8 @@ public class PhysicalPlanGenerator {
                             executorService,
                             flow.getAction().getParallelism(),
                             new TaskGroupDefaultImpl(taskGroupID, 
flow.getAction().getName() +
-                                    "-SourceTask",
+                                "-SourceTask",
                                 taskList.stream().map(task -> (Task) 
task).collect(Collectors.toList())),
-                            taskFuture,
                             flakeIdGenerator,
                             pipelineIndex,
                             totalPipelineNum,
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index d63c87859..d1fe0844e 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -111,7 +111,6 @@ public class PhysicalVertex {
                           @NonNull ExecutorService executorService,
                           int parallelism,
                           @NonNull TaskGroupDefaultImpl taskGroup,
-                          @NonNull CompletableFuture<TaskExecutionState> 
taskFuture,
                           @NonNull FlakeIdGenerator flakeIdGenerator,
                           int pipelineIndex,
                           int totalPipelineNum,
@@ -145,7 +144,11 @@ public class PhysicalVertex {
                 taskGroup.getTaskGroupName(),
                 subTaskGroupIndex + 1,
                 parallelism);
-        this.taskFuture = taskFuture;
+        this.taskFuture = new CompletableFuture<>();
+    }
+
+    public PassiveCompletableFuture<TaskExecutionState> initStateFuture() {
+        return new PassiveCompletableFuture<>(this.taskFuture);
     }
 
     @SuppressWarnings("checkstyle:MagicNumber")
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index fbccceef8..dc7974518 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -28,7 +28,6 @@ import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
 import lombok.NonNull;
 
-import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -72,26 +71,17 @@ public class SubPlan {
      */
     private final CompletableFuture<PipelineState> pipelineFuture;
 
-    /**
-     * This future only can completion by the {@link PhysicalVertex } 
taskFuture.
-     * When the taskFuture in {@link PhysicalVertex} completed, The 
NonCompletableFuture's whenComplete method will be called
-     */
-    private final PassiveCompletableFuture<TaskExecutionState>[] 
waitForCompleteByPhysicalVertex;
-
     public SubPlan(int pipelineIndex,
                    int totalPipelineNum,
                    long initializationTimestamp,
                    @NonNull List<PhysicalVertex> physicalVertexList,
                    @NonNull List<PhysicalVertex> coordinatorVertexList,
-                   @NonNull CompletableFuture<PipelineState> pipelineFuture,
-                   @NonNull PassiveCompletableFuture<TaskExecutionState>[] 
waitForCompleteByPhysicalVertex,
                    @NonNull JobImmutableInformation jobImmutableInformation) {
         this.pipelineIndex = pipelineIndex;
-        this.pipelineFuture = pipelineFuture;
+        this.pipelineFuture = new CompletableFuture<>();
         this.totalPipelineNum = totalPipelineNum;
         this.physicalVertexList = physicalVertexList;
         this.coordinatorVertexList = coordinatorVertexList;
-        this.waitForCompleteByPhysicalVertex = waitForCompleteByPhysicalVertex;
         stateTimestamps = new long[PipelineState.values().length];
         this.stateTimestamps[PipelineState.INITIALIZING.ordinal()] = 
initializationTimestamp;
         this.pipelineState.set(PipelineState.CREATED);
@@ -103,39 +93,51 @@ public class SubPlan {
             jobImmutableInformation.getJobId(),
             pipelineIndex,
             totalPipelineNum);
+    }
 
-        Arrays.stream(this.waitForCompleteByPhysicalVertex).forEach(x -> {
-            x.whenComplete((v, t) -> {
-                // We need not handle t, Because we will not return t from 
PhysicalVertex
-                if (ExecutionState.CANCELED.equals(v.getExecutionState())) {
-                    canceledTaskNum.incrementAndGet();
-                } else if 
(ExecutionState.FAILED.equals(v.getExecutionState())) {
-                    LOGGER.severe(String.format("Task Failed in %s, Begin to 
cancel other tasks in this pipeline.",
-                        this.getPipelineFullName()));
-                    failedTaskNum.incrementAndGet();
-                    cancelPipeline();
-                } else if 
(!ExecutionState.FINISHED.equals(v.getExecutionState())) {
-                    LOGGER.severe(String.format(
-                        "Task Failed in %s, with Unknown ExecutionState, Begin 
to cancel other tasks in this pipeline.",
-                        this.getPipelineFullName()));
-                    failedTaskNum.incrementAndGet();
-                    cancelPipeline();
-                }
+    public PassiveCompletableFuture<PipelineState> initStateFuture() {
+        physicalVertexList.stream().forEach(m -> {
+            addPhysicalVertexCallBack(m.initStateFuture());
+        });
+
+        coordinatorVertexList.stream().forEach(m -> {
+            addPhysicalVertexCallBack(m.initStateFuture());
+        });
+
+        return new PassiveCompletableFuture<>(pipelineFuture);
+    }
 
-                if (finishedTaskNum.incrementAndGet() == 
(physicalVertexList.size() + coordinatorVertexList.size())) {
-                    if (failedTaskNum.get() > 0) {
-                        turnToEndState(PipelineState.FAILED);
-                        LOGGER.info(String.format("%s end with state FAILED", 
this.pipelineFullName));
-                    } else if (canceledTaskNum.get() > 0) {
-                        turnToEndState(PipelineState.CANCELED);
-                        LOGGER.info(String.format("%s end with state 
CANCELED", this.pipelineFullName));
-                    } else {
-                        turnToEndState(PipelineState.FINISHED);
-                        LOGGER.info(String.format("%s end with state 
FINISHED", this.pipelineFullName));
-                    }
-                    pipelineFuture.complete(pipelineState.get());
+    private void 
addPhysicalVertexCallBack(CompletableFuture<TaskExecutionState> future) {
+        future.whenComplete((v, t) -> {
+            // We need not handle t, Because we will not return t from 
PhysicalVertex
+            if (ExecutionState.CANCELED.equals(v.getExecutionState())) {
+                canceledTaskNum.incrementAndGet();
+            } else if (ExecutionState.FAILED.equals(v.getExecutionState())) {
+                LOGGER.severe(String.format("Task Failed in %s, Begin to 
cancel other tasks in this pipeline.",
+                    this.getPipelineFullName()));
+                failedTaskNum.incrementAndGet();
+                cancelPipeline();
+            } else if (!ExecutionState.FINISHED.equals(v.getExecutionState())) 
{
+                LOGGER.severe(String.format(
+                    "Task Failed in %s, with Unknown ExecutionState, Begin to 
cancel other tasks in this pipeline.",
+                    this.getPipelineFullName()));
+                failedTaskNum.incrementAndGet();
+                cancelPipeline();
+            }
+
+            if (finishedTaskNum.incrementAndGet() == 
(physicalVertexList.size() + coordinatorVertexList.size())) {
+                if (failedTaskNum.get() > 0) {
+                    turnToEndState(PipelineState.FAILED);
+                    LOGGER.info(String.format("%s end with state FAILED", 
this.pipelineFullName));
+                } else if (canceledTaskNum.get() > 0) {
+                    turnToEndState(PipelineState.CANCELED);
+                    LOGGER.info(String.format("%s end with state CANCELED", 
this.pipelineFullName));
+                } else {
+                    turnToEndState(PipelineState.FINISHED);
+                    LOGGER.info(String.format("%s end with state FINISHED", 
this.pipelineFullName));
                 }
-            });
+                pipelineFuture.complete(pipelineState.get());
+            }
         });
     }
 
@@ -206,7 +208,8 @@ public class SubPlan {
                 cancelRunningPipeline();
             } else {
                 LOGGER.info(
-                    String.format("%s in a non cancellable state: %s, skip 
cancel", pipelineFullName, pipelineState.get()));
+                    String.format("%s in a non cancellable state: %s, skip 
cancel", pipelineFullName,
+                        pipelineState.get()));
             }
         } else {
             pipelineFuture.complete(PipelineState.CANCELED);
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 1383797f9..fa517e500 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -160,4 +160,8 @@ public class JobMaster implements Runnable {
     public JobStatus getJobStatus() {
         return physicalPlan.getJobStatus();
     }
+
+    public PhysicalPlan getPhysicalPlan() {
+        return physicalPlan;
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index a36bc4871..fbced23a2 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -119,10 +119,10 @@ public class TaskTest extends AbstractSeaTunnelServerTest 
{
             nodeEngine.getSerializationService().toData(logicalDag), config, 
Collections.emptyList());
 
         PhysicalPlan physicalPlan = PlanUtils.fromLogicalDAG(logicalDag, 
nodeEngine,
-                jobImmutableInformation,
-                System.currentTimeMillis(),
-                Executors.newCachedThreadPool(),
-                
instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME)).f0();
+            jobImmutableInformation,
+            System.currentTimeMillis(),
+            Executors.newCachedThreadPool(),
+            
instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME)).f0();
 
         Assert.assertEquals(physicalPlan.getPipelineList().size(), 1);
         
Assert.assertEquals(physicalPlan.getPipelineList().get(0).getCoordinatorVertexList().size(),
 1);

Reply via email to