This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/cdc-multiple-table by this
push:
new 83b147c10 [Feature][Zeta] Fix CheckpointPlanTest (#4187)
83b147c10 is described below
commit 83b147c106d1f0809a9c8e76a6db1e7d6f3c9653
Author: hailin0 <[email protected]>
AuthorDate: Wed Feb 22 20:33:27 2023 +0800
[Feature][Zeta] Fix CheckpointPlanTest (#4187)
---
.../engine/server/dag/execution/ExecutionPlanGenerator.java | 12 +++++++-----
1 file changed, 7 insertions(+), 5 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index c344ab08a..cea095d9f 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -49,6 +49,8 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -117,7 +119,7 @@ public class ExecutionPlanGenerator {
}
private Set<ExecutionEdge> generateExecutionEdges(Set<LogicalEdge>
logicalEdges) {
- Set<ExecutionEdge> executionEdges = new HashSet<>();
+ Set<ExecutionEdge> executionEdges = new LinkedHashSet<>();
Map<Long, ExecutionVertex> logicalVertexIdToExecutionVertexMap = new
HashMap();
@@ -153,7 +155,7 @@ public class ExecutionPlanGenerator {
@SuppressWarnings("MagicNumber")
private Set<ExecutionEdge> generateShuffleEdges(Set<ExecutionEdge>
executionEdges) {
- Map<Long, List<ExecutionVertex>> targetVerticesMap = new HashMap<>();
+ Map<Long, List<ExecutionVertex>> targetVerticesMap = new
LinkedHashMap<>();
Set<ExecutionVertex> sourceExecutionVertices = new HashSet<>();
executionEdges.forEach(edge -> {
ExecutionVertex leftVertex = edge.getLeftVertex();
@@ -180,7 +182,7 @@ public class ExecutionPlanGenerator {
.findFirst();
checkArgument(!hasOtherAction.isPresent());
- Set<ExecutionEdge> newExecutionEdges = new HashSet<>();
+ Set<ExecutionEdge> newExecutionEdges = new LinkedHashSet<>();
ShuffleStrategy shuffleStrategy = ShuffleMultipleRowStrategy.builder()
.jobId(jobImmutableInformation.getJobId())
.inputPartitions(sourceAction.getParallelism())
@@ -243,7 +245,7 @@ public class ExecutionPlanGenerator {
}
}
- Set<ExecutionEdge> transformChainEdges = new HashSet<>();
+ Set<ExecutionEdge> transformChainEdges = new LinkedHashSet<>();
for (ExecutionEdge executionEdge : executionEdges) {
ExecutionVertex leftVertex = executionEdge.getLeftVertex();
ExecutionVertex rightVertex = executionEdge.getRightVertex();
@@ -332,7 +334,7 @@ public class ExecutionPlanGenerator {
}
private List<Pipeline> generatePipelines(Set<ExecutionEdge>
executionEdges) {
- Set<ExecutionVertex> executionVertices = new HashSet<>();
+ Set<ExecutionVertex> executionVertices = new LinkedHashSet<>();
for (ExecutionEdge edge : executionEdges) {
executionVertices.add(edge.getLeftVertex());
executionVertices.add(edge.getRightVertex());