This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 885c2e091 [flink] Introduce Range Partition And Sort in Append
Scalable Table Batch Writing for Flink (#3384)
885c2e091 is described below
commit 885c2e0919c7b3afa28769dca037ea3e8fd75aad
Author: Wencong Liu <[email protected]>
AuthorDate: Fri May 31 14:15:28 2024 +0800
[flink] Introduce Range Partition And Sort in Append Scalable Table Batch
Writing for Flink (#3384)
---
docs/content/flink/sql-write.md | 32 +++
.../generated/flink_connector_configuration.html | 24 ++
.../apache/paimon/flink/FlinkConnectorOptions.java | 37 +++
.../paimon/flink/action/SortCompactAction.java | 30 ++-
.../apache/paimon/flink/shuffle/RangeShuffle.java | 22 +-
.../apache/paimon/flink/sink/FlinkSinkBuilder.java | 110 +++++++++
.../paimon/flink/sink/FlinkTableSinkBase.java | 11 +-
.../apache/paimon/flink/sorter/HilbertSorter.java | 11 +-
.../apache/paimon/flink/sorter/OrderSorter.java | 12 +-
.../org/apache/paimon/flink/sorter/SortUtils.java | 148 ++++++-----
.../apache/paimon/flink/sorter/TableSortInfo.java | 168 +++++++++++++
.../apache/paimon/flink/sorter/TableSorter.java | 15 +-
.../apache/paimon/flink/sorter/ZorderSorter.java | 11 +-
...artitionAndSortForUnawareBucketTableITCase.java | 270 +++++++++++++++++++++
.../paimon/flink/sorter/TableSortInfoTest.java | 160 ++++++++++++
15 files changed, 962 insertions(+), 99 deletions(-)
diff --git a/docs/content/flink/sql-write.md b/docs/content/flink/sql-write.md
index 8b275dbbc..f1e305ef4 100644
--- a/docs/content/flink/sql-write.md
+++ b/docs/content/flink/sql-write.md
@@ -49,6 +49,38 @@ snapshot expiration, and even partition expiration in Flink
Sink (if it is confi
For multiple jobs to write the same table, you can refer to [dedicated
compaction job]({{< ref
"maintenance/dedicated-compaction#dedicated-compaction-job" >}}) for more info.
+### Clustering
+
+In Paimon, clustering groups the data of an [Append Table]({{< ref "append-table/append-table#Append Table" >}})
+by the values of chosen columns during the write process. Organizing data this way can significantly speed up downstream
+reads, because rows with similar values in the clustering columns are stored together and can be retrieved more selectively. This feature is only supported for [Append Table]({{< ref "append-table/append-table#Append Table" >}})
+and batch execution mode.
+
+To use clustering, specify the columns to cluster by when creating or writing to a table. Here's a simple example of how to enable clustering:
+
+```sql
+CREATE TABLE my_table (
+ a STRING,
+ b STRING,
+ c STRING
+) WITH (
+ 'sink.clustering.by-columns' = 'a,b'
+);
+```
+
+You can also use SQL hints to dynamically set clustering options:
+
+```sql
+INSERT INTO my_table /*+ OPTIONS('sink.clustering.by-columns' = 'a,b') */
+SELECT * FROM source;
+```
+
+The data is clustered using an automatically chosen strategy (ORDER, ZORDER, or HILBERT, depending on the number of clustering columns), but you can specify the strategy manually
+by setting `sink.clustering.strategy`. Clustering relies on sampling and sorting. If the clustering process takes too much time, you can reduce
+the total number of samples by lowering `sink.clustering.sample-factor` (the minimum is 20), or disable the sorting step by setting `sink.clustering.sort-in-cluster` to `false`.
+
+You can refer to [FlinkConnectorOptions]({{< ref
"maintenance/configurations#FlinkConnectorOptions" >}}) for more info about the
configurations above.
+
## Overwriting the Whole Table
For unpartitioned tables, Paimon supports overwriting the whole table.
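The clustering docs in the hunk above describe range partitioning by sampled boundaries followed by an optional sort inside each range. The sketch below illustrates that idea in plain Java. It is a simplified model under stated assumptions, not Paimon's implementation: `ClusteringSketch` and its methods are hypothetical names, keys are plain ints, and the sample passed in the demo is simply the whole input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical, simplified model of range partitioning plus an optional in-cluster sort. */
class ClusteringSketch {

    /** Picks (rangeNum - 1) upper boundaries from a sorted copy of the sample. */
    static int[] boundaries(int[] sample, int rangeNum) {
        if (sample.length == 0 || rangeNum < 1) {
            throw new IllegalArgumentException("need a non-empty sample and rangeNum >= 1");
        }
        int[] sorted = sample.clone();
        Arrays.sort(sorted);
        int[] bounds = new int[rangeNum - 1];
        for (int i = 1; i < rangeNum; i++) {
            int index = Math.max(i * sorted.length / rangeNum - 1, 0);
            bounds[i - 1] = sorted[index];
        }
        return bounds;
    }

    /** Returns the first range whose upper boundary is >= key, or the last range if none. */
    static int rangeOf(int key, int[] bounds) {
        int range = 0;
        while (range < bounds.length && key > bounds[range]) {
            range++;
        }
        return range;
    }

    /** Assigns every value to its range; sorts inside each range only if requested. */
    static List<List<Integer>> cluster(
            int[] data, int[] sample, int rangeNum, boolean sortInCluster) {
        int[] bounds = boundaries(sample, rangeNum);
        List<List<Integer>> clusters = new ArrayList<>();
        for (int i = 0; i < rangeNum; i++) {
            clusters.add(new ArrayList<>());
        }
        for (int value : data) {
            clusters.get(rangeOf(value, bounds)).add(value);
        }
        if (sortInCluster) {
            for (List<Integer> c : clusters) {
                Collections.sort(c);
            }
        }
        return clusters;
    }

    public static void main(String[] args) {
        int[] data = {9, 1, 7, 3, 8, 2, 6, 4, 5, 0};
        System.out.println(cluster(data, data, 2, false)); // [[1, 3, 2, 4, 0], [9, 7, 8, 6, 5]]
        System.out.println(cluster(data, data, 2, true));  // [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
    }
}
```

With the sort disabled, each range already holds the right values but in arrival order, which is the trade-off that `sink.clustering.sort-in-cluster` exposes.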
diff --git
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 4fb5fc271..6787cee3b 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -170,6 +170,30 @@ under the License.
<td>Duration</td>
<td>If no records flow in a partition of a stream for that amount
of time, then that partition is considered "idle" and will not hold back the
progress of watermarks in downstream operators.</td>
</tr>
+ <tr>
+ <td><h5>sink.clustering.by-columns</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Specifies the column name(s) used for comparison during range partitioning, in the format 'columnName1,columnName2'. If not set or set to an empty string, the range partitioning feature is disabled. This option takes effect only for bucket-unaware tables without primary keys in batch execution mode.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.clustering.sample-factor</h5></td>
+ <td style="word-wrap: break-word;">100</td>
+ <td>Integer</td>
+ <td>Specifies the sample factor. Let S represent the total number
of samples, F represent the sample factor, and P represent the sink
parallelism, then S=F×P. The minimum allowed sample factor is 20.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.clustering.sort-in-cluster</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>Indicates whether to further sort the data belonging to each sink task after range partitioning.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.clustering.strategy</h5></td>
+ <td style="word-wrap: break-word;">"auto"</td>
+ <td>String</td>
+ <td>Specifies the comparison algorithm used for range partitioning: 'zorder', 'hilbert', or 'order', corresponding to the Z-order curve algorithm, the Hilbert curve algorithm, and basic type comparison, respectively. When not configured, the algorithm is chosen automatically based on the number of columns in 'sink.clustering.by-columns': 'order' is used for 1 column, 'zorder' for 2 to 4 columns, and 'hilbert' for 5 or more columns.</td>
+ </tr>
<tr>
<td><h5>sink.committer-cpu</h5></td>
<td style="word-wrap: break-word;">1.0</td>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 275ce836a..0256a5500 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -48,6 +48,8 @@ public class FlinkConnectorOptions {
public static final String TABLE_DYNAMIC_OPTION_PREFIX = "paimon";
+ public static final int MIN_CLUSTERING_SAMPLE_FACTOR = 20;
+
@ExcludeFromDocumentation("Confused without log system")
public static final ConfigOption<String> LOG_SYSTEM =
ConfigOptions.key("log.system")
@@ -387,6 +389,41 @@ public class FlinkConnectorOptions {
"Both can be configured at the
same time: 'done-partition,success-file'.")
.build());
+ public static final ConfigOption<String> CLUSTERING_COLUMNS =
+ key("sink.clustering.by-columns")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Specifies the column name(s) used for comparison during range partitioning, in the format 'columnName1,columnName2'. "
+ + "If not set or set to an empty string, the range partitioning feature is disabled. "
+ + "This option takes effect only for bucket-unaware tables without primary keys in batch execution mode.");
+
+ public static final ConfigOption<String> CLUSTERING_STRATEGY =
+ key("sink.clustering.strategy")
+ .stringType()
+ .defaultValue("auto")
+ .withDescription(
+ "Specifies the comparison algorithm used for range
partitioning, including 'zorder', 'hilbert', and 'order', "
+ + "corresponding to the z-order curve
algorithm, hilbert curve algorithm, and basic type comparison algorithm, "
+ + "respectively. When not configured, it
will automatically determine the algorithm based on the number of columns "
+ + "in 'sink.clustering.by-columns'. 'order' is used for 1 column, 'zorder' for 2 to 4 columns, "
+ + "and 'hilbert' for 5 or more columns.");
+
+ public static final ConfigOption<Boolean> CLUSTERING_SORT_IN_CLUSTER =
+ key("sink.clustering.sort-in-cluster")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Indicates whether to further sort the data belonging to each sink task after range partitioning.");
+
+ public static final ConfigOption<Integer> CLUSTERING_SAMPLE_FACTOR =
+ key("sink.clustering.sample-factor")
+ .intType()
+ .defaultValue(100)
+ .withDescription(
+ "Specifies the sample factor. Let S represent the
total number of samples, F represent the sample factor, "
+ + "and P represent the sink parallelism,
then S=F×P. The minimum allowed sample factor is 20.");
+
public static List<ConfigOption<?>> getOptions() {
final Field[] fields = FlinkConnectorOptions.class.getFields();
final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
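The `sink.clustering.strategy` description above states how the "auto" default maps the number of clustering columns to an algorithm. A minimal sketch of that rule follows; `ClusteringStrategySketch.resolve` is a hypothetical helper that works on plain strings rather than on Paimon's `OrderType` enum.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper mirroring the documented "auto" strategy selection. */
class ClusteringStrategySketch {

    /**
     * Returns the explicitly configured strategy, or derives one from the
     * number of comma-separated clustering columns when the value is "auto".
     */
    static String resolve(String configuredStrategy, String byColumns) {
        if (!"auto".equals(configuredStrategy)) {
            return configuredStrategy;
        }
        List<String> columns = Arrays.asList(byColumns.split(","));
        if (columns.size() == 1) {
            return "order";
        }
        return columns.size() < 5 ? "zorder" : "hilbert";
    }

    public static void main(String[] args) {
        System.out.println(resolve("auto", "a"));         // order
        System.out.println(resolve("auto", "a,b"));       // zorder
        System.out.println(resolve("auto", "a,b,c,d,e")); // hilbert
        System.out.println(resolve("hilbert", "a"));      // hilbert
    }
}
```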
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
index efa2f386d..1cebe8bc1 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
@@ -21,7 +21,9 @@ package org.apache.paimon.flink.action;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.SortCompactSinkBuilder;
+import org.apache.paimon.flink.sorter.TableSortInfo;
import org.apache.paimon.flink.sorter.TableSorter;
+import org.apache.paimon.flink.sorter.TableSorter.OrderType;
import org.apache.paimon.flink.source.FlinkSourceBuilder;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
@@ -116,8 +118,32 @@ public class SortCompactAction extends CompactAction {
}
DataStream<RowData> source =
sourceBuilder.env(env).sourceBounded(true).build();
- TableSorter sorter =
- TableSorter.getSorter(env, source, fileStoreTable,
sortStrategy, orderColumns);
+ int localSampleMagnification =
+ ((FileStoreTable)
table).coreOptions().getLocalSampleMagnification();
+ if (localSampleMagnification < 20) {
+ throw new IllegalArgumentException(
+ String.format(
+ "the config '%s=%d' is too small; a value greater than or equal to 20 is required.",
+
CoreOptions.SORT_COMPACTION_SAMPLE_MAGNIFICATION.key(),
+ localSampleMagnification));
+ }
+ String sinkParallelismValue =
+
table.options().get(FlinkConnectorOptions.SINK_PARALLELISM.key());
+ final int sinkParallelism =
+ sinkParallelismValue == null
+ ? source.getParallelism()
+ : Integer.parseInt(sinkParallelismValue);
+ TableSortInfo sortInfo =
+ new TableSortInfo.Builder()
+ .setSortColumns(orderColumns)
+ .setSortStrategy(OrderType.of(sortStrategy))
+ .setSinkParallelism(sinkParallelism)
+ .setLocalSampleSize(sinkParallelism *
localSampleMagnification)
+ .setGlobalSampleSize(sinkParallelism * 1000)
+ .setRangeNumber(sinkParallelism * 10)
+ .build();
+
+ TableSorter sorter = TableSorter.getSorter(env, source,
fileStoreTable, sortInfo);
new SortCompactSinkBuilder(fileStoreTable)
.forCompact(true)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
index 1883890ea..4db56601a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
@@ -164,7 +164,7 @@ public class RangeShuffle {
new
AssignRangeIndexOperator.RangePartitioner(rangeNum),
new
AssignRangeIndexOperator.Tuple2KeySelector<>()),
StreamExchangeMode.BATCH),
- "REMOVE KEY",
+ "REMOVE RANGE INDEX",
new RemoveRangeIndexOperator<>(),
input.getOutputType(),
outParallelism));
@@ -373,12 +373,16 @@ public class RangeShuffle {
@Override
public void processElement2(StreamRecord<Tuple2<T, RowData>>
streamRecord) {
- if (keyIndex == null || keyIndex.isEmpty()) {
- throw new RuntimeException(
- "There should be one data from the first input. And
boundaries should not be empty.");
+ if (keyIndex == null) {
+ throw new RuntimeException("There should be one data from the
first input.");
+ }
+ // If the range number is 1, the range index will be 0 for all
records.
+ if (keyIndex.isEmpty()) {
+ collector.collect(new Tuple2<>(0, streamRecord.getValue()));
+ } else {
+ Tuple2<T, RowData> row = streamRecord.getValue();
+ collector.collect(new Tuple2<>(binarySearch(row.f0), row));
}
- Tuple2<T, RowData> row = streamRecord.getValue();
- collector.collect(new Tuple2<>(binarySearch(row.f0), row));
}
@Override
@@ -408,7 +412,7 @@ public class RangeShuffle {
// key not found, but the low index is the target
// bucket, since the boundaries are the upper bound
return low > lastIndex
- ? keyIndex.get(lastIndex).getRight().get()
+ ? (keyIndex.get(lastIndex).getRight().get() + 1)
: keyIndex.get(low).getRight().get();
}
@@ -438,8 +442,8 @@ public class RangeShuffle {
@Override
public int partition(Integer key, int numPartitions) {
Preconditions.checkArgument(
- numPartitions < totalRangeNum,
- "Num of subPartitions should < totalRangeNum: " +
totalRangeNum);
+ numPartitions <= totalRangeNum,
+ "Num of subPartitions should <= totalRangeNum: " +
totalRangeNum);
int partition = key / (totalRangeNum / numPartitions);
return Math.min(numPartitions - 1, partition);
}
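The last hunk above relaxes the precondition in `partition` from `<` to `<=`, which matters because the clustering path sets the range number equal to the sink parallelism. The standalone sketch below reproduces the arithmetic of that method so the mapping can be checked by hand; `RangePartitionSketch` is a hypothetical name, and `totalRangeNum` is passed as a parameter here instead of being a field.

```java
/** Hypothetical standalone copy of the range-index-to-partition arithmetic. */
class RangePartitionSketch {

    /** Maps a range index in [0, totalRangeNum) to a partition in [0, numPartitions). */
    static int partition(int rangeIndex, int totalRangeNum, int numPartitions) {
        if (numPartitions > totalRangeNum) {
            throw new IllegalArgumentException(
                    "Num of subPartitions should <= totalRangeNum: " + totalRangeNum);
        }
        // Integer division: each partition receives (totalRangeNum / numPartitions) ranges.
        int partition = rangeIndex / (totalRangeNum / numPartitions);
        // Leftover ranges at the end are folded into the last partition.
        return Math.min(numPartitions - 1, partition);
    }

    public static void main(String[] args) {
        // Equal counts (the clustering case): range i goes to partition i.
        System.out.println(partition(3, 4, 4)); // 3
        // 10 ranges over 3 partitions: 3 ranges each, range 9 folds into partition 2.
        System.out.println(partition(5, 10, 3)); // 1
        System.out.println(partition(9, 10, 3)); // 2
    }
}
```

When the range count is not a multiple of the partition count, the last partition receives the extra ranges, so it can carry more data than the others.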
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index a22de9316..8baee5ac1 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -20,7 +20,11 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.index.GlobalDynamicBucketSink;
+import org.apache.paimon.flink.sorter.TableSortInfo;
+import org.apache.paimon.flink.sorter.TableSorter;
+import org.apache.paimon.flink.sorter.TableSorter.OrderType;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
@@ -34,14 +38,27 @@ import
org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SAMPLE_FACTOR;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_STRATEGY;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.MIN_CLUSTERING_SAMPLE_FACTOR;
import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
+import static org.apache.paimon.flink.sorter.TableSorter.OrderType.HILBERT;
+import static org.apache.paimon.flink.sorter.TableSorter.OrderType.ORDER;
+import static org.apache.paimon.flink.sorter.TableSorter.OrderType.ZORDER;
+import static org.apache.paimon.table.BucketMode.BUCKET_UNAWARE;
import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkState;
/**
* DataStream API for building Flink Sink.
@@ -51,12 +68,15 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
@Public
public class FlinkSinkBuilder {
+ private static final Logger LOG =
LoggerFactory.getLogger(FlinkSinkBuilder.class);
+
private final FileStoreTable table;
private DataStream<RowData> input;
@Nullable private Map<String, String> overwritePartition;
@Nullable private Integer parallelism;
private Boolean boundedInput = null;
+ @Nullable private TableSortInfo tableSortInfo;
// ============== for extension ==============
@@ -119,8 +139,88 @@ public class FlinkSinkBuilder {
return this;
}
+ /** Clusters the input data if possible. */
+ public FlinkSinkBuilder clusteringIfPossible(
+ String clusteringColumns,
+ String clusteringStrategy,
+ boolean sortInCluster,
+ int sampleFactor) {
+ // The clustering will be skipped if the clustering columns are empty
or the execution
+ // mode is STREAMING or the table type is illegal.
+ if (clusteringColumns == null || clusteringColumns.isEmpty()) {
+ return this;
+ }
+ checkState(input != null, "The input stream should be specified
earlier.");
+ if (boundedInput == null) {
+ boundedInput = !FlinkSink.isStreaming(input);
+ }
+ if (!boundedInput || !table.bucketMode().equals(BUCKET_UNAWARE)) {
+ LOG.warn(
+ "Clustering is enabled; however, it has been skipped as "
+ + "it only supports the bucket unaware table
without primary keys and "
+ + "BATCH execution mode.");
+ return this;
+ }
+ // If the clustering is not skipped, check the clustering column names
and sample
+ // factor value.
+ List<String> columns = Arrays.asList(clusteringColumns.split(","));
+ List<String> fieldNames = table.schema().fieldNames();
+ checkState(
+ new HashSet<>(fieldNames).containsAll(new HashSet<>(columns)),
+ String.format(
+ "Field names %s should contain all clustering column names %s.",
+ fieldNames, columns));
+ checkState(
+ sampleFactor >= MIN_CLUSTERING_SAMPLE_FACTOR,
+ "The minimum allowed "
+ + CLUSTERING_SAMPLE_FACTOR.key()
+ + " is "
+ + MIN_CLUSTERING_SAMPLE_FACTOR
+ + ".");
+ TableSortInfo.Builder sortInfoBuilder = new TableSortInfo.Builder();
+ if (clusteringStrategy.equals(CLUSTERING_STRATEGY.defaultValue())) {
+ if (columns.size() == 1) {
+ sortInfoBuilder.setSortStrategy(ORDER);
+ } else if (columns.size() < 5) {
+ sortInfoBuilder.setSortStrategy(ZORDER);
+ } else {
+ sortInfoBuilder.setSortStrategy(HILBERT);
+ }
+ } else {
+ sortInfoBuilder.setSortStrategy(OrderType.of(clusteringStrategy));
+ }
+ int upstreamParallelism = input.getParallelism();
+ String sinkParallelismValue =
+
table.options().get(FlinkConnectorOptions.SINK_PARALLELISM.key());
+ int sinkParallelism =
+ sinkParallelismValue == null
+ ? upstreamParallelism
+ : Integer.parseInt(sinkParallelismValue);
+ sortInfoBuilder
+ .setSortColumns(columns)
+ .setSortInCluster(sortInCluster)
+ .setSinkParallelism(sinkParallelism);
+ int globalSampleSize = sinkParallelism * sampleFactor;
+ // If the adaptive scheduler is not enabled, the local sample size is the global sample size
+ // divided by the upstream parallelism (but at least the sample factor), which limits the total
+ // data received by the global sample node. If the adaptive scheduler is enabled (the upstream
+ // parallelism is not positive), the local sample size equals sinkParallelism * minimum sample factor.
+ int localSampleSize =
+ upstreamParallelism > 0
+ ? Math.max(sampleFactor, globalSampleSize /
upstreamParallelism)
+ : sinkParallelism * MIN_CLUSTERING_SAMPLE_FACTOR;
+ this.tableSortInfo =
+ sortInfoBuilder
+ .setRangeNumber(sinkParallelism)
+ .setGlobalSampleSize(globalSampleSize)
+ .setLocalSampleSize(localSampleSize)
+ .build();
+ return this;
+ }
+
/** Build {@link DataStreamSink}. */
public DataStreamSink<?> build() {
+ input = trySortInput(input);
DataStream<InternalRow> input = MapToInternalRow.map(this.input,
table.rowType());
if (table.coreOptions().localMergeEnabled() &&
table.schema().primaryKeys().size() > 0) {
input =
@@ -181,4 +281,14 @@ public class FlinkSinkBuilder {
table, overwritePartition, logSinkFunction,
parallelism, boundedInput)
.sinkFrom(input);
}
+
+ private DataStream<RowData> trySortInput(DataStream<RowData> input) {
+ if (tableSortInfo != null) {
+ TableSorter sorter =
+ TableSorter.getSorter(
+ input.getExecutionEnvironment(), input, table,
tableSortInfo);
+ return sorter.sort();
+ }
+ return input;
+ }
}
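The sample-size arithmetic in `clusteringIfPossible` above is easy to misread, so here is a small worked sketch of it. It copies the two formulas from the diff into a hypothetical `SampleSizeSketch` class with plain int parameters; the constant mirrors `MIN_CLUSTERING_SAMPLE_FACTOR` (20), and the numbers in `main` are illustrative only.

```java
/** Hypothetical standalone copy of the sample-size formulas used for clustering. */
class SampleSizeSketch {

    static final int MIN_CLUSTERING_SAMPLE_FACTOR = 20;

    /** S = F x P: sample factor times sink parallelism. */
    static int globalSampleSize(int sinkParallelism, int sampleFactor) {
        return sinkParallelism * sampleFactor;
    }

    /**
     * With a known (positive) upstream parallelism, the global sample is split
     * across upstream tasks but never drops below the sample factor. Otherwise
     * the fallback is sinkParallelism times the minimum sample factor.
     */
    static int localSampleSize(int upstreamParallelism, int sinkParallelism, int sampleFactor) {
        int global = globalSampleSize(sinkParallelism, sampleFactor);
        return upstreamParallelism > 0
                ? Math.max(sampleFactor, global / upstreamParallelism)
                : sinkParallelism * MIN_CLUSTERING_SAMPLE_FACTOR;
    }

    public static void main(String[] args) {
        System.out.println(globalSampleSize(8, 100));    // 800
        System.out.println(localSampleSize(4, 8, 100));  // 200 = 800 / 4
        System.out.println(localSampleSize(16, 8, 100)); // 100 = max(100, 800 / 16)
        System.out.println(localSampleSize(-1, 8, 100)); // 160 = 8 * 20
    }
}
```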
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
index b6a944703..4202717c8 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
@@ -45,6 +45,10 @@ import java.util.Map;
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
+import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_COLUMNS;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SAMPLE_FACTOR;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SORT_IN_CLUSTER;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_STRATEGY;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_PARALLELISM;
/** Table sink to create sink. */
@@ -131,7 +135,12 @@ public abstract class FlinkTableSinkBase
new DataStream<>(
dataStream.getExecutionEnvironment(),
dataStream.getTransformation()))
- .inputBounded(context.isBounded());
+ .inputBounded(context.isBounded())
+ .clusteringIfPossible(
+ conf.get(CLUSTERING_COLUMNS),
+ conf.get(CLUSTERING_STRATEGY),
+ conf.get(CLUSTERING_SORT_IN_CLUSTER),
+ conf.get(CLUSTERING_SAMPLE_FACTOR));
if (overwrite) {
builder.overwrite(staticPartitions);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/HilbertSorter.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/HilbertSorter.java
index 944e0fc43..26c698f5e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/HilbertSorter.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/HilbertSorter.java
@@ -36,7 +36,6 @@ import org.apache.flink.table.data.RowData;
import java.util.Arrays;
import java.util.Collections;
-import java.util.List;
/**
* This is a table sorter which will sort the records by the hilbert curve of
specified columns. It
@@ -49,12 +48,15 @@ public class HilbertSorter extends TableSorter {
private static final RowType KEY_TYPE =
new RowType(Collections.singletonList(new DataField(0, "H_INDEX",
DataTypes.BYTES())));
+ private final TableSortInfo tableSortInfo;
+
public HilbertSorter(
StreamExecutionEnvironment batchTEnv,
DataStream<RowData> origin,
FileStoreTable table,
- List<String> colNames) {
- super(batchTEnv, origin, table, colNames);
+ TableSortInfo tableSortInfo) {
+ super(batchTEnv, origin, table, tableSortInfo.getSortColumns());
+ this.tableSortInfo = tableSortInfo;
}
@Override
@@ -99,6 +101,7 @@ public class HilbertSorter extends TableSorter {
return Arrays.copyOf(hilbert, hilbert.length);
}
},
- GenericRow::of);
+ GenericRow::of,
+ tableSortInfo);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java
index dadd4c413..406195b6f 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java
@@ -31,19 +31,20 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
-import java.util.List;
-
import static org.apache.paimon.table.PrimaryKeyTableUtils.addKeyNamePrefix;
/** Alphabetical order sorter to sort records by the given `orderColNames`. */
public class OrderSorter extends TableSorter {
+ private final TableSortInfo tableSortInfo;
+
public OrderSorter(
StreamExecutionEnvironment batchTEnv,
DataStream<RowData> origin,
FileStoreTable table,
- List<String> orderColNames) {
- super(batchTEnv, origin, table, orderColNames);
+ TableSortInfo tableSortInfo) {
+ super(batchTEnv, origin, table, tableSortInfo.getSortColumns());
+ this.tableSortInfo = tableSortInfo;
}
@Override
@@ -75,6 +76,7 @@ public class OrderSorter extends TableSorter {
return keyProjection.apply(new
FlinkRowWrapper(value)).copy();
}
},
- row -> row);
+ row -> row,
+ tableSortInfo);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
index 9a1dbb729..79e9f1298 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
@@ -21,7 +21,6 @@ package org.apache.paimon.flink.sorter;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.JoinedRow;
-import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.shuffle.RangeShuffle;
@@ -38,7 +37,12 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.util.Collector;
import java.io.Serializable;
import java.util.ArrayList;
@@ -72,6 +76,7 @@ public class SortUtils {
* @param shuffleKeyAbstract abstract the key from the input `RowData`
* @param convertor convert the `KEY` to the sort key, then we can sort in
* `BinaryExternalSortBuffer`.
+ * @param tableSortInfo the necessary info of table sort.
* @return the global sorted data stream
* @param <KEY> the KEY type in range shuffle
*/
@@ -82,34 +87,15 @@ public class SortUtils {
final TypeInformation<KEY> keyTypeInformation,
final SerializableSupplier<Comparator<KEY>> shuffleKeyComparator,
final KeyAbstract<KEY> shuffleKeyAbstract,
- final ShuffleKeyConvertor<KEY> convertor) {
+ final ShuffleKeyConvertor<KEY> convertor,
+ final TableSortInfo tableSortInfo) {
final RowType valueRowType = table.rowType();
- final int parallelism = inputStream.getParallelism();
CoreOptions options = table.coreOptions();
-
- String sinkParallelismValue =
-
table.options().get(FlinkConnectorOptions.SINK_PARALLELISM.key());
- final int sinkParallelism =
- sinkParallelismValue == null
- ? inputStream.getParallelism()
- : Integer.parseInt(sinkParallelismValue);
- if (sinkParallelism == -1) {
- throw new UnsupportedOperationException(
- "The adaptive batch scheduler is not supported. Please set
the sink parallelism using the key: "
- + FlinkConnectorOptions.SINK_PARALLELISM.key());
- }
- int localSampleMagnification = options.getLocalSampleMagnification();
- if (localSampleMagnification < 20) {
- throw new IllegalArgumentException(
- String.format(
- "the config '%s=%d' should not be set too
small,greater than or equal to 20 is needed.",
-
CoreOptions.SORT_COMPACTION_SAMPLE_MAGNIFICATION.key(),
- localSampleMagnification));
- }
- final int localSampleSize = sinkParallelism * localSampleMagnification;
- final int globalSampleSize = sinkParallelism * 1000;
- final int rangeNum = sinkParallelism * 10;
+ final int sinkParallelism = tableSortInfo.getSinkParallelism();
+ final int localSampleSize = tableSortInfo.getLocalSampleSize();
+ final int globalSampleSize = tableSortInfo.getGlobalSampleSize();
+ final int rangeNum = tableSortInfo.getRangeNumber();
int keyFieldCount = sortKeyType.getFieldCount();
int valueFieldCount = valueRowType.getFieldCount();
final int[] valueProjectionMap = new int[valueFieldCount];
@@ -145,10 +131,11 @@ public class SortUtils {
}
},
new TupleTypeInfo<>(keyTypeInformation,
inputStream.getType()))
- .setParallelism(parallelism);
+ .setParallelism(inputStream.getParallelism());
// range shuffle by key
- return RangeShuffle.rangeShuffleByKey(
+ DataStream<Tuple2<KEY, RowData>> rangeShuffleResult =
+ RangeShuffle.rangeShuffleByKey(
inputWithKey,
shuffleKeyComparator,
keyTypeInformation,
@@ -157,45 +144,52 @@ public class SortUtils {
rangeNum,
sinkParallelism,
valueRowType,
- options.sortBySize())
- .map(
- a -> new JoinedRow(convertor.apply(a.f0), new
FlinkRowWrapper(a.f1)),
- internalRowType)
- .setParallelism(sinkParallelism)
- // sort the output locally by `SortOperator`
- .transform(
- "LOCAL SORT",
- internalRowType,
- new SortOperator(
- sortKeyType,
- longRowType,
- options.writeBufferSize(),
- options.pageSize(),
- options.localSortMaxNumFileHandles(),
- options.spillCompression(),
- sinkParallelism,
- options.writeBufferSpillDiskSize()))
- .setParallelism(sinkParallelism)
- // remove the key column from every row
- .map(
- new RichMapFunction<InternalRow, InternalRow>() {
-
- private transient KeyProjectedRow keyProjectedRow;
-
- @Override
- public void open(Configuration parameters) {
- keyProjectedRow = new
KeyProjectedRow(valueProjectionMap);
- }
-
- @Override
- public InternalRow map(InternalRow value) {
- return keyProjectedRow.replaceRow(value);
- }
- },
- InternalTypeInfo.fromRowType(valueRowType))
- .setParallelism(sinkParallelism)
- .map(FlinkRowData::new, inputStream.getType())
- .setParallelism(sinkParallelism);
+ options.sortBySize());
+ if (tableSortInfo.isSortInCluster()) {
+ return rangeShuffleResult
+ .map(
+ a -> new JoinedRow(convertor.apply(a.f0), new
FlinkRowWrapper(a.f1)),
+ internalRowType)
+ .setParallelism(sinkParallelism)
+ // sort the output locally by `SortOperator`
+ .transform(
+ "LOCAL SORT",
+ internalRowType,
+ new SortOperator(
+ sortKeyType,
+ longRowType,
+ options.writeBufferSize(),
+ options.pageSize(),
+ options.localSortMaxNumFileHandles(),
+ options.spillCompression(),
+ sinkParallelism,
+ options.writeBufferSpillDiskSize()))
+ .setParallelism(sinkParallelism)
+ // remove the key column from every row
+ .map(
+ new RichMapFunction<InternalRow, InternalRow>() {
+
+ private transient KeyProjectedRow
keyProjectedRow;
+
+ @Override
+ public void open(Configuration parameters) {
+ keyProjectedRow = new
KeyProjectedRow(valueProjectionMap);
+ }
+
+ @Override
+ public InternalRow map(InternalRow value) {
+ return keyProjectedRow.replaceRow(value);
+ }
+ },
+ InternalTypeInfo.fromRowType(valueRowType))
+ .setParallelism(sinkParallelism)
+ .map(FlinkRowData::new, inputStream.getType())
+ .setParallelism(sinkParallelism);
+ } else {
+ return rangeShuffleResult
+ .transform("REMOVE KEY", inputStream.getType(), new
RemoveKeyOperator<>())
+ .setParallelism(sinkParallelism);
+ }
}
/** Abstract key from a row data. */
@@ -206,4 +200,24 @@ public class SortUtils {
}
interface ShuffleKeyConvertor<KEY> extends Function<KEY, InternalRow>,
Serializable {}
+
+ /** Remove the abstract key. */
+ private static class RemoveKeyOperator<T> extends
TableStreamOperator<RowData>
+ implements OneInputStreamOperator<Tuple2<T, RowData>, RowData> {
+
+ private static final long serialVersionUID = 1L;
+
+ private transient Collector<RowData> collector;
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ this.collector = new StreamRecordCollector<>(output);
+ }
+
+ @Override
+ public void processElement(StreamRecord<Tuple2<T, RowData>> streamRecord) {
+ collector.collect(streamRecord.getValue().f1);
+ }
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSortInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSortInfo.java
new file mode 100644
index 000000000..16760bd35
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSortInfo.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sorter;
+
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.sorter.TableSorter.OrderType;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.shaded.guava31.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.flink.shaded.guava31.com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * {@link TableSortInfo} is used to indicate the configuration details for table data sorting. This
+ * includes information about which columns to sort by, the sorting strategy (e.g., order, Z-order),
+ * whether to sort within each cluster, and sample sizes for local and global sample nodes.
+ */
+public class TableSortInfo {
+
+ private final List<String> sortColumns;
+
+ private final OrderType sortStrategy;
+
+ private final boolean sortInCluster;
+
+ private final int rangeNumber;
+
+ private final int sinkParallelism;
+
+ private final int localSampleSize;
+
+ private final int globalSampleSize;
+
+ private TableSortInfo(
+ List<String> sortColumns,
+ OrderType sortStrategy,
+ boolean sortInCluster,
+ int rangeNumber,
+ int sinkParallelism,
+ int localSampleSize,
+ int globalSampleSize) {
+ this.sortColumns = sortColumns;
+ this.sortStrategy = sortStrategy;
+ this.sortInCluster = sortInCluster;
+ this.rangeNumber = rangeNumber;
+ this.sinkParallelism = sinkParallelism;
+ this.localSampleSize = localSampleSize;
+ this.globalSampleSize = globalSampleSize;
+ }
+
+ public List<String> getSortColumns() {
+ return sortColumns;
+ }
+
+ public OrderType getSortStrategy() {
+ return sortStrategy;
+ }
+
+ public boolean isSortInCluster() {
+ return sortInCluster;
+ }
+
+ public int getRangeNumber() {
+ return rangeNumber;
+ }
+
+ public int getLocalSampleSize() {
+ return localSampleSize;
+ }
+
+ public int getGlobalSampleSize() {
+ return globalSampleSize;
+ }
+
+ public int getSinkParallelism() {
+ return sinkParallelism;
+ }
+
+ /** Builder for {@link TableSortInfo}. */
+ public static class Builder {
+
+ private List<String> sortColumns = Collections.emptyList();
+
+ private OrderType sortStrategy = OrderType.ORDER;
+
+ private boolean sortInCluster = true;
+
+ private int rangeNumber = -1;
+
+ private int sinkParallelism = -1;
+
+ private int localSampleSize = -1;
+
+ private int globalSampleSize = -1;
+
+ public Builder setSortColumns(List<String> sortColumns) {
+ this.sortColumns = sortColumns;
+ return this;
+ }
+
+ public Builder setSortStrategy(OrderType sortStrategy) {
+ this.sortStrategy = sortStrategy;
+ return this;
+ }
+
+ public Builder setSortInCluster(boolean sortInCluster) {
+ this.sortInCluster = sortInCluster;
+ return this;
+ }
+
+ public Builder setRangeNumber(int rangeNumber) {
+ this.rangeNumber = rangeNumber;
+ return this;
+ }
+
+ public Builder setSinkParallelism(int sinkParallelism) {
+ this.sinkParallelism = sinkParallelism;
+ return this;
+ }
+
+ public Builder setLocalSampleSize(int localSampleSize) {
+ this.localSampleSize = localSampleSize;
+ return this;
+ }
+
+ public Builder setGlobalSampleSize(int globalSampleSize) {
+ this.globalSampleSize = globalSampleSize;
+ return this;
+ }
+
+ public TableSortInfo build() {
+ checkArgument(!sortColumns.isEmpty(), "Sort columns cannot be empty");
+ checkNotNull(sortStrategy, "Sort strategy cannot be null");
+ checkArgument(
+ sinkParallelism > 0,
+ "The sink parallelism must be specified when sorting the table data. Please set it using the key: %s",
+ FlinkConnectorOptions.SINK_PARALLELISM.key());
+ checkArgument(rangeNumber > 0, "Range number must be positive");
+ checkArgument(localSampleSize > 0, "Local sample size must be positive");
+ checkArgument(globalSampleSize > 0, "Global sample size must be positive");
+ return new TableSortInfo(
+ sortColumns,
+ sortStrategy,
+ sortInCluster,
+ rangeNumber,
+ sinkParallelism,
+ localSampleSize,
+ globalSampleSize);
+ }
+ }
+}
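The builder above validates every field in build() and fails fast. A minimal sketch of that pattern in plain Java follows, assuming nothing from Paimon or Guava: the class SortInfoBuilderSketch, its three fields, and the literal key "sink.parallelism" are hypothetical stand-ins for illustration, not the committed API. It shows why a missing strategy surfaces as a NullPointerException while the other checks raise IllegalArgumentException.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/** Hypothetical stand-in for TableSortInfo.Builder, reduced to three fields. */
public class SortInfoBuilderSketch {

    // Defaults mirror the builder in the diff: -1 means "not configured".
    private List<String> sortColumns = Collections.emptyList();
    private String sortStrategy = "order";
    private int sinkParallelism = -1;

    public SortInfoBuilderSketch setSortColumns(List<String> sortColumns) {
        this.sortColumns = sortColumns;
        return this;
    }

    public SortInfoBuilderSketch setSortStrategy(String sortStrategy) {
        this.sortStrategy = sortStrategy;
        return this;
    }

    public SortInfoBuilderSketch setSinkParallelism(int sinkParallelism) {
        this.sinkParallelism = sinkParallelism;
        return this;
    }

    // Plain-Java substitute for Guava's checkArgument with a %s message template.
    private static void checkArgument(boolean condition, String template, Object... args) {
        if (!condition) {
            throw new IllegalArgumentException(String.format(template, args));
        }
    }

    /** Validates all fields at once and returns a summary string, or throws. */
    public String build() {
        checkArgument(!sortColumns.isEmpty(), "Sort columns cannot be empty");
        // A null check raises NullPointerException, not IllegalArgumentException.
        Objects.requireNonNull(sortStrategy, "Sort strategy cannot be null");
        checkArgument(
                sinkParallelism > 0,
                "The sink parallelism must be specified. Please set it using the key: %s",
                "sink.parallelism"); // assumed key name, for illustration only
        return sortStrategy + sortColumns + "@" + sinkParallelism;
    }

    public static void main(String[] args) {
        System.out.println(
                new SortInfoBuilderSketch()
                        .setSortColumns(Arrays.asList("col1", "col2"))
                        .setSinkParallelism(10)
                        .build());
        try {
            // Parallelism left at its -1 default, so build() rejects the configuration.
            new SortInfoBuilderSketch().setSortColumns(Arrays.asList("col1")).build();
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Using -1 as the "unset" default lets build() treat a forgotten setter and an invalid value the same way, which is presumably why the unit tests only need to cover zero and negative inputs.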
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
index e4f85ea9d..a0d4b6af2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
@@ -70,21 +70,22 @@ public abstract class TableSorter {
StreamExecutionEnvironment batchTEnv,
DataStream<RowData> origin,
FileStoreTable fileStoreTable,
- String sortStrategy,
- List<String> orderColumns) {
- switch (OrderType.of(sortStrategy)) {
+ TableSortInfo sortInfo) {
+ OrderType sortStrategy = sortInfo.getSortStrategy();
+ switch (sortStrategy) {
case ORDER:
- return new OrderSorter(batchTEnv, origin, fileStoreTable, orderColumns);
+ return new OrderSorter(batchTEnv, origin, fileStoreTable, sortInfo);
case ZORDER:
- return new ZorderSorter(batchTEnv, origin, fileStoreTable, orderColumns);
+ return new ZorderSorter(batchTEnv, origin, fileStoreTable, sortInfo);
case HILBERT:
- return new HilbertSorter(batchTEnv, origin, fileStoreTable, orderColumns);
+ return new HilbertSorter(batchTEnv, origin, fileStoreTable, sortInfo);
default:
throw new IllegalArgumentException("cannot match order type: " + sortStrategy);
}
}
- enum OrderType {
+ /** The order type of table sort. */
+ public enum OrderType {
ORDER("order"),
ZORDER("zorder"),
HILBERT("hilbert");
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java
index bc3c7f5c5..fad66f364 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java
@@ -36,7 +36,6 @@ import org.apache.flink.table.data.RowData;
import java.util.Arrays;
import java.util.Collections;
-import java.util.List;
/**
* This is a table sorter which will sort the records by the z-order of specified columns. It works
@@ -49,12 +48,15 @@ public class ZorderSorter extends TableSorter {
private static final RowType KEY_TYPE =
new RowType(Collections.singletonList(new DataField(0, "Z_INDEX", DataTypes.BYTES())));
+ private final TableSortInfo sortInfo;
+
public ZorderSorter(
StreamExecutionEnvironment batchTEnv,
DataStream<RowData> origin,
FileStoreTable table,
- List<String> zOrderColNames) {
- super(batchTEnv, origin, table, zOrderColNames);
+ TableSortInfo sortInfo) {
+ super(batchTEnv, origin, table, sortInfo.getSortColumns());
+ this.sortInfo = sortInfo;
}
@Override
@@ -102,6 +104,7 @@ public class ZorderSorter extends TableSorter {
return Arrays.copyOf(zorder, zorder.length);
}
},
- GenericRow::of);
+ GenericRow::of,
+ sortInfo);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RangePartitionAndSortForUnawareBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RangePartitionAndSortForUnawareBucketTableITCase.java
new file mode 100644
index 000000000..c65f4ac12
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RangePartitionAndSortForUnawareBucketTableITCase.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.operation.AppendOnlyFileStoreScan;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+
+/**
+ * The IT test for the range partitioning and sort batch writing for append-only bucket unaware
+ * tables.
+ */
+public class RangePartitionAndSortForUnawareBucketTableITCase extends CatalogITCaseBase {
+
+ private static final int SINK_ROW_NUMBER = 1000;
+
+ @Test
+ public void testSortConfigurationChecks() {
+ batchSql(
+ "CREATE TEMPORARY TABLE source1 (col1 INT, col2 INT, col3 INT, col4 INT) "
+ + "WITH ('connector'='values', 'bounded'='true')");
+ batchSql("CREATE TABLE IF NOT EXISTS sink1 (col1 INT, col2 INT, col3 INT, col4 INT)");
+ streamSqlIter(
+ "CREATE TEMPORARY TABLE source2 (col1 INT, col2 INT, col3 INT, col4 INT) WITH ('connector'='values')");
+ streamSqlIter("CREATE TABLE IF NOT EXISTS sink2 (col1 INT, col2 INT, col3 INT, col4 INT)");
+ // 1. Check the sort columns.
+ assertThatExceptionOfType(RuntimeException.class)
+ .isThrownBy(
+ () ->
+ batchSql(
+ "INSERT INTO sink1 /*+ OPTIONS('sink.clustering.by-columns' = 'col1,xx1') */ "
+ + "SELECT * FROM source1"))
+ .withMessageContaining("should contains all clustering column names");
+ // 2. Check the sample factor.
+ assertThatExceptionOfType(RuntimeException.class)
+ .isThrownBy(
+ () ->
+ batchSql(
+ "INSERT INTO sink1 /*+ OPTIONS('sink.clustering.by-columns' = 'col1', "
+ + "'sink.clustering.sample-factor' = '10') */ SELECT * "
+ + "FROM source1"))
+ .withMessageContaining("The minimum allowed sink.clustering.sample-factor");
+ // 3. Check the sink parallelism.
+ batchSql(
+ "CREATE TEMPORARY TABLE source3 (col1 INT, col2 INT) WITH ('connector'='values', 'bounded'='true')");
+ assertThatExceptionOfType(RuntimeException.class)
+ .isThrownBy(
+ () ->
+ batchSql(
+ "INSERT INTO sink1 /*+ OPTIONS('sink.clustering.by-columns' = 'col1') */ "
+ + "SELECT source1.col1, source1.col2, source3.col1, "
+ + "source3.col2 FROM source1 JOIN source3 ON source1.col1 = source3.col1"))
+ .withMessageContaining(
+ "The sink parallelism must be specified when sorting the table data");
+ }
+
+ @Test
+ public void testRangePartition() throws Exception {
+ List<Row> inputRows = generateSinkRows();
+ String id = TestValuesTableFactory.registerData(inputRows);
+ batchSql(
+ "CREATE TEMPORARY TABLE test_source (col1 INT, col2 INT, col3 INT, col4 INT) WITH "
+ + "('connector'='values', 'bounded'='true', 'data-id'='%s')",
+ id);
+ batchSql(
+ "INSERT INTO test_table /*+ OPTIONS('sink.clustering.by-columns' = 'col1', "
+ + "'sink.parallelism' = '10', 'sink.clustering.sort-in-cluster' = 'false') */ "
+ + "SELECT * FROM test_source");
+ List<Row> sinkRows = batchSql("SELECT * FROM test_table");
+ assertThat(sinkRows.size()).isEqualTo(SINK_ROW_NUMBER);
+ FileStoreTable testStoreTable = paimonTable("test_table");
+ List<ManifestEntry> files = testStoreTable.store().newScan().plan().files();
+ assertThat(files.size()).isEqualTo(10);
+ List<Tuple2<Integer, Integer>> minMaxOfEachFile = new ArrayList<>();
+ for (ManifestEntry file : files) {
+ DataSplit dataSplit =
+ DataSplit.builder()
+ .withPartition(file.partition())
+ .withBucket(file.bucket())
+ .withDataFiles(Collections.singletonList(file.file()))
+ .withBucketPath("/temp/xxx")
+ .build();
+ final AtomicInteger min = new AtomicInteger(Integer.MAX_VALUE);
+ final AtomicInteger max = new AtomicInteger(Integer.MIN_VALUE);
+ testStoreTable
+ .newReadBuilder()
+ .newRead()
+ .createReader(dataSplit)
+ .forEachRemaining(
+ internalRow -> {
+ int result = internalRow.getInt(0);
+ min.set(Math.min(min.get(), result));
+ max.set(Math.max(max.get(), result));
+ });
+ minMaxOfEachFile.add(Tuple2.of(min.get(), max.get()));
+ }
+ minMaxOfEachFile.sort(Comparator.comparing(o -> o.f0));
+ Tuple2<Integer, Integer> preResult = minMaxOfEachFile.get(0);
+ for (int index = 1; index < minMaxOfEachFile.size(); ++index) {
+ Tuple2<Integer, Integer> currentResult = minMaxOfEachFile.get(index);
+ assertThat(currentResult.f0).isGreaterThanOrEqualTo(0);
+ assertThat(currentResult.f1).isLessThanOrEqualTo(SINK_ROW_NUMBER);
+ assertThat(currentResult.f0).isGreaterThanOrEqualTo(preResult.f1);
+ }
+ }
+
+ @Test
+ public void testRangePartitionAndSortWithOrderStrategy() throws Exception {
+ List<Row> inputRows = generateSinkRows();
+ String id = TestValuesTableFactory.registerData(inputRows);
+ batchSql(
+ "CREATE TEMPORARY TABLE test_source (col1 INT, col2 INT, col3 INT, col4 INT) WITH "
+ + "('connector'='values', 'bounded'='true', 'data-id'='%s')",
+ id);
+ batchSql(
+ "INSERT INTO test_table /*+ OPTIONS('sink.clustering.by-columns' = 'col1', "
+ + "'sink.parallelism' = '10', 'sink.clustering.strategy' = 'order') */ "
+ + "SELECT * FROM test_source");
+ List<Row> sinkRows = batchSql("SELECT * FROM test_table");
+ assertThat(sinkRows.size()).isEqualTo(SINK_ROW_NUMBER);
+ FileStoreTable testStoreTable = paimonTable("test_table");
+ List<ManifestEntry> files = testStoreTable.store().newScan().plan().files();
+ assertThat(files.size()).isEqualTo(10);
+ List<Tuple2<Integer, Integer>> minMaxOfEachFile = new ArrayList<>();
+ for (ManifestEntry file : files) {
+ DataSplit dataSplit =
+ DataSplit.builder()
+ .withPartition(file.partition())
+ .withBucket(file.bucket())
+ .withDataFiles(Collections.singletonList(file.file()))
+ .withBucketPath("/temp/xxx")
+ .build();
+ final AtomicInteger min = new AtomicInteger(Integer.MAX_VALUE);
+ final AtomicInteger max = new AtomicInteger(Integer.MIN_VALUE);
+ final AtomicInteger current = new AtomicInteger(Integer.MIN_VALUE);
+ testStoreTable
+ .newReadBuilder()
+ .newRead()
+ .createReader(dataSplit)
+ .forEachRemaining(
+ internalRow -> {
+ int result = internalRow.getInt(0);
+ min.set(Math.min(min.get(), result));
+ max.set(Math.max(max.get(), result));
+ Assertions.assertThat(result).isGreaterThanOrEqualTo(current.get());
+ current.set(result);
+ });
+ minMaxOfEachFile.add(Tuple2.of(min.get(), max.get()));
+ }
+ minMaxOfEachFile.sort(Comparator.comparing(o -> o.f0));
+ Tuple2<Integer, Integer> preResult = minMaxOfEachFile.get(0);
+ for (int index = 1; index < minMaxOfEachFile.size(); ++index) {
+ Tuple2<Integer, Integer> currentResult = minMaxOfEachFile.get(index);
+ assertThat(currentResult.f0).isGreaterThanOrEqualTo(0);
+ assertThat(currentResult.f1).isLessThanOrEqualTo(SINK_ROW_NUMBER);
+ assertThat(currentResult.f0).isGreaterThanOrEqualTo(preResult.f1);
+ }
+ }
+
+ @Test
+ public void testRangePartitionAndSortWithZOrderStrategy() throws Exception {
+ List<Row> inputRows = generateSinkRows();
+ String id = TestValuesTableFactory.registerData(inputRows);
+ batchSql(
+ "CREATE TEMPORARY TABLE test_source (col1 INT, col2 INT, col3 INT, col4 INT) WITH "
+ + "('connector'='values', 'bounded'='true', 'data-id'='%s')",
+ id);
+ batchSql(
+ "INSERT INTO test_table /*+ OPTIONS('sink.clustering.by-columns' = 'col1', "
+ + "'sink.parallelism' = '10', 'sink.clustering.strategy' = 'zorder') */ "
+ + "SELECT * FROM test_source");
+ List<Row> sinkRows = batchSql("SELECT * FROM test_table");
+ assertThat(sinkRows.size()).isEqualTo(SINK_ROW_NUMBER);
+ FileStoreTable testStoreTable = paimonTable("test_table");
+ PredicateBuilder predicateBuilder = new PredicateBuilder(testStoreTable.rowType());
+ Predicate predicate = predicateBuilder.between(0, 100, 200);
+ List<ManifestEntry> files = testStoreTable.store().newScan().plan().files();
+ assertThat(files.size()).isEqualTo(10);
+ List<ManifestEntry> filesFilter =
+ ((AppendOnlyFileStoreScan) testStoreTable.store().newScan())
+ .withFilter(predicate)
+ .plan()
+ .files();
+ Assertions.assertThat(files.size()).isGreaterThan(filesFilter.size());
+ }
+
+ @Test
+ public void testRangePartitionAndSortWithHilbertStrategy() throws Exception {
+ List<Row> inputRows = generateSinkRows();
+ String id = TestValuesTableFactory.registerData(inputRows);
+ batchSql(
+ "CREATE TEMPORARY TABLE test_source (col1 INT, col2 INT, col3 INT, col4 INT) WITH "
+ + "('connector'='values', 'bounded'='true', 'data-id'='%s')",
+ id);
+ batchSql(
+ "INSERT INTO test_table /*+ OPTIONS('sink.clustering.by-columns' = 'col1,col2', "
+ + "'sink.parallelism' = '10', 'sink.clustering.strategy' = 'hilbert') */ "
+ + "SELECT * FROM test_source");
+ List<Row> sinkRows = batchSql("SELECT * FROM test_table");
+ assertThat(sinkRows.size()).isEqualTo(SINK_ROW_NUMBER);
+ FileStoreTable testStoreTable = paimonTable("test_table");
+ PredicateBuilder predicateBuilder = new PredicateBuilder(testStoreTable.rowType());
+ Predicate predicate = predicateBuilder.between(0, 100, 200);
+ List<ManifestEntry> files = testStoreTable.store().newScan().plan().files();
+ assertThat(files.size()).isEqualTo(10);
+ List<ManifestEntry> filesFilter =
+ ((AppendOnlyFileStoreScan) testStoreTable.store().newScan())
+ .withFilter(predicate)
+ .plan()
+ .files();
+ Assertions.assertThat(files.size()).isGreaterThan(filesFilter.size());
+ }
+
+ private List<Row> generateSinkRows() {
+ List<Row> sinkRows = new ArrayList<>();
+ Random random = new Random();
+ for (int round = 0; round < SINK_ROW_NUMBER; round++) {
+ sinkRows.add(
+ Row.ofKind(
+ RowKind.INSERT,
+ random.nextInt(SINK_ROW_NUMBER),
+ random.nextInt(SINK_ROW_NUMBER),
+ random.nextInt(SINK_ROW_NUMBER),
+ random.nextInt(SINK_ROW_NUMBER)));
+ }
+ return sinkRows;
+ }
+
+ @Override
+ protected List<String> ddl() {
+ return Collections.singletonList(
+ "CREATE TABLE IF NOT EXISTS test_table (col1 INT, col2 INT, col3 INT, col4 INT)");
+ }
+}
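The range-partition tests above assert that the [min, max] interval of each written file does not overlap the next one. A minimal, self-contained sketch of that property follows. It is a simplified model, not Paimon's RangeShuffle: the class RangePartitionSketch and its methods boundaries, rangeOf and rangesAreDisjoint are hypothetical names, and boundaries are taken at evenly spaced positions of a sorted sample as an assumption.

```java
import java.util.Arrays;
import java.util.Random;

/** Simplified model of sample-based range partitioning; all names are illustrative. */
public class RangePartitionSketch {

    /** Picks rangeNumber - 1 boundaries at evenly spaced positions of the sorted sample. */
    static int[] boundaries(int[] sample, int rangeNumber) {
        int[] sorted = sample.clone();
        Arrays.sort(sorted);
        int[] bounds = new int[rangeNumber - 1];
        for (int i = 1; i < rangeNumber; i++) {
            bounds[i - 1] = sorted[i * sorted.length / rangeNumber];
        }
        return bounds;
    }

    /** Returns the index of the first range whose upper boundary is >= value. */
    static int rangeOf(int value, int[] bounds) {
        int range = 0;
        while (range < bounds.length && value > bounds[range]) {
            range++;
        }
        return range;
    }

    /** Checks that the [min, max] intervals of consecutive non-empty ranges do not overlap. */
    static boolean rangesAreDisjoint(int[] values, int[] bounds) {
        int n = bounds.length + 1;
        int[] min = new int[n];
        int[] max = new int[n];
        Arrays.fill(min, Integer.MAX_VALUE);
        Arrays.fill(max, Integer.MIN_VALUE);
        for (int v : values) {
            int r = rangeOf(v, bounds);
            min[r] = Math.min(min[r], v);
            max[r] = Math.max(max[r], v);
        }
        int previousMax = Integer.MIN_VALUE;
        for (int r = 0; r < n; r++) {
            if (min[r] > max[r]) {
                continue; // empty range, nothing to compare
            }
            if (min[r] < previousMax) {
                return false;
            }
            previousMax = max[r]; // advance the comparison point to this range
        }
        return true;
    }

    public static void main(String[] args) {
        Random random = new Random(42);
        int[] values = random.ints(1000, 0, 1000).toArray();
        int[] bounds = boundaries(values, 10);
        System.out.println("boundaries: " + Arrays.toString(bounds));
        System.out.println("disjoint: " + rangesAreDisjoint(values, bounds));
    }
}
```

In this sketch the check advances previousMax after every non-empty range, so each interval is compared with its immediate predecessor rather than only with the first one.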
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/TableSortInfoTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/TableSortInfoTest.java
new file mode 100644
index 000000000..e9e01baea
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/TableSortInfoTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sorter;
+
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.sorter.TableSorter.OrderType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+
+/** The unit test for {@link TableSortInfo}. */
+public class TableSortInfoTest {
+
+ @Test
+ public void testTableSortInfoBuilderWithValidParameters() {
+ TableSortInfo tableSortInfo =
+ new TableSortInfo.Builder()
+ .setSortColumns(Arrays.asList("column1", "column2"))
+ .setSortStrategy(OrderType.ORDER)
+ .setSortInCluster(true)
+ .setRangeNumber(10)
+ .setSinkParallelism(5)
+ .setLocalSampleSize(100)
+ .setGlobalSampleSize(500)
+ .build();
+
+ assertThat(tableSortInfo.getSortColumns()).containsExactly("column1", "column2");
+ assertThat(tableSortInfo.getSortStrategy()).isEqualTo(OrderType.ORDER);
+ assertThat(tableSortInfo.isSortInCluster()).isTrue();
+ assertThat(tableSortInfo.getRangeNumber()).isEqualTo(10);
+ assertThat(tableSortInfo.getSinkParallelism()).isEqualTo(5);
+ assertThat(tableSortInfo.getLocalSampleSize()).isEqualTo(100);
+ assertThat(tableSortInfo.getGlobalSampleSize()).isEqualTo(500);
+ }
+
+ @Test
+ public void testTableSortInfoBuilderWithEmptySortColumns() {
+ assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(
+ () ->
+ new TableSortInfo.Builder()
+ .setSortColumns(Collections.emptyList())
+ .setSortStrategy(OrderType.ORDER)
+ .setSortInCluster(true)
+ .setRangeNumber(10)
+ .setSinkParallelism(5)
+ .setLocalSampleSize(100)
+ .setGlobalSampleSize(500)
+ .build())
+ .withMessage("Sort columns cannot be empty");
+ }
+
+ @Test
+ public void testTableSortInfoBuilderWithNullSortStrategy() {
+ assertThatExceptionOfType(NullPointerException.class)
+ .isThrownBy(
+ () ->
+ new TableSortInfo.Builder()
+ .setSortColumns(Arrays.asList("column1", "column2"))
+ .setSortStrategy(null)
+ .setSortInCluster(true)
+ .setRangeNumber(10)
+ .setSinkParallelism(5)
+ .setLocalSampleSize(100)
+ .setGlobalSampleSize(500)
+ .build())
+ .withMessage("Sort strategy cannot be null");
+ }
+
+ @Test
+ public void testTableSortInfoBuilderWithNegativeRangeNumber() {
+ assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(
+ () ->
+ new TableSortInfo.Builder()
+ .setSortColumns(Arrays.asList("column1", "column2"))
+ .setSortStrategy(OrderType.ORDER)
+ .setSortInCluster(true)
+ .setRangeNumber(-1)
+ .setSinkParallelism(5)
+ .setLocalSampleSize(100)
+ .setGlobalSampleSize(500)
+ .build())
+ .withMessage("Range number must be positive");
+ }
+
+ @Test
+ public void testTableSortInfoBuilderWithZeroSinkParallelism() {
+ assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(
+ () ->
+ new TableSortInfo.Builder()
+ .setSortColumns(Arrays.asList("column1", "column2"))
+ .setSortStrategy(OrderType.ORDER)
+ .setSortInCluster(true)
+ .setRangeNumber(10)
+ .setSinkParallelism(0)
+ .setLocalSampleSize(100)
+ .setGlobalSampleSize(500)
+ .build())
+ .withMessageContaining(
+ "The sink parallelism must be specified when sorting the table data. Please set it using the key: %s",
+ FlinkConnectorOptions.SINK_PARALLELISM.key());
+ }
+
+ @Test
+ public void testTableSortInfoBuilderWithZeroLocalSampleSize() {
+ assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(
+ () ->
+ new TableSortInfo.Builder()
+ .setSortColumns(Arrays.asList("column1", "column2"))
+ .setSortStrategy(OrderType.ORDER)
+ .setSortInCluster(true)
+ .setRangeNumber(10)
+ .setSinkParallelism(5)
+ .setLocalSampleSize(0) // This should trigger an exception
+ .setGlobalSampleSize(500)
+ .build())
+ .withMessage("Local sample size must be positive");
+ }
+
+ @Test
+ public void testTableSortInfoBuilderWithNegativeGlobalSampleSize() {
+ assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(
+ () ->
+ new TableSortInfo.Builder()
+ .setSortColumns(Arrays.asList("column1", "column2"))
+ .setSortStrategy(OrderType.ORDER)
+ .setSortInCluster(true)
+ .setRangeNumber(10)
+ .setSinkParallelism(5)
+ .setLocalSampleSize(100)
+ .setGlobalSampleSize(-1) // This should trigger an exception
+ .build())
+ .withMessage("Global sample size must be positive");
+ }
+}