This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new 8e77d01c7 [engine] Generate fixed ID in pipeline phase (#2447)
8e77d01c7 is described below

commit 8e77d01c75195e81db9300b627d161b219e2dbde
Author: Zongwen Li <[email protected]>
AuthorDate: Thu Aug 18 15:07:38 2022 +0800

    [engine] Generate fixed ID in pipeline phase (#2447)
---
 .../engine/server/dag/execution/Pipeline.java      | 11 +++++++++-
 .../server/dag/execution/PipelineGenerator.java    |  6 ++++--
 .../server/dag/physical/PhysicalPlanGenerator.java | 24 ++++++++++++----------
 3 files changed, 27 insertions(+), 14 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java
index c3bf85cb2..36d61ee5d 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java
@@ -21,15 +21,24 @@ import java.util.List;
 import java.util.Map;
 
 public class Pipeline {
+
+    /** The ID of the pipeline. */
+    private final Integer id;
+
     private final List<ExecutionEdge> edges;
 
     private final Map<Long, ExecutionVertex> vertexes;
 
-    Pipeline(List<ExecutionEdge> edges, Map<Long, ExecutionVertex> vertexes) {
+    Pipeline(Integer id, List<ExecutionEdge> edges, Map<Long, ExecutionVertex> vertexes) {
+        this.id = id;
         this.edges = edges;
         this.vertexes = vertexes;
     }
 
+    public Integer getId() {
+        return id;
+    }
+
     public List<ExecutionEdge> getEdges() {
         return edges;
     }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
index 313c06194..6f12352c4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.engine.server.dag.execution;
 
+import org.apache.seatunnel.engine.common.utils.IdGenerator;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -32,7 +34,7 @@ public class PipelineGenerator {
 
         // just convert execution plan to pipeline at now. We should split it to multi pipeline with
         // cache in the future
-
+        IdGenerator idGenerator = new IdGenerator();
         return edgesList.stream().map(e -> {
             Map<Long, ExecutionVertex> vertexes = new HashMap<>();
             List<ExecutionEdge> pipelineEdges = e.stream().map(edge -> {
@@ -46,7 +48,7 @@ public class PipelineGenerator {
                 ExecutionVertex destination = vertexes.get(edge.getRightVertexId());
                 return new ExecutionEdge(source, destination);
             }).collect(Collectors.toList());
-            return new Pipeline(pipelineEdges, vertexes);
+            return new Pipeline((int) idGenerator.getNextId(), pipelineEdges, vertexes);
         }).collect(Collectors.toList());
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index fb1069e2a..3e6ca9acf 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -74,7 +74,7 @@ import java.util.stream.Stream;
 
 public class PhysicalPlanGenerator {
 
-    private final List<List<ExecutionEdge>> edgesList;
+    private final List<Pipeline> pipelines;
 
     private final IdGenerator idGenerator = new IdGenerator();
 
@@ -103,7 +103,7 @@ public class PhysicalPlanGenerator {
                                  long initializationTimestamp,
                                  @NonNull ExecutorService executorService,
                                  @NonNull FlakeIdGenerator flakeIdGenerator) {
-        edgesList = executionPlan.getPipelines().stream().map(Pipeline::getEdges).collect(Collectors.toList());
+        this.pipelines = executionPlan.getPipelines();
         this.nodeEngine = nodeEngine;
         this.jobImmutableInformation = jobImmutableInformation;
         this.initializationTimestamp = initializationTimestamp;
@@ -114,32 +114,34 @@ public class PhysicalPlanGenerator {
     public PhysicalPlan generate() {
 
         // TODO Determine which tasks do not need to be restored according to state
-        AtomicInteger index = new AtomicInteger(-1);
         CopyOnWriteArrayList<NonCompletableFuture<PipelineState>> waitForCompleteBySubPlanList =
             new CopyOnWriteArrayList<>();
 
-        Stream<SubPlan> subPlanStream = edgesList.stream().map(edges -> {
-            int currIndex = index.incrementAndGet();
+        final int totalPipelineNum = pipelines.size();
+        Stream<SubPlan> subPlanStream = pipelines.stream().map(pipeline -> {
+            final int pipelineId = pipeline.getId();
+            final List<ExecutionEdge> edges = pipeline.getEdges();
+
             CopyOnWriteArrayList<NonCompletableFuture<TaskExecutionState>> waitForCompleteByPhysicalVertexList =
                 new CopyOnWriteArrayList<>();
             List<SourceAction<?, ?, ?>> sources = findSourceAction(edges);
 
             List<PhysicalVertex> coordinatorVertexList =
-                getEnumeratorTask(sources, currIndex, edgesList.size(), waitForCompleteByPhysicalVertexList);
+                getEnumeratorTask(sources, pipelineId, totalPipelineNum, waitForCompleteByPhysicalVertexList);
             coordinatorVertexList.addAll(
-                    getCommitterTask(edges, currIndex, edgesList.size(), waitForCompleteByPhysicalVertexList));
+                    getCommitterTask(edges, pipelineId, totalPipelineNum, waitForCompleteByPhysicalVertexList));
 
             List<PhysicalVertex> physicalVertexList =
-                getSourceTask(edges, sources, currIndex, edgesList.size(), waitForCompleteByPhysicalVertexList);
+                getSourceTask(edges, sources, pipelineId, totalPipelineNum, waitForCompleteByPhysicalVertexList);
 
             physicalVertexList.addAll(
-                getPartitionTask(edges, currIndex, edgesList.size(), waitForCompleteByPhysicalVertexList));
+                getPartitionTask(edges, pipelineId, totalPipelineNum, waitForCompleteByPhysicalVertexList));
 
             CompletableFuture<PipelineState> pipelineFuture = new CompletableFuture<>();
             waitForCompleteBySubPlanList.add(new NonCompletableFuture<>(pipelineFuture));
 
-            return new SubPlan(currIndex,
-                    edgesList.size(),
+            return new SubPlan(pipelineId,
+                    totalPipelineNum,
                     initializationTimestamp,
                     physicalVertexList,
                     coordinatorVertexList,

Reply via email to