This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 94644ac2871d4e58e82a58d5d306499f574c99b6 Author: Jingsong Lee <[email protected]> AuthorDate: Tue Oct 21 13:08:34 2025 +0800 [flink] Produce real random id in SourceSplitGenerator (#6441) --- .../org/apache/paimon/table/source/DataSplit.java | 16 ++++++++++ .../paimon/flink/source/FileStoreSourceSplit.java | 13 ++++++++ .../source/FileStoreSourceSplitGenerator.java | 37 +++------------------- .../source/FileStoreSourceSplitGeneratorTest.java | 32 +++++++++++-------- 4 files changed, 52 insertions(+), 46 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java index 079477b001..3a4c112a95 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java @@ -328,6 +328,22 @@ public class DataSplit implements Split { rawConvertible); } + @Override + public String toString() { + return "{" + + "snapshotId=" + + snapshotId + + ", partition=hash-" + + partition.hashCode() + + ", bucket=" + + bucket + + ", rawConvertible=" + + rawConvertible + + '}' + + "@" + + Integer.toHexString(hashCode()); + } + private void writeObject(ObjectOutputStream out) throws IOException { serialize(new DataOutputViewStreamWrapper(out)); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplit.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplit.java index 769154b196..b9ce94c413 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplit.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplit.java @@ -79,4 +79,17 @@ public class FileStoreSourceSplit implements SourceSplit { public int hashCode() { return Objects.hash(id, split, recordsToSkip); } + + @Override + 
public String toString() { + return "{" + + "id='" + + id + + '\'' + + ", split=" + + split + + ", recordsToSkip=" + + recordsToSkip + + '}'; + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitGenerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitGenerator.java index f19ec828e0..2d89bbbe52 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitGenerator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitGenerator.java @@ -18,10 +18,11 @@ package org.apache.paimon.flink.source; -import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableScan; import java.util.List; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; /** @@ -30,11 +31,8 @@ import java.util.stream.Collectors; */ public class FileStoreSourceSplitGenerator { - /** - * The current Id as a mutable string representation. This covers more values than the integer - * value range, so we should never overflow. 
- */ - private final char[] currentId = "0000000000".toCharArray(); + private final String uuid = UUID.randomUUID().toString(); + private final AtomicInteger idCounter = new AtomicInteger(1); public List<FileStoreSourceSplit> createSplits(TableScan.Plan plan) { return plan.splits().stream() @@ -42,32 +40,7 @@ public class FileStoreSourceSplitGenerator { .collect(Collectors.toList()); } - public List<FileStoreSourceSplit> createSplits(List<Split> splits) { - return splits.stream() - .map(s -> new FileStoreSourceSplit(getNextId(), s)) - .collect(Collectors.toList()); - } - protected final String getNextId() { - // because we just increment numbers, we increment the char representation directly, - // rather than incrementing an integer and converting it to a string representation - // every time again (requires quite some expensive conversion logic). - incrementCharArrayByOne(currentId, currentId.length - 1); - return new String(currentId); - } - - private static void incrementCharArrayByOne(char[] array, int pos) { - if (pos < 0) { - throw new RuntimeException("Produce too many splits."); - } - - char c = array[pos]; - c++; - - if (c > '9') { - c = '0'; - incrementCharArrayByOne(array, pos - 1); - } - array[pos] = c; + return uuid + "-" + idCounter.getAndIncrement(); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java index 932b7ee951..cf63274169 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java @@ -70,23 +70,27 @@ public class FileStoreSourceSplitGeneratorTest { o -> ((DataSplit) ((FileStoreSourceSplit) o).split()).bucket())); // splitId should be the input order! 
- assertSplit(splits.get(0), "0000000001", 1, 0, Arrays.asList("f0", "f1")); - assertSplit(splits.get(1), "0000000002", 1, 1, Collections.singletonList("f2")); - assertSplit(splits.get(2), "0000000003", 2, 0, Arrays.asList("f3", "f4", "f5")); - assertSplit(splits.get(3), "0000000004", 2, 1, Collections.singletonList("f6")); - assertSplit(splits.get(4), "0000000005", 3, 0, Collections.singletonList("f7")); - assertSplit(splits.get(5), "0000000006", 3, 1, Collections.singletonList("f8")); - assertSplit(splits.get(6), "0000000007", 4, 0, Collections.singletonList("f9")); - assertSplit(splits.get(7), "0000000008", 4, 1, Collections.singletonList("f10")); - assertSplit(splits.get(8), "0000000009", 5, 0, Collections.singletonList("f11")); - assertSplit(splits.get(9), "0000000010", 5, 1, Collections.singletonList("f12")); - assertSplit(splits.get(10), "0000000011", 6, 0, Collections.singletonList("f13")); - assertSplit(splits.get(11), "0000000012", 6, 1, Collections.singletonList("f14")); + assertSplit(splits.get(0), "-1", 1, 0, Arrays.asList("f0", "f1")); + assertSplit(splits.get(1), "-2", 1, 1, Collections.singletonList("f2")); + assertSplit(splits.get(2), "-3", 2, 0, Arrays.asList("f3", "f4", "f5")); + assertSplit(splits.get(3), "-4", 2, 1, Collections.singletonList("f6")); + assertSplit(splits.get(4), "-5", 3, 0, Collections.singletonList("f7")); + assertSplit(splits.get(5), "-6", 3, 1, Collections.singletonList("f8")); + assertSplit(splits.get(6), "-7", 4, 0, Collections.singletonList("f9")); + assertSplit(splits.get(7), "-8", 4, 1, Collections.singletonList("f10")); + assertSplit(splits.get(8), "-9", 5, 0, Collections.singletonList("f11")); + assertSplit(splits.get(9), "-10", 5, 1, Collections.singletonList("f12")); + assertSplit(splits.get(10), "-11", 6, 0, Collections.singletonList("f13")); + assertSplit(splits.get(11), "-12", 6, 1, Collections.singletonList("f14")); } private void assertSplit( - FileStoreSourceSplit split, String splitId, int part, int bucket, 
List<String> files) { - assertThat(split.splitId()).isEqualTo(splitId); + FileStoreSourceSplit split, + String splitIdSuffix, + int part, + int bucket, + List<String> files) { + assertThat(split.splitId()).endsWith(splitIdSuffix); assertThat(((DataSplit) split.split()).partition().getInt(0)).isEqualTo(part); assertThat(((DataSplit) split.split()).bucket()).isEqualTo(bucket); assertThat(
