This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new fa377aad5 [Flink] Support hilbert sort for flink (#2854)
fa377aad5 is described below

commit fa377aad582de70dba42103f555866f8627ef544
Author: TaoZex <[email protected]>
AuthorDate: Fri Mar 8 10:46:54 2024 +0800

    [Flink] Support hilbert sort for flink (#2854)
---
 docs/content/engines/flink.md                      |   2 +-
 .../layouts/shortcodes/generated/sort-compact.html |   2 +-
 paimon-core/pom.xml                                |   6 +
 .../apache/paimon/sort/hilbert/HilbertIndexer.java | 307 +++++++++++++++++++++
 .../apache/paimon}/utils/ConvertBinaryUtil.java    |   2 +-
 .../paimon/utils}/ConvertBinaryUtilTest.java       |   8 +-
 .../apache/paimon/flink/sorter/HilbertSorter.java  | 104 +++++++
 .../apache/paimon/flink/sorter/TableSorter.java    |   3 +-
 .../SortCompactActionForDynamicBucketITCase.java   |  38 +++
 .../SortCompactActionForUnawareBucketITCase.java   |  76 +++++
 .../apache/paimon/spark/sort/SparkHilbertUDF.java  |   2 +-
 11 files changed, 539 insertions(+), 11 deletions(-)

diff --git a/docs/content/engines/flink.md b/docs/content/engines/flink.md
index 822cbd7d7..bbad1c66f 100644
--- a/docs/content/engines/flink.md
+++ b/docs/content/engines/flink.md
@@ -357,7 +357,7 @@ table options syntax: we use string to represent table 
options. The format is 'k
          TO compact a table. Arguments:
             <li>identifier: the target table identifier. Cannot be empty.</li>
             <li>partitions: partition filter.</li>
-            <li>order_strategy: 'order' or 'zorder' or 'none'. Left empty for 
'none'.</li>
+            <li>order_strategy: 'order' or 'zorder' or 'hilbert' or 'none'. 
Left empty for 'none'.</li>
             <li>order_columns: the columns need to be sort. Left empty if 
'order_strategy' is 'none'.</li>
             <li>table_options: additional dynamic options of the table.</li>
       </td>
diff --git a/docs/layouts/shortcodes/generated/sort-compact.html 
b/docs/layouts/shortcodes/generated/sort-compact.html
index 5a7a36721..c876ba730 100644
--- a/docs/layouts/shortcodes/generated/sort-compact.html
+++ b/docs/layouts/shortcodes/generated/sort-compact.html
@@ -45,7 +45,7 @@ under the License.
     <tbody>
     <tr>
         <td><h5>--order_strategy</h5></td>
-        <td>the order strategy now only support "zorder" and "order". For 
example: --order_strategy zorder</td>
+        <td>the order strategy now supports "zorder", "hilbert" and "order". 
For example: --order_strategy zorder</td>
     </tr>
     <tr>
         <td><h5>--order_by</h5></td>
diff --git a/paimon-core/pom.xml b/paimon-core/pom.xml
index 2652286b0..965f2de67 100644
--- a/paimon-core/pom.xml
+++ b/paimon-core/pom.xml
@@ -190,6 +190,12 @@ under the License.
             <version>3.6.1</version>
         </dependency>
 
