This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new af6ac7bb7a [flink] Fix postpone bucket without changelog producer cannot read all records with multiple streaming readers (#6142)
af6ac7bb7a is described below

commit af6ac7bb7ab8c75383c25d196f7e27599e63fcf4
Author: tsreaper <tsreape...@gmail.com>
AuthorDate: Mon Aug 25 20:19:01 2025 +0800

    [flink] Fix postpone bucket without changelog producer cannot read all records with multiple streaming readers (#6142)
---
 .../postpone/PostponeBucketFileStoreWrite.java     | 13 ++++++++++++
 .../postpone/PostponeBucketCompactSplitSource.java | 20 +++++-------------
 .../source/ContinuousFileSplitEnumerator.java      | 19 ++++++++++++++---
 .../paimon/flink/PostponeBucketTableITCase.java    | 24 ++++++++++++++++------
 4 files changed, 52 insertions(+), 24 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
index ac9a851cb2..4978ec6b3d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
@@ -216,4 +216,17 @@ public class PostponeBucketFileStoreWrite extends MemoryFileStoreWrite<KeyValue>
     protected Function<WriterContainer<KeyValue>, Boolean> 
createWriterCleanChecker() {
         return createNoConflictAwareWriterCleanChecker();
     }
+
+    public static int getWriteId(String fileName) {
+        try {
+            String[] parts = fileName.split("-s-");
+            return Integer.parseInt(parts[1].substring(0, 
parts[1].indexOf('-')));
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Data file name "
+                            + fileName
+                            + " does not match the pattern. This is 
unexpected.",
+                    e);
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
index ab53f8f824..3e0cd7cf23 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
@@ -28,13 +28,13 @@ import org.apache.paimon.flink.source.SimpleSourceSplit;
 import org.apache.paimon.flink.source.operator.ReadOperator;
 import org.apache.paimon.flink.utils.JavaTypeInfo;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.postpone.PostponeBucketFileStoreWrite;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.ChannelComputer;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.Preconditions;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.connector.source.Boundedness;
@@ -55,8 +55,6 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 /**
  * Source for compacting postpone bucket tables. This source scans all files 
from {@code bucket =
@@ -163,27 +161,19 @@ public class PostponeBucketCompactSplitSource extends AbstractNonCoordinatedSour
     private static class SplitChannelComputer implements 
ChannelComputer<Split> {
 
         private transient int numChannels;
-        private transient Pattern pattern;
 
         @Override
         public void setup(int numChannels) {
             this.numChannels = numChannels;
-            // see PostponeBucketTableWriteOperator
-            this.pattern = Pattern.compile("-s-(\\d+?)-");
         }
 
         @Override
         public int channel(Split record) {
             DataSplit dataSplit = (DataSplit) record;
-            String fileName = dataSplit.dataFiles().get(0).fileName();
-
-            Matcher matcher = pattern.matcher(fileName);
-            Preconditions.checkState(
-                    matcher.find(),
-                    "Data file name %s does not match the pattern. This is 
unexpected.",
-                    fileName);
-            return ChannelComputer.select(
-                    dataSplit.partition(), Integer.parseInt(matcher.group(1)), 
numChannels);
+            int bucketId =
+                    
PostponeBucketFileStoreWrite.getWriteId(dataSplit.dataFiles().get(0).fileName())
+                            % numChannels;
+            return ChannelComputer.select(dataSplit.partition(), bucketId, 
numChannels);
         }
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index 46ca1a5c92..668bd7d847 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -22,6 +22,8 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.flink.source.assigners.FIFOSplitAssigner;
 import org.apache.paimon.flink.source.assigners.PreAssignSplitAssigner;
 import org.apache.paimon.flink.source.assigners.SplitAssigner;
+import org.apache.paimon.postpone.PostponeBucketFileStoreWrite;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.sink.ChannelComputer;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.EndOfScanException;
@@ -304,11 +306,22 @@ public class ContinuousFileSplitEnumerator
 
     protected int assignSuggestedTask(FileStoreSourceSplit split) {
         DataSplit dataSplit = ((DataSplit) split.split());
+        int parallelism = context.currentParallelism();
+
+        int bucketId;
+        if (dataSplit.bucket() == BucketMode.POSTPONE_BUCKET) {
+            bucketId =
+                    
PostponeBucketFileStoreWrite.getWriteId(dataSplit.dataFiles().get(0).fileName())
+                            % parallelism;
+        } else {
+            bucketId = dataSplit.bucket();
+        }
+
         if (shuffleBucketWithPartition) {
-            return ChannelComputer.select(
-                    dataSplit.partition(), dataSplit.bucket(), 
context.currentParallelism());
+            return ChannelComputer.select(dataSplit.partition(), bucketId, 
parallelism);
+        } else {
+            return ChannelComputer.select(bucketId, parallelism);
         }
-        return ChannelComputer.select(dataSplit.bucket(), 
context.currentParallelism());
     }
 
     protected SplitAssigner createSplitAssigner(boolean unordered) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
index 4a449e240a..be19cdff42 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
@@ -714,7 +714,7 @@ public class PostponeBucketTableITCase extends AbstractTestBase {
         TableEnvironment sEnv =
                 tableEnvironmentBuilder()
                         .streamingMode()
-                        .parallelism(1)
+                        .parallelism(2)
                         .checkpointIntervalMs(500)
                         .build();
         String createCatalog =
@@ -750,15 +750,27 @@ public class PostponeBucketTableITCase extends AbstractTestBase {
         bEnv.executeSql("INSERT INTO T VALUES (1, 101), (3, 31)").await();
         bEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
 
+        TableResult streamingSelect = sEnv.executeSql("SELECT * FROM T");
+        Thread.sleep(1000);
+
+        bEnv.executeSql("INSERT INTO T VALUES (1, 102), (4, 42)").await();
+        bEnv.executeSql("INSERT INTO T VALUES (1, 103), (5, 53)").await();
+        bEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+
         assertThat(collect(bEnv.executeSql("SELECT * FROM T")))
-                .containsExactlyInAnyOrder("+I[1, 101]", "+I[2, 20]", "+I[3, 
31]");
-        TableResult streamingSelect =
-                sEnv.executeSql("SELECT * FROM T /*+ 
OPTIONS('scan.snapshot-id' = '1') */");
+                .containsExactlyInAnyOrder(
+                        "+I[1, 103]", "+I[2, 20]", "+I[3, 31]", "+I[4, 42]", 
"+I[5, 53]");
         JobClient client = streamingSelect.getJobClient().get();
         CloseableIterator<Row> it = streamingSelect.collect();
-        assertThat(collect(client, it, 5))
+        assertThat(collect(client, it, 7))
                 .containsExactlyInAnyOrder(
-                        "+I[1, 10]", "+I[2, 20]", "+I[1, 100]", "+I[1, 101]", 
"+I[3, 31]");
+                        "+I[1, 101]",
+                        "+I[2, 20]",
+                        "+I[3, 31]",
+                        "+I[1, 102]",
+                        "+I[4, 42]",
+                        "+I[1, 103]",
+                        "+I[5, 53]");
     }
 
     private List<String> collect(TableResult result) throws Exception {

Reply via email to