This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 5705c70b4d [Improve][Zeta] Remove `result_table_name` from action
name(checkpoint state key) (#5779)
5705c70b4d is described below
commit 5705c70b4d10e0e02c72629c484c7aeb8144ed25
Author: hailin0 <[email protected]>
AuthorDate: Tue Nov 7 10:45:17 2023 +0800
[Improve][Zeta] Remove `result_table_name` from action name(checkpoint
state key) (#5779)
Avoid changes to `result_table_name` affecting checkpoint state recovery.
---
.../seatunnel/api/table/factory/FactoryUtil.java | 13 ++----
.../engine/client/LogicalDagGeneratorTest.java | 2 +-
.../client/MultipleTableJobConfigParserTest.java | 14 +++---
.../engine/core/parse/JobConfigParser.java | 38 +++++----------
.../core/parse/MultipleTableJobConfigParser.java | 54 +++++++++++-----------
.../engine/server/checkpoint/ActionState.java | 3 ++
.../server/checkpoint/ActionSubtaskState.java | 2 +
.../server/checkpoint/CheckpointCoordinator.java | 16 +++++++
.../engine/server/checkpoint/CheckpointPlan.java | 2 +
.../server/checkpoint/CompletedCheckpoint.java | 2 +
.../server/checkpoint/SubtaskStatistics.java | 2 +
.../engine/server/checkpoint/TaskStatistics.java | 3 ++
.../engine/server/execution/TaskLocation.java | 2 +
13 files changed, 81 insertions(+), 72 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index 77da03b8ed..04e74413cf 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -43,7 +43,6 @@ import scala.Tuple2;
import java.io.Serializable;
import java.net.URL;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@@ -64,17 +63,12 @@ public final class FactoryUtil {
public static final String DEFAULT_ID = "default-identifier";
public static <T, SplitT extends SourceSplit, StateT extends Serializable>
- List<Tuple2<SeaTunnelSource<T, SplitT, StateT>,
List<CatalogTable>>>
- createAndPrepareSource(
- ReadonlyConfig options,
- ClassLoader classLoader,
- String factoryIdentifier) {
+ Tuple2<SeaTunnelSource<T, SplitT, StateT>, List<CatalogTable>>
createAndPrepareSource(
+ ReadonlyConfig options, ClassLoader classLoader, String
factoryIdentifier) {
try {
final TableSourceFactory factory =
discoverFactory(classLoader, TableSourceFactory.class,
factoryIdentifier);
- List<Tuple2<SeaTunnelSource<T, SplitT, StateT>,
List<CatalogTable>>> sources =
- new ArrayList<>();
SeaTunnelSource<T, SplitT, StateT> source =
createAndPrepareSource(factory, options, classLoader);
List<CatalogTable> catalogTables;
@@ -100,8 +94,7 @@ public final class FactoryUtil {
catalogTables.clear();
catalogTables.add(catalogTable);
}
- sources.add(new Tuple2<>(source, catalogTables));
- return sources;
+ return new Tuple2<>(source, catalogTables);
} catch (Throwable t) {
throw new FactoryException(
String.format(
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
index 7b9ea063c5..962e6aab5b 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/LogicalDagGeneratorTest.java
@@ -56,7 +56,7 @@ public class LogicalDagGeneratorTest {
LogicalDag logicalDag = logicalDagGenerator.generate();
JsonObject logicalDagJson = logicalDag.getLogicalDagAsJson();
String result =
-
"{\"vertices\":[{\"id\":1,\"name\":\"Source[0]-FakeSource-fake(id=1)\",\"parallelism\":3},{\"id\":2,\"name\":\"Source[0]-FakeSource-fake2(id=2)\",\"parallelism\":3},{\"id\":3,\"name\":\"Sink[0]-LocalFile-default-identifier(id=3)\",\"parallelism\":3}],\"edges\":[{\"inputVertex\":\"Source[0]-FakeSource-fake\",\"targetVertex\":\"Sink[0]-LocalFile-default-identifier\"},{\"inputVertex\":\"Source[0]-FakeSource-fake2\",\"targetVertex\":\"Sink[0]-LocalFile-default-identifier\"}]}";
+
"{\"vertices\":[{\"id\":1,\"name\":\"Source[0]-FakeSource(id=1)\",\"parallelism\":3},{\"id\":2,\"name\":\"Source[1]-FakeSource(id=2)\",\"parallelism\":3},{\"id\":3,\"name\":\"Sink[0]-LocalFile(id=3)\",\"parallelism\":3}],\"edges\":[{\"inputVertex\":\"Source[0]-FakeSource\",\"targetVertex\":\"Sink[0]-LocalFile\"},{\"inputVertex\":\"Source[1]-FakeSource\",\"targetVertex\":\"Sink[0]-LocalFile\"}]}";
Assertions.assertEquals(result, logicalDagJson.toString());
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
index b44e90b469..92518ae1b1 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
@@ -54,10 +54,10 @@ public class MultipleTableJobConfigParserTest {
ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
List<Action> actions = parse.getLeft();
Assertions.assertEquals(1, actions.size());
- Assertions.assertEquals("Sink[0]-LocalFile-default-identifier",
actions.get(0).getName());
+ Assertions.assertEquals("Sink[0]-LocalFile", actions.get(0).getName());
Assertions.assertEquals(1, actions.get(0).getUpstream().size());
Assertions.assertEquals(
- "Source[0]-FakeSource-fake",
actions.get(0).getUpstream().get(0).getName());
+ "Source[0]-FakeSource",
actions.get(0).getUpstream().get(0).getName());
Assertions.assertEquals(3,
actions.get(0).getUpstream().get(0).getParallelism());
Assertions.assertEquals(3, actions.get(0).getParallelism());
@@ -75,10 +75,10 @@ public class MultipleTableJobConfigParserTest {
List<Action> actions = parse.getLeft();
Assertions.assertEquals(1, actions.size());
- Assertions.assertEquals("Sink[0]-LocalFile-default-identifier",
actions.get(0).getName());
+ Assertions.assertEquals("Sink[0]-LocalFile", actions.get(0).getName());
Assertions.assertEquals(2, actions.get(0).getUpstream().size());
- String[] expected = {"Source[0]-FakeSource-fake",
"Source[0]-FakeSource-fake2"};
+ String[] expected = {"Source[0]-FakeSource", "Source[1]-FakeSource"};
String[] actual = {
actions.get(0).getUpstream().get(0).getName(),
actions.get(0).getUpstream().get(1).getName()
@@ -106,8 +106,8 @@ public class MultipleTableJobConfigParserTest {
List<Action> actions = parse.getLeft();
Assertions.assertEquals(2, actions.size());
- Assertions.assertEquals("Sink[0]-LocalFile-default-identifier",
actions.get(0).getName());
- Assertions.assertEquals("Sink[1]-LocalFile-default-identifier",
actions.get(1).getName());
+ Assertions.assertEquals("Sink[0]-LocalFile", actions.get(0).getName());
+ Assertions.assertEquals("Sink[1]-LocalFile", actions.get(1).getName());
}
@Test
@@ -122,7 +122,7 @@ public class MultipleTableJobConfigParserTest {
ImmutablePair<List<Action>, Set<URL>> parse = jobConfigParser.parse();
List<Action> actions = parse.getLeft();
Assertions.assertEquals(1, actions.size());
- Assertions.assertEquals("MultiTableSink-Console",
actions.get(0).getName());
+ Assertions.assertEquals("Sink[0]-console-MultiTableSink",
actions.get(0).getName());
Assertions.assertFalse(
((SinkAction)
actions.get(0)).getSink().createCommitter().isPresent());
Assertions.assertFalse(
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
index 096d251465..2ef1a28aff 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
@@ -19,7 +19,6 @@ package org.apache.seatunnel.engine.core.parse;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
@@ -56,7 +55,6 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
-import static org.apache.seatunnel.api.table.factory.FactoryUtil.DEFAULT_ID;
import static
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.checkProducedTypeEquals;
import static
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.handleSaveMode;
@@ -87,8 +85,7 @@ public class JobConfigParser {
source.setJobContext(jobConfig.getJobContext());
PluginUtil.ensureJobModeMatch(jobConfig.getJobContext(), source);
String actionName =
- createSourceActionName(
- 0, config.getString(CollectionConstants.PLUGIN_NAME),
getTableName(config));
+ createSourceActionName(0,
config.getString(CollectionConstants.PLUGIN_NAME));
SourceAction action =
new SourceAction(
idGenerator.getNextId(),
@@ -118,8 +115,7 @@ public class JobConfigParser {
transform.prepare(config);
transform.setJobContext(jobConfig.getJobContext());
transform.setTypeInfo((SeaTunnelDataType) rowType);
- final String actionName =
- createTransformActionName(0, tuple.getLeft().getPluginName(),
getTableName(config));
+ final String actionName = createTransformActionName(0,
tuple.getLeft().getPluginName());
final TransformAction action =
new TransformAction(
idGenerator.getNextId(),
@@ -208,8 +204,7 @@ public class JobConfigParser {
handleSaveMode(sink);
}
final String actionName =
- createSinkActionName(
- configIndex, tuple.getLeft().getPluginName(),
getTableName(config));
+ createSinkActionName(configIndex,
tuple.getLeft().getPluginName());
final SinkAction action =
new SinkAction<>(
idGenerator.getNextId(),
@@ -222,30 +217,19 @@ public class JobConfigParser {
return action;
}
- static String createSourceActionName(int configIndex, String pluginName,
String tableName) {
- return String.format("Source[%s]-%s-%s", configIndex, pluginName,
tableName);
- }
-
- static String createSinkActionName(int configIndex, String pluginName,
String tableName) {
- return String.format("Sink[%s]-%s-%s", configIndex, pluginName,
tableName);
+ static String createSourceActionName(int configIndex, String pluginName) {
+ return String.format("Source[%s]-%s", configIndex, pluginName);
}
- static String createTransformActionName(int configIndex, String
pluginName, String tableName) {
- return String.format("Transform[%s]-%s-%s", configIndex, pluginName,
tableName);
+ static String createSinkActionName(int configIndex, String pluginName) {
+ return String.format("Sink[%s]-%s", configIndex, pluginName);
}
- static String getTableName(Config config) {
- return getTableName(config, DEFAULT_ID);
+ static String createSinkActionName(int configIndex, String pluginName,
String table) {
+ return String.format("Sink[%s]-%s-%s", configIndex, pluginName, table);
}
- static String getTableName(Config config, String defaultValue) {
- String resultTableName = null;
- if (config.hasPath(CommonOptions.RESULT_TABLE_NAME.key())) {
- resultTableName =
config.getString(CommonOptions.RESULT_TABLE_NAME.key());
- }
- if (resultTableName == null) {
- return defaultValue;
- }
- return resultTableName;
+ static String createTransformActionName(int configIndex, String
pluginName) {
+ return String.format("Transform[%s]-%s", configIndex, pluginName);
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index ab2de6e355..7c835e11db 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -174,9 +174,10 @@ public class MultipleTableJobConfigParser {
new LinkedHashMap<>();
log.info("start generating all sources.");
- for (Config sourceConfig : sourceConfigs) {
+ for (int configIndex = 0; configIndex < sourceConfigs.size();
configIndex++) {
+ Config sourceConfig = sourceConfigs.get(configIndex);
Tuple2<String, List<Tuple2<CatalogTable, Action>>> tuple2 =
- parseSource(sourceConfig, classLoader);
+ parseSource(configIndex, sourceConfig, classLoader);
tableWithActionMap.put(tuple2._1(), tuple2._2());
}
@@ -286,7 +287,7 @@ public class MultipleTableJobConfigParser {
}
public Tuple2<String, List<Tuple2<CatalogTable, Action>>> parseSource(
- Config sourceConfig, ClassLoader classLoader) {
+ int configIndex, Config sourceConfig, ClassLoader classLoader) {
final ReadonlyConfig readonlyConfig =
ReadonlyConfig.fromConfig(sourceConfig);
final String factoryId = getFactoryId(readonlyConfig);
final String tableId =
@@ -307,29 +308,23 @@ public class MultipleTableJobConfigParser {
return new Tuple2<>(tableId, Collections.singletonList(tuple));
}
- List<Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>,
List<CatalogTable>>>
- sources =
- FactoryUtil.createAndPrepareSource(readonlyConfig,
classLoader, factoryId);
+ Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>,
List<CatalogTable>> tuple2 =
+ FactoryUtil.createAndPrepareSource(readonlyConfig,
classLoader, factoryId);
Set<URL> factoryUrls = new HashSet<>();
factoryUrls.addAll(getSourcePluginJarPaths(sourceConfig));
List<Tuple2<CatalogTable, Action>> actions = new ArrayList<>();
- for (int configIndex = 0; configIndex < sources.size(); configIndex++)
{
- Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>,
List<CatalogTable>> tuple2 =
- sources.get(configIndex);
- long id = idGenerator.getNextId();
- String actionName =
- JobConfigParser.createSourceActionName(configIndex,
factoryId, tableId);
- SeaTunnelSource<Object, SourceSplit, Serializable> source =
tuple2._1();
- source.setJobContext(jobConfig.getJobContext());
- PluginUtil.ensureJobModeMatch(jobConfig.getJobContext(), source);
- SourceAction<Object, SourceSplit, Serializable> action =
- new SourceAction<>(id, actionName, tuple2._1(),
factoryUrls, new HashSet<>());
- action.setParallelism(parallelism);
- for (CatalogTable catalogTable : tuple2._2()) {
- actions.add(new Tuple2<>(catalogTable, action));
- }
+ long id = idGenerator.getNextId();
+ String actionName =
JobConfigParser.createSourceActionName(configIndex, factoryId);
+ SeaTunnelSource<Object, SourceSplit, Serializable> source =
tuple2._1();
+ source.setJobContext(jobConfig.getJobContext());
+ PluginUtil.ensureJobModeMatch(jobConfig.getJobContext(), source);
+ SourceAction<Object, SourceSplit, Serializable> action =
+ new SourceAction<>(id, actionName, tuple2._1(), factoryUrls,
new HashSet<>());
+ action.setParallelism(parallelism);
+ for (CatalogTable catalogTable : tuple2._2()) {
+ actions.add(new Tuple2<>(catalogTable, action));
}
return new Tuple2<>(tableId, actions);
}
@@ -421,9 +416,9 @@ public class MultipleTableJobConfigParser {
catalogTable, readonlyConfig, classLoader, factoryId);
transform.setJobContext(jobConfig.getJobContext());
long id = idGenerator.getNextId();
- String actionName =
- JobConfigParser.createTransformActionName(
- 0, factoryId, JobConfigParser.getTableName(config));
+ // TODO If you need to support snapshot transform state, you need to
use ordered index to
+ // generate unique names.
+ String actionName = JobConfigParser.createTransformActionName(0,
factoryId);
TransformAction transformAction =
new TransformAction(
@@ -574,7 +569,8 @@ public class MultipleTableJobConfigParser {
sinkActions.add(sinkAction);
}
Optional<SinkAction<?, ?, ?, ?>> multiTableSink =
- tryGenerateMultiTableSink(sinkActions, readonlyConfig,
classLoader);
+ tryGenerateMultiTableSink(
+ sinkActions, readonlyConfig, classLoader, factoryId,
configIndex);
return multiTableSink
.<List<SinkAction<?, ?, ?, ?>>>map(Collections::singletonList)
.orElse(sinkActions);
@@ -583,7 +579,9 @@ public class MultipleTableJobConfigParser {
private Optional<SinkAction<?, ?, ?, ?>> tryGenerateMultiTableSink(
List<SinkAction<?, ?, ?, ?>> sinkActions,
ReadonlyConfig options,
- ClassLoader classLoader) {
+ ClassLoader classLoader,
+ String factoryId,
+ int configIndex) {
if (sinkActions.stream()
.anyMatch(action -> !(action.getSink() instanceof
SupportMultiTableSink))) {
log.info("Unsupported multi table sink api, rollback to sink
template");
@@ -602,10 +600,12 @@ public class MultipleTableJobConfigParser {
});
SeaTunnelSink<?, ?, ?, ?> sink =
FactoryUtil.createMultiTableSink(sinks, options, classLoader);
+ String actionName =
+ JobConfigParser.createSinkActionName(configIndex, factoryId,
"MultiTableSink");
SinkAction<?, ?, ?, ?> multiTableAction =
new SinkAction<>(
idGenerator.getNextId(),
- "MultiTableSink-" +
sinkActions.get(0).getSink().getPluginName(),
+ actionName,
sinkActions.get(0).getUpstream(),
sink,
jars,
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionState.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionState.java
index 6874a94b53..186bf3d7d7 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionState.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionState.java
@@ -17,10 +17,13 @@
package org.apache.seatunnel.engine.server.checkpoint;
+import lombok.ToString;
+
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
+@ToString
public class ActionState implements Serializable {
private static final long serialVersionUID = 1L;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionSubtaskState.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionSubtaskState.java
index 20c531f53e..e37df418b4 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionSubtaskState.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/ActionSubtaskState.java
@@ -18,11 +18,13 @@
package org.apache.seatunnel.engine.server.checkpoint;
import lombok.Data;
+import lombok.ToString;
import java.io.Serializable;
import java.util.List;
@Data
+@ToString(exclude = "state")
public class ActionSubtaskState implements Serializable {
private static final long serialVersionUID = 1L;
private final ActionStateKey stateKey;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 9168cdbaac..97588b5d03 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -185,10 +185,22 @@ public class CheckpointCoordinator {
this.pipelineTaskStatus = new ConcurrentHashMap<>();
this.checkpointIdCounter = checkpointIdCounter;
this.readyToCloseStartingTask = new CopyOnWriteArraySet<>();
+
+ LOG.info(
+ "Create CheckpointCoordinator for job({}@{}) with plan({})",
+ pipelineId,
+ jobId,
+ plan);
if (pipelineState != null) {
this.latestCompletedCheckpoint =
serializer.deserialize(pipelineState.getStates(),
CompletedCheckpoint.class);
this.latestCompletedCheckpoint.setRestored(true);
+ LOG.info(
+ "Restore job({}@{}) with checkpoint({}), data: {}",
+ pipelineId,
+ jobId,
+ latestCompletedCheckpoint.getCheckpointId(),
+ latestCompletedCheckpoint);
}
this.checkpointCoordinatorFuture = new CompletableFuture();
@@ -280,6 +292,10 @@ public class CheckpointCoordinator {
ActionState actionState =
latestCompletedCheckpoint.getTaskStates().get(tuple.f0());
if (actionState == null) {
+ LOG.info(
+ "Not found task({}) state for
key({})",
+ taskLocation,
+ tuple.f0());
return;
}
if (COORDINATOR_INDEX.equals(tuple.f1())) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
index dfab876f03..9925c7ccfc 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
@@ -24,6 +24,7 @@ import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
+import lombok.ToString;
import java.util.Map;
import java.util.Set;
@@ -31,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
/** checkpoint plan info */
+@ToString
@Getter
@Builder(builderClassName = "Builder")
@AllArgsConstructor(access = AccessLevel.PRIVATE)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java
index 3f196f2c8f..74be795205 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java
@@ -22,10 +22,12 @@ import
org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
import lombok.Getter;
import lombok.Setter;
+import lombok.ToString;
import java.io.Serializable;
import java.util.Map;
+@ToString
public class CompletedCheckpoint implements Checkpoint, Serializable {
private static final long serialVersionUID = 1L;
private final long jobId;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/SubtaskStatistics.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/SubtaskStatistics.java
index 7bba1f886d..f4879d5408 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/SubtaskStatistics.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/SubtaskStatistics.java
@@ -19,9 +19,11 @@ package org.apache.seatunnel.engine.server.checkpoint;
import lombok.AllArgsConstructor;
import lombok.Getter;
+import lombok.ToString;
import java.io.Serializable;
+@ToString
@Getter
@AllArgsConstructor
public class SubtaskStatistics implements Serializable {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/TaskStatistics.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/TaskStatistics.java
index 3b00db5546..ee4344bc92 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/TaskStatistics.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/TaskStatistics.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.engine.server.checkpoint;
+import lombok.ToString;
+
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
@@ -24,6 +26,7 @@ import java.util.List;
import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
+@ToString
public class TaskStatistics implements Serializable {
/** ID of the task the statistics belong to. */
private final Long jobVertexId;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java
index 00ee084145..5469b55556 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java
@@ -25,10 +25,12 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import lombok.ToString;
import java.io.IOException;
import java.io.Serializable;
+@ToString
public class TaskLocation implements IdentifiedDataSerializable, Serializable {
private TaskGroupLocation taskGroupLocation;