+        <!-- Hilbert curve implementation (org.davidmoten.hilbert.HilbertCurve) used by
+             org.apache.paimon.sort.hilbert.HilbertIndexer to compute hilbert-curve indexes. -->
+        <dependency>
+            <groupId>com.github.davidmoten</groupId>
+            <artifactId>hilbert-curve</artifactId>
+            <version>0.2.2</version>
+        </dependency>
+
         <dependency>
             <groupId>org.xerial</groupId>
             <artifactId>sqlite-jdbc</artifactId>
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/sort/hilbert/HilbertIndexer.java 
b/paimon-core/src/main/java/org/apache/paimon/sort/hilbert/HilbertIndexer.java
new file mode 100644
index 000000000..e0e55795f
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/sort/hilbert/HilbertIndexer.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.sort.hilbert;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeVisitor;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimeType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.ConvertBinaryUtil;
+
+import org.davidmoten.hilbert.HilbertCurve;
+
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Computes a hilbert-curve index over the configured order columns of a row.
+ *
+ * <p>Each order column is mapped to a {@code long} coordinate by a {@link RowProcessor} (built by
+ * {@link TypeVisitor}). The coordinates are passed to a {@link HilbertCurve} and the resulting
+ * index is encoded as a fixed-width big-endian byte array, so that comparing two keys byte-by-byte
+ * as unsigned values follows the numeric order of the index.
+ */
+public class HilbertIndexer implements Serializable {
+
+    /** Coordinate used for null values (and for {@code true} booleans). */
+    private static final long PRIMITIVE_EMPTY = Long.MAX_VALUE;
+
+    /**
+     * Number of bits per dimension passed to {@code HilbertCurve.bits(...)}.
+     *
+     * <p>NOTE(review): coordinates are presumably expected to be non-negative and below 2^63, so
+     * negative values (negative numbers, negative floating-point bit patterns) may not be clustered
+     * next to small positive values — confirm against the hilbert-curve library.
+     */
+    private static final int BITS_NUM = 63;
+
+    private final Set<RowProcessor> functionSet;
+    private final int[] fieldsIndex;
+
+    /**
+     * Curve for {@code fieldsIndex.length} dimensions. Built lazily and kept {@code transient} so
+     * that serializing this indexer never has to serialize the curve object.
+     */
+    private transient HilbertCurve hilbertCurve;
+
+    /**
+     * Creates an indexer for the given order columns.
+     *
+     * @param rowType type of the rows that will be indexed
+     * @param orderColumns names of the columns to index, in order; at least two are required
+     *     (checked with {@code checkArgument})
+     * @throws IllegalArgumentException if an order column is not a field of {@code rowType}
+     */
+    public HilbertIndexer(RowType rowType, List<String> orderColumns) {
+        checkArgument(orderColumns.size() > 1, "Hilbert sort needs at least two columns.");
+        List<String> fields = rowType.getFieldNames();
+        fieldsIndex = new int[orderColumns.size()];
+        for (int i = 0; i < fieldsIndex.length; i++) {
+            int index = fields.indexOf(orderColumns.get(i));
+            if (index == -1) {
+                throw new IllegalArgumentException(
+                        "Can't find column: "
+                                + orderColumns.get(i)
+                                + " in row type fields: "
+                                + fields);
+            }
+            fieldsIndex[i] = index;
+        }
+        this.functionSet = constructFunctionMap(rowType.getFields());
+    }
+
+    /** Opens every column processor and prepares the hilbert curve. */
+    public void open() {
+        functionSet.forEach(RowProcessor::open);
+        curve();
+    }
+
+    /**
+     * Computes the hilbert index of {@code row}.
+     *
+     * @param row the row to index; column positions are resolved against the {@code RowType} given
+     *     to the constructor
+     * @return a new big-endian byte array of {@code (63 * n + 7) / 8} bytes holding the index, where
+     *     {@code n} is the number of order columns
+     */
+    public byte[] index(InternalRow row) {
+        long[] coordinates = new long[fieldsIndex.length];
+        int i = 0;
+        for (RowProcessor processor : functionSet) {
+            coordinates[i++] = processor.hilbertValue(row);
+        }
+        return hilbertCurvePosBytes(coordinates);
+    }
+
+    /**
+     * Builds one {@link RowProcessor} per order column.
+     *
+     * @param fields all fields of the row type, looked up by the resolved order-column positions
+     * @return an insertion-ordered set holding one processor per order column
+     */
+    public Set<RowProcessor> constructFunctionMap(List<DataField> fields) {
+        Set<RowProcessor> hilbertFunctionSet = new LinkedHashSet<>();
+
+        // LinkedHashSet keeps the processors in the same order as fieldsIndex, which index()
+        // relies on when filling the coordinate array.
+        for (int index : fieldsIndex) {
+            DataField field = fields.get(index);
+            hilbertFunctionSet.add(hmapColumnToCalculator(field, index));
+        }
+        return hilbertFunctionSet;
+    }
+
+    /** Creates the processor mapping column {@code index} (typed by {@code field}) to a coordinate. */
+    public static RowProcessor hmapColumnToCalculator(DataField field, int index) {
+        DataType type = field.type();
+        return new RowProcessor(type.accept(new TypeVisitor(index)));
+    }
+
+    /**
+     * Builds, per column type, the function mapping a row to that column's {@code long} coordinate.
+     * Null values map to {@code PRIMITIVE_EMPTY}; array, multiset, map and row types are rejected.
+     */
+    public static class TypeVisitor implements DataTypeVisitor<HProcessFunction>, Serializable {
+
+        private final int fieldIndex;
+
+        public TypeVisitor(int index) {
+            this.fieldIndex = index;
+        }
+
+        @Override
+        public HProcessFunction visit(CharType charType) {
+            return (row) -> {
+                if (row.isNullAt(fieldIndex)) {
+                    return PRIMITIVE_EMPTY;
+                } else {
+                    BinaryString binaryString = row.getString(fieldIndex);
+
+                    return ConvertBinaryUtil.convertBytesToLong(binaryString.toBytes());
+                }
+            };
+        }
+
+        @Override
+        public HProcessFunction visit(VarCharType varCharType) {
+            return (row) -> {
+                if (row.isNullAt(fieldIndex)) {
+                    return PRIMITIVE_EMPTY;
+                } else {
+                    BinaryString binaryString = row.getString(fieldIndex);
+
+                    return ConvertBinaryUtil.convertBytesToLong(binaryString.toBytes());
+                }
+            };
+        }
+
+        @Override
+        public HProcessFunction visit(BooleanType booleanType) {
+            return (row) -> {
+                if (row.isNullAt(fieldIndex)) {
+                    return PRIMITIVE_EMPTY;
+                }
+                return row.getBoolean(fieldIndex) ? PRIMITIVE_EMPTY : 0;
+            };
+        }
+
+        @Override
+        public HProcessFunction visit(BinaryType binaryType) {
+            return (row) ->
+                    row.isNullAt(fieldIndex)
+                            ? PRIMITIVE_EMPTY
+                            : ConvertBinaryUtil.convertBytesToLong(row.getBinary(fieldIndex));
+        }
+
+        @Override
+        public HProcessFunction visit(VarBinaryType varBinaryType) {
+            return (row) ->
+                    row.isNullAt(fieldIndex)
+                            ? PRIMITIVE_EMPTY
+                            : ConvertBinaryUtil.convertBytesToLong(row.getBinary(fieldIndex));
+        }
+
+        @Override
+        public HProcessFunction visit(DecimalType decimalType) {
+            final InternalRow.FieldGetter fieldGetter =
+                    InternalRow.createFieldGetter(decimalType, fieldIndex);
+            return (row) -> {
+                Object o = fieldGetter.getFieldOrNull(row);
+                return o == null ? PRIMITIVE_EMPTY : ((Decimal) o).toBigDecimal().longValue();
+            };
+        }
+
+        @Override
+        public HProcessFunction visit(TinyIntType tinyIntType) {
+            return (row) ->
+                    row.isNullAt(fieldIndex)
+                            ? PRIMITIVE_EMPTY
+                            : ConvertBinaryUtil.convertBytesToLong(
+                                    new byte[] {row.getByte(fieldIndex)});
+        }
+
+        @Override
+        public HProcessFunction visit(SmallIntType smallIntType) {
+            return (row) ->
+                    row.isNullAt(fieldIndex) ? PRIMITIVE_EMPTY : (long) row.getShort(fieldIndex);
+        }
+
+        @Override
+        public HProcessFunction visit(IntType intType) {
+            return (row) ->
+                    row.isNullAt(fieldIndex) ? PRIMITIVE_EMPTY : (long) row.getInt(fieldIndex);
+        }
+
+        @Override
+        public HProcessFunction visit(BigIntType bigIntType) {
+            return (row) -> row.isNullAt(fieldIndex) ? PRIMITIVE_EMPTY : row.getLong(fieldIndex);
+        }
+
+        @Override
+        public HProcessFunction visit(FloatType floatType) {
+            return (row) ->
+                    row.isNullAt(fieldIndex)
+                            ? PRIMITIVE_EMPTY
+                            : Double.doubleToLongBits(row.getFloat(fieldIndex));
+        }
+
+        @Override
+        public HProcessFunction visit(DoubleType doubleType) {
+            return (row) ->
+                    row.isNullAt(fieldIndex)
+                            ? PRIMITIVE_EMPTY
+                            : Double.doubleToLongBits(row.getDouble(fieldIndex));
+        }
+
+        @Override
+        public HProcessFunction visit(DateType dateType) {
+            // DATE is an int-backed type in InternalRow; reading it with getLong uses the wrong
+            // width (and fails on rows that box the value as an Integer).
+            return (row) ->
+                    row.isNullAt(fieldIndex) ? PRIMITIVE_EMPTY : (long) row.getInt(fieldIndex);
+        }
+
+        @Override
+        public HProcessFunction visit(TimeType timeType) {
+            // TIME is an int-backed type in InternalRow, so it must be read with getInt as well.
+            return (row) ->
+                    row.isNullAt(fieldIndex) ? PRIMITIVE_EMPTY : (long) row.getInt(fieldIndex);
+        }
+
+        @Override
+        public HProcessFunction visit(TimestampType timestampType) {
+            final InternalRow.FieldGetter fieldGetter =
+                    InternalRow.createFieldGetter(timestampType, fieldIndex);
+            return (row) -> {
+                Object o = fieldGetter.getFieldOrNull(row);
+                return o == null ? PRIMITIVE_EMPTY : ((Timestamp) o).getMillisecond();
+            };
+        }
+
+        @Override
+        public HProcessFunction visit(LocalZonedTimestampType localZonedTimestampType) {
+            final InternalRow.FieldGetter fieldGetter =
+                    InternalRow.createFieldGetter(localZonedTimestampType, fieldIndex);
+            return (row) -> {
+                Object o = fieldGetter.getFieldOrNull(row);
+                return o == null ? PRIMITIVE_EMPTY : ((Timestamp) o).getMillisecond();
+            };
+        }
+
+        @Override
+        public HProcessFunction visit(ArrayType arrayType) {
+            throw new RuntimeException("Unsupported type");
+        }
+
+        @Override
+        public HProcessFunction visit(MultisetType multisetType) {
+            throw new RuntimeException("Unsupported type");
+        }
+
+        @Override
+        public HProcessFunction visit(MapType mapType) {
+            throw new RuntimeException("Unsupported type");
+        }
+
+        @Override
+        public HProcessFunction visit(RowType rowType) {
+            throw new RuntimeException("Unsupported type");
+        }
+    }
+
+    /** Maps a row to the {@code long} hilbert coordinate of one order column. */
+    public static class RowProcessor implements Serializable {
+        private final HProcessFunction process;
+
+        public RowProcessor(HProcessFunction process) {
+            this.process = process;
+        }
+
+        /** Lifecycle hook; currently a no-op. */
+        public void open() {}
+
+        /** Returns the coordinate of this processor's column in {@code o}. */
+        public Long hilbertValue(InternalRow o) {
+            return process.apply(o);
+        }
+    }
+
+    /** Returns the curve for this indexer's number of dimensions, creating it on first use. */
+    private HilbertCurve curve() {
+        if (hilbertCurve == null) {
+            hilbertCurve = HilbertCurve.bits(BITS_NUM).dimensions(fieldsIndex.length);
+        }
+        return hilbertCurve;
+    }
+
+    /**
+     * Computes the hilbert index of {@code points} and encodes it as a fixed-width, big-endian,
+     * unsigned byte array.
+     *
+     * <p>{@link BigInteger#toByteArray()} returns a minimal two's-complement encoding whose length
+     * depends on the value and may carry an extra leading 0x00 sign byte. Encoding every index with
+     * the same width, derived only from the number of dimensions, makes unsigned lexicographic
+     * comparison of the keys agree with the numeric order of the indexes. A value-dependent or
+     * truncated width would not.
+     */
+    private byte[] hilbertCurvePosBytes(long[] points) {
+        BigInteger index = curve().index(points);
+        // Assumes the curve yields indexes below 2^(BITS_NUM * dimensions), i.e. at most `width`
+        // significant bytes; a longer encoding then only adds the leading zero sign byte.
+        int width = (BITS_NUM * points.length + 7) / 8;
+        byte[] raw = index.toByteArray();
+        byte[] magnitude =
+                raw.length > width ? Arrays.copyOfRange(raw, raw.length - width, raw.length) : raw;
+        byte[] result = new byte[width];
+        System.arraycopy(magnitude, 0, result, width - magnitude.length, magnitude.length);
+        return result;
+    }
+
+    /** Serializable function mapping a row to one column's hilbert coordinate. */
+    public interface HProcessFunction extends Function<InternalRow, Long>, Serializable {}
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/ConvertBinaryUtil.java
 b/paimon-core/src/main/java/org/apache/paimon/utils/ConvertBinaryUtil.java
similarity index 98%
rename from 
paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/ConvertBinaryUtil.java
rename to 
paimon-core/src/main/java/org/apache/paimon/utils/ConvertBinaryUtil.java
index d2acbf422..000815c29 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/ConvertBinaryUtil.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ConvertBinaryUtil.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark.utils;
+package org.apache.paimon.utils;
 
 import java.nio.charset.StandardCharsets;
 
diff --git 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/ConvertBinaryUtilTest.java
 b/paimon-core/src/test/java/org/apache/paimon/utils/ConvertBinaryUtilTest.java
similarity index 88%
rename from 
paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/ConvertBinaryUtilTest.java
rename to 
paimon-core/src/test/java/org/apache/paimon/utils/ConvertBinaryUtilTest.java
index c97a2fbfb..0a5efcfec 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/ConvertBinaryUtilTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/utils/ConvertBinaryUtilTest.java
@@ -16,9 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark;
-
-import org.apache.paimon.spark.utils.ConvertBinaryUtil;
+package org.apache.paimon.utils;
 
 import org.junit.Assert;
 import org.junit.jupiter.api.Test;
@@ -26,8 +24,8 @@ import org.junit.jupiter.api.Test;
 import java.nio.charset.StandardCharsets;
 import java.util.Random;
 
-import static 
org.apache.paimon.spark.utils.ConvertBinaryUtil.convertBytesToLong;
-import static 
org.apache.paimon.spark.utils.ConvertBinaryUtil.convertStringToLong;
+import static org.apache.paimon.utils.ConvertBinaryUtil.convertBytesToLong;
+import static org.apache.paimon.utils.ConvertBinaryUtil.convertStringToLong;
 
 /** Test for {@link ConvertBinaryUtil}. */
 public class ConvertBinaryUtilTest {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/HilbertSorter.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/HilbertSorter.java
new file mode 100644
index 000000000..944e0fc43
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/HilbertSorter.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sorter;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.flink.action.SortCompactAction;
+import org.apache.paimon.sort.hilbert.HilbertIndexer;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import 
org.apache.paimon.shade.guava30.com.google.common.primitives.UnsignedBytes;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A table sorter that orders records by the hilbert curve of the specified columns. It works on the
+ * DataStream API and computes the hilbert index with {@link HilbertIndexer}. After the hilbert index
+ * has been added as a key column, the records are range-shuffled and sorted. Finally, {@link
+ * SortCompactAction} removes the key column and overwrites the origin table with the sorted records.
+ */
+public class HilbertSorter extends TableSorter {
+
+    /** Type of the sort key attached to each record: the hilbert index as raw bytes. */
+    private static final RowType KEY_TYPE =
+            new RowType(Collections.singletonList(new DataField(0, "H_INDEX", DataTypes.BYTES())));
+
+    public HilbertSorter(
+            StreamExecutionEnvironment batchTEnv,
+            DataStream<RowData> origin,
+            FileStoreTable table,
+            List<String> colNames) {
+        super(batchTEnv, origin, table, colNames);
+    }
+
+    @Override
+    public DataStream<RowData> sort() {
+        return sortStreamByHilbert(origin, table);
+    }
+
+    /**
+     * Sorts the input stream by the hilbert index of the order columns.
+     *
+     * @param inputStream the stream to sort
+     * @param table the table whose row type resolves the order columns; also passed to {@code
+     *     SortUtils.sortStreamByKey}
+     * @return the sorted data stream
+     */
+    private DataStream<RowData> sortStreamByHilbert(
+            DataStream<RowData> inputStream, FileStoreTable table) {
+        final HilbertIndexer hilbertIndexer = new HilbertIndexer(table.rowType(), orderColNames);
+        return SortUtils.sortStreamByKey(
+                inputStream,
+                table,
+                KEY_TYPE,
+                TypeInformation.of(byte[].class),
+                () ->
+                        (b1, b2) -> {
+                            // Unsigned lexicographic order. Keys are expected to have the same
+                            // length; if they do not, the shorter key sorts first instead of the
+                            // loop running past the end of the shorter array.
+                            int common = Math.min(b1.length, b2.length);
+                            for (int i = 0; i < common; i++) {
+                                int ret = UnsignedBytes.compare(b1[i], b2[i]);
+                                if (ret != 0) {
+                                    return ret;
+                                }
+                            }
+                            return Integer.compare(b1.length, b2.length);
+                        },
+                new SortUtils.KeyAbstract<byte[]>() {
+                    @Override
+                    public void open() {
+                        hilbertIndexer.open();
+                    }
+
+                    @Override
+                    public byte[] apply(RowData value) {
+                        byte[] hilbert = hilbertIndexer.index(new FlinkRowWrapper(value));
+                        // Defensive copy so the emitted key never aliases an indexer-owned array.
+                        return Arrays.copyOf(hilbert, hilbert.length);
+                    }
+                },
+                GenericRow::of);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
index e5f1db0c2..e4f85ea9d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/TableSorter.java
@@ -78,8 +78,7 @@ public abstract class TableSorter {
             case ZORDER:
                 return new ZorderSorter(batchTEnv, origin, fileStoreTable, 
orderColumns);
             case HILBERT:
-                // todo support hilbert curve
-                throw new IllegalArgumentException("Not supported yet.");
+                return new HilbertSorter(batchTEnv, origin, fileStoreTable, 
orderColumns);
             default:
                 throw new IllegalArgumentException("cannot match order type: " 
+ sortStrategy);
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
index ff3616e65..a5195c2f3 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
@@ -107,6 +107,36 @@ public class SortCompactActionForDynamicBucketITCase 
extends ActionITCaseBase {
                 .isLessThan(filesFilter.size() / (double) files.size());
     }
 
+    @Test
+    public void testDynamicBucketSortWithOrderAndHilbert() throws Exception {
+        createTable();
+        commit(writeData(100));
+
+        // The filter targets f1. Sorting by (f2, f1) with plain ORDER hurts pruning on f1, while a
+        // hilbert sort over the same columns must leave a smaller share of files to read.
+        Predicate f1Between =
+                new PredicateBuilder(getTable().rowType()).between(1, 100L, 200L);
+
+        order(Arrays.asList("f2", "f1"));
+        List<ManifestEntry> orderedAll = getTable().store().newScan().plan().files();
+        List<ManifestEntry> orderedHit =
+                ((KeyValueFileStoreScan) getTable().store().newScan())
+                        .withValueFilter(f1Between)
+                        .plan()
+                        .files();
+
+        hilbert(Arrays.asList("f2", "f1"));
+        List<ManifestEntry> hilbertAll = getTable().store().newScan().plan().files();
+        List<ManifestEntry> hilbertHit =
+                ((KeyValueFileStoreScan) getTable().store().newScan())
+                        .withValueFilter(f1Between)
+                        .plan()
+                        .files();
+
+        double hilbertRatio = hilbertHit.size() / (double) hilbertAll.size();
+        double orderedRatio = orderedHit.size() / (double) orderedAll.size();
+        Assertions.assertThat(hilbertRatio).isLessThan(orderedRatio);
+    }
+
     @Test
     public void testDynamicBucketSortWithStringType() throws Exception {
         createTable();
@@ -146,6 +176,14 @@ public class SortCompactActionForDynamicBucketITCase 
extends ActionITCaseBase {
         }
     }
 
+    /** Runs a hilbert sort compaction, randomly either through the action or the procedure. */
+    private void hilbert(List<String> columns) throws Exception {
+        boolean viaAction = RANDOM.nextBoolean();
+        if (!viaAction) {
+            callProcedure("hilbert", columns);
+            return;
+        }
+        createAction("hilbert", columns).run();
+    }
+
     private void order(List<String> columns) throws Exception {
         if (RANDOM.nextBoolean()) {
             createAction("order", columns).run();
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
index cadd70e89..ee43ca58e 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
@@ -155,6 +155,20 @@ public class SortCompactActionForUnawareBucketITCase 
extends ActionITCaseBase {
                 .doesNotThrowAnyException();
     }
 
+    @Test
+    public void testAllBasicTypeWorksWithHilbert() throws Exception {
+        prepareData(300, 1);
+
+        // Every basic column type (f0 .. f15) must be usable as a hilbert sort key.
+        List<String> allColumns =
+                Arrays.asList(
+                        "f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "f9", "f10", "f11",
+                        "f12", "f13", "f14", "f15");
+        Assertions.assertThatCode(() -> hilbert(allColumns)).doesNotThrowAnyException();
+    }
+
     @Test
     public void testZorderActionWorks() throws Exception {
         prepareData(300, 2);
@@ -181,6 +195,33 @@ public class SortCompactActionForUnawareBucketITCase 
extends ActionITCaseBase {
         Assertions.assertThat(files.size()).isGreaterThan(filesFilter.size());
     }
 
+    @Test
+    public void testHilbertActionWorks() throws Exception {
+        prepareData(300, 2);
+        Predicate f1Between = new PredicateBuilder(getTable().rowType()).between(1, 100, 200);
+
+        // Before the hilbert compaction the filter cannot prune any file.
+        List<ManifestEntry> beforeAll = getTable().store().newScan().plan().files();
+        List<ManifestEntry> beforeHit =
+                ((AppendOnlyFileStoreScan) getTable().store().newScan())
+                        .withFilter(f1Between)
+                        .plan()
+                        .files();
+        Assertions.assertThat(beforeAll.size()).isEqualTo(beforeHit.size());
+
+        hilbert(Arrays.asList("f2", "f1"));
+
+        // Afterwards the filter must skip at least one file.
+        List<ManifestEntry> afterAll = getTable().store().newScan().plan().files();
+        List<ManifestEntry> afterHit =
+                ((AppendOnlyFileStoreScan) getTable().store().newScan())
+                        .withFilter(f1Between)
+                        .plan()
+                        .files();
+        Assertions.assertThat(afterAll.size()).isGreaterThan(afterHit.size());
+    }
+
     @Test
     public void testCompareZorderAndOrder() throws Exception {
         prepareData(300, 10);
@@ -208,6 +249,33 @@ public class SortCompactActionForUnawareBucketITCase 
extends ActionITCaseBase {
                 .isLessThan(filesFilterOrder.size() / (double) 
filesOrder.size());
     }
 
+    @Test
+    public void testCompareHilbertAndOrder() throws Exception {
+        prepareData(300, 10);
+
+        hilbert(Arrays.asList("f2", "f1"));
+        Predicate f1Between = new PredicateBuilder(getTable().rowType()).between(1, 10, 20);
+        List<ManifestEntry> hilbertAll = getTable().store().newScan().plan().files();
+        List<ManifestEntry> hilbertHit =
+                ((AppendOnlyFileStoreScan) getTable().store().newScan())
+                        .withFilter(f1Between)
+                        .plan()
+                        .files();
+
+        order(Arrays.asList("f2", "f1"));
+        List<ManifestEntry> orderAll = getTable().store().newScan().plan().files();
+        List<ManifestEntry> orderHit =
+                ((AppendOnlyFileStoreScan) getTable().store().newScan())
+                        .withFilter(f1Between)
+                        .plan()
+                        .files();
+
+        // A filter on f1 must keep a smaller share of files after a hilbert sort over (f2, f1)
+        // than after a plain ORDER sort over the same columns.
+        double hilbertRatio = hilbertHit.size() / (double) hilbertAll.size();
+        double orderRatio = orderHit.size() / (double) orderAll.size();
+        Assertions.assertThat(hilbertRatio).isLessThan(orderRatio);
+    }
+
     @Test
     public void testTableConf() throws Exception {
         createTable();
@@ -270,6 +338,14 @@ public class SortCompactActionForUnawareBucketITCase 
extends ActionITCaseBase {
         }
     }
 
+    /** Runs a hilbert sort compaction, randomly either through the action or the procedure. */
+    private void hilbert(List<String> columns) throws Exception {
+        boolean viaAction = RANDOM.nextBoolean();
+        if (!viaAction) {
+            callProcedure("hilbert", columns);
+            return;
+        }
+        createAction("hilbert", columns).run();
+    }
+
     private void order(List<String> columns) throws Exception {
         if (RANDOM.nextBoolean()) {
             createAction("order", columns).run();
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/SparkHilbertUDF.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/SparkHilbertUDF.java
index 115974ac0..ed2a1bab0 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/SparkHilbertUDF.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/SparkHilbertUDF.java
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.spark.sort;
 
-import org.apache.paimon.spark.utils.ConvertBinaryUtil;
+import org.apache.paimon.utils.ConvertBinaryUtil;
 
 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.expressions.UserDefinedFunction;

Reply via email to