This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 43376f1976 [core] Reduce file store scan for overwriting if new
changes don't touch the target partitions (#7894)
43376f1976 is described below
commit 43376f1976ec20adb3094ac7cd2267a3e9347ba5
Author: yuzelin <[email protected]>
AuthorDate: Wed May 20 19:41:32 2026 +0800
[core] Reduce file store scan for overwriting if new changes don't touch
the target partitions (#7894)
---
.../paimon/operation/FileStoreCommitImpl.java | 11 +-
.../paimon/operation/commit/CommitScanner.java | 61 ++--
.../operation/commit/OverwriteChangesProvider.java | 230 +++++++++++++++
.../commit/OverwriteChangesProviderTest.java | 323 +++++++++++++++++++++
4 files changed, 576 insertions(+), 49 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 29ac8b5a3e..0032d69adf 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -190,7 +190,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
this.manifestList = manifestListFactory.create();
this.indexManifestFile = indexManifestFileFactory.create();
this.rollback = rollback;
- this.scanner = new CommitScanner(scanSupplier, indexManifestFile,
options);
+ this.scanner = new CommitScanner(scanSupplier, snapshotManager,
indexManifestFile, options);
this.commitPreCallbacks = commitPreCallbacks;
this.commitCallbacks = commitCallbacks;
this.retryWaiter =
@@ -784,13 +784,8 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
@Nullable Long watermark,
Map<String, String> properties) {
return tryCommit(
- latestSnapshot ->
- scanner.readOverwriteChanges(
- options.bucket(),
- changes,
- indexFiles,
- latestSnapshot,
- partitionFilter),
+ scanner.overwriteChangesProvider(
+ options.bucket(), changes, indexFiles,
partitionFilter),
identifier,
watermark,
properties,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java
index e43f27ed89..e63e76590d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java
@@ -21,15 +21,14 @@ package org.apache.paimon.operation.commit;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.IndexManifestFile;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.SimpleFileEntry;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.partition.PartitionPredicate;
-import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.utils.SnapshotManager;
import javax.annotation.Nullable;
@@ -42,22 +41,23 @@ import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
-import static java.util.Collections.emptyList;
-
/** Manifest entries scanner for commit. */
public class CommitScanner {
private final FileStoreScan scan;
private final Supplier<FileStoreScan> scanSupplier;
+ private final SnapshotManager snapshotManager;
private final IndexManifestFile indexManifestFile;
private final boolean dropStats;
public CommitScanner(
Supplier<FileStoreScan> scanSupplier,
+ SnapshotManager snapshotManager,
IndexManifestFile indexManifestFile,
CoreOptions options) {
this.scanSupplier = scanSupplier;
this.scan = scanSupplier.get();
+ this.snapshotManager = snapshotManager;
this.indexManifestFile = indexManifestFile;
// Stats in DELETE Manifest Entries is useless
this.dropStats = options.manifestDeleteFileDropStats();
@@ -129,46 +129,25 @@ public class CommitScanner {
}
}
- public CommitChanges readOverwriteChanges(
+ /**
+ * Returns a stateful {@link CommitChangesProvider} for overwrite
operations. The returned
+ * provider caches the current files of the target partitions across
retries and only walks
+ * delta manifests when the latest snapshot advances, avoiding repeated
full scans on every
+ * commit retry.
+ */
+ public CommitChangesProvider overwriteChangesProvider(
int numBucket,
List<ManifestEntry> changes,
List<IndexManifestEntry> indexFiles,
- @Nullable Snapshot latestSnapshot,
@Nullable PartitionPredicate partitionFilter) {
- List<ManifestEntry> changesWithOverwrite = new ArrayList<>();
- List<IndexManifestEntry> indexChangesWithOverwrite = new ArrayList<>();
- if (latestSnapshot != null) {
- scan.withSnapshot(latestSnapshot)
- .withPartitionFilter(partitionFilter)
- .withKind(ScanMode.ALL);
- if (numBucket != BucketMode.POSTPONE_BUCKET) {
- // bucket = -2 can only be overwritten in postpone bucket
tables
- scan.withBucketFilter(bucket -> bucket >= 0);
- }
- List<ManifestEntry> currentEntries = scan.plan().files();
- for (ManifestEntry entry : currentEntries) {
- changesWithOverwrite.add(
- ManifestEntry.create(
- FileKind.DELETE,
- entry.partition(),
- entry.bucket(),
- entry.totalBuckets(),
- entry.file()));
- }
-
- // collect index files
- if (latestSnapshot.indexManifest() != null) {
- List<IndexManifestEntry> entries =
- indexManifestFile.read(latestSnapshot.indexManifest());
- for (IndexManifestEntry entry : entries) {
- if (partitionFilter == null ||
partitionFilter.test(entry.partition())) {
- indexChangesWithOverwrite.add(entry.toDeleteEntry());
- }
- }
- }
- }
- changesWithOverwrite.addAll(changes);
- indexChangesWithOverwrite.addAll(indexFiles);
- return new CommitChanges(changesWithOverwrite, emptyList(),
indexChangesWithOverwrite);
+ return new OverwriteChangesProvider(
+ scanSupplier,
+ snapshotManager,
+ indexManifestFile,
+ dropStats,
+ numBucket,
+ changes,
+ indexFiles,
+ partitionFilter);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/OverwriteChangesProvider.java
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/OverwriteChangesProvider.java
new file mode 100644
index 0000000000..be8a4068be
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/OverwriteChangesProvider.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.commit;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.Snapshot.CommitKind;
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.IndexManifestFile;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+/**
+ * A stateful {@link CommitChangesProvider} for overwrite operations that
caches the current data file entries and index entries
+ * of the target partitions across commit retries to avoid repeated full scans.
+ *
+ * <p>On retry, if the latest snapshot has advanced, the data file cache is reused only
when every snapshot in
+ * between is an APPEND whose delta does not touch the target partitions; otherwise it is rebuilt by
a full scan. The index cache is reused only while the snapshot's index manifest is unchanged.
+ */
+public class OverwriteChangesProvider implements CommitChangesProvider {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(OverwriteChangesProvider.class);
+
+ private final Supplier<FileStoreScan> scanSupplier; // a fresh scan is requested for every scan/probe, see newScan()
+ private final SnapshotManager snapshotManager; // used to load the snapshots between the cached and the latest one
+ private final IndexManifestFile indexManifestFile;
+ private final boolean dropStats;
+ private final int numBucket;
+ private final List<ManifestEntry> newChanges; // this commit's own changes, appended after the generated DELETE entries
+ private final List<IndexManifestEntry> newIndexFiles;
+ @Nullable private final PartitionPredicate partitionFilter; // null means the whole table is overwritten
+
+ @Nullable private Snapshot cachedSnapshot; // snapshot the caches below correspond to; null until the first non-null provide()
+ private final List<ManifestEntry> cachedManifestEntries = new
ArrayList<>(); // entries returned by the last full scan, as read (not yet converted to DELETE)
+ private final List<IndexManifestEntry> cachedIndexManifestEntries = new
ArrayList<>(); // index entries matching partitionFilter from the last index manifest read
+
+ @VisibleForTesting int fullScanManifestCount; // number of calls to fullScanManifestEntries
+ @VisibleForTesting int fullScanIndexCount; // number of calls to fullScanIndexManifestEntries
+ @VisibleForTesting int deltaProbeCount; // number of intermediate snapshot ids probed
+
+ public OverwriteChangesProvider(
+ Supplier<FileStoreScan> scanSupplier,
+ SnapshotManager snapshotManager,
+ IndexManifestFile indexManifestFile,
+ boolean dropStats,
+ int numBucket,
+ List<ManifestEntry> newChanges,
+ List<IndexManifestEntry> newIndexFiles,
+ @Nullable PartitionPredicate partitionFilter) {
+ this.scanSupplier = scanSupplier;
+ this.snapshotManager = snapshotManager;
+ this.indexManifestFile = indexManifestFile;
+ this.dropStats = dropStats;
+ this.numBucket = numBucket;
+ this.newChanges = newChanges;
+ this.newIndexFiles = newIndexFiles;
+ this.partitionFilter = partitionFilter;
+ }
+
+ @Override
+ public CommitChanges provide(@Nullable Snapshot latestSnapshot) { // builds DELETE entries for the cached files plus the new changes
+ if (latestSnapshot == null) { // no snapshot to overwrite: return only the new changes
+ return new CommitChanges(newChanges, Collections.emptyList(),
newIndexFiles);
+ }
+
+ if (cachedSnapshot == null) { // first call: build both caches from scratch
+ fullScanManifestEntries(latestSnapshot);
+ fullScanIndexManifestEntries(latestSnapshot);
+ cachedSnapshot = latestSnapshot;
+ } else {
+ if (cachedSnapshot.id() > latestSnapshot.id()) { // the latest snapshot id is not expected to go backwards
+ throw new IllegalStateException(
+ "Cached snapshot id "
+ + cachedSnapshot.id()
+ + " is greater than latest snapshot id "
+ + latestSnapshot.id());
+ }
+
+ if (cachedSnapshot.id() < latestSnapshot.id()) { // equal ids: the caches are already up to date
+ updateCache(latestSnapshot);
+ }
+ }
+
+ return buildResult();
+ }
+
+ private void fullScanManifestEntries(Snapshot latestSnapshot) { // rebuilds cachedManifestEntries from a filtered ScanMode.ALL scan
+ cachedManifestEntries.clear();
+ FileStoreScan scan =
+ newScan()
+ .withSnapshot(latestSnapshot)
+ .withPartitionFilter(partitionFilter)
+ .withKind(ScanMode.ALL);
+ cachedManifestEntries.addAll(scan.plan().files());
+ fullScanManifestCount++;
+ }
+
+ private void fullScanIndexManifestEntries(Snapshot latestSnapshot) { // rebuilds cachedIndexManifestEntries from the snapshot's index manifest
+ cachedIndexManifestEntries.clear();
+ if (latestSnapshot.indexManifest() != null) { // without an index manifest the cache simply stays empty
+ for (IndexManifestEntry entry :
+ indexManifestFile.read(latestSnapshot.indexManifest())) {
+ if (partitionFilter == null ||
partitionFilter.test(entry.partition())) {
+ cachedIndexManifestEntries.add(entry);
+ }
+ }
+ }
+ fullScanIndexCount++;
+ }
+
+ private void updateCache(Snapshot latestSnapshot) { // refreshes each cache independently, then advances cachedSnapshot
+ if (!canUseManifestEntriesCache(latestSnapshot)) {
+ fullScanManifestEntries(latestSnapshot);
+ }
+ if (!canUseIndexManifestEntriesCache(latestSnapshot)) {
+ fullScanIndexManifestEntries(latestSnapshot);
+ }
+ cachedSnapshot = latestSnapshot;
+ }
+
+ private boolean canUseManifestEntriesCache(Snapshot latestSnapshot) { // true only if no snapshot in (cached, latest] can have touched the target
+ if (partitionFilter == null) {
+ // When overwriting the whole table, any commit between snapshots must
touch the target,
+ // so skip the delta probe and force a full scan.
+ return false;
+ }
+ for (long id = cachedSnapshot.id() + 1; id <= latestSnapshot.id();
id++) {
+ deltaProbeCount++;
+ try {
+ Snapshot snapshot = snapshotManager.tryGetSnapshot(id);
+ if (snapshot.commitKind() != CommitKind.APPEND) {
+ // Only APPEND snapshots produce a reliable DELTA manifest
for probing. For
+ // example, an OVERWRITE snapshot might rewrite or reorganize
manifests in ways
+ // that make the DELTA probe unreliable.
+ return false;
+ }
+ FileStoreScan scan =
+ newScan()
+ .withSnapshot(snapshot)
+ .withPartitionFilter(partitionFilter)
+ .withKind(ScanMode.DELTA);
+ Iterator<ManifestEntry> iterator = scan.readFileIterator();
+ if (iterator.hasNext()) { // any delta entry passing the filter invalidates the cache
+ LOG.info(
+ "Cannot advance cache from {} to {} because
snapshot {} has changed target partitions.",
+ cachedSnapshot.id(),
+ latestSnapshot.id(),
+ id);
+ return false;
+ }
+ } catch (Exception e) {
+ // Deliberately ignored: e.g. the snapshot may be expiring concurrently. Falling back to a full scan
is safe.
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean canUseIndexManifestEntriesCache(Snapshot latestSnapshot) { // valid while both snapshots report an equal indexManifest()
+ return Objects.equals(cachedSnapshot.indexManifest(),
latestSnapshot.indexManifest());
+ }
+
+ private FileStoreScan newScan() { // obtains a scan from the supplier and applies the settings shared by all scans in this class
+ FileStoreScan scan = scanSupplier.get();
+ if (dropStats) {
+ scan.dropStats();
+ }
+ if (numBucket != BucketMode.POSTPONE_BUCKET) { // bucket = -2 can only be overwritten in postpone bucket tables
+ scan.withBucketFilter(bucket -> bucket >= 0);
+ }
+ return scan;
+ }
+
+ private CommitChanges buildResult() { // cached entries become DELETE entries, followed by this commit's new changes
+ List<ManifestEntry> changesWithOverwrite = new ArrayList<>();
+ List<IndexManifestEntry> indexChangesWithOverwrite = new ArrayList<>();
+
+ for (ManifestEntry entry : cachedManifestEntries) {
+ changesWithOverwrite.add(
+ ManifestEntry.create(
+ FileKind.DELETE,
+ entry.partition(),
+ entry.bucket(),
+ entry.totalBuckets(),
+ entry.file()));
+ }
+ for (IndexManifestEntry entry : cachedIndexManifestEntries) {
+ indexChangesWithOverwrite.add(entry.toDeleteEntry());
+ }
+
+ changesWithOverwrite.addAll(newChanges);
+ indexChangesWithOverwrite.addAll(newIndexFiles);
+
+ return new CommitChanges(
+ changesWithOverwrite, Collections.emptyList(),
indexChangesWithOverwrite);
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/commit/OverwriteChangesProviderTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/commit/OverwriteChangesProviderTest.java
new file mode 100644
index 0000000000..7aa8a16023
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/commit/OverwriteChangesProviderTest.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.commit;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.TestFileStore;
+import org.apache.paimon.TestKeyValueGenerator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.manifest.FileEntry;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.paimon.operation.FileStoreCommitImpl;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the caching and cache-invalidation behavior of {@link OverwriteChangesProvider}. */
+public class OverwriteChangesProviderTest {
+
+ private TestKeyValueGenerator gen;
+ @TempDir java.nio.file.Path tempDir;
+
+ @BeforeEach
+ public void beforeEach() {
+ gen = new TestKeyValueGenerator();
+ }
+
+ @Test
+ public void testManifestEntriesCache() throws Exception { // an append elsewhere keeps the cache; an append to the target rebuilds it
+ TestFileStore store = createStore(1, Collections.emptyMap());
+ KeyValue targetRecord = record("20260501", 8);
+ BinaryRow targetPartition = gen.getPartition(targetRecord);
+ store.commitData(
+ Arrays.asList(targetRecord, record("20260502", 8)),
gen::getPartition, kv -> 0);
+
+ Snapshot snapshot1 = store.snapshotManager().latestSnapshot();
+ PartitionPredicate partitionFilter = partitionFilter(store,
targetPartition);
+ OverwriteChangesProvider provider =
+ provider(store, partitionFilter, Collections.emptyList());
+
+ CommitChanges first = provider.provide(snapshot1); // first call: one full scan, no probe
+ assertThat(provider.fullScanManifestCount).isEqualTo(1);
+ assertThat(provider.deltaProbeCount).isZero();
+
+ store.commitData(
+ Collections.singletonList(record("20260502", 8)),
gen::getPartition, kv -> 0);
+ Snapshot snapshot2 = store.snapshotManager().latestSnapshot();
+ CommitChanges afterUnrelatedAppend = provider.provide(snapshot2); // one probe, cache reused
+
+ assertThat(provider.fullScanManifestCount).isEqualTo(1);
+ assertThat(provider.deltaProbeCount).isEqualTo(1);
+ assertThat(identifiers(afterUnrelatedAppend.tableFiles))
+
.containsExactlyInAnyOrderElementsOf(identifiers(first.tableFiles));
+
+ store.commitData(
+ Collections.singletonList(record("20260501", 8)),
gen::getPartition, kv -> 0);
+ Snapshot snapshot3 = store.snapshotManager().latestSnapshot();
+ CommitChanges afterTargetAppend = provider.provide(snapshot3); // probe finds target changes, second full scan
+
+ assertThat(provider.fullScanManifestCount).isEqualTo(2);
+ assertThat(provider.deltaProbeCount).isEqualTo(2);
+ assertThat(identifiers(afterTargetAppend.tableFiles))
+ .containsExactlyInAnyOrderElementsOf(
+ currentIdentifiers(store, snapshot3, partitionFilter));
+ assertThat(afterTargetAppend.tableFiles).allMatch(entry ->
entry.kind() == FileKind.DELETE);
+ }
+
+ @Test
+ public void testIndexManifestEntriesCache() throws Exception { // the index cache is re-read only when the index manifest changes
+ TestFileStore store = createStore(2, Collections.emptyMap());
+ IndexFileHandler indexFileHandler = store.newIndexFileHandler();
+
+ KeyValue targetRecord = record("20260501", 8);
+ BinaryRow targetPartition = gen.getPartition(targetRecord);
+ store.commitDataIndex(
+ targetRecord,
+ gen::getPartition,
+ 0,
+ indexFileHandler.hashIndex(targetPartition, 0).write(new int[]
{1, 2, 3}));
+ Snapshot snapshot1 = store.snapshotManager().latestSnapshot();
+
+ PartitionPredicate partitionFilter = partitionFilter(store,
targetPartition);
+ OverwriteChangesProvider provider =
+ provider(store, partitionFilter, Collections.emptyList());
+
+ CommitChanges first = provider.provide(snapshot1);
+ assertThat(provider.fullScanIndexCount).isEqualTo(1);
+ assertThat(first.indexFiles).hasSize(1);
+ assertThat(first.indexFiles.get(0).kind()).isEqualTo(FileKind.DELETE);
+
assertThat(first.indexFiles.get(0).partition()).isEqualTo(targetPartition);
+
+ KeyValue unrelatedRecordWithIndex = record("20260502", 8);
+ BinaryRow unrelatedPartition =
gen.getPartition(unrelatedRecordWithIndex);
+ store.commitDataIndex(
+ unrelatedRecordWithIndex,
+ gen::getPartition,
+ 0,
+ indexFileHandler.hashIndex(unrelatedPartition, 0).write(new
int[] {4, 5, 6}));
+ Snapshot snapshot2 = store.snapshotManager().latestSnapshot();
+
+
assertThat(snapshot2.indexManifest()).isNotEqualTo(snapshot1.indexManifest());
+ CommitChanges afterUnrelatedIndexChange = provider.provide(snapshot2); // index manifest changed: index re-read, data cache kept
+ assertThat(provider.fullScanManifestCount).isEqualTo(1);
+ assertThat(provider.deltaProbeCount).isEqualTo(1);
+ assertThat(provider.fullScanIndexCount).isEqualTo(2);
+ assertThat(afterUnrelatedIndexChange.indexFiles)
+ .containsExactlyInAnyOrderElementsOf(first.indexFiles);
+
+ store.commitData(
+ Collections.singletonList(record("20260502", 8)),
gen::getPartition, kv -> 0);
+ Snapshot snapshot3 = store.snapshotManager().latestSnapshot();
+
+
assertThat(snapshot3.indexManifest()).isEqualTo(snapshot2.indexManifest());
+ CommitChanges afterUnrelatedDataChange = provider.provide(snapshot3); // index manifest unchanged: both caches kept
+ assertThat(provider.fullScanManifestCount).isEqualTo(1);
+ assertThat(provider.deltaProbeCount).isEqualTo(2);
+ assertThat(provider.fullScanIndexCount).isEqualTo(2);
+ assertThat(afterUnrelatedDataChange.indexFiles)
+ .containsExactlyInAnyOrderElementsOf(first.indexFiles);
+ }
+
+ @Test
+ public void testFullTableOverwriteSkipsDeltaProbe() throws Exception {
+ TestFileStore store = createStore(1, Collections.emptyMap());
+ store.commitData(
+ Collections.singletonList(record("20260501", 8)),
gen::getPartition, kv -> 0);
+ Snapshot snapshot1 = store.snapshotManager().latestSnapshot();
+
+ // partitionFilter == null means overwrite the whole table
+ OverwriteChangesProvider provider = provider(store, null,
Collections.emptyList());
+ provider.provide(snapshot1);
+ assertThat(provider.fullScanManifestCount).isEqualTo(1);
+ assertThat(provider.deltaProbeCount).isZero();
+
+ store.commitData(
+ Collections.singletonList(record("20260502", 8)),
gen::getPartition, kv -> 0);
+ Snapshot snapshot2 = store.snapshotManager().latestSnapshot();
+ provider.provide(snapshot2);
+
+ // partitionFilter == null short-circuits the delta probe and forces a
fullScan
+ assertThat(provider.fullScanManifestCount).isEqualTo(2);
+ assertThat(provider.deltaProbeCount).isZero();
+ }
+
+ @Test
+ public void testExpiredSnapshotFallbacksToFullScan() throws Exception {
+ TestFileStore store = createStore(1, Collections.emptyMap());
+ KeyValue targetRecord = record("20260501", 8);
+ BinaryRow targetPartition = gen.getPartition(targetRecord);
+ store.commitData(
+ Arrays.asList(targetRecord, record("20260502", 8)),
gen::getPartition, kv -> 0);
+ Snapshot snapshot1 = store.snapshotManager().latestSnapshot();
+
+ PartitionPredicate partitionFilter = partitionFilter(store,
targetPartition);
+ OverwriteChangesProvider provider =
+ provider(store, partitionFilter, Collections.emptyList());
+ CommitChanges first = provider.provide(snapshot1);
+ assertThat(provider.fullScanManifestCount).isEqualTo(1);
+
+ store.commitData(
+ Collections.singletonList(record("20260502", 8)),
gen::getPartition, kv -> 0);
+ Snapshot snapshot2 = store.snapshotManager().latestSnapshot();
+ store.commitData(
+ Collections.singletonList(record("20260502", 8)),
gen::getPartition, kv -> 0);
+ Snapshot snapshot3 = store.snapshotManager().latestSnapshot();
+
+ // Simulate snapshot expiration of the middle snapshot.
+ store.snapshotManager().deleteSnapshot(snapshot2.id());
+
+ CommitChanges afterExpired = provider.provide(snapshot3);
+
+ // A missing middle snapshot must not fail the commit; fall back to a
full scan.
+ assertThat(provider.fullScanManifestCount).isEqualTo(2);
+ assertThat(identifiers(afterExpired.tableFiles))
+
.containsExactlyInAnyOrderElementsOf(identifiers(first.tableFiles));
+ }
+
+ @Test
+ public void testNonAppendSnapshotInvalidatesCache() throws Exception {
+ TestFileStore store = createStore(1, Collections.emptyMap());
+ KeyValue targetRecord = record("20260501", 8);
+ BinaryRow targetPartition = gen.getPartition(targetRecord);
+ store.commitData(
+ Arrays.asList(targetRecord, record("20260502", 8)),
gen::getPartition, kv -> 0);
+ Snapshot snapshot1 = store.snapshotManager().latestSnapshot();
+
+ PartitionPredicate partitionFilter = partitionFilter(store,
targetPartition);
+ OverwriteChangesProvider provider =
+ provider(store, partitionFilter, Collections.emptyList());
+ CommitChanges first = provider.provide(snapshot1);
+ assertThat(provider.fullScanManifestCount).isEqualTo(1);
+
+ // Overwrite an unrelated partition — commitKind != APPEND must
invalidate the cache.
+ Map<String, String> unrelated = new HashMap<>();
+ unrelated.put("dt", "20260502");
+ unrelated.put("hr", "8");
+ store.overwriteData(
+ Collections.singletonList(record("20260502", 8)),
+ gen::getPartition,
+ kv -> 0,
+ unrelated);
+ Snapshot snapshot2 = store.snapshotManager().latestSnapshot();
+ CommitChanges afterOverwrite = provider.provide(snapshot2);
+ assertThat(provider.fullScanManifestCount).isEqualTo(2);
+ assertThat(provider.deltaProbeCount).isEqualTo(1);
+ assertThat(identifiers(afterOverwrite.tableFiles))
+
.containsExactlyInAnyOrderElementsOf(identifiers(first.tableFiles));
+
+ // A COMPACT snapshot also invalidates the cache.
+ try (FileStoreCommitImpl commit = store.newCommit()) {
+ commit.compactManifest();
+ }
+ Snapshot snapshot3 = store.snapshotManager().latestSnapshot();
+ CommitChanges afterCompact = provider.provide(snapshot3);
+ assertThat(provider.fullScanManifestCount).isEqualTo(3);
+ assertThat(provider.deltaProbeCount).isEqualTo(2);
+ assertThat(identifiers(afterCompact.tableFiles))
+
.containsExactlyInAnyOrderElementsOf(identifiers(first.tableFiles));
+ }
+
+ private OverwriteChangesProvider provider(
+ TestFileStore store,
+ PartitionPredicate partitionFilter,
+ List<ManifestEntry> newChanges) { // builds a provider over the store with no new index files
+ return new OverwriteChangesProvider(
+ store::newScan,
+ store.snapshotManager(),
+ store.indexManifestFileFactory().create(),
+ store.options().manifestDeleteFileDropStats(),
+ store.options().bucket(),
+ newChanges,
+ Collections.emptyList(),
+ partitionFilter);
+ }
+
+ private PartitionPredicate partitionFilter(TestFileStore store, BinaryRow
partition) { // builds a predicate from the single given partition
+ return PartitionPredicate.fromMultiple(
+ store.partitionType(), Collections.singletonList(partition));
+ }
+
+ private List<FileEntry.Identifier> currentIdentifiers(
+ TestFileStore store, Snapshot snapshot, PartitionPredicate
partitionFilter) { // identifiers of the files a fresh scan of the snapshot plans for the filter
+ return identifiers(
+ store.newScan()
+ .withSnapshot(snapshot)
+ .withPartitionFilter(partitionFilter)
+ .plan()
+ .files());
+ }
+
+ private List<FileEntry.Identifier> identifiers(List<ManifestEntry>
entries) { // maps each entry to its identifier, keeping list order
+ return
entries.stream().map(ManifestEntry::identifier).collect(Collectors.toList());
+ }
+
+ private KeyValue record(String dt, int hr) { // generates one INSERT record for the given dt/hr values
+ return gen.nextPartitionedData(RowKind.INSERT, dt, hr);
+ }
+
+ private TestFileStore createStore(int numBucket, Map<String, String>
options) throws Exception { // commits the schema under tempDir, then builds a TestFileStore with numBucket buckets
+ Path path = new Path(tempDir.toUri());
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(new LocalFileIO(), path),
+ new Schema(
+
TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(),
+
TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
+ TestKeyValueGenerator.getPrimaryKeys(
+
TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
+ options,
+ null));
+ return new TestFileStore.Builder(
+ "avro",
+ TraceableFileIO.SCHEME + "://" + tempDir,
+ numBucket,
+ TestKeyValueGenerator.DEFAULT_PART_TYPE,
+ TestKeyValueGenerator.KEY_TYPE,
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+
TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
+ DeduplicateMergeFunction.factory(),
+ tableSchema)
+ .changelogProducer(CoreOptions.ChangelogProducer.NONE)
+ .build();
+ }
+}