This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0dbcff69f [core] support z-order range sort action (#1846)
0dbcff69f is described below

commit 0dbcff69f781341be10772a093578191aa4b82b1
Author: YeJunHao <[email protected]>
AuthorDate: Wed Aug 23 21:00:21 2023 +0800

    [core] support z-order range sort action (#1846)
---
 LICENSE                                            |   2 +
 docs/content/concepts/append-only-table.md         |  21 +
 .../layouts/shortcodes/generated/sort-compact.html |  55 +++
 .../org/apache/paimon/sort/zorder/ZIndexer.java    | 398 ++++++++++++++++++
 .../org/apache/paimon/utils/ZOrderByteUtils.java   | 242 +++++++++++
 .../apache/paimon/sort/zorder/ZIndexerTest.java    |  70 ++++
 .../apache/paimon/utils/TestZOrderByteUtil.java    | 426 ++++++++++++++++++++
 .../org/apache/paimon/flink/action/ActionBase.java |  46 +++
 .../apache/paimon/flink/action/CompactAction.java  |   4 +
 .../paimon/flink/action/CompactActionFactory.java  |  38 +-
 .../paimon/flink/action/SortCompactAction.java     | 132 ++++++
 .../paimon/flink/action/TableActionBase.java       |  49 +--
 .../apache/paimon/flink/shuffle/RangeShuffle.java  | 448 +++++++++++++++++++++
 .../apache/paimon/flink/sorter/SortOperator.java   | 124 ++++++
 .../apache/paimon/flink/sorter/TableSorter.java    | 117 ++++++
 .../apache/paimon/flink/sorter/ZorderSorter.java   |  63 +++
 .../paimon/flink/sorter/ZorderSorterUtils.java     | 157 ++++++++
 .../source/assigners/PreAssignSplitAssigner.java   |   2 +-
 .../flink/action/OrderRewriteActionITCase.java     | 229 +++++++++++
 ...rTest.java => KafkaLogStoreRegisterITCase.java} |   2 +-
 20 files changed, 2572 insertions(+), 53 deletions(-)

diff --git a/LICENSE b/LICENSE
index d3f258fa3..ef08963ea 100644
--- a/LICENSE
+++ b/LICENSE
@@ -234,6 +234,8 @@ from http://flink.apache.org/ version 1.17.0
 
 
paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
 
paimon-hive/paimon-hive-common/src/test/java/org/apache/paimon/hive/TestHiveMetastore.java
+paimon-core/src/main/java/org/apache/paimon/utils/ZOrderByteUtils.java
+paimon-core/src/test/java/org/apache/paimon/utils/TestZOrderByteUtil.java
 from http://iceberg.apache.org/ version 1.3.0
 
 paimon-hive/paimon-hive-common/src/test/resources/hive-schema-3.1.0.derby.sql
diff --git a/docs/content/concepts/append-only-table.md 
b/docs/content/concepts/append-only-table.md
index cc469019d..f81a906d8 100644
--- a/docs/content/concepts/append-only-table.md
+++ b/docs/content/concepts/append-only-table.md
@@ -227,6 +227,27 @@ behavior is exactly the same as [Append For Qeueue]({{< 
ref "#compaction" >}}).
 The auto compaction is only supported in Flink engine streaming mode. You can 
also start a compaction job in flink by flink action in paimon
 and disable all the other compaction by set `write-only`.
 
+### Sort Compact
+
+Unordered data within a partition leads to slow queries, while compaction during writing may slow down the inserting. It is a good choice to set
+`write-only` for the inserting job and, once the data of a partition is complete, trigger a `Sort Compact` action on that partition.
+
+You can trigger action by shell script:
+```shell
+<FLINK_HOME>/bin/flink run \
+    /path/to/paimon-flink-action-0.5-SNAPSHOT.jar \
+    compact \
+    --warehouse hdfs:///path/to/warehouse \
+    --database test_db \
+    --table <tableName> \
+    --order-strategy <orderType> \
+    --order-by <col1,col2,...>
+```
+
+{{< generated/sort-compact >}}
+
+Other configurations are the same as in [Compact Table]({{< ref "concepts/file-operations#compact-table" >}}).
+
 ### Streaming Source
 
 Unaware-bucket mode append-only table supported streaming read and write, but 
no longer guarantee order anymore. You cannot regard it 
diff --git a/docs/layouts/shortcodes/generated/sort-compact.html 
b/docs/layouts/shortcodes/generated/sort-compact.html
new file mode 100644
index 000000000..654904c3e
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/sort-compact.html
@@ -0,0 +1,55 @@
+{{/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/}}
+{{ $ref := ref . "maintenance/configurations.md" }}
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<table class="configuration table table-bordered">
+    <thead>
+    <tr>
+        <th class="text-left" style="width: 15%">Configuration</th>
+        <th class="text-left" style="width: 85%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+        <td><h5>--order-strategy</h5></td>
+        <td>The order strategy. Currently only zorder is supported. For example: --order-strategy zorder</td>
+    </tr>
+    <tr>
+        <td><h5>--order-by</h5></td>
+        <td>Specify the order columns. For example: --order-by col0, col1</td>
+    </tr>
+    </tbody>
+</table>
\ No newline at end of file
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java 
b/paimon-core/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java
new file mode 100644
index 000000000..de281aae3
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.sort.zorder;
+
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeVisitor;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimeType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.ZOrderByteUtils;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+import static org.apache.paimon.utils.ZOrderByteUtils.NULL_BYTES;
+import static org.apache.paimon.utils.ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE;
+
+/** Generates the z-index of a row by interleaving the ordered bytes of its order columns. */
+public class ZIndexer implements Serializable {
+
+    private final Set<RowProcessor> functionSet;
+    private final int[] fieldsIndex;
+    private final int totalBytes;
+    private transient ByteBuffer reuse;
+
+    /**
+     * Resolves every order column to its position in the row type and prepares one {@link
+     * RowProcessor} per order column.
+     *
+     * @param rowType row type whose field names are searched for the order columns
+     * @param orderColumns names of the columns contributing to the z-index, in interleaving order
+     * @throws IllegalArgumentException if an order column is not a field of {@code rowType}
+     */
+    public ZIndexer(RowType rowType, List<String> orderColumns) {
+        List<String> fieldNames = rowType.getFieldNames();
+        fieldsIndex = new int[orderColumns.size()];
+        int slot = 0;
+        for (String column : orderColumns) {
+            int position = fieldNames.indexOf(column);
+            if (position < 0) {
+                throw new IllegalArgumentException(
+                        "Can't find column: " + column + " in row type fields: " + fieldNames);
+            }
+            fieldsIndex[slot++] = position;
+        }
+        this.functionSet = constructFunctionMap(rowType.getFields());
+        // Every order column contributes exactly PRIMITIVE_BUFFER_SIZE bytes to the z-index.
+        this.totalBytes = PRIMITIVE_BUFFER_SIZE * this.fieldsIndex.length;
+    }
+
+    /**
+     * Allocates the transient buffers: the interleaving buffer of this indexer and the buffer of
+     * every {@link RowProcessor}. {@link #index(InternalRow)} dereferences these buffers, so this
+     * method has to be called first.
+     */
+    public void open() {
+        this.reuse = ByteBuffer.allocate(totalBytes);
+        for (RowProcessor processor : functionSet) {
+            processor.open();
+        }
+    }
+
+    public int size() {
+        return totalBytes;
+    }
+
+    /**
+     * Computes the z-index of a row by interleaving the bits of the z-values of all order columns.
+     *
+     * <p>The returned array is the backing array of the internal reuse buffer and is overwritten
+     * by the next call; copy it if it has to outlive that call.
+     *
+     * @param row the row to index
+     * @return the interleaved bytes
+     */
+    public byte[] index(InternalRow row) {
+        byte[][] zvalues = new byte[fieldsIndex.length][];
+        int column = 0;
+        for (RowProcessor processor : functionSet) {
+            zvalues[column] = processor.zvalue(row);
+            column++;
+        }
+        return ZOrderByteUtils.interleaveBits(zvalues, totalBytes, reuse);
+    }
+
+    /**
+     * Creates one {@link RowProcessor} per resolved order column.
+     *
+     * @param fields all fields of the row type, addressed by the resolved column positions
+     * @return the processors, iterated in order-column order
+     */
+    public Set<RowProcessor> constructFunctionMap(List<DataField> fields) {
+        // LinkedHashSet keeps the insertion order, i.e. the order of the order columns.
+        Set<RowProcessor> processors = new LinkedHashSet<>();
+        for (int fieldPosition : fieldsIndex) {
+            processors.add(zmapColumnToCalculator(fields.get(fieldPosition), fieldPosition));
+        }
+        return processors;
+    }
+
+    /**
+     * Builds the {@link RowProcessor} of a single column by dispatching on the field's data type.
+     *
+     * @param field the column definition
+     * @param index position of the column inside the row
+     * @return the processor produced by {@link TypeVisitor} for the field's type
+     */
+    public static RowProcessor zmapColumnToCalculator(DataField field, int index) {
+        return field.type().accept(new TypeVisitor(index));
+    }
+
+    /** Type Visitor to generate function map from row column to z-index. */
+    public static class TypeVisitor implements DataTypeVisitor<RowProcessor> {
+
+        private final int fieldIndex;
+
+        public TypeVisitor(int index) {
+            this.fieldIndex = index;
+        }
+
+        @Override
+        public RowProcessor visit(CharType charType) {
+            final InternalRow.FieldGetter fieldGetter =
+                    InternalRow.createFieldGetter(charType, fieldIndex);
+            return new RowProcessor(
+                    (row, reuse) -> {
+                        Object o = fieldGetter.getFieldOrNull(row);
+                        return o == null
+                                ? NULL_BYTES
+                                : ZOrderByteUtils.stringToOrderedBytes(
+                                                o.toString(), 
PRIMITIVE_BUFFER_SIZE, reuse)
+                                        .array();
+                    });
+        }
+
+        @Override
+        public RowProcessor visit(VarCharType varCharType) {
+            final InternalRow.FieldGetter fieldGetter =
+                    InternalRow.createFieldGetter(varCharType, fieldIndex);
+            return new RowProcessor(
+                    (row, reuse) -> {
+                        Object o = fieldGetter.getFieldOrNull(row);
+                        return o == null
+                                ? NULL_BYTES
+                                : ZOrderByteUtils.stringToOrderedBytes(
+                                                o.toString(), 
PRIMITIVE_BUFFER_SIZE, reuse)
+                                        .array();
+                    });
+        }
+
+        /**
+         * Builds the processor of a BOOLEAN column: a null field yields {@code NULL_BYTES},
+         * otherwise the first byte of the reuse buffer is set to -127 for {@code true} and to 0 for
+         * {@code false}, and the backing array of the buffer is returned.
+         */
+        @Override
+        public RowProcessor visit(BooleanType booleanType) {
+            final InternalRow.FieldGetter fieldGetter =
+                    InternalRow.createFieldGetter(booleanType, fieldIndex);
+            return new RowProcessor(
+                    (row, reuse) -> {
+                        Object o = fieldGetter.getFieldOrNull(row);
+                        if (o == null) {
+                            return NULL_BYTES;
+                        }
+                        ZOrderByteUtils.reuse(reuse, PRIMITIVE_BUFFER_SIZE);
+                        // Absolute put: only byte 0 is written, the other bytes are left untouched.
+                        reuse.put(0, (byte) ((boolean) o ? -127 : 0));
+                        return reuse.array();
+                    });
+        }
+
+        @Override
+        public RowProcessor visit(BinaryType binaryType) {
+            final InternalRow.FieldGetter fieldGetter =
+                    InternalRow.createFieldGetter(binaryType, fieldIndex);
+            return new RowProcessor(
+                    (row, reuse) -> {
+                        Object o = fieldGetter.getFieldOrNull(row);
+                        return o == null
+                                ? NULL_BYTES
+                                : ZOrderByteUtils.byteTruncateOrFill(
+                                                (byte[]) o, 
PRIMITIVE_BUFFER_SIZE, reuse)
+                                        .array();
+                    });
+        }
+
+        @Override
+        public RowProcessor visit(VarBinaryType varBinaryType) {
+            final InternalRow.FieldGetter fieldGetter =
+                    InternalRow.createFieldGetter(varBinaryType, fieldIndex);
+            return new RowProcessor(
+                    (row, reuse) -> {
+                        Object o = fieldGetter.getFieldOrNull(row);
+                        return o == null
+                                ? NULL_BYTES
+                                : ZOrderByteUtils.byteTruncateOrFill(
+                                                (byte[]) o, 
PRIMITIVE_BUFFER_SIZE, reuse)
+                                        .array();
+                    });
+        }
+
+        /**
+         * Builds the processor of a DECIMAL column: a null field yields {@code NULL_BYTES},
+         * otherwise the unscaled bytes of the decimal are truncated or zero-filled to {@code
+         * PRIMITIVE_BUFFER_SIZE} bytes.
+         *
+         * <p>NOTE(review): presumably the unscaled bytes are a variable-length two's-complement
+         * value; if so, truncating or right-padding them does not preserve numeric order (sign and
+         * magnitude are not aligned) - confirm against {@code Decimal#toUnscaledBytes}.
+         */
+        @Override
+        public RowProcessor visit(DecimalType decimalType) {
+            final InternalRow.FieldGetter fieldGetter =
+                    InternalRow.createFieldGetter(decimalType, fieldIndex);
+            return new RowProcessor(
+                    (row, reuse) -> {
+                        Object o = fieldGetter.getFieldOrNull(row);
+                        return o == null
+                                ? NULL_BYTES
+                                : ZOrderByteUtils.byteTruncateOrFill(
+                                                ((Decimal) 
o).toUnscaledBytes(),
+                                                PRIMITIVE_BUFFER_SIZE,
+                                                reuse)
+                                        .array();
+                    });
+        }
+
+        @Override
+        public RowProcessor visit(TinyIntType tinyIntType) {
+            final InternalRow.FieldGetter fieldGetter =
+                    InternalRow.createFieldGetter(tinyIntType, fieldIndex);
+            return new RowProcessor(
+                    (row, reuse) -> {
+                        Object o = fieldGetter.getFieldOrNull(row);
+                        return o == null
+                                ? NULL_BYTES
+                                : ZOrderByteUtils.tinyintToOrderedBytes((byte) 
o, reuse).array();
+                    });
+        }
+
+        @Override
+        public RowProcessor visit(SmallIntType smallIntType) {
+            final InternalRow.FieldGetter fieldGetter =
+                    InternalRow.createFieldGetter(smallIntType, fieldIndex);
+            return new RowProcessor(
+                    (row, reuse) -> {
+                        Object o = fieldGetter.getFieldOrNull(row);
+                        return o == null
+                                ? NULL_BYTES
+                                : ZOrderByteUtils.shortToOrderedBytes((short) 
o, reuse).array();
+                    });
+        }
+
+        @Override
+        public RowProcessor visit(IntType intType) {
+            final InternalRow.FieldGetter fieldGetter =
+                    InternalRow.createFieldGetter(intType, fieldIndex);
+            return new RowProcessor(
+                    (row, reuse) -> {
+                        Object o = fieldGetter.getFieldOrNull(row);
+                        return o == null
+                                ? NULL_BYTES
+                                : ZOrderByteUtils.intToOrderedBytes((int) o, 
reuse).array();
+                    });
+        }
+
+        @Override
+        public RowProcessor visit(BigIntType bigIntType) {
+            final InternalRow.FieldGetter fieldGetter =
+                    InternalRow.createFieldGetter(bigIntType, fieldIndex);
+            return new RowProcessor(
+                    (row, reuse) -> {
+                        Object o = fieldGetter.getFieldOrNull(row);
+                        return o == null
+                                ? NULL_BYTES
+                                : ZOrderByteUtils.longToOrderedBytes((long) o, 
reuse).array();
+                    });
+        }
+
+        @Override
+        public RowProcessor visit(FloatType floatType) {
+            final InternalRow.FieldGetter fieldGetter =
+                    InternalRow.createFieldGetter(floatType, fieldIndex);
+            return new RowProcessor(
+                    (row, reuse) -> {
+                        Object o = fieldGetter.getFieldOrNull(row);
+                        return o == null
+                                ? NULL_BYTES
+                                : ZOrderByteUtils.floatToOrderedBytes((float) 
o, reuse).array();
+                    });
+        }
+
+        @Override
+        public RowProcessor visit(DoubleType doubleType) {
+            final InternalRow.FieldGetter fieldGetter =
+                    InternalRow.createFieldGetter(doubleType, fieldIndex);
+            return new RowProcessor(
+                    (row, reuse) -> {
+                        Object o = fieldGetter.getFieldOrNull(row);
+                        return o == null
+                                ? NULL_BYTES
+                                : 
ZOrderByteUtils.doubleToOrderedBytes((double) o, reuse).array();
+                    });
+        }
+
+        @Override
+        public RowProcessor visit(DateType dateType) {
+            final InternalRow.FieldGetter fieldGetter =
+                    InternalRow.createFieldGetter(dateType, fieldIndex);
+            return new RowProcessor(
+                    (row, reuse) -> {
+                        Object o = fieldGetter.getFieldOrNull(row);
+                        return o == null
+                                ? NULL_BYTES
+                                : ZOrderByteUtils.intToOrderedBytes((int) o, 
reuse).array();
+                    });
+        }
+
+        @Override
+        public RowProcessor visit(TimeType timeType) {
+            final InternalRow.FieldGetter fieldGetter =
+                    InternalRow.createFieldGetter(timeType, fieldIndex);
+            return new RowProcessor(
+                    (row, reuse) -> {
+                        Object o = fieldGetter.getFieldOrNull(row);
+                        return o == null
+                                ? NULL_BYTES
+                                : ZOrderByteUtils.intToOrderedBytes((int) o, 
reuse).array();
+                    });
+        }
+
+        /**
+         * Builds the processor of a TIMESTAMP column: a null field yields {@code NULL_BYTES},
+         * otherwise the value returned by {@code Timestamp#getMillisecond()} is encoded with {@link
+         * ZOrderByteUtils#longToOrderedBytes(long, ByteBuffer)}.
+         *
+         * <p>NOTE(review): only the millisecond value takes part in the z-value; presumably any
+         * sub-millisecond precision of the timestamp is ignored - confirm.
+         */
+        @Override
+        public RowProcessor visit(TimestampType timestampType) {
+            final InternalRow.FieldGetter fieldGetter =
+                    InternalRow.createFieldGetter(timestampType, fieldIndex);
+            return new RowProcessor(
+                    (row, reuse) -> {
+                        Object o = fieldGetter.getFieldOrNull(row);
+                        return o == null
+                                ? NULL_BYTES
+                                : ZOrderByteUtils.longToOrderedBytes(
+                                                ((Timestamp) 
o).getMillisecond(), reuse)
+                                        .array();
+                    });
+        }
+
+        @Override
+        public RowProcessor visit(LocalZonedTimestampType 
localZonedTimestampType) {
+            final InternalRow.FieldGetter fieldGetter =
+                    InternalRow.createFieldGetter(localZonedTimestampType, 
fieldIndex);
+            return new RowProcessor(
+                    (row, reuse) -> {
+                        Object o = fieldGetter.getFieldOrNull(row);
+                        return o == null
+                                ? NULL_BYTES
+                                : ZOrderByteUtils.longToOrderedBytes(
+                                                ((Timestamp) 
o).getMillisecond(), reuse)
+                                        .array();
+                    });
+        }
+
+        @Override
+        public RowProcessor visit(ArrayType arrayType) {
+            throw new RuntimeException("Unsupported type");
+        }
+
+        @Override
+        public RowProcessor visit(MultisetType multisetType) {
+            throw new RuntimeException("Unsupported type");
+        }
+
+        @Override
+        public RowProcessor visit(MapType mapType) {
+            throw new RuntimeException("Unsupported type");
+        }
+
+        @Override
+        public RowProcessor visit(RowType rowType) {
+            throw new RuntimeException("Unsupported type");
+        }
+    }
+
+    /**
+     * Converts one field of a row into its z-value bytes by applying a conversion function that is
+     * handed a reusable scratch buffer.
+     */
+    public static class RowProcessor implements Serializable {
+
+        // Transient scratch buffer; stays null until open() allocates it.
+        private transient ByteBuffer reuse;
+        private final ZProcessFunction process;
+
+        public RowProcessor(ZProcessFunction process) {
+            this.process = process;
+        }
+
+        /** Allocates the scratch buffer that is passed to the conversion function. */
+        public void open() {
+            this.reuse = ByteBuffer.allocate(PRIMITIVE_BUFFER_SIZE);
+        }
+
+        /** Applies the conversion function to the given row and the scratch buffer. */
+        public byte[] zvalue(InternalRow o) {
+            final ByteBuffer scratch = this.reuse;
+            return this.process.apply(o, scratch);
+        }
+    }
+
+    interface ZProcessFunction extends BiFunction<InternalRow, ByteBuffer, 
byte[]>, Serializable {}
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/ZOrderByteUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/ZOrderByteUtils.java
new file mode 100644
index 000000000..1ea4194c2
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ZOrderByteUtils.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/* This file is based on source code from the Iceberg Project 
(http://iceberg.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+package org.apache.paimon.utils;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharsetEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+/**
+ * Within Z-Ordering the byte representations of objects being compared must 
be ordered, this
+ * requires several types to be transformed when converted to bytes. The goal 
is to map object's
+ * whose byte representation are not lexicographically ordered into 
representations that are
+ * lexicographically ordered. Bytes produced should be compared 
lexicographically as unsigned bytes,
+ * big-endian.
+ *
+ * <p>All types except for String are stored within an 8 Byte Buffer
+ *
+ * <p>Most of these techniques are derived from
+ * 
https://aws.amazon.com/blogs/database/z-order-indexing-for-multifaceted-queries-in-amazon-dynamodb-part-2/
+ */
+public class ZOrderByteUtils {
+
+    public static final int PRIMITIVE_BUFFER_SIZE = 8;
+    public static final byte[] NULL_BYTES = new byte[PRIMITIVE_BUFFER_SIZE];
+    private static ThreadLocal<CharsetEncoder> encoderThreadLocal = new 
ThreadLocal<>();
+
+    static {
+        Arrays.fill(NULL_BYTES, (byte) 0x00);
+    }
+
+    private ZOrderByteUtils() {}
+
+    static ByteBuffer allocatePrimitiveBuffer() {
+        return ByteBuffer.allocate(PRIMITIVE_BUFFER_SIZE);
+    }
+
+    /**
+     * Signed ints do not have their bytes in magnitude order because of the 
sign bit. To fix this,
+     * flip the sign bit so that all negatives are ordered before positives. 
This essentially shifts
+     * the 0 value so that we don't break our ordering when we cross the new 0 
value.
+     */
+    public static ByteBuffer intToOrderedBytes(int val, ByteBuffer reuse) {
+        ByteBuffer bytes = reuse(reuse, PRIMITIVE_BUFFER_SIZE);
+        bytes.putLong(((long) val) ^ 0x8000000000000000L);
+        return bytes;
+    }
+
+    /**
+     * Signed longs are treated the same as the signed ints in {@link 
#intToOrderedBytes(int,
+     * ByteBuffer)}.
+     */
+    public static ByteBuffer longToOrderedBytes(long val, ByteBuffer reuse) {
+        ByteBuffer bytes = reuse(reuse, PRIMITIVE_BUFFER_SIZE);
+        bytes.putLong(val ^ 0x8000000000000000L);
+        return bytes;
+    }
+
+    /**
+     * Signed shorts are treated the same as the signed ints in {@link 
#intToOrderedBytes(int,
+     * ByteBuffer)}.
+     */
+    public static ByteBuffer shortToOrderedBytes(short val, ByteBuffer reuse) {
+        ByteBuffer bytes = reuse(reuse, PRIMITIVE_BUFFER_SIZE);
+        bytes.putLong(((long) val) ^ 0x8000000000000000L);
+        return bytes;
+    }
+
+    /**
+     * Signed tiny ints are treated the same as the signed ints in {@link 
#intToOrderedBytes(int,
+     * ByteBuffer)}.
+     */
+    public static ByteBuffer tinyintToOrderedBytes(byte val, ByteBuffer reuse) 
{
+        ByteBuffer bytes = reuse(reuse, PRIMITIVE_BUFFER_SIZE);
+        bytes.putLong(((long) val) ^ 0x8000000000000000L);
+        return bytes;
+    }
+
+    /**
+     * IEEE 754 : "If two floating-point numbers in the same format are ordered (say, x {@literal <}
+     * y), they are ordered the same way when their bits are reinterpreted as sign-magnitude
+     * integers."
+     *
+     * <p>Which means floats can be treated as sign magnitude integers which can then be converted
+     * into lexicographically comparable bytes. The float is widened to a double and the 64-bit
+     * pattern of that double is transformed: every bit is flipped for a negative value, only the
+     * sign bit is flipped otherwise.
+     *
+     * @param val the value to encode
+     * @param reuse buffer that receives the 8 encoded bytes
+     * @return {@code reuse}, holding the encoded value
+     */
+    public static ByteBuffer floatToOrderedBytes(float val, ByteBuffer reuse) {
+        ByteBuffer bytes = reuse(reuse, PRIMITIVE_BUFFER_SIZE);
+        long lval = Double.doubleToLongBits(val);
+        // The pattern is 64 bits wide, so the sign has to be smeared over all 64 bits. Shifting by
+        // Integer.SIZE - 1 would XOR the low bits with the exponent bits and break the ordering of
+        // values that only differ in their low bits.
+        lval ^= ((lval >> (Long.SIZE - 1)) | Long.MIN_VALUE);
+        bytes.putLong(lval);
+        return bytes;
+    }
+
+    /**
+     * Doubles are reinterpreted as sign-magnitude integers and converted into lexicographically
+     * comparable bytes: every bit of the 64-bit pattern is flipped for a negative value, only the
+     * sign bit is flipped otherwise.
+     *
+     * @param val the value to encode
+     * @param reuse buffer that receives the 8 encoded bytes
+     * @return {@code reuse}, holding the encoded value
+     */
+    public static ByteBuffer doubleToOrderedBytes(double val, ByteBuffer reuse) {
+        ByteBuffer bytes = reuse(reuse, PRIMITIVE_BUFFER_SIZE);
+        long lval = Double.doubleToLongBits(val);
+        // The pattern is 64 bits wide, so the sign has to be smeared over all 64 bits. Shifting by
+        // Integer.SIZE - 1 would XOR the low bits with the high bits and break the ordering of
+        // values that only differ in their low bits.
+        lval ^= ((lval >> (Long.SIZE - 1)) | Long.MIN_VALUE);
+        bytes.putLong(lval);
+        return bytes;
+    }
+
+    /**
+     * Strings are lexicographically sortable, but byte arrays of different lengths would ruin the
+     * Z-Ordering (ZOrder requires that a given column contributes the same number of bytes every
+     * time). This implementation therefore uses a fixed size for all output byte representations:
+     * longer strings are truncated and shorter strings are right-padded with 0.
+     *
+     * @param val the string to encode as UTF-8; {@code null} produces only padding
+     * @param length the number of bytes to produce
+     * @param reuse buffer that receives the encoded bytes
+     * @return {@code reuse}, holding the encoded value
+     */
+    @SuppressWarnings("ByteBufferBackingArray")
+    public static ByteBuffer stringToOrderedBytes(String val, int length, ByteBuffer reuse) {
+        CharsetEncoder encoder = encoderThreadLocal.get();
+        if (encoder == null) {
+            encoder = StandardCharsets.UTF_8.newEncoder();
+            encoderThreadLocal.set(encoder);
+        }
+
+        ByteBuffer bytes = reuse(reuse, length);
+        Arrays.fill(bytes.array(), 0, length, (byte) 0x00);
+        if (val != null) {
+            // The encoder is cached per thread: reset it so that every call starts a fresh
+            // encoding operation, as the CharsetEncoder protocol requires.
+            encoder.reset();
+            // An overflow result only means the string was truncated to the limit of the buffer.
+            encoder.encode(CharBuffer.wrap(val), bytes, true);
+        }
+        return bytes;
+    }
+
+    /**
+     * Return a bytebuffer with the given bytes truncated to length, or filled 
with 0's to length
+     * depending on whether the given bytes are larger or smaller than the 
given length.
+     */
+    @SuppressWarnings("ByteBufferBackingArray")
+    public static ByteBuffer byteTruncateOrFill(byte[] val, int length, 
ByteBuffer reuse) {
+        ByteBuffer bytes = reuse(reuse, length);
+        if (val.length < length) {
+            bytes.put(val, 0, val.length);
+            Arrays.fill(bytes.array(), val.length, length, (byte) 0x00);
+        } else {
+            bytes.put(val, 0, length);
+        }
+        return bytes;
+    }
+
+    public static byte[] interleaveBits(byte[][] columnsBinary, int 
interleavedSize) {
+        return interleaveBits(columnsBinary, interleavedSize, 
ByteBuffer.allocate(interleavedSize));
+    }
+
+    /**
+     * Interleave bits using a naive loop. Variable length inputs are allowed, but to get a
+     * consistent ordering it is required that every column contributes the same number of bytes in
+     * each invocation. Bits are interleaved from all columns that have a bit available at that
+     * position. Once a column has no more bits to produce it is skipped in the interleaving.
+     *
+     * <p>The result is written into the backing array of {@code reuse}, and that array itself is
+     * returned: only its first {@code interleavedSize} bytes are cleared and filled.
+     *
+     * <p>NOTE(review): if the columns together provide fewer bits than {@code interleavedSize}
+     * bytes need, the search for the next source bit appears to never find one - confirm that
+     * callers always supply enough column bytes.
+     *
+     * @param columnsBinary an array of ordered byte representations of the columns being ZOrdered
+     * @param interleavedSize the number of bytes to use in the output
+     * @param reuse buffer whose backing array receives the interleaved bytes
+     * @return the interleaved column bytes (the backing array of {@code reuse})
+     */
+    // NarrowingCompoundAssignment is intended here. See
+    // https://github.com/apache/iceberg/pull/5200#issuecomment-1176226163
+    @SuppressWarnings({"ByteBufferBackingArray", 
"NarrowingCompoundAssignment"})
+    public static byte[] interleaveBits(
+            byte[][] columnsBinary, int interleavedSize, ByteBuffer reuse) {
+        byte[] interleavedBytes = reuse.array();
+        Arrays.fill(interleavedBytes, 0, interleavedSize, (byte) 0x00);
+
+        // Source cursor (column, byte, bit) and output cursor (byte, bit); bit 7 is the highest bit.
+        int sourceColumn = 0;
+        int sourceByte = 0;
+        int sourceBit = 7;
+        int interleaveByte = 0;
+        int interleaveBit = 7;
+
+        while (interleaveByte < interleavedSize) {
+            // Take the source bit from the source byte and move it to the output bit position
+            interleavedBytes[interleaveByte] |=
+                    (columnsBinary[sourceColumn][sourceByte] & 1 << sourceBit)
+                            >>> sourceBit
+                            << interleaveBit;
+            --interleaveBit;
+
+            // Check if an output byte has been completed
+            if (interleaveBit == -1) {
+                // Move to the next output byte
+                interleaveByte++;
+                // Move to the highest order bit of the new output byte
+                interleaveBit = 7;
+            }
+
+            // Check if the last output byte has been completed
+            if (interleaveByte == interleavedSize) {
+                break;
+            }
+
+            // Find the next source bit to interleave, skipping columns that are already exhausted
+            do {
+                // Move to next column
+                ++sourceColumn;
+                if (sourceColumn == columnsBinary.length) {
+                    // If the last source column was used, reset to the next bit of the first column
+                    sourceColumn = 0;
+                    --sourceBit;
+                    if (sourceBit == -1) {
+                        // If the last bit of the source byte was used, reset to the highest bit of
+                        // the next byte
+                        sourceByte++;
+                        sourceBit = 7;
+                    }
+                }
+            } while (columnsBinary[sourceColumn].length <= sourceByte);
+        }
+        return interleavedBytes;
+    }
+
+    /**
+     * Prepares a buffer for being written again: the position is moved to 0 and the limit is set to
+     * {@code length}. The content of the buffer is not modified.
+     *
+     * @param reuse the buffer to prepare
+     * @param length the new limit of the buffer
+     * @return the same buffer instance
+     */
+    public static ByteBuffer reuse(ByteBuffer reuse, int length) {
+        reuse.position(0).limit(length);
+        return reuse;
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/sort/zorder/ZIndexerTest.java 
b/paimon-core/src/test/java/org/apache/paimon/sort/zorder/ZIndexerTest.java
new file mode 100644
index 000000000..4e9c08993
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/sort/zorder/ZIndexerTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.sort.zorder;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ZOrderByteUtils;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Random;
+
+/** Tests for {@link ZIndexer}. */
+public class ZIndexerTest {
+
+    private static final Random RANDOM = new Random();
+
+    /**
+     * Checks that {@link ZIndexer#index} on an (INT, BIGINT) row yields the same first 16 bytes as
+     * interleaving the ordered-byte encodings of both fields with {@link ZOrderByteUtils}.
+     */
+    @Test
+    public void testZIndexer() {
+        RowType rowType = RowType.of(new IntType(), new BigIntType());
+
+        ZIndexer zIndexer = new ZIndexer(rowType, Arrays.asList("f0", "f1"));
+        zIndexer.open();
+
+        for (int i = 0; i < 1000; i++) {
+            int a = RANDOM.nextInt();
+            long b = RANDOM.nextLong();
+
+            InternalRow internalRow = GenericRow.of(a, b);
+
+            byte[] zOrder = zIndexer.index(internalRow);
+
+            // Build the expected value from the two 8-byte column encodings. The same buffer is
+            // handed to both conversions, so its backing array is copied after each one.
+            byte[][] zCache = new byte[2][];
+            ByteBuffer byteBuffer = ByteBuffer.allocate(8);
+            ZOrderByteUtils.intToOrderedBytes(a, byteBuffer);
+            zCache[0] = Arrays.copyOf(byteBuffer.array(), 8);
+
+            ZOrderByteUtils.longToOrderedBytes(b, byteBuffer);
+            zCache[1] = Arrays.copyOf(byteBuffer.array(), 8);
+
+            byte[] expectedZOrder = ZOrderByteUtils.interleaveBits(zCache, 16);
+
+            for (int j = 0; j < 16; j++) {
+                // The inputs are random and unseeded, so report them together with the byte
+                // index; otherwise a failure cannot be reproduced.
+                Assertions.assertThat(zOrder[j])
+                        .as("z-order byte %s for row (%s, %s)", j, a, b)
+                        .isEqualTo(expectedZOrder[j]);
+            }
+        }
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/utils/TestZOrderByteUtil.java 
b/paimon-core/src/test/java/org/apache/paimon/utils/TestZOrderByteUtil.java
new file mode 100644
index 000000000..de1bf7709
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/TestZOrderByteUtil.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/* This file is based on source code from the Iceberg Project 
(http://iceberg.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+package org.apache.paimon.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.testcontainers.shaded.com.google.common.primitives.UnsignedBytes;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Random;
+
+/** Tests for {@link ZOrderByteUtils}. */
+public class TestZOrderByteUtil {
+    private static final byte IIIIIIII = (byte) 255;
+    private static final byte IOIOIOIO = (byte) 170;
+    private static final byte OIOIOIOI = (byte) 85;
+    private static final byte OOOOIIII = (byte) 15;
+    private static final byte OOOOOOOI = (byte) 1;
+    private static final byte OOOOOOOO = (byte) 0;
+
+    private static final int NUM_TESTS = 100000;
+    private static final int NUM_INTERLEAVE_TESTS = 1000;
+
+    private final Random random = new Random(42);
+
+    /** Renders every byte as eight '0'/'1' characters, most significant bit first. */
+    private String bytesToString(byte[] bytes) {
+        StringBuilder bits = new StringBuilder(bytes.length * 8);
+        for (byte value : bytes) {
+            for (int shift = 7; shift >= 0; shift--) {
+                bits.append(((value >> shift) & 1) == 1 ? '1' : '0');
+            }
+        }
+        return bits.toString();
+    }
+
+    /** Returns a random byte array whose length is between 1 and 100 inclusive. */
+    private byte[] generateRandomBytes() {
+        // nextInt(100) is never negative, so adding one already guarantees a non-empty array.
+        return generateRandomBytes(random.nextInt(100) + 1);
+    }
+
+    /** Returns {@code length} bytes drawn from the test's random generator. */
+    private byte[] generateRandomBytes(int length) {
+        byte[] filled = new byte[length];
+        random.nextBytes(filled);
+        return filled;
+    }
+
+    /**
+     * Reference interleaving on strings: takes character 0 of every string in turn, then
+     * character 1, and so on, skipping strings that have already been used up.
+     */
+    private String interleaveStrings(String[] strings) {
+        int longest = Arrays.stream(strings).mapToInt(String::length).max().orElse(0);
+        StringBuilder woven = new StringBuilder();
+        for (int position = 0; position < longest; position++) {
+            for (String source : strings) {
+                if (position < source.length()) {
+                    woven.append(source.charAt(position));
+                }
+            }
+        }
+        return woven.toString();
+    }
+
+    /**
+     * Cross-checks {@link ZOrderByteUtils#interleaveBits} against the string-based interleaving
+     * implemented in this class on random input. Agreement means the two implementations are
+     * either both right or both wrong in the same way.
+     */
+    @Test
+    public void testInterleaveRandomExamples() {
+        for (int round = 0; round < NUM_INTERLEAVE_TESTS; round++) {
+            // Between one and six columns, each of them 1 to 100 random bytes long.
+            int columnCount = random.nextInt(6) + 1;
+            byte[][] columns = new byte[columnCount][];
+            String[] columnBits = new String[columnCount];
+            int zOrderSize = 0;
+            for (int c = 0; c < columnCount; c++) {
+                columns[c] = generateRandomBytes();
+                columnBits[c] = bytesToString(columns[c]);
+                zOrderSize += columns[c].length;
+            }
+
+            String actual = bytesToString(ZOrderByteUtils.interleaveBits(columns, zOrderSize));
+            String expected = interleaveStrings(columnBits);
+
+            Assert.assertEquals(
+                    "String interleave didn't match byte interleave", expected, actual);
+        }
+    }
+
+    /**
+     * Compares string-based and byte-based interleaving of two 16-byte columns while passing one
+     * shared {@link ByteBuffer} to every {@code interleaveBits} call.
+     */
+    @Test
+    public void testReuseInterleaveBuffer() {
+        final int columnCount = 2;
+        final int columnLength = 16;
+        final int zOrderSize = columnCount * columnLength;
+        ByteBuffer sharedBuffer = ByteBuffer.allocate(zOrderSize);
+        for (int round = 0; round < NUM_INTERLEAVE_TESTS; round++) {
+            byte[][] columns = new byte[columnCount][];
+            String[] columnBits = new String[columnCount];
+            for (int c = 0; c < columnCount; c++) {
+                columns[c] = generateRandomBytes(columnLength);
+                columnBits[c] = bytesToString(columns[c]);
+            }
+
+            byte[] interleaved =
+                    ZOrderByteUtils.interleaveBits(columns, zOrderSize, sharedBuffer);
+
+            Assert.assertEquals(
+                    "String interleave didn't match byte interleave",
+                    interleaveStrings(columnBits),
+                    bytesToString(interleaved));
+        }
+    }
+
+    /** Interleaving four all-zero 10-byte columns must give 40 zero bytes. */
+    @Test
+    public void testInterleaveEmptyBits() {
+        byte[][] zeroColumns = new byte[4][10];
+        byte[] actual = ZOrderByteUtils.interleaveBits(zeroColumns, 40);
+
+        Assert.assertArrayEquals("Should combine empty arrays", new byte[40], actual);
+    }
+
+    /**
+     * Interleaving all-ones columns of lengths 2, 1, 0 and 3 must give six all-ones bytes; the
+     * zero-length column contributes nothing.
+     */
+    @Test
+    public void testInterleaveFullBits() {
+        byte[][] columns = {
+            {IIIIIIII, IIIIIIII}, {IIIIIIII}, {}, {IIIIIIII, IIIIIIII, IIIIIIII}
+        };
+        byte[] expected = new byte[6];
+        Arrays.fill(expected, IIIIIIII);
+
+        byte[] actual = ZOrderByteUtils.interleaveBits(columns, 6);
+        Assert.assertArrayEquals("Should combine full arrays", expected, actual);
+    }
+
+    /** Interleaves four columns of different lengths holding a mix of set and cleared bits. */
+    @Test
+    public void testInterleaveMixedBits() {
+        byte[][] columns = {
+            {OOOOOOOI, IIIIIIII, OOOOOOOO, OOOOIIII},
+            {OOOOOOOI, OOOOOOOO, IIIIIIII},
+            {OOOOOOOI},
+            {OOOOOOOI}
+        };
+        byte[] expected = {
+            OOOOOOOO, OOOOOOOO, OOOOOOOO, OOOOIIII, IOIOIOIO,
+            IOIOIOIO, OIOIOIOI, OIOIOIOI, OOOOIIII
+        };
+
+        byte[] actual = ZOrderByteUtils.interleaveBits(columns, 9);
+        Assert.assertArrayEquals("Should combine mixed byte arrays", expected, actual);
+    }
+
+    /**
+     * The signed order of two random ints must equal the unsigned lexicographic order of their
+     * {@code intToOrderedBytes} encodings.
+     */
+    @Test
+    public void testIntOrdering() {
+        ByteBuffer leftBuffer = ZOrderByteUtils.allocatePrimitiveBuffer();
+        ByteBuffer rightBuffer = ZOrderByteUtils.allocatePrimitiveBuffer();
+        for (int round = 0; round < NUM_TESTS; round++) {
+            int left = random.nextInt();
+            int right = random.nextInt();
+            byte[] leftBytes = ZOrderByteUtils.intToOrderedBytes(left, leftBuffer).array();
+            byte[] rightBytes = ZOrderByteUtils.intToOrderedBytes(right, rightBuffer).array();
+
+            int expectedSign = Integer.signum(Integer.compare(left, right));
+            int actualSign =
+                    Integer.signum(
+                            UnsignedBytes.lexicographicalComparator()
+                                    .compare(leftBytes, rightBytes));
+
+            String message =
+                    String.format(
+                            "Ordering of ints should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ",
+                            left,
+                            right,
+                            expectedSign,
+                            Arrays.toString(leftBytes),
+                            Arrays.toString(rightBytes),
+                            actualSign);
+            Assert.assertEquals(message, expectedSign, actualSign);
+        }
+    }
+
+    /**
+     * The signed order of two random longs must equal the unsigned lexicographic order of their
+     * {@code longToOrderedBytes} encodings.
+     */
+    @Test
+    public void testLongOrdering() {
+        ByteBuffer aBuffer = ZOrderByteUtils.allocatePrimitiveBuffer();
+        ByteBuffer bBuffer = ZOrderByteUtils.allocatePrimitiveBuffer();
+        for (int i = 0; i < NUM_TESTS; i++) {
+            // Draw from the full 64-bit range. nextInt() would only produce int-sized values,
+            // leaving the upper four bytes of the encoding as pure sign extension.
+            long aLong = random.nextLong();
+            long bLong = random.nextLong();
+            int longCompare = Integer.signum(Long.compare(aLong, bLong));
+            byte[] aBytes = ZOrderByteUtils.longToOrderedBytes(aLong, aBuffer).array();
+            byte[] bBytes = ZOrderByteUtils.longToOrderedBytes(bLong, bBuffer).array();
+            int byteCompare =
+                    Integer.signum(
+                            UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes));
+
+            Assert.assertEquals(
+                    String.format(
+                            "Ordering of longs should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ",
+                            aLong,
+                            bLong,
+                            longCompare,
+                            Arrays.toString(aBytes),
+                            Arrays.toString(bBytes),
+                            byteCompare),
+                    longCompare,
+                    byteCompare);
+        }
+    }
+
+    /**
+     * The signed order of two random shorts must equal the unsigned lexicographic order of their
+     * {@code shortToOrderedBytes} encodings.
+     */
+    @Test
+    public void testShortOrdering() {
+        ByteBuffer aBuffer = ZOrderByteUtils.allocatePrimitiveBuffer();
+        ByteBuffer bBuffer = ZOrderByteUtils.allocatePrimitiveBuffer();
+        for (int i = 0; i < NUM_TESTS; i++) {
+            // The remainder keeps the sign of nextInt(), so negative shorts are covered as well.
+            short aShort = (short) (random.nextInt() % (Short.MAX_VALUE + 1));
+            short bShort = (short) (random.nextInt() % (Short.MAX_VALUE + 1));
+            int shortCompare = Integer.signum(Short.compare(aShort, bShort));
+            byte[] aBytes = ZOrderByteUtils.shortToOrderedBytes(aShort, aBuffer).array();
+            byte[] bBytes = ZOrderByteUtils.shortToOrderedBytes(bShort, bBuffer).array();
+            int byteCompare =
+                    Integer.signum(
+                            UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes));
+
+            Assert.assertEquals(
+                    String.format(
+                            "Ordering of shorts should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ",
+                            aShort,
+                            bShort,
+                            shortCompare,
+                            Arrays.toString(aBytes),
+                            Arrays.toString(bBytes),
+                            byteCompare),
+                    shortCompare,
+                    byteCompare);
+        }
+    }
+
+    /**
+     * The signed order of two random bytes must equal the unsigned lexicographic order of their
+     * {@code tinyintToOrderedBytes} encodings.
+     */
+    @Test
+    public void testTinyOrdering() {
+        ByteBuffer aBuffer = ZOrderByteUtils.allocatePrimitiveBuffer();
+        ByteBuffer bBuffer = ZOrderByteUtils.allocatePrimitiveBuffer();
+        for (int i = 0; i < NUM_TESTS; i++) {
+            // The remainder keeps the sign of nextInt(), so negative values are covered as well.
+            byte aByte = (byte) (random.nextInt() % (Byte.MAX_VALUE + 1));
+            byte bByte = (byte) (random.nextInt() % (Byte.MAX_VALUE + 1));
+            int tinyCompare = Integer.signum(Byte.compare(aByte, bByte));
+            byte[] aBytes = ZOrderByteUtils.tinyintToOrderedBytes(aByte, aBuffer).array();
+            byte[] bBytes = ZOrderByteUtils.tinyintToOrderedBytes(bByte, bBuffer).array();
+            int byteCompare =
+                    Integer.signum(
+                            UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes));
+
+            Assert.assertEquals(
+                    String.format(
+                            "Ordering of tinyints should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ",
+                            aByte,
+                            bByte,
+                            tinyCompare,
+                            Arrays.toString(aBytes),
+                            Arrays.toString(bBytes),
+                            byteCompare),
+                    tinyCompare,
+                    byteCompare);
+        }
+    }
+
+    /**
+     * The {@link Float#compare} order of two random floats must equal the unsigned lexicographic
+     * order of their {@code floatToOrderedBytes} encodings. Inputs come from {@code
+     * Random#nextFloat()}, so only values in [0, 1) are exercised.
+     */
+    @Test
+    public void testFloatOrdering() {
+        ByteBuffer leftBuffer = ZOrderByteUtils.allocatePrimitiveBuffer();
+        ByteBuffer rightBuffer = ZOrderByteUtils.allocatePrimitiveBuffer();
+        for (int round = 0; round < NUM_TESTS; round++) {
+            float left = random.nextFloat();
+            float right = random.nextFloat();
+            byte[] leftBytes = ZOrderByteUtils.floatToOrderedBytes(left, leftBuffer).array();
+            byte[] rightBytes = ZOrderByteUtils.floatToOrderedBytes(right, rightBuffer).array();
+
+            int expectedSign = Integer.signum(Float.compare(left, right));
+            int actualSign =
+                    Integer.signum(
+                            UnsignedBytes.lexicographicalComparator()
+                                    .compare(leftBytes, rightBytes));
+
+            String message =
+                    String.format(
+                            "Ordering of floats should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ",
+                            left,
+                            right,
+                            expectedSign,
+                            Arrays.toString(leftBytes),
+                            Arrays.toString(rightBytes),
+                            actualSign);
+            Assert.assertEquals(message, expectedSign, actualSign);
+        }
+    }
+
+    /**
+     * The {@link Double#compare} order of two random doubles must equal the unsigned
+     * lexicographic order of their {@code doubleToOrderedBytes} encodings. Inputs come from
+     * {@code Random#nextDouble()}, so only values in [0, 1) are exercised.
+     */
+    @Test
+    public void testDoubleOrdering() {
+        ByteBuffer leftBuffer = ZOrderByteUtils.allocatePrimitiveBuffer();
+        ByteBuffer rightBuffer = ZOrderByteUtils.allocatePrimitiveBuffer();
+        for (int round = 0; round < NUM_TESTS; round++) {
+            double left = random.nextDouble();
+            double right = random.nextDouble();
+            byte[] leftBytes = ZOrderByteUtils.doubleToOrderedBytes(left, leftBuffer).array();
+            byte[] rightBytes = ZOrderByteUtils.doubleToOrderedBytes(right, rightBuffer).array();
+
+            int expectedSign = Integer.signum(Double.compare(left, right));
+            int actualSign =
+                    Integer.signum(
+                            UnsignedBytes.lexicographicalComparator()
+                                    .compare(leftBytes, rightBytes));
+
+            String message =
+                    String.format(
+                            "Ordering of doubles should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ",
+                            left,
+                            right,
+                            expectedSign,
+                            Arrays.toString(leftBytes),
+                            Arrays.toString(rightBytes),
+                            actualSign);
+            Assert.assertEquals(message, expectedSign, actualSign);
+        }
+    }
+
+    /**
+     * The {@link String#compareTo} order of two random lower-case strings must equal the unsigned
+     * lexicographic order of their {@code stringToOrderedBytes} encodings (length argument 128).
+     */
+    @Test
+    public void testStringOrdering() {
+        final int encodedLength = 128;
+        ByteBuffer leftBuffer = ByteBuffer.allocate(encodedLength);
+        ByteBuffer rightBuffer = ByteBuffer.allocate(encodedLength);
+        for (int round = 0; round < NUM_TESTS; round++) {
+            String left = randomString();
+            String right = randomString();
+            byte[] leftBytes =
+                    ZOrderByteUtils.stringToOrderedBytes(left, encodedLength, leftBuffer).array();
+            byte[] rightBytes =
+                    ZOrderByteUtils.stringToOrderedBytes(right, encodedLength, rightBuffer)
+                            .array();
+
+            int expectedSign = Integer.signum(left.compareTo(right));
+            int actualSign =
+                    Integer.signum(
+                            UnsignedBytes.lexicographicalComparator()
+                                    .compare(leftBytes, rightBytes));
+
+            String message =
+                    String.format(
+                            "Ordering of strings should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ",
+                            left,
+                            right,
+                            expectedSign,
+                            Arrays.toString(leftBytes),
+                            Arrays.toString(rightBytes),
+                            actualSign);
+            Assert.assertEquals(message, expectedSign, actualSign);
+        }
+    }
+
+    /**
+     * The unsigned lexicographic order of two random byte arrays must be preserved by {@code
+     * byteTruncateOrFill} (length argument 128).
+     */
+    @Test
+    public void testByteTruncateOrFill() {
+        ByteBuffer aBuffer = ByteBuffer.allocate(128);
+        ByteBuffer bBuffer = ByteBuffer.allocate(128);
+        for (int i = 0; i < NUM_TESTS; i++) {
+            byte[] aBytesRaw = randomBytes();
+            byte[] bBytesRaw = randomBytes();
+            int rawCompare =
+                    Integer.signum(
+                            UnsignedBytes.lexicographicalComparator()
+                                    .compare(aBytesRaw, bBytesRaw));
+            byte[] aBytes = ZOrderByteUtils.byteTruncateOrFill(aBytesRaw, 128, aBuffer).array();
+            byte[] bBytes = ZOrderByteUtils.byteTruncateOrFill(bBytesRaw, 128, bBuffer).array();
+            int byteCompare =
+                    Integer.signum(
+                            UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes));
+
+            // Arrays.toString on the raw inputs: formatting a byte[] with %s would only print
+            // its identity hash, which is useless in a failure message.
+            Assert.assertEquals(
+                    String.format(
+                            "Ordering of raw bytes should match ordering of filled bytes, %s ~ %s -> %s != %s ~ %s -> %s ",
+                            Arrays.toString(aBytesRaw),
+                            Arrays.toString(bBytesRaw),
+                            rawCompare,
+                            Arrays.toString(aBytes),
+                            Arrays.toString(bBytes),
+                            byteCompare),
+                    rawCompare,
+                    byteCompare);
+        }
+    }
+
+    /** Returns a random byte array of 0 to 49 bytes. */
+    private byte[] randomBytes() {
+        int size = random.nextInt(50);
+        byte[] payload = new byte[size];
+        random.nextBytes(payload);
+        return payload;
+    }
+
+    /** Returns a random string of 0 to 49 lower-case ASCII letters. */
+    private String randomString() {
+        int length = random.nextInt(50);
+        // Append chars directly instead of decoding a byte[]: new String(byte[]) would go
+        // through the platform default charset.
+        StringBuilder letters = new StringBuilder(length);
+        for (int i = 0; i < length; i++) {
+            letters.append((char) ('a' + random.nextInt(26)));
+        }
+        return letters.toString();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
index 6242bec27..0d4d25354 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
@@ -21,16 +21,24 @@ package org.apache.paimon.flink.action;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.flink.FlinkCatalog;
 import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.flink.LogicalTypeConversion;
 import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeCasts;
 
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.types.logical.LogicalType;
 
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 /** Abstract base of {@link Action} for table. */
 public abstract class ActionBase implements Action {
@@ -40,6 +48,8 @@ public abstract class ActionBase implements Action {
     protected final Catalog catalog;
     protected final FlinkCatalog flinkCatalog;
     protected final String catalogName = "paimon-" + UUID.randomUUID();
+    protected final StreamExecutionEnvironment env;
+    protected final StreamTableEnvironment batchTEnv;
 
     public ActionBase(String warehouse, Map<String, String> catalogConfig) {
         catalogOptions = Options.fromMap(catalogConfig);
@@ -47,6 +57,12 @@ public abstract class ActionBase implements Action {
 
         catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
         flinkCatalog = FlinkCatalogFactory.createCatalog(catalogName, catalog, 
catalogOptions);
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        batchTEnv = StreamTableEnvironment.create(env, 
EnvironmentSettings.inBatchMode());
+
+        // register flink catalog to table environment
+        batchTEnv.registerCatalog(flinkCatalog.getName(), flinkCatalog);
+        batchTEnv.useCatalog(flinkCatalog.getName());
     }
 
     protected void execute(StreamExecutionEnvironment env, String defaultName) 
throws Exception {
@@ -60,4 +76,34 @@ public abstract class ActionBase implements Action {
         Options catalogOptions = this.catalogOptions;
         return () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
     }
+
+    /**
+     * Converts Flink {@link org.apache.flink.table.types.DataType}s to Paimon {@link DataType}s
+     * by way of each type's {@link LogicalType}, keeping the input order.
+     */
+    protected List<DataType> toPaimonTypes(
+            List<org.apache.flink.table.types.DataType> flinkDataTypes) {
+        return flinkDataTypes.stream()
+                .map(flinkType -> LogicalTypeConversion.toDataType(flinkType.getLogicalType()))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Returns whether both lists have the same size and every type in {@code actualTypes} passes
+     * {@link DataTypeCasts#supportsCompatibleCast} against the type at the same index in {@code
+     * expectedTypes}.
+     */
+    protected boolean compatibleCheck(List<DataType> actualTypes, List<DataType> expectedTypes) {
+        int count = actualTypes.size();
+        if (count != expectedTypes.size()) {
+            return false;
+        }
+
+        // Advance while the pairs are compatible; stopping early means a mismatch was found.
+        int index = 0;
+        while (index < count
+                && DataTypeCasts.supportsCompatibleCast(
+                        actualTypes.get(index), expectedTypes.get(index))) {
+            index++;
+        }
+        return index == count;
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 27c5c9046..9d0b99484 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -119,4 +119,8 @@ public class CompactAction extends TableActionBase {
         build(env);
         execute(env, "Compact job");
     }
+
+    /** Returns the {@code partitions} field of this action as is (no copy is made). */
+    public List<Map<String, String>> getPartitions() {
+        return this.partitions;
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
index 3af89e8bf..b74bd8b2f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.action;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.utils.MultipleParameterTool;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -41,8 +42,29 @@ public class CompactActionFactory implements ActionFactory {
 
         Map<String, String> catalogConfig = optionalConfigMap(params, 
"catalog-conf");
 
-        CompactAction action =
-                new CompactAction(tablePath.f0, tablePath.f1, tablePath.f2, 
catalogConfig);
+        CompactAction action;
+        if (params.has("order-strategy")) {
+            SortCompactAction sortCompactAction =
+                    new SortCompactAction(tablePath.f0, tablePath.f1, 
tablePath.f2, catalogConfig);
+
+            String strategy = params.get("order-strategy");
+            sortCompactAction.withOrderStrategy(strategy);
+
+            if (params.has("order-by")) {
+                String sqlOrderBy = params.get("order-by");
+                if (sqlOrderBy == null) {
+                    throw new IllegalArgumentException("Please specify 
\"order-by\".");
+                }
+                
sortCompactAction.withOrderColumns(Arrays.asList(sqlOrderBy.split(",")));
+            } else {
+                throw new IllegalArgumentException(
+                        "Please specify order columns in parameter 
--order-by.");
+            }
+
+            action = sortCompactAction;
+        } else {
+            action = new CompactAction(tablePath.f0, tablePath.f1, 
tablePath.f2, catalogConfig);
+        }
 
         if (params.has("partition")) {
             List<Map<String, String>> partitions = getPartitions(params);
@@ -61,7 +83,9 @@ public class CompactActionFactory implements ActionFactory {
         System.out.println("Syntax:");
         System.out.println(
                 "  compact --warehouse <warehouse-path> --database 
<database-name> "
-                        + "--table <table-name> [--partition 
<partition-name>]");
+                        + "--table <table-name> [--partition <partition-name>]"
+                        // Each optional flag starts with a space so the usage line does not run
+                        // the bracketed options together.
+                        + " [--order-strategy <order-strategy>]"
+                        + " [--order-by <order-columns>]");
         System.out.println(
                 "  compact --warehouse s3://path/to/warehouse --database 
<database-name> "
                         + "--table <table-name> [--catalog-conf 
<paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]]");
@@ -70,6 +94,12 @@ public class CompactActionFactory implements ActionFactory {
 
         System.out.println("Partition name syntax:");
         System.out.println("  key1=value1,key2=value2,...");
+
+        System.out.println();
+        System.out.println("Note:");
+        System.out.println(
+                "  order compact now only support append-only table with 
bucket=-1, please don't specify --order-strategy parameter if your table does 
not meet the request");
+        System.out.println("  order-strategy now only support zorder in batch 
mode");
         System.out.println();
 
         System.out.println("Examples:");
@@ -84,6 +114,8 @@ public class CompactActionFactory implements ActionFactory {
                 "  compact --warehouse s3:///path/to/warehouse "
                         + "--database test_db "
                         + "--table test_table "
+                        + "--order-strategy zorder "
+                        + "--order-by a,b,c "
                         + "--catalog-conf s3.endpoint=https://****.com "
                         + "--catalog-conf s3.access-key=***** "
                         + "--catalog-conf s3.secret-key=***** ");
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
new file mode 100644
index 000000000..ff5e1a90d
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.sink.FlinkSinkBuilder;
+import org.apache.paimon.flink.sorter.TableSorter;
+import org.apache.paimon.flink.source.FlinkSourceBuilder;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.data.RowData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Compact action that reads an append-only table, sorts the rows with a {@link TableSorter} chosen
+ * by the configured order strategy and order columns, and writes the sorted rows back through a
+ * {@link FlinkSinkBuilder} configured with an (empty) overwrite-partition map.
+ *
+ * <p>The table is copied with {@code write-only=true} in the constructor.
+ */
+public class SortCompactAction extends CompactAction {
+
+    /** Name of the sort strategy handed to {@link TableSorter#getSorter}. */
+    private String sortStrategy;
+
+    /** Columns the rows are ordered by, handed to {@link TableSorter#getSorter}. */
+    private List<String> orderColumns;
+
+    /**
+     * Creates the action and switches the table to write-only mode.
+     *
+     * @throws IllegalArgumentException if the table is not an {@link AppendOnlyFileStoreTable}
+     */
+    public SortCompactAction(
+            String warehouse,
+            String database,
+            String tableName,
+            Map<String, String> catalogConfig) {
+        super(warehouse, database, tableName, catalogConfig);
+
+        checkArgument(
+                table instanceof AppendOnlyFileStoreTable,
+                "Only sort compaction works with append-only table for now.");
+        table = table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true"));
+    }
+
+    /** Builds the job on a fresh environment forced into BATCH runtime mode and executes it. */
+    @Override
+    public void run() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        build(env);
+        execute(env, "Sort Compact Job");
+    }
+
+    /**
+     * Wires source -> sorter -> sink onto {@code env} without executing it.
+     *
+     * @param env environment the topology is added to; must be configured for BATCH runtime mode
+     * @throws IllegalArgumentException if the environment is not in BATCH mode, the table is not
+     *     append-only, or the table's bucket mode is not {@link BucketMode#UNAWARE}
+     */
+    public void build(StreamExecutionEnvironment env) {
+        // only support batch sort yet
+        if (env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
+                != RuntimeExecutionMode.BATCH) {
+            throw new IllegalArgumentException(
+                    "Only support batch mode yet, please set -Dexecution.runtime-mode=BATCH");
+        }
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
+        if (!(fileStoreTable instanceof AppendOnlyFileStoreTable)) {
+            throw new IllegalArgumentException("Sort Compact only supports append-only table yet");
+        }
+        if (fileStoreTable.bucketMode() != BucketMode.UNAWARE) {
+            throw new IllegalArgumentException("Sort Compact only supports bucket=-1 yet.");
+        }
+        Map<String, String> tableConfig = fileStoreTable.options();
+        FlinkSourceBuilder sourceBuilder =
+                new FlinkSourceBuilder(
+                        ObjectIdentifier.of(
+                                catalogName,
+                                identifier.getDatabaseName(),
+                                identifier.getObjectName()),
+                        fileStoreTable);
+
+        // Restrict the read to the requested partitions (OR over all of them), if any were given.
+        if (getPartitions() != null) {
+            Predicate partitionPredicate =
+                    PredicateBuilder.or(
+                            getPartitions().stream()
+                                    .map(p -> PredicateBuilder.partition(p, table.rowType()))
+                                    .toArray(Predicate[]::new));
+            sourceBuilder.withPredicate(partitionPredicate);
+        }
+
+        // The source parallelism comes from the scan option, not from the sink option.
+        String scanParallelism = tableConfig.get(FlinkConnectorOptions.SCAN_PARALLELISM.key());
+        if (scanParallelism != null) {
+            sourceBuilder.withParallelism(Integer.parseInt(scanParallelism));
+        }
+
+        DataStream<RowData> source = sourceBuilder.withEnv(env).withContinuousMode(false).build();
+        TableSorter sorter =
+                TableSorter.getSorter(env, source, fileStoreTable, sortStrategy, orderColumns);
+        DataStream<RowData> sorted = sorter.sort();
+
+        FlinkSinkBuilder flinkSinkBuilder = new FlinkSinkBuilder(fileStoreTable);
+        flinkSinkBuilder.withInput(sorted).withOverwritePartition(new HashMap<>());
+        String sinkParallelism = tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
+        if (sinkParallelism != null) {
+            flinkSinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
+        }
+
+        flinkSinkBuilder.build();
+    }
+
+    /** Sets the name of the sort strategy used by {@link #build}. */
+    public void withOrderStrategy(String sortStrategy) {
+        this.sortStrategy = sortStrategy;
+    }
+
+    /** Sets the columns the rows are sorted by in {@link #build}. */
+    public void withOrderColumns(List<String> orderColumns) {
+        this.orderColumns = orderColumns;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
index 84114d1c6..3b8147a3e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
@@ -21,22 +21,15 @@ package org.apache.paimon.flink.action;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.LogicalTypeConversion;
 import org.apache.paimon.flink.sink.FlinkSinkBuilder;
 import org.apache.paimon.flink.utils.TableEnvironmentUtils;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.DataTypeCasts;
 import org.apache.paimon.utils.Preconditions;
 
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.types.logical.LogicalType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,18 +37,14 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 /** Abstract base of {@link Action} for table. */
 public abstract class TableActionBase extends ActionBase {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(TableActionBase.class);
 
-    protected final StreamExecutionEnvironment env;
-    protected final StreamTableEnvironment batchTEnv;
-    protected final Identifier identifier;
-
     protected Table table;
+    protected final Identifier identifier;
 
     TableActionBase(
             String warehouse,
@@ -63,12 +52,6 @@ public abstract class TableActionBase extends ActionBase {
             String tableName,
             Map<String, String> catalogConfig) {
         super(warehouse, catalogConfig);
-        env = StreamExecutionEnvironment.getExecutionEnvironment();
-        batchTEnv = StreamTableEnvironment.create(env, 
EnvironmentSettings.inBatchMode());
-
-        // register flink catalog to table environment
-        batchTEnv.registerCatalog(flinkCatalog.getName(), flinkCatalog);
-        batchTEnv.useCatalog(flinkCatalog.getName());
         identifier = new Identifier(databaseName, tableName);
         try {
             table = catalog.getTable(identifier);
@@ -79,36 +62,6 @@ public abstract class TableActionBase extends ActionBase {
         }
     }
 
-    /**
-     * Extract {@link LogicalType}s from Flink {@link 
org.apache.flink.table.types.DataType}s and
-     * convert to Paimon {@link DataType}s.
-     */
-    protected List<DataType> toPaimonTypes(
-            List<org.apache.flink.table.types.DataType> flinkDataTypes) {
-        return flinkDataTypes.stream()
-                .map(org.apache.flink.table.types.DataType::getLogicalType)
-                .map(LogicalTypeConversion::toDataType)
-                .collect(Collectors.toList());
-    }
-
-    /**
-     * Check whether each {@link DataType} of actualTypes is compatible with 
that of expectedTypes
-     * respectively.
-     */
-    protected boolean compatibleCheck(List<DataType> actualTypes, 
List<DataType> expectedTypes) {
-        if (actualTypes.size() != expectedTypes.size()) {
-            return false;
-        }
-
-        for (int i = 0; i < actualTypes.size(); i++) {
-            if (!DataTypeCasts.supportsCompatibleCast(actualTypes.get(i), 
expectedTypes.get(i))) {
-                return false;
-            }
-        }
-
-        return true;
-    }
-
     /** Sink {@link DataStream} dataStream to table with Flink Table API in 
batch environment. */
     protected void batchSink(DataStream<RowData> dataStream) {
         List<Transformation<?>> transformations =
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
new file mode 100644
index 000000000..d07615e93
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.shuffle;
+
+import org.apache.paimon.utils.Pair;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.InputSelectable;
+import org.apache.flink.streaming.api.operators.InputSelection;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.XORShiftRandom;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Random;
+
+/**
+ * RangeShuffle Util to shuffle the input stream by the sampling range. See the {@code
+ * rangeShuffleByKey} method for how the topology is built.
+ */
+public class RangeShuffle {
+
+    /**
+     * Range-partitions {@code inputDataStream} by the key on the left of each {@link Pair}, using
+     * boundaries derived from a sample of the keys.
+     *
+     * <p>Explanation of the following figure: "[LSample, n]" means the operator is LSample and its
+     * parallelism is n, "LSample" means LocalSampleOperator, "GSample" means GlobalSampleOperator,
+     * "ARange" means AssignRangeIndexOperator, "RRange" means RemoveRangeIndexOperator.
+     *
+     * <pre>{@code
+     * [IN,n]->[LSample,n]->[GSample,1]-BROADCAST
+     *    \                                    \
+     *     -----------------------------BATCH-[ARange,n]-PARTITION->[RRange,m]->
+     * }</pre>
+     *
+     * <p>Every edge except the sample and histogram path uses a BATCH exchange, so the sample and
+     * histogram path does not care about the required exchange mode.
+     *
+     * @param inputDataStream stream of (key, row) pairs to shuffle
+     * @param keyComparator ordering of the keys, used for the boundaries and the range lookup
+     * @param keyClass class of the key, used to derive its type information
+     * @param sampleSize number of keys retained by the local and the global sampler
+     * @param rangeNum number of ranges the key space is split into
+     * @return the input records, redistributed by range
+     */
+    public static <T> DataStream<Pair<T, RowData>> rangeShuffleByKey(
+            DataStream<Pair<T, RowData>> inputDataStream,
+            Comparator<T> keyComparator,
+            Class<T> keyClass,
+            int sampleSize,
+            int rangeNum) {
+        Transformation<Pair<T, RowData>> input = inputDataStream.getTransformation();
+
+        // 0. Project every (key, row) pair onto its key for sampling.
+        OneInputTransformation<Pair<T, RowData>, T> keyInput =
+                new OneInputTransformation<>(
+                        input,
+                        "ABSTRACT KEY",
+                        new StreamMap<>(Pair::getLeft),
+                        TypeInformation.of(keyClass),
+                        input.getParallelism());
+
+        // 1. Fixed size sample in each partition.
+        OneInputTransformation<T, Tuple2<Double, T>> localSample =
+                new OneInputTransformation<>(
+                        keyInput,
+                        "LOCAL SAMPLE",
+                        new LocalSampleOperator<>(sampleSize),
+                        new TupleTypeInfo<>(
+                                BasicTypeInfo.DOUBLE_TYPE_INFO, TypeInformation.of(keyClass)),
+                        keyInput.getParallelism());
+
+        // 2. Collect all the samples and gather them into a sorted key range.
+        OneInputTransformation<Tuple2<Double, T>, List<T>> sampleAndHistogram =
+                new OneInputTransformation<>(
+                        localSample,
+                        "GLOBAL SAMPLE",
+                        new GlobalSampleOperator<>(sampleSize, keyComparator, rangeNum),
+                        new ListTypeInfo<>(TypeInformation.of(keyClass)),
+                        1);
+
+        // 3. Take range boundaries as broadcast input and take the tuple of partition id and
+        // record as output.
+        // The shuffle mode of input edge must be BATCH to avoid dead lock. See
+        // DeadlockBreakupProcessor.
+        PartitionTransformation<List<T>> broadcastBoundaries =
+                new PartitionTransformation<>(
+                        sampleAndHistogram, new BroadcastPartitioner<>(), StreamExchangeMode.BATCH);
+        PartitionTransformation<Pair<T, RowData>> forwardedInput =
+                new PartitionTransformation<>(
+                        input, new ForwardPartitioner<>(), StreamExchangeMode.BATCH);
+        TwoInputTransformation<List<T>, Pair<T, RowData>, Tuple2<Integer, Pair<T, RowData>>>
+                preparePartition =
+                        new TwoInputTransformation<>(
+                                broadcastBoundaries,
+                                forwardedInput,
+                                "ASSIGN RANGE INDEX",
+                                new AssignRangeIndexOperator<>(keyComparator),
+                                new TupleTypeInfo<>(
+                                        BasicTypeInfo.INT_TYPE_INFO, input.getOutputType()),
+                                input.getParallelism());
+
+        // 4. Shuffle by range index, then strip the index again.
+        PartitionTransformation<Tuple2<Integer, Pair<T, RowData>>> rangePartitioned =
+                new PartitionTransformation<>(
+                        preparePartition,
+                        new CustomPartitionerWrapper<>(
+                                new AssignRangeIndexOperator.RangePartitioner(rangeNum),
+                                new AssignRangeIndexOperator.Tuple2KeySelector<>()),
+                        StreamExchangeMode.BATCH);
+        OneInputTransformation<Tuple2<Integer, Pair<T, RowData>>, Pair<T, RowData>> removeIndex =
+                new OneInputTransformation<>(
+                        rangePartitioned,
+                        "REMOVE KEY",
+                        new RemoveRangeIndexOperator<>(),
+                        input.getOutputType(),
+                        input.getParallelism());
+
+        return new DataStream<>(inputDataStream.getExecutionEnvironment(), removeIndex);
+    }
+
+    /**
+     * First phase of the distributed sampling: feeds every incoming key into a {@link Sampler}
+     * and, once the input is exhausted, emits the retained keys together with their weights.
+     *
+     * <p>See {@link Sampler}.
+     */
+    @Internal
+    public static class LocalSampleOperator<T> extends TableStreamOperator<Tuple2<Double, T>>
+            implements OneInputStreamOperator<T, Tuple2<Double, T>>, BoundedOneInput {
+
+        private static final long serialVersionUID = 1L;
+
+        /** Maximum number of keys the local sampler retains. */
+        private final int numSample;
+
+        private transient Collector<Tuple2<Double, T>> collector;
+        private transient Sampler<T> sampler;
+
+        public LocalSampleOperator(int numSample) {
+            this.numSample = numSample;
+        }
+
+        @Override
+        public void open() throws Exception {
+            super.open();
+            // Seeded from the clock at open time.
+            this.sampler = new Sampler<>(numSample, System.nanoTime());
+            this.collector = new StreamRecordCollector<>(output);
+        }
+
+        @Override
+        public void processElement(StreamRecord<T> element) throws Exception {
+            T key = element.getValue();
+            sampler.collect(key);
+        }
+
+        @Override
+        public void endInput() throws Exception {
+            // Nothing is emitted before the end of input; flush the whole sample now.
+            sampler.sample().forEachRemaining(collector::collect);
+        }
+    }
+
+    /**
+     * Global sample for range partition. Inputs weight with record. Outputs a single list of
+     * range boundaries picked from the sorted sample.
+     *
+     * <p>If nothing was sampled, an empty boundary list is emitted.
+     *
+     * <p>See {@link Sampler}.
+     */
+    @Internal
+    public static class GlobalSampleOperator<T> extends TableStreamOperator<List<T>>
+            implements OneInputStreamOperator<Tuple2<Double, T>, List<T>>, BoundedOneInput {
+
+        private static final long serialVersionUID = 1L;
+
+        /** Maximum number of weighted keys the global sampler retains. */
+        private final int numSample;
+
+        /** Number of ranges; {@code rangesNum - 1} boundaries are produced. */
+        private final int rangesNum;
+
+        /** Ordering used to sort the sampled keys before boundaries are picked. */
+        private final Comparator<T> keyComparator;
+
+        private transient Collector<List<T>> collector;
+        private transient Sampler<T> sampler;
+
+        public GlobalSampleOperator(int numSample, Comparator<T> comparator, int rangesNum) {
+            this.numSample = numSample;
+            this.keyComparator = comparator;
+            this.rangesNum = rangesNum;
+        }
+
+        @Override
+        public void open() throws Exception {
+            super.open();
+            // The seed is irrelevant here: weights arrive with the records, see processElement.
+            this.sampler = new Sampler<>(numSample, 0L);
+            this.collector = new StreamRecordCollector<>(output);
+        }
+
+        @Override
+        public void processElement(StreamRecord<Tuple2<Double, T>> record) throws Exception {
+            Tuple2<Double, T> tuple = record.getValue();
+            // Re-use the weight assigned by the local sampler instead of drawing a new one.
+            sampler.collect(tuple.f0, tuple.f1);
+        }
+
+        @Override
+        public void endInput() throws Exception {
+            Iterator<Tuple2<Double, T>> sampled = sampler.sample();
+
+            List<T> sampledData = new ArrayList<>();
+            while (sampled.hasNext()) {
+                sampledData.add(sampled.next().f1);
+            }
+
+            if (sampledData.isEmpty()) {
+                // Without samples there is nothing to derive boundaries from. Emit an empty list
+                // instead of a list of null boundaries, which a range lookup could not compare
+                // keys against.
+                collector.collect(sampledData);
+                return;
+            }
+
+            sampledData.sort(keyComparator);
+
+            int boundarySize = rangesNum - 1;
+            // Safe: the array never leaves this method as T[], it is only wrapped into a List<T>.
+            @SuppressWarnings("unchecked")
+            T[] boundaries = (T[]) new Object[boundarySize];
+            // Pick boundaries at evenly spaced positions of the sorted sample.
+            double avgRange = sampledData.size() / (double) rangesNum;
+            for (int i = 1; i < rangesNum; i++) {
+                T record = sampledData.get((int) (i * avgRange));
+                boundaries[i - 1] = record;
+            }
+
+            collector.collect(Arrays.asList(boundaries));
+        }
+    }
+
+    /**
+     * Two-input operator that tags every record with the index of the range its key falls into.
+     *
+     * <p>The first input delivers the list of range boundaries (broadcast), the second input
+     * delivers the records. Records are only read once the boundaries have arrived, see {@link
+     * #nextSelection()}. Each record is emitted as a {@link Tuple2} of range index and record.
+     */
+    @Internal
+    public static class AssignRangeIndexOperator<T>
+            extends TableStreamOperator<Tuple2<Integer, Pair<T, RowData>>>
+            implements TwoInputStreamOperator<
+                            List<T>, Pair<T, RowData>, Tuple2<Integer, Pair<T, RowData>>>,
+                    InputSelectable {
+
+        private static final long serialVersionUID = 1L;
+
+        /** Range boundaries received from the first input; {@code null} until they arrive. */
+        private transient List<T> boundaries;
+
+        private transient Collector<Tuple2<Integer, Pair<T, RowData>>> collector;
+
+        private final Comparator<T> keyComparator;
+
+        public AssignRangeIndexOperator(Comparator<T> comparator) {
+            this.keyComparator = comparator;
+        }
+
+        @Override
+        public void open() throws Exception {
+            super.open();
+            this.collector = new StreamRecordCollector<>(output);
+        }
+
+        /** Stores the boundary list delivered by the first input. */
+        @Override
+        public void processElement1(StreamRecord<List<T>> streamRecord) throws Exception {
+            this.boundaries = streamRecord.getValue();
+        }
+
+        /**
+         * Emits the record together with its range index.
+         *
+         * @throws RuntimeException if no boundaries have been received yet
+         */
+        @Override
+        public void processElement2(StreamRecord<Pair<T, RowData>> streamRecord) throws Exception {
+            if (boundaries == null) {
+                throw new RuntimeException("There should be one data from the first input.");
+            }
+            Pair<T, RowData> row = streamRecord.getValue();
+            collector.collect(new Tuple2<>(binarySearch(row.getLeft()), row));
+        }
+
+        /** Reads only the boundary input until the boundaries are known, then both inputs. */
+        @Override
+        public InputSelection nextSelection() {
+            return boundaries == null ? InputSelection.FIRST : InputSelection.ALL;
+        }
+
+        /**
+         * Returns the index of the range {@code key} belongs to: the position of an equal
+         * boundary if there is one, otherwise the position where the key would be inserted.
+         */
+        private int binarySearch(T key) {
+            int low = 0;
+            int high = this.boundaries.size() - 1;
+
+            while (low <= high) {
+                final int mid = (low + high) >>> 1;
+                final int result = keyComparator.compare(key, this.boundaries.get(mid));
+
+                if (result > 0) {
+                    low = mid + 1;
+                } else if (result < 0) {
+                    high = mid - 1;
+                } else {
+                    return mid;
+                }
+            }
+            // key not found, but the low index is the target
+            // bucket, since the boundaries are the upper bound
+            return low;
+        }
+
+        /** A {@link KeySelector} to select by f0 of tuple2. */
+        public static class Tuple2KeySelector<T>
+                implements KeySelector<Tuple2<Integer, Pair<T, RowData>>, Integer> {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Integer getKey(Tuple2<Integer, Pair<T, RowData>> tuple2) throws Exception {
+                return tuple2.f0;
+            }
+        }
+
+        /** A {@link Partitioner} that maps a range index onto one of the partitions. */
+        public static class RangePartitioner implements Partitioner<Integer> {
+
+            private static final long serialVersionUID = 1L;
+
+            private final int totalRangeNum;
+
+            public RangePartitioner(int totalRangeNum) {
+                this.totalRangeNum = totalRangeNum;
+            }
+
+            /**
+             * Maps consecutive range indexes onto the same partition; the result is capped at the
+             * last partition.
+             *
+             * @throws IllegalArgumentException if there are more partitions than ranges
+             */
+            @Override
+            public int partition(Integer key, int numPartitions) {
+                // One range per partition is valid (the divisor below is then 1). Only more
+                // partitions than ranges must be rejected, as the divisor would become zero.
+                Preconditions.checkArgument(
+                        numPartitions <= totalRangeNum,
+                        "Num of subPartitions should <= totalRangeNum: " + totalRangeNum);
+                int partition = key / (totalRangeNum / numPartitions);
+                return Math.min(numPartitions - 1, partition);
+            }
+        }
+    }
+
+    /** Strips the range index from each incoming tuple and forwards only the wrapped record. */
+    @Internal
+    public static class RemoveRangeIndexOperator<T> extends TableStreamOperator<Pair<T, RowData>>
+            implements OneInputStreamOperator<Tuple2<Integer, Pair<T, RowData>>, Pair<T, RowData>> {
+
+        private static final long serialVersionUID = 1L;
+
+        private transient Collector<Pair<T, RowData>> collector;
+
+        @Override
+        public void open() throws Exception {
+            super.open();
+            collector = new StreamRecordCollector<>(output);
+        }
+
+        @Override
+        public void processElement(StreamRecord<Tuple2<Integer, Pair<T, RowData>>> element)
+                throws Exception {
+            Tuple2<Integer, Pair<T, RowData>> indexed = element.getValue();
+            // f0 is the range index, f1 the original record.
+            collector.collect(indexed.f1);
+        }
+    }
+
+    /**
+     * A simple in memory implementation Sampling, and with only one pass through the input
+     * iteration whose size is unpredictable. The basic idea behind this sampler implementation is
+     * to generate a random number for each input element as its weight, select the top K elements
+     * with max weight. As the weights are generated randomly, so are the selected top K elements.
+     * In the first phase, we generate random numbers as the weights for each element and select top
+     * K elements as the output of each partitions. In the second phase, we select top K elements
+     * from all the outputs of the first phase.
+     *
+     * <p>This implementation refers to the algorithm described in <a
+     * href="researcher.ibm.com/files/us-dpwoodru/tw11.pdf">"Optimal Random Sampling from
+     * Distributed Streams Revisited"</a>.
+     */
+    @Internal
+    public static class Sampler<T> {
+
+        private final int numSamples;
+        private final Random random;
+
+        /** Min-heap on the weight: the head is the retained element with the smallest weight. */
+        private final PriorityQueue<Tuple2<Double, T>> queue;
+
+        /**
+         * Create a new sampler with reservoir size and a seed for the random number generator.
+         *
+         * @param numSamples Maximum number of samples to retain in reservoir, must be non-negative.
+         * @param seed Seed of the generator that draws the weights in {@link #collect(Object)}.
+         */
+        Sampler(int numSamples, long seed) {
+            Preconditions.checkArgument(numSamples >= 0, "numSamples should be non-negative.");
+            this.numSamples = numSamples;
+            this.random = new XORShiftRandom(seed);
+            // PriorityQueue rejects an initial capacity below 1, so clamp it to keep the
+            // permitted numSamples == 0 usable.
+            this.queue =
+                    new PriorityQueue<>(
+                            Math.max(1, numSamples), Comparator.comparingDouble(o -> o.f0));
+        }
+
+        /** Offers {@code rowData} to the reservoir with a freshly drawn random weight. */
+        void collect(T rowData) {
+            collect(random.nextDouble(), rowData);
+        }
+
+        /**
+         * Offers {@code key} with the given weight; it is retained if the reservoir is not full
+         * yet or if its weight exceeds the smallest retained weight.
+         */
+        void collect(double weight, T key) {
+            // The queue size is used instead of a running element counter, which would overflow
+            // on very large inputs and let the reservoir grow past numSamples.
+            if (queue.size() < numSamples) {
+                // Fill the queue with first K elements from input.
+                queue.add(new Tuple2<>(weight, key));
+            } else if (numSamples > 0 && weight > queue.peek().f0) {
+                // Remove the element with the smallest weight,
+                // and append current element into the queue.
+                queue.remove();
+                queue.add(new Tuple2<>(weight, key));
+            }
+        }
+
+        /** Returns the retained elements with their weights, in no particular order. */
+        Iterator<Tuple2<Double, T>> sample() {
+            return queue.iterator();
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
new file mode 100644
index 000000000..d51cf1833
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sorter;
+
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.NormalizedKeyComputer;
+import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.BinaryRowSerializer;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.data.serializer.InternalSerializers;
+import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.memory.HeapMemorySegmentPool;
+import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.sort.BinaryExternalSortBuffer;
+import org.apache.paimon.sort.BinaryInMemorySortBuffer;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.MutableObjectIterator;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.paimon.disk.IOManagerImpl.splitPaths;
+
+/**
+ * Operator that buffers every incoming {@link InternalRow} in a {@link BinaryExternalSortBuffer}
+ * and emits all of them in sorted order once the bounded input has ended.
+ *
+ * <p>Incoming rows are treated as the fields of {@code keyType} followed by the fields of {@code
+ * valueRowType}. The normalized key computer and the record comparator are both generated over
+ * that combined field list (key fields first).
+ */
+public class SortOperator extends TableStreamOperator<InternalRow>
+        implements OneInputStreamOperator<InternalRow, InternalRow>, BoundedOneInput {
+
+    /** Type of the leading sort-key fields of every incoming row. */
+    private final RowType keyRowType;
+
+    /** Type of the fields that follow the key fields in every incoming row. */
+    private final RowType valueRowType;
+
+    /** Size handed to the {@link HeapMemorySegmentPool} backing the in-memory sort buffer. */
+    private final long maxMemory;
+
+    /** Page size handed to the {@link HeapMemorySegmentPool}. */
+    private final int pageSize;
+
+    /** Number of fields of a buffered row: key fields plus value fields. */
+    private final int arity;
+
+    /** Sort buffer created in {@link #open()} and released in {@link #close()}. */
+    private transient BinaryExternalSortBuffer buffer;
+
+    /**
+     * Creates the operator.
+     *
+     * @param keyType type of the key fields at the front of each row
+     * @param valueRowType type of the remaining fields of each row
+     * @param maxMemory memory budget of the sort buffer's segment pool
+     * @param pageSize page size of the sort buffer's segment pool
+     */
+    public SortOperator(RowType keyType, RowType valueRowType, long maxMemory, int pageSize) {
+        this.keyRowType = keyType;
+        this.valueRowType = valueRowType;
+        this.maxMemory = maxMemory;
+        this.pageSize = pageSize;
+        this.arity = keyType.getFieldCount() + valueRowType.getFieldCount();
+    }
+
+    /** Builds the combined row type and sets up the sort buffer. */
+    @Override
+    public void open() throws Exception {
+        super.open();
+
+        List<DataField> keyFields = keyRowType.getFields();
+        List<DataField> dataFields = valueRowType.getFields();
+
+        // Layout of a buffered row: key fields first, then the value fields.
+        List<DataField> fields = new ArrayList<>();
+        fields.addAll(keyFields);
+        fields.addAll(dataFields);
+
+        RowType rowType = new RowType(fields);
+
+        InternalRowSerializer serializer = InternalSerializers.create(rowType);
+        NormalizedKeyComputer normalizedKeyComputer =
+                CodeGenUtils.newNormalizedKeyComputer(
+                        rowType.getFieldTypes(), "MemTableKeyComputer");
+        RecordComparator keyComparator =
+                CodeGenUtils.newRecordComparator(rowType.getFieldTypes(), "MemTableComparator");
+
+        MemorySegmentPool memoryPool = new HeapMemorySegmentPool(maxMemory, pageSize);
+
+        BinaryInMemorySortBuffer inMemorySortBuffer =
+                BinaryInMemorySortBuffer.createBuffer(
+                        normalizedKeyComputer, serializer, keyComparator, memoryPool);
+
+        Configuration jobConfig = getContainingTask().getJobConfiguration();
+
+        // The external buffer gets an IO manager rooted at Flink's configured temp directories.
+        buffer =
+                new BinaryExternalSortBuffer(
+                        new BinaryRowSerializer(serializer.getArity()),
+                        keyComparator,
+                        memoryPool.pageSize(),
+                        inMemorySortBuffer,
+                        new IOManagerImpl(splitPaths(jobConfig.get(CoreOptions.TMP_DIRS))),
+                        jobConfig.getInteger(
+                                ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES));
+    }
+
+    /** Emits every buffered row in sorted order; does nothing when no row was buffered. */
+    @Override
+    public void endInput() throws Exception {
+        if (buffer.size() > 0) {
+            MutableObjectIterator<BinaryRow> iterator = buffer.sortedIterator();
+            BinaryRow binaryRow = new BinaryRow(arity);
+            while ((binaryRow = iterator.next(binaryRow)) != null) {
+                output.collect(new StreamRecord<>(binaryRow));
+            }
+        }
+    }
+
+    /** Adds the incoming row to the sort buffer; nothing is emitted before {@link #endInput()}. */
+    @Override
+    public void processElement(StreamRecord<InternalRow> element) throws Exception {
+        buffer.write(element.getValue());
+    }
+
+    /**
+     * Releases the sort buffer. The buffer is {@code null} when {@link #open()} did not complete,
+     * hence the guard.
+     */
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (buffer != null) {
+            buffer.clear();
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
new file mode 100644
index 000000000..1cf3572bc
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sorter;
+
+import org.apache.paimon.flink.action.SortCompactAction;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+
+import java.util.List;
+
+/** An abstract TableSorter for {@link SortCompactAction}. */
+public abstract class TableSorter {
+
+    protected final StreamExecutionEnvironment batchTEnv;
+    protected final DataStream<RowData> origin;
+    protected final FileStoreTable table;
+    protected final List<String> orderColNames;
+
+    public TableSorter(
+            StreamExecutionEnvironment batchTEnv,
+            DataStream<RowData> origin,
+            FileStoreTable table,
+            List<String> orderColNames) {
+        this.batchTEnv = batchTEnv;
+        this.origin = origin;
+        this.table = table;
+        this.orderColNames = orderColNames;
+        checkColNames();
+    }
+
+    private void checkColNames() {
+        if (orderColNames.size() < 1) {
+            throw new IllegalArgumentException("order column names should not 
be empty.");
+        }
+        List<String> columnNames = table.rowType().getFieldNames();
+        for (String zColumn : orderColNames) {
+            if (!columnNames.contains(zColumn)) {
+                throw new RuntimeException(
+                        "Can't find column "
+                                + zColumn
+                                + " in table columns. Possible columns are ["
+                                + columnNames.stream().reduce((a, b) -> a + 
"," + b).get()
+                                + "]");
+            }
+        }
+    }
+
+    public abstract DataStream<RowData> sort();
+
+    public static TableSorter getSorter(
+            StreamExecutionEnvironment batchTEnv,
+            DataStream<RowData> origin,
+            FileStoreTable fileStoreTable,
+            String sortStrategy,
+            List<String> orderColumns) {
+        switch (OrderType.of(sortStrategy)) {
+            case ORDER:
+                // todo support alphabetical order
+                throw new IllegalArgumentException("Not supported yet.");
+            case ZORDER:
+                return new ZorderSorter(batchTEnv, origin, fileStoreTable, 
orderColumns);
+            case HILBERT:
+                // todo support hilbert curve
+                throw new IllegalArgumentException("Not supported yet.");
+            default:
+                throw new IllegalArgumentException("cannot match order type: " 
+ sortStrategy);
+        }
+    }
+
+    enum OrderType {
+        ORDER("order"),
+        ZORDER("zorder"),
+        HILBERT("hilbert");
+
+        private final String orderType;
+
+        OrderType(String orderType) {
+            this.orderType = orderType;
+        }
+
+        @Override
+        public String toString() {
+            return "order type: " + orderType;
+        }
+
+        public static OrderType of(String orderType) {
+            if (ORDER.orderType.equals(orderType)) {
+                return ORDER;
+            } else if (ZORDER.orderType.equals(orderType)) {
+                return ZORDER;
+            } else if (HILBERT.orderType.equals(orderType)) {
+                return HILBERT;
+            }
+
+            throw new IllegalArgumentException("cannot match type: " + 
orderType + " for ordering");
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java
new file mode 100644
index 000000000..465ab89d0
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sorter;
+
+import org.apache.paimon.flink.action.SortCompactAction;
+import org.apache.paimon.sort.zorder.ZIndexer;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+
+import java.util.List;
+
+/**
+ * A {@link TableSorter} that orders the records of a stream by the z-order of the configured
+ * columns. The z-index of each record is computed by a {@link ZIndexer}; the actual shuffle and
+ * sort pipeline lives in {@link ZorderSorterUtils#sortStreamByZorder}, which also strips the
+ * z-index column again before returning. The result is meant to be consumed by {@link
+ * SortCompactAction}.
+ */
+public class ZorderSorter extends TableSorter {
+
+    public ZorderSorter(
+            StreamExecutionEnvironment batchTEnv,
+            DataStream<RowData> origin,
+            FileStoreTable table,
+            List<String> zOrderColNames) {
+        super(batchTEnv, origin, table, zOrderColNames);
+    }
+
+    /**
+     * Sorts the origin stream by the z-order of the order columns.
+     *
+     * @return the sorted data stream
+     */
+    @Override
+    public DataStream<RowData> sort() {
+        // The indexer is built over the table's row type, restricted to the order columns.
+        final ZIndexer indexer = new ZIndexer(table.rowType(), orderColNames);
+        return ZorderSorterUtils.sortStreamByZorder(origin, indexer, table);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorterUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorterUtils.java
new file mode 100644
index 000000000..7e6c78192
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorterUtils.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sorter;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
+import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.flink.shuffle.RangeShuffle;
+import org.apache.paimon.sort.zorder.ZIndexer;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.KeyProjectedRow;
+import org.apache.paimon.utils.Pair;
+
+import 
org.apache.paimon.shade.guava30.com.google.common.primitives.UnsignedBytes;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.data.RowData;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+
+/**
+ * Sorts a stream of Flink {@link RowData} by the z-index that a {@link ZIndexer} generates for
+ * each row. The stream is range shuffled by z-index and then sorted locally in every task, after
+ * which the rows are converted back to Flink {@link RowData}.
+ *
+ * <pre>
+ * DataStream[RowData]
+ *   -- compute z-index, pair it with the row      --> DataStream[Pair[byte[], RowData]]
+ *   -- range shuffle by z-index                   --> DataStream[Pair[byte[], RowData]]
+ *   -- join z-index and row into one InternalRow  --> DataStream[InternalRow]
+ *   -- local sort (SortOperator)                  --> DataStream[InternalRow], sorted
+ *   -- remove the z-index column                  --> DataStream[InternalRow], sorted
+ *   -- wrap as Flink RowData                      --> DataStream[RowData], sorted
+ * </pre>
+ */
+public class ZorderSorterUtils {
+
+    /** Row type of the single z-index column that is put in front of every row for sorting. */
+    private static final RowType KEY_TYPE =
+            new RowType(Collections.singletonList(new DataField(0, "Z_INDEX", DataTypes.BYTES())));
+
+    /** Sample size passed to the range shuffle, per unit of input parallelism. */
+    private static final int SAMPLES_PER_TASK = 1000;
+
+    /** Number of ranges passed to the range shuffle, per unit of input parallelism. */
+    private static final int RANGES_PER_TASK = 10;
+
+    /**
+     * Sort the input stream by z-order.
+     *
+     * @param inputStream the stream to be ordered
+     * @param zIndexer generates the z-index of a given row
+     * @param table the FileStoreTable; supplies the row type, the sort memory (write buffer
+     *     size) and the page size
+     * @return the stream sorted by z-index, with the same type information as {@code inputStream}
+     */
+    public static DataStream<RowData> sortStreamByZorder(
+            DataStream<RowData> inputStream, ZIndexer zIndexer, FileStoreTable table) {
+
+        final RowType tableRowType = table.rowType();
+        final int valueFieldCount = tableRowType.getFieldCount();
+        final int parallelism = inputStream.getParallelism();
+        final long sortMemory = table.coreOptions().writeBufferSize();
+        final int sortPageSize = table.coreOptions().pageSize();
+
+        // attach the z-index to every row as the key of a Pair
+        DataStream<Pair<byte[], RowData>> keyedInput =
+                inputStream.map(new ZIndexAssigner(zIndexer)).setParallelism(parallelism);
+
+        // the comparator is shipped to the tasks, so it has to be serializable
+        Comparator<byte[]> zIndexComparator =
+                (Comparator<byte[]> & Serializable) ZorderSorterUtils::compareZIndex;
+
+        return RangeShuffle.rangeShuffleByKey(
+                        keyedInput,
+                        zIndexComparator,
+                        byte[].class,
+                        parallelism * SAMPLES_PER_TASK,
+                        parallelism * RANGES_PER_TASK)
+                // put the z-index in front of the row so both travel as one InternalRow
+                .map(
+                        keyAndRow ->
+                                new JoinedRow(
+                                        GenericRow.of(keyAndRow.getLeft()),
+                                        new FlinkRowWrapper(keyAndRow.getRight())),
+                        TypeInformation.of(InternalRow.class))
+                .setParallelism(parallelism)
+                // sort the rows of each task locally
+                .transform(
+                        "LOCAL SORT",
+                        TypeInformation.of(InternalRow.class),
+                        new SortOperator(KEY_TYPE, tableRowType, sortMemory, sortPageSize))
+                .setParallelism(parallelism)
+                // project the z-index column away again
+                .map(new ZIndexRemover(valueFieldCount))
+                .setParallelism(parallelism)
+                // back to Flink RowData
+                .map(FlinkRowData::new, inputStream.getType())
+                .setParallelism(parallelism);
+    }
+
+    /**
+     * Compares two z-indexes byte by byte, treating every byte as unsigned. Both arrays are
+     * expected to have the same length.
+     */
+    private static int compareZIndex(byte[] left, byte[] right) {
+        assert left.length == right.length;
+        for (int pos = 0; pos < left.length; pos++) {
+            int result = UnsignedBytes.compare(left[pos], right[pos]);
+            if (result != 0) {
+                return result;
+            }
+        }
+        return 0;
+    }
+
+    /** Pairs every row with a copy of the z-index computed for it. */
+    private static class ZIndexAssigner extends RichMapFunction<RowData, Pair<byte[], RowData>> {
+
+        private final ZIndexer zIndexer;
+
+        private ZIndexAssigner(ZIndexer zIndexer) {
+            this.zIndexer = zIndexer;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            super.open(parameters);
+            zIndexer.open();
+        }
+
+        @Override
+        public Pair<byte[], RowData> map(RowData value) {
+            byte[] zorder = zIndexer.index(new FlinkRowWrapper(value));
+            // the key is a copy of the array returned by the indexer, not the array itself
+            return Pair.of(zorder.clone(), value);
+        }
+    }
+
+    /** Projects the leading z-index column away, keeping the fields at positions 1..n. */
+    private static class ZIndexRemover extends RichMapFunction<InternalRow, InternalRow> {
+
+        private final int valueFieldCount;
+
+        private transient KeyProjectedRow keyProjectedRow;
+
+        private ZIndexRemover(int valueFieldCount) {
+            this.valueFieldCount = valueFieldCount;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            // output position i reads input position i + 1, skipping the z-index at position 0
+            int[] projection = new int[valueFieldCount];
+            Arrays.setAll(projection, pos -> pos + 1);
+            keyProjectedRow = new KeyProjectedRow(projection);
+        }
+
+        @Override
+        public InternalRow map(InternalRow value) throws Exception {
+            return keyProjectedRow.replaceRow(value);
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java
index 63e24026f..2fb4ee2e4 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java
@@ -35,7 +35,7 @@ import java.util.Map;
 import java.util.Queue;
 
 /**
- * Pre-calculate which splits each task should process according to the 
weight, and then distribute
+ * Pre-calculate which splits each task should process according to the weight, and then distribute
  * the splits fairly.
  */
 public class PreAssignSplitAssigner implements SplitAssigner {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/OrderRewriteActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/OrderRewriteActionITCase.java
new file mode 100644
index 000000000..256a5460f
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/OrderRewriteActionITCase.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.types.DataTypes;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Order Rewrite Action tests for {@link SortCompactAction}.
+ *
+ * <p>The tests write rows covering all basic types into an append-only table with a dynamic
+ * bucket ({@code bucket = -1}) partitioned by {@code f0}, run a z-order sort compaction on it, and
+ * check the outcome.
+ */
+public class OrderRewriteActionITCase extends ActionITCaseBase {
+
+    private static final Random random = new Random();
+
+    private Catalog catalog;
+    @TempDir private java.nio.file.Path path;
+
+    /** Creates the table and commits {@code loop} batches of {@code size * size} rows per partition. */
+    private void prepareData(int size, int loop) throws Exception {
+        createTable();
+        List<CommitMessage> commitMessages = new ArrayList<>();
+        for (int i = 0; i < loop; i++) {
+            commitMessages.addAll(writeData(size));
+        }
+        commit(commitMessages);
+    }
+
+    @Test
+    public void testAllBasicTypeWorksWithZorder() throws Exception {
+        // NOTE(review): looks like a smoke check that printing the help does not fail — confirm
+        // it is intended here.
+        new CompactActionFactory().printHelp();
+        prepareData(300, 1);
+        // All the basic types should support zorder
+        Assertions.assertThatCode(
+                        () ->
+                                zorder(
+                                        Arrays.asList(
+                                                "f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7",
+                                                "f8", "f9", "f10", "f11", "f12", "f13", "f14",
+                                                "f15")))
+                .doesNotThrowAnyException();
+    }
+
+    @Test
+    public void testZorderActionWorks() throws Exception {
+        prepareData(300, 30);
+        PredicateBuilder predicateBuilder = new PredicateBuilder(getTable().rowType());
+        // field index 1 is f1
+        Predicate predicate = predicateBuilder.between(1, 100, 200);
+
+        List<ManifestEntry> files = planFiles();
+        List<ManifestEntry> filesFilter = planFiles(predicate);
+        // before zorder, we don't filter any file
+        Assertions.assertThat(files.size()).isEqualTo(filesFilter.size());
+
+        zorder(Arrays.asList("f2", "f1"));
+
+        files = planFiles();
+        filesFilter = planFiles(predicate);
+        // after zorder, the predicate must skip at least one file
+        Assertions.assertThat(files.size()).isGreaterThan(filesFilter.size());
+        System.out.println(
+                "after zorder, total files: "
+                        + files.size()
+                        + " files matching filter: "
+                        + filesFilter.size());
+    }
+
+    /** Plans a scan without any filter and returns all manifest entries. */
+    private List<ManifestEntry> planFiles() throws Exception {
+        return ((AppendOnlyFileStoreTable) getTable()).store().newScan().plan().files();
+    }
+
+    /** Plans a scan with the given filter and returns the manifest entries that remain. */
+    private List<ManifestEntry> planFiles(Predicate predicate) throws Exception {
+        return ((AppendOnlyFileStoreTable) getTable())
+                .store()
+                .newScan()
+                .withFilter(predicate)
+                .plan()
+                .files();
+    }
+
+    /** Runs a {@link SortCompactAction} with the "zorder" strategy on the given columns. */
+    private void zorder(List<String> columns) throws Exception {
+        SortCompactAction sortCompactAction =
+                new SortCompactAction(
+                        new Path(path.toUri()).toUri().toString(),
+                        "my_db",
+                        "Orders1",
+                        Collections.emptyMap());
+        sortCompactAction.withOrderStrategy("zorder");
+        sortCompactAction.withOrderColumns(columns);
+        sortCompactAction.run();
+    }
+
+    /** Lazily creates the catalog whose warehouse is the temp directory of this test. */
+    public Catalog getCatalog() {
+        if (catalog == null) {
+            Options options = new Options();
+            options.set(CatalogOptions.WAREHOUSE, new Path(path.toUri()).toUri().toString());
+            catalog = CatalogFactory.createCatalog(CatalogContext.create(options));
+        }
+        return catalog;
+    }
+
+    public void createTable() throws Exception {
+        getCatalog().createDatabase("my_db", true);
+        getCatalog().createTable(identifier(), schema(), true);
+    }
+
+    public Identifier identifier() {
+        return Identifier.create("my_db", "Orders1");
+    }
+
+    private void commit(List<CommitMessage> messages) throws Exception {
+        getTable().newBatchWriteBuilder().newCommit().commit(messages);
+    }
+
+    // schema with all the basic types.
+    private static Schema schema() {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("f0", DataTypes.TINYINT());
+        schemaBuilder.column("f1", DataTypes.INT());
+        schemaBuilder.column("f2", DataTypes.SMALLINT());
+        schemaBuilder.column("f3", DataTypes.STRING());
+        schemaBuilder.column("f4", DataTypes.DOUBLE());
+        schemaBuilder.column("f5", DataTypes.CHAR(10));
+        schemaBuilder.column("f6", DataTypes.VARCHAR(10));
+        schemaBuilder.column("f7", DataTypes.BOOLEAN());
+        schemaBuilder.column("f8", DataTypes.DATE());
+        schemaBuilder.column("f9", DataTypes.TIME());
+        schemaBuilder.column("f10", DataTypes.TIMESTAMP());
+        schemaBuilder.column("f11", DataTypes.DECIMAL(10, 2));
+        schemaBuilder.column("f12", DataTypes.BYTES());
+        schemaBuilder.column("f13", DataTypes.FLOAT());
+        schemaBuilder.column("f14", DataTypes.BINARY(10));
+        schemaBuilder.column("f15", DataTypes.VARBINARY(10));
+        schemaBuilder.option("bucket", "-1");
+        schemaBuilder.option("scan.parallelism", "6");
+        schemaBuilder.option("sink.parallelism", "3");
+        schemaBuilder.option("target-file-size", "1 M");
+        schemaBuilder.partitionKeys("f0");
+        return schemaBuilder.build();
+    }
+
+    /** Writes {@code size * size} rows into each of the two partitions 0 and 1. */
+    private List<CommitMessage> writeData(int size) throws Exception {
+        List<CommitMessage> messages = new ArrayList<>();
+        for (int i = 0; i < 2; i++) {
+            messages.addAll(writeOnce(getTable(), i, size));
+        }
+
+        return messages;
+    }
+
+    public Table getTable() throws Exception {
+        return getCatalog().getTable(identifier());
+    }
+
+    /** Writes one row for every (i, j) pair in {@code [0, size) x [0, size)} into partition p. */
+    private static List<CommitMessage> writeOnce(Table table, int p, int size) throws Exception {
+        BatchWriteBuilder builder = table.newBatchWriteBuilder();
+        try (BatchTableWrite batchTableWrite = builder.newWrite()) {
+            for (int i = 0; i < size; i++) {
+                for (int j = 0; j < size; j++) {
+                    batchTableWrite.write(data(p, i, j));
+                }
+            }
+            return batchTableWrite.prepareCommit();
+        }
+    }
+
+    /** Builds one row; the values are listed in the column order f0..f15 of {@link #schema()}. */
+    private static InternalRow data(int p, int i, int j) {
+        return GenericRow.of(
+                (byte) p,
+                j,
+                (short) i,
+                BinaryString.fromString(String.valueOf(j)),
+                0.1 + i,
+                BinaryString.fromString(String.valueOf(j)),
+                BinaryString.fromString(String.valueOf(i)),
+                j % 2 == 1,
+                i,
+                j,
+                Timestamp.fromEpochMillis(i),
+                Decimal.zero(10, 2),
+                // explicit charset so the bytes do not depend on the platform default
+                String.valueOf(i).getBytes(java.nio.charset.StandardCharsets.UTF_8),
+                (float) 0.1 + j,
+                randomBytes(),
+                randomBytes());
+    }
+
+    /** Returns between 0 and 9 random bytes. */
+    private static byte[] randomBytes() {
+        byte[] binary = new byte[random.nextInt(10)];
+        random.nextBytes(binary);
+        return binary;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterITCase.java
similarity index 99%
rename from 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterTest.java
rename to 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterITCase.java
index 4db0d4111..08e182464 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterITCase.java
@@ -43,7 +43,7 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.assertj.core.api.Fail.fail;
 
 /** Tests for {@link KafkaLogStoreRegister}. */
-public class KafkaLogStoreRegisterTest extends KafkaTableTestBase {
+public class KafkaLogStoreRegisterITCase extends KafkaTableTestBase {
     private static final String DATABASE = "mock_db";
 
     private static final String TABLE = "mock_table";

Reply via email to