This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8264611381 [core] Total buckets should remain the same within the same partition when commit (#5725)
8264611381 is described below

commit 8264611381d75e889c13083acfb21d8bfcf7a42f
Author: tsreaper <[email protected]>
AuthorDate: Tue Jun 10 18:28:05 2025 +0800

    [core] Total buckets should remain the same within the same partition when commit (#5725)
---
 .../apache/paimon/manifest/ExpireFileEntry.java    |  3 ++
 .../java/org/apache/paimon/manifest/FileEntry.java |  2 +
 .../org/apache/paimon/manifest/ManifestEntry.java  |  1 +
 .../apache/paimon/manifest/SimpleFileEntry.java    | 23 ++++++++-
 .../paimon/operation/FileStoreCommitImpl.java      | 50 ++++++++++++++++++--
 .../apache/paimon/table/sink/TableCommitTest.java  | 55 ++++++++++++++++++++++
 6 files changed, 129 insertions(+), 5 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java
index 5d6d68144e..48ab6d5d5c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java
@@ -35,6 +35,7 @@ public class ExpireFileEntry extends SimpleFileEntry {
             FileKind kind,
             BinaryRow partition,
             int bucket,
+            int totalBuckets,
             int level,
             String fileName,
             List<String> extraFiles,
@@ -47,6 +48,7 @@ public class ExpireFileEntry extends SimpleFileEntry {
                 kind,
                 partition,
                 bucket,
+                totalBuckets,
                 level,
                 fileName,
                 extraFiles,
@@ -66,6 +68,7 @@ public class ExpireFileEntry extends SimpleFileEntry {
                 entry.kind(),
                 entry.partition(),
                 entry.bucket(),
+                entry.totalBuckets(),
                 entry.level(),
                 entry.fileName(),
                 entry.file().extraFiles(),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
index dd77759de1..a09bea2acf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
@@ -50,6 +50,8 @@ public interface FileEntry {
 
     int bucket();
 
+    int totalBuckets();
+
     int level();
 
     String fileName();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
index 3cb5733a38..75ab52ac75 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
@@ -115,6 +115,7 @@ public class ManifestEntry implements FileEntry {
         return file.extraFiles();
     }
 
+    @Override
     public int totalBuckets() {
         return totalBuckets;
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
index c8708db0b8..14ae43c349 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
@@ -32,6 +32,7 @@ public class SimpleFileEntry implements FileEntry {
     private final FileKind kind;
     private final BinaryRow partition;
     private final int bucket;
+    private final int totalBuckets;
     private final int level;
     private final String fileName;
     private final List<String> extraFiles;
@@ -44,6 +45,7 @@ public class SimpleFileEntry implements FileEntry {
             FileKind kind,
             BinaryRow partition,
             int bucket,
+            int totalBuckets,
             int level,
             String fileName,
             List<String> extraFiles,
@@ -54,6 +56,7 @@ public class SimpleFileEntry implements FileEntry {
         this.kind = kind;
         this.partition = partition;
         this.bucket = bucket;
+        this.totalBuckets = totalBuckets;
         this.level = level;
         this.fileName = fileName;
         this.extraFiles = extraFiles;
@@ -68,6 +71,7 @@ public class SimpleFileEntry implements FileEntry {
                 entry.kind(),
                 entry.partition(),
                 entry.bucket(),
+                entry.totalBuckets(),
                 entry.level(),
                 entry.fileName(),
                 entry.file().extraFiles(),
@@ -96,6 +100,11 @@ public class SimpleFileEntry implements FileEntry {
         return bucket;
     }
 
+    @Override
+    public int totalBuckets() {
+        return totalBuckets;
+    }
+
     @Override
     public int level() {
         return level;
@@ -143,6 +152,7 @@ public class SimpleFileEntry implements FileEntry {
         }
         SimpleFileEntry that = (SimpleFileEntry) o;
         return bucket == that.bucket
+                && totalBuckets == that.totalBuckets
                 && level == that.level
                 && kind == that.kind
                 && Objects.equals(partition, that.partition)
@@ -156,7 +166,16 @@ public class SimpleFileEntry implements FileEntry {
     @Override
     public int hashCode() {
         return Objects.hash(
-                kind, partition, bucket, level, fileName, extraFiles, minKey, 
maxKey, externalPath);
+                kind,
+                partition,
+                bucket,
+                totalBuckets,
+                level,
+                fileName,
+                extraFiles,
+                minKey,
+                maxKey,
+                externalPath);
     }
 
     @Override
@@ -168,6 +187,8 @@ public class SimpleFileEntry implements FileEntry {
                 + partition
                 + ", bucket="
                 + bucket
+                + ", totalBuckets="
+                + totalBuckets
                 + ", level="
                 + level
                 + ", fileName="
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 852776d2fe..79ab01b37c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -319,7 +319,10 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                             readAllEntriesFromChangedPartitions(
                                     latestSnapshot, appendTableFiles, 
compactTableFiles));
                     noConflictsOrFail(
-                            latestSnapshot.commitUser(), baseEntries, 
appendSimpleEntries);
+                            latestSnapshot.commitUser(),
+                            baseEntries,
+                            appendSimpleEntries,
+                            Snapshot.CommitKind.APPEND);
                     safeLatestSnapshotId = latestSnapshot.id();
                 }
 
@@ -352,7 +355,8 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                     noConflictsOrFail(
                             latestSnapshot.commitUser(),
                             baseEntries,
-                            SimpleFileEntry.from(compactTableFiles));
+                            SimpleFileEntry.from(compactTableFiles),
+                            Snapshot.CommitKind.COMPACT);
                     // assume this compact commit follows just after the 
append commit created above
                     safeLatestSnapshotId += 1;
                 }
@@ -904,7 +908,8 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                 noConflictsOrFail(
                         latestSnapshot.commitUser(),
                         baseDataFiles,
-                        SimpleFileEntry.from(deltaFiles));
+                        SimpleFileEntry.from(deltaFiles),
+                        commitKind);
             } catch (Exception e) {
                 if (retryResult != null) {
                     retryResult.cleanAll();
@@ -1252,10 +1257,47 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
     private void noConflictsOrFail(
             String baseCommitUser,
             List<SimpleFileEntry> baseEntries,
-            List<SimpleFileEntry> changes) {
+            List<SimpleFileEntry> changes,
+            Snapshot.CommitKind commitKind) {
         List<SimpleFileEntry> allEntries = new ArrayList<>(baseEntries);
         allEntries.addAll(changes);
 
+        if (commitKind != Snapshot.CommitKind.OVERWRITE) {
+            // total buckets within the same partition should remain the same
+            Map<BinaryRow, Integer> totalBuckets = new HashMap<>();
+            for (SimpleFileEntry entry : allEntries) {
+                if (entry.totalBuckets() <= 0) {
+                    continue;
+                }
+
+                if (!totalBuckets.containsKey(entry.partition())) {
+                    totalBuckets.put(entry.partition(), entry.totalBuckets());
+                    continue;
+                }
+
+                int old = totalBuckets.get(entry.partition());
+                if (old == entry.totalBuckets()) {
+                    continue;
+                }
+
+                Pair<RuntimeException, RuntimeException> conflictException =
+                        createConflictException(
+                                "Total buckets of partition "
+                                        + entry.partition()
+                                        + " changed from "
+                                        + old
+                                        + " to "
+                                        + entry.totalBuckets()
+                                        + " without overwrite. Give up 
committing.",
+                                baseCommitUser,
+                                baseEntries,
+                                changes,
+                                null);
+                LOG.warn("", conflictException.getLeft());
+                throw conflictException.getRight();
+            }
+        }
+
         java.util.function.Consumer<Throwable> conflictHandler =
                 e -> {
                     Pair<RuntimeException, RuntimeException> conflictException 
=
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
index 6dc77799ae..67c05c996d 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.sink;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
@@ -258,4 +259,58 @@ public class TableCommitTest {
                         "Cannot recover from this checkpoint because some 
files in the"
                                 + " snapshot that need to be resubmitted have 
been deleted");
     }
+
+    @Test
+    public void testGiveUpCommitWhenTotalBucketsChanged() throws Exception {
+        String path = tempDir.toString();
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.BIGINT()},
+                        new String[] {"k", "v"});
+
+        Options options = new Options();
+        options.set(CoreOptions.PATH, path);
+        options.set(CoreOptions.BUCKET, 1);
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(LocalFileIO.create(), new 
Path(path)),
+                        new Schema(
+                                rowType.getFields(),
+                                Collections.emptyList(),
+                                Collections.singletonList("k"),
+                                options.toMap(),
+                                ""));
+        FileStoreTable table =
+                FileStoreTableFactory.create(
+                        LocalFileIO.create(),
+                        new Path(path),
+                        tableSchema,
+                        CatalogEnvironment.empty());
+
+        String commitUser = UUID.randomUUID().toString();
+        try (TableWriteImpl<?> write = table.newWrite(commitUser);
+                TableCommitImpl commit = table.newCommit(commitUser)) {
+            write.write(GenericRow.of(0, 0L));
+            commit.commit(1, write.prepareCommit(false, 1));
+        }
+
+        options = new Options(table.options());
+        options.set(CoreOptions.BUCKET, 2);
+        table = table.copy(tableSchema.copy(options.toMap()));
+
+        commitUser = UUID.randomUUID().toString();
+        try (TableWriteImpl<?> write = table.newWrite(commitUser);
+                TableCommitImpl commit = table.newCommit(commitUser)) {
+            write.getWrite().withIgnoreNumBucketCheck(true);
+            for (int i = 1; i < 10; i++) {
+                write.write(GenericRow.of(i, (long) i));
+            }
+            for (int i = 0; i < 2; i++) {
+                write.compact(BinaryRow.EMPTY_ROW, i, true);
+            }
+            assertThatThrownBy(() -> commit.commit(1, 
write.prepareCommit(true, 1)))
+                    .isInstanceOf(RuntimeException.class)
+                    .hasMessageContaining("changed from 1 to 2 without 
overwrite");
+        }
+    }
 }

Reply via email to