This is an automated email from the ASF dual-hosted git repository.
shenghang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c84f62fb12 [Docs][Core] Add Javadoc to SeaTunnelTask (state machine
and lifecycle) (#10559)
c84f62fb12 is described below
commit c84f62fb123e0f7aaf747f507806d6760e8f313c
Author: Ramesh Reddy Adutla
<[email protected]>
AuthorDate: Sun Mar 8 00:39:06 2026 +0000
[Docs][Core] Add Javadoc to SeaTunnelTask (state machine and lifecycle)
(#10559)
Co-authored-by: Copilot <[email protected]>
---
.../engine/server/task/SeaTunnelTask.java | 132 +++++++++++++++++++++
1 file changed, 132 insertions(+)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index d95f7ef9eb..3b5c24fe15 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -86,6 +86,25 @@ import static
org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTask
import static
org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState.STARTING;
import static
org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState.WAITING_RESTORE;
+/**
+ * Abstract base class for all Zeta engine task executions.
+ *
+ * <p>A {@code SeaTunnelTask} drives the lifecycle of a single pipeline
subtask. It holds the
+ * execution DAG as a {@link Flow} graph, converts that graph into a chain of
{@link FlowLifeCycle}
+ * objects during {@link #init()}, and then repeatedly calls {@link
#stateProcess()} to advance
+ * through the task state machine:
+ *
+ * <pre>
+ * CREATED → INIT → WAITING_RESTORE → READY_START → STARTING → RUNNING → PREPARE_CLOSE → CLOSED
+ * CANCELLING → CANCELED   (external cancellation path)
+ * </pre>
+ *
+ * <p>Checkpoint coordination is handled by accumulating per-cycle ACKs via
{@link #ack(Barrier)}
+ * and buffering per-action state snapshots via {@link #addState(Barrier,
ActionStateKey, List)}
+ * before sending a single {@link TaskAcknowledgeOperation} to the {@code
CheckpointCoordinator}.
+ *
+ * <p>Subclasses must implement {@link #collect()} (invoked on each {@code RUNNING} step of
+ * {@link #stateProcess()} to read and process data) and {@link #createSourceFlowLifeCycle}
+ * (factory for the source-specific lifecycle).
+ */
@Slf4j
public abstract class SeaTunnelTask extends AbstractTask {
private static final long serialVersionUID = 2604309561613784425L;
@@ -119,6 +138,24 @@ public abstract class SeaTunnelTask extends AbstractTask {
this.currState = SeaTunnelTaskState.CREATED;
}
+ /**
+ * Initializes the task by converting the execution {@link Flow} DAG into
a chain of {@link
+ * FlowLifeCycle} objects.
+ *
+ * <p>Specifically this method:
+ *
+ * <ol>
+ * <li>Creates a {@link SeaTunnelMetricsContext} for this task's metrics
reporting.
+ * <li>Recursively traverses the {@code executionFlow} graph via {@link
+ * #convertFlowToActionLifeCycle(Flow)}, producing one {@link
FlowLifeCycle} per node and
+ * wiring their output lists together.
+ * <li>Calls {@link FlowLifeCycle#init()} on every lifecycle in the
chain.
+ * <li>Registers a composite future over all {@code flowFutures} so that
{@code closeCalled}
+ * is set to {@code true} when every flow in the chain has completed.
+ * </ol>
+ *
+ * @throws Exception if flow conversion or any lifecycle init fails
+ */
@Override
public void init() throws Exception {
super.init();
@@ -134,6 +171,25 @@ public abstract class SeaTunnelTask extends AbstractTask {
.whenComplete((s, e) -> closeCalled = true);
}
+ /**
+ * Advances the task through its state machine. Called repeatedly by the
task execution loop.
+ *
+ * <p>State transitions:
+ *
+ * <ul>
+ * <li><b>INIT → WAITING_RESTORE</b>: Reports status and waits for
{@code restoreComplete}.
+ * <li><b>WAITING_RESTORE → READY_START</b>: Once restore is done, opens
all {@link
+ * FlowLifeCycle} instances and waits for the external start signal.
+ * <li><b>READY_START → STARTING → RUNNING</b>: Triggered when {@code
startCalled} is set.
+ * <li><b>RUNNING</b>: Calls {@link #collect()} to read/process data.
Transitions to {@code
+ * PREPARE_CLOSE} when {@code prepareCloseStatus} is set by a
barrier.
+ * <li><b>PREPARE_CLOSE → CLOSED</b>: Waits for all flows to complete
({@code closeCalled}),
+ * then calls {@link #close()} and marks the task progress as done.
+ * <li><b>CANCELLING → CANCELED</b>: External cancellation path; closes
and marks done.
+ * </ul>
+ *
+ * @throws Exception if any state transition or the {@link #collect()}
call fails
+ */
protected void stateProcess() throws Exception {
switch (currState) {
case INIT:
@@ -193,6 +249,29 @@ public abstract class SeaTunnelTask extends AbstractTask {
this.taskBelongGroup = group;
}
+ /**
+ * Recursively converts a {@link Flow} DAG into a chain of {@link
FlowLifeCycle} objects.
+ *
+ * <p>For each node in the graph this method:
+ *
+ * <ol>
+ * <li>Recurses into {@code flow.getNext()} to build downstream
lifecycles first.
+ * <li>Creates a {@link CompletableFuture} and registers it in {@code
flowFutures} for
+ * close-detection.
+ * <li>Instantiates the appropriate lifecycle based on the flow/action
type:
+ * <ul>
+ * <li>{@link SourceAction} → {@link SourceFlowLifeCycle} (via
subclass factory)
+ * <li>{@link SinkAction} → {@link SinkFlowLifeCycle}
+ * <li>{@link TransformChainAction} → {@link
TransformFlowLifeCycle}
+ * <li>{@link IntermediateExecutionFlow} → {@link
IntermediateQueueFlowLifeCycle}
+ * </ul>
+ * <li>Wires the downstream lifecycles as the outputs of the newly
created lifecycle.
+ * </ol>
+ *
+ * @param flow the root (or sub-root) of the DAG to convert
+ * @return the lifecycle corresponding to {@code flow}
+ * @throws Exception if action type is unknown or lifecycle creation fails
+ */
@SuppressWarnings({"unchecked", "rawtypes"})
private FlowLifeCycle convertFlowToActionLifeCycle(@NonNull Flow flow)
throws Exception {
@@ -295,6 +374,15 @@ public abstract class SeaTunnelTask extends AbstractTask {
return result;
}
+ /**
+ * Closes this task and all of its {@link FlowLifeCycle} objects.
+ *
+ * <p>{@link AbstractTask#close()} is called first, then {@link FlowLifeCycle#close()} is called
+ * on every lifecycle. If a lifecycle throws an {@link IOException}, the error is logged and the
+ * remaining lifecycles are still closed, so each such failure is logged individually. Callers
+ * should not rely on any particular close order among the lifecycles.
+ *
+ * @throws IOException if the parent {@link AbstractTask#close()} fails
+ */
@Override
public void close() throws IOException {
super.close();
@@ -309,6 +397,23 @@ public abstract class SeaTunnelTask extends AbstractTask {
});
}
+ /**
+ * Accumulates a per-cycle checkpoint ACK for the given barrier.
+ *
+ * <p>Each {@link FlowLifeCycle} in the chain calls this method when it
has finished processing
+ * a barrier. Once every cycle has ACKed (i.e. {@code ackSize ==
allCycles.size()}):
+ *
+ * <ol>
+ * <li>If the barrier carries a {@code prepareClose} signal for this
task, {@code
+ * prepareCloseStatus} is set to {@code true} to trigger the {@code
RUNNING →
+ * PREPARE_CLOSE} transition.
+ * <li>If the barrier is a snapshot barrier, a {@link
TaskAcknowledgeOperation} containing all
+ * buffered {@link ActionSubtaskState}s is sent to the {@code
CheckpointCoordinator} on
+ * the master node.
+ * </ol>
+ *
+ * @param barrier the checkpoint or prepare-close barrier being
acknowledged
+ */
public void ack(Barrier barrier) {
log.debug("seatunnel task ack barrier[{}]", this.taskLocation);
Integer ackSize =
@@ -331,6 +436,14 @@ public abstract class SeaTunnelTask extends AbstractTask {
}
}
+ /**
+ * Sends a {@link TriggerSchemaChangeBeforeCheckpointOperation} for this task's location to the
+ * master node.
+ *
+ * <p>This asks the master to trigger the "schema-change-before" checkpoint used while handling a
+ * schema change (DDL). See also {@link #triggerSchemaChangeAfterCheckpoint()}.
+ *
+ * @return a future that completes when the master acknowledges the operation
+ */
public InvocationFuture<Object> triggerSchemaChangeBeforeCheckpoint() {
log.info(
"trigger schema-change-before checkpoint. jobID[{}],
taskLocation[{}]",
@@ -340,6 +453,14 @@ public abstract class SeaTunnelTask extends AbstractTask {
.sendToMaster(new
TriggerSchemaChangeBeforeCheckpointOperation(taskLocation));
}
+ /**
+ * Sends a {@link TriggerSchemaChangeAfterCheckpointOperation} for this task's location to the
+ * master node.
+ *
+ * <p>This asks the master to trigger the "schema-change-after" checkpoint used while handling a
+ * schema change (DDL). See also {@link #triggerSchemaChangeBeforeCheckpoint()}.
+ *
+ * @return a future that completes when the master acknowledges the operation
+ */
public InvocationFuture<Object> triggerSchemaChangeAfterCheckpoint() {
log.info(
"trigger schema-change-after checkpoint. jobID[{}],
taskLocation[{}]",
@@ -349,6 +470,17 @@ public abstract class SeaTunnelTask extends AbstractTask {
.sendToMaster(new
TriggerSchemaChangeAfterCheckpointOperation(taskLocation));
}
+ /**
+ * Buffers a per-action checkpoint state snapshot for the given barrier.
+ *
+ * <p>Each action in the task chain serializes its state as a list of byte
arrays and registers
+ * it here. The accumulated states are later sent to the {@code
CheckpointCoordinator} when all
+ * cycles have ACKed via {@link #ack(Barrier)}.
+ *
+ * @param barrier the checkpoint barrier this state belongs to
+ * @param stateKey identifies the action that produced the state
+ * @param state the serialized action state as a list of byte arrays
+ */
public void addState(Barrier barrier, ActionStateKey stateKey,
List<byte[]> state) {
List<ActionSubtaskState> states =
checkpointStates.computeIfAbsent(barrier.getId(), id -> new
ArrayList<>());