JingsongLi commented on code in PR #8262:
URL: https://github.com/apache/paimon/pull/8262#discussion_r3445513694


##########
paimon-core/src/main/java/org/apache/paimon/table/ChainTableStreamScan.java:
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.predicate.PartitionPredicateVisitor;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.source.ChainSplit;
+import org.apache.paimon.table.source.DataFilePlan;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.DataTableStreamScan;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.SnapshotNotExistPlan;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.table.source.snapshot.StartingContext;
+import org.apache.paimon.utils.ChainPartitionProjector;
+import org.apache.paimon.utils.ChainTableUtils;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Streaming scan for chain tables with a two-phase design:
+ *
+ * <ul>
+ *   <li><b>Phase 1 (Starting):</b> Outputs the latest snapshot partition (per 
group) and delta
+ *       partitions that come after it. Older snapshot partitions are excluded 
as they are
+ *       considered outdated. Each primary key appears exactly once under its 
natural partition.
+ *       Unlike batch full scan, anchor-based chain merging is intentionally 
skipped to keep Phase 1
+ *       lightweight — this avoids split explosion in long-running jobs with 
many partitions.
+ *   <li><b>Phase 2 (Incremental):</b> Stream new snapshots from the delta 
branch only, picking up
+ *       from where Phase 1 left off.
+ * </ul>
+ *
+ * <p>Checkpoint state is a single {@code Long} — the delta branch's next 
snapshot id. On stateful
+ * restart, Phase 1 is skipped and incremental streaming resumes from the 
checkpointed position. On
+ * stateless restart (null state), a fresh starting scan is performed.
+ */
+public class ChainTableStreamScan implements StreamDataTableScan {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ChainTableStreamScan.class);
+
+    private final ChainGroupReadTable chainGroupReadTable;
+
+    /** Phase 1: batch scan used to access snapshot branch data via {@code 
mainScan}. */
+    private final ChainGroupReadTable.ChainTableBatchScan batchScan;
+
+    /** Phase 2: delta-only stream scan. */
+    private final DataTableStreamScan deltaStreamScan;
+
+    /** Projector for splitting full partition into group and chain parts. */
+    private final ChainPartitionProjector partitionProjector;
+
+    /** Comparator for chain partition keys only. */
+    private final RecordComparator chainPartitionComparator;
+
+    /** Partition keys of the table, used to reject partition filters in 
streaming mode. */
+    private final List<String> partitionKeys;
+
+    /**
+     * Checkpoint state: the next delta snapshot id to read. Null before Phase 
1 completes; non-null
+     * once Phase 1 is done or after a stateful restore.
+     */
+    @Nullable private Long nextDeltaSnapshotId;
+
+    /** Whether the starting plan (Phase 1) has been completed. */
+    private boolean startingDone = false;
+
+    /** Predicates and shard for applying to local scans created in {@link 
#planStarting()}. */
+    private final List<Predicate> predicates = new ArrayList<>();
+
+    private int shardIndex = -1;
+
+    private int shardCount = -1;
+
+    public ChainTableStreamScan(ChainGroupReadTable chainGroupReadTable) {
+        this.chainGroupReadTable = chainGroupReadTable;
+        this.batchScan =
+                new ChainGroupReadTable.ChainTableBatchScan(
+                        chainGroupReadTable.schema(), chainGroupReadTable);
+        this.deltaStreamScan = (DataTableStreamScan) 
chainGroupReadTable.other().newStreamScan();
+
+        // Initialize partition projector and chain comparator using the 
established pattern
+        // from ChainTableBatchScan.
+        List<String> chainKeys =
+                ChainTableUtils.chainPartitionKeys(
+                        chainGroupReadTable.coreOptions(),
+                        chainGroupReadTable.schema().partitionKeys());
+        this.partitionProjector =
+                new ChainPartitionProjector(
+                        chainGroupReadTable.schema().logicalPartitionType(), 
chainKeys.size());
+        this.chainPartitionComparator =
+                CodeGenUtils.newRecordComparator(
+                        
partitionProjector.chainPartitionType().getFieldTypes());
+        this.partitionKeys = chainGroupReadTable.schema().partitionKeys();
+    }
+
+    @Override
+    public StartingContext startingContext() {
+        if (!startingDone) {
+            return StartingContext.EMPTY;
+        }
+        return deltaStreamScan.startingContext();
+    }
+
+    @Override
+    public TableScan.Plan plan() {
+        if (!startingDone) {
+            return planStarting();
+        }
+        TableScan.Plan plan = deltaStreamScan.plan();
+        // Never return SnapshotNotExistPlan — it would cause the Flink 
enumerator to
+        // set stopTriggerScan=true and permanently stop polling for new data.
+        if (plan instanceof SnapshotNotExistPlan) {
+            return new DataFilePlan<>(Collections.emptyList());
+        }
+        return plan;
+    }
+
+    /**
+     * Starting plan: outputs the latest snapshot partition (per group) and 
delta partitions that
+     * come after it. Older snapshot partitions are excluded. Each primary key 
appears exactly once
+     * under its natural partition.
+     *
+     * <p>Unlike batch full scan, anchor-based chain merging is not performed. 
This keeps Phase 1
+     * lightweight for long-running jobs.
+     */
+    private TableScan.Plan planStarting() {
+        FileStoreTable deltaTable = chainGroupReadTable.other();
+        String deltaBranch = deltaTable.coreOptions().branch();
+        String snapshotBranch = 
chainGroupReadTable.wrapped.coreOptions().branch();
+
+        Long latestId = captureDeltaPosition(deltaTable);
+
+        // 1. Read delta branch data at the pinned snapshot, grouped by 
partition.
+        Map<BinaryRow, List<DataSplit>> deltaSplitsByPartition;
+        if (latestId != null) {
+            FileStoreTable pinnedDelta =
+                    deltaTable.copy(
+                            Collections.singletonMap("scan.snapshot-id", 
String.valueOf(latestId)));
+            DataTableScan pinnedDeltaScan = pinnedDelta.newScan();
+            applyPredicatesAndShard(pinnedDeltaScan);
+            deltaSplitsByPartition = groupByPartition(pinnedDeltaScan);
+        } else {
+            deltaSplitsByPartition = Collections.emptyMap();
+        }
+
+        // 2. List snapshot partitions (lightweight — partition metadata only, 
no file I/O).
+        //    Find the latest chain partition per group, then scan only those 
partitions for files.
+        //    This avoids reading file manifests for hundreds of historical 
partitions that will be
+        //    discarded (only the latest per group is kept).
+        Map<Object, BinaryRow> latestChainPartitionPerGroup = new HashMap<>();
+        if (chainGroupReadTable.wrapped.snapshotManager().latestSnapshotId() 
!= null) {
+            DataTableScan partitionListingScan = 
chainGroupReadTable.wrapped.newScan();
+            applyPredicatesAndShard(partitionListingScan);

Review Comment:
   Phase 1 uses this scan to discover the latest snapshot partition per group, 
but `applyPredicatesAndShard` also applies row predicates before 
`listPartitions()`. That makes the chain boundary depend on the query predicate 
instead of the snapshot branch state. For example, if the latest snapshot 
partition no longer has `k = 1` but an older delta partition still does, 
`SELECT ... WHERE k = 1` can make this listing miss the latest snapshot 
partition and then include the old delta row, even though that partition should 
be considered outdated by the latest full snapshot. Please keep the 
partition-discovery scan free of row/shard filters and apply those predicates 
only when scanning the already-selected snapshot/delta splits.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to