This is an automated email from the ASF dual-hosted git repository.

shenghang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c84f62fb12 [Docs][Core] Add Javadoc to SeaTunnelTask (state machine and lifecycle) (#10559)
c84f62fb12 is described below

commit c84f62fb123e0f7aaf747f507806d6760e8f313c
Author: Ramesh Reddy Adutla 
<[email protected]>
AuthorDate: Sun Mar 8 00:39:06 2026 +0000

    [Docs][Core] Add Javadoc to SeaTunnelTask (state machine and lifecycle) (#10559)
    
    Co-authored-by: Copilot <[email protected]>
---
 .../engine/server/task/SeaTunnelTask.java          | 132 +++++++++++++++++++++
 1 file changed, 132 insertions(+)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index d95f7ef9eb..3b5c24fe15 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -86,6 +86,25 @@ import static 
org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTask
 import static 
org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState.STARTING;
 import static 
org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState.WAITING_RESTORE;
 
+/**
+ * Abstract base class, extending {@link AbstractTask}, for Zeta engine tasks that execute the data
+ * flow of a single pipeline subtask.
+ *
+ * <p>A {@code SeaTunnelTask} drives the lifecycle of a single pipeline subtask. It holds the
+ * execution DAG as a {@link Flow} graph, converts that graph into a chain of {@link FlowLifeCycle}
+ * objects during {@link #init()}, and then repeatedly calls {@link #stateProcess()} to advance
+ * through the task state machine:
+ *
+ * <pre>
+ *   CREATED → INIT → WAITING_RESTORE → READY_START → STARTING → RUNNING → PREPARE_CLOSE → CLOSED
+ * </pre>
+ *
+ * <p>Checkpoint coordination works by accumulating per-cycle ACKs via {@link #ack(Barrier)} and
+ * buffering per-action state snapshots via {@link #addState(Barrier, ActionStateKey, List)}, then
+ * sending a single {@link TaskAcknowledgeOperation} to the {@code CheckpointCoordinator}.
+ *
+ * <p>Subclasses must implement {@link #collect()} (the main data-reading loop) and {@link
+ * #createSourceFlowLifeCycle} (the factory for the source-specific lifecycle).
+ */
 @Slf4j
 public abstract class SeaTunnelTask extends AbstractTask {
     private static final long serialVersionUID = 2604309561613784425L;
@@ -119,6 +138,24 @@ public abstract class SeaTunnelTask extends AbstractTask {
         this.currState = SeaTunnelTaskState.CREATED;
     }
 
+    /**
+     * Initializes the task by converting the execution {@link Flow} DAG into a chain of {@link
+     * FlowLifeCycle} objects.
+     *
+     * <p>Specifically, this method:
+     *
+     * <ol>
+     *   <li>Creates a {@link SeaTunnelMetricsContext} for this task's metrics reporting.
+     *   <li>Recursively traverses the {@code executionFlow} graph via {@link
+     *       #convertFlowToActionLifeCycle(Flow)}, producing one {@link FlowLifeCycle} per node and
+     *       wiring their output lists together.
+     *   <li>Calls {@link FlowLifeCycle#init()} on every lifecycle in the chain.
+     *   <li>Registers a composite future over all {@code flowFutures} so that {@code closeCalled}
+     *       is set to {@code true} once every flow in the chain has completed, whether normally or
+     *       exceptionally.
+     * </ol>
+     *
+     * @throws Exception if flow conversion or any lifecycle initialization fails
+     */
     @Override
     public void init() throws Exception {
         super.init();
@@ -134,6 +171,25 @@ public abstract class SeaTunnelTask extends AbstractTask {
                 .whenComplete((s, e) -> closeCalled = true);
     }
 
+    /**
+     * Advances the task through its state machine. Called repeatedly by the task execution loop.
+     *
+     * <p>State transitions:
+     *
+     * <ul>
+     *   <li><b>INIT → WAITING_RESTORE</b>: Reports status and waits for {@code restoreComplete}.
+     *   <li><b>WAITING_RESTORE → READY_START</b>: Once restore is done, opens all {@link
+     *       FlowLifeCycle} instances and waits for the external start signal.
+     *   <li><b>READY_START → STARTING → RUNNING</b>: Triggered when {@code startCalled} is set.
+     *   <li><b>RUNNING</b>: Calls {@link #collect()} to read and process data. Transitions to
+     *       {@code PREPARE_CLOSE} once {@code prepareCloseStatus} has been set by {@link
+     *       #ack(Barrier)} for a prepare-close barrier.
+     *   <li><b>PREPARE_CLOSE → CLOSED</b>: Waits for all flows to complete ({@code closeCalled}),
+     *       then calls {@link #close()} and marks the task progress as done.
+     *   <li><b>CANCELLING → CANCELED</b>: External cancellation path; closes the task and marks
+     *       it done.
+     * </ul>
+     *
+     * @throws Exception if any state transition or the {@link #collect()} call fails
+     */
     protected void stateProcess() throws Exception {
         switch (currState) {
             case INIT:
@@ -193,6 +249,29 @@ public abstract class SeaTunnelTask extends AbstractTask {
         this.taskBelongGroup = group;
     }
 
+    /**
+     * Recursively converts a {@link Flow} DAG into a chain of {@link FlowLifeCycle} objects.
+     *
+     * <p>For each node in the graph, this method:
+     *
+     * <ol>
+     *   <li>Recurses into {@code flow.getNext()} to build the downstream lifecycles first.
+     *   <li>Creates a {@link CompletableFuture} and registers it in {@code flowFutures} for
+     *       close detection.
+     *   <li>Instantiates the appropriate lifecycle based on the flow/action type:
+     *       <ul>
+     *         <li>{@link SourceAction} → {@link SourceFlowLifeCycle} (via the subclass factory)
+     *         <li>{@link SinkAction} → {@link SinkFlowLifeCycle}
+     *         <li>{@link TransformChainAction} → {@link TransformFlowLifeCycle}
+     *         <li>{@link IntermediateExecutionFlow} → {@link IntermediateQueueFlowLifeCycle}
+     *       </ul>
+     *   <li>Wires the downstream lifecycles as the outputs of the newly created lifecycle.
+     * </ol>
+     *
+     * @param flow the root of the DAG (or of a sub-DAG) to convert
+     * @return the lifecycle corresponding to {@code flow}
+     * @throws Exception if the action type is unknown or lifecycle creation fails
+     */
     @SuppressWarnings({"unchecked", "rawtypes"})
     private FlowLifeCycle convertFlowToActionLifeCycle(@NonNull Flow flow) 
throws Exception {
 
@@ -295,6 +374,15 @@ public abstract class SeaTunnelTask extends AbstractTask {
         return result;
     }
 
+    /**
+     * Performs an ordered teardown of all {@link FlowLifeCycle} objects in this task.
+     *
+     * <p>After delegating to {@link AbstractTask#close()}, each lifecycle's {@link
+     * FlowLifeCycle#close()} is called in iteration order. If any lifecycle throws an {@link
+     * IOException}, the error is logged but does not prevent the remaining lifecycles from being
+     * closed.
+     *
+     * @throws IOException if the parent {@link AbstractTask#close()} fails
+     */
     @Override
     public void close() throws IOException {
         super.close();
@@ -309,6 +397,23 @@ public abstract class SeaTunnelTask extends AbstractTask {
                         });
     }
 
+    /**
+     * Accumulates a per-cycle checkpoint ACK for the given barrier.
+     *
+     * <p>Each {@link FlowLifeCycle} in the chain calls this method when it has finished processing
+     * a barrier. Once every cycle has acknowledged the barrier (that is, {@code ackSize ==
+     * allCycles.size()}):
+     *
+     * <ol>
+     *   <li>If the barrier carries a {@code prepareClose} signal for this task, {@code
+     *       prepareCloseStatus} is set to {@code true} to trigger the {@code RUNNING →
+     *       PREPARE_CLOSE} transition.
+     *   <li>If the barrier is a snapshot barrier, a {@link TaskAcknowledgeOperation} containing all
+     *       buffered {@link ActionSubtaskState}s is sent to the {@code CheckpointCoordinator} on
+     *       the master node.
+     * </ol>
+     *
+     * @param barrier the checkpoint or prepare-close barrier being acknowledged
+     */
     public void ack(Barrier barrier) {
         log.debug("seatunnel task ack barrier[{}]", this.taskLocation);
         Integer ackSize =
@@ -331,6 +436,14 @@ public abstract class SeaTunnelTask extends AbstractTask {
         }
     }
 
+    /**
+     * Sends a {@link TriggerSchemaChangeBeforeCheckpointOperation} for this task's {@code
+     * taskLocation} to the master node.
+     *
+     * <p>This propagates a DDL-before-checkpoint barrier to the upstream enumerator, signalling
+     * that a schema change must be applied before the next checkpoint can proceed.
+     *
+     * @return a future that completes when the master acknowledges the operation
+     */
     public InvocationFuture<Object> triggerSchemaChangeBeforeCheckpoint() {
         log.info(
                 "trigger schema-change-before checkpoint. jobID[{}], 
taskLocation[{}]",
@@ -340,6 +453,14 @@ public abstract class SeaTunnelTask extends AbstractTask {
                 .sendToMaster(new 
TriggerSchemaChangeBeforeCheckpointOperation(taskLocation));
     }
 
+    /**
+     * Sends a {@link TriggerSchemaChangeAfterCheckpointOperation} for this task's {@code
+     * taskLocation} to the master node.
+     *
+     * <p>This propagates a DDL-after-checkpoint barrier, signalling that the schema change has been
+     * committed and downstream tasks can proceed with the new schema.
+     *
+     * @return a future that completes when the master acknowledges the operation
+     */
     public InvocationFuture<Object> triggerSchemaChangeAfterCheckpoint() {
         log.info(
                 "trigger schema-change-after checkpoint. jobID[{}], 
taskLocation[{}]",
@@ -349,6 +470,17 @@ public abstract class SeaTunnelTask extends AbstractTask {
                 .sendToMaster(new 
TriggerSchemaChangeAfterCheckpointOperation(taskLocation));
     }
 
+    /**
+     * Buffers a per-action checkpoint state snapshot for the given barrier.
+     *
+     * <p>Each action in the task chain serializes its state as a list of byte arrays and registers
+     * it here. States are grouped by {@code barrier.getId()} and are later sent to the {@code
+     * CheckpointCoordinator} once all cycles have acknowledged the barrier via {@link
+     * #ack(Barrier)}.
+     *
+     * @param barrier the checkpoint barrier this state belongs to
+     * @param stateKey identifies the action that produced the state
+     * @param state the serialized action state as a list of byte arrays
+     */
     public void addState(Barrier barrier, ActionStateKey stateKey, 
List<byte[]> state) {
         List<ActionSubtaskState> states =
                 checkpointStates.computeIfAbsent(barrier.getId(), id -> new 
ArrayList<>());

Reply via email to