This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new add75d7d5 [Feature][CDC] Support add & drop tables when restore cdc
jobs (#4254)
add75d7d5 is described below
commit add75d7d5d0af061664ff6fa42a7b3ef58eae81b
Author: hailin0 <[email protected]>
AuthorDate: Fri Mar 10 13:41:40 2023 +0800
[Feature][CDC] Support add & drop tables when restore cdc jobs (#4254)
---
config/seatunnel.yaml | 2 +-
.../api/table/catalog/CatalogTableUtil.java | 2 +-
.../cdc/base/source/IncrementalSource.java | 70 +++++++++++++++---
.../enumerator/IncrementalSplitAssigner.java | 30 ++++++--
.../source/reader/IncrementalSourceReader.java | 24 +++----
.../engine/client/JobConfigParserTest.java | 13 ++--
.../engine/client/LogicalDagGeneratorTest.java | 2 +-
.../dag/actions/ShuffleMultipleRowStrategy.java | 29 ++++++--
.../core/dag/actions/ShufflePartitionStrategy.java | 23 +++++-
.../core/dag/logical/LogicalDagGenerator.java | 13 ++--
.../engine/core/parse/JobConfigParser.java | 83 ++++++++++++++++++----
.../core/parse/MultipleTableJobConfigParser.java | 28 ++++++--
.../engine/server/checkpoint/ActionState.java | 12 ++--
...ActionSubtaskState.java => ActionStateKey.java} | 26 ++++---
.../server/checkpoint/ActionSubtaskState.java | 2 +-
.../server/checkpoint/CheckpointCoordinator.java | 6 +-
.../engine/server/checkpoint/CheckpointPlan.java | 16 ++---
.../server/checkpoint/CompletedCheckpoint.java | 6 +-
.../server/checkpoint/PendingCheckpoint.java | 8 +--
.../dag/execution/ExecutionPlanGenerator.java | 43 ++++++++---
.../engine/server/dag/execution/Pipeline.java | 7 +-
.../server/dag/physical/PhysicalPlanGenerator.java | 23 +++---
.../engine/server/task/SeaTunnelTask.java | 15 ++--
.../server/task/SinkAggregatedCommitterTask.java | 4 +-
.../server/task/SourceSplitEnumeratorTask.java | 3 +-
.../server/task/flow/ShuffleSinkFlowLifeCycle.java | 6 +-
.../task/flow/ShuffleSourceFlowLifeCycle.java | 8 ++-
.../engine/server/task/flow/SinkFlowLifeCycle.java | 6 +-
.../server/task/flow/SourceFlowLifeCycle.java | 3 +-
.../server/task/flow/TransformFlowLifeCycle.java | 3 +-
.../engine/server/checkpoint/StorageTest.java | 5 +-
31 files changed, 381 insertions(+), 140 deletions(-)
diff --git a/config/seatunnel.yaml b/config/seatunnel.yaml
index add3996c8..8202d069c 100644
--- a/config/seatunnel.yaml
+++ b/config/seatunnel.yaml
@@ -34,4 +34,4 @@ seatunnel:
plugin-config:
namespace: /tmp/seatunnel/checkpoint_snapshot
storage.type: hdfs
- fs.defaultFS: file:/// # Ensure that the directory has written
permission
+ fs.defaultFS: file:///tmp/ # Ensure that the directory has written
permission
\ No newline at end of file
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
index f5856a013..00603e3f5 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
@@ -111,7 +111,7 @@ public class CatalogTableUtil implements Serializable {
// Get the list of specified tables
List<String> tableNames =
catalogConfig.get(CatalogOptions.TABLE_NAMES);
List<CatalogTable> catalogTables = new ArrayList<>();
- if (tableNames != null && tableNames.size() > 1) {
+ if (tableNames != null && tableNames.size() >= 1) {
for (String tableName : tableNames) {
catalogTables.add(catalog.getTable(TablePath.of(tableName)));
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
index 98cc6eb4d..dbad777cc 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
@@ -42,10 +42,12 @@ import
org.apache.seatunnel.connectors.cdc.base.source.enumerator.SplitAssigner;
import
org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.HybridPendingSplitsState;
import
org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.IncrementalPhaseState;
import
org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.PendingSplitsState;
+import
org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.SnapshotPhaseState;
import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
import
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader;
import
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter;
import
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader;
+import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import
org.apache.seatunnel.connectors.cdc.base.source.split.state.SourceSplitStateBase;
@@ -54,15 +56,21 @@ import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmit
import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderOptions;
+import com.google.common.collect.Sets;
import io.debezium.relational.TableId;
import lombok.NoArgsConstructor;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
@NoArgsConstructor
public abstract class IncrementalSource<T, C extends SourceConfig>
@@ -218,16 +226,20 @@ public abstract class IncrementalSource<T, C extends
SourceConfig>
PendingSplitsState checkpointState)
throws Exception {
C sourceConfig = configFactory.create(0);
- final List<TableId> remainingTables =
- dataSourceDialect.discoverDataCollections(sourceConfig);
- SplitAssigner.Context<C> assignerContext =
- new SplitAssigner.Context<>(
- sourceConfig,
- new HashSet<>(remainingTables),
- new HashMap<>(),
- new HashMap<>());
+ Set<TableId> capturedTables =
+ new
HashSet<>(dataSourceDialect.discoverDataCollections(sourceConfig));
+
final SplitAssigner splitAssigner;
if (checkpointState instanceof HybridPendingSplitsState) {
+ checkpointState = restore(capturedTables,
(HybridPendingSplitsState) checkpointState);
+ SnapshotPhaseState checkpointSnapshotState =
+ ((HybridPendingSplitsState)
checkpointState).getSnapshotPhaseState();
+ SplitAssigner.Context<C> assignerContext =
+ new SplitAssigner.Context<>(
+ sourceConfig,
+ capturedTables,
+ checkpointSnapshotState.getAssignedSplits(),
+
checkpointSnapshotState.getSplitCompletedOffsets());
splitAssigner =
new HybridSplitAssigner<>(
assignerContext,
@@ -237,6 +249,9 @@ public abstract class IncrementalSource<T, C extends
SourceConfig>
dataSourceDialect,
offsetFactory);
} else if (checkpointState instanceof IncrementalPhaseState) {
+ SplitAssigner.Context<C> assignerContext =
+ new SplitAssigner.Context<>(
+ sourceConfig, capturedTables, new HashMap<>(), new
HashMap<>());
splitAssigner =
new IncrementalSplitAssigner<>(
assignerContext, incrementalParallelism,
offsetFactory);
@@ -246,4 +261,43 @@ public abstract class IncrementalSource<T, C extends
SourceConfig>
}
return new IncrementalSourceEnumerator(enumeratorContext,
splitAssigner);
}
+
+ private HybridPendingSplitsState restore(
+ Set<TableId> capturedTables, HybridPendingSplitsState
checkpointState) {
+ SnapshotPhaseState checkpointSnapshotState =
checkpointState.getSnapshotPhaseState();
+ Set<TableId> checkpointCapturedTables =
+ Stream.concat(
+
checkpointSnapshotState.getAlreadyProcessedTables().stream(),
+
checkpointSnapshotState.getRemainingTables().stream())
+ .collect(Collectors.toSet());
+ Set<TableId> newTables = Sets.difference(capturedTables,
checkpointCapturedTables);
+ Set<TableId> deletedTables = Sets.difference(checkpointCapturedTables,
capturedTables);
+
+ checkpointSnapshotState.getRemainingTables().addAll(newTables);
+ checkpointSnapshotState.getRemainingTables().removeAll(deletedTables);
+
checkpointSnapshotState.getAlreadyProcessedTables().removeAll(deletedTables);
+ Set<String> deletedSplitIds = new HashSet<>();
+ Iterator<SnapshotSplit> splitIterator =
+ checkpointSnapshotState.getRemainingSplits().iterator();
+ while (splitIterator.hasNext()) {
+ SnapshotSplit split = splitIterator.next();
+ if (deletedTables.contains(split.getTableId())) {
+ splitIterator.remove();
+ deletedSplitIds.add(split.splitId());
+ }
+ }
+ for (Map.Entry<String, SnapshotSplit> entry :
+ checkpointSnapshotState.getAssignedSplits().entrySet()) {
+ SnapshotSplit split = entry.getValue();
+ if (deletedTables.contains(split.getTableId())) {
+ deletedSplitIds.add(entry.getKey());
+ }
+ }
+ deletedSplitIds.forEach(
+ splitId -> {
+
checkpointSnapshotState.getAssignedSplits().remove(splitId);
+
checkpointSnapshotState.getSplitCompletedOffsets().remove(splitId);
+ });
+ return checkpointState;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
index b203ecd92..cf6caf87b 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/IncrementalSplitAssigner.java
@@ -69,6 +69,8 @@ public class IncrementalSplitAssigner<C extends SourceConfig>
implements SplitAs
private final Map<String, IncrementalSplit> assignedSplits = new
HashMap<>();
+ private boolean startWithSnapshotMinimumOffset = true;
+
public IncrementalSplitAssigner(
SplitAssigner.Context<C> context,
int incrementalParallelism,
@@ -94,7 +96,8 @@ public class IncrementalSplitAssigner<C extends SourceConfig>
implements SplitAs
if (splitAssigned) {
return Optional.empty();
}
- List<IncrementalSplit> incrementalSplits = createIncrementalSplits();
+ List<IncrementalSplit> incrementalSplits =
+ createIncrementalSplits(startWithSnapshotMinimumOffset);
remainingSplits.addAll(incrementalSplits);
splitAssigned = true;
return getNext();
@@ -136,15 +139,24 @@ public class IncrementalSplitAssigner<C extends
SourceConfig> implements SplitAs
List<CompletedSnapshotSplitInfo>
completedSnapshotSplitInfos =
incrementalSplit.getCompletedSnapshotSplitInfos();
for (CompletedSnapshotSplitInfo info :
completedSnapshotSplitInfos) {
+ if
(!context.getCapturedTables().contains(info.getTableId())) {
+ continue;
+ }
context.getSplitCompletedOffsets()
.put(info.getSplitId(),
info.getWatermark());
context.getAssignedSnapshotSplit()
.put(info.getSplitId(),
info.asSnapshotSplit());
}
for (TableId tableId :
incrementalSplit.getTableIds()) {
+ if
(!context.getCapturedTables().contains(tableId)) {
+ continue;
+ }
tableWatermarks.put(tableId, startupOffset);
}
});
+ if (!tableWatermarks.isEmpty()) {
+ this.startWithSnapshotMinimumOffset = false;
+ }
}
@Override
@@ -159,7 +171,7 @@ public class IncrementalSplitAssigner<C extends
SourceConfig> implements SplitAs
//
------------------------------------------------------------------------------------------
- public List<IncrementalSplit> createIncrementalSplits() {
+ public List<IncrementalSplit> createIncrementalSplits(boolean
startWithSnapshotMinimumOffset) {
Set<TableId> allTables = new HashSet<>(context.getCapturedTables());
assignedSplits.values().forEach(split ->
split.getTableIds().forEach(allTables::remove));
List<TableId>[] capturedTables = new List[incrementalParallelism];
@@ -175,12 +187,14 @@ public class IncrementalSplitAssigner<C extends
SourceConfig> implements SplitAs
i = 0;
List<IncrementalSplit> incrementalSplits = new ArrayList<>();
for (List<TableId> capturedTable : capturedTables) {
- incrementalSplits.add(createIncrementalSplit(capturedTable, i++));
+ incrementalSplits.add(
+ createIncrementalSplit(capturedTable, i++,
startWithSnapshotMinimumOffset));
}
return incrementalSplits;
}
- private IncrementalSplit createIncrementalSplit(List<TableId>
capturedTables, int index) {
+ private IncrementalSplit createIncrementalSplit(
+ List<TableId> capturedTables, int index, boolean
startWithSnapshotMinimumOffset) {
final List<SnapshotSplit> assignedSnapshotSplit =
context.getAssignedSnapshotSplit().values().stream()
.filter(split ->
capturedTables.contains(split.getTableId()))
@@ -191,10 +205,12 @@ public class IncrementalSplitAssigner<C extends
SourceConfig> implements SplitAs
final List<CompletedSnapshotSplitInfo> completedSnapshotSplitInfos =
new ArrayList<>();
Offset minOffset = null;
for (SnapshotSplit split : assignedSnapshotSplit) {
- // find the min offset of change log
Offset changeLogOffset =
splitCompletedOffsets.get(split.splitId());
- if (minOffset == null || changeLogOffset.isBefore(minOffset)) {
- minOffset = changeLogOffset;
+ if (startWithSnapshotMinimumOffset) {
+ // find the min offset of change log
+ if (minOffset == null || changeLogOffset.isBefore(minOffset)) {
+ minOffset = changeLogOffset;
+ }
}
completedSnapshotSplitInfos.add(
new CompletedSnapshotSplitInfo(
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
index 54e59b769..90d849b08 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.java
@@ -43,6 +43,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkState;
@@ -57,8 +58,6 @@ public class IncrementalSourceReader<T, C extends
SourceConfig>
private final Map<String, SnapshotSplit> finishedUnackedSplits;
- private final Map<String, IncrementalSplit> uncompletedIncrementalSplits;
-
private volatile boolean running = false;
private final int subtaskId;
@@ -79,7 +78,6 @@ public class IncrementalSourceReader<T, C extends
SourceConfig>
context);
this.sourceConfig = sourceConfig;
this.finishedUnackedSplits = new HashMap<>();
- this.uncompletedIncrementalSplits = new HashMap<>();
this.subtaskId = context.getIndexOfSubtask();
}
@@ -110,15 +108,15 @@ public class IncrementalSourceReader<T, C extends
SourceConfig>
unfinishedSplits.add(split);
}
} else {
- // the incremental split is uncompleted
- uncompletedIncrementalSplits.put(split.splitId(),
split.asIncrementalSplit());
unfinishedSplits.add(split.asIncrementalSplit());
}
}
// notify split enumerator again about the finished unacked snapshot
splits
reportFinishedSnapshotSplitsIfNeed();
// add all un-finished splits (including incremental split) to
SourceReaderBase
- super.addSplits(unfinishedSplits);
+ if (!unfinishedSplits.isEmpty()) {
+ super.addSplits(unfinishedSplits);
+ }
}
@Override
@@ -168,16 +166,18 @@ public class IncrementalSourceReader<T, C extends
SourceConfig>
@Override
public List<SourceSplitBase> snapshotState(long checkpointId) {
- // unfinished splits
List<SourceSplitBase> stateSplits = super.snapshotState(checkpointId);
- // add finished snapshot splits that didn't receive ack yet
- stateSplits.addAll(finishedUnackedSplits.values());
+ // unfinished splits
+ List<SourceSplitBase> unfinishedSplits =
+ stateSplits.stream()
+ .filter(split ->
!finishedUnackedSplits.containsKey(split.splitId()))
+ .collect(Collectors.toList());
- // add incremental splits who are uncompleted
- stateSplits.addAll(uncompletedIncrementalSplits.values());
+ // add finished snapshot splits that didn't receive ack yet
+ unfinishedSplits.addAll(finishedUnackedSplits.values());
- return stateSplits;
+ return unfinishedSplits;
}
@Override
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
index 0113426a9..e9e59cf55 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/JobConfigParserTest.java
@@ -48,9 +48,10 @@ public class JobConfigParserTest {
ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
List<Action> actions = parse.getLeft();
Assertions.assertEquals(1, actions.size());
- Assertions.assertEquals("LocalFile", actions.get(0).getName());
+ Assertions.assertEquals("Sink[0]-LocalFile-default",
actions.get(0).getName());
Assertions.assertEquals(1, actions.get(0).getUpstream().size());
- Assertions.assertEquals("FakeSource",
actions.get(0).getUpstream().get(0).getName());
+ Assertions.assertEquals(
+ "Source[0]-FakeSource-default",
actions.get(0).getUpstream().get(0).getName());
Assertions.assertEquals(3,
actions.get(0).getUpstream().get(0).getParallelism());
Assertions.assertEquals(3, actions.get(0).getParallelism());
@@ -69,10 +70,12 @@ public class JobConfigParserTest {
List<Action> actions = parse.getLeft();
Assertions.assertEquals(1, actions.size());
- Assertions.assertEquals("LocalFile", actions.get(0).getName());
+ Assertions.assertEquals("Sink[0]-LocalFile-fake",
actions.get(0).getName());
Assertions.assertEquals(2, actions.get(0).getUpstream().size());
- Assertions.assertEquals("FakeSource",
actions.get(0).getUpstream().get(0).getName());
- Assertions.assertEquals("FakeSource",
actions.get(0).getUpstream().get(1).getName());
+ Assertions.assertEquals(
+ "Source[0]-FakeSource-fake",
actions.get(0).getUpstream().get(0).getName());
+ Assertions.assertEquals(
+ "Source[1]-FakeSource-fake",
actions.get(0).getUpstream().get(1).getName());
Assertions.assertEquals(3,
actions.get(0).getUpstream().get(0).getParallelism());
Assertions.assertEquals(3,
actions.get(0).getUpstream().get(1).getParallelism());
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
index 29bd4ec57..b7a2b0383 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
@@ -56,7 +56,7 @@ public class LogicalDagGeneratorTest {
LogicalDag logicalDag = logicalDagGenerator.generate();
JsonObject logicalDagJson = logicalDag.getLogicalDagAsJson();
String result =
-
"{\"vertices\":[{\"id\":1,\"name\":\"LocalFile(id=1)\",\"parallelism\":6},{\"id\":2,\"name\":\"FakeSource(id=2)\",\"parallelism\":3},{\"id\":3,\"name\":\"FakeSource(id=3)\",\"parallelism\":3}],\"edges\":[{\"inputVertex\":\"FakeSource\",\"targetVertex\":\"LocalFile\"},{\"inputVertex\":\"FakeSource\",\"targetVertex\":\"LocalFile\"}]}";
+
"{\"vertices\":[{\"id\":2,\"name\":\"Source[0]-FakeSource-fake(id=2)\",\"parallelism\":3},{\"id\":3,\"name\":\"Source[1]-FakeSource-fake(id=3)\",\"parallelism\":3},{\"id\":1,\"name\":\"Sink[0]-LocalFile-fake(id=1)\",\"parallelism\":6}],\"edges\":[{\"inputVertex\":\"Source[0]-FakeSource-fake\",\"targetVertex\":\"Sink[0]-LocalFile-fake\"},{\"inputVertex\":\"Source[1]-FakeSource-fake\",\"targetVertex\":\"Sink[0]-LocalFile-fake\"}]}";
Assertions.assertEquals(result, logicalDagJson.toString());
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java
index c28719bf0..37477189d 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java
@@ -29,11 +29,15 @@ import lombok.Setter;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import lombok.experimental.Tolerate;
+import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+@Slf4j
@SuperBuilder(toBuilder = true)
@Getter
@Setter
@@ -57,6 +61,13 @@ public class ShuffleMultipleRowStrategy extends
ShuffleStrategy {
queue.clear();
shuffleMap.put(queueName, queue);
}
+
+ log.info(
+ "pipeline[{}] / reader[{}] assigned shuffle queue list: {}",
+ pipelineId,
+ inputIndex,
+ shuffleMap.keySet());
+
return shuffleMap;
}
@@ -75,18 +86,24 @@ public class ShuffleMultipleRowStrategy extends
ShuffleStrategy {
String queueName = generateQueueName(pipelineId, inputIndex,
targetTableId);
queues[inputIndex] = getIQueue(hazelcast, queueName);
}
+
+ log.info(
+ "pipeline[{}] / writer[{}] assigned shuffle queue list: {}",
+ pipelineId,
+ targetIndex,
+ Stream.of(queues).map(e ->
e.getName()).collect(Collectors.toList()));
+
return queues;
}
private String generateQueueName(int pipelineId, int inputIndex, String
tableId) {
- return "ShuffleMultipleRow-Queue["
+ return "ShuffleMultipleRow-Queue_"
+ getJobId()
- + "-"
+ + "_"
+ pipelineId
- + "-"
+ + "_"
+ inputIndex
- + "-"
- + tableId
- + "]";
+ + "_"
+ + tableId;
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShufflePartitionStrategy.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShufflePartitionStrategy.java
index 62460d4c5..4b69eba22 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShufflePartitionStrategy.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShufflePartitionStrategy.java
@@ -26,13 +26,18 @@ import lombok.Setter;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import lombok.experimental.Tolerate;
+import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static com.google.common.base.Preconditions.checkArgument;
+@Slf4j
@SuperBuilder
@Getter
@Setter
@@ -48,7 +53,7 @@ public class ShufflePartitionStrategy extends ShuffleStrategy
{
public Map<String, IQueue<Record<?>>> createShuffles(
HazelcastInstance hazelcast, int pipelineId, int inputIndex) {
checkArgument(inputIndex >= 0 && inputIndex < getInputPartitions());
- Map<String, IQueue<Record<?>>> shuffleMap = new HashMap<>();
+ Map<String, IQueue<Record<?>>> shuffleMap = new LinkedHashMap<>();
for (int targetIndex = 0; targetIndex < targetPartitions;
targetIndex++) {
String queueName = generateQueueName(pipelineId, inputIndex,
targetIndex);
IQueue<Record<?>> queue = getIQueue(hazelcast, queueName);
@@ -56,6 +61,13 @@ public class ShufflePartitionStrategy extends
ShuffleStrategy {
queue.clear();
shuffleMap.put(queueName, queue);
}
+
+ log.info(
+ "pipeline[{}] / reader[{}] assigned shuffle queue list: {}",
+ pipelineId,
+ inputIndex,
+ shuffleMap.keySet());
+
return shuffleMap;
}
@@ -86,12 +98,19 @@ public class ShufflePartitionStrategy extends
ShuffleStrategy {
String queueName = generateQueueName(pipelineId, inputIndex,
targetIndex);
shuffles[inputIndex] = getIQueue(hazelcast, queueName);
}
+
+ log.info(
+ "pipeline[{}] / writer[{}] assigned shuffle queue list: {}",
+ pipelineId,
+ targetIndex,
+ Stream.of(shuffles).map(e ->
e.getName()).collect(Collectors.toList()));
+
return shuffles;
}
private String generateQueueName(int pipelineId, int inputIndex, int
targetIndex) {
return String.format(
- "ShufflePartition-Queue[%s-%s-%s-%s]",
+ "ShufflePartition-Queue_%s_%s_%s_%s",
getJobId(), pipelineId, inputIndex, targetIndex);
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java
index a767c8258..f847c2127 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java
@@ -26,8 +26,8 @@ import com.hazelcast.logging.Logger;
import lombok.NonNull;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -39,13 +39,13 @@ public class LogicalDagGenerator {
private JobConfig jobConfig;
private IdGenerator idGenerator;
- private final Map<Long, LogicalVertex> logicalVertexMap = new HashMap<>();
+ private final Map<Long, LogicalVertex> logicalVertexMap = new
LinkedHashMap<>();
/**
* key: input vertex id; <br>
* value: target vertices id;
*/
- private final Map<Long, Set<Long>> inputVerticesMap = new HashMap<>();
+ private final Map<Long, LinkedHashSet<Long>> inputVerticesMap = new
LinkedHashMap<>();
public LogicalDagGenerator(
@NonNull List<Action> actions,
@@ -79,7 +79,8 @@ public class LogicalDagGenerator {
inputAction -> {
createLogicalVertex(inputAction);
inputVerticesMap
- .computeIfAbsent(inputAction.getId(), id
-> new HashSet<>())
+ .computeIfAbsent(
+ inputAction.getId(), id -> new
LinkedHashSet<>())
.add(logicalVertexId);
});
@@ -101,6 +102,6 @@ public class LogicalDagGenerator {
logicalVertexMap.get(targetId)))
.collect(Collectors.toList()))
.flatMap(Collection::stream)
- .collect(Collectors.toSet());
+ .collect(Collectors.toCollection(LinkedHashSet::new));
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
index f17d5236b..04dcbe8d0 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
@@ -185,7 +185,8 @@ public class JobConfigParser {
List<? extends Config> sinkConfigs) {
initRelationMap(sourceConfigs, transformConfigs);
- for (Config config : sinkConfigs) {
+ for (int configIndex = 0; configIndex < sinkConfigs.size();
configIndex++) {
+ Config config = sinkConfigs.get(configIndex);
ImmutablePair<
SeaTunnelSink<SeaTunnelRow, Serializable,
Serializable, Serializable>,
Set<URL>>
@@ -193,10 +194,15 @@ public class JobConfigParser {
ConnectorInstanceLoader.loadSinkInstance(
config, jobConfig.getJobContext(),
commonPluginJars);
+ String sinkActionName =
+ createSinkActionName(
+ configIndex,
+ sinkListImmutablePair.getLeft().getPluginName(),
+ getTableName(config));
SinkAction sinkAction =
createSinkAction(
idGenerator.getNextId(),
- sinkListImmutablePair.getLeft().getPluginName(),
+ sinkActionName,
sinkListImmutablePair.getLeft(),
sinkListImmutablePair.getRight());
@@ -245,15 +251,21 @@ public class JobConfigParser {
// of its upstream action.
SeaTunnelDataType dataType = null;
AtomicInteger totalParallelism = new AtomicInteger();
- for (Config sourceConfig : sourceConfigList) {
+ for (int configIndex = 0; configIndex < sourceConfigList.size();
configIndex++) {
+ Config sourceConfig = sourceConfigList.get(configIndex);
ImmutablePair<SeaTunnelSource, Set<URL>>
seaTunnelSourceListImmutablePair =
ConnectorInstanceLoader.loadSourceInstance(
sourceConfig, jobConfig.getJobContext(),
commonPluginJars);
dataType =
seaTunnelSourceListImmutablePair.getLeft().getProducedType();
+ String sourceActionName =
+ createSourceActionName(
+ configIndex,
+
sourceConfig.getString(CollectionConstants.PLUGIN_NAME),
+ getTableName(sourceConfig));
SourceAction sourceAction =
createSourceAction(
idGenerator.getNextId(),
-
sourceConfig.getString(CollectionConstants.PLUGIN_NAME),
+ sourceActionName,
seaTunnelSourceListImmutablePair.getLeft(),
seaTunnelSourceListImmutablePair.getRight());
@@ -274,14 +286,20 @@ public class JobConfigParser {
} else {
AtomicInteger totalParallelism = new AtomicInteger();
SeaTunnelDataType<?> dataTypeResult = null;
- for (Config config : transformConfigList) {
+ for (int configIndex = 0; configIndex <
transformConfigList.size(); configIndex++) {
+ Config config = transformConfigList.get(configIndex);
ImmutablePair<SeaTunnelTransform<?>, Set<URL>>
transformListImmutablePair =
ConnectorInstanceLoader.loadTransformInstance(
config, jobConfig.getJobContext(),
commonPluginJars);
+ String transformActionName =
+ createTransformActionName(
+ configIndex,
+
transformListImmutablePair.getLeft().getPluginName(),
+ getTableName(config));
TransformAction transformAction =
createTransformAction(
idGenerator.getNextId(),
-
transformListImmutablePair.getLeft().getPluginName(),
+ transformActionName,
transformListImmutablePair.getLeft(),
transformListImmutablePair.getRight());
@@ -352,12 +370,11 @@ public class JobConfigParser {
ImmutablePair<SeaTunnelSource, Set<URL>> pair =
ConnectorInstanceLoader.loadSourceInstance(
sourceConfigs.get(0), jobConfig.getJobContext(),
commonPluginJars);
+ String sourceActionName =
+ createSourceActionName(0, pair.getLeft().getPluginName(),
"default");
SourceAction sourceAction =
createSourceAction(
- idGenerator.getNextId(),
- pair.getLeft().getPluginName(),
- pair.getLeft(),
- pair.getRight());
+ idGenerator.getNextId(), sourceActionName,
pair.getLeft(), pair.getRight());
sourceAction.setParallelism(getSourceParallelism(sourceConfigs.get(0)));
SeaTunnelDataType dataType =
sourceAction.getSource().getProducedType();
@@ -370,10 +387,13 @@ public class JobConfigParser {
transformListImmutablePair.getLeft().setTypeInfo(dataType);
dataType = transformListImmutablePair.getLeft().getProducedType();
+ String transformActionName =
+ createTransformActionName(
+ 0,
transformListImmutablePair.getLeft().getPluginName(), "default");
TransformAction transformAction =
createTransformAction(
idGenerator.getNextId(),
-
transformListImmutablePair.getLeft().getPluginName(),
+ transformActionName,
Lists.newArrayList(sourceAction),
transformListImmutablePair.getLeft(),
transformListImmutablePair.getRight());
@@ -393,10 +413,12 @@ public class JobConfigParser {
sinkListImmutablePair =
ConnectorInstanceLoader.loadSinkInstance(
sinkConfigs.get(0), jobConfig.getJobContext(),
commonPluginJars);
+ String sinkActionName =
+ createSinkActionName(0,
sinkListImmutablePair.getLeft().getPluginName(), "default");
SinkAction sinkAction =
createSinkAction(
idGenerator.getNextId(),
- sinkListImmutablePair.getLeft().getPluginName(),
+ sinkActionName,
Lists.newArrayList(sinkUpstreamAction),
sinkListImmutablePair.getLeft(),
sinkListImmutablePair.getRight());
@@ -489,4 +511,41 @@ public class JobConfigParser {
}
return new SinkAction(id, name, sink, jarUrls);
}
+
+ static String createSourceActionName(int configIndex, String pluginName,
String tableName) {
+ return String.format("Source[%s]-%s-%s", configIndex, pluginName,
tableName);
+ }
+
+ static String createSinkActionName(int configIndex, String pluginName,
String tableName) {
+ return String.format("Sink[%s]-%s-%s", configIndex, pluginName,
tableName);
+ }
+
+ static String createTransformActionName(int configIndex, String
pluginName, String tableName) {
+ return String.format("Transform[%s]-%s-%s", configIndex, pluginName,
tableName);
+ }
+
+ static String getTableName(Config config) {
+ return getTableName(config, "default");
+ }
+
+ static String getTableName(Config config, String defaultValue) {
+ String sourceTableName = null;
+ if (config.hasPath(CommonOptions.SOURCE_TABLE_NAME.key())) {
+ sourceTableName =
config.getString(CommonOptions.SOURCE_TABLE_NAME.key());
+ }
+ String resultTableName = null;
+ if (config.hasPath(CommonOptions.RESULT_TABLE_NAME.key())) {
+ resultTableName =
config.getString(CommonOptions.RESULT_TABLE_NAME.key());
+ }
+ if (sourceTableName != null && resultTableName != null) {
+ return String.format("%s_%s", sourceTableName, resultTableName);
+ }
+ if (sourceTableName == null) {
+ return resultTableName;
+ }
+ if (resultTableName == null) {
+ return sourceTableName;
+ }
+ return defaultValue;
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 39c1a5cae..4c96a20e8 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -133,8 +133,10 @@ public class MultipleTableJobConfigParser {
tableWithActionMap.put(tuple2._1(), tuple2._2());
}
List<Action> sinkActions = new ArrayList<>();
- for (Config sinkConfig : sinkConfigs) {
- sinkActions.addAll(parserSink(sinkConfig, classLoader,
tableWithActionMap));
+ for (int configIndex = 0; configIndex < sinkConfigs.size();
configIndex++) {
+ Config sinkConfig = sinkConfigs.get(configIndex);
+ sinkActions.addAll(
+ parserSink(configIndex, sinkConfig, classLoader,
tableWithActionMap));
}
Set<URL> factoryUrls = getUsedFactoryUrls(sinkActions);
factoryUrls.addAll(commonPluginJars);
@@ -212,11 +214,14 @@ public class MultipleTableJobConfigParser {
List<Tuple2<CatalogTable, Action>> actions = new ArrayList<>();
int parallelism = getParallelism(readonlyConfig);
- for (Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>,
List<CatalogTable>> tuple2 :
- sources) {
+ for (int configIndex = 0; configIndex < sources.size(); configIndex++)
{
+ Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>,
List<CatalogTable>> tuple2 =
+ sources.get(configIndex);
long id = idGenerator.getNextId();
+ String actionName =
+ JobConfigParser.createSourceActionName(configIndex,
factoryId, tableId);
SourceAction<Object, SourceSplit, Serializable> action =
- new SourceAction<>(id, factoryId, tuple2._1(),
factoryUrls);
+ new SourceAction<>(id, actionName, tuple2._1(),
factoryUrls);
action.setParallelism(parallelism);
for (CatalogTable catalogTable : tuple2._2()) {
actions.add(new Tuple2<>(catalogTable, action));
@@ -248,6 +253,7 @@ public class MultipleTableJobConfigParser {
}
public List<SinkAction<?, ?, ?, ?>> parserSink(
+ int configIndex,
Config sinkConfig,
ClassLoader classLoader,
Map<String, List<Tuple2<CatalogTable, Action>>>
tableWithActionMap) {
@@ -290,15 +296,23 @@ public class MultipleTableJobConfigParser {
SeaTunnelSink<?, ?, ?, ?> sink =
FactoryUtil.createAndPrepareSink(
catalogTable, readonlyConfig, classLoader,
factoryId);
+ SinkConfig actionConfig =
+ new
SinkConfig(catalogTable.getTableId().toTablePath().toString());
long id = idGenerator.getNextId();
+ String actionName =
+ JobConfigParser.createSinkActionName(
+ configIndex,
+ factoryId,
+ String.format(
+ "%s(%s)", leftTableId,
actionConfig.getMultipleRowTableId()));
SinkAction<?, ?, ?, ?> sinkAction =
new SinkAction<>(
id,
- factoryId,
+ actionName,
Collections.singletonList(leftAction),
sink,
factoryUrls,
- new
SinkConfig(catalogTable.getTableId().toTablePath().toString()));
+ actionConfig);
handleSaveMode(sink);
sinkAction.setParallelism(leftAction.getParallelism());
sinkActions.add(sinkAction);
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionState.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionState.java
index 634909e82..6874a94b5 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionState.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionState.java
@@ -25,8 +25,8 @@ public class ActionState implements Serializable {
private static final long serialVersionUID = 1L;
- /** The id of the action. */
- private final String actionId;
+ /** The key of the action state. */
+ private final ActionStateKey stateKey;
/** The handles to states created by the parallel actions: action index ->
action state. */
private final List<ActionSubtaskState> subtaskStates;
@@ -36,14 +36,14 @@ public class ActionState implements Serializable {
/** The parallelism of the action when it was checkpointed. */
private final int parallelism;
- public ActionState(String actionId, int parallelism) {
- this.actionId = actionId;
+ public ActionState(ActionStateKey stateKey, int parallelism) {
+ this.stateKey = stateKey;
this.subtaskStates = Arrays.asList(new
ActionSubtaskState[parallelism]);
this.parallelism = parallelism;
}
- public String getActionId() {
- return actionId;
+ public ActionStateKey getStateKey() {
+ return stateKey;
}
public List<ActionSubtaskState> getSubtaskStates() {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionSubtaskState.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionStateKey.java
similarity index 64%
copy from
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionSubtaskState.java
copy to
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionStateKey.java
index 5a126e44d..1903eaad1 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionSubtaskState.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionStateKey.java
@@ -17,15 +17,25 @@
package org.apache.seatunnel.engine.server.checkpoint;
-import lombok.Data;
+import org.apache.seatunnel.engine.core.dag.actions.Action;
+
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
import java.io.Serializable;
-import java.util.List;
-@Data
-public class ActionSubtaskState implements Serializable {
- private static final long serialVersionUID = 1L;
- private final long actionId;
- private final int index;
- private final List<byte[]> state;
+@ToString
+@Setter
+@AllArgsConstructor
+@NoArgsConstructor
+@EqualsAndHashCode
+public class ActionStateKey implements Serializable {
+ private String name;
+
+ public static ActionStateKey of(Action action) {
+ return new ActionStateKey("ActionStateKey - " + action.getName());
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionSubtaskState.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionSubtaskState.java
index 5a126e44d..20c531f53 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionSubtaskState.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionSubtaskState.java
@@ -25,7 +25,7 @@ import java.util.List;
@Data
public class ActionSubtaskState implements Serializable {
private static final long serialVersionUID = 1L;
- private final long actionId;
+ private final ActionStateKey stateKey;
private final int index;
private final List<byte[]> state;
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 84054ac6e..553cd5e0b 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -508,15 +508,13 @@ public class CheckpointCoordinator {
.collect(Collectors.toCollection(CopyOnWriteArraySet::new));
}
- private Map<Long, ActionState> getActionStates() {
+ private Map<ActionStateKey, ActionState> getActionStates() {
// TODO: some tasks have completed and will not submit state again.
return plan.getPipelineActions().entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
- entry ->
- new ActionState(
-
String.valueOf(entry.getKey()), entry.getValue())));
+ entry -> new ActionState(entry.getKey(),
entry.getValue())));
}
private Map<Long, TaskStatistics> getTaskStatistics() {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
index 0001e8d78..e1fe88392 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
@@ -48,24 +48,24 @@ public class CheckpointPlan {
/**
* All actions in this pipeline. <br>
- * key: the action id; <br>
+ * key: the action state key; <br>
* value: the parallelism of the action;
*/
- private final Map<Long, Integer> pipelineActions;
+ private final Map<ActionStateKey, Integer> pipelineActions;
/**
* <br>
* key: the subtask locations; <br>
- * value: all actions in this subtask; f0: action id, f1: action index;
+ * value: all actions in this subtask; f0: action state key, f1: action
index;
*/
- private final Map<TaskLocation, Set<Tuple2<Long, Integer>>> subtaskActions;
+ private final Map<TaskLocation, Set<Tuple2<ActionStateKey, Integer>>>
subtaskActions;
public static final class Builder {
private final Set<TaskLocation> pipelineSubtasks = new HashSet<>();
private final Set<TaskLocation> startingSubtasks = new HashSet<>();
- private final Map<Long, Integer> pipelineActions = new HashMap<>();
+ private final Map<ActionStateKey, Integer> pipelineActions = new
HashMap<>();
- private final Map<TaskLocation, Set<Tuple2<Long, Integer>>>
subtaskActions =
+ private final Map<TaskLocation, Set<Tuple2<ActionStateKey, Integer>>>
subtaskActions =
new HashMap<>();
private Builder() {}
@@ -80,13 +80,13 @@ public class CheckpointPlan {
return this;
}
- public Builder pipelineActions(Map<Long, Integer> pipelineActions) {
+ public Builder pipelineActions(Map<ActionStateKey, Integer>
pipelineActions) {
this.pipelineActions.putAll(pipelineActions);
return this;
}
public Builder subtaskActions(
- Map<TaskLocation, Set<Tuple2<Long, Integer>>> subtaskActions) {
+ Map<TaskLocation, Set<Tuple2<ActionStateKey, Integer>>>
subtaskActions) {
this.subtaskActions.putAll(subtaskActions);
return this;
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java
index 73662eb21..8d6ea554d 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java
@@ -37,7 +37,7 @@ public class CompletedCheckpoint implements Checkpoint,
Serializable {
private final long completedTimestamp;
- private final Map<Long, ActionState> taskStates;
+ private final Map<ActionStateKey, ActionState> taskStates;
private final Map<Long, TaskStatistics> taskStatistics;
@@ -48,7 +48,7 @@ public class CompletedCheckpoint implements Checkpoint,
Serializable {
long triggerTimestamp,
CheckpointType checkpointType,
long completedTimestamp,
- Map<Long, ActionState> taskStates,
+ Map<ActionStateKey, ActionState> taskStates,
Map<Long, TaskStatistics> taskStatistics) {
this.jobId = jobId;
this.pipelineId = pipelineId;
@@ -89,7 +89,7 @@ public class CompletedCheckpoint implements Checkpoint,
Serializable {
return completedTimestamp;
}
- public Map<Long, ActionState> getTaskStates() {
+ public Map<ActionStateKey, ActionState> getTaskStates() {
return taskStates;
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
index dcdccdacf..35cb85d53 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
@@ -51,7 +51,7 @@ public class PendingCheckpoint implements Checkpoint {
private final Map<Long, TaskStatistics> taskStatistics;
- private final Map<Long, ActionState> actionStates;
+ private final Map<ActionStateKey, ActionState> actionStates;
private final CompletableFuture<CompletedCheckpoint> completableFuture;
@@ -65,7 +65,7 @@ public class PendingCheckpoint implements Checkpoint {
CheckpointType checkpointType,
Set<Long> notYetAcknowledgedTasks,
Map<Long, TaskStatistics> taskStatistics,
- Map<Long, ActionState> actionStates) {
+ Map<ActionStateKey, ActionState> actionStates) {
this.jobId = jobId;
this.pipelineId = pipelineId;
this.checkpointId = checkpointId;
@@ -106,7 +106,7 @@ public class PendingCheckpoint implements Checkpoint {
return taskStatistics;
}
- protected Map<Long, ActionState> getActionStates() {
+ protected Map<ActionStateKey, ActionState> getActionStates() {
return actionStates;
}
@@ -127,7 +127,7 @@ public class PendingCheckpoint implements Checkpoint {
long stateSize = 0;
for (ActionSubtaskState state : states) {
- ActionState actionState = actionStates.get(state.getActionId());
+ ActionState actionState = actionStates.get(state.getStateKey());
if (actionState == null) {
continue;
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index 735addd1c..de7c1fcb2 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -45,7 +45,6 @@ import lombok.extern.slf4j.Slf4j;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -145,7 +144,16 @@ public class ExecutionPlanGenerator {
List<LogicalEdge> sortedLogicalEdges = new ArrayList<>(logicalEdges);
Collections.sort(
- sortedLogicalEdges,
Comparator.comparingLong(LogicalEdge::getInputVertexId));
+ sortedLogicalEdges,
+ (o1, o2) -> {
+ if (o1.getInputVertexId() != o2.getInputVertexId()) {
+ return o1.getInputVertexId() > o2.getInputVertexId() ?
1 : -1;
+ }
+ if (o1.getTargetVertexId() != o2.getTargetVertexId()) {
+ return o1.getTargetVertexId() > o2.getTargetVertexId()
? 1 : -1;
+ }
+ return 0;
+ });
for (LogicalEdge logicalEdge : sortedLogicalEdges) {
LogicalVertex logicalInputVertex = logicalEdge.getInputVertex();
ExecutionVertex executionInputVertex =
@@ -233,11 +241,7 @@ public class ExecutionPlanGenerator {
ShuffleConfig.builder().shuffleStrategy(shuffleStrategy).build();
long shuffleVertexId = idGenerator.getNextId();
- String shuffleActionName =
- String.format(
- "Shuffle [%s -> table[0~%s]]",
- sourceAction.getName(),
- ((MultipleRowType)
sourceProducedType).getTableIds().length - 1);
+ String shuffleActionName = String.format("Shuffle [%s]",
sourceAction.getName());
ShuffleAction shuffleAction =
new ShuffleAction(shuffleVertexId, shuffleActionName,
shuffleConfig);
shuffleAction.setParallelism(sourceAction.getParallelism());
@@ -358,9 +362,11 @@ public class ExecutionPlanGenerator {
jars.addAll(action.getJarUrls());
names.add(action.getName());
});
+ String transformChainActionName =
+ String.format("TransformChain[%s]", String.join("->",
names));
TransformChainAction transformChainAction =
new TransformChainAction(
- newVertexId, String.join("->", names), jars,
transforms);
+ newVertexId, transformChainActionName, jars,
transforms);
transformChainAction.setParallelism(currentVertex.getAction().getParallelism());
ExecutionVertex executionVertex =
@@ -413,7 +419,24 @@ public class ExecutionPlanGenerator {
executionVertices.add(edge.getLeftVertex());
executionVertices.add(edge.getRightVertex());
}
- return new PipelineGenerator(executionVertices, new
ArrayList<>(executionEdges))
- .generatePipelines();
+ PipelineGenerator pipelineGenerator =
+ new PipelineGenerator(executionVertices, new
ArrayList<>(executionEdges));
+ List<Pipeline> pipelines = pipelineGenerator.generatePipelines();
+
+ long actionCount = 0;
+ Set<String> actionNames = new HashSet<>();
+ for (Pipeline pipeline : pipelines) {
+ Integer pipelineId = pipeline.getId();
+ for (ExecutionVertex vertex : pipeline.getVertexes().values()) {
+ Action action = vertex.getAction();
+ String actionName = String.format("pipeline-%s [%s]",
pipelineId, action.getName());
+ action.setName(actionName);
+ actionNames.add(actionName);
+ actionCount++;
+ }
+ }
+ checkArgument(actionNames.size() == actionCount, "Action name is
duplicated");
+
+ return pipelines;
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java
index 5e08190c2..e482a190e 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/Pipeline.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.dag.execution;
import org.apache.seatunnel.engine.core.dag.actions.Action;
+import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
import java.util.List;
import java.util.Map;
@@ -50,9 +51,11 @@ public class Pipeline {
return vertexes;
}
- public Map<Long, Integer> getActions() {
+ public Map<ActionStateKey, Integer> getActions() {
return vertexes.values().stream()
.map(ExecutionVertex::getAction)
- .collect(Collectors.toMap(Action::getId,
Action::getParallelism));
+ .collect(
+ Collectors.toMap(
+ action -> ActionStateKey.of(action),
Action::getParallelism));
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index f65d9a8d5..c3c3fc4e4 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -32,6 +32,7 @@ import
org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.core.dag.internal.IntermediateQueue;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
+import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan;
import org.apache.seatunnel.engine.server.dag.execution.ExecutionEdge;
import org.apache.seatunnel.engine.server.dag.execution.ExecutionPlan;
@@ -114,9 +115,9 @@ public class PhysicalPlanGenerator {
/**
* <br>
* key: the subtask locations; <br>
- * value: all actions in this subtask; f0: action id, f1: action index;
+ * value: all actions in this subtask; f0: action state key, f1: action
index;
*/
- private final Map<TaskLocation, Set<Tuple2<Long, Integer>>> subtaskActions;
+ private final Map<TaskLocation, Set<Tuple2<ActionStateKey, Integer>>>
subtaskActions;
private final IMap<Object, Object> runningJobStateIMap;
@@ -270,7 +271,7 @@ public class PhysicalPlanGenerator {
subtaskActions.put(
taskLocation,
Collections.singleton(
-
Tuple2.tuple2(sinkAction.getId(), -1)));
+
Tuple2.tuple2(ActionStateKey.of(sinkAction), -1)));
return new PhysicalVertex(
atomicInteger.incrementAndGet(),
@@ -344,8 +345,10 @@ public class PhysicalPlanGenerator {
long shuffleActionId =
idGenerator.getNextId();
String shuffleActionName =
String.format(
- "Shuffle [table[%s] ->
%s]",
- sinkTableIndex,
sinkAction.getName());
+ "%s -> %s -> %s",
+ shuffleAction.getName(),
+ sinkTableId,
+ sinkAction.getName());
ShuffleAction shuffleActionOfSinkFlow =
new ShuffleAction(
shuffleActionId,
@@ -474,7 +477,8 @@ public class PhysicalPlanGenerator {
startingTasks.add(taskLocation);
subtaskActions.put(
taskLocation,
-
Collections.singleton(Tuple2.tuple2(sourceAction.getId(), -1)));
+ Collections.singleton(
+
Tuple2.tuple2(ActionStateKey.of(sourceAction), -1)));
enumeratorTaskIDMap.put(sourceAction,
taskLocation);
return new PhysicalVertex(
@@ -633,8 +637,11 @@ public class PhysicalPlanGenerator {
pipelineTasks.add(task.getTaskLocation());
subtaskActions.put(
task.getTaskLocation(),
- task.getActionIds().stream()
- .map(id -> Tuple2.tuple2(id,
task.getTaskLocation().getTaskIndex()))
+ task.getActionStateKeys().stream()
+ .map(
+ stateKey ->
+ Tuple2.tuple2(
+ stateKey,
task.getTaskLocation().getTaskIndex()))
.collect(Collectors.toSet()));
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index 9a30da6db..9f332c684 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -29,6 +29,7 @@ import
org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.core.dag.actions.TransformChainAction;
import org.apache.seatunnel.engine.core.dag.actions.UnknownActionException;
+import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
import
org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation;
@@ -283,8 +284,8 @@ public abstract class SeaTunnelTask extends AbstractTask {
return getFlowInfo((action, set) -> set.addAll(action.getJarUrls()));
}
- public Set<Long> getActionIds() {
- return getFlowInfo((action, set) -> set.add(action.getId()));
+ public Set<ActionStateKey> getActionStateKeys() {
+ return getFlowInfo((action, set) ->
set.add(ActionStateKey.of(action)));
}
private <T> Set<T> getFlowInfo(BiConsumer<Action, Set<T>> function) {
@@ -340,10 +341,10 @@ public abstract class SeaTunnelTask extends AbstractTask {
}
}
- public void addState(Barrier barrier, long actionId, List<byte[]> state) {
+ public void addState(Barrier barrier, ActionStateKey stateKey,
List<byte[]> state) {
List<ActionSubtaskState> states =
checkpointStates.computeIfAbsent(barrier.getId(), id -> new
ArrayList<>());
- states.add(new ActionSubtaskState(actionId, indexID, state));
+ states.add(new ActionSubtaskState(stateKey, indexID, state));
}
@Override
@@ -368,11 +369,11 @@ public abstract class SeaTunnelTask extends AbstractTask {
@Override
public void restoreState(List<ActionSubtaskState> actionStateList) throws
Exception {
log.debug("restoreState for SeaTunnelTask[{}]", actionStateList);
- Map<Long, List<ActionSubtaskState>> stateMap =
+ Map<ActionStateKey, List<ActionSubtaskState>> stateMap =
actionStateList.stream()
.collect(
Collectors.groupingBy(
- ActionSubtaskState::getActionId,
Collectors.toList()));
+ ActionSubtaskState::getStateKey,
Collectors.toList()));
allCycles.stream()
.filter(cycle -> cycle instanceof ActionFlowLifeCycle)
.map(cycle -> (ActionFlowLifeCycle) cycle)
@@ -381,7 +382,7 @@ public abstract class SeaTunnelTask extends AbstractTask {
try {
actionFlowLifeCycle.restoreState(
stateMap.getOrDefault(
-
actionFlowLifeCycle.getAction().getId(),
+
ActionStateKey.of(actionFlowLifeCycle.getAction()),
Collections.emptyList()));
} catch (Exception e) {
sneakyThrow(e);
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
index 9b21c0856..7f798bfbb 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server.task;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
+import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointCloseReason;
@@ -221,7 +222,8 @@ public class SinkAggregatedCommitterTask<CommandInfoT,
AggregatedCommitInfoT>
this.taskLocation,
(CheckpointBarrier) barrier,
Collections.singletonList(
- new
ActionSubtaskState(sink.getId(), -1, states))));
+ new ActionSubtaskState(
+ ActionStateKey.of(sink),
-1, states))));
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index 7e6b7d63d..fbf873536 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
+import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
import
org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation;
@@ -154,7 +155,7 @@ public class SourceSplitEnumeratorTask<SplitT extends
SourceSplit> extends Coord
(CheckpointBarrier) barrier,
Collections.singletonList(
new ActionSubtaskState(
- source.getId(),
+ ActionStateKey.of(source),
-1,
Collections.singletonList(serialize)))));
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
index 3369fc4a9..8637ea7c3 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server.task.flow;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
import org.apache.seatunnel.engine.core.dag.actions.ShuffleStrategy;
+import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.record.Barrier;
@@ -80,7 +81,8 @@ public class ShuffleSinkFlowLifeCycle extends
AbstractFlowLifeCycle
prepareClose = true;
}
if (barrier.snapshot()) {
- runningTask.addState(barrier, shuffleAction.getId(),
Collections.emptyList());
+ runningTask.addState(
+ barrier, ActionStateKey.of(shuffleAction),
Collections.emptyList());
}
runningTask.ack(barrier);
@@ -106,7 +108,7 @@ public class ShuffleSinkFlowLifeCycle extends
AbstractFlowLifeCycle
public void close() throws IOException {
super.close();
for (Map.Entry<String, IQueue<Record<?>>> shuffleItem :
shuffles.entrySet()) {
- log.info("destroy shuffle queue[{}]", shuffleItem.getKey());
+ log.info("destroy shuffle queue: {}", shuffleItem.getKey());
shuffleItem.getValue().destroy();
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
index 1365125cf..6c3559a97 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
@@ -20,11 +20,13 @@ package org.apache.seatunnel.engine.server.task.flow;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.transform.Collector;
import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
+import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.record.Barrier;
import com.hazelcast.collection.IQueue;
import com.hazelcast.core.HazelcastInstance;
+import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.Collections;
@@ -34,6 +36,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+@Slf4j
@SuppressWarnings("MagicNumber")
public class ShuffleSourceFlowLifeCycle<T> extends AbstractFlowLifeCycle
implements OneOutputFlowLifeCycle<Record<?>> {
@@ -105,7 +108,9 @@ public class ShuffleSourceFlowLifeCycle<T> extends
AbstractFlowLifeCycle
}
if (barrier.snapshot()) {
runningTask.addState(
- barrier, shuffleAction.getId(),
Collections.emptyList());
+ barrier,
+ ActionStateKey.of(shuffleAction),
+ Collections.emptyList());
}
runningTask.ack(barrier);
@@ -139,6 +144,7 @@ public class ShuffleSourceFlowLifeCycle<T> extends
AbstractFlowLifeCycle
public void close() throws IOException {
super.close();
for (IQueue<Record<?>> shuffleQueue : shuffles) {
+ log.info("destroy shuffle queue: {}", shuffleQueue.getName());
shuffleQueue.destroy();
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index 308642e0c..0346b7313 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
+import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
@@ -161,11 +162,12 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
}
List<StateT> states =
writer.snapshotState(barrier.getId());
if (!writerStateSerializer.isPresent()) {
- runningTask.addState(barrier, sinkAction.getId(),
Collections.emptyList());
+ runningTask.addState(
+ barrier, ActionStateKey.of(sinkAction),
Collections.emptyList());
} else {
runningTask.addState(
barrier,
- sinkAction.getId(),
+ ActionStateKey.of(sinkAction),
serializeStates(writerStateSerializer.get(),
states));
}
if (containAggCommitter) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
index 17741b85c..16adec49e 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
+import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector;
@@ -214,7 +215,7 @@ public class SourceFlowLifeCycle<T, SplitT extends
SourceSplit> extends ActionFl
if (barrier.snapshot()) {
List<byte[]> states =
serializeStates(splitSerializer,
reader.snapshotState(barrier.getId()));
- runningTask.addState(barrier, sourceAction.getId(), states);
+ runningTask.addState(barrier, ActionStateKey.of(sourceAction),
states);
}
// ack after #addState
runningTask.ack(barrier);
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
index d7a9cc4e3..187aa3659 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.transform.Collector;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.engine.core.dag.actions.TransformChainAction;
+import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
@@ -78,7 +79,7 @@ public class TransformFlowLifeCycle<T> extends
ActionFlowLifeCycle
prepareClose = true;
}
if (barrier.snapshot()) {
- runningTask.addState(barrier, action.getId(),
Collections.emptyList());
+ runningTask.addState(barrier, ActionStateKey.of(action),
Collections.emptyList());
}
// ack after #addState
runningTask.ack(barrier);
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/StorageTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/StorageTest.java
index 48c2b5b68..ac416766b 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/StorageTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/StorageTest.java
@@ -39,8 +39,9 @@ public class StorageTest {
Map<Long, TaskStatistics> taskStatisticsMap = new HashMap<>();
taskStatisticsMap.put(1L, new TaskStatistics(1L, 32));
- Map<Long, ActionState> actionStateMap = new HashMap<>();
- actionStateMap.put(2L, new ActionState("test", 13));
+ Map<ActionStateKey, ActionState> actionStateMap = new HashMap<>();
+ ActionStateKey actionStateKey = new ActionStateKey("test-action");
+ actionStateMap.put(actionStateKey, new ActionState(actionStateKey,
13));
CompletedCheckpoint completedCheckpoint =
new CompletedCheckpoint(
1,