This is an automated email from the ASF dual-hosted git repository.
chl-wxp pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 013ce82558 [Improve][Checkpoint] Add Javadoc for CheckpointCoordinator
lifecycle (#10555)
013ce82558 is described below
commit 013ce82558cc5f4747ba900750adc616a5cf6040
Author: Marx-Carvalho <[email protected]>
AuthorDate: Fri Jun 5 12:33:12 2026 -0300
[Improve][Checkpoint] Add Javadoc for CheckpointCoordinator lifecycle
(#10555)
Co-authored-by: marxcarvalho <marxpiresdecarvalho@gmailcom>
---
.../server/checkpoint/CheckpointCoordinator.java | 253 ++++++++++++++++++++-
1 file changed, 250 insertions(+), 3 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 0323f16d72..ba557d3fb4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -116,9 +116,32 @@ public class CheckpointCoordinator {
private final CheckpointPlan plan;
+ /**
+ * Tracks source (starting) subtasks that have finished emitting data and are ready to close,
+ * but are waiting for the final checkpoint barrier before actual termination.
+ *
+ * <p>This collection is used to determine when all starting subtasks have reached a safe close
+ * point so that a final checkpoint can be triggered.
+ */
private final Set<TaskLocation> readyToCloseStartingTask;
+
+ /**
+ * Tracks source subtasks that are currently in an idle (no-data) state and have indicated
+ * readiness to close.
+ *
+ * <p>Idle tasks may not actively emit records but must still participate in checkpoint
+ * coordination to ensure consistent termination semantics.
+ */
private final Set<TaskLocation> readyToCloseIdleTask;
+
+ /**
+ * Tracks idle subtasks that have fully completed their close procedure.
+ *
+ * <p>This collection is used as a coordination barrier to ensure all idle tasks are properly
+ * shut down before the checkpoint coordinator transitions to a completed or suspended state.
+ */
@Getter private final Set<TaskLocation> closedIdleTask;
+
private final ConcurrentHashMap<Long, PendingCheckpoint>
pendingCheckpoints;
private final ArrayDeque<String> completedCheckpointIds;
@@ -133,6 +156,13 @@ public class CheckpointCoordinator {
private final AtomicInteger pendingCounter = new AtomicInteger(0);
+ /**
+ * Indicates whether a schema change operation (e.g., a DDL event) is currently in progress.
+ *
+ * <p>When set to {@code true}, normal periodic checkpoint triggering is temporarily suspended
+ * to ensure a dedicated schema-change checkpoint is completed before applying structural
+ * modifications.
+ */
private final AtomicBoolean schemaChanging = new AtomicBoolean(false);
private final Object lock = new Object();
@@ -140,6 +170,12 @@ public class CheckpointCoordinator {
/** Flag marking the coordinator as shut down (not accepting any messages anymore). */
private volatile boolean shutdown;
+ /**
+ * Marks whether all pipeline subtasks have reported the {@code READY_START} status.
+ *
+ * <p>Once set, the flag stays {@code true} until it is explicitly reset (for example, when
+ * pending checkpoints are cleaned). This prevents duplicate invocations of the task-start
+ * notification logic and redundant checkpoint scheduling.
+ */
private final AtomicBoolean isAllTaskReady = new AtomicBoolean(false);
private final ExecutorService executorService;
@@ -255,7 +291,21 @@ public class CheckpointCoordinator {
// --------------------------------------------------------------------------------------------
// The start step of the coordinator
// --------------------------------------------------------------------------------------------
-
+ /**
+ * Entry point for handling task status reports sent by running tasks.
+ *
+ * <p>This method records the reported status in the internal task status map and then
+ * asynchronously triggers the appropriate state transition in the checkpoint lifecycle.
+ *
+ * <p>Status mappings:
+ *
+ * <ul>
+ * <li>{@code WAITING_RESTORE} → invokes {@link #restoreTaskState(TaskLocation)}
+ * <li>{@code READY_START} → invokes {@link #allTaskReady()}
+ * </ul>
+ *
+ * @param operation the task status report containing the task location and status
+ */
protected void reportedTask(TaskReportStatusOperation operation) {
pipelineTaskStatus.put(operation.getLocation().getTaskID(),
operation.getStatus());
CompletableFuture.runAsync(
@@ -306,6 +356,22 @@ public class CheckpointCoordinator {
cleanPendingCheckpoint(reason);
}
+ /**
+ * Restores the execution state of the specified task from the latest successfully completed
+ * checkpoint.
+ *
+ * <p>This method reconstructs the relevant {@link ActionSubtaskState} instances based on the
+ * saved {@link ActionState} information. It supports both coordinator-level and subtask-level
+ * state recovery.
+ *
+ * <p>The restoration respects the current task parallelism and ensures that only relevant
+ * subtask states are reassigned to the recovering task.
+ *
+ * <p>If no checkpoint has been completed yet, no state is restored. If a corresponding action
+ * state cannot be found, the method safely skips restoration for that entry.
+ *
+ * @param taskLocation identifies the task whose state should be restored
+ */
private void restoreTaskState(TaskLocation taskLocation) {
List<ActionSubtaskState> states = new ArrayList<>();
if (latestCompletedCheckpoint != null) {
@@ -346,6 +412,29 @@ public class CheckpointCoordinator {
.join();
}
+ /**
+ * Verifies whether all pipeline tasks are ready to start and triggers the next phase of
+ * execution if conditions are satisfied.
+ *
+ * <p>This method checks that:
+ *
+ * <ul>
+ * <li>Every pipeline subtask in the plan has reported a status (otherwise it returns
+ * immediately)
+ * <li>All reported statuses are {@code READY_START}
+ * </ul>
+ *
+ * <p>If all tasks are ready, it performs the following actions:
+ *
+ * <ul>
+ * <li>Ensures the operation executes only once using an atomic guard
+ * <li>Notifies all tasks to start execution
+ * <li>Marks the latest completed checkpoint as completed
+ * <li>Schedules periodic checkpoint triggering if checkpointing is enabled
+ * </ul>
+ *
+ * <p>This method plays a central role in the checkpoint coordinator's state machine by
+ * synchronizing the transition from task initialization to active execution.
+ */
private void allTaskReady() {
if (pipelineTaskStatus.size() != plan.getPipelineSubtasks().size()) {
return;
@@ -401,6 +490,24 @@ public class CheckpointCoordinator {
return true;
}
+ /**
+ * Sends a start notification to all pipeline subtasks.
+ *
+ * <p>This method iterates over all subtasks defined in the execution plan and creates a {@code
+ * NotifyTaskStartOperation} for each one. The operation is then dispatched to the corresponding
+ * member node via the {@code checkpointManager}.
+ *
+ * <p>The start notification is sent asynchronously, and an array of {@code InvocationFuture}
+ * objects is returned. Each future represents the remote invocation result for a specific
+ * subtask.
+ *
+ * <p>The caller may wait for all returned futures to complete (for example, using {@code
+ * CompletableFuture.allOf(...)}) in order to ensure that all subtasks have successfully started
+ * before proceeding with further actions such as triggering checkpoints.
+ *
+ * @return an array of {@code InvocationFuture} instances corresponding to the asynchronous
+ * start operations for each pipeline subtask
+ */
public InvocationFuture<?>[] notifyTaskStart() {
return plan.getPipelineSubtasks().stream()
.map(NotifyTaskStartOperation::new)
@@ -428,6 +535,20 @@ public class CheckpointCoordinator {
TimeUnit.MILLISECONDS);
}
+ /**
+ * Marks a starting task as ready to close and checks whether a final checkpoint should be
+ * triggered.
+ *
+ * <p>This method is invoked when a starting subtask reaches a state where it can be safely
+ * closed. The task location is recorded in the {@code readyToCloseStartingTask} set, and
+ * {@code updateReadyToCloseStartingTask()} is then invoked. Once all starting subtasks defined
+ * in the execution plan have reported readiness, a final checkpoint of type {@code
+ * COMPLETED_POINT_TYPE} is triggered.
+ *
+ * <p>The final checkpoint ensures that all state is fully persisted before the job transitions
+ * to a terminal state, providing consistency and fault-tolerance guarantees.
+ *
+ * @param taskLocation the location metadata of the task that is ready to close
+ */
protected void readyToClose(TaskLocation taskLocation) {
readyToCloseStartingTask.add(taskLocation);
updateReadyToCloseStartingTask();
@@ -586,7 +707,27 @@ public class CheckpointCoordinator {
isAllTaskReady.set(false);
}
}
-
+ /**
+ * Attempts to trigger a pending checkpoint based on the given checkpoint type.
+ *
+ * <p>This method enforces several preconditions before initiating a checkpoint:
+ *
+ * <ul>
+ * <li>The current thread must not be interrupted (otherwise the trigger is skipped and a
+ * warning is logged)
+ * <li>All pipeline tasks must be in READY state
+ * <li>The configured checkpoint interval must have elapsed (except for final or schema change
+ * checkpoints)
+ * </ul>
+ *
+ * <p>If the minimum interval has not yet passed, the checkpoint trigger will be rescheduled for
+ * the remaining delay time.
+ *
+ * <p>This mechanism ensures stable and controlled checkpoint scheduling, preventing excessive
+ * checkpoint triggering while maintaining data consistency.
+ *
+ * @param checkpointType the type of checkpoint to trigger, which determines whether interval
+ * constraints should be applied
+ */
protected void tryTriggerPendingCheckpoint(CheckpointType checkpointType) {
if (Thread.currentThread().isInterrupted()) {
LOG.warn("currentThread already be interrupted, skip trigger
checkpoint");
@@ -955,6 +1096,35 @@ public class CheckpointCoordinator {
.toArray(InvocationFuture[]::new);
}
+ /**
+ * Cleans and aborts all pending checkpoints due to the given close reason.
+ *
+ * <p>This method forcefully terminates all in-progress {@code PendingCheckpoint}s and resets
+ * the internal coordinator state. It is typically invoked when the checkpoint coordinator is
+ * shutting down, resetting, or when the job reaches a terminal state.
+ *
+ * <p>The cleanup process includes:
+ *
+ * <ul>
+ * <li>Marking the coordinator as shut down
+ * <li>Resetting task readiness state
+ * <li>Aborting all pending checkpoints with the provided {@code CheckpointCloseReason}
+ * <li>Notifying the {@code CheckpointMonitorService} about checkpoint failures (except when
+ * caused by a coordinator reset)
+ * <li>Clearing all internal tracking structures
+ * <li>Resetting counters and schema change flags
+ * <li>Stopping and recreating the scheduler thread pool
+ * </ul>
+ *
+ * <p>If the close reason is {@code CHECKPOINT_COORDINATOR_RESET}, the monitor service will
+ * clear all in-progress checkpoint metadata without reporting it as failures.
+ *
+ * <p>This method ensures that no residual checkpoint state remains in memory and that the
+ * coordinator is ready for a clean restart if necessary.
+ *
+ * @param closedReason the reason why pending checkpoints are being closed; determines how
+ * monitoring and cleanup are handled
+ */
protected void cleanPendingCheckpoint(CheckpointCloseReason closedReason) {
shutdown = true;
isAllTaskReady.set(false);
@@ -1012,7 +1182,28 @@ public class CheckpointCoordinator {
checkpointMonitorService.clearInProgress(jobId, pipelineId);
}
}
-
+ /**
+ * Processes a checkpoint acknowledgment from a task.
+ *
+ * <p>This method is invoked when a task successfully completes its checkpoint operation and
+ * sends back an acknowledgment along with its state.
+ *
+ * <p>The coordinator performs the following actions:
+ *
+ * <ul>
+ * <li>Looks up the corresponding {@link PendingCheckpoint} by the barrier's checkpoint id
+ * <li>Registers the task acknowledgment and associated states
+ * <li>Updates subtask execution status (e.g., {@code RUNNING} or {@code
+ * SAVEPOINT_PREPARE_CLOSE})
+ * <li>Notifies the checkpoint monitor service, if available
+ * <li>Handles prepare-close logic for non-final checkpoints
+ * </ul>
+ *
+ * <p>If the checkpoint has already been completed or discarded, the acknowledgment is safely
+ * ignored.
+ *
+ * @param ackOperation the acknowledgment operation containing the checkpoint barrier, task
+ * location, and state snapshot
+ */
protected void acknowledgeTask(TaskAcknowledgeOperation ackOperation) {
final long checkpointId = ackOperation.getBarrier().getId();
final PendingCheckpoint pendingCheckpoint =
pendingCheckpoints.get(checkpointId);
@@ -1050,6 +1241,42 @@ public class CheckpointCoordinator {
}
}
+ /**
+ * Completes a pending checkpoint after all required task acknowledgments have been received.
+ *
+ * <p>This method finalizes the {@code CompletedCheckpoint} that was produced from a fully
+ * acknowledged {@code PendingCheckpoint}. The operation includes:
+ *
+ * <ul>
+ * <li>Logging checkpoint completion metadata (duration, trigger time, completion time)
+ * <li>Persisting serialized checkpoint state into the configured storage (when applicable)
+ * <li>Applying the retention policy and deleting old checkpoints if necessary
+ * <li>Updating the latest completed checkpoint reference
+ * <li>Notifying monitoring services about checkpoint completion
+ * <li>Cleaning up internal pending checkpoint structures
+ * <li>Transitioning coordinator state if the job has finished
+ * </ul>
+ *
+ * <p>The method is {@code synchronized} to ensure thread safety, since checkpoint completion
+ * may be triggered concurrently by multiple task acknowledgments.
+ *
+ * <p>If the checkpoint type is not marked as a completed-only checkpoint (e.g., not a final
+ * checkpoint marker), its serialized state will be stored in the configured {@code
+ * CheckpointStorage}. The retention mechanism removes older checkpoints based on {@code
+ * maxRetainedCheckpoints}.
+ *
+ * <p>If the job execution is determined to be fully completed after this checkpoint, the
+ * coordinator transitions to:
+ *
+ * <ul>
+ * <li>{@code SUSPEND} state if the checkpoint is a savepoint
+ * <li>{@code FINISHED} state otherwise
+ * </ul>
+ *
+ * @param completedCheckpoint the fully acknowledged checkpoint to finalize and persist; must
+ * not be {@code null}
+ * @throws RuntimeException if checkpoint state serialization or storage fails
+ */
public synchronized void completePendingCheckpoint(CompletedCheckpoint
completedCheckpoint) {
LOG.debug(
"pending checkpoint completed, job id: {}, pipeline id: {},
checkpoint id: {}, "
@@ -1232,6 +1459,26 @@ public class CheckpointCoordinator {
}
}
+ /**
+ * Schedules a schema-change-before checkpoint if no schema change is currently in progress.
+ *
+ * <p>This method ensures that a dedicated checkpoint of type {@code
+ * SCHEMA_CHANGE_BEFORE_POINT_TYPE} is triggered before applying a schema change. It uses an
+ * atomic flag ({@code schemaChanging}) to guarantee that only one schema-change checkpoint is
+ * scheduled at a time.
+ *
+ * <p>When invoked:
+ *
+ * <ul>
+ * <li>If no schema change is in progress, general checkpoint triggering is effectively paused
+ * and a schema-change-before checkpoint is scheduled immediately.
+ * <li>If a schema change checkpoint is already scheduled or in progress, the method logs a
+ * warning and does nothing.
+ * </ul>
+ *
+ * <p>This mechanism guarantees state consistency and durability before modifying the pipeline
+ * schema, preventing inconsistencies between operator state and structural changes.
+ */
protected void scheduleSchemaChangeBeforeCheckpoint() {
if (schemaChanging.compareAndSet(false, true)) {
LOG.info(