This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d56d2d235 [core] Fix flaky duplicate file discard test (#8106)
1d56d2d235 is described below

commit 1d56d2d235b2f63882daf5b6c8c22a29c9ee40a0
Author: QuakeWang <[email protected]>
AuthorDate: Thu Jun 4 13:51:04 2026 +0800

    [core] Fix flaky duplicate file discard test (#8106)
---
 .../paimon/table/AppendOnlySimpleTableTest.java    | 56 +++++++++++++---------
 1 file changed, 34 insertions(+), 22 deletions(-)

diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
index 4981d95453..fca17b5b98 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
@@ -86,6 +86,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -102,7 +103,6 @@ import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
@@ -322,6 +322,8 @@ public class AppendOnlySimpleTableTest extends 
SimpleTableTestBase {
                 createFileStoreTable(
                         options -> {
                             
options.set(CoreOptions.COMMIT_DISCARD_DUPLICATE_FILES, true);
+                            options.set(CoreOptions.COMMIT_MAX_RETRIES, 50);
+                            options.set(CoreOptions.COMMIT_MAX_RETRY_WAIT, 
Duration.ofMillis(100));
                             // Keep all snapshots so concurrent expiry does 
not race readers.
                             options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 
1000);
                             options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 
1000);
@@ -334,33 +336,43 @@ public class AppendOnlySimpleTableTest extends 
SimpleTableTestBase {
                 messages.add(write.prepareCommit());
             }
         }
-        Runnable doCommit =
-                () -> {
-                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
-                    for (int i = 0; i < 10; i++) {
-                        try (BatchTableCommit commit = 
writeBuilder.newCommit()) {
-                            
commit.commit(messages.get(rnd.nextInt(messages.size())));
-                        } catch (Exception e) {
-                            throw new RuntimeException(e);
-                        }
-                    }
-                };
-
+        int commitThreadNum = 10;
+        int commitsPerThread = 10;
         Runnable asserter =
                 () -> {
                     List<Split> splits = 
table.newReadBuilder().newScan().plan().splits();
                     assertThat(splits.size()).isEqualTo(1);
-                    assertTrue(splits.get(0).convertToRawFiles().get().size() 
<= 10);
+                    assertThat(splits.get(0).convertToRawFiles().get().size())
+                            .isLessThanOrEqualTo(messages.size());
                 };
 
-        // test multiple threads
-        ExecutorService pool = Executors.newCachedThreadPool();
-        List<Future<?>> futures = new ArrayList<>();
-        for (int i = 0; i < 10; i++) {
-            futures.add(pool.submit(doCommit));
-        }
-        for (Future<?> future : futures) {
-            future.get();
+        ExecutorService pool = Executors.newFixedThreadPool(commitThreadNum);
+        try {
+            List<Future<?>> futures = new ArrayList<>();
+            for (int thread = 0; thread < commitThreadNum; thread++) {
+                int threadId = thread;
+                futures.add(
+                        pool.submit(
+                                () -> {
+                                    for (int round = 0; round < 
commitsPerThread; round++) {
+                                        int messageIndex = (threadId + round) 
% messages.size();
+                                        try (BatchTableCommit commit = 
writeBuilder.newCommit()) {
+                                            
commit.commit(messages.get(messageIndex));
+                                        } catch (Exception e) {
+                                            throw new RuntimeException(
+                                                    String.format(
+                                                            "Failed to commit 
message %s in thread %s round %s.",
+                                                            messageIndex, 
threadId, round),
+                                                    e);
+                                        }
+                                    }
+                                }));
+            }
+            for (Future<?> future : futures) {
+                future.get();
+            }
+        } finally {
+            pool.shutdownNow();
         }
         asserter.run();
     }

Reply via email to