This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ab05eba6f0 [core] StrictModeChecker should also check index (#7884)
ab05eba6f0 is described below

commit ab05eba6f0fd4b2a4696608bcb1be50ce201d633
Author: yuzelin <[email protected]>
AuthorDate: Mon May 18 18:58:31 2026 +0800

    [core] StrictModeChecker should also check index (#7884)
---
 .../paimon/operation/FileStoreCommitImpl.java      |   6 +-
 .../paimon/operation/commit/StrictModeChecker.java |  28 +++-
 .../apache/paimon/table/sink/TableCommitTest.java  | 185 +++++++++++++++++++++
 3 files changed, 217 insertions(+), 2 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index ffdfe8523d..29ac8b5a3e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -210,7 +210,11 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                         .map(
                                 id ->
                                         new StrictModeChecker(
-                                                snapshotManager, commitUser, scanSupplier, id))
+                                                snapshotManager,
+                                                commitUser,
+                                                scanSupplier,
+                                                indexManifestFile,
+                                                id))
                         .orElse(null);
         this.conflictDetection = conflictDetectFactory.create(scanner);
         this.commitCleaner = new CommitCleaner(manifestList, manifestFile, indexManifestFile);
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/StrictModeChecker.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/StrictModeChecker.java
index 2fd9c85ab2..fa2a972051 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/StrictModeChecker.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/StrictModeChecker.java
@@ -22,6 +22,8 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.Snapshot.CommitKind;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.IndexManifestFile;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.table.source.ScanMode;
@@ -39,6 +41,7 @@ public class StrictModeChecker {
     private final SnapshotManager snapshotManager;
     private final String commitUser;
     private final Supplier<FileStoreScan> scanSupplier;
+    private final IndexManifestFile indexManifestFile;
 
     private long strictModeLastSafeSnapshot;
 
@@ -46,10 +49,12 @@ public class StrictModeChecker {
             SnapshotManager snapshotManager,
             String commitUser,
             Supplier<FileStoreScan> scanSupplier,
+            IndexManifestFile indexManifestFile,
             long strictModeLastSafeSnapshot) {
         this.snapshotManager = snapshotManager;
         this.commitUser = commitUser;
         this.scanSupplier = scanSupplier;
+        this.indexManifestFile = indexManifestFile;
         this.strictModeLastSafeSnapshot = strictModeLastSafeSnapshot;
     }
 
@@ -115,7 +120,28 @@ public class StrictModeChecker {
                         .withKind(ScanMode.DELTA)
                         .dropStats()
                         .readFileIterator();
-        return hasOverlappedPartition(entries, newPartitions);
+        if (hasOverlappedPartition(entries, newPartitions)) {
+            return true;
+        }
+
+        String indexManifest = snapshot.indexManifest();
+        if (indexManifest == null) {
+            return false;
+        }
+        // Fast exit: if this snapshot's indexManifest file name equals the
+        // previous snapshot's, no index file was added/removed by this commit
+        // (writeIndexFiles reuses the previous file when newIndexFiles is empty).
+        long prevId = snapshot.id() - 1;
+        if (snapshotManager.snapshotExists(prevId)
+                && indexManifest.equals(snapshotManager.snapshot(prevId).indexManifest())) {
+            return false;
+        }
+        for (IndexManifestEntry entry : indexManifestFile.read(indexManifest)) {
+            if (newPartitions.contains(entry.partition())) {
+                return true;
+            }
+        }
+        return false;
     }
 
     private boolean hasOverlappedPartition(
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
index a04f82b53e..25abbaf12b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
@@ -22,8 +22,11 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryRowWriter;
 import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
@@ -538,6 +541,188 @@ public class TableCommitTest {
         commit2.close();
     }
 
+    @Test
+    public void testStrictModeForDvOnlyOverwrite() throws Exception {
+        // Regression test for the partition-overlap check on DV-only OVERWRITE
+        // snapshots: such snapshots have no data-file delta, so the check must
+        // also look at index-manifest delta.
+        String path = tempDir.toString();
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()},
+                        new String[] {"pt", "k", "v"});
+
+        Options options = new Options();
+        options.set(CoreOptions.PATH, path);
+        options.set(CoreOptions.BUCKET, 1);
+        options.set(CoreOptions.BUCKET_KEY, "k");
+        options.set(CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER, 10);
+        options.set(CoreOptions.DELETION_VECTORS_ENABLED, true);
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(LocalFileIO.create(), new Path(path)),
+                        new Schema(
+                                rowType.getFields(),
+                                Collections.singletonList("pt"),
+                                Collections.emptyList(),
+                                options.toMap(),
+                                ""));
+        FileStoreTable table =
+                FileStoreTableFactory.create(
+                        LocalFileIO.create(),
+                        new Path(path),
+                        tableSchema,
+                        CatalogEnvironment.empty());
+        BinaryRow pt1 = partitionRow(1);
+        BinaryRow pt2 = partitionRow(2);
+
+        // user1 writes pt=1 and pt=2 -> snapshot 1 (APPEND)
+        String user1 = UUID.randomUUID().toString();
+        TableWriteImpl<?> write1 = table.newWrite(user1);
+        TableCommitImpl commit1 = table.newCommit(user1);
+        write1.write(GenericRow.of(1, 0, 0L));
+        write1.write(GenericRow.of(2, 0, 0L));
+        commit1.commit(1, write1.prepareCommit(false, 1));
+
+        // user2 with strict mode, last-safe-snapshot=2 (skip its own snapshot 2)
+        String user2 = UUID.randomUUID().toString();
+        FileStoreTable tableWithStrict =
+                table.copy(singletonMap(COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(), "2"));
+        TableWriteImpl<?> write2 = tableWithStrict.newWrite(user2);
+        TableCommitImpl commit2 = tableWithStrict.newCommit(user2);
+        write2.write(GenericRow.of(1, 1, 1L));
+        commit2.commit(1, write2.prepareCommit(false, 1));
+
+        // user1 DV-only OVERWRITE on pt=2 -> snapshot 3 (OVERWRITE, no data-file delta)
+        commitDvOnly(table, user1, pt2, 2);
+
+        // user2 COMPACT on pt=1: DV-only on pt=2 does not overlap -> no error
+        write2.write(GenericRow.of(1, 5, 5L));
+        write2.compact(pt1, 0, true);
+        assertThatCode(() -> commit2.commit(2, write2.prepareCommit(true, 2)))
+                .doesNotThrowAnyException();
+
+        // user1 DV-only OVERWRITE on pt=1 -> snapshot 5 (OVERWRITE, no data-file delta)
+        commitDvOnly(table, user1, pt1, 3);
+
+        // user2 COMPACT on pt=1: DV-only on pt=1 overlaps -> must throw
+        write2.write(GenericRow.of(1, 7, 7L));
+        write2.compact(pt1, 0, true);
+        assertThatThrownBy(() -> commit2.commit(3, write2.prepareCommit(true, 3)))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessageContaining(
+                        "Giving up committing as commit.strict-mode.last-safe-snapshot is set.");
+
+        write1.close();
+        commit1.close();
+        write2.close();
+        commit2.close();
+    }
+
+    @Test
+    public void testStrictModeShortCircuitsWhenIndexManifestUnchanged() throws Exception {
+        String path = tempDir.toString();
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()},
+                        new String[] {"pt", "k", "v"});
+
+        Options options = new Options();
+        options.set(CoreOptions.PATH, path);
+        options.set(CoreOptions.BUCKET, 1);
+        options.set(CoreOptions.BUCKET_KEY, "k");
+        options.set(CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER, 10);
+        options.set(CoreOptions.DELETION_VECTORS_ENABLED, true);
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(LocalFileIO.create(), new Path(path)),
+                        new Schema(
+                                rowType.getFields(),
+                                Collections.singletonList("pt"),
+                                Collections.emptyList(),
+                                options.toMap(),
+                                ""));
+        FileStoreTable table =
+                FileStoreTableFactory.create(
+                        LocalFileIO.create(),
+                        new Path(path),
+                        tableSchema,
+                        CatalogEnvironment.empty());
+        BinaryRow pt3 = partitionRow(3);
+
+        // user1 writes pt=1 and pt=3 -> snapshot 1 (APPEND)
+        String user1 = UUID.randomUUID().toString();
+        TableWriteImpl<?> write1 = table.newWrite(user1);
+        TableCommitImpl commit1 = table.newCommit(user1);
+        write1.write(GenericRow.of(1, 0, 0L));
+        write1.write(GenericRow.of(3, 0, 0L));
+        commit1.commit(1, write1.prepareCommit(false, 1));
+
+        // user1 commits a DV on pt=3 -> snapshot 2 (OVERWRITE, dv-only).
+        // indexManifest now records pt=3.
+        commitDvOnly(table, user1, pt3, 2);
+
+        // user1 OVERWRITE on pt=1 -> snapshot 3 (OVERWRITE).
+        // pt=1 has no DV, so writeIndexFiles produces no new index file and
+        // snapshot 3 reuses snapshot 2's indexManifest file name.
+        TableCommitImpl commit1Ow = table.newCommit(user1).withOverwrite(singletonMap("pt", "1"));
+        write1.write(GenericRow.of(1, 9, 9L));
+        commit1Ow.commit(3, write1.prepareCommit(false, 3));
+
+        // user2 with last-safe=2 writes pt=3 -> APPEND.
+        // Snapshot 3 is OVERWRITE on pt=1, its data delta does not touch pt=3,
+        // and although its indexManifest still contains pt=3 (inherited from
+        // snapshot 2), the short-circuit must skip the full-set intersection
+        // and avoid throwing.
+        String user2 = UUID.randomUUID().toString();
+        FileStoreTable tableWithStrict =
+                table.copy(singletonMap(COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(), "2"));
+        TableWriteImpl<?> write2 = tableWithStrict.newWrite(user2);
+        TableCommitImpl commit2 = tableWithStrict.newCommit(user2);
+        write2.write(GenericRow.of(3, 1, 1L));
+        assertThatCode(() -> commit2.commit(1, write2.prepareCommit(false, 1)))
+                .doesNotThrowAnyException();
+
+        write1.close();
+        commit1.close();
+        commit1Ow.close();
+        write2.close();
+        commit2.close();
+    }
+
+    private void commitDvOnly(FileStoreTable table, String user, BinaryRow partition, long commitId)
+            throws Exception {
+        // Pick a real data file in the partition; ConflictDetection requires the
+        // DV to reference an existing data file name.
+        String dataFileName =
+                table.store().newScan().withPartitionFilter(Collections.singletonList(partition))
+                        .plan().files().stream()
+                        .findFirst()
+                        .map(e -> e.file().fileName())
+                        .orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                "No data file in partition for DV commit"));
+        IndexFileHandler indexFileHandler = table.store().newIndexFileHandler();
+        BucketedDvMaintainer dvMaintainer =
+                BucketedDvMaintainer.factory(indexFileHandler)
+                        .create(partition, 0, Collections.emptyList());
+        dvMaintainer.notifyNewDeletion(dataFileName, 0);
+        IndexFileMeta dvIndex = dvMaintainer.writeDeletionVectorsIndex().get();
+        try (TableCommitImpl commit = table.newCommit(user)) {
+            commit.commit(
+                    commitId,
+                    Collections.singletonList(
+                            new CommitMessageImpl(
+                                    partition,
+                                    0,
+                                    1,
+                                    DataIncrement.indexIncrement(
+                                            Collections.singletonList(dvIndex)),
+                                    CompactIncrement.emptyIncrement())));
+        }
+    }
+
     @Test
     public void testStrictModeForOverwriteCheckAppend() throws Exception {
         String path = tempDir.toString();

Reply via email to