This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new 7bc2ec540 [Engine][PhysicalPlan] Recreate action with parallelism (#2638)
7bc2ec540 is described below
commit 7bc2ec5405c6b534c4d31e7febffb19a2bf743e3
Author: Hisoka <[email protected]>
AuthorDate: Mon Sep 5 00:45:00 2022 +0800
[Engine][PhysicalPlan] Recreate action with parallelism (#2638)
* [Engine][PhysicalPlan] Recreate action with parallelism
---
.../engine/server/dag/execution/ExecutionPlanGenerator.java | 6 +++---
.../seatunnel/engine/server/dag/execution/PipelineGenerator.java | 2 +-
.../test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java | 2 +-
3 files changed, 5 insertions(+), 5 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index b95b3f631..b32f53b08 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -157,7 +157,7 @@ public class ExecutionPlanGenerator {
final long newId = idGenerator.getNextId();
Action newAction;
if (chainedVertices.size() < 1) {
- newAction = recreateAction(logicalVertex.getAction(), newId);
+ newAction = recreateAction(logicalVertex.getAction(), newId,
logicalVertex.getParallelism());
} else {
List<SeaTunnelTransform> transforms = new
ArrayList<>(chainedVertices.size());
List<String> names = new ArrayList<>(chainedVertices.size());
@@ -182,7 +182,7 @@ public class ExecutionPlanGenerator {
logicalToExecutionMap.put(logicalVertex.getVertexId(),
executionVertex.getVertexId());
}
- public static Action recreateAction(Action action, Long id) {
+ public static Action recreateAction(Action action, Long id, int
parallelism) {
Action newAction;
if (action instanceof PartitionTransformAction) {
newAction = new PartitionTransformAction(id,
@@ -209,7 +209,7 @@ public class ExecutionPlanGenerator {
} else {
throw new UnknownActionException(action);
}
- newAction.setParallelism(action.getParallelism());
+ newAction.setParallelism(parallelism);
return newAction;
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
index 8e1f426cd..bc5e271f2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/PipelineGenerator.java
@@ -158,7 +158,7 @@ public class PipelineGenerator {
private ExecutionVertex recreateVertex(ExecutionVertex vertex, int
parallelism) {
long id = idGenerator.getNextId();
Action action = vertex.getAction();
- return new ExecutionVertex(id, ExecutionPlanGenerator.recreateAction(action, id), action instanceof PartitionTransformAction ? vertex.getParallelism() : parallelism);
+ return new ExecutionVertex(id, ExecutionPlanGenerator.recreateAction(action, id, parallelism), action instanceof PartitionTransformAction ? vertex.getParallelism() : parallelism);
}
private void fillVerticesMap(List<ExecutionEdge> edges) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index fbced23a2..6baf2d98b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -126,6 +126,6 @@ public class TaskTest extends AbstractSeaTunnelServerTest {
Assert.assertEquals(physicalPlan.getPipelineList().size(), 1);
Assert.assertEquals(physicalPlan.getPipelineList().get(0).getCoordinatorVertexList().size(),
1);
- Assert.assertEquals(physicalPlan.getPipelineList().get(0).getPhysicalVertexList().size(), 1);
+ Assert.assertEquals(physicalPlan.getPipelineList().get(0).getPhysicalVertexList().size(), 2);
}
}