This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1a58e5e214 [core] Add committer-side bucket consistency check (#7793)
1a58e5e214 is described below
commit 1a58e5e214a0b4f516512b2841acbcd859ab6881
Author: WenjunMin <[email protected]>
AuthorDate: Thu May 14 00:08:54 2026 +0800
[core] Add committer-side bucket consistency check (#7793)
Add committer-side bucket consistency validation for write-only
unordered append tables.
After https://github.com/apache/paimon/pull/6741, when
`bucket-append-ordered=false` and `write-only=true`, writers skip
restoring previous files, so bucket-count validation can be bypassed
after a bucket rescale. This change adds an internal commit-side
`checkSameBucket` path for fixed hash bucket tables to validate touched
partitions before committing.
The check is integrated with `ConflictDetection`, reuses the existing
conflict path when available, and uses a bounded partition cache to
avoid repeatedly checking the same partition within one committer
lifecycle.
---
.../paimon/operation/FileStoreCommitImpl.java | 40 ++++++++++++-
.../paimon/operation/commit/CommitScanner.java | 46 ++++++++++++++-
.../paimon/operation/commit/ConflictDetection.java | 65 ++++++++++++++++++++++
.../BucketedAppendFileStoreWriteTest.java | 41 ++++++++++++++
.../apache/paimon/flink/AppendOnlyTableITCase.java | 22 ++++++++
5 files changed, 209 insertions(+), 5 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 102100b9fb..73f335fc02 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -189,7 +189,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
this.manifestList = manifestListFactory.create();
this.indexManifestFile = indexManifestFileFactory.create();
this.rollback = rollback;
- this.scanner = new CommitScanner(scanSupplier.get(),
indexManifestFile, options);
+ this.scanner = new CommitScanner(scanSupplier, indexManifestFile,
options);
this.commitPreCallbacks = commitPreCallbacks;
this.commitCallbacks = commitCallbacks;
this.retryWaiter =
@@ -735,6 +735,36 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
return retryCount + 1;
}
+ private void checkSameBucketFromSnapshot(
+ List<ManifestEntry> deltaFiles, @Nullable Snapshot latestSnapshot)
{
+ if (latestSnapshot == null) {
+ return;
+ }
+
+ Map<BinaryRow, Integer> expectedTotalBuckets =
+ conflictDetection.collectUncheckedBucketPartitions(deltaFiles);
+ if (expectedTotalBuckets.isEmpty()) {
+ return;
+ }
+
+ Map<BinaryRow, Integer> previousTotalBuckets =
+ scanner.readTotalBuckets(
+ latestSnapshot, new
ArrayList<>(expectedTotalBuckets.keySet()));
+ Optional<RuntimeException> exception =
+ conflictDetection.checkSameBucketByTotalBuckets(
+ expectedTotalBuckets, previousTotalBuckets);
+ if (exception.isPresent()) {
+ throw exception.get();
+ }
+ }
+
+ private boolean shouldCheckSameBucket(CommitKind commitKind) {
+ return commitKind == CommitKind.APPEND
+ && bucketMode == BucketMode.HASH_FIXED
+ && options.writeOnly()
+ && !options.bucketAppendOrdered();
+ }
+
/**
* Try to overwrite partition.
*
@@ -834,7 +864,13 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
List<SimpleFileEntry> baseDataFiles = new ArrayList<>();
boolean discardDuplicate =
options.commitDiscardDuplicateFiles() && commitKind ==
CommitKind.APPEND;
- if (latestSnapshot != null && (discardDuplicate || detectConflicts)) {
+ boolean checkConflicts = latestSnapshot != null && (discardDuplicate
|| detectConflicts);
+ // By default, if checkConflicts is required, we do not have to do the
extra check bucket
+ // here.
+ if (!checkConflicts && shouldCheckSameBucket(commitKind)) {
+ checkSameBucketFromSnapshot(deltaFiles, latestSnapshot);
+ }
+ if (checkConflicts) {
// latestSnapshotId is different from the snapshot id we've
checked for conflicts,
// so we have to check again
if (changedPartitions == null) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java
index 9afe4500a4..e43f27ed89 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java
@@ -34,7 +34,13 @@ import org.apache.paimon.table.source.ScanMode;
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
import static java.util.Collections.emptyList;
@@ -42,14 +48,20 @@ import static java.util.Collections.emptyList;
public class CommitScanner {
private final FileStoreScan scan;
+ private final Supplier<FileStoreScan> scanSupplier;
private final IndexManifestFile indexManifestFile;
+ private final boolean dropStats;
public CommitScanner(
- FileStoreScan scan, IndexManifestFile indexManifestFile,
CoreOptions options) {
- this.scan = scan;
+ Supplier<FileStoreScan> scanSupplier,
+ IndexManifestFile indexManifestFile,
+ CoreOptions options) {
+ this.scanSupplier = scanSupplier;
+ this.scan = scanSupplier.get();
this.indexManifestFile = indexManifestFile;
// Stats in DELETE Manifest Entries is useless
- if (options.manifestDeleteFileDropStats()) {
+ this.dropStats = options.manifestDeleteFileDropStats();
+ if (dropStats) {
this.scan.dropStats();
}
}
@@ -89,6 +101,34 @@ public class CommitScanner {
}
}
+ public Map<BinaryRow, Integer> readTotalBuckets(
+ Snapshot snapshot, List<BinaryRow> changedPartitions) {
+ try {
+ Set<BinaryRow> remainingPartitions = new
HashSet<>(changedPartitions);
+ Map<BinaryRow, Integer> totalBuckets = new HashMap<>();
+ FileStoreScan freshScan = scanSupplier.get();
+ if (dropStats) {
+ freshScan.dropStats();
+ }
+ Iterator<ManifestEntry> iterator =
+ freshScan
+ .withSnapshot(snapshot)
+ .withKind(ScanMode.ALL)
+ .withPartitionFilter(changedPartitions)
+ .readFileIterator();
+ while (iterator.hasNext() && !remainingPartitions.isEmpty()) {
+ ManifestEntry entry = iterator.next();
+ int totalBucket = entry.totalBuckets();
+ if (totalBucket > 0 &&
remainingPartitions.remove(entry.partition())) {
+ totalBuckets.put(entry.partition(), totalBucket);
+ }
+ }
+ return totalBuckets;
+ } catch (Throwable e) {
+ throw new RuntimeException("Cannot read total buckets from changed
partitions.", e);
+ }
+ }
+
public CommitChanges readOverwriteChanges(
int numBucket,
List<ManifestEntry> changes,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
index 5b3f76697c..46d5c43619 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
@@ -71,6 +71,7 @@ import static
org.apache.paimon.utils.Preconditions.checkState;
public class ConflictDetection {
private static final Logger LOG =
LoggerFactory.getLogger(ConflictDetection.class);
+ private static final int SAME_BUCKET_CHECK_CACHE_MAX_SIZE = 1000;
private final String tableName;
private final String commitUser;
@@ -84,6 +85,13 @@ public class ConflictDetection {
private final IndexFileHandler indexFileHandler;
private final SnapshotManager snapshotManager;
private final CommitScanner commitScanner;
+ private final Map<BinaryRow, Boolean> sameBucketCheckedPartitions =
+ new LinkedHashMap<BinaryRow,
Boolean>(SAME_BUCKET_CHECK_CACHE_MAX_SIZE, 0.75f, false) {
+ @Override
+ protected boolean removeEldestEntry(Map.Entry<BinaryRow,
Boolean> eldest) {
+ return size() > SAME_BUCKET_CHECK_CACHE_MAX_SIZE;
+ }
+ };
private @Nullable PartitionExpire partitionExpire;
private @Nullable Long rowIdCheckFromSnapshot = null;
@@ -223,6 +231,39 @@ public class ConflictDetection {
return checkForRowIdFromSnapshot(latestSnapshot, deltaEntries,
deltaIndexEntries);
}
+ public <T extends FileEntry> Map<BinaryRow, Integer>
collectUncheckedBucketPartitions(
+ List<T> deltaEntries) {
+ Map<BinaryRow, Integer> totalBuckets = new HashMap<>();
+ for (T entry : deltaEntries) {
+ if (entry.kind() != FileKind.ADD
+ || entry.totalBuckets() <= 0
+ ||
sameBucketCheckedPartitions.containsKey(entry.partition())) {
+ continue;
+ }
+
+ Integer previous = totalBuckets.putIfAbsent(entry.partition(),
entry.totalBuckets());
+ if (previous != null && previous != entry.totalBuckets()) {
+ throwBucketNumMismatch(entry.partition(),
entry.totalBuckets(), previous);
+ }
+ }
+ return totalBuckets;
+ }
+
+ public Optional<RuntimeException> checkSameBucketByTotalBuckets(
+ Map<BinaryRow, Integer> expectedTotalBuckets,
+ Map<BinaryRow, Integer> previousTotalBuckets) {
+ for (Map.Entry<BinaryRow, Integer> entry :
expectedTotalBuckets.entrySet()) {
+ Integer previous = previousTotalBuckets.get(entry.getKey());
+ if (previous != null && !Objects.equals(previous,
entry.getValue())) {
+ return Optional.of(bucketNumMismatch(entry.getKey(),
entry.getValue(), previous));
+ }
+ }
+ for (BinaryRow partition : expectedTotalBuckets.keySet()) {
+ sameBucketCheckedPartitions.put(partition, Boolean.TRUE);
+ }
+ return Optional.empty();
+ }
+
private Optional<RuntimeException> checkBucketKeepSame(
List<SimpleFileEntry> baseEntries,
List<SimpleFileEntry> deltaEntries,
@@ -239,6 +280,9 @@ public class ConflictDetection {
if (entry.totalBuckets() <= 0) {
continue;
}
+ if (sameBucketCheckedPartitions.containsKey(entry.partition())) {
+ continue;
+ }
if (!totalBuckets.containsKey(entry.partition())) {
totalBuckets.put(entry.partition(), entry.totalBuckets());
@@ -266,9 +310,30 @@ public class ConflictDetection {
LOG.warn("", conflictException.getLeft());
return Optional.of(conflictException.getRight());
}
+ for (BinaryRow partition : totalBuckets.keySet()) {
+ sameBucketCheckedPartitions.put(partition, Boolean.TRUE);
+ }
return Optional.empty();
}
+ private void throwBucketNumMismatch(
+ BinaryRow partition, int numBuckets, int previousNumBuckets) {
+ throw bucketNumMismatch(partition, numBuckets, previousNumBuckets);
+ }
+
+ private RuntimeException bucketNumMismatch(
+ BinaryRow partition, int numBuckets, int previousNumBuckets) {
+ String partInfo =
+ partitionType.getFieldCount() > 0
+ ? "partition {" +
pathFactory.getPartitionString(partition) + "}"
+ : "table";
+ return new RuntimeException(
+ String.format(
+ "Try to write %s with a new bucket num %d, but the
previous bucket num is %d. "
+ + "Please switch to batch mode, and perform
INSERT OVERWRITE to rescale current data layout first.",
+ partInfo, numBuckets, previousNumBuckets));
+ }
+
private Optional<RuntimeException> checkKeyRange(
List<SimpleFileEntry> baseEntries,
List<SimpleFileEntry> deltaEntries,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/BucketedAppendFileStoreWriteTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/BucketedAppendFileStoreWriteTest.java
index af1becc465..556209b2d3 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/BucketedAppendFileStoreWriteTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/BucketedAppendFileStoreWriteTest.java
@@ -45,11 +45,15 @@ import org.junit.jupiter.api.io.TempDir;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.BUCKET_APPEND_ORDERED;
import static org.apache.paimon.CoreOptions.WRITE_MAX_WRITERS_TO_SPILL;
+import static org.apache.paimon.CoreOptions.WRITE_ONLY;
/** Tests for {@link BucketedAppendFileStoreWrite}. */
public class BucketedAppendFileStoreWriteTest {
@@ -172,6 +176,43 @@ public class BucketedAppendFileStoreWriteTest {
return (FileStoreTable) catalog.getTable(identifier);
}
+ @Test
+ public void testIgnorePreviousFilesChecksPartitionBucketNumber() throws
Exception {
+ FileStoreTable table = createFileStoreTable().copy(bucketOptions(2,
false, false));
+ BaseAppendFileStoreWrite write = (BaseAppendFileStoreWrite)
table.store().newWrite("ss");
+ StreamTableCommit commit = table.newStreamWriteBuilder().newCommit();
+
+ write.write(partition(1), 1, GenericRow.of(1, 1, 0));
+ commit.commit(0, write.prepareCommit(false, 0));
+
+ FileStoreTable rescaledTable = table.copy(bucketOptions(4, false,
true));
+ write = (BaseAppendFileStoreWrite)
rescaledTable.store().newWrite("ss");
+ write.write(partition(1), 1, GenericRow.of(1, 1, 0));
+ List<CommitMessage> commitMessages = write.prepareCommit(false, 1);
+ Assertions.assertThat(commitMessages).isNotEmpty();
+ Assertions.assertThatThrownBy(
+ () ->
+ rescaledTable
+ .newStreamWriteBuilder()
+ .newCommit()
+ .commit(1, commitMessages))
+ .hasMessageContaining("new bucket num 4")
+ .hasMessageContaining("previous bucket num is 2");
+
+ write = (BaseAppendFileStoreWrite)
rescaledTable.store().newWrite("ss");
+ write.write(partition(2), 2, GenericRow.of(2, 2, 0));
+ rescaledTable.newStreamWriteBuilder().newCommit().commit(2,
write.prepareCommit(false, 2));
+ }
+
+ private Map<String, String> bucketOptions(
+ int bucket, boolean bucketAppendOrdered, boolean writeOnly) {
+ Map<String, String> options = new HashMap<>();
+ options.put(BUCKET.key(), String.valueOf(bucket));
+ options.put(BUCKET_APPEND_ORDERED.key(),
String.valueOf(bucketAppendOrdered));
+ options.put(WRITE_ONLY.key(), String.valueOf(writeOnly));
+ return options;
+ }
+
private BinaryRow partition(int i) {
BinaryRow binaryRow = new BinaryRow(1);
BinaryRowWriter writer = new BinaryRowWriter(binaryRow);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
index 9c1796fcf4..f14c9854df 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
@@ -102,6 +102,28 @@ public class AppendOnlyTableITCase extends
CatalogITCaseBase {
assertThat(rows).containsExactlyInAnyOrder(Row.of("AAA"),
Row.of("BBB"));
}
+ @Test
+ public void testInsertIntoCheckSameBucketAndInsertOverwriteRescale() {
+ batchSql("INSERT INTO append_table VALUES (1, 'AAA'), (2, 'BBB')");
+ batchSql(
+ "ALTER TABLE append_table SET ("
+ + "'bucket' = '2', "
+ + "'bucket-append-ordered' = 'false', "
+ + "'write-only' = 'true')");
+
+ assertThatThrownBy(() -> batchSql("INSERT INTO append_table VALUES (3,
'CCC')"))
+ .rootCause()
+ .isInstanceOf(RuntimeException.class)
+ .hasMessage(
+ "Try to write table with a new bucket num 2, but the
previous bucket num is 1. "
+ + "Please switch to batch mode, and perform
INSERT OVERWRITE to rescale current data layout first.");
+
+ batchSql("INSERT OVERWRITE append_table VALUES (3, 'CCC'), (4,
'DDD')");
+
+ assertThat(batchSql("SELECT * FROM append_table"))
+ .containsExactlyInAnyOrder(Row.of(3, "CCC"), Row.of(4, "DDD"));
+ }
+
@Test
public void testReadWriteWithExternalPathRoundRobinStrategy1() {
String externalPaths =