This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git

The following commit(s) were added to refs/heads/master by this push:
     new af6ac7bb7a [flink] Fix postpone bucket without changelog producer cannot read all records with multiple streaming readers (#6142)
af6ac7bb7a is described below

commit af6ac7bb7ab8c75383c25d196f7e27599e63fcf4
Author: tsreaper <tsreape...@gmail.com>
AuthorDate: Mon Aug 25 20:19:01 2025 +0800

    [flink] Fix postpone bucket without changelog producer cannot read all records with multiple streaming readers (#6142)
---
 .../postpone/PostponeBucketFileStoreWrite.java     | 13 +++++++++++++
 .../postpone/PostponeBucketCompactSplitSource.java | 20 +++++---------------
 .../source/ContinuousFileSplitEnumerator.java      | 19 ++++++++++++++++---
 .../paimon/flink/PostponeBucketTableITCase.java    | 24 ++++++++++++++++++------
 4 files changed, 52 insertions(+), 24 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
index ac9a851cb2..4978ec6b3d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
@@ -216,4 +216,17 @@ public class PostponeBucketFileStoreWrite extends MemoryFileStoreWrite<KeyValue>
     protected Function<WriterContainer<KeyValue>, Boolean> createWriterCleanChecker() {
         return createNoConflictAwareWriterCleanChecker();
     }
+
+    public static int getWriteId(String fileName) {
+        try {
+            String[] parts = fileName.split("-s-");
+            return Integer.parseInt(parts[1].substring(0, parts[1].indexOf('-')));
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Data file name "
+                            + fileName
+                            + " does not match the pattern. This is unexpected.",
+                    e);
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
index ab53f8f824..3e0cd7cf23 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
@@ -28,13 +28,13 @@ import org.apache.paimon.flink.source.SimpleSourceSplit;
 import org.apache.paimon.flink.source.operator.ReadOperator;
 import org.apache.paimon.flink.utils.JavaTypeInfo;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.postpone.PostponeBucketFileStoreWrite;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.ChannelComputer;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.Preconditions;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.connector.source.Boundedness;
@@ -55,8 +55,6 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 /**
  * Source for compacting postpone bucket tables. This source scans all files from {@code bucket =
@@ -163,27 +161,19 @@ public class PostponeBucketCompactSplitSource extends AbstractNonCoordinatedSour
     private static class SplitChannelComputer implements ChannelComputer<Split> {
 
         private transient int numChannels;
-        private transient Pattern pattern;
 
         @Override
         public void setup(int numChannels) {
             this.numChannels = numChannels;
-            // see PostponeBucketTableWriteOperator
-            this.pattern = Pattern.compile("-s-(\\d+?)-");
         }
 
         @Override
         public int channel(Split record) {
             DataSplit dataSplit = (DataSplit) record;
-            String fileName = dataSplit.dataFiles().get(0).fileName();
-
-            Matcher matcher = pattern.matcher(fileName);
-            Preconditions.checkState(
-                    matcher.find(),
-                    "Data file name %s does not match the pattern. This is unexpected.",
-                    fileName);
-            return ChannelComputer.select(
-                    dataSplit.partition(), Integer.parseInt(matcher.group(1)), numChannels);
+            int bucketId =
+                    PostponeBucketFileStoreWrite.getWriteId(dataSplit.dataFiles().get(0).fileName())
+                            % numChannels;
+            return ChannelComputer.select(dataSplit.partition(), bucketId, numChannels);
         }
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index 46ca1a5c92..668bd7d847 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -22,6 +22,8 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.flink.source.assigners.FIFOSplitAssigner;
 import org.apache.paimon.flink.source.assigners.PreAssignSplitAssigner;
 import org.apache.paimon.flink.source.assigners.SplitAssigner;
+import org.apache.paimon.postpone.PostponeBucketFileStoreWrite;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.sink.ChannelComputer;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.EndOfScanException;
@@ -304,11 +306,22 @@ public class ContinuousFileSplitEnumerator
 
     protected int assignSuggestedTask(FileStoreSourceSplit split) {
         DataSplit dataSplit = ((DataSplit) split.split());
+        int parallelism = context.currentParallelism();
+
+        int bucketId;
+        if (dataSplit.bucket() == BucketMode.POSTPONE_BUCKET) {
+            bucketId =
+                    PostponeBucketFileStoreWrite.getWriteId(dataSplit.dataFiles().get(0).fileName())
+                            % parallelism;
+        } else {
+            bucketId = dataSplit.bucket();
+        }
+
         if (shuffleBucketWithPartition) {
-            return ChannelComputer.select(
-                    dataSplit.partition(), dataSplit.bucket(), context.currentParallelism());
+            return ChannelComputer.select(dataSplit.partition(), bucketId, parallelism);
+        } else {
+            return ChannelComputer.select(bucketId, parallelism);
         }
-        return ChannelComputer.select(dataSplit.bucket(), context.currentParallelism());
     }
 
     protected SplitAssigner createSplitAssigner(boolean unordered) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
index 4a449e240a..be19cdff42 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
@@ -714,7 +714,7 @@ public class PostponeBucketTableITCase extends AbstractTestBase {
         TableEnvironment sEnv =
                 tableEnvironmentBuilder()
                         .streamingMode()
-                        .parallelism(1)
+                        .parallelism(2)
                         .checkpointIntervalMs(500)
                         .build();
         String createCatalog =
@@ -750,15 +750,27 @@
         bEnv.executeSql("INSERT INTO T VALUES (1, 101), (3, 31)").await();
         bEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+        TableResult streamingSelect = sEnv.executeSql("SELECT * FROM T");
+        Thread.sleep(1000);
+
+        bEnv.executeSql("INSERT INTO T VALUES (1, 102), (4, 42)").await();
+        bEnv.executeSql("INSERT INTO T VALUES (1, 103), (5, 53)").await();
+        bEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+
         assertThat(collect(bEnv.executeSql("SELECT * FROM T")))
-                .containsExactlyInAnyOrder("+I[1, 101]", "+I[2, 20]", "+I[3, 31]");
-        TableResult streamingSelect =
-                sEnv.executeSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id' = '1') */");
+                .containsExactlyInAnyOrder(
+                        "+I[1, 103]", "+I[2, 20]", "+I[3, 31]", "+I[4, 42]", "+I[5, 53]");
 
         JobClient client = streamingSelect.getJobClient().get();
         CloseableIterator<Row> it = streamingSelect.collect();
-        assertThat(collect(client, it, 5))
+        assertThat(collect(client, it, 7))
                 .containsExactlyInAnyOrder(
-                        "+I[1, 10]", "+I[2, 20]", "+I[1, 100]", "+I[1, 101]", "+I[3, 31]");
+                        "+I[1, 101]",
+                        "+I[2, 20]",
+                        "+I[3, 31]",
+                        "+I[1, 102]",
+                        "+I[4, 42]",
+                        "+I[1, 103]",
+                        "+I[5, 53]");
     }
 
     private List<String> collect(TableResult result) throws Exception {
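
A note for readers tracing the fix: postpone-bucket data files carry their writer's id in the file name (the segment matched by the removed regex "-s-(\\d+?)-"). The enumerator previously keyed channel selection on dataSplit.bucket(), which is the constant BucketMode.POSTPONE_BUCKET for these tables, so multiple streaming readers could not divide the splits correctly; now both it and SplitChannelComputer derive a pseudo-bucket via PostponeBucketFileStoreWrite.getWriteId(...) % parallelism. The sketch below re-implements that routing outside Paimon to show the effect; it is illustration only, and the file names are hypothetical (only the "-s-<writeId>-" segment matters).

// WriteIdRoutingSketch.java -- standalone illustration, not part of the commit.
public class WriteIdRoutingSketch {

    // Mirrors PostponeBucketFileStoreWrite#getWriteId from the diff above:
    // the write id is the text between "-s-" and the following '-'.
    static int getWriteId(String fileName) {
        String[] parts = fileName.split("-s-");
        return Integer.parseInt(parts[1].substring(0, parts[1].indexOf('-')));
    }

    public static void main(String[] args) {
        int numChannels = 2; // matches .parallelism(2) in the updated test
        // Hypothetical postpone-bucket data file names.
        String[] files = {"data-s-0-1.parquet", "data-s-1-7.parquet", "data-s-3-2.parquet"};
        for (String f : files) {
            // writeId % numChannels sends every file of one writer to the same
            // channel, so no streaming reader misses part of a writer's output.
            System.out.println(f + " -> channel " + (getWriteId(f) % numChannels));
        }
    }
}

This prints channels 0, 1, and 1 for the three names above. Keying the shuffle on the embedded write id rather than the constant bucket is what lets the test's two-reader streaming job observe all seven expected records.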