This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 18dd7fe120 [Flink] Fix PostponeFixedBucketChannelComputer routing all records to same channel (#7737)
18dd7fe120 is described below

commit 18dd7fe12043e84683218cd8db5f0c06926f919e
Author: YeJunHao <[email protected]>
AuthorDate: Fri May 8 10:29:22 2026 +0800

    [Flink] Fix PostponeFixedBucketChannelComputer routing all records to same channel (#7737)
    
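    Fix a bug in `PostponeFixedBucketChannelComputer` where all records in
    the same partition are routed to the same downstream channel in batch
    mode, causing only one subtask to actually process data.
    
    The channel was selected using the partition's bucket count (looked up
    in `knownNumBuckets`, defaulting to the number of channels) instead of
    each record's bucket, so every record of a partition produced the same
    channel. The bucket is now computed per record from its bucket key via
    the new `FixedBucketRowKeyExtractor#bucket(int)`.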
---
 .../table/sink/FixedBucketRowKeyExtractor.java     |   7 +-
 .../sink/PostponeFixedBucketChannelComputer.java   |  12 +-
 .../PostponeFixedBucketChannelComputerTest.java    | 169 +++++++++++++++++++++
 3 files changed, 181 insertions(+), 7 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractor.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractor.java
index b0cfc2d205..146a45b437 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractor.java
@@ -69,10 +69,13 @@ public class FixedBucketRowKeyExtractor extends RowKeyExtractor {
 
     @Override
     public int bucket() {
-        BinaryRow bucketKey = bucketKey();
         if (reuseBucket == null) {
-            reuseBucket = bucketFunction.bucket(bucketKey, numBuckets);
+            reuseBucket = bucket(numBuckets);
         }
         return reuseBucket;
     }
+
+    public int bucket(int numBuckets) {
+        return bucketFunction.bucket(bucketKey(), numBuckets);
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputer.java
index 5fd55db13e..e1bf08d843 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputer.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputer.java
@@ -22,7 +22,7 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.sink.ChannelComputer;
-import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
+import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor;
 
 import java.util.Map;
 
@@ -38,7 +38,7 @@ public class PostponeFixedBucketChannelComputer implements ChannelComputer<Inter
     private final Map<BinaryRow, Integer> knownNumBuckets;
 
     private transient int numChannels;
-    private transient RowPartitionKeyExtractor partitionKeyExtractor;
+    private transient FixedBucketRowKeyExtractor keyExtractor;
 
     public PostponeFixedBucketChannelComputer(
             TableSchema schema, Map<BinaryRow, Integer> knownNumBuckets) {
@@ -49,13 +49,15 @@ public class PostponeFixedBucketChannelComputer implements ChannelComputer<Inter
     @Override
     public void setup(int numChannels) {
         this.numChannels = numChannels;
-        this.partitionKeyExtractor = new RowPartitionKeyExtractor(schema);
+        this.keyExtractor = new FixedBucketRowKeyExtractor(schema);
     }
 
     @Override
     public int channel(InternalRow record) {
-        BinaryRow partition = partitionKeyExtractor.partition(record);
-        int bucket = knownNumBuckets.computeIfAbsent(partition, p -> numChannels);
+        keyExtractor.setRecord(record);
+        BinaryRow partition = keyExtractor.partition();
+        int numBuckets = knownNumBuckets.computeIfAbsent(partition, p -> numChannels);
+        int bucket = keyExtractor.bucket(numBuckets);
         return ChannelComputer.select(partition, bucket, numChannels);
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputerTest.java
new file mode 100644
index 0000000000..991ab5d2dd
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputerTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link PostponeFixedBucketChannelComputer}. */
+public class PostponeFixedBucketChannelComputerTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    @Test
+    public void testRecordsDistributedAcrossChannels() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.BIGINT(), DataTypes.DOUBLE()},
+                        new String[] {"pt", "k", "v"});
+
+        SchemaManager schemaManager =
+                new SchemaManager(LocalFileIO.create(), new Path(tempDir.toString()));
+        TableSchema schema =
+                schemaManager.createTable(
+                        new Schema(
+                                rowType.getFields(),
+                                Collections.singletonList("pt"),
+                                Arrays.asList("pt", "k"),
+                                new HashMap<String, String>() {
+                                    {
+                                        put("bucket", "-1");
+                                        put("postpone.bucket-mode", "true");
+                                    }
+                                },
+                                ""));
+
+        int numChannels = 8;
+        Map<BinaryRow, Integer> knownNumBuckets = new HashMap<>();
+        PostponeFixedBucketChannelComputer computer =
+                new PostponeFixedBucketChannelComputer(schema, knownNumBuckets);
+        computer.setup(numChannels);
+
+        Set<Integer> channels = new HashSet<>();
+        for (long i = 0; i < 100; i++) {
+            InternalRow row = GenericRow.of(1, i, (double) i);
+            int channel = computer.channel(row);
+            assertThat(channel).isGreaterThanOrEqualTo(0).isLessThan(numChannels);
+            channels.add(channel);
+        }
+
+        // With 100 distinct keys and 8 channels, we should hit more than 1 channel
+        assertThat(channels.size()).isGreaterThan(1);
+    }
+
+    @Test
+    public void testNoPartitionDistribution() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.BIGINT(), DataTypes.DOUBLE()},
+                        new String[] {"k", "v"});
+
+        SchemaManager schemaManager =
+                new SchemaManager(LocalFileIO.create(), new Path(tempDir.toString()));
+        TableSchema schema =
+                schemaManager.createTable(
+                        new Schema(
+                                rowType.getFields(),
+                                Collections.emptyList(),
+                                Collections.singletonList("k"),
+                                new HashMap<String, String>() {
+                                    {
+                                        put("bucket", "-1");
+                                        put("postpone.bucket-mode", "true");
+                                    }
+                                },
+                                ""));
+
+        int numChannels = 8;
+        Map<BinaryRow, Integer> knownNumBuckets = new HashMap<>();
+        PostponeFixedBucketChannelComputer computer =
+                new PostponeFixedBucketChannelComputer(schema, knownNumBuckets);
+        computer.setup(numChannels);
+
+        Set<Integer> channels = new HashSet<>();
+        for (long i = 0; i < 100; i++) {
+            InternalRow row = GenericRow.of(i, (double) i);
+            int channel = computer.channel(row);
+            assertThat(channel).isGreaterThanOrEqualTo(0).isLessThan(numChannels);
+            channels.add(channel);
+        }
+
+        // Without the fix, all records would go to the same channel
+        // With the fix, 100 distinct keys across 8 channels should use multiple channels
+        assertThat(channels.size()).isGreaterThan(1);
+    }
+
+    @Test
+    public void testSameKeyGoesToSameChannel() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.BIGINT(), DataTypes.DOUBLE()},
+                        new String[] {"k", "v"});
+
+        SchemaManager schemaManager =
+                new SchemaManager(LocalFileIO.create(), new Path(tempDir.toString()));
+        TableSchema schema =
+                schemaManager.createTable(
+                        new Schema(
+                                rowType.getFields(),
+                                Collections.emptyList(),
+                                Collections.singletonList("k"),
+                                new HashMap<String, String>() {
+                                    {
+                                        put("bucket", "-1");
+                                        put("postpone.bucket-mode", "true");
+                                    }
+                                },
+                                ""));
+
+        int numChannels = 8;
+        Map<BinaryRow, Integer> knownNumBuckets = new HashMap<>();
+        PostponeFixedBucketChannelComputer computer =
+                new PostponeFixedBucketChannelComputer(schema, knownNumBuckets);
+        computer.setup(numChannels);
+
+        // Same key should always route to the same channel
+        for (long key = 0; key < 50; key++) {
+            InternalRow row1 = GenericRow.of(key, 1.0);
+            InternalRow row2 = GenericRow.of(key, 2.0);
+            assertThat(computer.channel(row1)).isEqualTo(computer.channel(row2));
+        }
+    }
+}

Reply via email to