This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4422d890df [core] Support partition strategy for unaware bucket 
partitioned Append only table (#5202)
4422d890df is described below

commit 4422d890dfb57ba01df9717da4e428853a69ff4a
Author: HunterXHunter <[email protected]>
AuthorDate: Wed Mar 12 14:43:46 2025 +0800

    [core] Support partition strategy for unaware bucket partitioned Append 
only table (#5202)
---
 .../shortcodes/generated/core_configuration.html   |  6 ++
 .../main/java/org/apache/paimon/CoreOptions.java   | 30 +++++++++
 .../apache/paimon/flink/sink/FlinkSinkBuilder.java | 11 ++++
 .../sink/RowDataHashPartitionChannelComputer.java  | 57 ++++++++++++++++
 .../flink/UnawareBucketAppendOnlyTableITCase.java  | 77 ++++++++++++++++++++++
 5 files changed, 181 insertions(+)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index a97092b4a3..7539a118eb 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -707,6 +707,12 @@ This config option does not affect the default filesystem 
metastore.</td>
             <td>String</td>
             <td>Mark done action will reports the partition to the remote http 
server, this can only be used by http-report partition mark done action.</td>
         </tr>
+        <tr>
+            <td><h5>partition.strategy.for-unaware-append</h5></td>
+            <td style="word-wrap: break-word;">NONE</td>
+            <td><p>Enum</p></td>
+            <td>This is only for a partitioned unaware-bucket append table; 
the purpose is to reduce small files and improve write performance. This 
repartitioning strategy reduces the number of partitions written by each 
task to as few as possible.<ul><li>none: Rebalanced or Forward partitioning; 
this is the default behavior. This strategy is suitable when the number of 
partitions you write in a batch is much smaller than the write 
parallelism.</li><li>hash: Hash the partition  [...]
+        </tr>
         <tr>
             <td><h5>partition.timestamp-formatter</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index a41c478724..b04bce6174 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -459,6 +459,25 @@ public class CoreOptions implements Serializable {
                             "Open file cost of a source file. It is used to 
avoid reading"
                                     + " too many files with a source split, 
which can be very slow.");
 
+    public static final ConfigOption<UnawareAppendPartitionStrategy>
+            PARTITION_STRATEGY_FOR_UNAWARE_APPEND =
+                    key("partition.strategy.for-unaware-append")
+                            .enumType(UnawareAppendPartitionStrategy.class)
+                            .defaultValue(UnawareAppendPartitionStrategy.NONE)
+                            .withDescription(
+                                    Description.builder()
+                                            .text(
+                                                    "This is only for a 
partitioned unaware-bucket append table, and the purpose is to reduce small 
files and improve write performance."
+                                                            + " This 
repartitioning strategy reduces the number of partitions written by each task 
to as few as possible.")
+                                            .list(
+                                                    text(
+                                                            "none: Rebalanced 
or Forward partitioning, this is the default behavior;"
+                                                                    + " this 
strategy is suitable when the number of partitions you write in a batch is much 
smaller than the write parallelism."),
+                                                    text(
+                                                            "hash: Hash the 
partition value;"
+                                                                    + " this 
strategy is suitable when the number of partitions you write in a batch is 
greater than or equal to the write parallelism."))
+                                            .build());
+
     public static final ConfigOption<MemorySize> WRITE_BUFFER_SIZE =
             key("write-buffer-size")
                     .memoryType()
@@ -1990,6 +2009,10 @@ public class CoreOptions implements Serializable {
         return options.get(WRITE_BUFFER_FOR_APPEND);
     }
 
+    public UnawareAppendPartitionStrategy partitionStrategyForUnawareAppend() {
+        return options.get(PARTITION_STRATEGY_FOR_UNAWARE_APPEND);
+    }
+
     public int writeMaxWritersToSpill() {
         return options.get(WRITE_MAX_WRITERS_TO_SPILL);
     }
@@ -3332,4 +3355,11 @@ public class CoreOptions implements Serializable {
         /** Lookup compaction will use UniversalCompaction strategy to gently 
compact new files. */
         GENTLE
     }
+
+    /** Partition strategy for unaware bucket partitioned append only table. */
+    public enum UnawareAppendPartitionStrategy {
+        NONE,
+        HASH
+        // TODO : Supports range-partition strategy.
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index e06639d794..4d67c11865 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -55,6 +55,7 @@ import java.util.Map;
 import static org.apache.paimon.CoreOptions.OrderType.HILBERT;
 import static org.apache.paimon.CoreOptions.OrderType.ORDER;
 import static org.apache.paimon.CoreOptions.OrderType.ZORDER;
+import static 
org.apache.paimon.CoreOptions.UnawareAppendPartitionStrategy.HASH;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SAMPLE_FACTOR;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_STRATEGY;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.MIN_CLUSTERING_SAMPLE_FACTOR;
@@ -303,6 +304,16 @@ public class FlinkSinkBuilder {
         checkArgument(
                 table.primaryKeys().isEmpty(),
                 "Unaware bucket mode only works with append-only table for 
now.");
+
+        if (!table.partitionKeys().isEmpty()
+                && table.coreOptions().partitionStrategyForUnawareAppend() == 
HASH) {
+            input =
+                    partition(
+                            input,
+                            new 
RowDataHashPartitionChannelComputer(table.schema()),
+                            parallelism);
+        }
+
         return new RowUnawareBucketSink(table, overwritePartition, 
logSinkFunction, parallelism)
                 .sinkFrom(input);
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataHashPartitionChannelComputer.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataHashPartitionChannelComputer.java
new file mode 100644
index 0000000000..964fad0955
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataHashPartitionChannelComputer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.ChannelComputer;
+import org.apache.paimon.table.sink.KeyAndBucketExtractor;
+import org.apache.paimon.table.sink.UnawareBucketRowKeyExtractor;
+
+/** A {@link ChannelComputer} that shuffles records by their partition value; only for partitioned unaware-bucket append-only tables. */
+public class RowDataHashPartitionChannelComputer implements 
ChannelComputer<InternalRow> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final TableSchema schema;
+
+    private transient int numChannels;
+    private transient KeyAndBucketExtractor<InternalRow> extractor;
+
+    public RowDataHashPartitionChannelComputer(TableSchema schema) {
+        this.schema = schema;
+    }
+
+    @Override
+    public void setup(int numChannels) {
+        this.numChannels = numChannels;
+        this.extractor = new UnawareBucketRowKeyExtractor(schema);
+    }
+
+    @Override
+    public int channel(InternalRow record) {
+        extractor.setRecord(record);
+        return ChannelComputer.select(extractor.partition(), 0, numChannels);
+    }
+
+    @Override
+    public String toString() {
+        return "shuffle by partition";
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
index fb8bee5d59..60301cca4a 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
@@ -18,13 +18,16 @@
 
 package org.apache.paimon.flink;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.source.AbstractNonCoordinatedSource;
 import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
 import org.apache.paimon.flink.source.SimpleSourceSplit;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.table.FileStoreTable;
@@ -47,6 +50,8 @@ import org.apache.flink.types.RowKind;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.io.File;
 import java.time.Duration;
@@ -58,6 +63,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 
+import static 
org.apache.paimon.CoreOptions.PARTITION_STRATEGY_FOR_UNAWARE_APPEND;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -404,6 +410,77 @@ public class UnawareBucketAppendOnlyTableITCase extends 
CatalogITCaseBase {
                 .containsExactlyInAnyOrder(Row.of(1, "test"), Row.of(2, 
"test"));
     }
 
+    @ParameterizedTest
+    @EnumSource(CoreOptions.UnawareAppendPartitionStrategy.class)
+    public void testPartitionStrategyForPartitionedTable(
+            CoreOptions.UnawareAppendPartitionStrategy strategy)
+            throws Catalog.TableNotExistException {
+
+        int partitionNums = 5;
+        int largerSinkParallelism = 7;
+        int lessSinkParallelism = 3;
+        int hashStrategyResultFileCount = 1;
+        // sink parallelism is greater than the number of partitions written in 
a batch, so 2
+        // tasks will receive no data.
+        batchSql(
+                "CREATE TABLE IF NOT EXISTS partition_strategy_table_larger ("
+                        + "id INT, data STRING, dt STRING) PARTITIONED BY (dt)"
+                        + " WITH ("
+                        + "'bucket' = '-1',"
+                        + "'%s' = '%s',"
+                        + "'sink.parallelism' = '7')",
+                PARTITION_STRATEGY_FOR_UNAWARE_APPEND.key(), strategy);
+
+        // sink parallelism is less than the number of partitions written in a 
batch, so there are 2 tasks
+        // that will each write data to 2 partitions.
+        batchSql(
+                "CREATE TABLE IF NOT EXISTS partition_strategy_table_less ("
+                        + "id INT, data STRING, dt STRING) PARTITIONED BY (dt)"
+                        + " WITH ("
+                        + "'bucket' = '-1',"
+                        + "'%s' = '%s',"
+                        + "'sink.parallelism' = '3')",
+                PARTITION_STRATEGY_FOR_UNAWARE_APPEND.key(), strategy);
+
+        StringBuilder values = new StringBuilder();
+        // 5 partition in a batch write.
+        for (int i = 1; i <= 30; i++) {
+            for (int j = 1; j <= partitionNums; j++) {
+                values.append(String.format("(%s, 'HXH', '2025030%s'),", j, 
j));
+            }
+        }
+
+        batchSql(
+                "INSERT INTO partition_strategy_table_larger VALUES "
+                        + values.substring(0, values.length() - 1));
+        batchSql(
+                "INSERT INTO partition_strategy_table_less VALUES "
+                        + values.substring(0, values.length() - 1));
+
+        assertThat(batchSql("SELECT * FROM 
partition_strategy_table_larger").size()).isEqualTo(150);
+        assertThat(batchSql("SELECT * FROM 
partition_strategy_table_less").size()).isEqualTo(150);
+
+        FileStoreTable fileStoreTableLarger = 
paimonTable("partition_strategy_table_larger");
+        List<PartitionEntry> partitionEntriesLarger =
+                
fileStoreTableLarger.newReadBuilder().newScan().listPartitionEntries();
+        assertThat(partitionEntriesLarger.size()).isEqualTo(partitionNums);
+        int fileCountLarger =
+                strategy == CoreOptions.UnawareAppendPartitionStrategy.HASH
+                        ? hashStrategyResultFileCount
+                        : largerSinkParallelism;
+        partitionEntriesLarger.forEach(x -> 
assertThat(x.fileCount()).isEqualTo(fileCountLarger));
+
+        FileStoreTable fileStoreTableLess = 
paimonTable("partition_strategy_table_less");
+        List<PartitionEntry> partitionEntriesLess =
+                
fileStoreTableLess.newReadBuilder().newScan().listPartitionEntries();
+        assertThat(partitionEntriesLess.size()).isEqualTo(partitionNums);
+        int fileCountLess =
+                strategy == CoreOptions.UnawareAppendPartitionStrategy.HASH
+                        ? hashStrategyResultFileCount
+                        : lessSinkParallelism;
+        partitionEntriesLess.forEach(x -> 
assertThat(x.fileCount()).isEqualTo(fileCountLess));
+    }
+
     private static class TestStatelessWriterSource extends 
AbstractNonCoordinatedSource<Integer> {
 
         private final FileStoreTable table;

Reply via email to