This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d054a4c06 [flink] support order sort in flink compact action (#1904)
d054a4c06 is described below

commit d054a4c0680527578587fe95681990f00a8fda9d
Author: YeJunHao <[email protected]>
AuthorDate: Wed Aug 30 13:56:09 2023 +0800

    [flink] support order sort in flink compact action (#1904)
---
 .../data/serializer/InternalRowSerializer.java     |   2 +-
 .../org/apache/paimon/sort/zorder/ZIndexer.java    |  50 ++++--
 .../paimon/flink/BinaryRowTypeSerializer.java      | 176 -------------------
 .../org/apache/paimon/flink/action/ActionBase.java |   2 +
 .../paimon/flink/action/SortCompactAction.java     |  13 +-
 .../apache/paimon/flink/shuffle/RangeShuffle.java  |  76 ++++----
 .../apache/paimon/flink/sorter/OrderSorter.java    |  84 +++++++++
 .../apache/paimon/flink/sorter/SortOperator.java   |  23 +--
 .../org/apache/paimon/flink/sorter/SortUtils.java  | 191 +++++++++++++++++++++
 .../apache/paimon/flink/sorter/TableSorter.java    |   3 +-
 .../apache/paimon/flink/sorter/ZorderSorter.java   |  47 ++++-
 .../paimon/flink/sorter/ZorderSorterUtils.java     | 156 -----------------
 .../flink/utils/InternalRowTypeSerializer.java     | 108 ++++++++++++
 .../paimon/flink/BinaryRowTypeSerializerTest.java  |  58 -------
 .../flink/action/OrderRewriteActionITCase.java     | 117 ++++++++++++-
 .../flink/utils/InternalRowSerializerTest.java     |  93 ++++++++++
 16 files changed, 729 insertions(+), 470 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
index b0701be48..199d08b67 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
@@ -109,7 +109,7 @@ public class InternalRowSerializer extends 
AbstractRowDataSerializer<InternalRow
     }
 
     @SuppressWarnings("unchecked")
-    private InternalRow copyRowData(InternalRow from, InternalRow reuse) {
+    public InternalRow copyRowData(InternalRow from, InternalRow reuse) {
         GenericRow ret;
         if (reuse instanceof GenericRow) {
             ret = (GenericRow) reuse;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java 
b/paimon-core/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java
index 8c30a3b02..9701dd90e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java
@@ -18,9 +18,11 @@
 
 package org.apache.paimon.sort.zorder;
 
+import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.memory.MemorySegmentUtils;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.BinaryType;
@@ -128,26 +130,42 @@ public class ZIndexer implements Serializable {
 
         @Override
         public ZProcessFunction visit(CharType charType) {
-            return (row, reuse) ->
-                    row.isNullAt(fieldIndex)
-                            ? NULL_BYTES
-                            : ZOrderByteUtils.stringToOrderedBytes(
-                                            
row.getString(fieldIndex).toString(),
-                                            PRIMITIVE_BUFFER_SIZE,
-                                            reuse)
-                                    .array();
+            return (row, reuse) -> {
+                // Check null first; getString() on a null field is not safe for all rows.
+                if (row.isNullAt(fieldIndex)) {
+                    return NULL_BYTES;
+                }
+                BinaryString binaryString = row.getString(fieldIndex);
+                int length = Math.min(PRIMITIVE_BUFFER_SIZE, binaryString.getSizeInBytes());
+                return ZOrderByteUtils.byteTruncateOrFill(
+                                MemorySegmentUtils.getBytes(
+                                        binaryString.getSegments(),
+                                        binaryString.getOffset(),
+                                        length),
+                                PRIMITIVE_BUFFER_SIZE,
+                                reuse)
+                        .array();
+            };
         }
 
         @Override
         public ZProcessFunction visit(VarCharType varCharType) {
-            return (row, reuse) ->
-                    row.isNullAt(fieldIndex)
-                            ? NULL_BYTES
-                            : ZOrderByteUtils.stringToOrderedBytes(
-                                            
row.getString(fieldIndex).toString(),
-                                            PRIMITIVE_BUFFER_SIZE,
-                                            reuse)
-                                    .array();
+            return (row, reuse) -> {
+                // Check null first; getString() on a null field is not safe for all rows.
+                if (row.isNullAt(fieldIndex)) {
+                    return NULL_BYTES;
+                }
+                BinaryString binaryString = row.getString(fieldIndex);
+                int length = Math.min(PRIMITIVE_BUFFER_SIZE, binaryString.getSizeInBytes());
+                return ZOrderByteUtils.byteTruncateOrFill(
+                                MemorySegmentUtils.getBytes(
+                                        binaryString.getSegments(),
+                                        binaryString.getOffset(),
+                                        length),
+                                PRIMITIVE_BUFFER_SIZE,
+                                reuse)
+                        .array();
+            };
         }
 
         @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/BinaryRowTypeSerializer.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/BinaryRowTypeSerializer.java
deleted file mode 100644
index 892969583..000000000
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/BinaryRowTypeSerializer.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink;
-
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.serializer.BinaryRowSerializer;
-import org.apache.paimon.memory.MemorySegment;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-import java.io.IOException;
-import java.util.Objects;
-
-/** A {@link TypeSerializer} to serialize {@link BinaryRow}. */
-public final class BinaryRowTypeSerializer extends TypeSerializer<BinaryRow> {
-
-    private static final long serialVersionUID = 1L;
-
-    private final BinaryRowSerializer serializer;
-
-    public BinaryRowTypeSerializer(int numFields) {
-        this.serializer = new BinaryRowSerializer(numFields);
-    }
-
-    @Override
-    public boolean isImmutableType() {
-        return false;
-    }
-
-    @Override
-    public BinaryRowTypeSerializer duplicate() {
-        return new BinaryRowTypeSerializer(serializer.getArity());
-    }
-
-    @Override
-    public BinaryRow createInstance() {
-        return serializer.createInstance();
-    }
-
-    @Override
-    public BinaryRow copy(BinaryRow from) {
-        return serializer.copy(from);
-    }
-
-    @Override
-    public BinaryRow copy(BinaryRow from, BinaryRow reuse) {
-        return copy(from);
-    }
-
-    @Override
-    public int getLength() {
-        return -1;
-    }
-
-    @Override
-    public void serialize(BinaryRow record, DataOutputView target) throws 
IOException {
-        target.writeInt(record.getSizeInBytes());
-        target.write(record.toBytes());
-    }
-
-    @Override
-    public BinaryRow deserialize(DataInputView source) throws IOException {
-        return deserialize(createInstance(), source);
-    }
-
-    @Override
-    public BinaryRow deserialize(BinaryRow reuse, DataInputView source) throws 
IOException {
-        int len = source.readInt();
-        byte[] bytes = new byte[len];
-        source.readFully(bytes);
-        reuse.pointTo(MemorySegment.wrap(bytes), 0, len);
-        return reuse;
-    }
-
-    @Override
-    public void copy(DataInputView source, DataOutputView target) throws 
IOException {
-        int length = source.readInt();
-        target.writeInt(length);
-        target.write(source, length);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        BinaryRowTypeSerializer that = (BinaryRowTypeSerializer) o;
-        return Objects.equals(serializer, that.serializer);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(serializer);
-    }
-
-    @Override
-    public TypeSerializerSnapshot<BinaryRow> snapshotConfiguration() {
-        return new BinaryRowTypeSerializerSnapshot(serializer.getArity());
-    }
-
-    /**
-     * {@link TypeSerializerSnapshot} for {@link BinaryRow}. It checks the 
compatibility of
-     * numFields without checking type.
-     */
-    public static class BinaryRowTypeSerializerSnapshot
-            implements TypeSerializerSnapshot<BinaryRow> {
-
-        private int numFields;
-
-        public BinaryRowTypeSerializerSnapshot() {}
-
-        private BinaryRowTypeSerializerSnapshot(int numFields) {
-            this.numFields = numFields;
-        }
-
-        @Override
-        public int getCurrentVersion() {
-            return 0;
-        }
-
-        @Override
-        public void writeSnapshot(DataOutputView out) throws IOException {
-            out.writeInt(numFields);
-        }
-
-        @Override
-        public void readSnapshot(int readVersion, DataInputView in, 
ClassLoader userCodeClassLoader)
-                throws IOException {
-            this.numFields = in.readInt();
-        }
-
-        @Override
-        public TypeSerializer<BinaryRow> restoreSerializer() {
-            return new BinaryRowTypeSerializer(numFields);
-        }
-
-        @Override
-        public TypeSerializerSchemaCompatibility<BinaryRow> 
resolveSchemaCompatibility(
-                TypeSerializer<BinaryRow> newSerializer) {
-            if (!(newSerializer instanceof BinaryRowTypeSerializer)) {
-                return TypeSerializerSchemaCompatibility.incompatible();
-            }
-
-            BinaryRowTypeSerializer other = (BinaryRowTypeSerializer) 
newSerializer;
-
-            if (numFields == other.serializer.getArity()) {
-                return TypeSerializerSchemaCompatibility.compatibleAsIs();
-            } else {
-                return TypeSerializerSchemaCompatibility.incompatible();
-            }
-        }
-    }
-}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
index d9f839798..6a4477f47 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
@@ -60,6 +60,8 @@ public abstract class ActionBase implements Action {
         catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
         flinkCatalog = FlinkCatalogFactory.createCatalog(catalogName, catalog, 
catalogOptions);
         env = StreamExecutionEnvironment.getExecutionEnvironment();
+        // we enable object reuse, we copy the un-reusable object ourselves.
+        env.getConfig().enableObjectReuse();
         batchTEnv = StreamTableEnvironment.create(env, 
EnvironmentSettings.inBatchMode());
 
         // register flink catalog to table environment
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
index ff5e1a90d..9b488476d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
@@ -35,6 +35,8 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.RowData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -46,6 +48,8 @@ import static 
org.apache.paimon.utils.Preconditions.checkArgument;
 /** Compact with sort action. */
 public class SortCompactAction extends CompactAction {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(SortCompactAction.class);
+
     private String sortStrategy;
     private List<String> orderColumns;
 
@@ -64,8 +68,6 @@ public class SortCompactAction extends CompactAction {
 
     @Override
     public void run() throws Exception {
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
         build(env);
         execute(env, "Sort Compact Job");
     }
@@ -74,8 +76,9 @@ public class SortCompactAction extends CompactAction {
         // only support batch sort yet
         if (env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
                 != RuntimeExecutionMode.BATCH) {
-            throw new IllegalArgumentException(
-                    "Only support batch mode yet, please set 
-Dexecution.runtime-mode=BATCH");
+            LOG.warn(
+                    "Sort Compact only support batch mode yet. Please add 
-Dexecution.runtime-mode=BATCH. The action this time will shift to batch mode 
forcely.");
+            env.setRuntimeMode(RuntimeExecutionMode.BATCH);
         }
         FileStoreTable fileStoreTable = (FileStoreTable) table;
         if (!(fileStoreTable instanceof AppendOnlyFileStoreTable)) {
@@ -102,7 +105,7 @@ public class SortCompactAction extends CompactAction {
             sourceBuilder.withPredicate(partitionPredicate);
         }
 
-        String scanParallelism = 
tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
+        String scanParallelism = 
tableConfig.get(FlinkConnectorOptions.SCAN_PARALLELISM.key());
         if (scanParallelism != null) {
             sourceBuilder.withParallelism(Integer.parseInt(scanParallelism));
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
index 7318a883d..89b0729f6 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.flink.shuffle;
 
-import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.SerializableSupplier;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.Partitioner;
@@ -81,20 +81,21 @@ public class RangeShuffle {
      * <p>The streams except the sample and histogram process stream will been 
blocked, so the the
      * sample and histogram process stream does not care about 
requiredExchangeMode.
      */
-    public static <T> DataStream<Pair<T, RowData>> rangeShuffleByKey(
-            DataStream<Pair<T, RowData>> inputDataStream,
-            Comparator<T> keyComparator,
-            Class<T> keyClass,
+    public static <T> DataStream<Tuple2<T, RowData>> rangeShuffleByKey(
+            DataStream<Tuple2<T, RowData>> inputDataStream,
+            SerializableSupplier<Comparator<T>> keyComparator,
+            TypeInformation<T> keyTypeInformation,
             int sampleSize,
-            int rangeNum) {
-        Transformation<Pair<T, RowData>> input = 
inputDataStream.getTransformation();
+            int rangeNum,
+            int outParallelism) {
+        Transformation<Tuple2<T, RowData>> input = 
inputDataStream.getTransformation();
 
-        OneInputTransformation<Pair<T, RowData>, T> keyInput =
+        OneInputTransformation<Tuple2<T, RowData>, T> keyInput =
                 new OneInputTransformation<>(
                         input,
                         "ABSTRACT KEY",
-                        new StreamMap<>(Pair::getLeft),
-                        TypeInformation.of(keyClass),
+                        new StreamMap<>(a -> a.f0),
+                        keyTypeInformation,
                         input.getParallelism());
 
         // 1. Fixed size sample in each partitions.
@@ -103,8 +104,7 @@ public class RangeShuffle {
                         keyInput,
                         "LOCAL SAMPLE",
                         new LocalSampleOperator<>(sampleSize),
-                        new TupleTypeInfo<>(
-                                BasicTypeInfo.DOUBLE_TYPE_INFO, 
TypeInformation.of(keyClass)),
+                        new TupleTypeInfo<>(BasicTypeInfo.DOUBLE_TYPE_INFO, 
keyTypeInformation),
                         keyInput.getParallelism());
 
         // 2. Collect all the samples and gather them into a sorted key range.
@@ -113,14 +113,14 @@ public class RangeShuffle {
                         localSample,
                         "GLOBAL SAMPLE",
                         new GlobalSampleOperator<>(sampleSize, keyComparator, 
rangeNum),
-                        new ListTypeInfo<>(TypeInformation.of(keyClass)),
+                        new ListTypeInfo<>(keyTypeInformation),
                         1);
 
         // 3. Take range boundaries as broadcast input and take the tuple of 
partition id and
         // record as output.
         // The shuffle mode of input edge must be BATCH to avoid dead lock. See
         // DeadlockBreakupProcessor.
-        TwoInputTransformation<List<T>, Pair<T, RowData>, Tuple2<Integer, 
Pair<T, RowData>>>
+        TwoInputTransformation<List<T>, Tuple2<T, RowData>, Tuple2<Integer, 
Tuple2<T, RowData>>>
                 preparePartition =
                         new TwoInputTransformation<>(
                                 new PartitionTransformation<>(
@@ -150,7 +150,7 @@ public class RangeShuffle {
                         "REMOVE KEY",
                         new RemoveRangeIndexOperator<>(),
                         input.getOutputType(),
-                        input.getParallelism()));
+                        outParallelism));
     }
 
     /**
@@ -207,20 +207,25 @@ public class RangeShuffle {
 
         private final int numSample;
         private final int rangesNum;
-        private final Comparator<T> keyComparator;
+        private final SerializableSupplier<Comparator<T>> comparatorSupplier;
 
+        private transient Comparator<T> keyComparator;
         private transient Collector<List<T>> collector;
         private transient Sampler<T> sampler;
 
-        public GlobalSampleOperator(int numSample, Comparator<T> comparator, 
int rangesNum) {
+        public GlobalSampleOperator(
+                int numSample,
+                SerializableSupplier<Comparator<T>> comparatorSupplier,
+                int rangesNum) {
             this.numSample = numSample;
-            this.keyComparator = comparator;
+            this.comparatorSupplier = comparatorSupplier;
             this.rangesNum = rangesNum;
         }
 
         @Override
         public void open() throws Exception {
             super.open();
+            this.keyComparator = comparatorSupplier.get();
             this.sampler = new Sampler<>(numSample, 0L);
             this.collector = new StreamRecordCollector<>(output);
         }
@@ -262,25 +267,27 @@ public class RangeShuffle {
      * Tuple2 which includes range index and record from the other input 
itself as output.
      */
     private static class AssignRangeIndexOperator<T>
-            extends TableStreamOperator<Tuple2<Integer, Pair<T, RowData>>>
+            extends TableStreamOperator<Tuple2<Integer, Tuple2<T, RowData>>>
             implements TwoInputStreamOperator<
-                            List<T>, Pair<T, RowData>, Tuple2<Integer, Pair<T, 
RowData>>>,
+                            List<T>, Tuple2<T, RowData>, Tuple2<Integer, 
Tuple2<T, RowData>>>,
                     InputSelectable {
 
         private static final long serialVersionUID = 1L;
 
-        private transient List<T> boundaries;
-        private transient Collector<Tuple2<Integer, Pair<T, RowData>>> 
collector;
+        private final SerializableSupplier<Comparator<T>> 
keyComparatorSupplier;
 
-        private final Comparator<T> keyComparator;
+        private transient List<T> boundaries;
+        private transient Collector<Tuple2<Integer, Tuple2<T, RowData>>> 
collector;
+        private transient Comparator<T> keyComparator;
 
-        public AssignRangeIndexOperator(Comparator<T> comparator) {
-            this.keyComparator = comparator;
+        public AssignRangeIndexOperator(SerializableSupplier<Comparator<T>> 
keyComparatorSupplier) {
+            this.keyComparatorSupplier = keyComparatorSupplier;
         }
 
         @Override
         public void open() throws Exception {
             super.open();
+            this.keyComparator = keyComparatorSupplier.get();
             this.collector = new StreamRecordCollector<>(output);
         }
 
@@ -290,12 +297,12 @@ public class RangeShuffle {
         }
 
         @Override
-        public void processElement2(StreamRecord<Pair<T, RowData>> 
streamRecord) {
+        public void processElement2(StreamRecord<Tuple2<T, RowData>> 
streamRecord) {
             if (boundaries == null) {
                 throw new RuntimeException("There should be one data from the 
first input.");
             }
-            Pair<T, RowData> row = streamRecord.getValue();
-            collector.collect(new Tuple2<>(binarySearch(row.getLeft()), row));
+            Tuple2<T, RowData> row = streamRecord.getValue();
+            collector.collect(new Tuple2<>(binarySearch(row.f0), row));
         }
 
         @Override
@@ -326,12 +333,12 @@ public class RangeShuffle {
 
         /** A {@link KeySelector} to select by f0 of tuple2. */
         public static class Tuple2KeySelector<T>
-                implements KeySelector<Tuple2<Integer, Pair<T, RowData>>, 
Integer> {
+                implements KeySelector<Tuple2<Integer, Tuple2<T, RowData>>, 
Integer> {
 
             private static final long serialVersionUID = 1L;
 
             @Override
-            public Integer getKey(Tuple2<Integer, Pair<T, RowData>> tuple2) 
throws Exception {
+            public Integer getKey(Tuple2<Integer, Tuple2<T, RowData>> tuple2) 
throws Exception {
                 return tuple2.f0;
             }
         }
@@ -359,12 +366,13 @@ public class RangeShuffle {
     }
 
     /** Remove the range index and return the actual record. */
-    private static class RemoveRangeIndexOperator<T> extends 
TableStreamOperator<Pair<T, RowData>>
-            implements OneInputStreamOperator<Tuple2<Integer, Pair<T, 
RowData>>, Pair<T, RowData>> {
+    private static class RemoveRangeIndexOperator<T> extends 
TableStreamOperator<Tuple2<T, RowData>>
+            implements OneInputStreamOperator<
+                    Tuple2<Integer, Tuple2<T, RowData>>, Tuple2<T, RowData>> {
 
         private static final long serialVersionUID = 1L;
 
-        private transient Collector<Pair<T, RowData>> collector;
+        private transient Collector<Tuple2<T, RowData>> collector;
 
         @Override
         public void open() throws Exception {
@@ -373,7 +381,7 @@ public class RangeShuffle {
         }
 
         @Override
-        public void processElement(StreamRecord<Tuple2<Integer, Pair<T, 
RowData>>> streamRecord)
+        public void processElement(StreamRecord<Tuple2<Integer, Tuple2<T, 
RowData>>> streamRecord)
                 throws Exception {
             collector.collect(streamRecord.getValue().f1);
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java
new file mode 100644
index 000000000..e357aebd3
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sorter;
+
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.flink.utils.InternalRowTypeSerializer;
+import org.apache.paimon.flink.utils.InternalTypeInfo;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.KeyComparatorSupplier;
+import org.apache.paimon.utils.Projection;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+
+import java.util.List;
+
+import static 
org.apache.paimon.table.ChangelogWithKeyTableUtils.addKeyNamePrefix;
+
+/** Alphabetical order sorter to sort records by the given `orderColNames`. */
+public class OrderSorter extends TableSorter {
+
+    public OrderSorter(
+            StreamExecutionEnvironment batchTEnv,
+            DataStream<RowData> origin,
+            FileStoreTable table,
+            List<String> orderColNames) {
+        super(batchTEnv, origin, table, orderColNames);
+    }
+
+    @Override
+    public DataStream<RowData> sort() {
+        final RowType valueRowType = table.rowType();
+        final int[] keyProjectionMap = 
table.schema().projection(orderColNames);
+        final RowType keyRowType =
+                
addKeyNamePrefix(Projection.of(keyProjectionMap).project(valueRowType));
+
+        return SortUtils.sortStreamByKey(
+                origin,
+                table,
+                keyRowType,
+                new InternalTypeInfo<>(
+                        new InternalRowTypeSerializer(
+                                keyRowType.getFieldTypes().toArray(new 
DataType[0]))),
+                new KeyComparatorSupplier(keyRowType),
+                new SortUtils.KeyAbstract<InternalRow>() {
+
+                    private transient org.apache.paimon.codegen.Projection 
keyProjection;
+
+                    @Override
+                    public void open() {
+                        // use key gen to speed up projection
+                        keyProjection = 
CodeGenUtils.newProjection(valueRowType, keyProjectionMap);
+                    }
+
+                    @Override
+                    public InternalRow apply(RowData value) {
+                        // deep copy by wrapper the Flink RowData
+                        return keyProjection.apply(new 
FlinkRowWrapper(value)).copy();
+                    }
+                },
+                row -> row);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
index d51cf1833..368639756 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
@@ -31,7 +31,6 @@ import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.sort.BinaryExternalSortBuffer;
 import org.apache.paimon.sort.BinaryInMemorySortBuffer;
-import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.MutableObjectIterator;
 
@@ -43,43 +42,29 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.runtime.operators.TableStreamOperator;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import static org.apache.paimon.disk.IOManagerImpl.splitPaths;
 
 /** SortOperator to sort the `InternalRow`s by the `KeyType`. */
 public class SortOperator extends TableStreamOperator<InternalRow>
         implements OneInputStreamOperator<InternalRow, InternalRow>, 
BoundedOneInput {
 
-    private final RowType keyRowType;
-    private final RowType valueRowType;
+    private final RowType rowType;
     private final long maxMemory;
     private final int pageSize;
     private final int arity;
     private transient BinaryExternalSortBuffer buffer;
 
-    public SortOperator(RowType keyType, RowType valueRowType, long maxMemory, 
int pageSize) {
-        this.keyRowType = keyType;
-        this.valueRowType = valueRowType;
+    public SortOperator(RowType rowType, long maxMemory, int pageSize) {
+        this.rowType = rowType;
         this.maxMemory = maxMemory;
         this.pageSize = pageSize;
-        this.arity = keyType.getFieldCount() + valueRowType.getFieldCount();
+        this.arity = rowType.getFieldCount();
     }
 
     @Override
     public void open() throws Exception {
         super.open();
 
-        List<DataField> keyFields = keyRowType.getFields();
-        List<DataField> dataFields = valueRowType.getFields();
-
-        List<DataField> fields = new ArrayList<>();
-        fields.addAll(keyFields);
-        fields.addAll(dataFields);
-
-        RowType rowType = new RowType(fields);
-
         InternalRowSerializer serializer = InternalSerializers.create(rowType);
         NormalizedKeyComputer normalizedKeyComputer =
                 CodeGenUtils.newNormalizedKeyComputer(
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
new file mode 100644
index 000000000..754086221
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sorter;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.flink.shuffle.RangeShuffle;
+import org.apache.paimon.flink.utils.InternalRowTypeSerializer;
+import org.apache.paimon.flink.utils.InternalTypeInfo;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.KeyProjectedRow;
+import org.apache.paimon.utils.SerializableSupplier;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.data.RowData;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Utility that globally sorts records by a key extracted from each row. The input stream is
+ * range-shuffled by that key, each partition is then sorted locally, and the sorted rows are
+ * finally converted from Paimon {@code InternalRow} back to Flink {@code RowData}.
+ *
+ * <pre>
+ *                         toPaimonDataStream                         add key 
column                                  range shuffle by key                    
                local sort                                              remove 
key
+ * DataStream[RowData] --------------------> DataStream[PaimonRowData] 
-------------------> DataStream[PaimonRowData] -------------------------> 
DataStream[PaimonRowData] -----------------------> DataStream[PaimonRowData 
sorted] ---------------------> DataStream[RowData sorted]
+ *                                                                             
                                                                                
                                                                      back to 
flink RowData
+ * </pre>
+ */
+public class SortUtils {
+
+    /**
+     * Sort the input stream by the key specified.
+     *
+     * @param inputStream the input data stream
+     * @param table the sorted file store table
+     * @param sortKeyType row type of the sort key; the local sort runs in Paimon's
+     *     `BinaryExternalSortBuffer`, which needs to know the key type.
+     * @param keyTypeInformation Flink type information of the shuffle key, required by the range
+     *     shuffle that precedes the local sort.
+     * @param shuffleKeyComparator comparator to compare the key when shuffle
+     * @param shuffleKeyAbstract extracts the shuffle key from each input `RowData`
+     * @param convertor convert the `KEY` to the sort key, then we can sort in
+     *     `BinaryExternalSortBuffer`.
+     * @return the global sorted data stream
+     * @param <KEY> the KEY type in range shuffle
+     */
+    public static <KEY> DataStream<RowData> sortStreamByKey(
+            final DataStream<RowData> inputStream,
+            final FileStoreTable table,
+            final RowType sortKeyType,
+            final TypeInformation<KEY> keyTypeInformation,
+            final SerializableSupplier<Comparator<KEY>> shuffleKeyComparator,
+            final KeyAbstract<KEY> shuffleKeyAbstract,
+            final ShuffleKeyConvertor<KEY> convertor) {
+
+        final RowType valueRowType = table.rowType();
+        final int parallelism = inputStream.getParallelism();
+        final long maxSortMemory = table.coreOptions().writeBufferSize();
+        final int pageSize = table.coreOptions().pageSize();
+
+        String sinkParallelismValue =
+                
table.options().get(FlinkConnectorOptions.SINK_PARALLELISM.key());
+        final int sinkParallelism =
+                sinkParallelismValue == null
+                        ? inputStream.getParallelism()
+                        : Integer.parseInt(sinkParallelismValue);
+        final int sampleSize = sinkParallelism * 1000;
+        final int rangeNum = sinkParallelism * 10;
+
+        int keyFieldCount = sortKeyType.getFieldCount();
+        int valueFieldCount = valueRowType.getFieldCount();
+        final int[] valueProjectionMap = new int[valueFieldCount];
+        for (int i = 0; i < valueFieldCount; i++) {
+            valueProjectionMap[i] = i + keyFieldCount;
+        }
+
+        List<DataField> keyFields = sortKeyType.getFields();
+        List<DataField> dataFields = valueRowType.getFields();
+
+        List<DataField> fields = new ArrayList<>();
+        fields.addAll(keyFields);
+        fields.addAll(dataFields);
+        final RowType longRowType = new RowType(fields);
+        final InternalTypeInfo<InternalRow> internalRowType =
+                new InternalTypeInfo<>(
+                        new InternalRowTypeSerializer(
+                                longRowType.getFieldTypes().toArray(new 
DataType[0])));
+
+        // generate the KEY as the key of Pair.
+        DataStream<Tuple2<KEY, RowData>> inputWithKey =
+                inputStream
+                        .map(
+                                new RichMapFunction<RowData, Tuple2<KEY, 
RowData>>() {
+
+                                    @Override
+                                    public void open(Configuration parameters) 
throws Exception {
+                                        super.open(parameters);
+                                        shuffleKeyAbstract.open();
+                                    }
+
+                                    @Override
+                                    public Tuple2<KEY, RowData> map(RowData 
value) {
+                                        return 
Tuple2.of(shuffleKeyAbstract.apply(value), value);
+                                    }
+                                },
+                                new TupleTypeInfo<>(keyTypeInformation, 
inputStream.getType()))
+                        .setParallelism(parallelism);
+
+        // range shuffle by key
+        return RangeShuffle.rangeShuffleByKey(
+                        inputWithKey,
+                        shuffleKeyComparator,
+                        keyTypeInformation,
+                        sampleSize,
+                        rangeNum,
+                        sinkParallelism)
+                .map(
+                        a -> new JoinedRow(convertor.apply(a.f0), new 
FlinkRowWrapper(a.f1)),
+                        internalRowType)
+                .setParallelism(sinkParallelism)
+                // sort the output locally by `SortOperator`
+                .transform(
+                        "LOCAL SORT",
+                        internalRowType,
+                        new SortOperator(longRowType, maxSortMemory, pageSize))
+                .setParallelism(sinkParallelism)
+                // remove the key column from every row
+                .map(
+                        new RichMapFunction<InternalRow, InternalRow>() {
+
+                            private transient KeyProjectedRow keyProjectedRow;
+
+                            @Override
+                            public void open(Configuration parameters) {
+                                keyProjectedRow = new 
KeyProjectedRow(valueProjectionMap);
+                            }
+
+                            @Override
+                            public InternalRow map(InternalRow value) {
+                                return keyProjectedRow.replaceRow(value);
+                            }
+                        },
+                        new InternalTypeInfo<>(
+                                new InternalRowTypeSerializer(
+                                        
valueRowType.getFieldTypes().toArray(new DataType[0]))))
+                .setParallelism(sinkParallelism)
+                .map(FlinkRowData::new, inputStream.getType())
+                .setParallelism(sinkParallelism);
+    }
+
+    /** Extracts the shuffle key from an input {@code RowData}. */
+    interface KeyAbstract<KEY> extends Serializable {
+        default void open() {}
+
+        KEY apply(RowData value);
+    }
+
+    interface ShuffleKeyConvertor<KEY> extends Function<KEY, InternalRow>, 
Serializable {}
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
index 1cf3572bc..6b27f17c3 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
@@ -74,8 +74,7 @@ public abstract class TableSorter {
             List<String> orderColumns) {
         switch (OrderType.of(sortStrategy)) {
             case ORDER:
-                // todo support alphabetical order
-                throw new IllegalArgumentException("Not supported yet.");
+                return new OrderSorter(batchTEnv, origin, fileStoreTable, 
orderColumns);
             case ZORDER:
                 return new ZorderSorter(batchTEnv, origin, fileStoreTable, 
orderColumns);
             case HILBERT:
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java
index 465ab89d0..66051dc03 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java
@@ -18,14 +18,24 @@
 
 package org.apache.paimon.flink.sorter;
 
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.FlinkRowWrapper;
 import org.apache.paimon.flink.action.SortCompactAction;
 import org.apache.paimon.sort.zorder.ZIndexer;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
 
+import 
org.apache.paimon.shade.guava30.com.google.common.primitives.UnsignedBytes;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.data.RowData;
 
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -36,6 +46,9 @@ import java.util.List;
  */
 public class ZorderSorter extends TableSorter {
 
+    private static final RowType KEY_TYPE =
+            new RowType(Collections.singletonList(new DataField(0, "Z_INDEX", 
DataTypes.BYTES())));
+
     public ZorderSorter(
             StreamExecutionEnvironment batchTEnv,
             DataStream<RowData> origin,
@@ -57,7 +70,37 @@ public class ZorderSorter extends TableSorter {
      */
     private DataStream<RowData> sortStreamByZOrder(
             DataStream<RowData> inputStream, FileStoreTable table) {
-        ZIndexer zIndexer = new ZIndexer(table.rowType(), orderColNames);
-        return ZorderSorterUtils.sortStreamByZorder(inputStream, zIndexer, 
table);
+        final ZIndexer zIndexer = new ZIndexer(table.rowType(), orderColNames);
+        return SortUtils.sortStreamByKey(
+                inputStream,
+                table,
+                KEY_TYPE,
+                TypeInformation.of(byte[].class),
+                () ->
+                        (b1, b2) -> {
+                            assert b1.length == b2.length;
+                            for (int i = 0; i < b1.length; i++) {
+                                int ret = UnsignedBytes.compare(b1[i], b2[i]);
+                                if (ret != 0) {
+                                    return ret;
+                                }
+                            }
+                            return 0;
+                        },
+                new SortUtils.KeyAbstract<byte[]>() {
+                    @Override
+                    public void open() {
+                        zIndexer.open();
+                    }
+
+                    @Override
+                    public byte[] apply(RowData value) {
+                        byte[] zorder = zIndexer.index(new 
FlinkRowWrapper(value));
+                        // we can't return the reused zorder bytes directly, because the sample
+                        // operator keeps references to the records it samples; copy them instead.
+                        return Arrays.copyOf(zorder, zorder.length);
+                    }
+                },
+                GenericRow::of);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorterUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorterUtils.java
deleted file mode 100644
index dd94255a8..000000000
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorterUtils.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.sorter;
-
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.JoinedRow;
-import org.apache.paimon.flink.FlinkRowData;
-import org.apache.paimon.flink.FlinkRowWrapper;
-import org.apache.paimon.flink.shuffle.RangeShuffle;
-import org.apache.paimon.sort.zorder.ZIndexer;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.KeyProjectedRow;
-import org.apache.paimon.utils.Pair;
-
-import 
org.apache.paimon.shade.guava30.com.google.common.primitives.UnsignedBytes;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.table.data.RowData;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-
-/**
- * This is a table sorter which will sort the records by the z-order of the 
z-indexer generates. It
- * is a global sort method, we will shuffle the input stream through z-order. 
After sorted, we
- * convert datastream from paimon RowData back to Flink RowData
- *
- * <pre>
- *                         toPaimonDataStream                        add 
z-order column                             range shuffle by z-order             
                    local sort                                              
remove z-index
- * DataStream[RowData] -------------------> DataStream[PaimonRowData] 
-------------------> DataStream[PaimonRowData] -------------------------> 
DataStream[PaimonRowData] -----------------------> DataStream[PaimonRowData 
sorted] ----------------------------> DataStream[RowData sorted]
- *                                                                             
                                                                                
                                                                   back to 
flink RowData
- * </pre>
- */
-public class ZorderSorterUtils {
-
-    private static final RowType KEY_TYPE =
-            new RowType(Collections.singletonList(new DataField(0, "Z_INDEX", 
DataTypes.BYTES())));
-
-    /**
-     * Sort the input stream by z-order.
-     *
-     * @param inputStream the stream wait to be ordered
-     * @param zIndexer generate z-index by the given row
-     * @param table the FileStoreTable
-     */
-    public static DataStream<RowData> sortStreamByZorder(
-            DataStream<RowData> inputStream, ZIndexer zIndexer, FileStoreTable 
table) {
-
-        final RowType valueRowType = table.rowType();
-        final int fieldCount = valueRowType.getFieldCount();
-        final int parallelism = inputStream.getParallelism();
-        final int sampleSize = parallelism * 1000;
-        final int rangeNum = parallelism * 10;
-        final long maxSortMemory = table.coreOptions().writeBufferSize();
-        final int pageSize = table.coreOptions().pageSize();
-
-        // generate the z-index as the key of Pair.
-        DataStream<Pair<byte[], RowData>> inputWithKey =
-                inputStream
-                        .map(
-                                new RichMapFunction<RowData, Pair<byte[], 
RowData>>() {
-
-                                    @Override
-                                    public void open(Configuration parameters) 
throws Exception {
-                                        super.open(parameters);
-                                        zIndexer.open();
-                                    }
-
-                                    @Override
-                                    public Pair<byte[], RowData> map(RowData 
value) {
-                                        byte[] zorder = zIndexer.index(new 
FlinkRowWrapper(value));
-                                        return Pair.of(Arrays.copyOf(zorder, 
zorder.length), value);
-                                    }
-                                })
-                        .setParallelism(parallelism);
-
-        // range shuffle by z-index key
-        return RangeShuffle.rangeShuffleByKey(
-                        inputWithKey,
-                        (Comparator<byte[]> & Serializable)
-                                (b1, b2) -> {
-                                    assert b1.length == b2.length;
-                                    for (int i = 0; i < b1.length; i++) {
-                                        int ret = UnsignedBytes.compare(b1[i], 
b2[i]);
-                                        if (ret != 0) {
-                                            return ret;
-                                        }
-                                    }
-                                    return 0;
-                                },
-                        byte[].class,
-                        sampleSize,
-                        rangeNum)
-                .map(
-                        a ->
-                                new JoinedRow(
-                                        GenericRow.of(a.getLeft()),
-                                        new FlinkRowWrapper(a.getRight())),
-                        TypeInformation.of(InternalRow.class))
-                .setParallelism(parallelism)
-                // sort the output locally by `SortOperator`
-                .transform(
-                        "LOCAL SORT",
-                        TypeInformation.of(InternalRow.class),
-                        new SortOperator(KEY_TYPE, valueRowType, 
maxSortMemory, pageSize))
-                .setParallelism(parallelism)
-                // remove the z-index column from every row
-                .map(
-                        new RichMapFunction<InternalRow, InternalRow>() {
-
-                            private transient KeyProjectedRow keyProjectedRow;
-
-                            @Override
-                            public void open(Configuration parameters) {
-                                int[] map = new int[fieldCount];
-                                for (int i = 0; i < map.length; i++) {
-                                    map[i] = i + 1;
-                                }
-                                keyProjectedRow = new KeyProjectedRow(map);
-                            }
-
-                            @Override
-                            public InternalRow map(InternalRow value) {
-                                return keyProjectedRow.replaceRow(value);
-                            }
-                        })
-                .setParallelism(parallelism)
-                .map(FlinkRowData::new, inputStream.getType())
-                .setParallelism(parallelism);
-    }
-}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalRowTypeSerializer.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalRowTypeSerializer.java
new file mode 100644
index 000000000..1cf197386
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/InternalRowTypeSerializer.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.types.DataType;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/** A TypeSerializer for {@link InternalRow}. */
+public class InternalRowTypeSerializer extends 
InternalTypeSerializer<InternalRow> {
+
+    private final InternalRowSerializer internalRowSerializer;
+
+    public InternalRowTypeSerializer(DataType... types) {
+        internalRowSerializer = new InternalRowSerializer(types);
+    }
+
+    @Override
+    public TypeSerializer<InternalRow> duplicate() {
+        return new 
InternalRowTypeSerializer(internalRowSerializer.fieldTypes());
+    }
+
+    @Override
+    public InternalRow createInstance() {
+        return new BinaryRow(internalRowSerializer.getArity());
+    }
+
+    @Override
+    public InternalRow copy(InternalRow from) {
+        return internalRowSerializer.copy(from);
+    }
+
+    @Override
+    public InternalRow copy(InternalRow from, InternalRow reuse) {
+        return internalRowSerializer.copyRowData(from, reuse);
+    }
+
+    @Override
+    public void serialize(InternalRow record, DataOutputView target) throws 
IOException {
+        BinaryRow row = internalRowSerializer.toBinaryRow(record);
+        target.writeInt(row.getSizeInBytes());
+        target.write(row.toBytes());
+    }
+
+    @Override
+    public InternalRow deserialize(DataInputView source) throws IOException {
+        return deserialize(createInstance(), source);
+    }
+
+    @Override
+    public InternalRow deserialize(InternalRow reuse, DataInputView source) 
throws IOException {
+        BinaryRow reuseRow = (BinaryRow) reuse;
+        int len = source.readInt();
+        byte[] bytes = new byte[len];
+        source.readFully(bytes);
+        reuseRow.pointTo(MemorySegment.wrap(bytes), 0, len);
+        return reuse;
+    }
+
+    @Override
+    public void copy(DataInputView source, DataOutputView target) throws 
IOException {
+        int length = source.readInt();
+        target.writeInt(length);
+        target.write(source, length);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        InternalRowTypeSerializer that = (InternalRowTypeSerializer) o;
+        return Objects.equals(internalRowSerializer, 
that.internalRowSerializer);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(internalRowSerializer);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BinaryRowTypeSerializerTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BinaryRowTypeSerializerTest.java
deleted file mode 100644
index 3439c6d6e..000000000
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BinaryRowTypeSerializerTest.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink;
-
-import org.apache.paimon.data.BinaryRow;
-
-import org.apache.flink.api.common.typeutils.SerializerTestBase;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-import java.util.Random;
-
-import static org.apache.paimon.io.DataFileTestUtils.row;
-
-/** Test for {@link BinaryRowTypeSerializer}. */
-public class BinaryRowTypeSerializerTest extends SerializerTestBase<BinaryRow> 
{
-
-    @Override
-    protected TypeSerializer<BinaryRow> createSerializer() {
-        return new BinaryRowTypeSerializer(2);
-    }
-
-    @Override
-    protected int getLength() {
-        return -1;
-    }
-
-    @Override
-    protected Class<BinaryRow> getTypeClass() {
-        return BinaryRow.class;
-    }
-
-    @Override
-    protected BinaryRow[] getTestData() {
-        Random rnd = new Random();
-        return new BinaryRow[] {
-            row(1, 1),
-            row(2, 2),
-            row(rnd.nextInt(), rnd.nextInt()),
-            row(rnd.nextInt(), rnd.nextInt())
-        };
-    }
-}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/OrderRewriteActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/OrderRewriteActionITCase.java
index 0005f5e45..a4e2424f0 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/OrderRewriteActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/OrderRewriteActionITCase.java
@@ -39,6 +39,7 @@ import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.types.DataTypes;
 
 import org.assertj.core.api.Assertions;
@@ -50,6 +51,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /** Order Rewrite Action tests for {@link SortCompactAction}. */
 public class OrderRewriteActionITCase extends ActionITCaseBase {
@@ -68,6 +70,74 @@ public class OrderRewriteActionITCase extends 
ActionITCaseBase {
         commit(commitMessages);
     }
 
+    @Test
+    public void testOrderBy() throws Exception {
+        prepareData(300, 1);
+        Assertions.assertThatCode(
+                        () ->
+                                order(
+                                        Arrays.asList(
+                                                "f0", "f1", "f2", "f3", "f4", 
"f5", "f6", "f7",
+                                                "f8", "f9", "f10", "f11", 
"f12", "f13", "f14",
+                                                "f15")))
+                .doesNotThrowAnyException();
+    }
+
+    @Test
+    public void testOrderResult() throws Exception {
+        prepareData(300, 2);
+        Assertions.assertThatCode(() -> order(Arrays.asList("f1", "f2")))
+                .doesNotThrowAnyException();
+
+        List<ManifestEntry> files =
+                ((AppendOnlyFileStoreTable) 
getTable()).store().newScan().plan().files();
+
+        ManifestEntry entry = files.get(0);
+        DataSplit dataSplit =
+                DataSplit.builder()
+                        .withPartition(entry.partition())
+                        .withBucket(entry.bucket())
+                        .withDataFiles(Collections.singletonList(entry.file()))
+                        .build();
+
+        final AtomicInteger i = new AtomicInteger(Integer.MIN_VALUE);
+        getTable()
+                .newReadBuilder()
+                .newRead()
+                .createReader(dataSplit)
+                .forEachRemaining(
+                        a -> {
+                            Integer current = a.getInt(1);
+                            
Assertions.assertThat(current).isGreaterThanOrEqualTo(i.get());
+                            i.set(current);
+                        });
+
+        Assertions.assertThatCode(() -> order(Arrays.asList("f2", "f1")))
+                .doesNotThrowAnyException();
+
+        files = ((AppendOnlyFileStoreTable) 
getTable()).store().newScan().plan().files();
+
+        entry = files.get(0);
+        dataSplit =
+                DataSplit.builder()
+                        .withPartition(entry.partition())
+                        .withBucket(entry.bucket())
+                        .withDataFiles(Collections.singletonList(entry.file()))
+                        .build();
+
+        i.set(Integer.MIN_VALUE);
+        getTable()
+                .newReadBuilder()
+                .newRead()
+                .createReader(dataSplit)
+                .forEachRemaining(
+                        a -> {
+                            Integer current = a.getInt(2);
+                            
Assertions.assertThat(current).isGreaterThanOrEqualTo(i.get());
+                            i.set(current);
+                        });
+    }
+
     @Test
     public void testAllBasicTypeWorksWithZorder() throws Exception {
         prepareData(300, 1);
@@ -84,7 +154,7 @@ public class OrderRewriteActionITCase extends 
ActionITCaseBase {
 
     @Test
     public void testZorderActionWorks() throws Exception {
-        prepareData(300, 30);
+        prepareData(300, 2);
         PredicateBuilder predicateBuilder = new 
PredicateBuilder(getTable().rowType());
         Predicate predicate = predicateBuilder.between(1, 100, 200);
 
@@ -113,6 +183,39 @@ public class OrderRewriteActionITCase extends 
ActionITCaseBase {
         Assertions.assertThat(files.size()).isGreaterThan(filesFilter.size());
     }
 
+    @Test
+    public void testCompareZorderAndOrder() throws Exception {
+        prepareData(300, 10);
+        zorder(Arrays.asList("f2", "f1"));
+
+        PredicateBuilder predicateBuilder = new 
PredicateBuilder(getTable().rowType());
+        Predicate predicate = predicateBuilder.between(1, 10, 20);
+
+        List<ManifestEntry> filesZorder =
+                ((AppendOnlyFileStoreTable) 
getTable()).store().newScan().plan().files();
+        List<ManifestEntry> filesFilterZorder =
+                ((AppendOnlyFileStoreTable) getTable())
+                        .store()
+                        .newScan()
+                        .withFilter(predicate)
+                        .plan()
+                        .files();
+
+        order(Arrays.asList("f2", "f1"));
+        List<ManifestEntry> filesOrder =
+                ((AppendOnlyFileStoreTable) 
getTable()).store().newScan().plan().files();
+        List<ManifestEntry> filesFilterOrder =
+                ((AppendOnlyFileStoreTable) getTable())
+                        .store()
+                        .newScan()
+                        .withFilter(predicate)
+                        .plan()
+                        .files();
+
+        Assertions.assertThat(filesFilterZorder.size() / (double) 
filesZorder.size())
+                .isLessThan(filesFilterOrder.size() / (double) 
filesOrder.size());
+    }
+
     private void zorder(List<String> columns) throws Exception {
         SortCompactAction sortCompactAction =
                 new SortCompactAction(
@@ -125,6 +228,18 @@ public class OrderRewriteActionITCase extends 
ActionITCaseBase {
         sortCompactAction.run();
     }
 
+    private void order(List<String> columns) throws Exception {
+        SortCompactAction sortCompactAction =
+                new SortCompactAction(
+                        new Path(path.toUri()).toUri().toString(),
+                        "my_db",
+                        "Orders1",
+                        Collections.emptyMap());
+        sortCompactAction.withOrderStrategy("order");
+        sortCompactAction.withOrderColumns(columns);
+        sortCompactAction.run();
+    }
+
     public Catalog getCatalog() {
         if (catalog == null) {
             Options options = new Options();
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/InternalRowSerializerTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/InternalRowSerializerTest.java
new file mode 100644
index 000000000..c9a8c3e99
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/InternalRowSerializerTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Random;
+
+/** Tests for {@link InternalRowTypeSerializer}. */
+public class InternalRowSerializerTest {
+
+    private static final Random RANDOM = new Random();
+    private static final RowType rowType =
+            RowType.builder()
+                    .field("a", DataTypes.STRING())
+                    .field("b", DataTypes.INT())
+                    .field("c", DataTypes.BIGINT())
+                    .build();
+
+    @Test
+    public void testSerializeAndDeserilize() throws Exception {
+        DataOutputSerializer dataOutputSerializer = new 
DataOutputSerializer(100);
+        InternalRowTypeSerializer internalRowTypeSerializer =
+                new 
InternalRowTypeSerializer(rowType.getFieldTypes().toArray(new DataType[0]));
+
+        InternalRow row = GenericRow.of(randomString(), RANDOM.nextInt(), 
RANDOM.nextLong());
+        internalRowTypeSerializer.serialize(row, dataOutputSerializer);
+        InternalRow row1 =
+                internalRowTypeSerializer.deserialize(
+                        new 
DataInputDeserializer(dataOutputSerializer.wrapAsByteBuffer()));
+
+        Assertions.assertThat(row.getString(0)).isEqualTo(row1.getString(0));
+        Assertions.assertThat(row.getInt(1)).isEqualTo(row1.getInt(1));
+        Assertions.assertThat(row.getLong(2)).isEqualTo(row1.getLong(2));
+    }
+
+    @Test
+    public void testEqual() {
+        InternalRowTypeSerializer internalRowTypeSerializer =
+                new 
InternalRowTypeSerializer(rowType.getFieldTypes().toArray(new DataType[0]));
+
+        Assertions.assertThat(internalRowTypeSerializer)
+                .isEqualTo(internalRowTypeSerializer.duplicate());
+    }
+
+    @Test
+    public void testCopyFromView() {
+        InternalRowTypeSerializer internalRowTypeSerializer =
+                new 
InternalRowTypeSerializer(rowType.getFieldTypes().toArray(new DataType[0]));
+
+        InternalRow row = GenericRow.of(randomString(), RANDOM.nextInt(), 
RANDOM.nextLong());
+        InternalRow row1 = internalRowTypeSerializer.copy(row);
+
+        Assertions.assertThat(row.getString(0)).isEqualTo(row1.getString(0));
+        Assertions.assertThat(row.getInt(1)).isEqualTo(row1.getInt(1));
+        Assertions.assertThat(row.getLong(2)).isEqualTo(row1.getLong(2));
+    }
+
+    private BinaryString randomString() {
+        int length = RANDOM.nextInt(100);
+        byte[] buffer = new byte[length];
+        for (int i = 0; i < length; i += 1) {
+            buffer[i] = (byte) ('A' + RANDOM.nextInt(26));
+        }
+        return BinaryString.fromBytes(buffer);
+    }
+}

Reply via email to