This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 43376f1976 [core] Reduce file store scan for overwriting if new 
changes don't touch the target partitions (#7894)
43376f1976 is described below

commit 43376f1976ec20adb3094ac7cd2267a3e9347ba5
Author: yuzelin <[email protected]>
AuthorDate: Wed May 20 19:41:32 2026 +0800

    [core] Reduce file store scan for overwriting if new changes don't touch 
the target partitions (#7894)
---
 .../paimon/operation/FileStoreCommitImpl.java      |  11 +-
 .../paimon/operation/commit/CommitScanner.java     |  61 ++--
 .../operation/commit/OverwriteChangesProvider.java | 230 +++++++++++++++
 .../commit/OverwriteChangesProviderTest.java       | 323 +++++++++++++++++++++
 4 files changed, 576 insertions(+), 49 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 29ac8b5a3e..0032d69adf 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -190,7 +190,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
         this.manifestList = manifestListFactory.create();
         this.indexManifestFile = indexManifestFileFactory.create();
         this.rollback = rollback;
-        this.scanner = new CommitScanner(scanSupplier, indexManifestFile, 
options);
+        this.scanner = new CommitScanner(scanSupplier, snapshotManager, 
indexManifestFile, options);
         this.commitPreCallbacks = commitPreCallbacks;
         this.commitCallbacks = commitCallbacks;
         this.retryWaiter =
@@ -784,13 +784,8 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
             @Nullable Long watermark,
             Map<String, String> properties) {
         return tryCommit(
-                latestSnapshot ->
-                        scanner.readOverwriteChanges(
-                                options.bucket(),
-                                changes,
-                                indexFiles,
-                                latestSnapshot,
-                                partitionFilter),
+                scanner.overwriteChangesProvider(
+                        options.bucket(), changes, indexFiles, 
partitionFilter),
                 identifier,
                 watermark,
                 properties,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java
index e43f27ed89..e63e76590d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java
@@ -21,15 +21,14 @@ package org.apache.paimon.operation.commit;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.IndexManifestFile;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.SimpleFileEntry;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.partition.PartitionPredicate;
-import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.utils.SnapshotManager;
 
 import javax.annotation.Nullable;
 
@@ -42,22 +41,23 @@ import java.util.Map;
 import java.util.Set;
 import java.util.function.Supplier;
 
-import static java.util.Collections.emptyList;
-
 /** Manifest entries scanner for commit. */
 public class CommitScanner {
 
     private final FileStoreScan scan;
     private final Supplier<FileStoreScan> scanSupplier;
+    private final SnapshotManager snapshotManager;
     private final IndexManifestFile indexManifestFile;
     private final boolean dropStats;
 
     public CommitScanner(
             Supplier<FileStoreScan> scanSupplier,
+            SnapshotManager snapshotManager,
             IndexManifestFile indexManifestFile,
             CoreOptions options) {
         this.scanSupplier = scanSupplier;
         this.scan = scanSupplier.get();
+        this.snapshotManager = snapshotManager;
         this.indexManifestFile = indexManifestFile;
         // Stats in DELETE Manifest Entries is useless
         this.dropStats = options.manifestDeleteFileDropStats();
@@ -129,46 +129,25 @@ public class CommitScanner {
         }
     }
 
-    public CommitChanges readOverwriteChanges(
+    /**
+     * Returns a stateful {@link CommitChangesProvider} for overwrite 
operations. The returned
+     * provider caches the current files of the target partitions across 
retries and only walks
+     * delta manifests when the latest snapshot advances, avoiding repeated 
full scans on every
+     * commit retry.
+     *
+     * @param numBucket bucket number of the table; unless it is
+     *     {@code BucketMode.POSTPONE_BUCKET}, only files in buckets {@code >= 0} are overwritten
+     * @param changes new table files, committed after the DELETE entries of the current files
+     * @param indexFiles new index files, committed after the DELETE entries of the current index
+     *     files
+     * @param partitionFilter partitions to overwrite, or {@code null} to overwrite the whole table
+     * @return a new provider; reuse it across the retries of one commit so that its cache helps
+     */
+    public CommitChangesProvider overwriteChangesProvider(
             int numBucket,
             List<ManifestEntry> changes,
             List<IndexManifestEntry> indexFiles,
-            @Nullable Snapshot latestSnapshot,
             @Nullable PartitionPredicate partitionFilter) {
-        List<ManifestEntry> changesWithOverwrite = new ArrayList<>();
-        List<IndexManifestEntry> indexChangesWithOverwrite = new ArrayList<>();
-        if (latestSnapshot != null) {
-            scan.withSnapshot(latestSnapshot)
-                    .withPartitionFilter(partitionFilter)
-                    .withKind(ScanMode.ALL);
-            if (numBucket != BucketMode.POSTPONE_BUCKET) {
-                // bucket = -2 can only be overwritten in postpone bucket 
tables
-                scan.withBucketFilter(bucket -> bucket >= 0);
-            }
-            List<ManifestEntry> currentEntries = scan.plan().files();
-            for (ManifestEntry entry : currentEntries) {
-                changesWithOverwrite.add(
-                        ManifestEntry.create(
-                                FileKind.DELETE,
-                                entry.partition(),
-                                entry.bucket(),
-                                entry.totalBuckets(),
-                                entry.file()));
-            }
-
-            // collect index files
-            if (latestSnapshot.indexManifest() != null) {
-                List<IndexManifestEntry> entries =
-                        indexManifestFile.read(latestSnapshot.indexManifest());
-                for (IndexManifestEntry entry : entries) {
-                    if (partitionFilter == null || 
partitionFilter.test(entry.partition())) {
-                        indexChangesWithOverwrite.add(entry.toDeleteEntry());
-                    }
-                }
-            }
-        }
-        changesWithOverwrite.addAll(changes);
-        indexChangesWithOverwrite.addAll(indexFiles);
-        return new CommitChanges(changesWithOverwrite, emptyList(), 
indexChangesWithOverwrite);
+        return new OverwriteChangesProvider(
+                scanSupplier,
+                snapshotManager,
+                indexManifestFile,
+                dropStats,
+                numBucket,
+                changes,
+                indexFiles,
+                partitionFilter);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/OverwriteChangesProvider.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/OverwriteChangesProvider.java
new file mode 100644
index 0000000000..be8a4068be
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/OverwriteChangesProvider.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.commit;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.Snapshot.CommitKind;
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.IndexManifestFile;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+/**
+ * A stateful {@link CommitChangesProvider} for overwrite operations that caches the current files
+ * of the target partitions across commit retries to avoid repeated full scans.
+ *
+ * <p>On retry, if the latest snapshot has changed, the cache is reused only when the snapshots in
+ * between have not touched the target partitions; otherwise it is rebuilt by a full scan. The
+ * cache is also rebuilt when the snapshot it was built from is no longer part of the current
+ * history (for example the latest snapshot id went backwards, or the cached id now names a
+ * different commit after a rollback), so that stale DELETE entries are never produced.
+ *
+ * <p>The cache is mutable state without any synchronization; use one instance per commit retry
+ * loop.
+ */
+public class OverwriteChangesProvider implements CommitChangesProvider {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OverwriteChangesProvider.class);
+
+    private final Supplier<FileStoreScan> scanSupplier;
+    private final SnapshotManager snapshotManager;
+    private final IndexManifestFile indexManifestFile;
+    private final boolean dropStats;
+    private final int numBucket;
+    private final List<ManifestEntry> newChanges;
+    private final List<IndexManifestEntry> newIndexFiles;
+    @Nullable private final PartitionPredicate partitionFilter;
+
+    /** Snapshot the caches below were built from; {@code null} until the first scan. */
+    @Nullable private Snapshot cachedSnapshot;
+
+    /** Files of the target partitions as of {@link #cachedSnapshot}. */
+    private final List<ManifestEntry> cachedManifestEntries = new ArrayList<>();
+
+    /** Index files of the target partitions as of {@link #cachedSnapshot}. */
+    private final List<IndexManifestEntry> cachedIndexManifestEntries = new ArrayList<>();
+
+    @VisibleForTesting int fullScanManifestCount;
+    @VisibleForTesting int fullScanIndexCount;
+    @VisibleForTesting int deltaProbeCount;
+
+    public OverwriteChangesProvider(
+            Supplier<FileStoreScan> scanSupplier,
+            SnapshotManager snapshotManager,
+            IndexManifestFile indexManifestFile,
+            boolean dropStats,
+            int numBucket,
+            List<ManifestEntry> newChanges,
+            List<IndexManifestEntry> newIndexFiles,
+            @Nullable PartitionPredicate partitionFilter) {
+        this.scanSupplier = scanSupplier;
+        this.snapshotManager = snapshotManager;
+        this.indexManifestFile = indexManifestFile;
+        this.dropStats = dropStats;
+        this.numBucket = numBucket;
+        this.newChanges = newChanges;
+        this.newIndexFiles = newIndexFiles;
+        this.partitionFilter = partitionFilter;
+    }
+
+    /**
+     * Returns the changes to commit on top of {@code latestSnapshot}: DELETE entries for every
+     * current file and index file of the target partitions, followed by the new changes.
+     *
+     * @param latestSnapshot the latest snapshot of the table, or {@code null} if there is none
+     */
+    @Override
+    public CommitChanges provide(@Nullable Snapshot latestSnapshot) {
+        if (latestSnapshot == null) {
+            // Nothing to overwrite. Drop any cache so that a snapshot seen later is never compared
+            // against state built from a history that no longer exists.
+            resetCache();
+            return new CommitChanges(newChanges, Collections.emptyList(), newIndexFiles);
+        }
+
+        if (cachedSnapshot == null) {
+            fullScanManifestEntries(latestSnapshot);
+            fullScanIndexManifestEntries(latestSnapshot);
+        } else if (!isSameSnapshot(cachedSnapshot, latestSnapshot)) {
+            // Covers both an advanced snapshot and a rewritten history (id equal or lower).
+            updateCache(latestSnapshot);
+        }
+        cachedSnapshot = latestSnapshot;
+
+        return buildResult();
+    }
+
+    /** Forgets everything cached so that the next non-null snapshot triggers full scans. */
+    private void resetCache() {
+        cachedSnapshot = null;
+        cachedManifestEntries.clear();
+        cachedIndexManifestEntries.clear();
+    }
+
+    /** Replaces the cached files with all files of the target partitions in the snapshot. */
+    private void fullScanManifestEntries(Snapshot latestSnapshot) {
+        cachedManifestEntries.clear();
+        FileStoreScan scan =
+                newScan()
+                        .withSnapshot(latestSnapshot)
+                        .withPartitionFilter(partitionFilter)
+                        .withKind(ScanMode.ALL);
+        cachedManifestEntries.addAll(scan.plan().files());
+        fullScanManifestCount++;
+    }
+
+    /** Replaces the cached index files with those of the target partitions in the snapshot. */
+    private void fullScanIndexManifestEntries(Snapshot latestSnapshot) {
+        cachedIndexManifestEntries.clear();
+        if (latestSnapshot.indexManifest() != null) {
+            for (IndexManifestEntry entry :
+                    indexManifestFile.read(latestSnapshot.indexManifest())) {
+                if (partitionFilter == null || partitionFilter.test(entry.partition())) {
+                    cachedIndexManifestEntries.add(entry);
+                }
+            }
+        }
+        fullScanIndexCount++;
+    }
+
+    /**
+     * Brings both caches from {@link #cachedSnapshot} to {@code latestSnapshot}, rescanning
+     * whichever cache cannot be proven still valid. The caller updates {@link #cachedSnapshot}.
+     */
+    private void updateCache(Snapshot latestSnapshot) {
+        if (!canUseManifestEntriesCache(latestSnapshot)) {
+            fullScanManifestEntries(latestSnapshot);
+        }
+        if (!canUseIndexManifestEntriesCache(latestSnapshot)) {
+            fullScanIndexManifestEntries(latestSnapshot);
+        }
+    }
+
+    /**
+     * Returns whether the cached files are still the current files of the target partitions in
+     * {@code latestSnapshot}. This holds only when the cached snapshot is still part of the current
+     * history and every snapshot after it is an APPEND whose delta does not touch the targets.
+     */
+    private boolean canUseManifestEntriesCache(Snapshot latestSnapshot) {
+        if (partitionFilter == null) {
+            // When overwriting the whole table, any commit between the snapshots touches the
+            // target, so skip the delta probe and force a full scan.
+            return false;
+        }
+        if (cachedSnapshot.id() >= latestSnapshot.id()) {
+            // The latest snapshot differs from the cached one without advancing past it, so the
+            // history was rewritten (e.g. rolled back). There is no delta range to probe.
+            LOG.info(
+                    "Cannot reuse cache of snapshot {} for snapshot {} because the snapshot history has changed.",
+                    cachedSnapshot.id(),
+                    latestSnapshot.id());
+            return false;
+        }
+        if (!isCachedSnapshotInCurrentHistory()) {
+            return false;
+        }
+        for (long id = cachedSnapshot.id() + 1; id <= latestSnapshot.id(); id++) {
+            deltaProbeCount++;
+            try {
+                Snapshot snapshot = snapshotManager.tryGetSnapshot(id);
+                if (snapshot.commitKind() != CommitKind.APPEND) {
+                    // Only APPEND snapshots produce a reliable DELTA manifest for probing. For
+                    // example, OVERWRITE snapshot might rewrite or reorganize manifests in ways
+                    // that make the DELTA probe unreliable.
+                    return false;
+                }
+                FileStoreScan scan =
+                        newScan()
+                                .withSnapshot(snapshot)
+                                .withPartitionFilter(partitionFilter)
+                                .withKind(ScanMode.DELTA);
+                Iterator<ManifestEntry> iterator = scan.readFileIterator();
+                if (iterator.hasNext()) {
+                    LOG.info(
+                            "Cannot advance cache from {} to {} because snapshot {} has changed target partitions.",
+                            cachedSnapshot.id(),
+                            latestSnapshot.id(),
+                            id);
+                    return false;
+                }
+            } catch (Exception e) {
+                // For example, the snapshot is being expired. Using full scan is safe.
+                LOG.debug("Failed to probe snapshot {}, falling back to a full scan.", id, e);
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Returns whether the snapshot currently stored under the cached id is still the one the cache
+     * was built from. A rollback followed by new commits can reuse snapshot ids, in which case
+     * probing only the ids after the cached one would miss the rewritten part of the history.
+     */
+    private boolean isCachedSnapshotInCurrentHistory() {
+        try {
+            Snapshot current = snapshotManager.tryGetSnapshot(cachedSnapshot.id());
+            if (isSameSnapshot(cachedSnapshot, current)) {
+                return true;
+            }
+            LOG.info(
+                    "Cannot advance cache from {} because that snapshot has been replaced.",
+                    cachedSnapshot.id());
+            return false;
+        } catch (Exception e) {
+            // For example, the cached snapshot has been expired. Using full scan is safe.
+            LOG.debug(
+                    "Failed to read cached snapshot {}, falling back to a full scan.",
+                    cachedSnapshot.id(),
+                    e);
+            return false;
+        }
+    }
+
+    /**
+     * Returns whether the cached index entries are still valid. The cached entries were filtered
+     * from the cached snapshot's index manifest, so an equal index manifest file name means the
+     * same content (assuming a manifest file is never rewritten under the same name).
+     */
+    private boolean canUseIndexManifestEntriesCache(Snapshot latestSnapshot) {
+        return Objects.equals(cachedSnapshot.indexManifest(), latestSnapshot.indexManifest());
+    }
+
+    /**
+     * Returns whether two snapshots denote the same commit. Besides the id, the manifest list file
+     * names are compared because an id alone may be reused after a rollback (assumes manifest list
+     * names are unique per commit).
+     */
+    private static boolean isSameSnapshot(Snapshot a, @Nullable Snapshot b) {
+        return b != null
+                && a.id() == b.id()
+                && Objects.equals(a.baseManifestList(), b.baseManifestList())
+                && Objects.equals(a.deltaManifestList(), b.deltaManifestList());
+    }
+
+    /**
+     * Creates a scan with the options shared by full scans and delta probes. Unless the table uses
+     * postpone buckets, only buckets {@code >= 0} are visible.
+     */
+    private FileStoreScan newScan() {
+        FileStoreScan scan = scanSupplier.get();
+        if (dropStats) {
+            scan.dropStats();
+        }
+        if (numBucket != BucketMode.POSTPONE_BUCKET) {
+            scan.withBucketFilter(bucket -> bucket >= 0);
+        }
+        return scan;
+    }
+
+    /** Turns the cached files into DELETE entries and appends the new changes. */
+    private CommitChanges buildResult() {
+        List<ManifestEntry> changesWithOverwrite =
+                new ArrayList<>(cachedManifestEntries.size() + newChanges.size());
+        List<IndexManifestEntry> indexChangesWithOverwrite =
+                new ArrayList<>(cachedIndexManifestEntries.size() + newIndexFiles.size());
+
+        for (ManifestEntry entry : cachedManifestEntries) {
+            changesWithOverwrite.add(
+                    ManifestEntry.create(
+                            FileKind.DELETE,
+                            entry.partition(),
+                            entry.bucket(),
+                            entry.totalBuckets(),
+                            entry.file()));
+        }
+        for (IndexManifestEntry entry : cachedIndexManifestEntries) {
+            indexChangesWithOverwrite.add(entry.toDeleteEntry());
+        }
+
+        changesWithOverwrite.addAll(newChanges);
+        indexChangesWithOverwrite.addAll(newIndexFiles);
+
+        return new CommitChanges(
+                changesWithOverwrite, Collections.emptyList(), indexChangesWithOverwrite);
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/commit/OverwriteChangesProviderTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/commit/OverwriteChangesProviderTest.java
new file mode 100644
index 0000000000..7aa8a16023
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/commit/OverwriteChangesProviderTest.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.commit;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.TestFileStore;
+import org.apache.paimon.TestKeyValueGenerator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.manifest.FileEntry;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.paimon.operation.FileStoreCommitImpl;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link OverwriteChangesProvider}. */
+public class OverwriteChangesProviderTest {
+
+    private TestKeyValueGenerator gen;
+    @TempDir java.nio.file.Path tempDir;
+
+    @BeforeEach
+    public void beforeEach() {
+        gen = new TestKeyValueGenerator();
+    }
+
+    @Test
+    public void testNullLatestSnapshotSkipsScan() throws Exception {
+        TestFileStore store = createStore(1, Collections.emptyMap());
+        OverwriteChangesProvider provider = provider(store, null, Collections.emptyList());
+
+        // Without any snapshot there is nothing to delete; only the new changes are returned.
+        CommitChanges changes = provider.provide(null);
+        assertThat(changes.tableFiles).isEmpty();
+        assertThat(changes.indexFiles).isEmpty();
+        assertThat(provider.fullScanManifestCount).isZero();
+        assertThat(provider.fullScanIndexCount).isZero();
+        assertThat(provider.deltaProbeCount).isZero();
+    }
+
+    @Test
+    public void testManifestEntriesCache() throws Exception {
+        TestFileStore store = createStore(1, Collections.emptyMap());
+        KeyValue targetRecord = record("20260501", 8);
+        BinaryRow targetPartition = gen.getPartition(targetRecord);
+        store.commitData(
+                Arrays.asList(targetRecord, record("20260502", 8)), gen::getPartition, kv -> 0);
+
+        Snapshot snapshot1 = store.snapshotManager().latestSnapshot();
+        PartitionPredicate partitionFilter = partitionFilter(store, targetPartition);
+        OverwriteChangesProvider provider =
+                provider(store, partitionFilter, Collections.emptyList());
+
+        CommitChanges first = provider.provide(snapshot1);
+        assertThat(provider.fullScanManifestCount).isEqualTo(1);
+        assertThat(provider.deltaProbeCount).isZero();
+
+        store.commitData(
+                Collections.singletonList(record("20260502", 8)), gen::getPartition, kv -> 0);
+        Snapshot snapshot2 = store.snapshotManager().latestSnapshot();
+        CommitChanges afterUnrelatedAppend = provider.provide(snapshot2);
+
+        assertThat(provider.fullScanManifestCount).isEqualTo(1);
+        assertThat(provider.deltaProbeCount).isEqualTo(1);
+        assertThat(identifiers(afterUnrelatedAppend.tableFiles))
+                .containsExactlyInAnyOrderElementsOf(identifiers(first.tableFiles));
+
+        store.commitData(
+                Collections.singletonList(record("20260501", 8)), gen::getPartition, kv -> 0);
+        Snapshot snapshot3 = store.snapshotManager().latestSnapshot();
+        CommitChanges afterTargetAppend = provider.provide(snapshot3);
+
+        assertThat(provider.fullScanManifestCount).isEqualTo(2);
+        assertThat(provider.deltaProbeCount).isEqualTo(2);
+        assertThat(identifiers(afterTargetAppend.tableFiles))
+                .containsExactlyInAnyOrderElementsOf(
+                        currentIdentifiers(store, snapshot3, partitionFilter));
+        assertThat(afterTargetAppend.tableFiles).allMatch(entry -> entry.kind() == FileKind.DELETE);
+    }
+
+    @Test
+    public void testUnchangedSnapshotReusesCache() throws Exception {
+        TestFileStore store = createStore(1, Collections.emptyMap());
+        KeyValue targetRecord = record("20260501", 8);
+        BinaryRow targetPartition = gen.getPartition(targetRecord);
+        store.commitData(
+                Arrays.asList(targetRecord, record("20260502", 8)), gen::getPartition, kv -> 0);
+        Snapshot snapshot1 = store.snapshotManager().latestSnapshot();
+
+        PartitionPredicate partitionFilter = partitionFilter(store, targetPartition);
+        OverwriteChangesProvider provider =
+                provider(store, partitionFilter, Collections.emptyList());
+
+        CommitChanges first = provider.provide(snapshot1);
+        // A retry against the same snapshot must neither rescan nor probe.
+        CommitChanges retry = provider.provide(snapshot1);
+
+        assertThat(provider.fullScanManifestCount).isEqualTo(1);
+        assertThat(provider.fullScanIndexCount).isEqualTo(1);
+        assertThat(provider.deltaProbeCount).isZero();
+        assertThat(first.tableFiles).isNotEmpty();
+        assertThat(identifiers(retry.tableFiles))
+                .containsExactlyInAnyOrderElementsOf(identifiers(first.tableFiles));
+        assertThat(retry.tableFiles).allMatch(entry -> entry.kind() == FileKind.DELETE);
+    }
+
+    @Test
+    public void testIndexManifestEntriesCache() throws Exception {
+        TestFileStore store = createStore(2, Collections.emptyMap());
+        IndexFileHandler indexFileHandler = store.newIndexFileHandler();
+
+        KeyValue targetRecord = record("20260501", 8);
+        BinaryRow targetPartition = gen.getPartition(targetRecord);
+        store.commitDataIndex(
+                targetRecord,
+                gen::getPartition,
+                0,
+                indexFileHandler.hashIndex(targetPartition, 0).write(new int[] {1, 2, 3}));
+        Snapshot snapshot1 = store.snapshotManager().latestSnapshot();
+
+        PartitionPredicate partitionFilter = partitionFilter(store, targetPartition);
+        OverwriteChangesProvider provider =
+                provider(store, partitionFilter, Collections.emptyList());
+
+        CommitChanges first = provider.provide(snapshot1);
+        assertThat(provider.fullScanIndexCount).isEqualTo(1);
+        assertThat(first.indexFiles).hasSize(1);
+        assertThat(first.indexFiles.get(0).kind()).isEqualTo(FileKind.DELETE);
+        assertThat(first.indexFiles.get(0).partition()).isEqualTo(targetPartition);
+
+        KeyValue unrelatedRecordWithIndex = record("20260502", 8);
+        BinaryRow unrelatedPartition = gen.getPartition(unrelatedRecordWithIndex);
+        store.commitDataIndex(
+                unrelatedRecordWithIndex,
+                gen::getPartition,
+                0,
+                indexFileHandler.hashIndex(unrelatedPartition, 0).write(new int[] {4, 5, 6}));
+        Snapshot snapshot2 = store.snapshotManager().latestSnapshot();
+
+        assertThat(snapshot2.indexManifest()).isNotEqualTo(snapshot1.indexManifest());
+        CommitChanges afterUnrelatedIndexChange = provider.provide(snapshot2);
+        assertThat(provider.fullScanManifestCount).isEqualTo(1);
+        assertThat(provider.deltaProbeCount).isEqualTo(1);
+        assertThat(provider.fullScanIndexCount).isEqualTo(2);
+        assertThat(afterUnrelatedIndexChange.indexFiles)
+                .containsExactlyInAnyOrderElementsOf(first.indexFiles);
+
+        store.commitData(
+                Collections.singletonList(record("20260502", 8)), gen::getPartition, kv -> 0);
+        Snapshot snapshot3 = store.snapshotManager().latestSnapshot();
+
+        assertThat(snapshot3.indexManifest()).isEqualTo(snapshot2.indexManifest());
+        CommitChanges afterUnrelatedDataChange = provider.provide(snapshot3);
+        assertThat(provider.fullScanManifestCount).isEqualTo(1);
+        assertThat(provider.deltaProbeCount).isEqualTo(2);
+        assertThat(provider.fullScanIndexCount).isEqualTo(2);
+        assertThat(afterUnrelatedDataChange.indexFiles)
+                .containsExactlyInAnyOrderElementsOf(first.indexFiles);
+    }
+
+    @Test
+    public void testFullTableOverwriteSkipsDeltaProbe() throws Exception {
+        TestFileStore store = createStore(1, Collections.emptyMap());
+        store.commitData(
+                Collections.singletonList(record("20260501", 8)), gen::getPartition, kv -> 0);
+        Snapshot snapshot1 = store.snapshotManager().latestSnapshot();
+
+        // partitionFilter == null means overwrite the whole table
+        OverwriteChangesProvider provider = provider(store, null, Collections.emptyList());
+        provider.provide(snapshot1);
+        assertThat(provider.fullScanManifestCount).isEqualTo(1);
+        assertThat(provider.deltaProbeCount).isZero();
+
+        store.commitData(
+                Collections.singletonList(record("20260502", 8)), gen::getPartition, kv -> 0);
+        Snapshot snapshot2 = store.snapshotManager().latestSnapshot();
+        provider.provide(snapshot2);
+
+        // partitionFilter == null short-circuits the delta probe and forces a full scan
+        assertThat(provider.fullScanManifestCount).isEqualTo(2);
+        assertThat(provider.deltaProbeCount).isZero();
+    }
+
+    @Test
+    public void testExpiredSnapshotFallbacksToFullScan() throws Exception {
+        TestFileStore store = createStore(1, Collections.emptyMap());
+        KeyValue targetRecord = record("20260501", 8);
+        BinaryRow targetPartition = gen.getPartition(targetRecord);
+        store.commitData(
+                Arrays.asList(targetRecord, record("20260502", 8)), gen::getPartition, kv -> 0);
+        Snapshot snapshot1 = store.snapshotManager().latestSnapshot();
+
+        PartitionPredicate partitionFilter = partitionFilter(store, targetPartition);
+        OverwriteChangesProvider provider =
+                provider(store, partitionFilter, Collections.emptyList());
+        CommitChanges first = provider.provide(snapshot1);
+        assertThat(provider.fullScanManifestCount).isEqualTo(1);
+
+        store.commitData(
+                Collections.singletonList(record("20260502", 8)), gen::getPartition, kv -> 0);
+        Snapshot snapshot2 = store.snapshotManager().latestSnapshot();
+        store.commitData(
+                Collections.singletonList(record("20260502", 8)), gen::getPartition, kv -> 0);
+        Snapshot snapshot3 = store.snapshotManager().latestSnapshot();
+
+        // Simulate snapshot expiration of the middle snapshot.
+        store.snapshotManager().deleteSnapshot(snapshot2.id());
+
+        CommitChanges afterExpired = provider.provide(snapshot3);
+
+        // The probe reaches the missing middle snapshot, which must not fail the commit; the
+        // provider falls back to a full scan instead.
+        assertThat(provider.deltaProbeCount).isEqualTo(1);
+        assertThat(provider.fullScanManifestCount).isEqualTo(2);
+        assertThat(identifiers(afterExpired.tableFiles))
+                .containsExactlyInAnyOrderElementsOf(identifiers(first.tableFiles));
+    }
+
+    @Test
+    public void testNonAppendSnapshotInvalidatesCache() throws Exception {
+        TestFileStore store = createStore(1, Collections.emptyMap());
+        KeyValue targetRecord = record("20260501", 8);
+        BinaryRow targetPartition = gen.getPartition(targetRecord);
+        store.commitData(
+                Arrays.asList(targetRecord, record("20260502", 8)), gen::getPartition, kv -> 0);
+        Snapshot snapshot1 = store.snapshotManager().latestSnapshot();
+
+        PartitionPredicate partitionFilter = partitionFilter(store, targetPartition);
+        OverwriteChangesProvider provider =
+                provider(store, partitionFilter, Collections.emptyList());
+        CommitChanges first = provider.provide(snapshot1);
+        assertThat(provider.fullScanManifestCount).isEqualTo(1);
+
+        // Overwriting an unrelated partition (commitKind != APPEND) must invalidate the cache.
+        Map<String, String> unrelated = new HashMap<>();
+        unrelated.put("dt", "20260502");
+        unrelated.put("hr", "8");
+        store.overwriteData(
+                Collections.singletonList(record("20260502", 8)),
+                gen::getPartition,
+                kv -> 0,
+                unrelated);
+        Snapshot snapshot2 = store.snapshotManager().latestSnapshot();
+        CommitChanges afterOverwrite = provider.provide(snapshot2);
+        assertThat(provider.fullScanManifestCount).isEqualTo(2);
+        assertThat(provider.deltaProbeCount).isEqualTo(1);
+        assertThat(identifiers(afterOverwrite.tableFiles))
+                .containsExactlyInAnyOrderElementsOf(identifiers(first.tableFiles));
+
+        // A COMPACT snapshot also invalidates the cache.
+        try (FileStoreCommitImpl commit = store.newCommit()) {
+            commit.compactManifest();
+        }
+        Snapshot snapshot3 = store.snapshotManager().latestSnapshot();
+        CommitChanges afterCompact = provider.provide(snapshot3);
+        assertThat(provider.fullScanManifestCount).isEqualTo(3);
+        assertThat(provider.deltaProbeCount).isEqualTo(2);
+        assertThat(identifiers(afterCompact.tableFiles))
+                .containsExactlyInAnyOrderElementsOf(identifiers(first.tableFiles));
+    }
+
+    /** Creates a provider over {@code store} that overwrites the partitions of the filter. */
+    private OverwriteChangesProvider provider(
+            TestFileStore store,
+            PartitionPredicate partitionFilter,
+            List<ManifestEntry> newChanges) {
+        return new OverwriteChangesProvider(
+                store::newScan,
+                store.snapshotManager(),
+                store.indexManifestFileFactory().create(),
+                store.options().manifestDeleteFileDropStats(),
+                store.options().bucket(),
+                newChanges,
+                Collections.emptyList(),
+                partitionFilter);
+    }
+
+    /** Builds a predicate that matches exactly {@code partition}. */
+    private PartitionPredicate partitionFilter(TestFileStore store, BinaryRow partition) {
+        return PartitionPredicate.fromMultiple(
+                store.partitionType(), Collections.singletonList(partition));
+    }
+
+    /** Identifiers of the files a fresh scan of {@code snapshot} sees in the filtered partitions. */
+    private List<FileEntry.Identifier> currentIdentifiers(
+            TestFileStore store, Snapshot snapshot, PartitionPredicate partitionFilter) {
+        return identifiers(
+                store.newScan()
+                        .withSnapshot(snapshot)
+                        .withPartitionFilter(partitionFilter)
+                        .plan()
+                        .files());
+    }
+
+    private List<FileEntry.Identifier> identifiers(List<ManifestEntry> entries) {
+        return entries.stream().map(ManifestEntry::identifier).collect(Collectors.toList());
+    }
+
+    private KeyValue record(String dt, int hr) {
+        return gen.nextPartitionedData(RowKind.INSERT, dt, hr);
+    }
+
+    private TestFileStore createStore(int numBucket, Map<String, String> options) throws Exception {
+        Path path = new Path(tempDir.toUri());
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(new LocalFileIO(), path),
+                        new Schema(
+                                TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(),
+                                TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
+                                TestKeyValueGenerator.getPrimaryKeys(
+                                        TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
+                                options,
+                                null));
+        return new TestFileStore.Builder(
+                        "avro",
+                        TraceableFileIO.SCHEME + "://" + tempDir,
+                        numBucket,
+                        TestKeyValueGenerator.DEFAULT_PART_TYPE,
+                        TestKeyValueGenerator.KEY_TYPE,
+                        TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+                        TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
+                        DeduplicateMergeFunction.factory(),
+                        tableSchema)
+                .changelogProducer(CoreOptions.ChangelogProducer.NONE)
+                .build();
+    }
+}

Reply via email to