This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a58e5e214 [core] Add committer-side bucket consistency check (#7793)
1a58e5e214 is described below

commit 1a58e5e214a0b4f516512b2841acbcd859ab6881
Author: WenjunMin <[email protected]>
AuthorDate: Thu May 14 00:08:54 2026 +0800

    [core] Add committer-side bucket consistency check (#7793)
    
    Add committer-side bucket consistency validation for write-only
    unordered append tables.
    
    After https://github.com/apache/paimon/pull/6741, when
    `bucket-append-ordered=false` and `write-only=true`, writers skip
    restoring previous files, so bucket-count validation can be bypassed
    after a bucket rescale. This change adds an internal commit-side
    `checkSameBucket` path for fixed hash bucket tables to validate touched
    partitions before committing.
    
    The check is integrated with `ConflictDetection`, reuses the existing
    conflict path when available, and uses a bounded partition cache to
    avoid repeatedly checking the same partition within one committer
    lifecycle.
---
 .../paimon/operation/FileStoreCommitImpl.java      | 40 ++++++++++++-
 .../paimon/operation/commit/CommitScanner.java     | 46 ++++++++++++++-
 .../paimon/operation/commit/ConflictDetection.java | 65 ++++++++++++++++++++++
 .../BucketedAppendFileStoreWriteTest.java          | 41 ++++++++++++++
 .../apache/paimon/flink/AppendOnlyTableITCase.java | 22 ++++++++
 5 files changed, 209 insertions(+), 5 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 102100b9fb..73f335fc02 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -189,7 +189,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
         this.manifestList = manifestListFactory.create();
         this.indexManifestFile = indexManifestFileFactory.create();
         this.rollback = rollback;
-        this.scanner = new CommitScanner(scanSupplier.get(), 
indexManifestFile, options);
+        this.scanner = new CommitScanner(scanSupplier, indexManifestFile, 
options);
         this.commitPreCallbacks = commitPreCallbacks;
         this.commitCallbacks = commitCallbacks;
         this.retryWaiter =
@@ -735,6 +735,36 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         return retryCount + 1;
     }
 
+    private void checkSameBucketFromSnapshot(
+            List<ManifestEntry> deltaFiles, @Nullable Snapshot latestSnapshot) 
{
+        if (latestSnapshot == null) {
+            return;
+        }
+
+        Map<BinaryRow, Integer> expectedTotalBuckets =
+                conflictDetection.collectUncheckedBucketPartitions(deltaFiles);
+        if (expectedTotalBuckets.isEmpty()) {
+            return;
+        }
+
+        Map<BinaryRow, Integer> previousTotalBuckets =
+                scanner.readTotalBuckets(
+                        latestSnapshot, new 
ArrayList<>(expectedTotalBuckets.keySet()));
+        Optional<RuntimeException> exception =
+                conflictDetection.checkSameBucketByTotalBuckets(
+                        expectedTotalBuckets, previousTotalBuckets);
+        if (exception.isPresent()) {
+            throw exception.get();
+        }
+    }
+
+    private boolean shouldCheckSameBucket(CommitKind commitKind) {
+        return commitKind == CommitKind.APPEND
+                && bucketMode == BucketMode.HASH_FIXED
+                && options.writeOnly()
+                && !options.bucketAppendOrdered();
+    }
+
     /**
      * Try to overwrite partition.
      *
@@ -834,7 +864,13 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         List<SimpleFileEntry> baseDataFiles = new ArrayList<>();
         boolean discardDuplicate =
                 options.commitDiscardDuplicateFiles() && commitKind == 
CommitKind.APPEND;
-        if (latestSnapshot != null && (discardDuplicate || detectConflicts)) {
+        boolean checkConflicts = latestSnapshot != null && (discardDuplicate 
|| detectConflicts);
+        // By default, if checkConflicts is required, we do not have to do the 
extra check bucket
+        // here.
+        if (!checkConflicts && shouldCheckSameBucket(commitKind)) {
+            checkSameBucketFromSnapshot(deltaFiles, latestSnapshot);
+        }
+        if (checkConflicts) {
             // latestSnapshotId is different from the snapshot id we've 
checked for conflicts,
             // so we have to check again
             if (changedPartitions == null) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java
index 9afe4500a4..e43f27ed89 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java
@@ -34,7 +34,13 @@ import org.apache.paimon.table.source.ScanMode;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
 
 import static java.util.Collections.emptyList;
 
@@ -42,14 +48,20 @@ import static java.util.Collections.emptyList;
 public class CommitScanner {
 
     private final FileStoreScan scan;
+    private final Supplier<FileStoreScan> scanSupplier;
     private final IndexManifestFile indexManifestFile;
+    private final boolean dropStats;
 
     public CommitScanner(
-            FileStoreScan scan, IndexManifestFile indexManifestFile, 
CoreOptions options) {
-        this.scan = scan;
+            Supplier<FileStoreScan> scanSupplier,
+            IndexManifestFile indexManifestFile,
+            CoreOptions options) {
+        this.scanSupplier = scanSupplier;
+        this.scan = scanSupplier.get();
         this.indexManifestFile = indexManifestFile;
         // Stats in DELETE Manifest Entries is useless
-        if (options.manifestDeleteFileDropStats()) {
+        this.dropStats = options.manifestDeleteFileDropStats();
+        if (dropStats) {
             this.scan.dropStats();
         }
     }
@@ -89,6 +101,34 @@ public class CommitScanner {
         }
     }
 
+    public Map<BinaryRow, Integer> readTotalBuckets(
+            Snapshot snapshot, List<BinaryRow> changedPartitions) {
+        try {
+            Set<BinaryRow> remainingPartitions = new 
HashSet<>(changedPartitions);
+            Map<BinaryRow, Integer> totalBuckets = new HashMap<>();
+            FileStoreScan freshScan = scanSupplier.get();
+            if (dropStats) {
+                freshScan.dropStats();
+            }
+            Iterator<ManifestEntry> iterator =
+                    freshScan
+                            .withSnapshot(snapshot)
+                            .withKind(ScanMode.ALL)
+                            .withPartitionFilter(changedPartitions)
+                            .readFileIterator();
+            while (iterator.hasNext() && !remainingPartitions.isEmpty()) {
+                ManifestEntry entry = iterator.next();
+                int totalBucket = entry.totalBuckets();
+                if (totalBucket > 0 && 
remainingPartitions.remove(entry.partition())) {
+                    totalBuckets.put(entry.partition(), totalBucket);
+                }
+            }
+            return totalBuckets;
+        } catch (Throwable e) {
+            throw new RuntimeException("Cannot read total buckets from changed 
partitions.", e);
+        }
+    }
+
     public CommitChanges readOverwriteChanges(
             int numBucket,
             List<ManifestEntry> changes,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
index 5b3f76697c..46d5c43619 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
@@ -71,6 +71,7 @@ import static 
org.apache.paimon.utils.Preconditions.checkState;
 public class ConflictDetection {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ConflictDetection.class);
+    private static final int SAME_BUCKET_CHECK_CACHE_MAX_SIZE = 1000;
 
     private final String tableName;
     private final String commitUser;
@@ -84,6 +85,13 @@ public class ConflictDetection {
     private final IndexFileHandler indexFileHandler;
     private final SnapshotManager snapshotManager;
     private final CommitScanner commitScanner;
+    private final Map<BinaryRow, Boolean> sameBucketCheckedPartitions =
+            new LinkedHashMap<BinaryRow, 
Boolean>(SAME_BUCKET_CHECK_CACHE_MAX_SIZE, 0.75f, false) {
+                @Override
+                protected boolean removeEldestEntry(Map.Entry<BinaryRow, 
Boolean> eldest) {
+                    return size() > SAME_BUCKET_CHECK_CACHE_MAX_SIZE;
+                }
+            };
 
     private @Nullable PartitionExpire partitionExpire;
     private @Nullable Long rowIdCheckFromSnapshot = null;
@@ -223,6 +231,39 @@ public class ConflictDetection {
         return checkForRowIdFromSnapshot(latestSnapshot, deltaEntries, 
deltaIndexEntries);
     }
 
+    public <T extends FileEntry> Map<BinaryRow, Integer> 
collectUncheckedBucketPartitions(
+            List<T> deltaEntries) {
+        Map<BinaryRow, Integer> totalBuckets = new HashMap<>();
+        for (T entry : deltaEntries) {
+            if (entry.kind() != FileKind.ADD
+                    || entry.totalBuckets() <= 0
+                    || 
sameBucketCheckedPartitions.containsKey(entry.partition())) {
+                continue;
+            }
+
+            Integer previous = totalBuckets.putIfAbsent(entry.partition(), 
entry.totalBuckets());
+            if (previous != null && previous != entry.totalBuckets()) {
+                throwBucketNumMismatch(entry.partition(), 
entry.totalBuckets(), previous);
+            }
+        }
+        return totalBuckets;
+    }
+
+    public Optional<RuntimeException> checkSameBucketByTotalBuckets(
+            Map<BinaryRow, Integer> expectedTotalBuckets,
+            Map<BinaryRow, Integer> previousTotalBuckets) {
+        for (Map.Entry<BinaryRow, Integer> entry : 
expectedTotalBuckets.entrySet()) {
+            Integer previous = previousTotalBuckets.get(entry.getKey());
+            if (previous != null && !Objects.equals(previous, 
entry.getValue())) {
+                return Optional.of(bucketNumMismatch(entry.getKey(), 
entry.getValue(), previous));
+            }
+        }
+        for (BinaryRow partition : expectedTotalBuckets.keySet()) {
+            sameBucketCheckedPartitions.put(partition, Boolean.TRUE);
+        }
+        return Optional.empty();
+    }
+
     private Optional<RuntimeException> checkBucketKeepSame(
             List<SimpleFileEntry> baseEntries,
             List<SimpleFileEntry> deltaEntries,
@@ -239,6 +280,9 @@ public class ConflictDetection {
             if (entry.totalBuckets() <= 0) {
                 continue;
             }
+            if (sameBucketCheckedPartitions.containsKey(entry.partition())) {
+                continue;
+            }
 
             if (!totalBuckets.containsKey(entry.partition())) {
                 totalBuckets.put(entry.partition(), entry.totalBuckets());
@@ -266,9 +310,30 @@ public class ConflictDetection {
             LOG.warn("", conflictException.getLeft());
             return Optional.of(conflictException.getRight());
         }
+        for (BinaryRow partition : totalBuckets.keySet()) {
+            sameBucketCheckedPartitions.put(partition, Boolean.TRUE);
+        }
         return Optional.empty();
     }
 
+    private void throwBucketNumMismatch(
+            BinaryRow partition, int numBuckets, int previousNumBuckets) {
+        throw bucketNumMismatch(partition, numBuckets, previousNumBuckets);
+    }
+
+    private RuntimeException bucketNumMismatch(
+            BinaryRow partition, int numBuckets, int previousNumBuckets) {
+        String partInfo =
+                partitionType.getFieldCount() > 0
+                        ? "partition {" + 
pathFactory.getPartitionString(partition) + "}"
+                        : "table";
+        return new RuntimeException(
+                String.format(
+                        "Try to write %s with a new bucket num %d, but the 
previous bucket num is %d. "
+                                + "Please switch to batch mode, and perform 
INSERT OVERWRITE to rescale current data layout first.",
+                        partInfo, numBuckets, previousNumBuckets));
+    }
+
     private Optional<RuntimeException> checkKeyRange(
             List<SimpleFileEntry> baseEntries,
             List<SimpleFileEntry> deltaEntries,
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/BucketedAppendFileStoreWriteTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/BucketedAppendFileStoreWriteTest.java
index af1becc465..556209b2d3 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/BucketedAppendFileStoreWriteTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/BucketedAppendFileStoreWriteTest.java
@@ -45,11 +45,15 @@ import org.junit.jupiter.api.io.TempDir;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
+import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.BUCKET_APPEND_ORDERED;
 import static org.apache.paimon.CoreOptions.WRITE_MAX_WRITERS_TO_SPILL;
+import static org.apache.paimon.CoreOptions.WRITE_ONLY;
 
 /** Tests for {@link BucketedAppendFileStoreWrite}. */
 public class BucketedAppendFileStoreWriteTest {
@@ -172,6 +176,43 @@ public class BucketedAppendFileStoreWriteTest {
         return (FileStoreTable) catalog.getTable(identifier);
     }
 
+    @Test
+    public void testIgnorePreviousFilesChecksPartitionBucketNumber() throws 
Exception {
+        FileStoreTable table = createFileStoreTable().copy(bucketOptions(2, 
false, false));
+        BaseAppendFileStoreWrite write = (BaseAppendFileStoreWrite) 
table.store().newWrite("ss");
+        StreamTableCommit commit = table.newStreamWriteBuilder().newCommit();
+
+        write.write(partition(1), 1, GenericRow.of(1, 1, 0));
+        commit.commit(0, write.prepareCommit(false, 0));
+
+        FileStoreTable rescaledTable = table.copy(bucketOptions(4, false, 
true));
+        write = (BaseAppendFileStoreWrite) 
rescaledTable.store().newWrite("ss");
+        write.write(partition(1), 1, GenericRow.of(1, 1, 0));
+        List<CommitMessage> commitMessages = write.prepareCommit(false, 1);
+        Assertions.assertThat(commitMessages).isNotEmpty();
+        Assertions.assertThatThrownBy(
+                        () ->
+                                rescaledTable
+                                        .newStreamWriteBuilder()
+                                        .newCommit()
+                                        .commit(1, commitMessages))
+                .hasMessageContaining("new bucket num 4")
+                .hasMessageContaining("previous bucket num is 2");
+
+        write = (BaseAppendFileStoreWrite) 
rescaledTable.store().newWrite("ss");
+        write.write(partition(2), 2, GenericRow.of(2, 2, 0));
+        rescaledTable.newStreamWriteBuilder().newCommit().commit(2, 
write.prepareCommit(false, 2));
+    }
+
+    private Map<String, String> bucketOptions(
+            int bucket, boolean bucketAppendOrdered, boolean writeOnly) {
+        Map<String, String> options = new HashMap<>();
+        options.put(BUCKET.key(), String.valueOf(bucket));
+        options.put(BUCKET_APPEND_ORDERED.key(), 
String.valueOf(bucketAppendOrdered));
+        options.put(WRITE_ONLY.key(), String.valueOf(writeOnly));
+        return options;
+    }
+
     private BinaryRow partition(int i) {
         BinaryRow binaryRow = new BinaryRow(1);
         BinaryRowWriter writer = new BinaryRowWriter(binaryRow);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
index 9c1796fcf4..f14c9854df 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
@@ -102,6 +102,28 @@ public class AppendOnlyTableITCase extends 
CatalogITCaseBase {
         assertThat(rows).containsExactlyInAnyOrder(Row.of("AAA"), 
Row.of("BBB"));
     }
 
+    @Test
+    public void testInsertIntoCheckSameBucketAndInsertOverwriteRescale() {
+        batchSql("INSERT INTO append_table VALUES (1, 'AAA'), (2, 'BBB')");
+        batchSql(
+                "ALTER TABLE append_table SET ("
+                        + "'bucket' = '2', "
+                        + "'bucket-append-ordered' = 'false', "
+                        + "'write-only' = 'true')");
+
+        assertThatThrownBy(() -> batchSql("INSERT INTO append_table VALUES (3, 
'CCC')"))
+                .rootCause()
+                .isInstanceOf(RuntimeException.class)
+                .hasMessage(
+                        "Try to write table with a new bucket num 2, but the 
previous bucket num is 1. "
+                                + "Please switch to batch mode, and perform 
INSERT OVERWRITE to rescale current data layout first.");
+
+        batchSql("INSERT OVERWRITE append_table VALUES (3, 'CCC'), (4, 
'DDD')");
+
+        assertThat(batchSql("SELECT * FROM append_table"))
+                .containsExactlyInAnyOrder(Row.of(3, "CCC"), Row.of(4, "DDD"));
+    }
+
     @Test
     public void testReadWriteWithExternalPathRoundRobinStrategy1() {
         String externalPaths =

Reply via email to