This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.2
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 79a3ab2b58ae523799ddc584d24999b4c7378c10
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Jun 13 19:41:10 2025 +0800

    [flink] Postpone mode should support 'partition.sink-strategy' (#5743)
---
 docs/layouts/shortcodes/generated/core_configuration.html   |  2 +-
 paimon-api/src/main/java/org/apache/paimon/CoreOptions.java |  2 +-
 .../java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java | 11 +++++++++--
 .../flink/sink/RowDataHashPartitionChannelComputer.java     | 10 ++++------
 .../org/apache/paimon/flink/PostponeBucketTableITCase.java  | 11 ++++++++++-
 5 files changed, 25 insertions(+), 11 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 020b03c0c8..8cd61f61d6 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -765,7 +765,7 @@ This config option does not affect the default filesystem metastore.</td>
         <td><h5>partition.sink-strategy</h5></td>
         <td style="word-wrap: break-word;">NONE</td>
         <td><p>Enum</p></td>
-        <td>This is only for partitioned unaware-buckets append table, and the purpose is to reduce small files and improve write performance. Through this repartitioning strategy to reduce the number of partitions written by each task to as few as possible.<ul><li>none: Rebalanced or Forward partitioning, this is the default behavior, this strategy is suitable for the number of partitions you write in a batch is much smaller than write parallelism.</li><li>hash: Hash the partitions [...]
+        <td>This is only for partitioned append table or postpone pk table, and the purpose is to reduce small files and improve write performance. Through this repartitioning strategy to reduce the number of partitions written by each task to as few as possible.<ul><li>none: Rebalanced or Forward partitioning, this is the default behavior, this strategy is suitable for the number of partitions you write in a batch is much smaller than write parallelism.</li><li>hash: Hash the partit [...]
     </tr>
     <tr>
         <td><h5>partition.timestamp-formatter</h5></td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index e79d1593d5..1df7949acc 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1325,7 +1325,7 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "This is only for partitioned unaware-buckets append table, and the purpose is to reduce small files and improve write performance."
+                                            "This is only for partitioned append table or postpone pk table, and the purpose is to reduce small files and improve write performance."
                                                     + " Through this repartitioning strategy to reduce the number of partitions written by each task to as few as possible.")
                                     .list(
                                             text(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index e83bbeecbb..7f211870b9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -30,6 +30,7 @@ import org.apache.paimon.flink.sorter.TableSorter;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.ChannelComputer;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -291,8 +292,14 @@ public class FlinkSinkBuilder {
     }
 
     private DataStreamSink<?> buildPostponeBucketSink(DataStream<InternalRow> input) {
-        DataStream<InternalRow> partitioned =
-                partition(input, new PostponeBucketChannelComputer(table.schema()), parallelism);
+        ChannelComputer<InternalRow> channelComputer;
+        if (!table.partitionKeys().isEmpty()
+                && table.coreOptions().partitionSinkStrategy() == PartitionSinkStrategy.HASH) {
+            channelComputer = new RowDataHashPartitionChannelComputer(table.schema());
+        } else {
+            channelComputer = new PostponeBucketChannelComputer(table.schema());
+        }
+        DataStream<InternalRow> partitioned = partition(input, channelComputer, parallelism);
         FixedBucketSink sink = new FixedBucketSink(table, overwritePartition, null);
         return sink.sinkFrom(partitioned);
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataHashPartitionChannelComputer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataHashPartitionChannelComputer.java
index 73258e2966..8943c549ce 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataHashPartitionChannelComputer.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataHashPartitionChannelComputer.java
@@ -20,9 +20,8 @@ package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.sink.AppendTableRowKeyExtractor;
 import org.apache.paimon.table.sink.ChannelComputer;
-import org.apache.paimon.table.sink.KeyAndBucketExtractor;
+import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
 
 /** This is only for partitioned unaware-buckets Append only table. */
 public class RowDataHashPartitionChannelComputer implements ChannelComputer<InternalRow> {
@@ -32,7 +31,7 @@ public class RowDataHashPartitionChannelComputer implements ChannelComputer<Inte
 
     private final TableSchema schema;
     private transient int numChannels;
-    private transient KeyAndBucketExtractor<InternalRow> extractor;
+    private transient RowPartitionKeyExtractor extractor;
 
     public RowDataHashPartitionChannelComputer(TableSchema schema) {
         this.schema = schema;
@@ -41,13 +40,12 @@ public class RowDataHashPartitionChannelComputer implements ChannelComputer<Inte
     @Override
     public void setup(int numChannels) {
         this.numChannels = numChannels;
-        this.extractor = new AppendTableRowKeyExtractor(schema);
+        this.extractor = new RowPartitionKeyExtractor(schema);
     }
 
     @Override
     public int channel(InternalRow record) {
-        extractor.setRecord(record);
-        return ChannelComputer.select(extractor.partition(), 0, numChannels);
+        return ChannelComputer.select(extractor.partition(record), 0, numChannels);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
index cf319ac1c2..f66c2f0998 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
@@ -31,6 +31,7 @@ import org.junit.jupiter.api.Timeout;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -75,7 +76,15 @@ public class PostponeBucketTableITCase extends AbstractTestBase {
                 values.add(String.format("(%d, %d, %d)", i, j, i * numKeys + j));
             }
         }
-        tEnv.executeSql("INSERT INTO T VALUES " + String.join(", ", values)).await();
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        if (random.nextBoolean()) {
+            tEnv.executeSql("INSERT INTO T VALUES " + String.join(", ", values)).await();
+        } else {
+            tEnv.executeSql(
+                            "INSERT INTO T /*+ OPTIONS('partition.sink-strategy'='hash') */ VALUES "
+                                    + String.join(", ", values))
+                    .await();
+        }
 
         assertThat(collect(tEnv.executeSql("SELECT * FROM T"))).isEmpty();
         tEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
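A usage note for anyone picking this up on release-1.2: the Flink SQL below
is a minimal sketch modeled on the randomized branch of
PostponeBucketTableITCase in this patch. The table name, schema, and values
are illustrative only, and 'bucket' = '-2' is assumed here to be the setting
that enables postpone bucket mode.

    -- Illustrative postpone-mode primary-key table (name/schema not from the patch).
    -- Assumption: 'bucket' = '-2' selects postpone bucket mode.
    CREATE TABLE T (
        pt INT,
        k INT,
        v INT,
        PRIMARY KEY (pt, k) NOT ENFORCED
    ) PARTITIONED BY (pt) WITH ('bucket' = '-2');

    -- With this hint, buildPostponeBucketSink now picks
    -- RowDataHashPartitionChannelComputer: rows are shuffled by a hash of the
    -- partition values, so each writer task sees as few partitions as possible.
    INSERT INTO T /*+ OPTIONS('partition.sink-strategy' = 'hash') */
    VALUES (1, 1, 10), (1, 2, 20), (2, 1, 30);

Without the hint (or with the default 'none'), the sink keeps the previous
PostponeBucketChannelComputer behavior.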
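The strategy can also be pinned in the table options instead of a
per-statement hint; a sketch against the same illustrative table, assuming
the option is settable via ALTER TABLE like other core options:

    -- Assumption: partition.sink-strategy is a table-level core option.
    ALTER TABLE T SET ('partition.sink-strategy' = 'hash');
    INSERT INTO T VALUES (3, 1, 40), (3, 2, 50);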
