This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/cdc-multiple-table by this push:
     new 8348f1a10 [Feature][Zeta] Support shuffle multiple rows by tableId (#4147)
8348f1a10 is described below

commit 8348f1a10895460cba2d65489be4905c5099289b
Author: hailin0 <[email protected]>
AuthorDate: Wed Feb 22 16:59:03 2023 +0800

    [Feature][Zeta] Support shuffle multiple rows by tableId (#4147)
---
 pom.xml                                            |   2 +-
 .../seatunnel/api/table/type/MultipleRowType.java  |  15 +-
 .../row/SeaTunnelRowDebeziumDeserializeSchema.java |  13 +-
 .../source/MySqlIncrementalSourceFactory.java      |   2 +-
 .../seatunnel/console/sink/ConsoleSinkWriter.java  |   4 +-
 .../engine/common/config/EngineConfig.java         |   3 +-
 .../engine/core/dag/actions/AbstractAction.java    |  31 +-
 .../seatunnel/engine/core/dag/actions/Action.java  |   2 +
 .../seatunnel/engine/core/dag/actions/Config.java} |  28 +-
 .../engine/core/dag/actions/ShuffleAction.java     |  29 +-
 .../actions/{Action.java => ShuffleConfig.java}    |  50 +--
 .../dag/actions/ShuffleMultipleRowStrategy.java    |  83 +++++
 .../core/dag/actions/ShufflePartitionStrategy.java |  88 +++++
 .../engine/core/dag/actions/ShuffleStrategy.java   |  71 ++++
 .../engine/core/dag/actions/SinkAction.java        |  22 +-
 .../dag/actions/{Action.java => SinkConfig.java}   |  34 +-
 .../seatunnel/engine/server/dag/DAGUtils.java      |   9 +-
 .../dag/execution/ExecutionPlanGenerator.java      | 383 +++++++++++++--------
 .../server/dag/physical/PhysicalPlanGenerator.java | 136 +++++---
 .../engine/server/dag/physical/PlanUtils.java      |   9 +-
 .../server/execution/TaskExecutionContext.java     |   4 +
 .../seatunnel/engine/server/master/JobMaster.java  |  44 ++-
 .../engine/server/task/SeaTunnelTask.java          |  13 +-
 .../server/task/flow/ShuffleSinkFlowLifeCycle.java | 124 ++-----
 .../task/flow/ShuffleSourceFlowLifeCycle.java      |  13 +-
 .../apache/seatunnel/engine/server/TestUtils.java  |  10 +
 .../server/checkpoint/CheckpointPlanTest.java      |  13 +-
 .../seatunnel/engine/server/dag/TaskTest.java      |  22 +-
 28 files changed, 830 insertions(+), 427 deletions(-)

diff --git a/pom.xml b/pom.xml
index 28a61e1e1..e2b1b5b39 100644
--- a/pom.xml
+++ b/pom.xml
@@ -142,7 +142,7 @@
         <javax.servlet.jap.version>2.1</javax.servlet.jap.version>
         <hadoop.binary.version>2.7</hadoop.binary.version>
         <jackson.version>2.12.6</jackson.version>
-        <lombok.version>1.18.0</lombok.version>
+        <lombok.version>1.18.24</lombok.version>
         <commons-compress.version>1.20</commons-compress.version>
         <skip.pmd.check>false</skip.pmd.check>
         <maven.deploy.skip>false</maven.deploy.skip>
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MultipleRowType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MultipleRowType.java
index 5d7fe330f..8602e4205 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MultipleRowType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MultipleRowType.java
@@ -17,21 +17,28 @@
 
 package org.apache.seatunnel.api.table.type;
 
-import lombok.RequiredArgsConstructor;
+import lombok.Getter;
 
-import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
-@RequiredArgsConstructor
 public class MultipleRowType implements SeaTunnelDataType<SeaTunnelRow>, Iterable<Map.Entry<String, SeaTunnelRowType>> {
     private final Map<String, SeaTunnelRowType> rowTypeMap;
+    @Getter
+    private String[] tableIds;
 
     public MultipleRowType(String[] tableIds, SeaTunnelRowType[] rowTypes) {
-        Map<String, SeaTunnelRowType> rowTypeMap = new HashMap<>();
+        Map<String, SeaTunnelRowType> rowTypeMap = new LinkedHashMap<>();
         for (int i = 0; i < tableIds.length; i++) {
             rowTypeMap.put(tableIds[i], rowTypes[i]);
         }
+        this.tableIds = tableIds;
+        this.rowTypeMap = rowTypeMap;
+    }
+
+    public MultipleRowType(Map<String, SeaTunnelRowType> rowTypeMap) {
+        this.tableIds = rowTypeMap.keySet().toArray(new String[0]);
         this.rowTypeMap = rowTypeMap;
     }
 
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
index c94d59064..2c0087213 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
@@ -18,7 +18,6 @@
 package org.apache.seatunnel.connectors.cdc.debezium.row;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static io.debezium.connector.AbstractSourceInfo.DATABASE_NAME_KEY;
 
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.type.MultipleRowType;
@@ -114,34 +113,32 @@ public final class SeaTunnelRowDebeziumDeserializeSchema
         Schema valueSchema = record.valueSchema();
 
         Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
-        String database = sourceStruct.getString(DATABASE_NAME_KEY);
         String tableName = sourceStruct.getString(AbstractSourceInfo.TABLE_NAME_KEY);
-        String tableId = database + ":" + tableName;
-        SeaTunnelRowDebeziumDeserializationConverters converters = multipleRowConverters.getOrDefault(tableId, singleRowConverter);
+        SeaTunnelRowDebeziumDeserializationConverters converters = multipleRowConverters.getOrDefault(tableName, singleRowConverter);
 
         if (operation == Envelope.Operation.CREATE || operation == Envelope.Operation.READ) {
             SeaTunnelRow insert = extractAfterRow(converters, record, messageStruct, valueSchema);
             insert.setRowKind(RowKind.INSERT);
-            insert.setTableId(tableId);
+            insert.setTableId(tableName);
             validator.validate(insert, RowKind.INSERT);
             collector.collect(insert);
         } else if (operation == Envelope.Operation.DELETE) {
             SeaTunnelRow delete = extractBeforeRow(converters, record, messageStruct, valueSchema);
             validator.validate(delete, RowKind.DELETE);
             delete.setRowKind(RowKind.DELETE);
-            delete.setTableId(tableId);
+            delete.setTableId(tableName);
             collector.collect(delete);
         } else {
             SeaTunnelRow before = extractBeforeRow(converters, record, messageStruct, valueSchema);
             validator.validate(before, RowKind.UPDATE_BEFORE);
             before.setRowKind(RowKind.UPDATE_BEFORE);
-            before.setTableId(tableId);
+            before.setTableId(tableName);
             collector.collect(before);
 
             SeaTunnelRow after = extractAfterRow(converters, record, messageStruct, valueSchema);
             validator.validate(after, RowKind.UPDATE_AFTER);
             after.setRowKind(RowKind.UPDATE_AFTER);
-            after.setTableId(tableId);
+            after.setTableId(tableName);
             collector.collect(after);
         }
     }
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
index f97894522..b6ad57e3d 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
@@ -84,7 +84,7 @@ public class MySqlIncrementalSourceFactory implements TableSourceFactory, Suppor
             } else {
                 Map<String, SeaTunnelRowType> rowTypeMap = new HashMap<>();
                 for (CatalogTable catalogTable : context.getCatalogTables()) {
-                    String tableId = catalogTable.getTableId().getDatabaseName() + ":" + catalogTable.getTableId().getTableName();
+                    String tableId = catalogTable.getTableId().getTableName();
                     rowTypeMap.put(tableId, catalogTable.getTableSchema().toPhysicalRowDataType());
                 }
                 dataType = new MultipleRowType(rowTypeMap);
diff --git a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
index 84bad94b9..d3ade7480 100644
--- a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
@@ -36,7 +36,7 @@ import java.util.concurrent.atomic.AtomicLong;
 public class ConsoleSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
 
     private final SeaTunnelRowType seaTunnelRowType;
-    public static final AtomicLong CNT = new AtomicLong(0);
+    public final AtomicLong rowCounter = new AtomicLong(0);
     public SinkWriter.Context context;
 
     public ConsoleSinkWriter(SeaTunnelRowType seaTunnelRowType, SinkWriter.Context context) {
@@ -54,7 +54,7 @@ public class ConsoleSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
         for (int i = 0; i < fieldTypes.length; i++) {
             arr[i] = fieldToString(fieldTypes[i], fields[i]);
         }
-        log.info("subtaskIndex={}  rowIndex={}:  SeaTunnelRow#tableId={} SeaTunnelRow#kind={} : {}", context.getIndexOfSubtask(), CNT.incrementAndGet(), element.getTableId(), element.getRowKind(), StringUtils.join(arr, ", "));
+        log.info("subtaskIndex={}  rowIndex={}:  SeaTunnelRow#tableId={} SeaTunnelRow#kind={} : {}", context.getIndexOfSubtask(), rowCounter.incrementAndGet(), element.getTableId(), element.getRowKind(), StringUtils.join(arr, ", "));
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
index a51b8a128..e0898aafd 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
@@ -64,8 +64,9 @@ public class EngineConfig {
         this.jobMetricsBackupInterval = jobMetricsBackupInterval;
     }
 
-    public void setQueueType(QueueType queueType) {
+    public EngineConfig setQueueType(QueueType queueType) {
         checkNotNull(queueType);
         this.queueType = queueType;
+        return this;
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java
index 33c94cea5..8b2cf3670 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java
@@ -34,17 +34,31 @@ public abstract class AbstractAction implements Action {
 
     private final Set<URL> jarUrls;
 
-    protected AbstractAction(long id, @NonNull String name, @NonNull List<Action> upstreams, @NonNull Set<URL> jarUrls) {
-        this.id = id;
-        this.name = name;
-        this.upstreams = upstreams;
-        this.jarUrls = jarUrls;
+    private final Config config;
+
+    protected AbstractAction(long id,
+                             @NonNull String name,
+                             @NonNull Set<URL> jarUrls) {
+        this(id, name, new ArrayList<>(), jarUrls);
     }
 
-    protected AbstractAction(long id, @NonNull String name, @NonNull Set<URL> jarUrls) {
+    protected AbstractAction(long id,
+                             @NonNull String name,
+                             @NonNull List<Action> upstreams,
+                             @NonNull Set<URL> jarUrls) {
+        this(id, name, upstreams, jarUrls, null);
+    }
+
+    protected AbstractAction(long id,
+                             @NonNull String name,
+                             @NonNull List<Action> upstreams,
+                             @NonNull Set<URL> jarUrls,
+                             Config config) {
         this.id = id;
         this.name = name;
+        this.upstreams = upstreams;
         this.jarUrls = jarUrls;
+        this.config = config;
     }
 
     @NonNull
@@ -88,4 +102,9 @@ public abstract class AbstractAction implements Action {
     public Set<URL> getJarUrls() {
         return jarUrls;
     }
+
+    @Override
+    public Config getConfig() {
+        return config;
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
index b06fcb7a7..bf8ebd1df 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
@@ -42,4 +42,6 @@ public interface Action extends Serializable {
     long getId();
 
     Set<URL> getJarUrls();
+
+    Config getConfig();
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/config/PartitionConfig.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Config.java
similarity index 54%
rename from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/config/PartitionConfig.java
rename to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Config.java
index 74f81e7f5..d053a6e7f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/config/PartitionConfig.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Config.java
@@ -15,31 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.dag.physical.config;
+package org.apache.seatunnel.engine.core.dag.actions;
 
-public class PartitionConfig implements FlowConfig {
+import java.io.Serializable;
 
-    private final int partitionCount;
-
-    private final int targetCount;
-
-    private final int parallelismIndex;
-
-    public PartitionConfig(int partitionCount, int targetCount, int parallelismIndex) {
-        this.partitionCount = partitionCount;
-        this.targetCount = targetCount;
-        this.parallelismIndex = parallelismIndex;
-    }
-
-    public int getPartitionCount() {
-        return partitionCount;
-    }
-
-    public int getTargetCount() {
-        return targetCount;
-    }
-
-    public int getParallelismIndex() {
-        return parallelismIndex;
-    }
+public interface Config extends Serializable {
 }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleAction.java
index 8c6654659..18858b42e 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleAction.java
@@ -17,36 +17,21 @@
 
 package org.apache.seatunnel.engine.core.dag.actions;
 
-import org.apache.seatunnel.api.transform.PartitionSeaTunnelTransform;
-
 import lombok.NonNull;
 
-import java.net.URL;
-import java.util.List;
-import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
 
 public class ShuffleAction extends AbstractAction {
 
-    private final PartitionSeaTunnelTransform partitionTransformation;
-
-    public ShuffleAction(long id,
-                         @NonNull String name,
-                         @NonNull List<Action> upstreams,
-                         @NonNull PartitionSeaTunnelTransform partitionTransformation,
-                         @NonNull Set<URL> jarUrls) {
-        super(id, name, upstreams, jarUrls);
-        this.partitionTransformation = partitionTransformation;
-    }
-
     public ShuffleAction(long id,
                          @NonNull String name,
-                         @NonNull PartitionSeaTunnelTransform partitionTransformation,
-                         @NonNull Set<URL> jarUrls) {
-        super(id, name, jarUrls);
-        this.partitionTransformation = partitionTransformation;
+                         @NonNull ShuffleConfig shuffleConfig) {
+        super(id, name, new ArrayList<>(), new HashSet<>(), shuffleConfig);
     }
 
-    public PartitionSeaTunnelTransform getPartitionTransformation() {
-        return partitionTransformation;
+    @Override
+    public ShuffleConfig getConfig() {
+        return (ShuffleConfig) super.getConfig();
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleConfig.java
similarity index 56%
copy from seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
copy to seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleConfig.java
index b06fcb7a7..3b145e611 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleConfig.java
@@ -17,29 +17,29 @@
 
 package org.apache.seatunnel.engine.core.dag.actions;
 
-import lombok.NonNull;
-
-import java.io.Serializable;
-import java.net.URL;
-import java.util.List;
-import java.util.Set;
-
-public interface Action extends Serializable {
-    @NonNull
-    String getName();
-
-    void setName(@NonNull String name);
-
-    @NonNull
-    List<Action> getUpstream();
-
-    void addUpstream(@NonNull Action action);
-
-    int getParallelism();
-
-    void setParallelism(int parallelism);
-
-    long getId();
-
-    Set<URL> getJarUrls();
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Tolerate;
+
+import java.util.concurrent.TimeUnit;
+
+@Getter
+@Setter
+@ToString
+@Builder(toBuilder = true)
+public class ShuffleConfig implements Config {
+    public static final int DEFAULT_BATCH_SIZE = 1024;
+    public static final long DEFAULT_BATCH_FLUSH_INTERVAL = TimeUnit.SECONDS.toMillis(3);
+
+    @Builder.Default
+    private int batchSize = DEFAULT_BATCH_SIZE;
+    @Builder.Default
+    private long batchFlushInterval = DEFAULT_BATCH_FLUSH_INTERVAL;
+    private ShuffleStrategy shuffleStrategy;
+
+    @Tolerate
+    public ShuffleConfig() {
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java
new file mode 100644
index 000000000..c84ea04ec
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.dag.actions;
+
+import org.apache.seatunnel.api.table.type.MultipleRowType;
+import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import com.hazelcast.collection.IQueue;
+import com.hazelcast.core.HazelcastInstance;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+import lombok.experimental.Tolerate;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+@SuperBuilder(toBuilder = true)
+@Getter
+@Setter
+@ToString
+public class ShuffleMultipleRowStrategy extends ShuffleStrategy {
+    private MultipleRowType inputRowType;
+    private String targetTableId;
+
+    @Tolerate
+    public ShuffleMultipleRowStrategy() {
+    }
+
+    @Override
+    public Map<String, IQueue<Record<?>>> createShuffles(HazelcastInstance hazelcast, int pipelineId, int inputIndex) {
+        Map<String, IQueue<Record<?>>> shuffleMap = new HashMap<>();
+        for (Map.Entry<String, SeaTunnelRowType> entry : inputRowType) {
+            String tableId = entry.getKey();
+            String queueName = generateQueueName(pipelineId, inputIndex, tableId);
+            IQueue<Record<?>> queue = getIQueue(hazelcast, queueName);
+            // clear old data when job restore
+            queue.clear();
+            shuffleMap.put(queueName, queue);
+        }
+        return shuffleMap;
+    }
+
+    @Override
+    public String createShuffleKey(Record<?> record, int pipelineId, int inputIndex) {
+        String tableId = ((SeaTunnelRow) record.getData()).getTableId();
+        return generateQueueName(pipelineId, inputIndex, tableId);
+    }
+
+    @Override
+    public IQueue<Record<?>>[] getShuffles(HazelcastInstance hazelcast, int pipelineId, int targetIndex) {
+        IQueue<Record<?>>[] queues = new IQueue[getInputPartitions()];
+        for (int inputIndex = 0; inputIndex < getInputPartitions(); inputIndex++) {
+            Objects.requireNonNull(targetTableId);
+            String queueName = generateQueueName(pipelineId, inputIndex, targetTableId);
+            queues[inputIndex] = getIQueue(hazelcast, queueName);
+        }
+        return queues;
+    }
+
+    private String generateQueueName(int pipelineId, int inputIndex, String tableId) {
+        return "ShuffleMultipleRow-Queue[" + getJobId() + "-" + pipelineId + "-" + inputIndex + "-" + tableId + "]";
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShufflePartitionStrategy.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShufflePartitionStrategy.java
new file mode 100644
index 000000000..e77f59709
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShufflePartitionStrategy.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.dag.actions;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.seatunnel.api.table.type.Record;
+
+import com.hazelcast.collection.IQueue;
+import com.hazelcast.core.HazelcastInstance;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+import lombok.experimental.Tolerate;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+@SuperBuilder
+@Getter
+@Setter
+@ToString
+public class ShufflePartitionStrategy extends ShuffleStrategy {
+    private final Map<Integer, String[]> inputQueueMapping = new HashMap<>();
+    private int targetPartitions;
+
+    @Tolerate
+    public ShufflePartitionStrategy() {
+    }
+
+    @Override
+    public Map<String, IQueue<Record<?>>> createShuffles(HazelcastInstance hazelcast, int pipelineId, int inputIndex) {
+        checkArgument(inputIndex >= 0 && inputIndex < getInputPartitions());
+        Map<String, IQueue<Record<?>>> shuffleMap = new HashMap<>();
+        for (int targetIndex = 0; targetIndex < targetPartitions; targetIndex++) {
+            String queueName = generateQueueName(pipelineId, inputIndex, targetIndex);
+            IQueue<Record<?>> queue = getIQueue(hazelcast, queueName);
+            // clear old data when job restore
+            queue.clear();
+            shuffleMap.put(queueName, queue);
+        }
+        return shuffleMap;
+    }
+
+    @Override
+    public String createShuffleKey(Record<?> record, int pipelineId, int inputIndex) {
+        String[] inputQueueNames = inputQueueMapping.computeIfAbsent(inputIndex, key -> {
+            String[] queueNames = new String[targetPartitions];
+            for (int targetIndex = 0; targetIndex < targetPartitions; targetIndex++) {
+                queueNames[targetIndex] = generateQueueName(pipelineId, key, targetIndex);
+            }
+            return queueNames;
+        });
+        return inputQueueNames[ThreadLocalRandom.current().nextInt(targetPartitions)];
+    }
+
+    @Override
+    public IQueue<Record<?>>[] getShuffles(HazelcastInstance hazelcast, int pipelineId, int targetIndex) {
+        checkArgument(targetIndex >= 0 && targetIndex < targetPartitions);
+        IQueue<Record<?>>[] shuffles = new IQueue[getInputPartitions()];
+        for (int inputIndex = 0; inputIndex < getInputPartitions(); inputIndex++) {
+            String queueName = generateQueueName(pipelineId, inputIndex, targetIndex);
+            shuffles[inputIndex] = getIQueue(hazelcast, queueName);
+        }
+        return shuffles;
+    }
+
+    private String generateQueueName(int pipelineId, int inputIndex, int targetIndex) {
+        return String.format("ShufflePartition-Queue[%s-%s-%s-%s]", getJobId(), pipelineId, inputIndex, targetIndex);
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleStrategy.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleStrategy.java
new file mode 100644
index 000000000..36cbe117c
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleStrategy.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.dag.actions;
+
+import org.apache.seatunnel.api.table.type.Record;
+
+import com.hazelcast.collection.IQueue;
+import com.hazelcast.config.QueueConfig;
+import com.hazelcast.core.HazelcastInstance;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+import lombok.experimental.Tolerate;
+
+import java.io.Serializable;
+import java.util.Map;
+
+@SuperBuilder(toBuilder = true)
+@Getter
+@Setter
+@ToString
+public abstract class ShuffleStrategy implements Serializable {
+    private static final int DEFAULT_QUEUE_SIZE = 2048;
+    private static final int DEFAULT_QUEUE_BACKUP_COUNT = 0;
+    private static final int DEFAULT_QUEUE_ASYNC_BACKUP_COUNT = 0;
+
+    protected long jobId;
+    protected int inputPartitions;
+    @Builder.Default
+    protected int queueMaxSize = DEFAULT_QUEUE_SIZE;
+    @Builder.Default
+    protected int queueBackupCount = DEFAULT_QUEUE_BACKUP_COUNT;
+    @Builder.Default
+    protected int queueAsyncBackupCount = DEFAULT_QUEUE_ASYNC_BACKUP_COUNT;
+    protected int queueEmptyQueueTtl;
+
+    @Tolerate
+    public ShuffleStrategy(){}
+
+    public abstract Map<String, IQueue<Record<?>>> createShuffles(HazelcastInstance hazelcast, int pipelineId, int inputIndex);
+
+    public abstract String createShuffleKey(Record<?> record, int pipelineId, int inputIndex);
+
+    public abstract IQueue<Record<?>>[] getShuffles(HazelcastInstance hazelcast, int pipelineId, int targetIndex);
+
+    protected IQueue<Record<?>> getIQueue(HazelcastInstance hazelcast, String queueName) {
+        QueueConfig targetQueueConfig = hazelcast.getConfig().getQueueConfig(queueName);
+        targetQueueConfig.setMaxSize(queueMaxSize);
+        targetQueueConfig.setBackupCount(queueBackupCount);
+        targetQueueConfig.setAsyncBackupCount(queueAsyncBackupCount);
+        targetQueueConfig.setEmptyQueueTtl(queueEmptyQueueTtl);
+        return hazelcast.getQueue(queueName);
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java
index e9b83ead3..411bfcd73 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import lombok.NonNull;
 
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
@@ -31,22 +32,35 @@ public class SinkAction<IN, StateT, CommitInfoT, 
AggregatedCommitInfoT> extends
 
     public SinkAction(long id,
                       @NonNull String name,
-                      @NonNull List<Action> upstreams,
                       @NonNull SeaTunnelSink<IN, StateT, CommitInfoT, 
AggregatedCommitInfoT> sink,
                       @NonNull Set<URL> jarUrls) {
-        super(id, name, upstreams, jarUrls);
-        this.sink = sink;
+        this(id, name, new ArrayList<>(), sink, jarUrls); // no upstreams given: delegate with a fresh, empty list
     }
 
     public SinkAction(long id,
                       @NonNull String name,
+                      @NonNull List<Action> upstreams,
                       @NonNull SeaTunnelSink<IN, StateT, CommitInfoT, 
AggregatedCommitInfoT> sink,
                       @NonNull Set<URL> jarUrls) {
-        super(id, name, jarUrls);
+        this(id, name, upstreams, sink, jarUrls, null); // no SinkConfig given: the config is stored as null
+    }
+
+    public SinkAction(long id,
+                      @NonNull String name,
+                      @NonNull List<Action> upstreams,
+                      @NonNull SeaTunnelSink<IN, StateT, CommitInfoT, 
AggregatedCommitInfoT> sink,
+                      @NonNull Set<URL> jarUrls,
+                      SinkConfig config) { // config is the only parameter without @NonNull; the other constructors pass null here
+        super(id, name, upstreams, jarUrls, config); // the config is handed to the superclass; this class keeps only the sink
         this.sink = sink;
     }
 
     public SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> 
getSink() {
         return sink;
     }
+
+    @Override
+    public SinkConfig getConfig() { // narrows the return type of the inherited getter to SinkConfig
+        return (SinkConfig) super.getConfig(); // may be null; the constructors of this class only ever pass a SinkConfig or null
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkConfig.java
similarity index 65%
copy from 
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
copy to 
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkConfig.java
index b06fcb7a7..17d925452 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkConfig.java
@@ -17,29 +17,13 @@
 
 package org.apache.seatunnel.engine.core.dag.actions;
 
-import lombok.NonNull;
-
-import java.io.Serializable;
-import java.net.URL;
-import java.util.List;
-import java.util.Set;
-
-public interface Action extends Serializable {
-    @NonNull
-    String getName();
-
-    void setName(@NonNull String name);
-
-    @NonNull
-    List<Action> getUpstream();
-
-    void addUpstream(@NonNull Action action);
-
-    int getParallelism();
-
-    void setParallelism(int parallelism);
-
-    long getId();
-
-    Set<URL> getJarUrls();
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
+@Getter
+@NoArgsConstructor
+@AllArgsConstructor
+public class SinkConfig implements Config { // Config object accepted by the six-argument SinkAction constructor.
+    private String multipleRowTableId; // NOTE(review): presumably the id of the table (of a multiple-row source) this sink handles - confirm against the code that reads it
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java
index e0552ae20..462ebe2a4 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.server.dag;
 
+import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
 import org.apache.seatunnel.engine.core.dag.actions.ActionUtils;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
@@ -35,8 +36,12 @@ import java.util.stream.Collectors;
 
 public class DAGUtils {
 
-    public static JobDAGInfo getJobDAGInfo(LogicalDag logicalDag, 
JobImmutableInformation jobImmutableInformation, boolean isPhysicalDAGIInfo) {
-        List<Pipeline> pipelines = new ExecutionPlanGenerator(logicalDag, 
jobImmutableInformation).generate().getPipelines();
+    public static JobDAGInfo getJobDAGInfo(LogicalDag logicalDag,
+                                           JobImmutableInformation 
jobImmutableInformation,
+                                           CheckpointConfig checkpointConfig,
+                                           boolean isPhysicalDAGIInfo) {
+        List<Pipeline> pipelines = new ExecutionPlanGenerator(
+            logicalDag, jobImmutableInformation, 
checkpointConfig).generate().getPipelines();
         if (isPhysicalDAGIInfo) {
             // Generate ExecutePlan DAG
             Map<Integer, List<Edge>> pipelineWithEdges = new HashMap<>();
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index 1260b91fc..c344ab08a 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -19,10 +19,17 @@ package org.apache.seatunnel.engine.server.dag.execution;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import org.apache.seatunnel.api.table.type.MultipleRowType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SqlType;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
 import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
+import org.apache.seatunnel.engine.core.dag.actions.ShuffleConfig;
+import org.apache.seatunnel.engine.core.dag.actions.ShuffleMultipleRowStrategy;
+import org.apache.seatunnel.engine.core.dag.actions.ShuffleStrategy;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
 import org.apache.seatunnel.engine.core.dag.actions.TransformAction;
@@ -34,189 +41,282 @@ import 
org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 
 import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
 
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
-import java.util.stream.Collectors;
 
+@Slf4j
 public class ExecutionPlanGenerator {
-    /**
-     * The action ID needs to be regenerated because of the source, sink, and 
chain.
-     */
-    private final IdGenerator idGenerator = new IdGenerator();
-
-    /**
-     * key: the id of the execution vertex.
-     * <br> value: the execution vertex.
-     */
-    private final Map<Long, ExecutionVertex> executionVertexMap = new 
HashMap<>();
-
-    /**
-     * key: the vertex id.
-     * <br> value: The input vertices of this vertex.
-     *
-     * <p>When chaining vertices, it need to query whether the vertex has 
multiple input vertices. </p>
-     */
-    private final Map<Long, List<LogicalVertex>> inputVerticesMap = new 
HashMap<>();
-
-    /**
-     * key: the vertex id.
-     * <br> value: The target vertices of this vertex.
-     *
-     * <p>When chaining vertices, it need to query whether the vertex has 
multiple target vertices. </p>
-     */
-    private final Map<Long, List<LogicalVertex>> targetVerticesMap = new 
HashMap<>();
-
-    /**
-     * key: logical vertex id.
-     * <br> value: execution vertex id.
-     *
-     * <p>The chaining will cause multiple {@link LogicalVertex} mapping to 
the same {@link ExecutionVertex}. </p>
-     */
-    private final Map<Long, Long> logicalToExecutionMap = new HashMap<>();
-
-    /**
-     * When chaining, the edge will be removed {@link #findChainedVertices} if 
it can be chained.
-     */
-    private final List<LogicalEdge> logicalEdges;
-
-    private final List<LogicalVertex> logicalVertices;
-
+    private final LogicalDag logicalPlan;
     private final JobImmutableInformation jobImmutableInformation;
+    private final CheckpointConfig checkpointConfig;
+    private final IdGenerator idGenerator = new IdGenerator();
 
     public ExecutionPlanGenerator(@NonNull LogicalDag logicalPlan,
-                                  @NonNull JobImmutableInformation 
jobImmutableInformation) {
-        checkArgument(logicalPlan.getEdges().size() > 0, "ExecutionPlan 
Builder must have LogicalPlan.");
-
-        this.logicalEdges = new ArrayList<>(logicalPlan.getEdges());
-        this.logicalVertices = new 
ArrayList<>(logicalPlan.getLogicalVertexMap().values());
+                                  @NonNull JobImmutableInformation 
jobImmutableInformation,
+                                  @NonNull CheckpointConfig checkpointConfig) {
+        checkArgument(logicalPlan.getEdges().size() > 0,
+            "ExecutionPlan Builder must have LogicalPlan."); // a logical plan without any edge is rejected with IllegalArgumentException
+        this.logicalPlan = logicalPlan; // the plan is kept as-is; its edges are read later by generate()
         this.jobImmutableInformation = jobImmutableInformation;
+        this.checkpointConfig = checkpointConfig; // read in generateShuffleEdges to derive the shuffle queues' empty-queue TTL
     }
 
     public ExecutionPlan generate() {
-        fillVerticesMap();
-        getSourceVertices().forEach(sourceVertex -> {
-            List<LogicalVertex> vertices = new ArrayList<>();
-            vertices.add(sourceVertex);
-            for (int index = 0; index < vertices.size(); index++) {
-                LogicalVertex vertex = vertices.get(index);
-                createExecutionVertex(vertex);
-                if (targetVerticesMap.containsKey(vertex.getVertexId())) {
-                    
vertices.addAll(targetVerticesMap.get(vertex.getVertexId()));
+        log.debug("Generate execution plan using logical plan:"); // NOTE(review): the message ends with ':' but no plan is passed as an argument
+
+        Set<ExecutionEdge> executionEdges = 
generateExecutionEdges(logicalPlan.getEdges()); // phase 1: logical edges -> execution edges over re-created vertices with new ids
+        log.debug("Phase 1: generate execution edge list {}", executionEdges);
+
+        executionEdges = generateShuffleEdges(executionEdges); // phase 2: may route a single multiple-row source through a shuffle vertex; otherwise returns the edges unchanged
+        log.debug("Phase 2: generate shuffle edge list {}", executionEdges);
+
+        executionEdges = generateTransformChainEdges(executionEdges); // phase 3: replaces chainable TransformActions by TransformChainAction vertices
+        log.debug("Phase 3: generate transform chain edge list {}", 
executionEdges);
+
+        List<Pipeline> pipelines = generatePipelines(executionEdges); // phase 4: hands vertices and edges to PipelineGenerator
+        log.debug("Phase 4: generate pipeline list {}", pipelines);
+
+        ExecutionPlan executionPlan = new ExecutionPlan(pipelines, 
jobImmutableInformation); // phase 5: wrap the pipelines together with the job information
+        log.debug("Phase 5: generate execution plan: {}", executionPlan);
+
+        return executionPlan;
+    }
+
+    public static Action recreateAction(Action action, Long id, int 
parallelism) { // Builds a new action of the same concrete kind as `action`, with the given id and parallelism.
+        Action newAction;
+        if (action instanceof ShuffleAction) {
+            newAction = new ShuffleAction(id, action.getName(),
+                ((ShuffleAction) action).getConfig()); // the shuffle config is carried over to the copy
+        } else if (action instanceof SinkAction) {
+            newAction = new SinkAction<>(id, action.getName(), new ArrayList<>(),
+                ((SinkAction<?, ?, ?, ?>) action).getSink(), action.getJarUrls(), 
((SinkAction<?, ?, ?, ?>) action).getConfig()); // carry the SinkConfig over too: the 4-argument constructor would store null and drop it
+        } else if (action instanceof SourceAction) {
+            newAction = new SourceAction<>(id, action.getName(),
+                ((SourceAction<?, ?, ?>) action).getSource(), 
action.getJarUrls());
+        } else if (action instanceof TransformAction) {
+            newAction = new TransformAction(id, action.getName(),
+                ((TransformAction) action).getTransform(), 
action.getJarUrls());
+        } else if (action instanceof TransformChainAction) {
+            newAction = new TransformChainAction(id, action.getName(),
+                action.getJarUrls(), ((TransformChainAction<?>) 
action).getTransforms());
+        } else {
+            throw new UnknownActionException(action); // any other Action implementation is not supported here
+        }
+        newAction.setParallelism(parallelism); // parallelism is not a constructor argument, so it is set on the copy afterwards
+        return newAction;
+    }
+
+    private Set<ExecutionEdge> generateExecutionEdges(Set<LogicalEdge> 
logicalEdges) { // Phase 1: turns every logical edge into an execution edge between re-created vertices.
+        Set<ExecutionEdge> executionEdges = new HashSet<>();
+
+        Map<Long, ExecutionVertex> logicalVertexIdToExecutionVertexMap = new 
HashMap<>(); // logical vertex id -> its single execution vertex, shared by every edge touching that vertex
+
+        List<LogicalEdge> sortedLogicalEdges = new ArrayList<>(logicalEdges);
+        Collections.sort(sortedLogicalEdges, 
Comparator.comparingLong(LogicalEdge::getInputVertexId)); // NOTE(review): presumably sorted so new ids are handed out in a stable order rather than in Set iteration order - confirm
+        for (LogicalEdge logicalEdge : sortedLogicalEdges) {
+            LogicalVertex logicalInputVertex = logicalEdge.getInputVertex();
+            ExecutionVertex executionInputVertex = 
logicalVertexIdToExecutionVertexMap.computeIfAbsent(
+                logicalInputVertex.getVertexId(),
+                vertexId -> { // runs only the first time this logical vertex is seen
+                    long newId = idGenerator.getNextId();
+                    Action newLogicalInputAction = recreateAction(
+                        logicalInputVertex.getAction(), newId, 
logicalInputVertex.getParallelism());
+                    return new ExecutionVertex(newId, newLogicalInputAction, 
logicalInputVertex.getParallelism()); // vertex and action share the same new id
+                });
+
+            LogicalVertex logicalTargetVertex = logicalEdge.getTargetVertex();
+            ExecutionVertex executionTargetVertex = 
logicalVertexIdToExecutionVertexMap.computeIfAbsent(
+                logicalTargetVertex.getVertexId(),
+                vertexId -> { // same re-creation as for the input vertex above
+                    long newId = idGenerator.getNextId();
+                    Action newLogicalTargetAction = recreateAction(
+                        logicalTargetVertex.getAction(), newId, 
logicalTargetVertex.getParallelism());
+                    return new ExecutionVertex(newId, newLogicalTargetAction, 
logicalTargetVertex.getParallelism());
                 }
+            );
+
+            ExecutionEdge executionEdge = new 
ExecutionEdge(executionInputVertex, executionTargetVertex);
+            executionEdges.add(executionEdge);
+        }
+        return executionEdges;
+    }
+
+    @SuppressWarnings("MagicNumber")
+    private Set<ExecutionEdge> generateShuffleEdges(Set<ExecutionEdge> 
executionEdges) { // Phase 2: for a single MULTIPLE_ROW source, rewires source -> sinks into source -> shuffle -> sinks.
+        Map<Long, List<ExecutionVertex>> targetVerticesMap = new HashMap<>(); // vertex id -> vertices it feeds directly
+        Set<ExecutionVertex> sourceExecutionVertices = new HashSet<>();
+        executionEdges.forEach(edge -> {
+            ExecutionVertex leftVertex = edge.getLeftVertex();
+            ExecutionVertex rightVertex = edge.getRightVertex();
+            if (leftVertex.getAction() instanceof SourceAction) {
+                sourceExecutionVertices.add(leftVertex);
             }
+            targetVerticesMap.computeIfAbsent(leftVertex.getVertexId(), id -> 
new ArrayList<>())
+                .add(rightVertex);
         });
-        List<ExecutionEdge> executionEdges = createExecutionEdges();
-        return new ExecutionPlan(new 
PipelineGenerator(executionVertexMap.values(), executionEdges)
-            .generatePipelines(), jobImmutableInformation);
-    }
+        if (sourceExecutionVertices.size() != 1) { // the shuffle is only inserted for plans with exactly one source
+            return executionEdges;
+        }
+        ExecutionVertex sourceExecutionVertex = 
sourceExecutionVertices.stream().findFirst().get(); // safe: the set holds exactly one element here
+        SourceAction<?, ?, ?> sourceAction = (SourceAction<?, ?, ?>) 
sourceExecutionVertex.getAction();
+        SeaTunnelDataType<?> sourceProducedType = 
sourceAction.getSource().getProducedType();
+        if (!SqlType.MULTIPLE_ROW.equals(sourceProducedType.getSqlType())) { // any other produced type leaves the edges untouched
+            return executionEdges;
+        }
+
+        List<ExecutionVertex> sinkVertices = 
targetVerticesMap.get(sourceExecutionVertex.getVertexId());
+        Optional<ExecutionVertex> hasOtherAction = sinkVertices.stream()
+            .filter(vertex -> !(vertex.getAction() instanceof SinkAction))
+            .findFirst();
+        checkArgument(!hasOtherAction.isPresent(), "A MULTIPLE_ROW source can only be connected directly to sinks"); // only source -> sink edges are rebuilt below, so anything else would be lost
+
+        Set<ExecutionEdge> newExecutionEdges = new HashSet<>();
+        ShuffleStrategy shuffleStrategy = ShuffleMultipleRowStrategy.builder()
+            .jobId(jobImmutableInformation.getJobId())
+            .inputPartitions(sourceAction.getParallelism())
+            .inputRowType(MultipleRowType.class.cast(sourceProducedType))
+            .queueEmptyQueueTtl((int) 
(checkpointConfig.getCheckpointInterval() * 3)) // NOTE(review): confirm the interval's unit matches the TTL unit Hazelcast expects, and that the int cast cannot overflow
+            .build();
+        ShuffleConfig shuffleConfig = ShuffleConfig.builder()
+            .shuffleStrategy(shuffleStrategy)
+            .build();
+
+        long shuffleVertexId = idGenerator.getNextId();
+        String shuffleActionName = String.format("Shuffle [%s -> table[0~%s]]",
+            sourceAction.getName(), ((MultipleRowType) 
sourceProducedType).getTableIds().length - 1); // the name shows the table index range 0..(number of table ids - 1)
+        ShuffleAction shuffleAction = new ShuffleAction(shuffleVertexId, 
shuffleActionName, shuffleConfig);
+        // multiple-row shuffle default parallelism is 1
+        shuffleAction.setParallelism(1);
+        ExecutionVertex shuffleVertex = new ExecutionVertex(shuffleVertexId, 
shuffleAction, shuffleAction.getParallelism());
+        ExecutionEdge sourceToShuffleEdge = new 
ExecutionEdge(sourceExecutionVertex, shuffleVertex);
+        newExecutionEdges.add(sourceToShuffleEdge);
 
-    public List<ExecutionEdge> createExecutionEdges() {
-        return logicalEdges.stream()
-            .map(logicalEdge -> new ExecutionEdge(
-                
executionVertexMap.get(logicalToExecutionMap.get(logicalEdge.getInputVertexId())),
-                
executionVertexMap.get(logicalToExecutionMap.get(logicalEdge.getTargetVertexId())))
-            ).collect(Collectors.toList());
+        for (ExecutionVertex sinkVertex : sinkVertices) { // every former source -> sink edge becomes shuffle -> sink
+            ExecutionEdge shuffleToSinkEdge = new ExecutionEdge(shuffleVertex, 
sinkVertex);
+            newExecutionEdges.add(shuffleToSinkEdge);
+        }
+        return newExecutionEdges;
     }
 
-    private void fillVerticesMap() {
-        logicalEdges.forEach(logicalEdge -> {
-            inputVerticesMap.computeIfAbsent(logicalEdge.getTargetVertexId(), 
id -> new ArrayList<>())
-                .add(logicalEdge.getInputVertex());
-            targetVerticesMap.computeIfAbsent(logicalEdge.getInputVertexId(), 
id -> new ArrayList<>())
-                .add(logicalEdge.getTargetVertex());
+    private Set<ExecutionEdge> generateTransformChainEdges(Set<ExecutionEdge> 
executionEdges) { // Phase 3: builds transform-chain vertices and returns the edge set re-pointed at them.
+        Map<Long, List<ExecutionVertex>> inputVerticesMap = new HashMap<>(); // vertex id -> vertices feeding it
+        Map<Long, List<ExecutionVertex>> targetVerticesMap = new HashMap<>(); // vertex id -> vertices it feeds
+        Set<ExecutionVertex> sourceExecutionVertices = new HashSet<>();
+        executionEdges.forEach(edge -> {
+            ExecutionVertex leftVertex = edge.getLeftVertex();
+            ExecutionVertex rightVertex = edge.getRightVertex();
+            if (leftVertex.getAction() instanceof SourceAction) {
+                sourceExecutionVertices.add(leftVertex);
+            }
+            inputVerticesMap.computeIfAbsent(rightVertex.getVertexId(), id -> 
new ArrayList<>())
+                .add(leftVertex);
+            targetVerticesMap.computeIfAbsent(leftVertex.getVertexId(), id -> 
new ArrayList<>())
+                .add(rightVertex);
         });
-    }
 
-    private List<LogicalVertex> getSourceVertices() {
-        List<LogicalVertex> sourceVertices = new ArrayList<>();
-        for (LogicalVertex logicalVertex : logicalVertices) {
-            List<LogicalVertex> inputVertices = 
inputVerticesMap.get(logicalVertex.getVertexId());
-            if (inputVertices == null || inputVertices.size() == 0) {
-                sourceVertices.add(logicalVertex);
+        Map<Long, ExecutionVertex> transformChainVertexMap = new HashMap<>(); // chain vertex id -> chain vertex
+        Map<Long, Long> chainedTransformVerticesMapping = new HashMap<>(); // original vertex id -> id of the chain vertex replacing it
+        for (ExecutionVertex sourceVertex : sourceExecutionVertices) {
+            List<ExecutionVertex> vertices = new ArrayList<>();
+            vertices.add(sourceVertex);
+            for (int index = 0; index < vertices.size(); index++) { // index-based worklist: the list grows while it is being walked
+                ExecutionVertex vertex = vertices.get(index);
+
+                fillChainedTransformExecutionVertex(vertex,
+                    chainedTransformVerticesMapping, transformChainVertexMap, 
executionEdges,
+                    Collections.unmodifiableMap(inputVerticesMap),
+                    Collections.unmodifiableMap(targetVerticesMap)); // the adjacency maps are passed read-only; executionEdges is passed as-is and may lose edges inside a chain
+
+                if (targetVerticesMap.containsKey(vertex.getVertexId())) {
+                    
vertices.addAll(targetVerticesMap.get(vertex.getVertexId())); // continue the walk with this vertex's direct targets
+                }
+            }
+        }
+
+        Set<ExecutionEdge> transformChainEdges = new HashSet<>();
+        for (ExecutionEdge executionEdge : executionEdges) {
+            ExecutionVertex leftVertex = executionEdge.getLeftVertex();
+            ExecutionVertex rightVertex = executionEdge.getRightVertex();
+            boolean needRebuild = false;
+            if 
(chainedTransformVerticesMapping.containsKey(leftVertex.getVertexId())) { // left end was folded into a chain: use the chain vertex instead
+                needRebuild = true;
+                leftVertex = 
transformChainVertexMap.get(chainedTransformVerticesMapping.get(leftVertex.getVertexId()));
+            }
+            if 
(chainedTransformVerticesMapping.containsKey(rightVertex.getVertexId())) { // same substitution for the right end
+                needRebuild = true;
+                rightVertex = 
transformChainVertexMap.get(chainedTransformVerticesMapping.get(rightVertex.getVertexId()));
+            }
+            if (needRebuild) {
+                executionEdge = new ExecutionEdge(leftVertex, rightVertex);
             }
+            transformChainEdges.add(executionEdge); // edges with both ends untouched are reused as they are
         }
-        return sourceVertices;
+        return transformChainEdges;
     }
 
-    private void createExecutionVertex(LogicalVertex logicalVertex) {
-        if (logicalToExecutionMap.containsKey(logicalVertex.getVertexId())) {
+    private void fillChainedTransformExecutionVertex(ExecutionVertex 
currentVertex,
+                                                     Map<Long, Long> 
chainedTransformVerticesMapping,
+                                                     Map<Long, 
ExecutionVertex> transformChainVertexMap,
+                                                     Set<ExecutionEdge> 
executionEdges,
+                                                     Map<Long, 
List<ExecutionVertex>> inputVerticesMap,
+                                                     Map<Long, 
List<ExecutionVertex>> targetVerticesMap) { // Creates one TransformChainAction vertex for the chain starting at currentVertex (if any) and records it in the two output maps.
+        if 
(chainedTransformVerticesMapping.containsKey(currentVertex.getVertexId())) { // already mapped to a chain vertex by an earlier call
             return;
         }
-        final ArrayList<LogicalVertex> chainedVertices = new ArrayList<>();
-        findChainedVertices(logicalVertex, chainedVertices);
-        final long newId = idGenerator.getNextId();
-        Action newAction;
-        if (chainedVertices.size() < 1) {
-            newAction = recreateAction(logicalVertex.getAction(), newId, 
logicalVertex.getParallelism());
-        } else {
-            List<SeaTunnelTransform> transforms = new 
ArrayList<>(chainedVertices.size());
-            List<String> names = new ArrayList<>(chainedVertices.size());
+
+        List<ExecutionVertex> transformChainedVertices = new ArrayList<>();
+        collectChainedVertices(currentVertex, transformChainedVertices, 
executionEdges, inputVerticesMap, targetVerticesMap); // fills the list with the chain; NOTE(review): presumably left empty when currentVertex is not a TransformAction - confirm
+        if (transformChainedVertices.size() > 0) { // a single collected transform also becomes a one-element chain
+            long newVertexId = idGenerator.getNextId();
+            List<SeaTunnelTransform> transforms = new 
ArrayList<>(transformChainedVertices.size());
+            List<String> names = new 
ArrayList<>(transformChainedVertices.size());
             Set<URL> jars = new HashSet<>();
-            chainedVertices.stream()
-                .peek(vertex -> 
logicalToExecutionMap.put(vertex.getVertexId(), newId))
-                .map(LogicalVertex::getAction)
+
+            transformChainedVertices.stream()
+                .peek(vertex -> 
chainedTransformVerticesMapping.put(vertex.getVertexId(), newVertexId)) // peek is used for its side effect: map every chained vertex id to the chain vertex id
+                .map(ExecutionVertex::getAction)
                 .map(action -> (TransformAction) action)
                 .forEach(action -> {
                     transforms.add(action.getTransform());
                     jars.addAll(action.getJarUrls());
                     names.add(action.getName());
                 });
-            newAction = new TransformChainAction(newId,
+            TransformChainAction transformChainAction = new 
TransformChainAction(newVertexId,
                 String.join("->", names),
                 jars,
                 transforms);
-            
newAction.setParallelism(logicalVertex.getAction().getParallelism());
-        }
-        ExecutionVertex executionVertex = new ExecutionVertex(newId, 
newAction, logicalVertex.getParallelism());
-        executionVertexMap.put(newId, executionVertex);
-        logicalToExecutionMap.put(logicalVertex.getVertexId(), 
executionVertex.getVertexId());
-    }
+            
transformChainAction.setParallelism(currentVertex.getAction().getParallelism()); // the chain action takes the parallelism of currentVertex's action
 
-    public static Action recreateAction(Action action, Long id, int 
parallelism) {
-        Action newAction;
-        if (action instanceof ShuffleAction) {
-            newAction = new ShuffleAction(id,
-                action.getName(),
-                ((ShuffleAction) action).getPartitionTransformation(),
-                action.getJarUrls());
-        } else if (action instanceof SinkAction) {
-            newAction = new SinkAction<>(id,
-                action.getName(),
-                ((SinkAction<?, ?, ?, ?>) action).getSink(),
-                action.getJarUrls());
-        } else if (action instanceof SourceAction) {
-            newAction = new SourceAction<>(id,
-                action.getName(),
-                ((SourceAction<?, ?, ?>) action).getSource(),
-                action.getJarUrls());
-        } else if (action instanceof TransformChainAction) {
-            newAction = new TransformChainAction(id,
-                action.getName(),
-                action.getJarUrls(),
-                ((TransformChainAction<?>) action).getTransforms());
-        } else {
-            throw new UnknownActionException(action);
+            ExecutionVertex executionVertex = new ExecutionVertex(newVertexId, 
transformChainAction, currentVertex.getParallelism());
+            transformChainVertexMap.put(newVertexId, executionVertex);
+            chainedTransformVerticesMapping.put(currentVertex.getVertexId(), 
executionVertex.getVertexId()); // NOTE(review): looks redundant when currentVertex is itself in the chain, since the peek above already mapped it - confirm
         }
-        newAction.setParallelism(parallelism);
-        return newAction;
     }
 
-    private void findChainedVertices(LogicalVertex logicalVertex, 
List<LogicalVertex> chainedVertices) {
-        Action action = logicalVertex.getAction();
+    private void collectChainedVertices(ExecutionVertex currentVertex,
+                                        List<ExecutionVertex> chainedVertices,
+                                        Set<ExecutionEdge> executionEdges,
+                                        Map<Long, List<ExecutionVertex>> 
inputVerticesMap,
+                                        Map<Long, List<ExecutionVertex>> 
targetVerticesMap) {
+        Action action = currentVertex.getAction();
         // Currently only support Transform action chaining.
         if (action instanceof TransformAction) {
             if (chainedVertices.size() == 0) {
-                chainedVertices.add(logicalVertex);
-            } else if 
(inputVerticesMap.get(logicalVertex.getVertexId()).size() == 1) {
+                chainedVertices.add(currentVertex);
+            } else if 
(inputVerticesMap.get(currentVertex.getVertexId()).size() == 1) {
                 // It cannot be chained to any input vertex if it has multiple 
input vertices.
-                logicalEdges.remove(new 
LogicalEdge(chainedVertices.get(chainedVertices.size() - 1), logicalVertex));
-                chainedVertices.add(logicalVertex);
+                executionEdges.remove(new 
ExecutionEdge(chainedVertices.get(chainedVertices.size() - 1), currentVertex));
+                chainedVertices.add(currentVertex);
             } else {
                 return;
             }
@@ -225,8 +325,19 @@ public class ExecutionPlanGenerator {
         }
 
         // It cannot chain to any target vertex if it has multiple target 
vertices.
-        if (targetVerticesMap.get(logicalVertex.getVertexId()).size() == 1) {
-            
findChainedVertices(targetVerticesMap.get(logicalVertex.getVertexId()).get(0), 
chainedVertices);
+        if (targetVerticesMap.get(currentVertex.getVertexId()).size() == 1) {
+            
collectChainedVertices(targetVerticesMap.get(currentVertex.getVertexId()).get(0),
+                chainedVertices, executionEdges, inputVerticesMap, 
targetVerticesMap);
+        }
+    }
+
+    private List<Pipeline> generatePipelines(Set<ExecutionEdge> 
executionEdges) { // Phase 4: collects the vertices referenced by the edges and delegates to PipelineGenerator.
+        Set<ExecutionVertex> executionVertices = new HashSet<>(); // a Set, so a vertex shared by several edges is added once
+        for (ExecutionEdge edge : executionEdges) {
+            executionVertices.add(edge.getLeftVertex());
+            executionVertices.add(edge.getRightVertex());
         }
+        return new PipelineGenerator(executionVertices, new 
ArrayList<>(executionEdges)) // the edges are handed over as a new list copied from the set
+            .generatePipelines();
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index e9bbf2a49..c55a14e7e 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -20,11 +20,15 @@ package org.apache.seatunnel.engine.server.dag.physical;
 import static 
org.apache.seatunnel.engine.common.config.server.QueueType.BLOCKINGQUEUE;
 
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.table.type.MultipleRowType;
 import org.apache.seatunnel.engine.common.config.server.QueueType;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
 import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
+import org.apache.seatunnel.engine.core.dag.actions.ShuffleConfig;
+import org.apache.seatunnel.engine.core.dag.actions.ShuffleMultipleRowStrategy;
+import org.apache.seatunnel.engine.core.dag.actions.ShuffleStrategy;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
 import org.apache.seatunnel.engine.core.dag.internal.IntermediateQueue;
@@ -36,7 +40,6 @@ import 
org.apache.seatunnel.engine.server.dag.execution.ExecutionPlan;
 import org.apache.seatunnel.engine.server.dag.execution.Pipeline;
 import org.apache.seatunnel.engine.server.dag.physical.config.FlowConfig;
 import 
org.apache.seatunnel.engine.server.dag.physical.config.IntermediateQueueConfig;
-import org.apache.seatunnel.engine.server.dag.physical.config.PartitionConfig;
 import org.apache.seatunnel.engine.server.dag.physical.config.SinkConfig;
 import org.apache.seatunnel.engine.server.dag.physical.config.SourceConfig;
 import org.apache.seatunnel.engine.server.dag.physical.flow.Flow;
@@ -65,6 +68,7 @@ import lombok.NonNull;
 import java.io.IOException;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -270,41 +274,102 @@ public class PhysicalPlanGenerator {
     private List<PhysicalVertex> getShuffleTask(List<ExecutionEdge> edges,
                                                 int pipelineIndex,
                                                 int totalPipelineNum) {
-        return edges.stream().filter(s -> s.getLeftVertex().getAction() 
instanceof ShuffleAction)
+        return edges.stream()
+            .filter(s -> s.getLeftVertex().getAction() instanceof 
ShuffleAction)
             .map(q -> (ShuffleAction) q.getLeftVertex().getAction())
+            .collect(Collectors.toSet())
+            .stream()
             .map(q -> new PhysicalExecutionFlow(q, getNextWrapper(edges, q)))
             .flatMap(flow -> {
-                List<PhysicalVertex> t = new ArrayList<>();
-                long taskIDPrefix = idGenerator.getNextId();
-                long taskGroupIDPrefix = idGenerator.getNextId();
-                for (int i = 0; i < flow.getAction().getParallelism(); i++) {
-                    long taskGroupID = mixIDPrefixAndIndex(taskGroupIDPrefix, 
i);
-                    TaskGroupLocation taskGroupLocation =
-                        new 
TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex, 
taskGroupID);
-                    TaskLocation taskLocation = new 
TaskLocation(taskGroupLocation, taskIDPrefix, i);
-                    setFlowConfig(flow, i);
-                    SeaTunnelTask seaTunnelTask =
-                        new 
TransformSeaTunnelTask(jobImmutableInformation.getJobId(), taskLocation, i, 
flow);
-                    // checkpoint
-                    fillCheckpointPlan(seaTunnelTask);
-                    t.add(new PhysicalVertex(
-                        i,
-                        executorService,
-                        flow.getAction().getParallelism(),
-                        new TaskGroupDefaultImpl(taskGroupLocation, 
flow.getAction().getName() +
-                            "-ShuffleTask",
-                            Lists.newArrayList(seaTunnelTask)),
-                        flakeIdGenerator,
-                        pipelineIndex,
-                        totalPipelineNum,
-                        seaTunnelTask.getJarsUrl(),
-                        jobImmutableInformation,
-                        initializationTimestamp,
-                        nodeEngine,
-                        runningJobStateIMap,
-                        runningJobStateTimestampsIMap));
+                List<PhysicalVertex> physicalVertices = new ArrayList<>();
+
+                ShuffleAction shuffleAction = (ShuffleAction) flow.getAction();
+                ShuffleConfig shuffleConfig = shuffleAction.getConfig();
+                ShuffleStrategy shuffleStrategy = 
shuffleConfig.getShuffleStrategy();
+                if (shuffleStrategy instanceof ShuffleMultipleRowStrategy) {
+                    ShuffleMultipleRowStrategy shuffleMultipleRowStrategy = 
(ShuffleMultipleRowStrategy) shuffleStrategy;
+                    for (Flow nextFlow : flow.getNext()) {
+                        PhysicalExecutionFlow sinkFlow = 
(PhysicalExecutionFlow) nextFlow;
+                        SinkAction sinkAction = (SinkAction) 
sinkFlow.getAction();
+                        String sinkTableId = 
sinkAction.getConfig().getMultipleRowTableId();
+                        MultipleRowType multipleRowType = 
shuffleMultipleRowStrategy.getInputRowType();
+                        int sinkTableIndex = 
Arrays.asList(multipleRowType.getTableIds()).indexOf(sinkTableId);
+
+                        long taskIDPrefix = idGenerator.getNextId();
+                        long taskGroupIDPrefix = idGenerator.getNextId();
+                        for (int parallelismIndex = 0; parallelismIndex < 
flow.getAction().getParallelism(); parallelismIndex++) {
+                            ShuffleStrategy shuffleStrategyOfSinkFlow = 
shuffleMultipleRowStrategy.toBuilder()
+                                .targetTableId(sinkTableId)
+                                .build();
+                            ShuffleConfig shuffleConfigOfSinkFlow = 
shuffleConfig.toBuilder()
+                                .shuffleStrategy(shuffleStrategyOfSinkFlow)
+                                .build();
+                            long shuffleActionId = idGenerator.getNextId();
+                            String shuffleActionName = String.format("Shuffle 
[table[%s] -> %s]", sinkTableIndex, sinkAction.getName());
+                            ShuffleAction shuffleActionOfSinkFlow = new 
ShuffleAction(shuffleActionId, shuffleActionName, shuffleConfigOfSinkFlow);
+                            shuffleActionOfSinkFlow.setParallelism(1);
+                            PhysicalExecutionFlow shuffleFlow = new 
PhysicalExecutionFlow(shuffleActionOfSinkFlow, 
Collections.singletonList(sinkFlow));
+                            setFlowConfig(shuffleFlow, parallelismIndex);
+
+                            long taskGroupID = 
mixIDPrefixAndIndex(taskGroupIDPrefix, parallelismIndex);
+                            TaskGroupLocation taskGroupLocation = new 
TaskGroupLocation(
+                                jobImmutableInformation.getJobId(), 
pipelineIndex, taskGroupID);
+                            TaskLocation taskLocation = new 
TaskLocation(taskGroupLocation, taskIDPrefix, parallelismIndex);
+                            SeaTunnelTask seaTunnelTask = new 
TransformSeaTunnelTask(
+                                jobImmutableInformation.getJobId(), 
taskLocation, parallelismIndex, shuffleFlow);
+
+                            // checkpoint
+                            fillCheckpointPlan(seaTunnelTask);
+                            physicalVertices.add(new PhysicalVertex(
+                                parallelismIndex,
+                                executorService,
+                                shuffleFlow.getAction().getParallelism(),
+                                new TaskGroupDefaultImpl(taskGroupLocation, 
shuffleFlow.getAction().getName() +
+                                    "-ShuffleTask",
+                                    Collections.singletonList(seaTunnelTask)),
+                                flakeIdGenerator,
+                                pipelineIndex,
+                                totalPipelineNum,
+                                seaTunnelTask.getJarsUrl(),
+                                jobImmutableInformation,
+                                initializationTimestamp,
+                                nodeEngine,
+                                runningJobStateIMap,
+                                runningJobStateTimestampsIMap));
+                        }
+                    }
+                } else {
+                    long taskIDPrefix = idGenerator.getNextId();
+                    long taskGroupIDPrefix = idGenerator.getNextId();
+                    for (int i = 0; i < flow.getAction().getParallelism(); 
i++) {
+                        long taskGroupID = 
mixIDPrefixAndIndex(taskGroupIDPrefix, i);
+                        TaskGroupLocation taskGroupLocation =
+                            new 
TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex, 
taskGroupID);
+                        TaskLocation taskLocation = new 
TaskLocation(taskGroupLocation, taskIDPrefix, i);
+                        setFlowConfig(flow, i);
+                        SeaTunnelTask seaTunnelTask =
+                            new 
TransformSeaTunnelTask(jobImmutableInformation.getJobId(), taskLocation, i, 
flow);
+                        // checkpoint
+                        fillCheckpointPlan(seaTunnelTask);
+                        physicalVertices.add(new PhysicalVertex(
+                            i,
+                            executorService,
+                            flow.getAction().getParallelism(),
+                            new TaskGroupDefaultImpl(taskGroupLocation, 
flow.getAction().getName() +
+                                "-ShuffleTask",
+                                Lists.newArrayList(seaTunnelTask)),
+                            flakeIdGenerator,
+                            pipelineIndex,
+                            totalPipelineNum,
+                            seaTunnelTask.getJarsUrl(),
+                            jobImmutableInformation,
+                            initializationTimestamp,
+                            nodeEngine,
+                            runningJobStateIMap,
+                            runningJobStateTimestampsIMap));
+                    }
                 }
-                return t.stream();
+                return physicalVertices.stream();
             }).collect(Collectors.toList());
     }
 
@@ -463,13 +528,6 @@ public class PhysicalPlanGenerator {
                     
config.setCommitterTask(committerTaskIDMap.get((SinkAction<?, ?, ?, ?>) 
flow.getAction()));
                 }
                 flow.setConfig(config);
-            } else if (flow.getAction() instanceof ShuffleAction) {
-                PartitionConfig config =
-                    new PartitionConfig(
-                        ((ShuffleAction) 
flow.getAction()).getPartitionTransformation().getPartitionCount(),
-                        ((ShuffleAction) 
flow.getAction()).getPartitionTransformation().getTargetCount(),
-                        parallelismIndex);
-                flow.setConfig(config);
             }
         } else if (f instanceof IntermediateExecutionFlow) {
             ((IntermediateExecutionFlow<IntermediateQueueConfig>) f)
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PlanUtils.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PlanUtils.java
index efced807a..3892b47e0 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PlanUtils.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PlanUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.server.dag.physical;
 
+import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
 import org.apache.seatunnel.engine.common.config.server.QueueType;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
@@ -36,16 +37,16 @@ public class PlanUtils {
 
     public static Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> 
fromLogicalDAG(@NonNull LogicalDag logicalDag,
                                                                                
     @NonNull NodeEngine nodeEngine,
-                                                                               
     @NonNull
-                                                                               
         JobImmutableInformation jobImmutableInformation,
+                                                                               
     @NonNull JobImmutableInformation jobImmutableInformation,
                                                                                
     long initializationTimestamp,
                                                                                
     @NonNull ExecutorService executorService,
                                                                                
     @NonNull FlakeIdGenerator flakeIdGenerator,
                                                                                
     @NonNull IMap runningJobStateIMap,
                                                                                
     @NonNull IMap runningJobStateTimestampsIMap,
-                                                                               
     @NonNull QueueType queueType) {
+                                                                               
     @NonNull QueueType queueType,
+                                                                               
     @NonNull CheckpointConfig checkpointConfig) {
         return new PhysicalPlanGenerator(
-            new ExecutionPlanGenerator(logicalDag, 
jobImmutableInformation).generate(),
+            new ExecutionPlanGenerator(logicalDag, jobImmutableInformation, 
checkpointConfig).generate(),
             nodeEngine,
             jobImmutableInformation,
             initializationTimestamp,
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
index 41bcbbbbc..1c852218e 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
@@ -22,6 +22,7 @@ import 
org.apache.seatunnel.engine.server.metrics.MetricsContext;
 import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 
 import com.hazelcast.cluster.Address;
+import com.hazelcast.core.HazelcastInstance;
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.map.IMap;
 import com.hazelcast.spi.impl.NodeEngineImpl;
@@ -60,4 +61,7 @@ public class TaskExecutionContext {
         return (T) task;
     }
 
+    public HazelcastInstance getInstance() {
+        return nodeEngine.getHazelcastInstance();
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index c9352461f..6d53deffe 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -133,6 +133,8 @@ public class JobMaster {
 
     private Map<Integer, CheckpointPlan> checkpointPlanMap;
 
+    private CheckpointConfig jobCheckpointConfig;
+
     public JobMaster(@NonNull Data jobImmutableInformationData,
                      @NonNull NodeEngine nodeEngine,
                      @NonNull ExecutorService executorService,
@@ -157,6 +159,9 @@ public class JobMaster {
     public void init(long initializationTimestamp) {
         jobImmutableInformation = 
nodeEngine.getSerializationService().toObject(
             jobImmutableInformationData);
+        jobCheckpointConfig = 
createJobCheckpointConfig(engineConfig.getCheckpointConfig(),
+            jobImmutableInformation.getJobConfig().getEnvOptions());
+
         LOGGER.info(String.format("Init JobMaster for Job %s (%s) ", 
jobImmutableInformation.getJobConfig().getName(),
             jobImmutableInformation.getJobId()));
         LOGGER.info(String.format("Job %s (%s) needed jar urls %s", 
jobImmutableInformation.getJobConfig().getName(),
@@ -174,7 +179,8 @@ public class JobMaster {
             flakeIdGenerator,
             runningJobStateIMap,
             runningJobStateTimestampsIMap,
-            engineConfig.getQueueType());
+            engineConfig.getQueueType(),
+            jobCheckpointConfig);
         this.physicalPlan = planTuple.f0();
         this.physicalPlan.setJobMaster(this);
         this.checkpointPlanMap = planTuple.f1();
@@ -182,32 +188,33 @@ public class JobMaster {
     }
 
     public void initCheckPointManager() throws CheckpointStorageException {
-        CheckpointConfig checkpointConfig = 
mergeEnvAndEngineConfig(engineConfig.getCheckpointConfig(),
-            jobImmutableInformation.getJobConfig().getEnvOptions());
         this.checkpointManager = new CheckpointManager(
             jobImmutableInformation.getJobId(),
             jobImmutableInformation.isStartWithSavePoint(),
             nodeEngine,
             this,
             checkpointPlanMap,
-            checkpointConfig);
+            jobCheckpointConfig);
     }
 
     // TODO replace it after ReadableConfig Support parse yaml format, then 
use only one config to read engine and env config.
-    private CheckpointConfig mergeEnvAndEngineConfig(CheckpointConfig engine, 
Map<String, Object> env) {
-        CheckpointConfig checkpointConfig = new CheckpointConfig();
-        if (env.containsKey(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
-            checkpointConfig.setCheckpointInterval((Integer) 
env.get(EnvCommonOptions.CHECKPOINT_INTERVAL.key()));
+    private CheckpointConfig createJobCheckpointConfig(CheckpointConfig 
defaultCheckpointConfig, Map<String, Object> jobEnv) {
+        CheckpointConfig jobCheckpointConfig = new CheckpointConfig();
+        
jobCheckpointConfig.setCheckpointTimeout(defaultCheckpointConfig.getCheckpointTimeout());
+        
jobCheckpointConfig.setCheckpointInterval(defaultCheckpointConfig.getCheckpointInterval());
+        
jobCheckpointConfig.setMaxConcurrentCheckpoints(defaultCheckpointConfig.getMaxConcurrentCheckpoints());
+        
jobCheckpointConfig.setTolerableFailureCheckpoints(defaultCheckpointConfig.getTolerableFailureCheckpoints());
+
+        CheckpointStorageConfig jobCheckpointStorageConfig = new 
CheckpointStorageConfig();
+        
jobCheckpointStorageConfig.setStorage(defaultCheckpointConfig.getStorage().getStorage());
+        
jobCheckpointStorageConfig.setStoragePluginConfig(defaultCheckpointConfig.getStorage().getStoragePluginConfig());
+        
jobCheckpointStorageConfig.setMaxRetainedCheckpoints(defaultCheckpointConfig.getStorage().getMaxRetainedCheckpoints());
+        jobCheckpointConfig.setStorage(jobCheckpointStorageConfig);
+
+        if (jobEnv.containsKey(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
+            jobCheckpointConfig.setCheckpointInterval((Integer) 
jobEnv.get(EnvCommonOptions.CHECKPOINT_INTERVAL.key()));
         }
-        checkpointConfig.setCheckpointTimeout(engine.getCheckpointTimeout());
-        
checkpointConfig.setTolerableFailureCheckpoints(engine.getTolerableFailureCheckpoints());
-        
checkpointConfig.setMaxConcurrentCheckpoints(engine.getMaxConcurrentCheckpoints());
-        CheckpointStorageConfig storageConfig = new CheckpointStorageConfig();
-        
storageConfig.setMaxRetainedCheckpoints(engine.getStorage().getMaxRetainedCheckpoints());
-        storageConfig.setStorage(engine.getStorage().getStorage());
-        
storageConfig.setStoragePluginConfig(engine.getStorage().getStoragePluginConfig());
-        checkpointConfig.setStorage(storageConfig);
-        return checkpointConfig;
+        return jobCheckpointConfig;
     }
 
     public void initStateFuture() {
@@ -257,7 +264,8 @@ public class JobMaster {
 
     public JobDAGInfo getJobDAGInfo() {
         if (jobDAGInfo == null) {
-            jobDAGInfo = DAGUtils.getJobDAGInfo(logicalDag, 
jobImmutableInformation, isPhysicalDAGIInfo);
+            jobDAGInfo = DAGUtils.getJobDAGInfo(logicalDag,
+                jobImmutableInformation, engineConfig.getCheckpointConfig(), 
isPhysicalDAGIInfo);
         }
         return jobDAGInfo;
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index e23dbc871..11691432d 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -34,6 +34,7 @@ import 
org.apache.seatunnel.common.utils.function.ConsumerWithException;
 import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
 import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
+import org.apache.seatunnel.engine.core.dag.actions.ShuffleConfig;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
 import org.apache.seatunnel.engine.core.dag.actions.TransformChainAction;
@@ -64,6 +65,7 @@ import 
org.apache.seatunnel.engine.server.task.group.AbstractTaskGroupWithInterm
 import org.apache.seatunnel.engine.server.task.record.Barrier;
 import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
 
+import com.hazelcast.core.HazelcastInstance;
 import com.hazelcast.internal.metrics.MetricDescriptor;
 import com.hazelcast.internal.metrics.MetricsCollectionContext;
 import lombok.NonNull;
@@ -213,12 +215,17 @@ public abstract class SeaTunnelTask extends AbstractTask {
                     new 
TransformFlowLifeCycle<SeaTunnelRow>((TransformChainAction) f.getAction(), this,
                         new SeaTunnelTransformCollector(flowLifeCycles), 
completableFuture);
             } else if (f.getAction() instanceof ShuffleAction) {
-                // TODO use index and taskID to create ringbuffer list
+                ShuffleAction shuffleAction = (ShuffleAction) f.getAction();
+                ShuffleConfig shuffleConfig = shuffleAction.getConfig();
+                HazelcastInstance hazelcastInstance = 
getExecutionContext().getInstance();
                 if (flow.getNext().isEmpty()) {
-                    lifeCycle = new ShuffleSinkFlowLifeCycle(this, 
completableFuture);
+                    lifeCycle = new ShuffleSinkFlowLifeCycle(
+                        this, indexID, shuffleConfig, hazelcastInstance, 
completableFuture);
                 } else {
-                    lifeCycle = new ShuffleSourceFlowLifeCycle(this, 
completableFuture);
+                    lifeCycle = new ShuffleSourceFlowLifeCycle(
+                        this, indexID, shuffleConfig, hazelcastInstance, 
completableFuture);
                 }
+                outputs = flowLifeCycles;
             } else {
                 throw new UnknownActionException(f.getAction());
             }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
index ce2eff995..546ee782c 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
@@ -17,20 +17,15 @@
 
 package org.apache.seatunnel.engine.server.task.flow;
 
-import org.apache.seatunnel.api.table.type.MultipleRowType;
 import org.apache.seatunnel.api.table.type.Record;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.engine.core.dag.actions.ShuffleConfig;
+import org.apache.seatunnel.engine.core.dag.actions.ShuffleStrategy;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
 import org.apache.seatunnel.engine.server.task.record.Barrier;
 
 import com.hazelcast.collection.IQueue;
-import com.hazelcast.config.QueueConfig;
 import com.hazelcast.core.HazelcastInstance;
-import lombok.AllArgsConstructor;
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.RandomStringUtils;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -44,20 +39,29 @@ import java.util.concurrent.TimeUnit;
 @SuppressWarnings("MagicNumber")
 @Slf4j
 public class ShuffleSinkFlowLifeCycle extends AbstractFlowLifeCycle implements 
OneInputFlowLifeCycle<Record<?>> {
-    private Map<String, IQueue<Record<?>>> shuffles;
-    private long shuffleBufferTimesMillis = 1000;
-    private int shuffleBufferSize = 1024;
-    private int shuffleBatchSize = 1024;
-    private Map<String, Queue<Record<?>>> shuffleBuffer;
-    private ShuffleStrategy shuffleStrategy;
+    private final int pipelineId;
+    private final int taskIndex;
+    private final Map<String, IQueue<Record<?>>> shuffles;
+    private final int shuffleBatchSize;
+    private final long shuffleBatchFlushInterval;
+    private final Map<String, Queue<Record<?>>> shuffleBuffer;
+    private final ShuffleStrategy shuffleStrategy;
+    private int shuffleBufferSize;
     private long lastModify;
 
     public ShuffleSinkFlowLifeCycle(SeaTunnelTask runningTask,
+                                    int taskIndex,
+                                    ShuffleConfig shuffleConfig,
+                                    HazelcastInstance hazelcastInstance,
                                     CompletableFuture<Void> completableFuture) 
{
         super(runningTask, completableFuture);
-        // todo initialize shuffleStrategy
-        this.shuffleStrategy = null;
-        this.shuffles = shuffleStrategy.createShuffles();
+        this.pipelineId = 
runningTask.getTaskLocation().getTaskGroupLocation().getPipelineId();
+        this.taskIndex = taskIndex;
+        this.shuffleStrategy = shuffleConfig.getShuffleStrategy();
+        this.shuffles = shuffleStrategy.createShuffles(hazelcastInstance, 
pipelineId, taskIndex);
+        this.shuffleBatchSize = shuffleConfig.getBatchSize();
+        this.shuffleBatchFlushInterval = shuffleConfig.getBatchFlushInterval();
+        this.shuffleBuffer = new HashMap<>();
     }
 
     @Override
@@ -92,8 +96,9 @@ public class ShuffleSinkFlowLifeCycle extends 
AbstractFlowLifeCycle implements O
     @Override
     public void close() throws IOException {
         super.close();
-        for (Map.Entry<String, IQueue<Record<?>>> shuffleBatch : 
shuffles.entrySet()) {
-            shuffleBatch.getValue().destroy();
+        for (Map.Entry<String, IQueue<Record<?>>> shuffleItem : 
shuffles.entrySet()) {
+            log.info("destroy shuffle queue[{}]", shuffleItem.getKey());
+            shuffleItem.getValue().destroy();
         }
     }
 
@@ -105,7 +110,7 @@ public class ShuffleSinkFlowLifeCycle extends 
AbstractFlowLifeCycle implements O
             public void run() {
                 if (!prepareClose
                     && shuffleBufferSize > 0
-                    && System.currentTimeMillis() - lastModify > 
shuffleBufferTimesMillis) {
+                    && System.currentTimeMillis() - lastModify > 
shuffleBatchFlushInterval) {
 
                     try {
                         shuffleFlush();
@@ -117,24 +122,24 @@ public class ShuffleSinkFlowLifeCycle extends 
AbstractFlowLifeCycle implements O
                 // submit next task
                 if (!prepareClose) {
                     Runnable nextScheduleFlushTask = this;
-                    scheduledExecutorService.schedule(nextScheduleFlushTask, 
shuffleBufferTimesMillis, TimeUnit.MILLISECONDS);
+                    scheduledExecutorService.schedule(nextScheduleFlushTask, 
shuffleBatchFlushInterval, TimeUnit.MILLISECONDS);
                 } else {
                     completedFuture.complete(true);
                 }
             }
         };
-        scheduledExecutorService.schedule(scheduleFlushTask, 
shuffleBufferTimesMillis, TimeUnit.MILLISECONDS);
+        scheduledExecutorService.schedule(scheduleFlushTask, 
shuffleBatchFlushInterval, TimeUnit.MILLISECONDS);
         return completedFuture;
     }
 
     private synchronized void shuffleItem(Record<?> record) {
-        String shuffleKey = shuffleStrategy.extractShuffleKey(record);
-        shuffleBuffer.compute(shuffleKey, (key, records) -> new LinkedList<>())
+        String shuffleKey = shuffleStrategy.createShuffleKey(record, 
pipelineId, taskIndex);
+        shuffleBuffer.computeIfAbsent(shuffleKey, key -> new LinkedList<>())
             .add(record);
         shuffleBufferSize++;
 
         if (shuffleBufferSize >= shuffleBatchSize
-            || (shuffleBufferSize > 1 && System.currentTimeMillis() - 
lastModify > shuffleBufferTimesMillis)) {
+            || (shuffleBufferSize > 1 && System.currentTimeMillis() - 
lastModify > shuffleBatchFlushInterval)) {
             shuffleFlush();
         }
 
@@ -143,7 +148,7 @@ public class ShuffleSinkFlowLifeCycle extends 
AbstractFlowLifeCycle implements O
 
     private synchronized void shuffleFlush() {
         for (Map.Entry<String, Queue<Record<?>>> shuffleBatch : 
shuffleBuffer.entrySet()) {
-            IQueue<Record<?>> shuffleQueue = 
shuffleQueue(shuffleBatch.getKey());
+            IQueue<Record<?>> shuffleQueue = 
shuffles.get(shuffleBatch.getKey());
             Queue<Record<?>> shuffleQueueBatch = shuffleBatch.getValue();
             if (!shuffleQueue.addAll(shuffleBatch.getValue())) {
                 for (; ;) {
@@ -162,73 +167,4 @@ public class ShuffleSinkFlowLifeCycle extends 
AbstractFlowLifeCycle implements O
         }
         shuffleBufferSize = 0;
     }
-
-    private IQueue<Record<?>> shuffleQueue(String shuffleKey) {
-        return shuffles.get(shuffleKey);
-    }
-
-    interface ShuffleStrategy {
-        Map<String, IQueue<Record<?>>> createShuffles();
-
-        String extractShuffleKey(Record<?> record);
-    }
-
-    @RequiredArgsConstructor
-    @AllArgsConstructor
-    public static class PartitionShuffle implements ShuffleStrategy {
-        private final HazelcastInstance hazelcast;
-        private final String jobId;
-        private final int partitionNumber;
-        private QueueConfig queueConfig;
-
-        @Override
-        public Map<String, IQueue<Record<?>>> createShuffles() {
-            Map<String, IQueue<Record<?>>> shuffleMap = new HashMap<>();
-            for (int i = 0; i < partitionNumber; i++) {
-                String queueName = String.format("PartitionShuffle[%s-%s]", 
jobId, i);
-                QueueConfig queueConfig = 
hazelcast.getConfig().getQueueConfig(queueName);
-                queueConfig.setMaxSize(queueConfig.getMaxSize())
-                    .setBackupCount(queueConfig.getBackupCount())
-                    .setAsyncBackupCount(queueConfig.getAsyncBackupCount())
-                    .setEmptyQueueTtl(queueConfig.getEmptyQueueTtl());
-                shuffleMap.put(String.valueOf(i), 
hazelcast.getQueue(queueName));
-            }
-            return shuffleMap;
-        }
-
-        @Override
-        public String extractShuffleKey(Record<?> record) {
-            return RandomStringUtils.random(partitionNumber);
-        }
-    }
-
-    @RequiredArgsConstructor
-    @AllArgsConstructor
-    public static class MultipleRowShuffle implements ShuffleStrategy {
-        private final HazelcastInstance hazelcast;
-        private final String jobId;
-        private final int parallelismIndex;
-        private final MultipleRowType multipleRowType;
-        private QueueConfig queueConfig;
-
-        @Override
-        public Map<String, IQueue<Record<?>>> createShuffles() {
-            Map<String, IQueue<Record<?>>> shuffleMap = new HashMap<>();
-            for (Map.Entry<String, SeaTunnelRowType> entry : multipleRowType) {
-                String queueName = 
String.format("MultipleRowShuffle[%s-%s-%s]", jobId, parallelismIndex, 
entry.getKey());
-                QueueConfig queueConfig = 
hazelcast.getConfig().getQueueConfig(queueName);
-                queueConfig.setMaxSize(queueConfig.getMaxSize())
-                    .setBackupCount(queueConfig.getBackupCount())
-                    .setAsyncBackupCount(queueConfig.getAsyncBackupCount())
-                    .setEmptyQueueTtl(queueConfig.getEmptyQueueTtl());
-                shuffleMap.put(entry.getKey(), hazelcast.getQueue(queueName));
-            }
-            return shuffleMap;
-        }
-
-        @Override
-        public String extractShuffleKey(Record<?> record) {
-            return ((SeaTunnelRow) record.getData()).getTableId();
-        }
-    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
index c95f421df..43c450f32 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
@@ -19,10 +19,12 @@ package org.apache.seatunnel.engine.server.task.flow;
 
 import org.apache.seatunnel.api.table.type.Record;
 import org.apache.seatunnel.api.transform.Collector;
+import org.apache.seatunnel.engine.core.dag.actions.ShuffleConfig;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
 import org.apache.seatunnel.engine.server.task.record.Barrier;
 
 import com.hazelcast.collection.IQueue;
+import com.hazelcast.core.HazelcastInstance;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -33,17 +35,22 @@ import java.util.concurrent.CompletableFuture;
 
 @SuppressWarnings("MagicNumber")
 public class ShuffleSourceFlowLifeCycle<T> extends AbstractFlowLifeCycle implements OneOutputFlowLifeCycle<Record<?>> {
-    private int shuffleBatchSize = 1024;
-    private IQueue<Record<?>>[] shuffles;
+    private final int shuffleBatchSize;
+    private final IQueue<Record<?>>[] shuffles;
     private List<Record<?>> unsentBuffer;
     private final Map<Integer, Barrier> alignedBarriers = new HashMap<>();
     private long currentCheckpointId = Long.MAX_VALUE;
     private int alignedBarriersCounter = 0;
 
     public ShuffleSourceFlowLifeCycle(SeaTunnelTask runningTask,
+                                      int taskIndex,
+                                      ShuffleConfig shuffleConfig,
+                                      HazelcastInstance hazelcastInstance,
                                       CompletableFuture<Void> completableFuture) {
         super(runningTask, completableFuture);
-        // todo initialize shuffles
+        int pipelineId = runningTask.getTaskLocation().getPipelineId();
+        this.shuffles = shuffleConfig.getShuffleStrategy().getShuffles(hazelcastInstance, pipelineId, taskIndex);
+        this.shuffleBatchSize = shuffleConfig.getBatchSize();
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
index c9f802489..7ffa535f0 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
@@ -33,11 +33,16 @@ import org.apache.seatunnel.engine.core.dag.logical.LogicalEdge;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
 import org.apache.seatunnel.engine.core.parse.JobConfigParser;
 
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
@@ -51,6 +56,11 @@ public class TestUtils {
         IdGenerator idGenerator = new IdGenerator();
         FakeSource fakeSource = new FakeSource();
         fakeSource.setJobContext(jobContext);
+        Config fakeSourceConfig = ConfigFactory.parseMap(
+            Collections.singletonMap("schema",
+                Collections.singletonMap("fields",
+                    ImmutableMap.of("id", "int", "name", "string"))));
+        fakeSource.prepare(fakeSourceConfig);
 
         Action fake = new SourceAction<>(idGenerator.getNextId(), "fake", fakeSource,
             Sets.newHashSet(new URL("file:///fake.jar")));
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
index 77da5b8f5..5c3b25697 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink;
 import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
 import org.apache.seatunnel.engine.common.config.server.QueueType;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
@@ -35,6 +36,10 @@ import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
 import org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
 
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import com.google.common.collect.ImmutableMap;
 import com.hazelcast.map.IMap;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -69,7 +74,8 @@ public class CheckpointPlanTest extends AbstractSeaTunnelServerTest {
             instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME),
             runningJobState,
             runningJobStateTimestamp,
-            QueueType.BLOCKINGQUEUE).f1();
+            QueueType.BLOCKINGQUEUE,
+            new CheckpointConfig()).f1();
         Assertions.assertNotNull(checkpointPlans);
         Assertions.assertEquals(2, checkpointPlans.size());
         // enum(1) + reader(2) + writer(2)
@@ -90,6 +96,11 @@ public class CheckpointPlanTest extends AbstractSeaTunnelServerTest {
         JobContext jobContext = new JobContext();
         jobContext.setJobMode(JobMode.BATCH);
         FakeSource fakeSource = new FakeSource();
+        Config fakeSourceConfig = ConfigFactory.parseMap(
+            Collections.singletonMap("schema",
+                Collections.singletonMap("fields",
+                    ImmutableMap.of("id", "int", "name", "string"))));
+        fakeSource.prepare(fakeSourceConfig);
         fakeSource.setJobContext(jobContext);
 
         Action fake = new SourceAction<>(idGenerator.getNextId(), "fake", fakeSource, Collections.emptySet());
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index 7bec42e2b..bc0d5bedd 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink;
 import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
 import org.apache.seatunnel.engine.common.config.server.QueueType;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
@@ -38,6 +39,10 @@ import org.apache.seatunnel.engine.server.TestUtils;
 import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
 import org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
 
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
 import com.hazelcast.map.IMap;
 import org.junit.jupiter.api.Assertions;
@@ -74,11 +79,11 @@ public class TaskTest extends AbstractSeaTunnelServerTest {
 
         IdGenerator idGenerator = new IdGenerator();
 
-        Action fake = new SourceAction<>(idGenerator.getNextId(), "fake", new FakeSource(),
+        Action fake = new SourceAction<>(idGenerator.getNextId(), "fake", createFakeSource(),
             Sets.newHashSet(new URL("file:///fake.jar")));
         LogicalVertex fakeVertex = new LogicalVertex(fake.getId(), fake, 2);
 
-        Action fake2 = new SourceAction<>(idGenerator.getNextId(), "fake", new FakeSource(),
+        Action fake2 = new SourceAction<>(idGenerator.getNextId(), "fake", createFakeSource(),
             Sets.newHashSet(new URL("file:///fake.jar")));
         LogicalVertex fake2Vertex = new LogicalVertex(fake2.getId(), fake2, 2);
 
@@ -110,10 +115,21 @@ public class TaskTest extends AbstractSeaTunnelServerTest {
             instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME),
             runningJobState,
             runningJobStateTimestamp,
-            QueueType.BLOCKINGQUEUE).f0();
+            QueueType.BLOCKINGQUEUE,
+            new CheckpointConfig()).f0();
 
         Assertions.assertEquals(physicalPlan.getPipelineList().size(), 1);
         Assertions.assertEquals(physicalPlan.getPipelineList().get(0).getCoordinatorVertexList().size(), 1);
         Assertions.assertEquals(physicalPlan.getPipelineList().get(0).getPhysicalVertexList().size(), 2);
     }
+
+    private static FakeSource createFakeSource() {
+        FakeSource fakeSource = new FakeSource();
+        Config fakeSourceConfig = ConfigFactory.parseMap(
+            Collections.singletonMap("schema",
+                Collections.singletonMap("fields",
+                    ImmutableMap.of("id", "int", "name", "string"))));
+        fakeSource.prepare(fakeSourceConfig);
+        return fakeSource;
+    }
 }

Reply via email to