This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8264611381 [core] Total buckets should remain the same within the same partition when commit (#5725)
8264611381 is described below
commit 8264611381d75e889c13083acfb21d8bfcf7a42f
Author: tsreaper <[email protected]>
AuthorDate: Tue Jun 10 18:28:05 2025 +0800
[core] Total buckets should remain the same within the same partition when commit (#5725)
---
.../apache/paimon/manifest/ExpireFileEntry.java | 3 ++
.../java/org/apache/paimon/manifest/FileEntry.java | 2 +
.../org/apache/paimon/manifest/ManifestEntry.java | 1 +
.../apache/paimon/manifest/SimpleFileEntry.java | 23 ++++++++-
.../paimon/operation/FileStoreCommitImpl.java | 50 ++++++++++++++++++--
.../apache/paimon/table/sink/TableCommitTest.java | 55 ++++++++++++++++++++++
6 files changed, 129 insertions(+), 5 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java
index 5d6d68144e..48ab6d5d5c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java
@@ -35,6 +35,7 @@ public class ExpireFileEntry extends SimpleFileEntry {
FileKind kind,
BinaryRow partition,
int bucket,
+ int totalBuckets,
int level,
String fileName,
List<String> extraFiles,
@@ -47,6 +48,7 @@ public class ExpireFileEntry extends SimpleFileEntry {
kind,
partition,
bucket,
+ totalBuckets,
level,
fileName,
extraFiles,
@@ -66,6 +68,7 @@ public class ExpireFileEntry extends SimpleFileEntry {
entry.kind(),
entry.partition(),
entry.bucket(),
+ entry.totalBuckets(),
entry.level(),
entry.fileName(),
entry.file().extraFiles(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
index dd77759de1..a09bea2acf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
@@ -50,6 +50,8 @@ public interface FileEntry {
int bucket();
+ int totalBuckets();
+
int level();
String fileName();
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
index 3cb5733a38..75ab52ac75 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
@@ -115,6 +115,7 @@ public class ManifestEntry implements FileEntry {
return file.extraFiles();
}
+ @Override
public int totalBuckets() {
return totalBuckets;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
index c8708db0b8..14ae43c349 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
@@ -32,6 +32,7 @@ public class SimpleFileEntry implements FileEntry {
private final FileKind kind;
private final BinaryRow partition;
private final int bucket;
+ private final int totalBuckets;
private final int level;
private final String fileName;
private final List<String> extraFiles;
@@ -44,6 +45,7 @@ public class SimpleFileEntry implements FileEntry {
FileKind kind,
BinaryRow partition,
int bucket,
+ int totalBuckets,
int level,
String fileName,
List<String> extraFiles,
@@ -54,6 +56,7 @@ public class SimpleFileEntry implements FileEntry {
this.kind = kind;
this.partition = partition;
this.bucket = bucket;
+ this.totalBuckets = totalBuckets;
this.level = level;
this.fileName = fileName;
this.extraFiles = extraFiles;
@@ -68,6 +71,7 @@ public class SimpleFileEntry implements FileEntry {
entry.kind(),
entry.partition(),
entry.bucket(),
+ entry.totalBuckets(),
entry.level(),
entry.fileName(),
entry.file().extraFiles(),
@@ -96,6 +100,11 @@ public class SimpleFileEntry implements FileEntry {
return bucket;
}
+ @Override
+ public int totalBuckets() {
+ return totalBuckets;
+ }
+
@Override
public int level() {
return level;
@@ -143,6 +152,7 @@ public class SimpleFileEntry implements FileEntry {
}
SimpleFileEntry that = (SimpleFileEntry) o;
return bucket == that.bucket
+ && totalBuckets == that.totalBuckets
&& level == that.level
&& kind == that.kind
&& Objects.equals(partition, that.partition)
@@ -156,7 +166,16 @@ public class SimpleFileEntry implements FileEntry {
@Override
public int hashCode() {
return Objects.hash(
- kind, partition, bucket, level, fileName, extraFiles, minKey, maxKey, externalPath);
+ kind,
+ partition,
+ bucket,
+ totalBuckets,
+ level,
+ fileName,
+ extraFiles,
+ minKey,
+ maxKey,
+ externalPath);
}
@Override
@@ -168,6 +187,8 @@ public class SimpleFileEntry implements FileEntry {
+ partition
+ ", bucket="
+ bucket
+ + ", totalBuckets="
+ + totalBuckets
+ ", level="
+ level
+ ", fileName="
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 852776d2fe..79ab01b37c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -319,7 +319,10 @@ public class FileStoreCommitImpl implements FileStoreCommit {
readAllEntriesFromChangedPartitions(
latestSnapshot, appendTableFiles, compactTableFiles));
noConflictsOrFail(
- latestSnapshot.commitUser(), baseEntries, appendSimpleEntries);
+ latestSnapshot.commitUser(),
+ baseEntries,
+ appendSimpleEntries,
+ Snapshot.CommitKind.APPEND);
safeLatestSnapshotId = latestSnapshot.id();
}
@@ -352,7 +355,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
noConflictsOrFail(
latestSnapshot.commitUser(),
baseEntries,
- SimpleFileEntry.from(compactTableFiles));
+ SimpleFileEntry.from(compactTableFiles),
+ Snapshot.CommitKind.COMPACT);
// assume this compact commit follows just after the append commit created above
safeLatestSnapshotId += 1;
}
@@ -904,7 +908,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
noConflictsOrFail(
latestSnapshot.commitUser(),
baseDataFiles,
- SimpleFileEntry.from(deltaFiles));
+ SimpleFileEntry.from(deltaFiles),
+ commitKind);
} catch (Exception e) {
if (retryResult != null) {
retryResult.cleanAll();
@@ -1252,10 +1257,47 @@ public class FileStoreCommitImpl implements FileStoreCommit {
private void noConflictsOrFail(
String baseCommitUser,
List<SimpleFileEntry> baseEntries,
- List<SimpleFileEntry> changes) {
+ List<SimpleFileEntry> changes,
+ Snapshot.CommitKind commitKind) {
List<SimpleFileEntry> allEntries = new ArrayList<>(baseEntries);
allEntries.addAll(changes);
+ if (commitKind != Snapshot.CommitKind.OVERWRITE) {
+ // total buckets within the same partition should remain the same
+ Map<BinaryRow, Integer> totalBuckets = new HashMap<>();
+ for (SimpleFileEntry entry : allEntries) {
+ if (entry.totalBuckets() <= 0) {
+ continue;
+ }
+
+ if (!totalBuckets.containsKey(entry.partition())) {
+ totalBuckets.put(entry.partition(), entry.totalBuckets());
+ continue;
+ }
+
+ int old = totalBuckets.get(entry.partition());
+ if (old == entry.totalBuckets()) {
+ continue;
+ }
+
+ Pair<RuntimeException, RuntimeException> conflictException =
+ createConflictException(
+ "Total buckets of partition "
+ + entry.partition()
+ + " changed from "
+ + old
+ + " to "
+ + entry.totalBuckets()
+ + " without overwrite. Give up committing.",
+ baseCommitUser,
+ baseEntries,
+ changes,
+ null);
+ LOG.warn("", conflictException.getLeft());
+ throw conflictException.getRight();
+ }
+ }
+
java.util.function.Consumer<Throwable> conflictHandler =
e -> {
Pair<RuntimeException, RuntimeException> conflictException =
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
index 6dc77799ae..67c05c996d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.sink;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
@@ -258,4 +259,58 @@ public class TableCommitTest {
"Cannot recover from this checkpoint because some files in the"
+ " snapshot that need to be resubmitted have been deleted");
}
+
+ @Test
+ public void testGiveUpCommitWhenTotalBucketsChanged() throws Exception {
+ String path = tempDir.toString();
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.BIGINT()},
+ new String[] {"k", "v"});
+
+ Options options = new Options();
+ options.set(CoreOptions.PATH, path);
+ options.set(CoreOptions.BUCKET, 1);
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(LocalFileIO.create(), new Path(path)),
+ new Schema(
+ rowType.getFields(),
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ options.toMap(),
+ ""));
+ FileStoreTable table =
+ FileStoreTableFactory.create(
+ LocalFileIO.create(),
+ new Path(path),
+ tableSchema,
+ CatalogEnvironment.empty());
+
+ String commitUser = UUID.randomUUID().toString();
+ try (TableWriteImpl<?> write = table.newWrite(commitUser);
+ TableCommitImpl commit = table.newCommit(commitUser)) {
+ write.write(GenericRow.of(0, 0L));
+ commit.commit(1, write.prepareCommit(false, 1));
+ }
+
+ options = new Options(table.options());
+ options.set(CoreOptions.BUCKET, 2);
+ table = table.copy(tableSchema.copy(options.toMap()));
+
+ commitUser = UUID.randomUUID().toString();
+ try (TableWriteImpl<?> write = table.newWrite(commitUser);
+ TableCommitImpl commit = table.newCommit(commitUser)) {
+ write.getWrite().withIgnoreNumBucketCheck(true);
+ for (int i = 1; i < 10; i++) {
+ write.write(GenericRow.of(i, (long) i));
+ }
+ for (int i = 0; i < 2; i++) {
+ write.compact(BinaryRow.EMPTY_ROW, i, true);
+ }
+ assertThatThrownBy(() -> commit.commit(1, write.prepareCommit(true, 1)))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining("changed from 1 to 2 without overwrite");
+ }
+ }
}