This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new 8e77d01c7 [engine] Generate fixed ID in pipeline phase (#2447)
8e77d01c7 is described below
commit 8e77d01c75195e81db9300b627d161b219e2dbde
Author: Zongwen Li <[email protected]>
AuthorDate: Thu Aug 18 15:07:38 2022 +0800
[engine] Generate fixed ID in pipeline phase (#2447)
---
.../engine/server/dag/execution/Pipeline.java | 11 +++++++++-
.../server/dag/execution/PipelineGenerator.java | 6 ++++--
.../server/dag/physical/PhysicalPlanGenerator.java | 24 ++++++++++++----------
3 files changed, 27 insertions(+), 14 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java
index c3bf85cb2..36d61ee5d 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java
@@ -21,15 +21,24 @@ import java.util.List;
import java.util.Map;
public class Pipeline {
+
+ /** The ID of the pipeline. */
+ private final Integer id;
+
private final List<ExecutionEdge> edges;
private final Map<Long, ExecutionVertex> vertexes;
- Pipeline(List<ExecutionEdge> edges, Map<Long, ExecutionVertex> vertexes) {
+ Pipeline(Integer id, List<ExecutionEdge> edges, Map<Long, ExecutionVertex> vertexes) {
+ this.id = id;
this.edges = edges;
this.vertexes = vertexes;
}
+ public Integer getId() {
+ return id;
+ }
+
public List<ExecutionEdge> getEdges() {
return edges;
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
index 313c06194..6f12352c4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.engine.server.dag.execution;
+import org.apache.seatunnel.engine.common.utils.IdGenerator;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -32,7 +34,7 @@ public class PipelineGenerator {
// just convert execution plan to pipeline at now. We should split it to multi pipeline with
// cache in the future
-
+ IdGenerator idGenerator = new IdGenerator();
return edgesList.stream().map(e -> {
Map<Long, ExecutionVertex> vertexes = new HashMap<>();
List<ExecutionEdge> pipelineEdges = e.stream().map(edge -> {
@@ -46,7 +48,7 @@ public class PipelineGenerator {
ExecutionVertex destination = vertexes.get(edge.getRightVertexId());
return new ExecutionEdge(source, destination);
}).collect(Collectors.toList());
- return new Pipeline(pipelineEdges, vertexes);
+ return new Pipeline((int) idGenerator.getNextId(), pipelineEdges, vertexes);
}).collect(Collectors.toList());
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index fb1069e2a..3e6ca9acf 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -74,7 +74,7 @@ import java.util.stream.Stream;
public class PhysicalPlanGenerator {
- private final List<List<ExecutionEdge>> edgesList;
+ private final List<Pipeline> pipelines;
private final IdGenerator idGenerator = new IdGenerator();
@@ -103,7 +103,7 @@ public class PhysicalPlanGenerator {
long initializationTimestamp,
@NonNull ExecutorService executorService,
@NonNull FlakeIdGenerator flakeIdGenerator) {
- edgesList = executionPlan.getPipelines().stream().map(Pipeline::getEdges).collect(Collectors.toList());
+ this.pipelines = executionPlan.getPipelines();
this.nodeEngine = nodeEngine;
this.jobImmutableInformation = jobImmutableInformation;
this.initializationTimestamp = initializationTimestamp;
@@ -114,32 +114,34 @@ public class PhysicalPlanGenerator {
public PhysicalPlan generate() {
// TODO Determine which tasks do not need to be restored according to state
- AtomicInteger index = new AtomicInteger(-1);
CopyOnWriteArrayList<NonCompletableFuture<PipelineState>>
waitForCompleteBySubPlanList =
new CopyOnWriteArrayList<>();
- Stream<SubPlan> subPlanStream = edgesList.stream().map(edges -> {
- int currIndex = index.incrementAndGet();
+ final int totalPipelineNum = pipelines.size();
+ Stream<SubPlan> subPlanStream = pipelines.stream().map(pipeline -> {
+ final int pipelineId = pipeline.getId();
+ final List<ExecutionEdge> edges = pipeline.getEdges();
+
CopyOnWriteArrayList<NonCompletableFuture<TaskExecutionState>>
waitForCompleteByPhysicalVertexList =
new CopyOnWriteArrayList<>();
List<SourceAction<?, ?, ?>> sources = findSourceAction(edges);
List<PhysicalVertex> coordinatorVertexList =
- getEnumeratorTask(sources, currIndex, edgesList.size(), waitForCompleteByPhysicalVertexList);
+ getEnumeratorTask(sources, pipelineId, totalPipelineNum, waitForCompleteByPhysicalVertexList);
coordinatorVertexList.addAll(
- getCommitterTask(edges, currIndex, edgesList.size(), waitForCompleteByPhysicalVertexList));
+ getCommitterTask(edges, pipelineId, totalPipelineNum, waitForCompleteByPhysicalVertexList));
List<PhysicalVertex> physicalVertexList =
- getSourceTask(edges, sources, currIndex, edgesList.size(), waitForCompleteByPhysicalVertexList);
+ getSourceTask(edges, sources, pipelineId, totalPipelineNum, waitForCompleteByPhysicalVertexList);
physicalVertexList.addAll(
- getPartitionTask(edges, currIndex, edgesList.size(), waitForCompleteByPhysicalVertexList));
+ getPartitionTask(edges, pipelineId, totalPipelineNum, waitForCompleteByPhysicalVertexList));
CompletableFuture<PipelineState> pipelineFuture = new CompletableFuture<>();
waitForCompleteBySubPlanList.add(new NonCompletableFuture<>(pipelineFuture));
- return new SubPlan(currIndex,
- edgesList.size(),
+ return new SubPlan(pipelineId,
+ totalPipelineNum,
initializationTimestamp,
physicalVertexList,
coordinatorVertexList,