This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1d56d2d235 [core] Fix flaky duplicate file discard test (#8106)
1d56d2d235 is described below
commit 1d56d2d235b2f63882daf5b6c8c22a29c9ee40a0
Author: QuakeWang <[email protected]>
AuthorDate: Thu Jun 4 13:51:04 2026 +0800
[core] Fix flaky duplicate file discard test (#8106)
---
.../paimon/table/AppendOnlySimpleTableTest.java | 56 +++++++++++++---------
1 file changed, 34 insertions(+), 22 deletions(-)
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
index 4981d95453..fca17b5b98 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
@@ -86,6 +86,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -102,7 +103,6 @@ import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
@@ -322,6 +322,8 @@ public class AppendOnlySimpleTableTest extends SimpleTableTestBase {
createFileStoreTable(
options -> {
options.set(CoreOptions.COMMIT_DISCARD_DUPLICATE_FILES, true);
+ options.set(CoreOptions.COMMIT_MAX_RETRIES, 50);
+ options.set(CoreOptions.COMMIT_MAX_RETRY_WAIT,
Duration.ofMillis(100));
// Keep all snapshots so concurrent expiry does
not race readers.
options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN,
1000);
options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX,
1000);
@@ -334,33 +336,43 @@ public class AppendOnlySimpleTableTest extends SimpleTableTestBase {
messages.add(write.prepareCommit());
}
}
- Runnable doCommit =
- () -> {
- ThreadLocalRandom rnd = ThreadLocalRandom.current();
- for (int i = 0; i < 10; i++) {
- try (BatchTableCommit commit =
writeBuilder.newCommit()) {
-
commit.commit(messages.get(rnd.nextInt(messages.size())));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- };
-
+ int commitThreadNum = 10;
+ int commitsPerThread = 10;
Runnable asserter =
() -> {
List<Split> splits =
table.newReadBuilder().newScan().plan().splits();
assertThat(splits.size()).isEqualTo(1);
- assertTrue(splits.get(0).convertToRawFiles().get().size()
<= 10);
+ assertThat(splits.get(0).convertToRawFiles().get().size())
+ .isLessThanOrEqualTo(messages.size());
};
- // test multiple threads
- ExecutorService pool = Executors.newCachedThreadPool();
- List<Future<?>> futures = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- futures.add(pool.submit(doCommit));
- }
- for (Future<?> future : futures) {
- future.get();
+ ExecutorService pool = Executors.newFixedThreadPool(commitThreadNum);
+ try {
+ List<Future<?>> futures = new ArrayList<>();
+ for (int thread = 0; thread < commitThreadNum; thread++) {
+ int threadId = thread;
+ futures.add(
+ pool.submit(
+ () -> {
+ for (int round = 0; round <
commitsPerThread; round++) {
+ int messageIndex = (threadId + round)
% messages.size();
+ try (BatchTableCommit commit =
writeBuilder.newCommit()) {
+
commit.commit(messages.get(messageIndex));
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format(
+ "Failed to commit
message %s in thread %s round %s.",
+ messageIndex,
threadId, round),
+ e);
+ }
+ }
+ }));
+ }
+ for (Future<?> future : futures) {
+ future.get();
+ }
+ } finally {
+ pool.shutdownNow();
}
asserter.run();
}