JingsongLi commented on code in PR #8262: URL: https://github.com/apache/paimon/pull/8262#discussion_r3445513694
########## paimon-core/src/main/java/org/apache/paimon/table/ChainTableStreamScan.java: ########## @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.codegen.CodeGenUtils; +import org.apache.paimon.codegen.RecordComparator; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.manifest.PartitionEntry; +import org.apache.paimon.partition.PartitionPredicate; +import org.apache.paimon.predicate.PartitionPredicateVisitor; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.table.source.ChainSplit; +import org.apache.paimon.table.source.DataFilePlan; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.DataTableScan; +import org.apache.paimon.table.source.DataTableStreamScan; +import org.apache.paimon.table.source.InnerTableScan; +import org.apache.paimon.table.source.SnapshotNotExistPlan; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.table.source.StreamDataTableScan; +import org.apache.paimon.table.source.TableScan; +import org.apache.paimon.table.source.snapshot.StartingContext; +import org.apache.paimon.utils.ChainPartitionProjector; +import org.apache.paimon.utils.ChainTableUtils; +import org.apache.paimon.utils.SnapshotManager; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * Streaming scan for chain tables with a two-phase design: + * + * <ul> + * <li><b>Phase 1 (Starting):</b> Outputs the latest snapshot partition (per group) and delta + * partitions that come after it. Older snapshot partitions are excluded as they are + * considered outdated. Each primary key appears exactly once under its natural partition. + * Unlike batch full scan, anchor-based chain merging is intentionally skipped to keep Phase 1 + * lightweight — this avoids split explosion in long-running jobs with many partitions. + * <li><b>Phase 2 (Incremental):</b> Stream new snapshots from the delta branch only, picking up + * from where Phase 1 left off. + * </ul> + * + * <p>Checkpoint state is a single {@code Long} — the delta branch's next snapshot id. On stateful + * restart, Phase 1 is skipped and incremental streaming resumes from the checkpointed position. On + * stateless restart (null state), a fresh starting scan is performed. + */ +public class ChainTableStreamScan implements StreamDataTableScan { + + private static final Logger LOG = LoggerFactory.getLogger(ChainTableStreamScan.class); + + private final ChainGroupReadTable chainGroupReadTable; + + /** Phase 1: batch scan used to access snapshot branch data via {@code mainScan}. */ + private final ChainGroupReadTable.ChainTableBatchScan batchScan; + + /** Phase 2: delta-only stream scan. */ + private final DataTableStreamScan deltaStreamScan; + + /** Projector for splitting full partition into group and chain parts. */ + private final ChainPartitionProjector partitionProjector; + + /** Comparator for chain partition keys only. */ + private final RecordComparator chainPartitionComparator; + + /** Partition keys of the table, used to reject partition filters in streaming mode. */ + private final List<String> partitionKeys; + + /** + * Checkpoint state: the next delta snapshot id to read. Null before Phase 1 completes; non-null + * once Phase 1 is done or after a stateful restore. + */ + @Nullable private Long nextDeltaSnapshotId; + + /** Whether the starting plan (Phase 1) has been completed. */ + private boolean startingDone = false; + + /** Predicates and shard for applying to local scans created in {@link #planStarting()}. */ + private final List<Predicate> predicates = new ArrayList<>(); + + private int shardIndex = -1; + + private int shardCount = -1; + + public ChainTableStreamScan(ChainGroupReadTable chainGroupReadTable) { + this.chainGroupReadTable = chainGroupReadTable; + this.batchScan = + new ChainGroupReadTable.ChainTableBatchScan( + chainGroupReadTable.schema(), chainGroupReadTable); + this.deltaStreamScan = (DataTableStreamScan) chainGroupReadTable.other().newStreamScan(); + + // Initialize partition projector and chain comparator using the established pattern + // from ChainTableBatchScan. + List<String> chainKeys = + ChainTableUtils.chainPartitionKeys( + chainGroupReadTable.coreOptions(), + chainGroupReadTable.schema().partitionKeys()); + this.partitionProjector = + new ChainPartitionProjector( + chainGroupReadTable.schema().logicalPartitionType(), chainKeys.size()); + this.chainPartitionComparator = + CodeGenUtils.newRecordComparator( + partitionProjector.chainPartitionType().getFieldTypes()); + this.partitionKeys = chainGroupReadTable.schema().partitionKeys(); + } + + @Override + public StartingContext startingContext() { + if (!startingDone) { + return StartingContext.EMPTY; + } + return deltaStreamScan.startingContext(); + } + + @Override + public TableScan.Plan plan() { + if (!startingDone) { + return planStarting(); + } + TableScan.Plan plan = deltaStreamScan.plan(); + // Never return SnapshotNotExistPlan — it would cause the Flink enumerator to + // set stopTriggerScan=true and permanently stop polling for new data. + if (plan instanceof SnapshotNotExistPlan) { + return new DataFilePlan<>(Collections.emptyList()); + } + return plan; + } + + /** + * Starting plan: outputs the latest snapshot partition (per group) and delta partitions that + * come after it. Older snapshot partitions are excluded. Each primary key appears exactly once + * under its natural partition. + * + * <p>Unlike batch full scan, anchor-based chain merging is not performed. This keeps Phase 1 + * lightweight for long-running jobs. + */ + private TableScan.Plan planStarting() { + FileStoreTable deltaTable = chainGroupReadTable.other(); + String deltaBranch = deltaTable.coreOptions().branch(); + String snapshotBranch = chainGroupReadTable.wrapped.coreOptions().branch(); + + Long latestId = captureDeltaPosition(deltaTable); + + // 1. Read delta branch data at the pinned snapshot, grouped by partition. + Map<BinaryRow, List<DataSplit>> deltaSplitsByPartition; + if (latestId != null) { + FileStoreTable pinnedDelta = + deltaTable.copy( + Collections.singletonMap("scan.snapshot-id", String.valueOf(latestId))); + DataTableScan pinnedDeltaScan = pinnedDelta.newScan(); + applyPredicatesAndShard(pinnedDeltaScan); + deltaSplitsByPartition = groupByPartition(pinnedDeltaScan); + } else { + deltaSplitsByPartition = Collections.emptyMap(); + } + + // 2. List snapshot partitions (lightweight — partition metadata only, no file I/O). + // Find the latest chain partition per group, then scan only those partitions for files. + // This avoids reading file manifests for hundreds of historical partitions that will be + // discarded (only the latest per group is kept). + Map<Object, BinaryRow> latestChainPartitionPerGroup = new HashMap<>(); + if (chainGroupReadTable.wrapped.snapshotManager().latestSnapshotId() != null) { + DataTableScan partitionListingScan = chainGroupReadTable.wrapped.newScan(); + applyPredicatesAndShard(partitionListingScan); Review Comment: Phase 1 uses this scan to discover the latest snapshot partition per group, but `applyPredicatesAndShard` also applies row predicates before `listPartitions()`. That makes the chain boundary depend on the query predicate instead of the snapshot branch state. For example, if the latest snapshot partition no longer has `k = 1` but an older delta partition still does, `SELECT ... WHERE k = 1` can make this listing miss the latest snapshot partition and then include the old delta row, even though that partition should be considered outdated by the latest full snapshot. Please keep the partition-discovery scan free of row/shard filters and apply those predicates only when scanning the already-selected snapshot/delta splits. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
