This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e37843124 [core] Introduce 'commit.discard-duplicate-files' to make append safe (#6464)
9e37843124 is described below

commit 9e37843124b20889bfea757b795fc673fc57f65d
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Oct 28 10:51:19 2025 +0800

    [core] Introduce 'commit.discard-duplicate-files' to make append safe (#6464)
---
 .../shortcodes/generated/core_configuration.html   |  6 ++
 .../main/java/org/apache/paimon/CoreOptions.java   | 10 +++
 .../java/org/apache/paimon/AbstractFileStore.java  |  1 +
 .../paimon/operation/FileStoreCommitImpl.java      | 36 +++++++++-
 .../paimon/table/AppendOnlySimpleTableTest.java    | 80 +++++++++++++++++++++-
 5 files changed, 131 insertions(+), 2 deletions(-)

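Note for readers: the feature lands as an ordinary table option keyed 'commit.discard-duplicate-files' (default false); when enabled, APPEND commits drop data files that are already present in the latest snapshot, so a retried commit of the same files cannot introduce duplicates. A minimal sketch of enabling the option through the public Java Schema API follows; the class name, column names, and surrounding setup are illustrative assumptions, not part of this patch.

    // A minimal sketch (not part of this patch): enabling the new option when
    // creating a table schema via the public Java API. The class name and
    // column names are illustrative assumptions.
    import org.apache.paimon.schema.Schema;
    import org.apache.paimon.types.DataTypes;

    public class DiscardDuplicateFilesExample {

        public static Schema schemaWithDuplicateFileDiscarding() {
            return Schema.newBuilder()
                    .column("pt", DataTypes.INT())
                    .column("k", DataTypes.INT())
                    .column("v", DataTypes.BIGINT())
                    // key introduced by this commit (see CoreOptions below)
                    .option("commit.discard-duplicate-files", "true")
                    .build();
        }
    }

The diffs below add the option definition, thread it into the commit path, and filter duplicate data files during append commits.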
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index d058669f65..1889c9461c 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -188,6 +188,12 @@ under the License.
             <td>String</td>
             <td>A list of commit callback classes to be called after a successful commit. Class names are connected with comma (example: com.test.CallbackA,com.sample.CallbackB).</td>
         </tr>
+        <tr>
+            <td><h5>commit.discard-duplicate-files</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to discard duplicate files during commit.</td>
+        </tr>
         <tr>
             <td><h5>commit.force-compact</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index d64f2e40e9..26af349006 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2020,6 +2020,12 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "Write blob field using blob descriptor rather 
than blob bytes.");
 
+    public static final ConfigOption<Boolean> COMMIT_DISCARD_DUPLICATE_FILES =
+            key("commit.discard-duplicate-files")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether to discard duplicate files during commit.");
+
     private final Options options;
 
     public CoreOptions(Map<String, String> options) {
@@ -2919,6 +2925,10 @@ public class CoreOptions implements Serializable {
         return callbacks(TAG_CALLBACKS, TAG_CALLBACK_PARAM);
     }
 
+    public boolean commitDiscardDuplicateFiles() {
+        return options.get(COMMIT_DISCARD_DUPLICATE_FILES);
+    }
+
     private Map<String, String> callbacks(
             ConfigOption<String> callbacks, ConfigOption<String> callbackParam) {
         Map<String, String> result = new HashMap<>();
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 2e209fba45..e02098c0dd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -309,6 +309,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
                 options.commitMaxRetryWait(),
                 options.commitStrictModeLastSafeSnapshot().orElse(null),
                 options.rowTrackingEnabled(),
+                options.commitDiscardDuplicateFiles(),
                 conflictDetection);
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 365317a07d..a078bd90ac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -149,6 +149,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
     @Nullable private Long strictModeLastSafeSnapshot;
     private final InternalRowPartitionComputer partitionComputer;
     private final boolean rowTrackingEnabled;
+    private final boolean discardDuplicateFiles;
     private final ConflictDetection conflictDetection;
 
     private boolean ignoreEmptyCommit;
@@ -185,6 +186,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             long commitMaxRetryWait,
             @Nullable Long strictModeLastSafeSnapshot,
             boolean rowTrackingEnabled,
+            boolean discardDuplicateFiles,
             ConflictDetection conflictDetection) {
         this.snapshotCommit = snapshotCommit;
         this.fileIO = fileIO;
@@ -228,6 +230,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         this.statsFileHandler = statsFileHandler;
         this.bucketMode = bucketMode;
         this.rowTrackingEnabled = rowTrackingEnabled;
+        this.discardDuplicateFiles = discardDuplicateFiles;
         this.conflictDetection = conflictDetection;
     }
 
@@ -326,6 +329,11 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                     conflictCheck = mustConflictCheck();
                 }
 
+                boolean discardDuplicate = discardDuplicateFiles && commitKind == CommitKind.APPEND;
+                if (discardDuplicate) {
+                    checkAppendFiles = true;
+                }
+
                 if (latestSnapshot != null && checkAppendFiles) {
                     // it is possible that some partitions only have compact changes,
                     // so we need to contain all changes
@@ -336,6 +344,20 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                                             appendTableFiles,
                                             compactTableFiles,
                                             appendIndexFiles)));
+                    if (discardDuplicate) {
+                        Set<FileEntry.Identifier> baseIdentifiers =
+                                baseEntries.stream()
+                                        .map(FileEntry::identifier)
+                                        .collect(Collectors.toSet());
+                        appendTableFiles =
+                                appendTableFiles.stream()
+                                        .filter(
+                                                entry ->
+                                                        !baseIdentifiers.contains(
+                                                                entry.identifier()))
+                                        .collect(Collectors.toList());
+                        appendSimpleEntries = SimpleFileEntry.from(appendTableFiles);
+                    }
                     conflictDetection.checkNoConflictsOrFail(
                             latestSnapshot,
                             baseEntries,
@@ -1003,7 +1025,9 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         }
 
         List<SimpleFileEntry> baseDataFiles = new ArrayList<>();
-        if (latestSnapshot != null && conflictCheck.shouldCheck(latestSnapshot.id())) {
+        boolean discardDuplicate = discardDuplicateFiles && commitKind == CommitKind.APPEND;
+        if (latestSnapshot != null
+                && (discardDuplicate || conflictCheck.shouldCheck(latestSnapshot.id()))) {
             // latestSnapshotId is different from the snapshot id we've checked for conflicts,
             // so we have to check again
             List<BinaryRow> changedPartitions =
@@ -1021,6 +1045,16 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                 baseDataFiles =
                         readAllEntriesFromChangedPartitions(latestSnapshot, changedPartitions);
             }
+            if (discardDuplicate) {
+                Set<FileEntry.Identifier> baseIdentifiers =
+                        baseDataFiles.stream()
+                                .map(FileEntry::identifier)
+                                .collect(Collectors.toSet());
+                deltaFiles =
+                        deltaFiles.stream()
+                                .filter(entry -> !baseIdentifiers.contains(entry.identifier()))
+                                .collect(Collectors.toList());
+            }
             conflictDetection.checkNoConflictsOrFail(
                     latestSnapshot,
                     baseDataFiles,
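Both hunks above apply the same idea in the two conflict-check paths: build a set of identifiers for the files already recorded in the latest snapshot, then drop any incoming entries whose identifier is already in that set. A standalone sketch of that filtering, using a hypothetical Entry type in place of Paimon's FileEntry / SimpleFileEntry, looks like this:

    // Standalone sketch of the identifier-based de-duplication used above.
    // "Entry" is a hypothetical stand-in for Paimon's FileEntry; only the
    // set-difference idea is shown, not the real commit machinery.
    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;

    public class DiscardDuplicatesSketch {

        interface Entry {
            String identifier(); // stand-in for FileEntry.Identifier
        }

        // Keep only delta entries whose identifier is not already in the base snapshot.
        static List<Entry> discardDuplicates(List<Entry> deltaFiles, List<Entry> baseFiles) {
            Set<String> baseIdentifiers =
                    baseFiles.stream().map(Entry::identifier).collect(Collectors.toSet());
            return deltaFiles.stream()
                    .filter(entry -> !baseIdentifiers.contains(entry.identifier()))
                    .collect(Collectors.toList());
        }
    }

With the option enabled, a retried APPEND commit that re-sends the same data files therefore degrades to a no-op rather than duplicating rows, which is what the tests below exercise.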
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
index cf246b13b5..2ea21b2085 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
@@ -50,7 +50,9 @@ import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.SchemaUtils;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
@@ -85,6 +87,10 @@ import java.util.Optional;
 import java.util.PriorityQueue;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -104,12 +110,84 @@ import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_LAST;
 import static org.apache.paimon.predicate.SortValue.SortDirection.DESCENDING;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Tests for {@link AppendOnlyFileStoreTable}. */
 public class AppendOnlySimpleTableTest extends SimpleTableTestBase {
 
     @Test
-    public void testMultipleWriters() throws Exception {
+    public void testDiscardDuplicateFiles() throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        options -> options.set(CoreOptions.COMMIT_DISCARD_DUPLICATE_FILES, true));
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        List<CommitMessage> commitMessages;
+        try (BatchTableWrite write = writeBuilder.newWrite()) {
+            write.write(rowData(1, 10, 100L));
+            commitMessages = write.prepareCommit();
+        }
+        Runnable doCommit =
+                () -> {
+                    try (BatchTableCommit commit = writeBuilder.newCommit()) {
+                        commit.commit(commitMessages);
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                };
+
+        doCommit.run();
+        doCommit.run();
+        List<Split> splits = table.newReadBuilder().newScan().plan().splits();
+        assertThat(splits.size()).isEqualTo(1);
+        assertThat(splits.get(0).convertToRawFiles()).map(List::size).get().isEqualTo(1);
+    }
+
+    @Test
+    public void testDiscardDuplicateFilesMultiThread() throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        options -> options.set(CoreOptions.COMMIT_DISCARD_DUPLICATE_FILES, true));
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        List<List<CommitMessage>> messages = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            try (BatchTableWrite write = writeBuilder.newWrite()) {
+                write.write(rowData(1, 10, 100L));
+                messages.add(write.prepareCommit());
+            }
+        }
+        Runnable doCommit =
+                () -> {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+                    for (int i = 0; i < 10; i++) {
+                        try (BatchTableCommit commit = writeBuilder.newCommit()) {
+                            commit.commit(messages.get(rnd.nextInt(messages.size())));
+                        } catch (Exception e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                };
+
+        Runnable asserter =
+                () -> {
+                    List<Split> splits = table.newReadBuilder().newScan().plan().splits();
+                    assertThat(splits.size()).isEqualTo(1);
+                    assertTrue(splits.get(0).convertToRawFiles().get().size() <= 10);
+                };
+
+        // test multiple threads
+        ExecutorService pool = Executors.newCachedThreadPool();
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            futures.add(pool.submit(doCommit));
+        }
+        for (Future<?> future : futures) {
+            future.get();
+        }
+        asserter.run();
+    }
+
+    @Test
+    public void testDynamicBucketNoSelector() throws Exception {
         assertThat(
                         createFileStoreTable(options -> options.set("bucket", "-1"))
                                 .newBatchWriteBuilder()
