This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new dbdf0c229f [Docs][Core] Add Javadoc to SourceFlowLifeCycle (#10552)
dbdf0c229f is described below
commit dbdf0c229fe91ee17f6c82a1e46769064e53413b
Author: zoo-code <[email protected]>
AuthorDate: Fri Mar 6 18:50:54 2026 +0900
[Docs][Core] Add Javadoc to SourceFlowLifeCycle (#10552)
---
.../server/task/flow/SourceFlowLifeCycle.java | 136 +++++++++++++++++++++
1 file changed, 136 insertions(+)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
index 7843eff7f8..6f473c6c9f 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
@@ -64,6 +64,23 @@ import java.util.stream.Collectors;
import static
org.apache.seatunnel.engine.server.task.AbstractTask.serializeStates;
+/**
+ * Runtime lifecycle bridge between the Zeta engine and a connector's {@link
SourceReader}.
+ *
+ * <p>This class manages the full lifecycle of a source reader within a Zeta
worker task, including:
+ *
+ * <ul>
+ * <li>Creating and opening the {@link SourceReader} from the {@link
SourceAction}
+ * <li>Registering with the remote {@link
org.apache.seatunnel.api.source.SourceSplitEnumerator}
+ * and requesting splits
+ * <li>Running the core read loop via {@link #collect()}
+ * <li>Handling checkpoint barriers with proper checkpoint-lock
synchronization
+ * <li>Coordinating schema-change signals (before/after checkpoint phases)
+ * </ul>
+ *
+ * @param <T> the type of records produced by the source
+ * @param <SplitT> the type of source splits
+ */
@Slf4j
public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends
ActionFlowLifeCycle
implements InternalCheckpointListener {
@@ -111,6 +128,15 @@ public class SourceFlowLifeCycle<T, SplitT extends
SourceSplit> extends ActionFl
this.collector = collector;
}
+ /**
+ * Initializes the source reader and supporting components.
+ *
+ * <p>This method creates the split serializer from the {@link
SourceAction}, builds a {@link
+ * SourceReaderContext} for the reader, creates the {@link SourceReader}
instance, and resolves
+ * the remote enumerator's network address.
+ *
+ * @throws Exception if reader creation or enumerator address resolution
fails
+ */
@Override
public void init() throws Exception {
this.splitSerializer = sourceAction.getSource().getSplitSerializer();
@@ -125,6 +151,14 @@ public class SourceFlowLifeCycle<T, SplitT extends
SourceSplit> extends ActionFl
this.enumeratorTaskAddress = getEnumeratorTaskAddress();
}
+ /**
+ * Opens the source reader and registers this reader with the remote split
enumerator.
+ *
+ * <p>Fires a {@link ReaderOpenEvent}, delegates to {@link
SourceReader#open()}, and then calls
+ * {@link #register()} to notify the enumerator that this reader is ready
to receive splits.
+ *
+ * @throws Exception if the reader fails to open or registration fails
+ */
@Override
public void open() throws Exception {
context.getEventListener().onEvent(new ReaderOpenEvent());
@@ -147,6 +181,30 @@ public class SourceFlowLifeCycle<T, SplitT extends
SourceSplit> extends ActionFl
super.close();
}
+ /**
+ * Core read loop that polls the source reader for the next batch of
records.
+ *
+ * <p>This method is called repeatedly by the task execution loop. It
performs the following:
+ *
+ * <ol>
+ * <li>If {@code prepareClose} is set, the reader is shutting down and
this method sleeps to
+ * yield the thread.
+ * <li>If a schema change is in progress, reading is paused until the
schema-change checkpoint
+ * completes.
+ * <li>Otherwise, calls {@link SourceReader#pollNext} to fetch records.
If no records were
+ * produced, sleeps briefly to avoid busy-waiting.
+ * <li>After polling, checks for schema-change signals from the
collector. If a before or
+ * after schema-change signal is captured, it initiates the
corresponding schema-change
+ * checkpoint phase and pauses further collection until the
checkpoint completes.
+ * </ol>
+ *
+ * <p><b>Checkpoint lock interaction:</b> The reader holds the checkpoint
lock during {@code
+ * pollNext}. A brief {@code Thread.sleep(0L)} after a non-empty poll
gives the checkpoint
+ * thread a chance to acquire the lock via {@link
#triggerBarrier(Barrier)}, preventing
+ * checkpoint starvation under high CPU load.
+ *
+ * @throws Exception if polling or schema-change triggering fails
+ */
public void collect() throws Exception {
if (!prepareClose) {
if (schemaChanging()) {
@@ -195,6 +253,15 @@ public class SourceFlowLifeCycle<T, SplitT extends
SourceSplit> extends ActionFl
}
}
+ /**
+ * Signals that this reader has no more data to produce.
+ *
+ * <p>Sets the {@code prepareClose} flag to {@code true} and sends a {@link
+ * SourceNoMoreElementOperation} to the remote enumerator, deregistering
this reader from
+ * further split assignment.
+ *
+ * @throws RuntimeException if the deregistration message fails to send
+ */
public void signalNoMoreElement() {
// ready close this reader
try {
@@ -212,6 +279,14 @@ public class SourceFlowLifeCycle<T, SplitT extends
SourceSplit> extends ActionFl
}
}
+ /**
+ * Registers this reader with the remote split enumerator.
+ *
+ * <p>Sends a {@link SourceRegisterOperation} to the enumerator at the
previously resolved
+ * address, informing it that this reader subtask is ready to receive
splits.
+ *
+ * @throws RuntimeException if registration fails due to communication
errors
+ */
private void register() {
try {
runningTask
@@ -227,6 +302,15 @@ public class SourceFlowLifeCycle<T, SplitT extends
SourceSplit> extends ActionFl
}
}
+ /**
+ * Sends a split request to the remote split enumerator.
+ *
+ * <p>Sends a {@link RequestSplitOperation} to the enumerator, requesting
new splits to be
+ * assigned to this reader. The enumerator will respond asynchronously by
calling {@link
+ * #receivedSplits(List)}.
+ *
+ * @throws RuntimeException if the split request fails due to
communication errors
+ */
public void requestSplit() {
try {
runningTask
@@ -256,6 +340,16 @@ public class SourceFlowLifeCycle<T, SplitT extends
SourceSplit> extends ActionFl
}
}
+ /**
+ * Handles splits received from the remote split enumerator.
+ *
+ * <p>If the split list is empty, it indicates that the enumerator has no
more splits to assign,
+ * and {@link SourceReader#handleNoMoreSplits()} is called. Otherwise, the
splits are forwarded
+ * to the reader via {@link SourceReader#addSplits(List)}.
+ *
+ * @param splits the list of splits assigned by the enumerator; an empty
list signals no more
+ * splits
+ */
public void receivedSplits(List<SplitT> splits) {
if (splits.isEmpty()) {
reader.handleNoMoreSplits();
@@ -264,6 +358,26 @@ public class SourceFlowLifeCycle<T, SplitT extends
SourceSplit> extends ActionFl
}
}
+ /**
+ * Injects a checkpoint barrier into the record stream.
+ *
+ * <p>This method acquires the {@code checkpointLock} on the collector to
ensure mutual
+ * exclusion with the reader's {@code pollNext} calls. While holding the
lock, it:
+ *
+ * <ol>
+ * <li>Propagates the {@code prepareClose} flag if the barrier targets
this task
+ * <li>Snapshots the reader state (if the barrier requires a snapshot)
and registers it with
+ * the running task
+ * <li>Acknowledges the barrier and sends it downstream as a {@link
Record}
+ * </ol>
+ *
+ * <p>After releasing the lock, if the barrier carries a schema-change
checkpoint type, the
+ * method associates the barrier's checkpoint ID with the current {@link
SchemaChangePhase}.
+ * This pauses the collect loop until the schema-change checkpoint
completes or is aborted.
+ *
+ * @param barrier the checkpoint or savepoint barrier to inject
+ * @throws Exception if state snapshotting or barrier acknowledgment fails
+ */
public void triggerBarrier(Barrier barrier) throws Exception {
log.debug("source trigger barrier [{}]", barrier);
@@ -326,11 +440,33 @@ public class SourceFlowLifeCycle<T, SplitT extends
SourceSplit> extends ActionFl
return schemaChangePhase.get() != null;
}
+ /**
+ * Notifies the source reader that a checkpoint has been successfully
completed.
+ *
+ * <p>Delegates to {@link SourceReader#notifyCheckpointComplete(long)},
allowing the connector
+ * to perform post-commit cleanup such as acknowledging consumed offsets
or removing temporary
+ * files.
+ *
+ * @param checkpointId the ID of the completed checkpoint
+ * @throws Exception if the reader's post-checkpoint hook fails
+ */
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
reader.notifyCheckpointComplete(checkpointId);
}
+ /**
+ * Notifies the source reader that a checkpoint has been aborted.
+ *
+ * <p>Delegates to {@link SourceReader#notifyCheckpointAborted(long)} and
then checks whether
+ * the aborted checkpoint matches an in-progress schema-change phase. If
so, an {@link
+ * IllegalStateException} is thrown because a schema-change checkpoint
cannot be safely retried
+ * once aborted.
+ *
+ * @param checkpointId the ID of the aborted checkpoint
+ * @throws IllegalStateException if the aborted checkpoint is a
schema-change checkpoint
+ * @throws Exception if the reader's abort notification hook fails
+ */
@Override
public void notifyCheckpointAborted(long checkpointId) throws Exception {
reader.notifyCheckpointAborted(checkpointId);