This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5705c70b4d [Improve][Zeta] Remove `result_table_name` from action name (checkpoint state key) (#5779)
5705c70b4d is described below

commit 5705c70b4d10e0e02c72629c484c7aeb8144ed25
Author: hailin0 <[email protected]>
AuthorDate: Tue Nov 7 10:45:17 2023 +0800

    [Improve][Zeta] Remove `result_table_name` from action name (checkpoint state key) (#5779)
    
    Prevent changes to `result_table_name` from affecting checkpoint state recovery.
---
 .../seatunnel/api/table/factory/FactoryUtil.java   | 13 ++----
 .../engine/client/LogicalDagGeneratorTest.java     |  2 +-
 .../client/MultipleTableJobConfigParserTest.java   | 14 +++---
 .../engine/core/parse/JobConfigParser.java         | 38 +++++----------
 .../core/parse/MultipleTableJobConfigParser.java   | 54 +++++++++++-----------
 .../engine/server/checkpoint/ActionState.java      |  3 ++
 .../server/checkpoint/ActionSubtaskState.java      |  2 +
 .../server/checkpoint/CheckpointCoordinator.java   | 16 +++++++
 .../engine/server/checkpoint/CheckpointPlan.java   |  2 +
 .../server/checkpoint/CompletedCheckpoint.java     |  2 +
 .../server/checkpoint/SubtaskStatistics.java       |  2 +
 .../engine/server/checkpoint/TaskStatistics.java   |  3 ++
 .../engine/server/execution/TaskLocation.java      |  2 +
 13 files changed, 81 insertions(+), 72 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index 77da03b8ed..04e74413cf 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -43,7 +43,6 @@ import scala.Tuple2;
 
 import java.io.Serializable;
 import java.net.URL;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
@@ -64,17 +63,12 @@ public final class FactoryUtil {
     public static final String DEFAULT_ID = "default-identifier";
 
     public static <T, SplitT extends SourceSplit, StateT extends Serializable>
-            List<Tuple2<SeaTunnelSource<T, SplitT, StateT>, 
List<CatalogTable>>>
-                    createAndPrepareSource(
-                            ReadonlyConfig options,
-                            ClassLoader classLoader,
-                            String factoryIdentifier) {
+            Tuple2<SeaTunnelSource<T, SplitT, StateT>, List<CatalogTable>> 
createAndPrepareSource(
+                    ReadonlyConfig options, ClassLoader classLoader, String 
factoryIdentifier) {
 
         try {
             final TableSourceFactory factory =
                     discoverFactory(classLoader, TableSourceFactory.class, 
factoryIdentifier);
-            List<Tuple2<SeaTunnelSource<T, SplitT, StateT>, 
List<CatalogTable>>> sources =
-                    new ArrayList<>();
             SeaTunnelSource<T, SplitT, StateT> source =
                     createAndPrepareSource(factory, options, classLoader);
             List<CatalogTable> catalogTables;
@@ -100,8 +94,7 @@ public final class FactoryUtil {
                 catalogTables.clear();
                 catalogTables.add(catalogTable);
             }
-            sources.add(new Tuple2<>(source, catalogTables));
-            return sources;
+            return new Tuple2<>(source, catalogTables);
         } catch (Throwable t) {
             throw new FactoryException(
                     String.format(
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
index 7b9ea063c5..962e6aab5b 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
@@ -56,7 +56,7 @@ public class LogicalDagGeneratorTest {
         LogicalDag logicalDag = logicalDagGenerator.generate();
         JsonObject logicalDagJson = logicalDag.getLogicalDagAsJson();
         String result =
-                
"{\"vertices\":[{\"id\":1,\"name\":\"Source[0]-FakeSource-fake(id=1)\",\"parallelism\":3},{\"id\":2,\"name\":\"Source[0]-FakeSource-fake2(id=2)\",\"parallelism\":3},{\"id\":3,\"name\":\"Sink[0]-LocalFile-default-identifier(id=3)\",\"parallelism\":3}],\"edges\":[{\"inputVertex\":\"Source[0]-FakeSource-fake\",\"targetVertex\":\"Sink[0]-LocalFile-default-identifier\"},{\"inputVertex\":\"Source[0]-FakeSource-fake2\",\"targetVertex\":\"Sink[0]-LocalFile-default-identifier\"}]}";
+                
"{\"vertices\":[{\"id\":1,\"name\":\"Source[0]-FakeSource(id=1)\",\"parallelism\":3},{\"id\":2,\"name\":\"Source[1]-FakeSource(id=2)\",\"parallelism\":3},{\"id\":3,\"name\":\"Sink[0]-LocalFile(id=3)\",\"parallelism\":3}],\"edges\":[{\"inputVertex\":\"Source[0]-FakeSource\",\"targetVertex\":\"Sink[0]-LocalFile\"},{\"inputVertex\":\"Source[1]-FakeSource\",\"targetVertex\":\"Sink[0]-LocalFile\"}]}";
         Assertions.assertEquals(result, logicalDagJson.toString());
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
index b44e90b469..92518ae1b1 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
@@ -54,10 +54,10 @@ public class MultipleTableJobConfigParserTest {
         ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
         List<Action> actions = parse.getLeft();
         Assertions.assertEquals(1, actions.size());
-        Assertions.assertEquals("Sink[0]-LocalFile-default-identifier", 
actions.get(0).getName());
+        Assertions.assertEquals("Sink[0]-LocalFile", actions.get(0).getName());
         Assertions.assertEquals(1, actions.get(0).getUpstream().size());
         Assertions.assertEquals(
-                "Source[0]-FakeSource-fake", 
actions.get(0).getUpstream().get(0).getName());
+                "Source[0]-FakeSource", 
actions.get(0).getUpstream().get(0).getName());
 
         Assertions.assertEquals(3, 
actions.get(0).getUpstream().get(0).getParallelism());
         Assertions.assertEquals(3, actions.get(0).getParallelism());
@@ -75,10 +75,10 @@ public class MultipleTableJobConfigParserTest {
         List<Action> actions = parse.getLeft();
         Assertions.assertEquals(1, actions.size());
 
-        Assertions.assertEquals("Sink[0]-LocalFile-default-identifier", 
actions.get(0).getName());
+        Assertions.assertEquals("Sink[0]-LocalFile", actions.get(0).getName());
         Assertions.assertEquals(2, actions.get(0).getUpstream().size());
 
-        String[] expected = {"Source[0]-FakeSource-fake", 
"Source[0]-FakeSource-fake2"};
+        String[] expected = {"Source[0]-FakeSource", "Source[1]-FakeSource"};
         String[] actual = {
             actions.get(0).getUpstream().get(0).getName(),
             actions.get(0).getUpstream().get(1).getName()
@@ -106,8 +106,8 @@ public class MultipleTableJobConfigParserTest {
         List<Action> actions = parse.getLeft();
         Assertions.assertEquals(2, actions.size());
 
-        Assertions.assertEquals("Sink[0]-LocalFile-default-identifier", 
actions.get(0).getName());
-        Assertions.assertEquals("Sink[1]-LocalFile-default-identifier", 
actions.get(1).getName());
+        Assertions.assertEquals("Sink[0]-LocalFile", actions.get(0).getName());
+        Assertions.assertEquals("Sink[1]-LocalFile", actions.get(1).getName());
     }
 
     @Test
@@ -122,7 +122,7 @@ public class MultipleTableJobConfigParserTest {
         ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
         List<Action> actions = parse.getLeft();
         Assertions.assertEquals(1, actions.size());
-        Assertions.assertEquals("MultiTableSink-Console", 
actions.get(0).getName());
+        Assertions.assertEquals("Sink[0]-console-MultiTableSink", 
actions.get(0).getName());
         Assertions.assertFalse(
                 ((SinkAction) 
actions.get(0)).getSink().createCommitter().isPresent());
         Assertions.assertFalse(
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
index 096d251465..2ef1a28aff 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
@@ -19,7 +19,6 @@ package org.apache.seatunnel.engine.core.parse;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
@@ -56,7 +55,6 @@ import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static org.apache.seatunnel.api.table.factory.FactoryUtil.DEFAULT_ID;
 import static 
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.checkProducedTypeEquals;
 import static 
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.handleSaveMode;
 
@@ -87,8 +85,7 @@ public class JobConfigParser {
         source.setJobContext(jobConfig.getJobContext());
         PluginUtil.ensureJobModeMatch(jobConfig.getJobContext(), source);
         String actionName =
-                createSourceActionName(
-                        0, config.getString(CollectionConstants.PLUGIN_NAME), 
getTableName(config));
+                createSourceActionName(0, 
config.getString(CollectionConstants.PLUGIN_NAME));
         SourceAction action =
                 new SourceAction(
                         idGenerator.getNextId(),
@@ -118,8 +115,7 @@ public class JobConfigParser {
         transform.prepare(config);
         transform.setJobContext(jobConfig.getJobContext());
         transform.setTypeInfo((SeaTunnelDataType) rowType);
-        final String actionName =
-                createTransformActionName(0, tuple.getLeft().getPluginName(), 
getTableName(config));
+        final String actionName = createTransformActionName(0, 
tuple.getLeft().getPluginName());
         final TransformAction action =
                 new TransformAction(
                         idGenerator.getNextId(),
@@ -208,8 +204,7 @@ public class JobConfigParser {
             handleSaveMode(sink);
         }
         final String actionName =
-                createSinkActionName(
-                        configIndex, tuple.getLeft().getPluginName(), 
getTableName(config));
+                createSinkActionName(configIndex, 
tuple.getLeft().getPluginName());
         final SinkAction action =
                 new SinkAction<>(
                         idGenerator.getNextId(),
@@ -222,30 +217,19 @@ public class JobConfigParser {
         return action;
     }
 
-    static String createSourceActionName(int configIndex, String pluginName, 
String tableName) {
-        return String.format("Source[%s]-%s-%s", configIndex, pluginName, 
tableName);
-    }
-
-    static String createSinkActionName(int configIndex, String pluginName, 
String tableName) {
-        return String.format("Sink[%s]-%s-%s", configIndex, pluginName, 
tableName);
+    static String createSourceActionName(int configIndex, String pluginName) {
+        return String.format("Source[%s]-%s", configIndex, pluginName);
     }
 
-    static String createTransformActionName(int configIndex, String 
pluginName, String tableName) {
-        return String.format("Transform[%s]-%s-%s", configIndex, pluginName, 
tableName);
+    static String createSinkActionName(int configIndex, String pluginName) {
+        return String.format("Sink[%s]-%s", configIndex, pluginName);
     }
 
-    static String getTableName(Config config) {
-        return getTableName(config, DEFAULT_ID);
+    static String createSinkActionName(int configIndex, String pluginName, 
String table) {
+        return String.format("Sink[%s]-%s-%s", configIndex, pluginName, table);
     }
 
-    static String getTableName(Config config, String defaultValue) {
-        String resultTableName = null;
-        if (config.hasPath(CommonOptions.RESULT_TABLE_NAME.key())) {
-            resultTableName = 
config.getString(CommonOptions.RESULT_TABLE_NAME.key());
-        }
-        if (resultTableName == null) {
-            return defaultValue;
-        }
-        return resultTableName;
+    static String createTransformActionName(int configIndex, String 
pluginName) {
+        return String.format("Transform[%s]-%s", configIndex, pluginName);
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index ab2de6e355..7c835e11db 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -174,9 +174,10 @@ public class MultipleTableJobConfigParser {
                 new LinkedHashMap<>();
 
         log.info("start generating all sources.");
-        for (Config sourceConfig : sourceConfigs) {
+        for (int configIndex = 0; configIndex < sourceConfigs.size(); 
configIndex++) {
+            Config sourceConfig = sourceConfigs.get(configIndex);
             Tuple2<String, List<Tuple2<CatalogTable, Action>>> tuple2 =
-                    parseSource(sourceConfig, classLoader);
+                    parseSource(configIndex, sourceConfig, classLoader);
             tableWithActionMap.put(tuple2._1(), tuple2._2());
         }
 
@@ -286,7 +287,7 @@ public class MultipleTableJobConfigParser {
     }
 
     public Tuple2<String, List<Tuple2<CatalogTable, Action>>> parseSource(
-            Config sourceConfig, ClassLoader classLoader) {
+            int configIndex, Config sourceConfig, ClassLoader classLoader) {
         final ReadonlyConfig readonlyConfig = 
ReadonlyConfig.fromConfig(sourceConfig);
         final String factoryId = getFactoryId(readonlyConfig);
         final String tableId =
@@ -307,29 +308,23 @@ public class MultipleTableJobConfigParser {
             return new Tuple2<>(tableId, Collections.singletonList(tuple));
         }
 
-        List<Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>, 
List<CatalogTable>>>
-                sources =
-                        FactoryUtil.createAndPrepareSource(readonlyConfig, 
classLoader, factoryId);
+        Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>, 
List<CatalogTable>> tuple2 =
+                FactoryUtil.createAndPrepareSource(readonlyConfig, 
classLoader, factoryId);
 
         Set<URL> factoryUrls = new HashSet<>();
         factoryUrls.addAll(getSourcePluginJarPaths(sourceConfig));
 
         List<Tuple2<CatalogTable, Action>> actions = new ArrayList<>();
-        for (int configIndex = 0; configIndex < sources.size(); configIndex++) 
{
-            Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>, 
List<CatalogTable>> tuple2 =
-                    sources.get(configIndex);
-            long id = idGenerator.getNextId();
-            String actionName =
-                    JobConfigParser.createSourceActionName(configIndex, 
factoryId, tableId);
-            SeaTunnelSource<Object, SourceSplit, Serializable> source = 
tuple2._1();
-            source.setJobContext(jobConfig.getJobContext());
-            PluginUtil.ensureJobModeMatch(jobConfig.getJobContext(), source);
-            SourceAction<Object, SourceSplit, Serializable> action =
-                    new SourceAction<>(id, actionName, tuple2._1(), 
factoryUrls, new HashSet<>());
-            action.setParallelism(parallelism);
-            for (CatalogTable catalogTable : tuple2._2()) {
-                actions.add(new Tuple2<>(catalogTable, action));
-            }
+        long id = idGenerator.getNextId();
+        String actionName = 
JobConfigParser.createSourceActionName(configIndex, factoryId);
+        SeaTunnelSource<Object, SourceSplit, Serializable> source = 
tuple2._1();
+        source.setJobContext(jobConfig.getJobContext());
+        PluginUtil.ensureJobModeMatch(jobConfig.getJobContext(), source);
+        SourceAction<Object, SourceSplit, Serializable> action =
+                new SourceAction<>(id, actionName, tuple2._1(), factoryUrls, 
new HashSet<>());
+        action.setParallelism(parallelism);
+        for (CatalogTable catalogTable : tuple2._2()) {
+            actions.add(new Tuple2<>(catalogTable, action));
         }
         return new Tuple2<>(tableId, actions);
     }
@@ -421,9 +416,9 @@ public class MultipleTableJobConfigParser {
                         catalogTable, readonlyConfig, classLoader, factoryId);
         transform.setJobContext(jobConfig.getJobContext());
         long id = idGenerator.getNextId();
-        String actionName =
-                JobConfigParser.createTransformActionName(
-                        0, factoryId, JobConfigParser.getTableName(config));
+        // TODO If you need to support snapshot transform state, you need to 
use ordered index to
+        // generate unique names.
+        String actionName = JobConfigParser.createTransformActionName(0, 
factoryId);
 
         TransformAction transformAction =
                 new TransformAction(
@@ -574,7 +569,8 @@ public class MultipleTableJobConfigParser {
             sinkActions.add(sinkAction);
         }
         Optional<SinkAction<?, ?, ?, ?>> multiTableSink =
-                tryGenerateMultiTableSink(sinkActions, readonlyConfig, 
classLoader);
+                tryGenerateMultiTableSink(
+                        sinkActions, readonlyConfig, classLoader, factoryId, 
configIndex);
         return multiTableSink
                 .<List<SinkAction<?, ?, ?, ?>>>map(Collections::singletonList)
                 .orElse(sinkActions);
@@ -583,7 +579,9 @@ public class MultipleTableJobConfigParser {
     private Optional<SinkAction<?, ?, ?, ?>> tryGenerateMultiTableSink(
             List<SinkAction<?, ?, ?, ?>> sinkActions,
             ReadonlyConfig options,
-            ClassLoader classLoader) {
+            ClassLoader classLoader,
+            String factoryId,
+            int configIndex) {
         if (sinkActions.stream()
                 .anyMatch(action -> !(action.getSink() instanceof 
SupportMultiTableSink))) {
             log.info("Unsupported multi table sink api, rollback to sink 
template");
@@ -602,10 +600,12 @@ public class MultipleTableJobConfigParser {
                 });
         SeaTunnelSink<?, ?, ?, ?> sink =
                 FactoryUtil.createMultiTableSink(sinks, options, classLoader);
+        String actionName =
+                JobConfigParser.createSinkActionName(configIndex, factoryId, 
"MultiTableSink");
         SinkAction<?, ?, ?, ?> multiTableAction =
                 new SinkAction<>(
                         idGenerator.getNextId(),
-                        "MultiTableSink-" + 
sinkActions.get(0).getSink().getPluginName(),
+                        actionName,
                         sinkActions.get(0).getUpstream(),
                         sink,
                         jars,
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionState.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionState.java
index 6874a94b53..186bf3d7d7 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionState.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionState.java
@@ -17,10 +17,13 @@
 
 package org.apache.seatunnel.engine.server.checkpoint;
 
+import lombok.ToString;
+
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
 
+@ToString
 public class ActionState implements Serializable {
 
     private static final long serialVersionUID = 1L;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionSubtaskState.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionSubtaskState.java
index 20c531f53e..e37df418b4 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionSubtaskState.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionSubtaskState.java
@@ -18,11 +18,13 @@
 package org.apache.seatunnel.engine.server.checkpoint;
 
 import lombok.Data;
+import lombok.ToString;
 
 import java.io.Serializable;
 import java.util.List;
 
 @Data
+@ToString(exclude = "state")
 public class ActionSubtaskState implements Serializable {
     private static final long serialVersionUID = 1L;
     private final ActionStateKey stateKey;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 9168cdbaac..97588b5d03 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -185,10 +185,22 @@ public class CheckpointCoordinator {
         this.pipelineTaskStatus = new ConcurrentHashMap<>();
         this.checkpointIdCounter = checkpointIdCounter;
         this.readyToCloseStartingTask = new CopyOnWriteArraySet<>();
+
+        LOG.info(
+                "Create CheckpointCoordinator for job({}@{}) with plan({})",
+                pipelineId,
+                jobId,
+                plan);
         if (pipelineState != null) {
             this.latestCompletedCheckpoint =
                     serializer.deserialize(pipelineState.getStates(), 
CompletedCheckpoint.class);
             this.latestCompletedCheckpoint.setRestored(true);
+            LOG.info(
+                    "Restore job({}@{}) with checkpoint({}), data: {}",
+                    pipelineId,
+                    jobId,
+                    latestCompletedCheckpoint.getCheckpointId(),
+                    latestCompletedCheckpoint);
         }
         this.checkpointCoordinatorFuture = new CompletableFuture();
 
@@ -280,6 +292,10 @@ public class CheckpointCoordinator {
                                 ActionState actionState =
                                         
latestCompletedCheckpoint.getTaskStates().get(tuple.f0());
                                 if (actionState == null) {
+                                    LOG.info(
+                                            "Not found task({}) state for 
key({})",
+                                            taskLocation,
+                                            tuple.f0());
                                     return;
                                 }
                                 if (COORDINATOR_INDEX.equals(tuple.f1())) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
index dfab876f03..9925c7ccfc 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
@@ -24,6 +24,7 @@ import lombok.AccessLevel;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Getter;
+import lombok.ToString;
 
 import java.util.Map;
 import java.util.Set;
@@ -31,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 
 /** checkpoint plan info */
+@ToString
 @Getter
 @Builder(builderClassName = "Builder")
 @AllArgsConstructor(access = AccessLevel.PRIVATE)
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java
index 3f196f2c8f..74be795205 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java
@@ -22,10 +22,12 @@ import 
org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
 
 import lombok.Getter;
 import lombok.Setter;
+import lombok.ToString;
 
 import java.io.Serializable;
 import java.util.Map;
 
+@ToString
 public class CompletedCheckpoint implements Checkpoint, Serializable {
     private static final long serialVersionUID = 1L;
     private final long jobId;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/SubtaskStatistics.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/SubtaskStatistics.java
index 7bba1f886d..f4879d5408 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/SubtaskStatistics.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/SubtaskStatistics.java
@@ -19,9 +19,11 @@ package org.apache.seatunnel.engine.server.checkpoint;
 
 import lombok.AllArgsConstructor;
 import lombok.Getter;
+import lombok.ToString;
 
 import java.io.Serializable;
 
+@ToString
 @Getter
 @AllArgsConstructor
 public class SubtaskStatistics implements Serializable {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/TaskStatistics.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/TaskStatistics.java
index 3b00db5546..ee4344bc92 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/TaskStatistics.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/TaskStatistics.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.engine.server.checkpoint;
 
+import lombok.ToString;
+
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
@@ -24,6 +26,7 @@ import java.util.List;
 import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
 
+@ToString
 public class TaskStatistics implements Serializable {
     /** ID of the task the statistics belong to. */
     private final Long jobVertexId;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java
index 00ee084145..5469b55556 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java
@@ -25,10 +25,12 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
 import com.hazelcast.nio.ObjectDataInput;
 import com.hazelcast.nio.ObjectDataOutput;
 import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import lombok.ToString;
 
 import java.io.IOException;
 import java.io.Serializable;
 
+@ToString
 public class TaskLocation implements IdentifiedDataSerializable, Serializable {
 
     private TaskGroupLocation taskGroupLocation;

Reply via email to