This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new add75d7d5 [Feature][CDC] Support add & drop tables when restore cdc 
jobs (#4254)
add75d7d5 is described below

commit add75d7d5d0af061664ff6fa42a7b3ef58eae81b
Author: hailin0 <[email protected]>
AuthorDate: Fri Mar 10 13:41:40 2023 +0800

    [Feature][CDC] Support add & drop tables when restore cdc jobs (#4254)
---
 config/seatunnel.yaml                              |  2 +-
 .../api/table/catalog/CatalogTableUtil.java        |  2 +-
 .../cdc/base/source/IncrementalSource.java         | 70 +++++++++++++++---
 .../enumerator/IncrementalSplitAssigner.java       | 30 ++++++--
 .../source/reader/IncrementalSourceReader.java     | 24 +++----
 .../engine/client/JobConfigParserTest.java         | 13 ++--
 .../engine/client/LogicalDagGeneratorTest.java     |  2 +-
 .../dag/actions/ShuffleMultipleRowStrategy.java    | 29 ++++++--
 .../core/dag/actions/ShufflePartitionStrategy.java | 23 +++++-
 .../core/dag/logical/LogicalDagGenerator.java      | 13 ++--
 .../engine/core/parse/JobConfigParser.java         | 83 ++++++++++++++++++----
 .../core/parse/MultipleTableJobConfigParser.java   | 28 ++++++--
 .../engine/server/checkpoint/ActionState.java      | 12 ++--
 ...ActionSubtaskState.java => ActionStateKey.java} | 26 ++++---
 .../server/checkpoint/ActionSubtaskState.java      |  2 +-
 .../server/checkpoint/CheckpointCoordinator.java   |  6 +-
 .../engine/server/checkpoint/CheckpointPlan.java   | 16 ++---
 .../server/checkpoint/CompletedCheckpoint.java     |  6 +-
 .../server/checkpoint/PendingCheckpoint.java       |  8 +--
 .../dag/execution/ExecutionPlanGenerator.java      | 43 ++++++++---
 .../engine/server/dag/execution/Pipeline.java      |  7 +-
 .../server/dag/physical/PhysicalPlanGenerator.java | 23 +++---
 .../engine/server/task/SeaTunnelTask.java          | 15 ++--
 .../server/task/SinkAggregatedCommitterTask.java   |  4 +-
 .../server/task/SourceSplitEnumeratorTask.java     |  3 +-
 .../server/task/flow/ShuffleSinkFlowLifeCycle.java |  6 +-
 .../task/flow/ShuffleSourceFlowLifeCycle.java      |  8 ++-
 .../engine/server/task/flow/SinkFlowLifeCycle.java |  6 +-
 .../server/task/flow/SourceFlowLifeCycle.java      |  3 +-
 .../server/task/flow/TransformFlowLifeCycle.java   |  3 +-
 .../engine/server/checkpoint/StorageTest.java      |  5 +-
 31 files changed, 381 insertions(+), 140 deletions(-)

diff --git a/config/seatunnel.yaml b/config/seatunnel.yaml
index add3996c8..8202d069c 100644
--- a/config/seatunnel.yaml
+++ b/config/seatunnel.yaml
@@ -34,4 +34,4 @@ seatunnel:
         plugin-config:
           namespace: /tmp/seatunnel/checkpoint_snapshot
           storage.type: hdfs
-          fs.defaultFS: file:/// # Ensure that the directory has written 
permission
+          fs.defaultFS: file:///tmp/ # Ensure that the directory has written 
permission
\ No newline at end of file
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
index f5856a013..00603e3f5 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
@@ -111,7 +111,7 @@ public class CatalogTableUtil implements Serializable {
         // Get the list of specified tables
         List<String> tableNames = 
catalogConfig.get(CatalogOptions.TABLE_NAMES);
         List<CatalogTable> catalogTables = new ArrayList<>();
-        if (tableNames != null && tableNames.size() > 1) {
+        if (tableNames != null && tableNames.size() >= 1) {
             for (String tableName : tableNames) {
                 catalogTables.add(catalog.getTable(TablePath.of(tableName)));
             }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
index 98cc6eb4d..dbad777cc 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
@@ -42,10 +42,12 @@ import 
org.apache.seatunnel.connectors.cdc.base.source.enumerator.SplitAssigner;
 import 
org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.HybridPendingSplitsState;
 import 
org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.IncrementalPhaseState;
 import 
org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.PendingSplitsState;
+import 
org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.SnapshotPhaseState;
 import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
 import 
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader;
 import 
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter;
 import 
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader;
+import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
 import 
org.apache.seatunnel.connectors.cdc.base.source.split.state.SourceSplitStateBase;
@@ -54,15 +56,21 @@ import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmit
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderOptions;
 
+import com.google.common.collect.Sets;
 import io.debezium.relational.TableId;
 import lombok.NoArgsConstructor;
 
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 @NoArgsConstructor
 public abstract class IncrementalSource<T, C extends SourceConfig>
@@ -218,16 +226,20 @@ public abstract class IncrementalSource<T, C extends 
SourceConfig>
             PendingSplitsState checkpointState)
             throws Exception {
         C sourceConfig = configFactory.create(0);
-        final List<TableId> remainingTables =
-                dataSourceDialect.discoverDataCollections(sourceConfig);
-        SplitAssigner.Context<C> assignerContext =
-                new SplitAssigner.Context<>(
-                        sourceConfig,
-                        new HashSet<>(remainingTables),
-                        new HashMap<>(),
-                        new HashMap<>());
+        Set<TableId> capturedTables =
+                new 
HashSet<>(dataSourceDialect.discoverDataCollections(sourceConfig));
+
         final SplitAssigner splitAssigner;
         if (checkpointState instanceof HybridPendingSplitsState) {
+            checkpointState = restore(capturedTables, 
(HybridPendingSplitsState) checkpointState);
+            SnapshotPhaseState checkpointSnapshotState =
+                    ((HybridPendingSplitsState) 
checkpointState).getSnapshotPhaseState();
+            SplitAssigner.Context<C> assignerContext =
+                    new SplitAssigner.Context<>(
+                            sourceConfig,
+                            capturedTables,
+                            checkpointSnapshotState.getAssignedSplits(),
+                            
checkpointSnapshotState.getSplitCompletedOffsets());
             splitAssigner =
                     new HybridSplitAssigner<>(
                             assignerContext,
@@ -237,6 +249,9 @@ public abstract class IncrementalSource<T, C extends 
SourceConfig>
                             dataSourceDialect,
                             offsetFactory);
         } else if (checkpointState instanceof IncrementalPhaseState) {
+            SplitAssigner.Context<C> assignerContext =
+                    new SplitAssigner.Context<>(
+                            sourceConfig, capturedTables, new HashMap<>(), new 
HashMap<>());
             splitAssigner =
                     new IncrementalSplitAssigner<>(
                             assignerContext, incrementalParallelism, 
offsetFactory);
@@ -246,4 +261,43 @@ public abstract class IncrementalSource<T, C extends 
SourceConfig>
         }
         return new IncrementalSourceEnumerator(enumeratorContext, 
splitAssigner);
     }
+
+    private HybridPendingSplitsState restore(
+            Set<TableId> capturedTables, HybridPendingSplitsState 
checkpointState) {
+        SnapshotPhaseState checkpointSnapshotState = 
checkpointState.getSnapshotPhaseState();
+        Set<TableId> checkpointCapturedTables =
+                Stream.concat(
+                                
checkpointSnapshotState.getAlreadyProcessedTables().stream(),
+                                
checkpointSnapshotState.getRemainingTables().stream())
+                        .collect(Collectors.toSet());
+        Set<TableId> newTables = Sets.difference(capturedTables, 
checkpointCapturedTables);
+        Set<TableId> deletedTables = Sets.difference(checkpointCapturedTables, 
capturedTables);
+
+        checkpointSnapshotState.getRemainingTables().addAll(newTables);
+        checkpointSnapshotState.getRemainingTables().removeAll(deletedTables);
+        
checkpointSnapshotState.getAlreadyProcessedTables().removeAll(deletedTables);
+        Set<String> deletedSplitIds = new HashSet<>();
+        Iterator<SnapshotSplit> splitIterator =
+                checkpointSnapshotState.getRemainingSplits().iterator();
+        while (splitIterator.hasNext()) {
+            SnapshotSplit split = splitIterator.next();
+            if (deletedTables.contains(split.getTableId())) {
+                splitIterator.remove();
+                deletedSplitIds.add(split.splitId());
+            }
+        }
+        for (Map.Entry<String, SnapshotSplit> entry :
+                checkpointSnapshotState.getAssignedSplits().entrySet()) {
+            SnapshotSplit split = entry.getValue();
+            if (deletedTables.contains(split.getTableId())) {
+                deletedSplitIds.add(entry.getKey());
+            }
+        }
+        deletedSplitIds.forEach(
+                splitId -> {
+                    
checkpointSnapshotState.getAssignedSplits().remove(splitId);
+                    
checkpointSnapshotState.getSplitCompletedOffsets().remove(splitId);
+                });
+        return checkpointState;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
index b203ecd92..cf6caf87b 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
@@ -69,6 +69,8 @@ public class IncrementalSplitAssigner<C extends SourceConfig> 
implements SplitAs
 
     private final Map<String, IncrementalSplit> assignedSplits = new 
HashMap<>();
 
+    private boolean startWithSnapshotMinimumOffset = true;
+
     public IncrementalSplitAssigner(
             SplitAssigner.Context<C> context,
             int incrementalParallelism,
@@ -94,7 +96,8 @@ public class IncrementalSplitAssigner<C extends SourceConfig> 
implements SplitAs
         if (splitAssigned) {
             return Optional.empty();
         }
-        List<IncrementalSplit> incrementalSplits = createIncrementalSplits();
+        List<IncrementalSplit> incrementalSplits =
+                createIncrementalSplits(startWithSnapshotMinimumOffset);
         remainingSplits.addAll(incrementalSplits);
         splitAssigned = true;
         return getNext();
@@ -136,15 +139,24 @@ public class IncrementalSplitAssigner<C extends 
SourceConfig> implements SplitAs
                             List<CompletedSnapshotSplitInfo> 
completedSnapshotSplitInfos =
                                     
incrementalSplit.getCompletedSnapshotSplitInfos();
                             for (CompletedSnapshotSplitInfo info : 
completedSnapshotSplitInfos) {
+                                if 
(!context.getCapturedTables().contains(info.getTableId())) {
+                                    continue;
+                                }
                                 context.getSplitCompletedOffsets()
                                         .put(info.getSplitId(), 
info.getWatermark());
                                 context.getAssignedSnapshotSplit()
                                         .put(info.getSplitId(), 
info.asSnapshotSplit());
                             }
                             for (TableId tableId : 
incrementalSplit.getTableIds()) {
+                                if 
(!context.getCapturedTables().contains(tableId)) {
+                                    continue;
+                                }
                                 tableWatermarks.put(tableId, startupOffset);
                             }
                         });
+        if (!tableWatermarks.isEmpty()) {
+            this.startWithSnapshotMinimumOffset = false;
+        }
     }
 
     @Override
@@ -159,7 +171,7 @@ public class IncrementalSplitAssigner<C extends 
SourceConfig> implements SplitAs
 
     // 
------------------------------------------------------------------------------------------
 
-    public List<IncrementalSplit> createIncrementalSplits() {
+    public List<IncrementalSplit> createIncrementalSplits(boolean 
startWithSnapshotMinimumOffset) {
         Set<TableId> allTables = new HashSet<>(context.getCapturedTables());
         assignedSplits.values().forEach(split -> 
split.getTableIds().forEach(allTables::remove));
         List<TableId>[] capturedTables = new List[incrementalParallelism];
@@ -175,12 +187,14 @@ public class IncrementalSplitAssigner<C extends 
SourceConfig> implements SplitAs
         i = 0;
         List<IncrementalSplit> incrementalSplits = new ArrayList<>();
         for (List<TableId> capturedTable : capturedTables) {
-            incrementalSplits.add(createIncrementalSplit(capturedTable, i++));
+            incrementalSplits.add(
+                    createIncrementalSplit(capturedTable, i++, 
startWithSnapshotMinimumOffset));
         }
         return incrementalSplits;
     }
 
-    private IncrementalSplit createIncrementalSplit(List<TableId> 
capturedTables, int index) {
+    private IncrementalSplit createIncrementalSplit(
+            List<TableId> capturedTables, int index, boolean 
startWithSnapshotMinimumOffset) {
         final List<SnapshotSplit> assignedSnapshotSplit =
                 context.getAssignedSnapshotSplit().values().stream()
                         .filter(split -> 
capturedTables.contains(split.getTableId()))
@@ -191,10 +205,12 @@ public class IncrementalSplitAssigner<C extends 
SourceConfig> implements SplitAs
         final List<CompletedSnapshotSplitInfo> completedSnapshotSplitInfos = 
new ArrayList<>();
         Offset minOffset = null;
         for (SnapshotSplit split : assignedSnapshotSplit) {
-            // find the min offset of change log
             Offset changeLogOffset = 
splitCompletedOffsets.get(split.splitId());
-            if (minOffset == null || changeLogOffset.isBefore(minOffset)) {
-                minOffset = changeLogOffset;
+            if (startWithSnapshotMinimumOffset) {
+                // find the min offset of change log
+                if (minOffset == null || changeLogOffset.isBefore(minOffset)) {
+                    minOffset = changeLogOffset;
+                }
             }
             completedSnapshotSplitInfos.add(
                     new CompletedSnapshotSplitInfo(
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
index 54e59b769..90d849b08 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
@@ -43,6 +43,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkState;
 
@@ -57,8 +58,6 @@ public class IncrementalSourceReader<T, C extends 
SourceConfig>
 
     private final Map<String, SnapshotSplit> finishedUnackedSplits;
 
-    private final Map<String, IncrementalSplit> uncompletedIncrementalSplits;
-
     private volatile boolean running = false;
     private final int subtaskId;
 
@@ -79,7 +78,6 @@ public class IncrementalSourceReader<T, C extends 
SourceConfig>
                 context);
         this.sourceConfig = sourceConfig;
         this.finishedUnackedSplits = new HashMap<>();
-        this.uncompletedIncrementalSplits = new HashMap<>();
         this.subtaskId = context.getIndexOfSubtask();
     }
 
@@ -110,15 +108,15 @@ public class IncrementalSourceReader<T, C extends 
SourceConfig>
                     unfinishedSplits.add(split);
                 }
             } else {
-                // the incremental split is uncompleted
-                uncompletedIncrementalSplits.put(split.splitId(), 
split.asIncrementalSplit());
                 unfinishedSplits.add(split.asIncrementalSplit());
             }
         }
         // notify split enumerator again about the finished unacked snapshot 
splits
         reportFinishedSnapshotSplitsIfNeed();
         // add all un-finished splits (including incremental split) to 
SourceReaderBase
-        super.addSplits(unfinishedSplits);
+        if (!unfinishedSplits.isEmpty()) {
+            super.addSplits(unfinishedSplits);
+        }
     }
 
     @Override
@@ -168,16 +166,18 @@ public class IncrementalSourceReader<T, C extends 
SourceConfig>
 
     @Override
     public List<SourceSplitBase> snapshotState(long checkpointId) {
-        // unfinished splits
         List<SourceSplitBase> stateSplits = super.snapshotState(checkpointId);
 
-        // add finished snapshot splits that didn't receive ack yet
-        stateSplits.addAll(finishedUnackedSplits.values());
+        // unfinished splits
+        List<SourceSplitBase> unfinishedSplits =
+                stateSplits.stream()
+                        .filter(split -> 
!finishedUnackedSplits.containsKey(split.splitId()))
+                        .collect(Collectors.toList());
 
-        // add incremental splits who are uncompleted
-        stateSplits.addAll(uncompletedIncrementalSplits.values());
+        // add finished snapshot splits that didn't receive ack yet
+        unfinishedSplits.addAll(finishedUnackedSplits.values());
 
-        return stateSplits;
+        return unfinishedSplits;
     }
 
     @Override
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
index 0113426a9..e9e59cf55 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
@@ -48,9 +48,10 @@ public class JobConfigParserTest {
         ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
         List<Action> actions = parse.getLeft();
         Assertions.assertEquals(1, actions.size());
-        Assertions.assertEquals("LocalFile", actions.get(0).getName());
+        Assertions.assertEquals("Sink[0]-LocalFile-default", 
actions.get(0).getName());
         Assertions.assertEquals(1, actions.get(0).getUpstream().size());
-        Assertions.assertEquals("FakeSource", 
actions.get(0).getUpstream().get(0).getName());
+        Assertions.assertEquals(
+                "Source[0]-FakeSource-default", 
actions.get(0).getUpstream().get(0).getName());
 
         Assertions.assertEquals(3, 
actions.get(0).getUpstream().get(0).getParallelism());
         Assertions.assertEquals(3, actions.get(0).getParallelism());
@@ -69,10 +70,12 @@ public class JobConfigParserTest {
         List<Action> actions = parse.getLeft();
         Assertions.assertEquals(1, actions.size());
 
-        Assertions.assertEquals("LocalFile", actions.get(0).getName());
+        Assertions.assertEquals("Sink[0]-LocalFile-fake", 
actions.get(0).getName());
         Assertions.assertEquals(2, actions.get(0).getUpstream().size());
-        Assertions.assertEquals("FakeSource", 
actions.get(0).getUpstream().get(0).getName());
-        Assertions.assertEquals("FakeSource", 
actions.get(0).getUpstream().get(1).getName());
+        Assertions.assertEquals(
+                "Source[0]-FakeSource-fake", 
actions.get(0).getUpstream().get(0).getName());
+        Assertions.assertEquals(
+                "Source[1]-FakeSource-fake", 
actions.get(0).getUpstream().get(1).getName());
 
         Assertions.assertEquals(3, 
actions.get(0).getUpstream().get(0).getParallelism());
         Assertions.assertEquals(3, 
actions.get(0).getUpstream().get(1).getParallelism());
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
index 29bd4ec57..b7a2b0383 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
@@ -56,7 +56,7 @@ public class LogicalDagGeneratorTest {
         LogicalDag logicalDag = logicalDagGenerator.generate();
         JsonObject logicalDagJson = logicalDag.getLogicalDagAsJson();
         String result =
-                
"{\"vertices\":[{\"id\":1,\"name\":\"LocalFile(id=1)\",\"parallelism\":6},{\"id\":2,\"name\":\"FakeSource(id=2)\",\"parallelism\":3},{\"id\":3,\"name\":\"FakeSource(id=3)\",\"parallelism\":3}],\"edges\":[{\"inputVertex\":\"FakeSource\",\"targetVertex\":\"LocalFile\"},{\"inputVertex\":\"FakeSource\",\"targetVertex\":\"LocalFile\"}]}";
+                
"{\"vertices\":[{\"id\":2,\"name\":\"Source[0]-FakeSource-fake(id=2)\",\"parallelism\":3},{\"id\":3,\"name\":\"Source[1]-FakeSource-fake(id=3)\",\"parallelism\":3},{\"id\":1,\"name\":\"Sink[0]-LocalFile-fake(id=1)\",\"parallelism\":6}],\"edges\":[{\"inputVertex\":\"Source[0]-FakeSource-fake\",\"targetVertex\":\"Sink[0]-LocalFile-fake\"},{\"inputVertex\":\"Source[1]-FakeSource-fake\",\"targetVertex\":\"Sink[0]-LocalFile-fake\"}]}";
         Assertions.assertEquals(result, logicalDagJson.toString());
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java
index c28719bf0..37477189d 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java
@@ -29,11 +29,15 @@ import lombok.Setter;
 import lombok.ToString;
 import lombok.experimental.SuperBuilder;
 import lombok.experimental.Tolerate;
+import lombok.extern.slf4j.Slf4j;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
+@Slf4j
 @SuperBuilder(toBuilder = true)
 @Getter
 @Setter
@@ -57,6 +61,13 @@ public class ShuffleMultipleRowStrategy extends 
ShuffleStrategy {
             queue.clear();
             shuffleMap.put(queueName, queue);
         }
+
+        log.info(
+                "pipeline[{}] / reader[{}] assigned shuffle queue list: {}",
+                pipelineId,
+                inputIndex,
+                shuffleMap.keySet());
+
         return shuffleMap;
     }
 
@@ -75,18 +86,24 @@ public class ShuffleMultipleRowStrategy extends 
ShuffleStrategy {
             String queueName = generateQueueName(pipelineId, inputIndex, 
targetTableId);
             queues[inputIndex] = getIQueue(hazelcast, queueName);
         }
+
+        log.info(
+                "pipeline[{}] / writer[{}] assigned shuffle queue list: {}",
+                pipelineId,
+                targetIndex,
+                Stream.of(queues).map(e -> 
e.getName()).collect(Collectors.toList()));
+
         return queues;
     }
 
     private String generateQueueName(int pipelineId, int inputIndex, String 
tableId) {
-        return "ShuffleMultipleRow-Queue["
+        return "ShuffleMultipleRow-Queue_"
                 + getJobId()
-                + "-"
+                + "_"
                 + pipelineId
-                + "-"
+                + "_"
                 + inputIndex
-                + "-"
-                + tableId
-                + "]";
+                + "_"
+                + tableId;
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShufflePartitionStrategy.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShufflePartitionStrategy.java
index 62460d4c5..4b69eba22 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShufflePartitionStrategy.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShufflePartitionStrategy.java
@@ -26,13 +26,18 @@ import lombok.Setter;
 import lombok.ToString;
 import lombok.experimental.SuperBuilder;
 import lombok.experimental.Tolerate;
+import lombok.extern.slf4j.Slf4j;
 
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+@Slf4j
 @SuperBuilder
 @Getter
 @Setter
@@ -48,7 +53,7 @@ public class ShufflePartitionStrategy extends ShuffleStrategy 
{
     public Map<String, IQueue<Record<?>>> createShuffles(
             HazelcastInstance hazelcast, int pipelineId, int inputIndex) {
         checkArgument(inputIndex >= 0 && inputIndex < getInputPartitions());
-        Map<String, IQueue<Record<?>>> shuffleMap = new HashMap<>();
+        Map<String, IQueue<Record<?>>> shuffleMap = new LinkedHashMap<>();
         for (int targetIndex = 0; targetIndex < targetPartitions; 
targetIndex++) {
             String queueName = generateQueueName(pipelineId, inputIndex, 
targetIndex);
             IQueue<Record<?>> queue = getIQueue(hazelcast, queueName);
@@ -56,6 +61,13 @@ public class ShufflePartitionStrategy extends 
ShuffleStrategy {
             queue.clear();
             shuffleMap.put(queueName, queue);
         }
+
+        log.info(
+                "pipeline[{}] / reader[{}] assigned shuffle queue list: {}",
+                pipelineId,
+                inputIndex,
+                shuffleMap.keySet());
+
         return shuffleMap;
     }
 
@@ -86,12 +98,19 @@ public class ShufflePartitionStrategy extends 
ShuffleStrategy {
             String queueName = generateQueueName(pipelineId, inputIndex, 
targetIndex);
             shuffles[inputIndex] = getIQueue(hazelcast, queueName);
         }
+
+        log.info(
+                "pipeline[{}] / writer[{}] assigned shuffle queue list: {}",
+                pipelineId,
+                targetIndex,
+                Stream.of(shuffles).map(e -> 
e.getName()).collect(Collectors.toList()));
+
         return shuffles;
     }
 
     private String generateQueueName(int pipelineId, int inputIndex, int 
targetIndex) {
         return String.format(
-                "ShufflePartition-Queue[%s-%s-%s-%s]",
+                "ShufflePartition-Queue_%s_%s_%s_%s",
                 getJobId(), pipelineId, inputIndex, targetIndex);
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java
index a767c8258..f847c2127 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java
@@ -26,8 +26,8 @@ import com.hazelcast.logging.Logger;
 import lombok.NonNull;
 
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -39,13 +39,13 @@ public class LogicalDagGenerator {
     private JobConfig jobConfig;
     private IdGenerator idGenerator;
 
-    private final Map<Long, LogicalVertex> logicalVertexMap = new HashMap<>();
+    private final Map<Long, LogicalVertex> logicalVertexMap = new 
LinkedHashMap<>();
 
     /**
      * key: input vertex id; <br>
      * value: target vertices id;
      */
-    private final Map<Long, Set<Long>> inputVerticesMap = new HashMap<>();
+    private final Map<Long, LinkedHashSet<Long>> inputVerticesMap = new 
LinkedHashMap<>();
 
     public LogicalDagGenerator(
             @NonNull List<Action> actions,
@@ -79,7 +79,8 @@ public class LogicalDagGenerator {
                         inputAction -> {
                             createLogicalVertex(inputAction);
                             inputVerticesMap
-                                    .computeIfAbsent(inputAction.getId(), id 
-> new HashSet<>())
+                                    .computeIfAbsent(
+                                            inputAction.getId(), id -> new 
LinkedHashSet<>())
                                     .add(logicalVertexId);
                         });
 
@@ -101,6 +102,6 @@ public class LogicalDagGenerator {
                                                                 
logicalVertexMap.get(targetId)))
                                         .collect(Collectors.toList()))
                 .flatMap(Collection::stream)
-                .collect(Collectors.toSet());
+                .collect(Collectors.toCollection(LinkedHashSet::new));
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
index f17d5236b..04dcbe8d0 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
@@ -185,7 +185,8 @@ public class JobConfigParser {
             List<? extends Config> sinkConfigs) {
         initRelationMap(sourceConfigs, transformConfigs);
 
-        for (Config config : sinkConfigs) {
+        for (int configIndex = 0; configIndex < sinkConfigs.size(); 
configIndex++) {
+            Config config = sinkConfigs.get(configIndex);
             ImmutablePair<
                             SeaTunnelSink<SeaTunnelRow, Serializable, 
Serializable, Serializable>,
                             Set<URL>>
@@ -193,10 +194,15 @@ public class JobConfigParser {
                             ConnectorInstanceLoader.loadSinkInstance(
                                     config, jobConfig.getJobContext(), 
commonPluginJars);
 
+            String sinkActionName =
+                    createSinkActionName(
+                            configIndex,
+                            sinkListImmutablePair.getLeft().getPluginName(),
+                            getTableName(config));
             SinkAction sinkAction =
                     createSinkAction(
                             idGenerator.getNextId(),
-                            sinkListImmutablePair.getLeft().getPluginName(),
+                            sinkActionName,
                             sinkListImmutablePair.getLeft(),
                             sinkListImmutablePair.getRight());
 
@@ -245,15 +251,21 @@ public class JobConfigParser {
         // of its upstream action.
         SeaTunnelDataType dataType = null;
         AtomicInteger totalParallelism = new AtomicInteger();
-        for (Config sourceConfig : sourceConfigList) {
+        for (int configIndex = 0; configIndex < sourceConfigList.size(); 
configIndex++) {
+            Config sourceConfig = sourceConfigList.get(configIndex);
             ImmutablePair<SeaTunnelSource, Set<URL>> 
seaTunnelSourceListImmutablePair =
                     ConnectorInstanceLoader.loadSourceInstance(
                             sourceConfig, jobConfig.getJobContext(), 
commonPluginJars);
             dataType = 
seaTunnelSourceListImmutablePair.getLeft().getProducedType();
+            String sourceActionName =
+                    createSourceActionName(
+                            configIndex,
+                            
sourceConfig.getString(CollectionConstants.PLUGIN_NAME),
+                            getTableName(sourceConfig));
             SourceAction sourceAction =
                     createSourceAction(
                             idGenerator.getNextId(),
-                            
sourceConfig.getString(CollectionConstants.PLUGIN_NAME),
+                            sourceActionName,
                             seaTunnelSourceListImmutablePair.getLeft(),
                             seaTunnelSourceListImmutablePair.getRight());
 
@@ -274,14 +286,20 @@ public class JobConfigParser {
         } else {
             AtomicInteger totalParallelism = new AtomicInteger();
             SeaTunnelDataType<?> dataTypeResult = null;
-            for (Config config : transformConfigList) {
+            for (int configIndex = 0; configIndex < 
transformConfigList.size(); configIndex++) {
+                Config config = transformConfigList.get(configIndex);
                 ImmutablePair<SeaTunnelTransform<?>, Set<URL>> 
transformListImmutablePair =
                         ConnectorInstanceLoader.loadTransformInstance(
                                 config, jobConfig.getJobContext(), 
commonPluginJars);
+                String transformActionName =
+                        createTransformActionName(
+                                configIndex,
+                                
transformListImmutablePair.getLeft().getPluginName(),
+                                getTableName(config));
                 TransformAction transformAction =
                         createTransformAction(
                                 idGenerator.getNextId(),
-                                
transformListImmutablePair.getLeft().getPluginName(),
+                                transformActionName,
                                 transformListImmutablePair.getLeft(),
                                 transformListImmutablePair.getRight());
 
@@ -352,12 +370,11 @@ public class JobConfigParser {
         ImmutablePair<SeaTunnelSource, Set<URL>> pair =
                 ConnectorInstanceLoader.loadSourceInstance(
                         sourceConfigs.get(0), jobConfig.getJobContext(), 
commonPluginJars);
+        String sourceActionName =
+                createSourceActionName(0, pair.getLeft().getPluginName(), 
"default");
         SourceAction sourceAction =
                 createSourceAction(
-                        idGenerator.getNextId(),
-                        pair.getLeft().getPluginName(),
-                        pair.getLeft(),
-                        pair.getRight());
+                        idGenerator.getNextId(), sourceActionName, 
pair.getLeft(), pair.getRight());
         
sourceAction.setParallelism(getSourceParallelism(sourceConfigs.get(0)));
         SeaTunnelDataType dataType = 
sourceAction.getSource().getProducedType();
 
@@ -370,10 +387,13 @@ public class JobConfigParser {
             transformListImmutablePair.getLeft().setTypeInfo(dataType);
 
             dataType = transformListImmutablePair.getLeft().getProducedType();
+            String transformActionName =
+                    createTransformActionName(
+                            0, 
transformListImmutablePair.getLeft().getPluginName(), "default");
             TransformAction transformAction =
                     createTransformAction(
                             idGenerator.getNextId(),
-                            
transformListImmutablePair.getLeft().getPluginName(),
+                            transformActionName,
                             Lists.newArrayList(sourceAction),
                             transformListImmutablePair.getLeft(),
                             transformListImmutablePair.getRight());
@@ -393,10 +413,12 @@ public class JobConfigParser {
                 sinkListImmutablePair =
                         ConnectorInstanceLoader.loadSinkInstance(
                                 sinkConfigs.get(0), jobConfig.getJobContext(), 
commonPluginJars);
+        String sinkActionName =
+                createSinkActionName(0, 
sinkListImmutablePair.getLeft().getPluginName(), "default");
         SinkAction sinkAction =
                 createSinkAction(
                         idGenerator.getNextId(),
-                        sinkListImmutablePair.getLeft().getPluginName(),
+                        sinkActionName,
                         Lists.newArrayList(sinkUpstreamAction),
                         sinkListImmutablePair.getLeft(),
                         sinkListImmutablePair.getRight());
@@ -489,4 +511,41 @@ public class JobConfigParser {
         }
         return new SinkAction(id, name, sink, jarUrls);
     }
+
+    static String createSourceActionName(int configIndex, String pluginName, 
String tableName) {
+        return String.format("Source[%s]-%s-%s", configIndex, pluginName, 
tableName);
+    }
+
+    static String createSinkActionName(int configIndex, String pluginName, 
String tableName) {
+        return String.format("Sink[%s]-%s-%s", configIndex, pluginName, 
tableName);
+    }
+
+    static String createTransformActionName(int configIndex, String 
pluginName, String tableName) {
+        return String.format("Transform[%s]-%s-%s", configIndex, pluginName, 
tableName);
+    }
+
+    static String getTableName(Config config) {
+        return getTableName(config, "default");
+    }
+
+    static String getTableName(Config config, String defaultValue) {
+        String sourceTableName = null;
+        if (config.hasPath(CommonOptions.SOURCE_TABLE_NAME.key())) {
+            sourceTableName = 
config.getString(CommonOptions.SOURCE_TABLE_NAME.key());
+        }
+        String resultTableName = null;
+        if (config.hasPath(CommonOptions.RESULT_TABLE_NAME.key())) {
+            resultTableName = 
config.getString(CommonOptions.RESULT_TABLE_NAME.key());
+        }
+        if (sourceTableName != null && resultTableName != null) {
+            return String.format("%s_%s", sourceTableName, resultTableName);
+        }
+        if (sourceTableName == null) {
+            return resultTableName;
+        }
+        if (resultTableName == null) {
+            return sourceTableName;
+        }
+        return defaultValue;
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 39c1a5cae..4c96a20e8 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -133,8 +133,10 @@ public class MultipleTableJobConfigParser {
             tableWithActionMap.put(tuple2._1(), tuple2._2());
         }
         List<Action> sinkActions = new ArrayList<>();
-        for (Config sinkConfig : sinkConfigs) {
-            sinkActions.addAll(parserSink(sinkConfig, classLoader, 
tableWithActionMap));
+        for (int configIndex = 0; configIndex < sinkConfigs.size(); 
configIndex++) {
+            Config sinkConfig = sinkConfigs.get(configIndex);
+            sinkActions.addAll(
+                    parserSink(configIndex, sinkConfig, classLoader, 
tableWithActionMap));
         }
         Set<URL> factoryUrls = getUsedFactoryUrls(sinkActions);
         factoryUrls.addAll(commonPluginJars);
@@ -212,11 +214,14 @@ public class MultipleTableJobConfigParser {
 
         List<Tuple2<CatalogTable, Action>> actions = new ArrayList<>();
         int parallelism = getParallelism(readonlyConfig);
-        for (Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>, 
List<CatalogTable>> tuple2 :
-                sources) {
+        for (int configIndex = 0; configIndex < sources.size(); configIndex++) 
{
+            Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>, 
List<CatalogTable>> tuple2 =
+                    sources.get(configIndex);
             long id = idGenerator.getNextId();
+            String actionName =
+                    JobConfigParser.createSourceActionName(configIndex, 
factoryId, tableId);
             SourceAction<Object, SourceSplit, Serializable> action =
-                    new SourceAction<>(id, factoryId, tuple2._1(), 
factoryUrls);
+                    new SourceAction<>(id, actionName, tuple2._1(), 
factoryUrls);
             action.setParallelism(parallelism);
             for (CatalogTable catalogTable : tuple2._2()) {
                 actions.add(new Tuple2<>(catalogTable, action));
@@ -248,6 +253,7 @@ public class MultipleTableJobConfigParser {
     }
 
     public List<SinkAction<?, ?, ?, ?>> parserSink(
+            int configIndex,
             Config sinkConfig,
             ClassLoader classLoader,
             Map<String, List<Tuple2<CatalogTable, Action>>> 
tableWithActionMap) {
@@ -290,15 +296,23 @@ public class MultipleTableJobConfigParser {
             SeaTunnelSink<?, ?, ?, ?> sink =
                     FactoryUtil.createAndPrepareSink(
                             catalogTable, readonlyConfig, classLoader, 
factoryId);
+            SinkConfig actionConfig =
+                    new 
SinkConfig(catalogTable.getTableId().toTablePath().toString());
             long id = idGenerator.getNextId();
+            String actionName =
+                    JobConfigParser.createSinkActionName(
+                            configIndex,
+                            factoryId,
+                            String.format(
+                                    "%s(%s)", leftTableId, 
actionConfig.getMultipleRowTableId()));
             SinkAction<?, ?, ?, ?> sinkAction =
                     new SinkAction<>(
                             id,
-                            factoryId,
+                            actionName,
                             Collections.singletonList(leftAction),
                             sink,
                             factoryUrls,
-                            new 
SinkConfig(catalogTable.getTableId().toTablePath().toString()));
+                            actionConfig);
             handleSaveMode(sink);
             sinkAction.setParallelism(leftAction.getParallelism());
             sinkActions.add(sinkAction);
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionState.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionState.java
index 634909e82..6874a94b5 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionState.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionState.java
@@ -25,8 +25,8 @@ public class ActionState implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
-    /** The id of the action. */
-    private final String actionId;
+    /** The key of the action state. */
+    private final ActionStateKey stateKey;
 
     /** The handles to states created by the parallel actions: action index -> 
action state. */
     private final List<ActionSubtaskState> subtaskStates;
@@ -36,14 +36,14 @@ public class ActionState implements Serializable {
     /** The parallelism of the action when it was checkpointed. */
     private final int parallelism;
 
-    public ActionState(String actionId, int parallelism) {
-        this.actionId = actionId;
+    public ActionState(ActionStateKey stateKey, int parallelism) {
+        this.stateKey = stateKey;
         this.subtaskStates = Arrays.asList(new 
ActionSubtaskState[parallelism]);
         this.parallelism = parallelism;
     }
 
-    public String getActionId() {
-        return actionId;
+    public ActionStateKey getStateKey() {
+        return stateKey;
     }
 
     public List<ActionSubtaskState> getSubtaskStates() {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionSubtaskState.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionStateKey.java
similarity index 64%
copy from 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionSubtaskState.java
copy to 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionStateKey.java
index 5a126e44d..1903eaad1 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionSubtaskState.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionStateKey.java
@@ -17,15 +17,25 @@
 
 package org.apache.seatunnel.engine.server.checkpoint;
 
-import lombok.Data;
+import org.apache.seatunnel.engine.core.dag.actions.Action;
+
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
 
 import java.io.Serializable;
-import java.util.List;
 
-@Data
-public class ActionSubtaskState implements Serializable {
-    private static final long serialVersionUID = 1L;
-    private final long actionId;
-    private final int index;
-    private final List<byte[]> state;
+@ToString
+@Setter
+@AllArgsConstructor
+@NoArgsConstructor
+@EqualsAndHashCode
+public class ActionStateKey implements Serializable {
+    private String name;
+
+    public static ActionStateKey of(Action action) {
+        return new ActionStateKey("ActionStateKey - " + action.getName());
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionSubtaskState.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionSubtaskState.java
index 5a126e44d..20c531f53 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionSubtaskState.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionSubtaskState.java
@@ -25,7 +25,7 @@ import java.util.List;
 @Data
 public class ActionSubtaskState implements Serializable {
     private static final long serialVersionUID = 1L;
-    private final long actionId;
+    private final ActionStateKey stateKey;
     private final int index;
     private final List<byte[]> state;
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 84054ac6e..553cd5e0b 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -508,15 +508,13 @@ public class CheckpointCoordinator {
                 .collect(Collectors.toCollection(CopyOnWriteArraySet::new));
     }
 
-    private Map<Long, ActionState> getActionStates() {
+    private Map<ActionStateKey, ActionState> getActionStates() {
         // TODO: some tasks have completed and will not submit state again.
         return plan.getPipelineActions().entrySet().stream()
                 .collect(
                         Collectors.toMap(
                                 Map.Entry::getKey,
-                                entry ->
-                                        new ActionState(
-                                                
String.valueOf(entry.getKey()), entry.getValue())));
+                                entry -> new ActionState(entry.getKey(), 
entry.getValue())));
     }
 
     private Map<Long, TaskStatistics> getTaskStatistics() {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
index 0001e8d78..e1fe88392 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
@@ -48,24 +48,24 @@ public class CheckpointPlan {
 
     /**
      * All actions in this pipeline. <br>
-     * key: the action id; <br>
+     * key: the action state key; <br>
      * value: the parallelism of the action;
      */
-    private final Map<Long, Integer> pipelineActions;
+    private final Map<ActionStateKey, Integer> pipelineActions;
 
     /**
      * <br>
      * key: the subtask locations; <br>
-     * value: all actions in this subtask; f0: action id, f1: action index;
+     * value: all actions in this subtask; f0: action state key, f1: action 
index;
      */
-    private final Map<TaskLocation, Set<Tuple2<Long, Integer>>> subtaskActions;
+    private final Map<TaskLocation, Set<Tuple2<ActionStateKey, Integer>>> 
subtaskActions;
 
     public static final class Builder {
         private final Set<TaskLocation> pipelineSubtasks = new HashSet<>();
         private final Set<TaskLocation> startingSubtasks = new HashSet<>();
-        private final Map<Long, Integer> pipelineActions = new HashMap<>();
+        private final Map<ActionStateKey, Integer> pipelineActions = new 
HashMap<>();
 
-        private final Map<TaskLocation, Set<Tuple2<Long, Integer>>> 
subtaskActions =
+        private final Map<TaskLocation, Set<Tuple2<ActionStateKey, Integer>>> 
subtaskActions =
                 new HashMap<>();
 
         private Builder() {}
@@ -80,13 +80,13 @@ public class CheckpointPlan {
             return this;
         }
 
-        public Builder pipelineActions(Map<Long, Integer> pipelineActions) {
+        public Builder pipelineActions(Map<ActionStateKey, Integer> 
pipelineActions) {
             this.pipelineActions.putAll(pipelineActions);
             return this;
         }
 
         public Builder subtaskActions(
-                Map<TaskLocation, Set<Tuple2<Long, Integer>>> subtaskActions) {
+                Map<TaskLocation, Set<Tuple2<ActionStateKey, Integer>>> 
subtaskActions) {
             this.subtaskActions.putAll(subtaskActions);
             return this;
         }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java
index 73662eb21..8d6ea554d 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java
@@ -37,7 +37,7 @@ public class CompletedCheckpoint implements Checkpoint, 
Serializable {
 
     private final long completedTimestamp;
 
-    private final Map<Long, ActionState> taskStates;
+    private final Map<ActionStateKey, ActionState> taskStates;
 
     private final Map<Long, TaskStatistics> taskStatistics;
 
@@ -48,7 +48,7 @@ public class CompletedCheckpoint implements Checkpoint, 
Serializable {
             long triggerTimestamp,
             CheckpointType checkpointType,
             long completedTimestamp,
-            Map<Long, ActionState> taskStates,
+            Map<ActionStateKey, ActionState> taskStates,
             Map<Long, TaskStatistics> taskStatistics) {
         this.jobId = jobId;
         this.pipelineId = pipelineId;
@@ -89,7 +89,7 @@ public class CompletedCheckpoint implements Checkpoint, 
Serializable {
         return completedTimestamp;
     }
 
-    public Map<Long, ActionState> getTaskStates() {
+    public Map<ActionStateKey, ActionState> getTaskStates() {
         return taskStates;
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
index dcdccdacf..35cb85d53 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
@@ -51,7 +51,7 @@ public class PendingCheckpoint implements Checkpoint {
 
     private final Map<Long, TaskStatistics> taskStatistics;
 
-    private final Map<Long, ActionState> actionStates;
+    private final Map<ActionStateKey, ActionState> actionStates;
 
     private final CompletableFuture<CompletedCheckpoint> completableFuture;
 
@@ -65,7 +65,7 @@ public class PendingCheckpoint implements Checkpoint {
             CheckpointType checkpointType,
             Set<Long> notYetAcknowledgedTasks,
             Map<Long, TaskStatistics> taskStatistics,
-            Map<Long, ActionState> actionStates) {
+            Map<ActionStateKey, ActionState> actionStates) {
         this.jobId = jobId;
         this.pipelineId = pipelineId;
         this.checkpointId = checkpointId;
@@ -106,7 +106,7 @@ public class PendingCheckpoint implements Checkpoint {
         return taskStatistics;
     }
 
-    protected Map<Long, ActionState> getActionStates() {
+    protected Map<ActionStateKey, ActionState> getActionStates() {
         return actionStates;
     }
 
@@ -127,7 +127,7 @@ public class PendingCheckpoint implements Checkpoint {
 
         long stateSize = 0;
         for (ActionSubtaskState state : states) {
-            ActionState actionState = actionStates.get(state.getActionId());
+            ActionState actionState = actionStates.get(state.getStateKey());
             if (actionState == null) {
                 continue;
             }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index 735addd1c..de7c1fcb2 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -45,7 +45,6 @@ import lombok.extern.slf4j.Slf4j;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -145,7 +144,16 @@ public class ExecutionPlanGenerator {
 
         List<LogicalEdge> sortedLogicalEdges = new ArrayList<>(logicalEdges);
         Collections.sort(
-                sortedLogicalEdges, 
Comparator.comparingLong(LogicalEdge::getInputVertexId));
+                sortedLogicalEdges,
+                (o1, o2) -> {
+                    if (o1.getInputVertexId() != o2.getInputVertexId()) {
+                        return o1.getInputVertexId() > o2.getInputVertexId() ? 
1 : -1;
+                    }
+                    if (o1.getTargetVertexId() != o2.getTargetVertexId()) {
+                        return o1.getTargetVertexId() > o2.getTargetVertexId() 
? 1 : -1;
+                    }
+                    return 0;
+                });
         for (LogicalEdge logicalEdge : sortedLogicalEdges) {
             LogicalVertex logicalInputVertex = logicalEdge.getInputVertex();
             ExecutionVertex executionInputVertex =
@@ -233,11 +241,7 @@ public class ExecutionPlanGenerator {
                 
ShuffleConfig.builder().shuffleStrategy(shuffleStrategy).build();
 
         long shuffleVertexId = idGenerator.getNextId();
-        String shuffleActionName =
-                String.format(
-                        "Shuffle [%s -> table[0~%s]]",
-                        sourceAction.getName(),
-                        ((MultipleRowType) 
sourceProducedType).getTableIds().length - 1);
+        String shuffleActionName = String.format("Shuffle [%s]", 
sourceAction.getName());
         ShuffleAction shuffleAction =
                 new ShuffleAction(shuffleVertexId, shuffleActionName, 
shuffleConfig);
         shuffleAction.setParallelism(sourceAction.getParallelism());
@@ -358,9 +362,11 @@ public class ExecutionPlanGenerator {
                                 jars.addAll(action.getJarUrls());
                                 names.add(action.getName());
                             });
+            String transformChainActionName =
+                    String.format("TransformChain[%s]", String.join("->", 
names));
             TransformChainAction transformChainAction =
                     new TransformChainAction(
-                            newVertexId, String.join("->", names), jars, 
transforms);
+                            newVertexId, transformChainActionName, jars, 
transforms);
             
transformChainAction.setParallelism(currentVertex.getAction().getParallelism());
 
             ExecutionVertex executionVertex =
@@ -413,7 +419,24 @@ public class ExecutionPlanGenerator {
             executionVertices.add(edge.getLeftVertex());
             executionVertices.add(edge.getRightVertex());
         }
-        return new PipelineGenerator(executionVertices, new 
ArrayList<>(executionEdges))
-                .generatePipelines();
+        PipelineGenerator pipelineGenerator =
+                new PipelineGenerator(executionVertices, new 
ArrayList<>(executionEdges));
+        List<Pipeline> pipelines = pipelineGenerator.generatePipelines();
+
+        long actionCount = 0;
+        Set<String> actionNames = new HashSet<>();
+        for (Pipeline pipeline : pipelines) {
+            Integer pipelineId = pipeline.getId();
+            for (ExecutionVertex vertex : pipeline.getVertexes().values()) {
+                Action action = vertex.getAction();
+                String actionName = String.format("pipeline-%s [%s]", 
pipelineId, action.getName());
+                action.setName(actionName);
+                actionNames.add(actionName);
+                actionCount++;
+            }
+        }
+        checkArgument(actionNames.size() == actionCount, "Action name is 
duplicated");
+
+        return pipelines;
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java
index 5e08190c2..e482a190e 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.dag.execution;
 
 import org.apache.seatunnel.engine.core.dag.actions.Action;
+import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
 
 import java.util.List;
 import java.util.Map;
@@ -50,9 +51,11 @@ public class Pipeline {
         return vertexes;
     }
 
-    public Map<Long, Integer> getActions() {
+    public Map<ActionStateKey, Integer> getActions() {
         return vertexes.values().stream()
                 .map(ExecutionVertex::getAction)
-                .collect(Collectors.toMap(Action::getId, 
Action::getParallelism));
+                .collect(
+                        Collectors.toMap(
+                                action -> ActionStateKey.of(action), 
Action::getParallelism));
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index f65d9a8d5..c3c3fc4e4 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -32,6 +32,7 @@ import 
org.apache.seatunnel.engine.core.dag.actions.SourceAction;
 import org.apache.seatunnel.engine.core.dag.internal.IntermediateQueue;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.PipelineStatus;
+import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
 import org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan;
 import org.apache.seatunnel.engine.server.dag.execution.ExecutionEdge;
 import org.apache.seatunnel.engine.server.dag.execution.ExecutionPlan;
@@ -114,9 +115,9 @@ public class PhysicalPlanGenerator {
     /**
      * <br>
      * key: the subtask locations; <br>
-     * value: all actions in this subtask; f0: action id, f1: action index;
+     * value: all actions in this subtask; f0: action state key, f1: action 
index;
      */
-    private final Map<TaskLocation, Set<Tuple2<Long, Integer>>> subtaskActions;
+    private final Map<TaskLocation, Set<Tuple2<ActionStateKey, Integer>>> 
subtaskActions;
 
     private final IMap<Object, Object> runningJobStateIMap;
 
@@ -270,7 +271,7 @@ public class PhysicalPlanGenerator {
                                 subtaskActions.put(
                                         taskLocation,
                                         Collections.singleton(
-                                                
Tuple2.tuple2(sinkAction.getId(), -1)));
+                                                
Tuple2.tuple2(ActionStateKey.of(sinkAction), -1)));
 
                                 return new PhysicalVertex(
                                         atomicInteger.incrementAndGet(),
@@ -344,8 +345,10 @@ public class PhysicalPlanGenerator {
                                     long shuffleActionId = 
idGenerator.getNextId();
                                     String shuffleActionName =
                                             String.format(
-                                                    "Shuffle [table[%s] -> 
%s]",
-                                                    sinkTableIndex, 
sinkAction.getName());
+                                                    "%s -> %s -> %s",
+                                                    shuffleAction.getName(),
+                                                    sinkTableId,
+                                                    sinkAction.getName());
                                     ShuffleAction shuffleActionOfSinkFlow =
                                             new ShuffleAction(
                                                     shuffleActionId,
@@ -474,7 +477,8 @@ public class PhysicalPlanGenerator {
                             startingTasks.add(taskLocation);
                             subtaskActions.put(
                                     taskLocation,
-                                    
Collections.singleton(Tuple2.tuple2(sourceAction.getId(), -1)));
+                                    Collections.singleton(
+                                            
Tuple2.tuple2(ActionStateKey.of(sourceAction), -1)));
                             enumeratorTaskIDMap.put(sourceAction, 
taskLocation);
 
                             return new PhysicalVertex(
@@ -633,8 +637,11 @@ public class PhysicalPlanGenerator {
         pipelineTasks.add(task.getTaskLocation());
         subtaskActions.put(
                 task.getTaskLocation(),
-                task.getActionIds().stream()
-                        .map(id -> Tuple2.tuple2(id, 
task.getTaskLocation().getTaskIndex()))
+                task.getActionStateKeys().stream()
+                        .map(
+                                stateKey ->
+                                        Tuple2.tuple2(
+                                                stateKey, 
task.getTaskLocation().getTaskIndex()))
                         .collect(Collectors.toSet()));
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index 9a30da6db..9f332c684 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -29,6 +29,7 @@ import 
org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
 import org.apache.seatunnel.engine.core.dag.actions.TransformChainAction;
 import org.apache.seatunnel.engine.core.dag.actions.UnknownActionException;
+import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
 import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
 import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
 import 
org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation;
@@ -283,8 +284,8 @@ public abstract class SeaTunnelTask extends AbstractTask {
         return getFlowInfo((action, set) -> set.addAll(action.getJarUrls()));
     }
 
-    public Set<Long> getActionIds() {
-        return getFlowInfo((action, set) -> set.add(action.getId()));
+    public Set<ActionStateKey> getActionStateKeys() {
+        return getFlowInfo((action, set) -> 
set.add(ActionStateKey.of(action)));
     }
 
     private <T> Set<T> getFlowInfo(BiConsumer<Action, Set<T>> function) {
@@ -340,10 +341,10 @@ public abstract class SeaTunnelTask extends AbstractTask {
         }
     }
 
-    public void addState(Barrier barrier, long actionId, List<byte[]> state) {
+    public void addState(Barrier barrier, ActionStateKey stateKey, 
List<byte[]> state) {
         List<ActionSubtaskState> states =
                 checkpointStates.computeIfAbsent(barrier.getId(), id -> new 
ArrayList<>());
-        states.add(new ActionSubtaskState(actionId, indexID, state));
+        states.add(new ActionSubtaskState(stateKey, indexID, state));
     }
 
     @Override
@@ -368,11 +369,11 @@ public abstract class SeaTunnelTask extends AbstractTask {
     @Override
     public void restoreState(List<ActionSubtaskState> actionStateList) throws 
Exception {
         log.debug("restoreState for SeaTunnelTask[{}]", actionStateList);
-        Map<Long, List<ActionSubtaskState>> stateMap =
+        Map<ActionStateKey, List<ActionSubtaskState>> stateMap =
                 actionStateList.stream()
                         .collect(
                                 Collectors.groupingBy(
-                                        ActionSubtaskState::getActionId, 
Collectors.toList()));
+                                        ActionSubtaskState::getStateKey, 
Collectors.toList()));
         allCycles.stream()
                 .filter(cycle -> cycle instanceof ActionFlowLifeCycle)
                 .map(cycle -> (ActionFlowLifeCycle) cycle)
@@ -381,7 +382,7 @@ public abstract class SeaTunnelTask extends AbstractTask {
                             try {
                                 actionFlowLifeCycle.restoreState(
                                         stateMap.getOrDefault(
-                                                
actionFlowLifeCycle.getAction().getId(),
+                                                
ActionStateKey.of(actionFlowLifeCycle.getAction()),
                                                 Collections.emptyList()));
                             } catch (Exception e) {
                                 sneakyThrow(e);
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
index 9b21c0856..7f798bfbb 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server.task;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
+import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
 import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
 import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
 import org.apache.seatunnel.engine.server.checkpoint.CheckpointCloseReason;
@@ -221,7 +222,8 @@ public class SinkAggregatedCommitterTask<CommandInfoT, 
AggregatedCommitInfoT>
                                     this.taskLocation,
                                     (CheckpointBarrier) barrier,
                                     Collections.singletonList(
-                                            new 
ActionSubtaskState(sink.getId(), -1, states))));
+                                            new ActionSubtaskState(
+                                                    ActionStateKey.of(sink), 
-1, states))));
         }
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index 7e6b7d63d..fbf873536 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.source.SourceEvent;
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
+import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
 import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
 import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
 import 
org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation;
@@ -154,7 +155,7 @@ public class SourceSplitEnumeratorTask<SplitT extends 
SourceSplit> extends Coord
                                     (CheckpointBarrier) barrier,
                                     Collections.singletonList(
                                             new ActionSubtaskState(
-                                                    source.getId(),
+                                                    ActionStateKey.of(source),
                                                     -1,
                                                     
Collections.singletonList(serialize)))));
         }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
index 3369fc4a9..8637ea7c3 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server.task.flow;
 import org.apache.seatunnel.api.table.type.Record;
 import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
 import org.apache.seatunnel.engine.core.dag.actions.ShuffleStrategy;
+import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
 import org.apache.seatunnel.engine.server.task.record.Barrier;
 
@@ -80,7 +81,8 @@ public class ShuffleSinkFlowLifeCycle extends 
AbstractFlowLifeCycle
                 prepareClose = true;
             }
             if (barrier.snapshot()) {
-                runningTask.addState(barrier, shuffleAction.getId(), 
Collections.emptyList());
+                runningTask.addState(
+                        barrier, ActionStateKey.of(shuffleAction), 
Collections.emptyList());
             }
             runningTask.ack(barrier);
 
@@ -106,7 +108,7 @@ public class ShuffleSinkFlowLifeCycle extends 
AbstractFlowLifeCycle
     public void close() throws IOException {
         super.close();
         for (Map.Entry<String, IQueue<Record<?>>> shuffleItem : 
shuffles.entrySet()) {
-            log.info("destroy shuffle queue[{}]", shuffleItem.getKey());
+            log.info("destroy shuffle queue: {}", shuffleItem.getKey());
             shuffleItem.getValue().destroy();
         }
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
index 1365125cf..6c3559a97 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
@@ -20,11 +20,13 @@ package org.apache.seatunnel.engine.server.task.flow;
 import org.apache.seatunnel.api.table.type.Record;
 import org.apache.seatunnel.api.transform.Collector;
 import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
+import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
 import org.apache.seatunnel.engine.server.task.record.Barrier;
 
 import com.hazelcast.collection.IQueue;
 import com.hazelcast.core.HazelcastInstance;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -34,6 +36,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
+@Slf4j
 @SuppressWarnings("MagicNumber")
 public class ShuffleSourceFlowLifeCycle<T> extends AbstractFlowLifeCycle
         implements OneOutputFlowLifeCycle<Record<?>> {
@@ -105,7 +108,9 @@ public class ShuffleSourceFlowLifeCycle<T> extends 
AbstractFlowLifeCycle
                         }
                         if (barrier.snapshot()) {
                             runningTask.addState(
-                                    barrier, shuffleAction.getId(), 
Collections.emptyList());
+                                    barrier,
+                                    ActionStateKey.of(shuffleAction),
+                                    Collections.emptyList());
                         }
                         runningTask.ack(barrier);
 
@@ -139,6 +144,7 @@ public class ShuffleSourceFlowLifeCycle<T> extends 
AbstractFlowLifeCycle
     public void close() throws IOException {
         super.close();
         for (IQueue<Record<?>> shuffleQueue : shuffles) {
+            log.info("destroy shuffle queue: {}", shuffleQueue.getName());
             shuffleQueue.destroy();
         }
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index 308642e0c..0346b7313 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.api.table.type.Record;
 import org.apache.seatunnel.common.utils.SerializationUtils;
 import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
+import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
 import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
@@ -161,11 +162,12 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
                     }
                     List<StateT> states = 
writer.snapshotState(barrier.getId());
                     if (!writerStateSerializer.isPresent()) {
-                        runningTask.addState(barrier, sinkAction.getId(), 
Collections.emptyList());
+                        runningTask.addState(
+                                barrier, ActionStateKey.of(sinkAction), 
Collections.emptyList());
                     } else {
                         runningTask.addState(
                                 barrier,
-                                sinkAction.getId(),
+                                ActionStateKey.of(sinkAction),
                                 serializeStates(writerStateSerializer.get(), 
states));
                     }
                     if (containAggCommitter) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
index 17741b85c..16adec49e 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.api.table.type.Record;
 import org.apache.seatunnel.common.utils.SerializationUtils;
 import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
+import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
 import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector;
@@ -214,7 +215,7 @@ public class SourceFlowLifeCycle<T, SplitT extends 
SourceSplit> extends ActionFl
             if (barrier.snapshot()) {
                 List<byte[]> states =
                         serializeStates(splitSerializer, 
reader.snapshotState(barrier.getId()));
-                runningTask.addState(barrier, sourceAction.getId(), states);
+                runningTask.addState(barrier, ActionStateKey.of(sourceAction), 
states);
             }
             // ack after #addState
             runningTask.ack(barrier);
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
index d7a9cc4e3..187aa3659 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.table.type.Record;
 import org.apache.seatunnel.api.transform.Collector;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
 import org.apache.seatunnel.engine.core.dag.actions.TransformChainAction;
+import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
 import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
 import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
@@ -78,7 +79,7 @@ public class TransformFlowLifeCycle<T> extends 
ActionFlowLifeCycle
                 prepareClose = true;
             }
             if (barrier.snapshot()) {
-                runningTask.addState(barrier, action.getId(), 
Collections.emptyList());
+                runningTask.addState(barrier, ActionStateKey.of(action), 
Collections.emptyList());
             }
             // ack after #addState
             runningTask.ack(barrier);
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/StorageTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/StorageTest.java
index 48c2b5b68..ac416766b 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/StorageTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/StorageTest.java
@@ -39,8 +39,9 @@ public class StorageTest {
 
         Map<Long, TaskStatistics> taskStatisticsMap = new HashMap<>();
         taskStatisticsMap.put(1L, new TaskStatistics(1L, 32));
-        Map<Long, ActionState> actionStateMap = new HashMap<>();
-        actionStateMap.put(2L, new ActionState("test", 13));
+        Map<ActionStateKey, ActionState> actionStateMap = new HashMap<>();
+        ActionStateKey actionStateKey = new ActionStateKey("test-action");
+        actionStateMap.put(actionStateKey, new ActionState(actionStateKey, 
13));
         CompletedCheckpoint completedCheckpoint =
                 new CompletedCheckpoint(
                         1,

Reply via email to