This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 885c2e091 [flink] Introduce Range Partition And Sort in Append Scalable Table Batch Writing for Flink (#3384)
885c2e091 is described below

commit 885c2e0919c7b3afa28769dca037ea3e8fd75aad
Author: Wencong Liu <[email protected]>
AuthorDate: Fri May 31 14:15:28 2024 +0800

    [flink] Introduce Range Partition And Sort in Append Scalable Table Batch Writing for Flink (#3384)
---
 docs/content/flink/sql-write.md                    |  32 +++
 .../generated/flink_connector_configuration.html   |  24 ++
 .../apache/paimon/flink/FlinkConnectorOptions.java |  37 +++
 .../paimon/flink/action/SortCompactAction.java     |  30 ++-
 .../apache/paimon/flink/shuffle/RangeShuffle.java  |  22 +-
 .../apache/paimon/flink/sink/FlinkSinkBuilder.java | 110 +++++++++
 .../paimon/flink/sink/FlinkTableSinkBase.java      |  11 +-
 .../apache/paimon/flink/sorter/HilbertSorter.java  |  11 +-
 .../apache/paimon/flink/sorter/OrderSorter.java    |  12 +-
 .../org/apache/paimon/flink/sorter/SortUtils.java  | 148 ++++++-----
 .../apache/paimon/flink/sorter/TableSortInfo.java  | 168 +++++++++++++
 .../apache/paimon/flink/sorter/TableSorter.java    |  15 +-
 .../apache/paimon/flink/sorter/ZorderSorter.java   |  11 +-
 ...artitionAndSortForUnawareBucketTableITCase.java | 270 +++++++++++++++++++++
 .../paimon/flink/sorter/TableSortInfoTest.java     | 160 ++++++++++++
 15 files changed, 962 insertions(+), 99 deletions(-)

diff --git a/docs/content/flink/sql-write.md b/docs/content/flink/sql-write.md
index 8b275dbbc..f1e305ef4 100644
--- a/docs/content/flink/sql-write.md
+++ b/docs/content/flink/sql-write.md
@@ -49,6 +49,38 @@ snapshot expiration, and even partition expiration in Flink Sink (if it is confi
 
 For multiple jobs to write the same table, you can refer to [dedicated compaction job]({{< ref "maintenance/dedicated-compaction#dedicated-compaction-job" >}}) for more info.
 
+### Clustering
+
+In Paimon, clustering is a feature that allows you to cluster data in your [Append Table]({{< ref "append-table/append-table#Append Table" >}})
+based on the values of certain columns during the write process. This organization of data can significantly enhance the efficiency of downstream
+tasks when reading the data, as it enables faster and more targeted data retrieval. This feature is supported only for [Append Table]({{< ref "append-table/append-table#Append Table" >}})
+in batch execution mode.
+
+To utilize clustering, you can specify the columns you want to cluster by when creating or writing to a table. Here's a simple example of how to enable clustering:
+
+```sql
+CREATE TABLE my_table (
+    a STRING,
+    b STRING,
+    c STRING
+) WITH (
+  'sink.clustering.by-columns' = 'a,b'
+);
+```
+
+You can also use SQL hints to dynamically set clustering options:
+
+```sql
+INSERT INTO my_table /*+ OPTIONS('sink.clustering.by-columns' = 'a,b') */
+SELECT * FROM source;
+```
+
+The data is clustered using an automatically chosen strategy (such as ORDER, ZORDER, or HILBERT), but you can manually specify the clustering strategy
+by setting `sink.clustering.strategy`. Clustering relies on sampling and sorting. If the clustering process takes too much time, you can decrease
+the total sample number by lowering `sink.clustering.sample-factor` or disable the sorting step by setting `sink.clustering.sort-in-cluster` to false.
+
+You can refer to [FlinkConnectorOptions]({{< ref "maintenance/configurations#FlinkConnectorOptions" >}}) for more info about the configurations above.
+
 ## Overwriting the Whole Table
 
 For unpartitioned tables, Paimon supports overwriting the whole table.
diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 4fb5fc271..6787cee3b 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -170,6 +170,30 @@ under the License.
             <td>Duration</td>
             <td>If no records flow in a partition of a stream for that amount of time, then that partition is considered "idle" and will not hold back the progress of watermarks in downstream operators.</td>
         </tr>
+        <tr>
+            <td><h5>sink.clustering.by-columns</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Specifies the column name(s) used for comparison during range partitioning, in the format 'columnName1,columnName2'. If not set or set to an empty string, it indicates that the range partitioning feature is not enabled. This option will be effective only for bucket unaware table without primary keys and batch execution mode.</td>
+        </tr>
+        <tr>
+            <td><h5>sink.clustering.sample-factor</h5></td>
+            <td style="word-wrap: break-word;">100</td>
+            <td>Integer</td>
+            <td>Specifies the sample factor. Let S represent the total number of samples, F represent the sample factor, and P represent the sink parallelism, then S=F×P. The minimum allowed sample factor is 20.</td>
+        </tr>
+        <tr>
+            <td><h5>sink.clustering.sort-in-cluster</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Indicates whether to further sort data belonging to each sink task after range partitioning.</td>
+        </tr>
+        <tr>
+            <td><h5>sink.clustering.strategy</h5></td>
+            <td style="word-wrap: break-word;">"auto"</td>
+            <td>String</td>
+            <td>Specifies the comparison algorithm used for range partitioning, including 'zorder', 'hilbert', and 'order', corresponding to the z-order curve algorithm, hilbert curve algorithm, and basic type comparison algorithm, respectively. When not configured, it will automatically determine the algorithm based on the number of columns in 'sink.clustering.by-columns'. 'order' is used for 1 column, 'zorder' for less than 5 columns, and 'hilbert' for 5 or more columns.</td>
+        </tr>
         <tr>
             <td><h5>sink.committer-cpu</h5></td>
             <td style="word-wrap: break-word;">1.0</td>
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 275ce836a..0256a5500 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -48,6 +48,8 @@ public class FlinkConnectorOptions {
 
     public static final String TABLE_DYNAMIC_OPTION_PREFIX = "paimon";
 
+    public static final int MIN_CLUSTERING_SAMPLE_FACTOR = 20;
+
     @ExcludeFromDocumentation("Confused without log system")
     public static final ConfigOption<String> LOG_SYSTEM =
             ConfigOptions.key("log.system")
@@ -387,6 +389,41 @@ public class FlinkConnectorOptions {
                                             "Both can be configured at the same time: 'done-partition,success-file'.")
                                     .build());
 
+    public static final ConfigOption<String> CLUSTERING_COLUMNS =
+            key("sink.clustering.by-columns")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Specifies the column name(s) used for comparison during range partitioning, in the format 'columnName1,columnName2'. "
+                                    + "If not set or set to an empty string, it indicates that the range partitioning feature is not enabled. "
+                                    + "This option will be effective only for bucket unaware table without primary keys and batch execution mode.");
+
+    public static final ConfigOption<String> CLUSTERING_STRATEGY =
+            key("sink.clustering.strategy")
+                    .stringType()
+                    .defaultValue("auto")
+                    .withDescription(
+                            "Specifies the comparison algorithm used for range partitioning, including 'zorder', 'hilbert', and 'order', "
+                                    + "corresponding to the z-order curve algorithm, hilbert curve algorithm, and basic type comparison algorithm, "
+                                    + "respectively. When not configured, it will automatically determine the algorithm based on the number of columns "
+                                    + "in 'sink.clustering.by-columns'. 'order' is used for 1 column, 'zorder' for less than 5 columns, "
+                                    + "and 'hilbert' for 5 or more columns.");
+
+    public static final ConfigOption<Boolean> CLUSTERING_SORT_IN_CLUSTER =
+            key("sink.clustering.sort-in-cluster")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Indicates whether to further sort data belonging to each sink task after range partitioning.");
+
+    public static final ConfigOption<Integer> CLUSTERING_SAMPLE_FACTOR =
+            key("sink.clustering.sample-factor")
+                    .intType()
+                    .defaultValue(100)
+                    .withDescription(
+                            "Specifies the sample factor. Let S represent the total number of samples, F represent the sample factor, "
+                                    + "and P represent the sink parallelism, then S=F×P. The minimum allowed sample factor is 20.");
+
     public static List<ConfigOption<?>> getOptions() {
         final Field[] fields = FlinkConnectorOptions.class.getFields();
         final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
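
Note on the sample factor option above: the description states S=F×P with a minimum factor of 20. A minimal, standalone Java sketch of that arithmetic follows. The class and method names are hypothetical and are not part of the commit; only the constant value 20 and the formula come from the option description.

```java
// Hypothetical illustration only; not code from the Paimon repository.
class SampleFactorSketch {

    // Mirrors the documented minimum for 'sink.clustering.sample-factor'.
    static final int MIN_CLUSTERING_SAMPLE_FACTOR = 20;

    // Total number of samples S = F x P, where F is the sample factor
    // and P is the sink parallelism.
    static int totalSamples(int sampleFactor, int sinkParallelism) {
        if (sampleFactor < MIN_CLUSTERING_SAMPLE_FACTOR) {
            throw new IllegalStateException(
                    "The minimum allowed sample factor is " + MIN_CLUSTERING_SAMPLE_FACTOR + ".");
        }
        return Math.multiplyExact(sampleFactor, sinkParallelism);
    }

    public static void main(String[] args) {
        // Default factor 100 with a sink parallelism of 8.
        System.out.println(totalSamples(100, 8));
    }
}
```

Lowering the factor reduces the number of records the sampling stage has to collect, which is the tuning knob the documentation change refers to.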
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
index efa2f386d..1cebe8bc1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
@@ -21,7 +21,9 @@ package org.apache.paimon.flink.action;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.sink.SortCompactSinkBuilder;
+import org.apache.paimon.flink.sorter.TableSortInfo;
 import org.apache.paimon.flink.sorter.TableSorter;
+import org.apache.paimon.flink.sorter.TableSorter.OrderType;
 import org.apache.paimon.flink.source.FlinkSourceBuilder;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
@@ -116,8 +118,32 @@ public class SortCompactAction extends CompactAction {
         }
 
         DataStream<RowData> source = sourceBuilder.env(env).sourceBounded(true).build();
-        TableSorter sorter =
-                TableSorter.getSorter(env, source, fileStoreTable, sortStrategy, orderColumns);
+        int localSampleMagnification =
+                ((FileStoreTable) table).coreOptions().getLocalSampleMagnification();
+        if (localSampleMagnification < 20) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "the config '%s=%d' should not be set too small,greater than or equal to 20 is needed.",
+                            CoreOptions.SORT_COMPACTION_SAMPLE_MAGNIFICATION.key(),
+                            localSampleMagnification));
+        }
+        String sinkParallelismValue =
+                table.options().get(FlinkConnectorOptions.SINK_PARALLELISM.key());
+        final int sinkParallelism =
+                sinkParallelismValue == null
+                        ? source.getParallelism()
+                        : Integer.parseInt(sinkParallelismValue);
+        TableSortInfo sortInfo =
+                new TableSortInfo.Builder()
+                        .setSortColumns(orderColumns)
+                        .setSortStrategy(OrderType.of(sortStrategy))
+                        .setSinkParallelism(sinkParallelism)
+                        .setLocalSampleSize(sinkParallelism * localSampleMagnification)
+                        .setGlobalSampleSize(sinkParallelism * 1000)
+                        .setRangeNumber(sinkParallelism * 10)
+                        .build();
+
+        TableSorter sorter = TableSorter.getSorter(env, source, fileStoreTable, sortInfo);
 
         new SortCompactSinkBuilder(fileStoreTable)
                 .forCompact(true)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
index 1883890ea..4db56601a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
@@ -164,7 +164,7 @@ public class RangeShuffle {
                                         new AssignRangeIndexOperator.RangePartitioner(rangeNum),
                                         new AssignRangeIndexOperator.Tuple2KeySelector<>()),
                                 StreamExchangeMode.BATCH),
-                        "REMOVE KEY",
+                        "REMOVE RANGE INDEX",
                         new RemoveRangeIndexOperator<>(),
                         input.getOutputType(),
                         outParallelism));
@@ -373,12 +373,16 @@ public class RangeShuffle {
 
         @Override
         public void processElement2(StreamRecord<Tuple2<T, RowData>> streamRecord) {
-            if (keyIndex == null || keyIndex.isEmpty()) {
-                throw new RuntimeException(
-                        "There should be one data from the first input. And boundaries should not be empty.");
+            if (keyIndex == null) {
+                throw new RuntimeException("There should be one data from the first input.");
+            }
+            // If the range number is 1, the range index will be 0 for all records.
+            if (keyIndex.isEmpty()) {
+                collector.collect(new Tuple2<>(0, streamRecord.getValue()));
+            } else {
+                Tuple2<T, RowData> row = streamRecord.getValue();
+                collector.collect(new Tuple2<>(binarySearch(row.f0), row));
             }
-            Tuple2<T, RowData> row = streamRecord.getValue();
-            collector.collect(new Tuple2<>(binarySearch(row.f0), row));
         }
 
         @Override
@@ -408,7 +412,7 @@ public class RangeShuffle {
             // key not found, but the low index is the target
             // bucket, since the boundaries are the upper bound
             return low > lastIndex
-                    ? keyIndex.get(lastIndex).getRight().get()
+                    ? (keyIndex.get(lastIndex).getRight().get() + 1)
                     : keyIndex.get(low).getRight().get();
         }
 
@@ -438,8 +442,8 @@ public class RangeShuffle {
             @Override
             public int partition(Integer key, int numPartitions) {
                 Preconditions.checkArgument(
-                        numPartitions < totalRangeNum,
-                        "Num of subPartitions should < totalRangeNum: " + totalRangeNum);
+                        numPartitions <= totalRangeNum,
+                        "Num of subPartitions should <= totalRangeNum: " + totalRangeNum);
                 int partition = key / (totalRangeNum / numPartitions);
                 return Math.min(numPartitions - 1, partition);
             }
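
Note on the RangeShuffle hunks above: taken together they let a record above the last boundary fall into one extra range, let an empty boundary list map everything to range 0, and allow the range count to equal the partition count. The following standalone Java sketch illustrates that behavior under simplifying assumptions: boundaries are plain integers, and a range index is just the position of its upper bound. The class and the `assignRange` helper are hypothetical; the real operator works on `keyIndex` pairs and a key comparator.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration only; not code from the Paimon repository.
class RangeAssignSketch {

    // Boundaries are upper bounds of ranges. Returns the index of the first
    // boundary that is >= key, or upperBounds.size() when the key is larger
    // than every boundary. An empty list yields range 0 for all keys.
    static int assignRange(List<Integer> upperBounds, int key) {
        int low = 0;
        int high = upperBounds.size() - 1;
        while (low <= high) {
            int mid = (low + high) >>> 1;
            int cmp = Integer.compare(key, upperBounds.get(mid));
            if (cmp > 0) {
                low = mid + 1;
            } else if (cmp < 0) {
                high = mid - 1;
            } else {
                return mid;
            }
        }
        return low;
    }

    // Same arithmetic as the partition method in the diff, with the relaxed
    // precondition numPartitions <= totalRangeNum.
    static int partition(int rangeIndex, int numPartitions, int totalRangeNum) {
        if (numPartitions > totalRangeNum) {
            throw new IllegalArgumentException(
                    "Num of subPartitions should <= totalRangeNum: " + totalRangeNum);
        }
        int partition = rangeIndex / (totalRangeNum / numPartitions);
        return Math.min(numPartitions - 1, partition);
    }

    public static void main(String[] args) {
        List<Integer> bounds = Arrays.asList(10, 20, 30); // 3 boundaries, 4 ranges
        for (int key : new int[] {5, 10, 15, 31}) {
            int range = assignRange(bounds, key);
            System.out.println(key + " -> range " + range + " -> partition " + partition(range, 4, 4));
        }
        System.out.println(assignRange(Collections.<Integer>emptyList(), 42));
    }
}
```

With the range count equal to the sink parallelism, as the new sink builder path sets it, each range maps to exactly one partition, which the old strict `<` check would have rejected.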
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index a22de9316..8baee5ac1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -20,7 +20,11 @@ package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.annotation.Public;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.sink.index.GlobalDynamicBucketSink;
+import org.apache.paimon.flink.sorter.TableSortInfo;
+import org.apache.paimon.flink.sorter.TableSorter;
+import org.apache.paimon.flink.sorter.TableSorter.OrderType;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
@@ -34,14 +38,27 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 
+import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SAMPLE_FACTOR;
+import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_STRATEGY;
+import static org.apache.paimon.flink.FlinkConnectorOptions.MIN_CLUSTERING_SAMPLE_FACTOR;
 import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
+import static org.apache.paimon.flink.sorter.TableSorter.OrderType.HILBERT;
+import static org.apache.paimon.flink.sorter.TableSorter.OrderType.ORDER;
+import static org.apache.paimon.flink.sorter.TableSorter.OrderType.ZORDER;
+import static org.apache.paimon.table.BucketMode.BUCKET_UNAWARE;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkState;
 
 /**
  * DataStream API for building Flink Sink.
@@ -51,12 +68,15 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
 @Public
 public class FlinkSinkBuilder {
 
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkSinkBuilder.class);
+
     private final FileStoreTable table;
 
     private DataStream<RowData> input;
     @Nullable private Map<String, String> overwritePartition;
     @Nullable private Integer parallelism;
     private Boolean boundedInput = null;
+    @Nullable private TableSortInfo tableSortInfo;
 
     // ============== for extension ==============
 
@@ -119,8 +139,88 @@ public class FlinkSinkBuilder {
         return this;
     }
 
+    /** Clustering the input data if possible. */
+    public FlinkSinkBuilder clusteringIfPossible(
+            String clusteringColumns,
+            String clusteringStrategy,
+            boolean sortInCluster,
+            int sampleFactor) {
+        // The clustering will be skipped if the clustering columns are empty or the execution
+        // mode is STREAMING or the table type is illegal.
+        if (clusteringColumns == null || clusteringColumns.isEmpty()) {
+            return this;
+        }
+        checkState(input != null, "The input stream should be specified earlier.");
+        if (boundedInput == null) {
+            boundedInput = !FlinkSink.isStreaming(input);
+        }
+        if (!boundedInput || !table.bucketMode().equals(BUCKET_UNAWARE)) {
+            LOG.warn(
+                    "Clustering is enabled; however, it has been skipped as "
+                            + "it only supports the bucket unaware table without primary keys and "
+                            + "BATCH execution mode.");
+            return this;
+        }
+        // If the clustering is not skipped, check the clustering column names and sample
+        // factor value.
+        List<String> columns = Arrays.asList(clusteringColumns.split(","));
+        List<String> fieldNames = table.schema().fieldNames();
+        checkState(
+                new HashSet<>(fieldNames).containsAll(new HashSet<>(columns)),
+                String.format(
+                        "Field names %s should contains all clustering column names %s.",
+                        fieldNames, columns));
+        checkState(
+                sampleFactor >= MIN_CLUSTERING_SAMPLE_FACTOR,
+                "The minimum allowed "
+                        + CLUSTERING_SAMPLE_FACTOR.key()
+                        + " is "
+                        + MIN_CLUSTERING_SAMPLE_FACTOR
+                        + ".");
+        TableSortInfo.Builder sortInfoBuilder = new TableSortInfo.Builder();
+        if (clusteringStrategy.equals(CLUSTERING_STRATEGY.defaultValue())) {
+            if (columns.size() == 1) {
+                sortInfoBuilder.setSortStrategy(ORDER);
+            } else if (columns.size() < 5) {
+                sortInfoBuilder.setSortStrategy(ZORDER);
+            } else {
+                sortInfoBuilder.setSortStrategy(HILBERT);
+            }
+        } else {
+            sortInfoBuilder.setSortStrategy(OrderType.of(clusteringStrategy));
+        }
+        int upstreamParallelism = input.getParallelism();
+        String sinkParallelismValue =
+                table.options().get(FlinkConnectorOptions.SINK_PARALLELISM.key());
+        int sinkParallelism =
+                sinkParallelismValue == null
+                        ? upstreamParallelism
+                        : Integer.parseInt(sinkParallelismValue);
+        sortInfoBuilder
+                .setSortColumns(columns)
+                .setSortInCluster(sortInCluster)
+                .setSinkParallelism(sinkParallelism);
+        int globalSampleSize = sinkParallelism * sampleFactor;
+        // If the adaptive scheduler is not enabled, the local sample size is determined by the
+        // division of global sample size by the upstream parallelism, which limits total
+        // received data of global sample node. If the adaptive scheduler is enabled, the
+        // local sample size will be equal to sinkParallelism * minimum sample factor.
+        int localSampleSize =
+                upstreamParallelism > 0
+                        ? Math.max(sampleFactor, globalSampleSize / upstreamParallelism)
+                        : sinkParallelism * MIN_CLUSTERING_SAMPLE_FACTOR;
+        this.tableSortInfo =
+                sortInfoBuilder
+                        .setRangeNumber(sinkParallelism)
+                        .setGlobalSampleSize(globalSampleSize)
+                        .setLocalSampleSize(localSampleSize)
+                        .build();
+        return this;
+    }
+
     /** Build {@link DataStreamSink}. */
     public DataStreamSink<?> build() {
+        input = trySortInput(input);
         DataStream<InternalRow> input = MapToInternalRow.map(this.input, table.rowType());
         if (table.coreOptions().localMergeEnabled() && table.schema().primaryKeys().size() > 0) {
             input =
@@ -181,4 +281,14 @@ public class FlinkSinkBuilder {
                         table, overwritePartition, logSinkFunction, parallelism, boundedInput)
                 .sinkFrom(input);
     }
+
+    private DataStream<RowData> trySortInput(DataStream<RowData> input) {
+        if (tableSortInfo != null) {
+            TableSorter sorter =
+                    TableSorter.getSorter(
+                            input.getExecutionEnvironment(), input, table, tableSortInfo);
+            return sorter.sort();
+        }
+        return input;
+    }
 }
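
Note on `clusteringIfPossible` above: the method resolves the "auto" strategy from the number of clustering columns and derives the sample sizes from the sample factor and the parallelisms. The standalone Java sketch below restates that decision logic without any Flink or Paimon dependency. The class and method names are hypothetical, and the strategy is returned as a plain string rather than an `OrderType`; the thresholds and formulas follow the diff.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not code from the Paimon repository.
class ClusteringPlanSketch {

    static final int MIN_CLUSTERING_SAMPLE_FACTOR = 20;

    // "auto" resolves to order for 1 column, zorder for 2 to 4 columns,
    // and hilbert for 5 or more. Any other value is used as configured.
    static String chooseStrategy(String configured, List<String> columns) {
        if (!"auto".equals(configured)) {
            return configured;
        }
        if (columns.size() == 1) {
            return "order";
        } else if (columns.size() < 5) {
            return "zorder";
        }
        return "hilbert";
    }

    // With a known upstream parallelism, the global sample budget is divided
    // across upstream tasks but never drops below the sample factor. A
    // non-positive upstream parallelism (adaptive scheduler) falls back to
    // sinkParallelism * minimum sample factor.
    static int localSampleSize(int upstreamParallelism, int sinkParallelism, int sampleFactor) {
        int globalSampleSize = sinkParallelism * sampleFactor;
        return upstreamParallelism > 0
                ? Math.max(sampleFactor, globalSampleSize / upstreamParallelism)
                : sinkParallelism * MIN_CLUSTERING_SAMPLE_FACTOR;
    }

    public static void main(String[] args) {
        System.out.println(chooseStrategy("auto", Arrays.asList("a")));
        System.out.println(chooseStrategy("auto", Arrays.asList("a", "b")));
        System.out.println(chooseStrategy("auto", Arrays.asList("a", "b", "c", "d", "e")));
        System.out.println(localSampleSize(8, 4, 100));
        System.out.println(localSampleSize(2, 4, 100));
        System.out.println(localSampleSize(-1, 4, 100));
    }
}
```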
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
index b6a944703..4202717c8 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
@@ -45,6 +45,10 @@ import java.util.Map;
 import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
 import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
 import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
+import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_COLUMNS;
+import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SAMPLE_FACTOR;
+import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SORT_IN_CLUSTER;
+import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_STRATEGY;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_PARALLELISM;
 
 /** Table sink to create sink. */
@@ -131,7 +135,12 @@ public abstract class FlinkTableSinkBase
                                     new DataStream<>(
                                             dataStream.getExecutionEnvironment(),
                                             dataStream.getTransformation()))
-                            .inputBounded(context.isBounded());
+                            .inputBounded(context.isBounded())
+                            .clusteringIfPossible(
+                                    conf.get(CLUSTERING_COLUMNS),
+                                    conf.get(CLUSTERING_STRATEGY),
+                                    conf.get(CLUSTERING_SORT_IN_CLUSTER),
+                                    conf.get(CLUSTERING_SAMPLE_FACTOR));
                     if (overwrite) {
                         builder.overwrite(staticPartitions);
                     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/HilbertSorter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/HilbertSorter.java
index 944e0fc43..26c698f5e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/HilbertSorter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/HilbertSorter.java
@@ -36,7 +36,6 @@ import org.apache.flink.table.data.RowData;
 
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.List;
 
 /**
  * This is a table sorter which will sort the records by the hilbert curve of specified columns. It
@@ -49,12 +48,15 @@ public class HilbertSorter extends TableSorter {
     private static final RowType KEY_TYPE =
             new RowType(Collections.singletonList(new DataField(0, "H_INDEX", DataTypes.BYTES())));
 
+    private final TableSortInfo tableSortInfo;
+
     public HilbertSorter(
             StreamExecutionEnvironment batchTEnv,
             DataStream<RowData> origin,
             FileStoreTable table,
-            List<String> colNames) {
-        super(batchTEnv, origin, table, colNames);
+            TableSortInfo tableSortInfo) {
+        super(batchTEnv, origin, table, tableSortInfo.getSortColumns());
+        this.tableSortInfo = tableSortInfo;
     }
 
     @Override
@@ -99,6 +101,7 @@ public class HilbertSorter extends TableSorter {
                         return Arrays.copyOf(hilbert, hilbert.length);
                     }
                 },
-                GenericRow::of);
+                GenericRow::of,
+                tableSortInfo);
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java
index dadd4c413..406195b6f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java
@@ -31,19 +31,20 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.data.RowData;
 
-import java.util.List;
-
 import static org.apache.paimon.table.PrimaryKeyTableUtils.addKeyNamePrefix;
 
 /** Alphabetical order sorter to sort records by the given `orderColNames`. */
 public class OrderSorter extends TableSorter {
 
+    private final TableSortInfo tableSortInfo;
+
     public OrderSorter(
             StreamExecutionEnvironment batchTEnv,
             DataStream<RowData> origin,
             FileStoreTable table,
-            List<String> orderColNames) {
-        super(batchTEnv, origin, table, orderColNames);
+            TableSortInfo tableSortInfo) {
+        super(batchTEnv, origin, table, tableSortInfo.getSortColumns());
+        this.tableSortInfo = tableSortInfo;
     }
 
     @Override
@@ -75,6 +76,7 @@ public class OrderSorter extends TableSorter {
                         return keyProjection.apply(new FlinkRowWrapper(value)).copy();
                     }
                 },
-                row -> row);
+                row -> row,
+                tableSortInfo);
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
index 9a1dbb729..79e9f1298 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
@@ -21,7 +21,6 @@ package org.apache.paimon.flink.sorter;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.JoinedRow;
-import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.FlinkRowData;
 import org.apache.paimon.flink.FlinkRowWrapper;
 import org.apache.paimon.flink.shuffle.RangeShuffle;
@@ -38,7 +37,12 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.util.Collector;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -72,6 +76,7 @@ public class SortUtils {
      * @param shuffleKeyAbstract abstract the key from the input `RowData`
      * @param convertor convert the `KEY` to the sort key, then we can sort in
      *     `BinaryExternalSortBuffer`.
+     * @param tableSortInfo the necessary info of table sort.
      * @return the global sorted data stream
      * @param <KEY> the KEY type in range shuffle
      */
@@ -82,34 +87,15 @@ public class SortUtils {
             final TypeInformation<KEY> keyTypeInformation,
             final SerializableSupplier<Comparator<KEY>> shuffleKeyComparator,
             final KeyAbstract<KEY> shuffleKeyAbstract,
-            final ShuffleKeyConvertor<KEY> convertor) {
+            final ShuffleKeyConvertor<KEY> convertor,
+            final TableSortInfo tableSortInfo) {
 
         final RowType valueRowType = table.rowType();
-        final int parallelism = inputStream.getParallelism();
         CoreOptions options = table.coreOptions();
-
-        String sinkParallelismValue =
-                table.options().get(FlinkConnectorOptions.SINK_PARALLELISM.key());
-        final int sinkParallelism =
-                sinkParallelismValue == null
-                        ? inputStream.getParallelism()
-                        : Integer.parseInt(sinkParallelismValue);
-        if (sinkParallelism == -1) {
-            throw new UnsupportedOperationException(
-                    "The adaptive batch scheduler is not supported. Please set the sink parallelism using the key: "
-                            + FlinkConnectorOptions.SINK_PARALLELISM.key());
-        }
-        int localSampleMagnification = options.getLocalSampleMagnification();
-        if (localSampleMagnification < 20) {
-            throw new IllegalArgumentException(
-                    String.format(
-                            "the config '%s=%d' should not be set too small,greater than or equal to 20 is needed.",
-                            CoreOptions.SORT_COMPACTION_SAMPLE_MAGNIFICATION.key(),
-                            localSampleMagnification));
-        }
-        final int localSampleSize = sinkParallelism * localSampleMagnification;
-        final int globalSampleSize = sinkParallelism * 1000;
-        final int rangeNum = sinkParallelism * 10;
+        final int sinkParallelism = tableSortInfo.getSinkParallelism();
+        final int localSampleSize = tableSortInfo.getLocalSampleSize();
+        final int globalSampleSize = tableSortInfo.getGlobalSampleSize();
+        final int rangeNum = tableSortInfo.getRangeNumber();
         int keyFieldCount = sortKeyType.getFieldCount();
         int valueFieldCount = valueRowType.getFieldCount();
         final int[] valueProjectionMap = new int[valueFieldCount];
@@ -145,10 +131,11 @@ public class SortUtils {
                                     }
                                 },
                                 new TupleTypeInfo<>(keyTypeInformation, inputStream.getType()))
-                        .setParallelism(parallelism);
+                        .setParallelism(inputStream.getParallelism());
 
         // range shuffle by key
-        return RangeShuffle.rangeShuffleByKey(
+        DataStream<Tuple2<KEY, RowData>> rangeShuffleResult =
+                RangeShuffle.rangeShuffleByKey(
                         inputWithKey,
                         shuffleKeyComparator,
                         keyTypeInformation,
@@ -157,45 +144,52 @@ public class SortUtils {
                         rangeNum,
                         sinkParallelism,
                         valueRowType,
-                        options.sortBySize())
-                .map(
-                        a -> new JoinedRow(convertor.apply(a.f0), new FlinkRowWrapper(a.f1)),
-                        internalRowType)
-                .setParallelism(sinkParallelism)
-                // sort the output locally by `SortOperator`
-                .transform(
-                        "LOCAL SORT",
-                        internalRowType,
-                        new SortOperator(
-                                sortKeyType,
-                                longRowType,
-                                options.writeBufferSize(),
-                                options.pageSize(),
-                                options.localSortMaxNumFileHandles(),
-                                options.spillCompression(),
-                                sinkParallelism,
-                                options.writeBufferSpillDiskSize()))
-                .setParallelism(sinkParallelism)
-                // remove the key column from every row
-                .map(
-                        new RichMapFunction<InternalRow, InternalRow>() {
-
-                            private transient KeyProjectedRow keyProjectedRow;
-
-                            @Override
-                            public void open(Configuration parameters) {
-                                keyProjectedRow = new KeyProjectedRow(valueProjectionMap);
-                            }
-
-                            @Override
-                            public InternalRow map(InternalRow value) {
-                                return keyProjectedRow.replaceRow(value);
-                            }
-                        },
-                        InternalTypeInfo.fromRowType(valueRowType))
-                .setParallelism(sinkParallelism)
-                .map(FlinkRowData::new, inputStream.getType())
-                .setParallelism(sinkParallelism);
+                        options.sortBySize());
+        if (tableSortInfo.isSortInCluster()) {
+            return rangeShuffleResult
+                    .map(
+                            a -> new JoinedRow(convertor.apply(a.f0), new FlinkRowWrapper(a.f1)),
+                            internalRowType)
+                    .setParallelism(sinkParallelism)
+                    // sort the output locally by `SortOperator`
+                    .transform(
+                            "LOCAL SORT",
+                            internalRowType,
+                            new SortOperator(
+                                    sortKeyType,
+                                    longRowType,
+                                    options.writeBufferSize(),
+                                    options.pageSize(),
+                                    options.localSortMaxNumFileHandles(),
+                                    options.spillCompression(),
+                                    sinkParallelism,
+                                    options.writeBufferSpillDiskSize()))
+                    .setParallelism(sinkParallelism)
+                    // remove the key column from every row
+                    .map(
+                            new RichMapFunction<InternalRow, InternalRow>() {
+
+                                private transient KeyProjectedRow keyProjectedRow;
+
+                                @Override
+                                public void open(Configuration parameters) {
+                                    keyProjectedRow = new KeyProjectedRow(valueProjectionMap);
+                                }
+
+                                @Override
+                                public InternalRow map(InternalRow value) {
+                                    return keyProjectedRow.replaceRow(value);
+                                }
+                            },
+                            InternalTypeInfo.fromRowType(valueRowType))
+                    .setParallelism(sinkParallelism)
+                    .map(FlinkRowData::new, inputStream.getType())
+                    .setParallelism(sinkParallelism);
+        } else {
+            return rangeShuffleResult
+                    .transform("REMOVE KEY", inputStream.getType(), new RemoveKeyOperator<>())
+                    .setParallelism(sinkParallelism);
+        }
     }
 
     /** Abstract key from a row data. */
@@ -206,4 +200,24 @@ public class SortUtils {
     }
 
     interface ShuffleKeyConvertor<KEY> extends Function<KEY, InternalRow>, Serializable {}
+
+    /** Remove the abstract key. */
+    private static class RemoveKeyOperator<T> extends TableStreamOperator<RowData>
+            implements OneInputStreamOperator<Tuple2<T, RowData>, RowData> {
+
+        private static final long serialVersionUID = 1L;
+
+        private transient Collector<RowData> collector;
+
+        @Override
+        public void open() throws Exception {
+            super.open();
+            this.collector = new StreamRecordCollector<>(output);
+        }
+
+        @Override
+        public void processElement(StreamRecord<Tuple2<T, RowData>> streamRecord) {
+            collector.collect(streamRecord.getValue().f1);
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSortInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSortInfo.java
new file mode 100644
index 000000000..16760bd35
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSortInfo.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sorter;
+
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.sorter.TableSorter.OrderType;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.shaded.guava31.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.flink.shaded.guava31.com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * {@link TableSortInfo} is used to indicate the configuration details for table data sorting. This
+ * includes information about which columns to sort by, the sorting strategy (e.g., order, Z-order),
+ * whether to sort within each cluster, and sample sizes for local and global sample nodes.
+ */
+public class TableSortInfo {
+
+    private final List<String> sortColumns;
+
+    private final OrderType sortStrategy;
+
+    private final boolean sortInCluster;
+
+    private final int rangeNumber;
+
+    private final int sinkParallelism;
+
+    private final int localSampleSize;
+
+    private final int globalSampleSize;
+
+    private TableSortInfo(
+            List<String> sortColumns,
+            OrderType sortStrategy,
+            boolean sortInCluster,
+            int rangeNumber,
+            int sinkParallelism,
+            int localSampleSize,
+            int globalSampleSize) {
+        this.sortColumns = sortColumns;
+        this.sortStrategy = sortStrategy;
+        this.sortInCluster = sortInCluster;
+        this.rangeNumber = rangeNumber;
+        this.sinkParallelism = sinkParallelism;
+        this.localSampleSize = localSampleSize;
+        this.globalSampleSize = globalSampleSize;
+    }
+
+    public List<String> getSortColumns() {
+        return sortColumns;
+    }
+
+    public OrderType getSortStrategy() {
+        return sortStrategy;
+    }
+
+    public boolean isSortInCluster() {
+        return sortInCluster;
+    }
+
+    public int getRangeNumber() {
+        return rangeNumber;
+    }
+
+    public int getLocalSampleSize() {
+        return localSampleSize;
+    }
+
+    public int getGlobalSampleSize() {
+        return globalSampleSize;
+    }
+
+    public int getSinkParallelism() {
+        return sinkParallelism;
+    }
+
+    /** Builder for {@link TableSortInfo}. */
+    public static class Builder {
+
+        private List<String> sortColumns = Collections.emptyList();
+
+        private OrderType sortStrategy = OrderType.ORDER;
+
+        private boolean sortInCluster = true;
+
+        private int rangeNumber = -1;
+
+        private int sinkParallelism = -1;
+
+        private int localSampleSize = -1;
+
+        private int globalSampleSize = -1;
+
+        public Builder setSortColumns(List<String> sortColumns) {
+            this.sortColumns = sortColumns;
+            return this;
+        }
+
+        public Builder setSortStrategy(OrderType sortStrategy) {
+            this.sortStrategy = sortStrategy;
+            return this;
+        }
+
+        public Builder setSortInCluster(boolean sortInCluster) {
+            this.sortInCluster = sortInCluster;
+            return this;
+        }
+
+        public Builder setRangeNumber(int rangeNumber) {
+            this.rangeNumber = rangeNumber;
+            return this;
+        }
+
+        public Builder setSinkParallelism(int sinkParallelism) {
+            this.sinkParallelism = sinkParallelism;
+            return this;
+        }
+
+        public Builder setLocalSampleSize(int localSampleSize) {
+            this.localSampleSize = localSampleSize;
+            return this;
+        }
+
+        public Builder setGlobalSampleSize(int globalSampleSize) {
+            this.globalSampleSize = globalSampleSize;
+            return this;
+        }
+
+        public TableSortInfo build() {
+            checkArgument(!sortColumns.isEmpty(), "Sort columns cannot be empty");
+            checkNotNull(sortStrategy, "Sort strategy cannot be null");
+            checkArgument(
+                    sinkParallelism > 0,
+                    "The sink parallelism must be specified when sorting the table data. Please set it using the key: %s",
+                    FlinkConnectorOptions.SINK_PARALLELISM.key());
+            checkArgument(rangeNumber > 0, "Range number must be positive");
+            checkArgument(localSampleSize > 0, "Local sample size must be positive");
+            checkArgument(globalSampleSize > 0, "Global sample size must be positive");
+            return new TableSortInfo(
+                    sortColumns,
+                    sortStrategy,
+                    sortInCluster,
+                    rangeNumber,
+                    sinkParallelism,
+                    localSampleSize,
+                    globalSampleSize);
+        }
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
index e4f85ea9d..a0d4b6af2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
@@ -70,21 +70,22 @@ public abstract class TableSorter {
             StreamExecutionEnvironment batchTEnv,
             DataStream<RowData> origin,
             FileStoreTable fileStoreTable,
-            String sortStrategy,
-            List<String> orderColumns) {
-        switch (OrderType.of(sortStrategy)) {
+            TableSortInfo sortInfo) {
+        OrderType sortStrategy = sortInfo.getSortStrategy();
+        switch (sortStrategy) {
             case ORDER:
-                return new OrderSorter(batchTEnv, origin, fileStoreTable, orderColumns);
+                return new OrderSorter(batchTEnv, origin, fileStoreTable, sortInfo);
             case ZORDER:
-                return new ZorderSorter(batchTEnv, origin, fileStoreTable, orderColumns);
+                return new ZorderSorter(batchTEnv, origin, fileStoreTable, sortInfo);
             case HILBERT:
-                return new HilbertSorter(batchTEnv, origin, fileStoreTable, orderColumns);
+                return new HilbertSorter(batchTEnv, origin, fileStoreTable, sortInfo);
             default:
                 throw new IllegalArgumentException("cannot match order type: " + sortStrategy);
         }
     }
 
-    enum OrderType {
+    /** The order type of table sort. */
+    public enum OrderType {
         ORDER("order"),
         ZORDER("zorder"),
         HILBERT("hilbert");
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java
index bc3c7f5c5..fad66f364 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java
@@ -36,7 +36,6 @@ import org.apache.flink.table.data.RowData;
 
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.List;
 
 /**
  * This is a table sorter which will sort the records by the z-order of specified columns. It works
@@ -49,12 +48,15 @@ public class ZorderSorter extends TableSorter {
     private static final RowType KEY_TYPE =
             new RowType(Collections.singletonList(new DataField(0, "Z_INDEX", DataTypes.BYTES())));
 
+    private final TableSortInfo sortInfo;
+
     public ZorderSorter(
             StreamExecutionEnvironment batchTEnv,
             DataStream<RowData> origin,
             FileStoreTable table,
-            List<String> zOrderColNames) {
-        super(batchTEnv, origin, table, zOrderColNames);
+            TableSortInfo sortInfo) {
+        super(batchTEnv, origin, table, sortInfo.getSortColumns());
+        this.sortInfo = sortInfo;
     }
 
     @Override
@@ -102,6 +104,7 @@ public class ZorderSorter extends TableSorter {
                         return Arrays.copyOf(zorder, zorder.length);
                     }
                 },
-                GenericRow::of);
+                GenericRow::of,
+                sortInfo);
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RangePartitionAndSortForUnawareBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RangePartitionAndSortForUnawareBucketTableITCase.java
new file mode 100644
index 000000000..c65f4ac12
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RangePartitionAndSortForUnawareBucketTableITCase.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.operation.AppendOnlyFileStoreScan;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+
+/**
+ * The IT test for the range partitioning and sort batch writing for append-only bucket unaware
+ * tables.
+ */
+public class RangePartitionAndSortForUnawareBucketTableITCase extends CatalogITCaseBase {
+
+    private static final int SINK_ROW_NUMBER = 1000;
+
+    @Test
+    public void testSortConfigurationChecks() {
+        batchSql(
+                "CREATE TEMPORARY TABLE source1 (col1 INT, col2 INT, col3 INT, col4 INT) "
+                        + "WITH ('connector'='values', 'bounded'='true')");
+        batchSql("CREATE TABLE IF NOT EXISTS sink1 (col1 INT, col2 INT, col3 INT, col4 INT)");
+        streamSqlIter(
+                "CREATE TEMPORARY TABLE source2 (col1 INT, col2 INT, col3 INT, col4 INT) WITH ('connector'='values')");
+        streamSqlIter("CREATE TABLE IF NOT EXISTS sink2 (col1 INT, col2 INT, col3 INT, col4 INT)");
+        // 1. Check the sort columns.
+        assertThatExceptionOfType(RuntimeException.class)
+                .isThrownBy(
+                        () ->
+                                batchSql(
+                                        "INSERT INTO sink1 /*+ OPTIONS('sink.clustering.by-columns' = 'col1,xx1') */ "
+                                                + "SELECT * FROM source1"))
+                .withMessageContaining("should contains all clustering column names");
+        // 2. Check the sample factor.
+        assertThatExceptionOfType(RuntimeException.class)
+                .isThrownBy(
+                        () ->
+                                batchSql(
+                                        "INSERT INTO sink1 /*+ OPTIONS('sink.clustering.by-columns' = 'col1', "
+                                                + "'sink.clustering.sample-factor' = '10') */ SELECT * "
+                                                + "FROM source1"))
+                .withMessageContaining("The minimum allowed sink.clustering.sample-factor");
+        // 3. Check the sink parallelism.
+        batchSql(
+                "CREATE TEMPORARY TABLE source3 (col1 INT, col2 INT) WITH ('connector'='values', 'bounded'='true')");
+        assertThatExceptionOfType(RuntimeException.class)
+                .isThrownBy(
+                        () ->
+                                batchSql(
+                                        "INSERT INTO sink1 /*+ OPTIONS('sink.clustering.by-columns' = 'col1') */ "
+                                                + "SELECT source1.col1, source1.col2, source3.col1, "
+                                                + "source3.col2 FROM source1 JOIN source3 ON source1.col1 = source3.col1"))
+                .withMessageContaining(
+                        "The sink parallelism must be specified when sorting the table data");
+    }
+
+    @Test
+    public void testRangePartition() throws Exception {
+        List<Row> inputRows = generateSinkRows();
+        String id = TestValuesTableFactory.registerData(inputRows);
+        batchSql(
+                "CREATE TEMPORARY TABLE test_source (col1 INT, col2 INT, col3 INT, col4 INT) WITH "
+                        + "('connector'='values', 'bounded'='true', 'data-id'='%s')",
+                id);
+        batchSql(
+                "INSERT INTO test_table /*+ OPTIONS('sink.clustering.by-columns' = 'col1', "
+                        + "'sink.parallelism' = '10', 'sink.clustering.sort-in-cluster' = 'false') */ "
+                        + "SELECT * FROM test_source");
+        List<Row> sinkRows = batchSql("SELECT * FROM test_table");
+        assertThat(sinkRows.size()).isEqualTo(SINK_ROW_NUMBER);
+        FileStoreTable testStoreTable = paimonTable("test_table");
+        List<ManifestEntry> files = testStoreTable.store().newScan().plan().files();
+        assertThat(files.size()).isEqualTo(10);
+        List<Tuple2<Integer, Integer>> minMaxOfEachFile = new ArrayList<>();
+        for (ManifestEntry file : files) {
+            DataSplit dataSplit =
+                    DataSplit.builder()
+                            .withPartition(file.partition())
+                            .withBucket(file.bucket())
+                            .withDataFiles(Collections.singletonList(file.file()))
+                            .withBucketPath("/temp/xxx")
+                            .build();
+            final AtomicInteger min = new AtomicInteger(Integer.MAX_VALUE);
+            final AtomicInteger max = new AtomicInteger(Integer.MIN_VALUE);
+            testStoreTable
+                    .newReadBuilder()
+                    .newRead()
+                    .createReader(dataSplit)
+                    .forEachRemaining(
+                            internalRow -> {
+                                int result = internalRow.getInt(0);
+                                min.set(Math.min(min.get(), result));
+                                max.set(Math.max(max.get(), result));
+                            });
+            minMaxOfEachFile.add(Tuple2.of(min.get(), max.get()));
+        }
+        minMaxOfEachFile.sort(Comparator.comparing(o -> o.f0));
+        Tuple2<Integer, Integer> preResult = minMaxOfEachFile.get(0);
+        for (int index = 1; index < minMaxOfEachFile.size(); ++index) {
+            Tuple2<Integer, Integer> currentResult = minMaxOfEachFile.get(index);
+            assertThat(currentResult.f0).isGreaterThanOrEqualTo(0);
+            assertThat(currentResult.f1).isLessThanOrEqualTo(SINK_ROW_NUMBER);
+            assertThat(currentResult.f0).isGreaterThanOrEqualTo(preResult.f1);
+        }
+    }
+
+    @Test
+    public void testRangePartitionAndSortWithOrderStrategy() throws Exception {
+        List<Row> inputRows = generateSinkRows();
+        String id = TestValuesTableFactory.registerData(inputRows);
+        batchSql(
+                "CREATE TEMPORARY TABLE test_source (col1 INT, col2 INT, col3 INT, col4 INT) WITH "
+                        + "('connector'='values', 'bounded'='true', 'data-id'='%s')",
+                id);
+        batchSql(
+                "INSERT INTO test_table /*+ OPTIONS('sink.clustering.by-columns' = 'col1', "
+                        + "'sink.parallelism' = '10', 'sink.clustering.strategy' = 'order') */ "
+                        + "SELECT * FROM test_source");
+        List<Row> sinkRows = batchSql("SELECT * FROM test_table");
+        assertThat(sinkRows.size()).isEqualTo(SINK_ROW_NUMBER);
+        FileStoreTable testStoreTable = paimonTable("test_table");
+        List<ManifestEntry> files = testStoreTable.store().newScan().plan().files();
+        assertThat(files.size()).isEqualTo(10);
+        List<Tuple2<Integer, Integer>> minMaxOfEachFile = new ArrayList<>();
+        for (ManifestEntry file : files) {
+            DataSplit dataSplit =
+                    DataSplit.builder()
+                            .withPartition(file.partition())
+                            .withBucket(file.bucket())
+                            .withDataFiles(Collections.singletonList(file.file()))
+                            .withBucketPath("/temp/xxx")
+                            .build();
+            final AtomicInteger min = new AtomicInteger(Integer.MAX_VALUE);
+            final AtomicInteger max = new AtomicInteger(Integer.MIN_VALUE);
+            final AtomicInteger current = new AtomicInteger(Integer.MIN_VALUE);
+            testStoreTable
+                    .newReadBuilder()
+                    .newRead()
+                    .createReader(dataSplit)
+                    .forEachRemaining(
+                            internalRow -> {
+                                int result = internalRow.getInt(0);
+                                min.set(Math.min(min.get(), result));
+                                max.set(Math.max(max.get(), result));
+                                Assertions.assertThat(result).isGreaterThanOrEqualTo(current.get());
+                                current.set(result);
+                            });
+            minMaxOfEachFile.add(Tuple2.of(min.get(), max.get()));
+        }
+        minMaxOfEachFile.sort(Comparator.comparing(o -> o.f0));
+        Tuple2<Integer, Integer> preResult = minMaxOfEachFile.get(0);
+        for (int index = 1; index < minMaxOfEachFile.size(); ++index) {
+            Tuple2<Integer, Integer> currentResult = minMaxOfEachFile.get(index);
+            assertThat(currentResult.f0).isGreaterThanOrEqualTo(0);
+            assertThat(currentResult.f1).isLessThanOrEqualTo(SINK_ROW_NUMBER);
+            assertThat(currentResult.f0).isGreaterThanOrEqualTo(preResult.f1);
+        }
+    }
+
+    @Test
+    public void testRangePartitionAndSortWithZOrderStrategy() throws Exception {
+        List<Row> inputRows = generateSinkRows();
+        String id = TestValuesTableFactory.registerData(inputRows);
+        batchSql(
+                "CREATE TEMPORARY TABLE test_source (col1 INT, col2 INT, col3 INT, col4 INT) WITH "
+                        + "('connector'='values', 'bounded'='true', 'data-id'='%s')",
+                id);
+        batchSql(
+                "INSERT INTO test_table /*+ OPTIONS('sink.clustering.by-columns' = 'col1', "
+                        + "'sink.parallelism' = '10', 'sink.clustering.strategy' = 'zorder') */ "
+                        + "SELECT * FROM test_source");
+        List<Row> sinkRows = batchSql("SELECT * FROM test_table");
+        assertThat(sinkRows.size()).isEqualTo(SINK_ROW_NUMBER);
+        FileStoreTable testStoreTable = paimonTable("test_table");
+        PredicateBuilder predicateBuilder = new PredicateBuilder(testStoreTable.rowType());
+        Predicate predicate = predicateBuilder.between(0, 100, 200);
+        List<ManifestEntry> files = testStoreTable.store().newScan().plan().files();
+        assertThat(files.size()).isEqualTo(10);
+        List<ManifestEntry> filesFilter =
+                ((AppendOnlyFileStoreScan) testStoreTable.store().newScan())
+                        .withFilter(predicate)
+                        .plan()
+                        .files();
+        Assertions.assertThat(files.size()).isGreaterThan(filesFilter.size());
+    }
+
+    @Test
+    public void testRangePartitionAndSortWithHilbertStrategy() throws Exception {
+        List<Row> inputRows = generateSinkRows();
+        String id = TestValuesTableFactory.registerData(inputRows);
+        batchSql(
+                "CREATE TEMPORARY TABLE test_source (col1 INT, col2 INT, col3 INT, col4 INT) WITH "
+                        + "('connector'='values', 'bounded'='true', 'data-id'='%s')",
+                id);
+        batchSql(
+                "INSERT INTO test_table /*+ OPTIONS('sink.clustering.by-columns' = 'col1,col2', "
+                        + "'sink.parallelism' = '10', 'sink.clustering.strategy' = 'hilbert') */ "
+                        + "SELECT * FROM test_source");
+        List<Row> sinkRows = batchSql("SELECT * FROM test_table");
+        assertThat(sinkRows.size()).isEqualTo(SINK_ROW_NUMBER);
+        FileStoreTable testStoreTable = paimonTable("test_table");
+        PredicateBuilder predicateBuilder = new PredicateBuilder(testStoreTable.rowType());
+        Predicate predicate = predicateBuilder.between(0, 100, 200);
+        List<ManifestEntry> files = testStoreTable.store().newScan().plan().files();
+        assertThat(files.size()).isEqualTo(10);
+        List<ManifestEntry> filesFilter =
+                ((AppendOnlyFileStoreScan) testStoreTable.store().newScan())
+                        .withFilter(predicate)
+                        .plan()
+                        .files();
+        Assertions.assertThat(files.size()).isGreaterThan(filesFilter.size());
+    }
+
+    private List<Row> generateSinkRows() {
+        List<Row> sinkRows = new ArrayList<>();
+        Random random = new Random();
+        for (int round = 0; round < SINK_ROW_NUMBER; round++) {
+            sinkRows.add(
+                    Row.ofKind(
+                            RowKind.INSERT,
+                            random.nextInt(SINK_ROW_NUMBER),
+                            random.nextInt(SINK_ROW_NUMBER),
+                            random.nextInt(SINK_ROW_NUMBER),
+                            random.nextInt(SINK_ROW_NUMBER)));
+        }
+        return sinkRows;
+    }
+
+    @Override
+    protected List<String> ddl() {
+        return Collections.singletonList(
+                "CREATE TABLE IF NOT EXISTS test_table (col1 INT, col2 INT, col3 INT, col4 INT)");
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/TableSortInfoTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/TableSortInfoTest.java
new file mode 100644
index 000000000..e9e01baea
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/TableSortInfoTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sorter;
+
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.sorter.TableSorter.OrderType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+
+/** The unit test for {@link TableSortInfo}. */
+public class TableSortInfoTest {
+
+    @Test
+    public void testTableSortInfoBuilderWithValidParameters() {
+        TableSortInfo tableSortInfo =
+                new TableSortInfo.Builder()
+                        .setSortColumns(Arrays.asList("column1", "column2"))
+                        .setSortStrategy(OrderType.ORDER)
+                        .setSortInCluster(true)
+                        .setRangeNumber(10)
+                        .setSinkParallelism(5)
+                        .setLocalSampleSize(100)
+                        .setGlobalSampleSize(500)
+                        .build();
+
+        assertThat(tableSortInfo.getSortColumns()).containsExactly("column1", "column2");
+        assertThat(tableSortInfo.getSortStrategy()).isEqualTo(OrderType.ORDER);
+        assertThat(tableSortInfo.isSortInCluster()).isTrue();
+        assertThat(tableSortInfo.getRangeNumber()).isEqualTo(10);
+        assertThat(tableSortInfo.getSinkParallelism()).isEqualTo(5);
+        assertThat(tableSortInfo.getLocalSampleSize()).isEqualTo(100);
+        assertThat(tableSortInfo.getGlobalSampleSize()).isEqualTo(500);
+    }
+
+    @Test
+    public void testTableSortInfoBuilderWithEmptySortColumns() {
+        assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(
+                        () ->
+                                new TableSortInfo.Builder()
+                                        .setSortColumns(Collections.emptyList())
+                                        .setSortStrategy(OrderType.ORDER)
+                                        .setSortInCluster(true)
+                                        .setRangeNumber(10)
+                                        .setSinkParallelism(5)
+                                        .setLocalSampleSize(100)
+                                        .setGlobalSampleSize(500)
+                                        .build())
+                .withMessage("Sort columns cannot be empty");
+    }
+
+    @Test
+    public void testTableSortInfoBuilderWithNullSortStrategy() {
+        assertThatExceptionOfType(NullPointerException.class)
+                .isThrownBy(
+                        () ->
+                                new TableSortInfo.Builder()
+                                        .setSortColumns(Arrays.asList("column1", "column2"))
+                                        .setSortStrategy(null)
+                                        .setSortInCluster(true)
+                                        .setRangeNumber(10)
+                                        .setSinkParallelism(5)
+                                        .setLocalSampleSize(100)
+                                        .setGlobalSampleSize(500)
+                                        .build())
+                .withMessage("Sort strategy cannot be null");
+    }
+
+    @Test
+    public void testTableSortInfoBuilderWithNegativeRangeNumber() {
+        assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(
+                        () ->
+                                new TableSortInfo.Builder()
+                                        .setSortColumns(Arrays.asList("column1", "column2"))
+                                        .setSortStrategy(OrderType.ORDER)
+                                        .setSortInCluster(true)
+                                        .setRangeNumber(-1)
+                                        .setSinkParallelism(5)
+                                        .setLocalSampleSize(100)
+                                        .setGlobalSampleSize(500)
+                                        .build())
+                .withMessage("Range number must be positive");
+    }
+
+    @Test
+    public void testTableSortInfoBuilderWithZeroSinkParallelism() {
+        assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(
+                        () ->
+                                new TableSortInfo.Builder()
+                                        .setSortColumns(Arrays.asList("column1", "column2"))
+                                        .setSortStrategy(OrderType.ORDER)
+                                        .setSortInCluster(true)
+                                        .setRangeNumber(10)
+                                        .setSinkParallelism(0)
+                                        .setLocalSampleSize(100)
+                                        .setGlobalSampleSize(500)
+                                        .build())
+                .withMessageContaining(
+                        "The sink parallelism must be specified when sorting the table data. Please set it using the key: %s",
+                        FlinkConnectorOptions.SINK_PARALLELISM.key());
+    }
+
+    @Test
+    public void testTableSortInfoBuilderWithZeroLocalSampleSize() {
+        assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(
+                        () ->
+                                new TableSortInfo.Builder()
+                                        .setSortColumns(Arrays.asList("column1", "column2"))
+                                        .setSortStrategy(OrderType.ORDER)
+                                        .setSortInCluster(true)
+                                        .setRangeNumber(10)
+                                        .setSinkParallelism(5)
+                                        .setLocalSampleSize(0) // This should trigger an exception
+                                        .setGlobalSampleSize(500)
+                                        .build())
+                .withMessage("Local sample size must be positive");
+    }
+
+    @Test
+    public void testTableSortInfoBuilderWithNegativeGlobalSampleSize() {
+        assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(
+                        () ->
+                                new TableSortInfo.Builder()
+                                        .setSortColumns(Arrays.asList("column1", "column2"))
+                                        .setSortStrategy(OrderType.ORDER)
+                                        .setSortInCluster(true)
+                                        .setRangeNumber(10)
+                                        .setSinkParallelism(5)
+                                        .setLocalSampleSize(100)
+                                        .setGlobalSampleSize(-1) // This should trigger an exception
+                                        .build())
+                .withMessage("Global sample size must be positive");
+    }
+}
