This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7a1187e935 [core] Append commit should check bucket number if latest
commit user is different (#6723)
7a1187e935 is described below
commit 7a1187e9354d656cebbc2b4dd1a42ac56e75a469
Author: yuzelin <[email protected]>
AuthorDate: Tue Dec 2 19:23:37 2025 +0800
[core] Append commit should check bucket number if latest commit user is
different (#6723)
---
.../apache/paimon/operation/FileStoreCommit.java | 2 +
.../paimon/operation/FileStoreCommitImpl.java | 9 +++
.../paimon/table/sink/BatchWriteBuilderImpl.java | 13 ++++-
.../apache/paimon/table/sink/InnerTableCommit.java | 2 +
.../apache/paimon/table/sink/TableCommitImpl.java | 6 ++
.../apache/paimon/table/sink/TableCommitTest.java | 68 ++++++++++++++++++++++
.../paimon/flink/sink/PostponeFixedBucketSink.java | 12 +++-
.../paimon/spark/commands/PaimonSparkWriter.scala | 2 +
8 files changed, 110 insertions(+), 4 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
index 6a00db6f0e..ecd4975858 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
@@ -36,6 +36,8 @@ public interface FileStoreCommit extends AutoCloseable {
FileStoreCommit withPartitionExpire(PartitionExpire partitionExpire);
+ FileStoreCommit appendCommitCheckConflict(boolean
appendCommitCheckConflict);
+
/** Find out which committables need to be retried when recovering from
the failure. */
List<ManifestCommittable> filterCommitted(List<ManifestCommittable>
committables);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index cc9890fc72..fc89ff1ecd 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -154,6 +154,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
private boolean ignoreEmptyCommit;
private CommitMetrics commitMetrics;
+ private boolean appendCommitCheckConflict = false;
public FileStoreCommitImpl(
SnapshotCommit snapshotCommit,
@@ -246,6 +247,12 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
return this;
}
+ @Override
+ public FileStoreCommit appendCommitCheckConflict(boolean
appendCommitCheckConflict) {
+ this.appendCommitCheckConflict = appendCommitCheckConflict;
+ return this;
+ }
+
@Override
public List<ManifestCommittable> filterCommitted(List<ManifestCommittable>
committables) {
// nothing to filter, fast exit
@@ -327,6 +334,8 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
if (containsFileDeletionOrDeletionVectors(appendSimpleEntries,
appendIndexFiles)) {
commitKind = CommitKind.OVERWRITE;
conflictCheck = mustConflictCheck();
+ } else if (latestSnapshot != null &&
appendCommitCheckConflict) {
+ conflictCheck = mustConflictCheck();
}
boolean discardDuplicate = discardDuplicateFiles && commitKind
== CommitKind.APPEND;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
index 6c950a360a..66c67e8965 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
@@ -40,6 +40,7 @@ public class BatchWriteBuilderImpl implements
BatchWriteBuilder {
private final String commitUser;
private Map<String, String> staticPartition;
+ private boolean appendCommitCheckConflict = false;
public BatchWriteBuilderImpl(InnerTable table) {
this.table = table;
@@ -81,7 +82,10 @@ public class BatchWriteBuilderImpl implements
BatchWriteBuilder {
@Override
public BatchTableCommit newCommit() {
- InnerTableCommit commit =
table.newCommit(commitUser).withOverwrite(staticPartition);
+ InnerTableCommit commit =
+ table.newCommit(commitUser)
+ .withOverwrite(staticPartition)
+ .appendCommitCheckConflict(appendCommitCheckConflict);
commit.ignoreEmptyCommit(
Options.fromMap(table.options())
.getOptional(CoreOptions.SNAPSHOT_IGNORE_EMPTY_COMMIT)
@@ -89,7 +93,12 @@ public class BatchWriteBuilderImpl implements
BatchWriteBuilder {
return commit;
}
- public BatchWriteBuilder copyWithNewTable(Table newTable) {
+ public BatchWriteBuilderImpl copyWithNewTable(Table newTable) {
return new BatchWriteBuilderImpl((InnerTable) newTable, commitUser,
staticPartition);
}
+
+ public BatchWriteBuilderImpl appendCommitCheckConflict(boolean
appendCommitCheckConflict) {
+ this.appendCommitCheckConflict = appendCommitCheckConflict;
+ return this;
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
index df6241086a..0a8fdd6742 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
@@ -46,6 +46,8 @@ public interface InnerTableCommit extends StreamTableCommit,
BatchTableCommit {
InnerTableCommit expireForEmptyCommit(boolean expireForEmptyCommit);
+ InnerTableCommit appendCommitCheckConflict(boolean
appendCommitCheckConflict);
+
@Override
InnerTableCommit withMetricRegistry(MetricRegistry registry);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 3e778c6bcc..9caf08ff28 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -158,6 +158,12 @@ public class TableCommitImpl implements InnerTableCommit {
return this;
}
+ @Override
+ public TableCommitImpl appendCommitCheckConflict(boolean
appendCommitCheckConflict) {
+ commit.appendCommitCheckConflict(appendCommitCheckConflict);
+ return this;
+ }
+
@Override
public InnerTableCommit withMetricRegistry(MetricRegistry registry) {
commit.withMetrics(new CommitMetrics(registry, tableName));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
index e73695a209..bc0756fcd1 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
@@ -50,6 +50,8 @@ import org.apache.paimon.utils.SnapshotManager;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.util.Arrays;
import java.util.Collections;
@@ -67,6 +69,7 @@ import java.util.stream.LongStream;
import static java.util.Collections.singletonMap;
import static
org.apache.paimon.utils.FileStorePathFactoryTest.createNonPartFactory;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for {@link TableCommit}. */
@@ -322,6 +325,71 @@ public class TableCommitTest {
}
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testGiveUpCommitWhenAppendFoundTotalBucketsChanged(boolean
checkAppend)
+ throws Exception {
+ String path = tempDir.toString();
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.BIGINT()},
+ new String[] {"k", "v"});
+
+ Options options = new Options();
+ options.set(CoreOptions.PATH, path);
+ options.set(CoreOptions.BUCKET, 1);
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(LocalFileIO.create(), new
Path(path)),
+ new Schema(
+ rowType.getFields(),
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ options.toMap(),
+ ""));
+ FileStoreTable table =
+ FileStoreTableFactory.create(
+ LocalFileIO.create(),
+ new Path(path),
+ tableSchema,
+ CatalogEnvironment.empty());
+
+ String commitUser1 = UUID.randomUUID().toString();
+ TableWriteImpl<?> write1 = table.newWrite(commitUser1);
+ TableCommitImpl commit1 = table.newCommit(commitUser1);
+ for (int i = 1; i < 10; i++) {
+ write1.write(GenericRow.of(i, (long) i));
+ }
+
+ // mock rescale
+ String commitUser2 = UUID.randomUUID().toString();
+ options = new Options(table.options());
+ options.set(CoreOptions.BUCKET, 2);
+ FileStoreTable rescaleTable =
table.copy(tableSchema.copy(options.toMap()));
+ try (TableWriteImpl<?> write = rescaleTable.newWrite(commitUser2);
+ TableCommitImpl commit =
+
rescaleTable.newCommit(commitUser2).withOverwrite(Collections.emptyMap())) {
+ for (int i = 1; i < 10; i++) {
+ write.write(GenericRow.of(i, (long) i));
+ }
+ commit.commit(1, write.prepareCommit(false, 1));
+ }
+
+ if (checkAppend) {
+ commit1.appendCommitCheckConflict(true);
+ assertThatThrownBy(() -> commit1.commit(1,
write1.prepareCommit(false, 1)))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining("changed from 2 to 1 without overwrite");
+ } else {
+ // the committed result is incorrect, but here we only verify that no check
+ // is performed when appendCommitCheckConflict is not set
+ assertThatCode(() -> commit1.commit(1, write1.prepareCommit(false,
1)))
+ .doesNotThrowAnyException();
+ }
+ write1.close();
+ commit1.close();
+ }
+
@Test
public void testStrictModeForCompact() throws Exception {
String path = tempDir.toString();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketSink.java
index 3223ac50d3..f81ec99866 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketSink.java
@@ -68,8 +68,16 @@ public class PostponeFixedBucketSink extends
FlinkWriteSink<InternalRow> {
@Override
protected Committer.Factory<Committable, ManifestCommittable>
createCommitterFactory() {
if (overwritePartition == null) {
- // The table has copied bucket option outside, no need to change
anything
- return super.createCommitterFactory();
+ // The table has copied bucket option outside, no need to change.
+ return context ->
+ new StoreCommitter(
+ table,
+ table.newCommit(context.commitUser())
+ .withOverwrite(overwritePartition)
+
.ignoreEmptyCommit(!context.streamingCheckpointEnabled())
+ // Need to check conflict
+ .appendCommitCheckConflict(true),
+ context);
} else {
// When overwriting, the postpone bucket files need to be deleted,
so using a postpone
// bucket table commit here
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 23690b46eb..c4219e4e63 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -410,6 +410,8 @@ case class PaimonSparkWriter(
writeBuilder
.asInstanceOf[BatchWriteBuilderImpl]
.copyWithNewTable(PostponeUtils.tableForCommit(table))
+ // Need to check conflict
+ .appendCommitCheckConflict(true)
} else {
writeBuilder
}