wzhero1 commented on code in PR #8262:
URL: https://github.com/apache/paimon/pull/8262#discussion_r3433756392


##########
paimon-core/src/main/java/org/apache/paimon/table/ChainTableStreamScan.java:
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.source.ChainSplit;
+import org.apache.paimon.table.source.DataFilePlan;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.DataTableStreamScan;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.SnapshotNotExistPlan;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.table.source.snapshot.StartingContext;
+import org.apache.paimon.utils.ChainPartitionProjector;
+import org.apache.paimon.utils.ChainTableUtils;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Streaming scan for chain tables with a two-phase design:
+ *
+ * <ul>
+ *   <li><b>Phase 1 (Starting):</b> Outputs the latest snapshot partition (per 
group) and delta
+ *       partitions that come after it. Older snapshot partitions are excluded 
as they are
+ *       considered outdated. Each primary key appears exactly once under its 
natural partition.
+ *       Unlike batch full scan, anchor-based chain merging is intentionally 
skipped to keep Phase 1
+ *       lightweight — this avoids split explosion in long-running jobs with 
many partitions.
+ *   <li><b>Phase 2 (Incremental):</b> Stream new snapshots from the delta 
branch only, picking up
+ *       from where Phase 1 left off.
+ * </ul>
+ *
+ * <p>Checkpoint state is a single {@code Long} — the delta branch's next 
snapshot id. On stateful
+ * restart, Phase 1 is skipped and incremental streaming resumes from the 
checkpointed position. On
+ * stateless restart (null state), a fresh starting scan is performed.
+ */
+public class ChainTableStreamScan implements StreamDataTableScan {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ChainTableStreamScan.class);
+
+    private final ChainGroupReadTable chainGroupReadTable;
+
+    /** Phase 1: batch scan used to access snapshot branch data via {@code 
mainScan}. */
+    private final ChainGroupReadTable.ChainTableBatchScan batchScan;
+
+    /** Phase 2: delta-only stream scan. */
+    private final DataTableStreamScan deltaStreamScan;
+
+    /** Projector for splitting full partition into group and chain parts. */
+    private final ChainPartitionProjector partitionProjector;
+
+    /** Comparator for chain partition keys only. */
+    private final RecordComparator chainPartitionComparator;
+
+    /**
+     * Checkpoint state: the next delta snapshot id to read. Null before Phase 
1 completes; non-null
+     * once Phase 1 is done or after a stateful restore.
+     */
+    @Nullable private Long nextDeltaSnapshotId;
+
+    /** Whether the starting plan (Phase 1) has been completed. */
+    private boolean startingDone = false;
+
+    /** Predicates and shard for applying to local scans created in {@link 
#planStarting()}. */
+    private final List<Predicate> predicates = new ArrayList<>();
+
+    private int shardIndex = -1;
+
+    private int shardCount = -1;
+
+    public ChainTableStreamScan(ChainGroupReadTable chainGroupReadTable) {
+        this.chainGroupReadTable = chainGroupReadTable;
+        this.batchScan =
+                new ChainGroupReadTable.ChainTableBatchScan(
+                        chainGroupReadTable.schema(), chainGroupReadTable);
+        this.deltaStreamScan = (DataTableStreamScan) 
chainGroupReadTable.other().newStreamScan();
+
+        // Initialize partition projector and chain comparator using the 
established pattern
+        // from ChainTableBatchScan.
+        List<String> chainKeys =
+                ChainTableUtils.chainPartitionKeys(
+                        chainGroupReadTable.coreOptions(),
+                        chainGroupReadTable.schema().partitionKeys());
+        this.partitionProjector =
+                new ChainPartitionProjector(
+                        chainGroupReadTable.schema().logicalPartitionType(), 
chainKeys.size());
+        this.chainPartitionComparator =
+                CodeGenUtils.newRecordComparator(
+                        
partitionProjector.chainPartitionType().getFieldTypes());
+    }
+
+    @Override
+    public StartingContext startingContext() {
+        if (!startingDone) {
+            return StartingContext.EMPTY;
+        }
+        return deltaStreamScan.startingContext();
+    }
+
+    @Override
+    public TableScan.Plan plan() {
+        if (!startingDone) {
+            return planStarting();
+        }
+        TableScan.Plan plan = deltaStreamScan.plan();
+        // Never return SnapshotNotExistPlan — it would cause the Flink 
enumerator to
+        // set stopTriggerScan=true and permanently stop polling for new data.
+        if (plan instanceof SnapshotNotExistPlan) {
+            return new DataFilePlan<>(Collections.emptyList());
+        }
+        return plan;
+    }
+
+    /**
+     * Starting plan: outputs the latest snapshot partition (per group) and 
delta partitions that
+     * come after it. Older snapshot partitions are excluded. Each primary key 
appears exactly once
+     * under its natural partition.
+     *
+     * <p>Unlike batch full scan, anchor-based chain merging is not performed. 
This keeps Phase 1
+     * lightweight for long-running jobs.
+     */
+    private TableScan.Plan planStarting() {
+        FileStoreTable deltaTable = chainGroupReadTable.other();
+        String deltaBranch = deltaTable.coreOptions().branch();
+        String snapshotBranch = 
chainGroupReadTable.wrapped.coreOptions().branch();
+
+        Long latestId = captureDeltaPosition(deltaTable);
+
+        // 1. Read delta branch data at the pinned snapshot, grouped by 
partition.
+        Map<BinaryRow, List<DataSplit>> deltaSplitsByPartition;
+        if (latestId != null) {
+            FileStoreTable pinnedDelta =
+                    deltaTable.copy(
+                            Collections.singletonMap("scan.snapshot-id", 
String.valueOf(latestId)));
+            DataTableScan pinnedDeltaScan = pinnedDelta.newScan();
+            applyPredicatesAndShard(pinnedDeltaScan);
+            deltaSplitsByPartition = groupByPartition(pinnedDeltaScan);
+        } else {
+            deltaSplitsByPartition = Collections.emptyMap();
+        }
+
+        // 2. Read all snapshot branch data, grouped by partition.
+        // Reuse batchScan.mainScan which has predicates/shard already applied.
+        Map<BinaryRow, List<DataSplit>> snapshotSplitsByPartition =
+                
chainGroupReadTable.wrapped.snapshotManager().latestSnapshotId() != null
+                        ? groupByPartition(batchScan.mainScan)
+                        : Collections.emptyMap();
+
+        // 3. Find the latest snapshot partition per group (based on chain 
partition keys).
+        //    Only output the latest snapshot partition and delta partitions 
after it.
+        Map<Object, BinaryRow> latestChainPartitionPerGroup = new HashMap<>();
+        for (BinaryRow partition : snapshotSplitsByPartition.keySet()) {
+            Object groupKey = toGroupKey(partition);
+            BinaryRow existingLatest = 
latestChainPartitionPerGroup.get(groupKey);
+            if (existingLatest == null
+                    || chainPartitionComparator.compare(
+                                    
partitionProjector.extractChainPartition(partition),
+                                    
partitionProjector.extractChainPartition(existingLatest))
+                            > 0) {
+                latestChainPartitionPerGroup.put(groupKey, partition);
+            }
+        }
+
+        // 4. Build ChainSplits:
+        //    - For snapshot partitions: only include if chain key == latest 
for that group.
+        //    - For delta partitions: include if (a) chain key > latest for 
that group, or
+        //      (b) no snapshot exists for that group.
+        List<Split> allSplits = new ArrayList<>();
+
+        for (Map.Entry<BinaryRow, List<DataSplit>> entry : 
snapshotSplitsByPartition.entrySet()) {
+            BinaryRow partition = entry.getKey();
+            Object groupKey = toGroupKey(partition);
+            BinaryRow latestPartition = 
latestChainPartitionPerGroup.get(groupKey);
+            if (chainPartitionComparator.compare(
+                            
partitionProjector.extractChainPartition(partition),
+                            
partitionProjector.extractChainPartition(latestPartition))
+                    == 0) {
+                for (DataSplit ds : entry.getValue()) {
+                    allSplits.add(dataSplitToChainSplit(ds, snapshotBranch));
+                }
+            }
+        }
+
+        for (Map.Entry<BinaryRow, List<DataSplit>> entry : 
deltaSplitsByPartition.entrySet()) {
+            BinaryRow partition = entry.getKey();
+            Object groupKey = toGroupKey(partition);
+            BinaryRow latestPartition = 
latestChainPartitionPerGroup.get(groupKey);
+            // Include delta partition if:
+            // - No snapshot exists for this group, OR
+            // - Chain key > latest snapshot chain key
+            if (latestPartition == null
+                    || chainPartitionComparator.compare(
+                                    
partitionProjector.extractChainPartition(partition),
+                                    
partitionProjector.extractChainPartition(latestPartition))
+                            > 0) {
+                for (DataSplit ds : entry.getValue()) {
+                    allSplits.add(dataSplitToChainSplit(ds, deltaBranch));
+                }
+            }
+        }
+
+        LOG.info(
+                "ChainTableStreamScan.planStarting [snapshot={}, delta={}]: "
+                        + "{} delta partitions, {} snapshot partitions, "
+                        + "{} latest snapshot groups, {} total splits",
+                snapshotBranch,
+                deltaBranch,
+                deltaSplitsByPartition.size(),
+                snapshotSplitsByPartition.size(),
+                latestChainPartitionPerGroup.size(),
+                allSplits.size());
+
+        startingDone = true;
+        return new DataFilePlan<>(allSplits);
+    }
+
+    /**
+     * Captures the delta branch's latest snapshot id and positions the Phase 
2 stream scan to start
+     * from the next snapshot. This makes the Phase 1 / Phase 2 boundary 
deterministic: Phase 1
+     * reads delta data pinned at the returned snapshot id, Phase 2 starts 
from the snapshot after.
+     *
+     * @return the latest delta snapshot id, or {@code null} if the delta 
branch has no snapshots
+     */
+    @Nullable
+    private Long captureDeltaPosition(FileStoreTable deltaTable) {
+        SnapshotManager deltaSnapshotManager = deltaTable.snapshotManager();
+        Long latestId = deltaSnapshotManager.latestSnapshotId();
+        nextDeltaSnapshotId = latestId != null ? latestId + 1 : 
Snapshot.FIRST_SNAPSHOT_ID;
+        LOG.info(
+                "ChainTableStreamScan: pinned delta branch '{}' at snapshot 
{}, "
+                        + "nextDeltaSnapshotId={}",
+                deltaTable.coreOptions().branch(),
+                latestId,
+                nextDeltaSnapshotId);
+        deltaStreamScan.restore(nextDeltaSnapshotId);
+        return latestId;
+    }
+
+    /** Plans a scan and groups the resulting splits by partition. */
+    private static Map<BinaryRow, List<DataSplit>> 
groupByPartition(DataTableScan scan) {
+        Map<BinaryRow, List<DataSplit>> grouped = new LinkedHashMap<>();
+        for (Split s : scan.plan().splits()) {
+            DataSplit ds = (DataSplit) s;
+            grouped.computeIfAbsent(ds.partition(), k -> new 
ArrayList<>()).add(ds);
+        }
+        return grouped;
+    }
+
+    /**
+     * Extracts a stable group key from a full partition row. When there is no 
group partition (all
+     * fields are chain keys), returns a shared singleton to avoid zero-field 
{@link BinaryRow}
+     * instances that may have inconsistent {@code hashCode}/{@code equals} 
across different
+     * partitions.
+     */
+    private Object toGroupKey(BinaryRow fullPartition) {
+        if (!partitionProjector.hasGroupPartition()) {
+            return Collections.emptyList();
+        }
+        return partitionProjector.extractGroupPartition(fullPartition);
+    }
+
+    /**
+     * Converts a {@link DataSplit} to a {@link ChainSplit} where all files 
belong to the given
+     * branch. The partition value is preserved as-is (no rewriting).
+     */
+    private static ChainSplit dataSplitToChainSplit(DataSplit dataSplit, 
String branch) {
+        HashMap<String, String> fileBranchMapping = new HashMap<>();
+        HashMap<String, String> fileBucketPathMapping = new HashMap<>();
+        for (DataFileMeta file : dataSplit.dataFiles()) {
+            fileBranchMapping.put(file.fileName(), branch);
+            fileBucketPathMapping.put(file.fileName(), dataSplit.bucketPath());
+        }
+        return new ChainSplit(
+                dataSplit.partition(),
+                dataSplit.dataFiles(),
+                fileBranchMapping,
+                fileBucketPathMapping);
+    }
+
+    @Override
+    public InnerTableScan withFilter(Predicate predicate) {
+        predicates.add(predicate);
+        batchScan.withFilter(predicate);
+        deltaStreamScan.withFilter(predicate);
+        return this;
+    }
+
+    @Override
+    public DataTableScan withShard(int indexOfThisSubtask, int 
numberOfParallelSubtasks) {
+        shardIndex = indexOfThisSubtask;
+        shardCount = numberOfParallelSubtasks;
+        batchScan.withShard(indexOfThisSubtask, numberOfParallelSubtasks);
+        deltaStreamScan.withShard(indexOfThisSubtask, 
numberOfParallelSubtasks);
+        return this;
+    }
+
+    /**
+     * Applies all previously set predicates and shard to a newly created 
scan. Used for the pinned
+     * delta scan in {@link #planStarting()}.
+     */
+    private void applyPredicatesAndShard(DataTableScan scan) {
+        for (Predicate p : predicates) {
+            scan.withFilter(p);
+        }
+        if (shardIndex >= 0) {
+            scan.withShard(shardIndex, shardCount);
+        }
+    }
+
+    @Nullable
+    @Override
+    public Long checkpoint() {
+        return nextDeltaSnapshotId;

Review Comment:
   **[Blocking] `checkpoint()` returns a stale snapshot id in Phase 2, causing 
duplicate consumption after recovery.**
   
   `checkpoint()` returns `nextDeltaSnapshotId`, which is set only once in 
`captureDeltaPosition()` (the Phase-1 boundary, `latestId + 1`). In Phase 2, 
`plan()` delegates to `deltaStreamScan.plan()`, which advances the delta scan's 
*internal* cursor, but `nextDeltaSnapshotId` is never written back — so 
`checkpoint()` is frozen at the boundary.
   
   The Flink enumerator persists `scan.checkpoint()` on every checkpoint 
(`ContinuousFileSplitEnumerator#snapshotState`). So during Phase 2 it keeps 
storing the boundary value; on failure, `restore(boundary)` makes the delta 
stream re-read every snapshot consumed since Phase 1 → large-scale duplicates.
   
   Note that `watermark()` and `startingContext()` already delegate to 
`deltaStreamScan` when `startingDone` — `checkpoint()` is the one that was 
missed.
   
   Secondary effect: because `checkpoint()` never changes, 
`ContinuousFileSplitEnumerator#scanNextSnapshot` never increments 
`handledSnapshotCount`, so `scan.max-snapshot-count` backpressure is silently 
defeated (the `splitMaxNum` guard still applies). The same fix resolves this.
   
   Suggested fix:
   ```java
   @Nullable
   @Override
   public Long checkpoint() {
       if (startingDone) {
           return deltaStreamScan.checkpoint();
       }
       return nextDeltaSnapshotId;
   }
   ```



##########
paimon-core/src/main/java/org/apache/paimon/table/ChainTableStreamScan.java:
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.source.ChainSplit;
+import org.apache.paimon.table.source.DataFilePlan;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.DataTableStreamScan;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.SnapshotNotExistPlan;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.table.source.snapshot.StartingContext;
+import org.apache.paimon.utils.ChainPartitionProjector;
+import org.apache.paimon.utils.ChainTableUtils;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Streaming scan for chain tables with a two-phase design:
+ *
+ * <ul>
+ *   <li><b>Phase 1 (Starting):</b> Outputs the latest snapshot partition (per 
group) and delta
+ *       partitions that come after it. Older snapshot partitions are excluded 
as they are
+ *       considered outdated. Each primary key appears exactly once under its 
natural partition.
+ *       Unlike batch full scan, anchor-based chain merging is intentionally 
skipped to keep Phase 1
+ *       lightweight — this avoids split explosion in long-running jobs with 
many partitions.
+ *   <li><b>Phase 2 (Incremental):</b> Stream new snapshots from the delta 
branch only, picking up
+ *       from where Phase 1 left off.
+ * </ul>
+ *
+ * <p>Checkpoint state is a single {@code Long} — the delta branch's next 
snapshot id. On stateful
+ * restart, Phase 1 is skipped and incremental streaming resumes from the 
checkpointed position. On
+ * stateless restart (null state), a fresh starting scan is performed.
+ */
+public class ChainTableStreamScan implements StreamDataTableScan {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ChainTableStreamScan.class);
+
+    private final ChainGroupReadTable chainGroupReadTable;
+
+    /** Phase 1: batch scan used to access snapshot branch data via {@code 
mainScan}. */
+    private final ChainGroupReadTable.ChainTableBatchScan batchScan;
+
+    /** Phase 2: delta-only stream scan. */
+    private final DataTableStreamScan deltaStreamScan;
+
+    /** Projector for splitting full partition into group and chain parts. */
+    private final ChainPartitionProjector partitionProjector;
+
+    /** Comparator for chain partition keys only. */
+    private final RecordComparator chainPartitionComparator;
+
+    /**
+     * Checkpoint state: the next delta snapshot id to read. Null before Phase 
1 completes; non-null
+     * once Phase 1 is done or after a stateful restore.
+     */
+    @Nullable private Long nextDeltaSnapshotId;
+
+    /** Whether the starting plan (Phase 1) has been completed. */
+    private boolean startingDone = false;
+
+    /** Predicates and shard for applying to local scans created in {@link 
#planStarting()}. */
+    private final List<Predicate> predicates = new ArrayList<>();
+
+    private int shardIndex = -1;
+
+    private int shardCount = -1;
+
+    public ChainTableStreamScan(ChainGroupReadTable chainGroupReadTable) {
+        this.chainGroupReadTable = chainGroupReadTable;
+        this.batchScan =
+                new ChainGroupReadTable.ChainTableBatchScan(
+                        chainGroupReadTable.schema(), chainGroupReadTable);
+        this.deltaStreamScan = (DataTableStreamScan) 
chainGroupReadTable.other().newStreamScan();
+
+        // Initialize partition projector and chain comparator using the 
established pattern
+        // from ChainTableBatchScan.
+        List<String> chainKeys =
+                ChainTableUtils.chainPartitionKeys(
+                        chainGroupReadTable.coreOptions(),
+                        chainGroupReadTable.schema().partitionKeys());
+        this.partitionProjector =
+                new ChainPartitionProjector(
+                        chainGroupReadTable.schema().logicalPartitionType(), 
chainKeys.size());
+        this.chainPartitionComparator =
+                CodeGenUtils.newRecordComparator(
+                        
partitionProjector.chainPartitionType().getFieldTypes());
+    }
+
+    @Override
+    public StartingContext startingContext() {
+        if (!startingDone) {
+            return StartingContext.EMPTY;
+        }
+        return deltaStreamScan.startingContext();
+    }
+
+    @Override
+    public TableScan.Plan plan() {
+        if (!startingDone) {
+            return planStarting();
+        }
+        TableScan.Plan plan = deltaStreamScan.plan();
+        // Never return SnapshotNotExistPlan — it would cause the Flink 
enumerator to
+        // set stopTriggerScan=true and permanently stop polling for new data.
+        if (plan instanceof SnapshotNotExistPlan) {
+            return new DataFilePlan<>(Collections.emptyList());
+        }
+        return plan;
+    }
+
+    /**
+     * Starting plan: outputs the latest snapshot partition (per group) and 
delta partitions that
+     * come after it. Older snapshot partitions are excluded. Each primary key 
appears exactly once
+     * under its natural partition.
+     *
+     * <p>Unlike batch full scan, anchor-based chain merging is not performed. 
This keeps Phase 1
+     * lightweight for long-running jobs.
+     */
+    private TableScan.Plan planStarting() {
+        FileStoreTable deltaTable = chainGroupReadTable.other();
+        String deltaBranch = deltaTable.coreOptions().branch();
+        String snapshotBranch = 
chainGroupReadTable.wrapped.coreOptions().branch();
+
+        Long latestId = captureDeltaPosition(deltaTable);
+
+        // 1. Read delta branch data at the pinned snapshot, grouped by 
partition.
+        Map<BinaryRow, List<DataSplit>> deltaSplitsByPartition;
+        if (latestId != null) {
+            FileStoreTable pinnedDelta =
+                    deltaTable.copy(
+                            Collections.singletonMap("scan.snapshot-id", 
String.valueOf(latestId)));
+            DataTableScan pinnedDeltaScan = pinnedDelta.newScan();
+            applyPredicatesAndShard(pinnedDeltaScan);
+            deltaSplitsByPartition = groupByPartition(pinnedDeltaScan);
+        } else {
+            deltaSplitsByPartition = Collections.emptyMap();
+        }
+
+        // 2. Read all snapshot branch data, grouped by partition.
+        // Reuse batchScan.mainScan which has predicates/shard already applied.
+        Map<BinaryRow, List<DataSplit>> snapshotSplitsByPartition =

Review Comment:
   **[Performance] `planStarting()` reads file-level splits for every snapshot 
partition, then keeps only the latest per group.**
   
   `groupByPartition(batchScan.mainScan)` runs a full batch scan that reads the 
manifests and file lists of *all* snapshot-branch partitions, but only the 
latest chain partition per group is kept (steps 3-4). With many historical 
full-dump partitions (e.g. a daily ODS dump over a year), this reads hundreds 
of partitions' metadata to keep one — slow startup and heavy manifest I/O 
(especially on object stores).
   
   The batch path already does the cheaper thing: `ChainGroupReadTable.plan()` 
uses `newChainPartitionListingScan(...).listPartitions()` (partition metadata 
only) to locate partitions before scanning files. Suggest the same here: list 
partitions first, pick the latest per group, then `withPartitionFilter(...)` to 
scan only that partition. The pinned-delta scan (L166) has the same pattern.



##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java:
##########
@@ -364,6 +367,31 @@ protected int assignSuggestedTask(IncrementalSplit split) {
         }
     }
 
+    protected int assignSuggestedTask(ChainSplit split) {
+        int parallelism = context.currentParallelism();
+        // Extract bucket id from the bucket path stored in 
fileBucketPathMapping.
+        // The bucket path ends with "bucket-{id}".
+        int bucketId = 0;
+        if (!split.fileBucketPathMapping().isEmpty()) {
+            String bucketPath = 
split.fileBucketPathMapping().values().iterator().next();
+            int lastSlash = bucketPath.lastIndexOf('/');
+            if (lastSlash >= 0) {
+                String bucketDir = bucketPath.substring(lastSlash + 1);
+                if (bucketDir.startsWith("bucket-")) {
+                    try {
+                        bucketId = 
Integer.parseInt(bucketDir.substring("bucket-".length()));
+                    } catch (NumberFormatException ignored) {

Review Comment:
   **[Minor] Silent catch hides bucket-id parse failures.**
   
   `catch (NumberFormatException ignored) {}` swallows the failure and falls 
back to `bucketId = 0`, which would route all affected splits to the same 
reader subtask (skew) with no trace. The `bucket-{N}` layout is a stable 
convention so this shouldn't happen in practice, but a `LOG.warn(...)` here 
would make any future regression diagnosable rather than silent.



##########
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkChainTableITCase.java:
##########
@@ -688,4 +710,1301 @@ public void testChainTableWithGroupPartition() throws 
Exception {
                 .containsExactlyInAnyOrder(
                         "+I[2, 2, 1-1, CN, 20250811]", "+I[4, 1, 1, CN, 
20250811]");
     }
+
+    /** Write Row data (with RowKind) to a specific branch using DataStream 
API. */
+    private void writeChangelogToBranch(String db, String tableName, String 
branch, Row... rows)
+            throws Exception {
+        FileStoreTable table = paimonTable(tableName + "$branch_" + branch);
+
+        StreamExecutionEnvironment env =
+                streamExecutionEnvironmentBuilder()
+                        .streamingMode()
+                        .checkpointIntervalMs(100)
+                        .parallelism(1)
+                        .build();
+
+        DataStream<Row> stream = env.fromCollection(Arrays.asList(rows));
+
+        new FlinkSinkBuilder(table)
+                .forRow(
+                        stream,
+                        DataTypes.ROW(
+                                DataTypes.FIELD("k", DataTypes.BIGINT()),
+                                DataTypes.FIELD("seq", DataTypes.BIGINT()),
+                                DataTypes.FIELD("v", DataTypes.STRING()),
+                                DataTypes.FIELD("dt", DataTypes.STRING())))
+                .build();
+        env.execute();
+    }
+
+    /**
+     * Collect n rows from a streaming iterator with a timeout. If no data 
arrives within
+     * timeoutSeconds, the iterator is closed and an AssertionError is thrown. 
This is necessary
+     * because it.next() blocks indefinitely when no data is available, and 
JUnit @Timeout cannot
+     * interrupt it.
+     */
+    private List<String> collectRows(CloseableIterator<Row> it, int n, int 
timeoutSeconds)
+            throws Exception {
+        List<String> result = new ArrayList<>();
+        for (int i = 0; i < n; i++) {
+            CompletableFuture<String> future =
+                    CompletableFuture.supplyAsync(() -> it.next().toString());
+            try {
+                result.add(future.get(timeoutSeconds, TimeUnit.SECONDS));
+            } catch (java.util.concurrent.TimeoutException e) {
+                future.cancel(true);
+                it.close();
+                throw new AssertionError(
+                        "Streaming read blocked for "
+                                + timeoutSeconds
+                                + "s after collecting "
+                                + result.size()
+                                + "/"
+                                + n
+                                + " rows. Collected so far: "
+                                + result);
+            }
+        }
+        return result;
+    }
+
+    /** Default collectRows with 30s timeout. */
+    private List<String> collectRows(CloseableIterator<Row> it, int n) throws 
Exception {
+        return collectRows(it, n, 30);
+    }
+
+    /**
+     * Tests the streaming read lifecycle for a chain table with 
changelog-producer=input.
+     *
+     * <p>Verifies: initial full read from delta-only → delta incremental 
visible with changelog
+     * records (-U/+U) → snapshot OVERWRITE has no effect → more delta visible 
→ stateless restart
+     * reads chain-merged state.
+     */
+    @Test
+    @Timeout(120)
+    public void testStreamingReadChainTableLifecycleWithInputChangelog() 
throws Exception {
+        // Create chain table with changelog-producer=input
+        sql(
+                "CREATE TABLE chain_life_cl ("
+                        + "  k BIGINT, seq BIGINT, v STRING, dt STRING"
+                        + ") PARTITIONED BY (dt) WITH ("
+                        + "  'primary-key' = 'dt,k',"
+                        + "  'bucket-key' = 'k',"
+                        + "  'bucket' = '2',"
+                        + "  'sequence.field' = 'seq',"
+                        + "  'merge-engine' = 'deduplicate',"
+                        + "  'changelog-producer' = 'input',"
+                        + "  'chain-table.enabled' = 'true',"
+                        + "  'partition.timestamp-pattern' = '$dt',"
+                        + "  'partition.timestamp-formatter' = 'yyyyMMdd',"
+                        + "  'continuous.discovery-interval' = '1ms'"
+                        + ")");
+
+        String db = tEnv.getCurrentDatabase();
+        sql("CALL sys.create_branch('%s.chain_life_cl', 'snapshot')", db);
+        sql("CALL sys.create_branch('%s.chain_life_cl', 'delta')", db);
+        for (String tbl :
+                new String[] {
+                    "chain_life_cl", "chain_life_cl$branch_snapshot", 
"chain_life_cl$branch_delta"
+                }) {
+            sql(
+                    "ALTER TABLE `%s` SET ("
+                            + "  'scan.fallback-snapshot-branch' = 'snapshot',"
+                            + "  'scan.fallback-delta-branch' = 'delta')",
+                    tbl);
+        }
+
+        // === Phase 1: Delta-only initial data (all inserts) ===
+        sql(
+                "INSERT INTO `chain_life_cl$branch_delta` PARTITION (dt = 
'20250808')"
+                        + " VALUES (1, 1, 'base_1'), (2, 1, 'base_2'), (3, 1, 
'base_3'),"
+                        + " (4, 1, 'base_4'), (5, 1, 'base_5')");
+
+        CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM 
chain_life_cl").collect();
+
+        List<String> phase1 = collectRows(it, 5);
+        assertThat(phase1)
+                .containsExactlyInAnyOrder(
+                        "+I[1, 1, base_1, 20250808]",
+                        "+I[2, 1, base_2, 20250808]",
+                        "+I[3, 1, base_3, 20250808]",
+                        "+I[4, 1, base_4, 20250808]",
+                        "+I[5, 1, base_5, 20250808]");
+
+        // === Phase 2: Write changelog data (with -U/+U for update) via 
DataStream API ===
+        writeChangelogToBranch(
+                db,
+                "chain_life_cl",
+                "delta",
+                Row.ofKind(RowKind.UPDATE_BEFORE, 3L, 1L, "base_3", 
"20250809"),
+                Row.ofKind(RowKind.UPDATE_AFTER, 3L, 2L, "upd_3", "20250809"),
+                Row.ofKind(RowKind.INSERT, 6L, 1L, "new_6", "20250809"),
+                Row.ofKind(RowKind.INSERT, 7L, 1L, "new_7", "20250809"));
+
+        Thread.sleep(2000);
+        List<String> phase2 = collectRows(it, 4);
+        // changelog-producer=input: explicit -U/+U for updates
+        assertThat(phase2)
+                .containsExactlyInAnyOrder(
+                        "-U[3, 1, base_3, 20250809]",
+                        "+U[3, 2, upd_3, 20250809]",
+                        "+I[6, 1, new_6, 20250809]",
+                        "+I[7, 1, new_7, 20250809]");
+
+        // === Phase 3: Snapshot OVERWRITE should have NO effect ===
+        sql(
+                "INSERT OVERWRITE `chain_life_cl$branch_snapshot` PARTITION 
(dt = '20250808')"
+                        + " VALUES (1, 1, 'base_1'), (2, 1, 'base_2'), (3, 1, 
'base_3'),"
+                        + " (4, 1, 'base_4'), (5, 1, 'base_5')");
+
+        Thread.sleep(2000);
+
+        // Write delta AFTER snapshot — this proves snapshot writes don't 
trigger output.
+        // If snapshot writes were detected, we'd see duplicate or unexpected 
rows.
+        writeChangelogToBranch(
+                db,
+                "chain_life_cl",
+                "delta",
+                Row.ofKind(RowKind.INSERT, 100L, 1L, "phase3_probe", 
"20250810"));
+
+        Thread.sleep(2000);
+        List<String> phase3 = collectRows(it, 1);
+        assertThat(phase3)
+                .as("Only delta write should produce output, snapshot 
OVERWRITE should be ignored")
+                .containsExactlyInAnyOrder("+I[100, 1, phase3_probe, 
20250810]");
+
+        // === Phase 4: Write more delta via DataStream API ===
+        writeChangelogToBranch(
+                db,
+                "chain_life_cl",
+                "delta",
+                Row.ofKind(RowKind.INSERT, 8L, 1L, "new_8", "20250810"),
+                Row.ofKind(RowKind.INSERT, 9L, 1L, "new_9", "20250810"));
+
+        Thread.sleep(2000);
+        List<String> phase4 = collectRows(it, 2);
+        assertThat(phase4)
+                .containsExactlyInAnyOrder(
+                        "+I[8, 1, new_8, 20250810]", "+I[9, 1, new_9, 
20250810]");
+
+        // Terminate first streaming job
+        it.close();
+
+        // === Phase 5: Stateless restart ===
+        CloseableIterator<Row> it2 = sEnv.executeSql("SELECT * FROM 
chain_life_cl").collect();
+
+        // Phase 5 starting (matching batch semantics):
+        // - snapshot@20250808: k=1-5 (snapshot wins, delta@20250808 skipped 
since same partition
+        //   exists in snapshot; same values here since OVERWRITE wrote 
identical base data)
+        // - delta@20250809: changelog records (+U for update, +I for inserts)
+        // - delta@20250810: k=8,9,100 (delta-only, no snapshot for this 
partition)
+        // Total: 11 unique rows (PK=(dt,k) makes each (dt,k) pair distinct).
+        List<String> restart = collectRows(it2, 11);
+        assertThat(restart)
+                .containsExactlyInAnyOrder(
+                        "+I[1, 1, base_1, 20250808]",
+                        "+I[2, 1, base_2, 20250808]",
+                        "+I[3, 1, base_3, 20250808]",
+                        "+U[3, 2, upd_3, 20250809]",
+                        "+I[4, 1, base_4, 20250808]",
+                        "+I[5, 1, base_5, 20250808]",
+                        "+I[6, 1, new_6, 20250809]",
+                        "+I[7, 1, new_7, 20250809]",
+                        "+I[8, 1, new_8, 20250810]",
+                        "+I[9, 1, new_9, 20250810]",
+                        "+I[100, 1, phase3_probe, 20250810]");
+
+        // Continue writing delta
+        writeChangelogToBranch(
+                db,
+                "chain_life_cl",
+                "delta",
+                Row.ofKind(RowKind.INSERT, 10L, 1L, "new_10", "20250811"),
+                Row.ofKind(RowKind.INSERT, 11L, 1L, "new_11", "20250811"));
+
+        Thread.sleep(2000);
+        List<String> phase5b = collectRows(it2, 2);
+        assertThat(phase5b)
+                .containsExactlyInAnyOrder(
+                        "+I[10, 1, new_10, 20250811]", "+I[11, 1, new_11, 
20250811]");
+
+        it2.close();
+    }
+
+    /**
+     * Tests stateful restart of a chain table streaming read job using Flink 
checkpoint/restore.
+     *
+     * <p>Phase 1: Write initial delta data, start streaming job, verify read. 
Phase 2: Trigger a
+     * checkpoint (saves enumerator state including nextDeltaSnapshotId), then 
cancel the job. Phase
+     * 3: Write new delta data while the job is down. Phase 4: Restart from 
checkpoint — the
+     * restored scan should skip doFullLoad() and only read Phase 3's new 
data. Phase 5: Verify
+     * incremental streaming continues to work after restore.
+     */
+    @Test
+    @Timeout(180)
+    public void testStreamingReadChainTableStatefulRestart() throws Exception {

Review Comment:
   **[Test gap] This stateful-restart test doesn't exercise the 
checkpoint/restore path that actually regresses.**
   
   The checkpoint here is triggered before Phase 2 has consumed any *new* delta 
snapshot (no delta is written between the Phase-1 write at L1009 and the 
checkpoint at L1024). So the frozen `nextDeltaSnapshotId` happens to equal the 
real delta cursor, and the "no duplicates" assertion (L1088) passes even if 
`checkpoint()` is broken.
   
   The `checkpoint()` regression (see comment on 
`ChainTableStreamScan#checkpoint`) only manifests when Phase 2 has already 
advanced past the boundary. Suggest adding a case: after Phase 1, write delta 
and let Phase 2 consume ≥1 snapshot, *then* checkpoint → cancel → restart, and 
assert no duplicates.



##########
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkChainTableITCase.java:
##########
@@ -688,4 +710,1301 @@ public void testChainTableWithGroupPartition() throws 
Exception {
                 .containsExactlyInAnyOrder(
                         "+I[2, 2, 1-1, CN, 20250811]", "+I[4, 1, 1, CN, 
20250811]");
     }
+
+    /** Write Row data (with RowKind) to a specific branch using DataStream 
API. */
+    private void writeChangelogToBranch(String db, String tableName, String 
branch, Row... rows)
+            throws Exception {
+        FileStoreTable table = paimonTable(tableName + "$branch_" + branch);
+
+        StreamExecutionEnvironment env =
+                streamExecutionEnvironmentBuilder()
+                        .streamingMode()
+                        .checkpointIntervalMs(100)
+                        .parallelism(1)
+                        .build();
+
+        DataStream<Row> stream = env.fromCollection(Arrays.asList(rows));
+
+        new FlinkSinkBuilder(table)
+                .forRow(
+                        stream,
+                        DataTypes.ROW(
+                                DataTypes.FIELD("k", DataTypes.BIGINT()),
+                                DataTypes.FIELD("seq", DataTypes.BIGINT()),
+                                DataTypes.FIELD("v", DataTypes.STRING()),
+                                DataTypes.FIELD("dt", DataTypes.STRING())))
+                .build();
+        env.execute();
+    }
+
+    /**
+     * Collect n rows from a streaming iterator with a timeout. If no data 
arrives within
+     * timeoutSeconds, the iterator is closed and an AssertionError is thrown. 
This is necessary
+     * because it.next() blocks indefinitely when no data is available, and 
JUnit @Timeout cannot
+     * interrupt it.
+     */
+    private List<String> collectRows(CloseableIterator<Row> it, int n, int 
timeoutSeconds)
+            throws Exception {
+        List<String> result = new ArrayList<>();
+        for (int i = 0; i < n; i++) {
+            CompletableFuture<String> future =
+                    CompletableFuture.supplyAsync(() -> it.next().toString());
+            try {
+                result.add(future.get(timeoutSeconds, TimeUnit.SECONDS));
+            } catch (java.util.concurrent.TimeoutException e) {
+                future.cancel(true);
+                it.close();
+                throw new AssertionError(
+                        "Streaming read blocked for "
+                                + timeoutSeconds
+                                + "s after collecting "
+                                + result.size()
+                                + "/"
+                                + n
+                                + " rows. Collected so far: "
+                                + result);
+            }
+        }
+        return result;
+    }
+
+    /** Default collectRows with 30s timeout. */
+    private List<String> collectRows(CloseableIterator<Row> it, int n) throws 
Exception {
+        return collectRows(it, n, 30);
+    }
+
+    /**
+     * Tests the streaming read lifecycle for a chain table with 
changelog-producer=input.
+     *
+     * <p>Verifies: initial full read from delta-only → delta incremental 
visible with changelog
+     * records (-U/+U) → snapshot OVERWRITE has no effect → more delta visible 
→ stateless restart
+     * reads chain-merged state.
+     */
+    @Test
+    @Timeout(120)
+    public void testStreamingReadChainTableLifecycleWithInputChangelog() 
throws Exception {
+        // Create chain table with changelog-producer=input
+        sql(
+                "CREATE TABLE chain_life_cl ("
+                        + "  k BIGINT, seq BIGINT, v STRING, dt STRING"
+                        + ") PARTITIONED BY (dt) WITH ("
+                        + "  'primary-key' = 'dt,k',"
+                        + "  'bucket-key' = 'k',"
+                        + "  'bucket' = '2',"
+                        + "  'sequence.field' = 'seq',"
+                        + "  'merge-engine' = 'deduplicate',"
+                        + "  'changelog-producer' = 'input',"
+                        + "  'chain-table.enabled' = 'true',"
+                        + "  'partition.timestamp-pattern' = '$dt',"
+                        + "  'partition.timestamp-formatter' = 'yyyyMMdd',"
+                        + "  'continuous.discovery-interval' = '1ms'"
+                        + ")");
+
+        String db = tEnv.getCurrentDatabase();
+        sql("CALL sys.create_branch('%s.chain_life_cl', 'snapshot')", db);
+        sql("CALL sys.create_branch('%s.chain_life_cl', 'delta')", db);
+        for (String tbl :
+                new String[] {
+                    "chain_life_cl", "chain_life_cl$branch_snapshot", 
"chain_life_cl$branch_delta"
+                }) {
+            sql(
+                    "ALTER TABLE `%s` SET ("
+                            + "  'scan.fallback-snapshot-branch' = 'snapshot',"
+                            + "  'scan.fallback-delta-branch' = 'delta')",
+                    tbl);
+        }
+
+        // === Phase 1: Delta-only initial data (all inserts) ===
+        sql(
+                "INSERT INTO `chain_life_cl$branch_delta` PARTITION (dt = 
'20250808')"
+                        + " VALUES (1, 1, 'base_1'), (2, 1, 'base_2'), (3, 1, 
'base_3'),"
+                        + " (4, 1, 'base_4'), (5, 1, 'base_5')");
+
+        CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM 
chain_life_cl").collect();
+
+        List<String> phase1 = collectRows(it, 5);
+        assertThat(phase1)
+                .containsExactlyInAnyOrder(
+                        "+I[1, 1, base_1, 20250808]",
+                        "+I[2, 1, base_2, 20250808]",
+                        "+I[3, 1, base_3, 20250808]",
+                        "+I[4, 1, base_4, 20250808]",
+                        "+I[5, 1, base_5, 20250808]");
+
+        // === Phase 2: Write changelog data (with -U/+U for update) via 
DataStream API ===
+        writeChangelogToBranch(
+                db,
+                "chain_life_cl",
+                "delta",
+                Row.ofKind(RowKind.UPDATE_BEFORE, 3L, 1L, "base_3", 
"20250809"),
+                Row.ofKind(RowKind.UPDATE_AFTER, 3L, 2L, "upd_3", "20250809"),
+                Row.ofKind(RowKind.INSERT, 6L, 1L, "new_6", "20250809"),
+                Row.ofKind(RowKind.INSERT, 7L, 1L, "new_7", "20250809"));
+
+        Thread.sleep(2000);
+        List<String> phase2 = collectRows(it, 4);
+        // changelog-producer=input: explicit -U/+U for updates
+        assertThat(phase2)
+                .containsExactlyInAnyOrder(
+                        "-U[3, 1, base_3, 20250809]",
+                        "+U[3, 2, upd_3, 20250809]",
+                        "+I[6, 1, new_6, 20250809]",
+                        "+I[7, 1, new_7, 20250809]");
+
+        // === Phase 3: Snapshot OVERWRITE should have NO effect ===
+        sql(
+                "INSERT OVERWRITE `chain_life_cl$branch_snapshot` PARTITION 
(dt = '20250808')"
+                        + " VALUES (1, 1, 'base_1'), (2, 1, 'base_2'), (3, 1, 
'base_3'),"
+                        + " (4, 1, 'base_4'), (5, 1, 'base_5')");
+
+        Thread.sleep(2000);
+
+        // Write delta AFTER snapshot — this proves snapshot writes don't 
trigger output.
+        // If snapshot writes were detected, we'd see duplicate or unexpected 
rows.
+        writeChangelogToBranch(
+                db,
+                "chain_life_cl",
+                "delta",
+                Row.ofKind(RowKind.INSERT, 100L, 1L, "phase3_probe", 
"20250810"));
+
+        Thread.sleep(2000);
+        List<String> phase3 = collectRows(it, 1);
+        assertThat(phase3)
+                .as("Only delta write should produce output, snapshot 
OVERWRITE should be ignored")
+                .containsExactlyInAnyOrder("+I[100, 1, phase3_probe, 
20250810]");
+
+        // === Phase 4: Write more delta via DataStream API ===
+        writeChangelogToBranch(
+                db,
+                "chain_life_cl",
+                "delta",
+                Row.ofKind(RowKind.INSERT, 8L, 1L, "new_8", "20250810"),
+                Row.ofKind(RowKind.INSERT, 9L, 1L, "new_9", "20250810"));
+
+        Thread.sleep(2000);
+        List<String> phase4 = collectRows(it, 2);
+        assertThat(phase4)
+                .containsExactlyInAnyOrder(
+                        "+I[8, 1, new_8, 20250810]", "+I[9, 1, new_9, 
20250810]");
+
+        // Terminate first streaming job
+        it.close();
+
+        // === Phase 5: Stateless restart ===
+        CloseableIterator<Row> it2 = sEnv.executeSql("SELECT * FROM 
chain_life_cl").collect();
+
+        // Phase 5 starting (matching batch semantics):
+        // - snapshot@20250808: k=1-5 (snapshot wins, delta@20250808 skipped 
since same partition
+        //   exists in snapshot; same values here since OVERWRITE wrote 
identical base data)
+        // - delta@20250809: changelog records (+U for update, +I for inserts)
+        // - delta@20250810: k=8,9,100 (delta-only, no snapshot for this 
partition)
+        // Total: 11 unique rows (PK=(dt,k) makes each (dt,k) pair distinct).
+        List<String> restart = collectRows(it2, 11);
+        assertThat(restart)
+                .containsExactlyInAnyOrder(
+                        "+I[1, 1, base_1, 20250808]",
+                        "+I[2, 1, base_2, 20250808]",
+                        "+I[3, 1, base_3, 20250808]",
+                        "+U[3, 2, upd_3, 20250809]",
+                        "+I[4, 1, base_4, 20250808]",
+                        "+I[5, 1, base_5, 20250808]",
+                        "+I[6, 1, new_6, 20250809]",
+                        "+I[7, 1, new_7, 20250809]",
+                        "+I[8, 1, new_8, 20250810]",
+                        "+I[9, 1, new_9, 20250810]",
+                        "+I[100, 1, phase3_probe, 20250810]");
+
+        // Continue writing delta
+        writeChangelogToBranch(
+                db,
+                "chain_life_cl",
+                "delta",
+                Row.ofKind(RowKind.INSERT, 10L, 1L, "new_10", "20250811"),
+                Row.ofKind(RowKind.INSERT, 11L, 1L, "new_11", "20250811"));
+
+        Thread.sleep(2000);
+        List<String> phase5b = collectRows(it2, 2);
+        assertThat(phase5b)
+                .containsExactlyInAnyOrder(
+                        "+I[10, 1, new_10, 20250811]", "+I[11, 1, new_11, 
20250811]");
+
+        it2.close();
+    }
+
+    /**
+     * Tests stateful restart of a chain table streaming read job using Flink 
checkpoint/restore.
+     *
+     * <p>Phase 1: Write initial delta data, start streaming job, verify read. 
Phase 2: Trigger a
+     * checkpoint (saves enumerator state including nextDeltaSnapshotId), then 
cancel the job. Phase
+     * 3: Write new delta data while the job is down. Phase 4: Restart from 
checkpoint — the
+     * restored scan should skip doFullLoad() and only read Phase 3's new 
data. Phase 5: Verify
+     * incremental streaming continues to work after restore.
+     */
+    @Test
+    @Timeout(180)
+    public void testStreamingReadChainTableStatefulRestart() throws Exception {
+        // Create chain table (source)
+        sql(
+                "CREATE TABLE chain_restart ("
+                        + "  k BIGINT, seq BIGINT, v STRING, dt STRING"
+                        + ") PARTITIONED BY (dt) WITH ("
+                        + "  'primary-key' = 'dt,k',"
+                        + "  'bucket-key' = 'k',"
+                        + "  'bucket' = '2',"
+                        + "  'sequence.field' = 'seq',"
+                        + "  'merge-engine' = 'deduplicate',"
+                        + "  'changelog-producer' = 'input',"
+                        + "  'chain-table.enabled' = 'true',"
+                        + "  'partition.timestamp-pattern' = '$dt',"
+                        + "  'partition.timestamp-formatter' = 'yyyyMMdd',"
+                        + "  'continuous.discovery-interval' = '1ms'"
+                        + ")");
+
+        // Create a Paimon PK sink table. The Paimon sink supports upsert
+        // (primary key), so the planner won't need ChangelogNormalize.
+        // Paimon sink does NOT implement CheckpointedFunction (it uses 
operator
+        // state for in-flight files, committed during checkpoint complete), 
so no
+        // buffer leakage on checkpoint recovery — unlike CollectSinkFunction.
+        sql(
+                "CREATE TABLE chain_restart_sink ("
+                        + "  k BIGINT, seq BIGINT, v STRING, dt STRING,"
+                        + "  PRIMARY KEY (dt, k) NOT ENFORCED"
+                        + ") PARTITIONED BY (dt) WITH ("
+                        + "  'bucket' = '2',"
+                        + "  'merge-engine' = 'deduplicate',"
+                        + "  'sequence.field' = 'seq'"
+                        + ")");
+
+        String db = tEnv.getCurrentDatabase();
+        sql("CALL sys.create_branch('%s.chain_restart', 'snapshot')", db);
+        sql("CALL sys.create_branch('%s.chain_restart', 'delta')", db);
+        for (String tbl :
+                new String[] {
+                    "chain_restart", "chain_restart$branch_snapshot", 
"chain_restart$branch_delta"
+                }) {
+            sql(
+                    "ALTER TABLE `%s` SET ("
+                            + "  'scan.fallback-snapshot-branch' = 'snapshot',"
+                            + "  'scan.fallback-delta-branch' = 'delta')",
+                    tbl);
+        }
+
+        // Configure checkpoint for stateful restart
+        org.apache.flink.configuration.Configuration config = 
sEnv.getConfig().getConfiguration();
+        config.setString("state.checkpoints.dir", "file://" + path + 
"/checkpoints");
+        config.set(
+                CheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION,
+                ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
+        config.removeKey("execution.checkpointing.interval");
+
+        // Same SQL for both phases → operator graph matches → state recovery 
works
+        String streamSql = "INSERT INTO chain_restart_sink SELECT * FROM 
chain_restart";
+
+        // T4: Write snapshot data BEFORE starting streaming, so the starting 
phase
+        // exercises the snapshot+delta merge path (not just delta-only).
+        sql(
+                "INSERT INTO `chain_restart$branch_snapshot` PARTITION (dt = 
'20250807')"
+                        + " VALUES (10, 1, 'snap_10'), (11, 1, 'snap_11')");
+
+        // === Phase 1: Write initial delta data and start streaming INSERT 
INTO ===
+        sql(
+                "INSERT INTO `chain_restart$branch_delta` PARTITION (dt = 
'20250808')"
+                        + " VALUES (1, 1, 'base_1'), (2, 1, 'base_2'), (3, 1, 
'base_3')");
+
+        TableResult tableResult = sEnv.executeSql(streamSql);
+        //noinspection OptionalGetWithoutIsPresent
+        JobClient jobClient = tableResult.getJobClient().get();
+
+        // Wait for data to flow to the sink
+        Thread.sleep(5000);

Review Comment:
   **[Test, minor] Timing-based synchronization is flaky-prone, and debug 
prints are left in.**
   
   This class uses ~19 `Thread.sleep(2000-5000)` calls to synchronize with the 
streaming job, which tends to be flaky under CI load; there are also a few 
leftover `System.err.println("[TEST] ...")` debug statements. Consider polling 
for a condition instead of fixed sleeps, and removing the prints.



##########
paimon-core/src/main/java/org/apache/paimon/table/ChainTableStreamScan.java:
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.source.ChainSplit;
+import org.apache.paimon.table.source.DataFilePlan;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.DataTableStreamScan;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.SnapshotNotExistPlan;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.table.source.snapshot.StartingContext;
+import org.apache.paimon.utils.ChainPartitionProjector;
+import org.apache.paimon.utils.ChainTableUtils;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Streaming scan for chain tables with a two-phase design:
+ *
+ * <ul>
+ *   <li><b>Phase 1 (Starting):</b> Outputs the latest snapshot partition (per 
group) and delta
+ *       partitions that come after it. Older snapshot partitions are excluded 
as they are
+ *       considered outdated. Each primary key appears exactly once under its 
natural partition.
+ *       Unlike batch full scan, anchor-based chain merging is intentionally 
skipped to keep Phase 1
+ *       lightweight — this avoids split explosion in long-running jobs with 
many partitions.
+ *   <li><b>Phase 2 (Incremental):</b> Stream new snapshots from the delta 
branch only, picking up
+ *       from where Phase 1 left off.
+ * </ul>
+ *
+ * <p>Checkpoint state is a single {@code Long} — the delta branch's next 
snapshot id. On stateful
+ * restart, Phase 1 is skipped and incremental streaming resumes from the 
checkpointed position. On
+ * stateless restart (null state), a fresh starting scan is performed.
+ */
+public class ChainTableStreamScan implements StreamDataTableScan {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ChainTableStreamScan.class);
+
+    private final ChainGroupReadTable chainGroupReadTable;
+
+    /** Phase 1: batch scan used to access snapshot branch data via {@code 
mainScan}. */
+    private final ChainGroupReadTable.ChainTableBatchScan batchScan;
+
+    /** Phase 2: delta-only stream scan. */
+    private final DataTableStreamScan deltaStreamScan;
+
+    /** Projector for splitting full partition into group and chain parts. */
+    private final ChainPartitionProjector partitionProjector;
+
+    /** Comparator for chain partition keys only. */
+    private final RecordComparator chainPartitionComparator;
+
+    /**
+     * Checkpoint state: the next delta snapshot id to read. Null before Phase 
1 completes; non-null
+     * once Phase 1 is done or after a stateful restore.
+     */
+    @Nullable private Long nextDeltaSnapshotId;
+
+    /** Whether the starting plan (Phase 1) has been completed. */
+    private boolean startingDone = false;
+
+    /** Predicates and shard for applying to local scans created in {@link 
#planStarting()}. */
+    private final List<Predicate> predicates = new ArrayList<>();
+
+    private int shardIndex = -1;
+
+    private int shardCount = -1;
+
+    public ChainTableStreamScan(ChainGroupReadTable chainGroupReadTable) {
+        this.chainGroupReadTable = chainGroupReadTable;
+        this.batchScan =
+                new ChainGroupReadTable.ChainTableBatchScan(
+                        chainGroupReadTable.schema(), chainGroupReadTable);
+        this.deltaStreamScan = (DataTableStreamScan) 
chainGroupReadTable.other().newStreamScan();
+
+        // Initialize partition projector and chain comparator using the 
established pattern
+        // from ChainTableBatchScan.
+        List<String> chainKeys =
+                ChainTableUtils.chainPartitionKeys(
+                        chainGroupReadTable.coreOptions(),
+                        chainGroupReadTable.schema().partitionKeys());
+        this.partitionProjector =
+                new ChainPartitionProjector(
+                        chainGroupReadTable.schema().logicalPartitionType(), 
chainKeys.size());
+        this.chainPartitionComparator =
+                CodeGenUtils.newRecordComparator(
+                        
partitionProjector.chainPartitionType().getFieldTypes());
+    }
+
+    @Override
+    public StartingContext startingContext() {
+        if (!startingDone) {
+            return StartingContext.EMPTY;
+        }
+        return deltaStreamScan.startingContext();
+    }
+
+    @Override
+    public TableScan.Plan plan() {
+        if (!startingDone) {
+            return planStarting();
+        }
+        TableScan.Plan plan = deltaStreamScan.plan();
+        // Never return SnapshotNotExistPlan — it would cause the Flink 
enumerator to
+        // set stopTriggerScan=true and permanently stop polling for new data.
+        if (plan instanceof SnapshotNotExistPlan) {
+            return new DataFilePlan<>(Collections.emptyList());
+        }
+        return plan;
+    }
+
+    /**
+     * Starting plan: outputs the latest snapshot partition (per group) and 
delta partitions that
+     * come after it. Older snapshot partitions are excluded. Each primary key 
appears exactly once
+     * under its natural partition.
+     *
+     * <p>Unlike batch full scan, anchor-based chain merging is not performed. 
This keeps Phase 1
+     * lightweight for long-running jobs.
+     */
+    private TableScan.Plan planStarting() {
+        FileStoreTable deltaTable = chainGroupReadTable.other();
+        String deltaBranch = deltaTable.coreOptions().branch();
+        String snapshotBranch = 
chainGroupReadTable.wrapped.coreOptions().branch();
+
+        Long latestId = captureDeltaPosition(deltaTable);
+
+        // 1. Read delta branch data at the pinned snapshot, grouped by 
partition.
+        Map<BinaryRow, List<DataSplit>> deltaSplitsByPartition;
+        if (latestId != null) {
+            FileStoreTable pinnedDelta =
+                    deltaTable.copy(
+                            Collections.singletonMap("scan.snapshot-id", 
String.valueOf(latestId)));
+            DataTableScan pinnedDeltaScan = pinnedDelta.newScan();
+            applyPredicatesAndShard(pinnedDeltaScan);
+            deltaSplitsByPartition = groupByPartition(pinnedDeltaScan);
+        } else {
+            deltaSplitsByPartition = Collections.emptyMap();
+        }
+
+        // 2. Read all snapshot branch data, grouped by partition.
+        // Reuse batchScan.mainScan which has predicates/shard already applied.
+        Map<BinaryRow, List<DataSplit>> snapshotSplitsByPartition =
+                
chainGroupReadTable.wrapped.snapshotManager().latestSnapshotId() != null
+                        ? groupByPartition(batchScan.mainScan)
+                        : Collections.emptyMap();
+
+        // 3. Find the latest snapshot partition per group (based on chain 
partition keys).
+        //    Only output the latest snapshot partition and delta partitions 
after it.
+        Map<Object, BinaryRow> latestChainPartitionPerGroup = new HashMap<>();
+        for (BinaryRow partition : snapshotSplitsByPartition.keySet()) {
+            Object groupKey = toGroupKey(partition);
+            BinaryRow existingLatest = 
latestChainPartitionPerGroup.get(groupKey);
+            if (existingLatest == null
+                    || chainPartitionComparator.compare(
+                                    
partitionProjector.extractChainPartition(partition),
+                                    
partitionProjector.extractChainPartition(existingLatest))
+                            > 0) {
+                latestChainPartitionPerGroup.put(groupKey, partition);
+            }
+        }
+
+        // 4. Build ChainSplits:
+        //    - For snapshot partitions: only include if chain key == latest 
for that group.
+        //    - For delta partitions: include if (a) chain key > latest for 
that group, or
+        //      (b) no snapshot exists for that group.
+        List<Split> allSplits = new ArrayList<>();
+
+        for (Map.Entry<BinaryRow, List<DataSplit>> entry : 
snapshotSplitsByPartition.entrySet()) {
+            BinaryRow partition = entry.getKey();
+            Object groupKey = toGroupKey(partition);
+            BinaryRow latestPartition = 
latestChainPartitionPerGroup.get(groupKey);
+            if (chainPartitionComparator.compare(
+                            
partitionProjector.extractChainPartition(partition),
+                            
partitionProjector.extractChainPartition(latestPartition))
+                    == 0) {
+                for (DataSplit ds : entry.getValue()) {
+                    allSplits.add(dataSplitToChainSplit(ds, snapshotBranch));
+                }
+            }
+        }
+
+        for (Map.Entry<BinaryRow, List<DataSplit>> entry : 
deltaSplitsByPartition.entrySet()) {
+            BinaryRow partition = entry.getKey();
+            Object groupKey = toGroupKey(partition);
+            BinaryRow latestPartition = 
latestChainPartitionPerGroup.get(groupKey);
+            // Include delta partition if:
+            // - No snapshot exists for this group, OR
+            // - Chain key > latest snapshot chain key
+            if (latestPartition == null
+                    || chainPartitionComparator.compare(
+                                    
partitionProjector.extractChainPartition(partition),
+                                    
partitionProjector.extractChainPartition(latestPartition))
+                            > 0) {
+                for (DataSplit ds : entry.getValue()) {
+                    allSplits.add(dataSplitToChainSplit(ds, deltaBranch));
+                }
+            }
+        }
+
+        LOG.info(
+                "ChainTableStreamScan.planStarting [snapshot={}, delta={}]: "
+                        + "{} delta partitions, {} snapshot partitions, "
+                        + "{} latest snapshot groups, {} total splits",
+                snapshotBranch,
+                deltaBranch,
+                deltaSplitsByPartition.size(),
+                snapshotSplitsByPartition.size(),
+                latestChainPartitionPerGroup.size(),
+                allSplits.size());
+
+        startingDone = true;
+        return new DataFilePlan<>(allSplits);
+    }
+
+    /**
+     * Captures the delta branch's latest snapshot id and positions the Phase 
2 stream scan to start
+     * from the next snapshot. This makes the Phase 1 / Phase 2 boundary 
deterministic: Phase 1
+     * reads delta data pinned at the returned snapshot id, Phase 2 starts 
from the snapshot after.
+     *
+     * @return the latest delta snapshot id, or {@code null} if the delta 
branch has no snapshots
+     */
+    @Nullable
+    private Long captureDeltaPosition(FileStoreTable deltaTable) {
+        SnapshotManager deltaSnapshotManager = deltaTable.snapshotManager();
+        Long latestId = deltaSnapshotManager.latestSnapshotId();
+        nextDeltaSnapshotId = latestId != null ? latestId + 1 : 
Snapshot.FIRST_SNAPSHOT_ID;
+        LOG.info(
+                "ChainTableStreamScan: pinned delta branch '{}' at snapshot 
{}, "
+                        + "nextDeltaSnapshotId={}",
+                deltaTable.coreOptions().branch(),
+                latestId,
+                nextDeltaSnapshotId);
+        deltaStreamScan.restore(nextDeltaSnapshotId);
+        return latestId;
+    }
+
+    /** Plans a scan and groups the resulting splits by partition. */
+    private static Map<BinaryRow, List<DataSplit>> 
groupByPartition(DataTableScan scan) {
+        Map<BinaryRow, List<DataSplit>> grouped = new LinkedHashMap<>();
+        for (Split s : scan.plan().splits()) {
+            DataSplit ds = (DataSplit) s;
+            grouped.computeIfAbsent(ds.partition(), k -> new 
ArrayList<>()).add(ds);
+        }
+        return grouped;
+    }
+
+    /**
+     * Extracts a stable group key from a full partition row. When there is no 
group partition (all
+     * fields are chain keys), returns a shared singleton to avoid zero-field 
{@link BinaryRow}
+     * instances that may have inconsistent {@code hashCode}/{@code equals} 
across different
+     * partitions.
+     */
+    private Object toGroupKey(BinaryRow fullPartition) {
+        if (!partitionProjector.hasGroupPartition()) {
+            return Collections.emptyList();
+        }
+        return partitionProjector.extractGroupPartition(fullPartition);
+    }
+
+    /**
+     * Converts a {@link DataSplit} to a {@link ChainSplit} where all files 
belong to the given
+     * branch. The partition value is preserved as-is (no rewriting).
+     */
+    private static ChainSplit dataSplitToChainSplit(DataSplit dataSplit, 
String branch) {

Review Comment:
   **[Maintainability] `ChainSplit` construction is duplicated in three 
places.**
   
   The per-file loop that builds `fileBranchMapping` + `fileBucketPathMapping` 
and then calls `new ChainSplit(...)` appears here in `dataSplitToChainSplit()`, 
and twice in `ChainGroupReadTable` (`plan()` around L249 and around L408). Any 
future change to how a `ChainSplit` is built (e.g. adding a field) has to be 
applied in all three; missing one would silently diverge the read/write paths.
   
   Suggest extracting a single factory, e.g. `ChainSplit.from(DataSplit 
dataSplit, String branch)`, and using it everywhere.



##########
paimon-core/src/main/java/org/apache/paimon/table/ChainTableStreamScan.java:
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.source.ChainSplit;
+import org.apache.paimon.table.source.DataFilePlan;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.DataTableStreamScan;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.SnapshotNotExistPlan;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.table.source.snapshot.StartingContext;
+import org.apache.paimon.utils.ChainPartitionProjector;
+import org.apache.paimon.utils.ChainTableUtils;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Streaming scan for chain tables with a two-phase design:
+ *
+ * <ul>
+ *   <li><b>Phase 1 (Starting):</b> Outputs the latest snapshot partition (per 
group) and delta
+ *       partitions that come after it. Older snapshot partitions are excluded 
as they are
+ *       considered outdated. Each primary key appears exactly once under its 
natural partition.
+ *       Unlike batch full scan, anchor-based chain merging is intentionally 
skipped to keep Phase 1
+ *       lightweight — this avoids split explosion in long-running jobs with 
many partitions.
+ *   <li><b>Phase 2 (Incremental):</b> Stream new snapshots from the delta 
branch only, picking up
+ *       from where Phase 1 left off.
+ * </ul>
+ *
+ * <p>Checkpoint state is a single {@code Long} — the delta branch's next 
snapshot id. On stateful
+ * restart, Phase 1 is skipped and incremental streaming resumes from the 
checkpointed position. On
+ * stateless restart (null state), a fresh starting scan is performed.
+ */
+public class ChainTableStreamScan implements StreamDataTableScan {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ChainTableStreamScan.class);
+
+    private final ChainGroupReadTable chainGroupReadTable;
+
+    /** Phase 1: batch scan used to access snapshot branch data via {@code 
mainScan}. */
+    private final ChainGroupReadTable.ChainTableBatchScan batchScan;
+
+    /** Phase 2: delta-only stream scan. */
+    private final DataTableStreamScan deltaStreamScan;
+
+    /** Projector for splitting full partition into group and chain parts. */
+    private final ChainPartitionProjector partitionProjector;
+
+    /** Comparator for chain partition keys only. */
+    private final RecordComparator chainPartitionComparator;
+
+    /**
+     * Checkpoint state: the next delta snapshot id to read. Null before Phase 
1 completes; non-null
+     * once Phase 1 is done or after a stateful restore.
+     */
+    @Nullable private Long nextDeltaSnapshotId;
+
+    /** Whether the starting plan (Phase 1) has been completed. */
+    private boolean startingDone = false;
+
+    /** Predicates and shard for applying to local scans created in {@link 
#planStarting()}. */
+    private final List<Predicate> predicates = new ArrayList<>();
+
+    private int shardIndex = -1;
+
+    private int shardCount = -1;
+
+    public ChainTableStreamScan(ChainGroupReadTable chainGroupReadTable) {
+        this.chainGroupReadTable = chainGroupReadTable;
+        this.batchScan =
+                new ChainGroupReadTable.ChainTableBatchScan(
+                        chainGroupReadTable.schema(), chainGroupReadTable);
+        this.deltaStreamScan = (DataTableStreamScan) 
chainGroupReadTable.other().newStreamScan();
+
+        // Initialize partition projector and chain comparator using the 
established pattern
+        // from ChainTableBatchScan.
+        List<String> chainKeys =
+                ChainTableUtils.chainPartitionKeys(
+                        chainGroupReadTable.coreOptions(),
+                        chainGroupReadTable.schema().partitionKeys());
+        this.partitionProjector =
+                new ChainPartitionProjector(
+                        chainGroupReadTable.schema().logicalPartitionType(), 
chainKeys.size());
+        this.chainPartitionComparator =
+                CodeGenUtils.newRecordComparator(
+                        
partitionProjector.chainPartitionType().getFieldTypes());
+    }
+
+    @Override
+    public StartingContext startingContext() {
+        if (!startingDone) {
+            return StartingContext.EMPTY;
+        }
+        return deltaStreamScan.startingContext();
+    }
+
+    @Override
+    public TableScan.Plan plan() {
+        if (!startingDone) {
+            return planStarting();
+        }
+        TableScan.Plan plan = deltaStreamScan.plan();
+        // Never return SnapshotNotExistPlan — it would cause the Flink 
enumerator to
+        // set stopTriggerScan=true and permanently stop polling for new data.
+        if (plan instanceof SnapshotNotExistPlan) {
+            return new DataFilePlan<>(Collections.emptyList());
+        }
+        return plan;
+    }
+
+    /**
+     * Starting plan: outputs the latest snapshot partition (per group) and 
delta partitions that
+     * come after it. Older snapshot partitions are excluded. Each primary key 
appears exactly once
+     * under its natural partition.
+     *
+     * <p>Unlike batch full scan, anchor-based chain merging is not performed. 
This keeps Phase 1
+     * lightweight for long-running jobs.
+     */
+    private TableScan.Plan planStarting() {
+        FileStoreTable deltaTable = chainGroupReadTable.other();
+        String deltaBranch = deltaTable.coreOptions().branch();
+        String snapshotBranch = 
chainGroupReadTable.wrapped.coreOptions().branch();
+
+        Long latestId = captureDeltaPosition(deltaTable);
+
+        // 1. Read delta branch data at the pinned snapshot, grouped by 
partition.
+        Map<BinaryRow, List<DataSplit>> deltaSplitsByPartition;
+        if (latestId != null) {
+            FileStoreTable pinnedDelta =
+                    deltaTable.copy(
+                            Collections.singletonMap("scan.snapshot-id", 
String.valueOf(latestId)));
+            DataTableScan pinnedDeltaScan = pinnedDelta.newScan();
+            applyPredicatesAndShard(pinnedDeltaScan);
+            deltaSplitsByPartition = groupByPartition(pinnedDeltaScan);
+        } else {
+            deltaSplitsByPartition = Collections.emptyMap();
+        }
+
+        // 2. Read all snapshot branch data, grouped by partition.
+        // Reuse batchScan.mainScan which has predicates/shard already applied.
+        Map<BinaryRow, List<DataSplit>> snapshotSplitsByPartition =
+                
chainGroupReadTable.wrapped.snapshotManager().latestSnapshotId() != null
+                        ? groupByPartition(batchScan.mainScan)
+                        : Collections.emptyMap();
+
+        // 3. Find the latest snapshot partition per group (based on chain 
partition keys).
+        //    Only output the latest snapshot partition and delta partitions 
after it.
+        Map<Object, BinaryRow> latestChainPartitionPerGroup = new HashMap<>();
+        for (BinaryRow partition : snapshotSplitsByPartition.keySet()) {
+            Object groupKey = toGroupKey(partition);
+            BinaryRow existingLatest = 
latestChainPartitionPerGroup.get(groupKey);
+            if (existingLatest == null
+                    || chainPartitionComparator.compare(
+                                    
partitionProjector.extractChainPartition(partition),
+                                    
partitionProjector.extractChainPartition(existingLatest))
+                            > 0) {
+                latestChainPartitionPerGroup.put(groupKey, partition);
+            }
+        }
+
+        // 4. Build ChainSplits:
+        //    - For snapshot partitions: only include if chain key == latest 
for that group.
+        //    - For delta partitions: include if (a) chain key > latest for 
that group, or
+        //      (b) no snapshot exists for that group.
+        List<Split> allSplits = new ArrayList<>();
+
+        for (Map.Entry<BinaryRow, List<DataSplit>> entry : 
snapshotSplitsByPartition.entrySet()) {
+            BinaryRow partition = entry.getKey();
+            Object groupKey = toGroupKey(partition);
+            BinaryRow latestPartition = 
latestChainPartitionPerGroup.get(groupKey);
+            if (chainPartitionComparator.compare(
+                            
partitionProjector.extractChainPartition(partition),
+                            
partitionProjector.extractChainPartition(latestPartition))
+                    == 0) {
+                for (DataSplit ds : entry.getValue()) {
+                    allSplits.add(dataSplitToChainSplit(ds, snapshotBranch));
+                }
+            }
+        }
+
+        for (Map.Entry<BinaryRow, List<DataSplit>> entry : 
deltaSplitsByPartition.entrySet()) {
+            BinaryRow partition = entry.getKey();
+            Object groupKey = toGroupKey(partition);
+            BinaryRow latestPartition = 
latestChainPartitionPerGroup.get(groupKey);
+            // Include delta partition if:
+            // - No snapshot exists for this group, OR
+            // - Chain key > latest snapshot chain key
+            if (latestPartition == null
+                    || chainPartitionComparator.compare(
+                                    
partitionProjector.extractChainPartition(partition),
+                                    
partitionProjector.extractChainPartition(latestPartition))
+                            > 0) {
+                for (DataSplit ds : entry.getValue()) {
+                    allSplits.add(dataSplitToChainSplit(ds, deltaBranch));
+                }
+            }
+        }
+
+        LOG.info(
+                "ChainTableStreamScan.planStarting [snapshot={}, delta={}]: "
+                        + "{} delta partitions, {} snapshot partitions, "
+                        + "{} latest snapshot groups, {} total splits",
+                snapshotBranch,
+                deltaBranch,
+                deltaSplitsByPartition.size(),
+                snapshotSplitsByPartition.size(),
+                latestChainPartitionPerGroup.size(),
+                allSplits.size());
+
+        startingDone = true;
+        return new DataFilePlan<>(allSplits);
+    }
+
+    /**
+     * Captures the delta branch's latest snapshot id and positions the Phase 
2 stream scan to start
+     * from the next snapshot. This makes the Phase 1 / Phase 2 boundary 
deterministic: Phase 1
+     * reads delta data pinned at the returned snapshot id, Phase 2 starts 
from the snapshot after.
+     *
+     * @return the latest delta snapshot id, or {@code null} if the delta 
branch has no snapshots
+     */
+    @Nullable
+    private Long captureDeltaPosition(FileStoreTable deltaTable) {
+        SnapshotManager deltaSnapshotManager = deltaTable.snapshotManager();
+        Long latestId = deltaSnapshotManager.latestSnapshotId();
+        nextDeltaSnapshotId = latestId != null ? latestId + 1 : 
Snapshot.FIRST_SNAPSHOT_ID;
+        LOG.info(
+                "ChainTableStreamScan: pinned delta branch '{}' at snapshot 
{}, "
+                        + "nextDeltaSnapshotId={}",
+                deltaTable.coreOptions().branch(),
+                latestId,
+                nextDeltaSnapshotId);
+        deltaStreamScan.restore(nextDeltaSnapshotId);
+        return latestId;
+    }
+
+    /** Plans a scan and groups the resulting splits by partition. */
+    private static Map<BinaryRow, List<DataSplit>> 
groupByPartition(DataTableScan scan) {
+        Map<BinaryRow, List<DataSplit>> grouped = new LinkedHashMap<>();
+        for (Split s : scan.plan().splits()) {
+            DataSplit ds = (DataSplit) s;
+            grouped.computeIfAbsent(ds.partition(), k -> new 
ArrayList<>()).add(ds);
+        }
+        return grouped;
+    }
+
+    /**
+     * Extracts a stable group key from a full partition row. When there is no 
group partition (all
+     * fields are chain keys), returns a shared singleton to avoid zero-field 
{@link BinaryRow}
+     * instances that may have inconsistent {@code hashCode}/{@code equals} 
across different
+     * partitions.
+     */
+    private Object toGroupKey(BinaryRow fullPartition) {
+        if (!partitionProjector.hasGroupPartition()) {
+            return Collections.emptyList();
+        }
+        return partitionProjector.extractGroupPartition(fullPartition);
+    }
+
+    /**
+     * Converts a {@link DataSplit} to a {@link ChainSplit} where all files 
belong to the given
+     * branch. The partition value is preserved as-is (no rewriting).
+     */
+    private static ChainSplit dataSplitToChainSplit(DataSplit dataSplit, 
String branch) {
+        HashMap<String, String> fileBranchMapping = new HashMap<>();
+        HashMap<String, String> fileBucketPathMapping = new HashMap<>();
+        for (DataFileMeta file : dataSplit.dataFiles()) {
+            fileBranchMapping.put(file.fileName(), branch);
+            fileBucketPathMapping.put(file.fileName(), dataSplit.bucketPath());
+        }
+        return new ChainSplit(
+                dataSplit.partition(),
+                dataSplit.dataFiles(),
+                fileBranchMapping,
+                fileBucketPathMapping);
+    }
+
+    @Override
+    public InnerTableScan withFilter(Predicate predicate) {
+        predicates.add(predicate);
+        batchScan.withFilter(predicate);
+        deltaStreamScan.withFilter(predicate);
+        return this;
+    }
+
+    @Override
+    public DataTableScan withShard(int indexOfThisSubtask, int 
numberOfParallelSubtasks) {
+        shardIndex = indexOfThisSubtask;
+        shardCount = numberOfParallelSubtasks;
+        batchScan.withShard(indexOfThisSubtask, numberOfParallelSubtasks);
+        deltaStreamScan.withShard(indexOfThisSubtask, 
numberOfParallelSubtasks);
+        return this;
+    }
+
+    /**
+     * Applies all previously set predicates and shard to a newly created 
scan. Used for the pinned
+     * delta scan in {@link #planStarting()}.
+     */
+    private void applyPredicatesAndShard(DataTableScan scan) {
+        for (Predicate p : predicates) {
+            scan.withFilter(p);
+        }
+        if (shardIndex >= 0) {
+            scan.withShard(shardIndex, shardCount);
+        }
+    }
+
+    @Nullable
+    @Override
+    public Long checkpoint() {
+        return nextDeltaSnapshotId;
+    }
+
+    @Nullable
+    @Override
+    public Long watermark() {
+        if (!startingDone) {
+            return null;
+        }
+        return deltaStreamScan.watermark();
+    }
+
+    @Override
+    public void restore(@Nullable Long nextSnapshotId) {

Review Comment:
   **[Minor] `restore(null)` does not reset `startingDone`, so it cannot 
actually re-run Phase 1.**
   
   When `nextSnapshotId == null` the `if` body is skipped, leaving 
`startingDone` unchanged. On an instance that already finished Phase 1 
(`startingDone == true`), a subsequent `restore(null)` + `plan()` enters Phase 
2 (`deltaStreamScan.plan()`) instead of re-running `planStarting()`.
   
   In the Flink runtime this is harmless because `restore` is always called on 
a fresh scan instance (`startingDone` defaults to false). But it contradicts 
the intent of `testStreamingReadRestoreAfterNewData`, whose comment states 
"restore(null) re-runs Phase 1". Suggest resetting `startingDone = false` here 
so the documented semantics hold.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to