This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/cdc-multiple-table by this 
push:
     new 83b147c10 [Feature][Zeta] Fix CheckpointPlanTest (#4187)
83b147c10 is described below

commit 83b147c106d1f0809a9c8e76a6db1e7d6f3c9653
Author: hailin0 <[email protected]>
AuthorDate: Wed Feb 22 20:33:27 2023 +0800

    [Feature][Zeta] Fix CheckpointPlanTest (#4187)
---
 .../engine/server/dag/execution/ExecutionPlanGenerator.java  | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index c344ab08a..cea095d9f 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -49,6 +49,8 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -117,7 +119,7 @@ public class ExecutionPlanGenerator {
     }
 
     private Set<ExecutionEdge> generateExecutionEdges(Set<LogicalEdge> 
logicalEdges) {
-        Set<ExecutionEdge> executionEdges = new HashSet<>();
+        Set<ExecutionEdge> executionEdges = new LinkedHashSet<>();
 
         Map<Long, ExecutionVertex> logicalVertexIdToExecutionVertexMap = new 
HashMap();
 
@@ -153,7 +155,7 @@ public class ExecutionPlanGenerator {
 
     @SuppressWarnings("MagicNumber")
     private Set<ExecutionEdge> generateShuffleEdges(Set<ExecutionEdge> 
executionEdges) {
-        Map<Long, List<ExecutionVertex>> targetVerticesMap = new HashMap<>();
+        Map<Long, List<ExecutionVertex>> targetVerticesMap = new 
LinkedHashMap<>();
         Set<ExecutionVertex> sourceExecutionVertices = new HashSet<>();
         executionEdges.forEach(edge -> {
             ExecutionVertex leftVertex = edge.getLeftVertex();
@@ -180,7 +182,7 @@ public class ExecutionPlanGenerator {
             .findFirst();
         checkArgument(!hasOtherAction.isPresent());
 
-        Set<ExecutionEdge> newExecutionEdges = new HashSet<>();
+        Set<ExecutionEdge> newExecutionEdges = new LinkedHashSet<>();
         ShuffleStrategy shuffleStrategy = ShuffleMultipleRowStrategy.builder()
             .jobId(jobImmutableInformation.getJobId())
             .inputPartitions(sourceAction.getParallelism())
@@ -243,7 +245,7 @@ public class ExecutionPlanGenerator {
             }
         }
 
-        Set<ExecutionEdge> transformChainEdges = new HashSet<>();
+        Set<ExecutionEdge> transformChainEdges = new LinkedHashSet<>();
         for (ExecutionEdge executionEdge : executionEdges) {
             ExecutionVertex leftVertex = executionEdge.getLeftVertex();
             ExecutionVertex rightVertex = executionEdge.getRightVertex();
@@ -332,7 +334,7 @@ public class ExecutionPlanGenerator {
     }
 
     private List<Pipeline> generatePipelines(Set<ExecutionEdge> 
executionEdges) {
-        Set<ExecutionVertex> executionVertices = new HashSet<>();
+        Set<ExecutionVertex> executionVertices = new LinkedHashSet<>();
         for (ExecutionEdge edge : executionEdges) {
             executionVertices.add(edge.getLeftVertex());
             executionVertices.add(edge.getRightVertex());

Reply via email to