This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ada9fa288 StrictModeChecker doesn't throw an exception if the partitions 
of checked snapshots don't overlap (#7799)
4ada9fa288 is described below

commit 4ada9fa28815b497b7a35533cc17b308017456bc
Author: yuzelin <[email protected]>
AuthorDate: Mon May 11 13:44:43 2026 +0800

    StrictModeChecker doesn't throw an exception if the partitions of checked 
snapshots don't overlap (#7799)
---
 .../paimon/operation/FileStoreCommitImpl.java      |  13 +-
 .../paimon/operation/commit/StrictModeChecker.java |  76 +++--
 .../apache/paimon/table/sink/TableCommitTest.java  | 344 +++++++++++++++++++--
 3 files changed, 384 insertions(+), 49 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 2e931d16cf..102100b9fb 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -209,10 +209,7 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                         .map(
                                 id ->
                                         new StrictModeChecker(
-                                                snapshotManager,
-                                                commitUser,
-                                                scanSupplier.get(),
-                                                id))
+                                                snapshotManager, commitUser, 
scanSupplier, id))
                         .orElse(null);
         this.conflictDetection = conflictDetectFactory.create(scanner);
         this.commitCleaner = new CommitCleaner(manifestList, manifestFile, 
indexManifestFile);
@@ -816,8 +813,10 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
             }
         }
 
+        List<BinaryRow> changedPartitions = null;
         if (strictModeChecker != null) {
-            strictModeChecker.check(newSnapshotId, commitKind);
+            changedPartitions = changedPartitions(deltaFiles, indexFiles);
+            strictModeChecker.check(newSnapshotId, commitKind, 
changedPartitions);
             strictModeChecker.update(newSnapshotId - 1);
         }
 
