This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new 9e37843124 [core] Introduce 'commit.discard-duplicate-files' to make append safe (#6464)
9e37843124 is described below
commit 9e37843124b20889bfea757b795fc673fc57f65d
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Oct 28 10:51:19 2025 +0800
    [core] Introduce 'commit.discard-duplicate-files' to make append safe (#6464)
---
.../shortcodes/generated/core_configuration.html | 6 ++
.../main/java/org/apache/paimon/CoreOptions.java | 10 +++
.../java/org/apache/paimon/AbstractFileStore.java | 1 +
.../paimon/operation/FileStoreCommitImpl.java | 36 +++++++++-
.../paimon/table/AppendOnlySimpleTableTest.java | 80 +++++++++++++++++++++-
5 files changed, 131 insertions(+), 2 deletions(-)
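The new option is an ordinary per-table CoreOption, so it can be switched on through dynamic table options like any other key. A minimal sketch, assuming an existing catalog and a hypothetical table db.t (Table.copy overlays dynamic options on top of the stored table options):

    import java.util.Collections;

    import org.apache.paimon.catalog.Catalog;
    import org.apache.paimon.catalog.Identifier;
    import org.apache.paimon.table.Table;

    public class EnableDiscardDuplicates {
        // Load a hypothetical table "db.t" with the new option enabled;
        // only the option key below comes from this commit.
        static Table withDiscardDuplicates(Catalog catalog) throws Exception {
            Table table = catalog.getTable(Identifier.create("db", "t"));
            return table.copy(Collections.singletonMap("commit.discard-duplicate-files", "true"));
        }
    }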
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index d058669f65..1889c9461c 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -188,6 +188,12 @@ under the License.
<td>String</td>
            <td>A list of commit callback classes to be called after a successful commit. Class names are connected with comma (example: com.test.CallbackA,com.sample.CallbackB).</td>
</tr>
+        <tr>
+            <td><h5>commit.discard-duplicate-files</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to discard duplicate files during commit.</td>
+        </tr>
<tr>
<td><h5>commit.force-compact</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index d64f2e40e9..26af349006 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2020,6 +2020,12 @@ public class CoreOptions implements Serializable {
                    .withDescription(
                            "Write blob field using blob descriptor rather than blob bytes.");
+    public static final ConfigOption<Boolean> COMMIT_DISCARD_DUPLICATE_FILES =
+            key("commit.discard-duplicate-files")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether to discard duplicate files during commit.");
+
private final Options options;
public CoreOptions(Map<String, String> options) {
@@ -2919,6 +2925,10 @@ public class CoreOptions implements Serializable {
return callbacks(TAG_CALLBACKS, TAG_CALLBACK_PARAM);
}
+    public boolean commitDiscardDuplicateFiles() {
+        return options.get(COMMIT_DISCARD_DUPLICATE_FILES);
+    }
+
private Map<String, String> callbacks(
            ConfigOption<String> callbacks, ConfigOption<String> callbackParam) {
Map<String, String> result = new HashMap<>();
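A quick sanity check of the accessor above, using only the public CoreOptions constructor shown in context; the option defaults to false when unset:

    import java.util.Collections;

    import org.apache.paimon.CoreOptions;

    public class DiscardDuplicatesOptionCheck {
        public static void main(String[] args) {
            CoreOptions options =
                    new CoreOptions(
                            Collections.singletonMap("commit.discard-duplicate-files", "true"));
            // Reads COMMIT_DISCARD_DUPLICATE_FILES: true here, false when the key is absent.
            System.out.println(options.commitDiscardDuplicateFiles());
        }
    }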
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 2e209fba45..e02098c0dd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -309,6 +309,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
options.commitMaxRetryWait(),
options.commitStrictModeLastSafeSnapshot().orElse(null),
options.rowTrackingEnabled(),
+ options.commitDiscardDuplicateFiles(),
conflictDetection);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 365317a07d..a078bd90ac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -149,6 +149,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
@Nullable private Long strictModeLastSafeSnapshot;
private final InternalRowPartitionComputer partitionComputer;
private final boolean rowTrackingEnabled;
+ private final boolean discardDuplicateFiles;
private final ConflictDetection conflictDetection;
private boolean ignoreEmptyCommit;
@@ -185,6 +186,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
long commitMaxRetryWait,
@Nullable Long strictModeLastSafeSnapshot,
boolean rowTrackingEnabled,
+ boolean discardDuplicateFiles,
ConflictDetection conflictDetection) {
this.snapshotCommit = snapshotCommit;
this.fileIO = fileIO;
@@ -228,6 +230,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
this.statsFileHandler = statsFileHandler;
this.bucketMode = bucketMode;
this.rowTrackingEnabled = rowTrackingEnabled;
+ this.discardDuplicateFiles = discardDuplicateFiles;
this.conflictDetection = conflictDetection;
}
@@ -326,6 +329,11 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                conflictCheck = mustConflictCheck();
            }
 
+            boolean discardDuplicate = discardDuplicateFiles && commitKind == CommitKind.APPEND;
+            if (discardDuplicate) {
+                checkAppendFiles = true;
+            }
+
            if (latestSnapshot != null && checkAppendFiles) {
                // it is possible that some partitions only have compact changes,
                // so we need to contain all changes
@@ -336,6 +344,20 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                                        appendTableFiles,
                                        compactTableFiles,
                                        appendIndexFiles)));
+                if (discardDuplicate) {
+                    Set<FileEntry.Identifier> baseIdentifiers =
+                            baseEntries.stream()
+                                    .map(FileEntry::identifier)
+                                    .collect(Collectors.toSet());
+                    appendTableFiles =
+                            appendTableFiles.stream()
+                                    .filter(
+                                            entry ->
+                                                    !baseIdentifiers.contains(
+                                                            entry.identifier()))
+                                    .collect(Collectors.toList());
+                    appendSimpleEntries = SimpleFileEntry.from(appendTableFiles);
+                }
                conflictDetection.checkNoConflictsOrFail(
                        latestSnapshot,
                        baseEntries,
@@ -1003,7 +1025,9 @@ public class FileStoreCommitImpl implements FileStoreCommit {
        }
 
        List<SimpleFileEntry> baseDataFiles = new ArrayList<>();
-        if (latestSnapshot != null && conflictCheck.shouldCheck(latestSnapshot.id())) {
+        boolean discardDuplicate = discardDuplicateFiles && commitKind == CommitKind.APPEND;
+        if (latestSnapshot != null
+                && (discardDuplicate || conflictCheck.shouldCheck(latestSnapshot.id()))) {
            // latestSnapshotId is different from the snapshot id we've checked for conflicts,
            // so we have to check again
            List<BinaryRow> changedPartitions =
@@ -1021,6 +1045,16 @@ public class FileStoreCommitImpl implements FileStoreCommit {
            baseDataFiles =
                    readAllEntriesFromChangedPartitions(latestSnapshot, changedPartitions);
        }
+        if (discardDuplicate) {
+            Set<FileEntry.Identifier> baseIdentifiers =
+                    baseDataFiles.stream()
+                            .map(FileEntry::identifier)
+                            .collect(Collectors.toSet());
+            deltaFiles =
+                    deltaFiles.stream()
+                            .filter(entry -> !baseIdentifiers.contains(entry.identifier()))
+                            .collect(Collectors.toList());
+        }
        conflictDetection.checkNoConflictsOrFail(
                latestSnapshot,
                baseDataFiles,
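Both hunks above apply the same technique: a set difference over file identifiers, dropping incoming entries that already exist in the latest snapshot before conflict detection runs, and only for CommitKind.APPEND, which makes retried append commits idempotent without touching compaction commits. A self-contained sketch of the filtering step, with plain strings standing in for FileEntry.Identifier:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;

    public class DiscardDuplicatesSketch {
        // Strings stand in for FileEntry.Identifier; the real code builds the
        // base set from readAllEntriesFromChangedPartitions(latestSnapshot, ...).
        static List<String> discardDuplicates(List<String> base, List<String> delta) {
            Set<String> baseIdentifiers = new HashSet<>(base);
            return delta.stream()
                    .filter(id -> !baseIdentifiers.contains(id))
                    .collect(Collectors.toList());
        }

        public static void main(String[] args) {
            List<String> base = Arrays.asList("f1", "f2");
            List<String> delta = Arrays.asList("f2", "f3");
            // Prints [f3]: f2 is already in the base snapshot, so it is discarded.
            System.out.println(discardDuplicates(base, delta));
        }
    }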
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
index cf246b13b5..2ea21b2085 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
@@ -50,7 +50,9 @@ import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
@@ -85,6 +87,10 @@ import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Random;
import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -104,12 +110,84 @@ import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_LAST;
import static org.apache.paimon.predicate.SortValue.SortDirection.DESCENDING;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/** Tests for {@link AppendOnlyFileStoreTable}. */
public class AppendOnlySimpleTableTest extends SimpleTableTestBase {
@Test
- public void testMultipleWriters() throws Exception {
+    public void testDiscardDuplicateFiles() throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        options -> options.set(CoreOptions.COMMIT_DISCARD_DUPLICATE_FILES, true));
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        List<CommitMessage> commitMessages;
+        try (BatchTableWrite write = writeBuilder.newWrite()) {
+            write.write(rowData(1, 10, 100L));
+            commitMessages = write.prepareCommit();
+        }
+        Runnable doCommit =
+                () -> {
+                    try (BatchTableCommit commit = writeBuilder.newCommit()) {
+                        commit.commit(commitMessages);
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                };
+
+        doCommit.run();
+        doCommit.run();
+        List<Split> splits = table.newReadBuilder().newScan().plan().splits();
+        assertThat(splits.size()).isEqualTo(1);
+        assertThat(splits.get(0).convertToRawFiles()).map(List::size).get().isEqualTo(1);
+    }
+
+    @Test
+    public void testDiscardDuplicateFilesMultiThread() throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        options -> options.set(CoreOptions.COMMIT_DISCARD_DUPLICATE_FILES, true));
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        List<List<CommitMessage>> messages = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            try (BatchTableWrite write = writeBuilder.newWrite()) {
+                write.write(rowData(1, 10, 100L));
+                messages.add(write.prepareCommit());
+            }
+        }
+        Runnable doCommit =
+                () -> {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+                    for (int i = 0; i < 10; i++) {
+                        try (BatchTableCommit commit = writeBuilder.newCommit()) {
+                            commit.commit(messages.get(rnd.nextInt(messages.size())));
+                        } catch (Exception e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                };
+
+        Runnable asserter =
+                () -> {
+                    List<Split> splits = table.newReadBuilder().newScan().plan().splits();
+                    assertThat(splits.size()).isEqualTo(1);
+                    assertTrue(splits.get(0).convertToRawFiles().get().size() <= 10);
+                };
+
+        // test multiple threads
+        ExecutorService pool = Executors.newCachedThreadPool();
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            futures.add(pool.submit(doCommit));
+        }
+        for (Future<?> future : futures) {
+            future.get();
+        }
+        asserter.run();
+    }
+
+    @Test
+    public void testDynamicBucketNoSelector() throws Exception {
assertThat(
                        createFileStoreTable(options -> options.set("bucket", "-1"))
.newBatchWriteBuilder()