This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4422d890df [core] Support partition strategy for unaware bucket
partitioned Append only table (#5202)
4422d890df is described below
commit 4422d890dfb57ba01df9717da4e428853a69ff4a
Author: HunterXHunter <[email protected]>
AuthorDate: Wed Mar 12 14:43:46 2025 +0800
[core] Support partition strategy for unaware bucket partitioned Append
only table (#5202)
---
.../shortcodes/generated/core_configuration.html | 6 ++
.../main/java/org/apache/paimon/CoreOptions.java | 30 +++++++++
.../apache/paimon/flink/sink/FlinkSinkBuilder.java | 11 ++++
.../sink/RowDataHashPartitionChannelComputer.java | 57 ++++++++++++++++
.../flink/UnawareBucketAppendOnlyTableITCase.java | 77 ++++++++++++++++++++++
5 files changed, 181 insertions(+)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index a97092b4a3..7539a118eb 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -707,6 +707,12 @@ This config option does not affect the default filesystem
metastore.</td>
<td>String</td>
<td>Mark done action will reports the partition to the remote http
server, this can only be used by http-report partition mark done action.</td>
</tr>
+        <tr>
+            <td><h5>partition.strategy.for-unaware-append</h5></td>
+            <td style="word-wrap: break-word;">NONE</td>
+            <td><p>Enum</p></td>
+            <td>This is only for a partitioned unaware-bucket append table; the purpose is to reduce small files and improve write performance. This repartitioning strategy reduces the number of partitions written by each task to as few as possible.<ul><li>none: Rebalanced or Forward partitioning. This is the default behavior; this strategy is suitable when the number of partitions written in a batch is much smaller than the write parallelism.</li><li>hash: Hash the partition values; this strategy is suitable when the number of partitions written in a batch is greater than or equal to the write parallelism.</li></ul></td>
+        </tr>
<tr>
<td><h5>partition.timestamp-formatter</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index a41c478724..b04bce6174 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -459,6 +459,25 @@ public class CoreOptions implements Serializable {
"Open file cost of a source file. It is used to
avoid reading"
+ " too many files with a source split,
which can be very slow.");
+    public static final ConfigOption<UnawareAppendPartitionStrategy>
+            PARTITION_STRATEGY_FOR_UNAWARE_APPEND =
+                    key("partition.strategy.for-unaware-append")
+                            .enumType(UnawareAppendPartitionStrategy.class)
+                            .defaultValue(UnawareAppendPartitionStrategy.NONE)
+                            .withDescription(
+                                    Description.builder()
+                                            .text(
+                                                    "This is only for a partitioned unaware-bucket append table; the purpose is to reduce small files and improve write performance."
+                                                            + " This repartitioning strategy reduces the number of partitions written by each task to as few as possible.")
+                                            .list(
+                                                    text(
+                                                            "none: Rebalanced or Forward partitioning. This is the default behavior;"
+                                                                    + " this strategy is suitable when the number of partitions written in a batch is much smaller than the write parallelism."),
+                                                    text(
+                                                            "hash: Hash the partition values;"
+                                                                    + " this strategy is suitable when the number of partitions written in a batch is greater than or equal to the write parallelism."))
+                                            .build());
+
public static final ConfigOption<MemorySize> WRITE_BUFFER_SIZE =
key("write-buffer-size")
.memoryType()
@@ -1990,6 +2009,10 @@ public class CoreOptions implements Serializable {
return options.get(WRITE_BUFFER_FOR_APPEND);
}
+ public UnawareAppendPartitionStrategy partitionStrategyForUnawareAppend() {
+ return options.get(PARTITION_STRATEGY_FOR_UNAWARE_APPEND);
+ }
+
public int writeMaxWritersToSpill() {
return options.get(WRITE_MAX_WRITERS_TO_SPILL);
}
@@ -3332,4 +3355,11 @@ public class CoreOptions implements Serializable {
/** Lookup compaction will use UniversalCompaction strategy to gently
compact new files. */
GENTLE
}
+
+ /** Partition strategy for unaware bucket partitioned append only table. */
+ public enum UnawareAppendPartitionStrategy {
+ NONE,
+ HASH
+ // TODO : Supports range-partition strategy.
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index e06639d794..4d67c11865 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -55,6 +55,7 @@ import java.util.Map;
import static org.apache.paimon.CoreOptions.OrderType.HILBERT;
import static org.apache.paimon.CoreOptions.OrderType.ORDER;
import static org.apache.paimon.CoreOptions.OrderType.ZORDER;
+import static
org.apache.paimon.CoreOptions.UnawareAppendPartitionStrategy.HASH;
import static
org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SAMPLE_FACTOR;
import static
org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_STRATEGY;
import static
org.apache.paimon.flink.FlinkConnectorOptions.MIN_CLUSTERING_SAMPLE_FACTOR;
@@ -303,6 +304,16 @@ public class FlinkSinkBuilder {
checkArgument(
table.primaryKeys().isEmpty(),
"Unaware bucket mode only works with append-only table for
now.");
+
+ if (!table.partitionKeys().isEmpty()
+ && table.coreOptions().partitionStrategyForUnawareAppend() ==
HASH) {
+ input =
+ partition(
+ input,
+ new
RowDataHashPartitionChannelComputer(table.schema()),
+ parallelism);
+ }
+
return new RowUnawareBucketSink(table, overwritePartition,
logSinkFunction, parallelism)
.sinkFrom(input);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataHashPartitionChannelComputer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataHashPartitionChannelComputer.java
new file mode 100644
index 0000000000..964fad0955
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataHashPartitionChannelComputer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.ChannelComputer;
+import org.apache.paimon.table.sink.KeyAndBucketExtractor;
+import org.apache.paimon.table.sink.UnawareBucketRowKeyExtractor;
+
+/** This is only for a partitioned unaware-bucket append-only table. */
+public class RowDataHashPartitionChannelComputer implements
ChannelComputer<InternalRow> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final TableSchema schema;
+
+ private transient int numChannels;
+ private transient KeyAndBucketExtractor<InternalRow> extractor;
+
+ public RowDataHashPartitionChannelComputer(TableSchema schema) {
+ this.schema = schema;
+ }
+
+ @Override
+ public void setup(int numChannels) {
+ this.numChannels = numChannels;
+ this.extractor = new UnawareBucketRowKeyExtractor(schema);
+ }
+
+ @Override
+ public int channel(InternalRow record) {
+ extractor.setRecord(record);
+ return ChannelComputer.select(extractor.partition(), 0, numChannels);
+ }
+
+ @Override
+ public String toString() {
+ return "shuffle by partition";
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
index fb8bee5d59..60301cca4a 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
@@ -18,13 +18,16 @@
package org.apache.paimon.flink;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.source.AbstractNonCoordinatedSource;
import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
import org.apache.paimon.flink.source.SimpleSourceSplit;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.table.FileStoreTable;
@@ -47,6 +50,8 @@ import org.apache.flink.types.RowKind;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import java.io.File;
import java.time.Duration;
@@ -58,6 +63,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Random;
+import static
org.apache.paimon.CoreOptions.PARTITION_STRATEGY_FOR_UNAWARE_APPEND;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -404,6 +410,77 @@ public class UnawareBucketAppendOnlyTableITCase extends
CatalogITCaseBase {
.containsExactlyInAnyOrder(Row.of(1, "test"), Row.of(2,
"test"));
}
+ @ParameterizedTest
+ @EnumSource(CoreOptions.UnawareAppendPartitionStrategy.class)
+ public void testPartitionStrategyForPartitionedTable(
+ CoreOptions.UnawareAppendPartitionStrategy strategy)
+ throws Catalog.TableNotExistException {
+
+ int partitionNums = 5;
+ int largerSinkParallelism = 7;
+ int lessSinkParallelism = 3;
+ int hashStrategyResultFileCount = 1;
+        // The sink parallelism is greater than the number of partitions written in a
+        // batch, so 2 tasks will receive no data.
+ batchSql(
+ "CREATE TABLE IF NOT EXISTS partition_strategy_table_larger ("
+ + "id INT, data STRING, dt STRING) PARTITIONED BY (dt)"
+ + " WITH ("
+ + "'bucket' = '-1',"
+ + "'%s' = '%s',"
+ + "'sink.parallelism' = '7')",
+ PARTITION_STRATEGY_FOR_UNAWARE_APPEND.key(), strategy);
+
+        // The sink parallelism is less than the number of partitions written in a
+        // batch, so 2 tasks will each write data to 2 partitions.
+ batchSql(
+ "CREATE TABLE IF NOT EXISTS partition_strategy_table_less ("
+ + "id INT, data STRING, dt STRING) PARTITIONED BY (dt)"
+ + " WITH ("
+ + "'bucket' = '-1',"
+ + "'%s' = '%s',"
+ + "'sink.parallelism' = '3')",
+ PARTITION_STRATEGY_FOR_UNAWARE_APPEND.key(), strategy);
+
+        StringBuilder values = new StringBuilder();
+        // 5 partitions per batch write.
+ for (int i = 1; i <= 30; i++) {
+ for (int j = 1; j <= partitionNums; j++) {
+ values.append(String.format("(%s, 'HXH', '2025030%s'),", j,
j));
+ }
+ }
+
+ batchSql(
+ "INSERT INTO partition_strategy_table_larger VALUES "
+ + values.substring(0, values.length() - 1));
+ batchSql(
+ "INSERT INTO partition_strategy_table_less VALUES "
+ + values.substring(0, values.length() - 1));
+
+ assertThat(batchSql("SELECT * FROM
partition_strategy_table_larger").size()).isEqualTo(150);
+ assertThat(batchSql("SELECT * FROM
partition_strategy_table_less").size()).isEqualTo(150);
+
+ FileStoreTable fileStoreTableLarger =
paimonTable("partition_strategy_table_larger");
+ List<PartitionEntry> partitionEntriesLarger =
+
fileStoreTableLarger.newReadBuilder().newScan().listPartitionEntries();
+ assertThat(partitionEntriesLarger.size()).isEqualTo(partitionNums);
+ int fileCountLarger =
+ strategy == CoreOptions.UnawareAppendPartitionStrategy.HASH
+ ? hashStrategyResultFileCount
+ : largerSinkParallelism;
+ partitionEntriesLarger.forEach(x ->
assertThat(x.fileCount()).isEqualTo(fileCountLarger));
+
+ FileStoreTable fileStoreTableLess =
paimonTable("partition_strategy_table_less");
+ List<PartitionEntry> partitionEntriesLess =
+
fileStoreTableLess.newReadBuilder().newScan().listPartitionEntries();
+ assertThat(partitionEntriesLess.size()).isEqualTo(partitionNums);
+ int fileCountLess =
+ strategy == CoreOptions.UnawareAppendPartitionStrategy.HASH
+ ? hashStrategyResultFileCount
+ : lessSinkParallelism;
+ partitionEntriesLess.forEach(x ->
assertThat(x.fileCount()).isEqualTo(fileCountLess));
+ }
+
private static class TestStatelessWriterSource extends
AbstractNonCoordinatedSource<Integer> {
private final FileStoreTable table;