This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new a7d1296fe [Engine][Task] Add fixed taskId when parallelism changed. 
(#2455)
a7d1296fe is described below

commit a7d1296fe6c73de9cb8b2963bfb4191088b79dbc
Author: Hisoka <[email protected]>
AuthorDate: Thu Aug 18 18:59:28 2022 +0800

    [Engine][Task] Add fixed taskId when parallelism changed. (#2455)
    
    * [Engine][Task] Add fixed taskId when parallelism changed.
    
    * [Engine][Task] fix checkstyle
---
 .../server/dag/physical/PhysicalPlanGenerator.java | 26 +++++++++++++++-------
 .../engine/server/dag/physical/flow/Flow.java      |  2 ++
 .../physical/flow/IntermediateExecutionFlow.java   |  5 +++++
 .../dag/physical/flow/PhysicalExecutionFlow.java   |  5 +++++
 .../server/task/statemachine/EnumeratorState.java  | 13 +++++------
 .../seatunnel/engine/server/dag/TaskTest.java      |  9 +++++---
 6 files changed, 42 insertions(+), 18 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 3e6ca9acf..d2b2e1950 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -185,7 +185,7 @@ public class PhysicalPlanGenerator {
                     long taskGroupID = idGenerator.getNextId();
                     SinkAggregatedCommitterTask<?> t =
                             new 
SinkAggregatedCommitterTask(jobImmutableInformation.getJobId(),
-                                    new TaskLocation(taskGroupID, 
idGenerator.getNextId()), s,
+                                    new TaskLocation(taskGroupID, 
convertToTaskID(idGenerator.getNextId(), 1)), s,
                                     sinkAggregatedCommitter.get());
                     committerTaskIDMap.put(s, new TaskLocation(taskGroupID, 
t.getTaskID()));
                     CompletableFuture<TaskExecutionState> taskFuture = new 
CompletableFuture<>();
@@ -220,11 +220,12 @@ public class PhysicalPlanGenerator {
             .map(q -> new PhysicalExecutionFlow(q, getNextWrapper(edges, q)))
             .flatMap(flow -> {
                 List<PhysicalVertex> t = new ArrayList<>();
+                long taskIDPrefix = idGenerator.getNextId();
                 for (int i = 0; i < flow.getAction().getParallelism(); i++) {
                     long taskGroupID = idGenerator.getNextId();
                     setFlowConfig(flow, i);
                     SeaTunnelTask seaTunnelTask = new 
TransformSeaTunnelTask(jobImmutableInformation.getJobId(),
-                            new TaskLocation(taskGroupID, 
idGenerator.getNextId()), i, flow);
+                            new TaskLocation(taskGroupID, 
convertToTaskID(taskIDPrefix, i)), i, flow);
 
                     CompletableFuture<TaskExecutionState> taskFuture = new 
CompletableFuture<>();
                     waitForCompleteByPhysicalVertexList.add(new 
NonCompletableFuture<>(taskFuture));
@@ -257,7 +258,7 @@ public class PhysicalPlanGenerator {
         return sources.stream().map(s -> {
             long taskGroupID = idGenerator.getNextId();
             SourceSplitEnumeratorTask<?> t = new 
SourceSplitEnumeratorTask<>(jobImmutableInformation.getJobId(),
-                    new TaskLocation(taskGroupID, idGenerator.getNextId()), s);
+                    new TaskLocation(taskGroupID, 
convertToTaskID(idGenerator.getNextId(), 1)), s);
             enumeratorTaskIDMap.put(s, new TaskLocation(taskGroupID, 
t.getTaskID()));
             CompletableFuture<TaskExecutionState> taskFuture = new 
CompletableFuture<>();
             waitForCompleteByPhysicalVertexList.add(new 
NonCompletableFuture<>(taskFuture));
@@ -292,18 +293,22 @@ public class PhysicalPlanGenerator {
                 if (sourceWithSink(flow)) {
                     flows.addAll(splitSinkFromFlow(flow));
                 }
+                Map<Long, Long> flowTaskIDPrefixMap = new HashMap<>();
                 for (int i = 0; i < flow.getAction().getParallelism(); i++) {
                     int finalParallelismIndex = i;
                     List<SeaTunnelTask> taskList =
                             flows.stream().map(f -> {
                                 setFlowConfig(f, finalParallelismIndex);
+                                long taskIDPrefix = 
flowTaskIDPrefixMap.computeIfAbsent(f.getFlowID(), id -> 
idGenerator.getNextId());
                                 if (f instanceof PhysicalExecutionFlow) {
                                     return new 
SourceSeaTunnelTask<>(jobImmutableInformation.getJobId(),
-                                            new TaskLocation(taskGroupID, 
idGenerator.getNextId()),
+                                            new TaskLocation(taskGroupID,
+                                                    
convertToTaskID(taskIDPrefix, finalParallelismIndex)),
                                             finalParallelismIndex, f);
                                 } else {
                                     return new 
TransformSeaTunnelTask(jobImmutableInformation.getJobId(),
-                                            new TaskLocation(taskGroupID, 
idGenerator.getNextId()),
+                                            new TaskLocation(taskGroupID,
+                                                    
convertToTaskID(taskIDPrefix, finalParallelismIndex)),
                                             finalParallelismIndex, f);
                                 }
                             }).collect(Collectors.toList());
@@ -428,12 +433,17 @@ public class PhysicalPlanGenerator {
                         .collect(Collectors.toList()).contains(true);
     }
 
+    @SuppressWarnings("checkstyle:MagicNumber")
+    private long convertToTaskID(long taskTypeID, int index) {
+        return taskTypeID * 10000 + index;
+    }
+
     private List<Flow> getNextWrapper(List<ExecutionEdge> edges, Action start) 
{
         List<Action> actions = edges.stream().filter(e -> 
e.getLeftVertex().getAction().equals(start))
-            .map(e -> 
e.getRightVertex().getAction()).collect(Collectors.toList());
+                .map(e -> 
e.getRightVertex().getAction()).collect(Collectors.toList());
         List<Flow> wrappers = actions.stream()
-            .filter(a -> a instanceof PartitionTransformAction || a instanceof 
SinkAction)
-            .map(PhysicalExecutionFlow::new).collect(Collectors.toList());
+                .filter(a -> a instanceof PartitionTransformAction || a 
instanceof SinkAction)
+                .map(PhysicalExecutionFlow::new).collect(Collectors.toList());
         wrappers.addAll(actions.stream()
                 .filter(a -> !(a instanceof PartitionTransformAction || a 
instanceof SinkAction))
                 .map(a -> new PhysicalExecutionFlow<>(a, getNextWrapper(edges, 
a))).collect(Collectors.toList()));
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/Flow.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/Flow.java
index 0aa7149eb..bf5529ec3 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/Flow.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/Flow.java
@@ -31,4 +31,6 @@ public abstract class Flow implements Serializable {
     public List<Flow> getNext() {
         return next;
     }
+
+    public abstract long getFlowID();
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/IntermediateExecutionFlow.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/IntermediateExecutionFlow.java
index 59ccccd80..b3946d898 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/IntermediateExecutionFlow.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/IntermediateExecutionFlow.java
@@ -50,4 +50,9 @@ public class IntermediateExecutionFlow<F extends FlowConfig> 
extends Flow {
     public IntermediateQueue getQueue() {
         return queue;
     }
+
+    @Override
+    public long getFlowID() {
+        return queue.getId();
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/PhysicalExecutionFlow.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/PhysicalExecutionFlow.java
index db81256df..2a7accfec 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/PhysicalExecutionFlow.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/flow/PhysicalExecutionFlow.java
@@ -49,4 +49,9 @@ public class PhysicalExecutionFlow<T extends Action, F 
extends FlowConfig> exten
     public T getAction() {
         return action;
     }
+
+    @Override
+    public long getFlowID() {
+        return action.getId();
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/statemachine/EnumeratorState.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/statemachine/EnumeratorState.java
index 483c6c62d..72a37b6a2 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/statemachine/EnumeratorState.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/statemachine/EnumeratorState.java
@@ -23,18 +23,17 @@ import java.io.Serializable;
  * The state of {@link  
org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask},
  * The task usually start in the state {@code CREATED} and switch states 
according to this diagram:
  * <p>
- * CREATED -> INIT  -> READY -> READER_REGISTER_COMPLETE  -> ASSIGN -> 
WAITING_FEEDBACK -> READER_CLOSED -> CLOSED
- * |        |          |                         |              |
- * |        |          |                         |              |
- * |        |          |                         |              |
- * |        |          |                         |              |
- * +--------+----------+-------------------------+--------------+--> 
CANCELLING ----> CANCELED
+ * CREATED -> INIT  -> READER_REGISTER_COMPLETE  -> ASSIGN -> WAITING_FEEDBACK 
-> READER_CLOSED -> CLOSED
+ * |        |          |                            |              |
+ * |        |          |                            |              |
+ * |        |          |                            |              |
+ * |        |          |                            |              |
+ * +--------+----------+----------------------------+--------------+--> 
CANCELLING ----> CANCELED
  *                                                  ... -> FAILED
  */
 public enum EnumeratorState implements Serializable {
     CREATED,
     INIT,
-    READY,
     READER_REGISTER_COMPLETE,
     ASSIGN,
     WAITING_FEEDBACK,
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index 26ab0c51c..17c004ff2 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -82,19 +82,22 @@ public class TaskTest {
 
         Action fake = new SourceAction<>(idGenerator.getNextId(), "fake", 
fakeSource,
                 Collections.singletonList(new URL("file:///fake.jar")));
-        LogicalVertex fakeVertex = new LogicalVertex(fake.getId(), fake, 1);
+        fake.setParallelism(3);
+        LogicalVertex fakeVertex = new LogicalVertex(fake.getId(), fake, 3);
 
         FakeSource fakeSource2 = new FakeSource();
         fakeSource2.setSeaTunnelContext(SeaTunnelContext.getContext());
         Action fake2 = new SourceAction<>(idGenerator.getNextId(), "fake", 
fakeSource2,
                 Collections.singletonList(new URL("file:///fake.jar")));
-        LogicalVertex fake2Vertex = new LogicalVertex(fake2.getId(), fake2, 1);
+        fake2.setParallelism(3);
+        LogicalVertex fake2Vertex = new LogicalVertex(fake2.getId(), fake2, 3);
 
         ConsoleSink consoleSink = new ConsoleSink();
         consoleSink.setSeaTunnelContext(SeaTunnelContext.getContext());
         Action console = new SinkAction<>(idGenerator.getNextId(), "console", 
consoleSink,
                 Collections.singletonList(new URL("file:///console.jar")));
-        LogicalVertex consoleVertex = new LogicalVertex(console.getId(), 
console, 1);
+        console.setParallelism(3);
+        LogicalVertex consoleVertex = new LogicalVertex(console.getId(), 
console, 3);
 
         LogicalEdge edge = new LogicalEdge(fakeVertex, consoleVertex);
 

Reply via email to