This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 18dd7fe120 [Flink] Fix PostponeFixedBucketChannelComputer routing all records to same channel (#7737)
18dd7fe120 is described below
commit 18dd7fe12043e84683218cd8db5f0c06926f919e
Author: YeJunHao <[email protected]>
AuthorDate: Fri May 8 10:29:22 2026 +0800
[Flink] Fix PostponeFixedBucketChannelComputer routing all records to same channel (#7737)
Fix a bug in `PostponeFixedBucketChannelComputer` where all records in
the same partition are routed to the same downstream channel in batch
mode, causing only one subtask to actually process data.
---
.../table/sink/FixedBucketRowKeyExtractor.java | 7 +-
.../sink/PostponeFixedBucketChannelComputer.java | 12 +-
.../PostponeFixedBucketChannelComputerTest.java | 169 +++++++++++++++++++++
3 files changed, 181 insertions(+), 7 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractor.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractor.java
index b0cfc2d205..146a45b437 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/FixedBucketRowKeyExtractor.java
@@ -69,10 +69,13 @@ public class FixedBucketRowKeyExtractor extends RowKeyExtractor {
@Override
public int bucket() {
- BinaryRow bucketKey = bucketKey();
if (reuseBucket == null) {
- reuseBucket = bucketFunction.bucket(bucketKey, numBuckets);
+ reuseBucket = bucket(numBuckets);
}
return reuseBucket;
}
+
+ public int bucket(int numBuckets) {
+ return bucketFunction.bucket(bucketKey(), numBuckets);
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputer.java
index 5fd55db13e..e1bf08d843 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputer.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputer.java
@@ -22,7 +22,7 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.sink.ChannelComputer;
-import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
+import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor;
import java.util.Map;
@@ -38,7 +38,7 @@ public class PostponeFixedBucketChannelComputer implements ChannelComputer<Inter
private final Map<BinaryRow, Integer> knownNumBuckets;
private transient int numChannels;
- private transient RowPartitionKeyExtractor partitionKeyExtractor;
+ private transient FixedBucketRowKeyExtractor keyExtractor;
public PostponeFixedBucketChannelComputer(
TableSchema schema, Map<BinaryRow, Integer> knownNumBuckets) {
@@ -49,13 +49,15 @@ public class PostponeFixedBucketChannelComputer implements ChannelComputer<Inter
@Override
public void setup(int numChannels) {
this.numChannels = numChannels;
- this.partitionKeyExtractor = new RowPartitionKeyExtractor(schema);
+ this.keyExtractor = new FixedBucketRowKeyExtractor(schema);
}
@Override
public int channel(InternalRow record) {
- BinaryRow partition = partitionKeyExtractor.partition(record);
- int bucket = knownNumBuckets.computeIfAbsent(partition, p -> numChannels);
+ keyExtractor.setRecord(record);
+ BinaryRow partition = keyExtractor.partition();
+ int numBuckets = knownNumBuckets.computeIfAbsent(partition, p -> numChannels);
+ int bucket = keyExtractor.bucket(numBuckets);
return ChannelComputer.select(partition, bucket, numChannels);
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputerTest.java
new file mode 100644
index 0000000000..991ab5d2dd
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/PostponeFixedBucketChannelComputerTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link PostponeFixedBucketChannelComputer}. */
+public class PostponeFixedBucketChannelComputerTest {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ @Test
+ public void testRecordsDistributedAcrossChannels() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.BIGINT(), DataTypes.DOUBLE()},
+ new String[] {"pt", "k", "v"});
+
+ SchemaManager schemaManager =
+ new SchemaManager(LocalFileIO.create(), new Path(tempDir.toString()));
+ TableSchema schema =
+ schemaManager.createTable(
+ new Schema(
+ rowType.getFields(),
+ Collections.singletonList("pt"),
+ Arrays.asList("pt", "k"),
+ new HashMap<String, String>() {
+ {
+ put("bucket", "-1");
+ put("postpone.bucket-mode", "true");
+ }
+ },
+ ""));
+
+ int numChannels = 8;
+ Map<BinaryRow, Integer> knownNumBuckets = new HashMap<>();
+ PostponeFixedBucketChannelComputer computer =
+ new PostponeFixedBucketChannelComputer(schema, knownNumBuckets);
+ computer.setup(numChannels);
+
+ Set<Integer> channels = new HashSet<>();
+ for (long i = 0; i < 100; i++) {
+ InternalRow row = GenericRow.of(1, i, (double) i);
+ int channel = computer.channel(row);
+ assertThat(channel).isGreaterThanOrEqualTo(0).isLessThan(numChannels);
+ channels.add(channel);
+ }
+
+ // With 100 distinct keys and 8 channels, we should hit more than 1 channel
+ assertThat(channels.size()).isGreaterThan(1);
+ }
+
+ @Test
+ public void testNoPartitionDistribution() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.BIGINT(), DataTypes.DOUBLE()},
+ new String[] {"k", "v"});
+
+ SchemaManager schemaManager =
+ new SchemaManager(LocalFileIO.create(), new Path(tempDir.toString()));
+ TableSchema schema =
+ schemaManager.createTable(
+ new Schema(
+ rowType.getFields(),
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ new HashMap<String, String>() {
+ {
+ put("bucket", "-1");
+ put("postpone.bucket-mode", "true");
+ }
+ },
+ ""));
+
+ int numChannels = 8;
+ Map<BinaryRow, Integer> knownNumBuckets = new HashMap<>();
+ PostponeFixedBucketChannelComputer computer =
+ new PostponeFixedBucketChannelComputer(schema, knownNumBuckets);
+ computer.setup(numChannels);
+
+ Set<Integer> channels = new HashSet<>();
+ for (long i = 0; i < 100; i++) {
+ InternalRow row = GenericRow.of(i, (double) i);
+ int channel = computer.channel(row);
+ assertThat(channel).isGreaterThanOrEqualTo(0).isLessThan(numChannels);
+ channels.add(channel);
+ }
+
+ // Without the fix, all records would go to the same channel
+ // With the fix, 100 distinct keys across 8 channels should use multiple channels
+ assertThat(channels.size()).isGreaterThan(1);
+ }
+
+ @Test
+ public void testSameKeyGoesToSameChannel() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.BIGINT(), DataTypes.DOUBLE()},
+ new String[] {"k", "v"});
+
+ SchemaManager schemaManager =
+ new SchemaManager(LocalFileIO.create(), new Path(tempDir.toString()));
+ TableSchema schema =
+ schemaManager.createTable(
+ new Schema(
+ rowType.getFields(),
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ new HashMap<String, String>() {
+ {
+ put("bucket", "-1");
+ put("postpone.bucket-mode", "true");
+ }
+ },
+ ""));
+
+ int numChannels = 8;
+ Map<BinaryRow, Integer> knownNumBuckets = new HashMap<>();
+ PostponeFixedBucketChannelComputer computer =
+ new PostponeFixedBucketChannelComputer(schema, knownNumBuckets);
+ computer.setup(numChannels);
+
+ // Same key should always route to the same channel
+ for (long key = 0; key < 50; key++) {
+ InternalRow row1 = GenericRow.of(key, 1.0);
+ InternalRow row2 = GenericRow.of(key, 2.0);
+ assertThat(computer.channel(row1)).isEqualTo(computer.channel(row2));
+ }
+ }
+}