This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new dbdf0c229f [Docs][Core] Add Javadoc to SourceFlowLifeCycle (#10552)
dbdf0c229f is described below

commit dbdf0c229fe91ee17f6c82a1e46769064e53413b
Author: zoo-code <[email protected]>
AuthorDate: Fri Mar 6 18:50:54 2026 +0900

    [Docs][Core] Add Javadoc to SourceFlowLifeCycle (#10552)
---
 .../server/task/flow/SourceFlowLifeCycle.java      | 136 +++++++++++++++++++++
 1 file changed, 136 insertions(+)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
index 7843eff7f8..6f473c6c9f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
@@ -64,6 +64,23 @@ import java.util.stream.Collectors;
 
 import static 
org.apache.seatunnel.engine.server.task.AbstractTask.serializeStates;
 
+/**
+ * Runtime lifecycle bridge between the Zeta engine and a connector's {@link SourceReader}.
+ *
+ * <p>This class manages the full lifecycle of a source reader within a Zeta worker task, including:
+ *
+ * <ul>
+ *   <li>Creating and opening the {@link SourceReader} from the {@link SourceAction}
+ *   <li>Registering with the remote {@link org.apache.seatunnel.api.source.SourceSplitEnumerator}
+ *       and requesting splits
+ *   <li>Running the core read loop via {@link #collect()}
+ *   <li>Handling checkpoint barriers with proper checkpoint-lock synchronization
+ *   <li>Coordinating schema-change signals (before/after checkpoint phases)
+ * </ul>
+ *
+ * @param <T> the type of records produced by the source
+ * @param <SplitT> the type of source splits
+ */
 @Slf4j
 public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends 
ActionFlowLifeCycle
         implements InternalCheckpointListener {
@@ -111,6 +128,15 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends ActionFl
         this.collector = collector;
     }
 
+    /**
+     * Initializes the source reader and supporting components.
+     *
+     * <p>This method creates the split serializer from the {@link SourceAction}, builds a {@link
+     * SourceReaderContext} for the reader, creates the {@link SourceReader} instance, and resolves
+     * the remote enumerator's network address.
+     *
+     * @throws Exception if reader creation or enumerator address resolution fails
+     */
     @Override
     public void init() throws Exception {
         this.splitSerializer = sourceAction.getSource().getSplitSerializer();
@@ -125,6 +151,14 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends ActionFl
         this.enumeratorTaskAddress = getEnumeratorTaskAddress();
     }
 
+    /**
+     * Opens the source reader and registers this reader with the remote split enumerator.
+     *
+     * <p>Fires a {@link ReaderOpenEvent}, delegates to {@link SourceReader#open()}, and then calls
+     * {@link #register()} to notify the enumerator that this reader is ready to receive splits.
+     *
+     * @throws Exception if the reader fails to open or registration fails
+     */
     @Override
     public void open() throws Exception {
         context.getEventListener().onEvent(new ReaderOpenEvent());
@@ -147,6 +181,30 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends ActionFl
         super.close();
     }
 
+    /**
+     * Core read loop that polls the source reader for the next batch of records.
+     *
+     * <p>This method is called repeatedly by the task execution loop. It performs the following:
+     *
+     * <ol>
+     *   <li>If {@code prepareClose} is set, the reader is shutting down and this method sleeps to
+     *       yield the thread.
+     *   <li>If a schema change is in progress, reading is paused until the schema-change checkpoint
+     *       completes.
+     *   <li>Otherwise, calls {@link SourceReader#pollNext} to fetch records. If no records were
+     *       produced, sleeps briefly to avoid busy-waiting.
+     *   <li>After polling, checks for schema-change signals from the collector. If a before or
+     *       after schema-change signal is captured, it initiates the corresponding schema-change
+     *       checkpoint phase and pauses further collection until the checkpoint completes.
+     * </ol>
+     *
+     * <p><b>Checkpoint lock interaction:</b> The reader holds the checkpoint lock during {@code
+     * pollNext}. A brief {@code Thread.sleep(0L)} after a non-empty poll gives the checkpoint
+     * thread a chance to acquire the lock via {@link #triggerBarrier(Barrier)}, preventing
+     * checkpoint starvation under high CPU load.
+     *
+     * @throws Exception if polling or schema-change triggering fails
+     */
     public void collect() throws Exception {
         if (!prepareClose) {
             if (schemaChanging()) {
@@ -195,6 +253,15 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends ActionFl
         }
     }
 
+    /**
+     * Signals that this reader has no more data to produce.
+     *
+     * <p>Sets the {@code prepareClose} flag to {@code true} and sends a {@link
+     * SourceNoMoreElementOperation} to the remote enumerator, deregistering this reader from
+     * further split assignment.
+     *
+     * @throws RuntimeException if the deregistration message fails to send
+     */
     public void signalNoMoreElement() {
         // ready close this reader
         try {
@@ -212,6 +279,14 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends ActionFl
         }
     }
 
+    /**
+     * Registers this reader with the remote split enumerator.
+     *
+     * <p>Sends a {@link SourceRegisterOperation} to the enumerator at the previously resolved
+     * address, informing it that this reader subtask is ready to receive splits.
+     *
+     * @throws RuntimeException if registration fails due to communication errors
+     */
     private void register() {
         try {
             runningTask
@@ -227,6 +302,15 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends ActionFl
         }
     }
 
+    /**
+     * Sends a split request to the remote split enumerator.
+     *
+     * <p>Sends a {@link RequestSplitOperation} to the enumerator, requesting new splits to be
+     * assigned to this reader. The enumerator will respond asynchronously by calling {@link
+     * #receivedSplits(List)}.
+     *
+     * @throws RuntimeException if the split request fails due to communication errors
+     */
     public void requestSplit() {
         try {
             runningTask
@@ -256,6 +340,16 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends ActionFl
         }
     }
 
+    /**
+     * Handles splits received from the remote split enumerator.
+     *
+     * <p>If the split list is empty, it indicates that the enumerator has no more splits to assign,
+     * and {@link SourceReader#handleNoMoreSplits()} is called. Otherwise, the splits are forwarded
+     * to the reader via {@link SourceReader#addSplits(List)}.
+     *
+     * @param splits the list of splits assigned by the enumerator; an empty list signals no more
+     *     splits
+     */
     public void receivedSplits(List<SplitT> splits) {
         if (splits.isEmpty()) {
             reader.handleNoMoreSplits();
@@ -264,6 +358,26 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends ActionFl
         }
     }
 
+    /**
+     * Injects a checkpoint barrier into the record stream.
+     *
+     * <p>This method acquires the {@code checkpointLock} on the collector to ensure mutual
+     * exclusion with the reader's {@code pollNext} calls. While holding the lock, it:
+     *
+     * <ol>
+     *   <li>Propagates the {@code prepareClose} flag if the barrier targets this task
+     *   <li>Snapshots the reader state (if the barrier requires a snapshot) and registers it with
+     *       the running task
+     *   <li>Acknowledges the barrier and sends it downstream as a {@link Record}
+     * </ol>
+     *
+     * <p>After releasing the lock, if the barrier carries a schema-change checkpoint type, the
+     * method associates the barrier's checkpoint ID with the current {@link SchemaChangePhase},
+     * which keeps the collect loop paused until the schema-change checkpoint completes or is
+     * aborted.
+     *
+     * @param barrier the checkpoint or savepoint barrier to inject
+     * @throws Exception if state snapshotting or barrier acknowledgment fails
+     */
     public void triggerBarrier(Barrier barrier) throws Exception {
         log.debug("source trigger barrier [{}]", barrier);
 
@@ -326,11 +440,33 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends ActionFl
         return schemaChangePhase.get() != null;
     }
 
+    /**
+     * Notifies the source reader that a checkpoint has been successfully completed.
+     *
+     * <p>Delegates to {@link SourceReader#notifyCheckpointComplete(long)}, allowing the connector
+     * to perform post-commit cleanup such as acknowledging consumed offsets or removing temporary
+     * files.
+     *
+     * @param checkpointId the ID of the completed checkpoint
+     * @throws Exception if the reader's post-checkpoint hook fails
+     */
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         reader.notifyCheckpointComplete(checkpointId);
     }
 
+    /**
+     * Notifies the source reader that a checkpoint has been aborted.
+     *
+     * <p>Delegates to {@link SourceReader#notifyCheckpointAborted(long)} and then checks whether
+     * the aborted checkpoint matches an in-progress schema-change phase. If so, an {@link
+     * IllegalStateException} is thrown because a schema-change checkpoint cannot be safely retried
+     * once aborted.
+     *
+     * @param checkpointId the ID of the aborted checkpoint
+     * @throws IllegalStateException if the aborted checkpoint is a schema-change checkpoint
+     * @throws Exception if the reader's abort notification hook fails
+     */
     @Override
     public void notifyCheckpointAborted(long checkpointId) throws Exception {
         reader.notifyCheckpointAborted(checkpointId);

Reply via email to