This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4ada9fa288 StrictModeChecker don't throw exception if the partitions
of checked snapshots don't overlapped (#7799)
4ada9fa288 is described below
commit 4ada9fa28815b497b7a35533cc17b308017456bc
Author: yuzelin <[email protected]>
AuthorDate: Mon May 11 13:44:43 2026 +0800
StrictModeChecker don't throw exception if the partitions of checked
snapshots don't overlapped (#7799)
---
.../paimon/operation/FileStoreCommitImpl.java | 13 +-
.../paimon/operation/commit/StrictModeChecker.java | 76 +++--
.../apache/paimon/table/sink/TableCommitTest.java | 344 +++++++++++++++++++--
3 files changed, 384 insertions(+), 49 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 2e931d16cf..102100b9fb 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -209,10 +209,7 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
.map(
id ->
new StrictModeChecker(
- snapshotManager,
- commitUser,
- scanSupplier.get(),
- id))
+ snapshotManager, commitUser,
scanSupplier, id))
.orElse(null);
this.conflictDetection = conflictDetectFactory.create(scanner);
this.commitCleaner = new CommitCleaner(manifestList, manifestFile,
indexManifestFile);
@@ -816,8 +813,10 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
}
}
+ List<BinaryRow> changedPartitions = null;
if (strictModeChecker != null) {
- strictModeChecker.check(newSnapshotId, commitKind);
+ changedPartitions = changedPartitions(deltaFiles, indexFiles);
+ strictModeChecker.check(newSnapshotId, commitKind,
changedPartitions);
strictModeChecker.update(newSnapshotId - 1);
}
@@ -838,7 +837,9 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
if (latestSnapshot != null && (discardDuplicate || detectConflicts)) {
// latestSnapshotId is different from the snapshot id we've
checked for conflicts,
// so we have to check again
- List<BinaryRow> changedPartitions = changedPartitions(deltaFiles,
indexFiles);
+ if (changedPartitions == null) {
+ changedPartitions = changedPartitions(deltaFiles, indexFiles);
+ }
CommitFailRetryResult commitFailRetry =
retryResult instanceof CommitFailRetryResult
? (CommitFailRetryResult) retryResult
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/StrictModeChecker.java
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/StrictModeChecker.java
index b0084b70be..2fd9c85ab2 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/StrictModeChecker.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/StrictModeChecker.java
@@ -21,34 +21,41 @@ package org.apache.paimon.operation.commit;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.Snapshot.CommitKind;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.utils.SnapshotManager;
+import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Supplier;
/** A checker to check strict mode based on last safe snapshot. */
public class StrictModeChecker {
private final SnapshotManager snapshotManager;
private final String commitUser;
- private final FileStoreScan scan;
+ private final Supplier<FileStoreScan> scanSupplier;
private long strictModeLastSafeSnapshot;
public StrictModeChecker(
SnapshotManager snapshotManager,
String commitUser,
- FileStoreScan scan,
+ Supplier<FileStoreScan> scanSupplier,
long strictModeLastSafeSnapshot) {
this.snapshotManager = snapshotManager;
this.commitUser = commitUser;
- this.scan = scan;
+ this.scanSupplier = scanSupplier;
this.strictModeLastSafeSnapshot = strictModeLastSafeSnapshot;
}
- public void check(long newSnapshotId, CommitKind newCommitKind) {
+ public void check(
+ long newSnapshotId, CommitKind newCommitKind, List<BinaryRow>
newChangedPartitions) {
+ Set<BinaryRow> newPartitions = new HashSet<>(newChangedPartitions);
for (long id = strictModeLastSafeSnapshot + 1; id < newSnapshotId;
id++) {
Snapshot snapshot = snapshotManager.snapshot(id);
if (snapshot.commitUser().equals(commitUser)) {
@@ -56,32 +63,37 @@ public class StrictModeChecker {
}
if (snapshot.commitKind() == CommitKind.COMPACT
|| snapshot.commitKind() == CommitKind.OVERWRITE) {
- throw new RuntimeException(
- String.format(
- "When trying to commit snapshot %d, "
- + "commit user %s has found a %s
snapshot (id: %d) by another user %s. "
- + "Giving up committing as %s is set.",
- newSnapshotId,
- commitUser,
- snapshot.commitKind().name(),
- id,
- snapshot.commitUser(),
-
CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key()));
+ if (hasOverlappedPartition(snapshot, newPartitions)) {
+ throw new RuntimeException(
+ String.format(
+ "When trying to commit snapshot %d, "
+ + "commit user %s has found a %s
snapshot (id: %d) by another user %s "
+ + "which modified the same
partition. Giving up committing as %s is set.",
+ newSnapshotId,
+ commitUser,
+ snapshot.commitKind().name(),
+ id,
+ snapshot.commitUser(),
+
CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key()));
+ }
}
if (snapshot.commitKind() == CommitKind.APPEND
&& newCommitKind == CommitKind.OVERWRITE) {
Iterator<ManifestEntry> entries =
- scan.withSnapshot(snapshot)
+ scanSupplier
+ .get()
+ .withSnapshot(snapshot)
.withKind(ScanMode.DELTA)
.onlyReadRealBuckets()
.dropStats()
.readFileIterator();
- if (entries.hasNext()) {
+ if (hasOverlappedPartition(entries, newPartitions)) {
throw new RuntimeException(
String.format(
"When trying to commit snapshot %d, "
+ "commit user %s has found a
APPEND snapshot (id: %d) by another user %s "
- + "which committed files to fixed
bucket. Giving up committing as %s is set.",
+ + "which committed files to fixed
bucket on the same partition. "
+ + "Giving up committing as %s is
set.",
newSnapshotId,
commitUser,
id,
@@ -92,6 +104,34 @@ public class StrictModeChecker {
}
}
+ private boolean hasOverlappedPartition(Snapshot snapshot, Set<BinaryRow>
newPartitions) {
+ if (newPartitions.isEmpty()) {
+ return false;
+ }
+ Iterator<ManifestEntry> entries =
+ scanSupplier
+ .get()
+ .withSnapshot(snapshot)
+ .withKind(ScanMode.DELTA)
+ .dropStats()
+ .readFileIterator();
+ return hasOverlappedPartition(entries, newPartitions);
+ }
+
+ private boolean hasOverlappedPartition(
+ Iterator<ManifestEntry> entries, Set<BinaryRow> newPartitions) {
+ if (newPartitions.isEmpty()) {
+ return false;
+ }
+ while (entries.hasNext()) {
+ ManifestEntry entry = entries.next();
+ if (newPartitions.contains(entry.partition())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
public void update(long newSafeSnapshot) {
strictModeLastSafeSnapshot = newSafeSnapshot;
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
index 3ccf99476a..a04f82b53e 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.sink;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
@@ -388,8 +389,8 @@ public class TableCommitTest {
String path = tempDir.toString();
RowType rowType =
RowType.of(
- new DataType[] {DataTypes.INT(), DataTypes.BIGINT()},
- new String[] {"k", "v"});
+ new DataType[] {DataTypes.INT(), DataTypes.INT(),
DataTypes.BIGINT()},
+ new String[] {"pt", "k", "v"});
Options options = new Options();
options.set(CoreOptions.PATH, path);
@@ -400,8 +401,8 @@ public class TableCommitTest {
new SchemaManager(LocalFileIO.create(), new
Path(path)),
new Schema(
rowType.getFields(),
- Collections.emptyList(),
- Collections.singletonList("k"),
+ Collections.singletonList("pt"),
+ Arrays.asList("pt", "k"),
options.toMap(),
""));
FileStoreTable table =
@@ -410,35 +411,131 @@ public class TableCommitTest {
new Path(path),
tableSchema,
CatalogEnvironment.empty());
+ BinaryRow pt1 = partitionRow(1);
+ BinaryRow pt2 = partitionRow(2);
+
String user1 = UUID.randomUUID().toString();
TableWriteImpl<?> write1 = table.newWrite(user1);
TableCommitImpl commit1 = table.newCommit(user1);
- write1.write(GenericRow.of(0, 0L));
- write1.compact(BinaryRow.EMPTY_ROW, 0, true);
+ write1.write(GenericRow.of(1, 0, 0L));
+ write1.compact(pt1, 0, true);
commit1.commit(1, write1.prepareCommit(true, 1));
// test skip this commit check
-
String user2 = UUID.randomUUID().toString();
table =
table.copy(singletonMap(COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(), "2"));
TableWriteImpl<?> write2 = table.newWrite(user2);
TableCommitImpl commit2 = table.newCommit(user2);
- write2.write(GenericRow.of(1, 1L));
+ write2.write(GenericRow.of(2, 1, 1L));
commit2.commit(1, write2.prepareCommit(false, 1));
- // COMPACT commit should be checked
+ // COMPACT on a different partition should be ignored
+ write1.write(GenericRow.of(1, 4, 4L));
+ write1.compact(pt1, 0, true);
+ commit1.commit(2, write1.prepareCommit(true, 2));
- write1.write(GenericRow.of(4, 4L));
- write1.compact(BinaryRow.EMPTY_ROW, 0, true);
+ write2.write(GenericRow.of(2, 5, 5L));
+ assertThatCode(() -> commit2.commit(2, write2.prepareCommit(false, 2)))
+ .doesNotThrowAnyException();
+
+ // COMPACT on the same partition should be checked and fail
+ write1.write(GenericRow.of(2, 6, 6L));
+ write1.compact(pt2, 0, true);
commit1.commit(3, write1.prepareCommit(true, 3));
- write2.write(GenericRow.of(5, 5L));
+ write2.write(GenericRow.of(2, 7, 7L));
assertThatThrownBy(() -> commit2.commit(3, write2.prepareCommit(false,
3)))
.isInstanceOf(RuntimeException.class)
.hasMessageContaining(
"Giving up committing as
commit.strict-mode.last-safe-snapshot is set.");
+
+ write1.close();
+ commit1.close();
+ write2.close();
+ commit2.close();
+ }
+
+ @Test
+ public void testStrictModeForOverwrite() throws Exception {
+ String path = tempDir.toString();
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.INT(),
DataTypes.BIGINT()},
+ new String[] {"pt", "k", "v"});
+
+ Options options = new Options();
+ options.set(CoreOptions.PATH, path);
+ options.set(CoreOptions.BUCKET, 1);
+ options.set(CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER, 10);
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(LocalFileIO.create(), new
Path(path)),
+ new Schema(
+ rowType.getFields(),
+ Collections.singletonList("pt"),
+ Arrays.asList("pt", "k"),
+ options.toMap(),
+ ""));
+ FileStoreTable table =
+ FileStoreTableFactory.create(
+ LocalFileIO.create(),
+ new Path(path),
+ tableSchema,
+ CatalogEnvironment.empty());
+ BinaryRow pt1 = partitionRow(1);
+ BinaryRow pt2 = partitionRow(2);
+
+ // user1 writes pt=1 and pt=2
+ String user1 = UUID.randomUUID().toString();
+ TableWriteImpl<?> write1 = table.newWrite(user1);
+ TableCommitImpl commit1 = table.newCommit(user1);
+ write1.write(GenericRow.of(1, 0, 0L));
+ write1.write(GenericRow.of(2, 0, 0L));
+ commit1.commit(1, write1.prepareCommit(false, 1));
+
+ // test skip this commit check
+ String user2 = UUID.randomUUID().toString();
+ FileStoreTable tableWithStrict =
+
table.copy(singletonMap(COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(), "2"));
+ TableWriteImpl<?> write2 = tableWithStrict.newWrite(user2);
+ TableCommitImpl commit2 = tableWithStrict.newCommit(user2);
+
+ write2.write(GenericRow.of(1, 1, 1L));
+ write2.compact(pt1, 0, true);
+ commit2.commit(1, write2.prepareCommit(true, 1));
+
+ // user1 OVERWRITE on pt=1, snapshot 3
+ TableCommitImpl commit1Ow1 =
table.newCommit(user1).withOverwrite(singletonMap("pt", "1"));
+ write1.write(GenericRow.of(1, 4, 4L));
+ commit1Ow1.commit(2, write1.prepareCommit(false, 2));
+
+ // user2 COMPACT on pt=2: OVERWRITE pt=1 does not overlap so no error
+ write2.write(GenericRow.of(2, 5, 5L));
+ write2.compact(pt2, 0, true);
+ assertThatCode(() -> commit2.commit(2, write2.prepareCommit(true, 2)))
+ .doesNotThrowAnyException();
+
+ // user1 OVERWRITE on pt=1 again, snapshot 5
+ TableCommitImpl commit1Ow2 =
table.newCommit(user1).withOverwrite(singletonMap("pt", "1"));
+ write1.write(GenericRow.of(1, 6, 6L));
+ commit1Ow2.commit(3, write1.prepareCommit(false, 3));
+
+ // user2 COMPACT on pt=1: OVERWRITE pt=1 overlaps, should throw
+ write2.write(GenericRow.of(1, 7, 7L));
+ write2.compact(pt1, 0, true);
+ assertThatThrownBy(() -> commit2.commit(3, write2.prepareCommit(true,
3)))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining(
+ "Giving up committing as
commit.strict-mode.last-safe-snapshot is set.");
+
+ write1.close();
+ commit1.close();
+ commit1Ow1.close();
+ commit1Ow2.close();
+ write2.close();
+ commit2.close();
}
@Test
@@ -446,8 +543,8 @@ public class TableCommitTest {
String path = tempDir.toString();
RowType rowType =
RowType.of(
- new DataType[] {DataTypes.INT(), DataTypes.BIGINT()},
- new String[] {"k", "v"});
+ new DataType[] {DataTypes.INT(), DataTypes.INT(),
DataTypes.BIGINT()},
+ new String[] {"pt", "k", "v"});
Options options = new Options();
options.set(CoreOptions.PATH, path);
@@ -458,8 +555,8 @@ public class TableCommitTest {
new SchemaManager(LocalFileIO.create(), new
Path(path)),
new Schema(
rowType.getFields(),
- Collections.emptyList(),
- Collections.singletonList("k"),
+ Collections.singletonList("pt"),
+ Arrays.asList("pt", "k"),
options.toMap(),
""));
FileStoreTable table =
@@ -468,13 +565,15 @@ public class TableCommitTest {
new Path(path),
tableSchema,
CatalogEnvironment.empty());
+ BinaryRow pt1 = partitionRow(1);
+
String user1 = UUID.randomUUID().toString();
FileStoreTable fixedBucketWriteTable = table;
TableWriteImpl<?> write1 = fixedBucketWriteTable.newWrite(user1);
TableCommitImpl commit1 = fixedBucketWriteTable.newCommit(user1);
- write1.write(GenericRow.of(0, 0L));
- write1.compact(BinaryRow.EMPTY_ROW, 0, true);
+ write1.write(GenericRow.of(1, 0, 0L));
+ write1.compact(pt1, 0, true);
commit1.commit(1, write1.prepareCommit(true, 1));
// test skip this commit check
@@ -482,9 +581,9 @@ public class TableCommitTest {
String user2 = UUID.randomUUID().toString();
table =
table.copy(singletonMap(COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(), "2"));
TableWriteImpl<?> write2 = table.newWrite(user2);
- TableCommitImpl commit2 =
table.newCommit(user2).withOverwrite(Collections.emptyMap());
+ TableCommitImpl commit2 =
table.newCommit(user2).withOverwrite(singletonMap("pt", "1"));
- write2.write(GenericRow.of(1, 1L));
+ write2.write(GenericRow.of(1, 1, 1L));
commit2.commit(1, write2.prepareCommit(false, 1));
// APPEND with postpone bucket files should be ignored
@@ -496,25 +595,133 @@ public class TableCommitTest {
FileStoreTable postponeWriteTable =
fixedBucketWriteTable.copy(postponeWriteOptions);
write1 = postponeWriteTable.newWrite(user1);
commit1 = postponeWriteTable.newCommit(user1);
- write1.write(GenericRow.of(2, 2L));
+ write1.write(GenericRow.of(1, 2, 2L));
commit1.commit(2, write1.prepareCommit(false, 2));
- write2.write(GenericRow.of(3, 3L));
+ write2.write(GenericRow.of(1, 3, 3L));
commit2.commit(2, write2.prepareCommit(false, 2));
- // APPEND with fixed bucket files should be checked
+ // APPEND with fixed bucket files on a different partition should be
ignored
write1.close();
commit1.close();
write1 = fixedBucketWriteTable.newWrite(user1);
commit1 = fixedBucketWriteTable.newCommit(user1);
- write1.write(GenericRow.of(4, 4L));
+ write1.write(GenericRow.of(2, 4, 4L));
commit1.commit(3, write1.prepareCommit(false, 3));
- write2.write(GenericRow.of(5, 5L));
+ write2.write(GenericRow.of(1, 5, 5L));
+ assertThatCode(() -> commit2.commit(3, write2.prepareCommit(false, 3)))
+ .doesNotThrowAnyException();
+
+ // APPEND with fixed bucket files on the overwritten partition should
be checked
+ write1.close();
+ commit1.close();
+ write1 = fixedBucketWriteTable.newWrite(user1);
+ commit1 = fixedBucketWriteTable.newCommit(user1);
+ write1.write(GenericRow.of(1, 6, 6L));
+ commit1.commit(4, write1.prepareCommit(false, 4));
+
+ write2.write(GenericRow.of(1, 7, 7L));
+ assertThatThrownBy(() -> commit2.commit(4, write2.prepareCommit(false,
4)))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining(
+ "Giving up committing as
commit.strict-mode.last-safe-snapshot is set.");
+
+ write1.close();
+ commit1.close();
+ write2.close();
+ commit2.close();
+ }
+
+ private static BinaryRow partitionRow(int pt) {
+ BinaryRow row = new BinaryRow(1);
+ BinaryRowWriter writer = new BinaryRowWriter(row);
+ writer.writeInt(0, pt);
+ writer.complete();
+ return row;
+ }
+
+ @Test
+ public void testStrictModeScanStateNotLeakedAcrossSnapshots() throws
Exception {
+ // Regression test: the FileStoreScan used by StrictModeChecker is
mutable.
+ // When the APPEND branch (for an earlier APPEND snapshot) calls
+ // onlyReadRealBuckets(), the flag is latched on the shared scan
instance and
+ // must not leak into the later COMPACT/OVERWRITE branch, otherwise
entries
+ // whose bucket < 0 (e.g. postpone bucket) are silently filtered out
and an
+ // overlapping OVERWRITE snapshot can be missed.
+ String path = tempDir.toString();
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.INT(),
DataTypes.BIGINT()},
+ new String[] {"pt", "k", "v"});
+
+ Options options = new Options();
+ options.set(CoreOptions.PATH, path);
+ options.set(CoreOptions.BUCKET, 1);
+ options.set(CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER, 10);
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(LocalFileIO.create(), new
Path(path)),
+ new Schema(
+ rowType.getFields(),
+ Collections.singletonList("pt"),
+ Arrays.asList("pt", "k"),
+ options.toMap(),
+ ""));
+ FileStoreTable fixedTable =
+ FileStoreTableFactory.create(
+ LocalFileIO.create(),
+ new Path(path),
+ tableSchema,
+ CatalogEnvironment.empty());
+
+ // Switch to postpone bucket so that APPEND/OVERWRITE files land in
bucket=-2.
+ Map<String, String> postponeOptions = new HashMap<>();
+ postponeOptions.put(CoreOptions.BUCKET.key(), "-2");
+
postponeOptions.put(CoreOptions.POSTPONE_BATCH_WRITE_FIXED_BUCKET.key(),
"false");
+ FileStoreTable postponeTable = fixedTable.copy(postponeOptions);
+
+ String user1 = UUID.randomUUID().toString();
+
+ // snapshot 1: APPEND on pt=1 with postpone bucket (bucket=-2).
+ // When later checked in the APPEND branch, onlyReadRealBuckets() will
filter
+ // these entries out, so this snapshot alone does not throw; but it
DOES
+ // latch onlyReadRealBuckets=true onto the shared scan instance.
+ TableWriteImpl<?> write1 = postponeTable.newWrite(user1);
+ TableCommitImpl commit1 = postponeTable.newCommit(user1);
+ write1.write(GenericRow.of(1, 0, 0L));
+ commit1.commit(1, write1.prepareCommit(false, 1));
+ write1.close();
+ commit1.close();
+
+ // snapshot 2: OVERWRITE on pt=1, also writing postpone bucket files.
+ // The OVERWRITE branch should detect the partition overlap against a
later
+ // user's OVERWRITE on pt=1.
+ write1 = postponeTable.newWrite(user1);
+ commit1 =
postponeTable.newCommit(user1).withOverwrite(singletonMap("pt", "1"));
+ write1.write(GenericRow.of(1, 1, 1L));
+ commit1.commit(2, write1.prepareCommit(false, 2));
+ write1.close();
+ commit1.close();
+
+ // user2 commits OVERWRITE on pt=1 with strict mode enabled.
+ // Expected: throw because snapshot 2 (another user's OVERWRITE)
touches pt=1.
+ // With the leaking onlyReadRealBuckets flag, snapshot 2's bucket=-2
entries
+ // get filtered out and the conflict is silently missed.
+ String user2 = UUID.randomUUID().toString();
+ FileStoreTable tableWithStrict =
+
fixedTable.copy(singletonMap(COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(), "0"));
+ TableWriteImpl<?> write2 = tableWithStrict.newWrite(user2);
+ TableCommitImpl commit2 =
+
tableWithStrict.newCommit(user2).withOverwrite(singletonMap("pt", "1"));
+ write2.write(GenericRow.of(1, 2, 2L));
assertThatThrownBy(() -> commit2.commit(3, write2.prepareCommit(false,
3)))
.isInstanceOf(RuntimeException.class)
.hasMessageContaining(
"Giving up committing as
commit.strict-mode.last-safe-snapshot is set.");
+
+ write2.close();
+ commit2.close();
}
@Test
@@ -573,6 +780,93 @@ public class TableCommitTest {
commit2.commit(2, write2.prepareCommit(false, 3));
}
+ @Test
+ public void testStrictModeForNonPartitionedTable() throws Exception {
+ String path = tempDir.toString();
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.BIGINT()},
+ new String[] {"k", "v"});
+
+ Options options = new Options();
+ options.set(CoreOptions.PATH, path);
+ options.set(CoreOptions.BUCKET, 1);
+ options.set(CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER, 10);
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(LocalFileIO.create(), new
Path(path)),
+ new Schema(
+ rowType.getFields(),
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ options.toMap(),
+ ""));
+ FileStoreTable table =
+ FileStoreTableFactory.create(
+ LocalFileIO.create(),
+ new Path(path),
+ tableSchema,
+ CatalogEnvironment.empty());
+
+ String user1 = UUID.randomUUID().toString();
+ TableWriteImpl<?> write1 = table.newWrite(user1);
+ TableCommitImpl commit1 = table.newCommit(user1);
+ write1.write(GenericRow.of(0, 0L));
+ write1.compact(BinaryRow.EMPTY_ROW, 0, true);
+ commit1.commit(1, write1.prepareCommit(true, 1));
+
+ // test skip this commit check
+ String user2 = UUID.randomUUID().toString();
+ FileStoreTable tableWithStrict =
+
table.copy(singletonMap(COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(), "2"));
+ TableWriteImpl<?> write2 = tableWithStrict.newWrite(user2);
+ TableCommitImpl commit2 = tableWithStrict.newCommit(user2);
+
+ write2.write(GenericRow.of(1, 1L));
+ commit2.commit(1, write2.prepareCommit(false, 1));
+
+ // For a non-partitioned table, any COMPACT snapshot from another user
must be
+ // detected as a conflict since all entries share BinaryRow.EMPTY_ROW.
+ write1.write(GenericRow.of(2, 2L));
+ write1.compact(BinaryRow.EMPTY_ROW, 0, true);
+ commit1.commit(2, write1.prepareCommit(true, 2));
+
+ write2.write(GenericRow.of(3, 3L));
+ assertThatThrownBy(() -> commit2.commit(2, write2.prepareCommit(false,
2)))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining(
+ "Giving up committing as
commit.strict-mode.last-safe-snapshot is set.");
+
+ // APPEND with fixed bucket files should be caught when user2 commits
OVERWRITE,
+ // since non-partitioned table entries share BinaryRow.EMPTY_ROW so
they always
+ // overlap with user2's OVERWRITE target.
+ TableWriteImpl<?> write1Append = table.newWrite(user1);
+ TableCommitImpl commit1Append = table.newCommit(user1);
+ write1Append.write(GenericRow.of(4, 4L));
+ commit1Append.commit(3, write1Append.prepareCommit(false, 3));
+
+ FileStoreTable tableWithStrict2 =
+
table.copy(singletonMap(COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(), "3"));
+ TableWriteImpl<?> write2Ow = tableWithStrict2.newWrite(user2);
+ TableCommitImpl commit2Ow =
+
tableWithStrict2.newCommit(user2).withOverwrite(Collections.emptyMap());
+ write2Ow.write(GenericRow.of(5, 5L));
+ assertThatThrownBy(() -> commit2Ow.commit(4,
write2Ow.prepareCommit(false, 4)))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining(
+ "Giving up committing as
commit.strict-mode.last-safe-snapshot is set.");
+
+ write1Append.close();
+ commit1Append.close();
+ write2Ow.close();
+ commit2Ow.close();
+
+ write1.close();
+ commit1.close();
+ write2.close();
+ commit2.close();
+ }
+
@Test
public void testExpireForEmptyCommit() throws Exception {
String path = tempDir.toString();