This is an automated email from the ASF dual-hosted git repository.

chl-wxp pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 013ce82558 [Improve][Checkpoint] Add Javadoc for CheckpointCoordinator 
lifecycle (#10555)
013ce82558 is described below

commit 013ce82558cc5f4747ba900750adc616a5cf6040
Author: Marx-Carvalho <[email protected]>
AuthorDate: Fri Jun 5 12:33:12 2026 -0300

    [Improve][Checkpoint] Add Javadoc for CheckpointCoordinator lifecycle 
(#10555)
    
    Co-authored-by: marxcarvalho <marxpiresdecarvalho@gmailcom>
---
 .../server/checkpoint/CheckpointCoordinator.java   | 253 ++++++++++++++++++++-
 1 file changed, 250 insertions(+), 3 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 0323f16d72..ba557d3fb4 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -116,9 +116,32 @@ public class CheckpointCoordinator {
 
     private final CheckpointPlan plan;
 
+    /**
+     * Tracks source (starting) subtasks that have finished emitting data and 
are ready to close,
+     * but are waiting for the final checkpoint barrier before actual 
termination.
+     *
+     * <p>This collection is used to determine when all starting subtasks have 
reached a safe close
+     * point so that a final checkpoint can be triggered.
+     */
     private final Set<TaskLocation> readyToCloseStartingTask;
+
+    /**
+     * Tracks source subtasks that are currently in an idle (no-data) state 
and have indicated
+     * readiness to close.
+     *
+     * <p>Idle tasks may not actively emit records but must still participate 
in checkpoint
+     * coordination to ensure consistent termination semantics.
+     */
     private final Set<TaskLocation> readyToCloseIdleTask;
+
+    /**
+     * Tracks idle subtasks that have fully completed their close procedure.
+     *
+     * <p>This collection is used as a coordination barrier to ensure all idle 
tasks are properly
+     * shut down before the checkpoint coordinator transitions to a completed 
or suspended state.
+     */
     @Getter private final Set<TaskLocation> closedIdleTask;
+
     private final ConcurrentHashMap<Long, PendingCheckpoint> 
pendingCheckpoints;
 
     private final ArrayDeque<String> completedCheckpointIds;
@@ -133,6 +156,13 @@ public class CheckpointCoordinator {
 
     private final AtomicInteger pendingCounter = new AtomicInteger(0);
 
+    /**
+     * Indicates whether a schema change operation (e.g., DDL event) is 
currently in progress.
+     *
+     * <p>When set to {@code true}, normal periodic checkpoint triggering is 
temporarily suspended
+     * to ensure a dedicated schema-change checkpoint is completed before 
applying structural
+     * modifications.
+     */
     private final AtomicBoolean schemaChanging = new AtomicBoolean(false);
 
     private final Object lock = new Object();
@@ -140,6 +170,12 @@ public class CheckpointCoordinator {
     /** Flag marking the coordinator as shut down (not accepting any messages 
anymore). */
     private volatile boolean shutdown;
 
+    /**
+     * Marks whether all pipeline subtasks have reached the READY state.
+     *
+     * <p>This flag is set exactly once during a checkpoint cycle to prevent 
duplicate invocations
+     * of the task-start notification logic and redundant checkpoint 
scheduling.
+     */
     private final AtomicBoolean isAllTaskReady = new AtomicBoolean(false);
 
     private final ExecutorService executorService;
@@ -255,7 +291,21 @@ public class CheckpointCoordinator {
     // 
--------------------------------------------------------------------------------------------
     // The start step of the coordinator
     // 
--------------------------------------------------------------------------------------------
-
+    /**
+     * Entry point for handling task status reports sent by running tasks.
+     *
+     * <p>This method updates the internal task status map and triggers the 
appropriate state
+     * transition in the checkpoint lifecycle.
+     *
+     * <p>Status mappings:
+     *
+     * <ul>
+     *   <li>{@code WAITING_RESTORE} → invokes {@link 
#restoreTaskState(TaskLocation)}
+     *   <li>{@code READY_START} → invokes {@link #allTaskReady()}
+     * </ul>
+     *
+     * @param operation the task status report containing task location and 
status
+     */
     protected void reportedTask(TaskReportStatusOperation operation) {
         pipelineTaskStatus.put(operation.getLocation().getTaskID(), 
operation.getStatus());
         CompletableFuture.runAsync(
@@ -306,6 +356,22 @@ public class CheckpointCoordinator {
         cleanPendingCheckpoint(reason);
     }
 
+    /**
+     * Restores the execution state of the specified task from the latest 
successfully completed
+     * checkpoint.
+     *
+     * <p>This method reconstructs the relevant {@link ActionSubtaskState} 
instances based on the
+     * saved {@link ActionState} information. It supports both 
coordinator-level and subtask-level
+     * state recovery.
+     *
+     * <p>The restoration respects the current task parallelism and ensures 
that only relevant
+     * subtask states are reassigned to the recovering task.
+     *
+     * <p>If no checkpoint is available or a corresponding action state cannot 
be found, the method
+     * safely skips restoration for that entry.
+     *
+     * @param taskLocation identifies the task whose state should be restored
+     */
     private void restoreTaskState(TaskLocation taskLocation) {
         List<ActionSubtaskState> states = new ArrayList<>();
         if (latestCompletedCheckpoint != null) {
@@ -346,6 +412,29 @@ public class CheckpointCoordinator {
                 .join();
     }
 
+    /**
+     * Verifies whether all pipeline tasks are ready to start and triggers the 
next phase of
+     * execution if conditions are satisfied.
+     *
+     * <p>This method checks that:
+     *
+     * <ul>
+     *   <li>All subtasks have reported their status
+     *   <li>All reported statuses are {@code READY_START}
+     * </ul>
+     *
+     * <p>If all tasks are ready, it performs the following actions:
+     *
+     * <ul>
+     *   <li>Ensures the operation executes only once using an atomic guard
+     *   <li>Notifies all tasks to start execution
+     *   <li>Marks the latest completed checkpoint as completed
+     *   <li>Schedules periodic checkpoint triggering if checkpointing is 
enabled
+     * </ul>
+     *
+     * <p>This method plays a central role in the checkpoint coordinator's 
state machine by
+     * synchronizing the transition from task initialization to active 
execution.
+     */
     private void allTaskReady() {
         if (pipelineTaskStatus.size() != plan.getPipelineSubtasks().size()) {
             return;
@@ -401,6 +490,24 @@ public class CheckpointCoordinator {
         return true;
     }
 
+    /**
+     * Sends a start notification to all pipeline subtasks.
+     *
+     * <p>This method iterates over all subtasks defined in the execution plan 
and creates a {@code
+     * NotifyTaskStartOperation} for each one. The operation is then 
dispatched to the corresponding
+     * member node via the {@code checkpointManager}.
+     *
+     * <p>The start notification is sent asynchronously, and an array of 
{@code InvocationFuture}
+     * objects is returned. Each future represents the remote invocation 
result for a specific
+     * subtask.
+     *
+     * <p>The caller may wait for all returned futures to complete (for 
example, using {@code
+     * CompletableFuture.allOf(...)}) in order to ensure that all subtasks 
have successfully started
+     * before proceeding with further actions such as triggering checkpoints.
+     *
+     * @return an array of {@code InvocationFuture} instances corresponding to 
the asynchronous
+     *     start operations for each pipeline subtask
+     */
     public InvocationFuture<?>[] notifyTaskStart() {
         return plan.getPipelineSubtasks().stream()
                 .map(NotifyTaskStartOperation::new)
@@ -428,6 +535,20 @@ public class CheckpointCoordinator {
                 TimeUnit.MILLISECONDS);
     }
 
+    /**
+     * Marks a starting task as ready to close and checks whether a final 
checkpoint should be
+     * triggered.
+     *
+     * <p>This method is invoked when a starting subtask reaches a state where 
it can be safely
+     * closed. The task location is recorded in the {@code 
readyToCloseStartingTask} set. Once all
+     * starting subtasks defined in the execution plan have reported 
readiness, a final checkpoint
+     * of type {@code COMPLETED_POINT_TYPE} is triggered.
+     *
+     * <p>The final checkpoint ensures that all state is fully persisted 
before the job transitions
+     * to a terminal state, providing consistency and fault tolerance 
guarantees.
+     *
+     * @param taskLocation the location metadata of the task that is ready to 
close
+     */
     protected void readyToClose(TaskLocation taskLocation) {
         readyToCloseStartingTask.add(taskLocation);
         updateReadyToCloseStartingTask();
@@ -586,7 +707,27 @@ public class CheckpointCoordinator {
             isAllTaskReady.set(false);
         }
     }
-
+    /**
+     * Attempts to trigger a pending checkpoint based on the given checkpoint 
type.
+     *
+     * <p>This method enforces several preconditions before initiating a 
checkpoint:
+     *
+     * <ul>
+     *   <li>The current thread must not be interrupted
+     *   <li>All pipeline tasks must be in READY state
+     *   <li>The configured checkpoint interval must have elapsed (except for 
final or schema change
+     *       checkpoints)
+     * </ul>
+     *
+     * <p>If the minimum interval has not yet passed, the checkpoint trigger 
will be rescheduled for
+     * the remaining delay time.
+     *
+     * <p>This mechanism ensures stable and controlled checkpoint scheduling, 
preventing excessive
+     * checkpoint triggering while maintaining data consistency.
+     *
+     * @param checkpointType the type of checkpoint to trigger, which 
determines whether interval
+     *     constraints should be applied
+     */
     protected void tryTriggerPendingCheckpoint(CheckpointType checkpointType) {
         if (Thread.currentThread().isInterrupted()) {
             LOG.warn("currentThread already be interrupted, skip trigger 
checkpoint");
@@ -955,6 +1096,35 @@ public class CheckpointCoordinator {
                 .toArray(InvocationFuture[]::new);
     }
 
+    /**
+     * Cleans and aborts all pending checkpoints due to the given close reason.
+     *
+     * <p>This method forcefully terminates all in-progress {@code 
PendingCheckpoint}s and resets
+     * the internal coordinator state. It is typically invoked when the 
checkpoint coordinator is
+     * shutting down, resetting, or when the job reaches a terminal state.
+     *
+     * <p>The cleanup process includes:
+     *
+     * <ul>
+     *   <li>Marking the coordinator as shutdown
+     *   <li>Resetting task readiness state
+     *   <li>Aborting all pending checkpoints with the provided {@code 
CheckpointCloseReason}
+     *   <li>Notifying the {@code CheckpointMonitorService} about checkpoint 
failures (except when
+     *       caused by a coordinator reset)
+     *   <li>Clearing all internal tracking structures
+     *   <li>Resetting counters and schema change flags
+     *   <li>Stopping and recreating the scheduler thread pool
+     * </ul>
+     *
+     * <p>If the close reason is {@code CHECKPOINT_COORDINATOR_RESET}, the 
monitor service will
+     * clear all in-progress checkpoint metadata without reporting them as 
failures.
+     *
+     * <p>This method ensures that no residual checkpoint state remains in 
memory and that the
+     * coordinator is ready for a clean restart if necessary.
+     *
+     * @param closedReason the reason why pending checkpoints are being 
closed; determines how
+     *     monitoring and cleanup are handled
+     */
     protected void cleanPendingCheckpoint(CheckpointCloseReason closedReason) {
         shutdown = true;
         isAllTaskReady.set(false);
@@ -1012,7 +1182,28 @@ public class CheckpointCoordinator {
             checkpointMonitorService.clearInProgress(jobId, pipelineId);
         }
     }
-
+    /**
+     * Processes a checkpoint acknowledgment from a task.
+     *
+     * <p>This method is invoked when a task successfully completes its 
checkpoint operation and
+     * sends back an acknowledgment along with its state.
+     *
+     * <p>The coordinator performs the following actions:
+     *
+     * <ul>
+     *   <li>Validates the existence of the corresponding {@link 
PendingCheckpoint}
+     *   <li>Registers the task acknowledgment and associated states
+     *   <li>Updates subtask execution status (e.g., RUNNING or 
SAVEPOINT_PREPARE_CLOSE)
+     *   <li>Notifies the checkpoint monitor service, if available
+     *   <li>Handles prepare-close logic for non-final checkpoints
+     * </ul>
+     *
+     * <p>If the checkpoint has already been completed or discarded, the 
acknowledgment is safely
+     * ignored.
+     *
+     * @param ackOperation the acknowledgment operation containing the 
checkpoint barrier, task
+     *     location, and state snapshot
+     */
     protected void acknowledgeTask(TaskAcknowledgeOperation ackOperation) {
         final long checkpointId = ackOperation.getBarrier().getId();
         final PendingCheckpoint pendingCheckpoint = 
pendingCheckpoints.get(checkpointId);
@@ -1050,6 +1241,42 @@ public class CheckpointCoordinator {
         }
     }
 
+    /**
+     * Completes a pending checkpoint after all required task acknowledgements 
have been received.
+     *
+     * <p>This method performs the finalization logic of a {@code 
PendingCheckpoint}, converting it
+     * into a {@code CompletedCheckpoint}. The operation includes:
+     *
+     * <ul>
+     *   <li>Logging checkpoint completion metadata (duration, trigger time, 
completion time)
+     *   <li>Persisting serialized checkpoint state into the configured 
storage (when applicable)
+     *   <li>Applying retention policy and deleting old checkpoints if 
necessary
+     *   <li>Updating the latest completed checkpoint reference
+     *   <li>Notifying monitoring services about checkpoint completion
+     *   <li>Cleaning up internal pending checkpoint structures
+     *   <li>Transitioning coordinator state if the job has finished
+     * </ul>
+     *
+     * <p>The method is {@code synchronized} to ensure thread safety, since 
checkpoint completion
+     * may be triggered concurrently by multiple task acknowledgements.
+     *
+     * <p>If the checkpoint type is not marked as a completed-only checkpoint 
(e.g., not a final
+     * checkpoint marker), its serialized state will be stored in the 
configured {@code
+     * CheckpointStorage}. The retention mechanism removes older checkpoints 
based on {@code
+     * maxRetainedCheckpoints}.
+     *
+     * <p>If the job execution is determined to be fully completed after this 
checkpoint, the
+     * coordinator transitions to:
+     *
+     * <ul>
+     *   <li>{@code SUSPEND} state if the checkpoint is a savepoint
+     *   <li>{@code FINISHED} state otherwise
+     * </ul>
+     *
+     * @param completedCheckpoint the fully acknowledged checkpoint to 
finalize and persist; must
+     *     not be {@code null}
+     * @throws RuntimeException if checkpoint state serialization or storage 
fails
+     */
     public synchronized void completePendingCheckpoint(CompletedCheckpoint 
completedCheckpoint) {
         LOG.debug(
                 "pending checkpoint completed, job id: {}, pipeline id: {}, 
checkpoint id: {}, "
@@ -1232,6 +1459,26 @@ public class CheckpointCoordinator {
         }
     }
 
+    /**
+     * Schedules a schema-change-before checkpoint if no schema change is 
currently in progress.
+     *
+     * <p>This method ensures that a dedicated checkpoint of type {@code
+     * SCHEMA_CHANGE_BEFORE_POINT_TYPE} is triggered before applying a schema 
change. It uses an
+     * atomic flag ({@code schemaChanging}) to guarantee that only one 
schema-change checkpoint is
+     * scheduled at a time.
+     *
+     * <p>When invoked:
+     *
+     * <ul>
+     *   <li>If no schema change is in progress, general checkpoint triggering 
is effectively paused
+     *       and a schema-change-before checkpoint is scheduled immediately.
+     *   <li>If a schema change checkpoint is already scheduled or in 
progress, the method logs a
+     *       warning and does nothing.
+     * </ul>
+     *
+     * <p>This mechanism guarantees state consistency and durability before 
modifying the pipeline
+     * schema, preventing inconsistencies between operator state and 
structural changes.
+     */
     protected void scheduleSchemaChangeBeforeCheckpoint() {
         if (schemaChanging.compareAndSet(false, true)) {
             LOG.info(

Reply via email to