This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0dbcff69f [core] support z-order range sort action (#1846)
0dbcff69f is described below
commit 0dbcff69f781341be10772a093578191aa4b82b1
Author: YeJunHao <[email protected]>
AuthorDate: Wed Aug 23 21:00:21 2023 +0800
[core] support z-order range sort action (#1846)
---
LICENSE | 2 +
docs/content/concepts/append-only-table.md | 21 +
.../layouts/shortcodes/generated/sort-compact.html | 55 +++
.../org/apache/paimon/sort/zorder/ZIndexer.java | 398 ++++++++++++++++++
.../org/apache/paimon/utils/ZOrderByteUtils.java | 242 +++++++++++
.../apache/paimon/sort/zorder/ZIndexerTest.java | 70 ++++
.../apache/paimon/utils/TestZOrderByteUtil.java | 426 ++++++++++++++++++++
.../org/apache/paimon/flink/action/ActionBase.java | 46 +++
.../apache/paimon/flink/action/CompactAction.java | 4 +
.../paimon/flink/action/CompactActionFactory.java | 38 +-
.../paimon/flink/action/SortCompactAction.java | 132 ++++++
.../paimon/flink/action/TableActionBase.java | 49 +--
.../apache/paimon/flink/shuffle/RangeShuffle.java | 448 +++++++++++++++++++++
.../apache/paimon/flink/sorter/SortOperator.java | 124 ++++++
.../apache/paimon/flink/sorter/TableSorter.java | 117 ++++++
.../apache/paimon/flink/sorter/ZorderSorter.java | 63 +++
.../paimon/flink/sorter/ZorderSorterUtils.java | 157 ++++++++
.../source/assigners/PreAssignSplitAssigner.java | 2 +-
.../flink/action/OrderRewriteActionITCase.java | 229 +++++++++++
...rTest.java => KafkaLogStoreRegisterITCase.java} | 2 +-
20 files changed, 2572 insertions(+), 53 deletions(-)
diff --git a/LICENSE b/LICENSE
index d3f258fa3..ef08963ea 100644
--- a/LICENSE
+++ b/LICENSE
@@ -234,6 +234,8 @@ from http://flink.apache.org/ version 1.17.0
paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
paimon-hive/paimon-hive-common/src/test/java/org/apache/paimon/hive/TestHiveMetastore.java
+paimon-core/src/main/java/org/apache/paimon/utils/ZOrderByteUtils.java
+paimon-core/src/test/java/org/apache/paimon/utils/TestZOrderByteUtil.java
from http://iceberg.apache.org/ version 1.3.0
paimon-hive/paimon-hive-common/src/test/resources/hive-schema-3.1.0.derby.sql
diff --git a/docs/content/concepts/append-only-table.md
b/docs/content/concepts/append-only-table.md
index cc469019d..f81a906d8 100644
--- a/docs/content/concepts/append-only-table.md
+++ b/docs/content/concepts/append-only-table.md
@@ -227,6 +227,27 @@ behavior is exactly the same as [Append For Qeueue]({{<
ref "#compaction" >}}).
The auto compaction is only supported in Flink engine streaming mode. You can
also start a compaction job in flink by flink action in paimon
and disable all the other compaction by set `write-only`.
+### Sort Compact
+
+Data that is out of order within a partition leads to slow queries, while compaction during writing may slow down ingestion. A good practice is to set
+`write-only` for the inserting job and, once a partition's data is complete, trigger a `Sort Compact` action on that partition.
+
+You can trigger action by shell script:
+```shell
+<FLINK_HOME>/bin/flink run \
+ /path/to/paimon-flink-action-0.5-SNAPSHOT.jar \
+ compact \
+ --warehouse hdfs:///path/to/warehouse \
+ --database test_db \
+ --table <tableName> \
+ --order-strategy <orderType> \
+ --order-by <col1,col2,...>
+```
+
+{{< generated/sort-compact >}}
+
+Other configurations are the same as [Compact Table]({{< ref "concepts/file-operations#compact-table" >}}).
+
### Streaming Source
Unaware-bucket mode append-only table supported streaming read and write, but
no longer guarantee order anymore. You cannot regard it
diff --git a/docs/layouts/shortcodes/generated/sort-compact.html
b/docs/layouts/shortcodes/generated/sort-compact.html
new file mode 100644
index 000000000..654904c3e
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/sort-compact.html
@@ -0,0 +1,55 @@
+{{/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/}}
+{{ $ref := ref . "maintenance/configurations.md" }}
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<table class="configuration table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 15%">Configuration</th>
+ <th class="text-left" style="width: 85%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>--order-strategy</h5></td>
+ <td>The order strategy; currently only zorder is supported. For example: --order-strategy zorder</td>
+ </tr>
+ <tr>
+ <td><h5>--order-by</h5></td>
+ <td>Specify the order columns as a comma-separated list. For example: --order-by col0,col1</td>
+ </tr>
+ </tbody>
+</table>
\ No newline at end of file
diff --git
a/paimon-core/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java
b/paimon-core/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java
new file mode 100644
index 000000000..de281aae3
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/sort/zorder/ZIndexer.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.sort.zorder;
+
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeVisitor;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimeType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.ZOrderByteUtils;
+
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+import static org.apache.paimon.utils.ZOrderByteUtils.NULL_BYTES;
+import static org.apache.paimon.utils.ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE;
+
+/**
+ * Generates z-order indexes for rows.
+ *
+ * <p>Each order column is converted into a {@link ZOrderByteUtils#PRIMITIVE_BUFFER_SIZE}-byte value
+ * whose unsigned lexicographic order follows the column's value order. The per-column values are
+ * then bit-interleaved with {@link ZOrderByteUtils#interleaveBits(byte[][], int, ByteBuffer)}.
+ *
+ * <p>{@link #open()} must be called before {@link #index(InternalRow)}. Instances keep reusable
+ * buffers, so a single instance must not be used by several threads at once.
+ */
+public class ZIndexer implements Serializable {
+
+    /** One processor per order column, iterated in order-column order. */
+    private final Set<RowProcessor> functionSet;
+    /** Positions of the order columns within the row type. */
+    private final int[] fieldsIndex;
+    /** Size in bytes of every produced z-index. */
+    private final int totalBytes;
+    /** Output buffer reused by every {@link #index(InternalRow)} call; created in {@link #open()}. */
+    private transient ByteBuffer reuse;
+
+    /**
+     * Creates an indexer over {@code orderColumns} of {@code rowType}.
+     *
+     * @param rowType type of the rows that will be indexed
+     * @param orderColumns names of the columns to z-order by, in interleaving order
+     * @throws IllegalArgumentException if an order column does not exist in {@code rowType}
+     * @throws UnsupportedOperationException if an order column has a type that cannot be z-ordered
+     */
+    public ZIndexer(RowType rowType, List<String> orderColumns) {
+        List<String> fields = rowType.getFieldNames();
+        fieldsIndex = new int[orderColumns.size()];
+        for (int i = 0; i < fieldsIndex.length; i++) {
+            int index = fields.indexOf(orderColumns.get(i));
+            if (index == -1) {
+                throw new IllegalArgumentException(
+                        "Can't find column: "
+                                + orderColumns.get(i)
+                                + " in row type fields: "
+                                + fields);
+            }
+            fieldsIndex[i] = index;
+        }
+        this.functionSet = constructFunctionMap(rowType.getFields());
+        this.totalBytes = PRIMITIVE_BUFFER_SIZE * this.fieldsIndex.length;
+    }
+
+    /** Allocates the (transient) reuse buffers; call once after construction/deserialization. */
+    public void open() {
+        this.reuse = ByteBuffer.allocate(totalBytes);
+        functionSet.forEach(RowProcessor::open);
+    }
+
+    /** Returns the number of bytes of every z-index produced by {@link #index(InternalRow)}. */
+    public int size() {
+        return totalBytes;
+    }
+
+    /**
+     * Computes the z-index of {@code row}.
+     *
+     * <p>The returned array is this indexer's internal reuse buffer: it is overwritten by the next
+     * call, so callers that keep the result must copy it.
+     */
+    public byte[] index(InternalRow row) {
+        byte[][] columnBytes = new byte[fieldsIndex.length][];
+
+        int index = 0;
+        for (RowProcessor f : functionSet) {
+            columnBytes[index++] = f.zvalue(row);
+        }
+
+        return ZOrderByteUtils.interleaveBits(columnBytes, totalBytes, reuse);
+    }
+
+    /**
+     * Builds one {@link RowProcessor} per order column, preserving the order-column order.
+     *
+     * @param fields all fields of the row type
+     */
+    public Set<RowProcessor> constructFunctionMap(List<DataField> fields) {
+        Set<RowProcessor> zorderFunctionSet = new LinkedHashSet<>();
+        for (int fieldIndex = 0; fieldIndex < fieldsIndex.length; fieldIndex++) {
+            int index = fieldsIndex[fieldIndex];
+            DataField field = fields.get(index);
+            zorderFunctionSet.add(zmapColumnToCalculator(field, index));
+        }
+        return zorderFunctionSet;
+    }
+
+    /** Creates the processor that encodes the field at position {@code index} of a row. */
+    public static RowProcessor zmapColumnToCalculator(DataField field, int index) {
+        DataType type = field.type();
+        return type.accept(new TypeVisitor(index));
+    }
+
+    /**
+     * Type visitor that creates, for one row position, a function from a row to the ordered bytes
+     * of that field.
+     *
+     * <p>The created lambdas only capture local variables (never {@code this}), because they are
+     * serialized together with the {@link ZIndexer} while this visitor is not serializable.
+     */
+    public static class TypeVisitor implements DataTypeVisitor<RowProcessor> {
+
+        private final int fieldIndex;
+
+        public TypeVisitor(int index) {
+            this.fieldIndex = index;
+        }
+
+        @Override
+        public RowProcessor visit(CharType charType) {
+            return stringProcessor(charType);
+        }
+
+        @Override
+        public RowProcessor visit(VarCharType varCharType) {
+            return stringProcessor(varCharType);
+        }
+
+        @Override
+        public RowProcessor visit(BooleanType booleanType) {
+            final InternalRow.FieldGetter fieldGetter =
+                    InternalRow.createFieldGetter(booleanType, fieldIndex);
+            return new RowProcessor(
+                    (row, reuse) -> {
+                        Object o = fieldGetter.getFieldOrNull(row);
+                        if (o == null) {
+                            return NULL_BYTES;
+                        }
+                        // Only the first byte is ever written; the remaining bytes of this
+                        // column's private buffer stay zero.
+                        ZOrderByteUtils.reuse(reuse, PRIMITIVE_BUFFER_SIZE);
+                        reuse.put(0, (byte) ((boolean) o ? -127 : 0));
+                        return reuse.array();
+                    });
+        }
+
+        @Override
+        public RowProcessor visit(BinaryType binaryType) {
+            return binaryProcessor(binaryType);
+        }
+
+        @Override
+        public RowProcessor visit(VarBinaryType varBinaryType) {
+            return binaryProcessor(varBinaryType);
+        }
+
+        @Override
+        public RowProcessor visit(DecimalType decimalType) {
+            final InternalRow.FieldGetter fieldGetter =
+                    InternalRow.createFieldGetter(decimalType, fieldIndex);
+            // Two's-complement unscaled bytes have a variable length and a leading sign byte, so
+            // they are not lexicographically ordered once padded (1 -> [0x01] and 256 ->
+            // [0x01, 0x00] collide, negatives sort after positives). Instead, encode the unscaled
+            // value as an ordered long. Values of one column are expected to share the column's
+            // declared scale, so ordering the unscaled values orders the decimals.
+            final int shift = unscaledShift(decimalType.getPrecision());
+            return new RowProcessor(
+                    (row, reuse) -> {
+                        Object o = fieldGetter.getFieldOrNull(row);
+                        if (o == null) {
+                            return NULL_BYTES;
+                        }
+                        Decimal decimal = (Decimal) o;
+                        long unscaled =
+                                shift == 0
+                                        ? decimal.toUnscaledLong()
+                                        : decimal.toBigDecimal()
+                                                .unscaledValue()
+                                                .shiftRight(shift)
+                                                .longValue();
+                        return ZOrderByteUtils.longToOrderedBytes(unscaled, reuse).array();
+                    });
+        }
+
+        @Override
+        public RowProcessor visit(TinyIntType tinyIntType) {
+            final InternalRow.FieldGetter fieldGetter =
+                    InternalRow.createFieldGetter(tinyIntType, fieldIndex);
+            return new RowProcessor(
+                    (row, reuse) -> {
+                        Object o = fieldGetter.getFieldOrNull(row);
+                        return o == null
+                                ? NULL_BYTES
+                                : ZOrderByteUtils.tinyintToOrderedBytes((byte) o, reuse).array();
+                    });
+        }
+
+        @Override
+        public RowProcessor visit(SmallIntType smallIntType) {
+            final InternalRow.FieldGetter fieldGetter =
+                    InternalRow.createFieldGetter(smallIntType, fieldIndex);
+            return new RowProcessor(
+                    (row, reuse) -> {
+                        Object o = fieldGetter.getFieldOrNull(row);
+                        return o == null
+                                ? NULL_BYTES
+                                : ZOrderByteUtils.shortToOrderedBytes((short) o, reuse).array();
+                    });
+        }
+
+        @Override
+        public RowProcessor visit(IntType intType) {
+            return intProcessor(intType);
+        }
+
+        @Override
+        public RowProcessor visit(BigIntType bigIntType) {
+            final InternalRow.FieldGetter fieldGetter =
+                    InternalRow.createFieldGetter(bigIntType, fieldIndex);
+            return new RowProcessor(
+                    (row, reuse) -> {
+                        Object o = fieldGetter.getFieldOrNull(row);
+                        return o == null
+                                ? NULL_BYTES
+                                : ZOrderByteUtils.longToOrderedBytes((long) o, reuse).array();
+                    });
+        }
+
+        @Override
+        public RowProcessor visit(FloatType floatType) {
+            final InternalRow.FieldGetter fieldGetter =
+                    InternalRow.createFieldGetter(floatType, fieldIndex);
+            return new RowProcessor(
+                    (row, reuse) -> {
+                        Object o = fieldGetter.getFieldOrNull(row);
+                        return o == null
+                                ? NULL_BYTES
+                                : ZOrderByteUtils.floatToOrderedBytes((float) o, reuse).array();
+                    });
+        }
+
+        @Override
+        public RowProcessor visit(DoubleType doubleType) {
+            final InternalRow.FieldGetter fieldGetter =
+                    InternalRow.createFieldGetter(doubleType, fieldIndex);
+            return new RowProcessor(
+                    (row, reuse) -> {
+                        Object o = fieldGetter.getFieldOrNull(row);
+                        return o == null
+                                ? NULL_BYTES
+                                : ZOrderByteUtils.doubleToOrderedBytes((double) o, reuse).array();
+                    });
+        }
+
+        @Override
+        public RowProcessor visit(DateType dateType) {
+            return intProcessor(dateType);
+        }
+
+        @Override
+        public RowProcessor visit(TimeType timeType) {
+            return intProcessor(timeType);
+        }
+
+        @Override
+        public RowProcessor visit(TimestampType timestampType) {
+            return timestampProcessor(timestampType);
+        }
+
+        @Override
+        public RowProcessor visit(LocalZonedTimestampType localZonedTimestampType) {
+            return timestampProcessor(localZonedTimestampType);
+        }
+
+        @Override
+        public RowProcessor visit(ArrayType arrayType) {
+            throw unsupported(arrayType);
+        }
+
+        @Override
+        public RowProcessor visit(MultisetType multisetType) {
+            throw unsupported(multisetType);
+        }
+
+        @Override
+        public RowProcessor visit(MapType mapType) {
+            throw unsupported(mapType);
+        }
+
+        @Override
+        public RowProcessor visit(RowType rowType) {
+            throw unsupported(rowType);
+        }
+
+        /** Encodes the field's string form, truncated or zero-padded to 8 UTF-8 bytes. */
+        private RowProcessor stringProcessor(DataType type) {
+            final InternalRow.FieldGetter fieldGetter =
+                    InternalRow.createFieldGetter(type, fieldIndex);
+            return new RowProcessor(
+                    (row, reuse) -> {
+                        Object o = fieldGetter.getFieldOrNull(row);
+                        return o == null
+                                ? NULL_BYTES
+                                : ZOrderByteUtils.stringToOrderedBytes(
+                                                o.toString(), PRIMITIVE_BUFFER_SIZE, reuse)
+                                        .array();
+                    });
+        }
+
+        /** Encodes a {@code byte[]} field, truncated or zero-padded to 8 bytes. */
+        private RowProcessor binaryProcessor(DataType type) {
+            final InternalRow.FieldGetter fieldGetter =
+                    InternalRow.createFieldGetter(type, fieldIndex);
+            return new RowProcessor(
+                    (row, reuse) -> {
+                        Object o = fieldGetter.getFieldOrNull(row);
+                        return o == null
+                                ? NULL_BYTES
+                                : ZOrderByteUtils.byteTruncateOrFill(
+                                                (byte[]) o, PRIMITIVE_BUFFER_SIZE, reuse)
+                                        .array();
+                    });
+        }
+
+        /** Encodes fields whose internal value is an {@code int} (INT, DATE, TIME). */
+        private RowProcessor intProcessor(DataType type) {
+            final InternalRow.FieldGetter fieldGetter =
+                    InternalRow.createFieldGetter(type, fieldIndex);
+            return new RowProcessor(
+                    (row, reuse) -> {
+                        Object o = fieldGetter.getFieldOrNull(row);
+                        return o == null
+                                ? NULL_BYTES
+                                : ZOrderByteUtils.intToOrderedBytes((int) o, reuse).array();
+                    });
+        }
+
+        /** Encodes {@link Timestamp} fields by their millisecond value (sub-millis are ignored). */
+        private RowProcessor timestampProcessor(DataType type) {
+            final InternalRow.FieldGetter fieldGetter =
+                    InternalRow.createFieldGetter(type, fieldIndex);
+            return new RowProcessor(
+                    (row, reuse) -> {
+                        Object o = fieldGetter.getFieldOrNull(row);
+                        return o == null
+                                ? NULL_BYTES
+                                : ZOrderByteUtils.longToOrderedBytes(
+                                                ((Timestamp) o).getMillisecond(), reuse)
+                                        .array();
+                    });
+        }
+
+        /**
+         * Number of low-order bits to drop from an unscaled decimal of the given precision so that
+         * every value fits into a signed long. An arithmetic right shift is a floor division by a
+         * power of two, so the mapping stays monotonic.
+         */
+        private static int unscaledShift(int precision) {
+            // |unscaled| < 10^precision <= 2^bitLength(10^precision)
+            int bits = BigInteger.TEN.pow(precision).bitLength();
+            return Math.max(0, bits - (Long.SIZE - 1));
+        }
+
+        private static UnsupportedOperationException unsupported(DataType type) {
+            return new UnsupportedOperationException("Unsupported type: " + type);
+        }
+    }
+
+    /** Converts one row field into its ordered bytes, using a private reusable buffer. */
+    public static class RowProcessor implements Serializable {
+
+        private transient ByteBuffer reuse;
+        private final ZProcessFunction process;
+
+        public RowProcessor(ZProcessFunction process) {
+            this.process = process;
+        }
+
+        /** Allocates the reuse buffer; must be called before {@link #zvalue(InternalRow)}. */
+        public void open() {
+            reuse = ByteBuffer.allocate(PRIMITIVE_BUFFER_SIZE);
+        }
+
+        /**
+         * Returns the ordered bytes of this processor's field in {@code o}. The result is either
+         * the shared {@code NULL_BYTES} or this processor's reuse buffer; it must not be modified.
+         */
+        public byte[] zvalue(InternalRow o) {
+            return process.apply(o, reuse);
+        }
+    }
+
+    /** Serializable (row, reuse buffer) -> ordered bytes function. */
+    interface ZProcessFunction extends BiFunction<InternalRow, ByteBuffer, byte[]>, Serializable {}
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/ZOrderByteUtils.java
b/paimon-core/src/main/java/org/apache/paimon/utils/ZOrderByteUtils.java
new file mode 100644
index 000000000..1ea4194c2
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ZOrderByteUtils.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/* This file is based on source code from the Iceberg Project
(http://iceberg.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+package org.apache.paimon.utils;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharsetEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+/**
+ * Within Z-Ordering the byte representations of objects being compared must
be ordered, this
+ * requires several types to be transformed when converted to bytes. The goal
is to map object's
+ * whose byte representation are not lexicographically ordered into
representations that are
+ * lexicographically ordered. Bytes produced should be compared
lexicographically as unsigned bytes,
+ * big-endian.
+ *
+ * <p>All types except for String are stored within an 8 Byte Buffer
+ *
+ * <p>Most of these techniques are derived from
+ *
https://aws.amazon.com/blogs/database/z-order-indexing-for-multifaceted-queries-in-amazon-dynamodb-part-2/
+ */
+public class ZOrderByteUtils {
+
+ public static final int PRIMITIVE_BUFFER_SIZE = 8;
+ public static final byte[] NULL_BYTES = new byte[PRIMITIVE_BUFFER_SIZE];
+ private static ThreadLocal<CharsetEncoder> encoderThreadLocal = new
ThreadLocal<>();
+
+ static {
+ Arrays.fill(NULL_BYTES, (byte) 0x00);
+ }
+
+ private ZOrderByteUtils() {}
+
+ static ByteBuffer allocatePrimitiveBuffer() {
+ return ByteBuffer.allocate(PRIMITIVE_BUFFER_SIZE);
+ }
+
+ /**
+ * Signed ints do not have their bytes in magnitude order because of the
sign bit. To fix this,
+ * flip the sign bit so that all negatives are ordered before positives.
This essentially shifts
+ * the 0 value so that we don't break our ordering when we cross the new 0
value.
+ */
+ public static ByteBuffer intToOrderedBytes(int val, ByteBuffer reuse) {
+ ByteBuffer bytes = reuse(reuse, PRIMITIVE_BUFFER_SIZE);
+ bytes.putLong(((long) val) ^ 0x8000000000000000L);
+ return bytes;
+ }
+
+ /**
+ * Signed longs are treated the same as the signed ints in {@link
#intToOrderedBytes(int,
+ * ByteBuffer)}.
+ */
+ public static ByteBuffer longToOrderedBytes(long val, ByteBuffer reuse) {
+ ByteBuffer bytes = reuse(reuse, PRIMITIVE_BUFFER_SIZE);
+ bytes.putLong(val ^ 0x8000000000000000L);
+ return bytes;
+ }
+
+ /**
+ * Signed shorts are treated the same as the signed ints in {@link
#intToOrderedBytes(int,
+ * ByteBuffer)}.
+ */
+ public static ByteBuffer shortToOrderedBytes(short val, ByteBuffer reuse) {
+ ByteBuffer bytes = reuse(reuse, PRIMITIVE_BUFFER_SIZE);
+ bytes.putLong(((long) val) ^ 0x8000000000000000L);
+ return bytes;
+ }
+
+ /**
+ * Signed tiny ints are treated the same as the signed ints in {@link
#intToOrderedBytes(int,
+ * ByteBuffer)}.
+ */
+ public static ByteBuffer tinyintToOrderedBytes(byte val, ByteBuffer reuse)
{
+ ByteBuffer bytes = reuse(reuse, PRIMITIVE_BUFFER_SIZE);
+ bytes.putLong(((long) val) ^ 0x8000000000000000L);
+ return bytes;
+ }
+
+ /**
+ * IEEE 754 : “If two floating-point numbers in the same format are
ordered (say, x {@literal <}
+ * y), they are ordered the same way when their bits are reinterpreted as
sign-magnitude
+ * integers.”
+ *
+ * <p>Which means floats can be treated as sign magnitude integers which
can then be converted
+ * into lexicographically comparable bytes.
+ */
+ public static ByteBuffer floatToOrderedBytes(float val, ByteBuffer reuse) {
+ ByteBuffer bytes = reuse(reuse, PRIMITIVE_BUFFER_SIZE);
+ long lval = Double.doubleToLongBits(val);
+ lval ^= ((lval >> (Integer.SIZE - 1)) | Long.MIN_VALUE);
+ bytes.putLong(lval);
+ return bytes;
+ }
+
+ /**
+ * Doubles are treated the same as floats in {@link
#floatToOrderedBytes(float, ByteBuffer)}.
+ */
+ public static ByteBuffer doubleToOrderedBytes(double val, ByteBuffer
reuse) {
+ ByteBuffer bytes = reuse(reuse, PRIMITIVE_BUFFER_SIZE);
+ long lval = Double.doubleToLongBits(val);
+ lval ^= ((lval >> (Integer.SIZE - 1)) | Long.MIN_VALUE);
+ bytes.putLong(lval);
+ return bytes;
+ }
+
+ /**
+ * Strings are lexicographically sortable BUT if different byte array
lengths will ruin the
+ * Z-Ordering. (ZOrder requires that a given column contribute the same
number of bytes every
+ * time). This implementation just uses a set size to for all output byte
representations.
+ * Truncating longer strings and right padding 0 for shorter strings.
+ */
+ @SuppressWarnings("ByteBufferBackingArray")
+ public static ByteBuffer stringToOrderedBytes(String val, int length,
ByteBuffer reuse) {
+ CharsetEncoder encoder = encoderThreadLocal.get();
+ if (encoder == null) {
+ encoder = StandardCharsets.UTF_8.newEncoder();
+ encoderThreadLocal.set(encoder);
+ }
+
+ ByteBuffer bytes = reuse(reuse, length);
+ Arrays.fill(bytes.array(), 0, length, (byte) 0x00);
+ if (val != null) {
+ CharBuffer inputBuffer = CharBuffer.wrap(val);
+ encoder.encode(inputBuffer, bytes, true);
+ }
+ return bytes;
+ }
+
+ /**
+ * Return a bytebuffer with the given bytes truncated to length, or filled
with 0's to length
+ * depending on whether the given bytes are larger or smaller than the
given length.
+ */
+ @SuppressWarnings("ByteBufferBackingArray")
+ public static ByteBuffer byteTruncateOrFill(byte[] val, int length,
ByteBuffer reuse) {
+ ByteBuffer bytes = reuse(reuse, length);
+ if (val.length < length) {
+ bytes.put(val, 0, val.length);
+ Arrays.fill(bytes.array(), val.length, length, (byte) 0x00);
+ } else {
+ bytes.put(val, 0, length);
+ }
+ return bytes;
+ }
+
+ public static byte[] interleaveBits(byte[][] columnsBinary, int
interleavedSize) {
+ return interleaveBits(columnsBinary, interleavedSize,
ByteBuffer.allocate(interleavedSize));
+ }
+
+ /**
+ * Interleave bits using a naive loop. Variable length inputs are allowed
but to get a
+ * consistent ordering it is required that every column contribute the
same number of bytes in
+ * each invocation. Bits are interleaved from all columns that have a bit
available at that
+ * position. Once a Column has no more bits to produce it is skipped in
the interleaving.
+ *
+ * @param columnsBinary an array of ordered byte representations of the
columns being ZOrdered
+ * @param interleavedSize the number of bytes to use in the output
+ * @return the columnbytes interleaved
+ */
+ // NarrowingCompoundAssignment is intended here. See
+ // https://github.com/apache/iceberg/pull/5200#issuecomment-1176226163
+ @SuppressWarnings({"ByteBufferBackingArray",
"NarrowingCompoundAssignment"})
+ public static byte[] interleaveBits(
+ byte[][] columnsBinary, int interleavedSize, ByteBuffer reuse) {
+ byte[] interleavedBytes = reuse.array();
+ Arrays.fill(interleavedBytes, 0, interleavedSize, (byte) 0x00);
+
+ int sourceColumn = 0;
+ int sourceByte = 0;
+ int sourceBit = 7;
+ int interleaveByte = 0;
+ int interleaveBit = 7;
+
+ while (interleaveByte < interleavedSize) {
+ // Take the source bit from source byte and move it to the output
bit position
+ interleavedBytes[interleaveByte] |=
+ (columnsBinary[sourceColumn][sourceByte] & 1 << sourceBit)
+ >>> sourceBit
+ << interleaveBit;
+ --interleaveBit;
+
+ // Check if an output byte has been completed
+ if (interleaveBit == -1) {
+ // Move to the next output byte
+ interleaveByte++;
+ // Move to the highest order bit of the new output byte
+ interleaveBit = 7;
+ }
+
+ // Check if the last output byte has been completed
+ if (interleaveByte == interleavedSize) {
+ break;
+ }
+
+ // Find the next source bit to interleave
+ do {
+ // Move to next column
+ ++sourceColumn;
+ if (sourceColumn == columnsBinary.length) {
+ // If the last source column was used, reset to next bit
of first column
+ sourceColumn = 0;
+ --sourceBit;
+ if (sourceBit == -1) {
+ // If the last bit of the source byte was used, reset
to the highest bit of
+ // the next
+ // byte
+ sourceByte++;
+ sourceBit = 7;
+ }
+ }
+ } while (columnsBinary[sourceColumn].length <= sourceByte);
+ }
+ return interleavedBytes;
+ }
+
+ public static ByteBuffer reuse(ByteBuffer reuse, int length) {
+ reuse.position(0);
+ reuse.limit(length);
+ return reuse;
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/sort/zorder/ZIndexerTest.java
b/paimon-core/src/test/java/org/apache/paimon/sort/zorder/ZIndexerTest.java
new file mode 100644
index 000000000..4e9c08993
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/sort/zorder/ZIndexerTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.sort.zorder;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ZOrderByteUtils;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Random;
+
+/** Tests for {@link ZIndexer}. */
+public class ZIndexerTest {
+
+    // Fixed seed (as in TestZOrderByteUtil) so that a failing combination can be reproduced.
+    private static final Random RANDOM = new Random(42);
+
+    @Test
+    public void testZIndexer() {
+        RowType rowType = RowType.of(new IntType(), new BigIntType());
+
+        ZIndexer zIndexer = new ZIndexer(rowType, Arrays.asList("f0", "f1"));
+        zIndexer.open();
+
+        for (int i = 0; i < 1000; i++) {
+            int a = RANDOM.nextInt();
+            long b = RANDOM.nextLong();
+
+            InternalRow row = GenericRow.of(a, b);
+            byte[] zOrder = zIndexer.index(row);
+
+            Assertions.assertThat(zOrder).containsExactly(expectedZIndex(a, b));
+        }
+    }
+
+    @Test
+    public void testUnknownOrderColumn() {
+        RowType rowType = RowType.of(new IntType(), new BigIntType());
+
+        Assertions.assertThatThrownBy(() -> new ZIndexer(rowType, Arrays.asList("f0", "f9")))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("f9");
+    }
+
+    /** Builds the expected z-index of (a, b) directly from {@link ZOrderByteUtils}. */
+    private static byte[] expectedZIndex(int a, long b) {
+        byte[][] columns = new byte[2][];
+        ByteBuffer buffer = ByteBuffer.allocate(8);
+        columns[0] = Arrays.copyOf(ZOrderByteUtils.intToOrderedBytes(a, buffer).array(), 8);
+        columns[1] = Arrays.copyOf(ZOrderByteUtils.longToOrderedBytes(b, buffer).array(), 8);
+        return ZOrderByteUtils.interleaveBits(columns, 16);
+    }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/utils/TestZOrderByteUtil.java
b/paimon-core/src/test/java/org/apache/paimon/utils/TestZOrderByteUtil.java
new file mode 100644
index 000000000..de1bf7709
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/TestZOrderByteUtil.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/* This file is based on source code from the Iceberg Project (http://iceberg.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+package org.apache.paimon.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.testcontainers.shaded.com.google.common.primitives.UnsignedBytes;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Random;
+
+/** Tests for {@link ZOrderByteUtils}. */
+public class TestZOrderByteUtil {
+    // Named bit patterns: 'I' is a 1 bit and 'O' is a 0 bit, most significant bit first.
+    private static final byte IIIIIIII = (byte) 255;
+    private static final byte IOIOIOIO = (byte) 170;
+    private static final byte OIOIOIOI = (byte) 85;
+    private static final byte OOOOIIII = (byte) 15;
+    private static final byte OOOOOOOI = (byte) 1;
+    private static final byte OOOOOOOO = (byte) 0;
+
+    private static final int NUM_TESTS = 100000;
+    private static final int NUM_INTERLEAVE_TESTS = 1000;
+
+    // Fixed seed keeps the randomized tests reproducible.
+    private final Random random = new Random(42);
+
+    /** Renders each byte as 8 binary digits (zero-padded, MSB first) and concatenates them. */
+    private String bytesToString(byte[] bytes) {
+        StringBuilder result = new StringBuilder();
+        for (byte b : bytes) {
+            result.append(String.format("%8s", Integer.toBinaryString(b & 0xFF)).replace(' ', '0'));
+        }
+        return result.toString();
+    }
+
+    /** Returns a random byte array whose length is between 1 and 100 inclusive. */
+    private byte[] generateRandomBytes() {
+        int length = random.nextInt(100) + 1;
+        return generateRandomBytes(length);
+    }
+
+    /** Returns a random byte array of the given length. */
+    private byte[] generateRandomBytes(int length) {
+        byte[] result = new byte[length];
+        random.nextBytes(result);
+        return result;
+    }
+
+    /**
+     * Reference (string-based) interleaving used to check {@link ZOrderByteUtils#interleaveBits}:
+     * repeatedly takes the next character of each string in turn, skipping strings that are
+     * already exhausted, until every character has been emitted.
+     */
+    private String interleaveStrings(String[] strings) {
+        StringBuilder result = new StringBuilder();
+        int totalLength = Arrays.stream(strings).mapToInt(String::length).sum();
+        int substringIndex = 0;
+        int characterIndex = 0;
+        while (characterIndex < totalLength) {
+            for (String str : strings) {
+                if (substringIndex < str.length()) {
+                    result.append(str.charAt(substringIndex));
+                    characterIndex++;
+                }
+            }
+            substringIndex++;
+        }
+        return result.toString();
+    }
+
+    /**
+     * Compares {@link ZOrderByteUtils#interleaveBits} against the string-based reference {@code
+     * interleaveStrings} on random inputs. Either both algorithms are identically wrong or both are
+     * identically correct.
+     */
+    @Test
+    public void testInterleaveRandomExamples() {
+        for (int test = 0; test < NUM_INTERLEAVE_TESTS; test++) {
+            int numByteArrays = random.nextInt(6) + 1;
+            byte[][] testBytes = new byte[numByteArrays][];
+            String[] testStrings = new String[numByteArrays];
+            for (int byteIndex = 0; byteIndex < numByteArrays; byteIndex++) {
+                testBytes[byteIndex] = generateRandomBytes();
+                testStrings[byteIndex] = bytesToString(testBytes[byteIndex]);
+            }
+
+            int zOrderSize = Arrays.stream(testBytes).mapToInt(column -> column.length).sum();
+            byte[] byteResult = ZOrderByteUtils.interleaveBits(testBytes, zOrderSize);
+            String byteResultAsString = bytesToString(byteResult);
+
+            String stringResult = interleaveStrings(testStrings);
+
+            Assert.assertEquals(
+                    "String interleave didn't match byte interleave",
+                    stringResult,
+                    byteResultAsString);
+        }
+    }
+
+    /** Same reference comparison, but passing a reused output buffer to interleaveBits. */
+    @Test
+    public void testReuseInterleaveBuffer() {
+        int numByteArrays = 2;
+        int colLength = 16;
+        ByteBuffer interleaveBuffer = ByteBuffer.allocate(numByteArrays * colLength);
+        for (int test = 0; test < NUM_INTERLEAVE_TESTS; test++) {
+            byte[][] testBytes = new byte[numByteArrays][];
+            String[] testStrings = new String[numByteArrays];
+            for (int byteIndex = 0; byteIndex < numByteArrays; byteIndex++) {
+                testBytes[byteIndex] = generateRandomBytes(colLength);
+                testStrings[byteIndex] = bytesToString(testBytes[byteIndex]);
+            }
+
+            byte[] byteResult =
+                    ZOrderByteUtils.interleaveBits(
+                            testBytes, numByteArrays * colLength, interleaveBuffer);
+            String byteResultAsString = bytesToString(byteResult);
+
+            String stringResult = interleaveStrings(testStrings);
+
+            Assert.assertEquals(
+                    "String interleave didn't match byte interleave",
+                    stringResult,
+                    byteResultAsString);
+        }
+    }
+
+    @Test
+    public void testInterleaveEmptyBits() {
+        byte[][] test = new byte[4][10];
+        byte[] expected = new byte[40];
+
+        Assert.assertArrayEquals(
+                "Should combine empty arrays", expected, ZOrderByteUtils.interleaveBits(test, 40));
+    }
+
+    @Test
+    public void testInterleaveFullBits() {
+        byte[][] test = new byte[4][];
+        test[0] = new byte[] {IIIIIIII, IIIIIIII};
+        test[1] = new byte[] {IIIIIIII};
+        test[2] = new byte[0];
+        test[3] = new byte[] {IIIIIIII, IIIIIIII, IIIIIIII};
+        byte[] expected = new byte[] {IIIIIIII, IIIIIIII, IIIIIIII, IIIIIIII, IIIIIIII, IIIIIIII};
+
+        Assert.assertArrayEquals(
+                "Should combine full arrays", expected, ZOrderByteUtils.interleaveBits(test, 6));
+    }
+
+    @Test
+    public void testInterleaveMixedBits() {
+        byte[][] test = new byte[4][];
+        test[0] = new byte[] {OOOOOOOI, IIIIIIII, OOOOOOOO, OOOOIIII};
+        test[1] = new byte[] {OOOOOOOI, OOOOOOOO, IIIIIIII};
+        test[2] = new byte[] {OOOOOOOI};
+        test[3] = new byte[] {OOOOOOOI};
+        byte[] expected =
+                new byte[] {
+                    OOOOOOOO, OOOOOOOO, OOOOOOOO, OOOOIIII, IOIOIOIO, IOIOIOIO, OIOIOIOI, OIOIOIOI,
+                    OOOOIIII
+                };
+        Assert.assertArrayEquals(
+                "Should combine mixed byte arrays",
+                expected,
+                ZOrderByteUtils.interleaveBits(test, 9));
+    }
+
+    @Test
+    public void testIntOrdering() {
+        ByteBuffer aBuffer = ZOrderByteUtils.allocatePrimitiveBuffer();
+        ByteBuffer bBuffer = ZOrderByteUtils.allocatePrimitiveBuffer();
+        for (int i = 0; i < NUM_TESTS; i++) {
+            int aInt = random.nextInt();
+            int bInt = random.nextInt();
+            int intCompare = Integer.signum(Integer.compare(aInt, bInt));
+            byte[] aBytes = ZOrderByteUtils.intToOrderedBytes(aInt, aBuffer).array();
+            byte[] bBytes = ZOrderByteUtils.intToOrderedBytes(bInt, bBuffer).array();
+            int byteCompare =
+                    Integer.signum(
+                            UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes));
+
+            Assert.assertEquals(
+                    String.format(
+                            "Ordering of ints should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ",
+                            aInt,
+                            bInt,
+                            intCompare,
+                            Arrays.toString(aBytes),
+                            Arrays.toString(bBytes),
+                            byteCompare),
+                    intCompare,
+                    byteCompare);
+        }
+    }
+
+    @Test
+    public void testLongOrdering() {
+        ByteBuffer aBuffer = ZOrderByteUtils.allocatePrimitiveBuffer();
+        ByteBuffer bBuffer = ZOrderByteUtils.allocatePrimitiveBuffer();
+        for (int i = 0; i < NUM_TESTS; i++) {
+            // Draw from the full long range so values outside the int range are covered as well.
+            long aLong = random.nextLong();
+            long bLong = random.nextLong();
+            int longCompare = Integer.signum(Long.compare(aLong, bLong));
+            byte[] aBytes = ZOrderByteUtils.longToOrderedBytes(aLong, aBuffer).array();
+            byte[] bBytes = ZOrderByteUtils.longToOrderedBytes(bLong, bBuffer).array();
+            int byteCompare =
+                    Integer.signum(
+                            UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes));
+
+            Assert.assertEquals(
+                    String.format(
+                            "Ordering of longs should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ",
+                            aLong,
+                            bLong,
+                            longCompare,
+                            Arrays.toString(aBytes),
+                            Arrays.toString(bBytes),
+                            byteCompare),
+                    longCompare,
+                    byteCompare);
+        }
+    }
+
+    @Test
+    public void testShortOrdering() {
+        ByteBuffer aBuffer = ZOrderByteUtils.allocatePrimitiveBuffer();
+        ByteBuffer bBuffer = ZOrderByteUtils.allocatePrimitiveBuffer();
+        for (int i = 0; i < NUM_TESTS; i++) {
+            short aShort = (short) (random.nextInt() % (Short.MAX_VALUE + 1));
+            short bShort = (short) (random.nextInt() % (Short.MAX_VALUE + 1));
+            int shortCompare = Integer.signum(Short.compare(aShort, bShort));
+            byte[] aBytes = ZOrderByteUtils.shortToOrderedBytes(aShort, aBuffer).array();
+            byte[] bBytes = ZOrderByteUtils.shortToOrderedBytes(bShort, bBuffer).array();
+            int byteCompare =
+                    Integer.signum(
+                            UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes));
+
+            Assert.assertEquals(
+                    String.format(
+                            "Ordering of shorts should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ",
+                            aShort,
+                            bShort,
+                            shortCompare,
+                            Arrays.toString(aBytes),
+                            Arrays.toString(bBytes),
+                            byteCompare),
+                    shortCompare,
+                    byteCompare);
+        }
+    }
+
+    @Test
+    public void testTinyOrdering() {
+        ByteBuffer aBuffer = ZOrderByteUtils.allocatePrimitiveBuffer();
+        ByteBuffer bBuffer = ZOrderByteUtils.allocatePrimitiveBuffer();
+        for (int i = 0; i < NUM_TESTS; i++) {
+            byte aByte = (byte) (random.nextInt() % (Byte.MAX_VALUE + 1));
+            byte bByte = (byte) (random.nextInt() % (Byte.MAX_VALUE + 1));
+            int tinyCompare = Integer.signum(Byte.compare(aByte, bByte));
+            byte[] aBytes = ZOrderByteUtils.tinyintToOrderedBytes(aByte, aBuffer).array();
+            byte[] bBytes = ZOrderByteUtils.tinyintToOrderedBytes(bByte, bBuffer).array();
+            int byteCompare =
+                    Integer.signum(
+                            UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes));
+
+            Assert.assertEquals(
+                    String.format(
+                            "Ordering of tinyints should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ",
+                            aByte,
+                            bByte,
+                            tinyCompare,
+                            Arrays.toString(aBytes),
+                            Arrays.toString(bBytes),
+                            byteCompare),
+                    tinyCompare,
+                    byteCompare);
+        }
+    }
+
+    @Test
+    public void testFloatOrdering() {
+        ByteBuffer aBuffer = ZOrderByteUtils.allocatePrimitiveBuffer();
+        ByteBuffer bBuffer = ZOrderByteUtils.allocatePrimitiveBuffer();
+        for (int i = 0; i < NUM_TESTS; i++) {
+            float aFloat = random.nextFloat();
+            float bFloat = random.nextFloat();
+            int floatCompare = Integer.signum(Float.compare(aFloat, bFloat));
+            byte[] aBytes = ZOrderByteUtils.floatToOrderedBytes(aFloat, aBuffer).array();
+            byte[] bBytes = ZOrderByteUtils.floatToOrderedBytes(bFloat, bBuffer).array();
+            int byteCompare =
+                    Integer.signum(
+                            UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes));
+
+            Assert.assertEquals(
+                    String.format(
+                            "Ordering of floats should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ",
+                            aFloat,
+                            bFloat,
+                            floatCompare,
+                            Arrays.toString(aBytes),
+                            Arrays.toString(bBytes),
+                            byteCompare),
+                    floatCompare,
+                    byteCompare);
+        }
+    }
+
+    @Test
+    public void testDoubleOrdering() {
+        ByteBuffer aBuffer = ZOrderByteUtils.allocatePrimitiveBuffer();
+        ByteBuffer bBuffer = ZOrderByteUtils.allocatePrimitiveBuffer();
+        for (int i = 0; i < NUM_TESTS; i++) {
+            double aDouble = random.nextDouble();
+            double bDouble = random.nextDouble();
+            int doubleCompare = Integer.signum(Double.compare(aDouble, bDouble));
+            byte[] aBytes = ZOrderByteUtils.doubleToOrderedBytes(aDouble, aBuffer).array();
+            byte[] bBytes = ZOrderByteUtils.doubleToOrderedBytes(bDouble, bBuffer).array();
+            int byteCompare =
+                    Integer.signum(
+                            UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes));
+
+            Assert.assertEquals(
+                    String.format(
+                            "Ordering of doubles should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ",
+                            aDouble,
+                            bDouble,
+                            doubleCompare,
+                            Arrays.toString(aBytes),
+                            Arrays.toString(bBytes),
+                            byteCompare),
+                    doubleCompare,
+                    byteCompare);
+        }
+    }
+
+    @Test
+    public void testStringOrdering() {
+        ByteBuffer aBuffer = ByteBuffer.allocate(128);
+        ByteBuffer bBuffer = ByteBuffer.allocate(128);
+        for (int i = 0; i < NUM_TESTS; i++) {
+            String aString = randomString();
+            String bString = randomString();
+            int stringCompare = Integer.signum(aString.compareTo(bString));
+            byte[] aBytes = ZOrderByteUtils.stringToOrderedBytes(aString, 128, aBuffer).array();
+            byte[] bBytes = ZOrderByteUtils.stringToOrderedBytes(bString, 128, bBuffer).array();
+            int byteCompare =
+                    Integer.signum(
+                            UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes));
+
+            Assert.assertEquals(
+                    String.format(
+                            "Ordering of strings should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ",
+                            aString,
+                            bString,
+                            stringCompare,
+                            Arrays.toString(aBytes),
+                            Arrays.toString(bBytes),
+                            byteCompare),
+                    stringCompare,
+                    byteCompare);
+        }
+    }
+
+    @Test
+    public void testByteTruncateOrFill() {
+        ByteBuffer aBuffer = ByteBuffer.allocate(128);
+        ByteBuffer bBuffer = ByteBuffer.allocate(128);
+        for (int i = 0; i < NUM_TESTS; i++) {
+            byte[] aBytesRaw = randomBytes();
+            byte[] bBytesRaw = randomBytes();
+            int rawCompare =
+                    Integer.signum(
+                            UnsignedBytes.lexicographicalComparator()
+                                    .compare(aBytesRaw, bBytesRaw));
+            byte[] aBytes = ZOrderByteUtils.byteTruncateOrFill(aBytesRaw, 128, aBuffer).array();
+            byte[] bBytes = ZOrderByteUtils.byteTruncateOrFill(bBytesRaw, 128, bBuffer).array();
+            int byteCompare =
+                    Integer.signum(
+                            UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes));
+
+            // Print the raw arrays by content; "%s" on a byte[] would only show its identity hash.
+            Assert.assertEquals(
+                    String.format(
+                            "Ordering of raw bytes should match ordering of truncated/filled bytes, %s ~ %s -> %s != %s ~ %s -> %s ",
+                            Arrays.toString(aBytesRaw),
+                            Arrays.toString(bBytesRaw),
+                            rawCompare,
+                            Arrays.toString(aBytes),
+                            Arrays.toString(bBytes),
+                            byteCompare),
+                    rawCompare,
+                    byteCompare);
+        }
+    }
+
+    /** Returns a random byte array whose length is between 0 and 49 inclusive. */
+    private byte[] randomBytes() {
+        byte[] binary = new byte[random.nextInt(50)];
+        random.nextBytes(binary);
+        return binary;
+    }
+
+    /** Returns a random string of 0 to 49 lowercase ASCII letters. */
+    private String randomString() {
+        int length = random.nextInt(50);
+        // Build from chars rather than bytes so the result does not depend on the platform charset.
+        char[] buffer = new char[length];
+
+        for (int i = 0; i < length; i += 1) {
+            buffer[i] = (char) ('a' + random.nextInt(26));
+        }
+
+        return new String(buffer);
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
index 6242bec27..0d4d25354 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
@@ -21,16 +21,24 @@ package org.apache.paimon.flink.action;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.FlinkCatalog;
import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeCasts;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.types.logical.LogicalType;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.stream.Collectors;
/** Abstract base of {@link Action} for table. */
public abstract class ActionBase implements Action {
@@ -40,6 +48,8 @@ public abstract class ActionBase implements Action {
protected final Catalog catalog;
protected final FlinkCatalog flinkCatalog;
protected final String catalogName = "paimon-" + UUID.randomUUID();
+ protected final StreamExecutionEnvironment env;
+ protected final StreamTableEnvironment batchTEnv;
public ActionBase(String warehouse, Map<String, String> catalogConfig) {
catalogOptions = Options.fromMap(catalogConfig);
@@ -47,6 +57,12 @@ public abstract class ActionBase implements Action {
catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
flinkCatalog = FlinkCatalogFactory.createCatalog(catalogName, catalog,
catalogOptions);
+ env = StreamExecutionEnvironment.getExecutionEnvironment();
+ batchTEnv = StreamTableEnvironment.create(env,
EnvironmentSettings.inBatchMode());
+
+ // register flink catalog to table environment
+ batchTEnv.registerCatalog(flinkCatalog.getName(), flinkCatalog);
+ batchTEnv.useCatalog(flinkCatalog.getName());
}
protected void execute(StreamExecutionEnvironment env, String defaultName)
throws Exception {
@@ -60,4 +76,34 @@ public abstract class ActionBase implements Action {
Options catalogOptions = this.catalogOptions;
return () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
}
+
+    /**
+     * Converts each Flink {@link org.apache.flink.table.types.DataType} into the equivalent Paimon
+     * {@link DataType}, going through the Flink {@link LogicalType} of each element.
+     */
+    protected List<DataType> toPaimonTypes(
+            List<org.apache.flink.table.types.DataType> flinkDataTypes) {
+        return flinkDataTypes.stream()
+                .map(flinkType -> LogicalTypeConversion.toDataType(flinkType.getLogicalType()))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Returns {@code true} when both lists have the same length and every type in {@code
+     * actualTypes} supports a compatible cast to the type at the same position in {@code
+     * expectedTypes}; stops at the first incompatible pair.
+     */
+    protected boolean compatibleCheck(List<DataType> actualTypes, List<DataType> expectedTypes) {
+        int size = actualTypes.size();
+        if (size != expectedTypes.size()) {
+            return false;
+        }
+
+        int index = 0;
+        while (index < size
+                && DataTypeCasts.supportsCompatibleCast(
+                        actualTypes.get(index), expectedTypes.get(index))) {
+            index++;
+        }
+        return index == size;
+    }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 27c5c9046..9d0b99484 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -119,4 +119,8 @@ public class CompactAction extends TableActionBase {
build(env);
execute(env, "Compact job");
}
+
+    /**
+     * Returns the partition specs held by this action. The result may be {@code null}
+     * (SortCompactAction null-checks it) — presumably when no partition was specified; TODO
+     * confirm.
+     */
+    public List<Map<String, String>> getPartitions() {
+        return this.partitions;
+    }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
index 3af89e8bf..b74bd8b2f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.action;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.MultipleParameterTool;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -41,8 +42,29 @@ public class CompactActionFactory implements ActionFactory {
Map<String, String> catalogConfig = optionalConfigMap(params,
"catalog-conf");
- CompactAction action =
- new CompactAction(tablePath.f0, tablePath.f1, tablePath.f2,
catalogConfig);
+ CompactAction action;
+ if (params.has("order-strategy")) {
+ SortCompactAction sortCompactAction =
+ new SortCompactAction(tablePath.f0, tablePath.f1,
tablePath.f2, catalogConfig);
+
+ String strategy = params.get("order-strategy");
+ sortCompactAction.withOrderStrategy(strategy);
+
+ if (params.has("order-by")) {
+ String sqlOrderBy = params.get("order-by");
+ if (sqlOrderBy == null) {
+ throw new IllegalArgumentException("Please specify
\"order-by\".");
+ }
+
sortCompactAction.withOrderColumns(Arrays.asList(sqlOrderBy.split(",")));
+ } else {
+ throw new IllegalArgumentException(
+ "Please specify order columns in parameter
--order-by.");
+ }
+
+ action = sortCompactAction;
+ } else {
+ action = new CompactAction(tablePath.f0, tablePath.f1,
tablePath.f2, catalogConfig);
+ }
if (params.has("partition")) {
List<Map<String, String>> partitions = getPartitions(params);
@@ -61,7 +83,9 @@ public class CompactActionFactory implements ActionFactory {
System.out.println("Syntax:");
System.out.println(
" compact --warehouse <warehouse-path> --database
<database-name> "
- + "--table <table-name> [--partition
<partition-name>]");
+ + "--table <table-name> [--partition <partition-name>]"
+ + "[--order-strategy <order-strategy>]"
+ + "[--order-by <order-columns>]");
System.out.println(
" compact --warehouse s3://path/to/warehouse --database
<database-name> "
+ "--table <table-name> [--catalog-conf
<paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]]");
@@ -70,6 +94,12 @@ public class CompactActionFactory implements ActionFactory {
System.out.println("Partition name syntax:");
System.out.println(" key1=value1,key2=value2,...");
+
+ System.out.println();
+ System.out.println("Note:");
+ System.out.println(
+ " order compact now only support append-only table with
bucket=-1, please don't specify --order-strategy parameter if your table does
not meet the request");
+ System.out.println(" order-strategy now only support zorder in batch
mode");
System.out.println();
System.out.println("Examples:");
@@ -84,6 +114,8 @@ public class CompactActionFactory implements ActionFactory {
" compact --warehouse s3:///path/to/warehouse "
+ "--database test_db "
+ "--table test_table "
+ + "--order-strategy zorder "
+ + "--order-by a,b,c "
+ "--catalog-conf s3.endpoint=https://****.com "
+ "--catalog-conf s3.access-key=***** "
+ "--catalog-conf s3.secret-key=***** ");
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
new file mode 100644
index 000000000..ff5e1a90d
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.sink.FlinkSinkBuilder;
+import org.apache.paimon.flink.sorter.TableSorter;
+import org.apache.paimon.flink.source.FlinkSourceBuilder;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.data.RowData;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Compact action that rewrites an append-only table with bucket=-1 in sorted order, using the
+ * configured order strategy and order columns. Only batch execution is supported.
+ */
+public class SortCompactAction extends CompactAction {
+
+    private String sortStrategy;
+    private List<String> orderColumns;
+
+    public SortCompactAction(
+            String warehouse,
+            String database,
+            String tableName,
+            Map<String, String> catalogConfig) {
+        super(warehouse, database, tableName, catalogConfig);
+
+        checkArgument(
+                table instanceof AppendOnlyFileStoreTable,
+                "Only sort compaction works with append-only table for now.");
+        // Work on a copy with write-only=true (presumably so the sink skips its own compaction;
+        // confirm against CoreOptions.WRITE_ONLY).
+        table = table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true"));
+    }
+
+    /** Builds the sort-compact topology on a batch-mode environment and executes it. */
+    @Override
+    public void run() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        build(env);
+        execute(env, "Sort Compact Job");
+    }
+
+    /**
+     * Adds the source, the sorter and the overwriting sink for this table to {@code env}.
+     *
+     * @throws IllegalArgumentException if {@code env} is not in batch mode, if the order strategy
+     *     or order columns were not set, or if the table is not an append-only table with
+     *     bucket=-1
+     */
+    public void build(StreamExecutionEnvironment env) {
+        // only support batch sort yet
+        if (env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
+                != RuntimeExecutionMode.BATCH) {
+            throw new IllegalArgumentException(
+                    "Only support batch mode yet, please set -Dexecution.runtime-mode=BATCH");
+        }
+        // Fail fast with a clear message instead of deep inside TableSorter.
+        checkArgument(sortStrategy != null, "Please specify the order strategy.");
+        checkArgument(
+                orderColumns != null && !orderColumns.isEmpty(), "Please specify order columns.");
+
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
+        if (!(fileStoreTable instanceof AppendOnlyFileStoreTable)) {
+            throw new IllegalArgumentException("Sort Compact only supports append-only table yet");
+        }
+        if (fileStoreTable.bucketMode() != BucketMode.UNAWARE) {
+            throw new IllegalArgumentException("Sort Compact only supports bucket=-1 yet.");
+        }
+        Map<String, String> tableConfig = fileStoreTable.options();
+        FlinkSourceBuilder sourceBuilder =
+                new FlinkSourceBuilder(
+                        ObjectIdentifier.of(
+                                catalogName,
+                                identifier.getDatabaseName(),
+                                identifier.getObjectName()),
+                        fileStoreTable);
+
+        // Restrict the scan to the requested partitions, if any.
+        if (getPartitions() != null) {
+            Predicate partitionPredicate =
+                    PredicateBuilder.or(
+                            getPartitions().stream()
+                                    .map(p -> PredicateBuilder.partition(p, table.rowType()))
+                                    .toArray(Predicate[]::new));
+            sourceBuilder.withPredicate(partitionPredicate);
+        }
+
+        // The source honours the table's scan parallelism option (not the sink parallelism).
+        String scanParallelism = tableConfig.get(FlinkConnectorOptions.SCAN_PARALLELISM.key());
+        if (scanParallelism != null) {
+            sourceBuilder.withParallelism(Integer.parseInt(scanParallelism));
+        }
+
+        DataStream<RowData> source = sourceBuilder.withEnv(env).withContinuousMode(false).build();
+        TableSorter sorter =
+                TableSorter.getSorter(env, source, fileStoreTable, sortStrategy, orderColumns);
+        DataStream<RowData> sorted = sorter.sort();
+
+        // Write the sorted rows back in overwrite mode with an empty static partition spec.
+        FlinkSinkBuilder flinkSinkBuilder = new FlinkSinkBuilder(fileStoreTable);
+        flinkSinkBuilder.withInput(sorted).withOverwritePartition(new HashMap<>());
+        String sinkParallelism = tableConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
+        if (sinkParallelism != null) {
+            flinkSinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
+        }
+
+        flinkSinkBuilder.build();
+    }
+
+    /** Sets the order strategy name (e.g. {@code zorder}) handed to {@link TableSorter}. */
+    public void withOrderStrategy(String sortStrategy) {
+        this.sortStrategy = sortStrategy;
+    }
+
+    /** Sets the columns the table is sorted by. */
+    public void withOrderColumns(List<String> orderColumns) {
+        this.orderColumns = orderColumns;
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
index 84114d1c6..3b8147a3e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
@@ -21,22 +21,15 @@ package org.apache.paimon.flink.action;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.flink.utils.TableEnvironmentUtils;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.DataTypeCasts;
import org.apache.paimon.utils.Preconditions;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.types.logical.LogicalType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,18 +37,14 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
/** Abstract base of {@link Action} for table. */
public abstract class TableActionBase extends ActionBase {
private static final Logger LOG =
LoggerFactory.getLogger(TableActionBase.class);
- protected final StreamExecutionEnvironment env;
- protected final StreamTableEnvironment batchTEnv;
- protected final Identifier identifier;
-
protected Table table;
+ protected final Identifier identifier;
TableActionBase(
String warehouse,
@@ -63,12 +52,6 @@ public abstract class TableActionBase extends ActionBase {
String tableName,
Map<String, String> catalogConfig) {
super(warehouse, catalogConfig);
- env = StreamExecutionEnvironment.getExecutionEnvironment();
- batchTEnv = StreamTableEnvironment.create(env,
EnvironmentSettings.inBatchMode());
-
- // register flink catalog to table environment
- batchTEnv.registerCatalog(flinkCatalog.getName(), flinkCatalog);
- batchTEnv.useCatalog(flinkCatalog.getName());
identifier = new Identifier(databaseName, tableName);
try {
table = catalog.getTable(identifier);
@@ -79,36 +62,6 @@ public abstract class TableActionBase extends ActionBase {
}
}
- /**
- * Extract {@link LogicalType}s from Flink {@link
org.apache.flink.table.types.DataType}s and
- * convert to Paimon {@link DataType}s.
- */
- protected List<DataType> toPaimonTypes(
- List<org.apache.flink.table.types.DataType> flinkDataTypes) {
- return flinkDataTypes.stream()
- .map(org.apache.flink.table.types.DataType::getLogicalType)
- .map(LogicalTypeConversion::toDataType)
- .collect(Collectors.toList());
- }
-
- /**
- * Check whether each {@link DataType} of actualTypes is compatible with
that of expectedTypes
- * respectively.
- */
- protected boolean compatibleCheck(List<DataType> actualTypes,
List<DataType> expectedTypes) {
- if (actualTypes.size() != expectedTypes.size()) {
- return false;
- }
-
- for (int i = 0; i < actualTypes.size(); i++) {
- if (!DataTypeCasts.supportsCompatibleCast(actualTypes.get(i),
expectedTypes.get(i))) {
- return false;
- }
- }
-
- return true;
- }
-
/** Sink {@link DataStream} dataStream to table with Flink Table API in
batch environment. */
protected void batchSink(DataStream<RowData> dataStream) {
List<Transformation<?>> transformations =
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
new file mode 100644
index 000000000..d07615e93
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
@@ -0,0 +1,448 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.shuffle;
+
+import org.apache.paimon.utils.Pair;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.InputSelectable;
+import org.apache.flink.streaming.api.operators.InputSelection;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.XORShiftRandom;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Random;
+
+/**
+ * RangeShuffle Util to shuffle the input stream by the sampling range. See {@link
+ * #rangeShuffleByKey} for how the topology is built.
+ */
+public class RangeShuffle {
+
+    /**
+     * Builds a range-partitioning topology on top of {@code inputDataStream}: the keys are sampled,
+     * the samples are turned into range boundaries, every record is tagged with the index of the
+     * range its key falls into, and records are then shuffled by that index.
+     *
+     * <p>Explanation of the following figure: "[LSample, n]" means operator is LSample and
+     * parallelism is n, "Key" means the key-extracting map, "LSample" means LocalSampleOperator,
+     * "GSample" means GlobalSampleOperator, "ARange" means AssignRangeIndexOperator, "RRange" means
+     * RemoveRangeIndexOperator.
+     *
+     * <pre>{@code
+     * [IN,n]->[Key,n]->[LSample,n]->[GSample,1]-BROADCAST
+     *    \                                          \
+     *     ------------------------------BATCH-[ARange,n]-PARTITION->[RRange,n]->
+     * }</pre>
+     *
+     * <p>The broadcast of the boundaries, the forward edge from the input and the final partition
+     * edge are all BATCH (blocking) exchanges, so the sampling path can complete before any record
+     * is assigned a range.
+     *
+     * @param inputDataStream records paired with the key they are range-partitioned by
+     * @param keyComparator ordering of the keys; it is stored in operators, so it must be
+     *     serializable
+     * @param keyClass class of the key, used to derive its type information
+     * @param sampleSize number of keys each sampler retains
+     * @param rangeNum number of key ranges the samples are split into
+     * @return the same records, redistributed by the range their key belongs to
+     */
+    public static <T> DataStream<Pair<T, RowData>> rangeShuffleByKey(
+            DataStream<Pair<T, RowData>> inputDataStream,
+            Comparator<T> keyComparator,
+            Class<T> keyClass,
+            int sampleSize,
+            int rangeNum) {
+        Transformation<Pair<T, RowData>> input = inputDataStream.getTransformation();
+
+        OneInputTransformation<Pair<T, RowData>, T> keyInput =
+                new OneInputTransformation<>(
+                        input,
+                        "ABSTRACT KEY",
+                        new StreamMap<>(Pair::getLeft),
+                        TypeInformation.of(keyClass),
+                        input.getParallelism());
+
+        // 1. Fixed size sample in each partition.
+        OneInputTransformation<T, Tuple2<Double, T>> localSample =
+                new OneInputTransformation<>(
+                        keyInput,
+                        "LOCAL SAMPLE",
+                        new LocalSampleOperator<>(sampleSize),
+                        new TupleTypeInfo<>(
+                                BasicTypeInfo.DOUBLE_TYPE_INFO, TypeInformation.of(keyClass)),
+                        keyInput.getParallelism());
+
+        // 2. Collect all the samples and gather them into a sorted key range.
+        OneInputTransformation<Tuple2<Double, T>, List<T>> sampleAndHistogram =
+                new OneInputTransformation<>(
+                        localSample,
+                        "GLOBAL SAMPLE",
+                        new GlobalSampleOperator<>(sampleSize, keyComparator, rangeNum),
+                        new ListTypeInfo<>(TypeInformation.of(keyClass)),
+                        1);
+
+        // 3. Take range boundaries as broadcast input and take the tuple of partition id and
+        // record as output.
+        // The shuffle mode of input edge must be BATCH to avoid dead lock. See
+        // DeadlockBreakupProcessor.
+        TwoInputTransformation<List<T>, Pair<T, RowData>, Tuple2<Integer, Pair<T, RowData>>>
+                preparePartition =
+                        new TwoInputTransformation<>(
+                                new PartitionTransformation<>(
+                                        sampleAndHistogram,
+                                        new BroadcastPartitioner<>(),
+                                        StreamExchangeMode.BATCH),
+                                new PartitionTransformation<>(
+                                        input,
+                                        new ForwardPartitioner<>(),
+                                        StreamExchangeMode.BATCH),
+                                "ASSIGN RANGE INDEX",
+                                new AssignRangeIndexOperator<>(keyComparator),
+                                new TupleTypeInfo<>(
+                                        BasicTypeInfo.INT_TYPE_INFO, input.getOutputType()),
+                                input.getParallelism());
+
+        // 4. Remove the partition id. (shuffle according to range partition)
+        return new DataStream<>(
+                inputDataStream.getExecutionEnvironment(),
+                new OneInputTransformation<>(
+                        new PartitionTransformation<>(
+                                preparePartition,
+                                new CustomPartitionerWrapper<>(
+                                        new AssignRangeIndexOperator.RangePartitioner(rangeNum),
+                                        new AssignRangeIndexOperator.Tuple2KeySelector<>()),
+                                StreamExchangeMode.BATCH),
+                        "REMOVE KEY",
+                        new RemoveRangeIndexOperator<>(),
+                        input.getOutputType(),
+                        input.getParallelism()));
+    }
+
+    /**
+     * LocalSampleOperator wraps the sample logic on the partition side (the first phase of
+     * distributed sample algorithm). Outputs sampled weight with record.
+     *
+     * <p>See {@link Sampler}.
+     */
+    @Internal
+    public static class LocalSampleOperator<T> extends TableStreamOperator<Tuple2<Double, T>>
+            implements OneInputStreamOperator<T, Tuple2<Double, T>>, BoundedOneInput {
+
+        private static final long serialVersionUID = 1L;
+
+        private final int numSample;
+
+        private transient Collector<Tuple2<Double, T>> collector;
+        private transient Sampler<T> sampler;
+
+        public LocalSampleOperator(int numSample) {
+            this.numSample = numSample;
+        }
+
+        @Override
+        public void open() throws Exception {
+            super.open();
+            this.collector = new StreamRecordCollector<>(output);
+            sampler = new Sampler<>(numSample, System.nanoTime());
+        }
+
+        @Override
+        public void processElement(StreamRecord<T> streamRecord) throws Exception {
+            sampler.collect(streamRecord.getValue());
+        }
+
+        /** Emits the retained (weight, key) samples once the bounded input is exhausted. */
+        @Override
+        public void endInput() throws Exception {
+            Iterator<Tuple2<Double, T>> sampled = sampler.sample();
+            while (sampled.hasNext()) {
+                collector.collect(sampled.next());
+            }
+        }
+    }
+
+    /**
+     * Global sample for range partition. Inputs weight with record. Outputs the list of range
+     * boundaries picked from the sorted samples.
+     *
+     * <p>See {@link Sampler}.
+     */
+    @Internal
+    public static class GlobalSampleOperator<T> extends TableStreamOperator<List<T>>
+            implements OneInputStreamOperator<Tuple2<Double, T>, List<T>>, BoundedOneInput {
+
+        private static final long serialVersionUID = 1L;
+
+        private final int numSample;
+        private final int rangesNum;
+        private final Comparator<T> keyComparator;
+
+        private transient Collector<List<T>> collector;
+        private transient Sampler<T> sampler;
+
+        /**
+         * @param numSample number of samples retained globally
+         * @param comparator ordering of the sampled keys
+         * @param rangesNum number of ranges to split the key space into; must be positive
+         */
+        public GlobalSampleOperator(int numSample, Comparator<T> comparator, int rangesNum) {
+            Preconditions.checkArgument(rangesNum > 0, "rangesNum should be positive.");
+            this.numSample = numSample;
+            this.keyComparator = comparator;
+            this.rangesNum = rangesNum;
+        }
+
+        @Override
+        public void open() throws Exception {
+            super.open();
+            this.sampler = new Sampler<>(numSample, 0L);
+            this.collector = new StreamRecordCollector<>(output);
+        }
+
+        @Override
+        public void processElement(StreamRecord<Tuple2<Double, T>> record) throws Exception {
+            Tuple2<Double, T> tuple = record.getValue();
+            sampler.collect(tuple.f0, tuple.f1);
+        }
+
+        /**
+         * Sorts the retained samples and emits {@code rangesNum - 1} evenly spaced ones as the
+         * upper boundaries of the ranges.
+         */
+        @Override
+        public void endInput() throws Exception {
+            Iterator<Tuple2<Double, T>> sampled = sampler.sample();
+
+            List<T> sampledData = new ArrayList<>();
+            while (sampled.hasNext()) {
+                sampledData.add(sampled.next().f1);
+            }
+
+            sampledData.sort(keyComparator);
+
+            List<T> boundaries = new ArrayList<>(rangesNum - 1);
+            // Without samples there is nothing to split on: emit no boundaries, so every record
+            // maps to range 0, instead of a list of nulls the key comparator cannot handle.
+            if (!sampledData.isEmpty()) {
+                double avgRange = sampledData.size() / (double) rangesNum;
+                for (int i = 1; i < rangesNum; i++) {
+                    boundaries.add(sampledData.get((int) (i * avgRange)));
+                }
+            }
+
+            collector.collect(boundaries);
+        }
+    }
+
+    /**
+     * This two-input operator requires the range boundaries as its broadcast (first) input, and
+     * emits a Tuple2 of range index and record for every record of the second input.
+     */
+    @Internal
+    public static class AssignRangeIndexOperator<T>
+            extends TableStreamOperator<Tuple2<Integer, Pair<T, RowData>>>
+            implements TwoInputStreamOperator<
+                            List<T>, Pair<T, RowData>, Tuple2<Integer, Pair<T, RowData>>>,
+                    InputSelectable {
+
+        private static final long serialVersionUID = 1L;
+
+        private transient List<T> boundaries;
+        private transient Collector<Tuple2<Integer, Pair<T, RowData>>> collector;
+
+        private final Comparator<T> keyComparator;
+
+        public AssignRangeIndexOperator(Comparator<T> comparator) {
+            this.keyComparator = comparator;
+        }
+
+        @Override
+        public void open() throws Exception {
+            super.open();
+            this.collector = new StreamRecordCollector<>(output);
+        }
+
+        @Override
+        public void processElement1(StreamRecord<List<T>> streamRecord) throws Exception {
+            this.boundaries = streamRecord.getValue();
+        }
+
+        @Override
+        public void processElement2(StreamRecord<Pair<T, RowData>> streamRecord) throws Exception {
+            if (boundaries == null) {
+                throw new RuntimeException("There should be one data from the first input.");
+            }
+            Pair<T, RowData> row = streamRecord.getValue();
+            collector.collect(new Tuple2<>(binarySearch(row.getLeft()), row));
+        }
+
+        /** Reads only the boundary input until the boundaries have arrived. */
+        @Override
+        public InputSelection nextSelection() {
+            return boundaries == null ? InputSelection.FIRST : InputSelection.ALL;
+        }
+
+        /**
+         * Returns the range index of {@code key}: the position of a boundary equal to it, or else
+         * the insertion point among the sorted boundaries. The result lies in {@code [0,
+         * boundaries.size()]}.
+         */
+        private int binarySearch(T key) {
+            int low = 0;
+            int high = this.boundaries.size() - 1;
+
+            while (low <= high) {
+                final int mid = (low + high) >>> 1;
+                final int result = keyComparator.compare(key, this.boundaries.get(mid));
+
+                if (result > 0) {
+                    low = mid + 1;
+                } else if (result < 0) {
+                    high = mid - 1;
+                } else {
+                    return mid;
+                }
+            }
+            // key not found, but the low index is the target
+            // bucket, since the boundaries are the upper bound
+            return low;
+        }
+
+        /** A {@link KeySelector} to select by f0 of tuple2. */
+        public static class Tuple2KeySelector<T>
+                implements KeySelector<Tuple2<Integer, Pair<T, RowData>>, Integer> {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Integer getKey(Tuple2<Integer, Pair<T, RowData>> tuple2) throws Exception {
+                return tuple2.f0;
+            }
+        }
+
+        /** A {@link Partitioner} to partition by id with range. */
+        public static class RangePartitioner implements Partitioner<Integer> {
+
+            private static final long serialVersionUID = 1L;
+
+            private final int totalRangeNum;
+
+            public RangePartitioner(int totalRangeNum) {
+                this.totalRangeNum = totalRangeNum;
+            }
+
+            /**
+             * Maps a range index in {@code [0, totalRangeNum)} proportionally onto {@code [0,
+             * numPartitions)}.
+             */
+            @Override
+            public int partition(Integer key, int numPartitions) {
+                // Plain branch instead of Preconditions: the message is then only built on
+                // failure, not for every record.
+                if (numPartitions >= totalRangeNum) {
+                    throw new IllegalArgumentException(
+                            "Num of subPartitions should < totalRangeNum: " + totalRangeNum);
+                }
+                // Scaling (instead of dividing by totalRangeNum / numPartitions) stays balanced
+                // when totalRangeNum is not a multiple of numPartitions; long math avoids
+                // overflow of key * numPartitions.
+                int partition = (int) ((long) key * numPartitions / totalRangeNum);
+                return Math.min(numPartitions - 1, partition);
+            }
+        }
+    }
+
+    /** Remove the range index and return the actual record. */
+    @Internal
+    public static class RemoveRangeIndexOperator<T> extends TableStreamOperator<Pair<T, RowData>>
+            implements OneInputStreamOperator<Tuple2<Integer, Pair<T, RowData>>, Pair<T, RowData>> {
+
+        private static final long serialVersionUID = 1L;
+
+        private transient Collector<Pair<T, RowData>> collector;
+
+        @Override
+        public void open() throws Exception {
+            super.open();
+            this.collector = new StreamRecordCollector<>(output);
+        }
+
+        @Override
+        public void processElement(StreamRecord<Tuple2<Integer, Pair<T, RowData>>> streamRecord)
+                throws Exception {
+            collector.collect(streamRecord.getValue().f1);
+        }
+    }
+
+    /**
+     * A simple in memory implementation Sampling, and with only one pass through the input
+     * iteration whose size is unpredictable. The basic idea behind this sampler implementation is
+     * to generate a random number for each input element as its weight, select the top K elements
+     * with max weight. As the weights are generated randomly, so are the selected top K elements.
+     * In the first phase, we generate random numbers as the weights for each element and select top
+     * K elements as the output of each partition. In the second phase, we select top K elements
+     * from all the outputs of the first phase.
+     *
+     * <p>This implementation refers to the algorithm described in <a
+     * href="https://researcher.ibm.com/files/us-dpwoodru/tw11.pdf">"Optimal Random Sampling from
+     * Distributed Streams Revisited"</a>.
+     */
+    @Internal
+    public static class Sampler<T> {
+
+        private final int numSamples;
+        private final Random random;
+        /** Ordered by ascending weight, so the head is the weakest retained sample. */
+        private final PriorityQueue<Tuple2<Double, T>> queue;
+
+        private Tuple2<Double, T> smallest = null;
+
+        /**
+         * Create a new sampler with reservoir size and a supplied random number generator seed.
+         *
+         * @param numSamples Maximum number of samples to retain in reservoir, must be non-negative.
+         * @param seed seed for the weights generated by {@link #collect(Object)}
+         */
+        Sampler(int numSamples, long seed) {
+            Preconditions.checkArgument(numSamples >= 0, "numSamples should be non-negative.");
+            this.numSamples = numSamples;
+            this.random = new XORShiftRandom(seed);
+            // PriorityQueue rejects an initial capacity below 1, which numSamples == 0 would be.
+            this.queue =
+                    new PriorityQueue<>(
+                            Math.max(1, numSamples), Comparator.comparingDouble(o -> o.f0));
+        }
+
+        void collect(T rowData) {
+            collect(random.nextDouble(), rowData);
+        }
+
+        void collect(double weight, T key) {
+            // The queue size (rather than a running record counter, which would overflow after
+            // 2^31 records) decides whether the reservoir is still filling.
+            if (queue.size() < numSamples) {
+                // Fill the queue with first K elements from input.
+                addQueue(weight, key);
+            } else if (numSamples > 0 && weight > smallest.f0) {
+                // Remove the element with the smallest weight,
+                // and append current element into the queue.
+                queue.remove();
+                addQueue(weight, key);
+            }
+        }
+
+        private void addQueue(double weight, T row) {
+            queue.add(new Tuple2<>(weight, row));
+            smallest = queue.peek();
+        }
+
+        /** Returns the retained samples, in no particular order. */
+        Iterator<Tuple2<Double, T>> sample() {
+            return queue.iterator();
+        }
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
new file mode 100644
index 000000000..d51cf1833
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sorter;
+
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.NormalizedKeyComputer;
+import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.BinaryRowSerializer;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.data.serializer.InternalSerializers;
+import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.memory.HeapMemorySegmentPool;
+import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.sort.BinaryExternalSortBuffer;
+import org.apache.paimon.sort.BinaryInMemorySortBuffer;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.MutableObjectIterator;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.paimon.disk.IOManagerImpl.splitPaths;
+
+/**
+ * SortOperator to sort the `InternalRow`s by the `KeyType`.
+ *
+ * <p>Every input row must consist of the key fields followed by the value fields. Rows are
+ * buffered in a {@link BinaryExternalSortBuffer} backed by the task's temporary directories and
+ * emitted in sorted order once the bounded input ends. The comparator is generated over all
+ * fields of that combined row.
+ */
+public class SortOperator extends TableStreamOperator<InternalRow>
+        implements OneInputStreamOperator<InternalRow, InternalRow>, BoundedOneInput {
+
+    private final RowType keyRowType;
+    private final RowType valueRowType;
+    private final long maxMemory;
+    private final int pageSize;
+    private final int arity;
+    private transient BinaryExternalSortBuffer buffer;
+    /** Backs the spill files of the sort buffer; released in {@code close()}. */
+    private transient IOManagerImpl ioManager;
+
+    /**
+     * Creates a sort operator.
+     *
+     * @param keyType the sort-key fields, placed first in every input row
+     * @param valueRowType the fields that follow the key in every input row
+     * @param maxMemory memory given to the heap memory segment pool
+     * @param pageSize size of a single memory page
+     */
+    public SortOperator(RowType keyType, RowType valueRowType, long maxMemory, int pageSize) {
+        this.keyRowType = keyType;
+        this.valueRowType = valueRowType;
+        this.maxMemory = maxMemory;
+        this.pageSize = pageSize;
+        this.arity = keyType.getFieldCount() + valueRowType.getFieldCount();
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+
+        List<DataField> keyFields = keyRowType.getFields();
+        List<DataField> dataFields = valueRowType.getFields();
+
+        List<DataField> fields = new ArrayList<>();
+        fields.addAll(keyFields);
+        fields.addAll(dataFields);
+
+        RowType rowType = new RowType(fields);
+
+        InternalRowSerializer serializer = InternalSerializers.create(rowType);
+        NormalizedKeyComputer normalizedKeyComputer =
+                CodeGenUtils.newNormalizedKeyComputer(
+                        rowType.getFieldTypes(), "MemTableKeyComputer");
+        RecordComparator keyComparator =
+                CodeGenUtils.newRecordComparator(rowType.getFieldTypes(), "MemTableComparator");
+
+        MemorySegmentPool memoryPool = new HeapMemorySegmentPool(maxMemory, pageSize);
+
+        BinaryInMemorySortBuffer inMemorySortBuffer =
+                BinaryInMemorySortBuffer.createBuffer(
+                        normalizedKeyComputer, serializer, keyComparator, memoryPool);
+
+        Configuration jobConfig = getContainingTask().getJobConfiguration();
+        // Kept in a field (instead of being created inline) so close() can release it.
+        ioManager = new IOManagerImpl(splitPaths(jobConfig.get(CoreOptions.TMP_DIRS)));
+
+        buffer =
+                new BinaryExternalSortBuffer(
+                        new BinaryRowSerializer(serializer.getArity()),
+                        keyComparator,
+                        memoryPool.pageSize(),
+                        inMemorySortBuffer,
+                        ioManager,
+                        jobConfig.getInteger(
+                                ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES));
+    }
+
+    /** Emits all buffered rows in sorted order. */
+    @Override
+    public void endInput() throws Exception {
+        if (buffer.size() > 0) {
+            MutableObjectIterator<BinaryRow> iterator = buffer.sortedIterator();
+            BinaryRow binaryRow = new BinaryRow(arity);
+            while ((binaryRow = iterator.next(binaryRow)) != null) {
+                output.collect(new StreamRecord<>(binaryRow));
+            }
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<InternalRow> element) throws Exception {
+        buffer.write(element.getValue());
+    }
+
+    /**
+     * Releases the sort buffer and the IO manager, so spill files and temporary directories do
+     * not outlive the task. Each step runs even if an earlier one fails.
+     */
+    @Override
+    public void close() throws Exception {
+        try {
+            super.close();
+        } finally {
+            try {
+                if (buffer != null) {
+                    buffer.clear();
+                }
+            } finally {
+                if (ioManager != null) {
+                    ioManager.close();
+                }
+            }
+        }
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
new file mode 100644
index 000000000..1cf3572bc
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sorter;
+
+import org.apache.paimon.flink.action.SortCompactAction;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+
+import java.util.List;
+
+/** An abstract TableSorter for {@link SortCompactAction}. */
+public abstract class TableSorter {
+
+ protected final StreamExecutionEnvironment batchTEnv;
+ protected final DataStream<RowData> origin;
+ protected final FileStoreTable table;
+ protected final List<String> orderColNames;
+
+ public TableSorter(
+ StreamExecutionEnvironment batchTEnv,
+ DataStream<RowData> origin,
+ FileStoreTable table,
+ List<String> orderColNames) {
+ this.batchTEnv = batchTEnv;
+ this.origin = origin;
+ this.table = table;
+ this.orderColNames = orderColNames;
+ checkColNames();
+ }
+
+ private void checkColNames() {
+ if (orderColNames.size() < 1) {
+ throw new IllegalArgumentException("order column names should not
be empty.");
+ }
+ List<String> columnNames = table.rowType().getFieldNames();
+ for (String zColumn : orderColNames) {
+ if (!columnNames.contains(zColumn)) {
+ throw new RuntimeException(
+ "Can't find column "
+ + zColumn
+ + " in table columns. Possible columns are ["
+ + columnNames.stream().reduce((a, b) -> a +
"," + b).get()
+ + "]");
+ }
+ }
+ }
+
+ public abstract DataStream<RowData> sort();
+
+ public static TableSorter getSorter(
+ StreamExecutionEnvironment batchTEnv,
+ DataStream<RowData> origin,
+ FileStoreTable fileStoreTable,
+ String sortStrategy,
+ List<String> orderColumns) {
+ switch (OrderType.of(sortStrategy)) {
+ case ORDER:
+ // todo support alphabetical order
+ throw new IllegalArgumentException("Not supported yet.");
+ case ZORDER:
+ return new ZorderSorter(batchTEnv, origin, fileStoreTable,
orderColumns);
+ case HILBERT:
+ // todo support hilbert curve
+ throw new IllegalArgumentException("Not supported yet.");
+ default:
+ throw new IllegalArgumentException("cannot match order type: "
+ sortStrategy);
+ }
+ }
+
+ enum OrderType {
+ ORDER("order"),
+ ZORDER("zorder"),
+ HILBERT("hilbert");
+
+ private final String orderType;
+
+ OrderType(String orderType) {
+ this.orderType = orderType;
+ }
+
+ @Override
+ public String toString() {
+ return "order type: " + orderType;
+ }
+
+ public static OrderType of(String orderType) {
+ if (ORDER.orderType.equals(orderType)) {
+ return ORDER;
+ } else if (ZORDER.orderType.equals(orderType)) {
+ return ZORDER;
+ } else if (HILBERT.orderType.equals(orderType)) {
+ return HILBERT;
+ }
+
+ throw new IllegalArgumentException("cannot match type: " +
orderType + " for ordering");
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java
new file mode 100644
index 000000000..465ab89d0
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sorter;
+
+import org.apache.paimon.flink.action.SortCompactAction;
+import org.apache.paimon.sort.zorder.ZIndexer;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+
+import java.util.List;
+
+/**
+ * A {@link TableSorter} used by {@link SortCompactAction} that orders records by the z-order value
+ * of the configured columns, on the DataStream API. The z-index of every row is computed by a
+ * {@link ZIndexer}; the rows are then range-shuffled and sorted by that index, and the index column
+ * is dropped again before the sorted stream is returned.
+ */
+public class ZorderSorter extends TableSorter {
+
+    public ZorderSorter(
+            StreamExecutionEnvironment batchTEnv,
+            DataStream<RowData> origin,
+            FileStoreTable table,
+            List<String> zOrderColNames) {
+        super(batchTEnv, origin, table, zOrderColNames);
+    }
+
+    /**
+     * Sorts {@code origin} by the z-order of the order columns.
+     *
+     * @return the z-ordered data stream
+     */
+    @Override
+    public DataStream<RowData> sort() {
+        ZIndexer indexer = new ZIndexer(table.rowType(), orderColNames);
+        return ZorderSorterUtils.sortStreamByZorder(origin, indexer, table);
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorterUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorterUtils.java
new file mode 100644
index 000000000..7e6c78192
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/ZorderSorterUtils.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sorter;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
+import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.flink.shuffle.RangeShuffle;
+import org.apache.paimon.sort.zorder.ZIndexer;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.KeyProjectedRow;
+import org.apache.paimon.utils.Pair;
+
+import
org.apache.paimon.shade.guava30.com.google.common.primitives.UnsignedBytes;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.data.RowData;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+
+/**
+ * Globally sorts a stream by the z-index a {@link ZIndexer} generates for each row. The stream is
+ * range-shuffled by z-index, sorted locally, and finally converted from Paimon rows back to Flink
+ * {@link RowData}.
+ *
+ * <pre>
+ * DataStream[RowData]
+ *   -- add z-index key ----------> DataStream[Pair(z-index, RowData)]
+ *   -- range shuffle by z-index -> DataStream[Pair(z-index, RowData)]
+ *   -- join key with row --------> DataStream[PaimonRow(z-index, fields...)]
+ *   -- local sort ---------------> DataStream[PaimonRow sorted]
+ *   -- remove z-index -----------> DataStream[PaimonRow sorted]
+ *   -- back to Flink RowData ----> DataStream[RowData sorted]
+ * </pre>
+ */
+public class ZorderSorterUtils {
+
+    private static final RowType KEY_TYPE =
+            new RowType(Collections.singletonList(new DataField(0, "Z_INDEX", DataTypes.BYTES())));
+
+    /**
+     * Sort the input stream by z-order.
+     *
+     * @param inputStream the stream to be ordered
+     * @param zIndexer generates the z-index of a given row
+     * @param table the FileStoreTable providing the row type and sort memory options
+     * @return the input records, range-partitioned and locally sorted by z-index
+     */
+    public static DataStream<RowData> sortStreamByZorder(
+            DataStream<RowData> inputStream, ZIndexer zIndexer, FileStoreTable table) {
+        RowType valueRowType = table.rowType();
+        int fieldCount = valueRowType.getFieldCount();
+        int parallelism = inputStream.getParallelism();
+        int sampleSize = parallelism * 1000;
+        int rangeNum = parallelism * 10;
+        long maxSortMemory = table.coreOptions().writeBufferSize();
+        int pageSize = table.coreOptions().pageSize();
+
+        // Stage 1: pair every row with its z-index.
+        DataStream<Pair<byte[], RowData>> keyedInput =
+                inputStream
+                        .map(
+                                new RichMapFunction<RowData, Pair<byte[], RowData>>() {
+
+                                    @Override
+                                    public void open(Configuration parameters) throws Exception {
+                                        super.open(parameters);
+                                        zIndexer.open();
+                                    }
+
+                                    @Override
+                                    public Pair<byte[], RowData> map(RowData value) {
+                                        byte[] zIndex = zIndexer.index(new FlinkRowWrapper(value));
+                                        // Keep a private copy: the indexer's array may be reused.
+                                        return Pair.of(Arrays.copyOf(zIndex, zIndex.length), value);
+                                    }
+                                })
+                        .setParallelism(parallelism);
+
+        // Stage 2: range shuffle by the z-index key.
+        DataStream<Pair<byte[], RowData>> shuffled =
+                RangeShuffle.rangeShuffleByKey(
+                        keyedInput, unsignedLexicographicOrder(), byte[].class, sampleSize, rangeNum);
+
+        // Stage 3: turn each pair into one Paimon row (z-index first, then the fields).
+        DataStream<InternalRow> joined =
+                shuffled.map(
+                                pair ->
+                                        new JoinedRow(
+                                                GenericRow.of(pair.getLeft()),
+                                                new FlinkRowWrapper(pair.getRight())),
+                                TypeInformation.of(InternalRow.class))
+                        .setParallelism(parallelism);
+
+        // Stage 4: sort every partition locally.
+        DataStream<InternalRow> sorted =
+                joined.transform(
+                                "LOCAL SORT",
+                                TypeInformation.of(InternalRow.class),
+                                new SortOperator(KEY_TYPE, valueRowType, maxSortMemory, pageSize))
+                        .setParallelism(parallelism);
+
+        // Stage 5: project away the leading z-index column.
+        DataStream<InternalRow> withoutZIndex =
+                sorted.map(
+                                new RichMapFunction<InternalRow, InternalRow>() {
+
+                                    private transient KeyProjectedRow projectedRow;
+
+                                    @Override
+                                    public void open(Configuration parameters) throws Exception {
+                                        int[] projection = new int[fieldCount];
+                                        for (int pos = 0; pos < fieldCount; pos++) {
+                                            projection[pos] = pos + 1;
+                                        }
+                                        projectedRow = new KeyProjectedRow(projection);
+                                    }
+
+                                    @Override
+                                    public InternalRow map(InternalRow value) throws Exception {
+                                        return projectedRow.replaceRow(value);
+                                    }
+                                })
+                        .setParallelism(parallelism);
+
+        // Stage 6: back to Flink RowData.
+        return withoutZIndex
+                .map(FlinkRowData::new, inputStream.getType())
+                .setParallelism(parallelism);
+    }
+
+    /** Compares two equally long byte arrays lexicographically, treating each byte as unsigned. */
+    private static Comparator<byte[]> unsignedLexicographicOrder() {
+        return (Comparator<byte[]> & Serializable)
+                (left, right) -> {
+                    assert left.length == right.length;
+                    for (int pos = 0; pos < left.length; pos++) {
+                        int cmp = UnsignedBytes.compare(left[pos], right[pos]);
+                        if (cmp != 0) {
+                            return cmp;
+                        }
+                    }
+                    return 0;
+                };
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java
index 63e24026f..2fb4ee2e4 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java
@@ -35,7 +35,7 @@ import java.util.Map;
import java.util.Queue;
/**
- * Pre-calculate which splits each task should process according to the
weight, and then distribute
+ * Pre-calculate which splits each task should process according to the weight, and then distribute
* the splits fairly.
*/
public class PreAssignSplitAssigner implements SplitAssigner {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/OrderRewriteActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/OrderRewriteActionITCase.java
new file mode 100644
index 000000000..256a5460f
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/OrderRewriteActionITCase.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.types.DataTypes;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Integration tests for {@link SortCompactAction} using the {@code zorder} order strategy.
+ *
+ * <p>Each test creates an append-only table with {@code bucket = -1} (partitioned by {@code f0}),
+ * commits generated data, runs the sort-compact action and inspects the planned data files.
+ */
+public class OrderRewriteActionITCase extends ActionITCaseBase {
+
+    private static final Random random = new Random();
+
+    private Catalog catalog;
+    @TempDir private java.nio.file.Path path;
+
+    /**
+     * Creates the test table and commits {@code loop} rounds of generated data in one commit.
+     *
+     * @param size side length of the generated grid; every round writes {@code size * size} rows
+     *     into each of the two partitions
+     * @param loop number of write rounds collected before the single commit
+     */
+    private void prepareData(int size, int loop) throws Exception {
+        createTable();
+        List<CommitMessage> commitMessages = new ArrayList<>();
+        for (int i = 0; i < loop; i++) {
+            commitMessages.addAll(writeData(size));
+        }
+        commit(commitMessages);
+    }
+
+    @Test
+    public void testAllBasicTypeWorksWithZorder() throws Exception {
+        prepareData(300, 1);
+        // All the basic types should support zorder
+        Assertions.assertThatCode(
+                        () ->
+                                zorder(
+                                        Arrays.asList(
+                                                "f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7",
+                                                "f8", "f9", "f10", "f11", "f12", "f13", "f14",
+                                                "f15")))
+                .doesNotThrowAnyException();
+    }
+
+    @Test
+    public void testZorderActionWorks() throws Exception {
+        prepareData(300, 30);
+        PredicateBuilder predicateBuilder = new PredicateBuilder(getTable().rowType());
+        // field index 1 is f1 (INT)
+        Predicate predicate = predicateBuilder.between(1, 100, 200);
+
+        List<ManifestEntry> files = planFiles();
+        List<ManifestEntry> filesFilter = planFiles(predicate);
+        // before zorder, we don't filter any file
+        Assertions.assertThat(files.size()).isEqualTo(filesFilter.size());
+
+        zorder(Arrays.asList("f2", "f1"));
+
+        files = planFiles();
+        filesFilter = planFiles(predicate);
+        // after zorder, the filter on f1 should be able to skip some files
+        Assertions.assertThat(files.size())
+                .as(
+                        "files after zorder: total=%s, kept by filter=%s",
+                        files.size(), filesFilter.size())
+                .isGreaterThan(filesFilter.size());
+    }
+
+    /** Plans a scan over the test table's file store and returns every file it selects. */
+    private List<ManifestEntry> planFiles() throws Exception {
+        return ((AppendOnlyFileStoreTable) getTable()).store().newScan().plan().files();
+    }
+
+    /** Plans a scan with {@code predicate} pushed down and returns the files the scan keeps. */
+    private List<ManifestEntry> planFiles(Predicate predicate) throws Exception {
+        return ((AppendOnlyFileStoreTable) getTable())
+                .store()
+                .newScan()
+                .withFilter(predicate)
+                .plan()
+                .files();
+    }
+
+    /** Runs {@link SortCompactAction} with the "zorder" strategy over {@code columns}. */
+    private void zorder(List<String> columns) throws Exception {
+        SortCompactAction sortCompactAction =
+                new SortCompactAction(
+                        new Path(path.toUri()).toUri().toString(),
+                        "my_db",
+                        "Orders1",
+                        Collections.emptyMap());
+        sortCompactAction.withOrderStrategy("zorder");
+        sortCompactAction.withOrderColumns(columns);
+        sortCompactAction.run();
+    }
+
+    /** Lazily creates a catalog whose warehouse is the JUnit temporary directory. */
+    public Catalog getCatalog() {
+        if (catalog == null) {
+            Options options = new Options();
+            options.set(CatalogOptions.WAREHOUSE, new Path(path.toUri()).toUri().toString());
+            catalog = CatalogFactory.createCatalog(CatalogContext.create(options));
+        }
+        return catalog;
+    }
+
+    /** Creates database {@code my_db} and the test table, tolerating existing ones. */
+    public void createTable() throws Exception {
+        getCatalog().createDatabase("my_db", true);
+        getCatalog().createTable(identifier(), schema(), true);
+    }
+
+    /** Identifier of the table under test. */
+    public Identifier identifier() {
+        return Identifier.create("my_db", "Orders1");
+    }
+
+    /** Commits {@code messages} through a fresh batch commit of the test table. */
+    private void commit(List<CommitMessage> messages) throws Exception {
+        getTable().newBatchWriteBuilder().newCommit().commit(messages);
+    }
+
+    // schema with all the basic types.
+    private static Schema schema() {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("f0", DataTypes.TINYINT());
+        schemaBuilder.column("f1", DataTypes.INT());
+        schemaBuilder.column("f2", DataTypes.SMALLINT());
+        schemaBuilder.column("f3", DataTypes.STRING());
+        schemaBuilder.column("f4", DataTypes.DOUBLE());
+        schemaBuilder.column("f5", DataTypes.CHAR(10));
+        schemaBuilder.column("f6", DataTypes.VARCHAR(10));
+        schemaBuilder.column("f7", DataTypes.BOOLEAN());
+        schemaBuilder.column("f8", DataTypes.DATE());
+        schemaBuilder.column("f9", DataTypes.TIME());
+        schemaBuilder.column("f10", DataTypes.TIMESTAMP());
+        schemaBuilder.column("f11", DataTypes.DECIMAL(10, 2));
+        schemaBuilder.column("f12", DataTypes.BYTES());
+        schemaBuilder.column("f13", DataTypes.FLOAT());
+        schemaBuilder.column("f14", DataTypes.BINARY(10));
+        schemaBuilder.column("f15", DataTypes.VARBINARY(10));
+        schemaBuilder.option("bucket", "-1");
+        schemaBuilder.option("scan.parallelism", "6");
+        schemaBuilder.option("sink.parallelism", "3");
+        schemaBuilder.option("target-file-size", "1 M");
+        schemaBuilder.partitionKeys("f0");
+        return schemaBuilder.build();
+    }
+
+    /** Writes one round of data into partitions f0 = 0 and f0 = 1; returns uncommitted messages. */
+    private List<CommitMessage> writeData(int size) throws Exception {
+        List<CommitMessage> messages = new ArrayList<>();
+        for (int i = 0; i < 2; i++) {
+            messages.addAll(writeOnce(getTable(), i, size));
+        }
+
+        return messages;
+    }
+
+    /** Loads the test table from the catalog (re-read on every call). */
+    public Table getTable() throws Exception {
+        return getCatalog().getTable(identifier());
+    }
+
+    /**
+     * Writes a {@code size x size} grid of rows with partition value {@code p} and returns the
+     * prepared (not yet committed) messages.
+     */
+    private static List<CommitMessage> writeOnce(Table table, int p, int size) throws Exception {
+        BatchWriteBuilder builder = table.newBatchWriteBuilder();
+        try (BatchTableWrite batchTableWrite = builder.newWrite()) {
+            for (int i = 0; i < size; i++) {
+                for (int j = 0; j < size; j++) {
+                    batchTableWrite.write(data(p, i, j));
+                }
+            }
+            return batchTableWrite.prepareCommit();
+        }
+    }
+
+    /** Builds one row matching {@link #schema()}; values derive from {@code p}, {@code i}, {@code j}. */
+    private static InternalRow data(int p, int i, int j) {
+        return GenericRow.of(
+                (byte) p, // f0 TINYINT (partition key)
+                j, // f1 INT
+                (short) i, // f2 SMALLINT
+                BinaryString.fromString(String.valueOf(j)), // f3 STRING
+                0.1 + i, // f4 DOUBLE
+                BinaryString.fromString(String.valueOf(j)), // f5 CHAR(10)
+                BinaryString.fromString(String.valueOf(i)), // f6 VARCHAR(10)
+                j % 2 == 1, // f7 BOOLEAN
+                i, // f8 DATE
+                j, // f9 TIME
+                Timestamp.fromEpochMillis(i), // f10 TIMESTAMP
+                Decimal.zero(10, 2), // f11 DECIMAL(10, 2)
+                String.valueOf(i).getBytes(), // f12 BYTES
+                (float) 0.1 + j, // f13 FLOAT
+                randomBytes(), // f14 BINARY(10)
+                randomBytes()); // f15 VARBINARY(10)
+    }
+
+    /** Returns between 0 and 9 random bytes. */
+    private static byte[] randomBytes() {
+        byte[] binary = new byte[random.nextInt(10)];
+        random.nextBytes(binary);
+        return binary;
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterITCase.java
similarity index 99%
rename from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterTest.java
rename to
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterITCase.java
index 4db0d4111..08e182464 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegisterITCase.java
@@ -43,7 +43,7 @@ import static
org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Fail.fail;
/** Tests for {@link KafkaLogStoreRegister}. */
-public class KafkaLogStoreRegisterTest extends KafkaTableTestBase {
+public class KafkaLogStoreRegisterITCase extends KafkaTableTestBase {
private static final String DATABASE = "mock_db";
private static final String TABLE = "mock_table";