This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ab05eba6f0 [core] StrictModeChecker should also check index (#7884)
ab05eba6f0 is described below
commit ab05eba6f0fd4b2a4696608bcb1be50ce201d633
Author: yuzelin <[email protected]>
AuthorDate: Mon May 18 18:58:31 2026 +0800
[core] StrictModeChecker should also check index (#7884)
---
.../paimon/operation/FileStoreCommitImpl.java | 6 +-
.../paimon/operation/commit/StrictModeChecker.java | 28 +++-
.../apache/paimon/table/sink/TableCommitTest.java | 185 +++++++++++++++++++++
3 files changed, 217 insertions(+), 2 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index ffdfe8523d..29ac8b5a3e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -210,7 +210,11 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
.map(
id ->
new StrictModeChecker(
- snapshotManager, commitUser,
scanSupplier, id))
+ snapshotManager,
+ commitUser,
+ scanSupplier,
+ indexManifestFile,
+ id))
.orElse(null);
this.conflictDetection = conflictDetectFactory.create(scanner);
this.commitCleaner = new CommitCleaner(manifestList, manifestFile,
indexManifestFile);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/StrictModeChecker.java
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/StrictModeChecker.java
index 2fd9c85ab2..fa2a972051 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/StrictModeChecker.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/StrictModeChecker.java
@@ -22,6 +22,8 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.Snapshot.CommitKind;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.IndexManifestFile;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.table.source.ScanMode;
@@ -39,6 +41,7 @@ public class StrictModeChecker {
private final SnapshotManager snapshotManager;
private final String commitUser;
private final Supplier<FileStoreScan> scanSupplier;
+ private final IndexManifestFile indexManifestFile;
private long strictModeLastSafeSnapshot;
@@ -46,10 +49,12 @@ public class StrictModeChecker {
SnapshotManager snapshotManager,
String commitUser,
Supplier<FileStoreScan> scanSupplier,
+ IndexManifestFile indexManifestFile,
long strictModeLastSafeSnapshot) {
this.snapshotManager = snapshotManager;
this.commitUser = commitUser;
this.scanSupplier = scanSupplier;
+ this.indexManifestFile = indexManifestFile;
this.strictModeLastSafeSnapshot = strictModeLastSafeSnapshot;
}
@@ -115,7 +120,28 @@ public class StrictModeChecker {
.withKind(ScanMode.DELTA)
.dropStats()
.readFileIterator();
- return hasOverlappedPartition(entries, newPartitions);
+ if (hasOverlappedPartition(entries, newPartitions)) {
+ return true;
+ }
+
+ String indexManifest = snapshot.indexManifest();
+ if (indexManifest == null) {
+ return false;
+ }
+ // Fast exit: if this snapshot's indexManifest file name equals the
+ // previous snapshot's, no index file was added/removed by this commit
+ // (writeIndexFiles reuses the previous file when newIndexFiles is
empty).
+ long prevId = snapshot.id() - 1;
+ if (snapshotManager.snapshotExists(prevId)
+ &&
indexManifest.equals(snapshotManager.snapshot(prevId).indexManifest())) {
+ return false;
+ }
+ for (IndexManifestEntry entry : indexManifestFile.read(indexManifest))
{
+ if (newPartitions.contains(entry.partition())) {
+ return true;
+ }
+ }
+ return false;
}
private boolean hasOverlappedPartition(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
index a04f82b53e..25abbaf12b 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
@@ -22,8 +22,11 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
@@ -538,6 +541,188 @@ public class TableCommitTest {
commit2.close();
}
+ @Test
+ public void testStrictModeForDvOnlyOverwrite() throws Exception {
+ // Regression test for the partition-overlap check on DV-only OVERWRITE
+ // snapshots: such snapshots have no data-file delta, so the check must
+ // also look at index-manifest delta.
+ String path = tempDir.toString();
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.INT(),
DataTypes.BIGINT()},
+ new String[] {"pt", "k", "v"});
+
+ Options options = new Options();
+ options.set(CoreOptions.PATH, path);
+ options.set(CoreOptions.BUCKET, 1);
+ options.set(CoreOptions.BUCKET_KEY, "k");
+ options.set(CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER, 10);
+ options.set(CoreOptions.DELETION_VECTORS_ENABLED, true);
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(LocalFileIO.create(), new
Path(path)),
+ new Schema(
+ rowType.getFields(),
+ Collections.singletonList("pt"),
+ Collections.emptyList(),
+ options.toMap(),
+ ""));
+ FileStoreTable table =
+ FileStoreTableFactory.create(
+ LocalFileIO.create(),
+ new Path(path),
+ tableSchema,
+ CatalogEnvironment.empty());
+ BinaryRow pt1 = partitionRow(1);
+ BinaryRow pt2 = partitionRow(2);
+
+ // user1 writes pt=1 and pt=2 -> snapshot 1 (APPEND)
+ String user1 = UUID.randomUUID().toString();
+ TableWriteImpl<?> write1 = table.newWrite(user1);
+ TableCommitImpl commit1 = table.newCommit(user1);
+ write1.write(GenericRow.of(1, 0, 0L));
+ write1.write(GenericRow.of(2, 0, 0L));
+ commit1.commit(1, write1.prepareCommit(false, 1));
+
+ // user2 with strict mode, last-safe-snapshot=2 (skip its own snapshot
2)
+ String user2 = UUID.randomUUID().toString();
+ FileStoreTable tableWithStrict =
+
table.copy(singletonMap(COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(), "2"));
+ TableWriteImpl<?> write2 = tableWithStrict.newWrite(user2);
+ TableCommitImpl commit2 = tableWithStrict.newCommit(user2);
+ write2.write(GenericRow.of(1, 1, 1L));
+ commit2.commit(1, write2.prepareCommit(false, 1));
+
+ // user1 DV-only OVERWRITE on pt=2 -> snapshot 3 (OVERWRITE, no
data-file delta)
+ commitDvOnly(table, user1, pt2, 2);
+
+ // user2 COMPACT on pt=1: DV-only on pt=2 does not overlap -> no error
+ write2.write(GenericRow.of(1, 5, 5L));
+ write2.compact(pt1, 0, true);
+ assertThatCode(() -> commit2.commit(2, write2.prepareCommit(true, 2)))
+ .doesNotThrowAnyException();
+
+ // user1 DV-only OVERWRITE on pt=1 -> snapshot 5 (OVERWRITE, no
data-file delta)
+ commitDvOnly(table, user1, pt1, 3);
+
+ // user2 COMPACT on pt=1: DV-only on pt=1 overlaps -> must throw
+ write2.write(GenericRow.of(1, 7, 7L));
+ write2.compact(pt1, 0, true);
+ assertThatThrownBy(() -> commit2.commit(3, write2.prepareCommit(true,
3)))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining(
+ "Giving up committing as
commit.strict-mode.last-safe-snapshot is set.");
+
+ write1.close();
+ commit1.close();
+ write2.close();
+ commit2.close();
+ }
+
+ @Test
+ public void testStrictModeShortCircuitsWhenIndexManifestUnchanged() throws
Exception {
+ String path = tempDir.toString();
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.INT(),
DataTypes.BIGINT()},
+ new String[] {"pt", "k", "v"});
+
+ Options options = new Options();
+ options.set(CoreOptions.PATH, path);
+ options.set(CoreOptions.BUCKET, 1);
+ options.set(CoreOptions.BUCKET_KEY, "k");
+ options.set(CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER, 10);
+ options.set(CoreOptions.DELETION_VECTORS_ENABLED, true);
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(LocalFileIO.create(), new
Path(path)),
+ new Schema(
+ rowType.getFields(),
+ Collections.singletonList("pt"),
+ Collections.emptyList(),
+ options.toMap(),
+ ""));
+ FileStoreTable table =
+ FileStoreTableFactory.create(
+ LocalFileIO.create(),
+ new Path(path),
+ tableSchema,
+ CatalogEnvironment.empty());
+ BinaryRow pt3 = partitionRow(3);
+
+ // user1 writes pt=1 and pt=3 -> snapshot 1 (APPEND)
+ String user1 = UUID.randomUUID().toString();
+ TableWriteImpl<?> write1 = table.newWrite(user1);
+ TableCommitImpl commit1 = table.newCommit(user1);
+ write1.write(GenericRow.of(1, 0, 0L));
+ write1.write(GenericRow.of(3, 0, 0L));
+ commit1.commit(1, write1.prepareCommit(false, 1));
+
+ // user1 commits a DV on pt=3 -> snapshot 2 (OVERWRITE, dv-only).
+ // indexManifest now records pt=3.
+ commitDvOnly(table, user1, pt3, 2);
+
+ // user1 OVERWRITE on pt=1 -> snapshot 3 (OVERWRITE).
+ // pt=1 has no DV, so writeIndexFiles produces no new index file and
+ // snapshot 3 reuses snapshot 2's indexManifest file name.
+ TableCommitImpl commit1Ow =
table.newCommit(user1).withOverwrite(singletonMap("pt", "1"));
+ write1.write(GenericRow.of(1, 9, 9L));
+ commit1Ow.commit(3, write1.prepareCommit(false, 3));
+
+ // user2 with last-safe=2 writes pt=3 -> APPEND.
+ // Snapshot 3 is OVERWRITE on pt=1, its data delta does not touch pt=3,
+ // and although its indexManifest still contains pt=3 (inherited from
+ // snapshot 2), the short-circuit must skip the full-set intersection
+ // and avoid throwing.
+ String user2 = UUID.randomUUID().toString();
+ FileStoreTable tableWithStrict =
+
table.copy(singletonMap(COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(), "2"));
+ TableWriteImpl<?> write2 = tableWithStrict.newWrite(user2);
+ TableCommitImpl commit2 = tableWithStrict.newCommit(user2);
+ write2.write(GenericRow.of(3, 1, 1L));
+ assertThatCode(() -> commit2.commit(1, write2.prepareCommit(false, 1)))
+ .doesNotThrowAnyException();
+
+ write1.close();
+ commit1.close();
+ commit1Ow.close();
+ write2.close();
+ commit2.close();
+ }
+
+ private void commitDvOnly(FileStoreTable table, String user, BinaryRow
partition, long commitId)
+ throws Exception {
+ // Pick a real data file in the partition; ConflictDetection requires
the
+ // DV to reference an existing data file name.
+ String dataFileName =
+
table.store().newScan().withPartitionFilter(Collections.singletonList(partition))
+ .plan().files().stream()
+ .findFirst()
+ .map(e -> e.file().fileName())
+ .orElseThrow(
+ () ->
+ new IllegalStateException(
+ "No data file in partition for
DV commit"));
+ IndexFileHandler indexFileHandler =
table.store().newIndexFileHandler();
+ BucketedDvMaintainer dvMaintainer =
+ BucketedDvMaintainer.factory(indexFileHandler)
+ .create(partition, 0, Collections.emptyList());
+ dvMaintainer.notifyNewDeletion(dataFileName, 0);
+ IndexFileMeta dvIndex = dvMaintainer.writeDeletionVectorsIndex().get();
+ try (TableCommitImpl commit = table.newCommit(user)) {
+ commit.commit(
+ commitId,
+ Collections.singletonList(
+ new CommitMessageImpl(
+ partition,
+ 0,
+ 1,
+ DataIncrement.indexIncrement(
+
Collections.singletonList(dvIndex)),
+ CompactIncrement.emptyIncrement())));
+ }
+ }
+
@Test
public void testStrictModeForOverwriteCheckAppend() throws Exception {
String path = tempDir.toString();