@@ -838,7 +837,9 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
         if (latestSnapshot != null && (discardDuplicate || detectConflicts)) {
             // latestSnapshotId is different from the snapshot id we've 
checked for conflicts,
             // so we have to check again
-            List<BinaryRow> changedPartitions = changedPartitions(deltaFiles, 
indexFiles);
+            if (changedPartitions == null) {
+                changedPartitions = changedPartitions(deltaFiles, indexFiles);
+            }
             CommitFailRetryResult commitFailRetry =
                     retryResult instanceof CommitFailRetryResult
                             ? (CommitFailRetryResult) retryResult
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/StrictModeChecker.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/StrictModeChecker.java
index b0084b70be..2fd9c85ab2 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/StrictModeChecker.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/StrictModeChecker.java
@@ -21,34 +21,41 @@ package org.apache.paimon.operation.commit;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.Snapshot.CommitKind;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.utils.SnapshotManager;
 
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Supplier;
 
 /** A checker to check strict mode based on last safe snapshot. */
 public class StrictModeChecker {
 
     private final SnapshotManager snapshotManager;
     private final String commitUser;
-    private final FileStoreScan scan;
+    private final Supplier<FileStoreScan> scanSupplier;
 
     private long strictModeLastSafeSnapshot;
 
     public StrictModeChecker(
             SnapshotManager snapshotManager,
             String commitUser,
-            FileStoreScan scan,
+            Supplier<FileStoreScan> scanSupplier,
             long strictModeLastSafeSnapshot) {
         this.snapshotManager = snapshotManager;
         this.commitUser = commitUser;
-        this.scan = scan;
+        this.scanSupplier = scanSupplier;
         this.strictModeLastSafeSnapshot = strictModeLastSafeSnapshot;
     }
 
-    public void check(long newSnapshotId, CommitKind newCommitKind) {
+    public void check(
+            long newSnapshotId, CommitKind newCommitKind, List<BinaryRow> 
newChangedPartitions) {
+        Set<BinaryRow> newPartitions = new HashSet<>(newChangedPartitions);
         for (long id = strictModeLastSafeSnapshot + 1; id < newSnapshotId; 
id++) {
             Snapshot snapshot = snapshotManager.snapshot(id);
             if (snapshot.commitUser().equals(commitUser)) {
@@ -56,32 +63,37 @@ public class StrictModeChecker {
             }
             if (snapshot.commitKind() == CommitKind.COMPACT
                     || snapshot.commitKind() == CommitKind.OVERWRITE) {
-                throw new RuntimeException(
-                        String.format(
-                                "When trying to commit snapshot %d, "
-                                        + "commit user %s has found a %s 
snapshot (id: %d) by another user %s. "
-                                        + "Giving up committing as %s is set.",
-                                newSnapshotId,
-                                commitUser,
-                                snapshot.commitKind().name(),
-                                id,
-                                snapshot.commitUser(),
-                                
CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key()));
+                if (hasOverlappedPartition(snapshot, newPartitions)) {
+                    throw new RuntimeException(
+                            String.format(
+                                    "When trying to commit snapshot %d, "
+                                            + "commit user %s has found a %s 
snapshot (id: %d) by another user %s "
+                                            + "which modified the same 
partition. Giving up committing as %s is set.",
+                                    newSnapshotId,
+                                    commitUser,
+                                    snapshot.commitKind().name(),
+                                    id,
+                                    snapshot.commitUser(),
+                                    
CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key()));
+                }
             }
             if (snapshot.commitKind() == CommitKind.APPEND
                     && newCommitKind == CommitKind.OVERWRITE) {
                 Iterator<ManifestEntry> entries =
-                        scan.withSnapshot(snapshot)
+                        scanSupplier
+                                .get()
+                                .withSnapshot(snapshot)
                                 .withKind(ScanMode.DELTA)
                                 .onlyReadRealBuckets()
                                 .dropStats()
                                 .readFileIterator();
-                if (entries.hasNext()) {
+                if (hasOverlappedPartition(entries, newPartitions)) {
                     throw new RuntimeException(
                             String.format(
                                     "When trying to commit snapshot %d, "
                                             + "commit user %s has found a 
APPEND snapshot (id: %d) by another user %s "
-                                            + "which committed files to fixed 
bucket. Giving up committing as %s is set.",
+                                            + "which committed files to fixed 
bucket on the same partition. "
+                                            + "Giving up committing as %s is 
set.",
                                     newSnapshotId,
                                     commitUser,
                                     id,
@@ -92,6 +104,34 @@ public class StrictModeChecker {
         }
     }
 
+    private boolean hasOverlappedPartition(Snapshot snapshot, Set<BinaryRow> 
newPartitions) {
+        if (newPartitions.isEmpty()) {
+            return false;
+        }
+        Iterator<ManifestEntry> entries =
+                scanSupplier
+                        .get()
+                        .withSnapshot(snapshot)
+                        .withKind(ScanMode.DELTA)
+                        .dropStats()
+                        .readFileIterator();
+        return hasOverlappedPartition(entries, newPartitions);
+    }
+
+    private boolean hasOverlappedPartition(
+            Iterator<ManifestEntry> entries, Set<BinaryRow> newPartitions) {
+        if (newPartitions.isEmpty()) {
+            return false;
+        }
+        while (entries.hasNext()) {
+            ManifestEntry entry = entries.next();
+            if (newPartitions.contains(entry.partition())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     public void update(long newSafeSnapshot) {
         strictModeLastSafeSnapshot = newSafeSnapshot;
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
index 3ccf99476a..a04f82b53e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.sink;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
@@ -388,8 +389,8 @@ public class TableCommitTest {
         String path = tempDir.toString();
         RowType rowType =
                 RowType.of(
-                        new DataType[] {DataTypes.INT(), DataTypes.BIGINT()},
-                        new String[] {"k", "v"});
+                        new DataType[] {DataTypes.INT(), DataTypes.INT(), 
DataTypes.BIGINT()},
+                        new String[] {"pt", "k", "v"});
 
         Options options = new Options();
         options.set(CoreOptions.PATH, path);
@@ -400,8 +401,8 @@ public class TableCommitTest {
                         new SchemaManager(LocalFileIO.create(), new 
Path(path)),
                         new Schema(
                                 rowType.getFields(),
-                                Collections.emptyList(),
-                                Collections.singletonList("k"),
+                                Collections.singletonList("pt"),
+                                Arrays.asList("pt", "k"),
                                 options.toMap(),
                                 ""));
         FileStoreTable table =
@@ -410,35 +411,131 @@ public class TableCommitTest {
                         new Path(path),
                         tableSchema,
                         CatalogEnvironment.empty());
+        BinaryRow pt1 = partitionRow(1);
+        BinaryRow pt2 = partitionRow(2);
+
         String user1 = UUID.randomUUID().toString();
         TableWriteImpl<?> write1 = table.newWrite(user1);
         TableCommitImpl commit1 = table.newCommit(user1);
 
-        write1.write(GenericRow.of(0, 0L));
-        write1.compact(BinaryRow.EMPTY_ROW, 0, true);
+        write1.write(GenericRow.of(1, 0, 0L));
+        write1.compact(pt1, 0, true);
         commit1.commit(1, write1.prepareCommit(true, 1));
 
         // test skip this commit check
-
         String user2 = UUID.randomUUID().toString();
         table = 
table.copy(singletonMap(COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(), "2"));
         TableWriteImpl<?> write2 = table.newWrite(user2);
         TableCommitImpl commit2 = table.newCommit(user2);
 
-        write2.write(GenericRow.of(1, 1L));
+        write2.write(GenericRow.of(2, 1, 1L));
         commit2.commit(1, write2.prepareCommit(false, 1));
 
-        // COMPACT commit should be checked
+        // COMPACT on a different partition should be ignored
+        write1.write(GenericRow.of(1, 4, 4L));
+        write1.compact(pt1, 0, true);
+        commit1.commit(2, write1.prepareCommit(true, 2));
 
-        write1.write(GenericRow.of(4, 4L));
-        write1.compact(BinaryRow.EMPTY_ROW, 0, true);
+        write2.write(GenericRow.of(2, 5, 5L));
+        assertThatCode(() -> commit2.commit(2, write2.prepareCommit(false, 2)))
+                .doesNotThrowAnyException();
+
+        // COMPACT on the same partition should be checked and fail
+        write1.write(GenericRow.of(2, 6, 6L));
+        write1.compact(pt2, 0, true);
         commit1.commit(3, write1.prepareCommit(true, 3));
 
-        write2.write(GenericRow.of(5, 5L));
+        write2.write(GenericRow.of(2, 7, 7L));
         assertThatThrownBy(() -> commit2.commit(3, write2.prepareCommit(false, 
3)))
                 .isInstanceOf(RuntimeException.class)
                 .hasMessageContaining(
                         "Giving up committing as 
commit.strict-mode.last-safe-snapshot is set.");
+
+        write1.close();
+        commit1.close();
+        write2.close();
+        commit2.close();
+    }
+
+    @Test
+    public void testStrictModeForOverwrite() throws Exception {
+        String path = tempDir.toString();
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.INT(), 
DataTypes.BIGINT()},
+                        new String[] {"pt", "k", "v"});
+
+        Options options = new Options();
+        options.set(CoreOptions.PATH, path);
+        options.set(CoreOptions.BUCKET, 1);
+        options.set(CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER, 10);
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(LocalFileIO.create(), new 
Path(path)),
+                        new Schema(
+                                rowType.getFields(),
+                                Collections.singletonList("pt"),
+                                Arrays.asList("pt", "k"),
+                                options.toMap(),
+                                ""));
+        FileStoreTable table =
+                FileStoreTableFactory.create(
+                        LocalFileIO.create(),
+                        new Path(path),
+                        tableSchema,
+                        CatalogEnvironment.empty());
+        BinaryRow pt1 = partitionRow(1);
+        BinaryRow pt2 = partitionRow(2);
+
+        // user1 writes pt=1 and pt=2
+        String user1 = UUID.randomUUID().toString();
+        TableWriteImpl<?> write1 = table.newWrite(user1);
+        TableCommitImpl commit1 = table.newCommit(user1);
+        write1.write(GenericRow.of(1, 0, 0L));
+        write1.write(GenericRow.of(2, 0, 0L));
+        commit1.commit(1, write1.prepareCommit(false, 1));
+
+        // test skip this commit check
+        String user2 = UUID.randomUUID().toString();
+        FileStoreTable tableWithStrict =
+                
table.copy(singletonMap(COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(), "2"));
+        TableWriteImpl<?> write2 = tableWithStrict.newWrite(user2);
+        TableCommitImpl commit2 = tableWithStrict.newCommit(user2);
+
+        write2.write(GenericRow.of(1, 1, 1L));
+        write2.compact(pt1, 0, true);
+        commit2.commit(1, write2.prepareCommit(true, 1));
+
+        // user1 OVERWRITE on pt=1, snapshot 3
+        TableCommitImpl commit1Ow1 = 
table.newCommit(user1).withOverwrite(singletonMap("pt", "1"));
+        write1.write(GenericRow.of(1, 4, 4L));
+        commit1Ow1.commit(2, write1.prepareCommit(false, 2));
+
+        // user2 COMPACT on pt=2: OVERWRITE pt=1 does not overlap so no error
+        write2.write(GenericRow.of(2, 5, 5L));
+        write2.compact(pt2, 0, true);
+        assertThatCode(() -> commit2.commit(2, write2.prepareCommit(true, 2)))
+                .doesNotThrowAnyException();
+
+        // user1 OVERWRITE on pt=1 again, snapshot 5
+        TableCommitImpl commit1Ow2 = 
table.newCommit(user1).withOverwrite(singletonMap("pt", "1"));
+        write1.write(GenericRow.of(1, 6, 6L));
+        commit1Ow2.commit(3, write1.prepareCommit(false, 3));
+
+        // user2 COMPACT on pt=1: OVERWRITE pt=1 overlaps, should throw
+        write2.write(GenericRow.of(1, 7, 7L));
+        write2.compact(pt1, 0, true);
+        assertThatThrownBy(() -> commit2.commit(3, write2.prepareCommit(true, 
3)))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessageContaining(
+                        "Giving up committing as 
commit.strict-mode.last-safe-snapshot is set.");
+
+        write1.close();
+        commit1.close();
+        commit1Ow1.close();
+        commit1Ow2.close();
+        write2.close();
+        commit2.close();
     }
 
     @Test
@@ -446,8 +543,8 @@ public class TableCommitTest {
         String path = tempDir.toString();
         RowType rowType =
                 RowType.of(
-                        new DataType[] {DataTypes.INT(), DataTypes.BIGINT()},
-                        new String[] {"k", "v"});
+                        new DataType[] {DataTypes.INT(), DataTypes.INT(), 
DataTypes.BIGINT()},
+                        new String[] {"pt", "k", "v"});
 
         Options options = new Options();
         options.set(CoreOptions.PATH, path);
@@ -458,8 +555,8 @@ public class TableCommitTest {
                         new SchemaManager(LocalFileIO.create(), new 
Path(path)),
                         new Schema(
                                 rowType.getFields(),
-                                Collections.emptyList(),
-                                Collections.singletonList("k"),
+                                Collections.singletonList("pt"),
+                                Arrays.asList("pt", "k"),
                                 options.toMap(),
                                 ""));
         FileStoreTable table =
@@ -468,13 +565,15 @@ public class TableCommitTest {
                         new Path(path),
                         tableSchema,
                         CatalogEnvironment.empty());
+        BinaryRow pt1 = partitionRow(1);
+
         String user1 = UUID.randomUUID().toString();
         FileStoreTable fixedBucketWriteTable = table;
         TableWriteImpl<?> write1 = fixedBucketWriteTable.newWrite(user1);
         TableCommitImpl commit1 = fixedBucketWriteTable.newCommit(user1);
 
-        write1.write(GenericRow.of(0, 0L));
-        write1.compact(BinaryRow.EMPTY_ROW, 0, true);
+        write1.write(GenericRow.of(1, 0, 0L));
+        write1.compact(pt1, 0, true);
         commit1.commit(1, write1.prepareCommit(true, 1));
 
         // test skip this commit check
@@ -482,9 +581,9 @@ public class TableCommitTest {
         String user2 = UUID.randomUUID().toString();
         table = 
table.copy(singletonMap(COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(), "2"));
         TableWriteImpl<?> write2 = table.newWrite(user2);
-        TableCommitImpl commit2 = 
table.newCommit(user2).withOverwrite(Collections.emptyMap());
+        TableCommitImpl commit2 = 
table.newCommit(user2).withOverwrite(singletonMap("pt", "1"));
 
-        write2.write(GenericRow.of(1, 1L));
+        write2.write(GenericRow.of(1, 1, 1L));
         commit2.commit(1, write2.prepareCommit(false, 1));
 
         // APPEND with postpone bucket files should be ignored
@@ -496,25 +595,133 @@ public class TableCommitTest {
         FileStoreTable postponeWriteTable = 
fixedBucketWriteTable.copy(postponeWriteOptions);
         write1 = postponeWriteTable.newWrite(user1);
         commit1 = postponeWriteTable.newCommit(user1);
-        write1.write(GenericRow.of(2, 2L));
+        write1.write(GenericRow.of(1, 2, 2L));
         commit1.commit(2, write1.prepareCommit(false, 2));
 
-        write2.write(GenericRow.of(3, 3L));
+        write2.write(GenericRow.of(1, 3, 3L));
         commit2.commit(2, write2.prepareCommit(false, 2));
 
-        // APPEND with fixed bucket files should be checked
+        // APPEND with fixed bucket files on a different partition should be 
ignored
         write1.close();
         commit1.close();
         write1 = fixedBucketWriteTable.newWrite(user1);
         commit1 = fixedBucketWriteTable.newCommit(user1);
-        write1.write(GenericRow.of(4, 4L));
+        write1.write(GenericRow.of(2, 4, 4L));
         commit1.commit(3, write1.prepareCommit(false, 3));
 
-        write2.write(GenericRow.of(5, 5L));
+        write2.write(GenericRow.of(1, 5, 5L));
+        assertThatCode(() -> commit2.commit(3, write2.prepareCommit(false, 3)))
+                .doesNotThrowAnyException();
+
+        // APPEND with fixed bucket files on the overwritten partition should 
be checked
+        write1.close();
+        commit1.close();
+        write1 = fixedBucketWriteTable.newWrite(user1);
+        commit1 = fixedBucketWriteTable.newCommit(user1);
+        write1.write(GenericRow.of(1, 6, 6L));
+        commit1.commit(4, write1.prepareCommit(false, 4));
+
+        write2.write(GenericRow.of(1, 7, 7L));
+        assertThatThrownBy(() -> commit2.commit(4, write2.prepareCommit(false, 
4)))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessageContaining(
+                        "Giving up committing as 
commit.strict-mode.last-safe-snapshot is set.");
+
+        write1.close();
+        commit1.close();
+        write2.close();
+        commit2.close();
+    }
+
+    private static BinaryRow partitionRow(int pt) {
+        BinaryRow row = new BinaryRow(1);
+        BinaryRowWriter writer = new BinaryRowWriter(row);
+        writer.writeInt(0, pt);
+        writer.complete();
+        return row;
+    }
+
+    @Test
+    public void testStrictModeScanStateNotLeakedAcrossSnapshots() throws 
Exception {
+        // Regression test: the FileStoreScan used by StrictModeChecker is 
mutable.
+        // When the APPEND branch (for an earlier APPEND snapshot) calls
+        // onlyReadRealBuckets(), the flag is latched on the shared scan 
instance and
+        // must not leak into the later COMPACT/OVERWRITE branch, otherwise 
entries
+        // whose bucket < 0 (e.g. postpone bucket) are silently filtered out 
and an
+        // overlapping OVERWRITE snapshot can be missed.
+        String path = tempDir.toString();
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.INT(), 
DataTypes.BIGINT()},
+                        new String[] {"pt", "k", "v"});
+
+        Options options = new Options();
+        options.set(CoreOptions.PATH, path);
+        options.set(CoreOptions.BUCKET, 1);
+        options.set(CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER, 10);
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(LocalFileIO.create(), new 
Path(path)),
+                        new Schema(
+                                rowType.getFields(),
+                                Collections.singletonList("pt"),
+                                Arrays.asList("pt", "k"),
+                                options.toMap(),
+                                ""));
+        FileStoreTable fixedTable =
+                FileStoreTableFactory.create(
+                        LocalFileIO.create(),
+                        new Path(path),
+                        tableSchema,
+                        CatalogEnvironment.empty());
+
+        // Switch to postpone bucket so that APPEND/OVERWRITE files land in 
bucket=-2.
+        Map<String, String> postponeOptions = new HashMap<>();
+        postponeOptions.put(CoreOptions.BUCKET.key(), "-2");
+        
postponeOptions.put(CoreOptions.POSTPONE_BATCH_WRITE_FIXED_BUCKET.key(), 
"false");
+        FileStoreTable postponeTable = fixedTable.copy(postponeOptions);
+
+        String user1 = UUID.randomUUID().toString();
+
+        // snapshot 1: APPEND on pt=1 with postpone bucket (bucket=-2).
+        // When later checked in the APPEND branch, onlyReadRealBuckets() will 
filter
+        // these entries out, so this snapshot alone does not throw; but it 
DOES
+        // latch onlyReadRealBuckets=true onto the shared scan instance.
+        TableWriteImpl<?> write1 = postponeTable.newWrite(user1);
+        TableCommitImpl commit1 = postponeTable.newCommit(user1);
+        write1.write(GenericRow.of(1, 0, 0L));
+        commit1.commit(1, write1.prepareCommit(false, 1));
+        write1.close();
+        commit1.close();
+
+        // snapshot 2: OVERWRITE on pt=1, also writing postpone bucket files.
+        // The OVERWRITE branch should detect the partition overlap against a 
later
+        // user's OVERWRITE on pt=1.
+        write1 = postponeTable.newWrite(user1);
+        commit1 = 
postponeTable.newCommit(user1).withOverwrite(singletonMap("pt", "1"));
+        write1.write(GenericRow.of(1, 1, 1L));
+        commit1.commit(2, write1.prepareCommit(false, 2));
+        write1.close();
+        commit1.close();
+
+        // user2 commits OVERWRITE on pt=1 with strict mode enabled.
+        // Expected: throw because snapshot 2 (another user's OVERWRITE) 
touches pt=1.
+        // With the leaking onlyReadRealBuckets flag, snapshot 2's bucket=-2 
entries
+        // get filtered out and the conflict is silently missed.
+        String user2 = UUID.randomUUID().toString();
+        FileStoreTable tableWithStrict =
+                
fixedTable.copy(singletonMap(COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(), "0"));
+        TableWriteImpl<?> write2 = tableWithStrict.newWrite(user2);
+        TableCommitImpl commit2 =
+                
tableWithStrict.newCommit(user2).withOverwrite(singletonMap("pt", "1"));
+        write2.write(GenericRow.of(1, 2, 2L));
         assertThatThrownBy(() -> commit2.commit(3, write2.prepareCommit(false, 
3)))
                 .isInstanceOf(RuntimeException.class)
                 .hasMessageContaining(
                         "Giving up committing as 
commit.strict-mode.last-safe-snapshot is set.");
+
+        write2.close();
+        commit2.close();
     }
 
     @Test
@@ -573,6 +780,93 @@ public class TableCommitTest {
         commit2.commit(2, write2.prepareCommit(false, 3));
     }
 
+    @Test
+    public void testStrictModeForNonPartitionedTable() throws Exception {
+        String path = tempDir.toString();
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.BIGINT()},
+                        new String[] {"k", "v"});
+
+        Options options = new Options();
+        options.set(CoreOptions.PATH, path);
+        options.set(CoreOptions.BUCKET, 1);
+        options.set(CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER, 10);
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(LocalFileIO.create(), new 
Path(path)),
+                        new Schema(
+                                rowType.getFields(),
+                                Collections.emptyList(),
+                                Collections.singletonList("k"),
+                                options.toMap(),
+                                ""));
+        FileStoreTable table =
+                FileStoreTableFactory.create(
+                        LocalFileIO.create(),
+                        new Path(path),
+                        tableSchema,
+                        CatalogEnvironment.empty());
+
+        String user1 = UUID.randomUUID().toString();
+        TableWriteImpl<?> write1 = table.newWrite(user1);
+        TableCommitImpl commit1 = table.newCommit(user1);
+        write1.write(GenericRow.of(0, 0L));
+        write1.compact(BinaryRow.EMPTY_ROW, 0, true);
+        commit1.commit(1, write1.prepareCommit(true, 1));
+
+        // test skip this commit check
+        String user2 = UUID.randomUUID().toString();
+        FileStoreTable tableWithStrict =
+                
table.copy(singletonMap(COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(), "2"));
+        TableWriteImpl<?> write2 = tableWithStrict.newWrite(user2);
+        TableCommitImpl commit2 = tableWithStrict.newCommit(user2);
+
+        write2.write(GenericRow.of(1, 1L));
+        commit2.commit(1, write2.prepareCommit(false, 1));
+
+        // For a non-partitioned table, any COMPACT snapshot from another user 
must be
+        // detected as a conflict since all entries share BinaryRow.EMPTY_ROW.
+        write1.write(GenericRow.of(2, 2L));
+        write1.compact(BinaryRow.EMPTY_ROW, 0, true);
+        commit1.commit(2, write1.prepareCommit(true, 2));
+
+        write2.write(GenericRow.of(3, 3L));
+        assertThatThrownBy(() -> commit2.commit(2, write2.prepareCommit(false, 
2)))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessageContaining(
+                        "Giving up committing as 
commit.strict-mode.last-safe-snapshot is set.");
+
+        // APPEND with fixed bucket files should be caught when user2 commits 
OVERWRITE,
+        // since non-partitioned table entries share BinaryRow.EMPTY_ROW so 
they always
+        // overlap with user2's OVERWRITE target.
+        TableWriteImpl<?> write1Append = table.newWrite(user1);
+        TableCommitImpl commit1Append = table.newCommit(user1);
+        write1Append.write(GenericRow.of(4, 4L));
+        commit1Append.commit(3, write1Append.prepareCommit(false, 3));
+
+        FileStoreTable tableWithStrict2 =
+                
table.copy(singletonMap(COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(), "3"));
+        TableWriteImpl<?> write2Ow = tableWithStrict2.newWrite(user2);
+        TableCommitImpl commit2Ow =
+                
tableWithStrict2.newCommit(user2).withOverwrite(Collections.emptyMap());
+        write2Ow.write(GenericRow.of(5, 5L));
+        assertThatThrownBy(() -> commit2Ow.commit(4, 
write2Ow.prepareCommit(false, 4)))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessageContaining(
+                        "Giving up committing as 
commit.strict-mode.last-safe-snapshot is set.");
+
+        write1Append.close();
+        commit1Append.close();
+        write2Ow.close();
+        commit2Ow.close();
+
+        write1.close();
+        commit1.close();
+        write2.close();
+        commit2.close();
+    }
+
     @Test
     public void testExpireForEmptyCommit() throws Exception {
         String path = tempDir.toString();

Reply via email to