This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new f445f8d2b [Feature][ST-Engine] Add initFuture to PhysicalPlan (#2560)
f445f8d2b is described below
commit f445f8d2bc8ec3fdfe599d84c175b82c53c6344f
Author: Eric <[email protected]>
AuthorDate: Tue Aug 30 09:22:48 2022 +0800
[Feature][ST-Engine] Add initFuture to PhysicalPlan (#2560)
* Add initFuture to PhysicalPlan
* fix review error
* fix CI error
---
.github/workflows/backend.yml | 2 +-
.github/workflows/engine_backend.yml | 2 +-
.../engine/e2e/engine/JobExecutionIT.java | 16 ++--
.../engine/client/SeaTunnelClientTest.java | 11 ++-
.../seatunnel/engine/server/SeaTunnelServer.java | 1 +
.../engine/server/dag/physical/PhysicalPlan.java | 30 ++++----
.../server/dag/physical/PhysicalPlanGenerator.java | 53 ++++---------
.../engine/server/dag/physical/PhysicalVertex.java | 7 +-
.../engine/server/dag/physical/SubPlan.java | 87 +++++++++++-----------
.../seatunnel/engine/server/master/JobMaster.java | 4 +
.../seatunnel/engine/server/dag/TaskTest.java | 8 +-
11 files changed, 106 insertions(+), 115 deletions(-)
diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 26e9b43eb..c3a460bb9 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -24,7 +24,7 @@ on:
- '**/*.md'
- 'seatunnel-ui/**'
- 'seatunnel-engine/**'
- - 'seatunnel-engine/seatunnel-engine-e2e/**'
+ - 'seatunnel-e2e/seatunnel-engine-e2e/**'
- 'seatunnel-examples/seatunnel-engine-examples/**'
- 'seatunnel-core/seatunnel-seatunnel-starter/**'
- 'generate_client_protocol.sh'
diff --git a/.github/workflows/engine_backend.yml
b/.github/workflows/engine_backend.yml
index a57f6d9ac..1c9ed07ac 100644
--- a/.github/workflows/engine_backend.yml
+++ b/.github/workflows/engine_backend.yml
@@ -21,7 +21,7 @@ on:
pull_request:
paths:
- 'seatunnel-engine/**'
- - 'seatunnel-engine/seatunnel-engine-e2e/**'
+ - 'seatunnel-e2e/seatunnel-engine-e2e/**'
- 'seatunnel-examples/seatunnel-engine-examples/**'
- 'seatunnel-core/seatunnel-seatunnel-starter/**'
- 'generate_client_protocol.sh'
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java
index 8b286c482..f67042748 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java
@@ -18,7 +18,6 @@
package org.apache.seatunnel.engine.e2e.engine;
import static org.awaitility.Awaitility.await;
-import static org.junit.Assert.assertEquals;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
@@ -84,8 +83,15 @@ public class JobExecutionIT {
try {
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
- await().atMost(10000, TimeUnit.MILLISECONDS)
- .untilAsserted(() -> assertEquals(JobStatus.FINISHED,
clientJobProxy.waitForJobComplete()));
+
+ CompletableFuture<Object> objectCompletableFuture =
CompletableFuture.supplyAsync(() -> {
+ JobStatus jobStatus = clientJobProxy.waitForJobComplete();
+ Assert.assertEquals(JobStatus.FINISHED, jobStatus);
+ return null;
+ });
+
+ await().atMost(20000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() ->
Assert.assertTrue(objectCompletableFuture.isDone()));
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -118,9 +124,7 @@ public class JobExecutionIT {
clientJobProxy.cancelJob();
await().atMost(10000, TimeUnit.MILLISECONDS)
- .untilAsserted(() -> {
- Assert.assertTrue(objectCompletableFuture.isDone());
- });
+ .untilAsserted(() ->
Assert.assertTrue(objectCompletableFuture.isDone()));
} catch (Exception e) {
throw new RuntimeException(e);
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index a4745687c..540ff2074 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -18,7 +18,6 @@
package org.apache.seatunnel.engine.client;
import static org.awaitility.Awaitility.await;
-import static org.junit.Assert.assertEquals;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
@@ -40,6 +39,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -83,8 +83,15 @@ public class SeaTunnelClientTest {
try {
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+ CompletableFuture<Object> objectCompletableFuture =
CompletableFuture.supplyAsync(() -> {
+ JobStatus jobStatus = clientJobProxy.waitForJobComplete();
+ Assert.assertEquals(JobStatus.FINISHED, jobStatus);
+ return null;
+ });
+
await().atMost(10000, TimeUnit.MILLISECONDS)
- .untilAsserted(() -> assertEquals(JobStatus.FINISHED,
clientJobProxy.waitForJobComplete()));
+ .untilAsserted(() ->
Assert.assertTrue(objectCompletableFuture.isDone()));
+
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 38eb6205f..08560c7db 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -138,6 +138,7 @@ public class SeaTunnelServer implements ManagedService,
MembershipAwareService,
executorService.submit(() -> {
try {
jobMaster.init();
+ jobMaster.getPhysicalPlan().initStateFuture();
runningJobMasterMap.put(jobId, jobMaster);
} catch (Throwable e) {
LOGGER.severe(String.format("submit job %s error %s ", jobId,
ExceptionUtils.getMessage(e)));
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
index dc79fceec..5a1daecf4 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
@@ -27,7 +27,6 @@ import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import lombok.NonNull;
-import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -64,13 +63,6 @@ public class PhysicalPlan {
* in {@link org.apache.seatunnel.engine.server.scheduler.JobScheduler}
whenComplete method will be called.
*/
private final CompletableFuture<JobStatus> jobEndFuture;
- private final PassiveCompletableFuture<JobStatus> passiveCompletableFuture;
-
- /**
- * This future only can completion by the {@link SubPlan } subPlanFuture.
- * When subPlanFuture completed, this NonCompletableFuture's whenComplete
method will be called.
- */
- private final PassiveCompletableFuture<PipelineState>[]
waitForCompleteBySubPlan;
private final ExecutorService executorService;
@@ -79,8 +71,7 @@ public class PhysicalPlan {
public PhysicalPlan(@NonNull List<SubPlan> pipelineList,
@NonNull ExecutorService executorService,
@NonNull JobImmutableInformation
jobImmutableInformation,
- long initializationTimestamp,
- @NonNull PassiveCompletableFuture<PipelineState>[]
waitForCompleteBySubPlan) {
+ long initializationTimestamp) {
this.executorService = executorService;
this.jobImmutableInformation = jobImmutableInformation;
stateTimestamps = new long[JobStatus.values().length];
@@ -88,15 +79,18 @@ public class PhysicalPlan {
this.jobStatus.set(JobStatus.CREATED);
this.stateTimestamps[JobStatus.CREATED.ordinal()] =
System.currentTimeMillis();
this.jobEndFuture = new CompletableFuture<>();
- this.passiveCompletableFuture = new
PassiveCompletableFuture<>(jobEndFuture);
- this.waitForCompleteBySubPlan = waitForCompleteBySubPlan;
this.pipelineList = pipelineList;
if (pipelineList.isEmpty()) {
throw new UnknownPhysicalPlanException("The physical plan didn't
have any can execute pipeline");
}
- this.jobFullName = String.format("Job %s (%s)",
jobImmutableInformation.getJobConfig().getName(),
jobImmutableInformation.getJobId());
- Arrays.stream(this.waitForCompleteBySubPlan).forEach(x -> {
- x.whenComplete((v, t) -> {
+ this.jobFullName = String.format("Job %s (%s)",
jobImmutableInformation.getJobConfig().getName(),
+ jobImmutableInformation.getJobId());
+ }
+
+ public void initStateFuture() {
+ pipelineList.stream().forEach(subPlan -> {
+ PassiveCompletableFuture<PipelineState> future =
subPlan.initStateFuture();
+ future.whenComplete((v, t) -> {
// We need not handle t, Because we will not return t from
Pipeline
if (PipelineState.CANCELED.equals(v)) {
canceledPipelineNum.incrementAndGet();
@@ -108,6 +102,7 @@ public class PhysicalPlan {
LOGGER.severe(
"Pipeline Failed with Unknown PipelineState, Begin to
cancel other pipelines in this job.");
failedPipelineNum.incrementAndGet();
+ cancelJob();
}
if (finishedPipelineNum.incrementAndGet() ==
this.pipelineList.size()) {
@@ -130,7 +125,8 @@ public class PhysicalPlan {
if (updateJobState(JobStatus.RUNNING, JobStatus.CANCELLING)) {
cancelRunningJob();
} else {
- LOGGER.info(String.format("%s in a non cancellable state: %s,
skip cancel", jobFullName, jobStatus.get()));
+ LOGGER.info(
+ String.format("%s in a non cancellable state: %s, skip
cancel", jobFullName, jobStatus.get()));
}
}
}
@@ -212,7 +208,7 @@ public class PhysicalPlan {
}
public PassiveCompletableFuture<JobStatus> getJobEndCompletableFuture() {
- return this.passiveCompletableFuture;
+ return new PassiveCompletableFuture<>(jobEndFuture);
}
public JobImmutableInformation getJobImmutableInformation() {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index e07d85f8a..18a187913 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -41,7 +41,6 @@ import
org.apache.seatunnel.engine.server.dag.physical.flow.IntermediateExecutio
import
org.apache.seatunnel.engine.server.dag.physical.flow.PhysicalExecutionFlow;
import
org.apache.seatunnel.engine.server.dag.physical.flow.UnknownFlowException;
import org.apache.seatunnel.engine.server.execution.Task;
-import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
@@ -124,20 +123,18 @@ public class PhysicalPlanGenerator {
final int pipelineId = pipeline.getId();
final List<ExecutionEdge> edges = pipeline.getEdges();
- CopyOnWriteArrayList<PassiveCompletableFuture<TaskExecutionState>>
waitForCompleteByPhysicalVertexList =
- new CopyOnWriteArrayList<>();
List<SourceAction<?, ?, ?>> sources = findSourceAction(edges);
List<PhysicalVertex> coordinatorVertexList =
- getEnumeratorTask(sources, pipelineId, totalPipelineNum,
waitForCompleteByPhysicalVertexList);
+ getEnumeratorTask(sources, pipelineId, totalPipelineNum);
coordinatorVertexList.addAll(
- getCommitterTask(edges, pipelineId, totalPipelineNum,
waitForCompleteByPhysicalVertexList));
+ getCommitterTask(edges, pipelineId, totalPipelineNum));
List<PhysicalVertex> physicalVertexList =
- getSourceTask(edges, sources, pipelineId, totalPipelineNum,
waitForCompleteByPhysicalVertexList);
+ getSourceTask(edges, sources, pipelineId, totalPipelineNum);
physicalVertexList.addAll(
- getPartitionTask(edges, pipelineId, totalPipelineNum,
waitForCompleteByPhysicalVertexList));
+ getPartitionTask(edges, pipelineId, totalPipelineNum));
CompletableFuture<PipelineState> pipelineFuture = new
CompletableFuture<>();
waitForCompleteBySubPlanList.add(new
PassiveCompletableFuture<>(pipelineFuture));
@@ -147,17 +144,13 @@ public class PhysicalPlanGenerator {
initializationTimestamp,
physicalVertexList,
coordinatorVertexList,
- pipelineFuture,
- waitForCompleteByPhysicalVertexList.toArray(
- new
PassiveCompletableFuture[waitForCompleteByPhysicalVertexList.size()]),
jobImmutableInformation);
});
PhysicalPlan physicalPlan = new
PhysicalPlan(subPlanStream.collect(Collectors.toList()),
executorService,
jobImmutableInformation,
- initializationTimestamp,
- waitForCompleteBySubPlanList.toArray(new
PassiveCompletableFuture[waitForCompleteBySubPlanList.size()]));
+ initializationTimestamp);
return Tuple2.tuple2(physicalPlan, null);
}
@@ -169,8 +162,7 @@ public class PhysicalPlanGenerator {
private List<PhysicalVertex> getCommitterTask(List<ExecutionEdge> edges,
int pipelineIndex,
- int totalPipelineNum,
-
CopyOnWriteArrayList<PassiveCompletableFuture<TaskExecutionState>>
waitForCompleteByPhysicalVertexList) {
+ int totalPipelineNum) {
AtomicInteger atomicInteger = new AtomicInteger(-1);
List<ExecutionEdge> collect = edges.stream().filter(s ->
s.getRightVertex().getAction() instanceof SinkAction)
.collect(Collectors.toList());
@@ -191,8 +183,6 @@ public class PhysicalPlanGenerator {
new TaskLocation(taskGroupID,
mixIDPrefixAndIndex(idGenerator.getNextId(), 0)), s,
sinkAggregatedCommitter.get());
committerTaskIDMap.put(s, new TaskLocation(taskGroupID,
t.getTaskID()));
- CompletableFuture<TaskExecutionState> taskFuture = new
CompletableFuture<>();
- waitForCompleteByPhysicalVertexList.add(new
PassiveCompletableFuture<>(taskFuture));
return new PhysicalVertex(idGenerator.getNextId(),
atomicInteger.incrementAndGet(),
@@ -200,7 +190,6 @@ public class PhysicalPlanGenerator {
collect.size(),
new TaskGroupDefaultImpl(taskGroupID, s.getName() +
"-AggregatedCommitterTask",
Lists.newArrayList(t)),
- taskFuture,
flakeIdGenerator,
pipelineIndex,
totalPipelineNum,
@@ -216,8 +205,7 @@ public class PhysicalPlanGenerator {
private List<PhysicalVertex> getPartitionTask(List<ExecutionEdge> edges,
int pipelineIndex,
- int totalPipelineNum,
-
CopyOnWriteArrayList<PassiveCompletableFuture<TaskExecutionState>>
waitForCompleteByPhysicalVertexList) {
+ int totalPipelineNum) {
return edges.stream().filter(s -> s.getLeftVertex().getAction()
instanceof PartitionTransformAction)
.map(q -> (PartitionTransformAction) q.getLeftVertex().getAction())
.map(q -> new PhysicalExecutionFlow(q, getNextWrapper(edges, q)))
@@ -231,17 +219,13 @@ public class PhysicalPlanGenerator {
SeaTunnelTask seaTunnelTask = new
TransformSeaTunnelTask(jobImmutableInformation.getJobId(),
new TaskLocation(taskGroupID,
mixIDPrefixAndIndex(taskIDPrefix, i)), i, flow);
- CompletableFuture<TaskExecutionState> taskFuture = new
CompletableFuture<>();
- waitForCompleteByPhysicalVertexList.add(new
PassiveCompletableFuture<>(taskFuture));
-
t.add(new PhysicalVertex(idGenerator.getNextId(),
i,
executorService,
flow.getAction().getParallelism(),
new TaskGroupDefaultImpl(taskGroupID,
flow.getAction().getName() +
- "-PartitionTransformTask",
+ "-PartitionTransformTask",
Lists.newArrayList(seaTunnelTask)),
- taskFuture,
flakeIdGenerator,
pipelineIndex,
totalPipelineNum,
@@ -256,8 +240,7 @@ public class PhysicalPlanGenerator {
private List<PhysicalVertex> getEnumeratorTask(List<SourceAction<?, ?, ?>>
sources,
int pipelineIndex,
- int totalPipelineNum,
-
CopyOnWriteArrayList<PassiveCompletableFuture<TaskExecutionState>>
waitForCompleteByPhysicalVertexList) {
+ int totalPipelineNum) {
AtomicInteger atomicInteger = new AtomicInteger(-1);
return sources.stream().map(s -> {
@@ -265,16 +248,13 @@ public class PhysicalPlanGenerator {
SourceSplitEnumeratorTask<?> t = new
SourceSplitEnumeratorTask<>(jobImmutableInformation.getJobId(),
new TaskLocation(taskGroupID,
mixIDPrefixAndIndex(idGenerator.getNextId(), 0)), s);
enumeratorTaskIDMap.put(s, new TaskLocation(taskGroupID,
t.getTaskID()));
- CompletableFuture<TaskExecutionState> taskFuture = new
CompletableFuture<>();
- waitForCompleteByPhysicalVertexList.add(new
PassiveCompletableFuture<>(taskFuture));
return new PhysicalVertex(idGenerator.getNextId(),
atomicInteger.incrementAndGet(),
executorService,
sources.size(),
new TaskGroupDefaultImpl(taskGroupID, s.getName() +
"-SplitEnumerator",
- Lists.newArrayList(t)),
- taskFuture,
+ Lists.newArrayList(t)),
flakeIdGenerator,
pipelineIndex,
totalPipelineNum,
@@ -288,8 +268,7 @@ public class PhysicalPlanGenerator {
private List<PhysicalVertex> getSourceTask(List<ExecutionEdge> edges,
List<SourceAction<?, ?, ?>>
sources,
int pipelineIndex,
- int totalPipelineNum,
-
CopyOnWriteArrayList<PassiveCompletableFuture<TaskExecutionState>>
waitForCompleteByPhysicalVertexList) {
+ int totalPipelineNum) {
return sources.stream()
.map(s -> new PhysicalExecutionFlow(s, getNextWrapper(edges, s)))
.flatMap(flow -> {
@@ -323,10 +302,6 @@ public class PhysicalPlanGenerator {
Set<URL> jars =
taskList.stream().flatMap(task ->
task.getJarsUrl().stream()).collect(Collectors.toSet());
- CompletableFuture<TaskExecutionState> taskFuture = new
CompletableFuture<>();
- waitForCompleteByPhysicalVertexList.add(new
PassiveCompletableFuture<>(taskFuture));
-
- // TODO We need give every task a appropriate name
if
(taskList.stream().anyMatch(TransformSeaTunnelTask.class::isInstance)) {
// contains IntermediateExecutionFlow in task group
t.add(new PhysicalVertex(idGenerator.getNextId(),
@@ -334,9 +309,8 @@ public class PhysicalPlanGenerator {
executorService,
flow.getAction().getParallelism(),
new TaskGroupWithIntermediateQueue(taskGroupID,
flow.getAction().getName() +
- "-SourceTask",
+ "-SourceTask",
taskList.stream().map(task -> (Task)
task).collect(Collectors.toList())),
- taskFuture,
flakeIdGenerator,
pipelineIndex,
totalPipelineNum,
@@ -350,9 +324,8 @@ public class PhysicalPlanGenerator {
executorService,
flow.getAction().getParallelism(),
new TaskGroupDefaultImpl(taskGroupID,
flow.getAction().getName() +
- "-SourceTask",
+ "-SourceTask",
taskList.stream().map(task -> (Task)
task).collect(Collectors.toList())),
- taskFuture,
flakeIdGenerator,
pipelineIndex,
totalPipelineNum,
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
index d63c87859..d1fe0844e 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java
@@ -111,7 +111,6 @@ public class PhysicalVertex {
@NonNull ExecutorService executorService,
int parallelism,
@NonNull TaskGroupDefaultImpl taskGroup,
- @NonNull CompletableFuture<TaskExecutionState>
taskFuture,
@NonNull FlakeIdGenerator flakeIdGenerator,
int pipelineIndex,
int totalPipelineNum,
@@ -145,7 +144,11 @@ public class PhysicalVertex {
taskGroup.getTaskGroupName(),
subTaskGroupIndex + 1,
parallelism);
- this.taskFuture = taskFuture;
+ this.taskFuture = new CompletableFuture<>();
+ }
+
+ public PassiveCompletableFuture<TaskExecutionState> initStateFuture() {
+ return new PassiveCompletableFuture<>(this.taskFuture);
}
@SuppressWarnings("checkstyle:MagicNumber")
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index fbccceef8..dc7974518 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -28,7 +28,6 @@ import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import lombok.NonNull;
-import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
@@ -72,26 +71,17 @@ public class SubPlan {
*/
private final CompletableFuture<PipelineState> pipelineFuture;
- /**
- * This future only can completion by the {@link PhysicalVertex }
taskFuture.
- * When the taskFuture in {@link PhysicalVertex} completed, The
NonCompletableFuture's whenComplete method will be called
- */
- private final PassiveCompletableFuture<TaskExecutionState>[]
waitForCompleteByPhysicalVertex;
-
public SubPlan(int pipelineIndex,
int totalPipelineNum,
long initializationTimestamp,
@NonNull List<PhysicalVertex> physicalVertexList,
@NonNull List<PhysicalVertex> coordinatorVertexList,
- @NonNull CompletableFuture<PipelineState> pipelineFuture,
- @NonNull PassiveCompletableFuture<TaskExecutionState>[]
waitForCompleteByPhysicalVertex,
@NonNull JobImmutableInformation jobImmutableInformation) {
this.pipelineIndex = pipelineIndex;
- this.pipelineFuture = pipelineFuture;
+ this.pipelineFuture = new CompletableFuture<>();
this.totalPipelineNum = totalPipelineNum;
this.physicalVertexList = physicalVertexList;
this.coordinatorVertexList = coordinatorVertexList;
- this.waitForCompleteByPhysicalVertex = waitForCompleteByPhysicalVertex;
stateTimestamps = new long[PipelineState.values().length];
this.stateTimestamps[PipelineState.INITIALIZING.ordinal()] =
initializationTimestamp;
this.pipelineState.set(PipelineState.CREATED);
@@ -103,39 +93,51 @@ public class SubPlan {
jobImmutableInformation.getJobId(),
pipelineIndex,
totalPipelineNum);
+ }
- Arrays.stream(this.waitForCompleteByPhysicalVertex).forEach(x -> {
- x.whenComplete((v, t) -> {
- // We need not handle t, Because we will not return t from
PhysicalVertex
- if (ExecutionState.CANCELED.equals(v.getExecutionState())) {
- canceledTaskNum.incrementAndGet();
- } else if
(ExecutionState.FAILED.equals(v.getExecutionState())) {
- LOGGER.severe(String.format("Task Failed in %s, Begin to
cancel other tasks in this pipeline.",
- this.getPipelineFullName()));
- failedTaskNum.incrementAndGet();
- cancelPipeline();
- } else if
(!ExecutionState.FINISHED.equals(v.getExecutionState())) {
- LOGGER.severe(String.format(
- "Task Failed in %s, with Unknown ExecutionState, Begin
to cancel other tasks in this pipeline.",
- this.getPipelineFullName()));
- failedTaskNum.incrementAndGet();
- cancelPipeline();
- }
+ public PassiveCompletableFuture<PipelineState> initStateFuture() {
+ physicalVertexList.stream().forEach(m -> {
+ addPhysicalVertexCallBack(m.initStateFuture());
+ });
+
+ coordinatorVertexList.stream().forEach(m -> {
+ addPhysicalVertexCallBack(m.initStateFuture());
+ });
+
+ return new PassiveCompletableFuture<>(pipelineFuture);
+ }
- if (finishedTaskNum.incrementAndGet() ==
(physicalVertexList.size() + coordinatorVertexList.size())) {
- if (failedTaskNum.get() > 0) {
- turnToEndState(PipelineState.FAILED);
- LOGGER.info(String.format("%s end with state FAILED",
this.pipelineFullName));
- } else if (canceledTaskNum.get() > 0) {
- turnToEndState(PipelineState.CANCELED);
- LOGGER.info(String.format("%s end with state
CANCELED", this.pipelineFullName));
- } else {
- turnToEndState(PipelineState.FINISHED);
- LOGGER.info(String.format("%s end with state
FINISHED", this.pipelineFullName));
- }
- pipelineFuture.complete(pipelineState.get());
+ private void
addPhysicalVertexCallBack(CompletableFuture<TaskExecutionState> future) {
+ future.whenComplete((v, t) -> {
+ // We need not handle t, Because we will not return t from
PhysicalVertex
+ if (ExecutionState.CANCELED.equals(v.getExecutionState())) {
+ canceledTaskNum.incrementAndGet();
+ } else if (ExecutionState.FAILED.equals(v.getExecutionState())) {
+ LOGGER.severe(String.format("Task Failed in %s, Begin to
cancel other tasks in this pipeline.",
+ this.getPipelineFullName()));
+ failedTaskNum.incrementAndGet();
+ cancelPipeline();
+ } else if (!ExecutionState.FINISHED.equals(v.getExecutionState()))
{
+ LOGGER.severe(String.format(
+ "Task Failed in %s, with Unknown ExecutionState, Begin to
cancel other tasks in this pipeline.",
+ this.getPipelineFullName()));
+ failedTaskNum.incrementAndGet();
+ cancelPipeline();
+ }
+
+ if (finishedTaskNum.incrementAndGet() ==
(physicalVertexList.size() + coordinatorVertexList.size())) {
+ if (failedTaskNum.get() > 0) {
+ turnToEndState(PipelineState.FAILED);
+ LOGGER.info(String.format("%s end with state FAILED",
this.pipelineFullName));
+ } else if (canceledTaskNum.get() > 0) {
+ turnToEndState(PipelineState.CANCELED);
+ LOGGER.info(String.format("%s end with state CANCELED",
this.pipelineFullName));
+ } else {
+ turnToEndState(PipelineState.FINISHED);
+ LOGGER.info(String.format("%s end with state FINISHED",
this.pipelineFullName));
}
- });
+ pipelineFuture.complete(pipelineState.get());
+ }
});
}
@@ -206,7 +208,8 @@ public class SubPlan {
cancelRunningPipeline();
} else {
LOGGER.info(
- String.format("%s in a non cancellable state: %s, skip
cancel", pipelineFullName, pipelineState.get()));
+ String.format("%s in a non cancellable state: %s, skip
cancel", pipelineFullName,
+ pipelineState.get()));
}
} else {
pipelineFuture.complete(PipelineState.CANCELED);
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 1383797f9..fa517e500 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -160,4 +160,8 @@ public class JobMaster implements Runnable {
public JobStatus getJobStatus() {
return physicalPlan.getJobStatus();
}
+
+ public PhysicalPlan getPhysicalPlan() {
+ return physicalPlan;
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index a36bc4871..fbced23a2 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -119,10 +119,10 @@ public class TaskTest extends AbstractSeaTunnelServerTest
{
nodeEngine.getSerializationService().toData(logicalDag), config,
Collections.emptyList());
PhysicalPlan physicalPlan = PlanUtils.fromLogicalDAG(logicalDag,
nodeEngine,
- jobImmutableInformation,
- System.currentTimeMillis(),
- Executors.newCachedThreadPool(),
-
instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME)).f0();
+ jobImmutableInformation,
+ System.currentTimeMillis(),
+ Executors.newCachedThreadPool(),
+
instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME)).f0();
Assert.assertEquals(physicalPlan.getPipelineList().size(), 1);
Assert.assertEquals(physicalPlan.getPipelineList().get(0).getCoordinatorVertexList().size(),
1);