This is an automated email from the ASF dual-hosted git repository.

yaozhq pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geaflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 49fd5c66 feat: Optimizing Sorting Performance via Radix Sort Algorithm 
(#589)
49fd5c66 is described below

commit 49fd5c6666e7f7e1bcd3b5c5a8e1166e816dd46c
Author: Fan Tianlan <[email protected]>
AuthorDate: Mon Oct 13 10:16:52 2025 +0800

    feat: Optimizing Sorting Performance via Radix Sort Algorithm (#589)
    
    * Optimizing Sorting Performance via Radix Sort Algorithm
    
    * Fix style
    
    * Optimizing the efficiency of the radix sort algorithm
    
    * Max String Precision
    
    * add tests
    
    * modify pom
    
    * fix style
    
    * fix style2
    
    * fix style2
    
    * refractor equals for BinaryStringType
    
    * concurrency-safe sorting
---
 .../java/org/apache/geaflow/common/type/Types.java |   4 +-
 .../common/type/primitive/BinaryStringType.java    |  31 +++
 .../geaflow/dsl/catalog/console/CatalogUtil.java   |   6 +-
 .../org/apache/geaflow/dsl/util/SqlTypeUtil.java   |   9 +-
 geaflow/geaflow-dsl/geaflow-dsl-runtime/pom.xml    |  11 +
 .../runtime/function/table/OrderByHeapSort.java    |  76 +++++
 .../runtime/function/table/OrderByRadixSort.java   |  59 ++++
 .../dsl/runtime/function/table/OrderByTimSort.java |  62 +++++
 .../function/table/order/MultiFieldRadixSort.java  | 264 ++++++++++++++++++
 .../dsl/runtime/function/table/order/SortInfo.java |  20 ++
 .../dsl/runtime/plan/PhysicSortRelNode.java        |  15 +-
 .../runtime/benchmark/OrderMemoryBenchmark.java    | 178 ++++++++++++
 .../dsl/runtime/benchmark/OrderTimeBenchmark.java  | 307 +++++++++++++++++++++
 .../geaflow/dsl/runtime/plan/StepPlanTest.java     |   2 +-
 .../dsl/runtime/query/MultiFieldRadixSortTest.java | 297 ++++++++++++++++++++
 .../apache/geaflow/dsl/runtime/query/SortTest.java |   9 +
 .../src/test/resources/expect/sort_004.txt         |  22 ++
 .../resources/query/sort_004.sql}                  |  42 +--
 geaflow/geaflow-dsl/pom.xml                        |   2 +-
 19 files changed, 1385 insertions(+), 31 deletions(-)

diff --git 
a/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/type/Types.java
 
b/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/type/Types.java
index ea365a0a..64481553 100644
--- 
a/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/type/Types.java
+++ 
b/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/type/Types.java
@@ -94,7 +94,7 @@ public class Types {
         return TYPE_IMMUTABLE_MAP.get(type);
     }
 
-    public static IType<?> of(String typeName) {
+    public static IType<?> of(String typeName, int precision) {
         if (typeName == null) {
             throw new IllegalArgumentException("typeName is null");
         }
@@ -116,7 +116,7 @@ public class Types {
             case TYPE_NAME_DECIMAL:
                 return DECIMAL;
             case TYPE_NAME_BINARY_STRING:
-                return BINARY_STRING;
+                return new BinaryStringType(precision);
             case TYPE_NAME_TIMESTAMP:
                 return TIMESTAMP;
             case TYPE_NAME_DATE:
diff --git 
a/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/type/primitive/BinaryStringType.java
 
b/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/type/primitive/BinaryStringType.java
index 1d4bf2be..d9817f07 100644
--- 
a/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/type/primitive/BinaryStringType.java
+++ 
b/geaflow/geaflow-common/src/main/java/org/apache/geaflow/common/type/primitive/BinaryStringType.java
@@ -27,6 +27,37 @@ public class BinaryStringType implements IType<BinaryString> 
{
 
     public static final BinaryStringType INSTANCE = new BinaryStringType();
 
+    private int precision;
+
+    public BinaryStringType() {
+
+    }
+
+    public BinaryStringType(int precision) {
+        this.precision = precision;
+    }
+
+    public int getPrecision() {
+        return precision;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+
+        if (obj == null) {
+            return false;
+        }
+
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+
+        return true;
+    }
+
     @Override
     public String getName() {
         return Types.TYPE_NAME_BINARY_STRING;
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-catalog/src/main/java/org/apache/geaflow/dsl/catalog/console/CatalogUtil.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-catalog/src/main/java/org/apache/geaflow/dsl/catalog/console/CatalogUtil.java
index e3aa4db6..6f200a17 100644
--- 
a/geaflow/geaflow-dsl/geaflow-dsl-catalog/src/main/java/org/apache/geaflow/dsl/catalog/console/CatalogUtil.java
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-catalog/src/main/java/org/apache/geaflow/dsl/catalog/console/CatalogUtil.java
@@ -98,7 +98,7 @@ public class CatalogUtil {
                 idFieldName = fieldModel.getName();
             }
             String typeName = convertTypeName(fieldModel.getType().name());
-            IType<?> fieldType = Types.of(typeName);
+            IType<?> fieldType = Types.of(typeName, -1);
             TableField field = new TableField(fieldModel.getName(), fieldType, 
false);
             fields.add(field);
         }
@@ -154,7 +154,7 @@ public class CatalogUtil {
                 default:
             }
             String typeName = convertTypeName(fieldModel.getType().name());
-            IType<?> fieldType = Types.of(typeName);
+            IType<?> fieldType = Types.of(typeName, -1);
             TableField field = new TableField(fieldModel.getName(), fieldType, 
false);
             fields.add(field);
         }
@@ -237,7 +237,7 @@ public class CatalogUtil {
         List<TableField> fields = new ArrayList<>(fieldModels.size());
         for (FieldModel fieldModel : fieldModels) {
             String typeName = convertTypeName(fieldModel.getType().name());
-            IType<?> fieldType = Types.of(typeName);
+            IType<?> fieldType = Types.of(typeName, -1);
             TableField field = new TableField(fieldModel.getName(), fieldType, 
false);
             fields.add(field);
         }
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-parser/src/main/java/org/apache/geaflow/dsl/util/SqlTypeUtil.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-parser/src/main/java/org/apache/geaflow/dsl/util/SqlTypeUtil.java
index a79d3c24..dc66526a 100644
--- 
a/geaflow/geaflow-dsl/geaflow-dsl-parser/src/main/java/org/apache/geaflow/dsl/util/SqlTypeUtil.java
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-parser/src/main/java/org/apache/geaflow/dsl/util/SqlTypeUtil.java
@@ -29,6 +29,7 @@ import org.apache.calcite.sql.SqlDataTypeSpec;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.geaflow.common.type.IType;
 import org.apache.geaflow.common.type.Types;
+import org.apache.geaflow.common.type.primitive.BinaryStringType;
 import org.apache.geaflow.dsl.calcite.EdgeRecordType;
 import org.apache.geaflow.dsl.calcite.GraphRecordType;
 import org.apache.geaflow.dsl.calcite.PathRecordType;
@@ -47,7 +48,7 @@ public final class SqlTypeUtil {
     public static IType<?> convertType(SqlDataTypeSpec typeSpec) {
         String typeName = typeSpec.getTypeName().getSimple().toUpperCase();
         typeName = convertTypeName(typeName);
-        return Types.of(typeName);
+        return Types.of(typeName, typeSpec.getPrecision());
     }
 
     public static IType<?> convertType(RelDataType type) {
@@ -90,7 +91,7 @@ public final class SqlTypeUtil {
 
     public static IType<?> ofTypeName(SqlTypeName sqlTypeName) {
         String typeName = convertTypeName(sqlTypeName.getName());
-        return Types.of(typeName);
+        return Types.of(typeName, sqlTypeName.getPrecision());
     }
 
     public static RelDataType convertToRelType(IType<?> type, boolean 
isNullable,
@@ -129,7 +130,9 @@ public final class SqlTypeUtil {
             default:
                 if (type.isPrimitive()) {
                     String sqlTypeName = convertToSqlTypeName(type);
-                    SqlTypeName typeName = SqlTypeName.valueOf(sqlTypeName);
+                    SqlTypeName typeName = Types.getType(type.getTypeClass()) 
== Types.BINARY_STRING
+                        ? SqlTypeName.get(sqlTypeName, ((BinaryStringType) 
type).getPrecision())
+                        : SqlTypeName.get(sqlTypeName);
                     return 
typeFactory.createTypeWithNullability(typeFactory.createSqlType(typeName), 
isNullable);
                 } else {
                     throw new GeaFlowDSLException("Not support type: " + type);
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/pom.xml 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/pom.xml
index 111d3608..042dfcfc 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/pom.xml
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/pom.xml
@@ -116,6 +116,17 @@
             <artifactId>testng</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.openjdk.jmh</groupId>
+            <artifactId>jmh-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.openjdk.jmh</groupId>
+            <artifactId>jmh-generator-annprocess</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <!-- include kafka server for tests  -->
             <groupId>org.apache.kafka</groupId>
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByHeapSort.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByHeapSort.java
new file mode 100644
index 00000000..dc57095c
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByHeapSort.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.runtime.function.table;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.PriorityQueue;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.function.FunctionContext;
+import org.apache.geaflow.dsl.runtime.function.table.order.SortInfo;
+import org.apache.geaflow.dsl.runtime.function.table.order.TopNRowComparator;
+
+public class OrderByHeapSort implements OrderByFunction {
+
+    private final SortInfo sortInfo;
+
+    private PriorityQueue<Row> topNQueue;
+
+    private TopNRowComparator<Row> topNRowComparator;
+
+    public OrderByHeapSort(SortInfo sortInfo) {
+        this.sortInfo = sortInfo;
+    }
+
+    @Override
+    public void open(FunctionContext context) {
+        this.topNRowComparator = new TopNRowComparator<>(sortInfo);
+        this.topNQueue = new PriorityQueue<>(
+            sortInfo.fetch, topNRowComparator.getNegativeComparator());
+    }
+
+    @Override
+    public void process(Row row) {
+        if (topNQueue.size() == sortInfo.fetch) {
+            if (sortInfo.orderByFields.isEmpty()) {
+                return;
+            }
+            Row top = topNQueue.peek();
+            if (topNQueue.comparator().compare(top, row) < 0) {
+                topNQueue.remove();
+                topNQueue.add(row);
+            }
+        } else {
+            topNQueue.add(row);
+        }
+    }
+
+    @Override
+    public Iterable<Row> finish() {
+        List<Row> results = new ArrayList<>();
+        while (!topNQueue.isEmpty()) {
+            results.add(topNQueue.remove());
+        }
+        Collections.reverse(results);
+        topNQueue.clear();
+        return results;
+    }
+}
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByRadixSort.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByRadixSort.java
new file mode 100644
index 00000000..515d2c48
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByRadixSort.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.runtime.function.table;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.function.FunctionContext;
+import org.apache.geaflow.dsl.runtime.function.table.order.MultiFieldRadixSort;
+import org.apache.geaflow.dsl.runtime.function.table.order.SortInfo;
+
+public class OrderByRadixSort implements OrderByFunction {
+
+    private final SortInfo sortInfo;
+
+    private List<Row> allRows;
+
+    public OrderByRadixSort(SortInfo sortInfo) {
+        this.sortInfo = sortInfo;
+    }
+
+    @Override
+    public void open(FunctionContext context) {
+        this.allRows = new ArrayList<>();
+    }
+
+    @Override
+    public void process(Row row) {
+        if (sortInfo.fetch == 0) {
+            return;
+        }
+        allRows.add(row);
+    }
+
+    @Override
+    public Iterable<Row> finish() {
+        List<Row> sortedRows = new ArrayList<>(allRows);
+        MultiFieldRadixSort.multiFieldRadixSort(sortedRows, sortInfo);
+        allRows.clear();
+        return sortedRows;
+    }
+}
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByTimSort.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByTimSort.java
new file mode 100644
index 00000000..6b4e216d
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/OrderByTimSort.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.runtime.function.table;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.function.FunctionContext;
+import org.apache.geaflow.dsl.runtime.function.table.order.SortInfo;
+import org.apache.geaflow.dsl.runtime.function.table.order.TopNRowComparator;
+
+public class OrderByTimSort implements OrderByFunction {
+
+    private final SortInfo sortInfo;
+
+    private List<Row> allRows;
+
+    private TopNRowComparator<Row> topNRowComparator;
+
+    public OrderByTimSort(SortInfo sortInfo) {
+        this.sortInfo = sortInfo;
+    }
+
+    @Override
+    public void open(FunctionContext context) {
+        this.topNRowComparator = new TopNRowComparator<>(sortInfo);
+        this.allRows = new ArrayList<>();
+    }
+
+    @Override
+    public void process(Row row) {
+        if (sortInfo.fetch == 0) {
+            return;
+        }
+        allRows.add(row);
+    }
+
+    @Override
+    public Iterable<Row> finish() {
+        List<Row> sortedRows = new ArrayList<>(allRows);
+        sortedRows.sort(topNRowComparator);
+        allRows.clear();
+        return sortedRows;
+    }
+}
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/MultiFieldRadixSort.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/MultiFieldRadixSort.java
new file mode 100644
index 00000000..223ebecb
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/MultiFieldRadixSort.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.runtime.function.table.order;
+
+import java.util.List;
+import org.apache.geaflow.common.binary.BinaryString;
+import org.apache.geaflow.dsl.common.data.Row;
+
+public class MultiFieldRadixSort {
+
+    private static final ThreadLocal<Integer> dataSize = new ThreadLocal<>();
+    private static final ThreadLocal<int[]> intValues = new ThreadLocal<>();
+    private static final ThreadLocal<int[]> sortedIntValues = new 
ThreadLocal<>();
+    private static final ThreadLocal<int[]> charCodes = new ThreadLocal<>();
+    private static final ThreadLocal<byte[]> digits = new ThreadLocal<>();
+    private static final ThreadLocal<String[]> stringValues = new 
ThreadLocal<>();
+    private static final ThreadLocal<String[]> sortedStringValues = new 
ThreadLocal<>();
+    private static final ThreadLocal<Row[]> srcData = new ThreadLocal<>();
+    private static final ThreadLocal<Row[]> dstData = new ThreadLocal<>();
+
+    /**
+     * Multi-field radix sort.
+     */
+    public static void multiFieldRadixSort(List<Row> data, SortInfo sortInfo) {
+        if (data == null || data.size() <= 1) {
+            return;
+        }
+        int size = data.size();
+
+        try {
+            dataSize.set(size);
+            intValues.set(new int[size]);
+            sortedIntValues.set(new int[size]);
+            charCodes.set(new int[size]);
+            digits.set(new byte[size]);
+            stringValues.set(new String[size]);
+            sortedStringValues.set(new String[size]);
+            srcData.set(data.toArray(new Row[0]));
+            dstData.set(new Row[size]);
+
+            // Sort by field with the lowest priority.
+            List<OrderByField> fields = sortInfo.orderByFields;
+
+            for (int i = fields.size() - 1; i >= 0; i--) {
+                OrderByField field = fields.get(i);
+                if (field.expression.getOutputType().getTypeClass() == 
Integer.class) {
+                    radixSortByIntField(field);
+                } else {
+                    radixSortByStringField(field);
+                }
+            }
+
+            Row[] finalData = srcData.get();
+            for (int j = 0; j < size; j++) {
+                data.set(j, finalData[j]);
+            }
+        } finally {
+            dataSize.remove();
+            intValues.remove();
+            sortedIntValues.remove();
+            charCodes.remove();
+            digits.remove();
+            stringValues.remove();
+            sortedStringValues.remove();
+            srcData.remove();
+            dstData.remove();
+        }
+    }
+
+    /**
+     * Radix sort by integer field.
+     */
+    private static void radixSortByIntField(OrderByField field) {
+        int size = dataSize.get();
+        int[] intVals = intValues.get();
+        byte[] digs = digits.get();
+        Row[] src = srcData.get();
+        
+        // Determine the number of digits.
+        int max = Integer.MIN_VALUE;
+        int min = Integer.MAX_VALUE;
+        boolean hasNull = false;
+        
+        for (int i = 0; i < size; i++) {
+            Integer value = (Integer) field.expression.evaluate(src[i]);
+            if (value != null) {
+                intVals[i] = value;
+                max = value > max ? value : max;
+                min = value < min ? value : min;
+            } else {
+                intVals[i] = Integer.MIN_VALUE;
+                hasNull = true;
+            }
+        }
+        if (hasNull) {
+            min--;
+        }
+        
+        // Handling negative numbers: Add the offset to all numbers to make 
them positive.
+        final int offset = min < 0 ? -min : 0;
+        max += offset;
+
+        for (int i = 0; i < size; i++) {
+            if (intVals[i] == Integer.MIN_VALUE) {
+                intVals[i] = min;
+            }
+            intVals[i] += offset;
+        }
+
+        // Bitwise sorting.
+        for (int exp = 1; max / exp > 0; exp *= 10) {
+            for (int j = 0; j < size; j++) {
+                digs[j] = (byte) (intVals[j] / exp % 10);
+            }
+            countingSortByDigit(field.order.value > 0);
+        }
+    }
+
+    /**
+     * Radix sorting by string field.
+     */
+    private static void radixSortByStringField(OrderByField field) {
+        int size = dataSize.get();
+        String[] strVals = stringValues.get();
+        Row[] src = srcData.get();
+        
+        // Precompute all strings to avoid repeated evaluation and toString.
+        int maxLength = 0;
+        
+        for (int i = 0; i < size; i++) {
+            BinaryString binaryString = (BinaryString) 
field.expression.evaluate(src[i]);
+            strVals[i] = binaryString != null ? binaryString.toString() : "";
+            maxLength = Math.max(maxLength, strVals[i].length());
+        }
+
+        // Sort from the last digit of the string.
+        for (int pos = maxLength - 1; pos >= 0; pos--) {
+            countingSortByChar(field.order.value > 0, pos);
+        }
+    }
+
+    /**
+     * Sort by the specified number of digits (integer).
+     */
+    private static void countingSortByDigit(boolean ascending) {
+        int size = dataSize.get();
+        byte[] digs = digits.get();
+        int[] intVals = intValues.get();
+        int[] sortedIntVals = sortedIntValues.get();
+        Row[] src = srcData.get();
+        Row[] dst = dstData.get();
+
+        int[] count = new int[10];
+
+        // Count the number of times each number appears.
+        for (int i = 0; i < size; i++) {
+            count[digs[i]]++;
+        }
+
+        // Calculate cumulative count.
+        if (ascending) {
+            for (int i = 1; i < 10; i++) {
+                count[i] += count[i - 1];
+            }
+        } else {
+            for (int i = 8; i >= 0; i--) {
+                count[i] += count[i + 1];
+            }
+        }
+
+        // Build the output array from back to front (to ensure stability).
+        for (int i = size - 1; i >= 0; i--) {
+            int index = --count[digs[i]];
+            dst[index] = src[i];
+            sortedIntVals[index] = intVals[i];
+        }
+
+        int[] intTmp = intVals;
+        intValues.set(sortedIntVals);
+        sortedIntValues.set(intTmp);
+        
+        Row[] rowTmp = src;
+        srcData.set(dst);
+        dstData.set(rowTmp);
+    }
+
+    /**
+     * Sort by the specified number of digits (string).
+     */
+    private static void countingSortByChar(boolean ascending, int pos) {
+        int size = dataSize.get();
+        String[] strVals = stringValues.get();
+        String[] sortedStrVals = sortedStringValues.get();
+        int[] charCds = charCodes.get();
+        Row[] src = srcData.get();
+        Row[] dst = dstData.get();
+
+        // Precompute all strings and character codes to avoid repeated 
evaluate and toString.
+        int minChar = Integer.MAX_VALUE;
+        int maxChar = Integer.MIN_VALUE;
+
+        for (int i = 0; i < size; i++) {
+            String value = strVals[i];
+            if (pos < value.length()) {
+                int charCode = value.codePointAt(pos);
+                charCds[i] = charCode;
+                minChar = Math.min(minChar, charCode);
+                maxChar = Math.max(maxChar, charCode);
+            }
+        }
+        int range = maxChar - minChar + 2;
+        int[] count = new int[range];
+
+        for (int i = 0; i < size; i++) {
+            if (pos < strVals[i].length()) {
+                charCds[i] -= (minChar - 1);
+            } else {
+                charCds[i] = 0; // null character
+            }
+            count[charCds[i]]++;
+        }
+
+        if (ascending) {
+            for (int i = 1; i < range; i++) {
+                count[i] += count[i - 1];
+            }
+        } else {
+            for (int i = range - 2; i >= 0; i--) {
+                count[i] += count[i + 1];
+            }
+        }
+
+        for (int i = size - 1; i >= 0; i--) {
+            int index = --count[charCds[i]];
+            dst[index] = src[i];
+            sortedStrVals[index] = strVals[i];
+        }
+
+        String[] stringTmp = strVals;
+        stringValues.set(sortedStrVals);
+        sortedStringValues.set(stringTmp);
+        
+        Row[] rowTmp = src;
+        srcData.set(dst);
+        dstData.set(rowTmp);
+    }
+}
\ No newline at end of file
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/SortInfo.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/SortInfo.java
index b7bbf238..c90c9e21 100644
--- 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/SortInfo.java
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/SortInfo.java
@@ -23,6 +23,9 @@ import com.google.common.collect.Lists;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.geaflow.common.binary.BinaryString;
+import org.apache.geaflow.common.type.IType;
+import org.apache.geaflow.common.type.primitive.BinaryStringType;
 
 public class SortInfo implements Serializable {
 
@@ -36,4 +39,21 @@ public class SortInfo implements Serializable {
         sortInfo.fetch = this.fetch;
         return sortInfo;
     }
+
+    public boolean isRadixSortable() {
+        for (int i = 0; i < this.orderByFields.size(); i++) {
+            OrderByField field = this.orderByFields.get(i);
+            IType<?> orderType = field.expression.getOutputType();
+            if (orderType.getTypeClass() != Integer.class && 
orderType.getTypeClass() != BinaryString.class) {
+                return false;
+            } else if (orderType.getTypeClass() == BinaryString.class) {
+                int precision = ((BinaryStringType) orderType).getPrecision();
+                // MongoDB ObjectId: 24-character hexadecimal
+                if (precision > 24 || precision < 0) {
+                    return false;  
+                }
+            }
+        }
+        return true;
+    }
 }
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/plan/PhysicSortRelNode.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/plan/PhysicSortRelNode.java
index 08c2179a..7823ecee 100644
--- 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/plan/PhysicSortRelNode.java
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/plan/PhysicSortRelNode.java
@@ -38,7 +38,9 @@ import org.apache.geaflow.dsl.runtime.RuntimeTable;
 import org.apache.geaflow.dsl.runtime.expression.ExpressionTranslator;
 import org.apache.geaflow.dsl.runtime.expression.field.FieldExpression;
 import org.apache.geaflow.dsl.runtime.function.table.OrderByFunction;
-import org.apache.geaflow.dsl.runtime.function.table.OrderByFunctionImpl;
+import org.apache.geaflow.dsl.runtime.function.table.OrderByHeapSort;
+import org.apache.geaflow.dsl.runtime.function.table.OrderByRadixSort;
+import org.apache.geaflow.dsl.runtime.function.table.OrderByTimSort;
 import org.apache.geaflow.dsl.runtime.function.table.order.OrderByField;
 import org.apache.geaflow.dsl.runtime.function.table.order.OrderByField.ORDER;
 import org.apache.geaflow.dsl.runtime.function.table.order.SortInfo;
@@ -69,8 +71,15 @@ public class PhysicSortRelNode extends Sort implements 
PhysicRelNode<RuntimeTabl
     public RuntimeTable translate(QueryContext context) {
         SortInfo sortInfo = buildSortInfo();
         RDataView dataView = ((PhysicRelNode<?>) 
getInput()).translate(context);
-
-        OrderByFunction orderByFunction = new OrderByFunctionImpl(sortInfo);
+        
+        OrderByFunction orderByFunction;
+        if (sortInfo.fetch > 0) {
+            orderByFunction = new OrderByHeapSort(sortInfo);
+        } else if (sortInfo.isRadixSortable()) {
+            orderByFunction = new OrderByRadixSort(sortInfo);
+        } else {
+            orderByFunction = new OrderByTimSort(sortInfo);
+        }
         if (dataView.getType() == ViewType.TABLE) {
             return ((RuntimeTable) dataView).orderBy(orderByFunction);
         } else {
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/benchmark/OrderMemoryBenchmark.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/benchmark/OrderMemoryBenchmark.java
new file mode 100644
index 00000000..94a8a6fd
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/benchmark/OrderMemoryBenchmark.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.runtime.benchmark;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import org.apache.geaflow.common.type.Types;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.data.impl.ObjectRow;
+import org.apache.geaflow.dsl.runtime.expression.Expression;
+import org.apache.geaflow.dsl.runtime.expression.field.FieldExpression;
+import org.apache.geaflow.dsl.runtime.function.table.OrderByFunction;
+import org.apache.geaflow.dsl.runtime.function.table.OrderByHeapSort;
+import org.apache.geaflow.dsl.runtime.function.table.OrderByRadixSort;
+import org.apache.geaflow.dsl.runtime.function.table.OrderByTimSort;
+import org.apache.geaflow.dsl.runtime.function.table.order.OrderByField;
+import org.apache.geaflow.dsl.runtime.function.table.order.OrderByField.ORDER;
+import org.apache.geaflow.dsl.runtime.function.table.order.SortInfo;
+import org.openjdk.jmh.annotations.*;
+import org.openjdk.jmh.profile.GCProfiler;
+import org.openjdk.jmh.results.format.ResultFormatType;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+@BenchmarkMode(Mode.SingleShotTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@State(Scope.Benchmark)
+@Measurement(iterations = 10)
+@Fork(1)
+public class OrderMemoryBenchmark {
+    
+    @Param({"10000", "100000", "1000000"})
+    private int dataSize;
+    
+    @Param({"100", "1000", "10000"})
+    private int topN;
+    
+    private OrderByFunction orderByFunction;
+    private List<Row> testData;
+    private SortInfo sortInfo = new SortInfo();
+    
+    @Setup(Level.Trial)
+    public void setup() {
+        // Create sort expression
+        Expression expression = new FieldExpression(0, Types.INTEGER);
+
+        OrderByField orderByField = new OrderByField();
+        orderByField.expression = expression;
+        orderByField.order = ORDER.ASC;
+
+        List<OrderByField> orderByFields = new ArrayList<>(1);
+        orderByFields.add(orderByField);
+
+        sortInfo.orderByFields = orderByFields;
+        sortInfo.fetch = topN;
+        
+        // Generate test data
+        testData = generateTestData();
+    }
+    
+    private List<Row> generateTestData() {
+        List<Row> data = new ArrayList<>(dataSize);
+        Random random = new Random(42);
+        
+        for (int i = 0; i < dataSize; i++) {
+            Object[] values = {random.nextInt(dataSize * 10)};
+            data.add(ObjectRow.create(values));
+        }
+        
+        return data;
+    }
+    
+    @Benchmark
+    public Iterable<Row> benchmarkHeapSortMemory() {
+        // Create a copy of the input data to avoid state pollution
+        List<Row> inputData = new ArrayList<>(testData);
+
+        orderByFunction = new OrderByHeapSort(sortInfo);
+        orderByFunction.open(null);
+
+        for (int i = 0; i < dataSize; i++) {
+            orderByFunction.process(inputData.get(i));
+        }
+        
+        // Perform Top-N sorting
+        return orderByFunction.finish();
+    }
+
+    @Benchmark
+    public Iterable<Row> benchmarkRadixSortMemory() {
+        List<Row> inputData = new ArrayList<>(testData);
+        
+        orderByFunction = new OrderByRadixSort(sortInfo);
+        orderByFunction.open(null);
+
+        for (int i = 0; i < dataSize; i++) {
+            orderByFunction.process(inputData.get(i));
+        }
+        
+        return orderByFunction.finish();
+    }
+    
+    @Benchmark
+    public Iterable<Row> benchmarkTimSortMemory() {
+        List<Row> inputData = new ArrayList<>(testData);
+        
+        orderByFunction = new OrderByTimSort(sortInfo);
+        orderByFunction.open(null);
+
+        for (int i = 0; i < dataSize; i++) {
+            orderByFunction.process(inputData.get(i));
+        }
+        
+        return orderByFunction.finish();
+    }
+    
+    public static void main(String[] args) throws RunnerException {
+        // Run a verification first
+        OrderMemoryBenchmark benchmark = new OrderMemoryBenchmark();
+        benchmark.dataSize = 10;
+        benchmark.topN = 10;
+        benchmark.setup();
+        Iterable<Row> heapResults = benchmark.benchmarkHeapSortMemory();
+        System.out.println("===HEAP_SORT===");
+        for (Row result: heapResults) {
+            System.out.print(result);
+        }
+        System.out.println();
+        System.out.println("===RADIX_SORT===");
+        Iterable<Row> radixResults = benchmark.benchmarkRadixSortMemory();
+        for (Row result: radixResults) {
+            System.out.print(result);
+        }
+        System.out.println();
+        System.out.println("===TIM_SORT===");
+        Iterable<Row> timResults = benchmark.benchmarkTimSortMemory();
+        for (Row result: timResults) {
+            System.out.print(result);
+        }
+        System.out.println();
+
+        String timestamp = new SimpleDateFormat("yyyyMMdd-HHmmss").format(new 
Date());
+        String resultFile = "target/benchmark-results/memory-" + timestamp + 
".json";
+
+        Options opt = new OptionsBuilder()
+            .include(OrderMemoryBenchmark.class.getSimpleName())
+            .addProfiler(GCProfiler.class)
+            .jvmArgs("-Xms2g", "-Xmx4g")
+            .result(resultFile)
+            .resultFormat(ResultFormatType.JSON)
+            .build();
+        
+        new Runner(opt).run();
+    }
+}
\ No newline at end of file
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/benchmark/OrderTimeBenchmark.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/benchmark/OrderTimeBenchmark.java
new file mode 100644
index 00000000..96bdf77e
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/benchmark/OrderTimeBenchmark.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.runtime.benchmark;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import org.apache.geaflow.common.binary.BinaryString;
+import org.apache.geaflow.common.type.IType;
+import org.apache.geaflow.common.type.Types;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.data.impl.ObjectRow;
+import org.apache.geaflow.dsl.runtime.expression.Expression;
+import org.apache.geaflow.dsl.runtime.expression.field.FieldExpression;
+import org.apache.geaflow.dsl.runtime.function.table.OrderByFunction;
+import org.apache.geaflow.dsl.runtime.function.table.OrderByHeapSort;
+import org.apache.geaflow.dsl.runtime.function.table.OrderByRadixSort;
+import org.apache.geaflow.dsl.runtime.function.table.OrderByTimSort;
+import org.apache.geaflow.dsl.runtime.function.table.order.OrderByField;
+import org.apache.geaflow.dsl.runtime.function.table.order.OrderByField.ORDER;
+import org.apache.geaflow.dsl.runtime.function.table.order.SortInfo;
+import org.openjdk.jmh.annotations.*;
+import org.openjdk.jmh.results.format.ResultFormatType;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 2, timeUnit = TimeUnit.SECONDS)
+@Fork(2)
+public class OrderTimeBenchmark {
+    private static final String CHARS = 
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
+    
+    @Param({"1000"})
+    private int dataSize;
+    
+    @Param({"1000"})
+    private int topN;
+    
+    @Param({"RANDOM", "SORTED", "REVERSE_SORTED", "PARTIAL_SORTED", 
"DUPLICATED"})
+    private String dataPattern;
+    
+    @Param({"STRING"})
+    private String dataType;
+    
+    private OrderByFunction orderByFunction;
+    private List<Row> testData;
+    private SortInfo sortInfo = new SortInfo();
+    
+    @Setup(Level.Trial)
+    public void setupBenchmark() {
+        // Create sort expression
+        setupOrderByExpressions();
+        
+        // Generate test data
+        testData = generateTestData();
+    }
+    
+    private void setupOrderByExpressions() {
+        IType<?> fieldType;
+        switch (dataType) {
+            case "INTEGER":
+                fieldType = Types.INTEGER;
+                break;
+            case "DOUBLE":
+                fieldType = Types.DOUBLE;
+                break;
+            case "STRING":
+                fieldType = Types.BINARY_STRING;
+                break;
+            default:
+                fieldType = Types.INTEGER;
+        }
+        
+        List<OrderByField> orderByFields = new ArrayList<>(2);
+
+        // Primary sort field
+        Expression expression1 = new FieldExpression(0, fieldType);
+        OrderByField orderByField1 = new OrderByField();
+        orderByField1.expression = expression1;
+        orderByField1.order = ORDER.ASC;
+        orderByFields.add(orderByField1);
+
+        // Add a secondary sort field (for testing multi-field sorting 
performance)
+        Expression expression2 = new FieldExpression(1, Types.INTEGER);
+        OrderByField orderByField2 = new OrderByField();
+        orderByField2.expression = expression2;
+        orderByField2.order = ORDER.ASC;
+        orderByFields.add(orderByField2);
+
+        sortInfo.orderByFields = orderByFields;
+        sortInfo.fetch = topN;
+    }
+    
+    private List<Row> generateTestData() {
+        List<Row> data = new ArrayList<>(dataSize);
+        Random random = new Random(42); // Fixed seeds ensure reproducibility
+        
+        for (int i = 0; i < dataSize; i++) {
+            Object[] values = new Object[2];
+            
+            // Generate the value of the primary sort field
+            switch (dataType) {
+                case "INTEGER":
+                    values[0] = generateIntegerValue(i, random);
+                    break;
+                case "DOUBLE":
+                    values[0] = generateDoubleValue(i, random);
+                    break;
+                case "STRING":
+                    values[0] = BinaryString.fromString(generateStringValue(i, 
random));
+                    break;
+                default:
+                    return data;
+            }
+            
+            // Generate the value of the secondary sort field
+            values[1] = random.nextInt(100);
+            
+            data.add(ObjectRow.create(values));
+        }
+        
+        return data;
+    }
+    
+    private Integer generateIntegerValue(int index, Random random) {
+        switch (dataPattern) {
+            case "RANDOM":
+                return random.nextInt(dataSize * 10);
+            case "SORTED":
+                return index;
+            case "REVERSE_SORTED":
+                return dataSize - index;
+            case "PARTIAL_SORTED":
+                // 70% ordered, 30% random
+                return index < dataSize * 0.7 ? index : 
random.nextInt(dataSize);
+            case "DUPLICATED":
+                // Generate a large number of repeated values
+                return random.nextInt(dataSize / 10);
+            default:
+                return random.nextInt(dataSize);
+        }
+    }
+    
+    private Double generateDoubleValue(int index, Random random) {
+        switch (dataPattern) {
+            case "RANDOM":
+                return random.nextDouble() * dataSize * 10;
+            case "SORTED":
+                return (double) index + random.nextDouble();
+            case "REVERSE_SORTED":
+                return (double) (dataSize - index) + random.nextDouble();
+            case "PARTIAL_SORTED":
+                return index < dataSize * 0.7
+                    ? (double) index + random.nextDouble()
+                    : random.nextDouble() * dataSize;
+            case "DUPLICATED":
+                return (double) (random.nextInt(dataSize / 10)) + 
random.nextDouble();
+            default:
+                return random.nextDouble() * dataSize;
+        }
+    }
+    
+    private String generateStringValue(int index, Random random) {
+        String[] prefixes = {"A", "B", "C", "D", "E", "F", "G", "H", "I", "J"};
+        
+        switch (dataPattern) {
+            case "RANDOM":
+                return generateRandomString(1, 101, random);
+            case "SORTED":
+                return String.format("R%0100d", index);
+            case "REVERSE_SORTED":
+                return String.format("R%0100d", dataSize - index);
+            case "PARTIAL_SORTED":
+                return index < dataSize * 0.7
+                    ? String.format("R%0100d", index)
+                    : generateRandomString(1, 101, random);
+            case "DUPLICATED":
+                return prefixes[random.nextInt(3)]
+                    + String.format("%0100d", random.nextInt(dataSize / 10));
+            default:
+                return String.format("R%0100d", random.nextInt(dataSize));
+        }
+    }
+
+    private String generateRandomString(int length, Random random) {
+        StringBuilder sb = new StringBuilder(length);
+        for (int i = 0; i < length; i++) {
+            sb.append(CHARS.charAt(random.nextInt(CHARS.length())));
+        }
+        return sb.toString();
+    }
+
+    private String generateRandomString(int minLength, int maxLength, Random 
random) {
+        int length = minLength + random.nextInt(maxLength - minLength + 1);
+        return generateRandomString(length, random);
+    }
+    
+    
+    @Benchmark
+    public Iterable<Row> benchmarkHeapSort() {
+        // Create a copy of the input data to avoid state pollution
+        List<Row> inputData = new ArrayList<>(testData);
+
+        orderByFunction = new OrderByHeapSort(sortInfo);
+        orderByFunction.open(null);
+
+        for (int i = 0; i < dataSize; i++) {
+            orderByFunction.process(inputData.get(i));
+        }
+
+        // Perform Top-N sorting
+        return orderByFunction.finish();
+    }
+
+    @Benchmark
+    public Iterable<Row> benchmarkRadixSort() {
+        List<Row> inputData = new ArrayList<>(testData);
+        
+        orderByFunction = new OrderByRadixSort(sortInfo);
+        orderByFunction.open(null);
+
+        for (int i = 0; i < dataSize; i++) {
+            orderByFunction.process(inputData.get(i));
+        }
+
+        return orderByFunction.finish();
+    }
+    
+    @Benchmark
+    public Iterable<Row> benchmarkTimSort() {
+        List<Row> inputData = new ArrayList<>(testData);
+        
+        orderByFunction = new OrderByTimSort(sortInfo);
+        orderByFunction.open(null);
+
+        for (int i = 0; i < dataSize; i++) {
+            orderByFunction.process(inputData.get(i));
+        }
+
+        return orderByFunction.finish();
+    }
+    
+    public static void main(String[] args) throws RunnerException {
+        // Run a verification first
+        OrderTimeBenchmark benchmark = new OrderTimeBenchmark();
+        benchmark.dataSize = 10;
+        benchmark.topN = 10;
+        benchmark.dataPattern = "RANDOM";
+        benchmark.dataType = "INTEGER";
+        benchmark.setupBenchmark();
+        Iterable<Row> heapResults = benchmark.benchmarkHeapSort();
+        System.out.println("===HEAP_SORT===");
+        for (Row result: heapResults) {
+            System.out.print(result);
+        }
+        System.out.println();
+        System.out.println("===RADIX_SORT===");
+        Iterable<Row> radixResults = benchmark.benchmarkRadixSort();
+        for (Row result: radixResults) {
+            System.out.print(result);
+        }
+        System.out.println();
+        System.out.println("===TIM_SORT===");
+        Iterable<Row> timResults = benchmark.benchmarkTimSort();
+        for (Row result: timResults) {
+            System.out.print(result);
+        }
+        System.out.println();
+
+        String timestamp = new SimpleDateFormat("yyyyMMdd-HHmmss").format(new 
Date());
+        String resultFile = "target/benchmark-results/time-" + timestamp + 
".json";
+
+        Options opt = new OptionsBuilder()
+            .include(OrderTimeBenchmark.class.getSimpleName())
+            .jvmArgs("-Xms4g", "-Xmx8g", "-XX:+UseG1GC")
+            .result(resultFile)
+            .resultFormat(ResultFormatType.JSON)
+            .build();
+        
+        new Runner(opt).run();
+    }
+}
\ No newline at end of file
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/plan/StepPlanTest.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/plan/StepPlanTest.java
index eb08dd52..82d72af5 100644
--- 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/plan/StepPlanTest.java
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/plan/StepPlanTest.java
@@ -175,7 +175,7 @@ public class StepPlanTest {
     }
 
     private GraphSchema createGraph() {
-        TableField idField = new TableField("id", Types.of("Long"), false);
+        TableField idField = new TableField("id", Types.of("Long", -1), false);
         VertexTable vTable = new VertexTable("default", "testV", 
Collections.singletonList(idField), "id");
         GeaFlowGraph graph = new GeaFlowGraph("default", "test", 
Lists.newArrayList(vTable),
             new ArrayList<>(), new HashMap<>(), new HashMap<>(), false, false);
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/MultiFieldRadixSortTest.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/MultiFieldRadixSortTest.java
new file mode 100644
index 00000000..1b2b4502
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/MultiFieldRadixSortTest.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.runtime.query;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.geaflow.common.binary.BinaryString;
+import org.apache.geaflow.common.type.primitive.BinaryStringType;
+import org.apache.geaflow.common.type.primitive.IntegerType;
+import org.apache.geaflow.dsl.common.data.impl.ObjectRow;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.runtime.expression.field.FieldExpression;
+import org.apache.geaflow.dsl.runtime.function.table.order.MultiFieldRadixSort;
+import org.apache.geaflow.dsl.runtime.function.table.order.OrderByField;
+import org.apache.geaflow.dsl.runtime.function.table.order.OrderByField.ORDER;
+import org.apache.geaflow.dsl.runtime.function.table.order.SortInfo;
+import org.testng.annotations.Test;
+
+public class MultiFieldRadixSortTest {
+
+    @Test
+    public void testSortEmptyList() {
+        List<Row> data = new ArrayList<>();
+
+        SortInfo sortInfo = new SortInfo();
+        sortInfo.orderByFields = new ArrayList<>(1);
+        
+        OrderByField intField = new OrderByField();
+        intField.expression = new FieldExpression(0, IntegerType.INSTANCE);
+        intField.order = ORDER.ASC;
+        sortInfo.orderByFields.add(intField);
+        
+        MultiFieldRadixSort.multiFieldRadixSort(data, sortInfo);
+        
+        assertEquals(data.size(), 0);
+    }
+
+    @Test
+    public void testSortSingleElement() {
+        List<Row> data = new ArrayList<>(1);
+        data.add(ObjectRow.create(new Object[]{1, 
BinaryString.fromString("test")}));
+
+        SortInfo sortInfo = new SortInfo();
+        sortInfo.orderByFields = new ArrayList<>(1);
+        
+        OrderByField intField = new OrderByField();
+        intField.expression = new FieldExpression(0, IntegerType.INSTANCE);
+        intField.order = ORDER.ASC;
+        sortInfo.orderByFields.add(intField);
+        
+        MultiFieldRadixSort.multiFieldRadixSort(data, sortInfo);
+        
+        assertEquals(data.size(), 1);
+        assertEquals(data.get(0).getField(0, IntegerType.INSTANCE), 
Integer.valueOf(1));
+    }
+
+    @Test
+    public void testSortByIntegerFieldAscending() {
+        List<Row> data = new ArrayList<>(3);
+        data.add(ObjectRow.create(new Object[]{3, 
BinaryString.fromString("c")}));
+        data.add(ObjectRow.create(new Object[]{1, 
BinaryString.fromString("a")}));
+        data.add(ObjectRow.create(new Object[]{2, 
BinaryString.fromString("b")}));
+        
+        SortInfo sortInfo = new SortInfo();
+        sortInfo.orderByFields = new ArrayList<>(1);
+        OrderByField intField = new OrderByField();
+        intField.expression = new FieldExpression(0, IntegerType.INSTANCE);
+        intField.order = ORDER.ASC;
+        sortInfo.orderByFields.add(intField);
+        
+        MultiFieldRadixSort.multiFieldRadixSort(data, sortInfo);
+        
+        assertEquals(data.size(), 3);
+        assertEquals(data.get(0).getField(0, IntegerType.INSTANCE), 
Integer.valueOf(1));
+        assertEquals(data.get(1).getField(0, IntegerType.INSTANCE), 
Integer.valueOf(2));
+        assertEquals(data.get(2).getField(0, IntegerType.INSTANCE), 
Integer.valueOf(3));
+    }
+
+    @Test
+    public void testSortByIntegerFieldDescending() {
+        List<Row> data = new ArrayList<>(3);
+        data.add(ObjectRow.create(new Object[]{1, 
BinaryString.fromString("a")}));
+        data.add(ObjectRow.create(new Object[]{3, 
BinaryString.fromString("c")}));
+        data.add(ObjectRow.create(new Object[]{2, 
BinaryString.fromString("b")}));
+        
+        SortInfo sortInfo = new SortInfo();
+        sortInfo.orderByFields = new ArrayList<>(1);
+        OrderByField intField = new OrderByField();
+        intField.expression = new FieldExpression(0, IntegerType.INSTANCE);
+        intField.order = ORDER.DESC;
+        sortInfo.orderByFields.add(intField);
+        
+        MultiFieldRadixSort.multiFieldRadixSort(data, sortInfo);
+        
+        assertEquals(data.size(), 3);
+        assertEquals(data.get(0).getField(0, IntegerType.INSTANCE), 
Integer.valueOf(3));
+        assertEquals(data.get(1).getField(0, IntegerType.INSTANCE), 
Integer.valueOf(2));
+        assertEquals(data.get(2).getField(0, IntegerType.INSTANCE), 
Integer.valueOf(1));
+    }
+
+    @Test
+    public void testSortByStringFieldAscending() {
+        List<Row> data = new ArrayList<>(3);
+        data.add(ObjectRow.create(new Object[]{3, 
BinaryString.fromString("zebra")}));
+        data.add(ObjectRow.create(new Object[]{1, 
BinaryString.fromString("apple")}));
+        data.add(ObjectRow.create(new Object[]{2, 
BinaryString.fromString("banana")}));
+        
+        SortInfo sortInfo = new SortInfo();
+        sortInfo.orderByFields = new ArrayList<>(1);
+        OrderByField stringField = new OrderByField();
+        stringField.expression = new FieldExpression(1, 
BinaryStringType.INSTANCE);
+        stringField.order = ORDER.ASC;
+        sortInfo.orderByFields.add(stringField);
+        
+        MultiFieldRadixSort.multiFieldRadixSort(data, sortInfo);
+        
+        assertEquals(data.size(), 3);
+        assertEquals(((BinaryString)data.get(0).getField(1, 
BinaryStringType.INSTANCE)).toString(), "apple");
+        assertEquals(((BinaryString)data.get(1).getField(1, 
BinaryStringType.INSTANCE)).toString(), "banana");
+        assertEquals(((BinaryString)data.get(2).getField(1, 
BinaryStringType.INSTANCE)).toString(), "zebra");
+    }
+
+    @Test
+    public void testSortByStringFieldDescending() {
+        List<Row> data = new ArrayList<>(3);
+        data.add(ObjectRow.create(new Object[]{1, 
BinaryString.fromString("apple")}));
+        data.add(ObjectRow.create(new Object[]{3, 
BinaryString.fromString("zebra")}));
+        data.add(ObjectRow.create(new Object[]{2, 
BinaryString.fromString("banana")}));
+        
+        SortInfo sortInfo = new SortInfo();
+        sortInfo.orderByFields = new ArrayList<>(1);
+        OrderByField stringField = new OrderByField();
+        stringField.expression = new FieldExpression(1, 
BinaryStringType.INSTANCE);
+        stringField.order = ORDER.DESC;
+        sortInfo.orderByFields.add(stringField);
+        
+        MultiFieldRadixSort.multiFieldRadixSort(data, sortInfo);
+        
+        assertEquals(data.size(), 3);
+        assertEquals(((BinaryString)data.get(0).getField(1, 
BinaryStringType.INSTANCE)).toString(), "zebra");
+        assertEquals(((BinaryString)data.get(1).getField(1, 
BinaryStringType.INSTANCE)).toString(), "banana");
+        assertEquals(((BinaryString)data.get(2).getField(1, 
BinaryStringType.INSTANCE)).toString(), "apple");
+    }
+
+    @Test
+    public void testMultiFieldSort() {
+        List<Row> data = new ArrayList<>(4);
+        data.add(ObjectRow.create(new Object[]{1, 
BinaryString.fromString("b")}));
+        data.add(ObjectRow.create(new Object[]{2, 
BinaryString.fromString("a")}));
+        data.add(ObjectRow.create(new Object[]{1, 
BinaryString.fromString("a")}));
+        data.add(ObjectRow.create(new Object[]{2, 
BinaryString.fromString("b")}));
+        
+        SortInfo sortInfo = new SortInfo();
+        sortInfo.orderByFields = new ArrayList<>(2);
+        
+        // First sort by integer field (ascending)
+        OrderByField intField = new OrderByField();
+        intField.expression = new FieldExpression(0, IntegerType.INSTANCE);
+        intField.order = ORDER.ASC;
+        sortInfo.orderByFields.add(intField);
+        
+        // Then sort by string field (ascending)
+        OrderByField stringField = new OrderByField();
+        stringField.expression = new FieldExpression(1, 
BinaryStringType.INSTANCE);
+        stringField.order = ORDER.ASC;
+        sortInfo.orderByFields.add(stringField);
+        
+        MultiFieldRadixSort.multiFieldRadixSort(data, sortInfo);
+        
+        assertEquals(data.size(), 4);
+        // Expected order: (1, "a"), (1, "b"), (2, "a"), (2, "b")
+        assertEquals(data.get(0).getField(0, IntegerType.INSTANCE), 
Integer.valueOf(1));
+        assertEquals(((BinaryString)data.get(0).getField(1, 
BinaryStringType.INSTANCE)).toString(), "a");
+        
+        assertEquals(data.get(1).getField(0, IntegerType.INSTANCE), 
Integer.valueOf(1));
+        assertEquals(((BinaryString)data.get(1).getField(1, 
BinaryStringType.INSTANCE)).toString(), "b");
+        
+        assertEquals(data.get(2).getField(0, IntegerType.INSTANCE), 
Integer.valueOf(2));
+        assertEquals(((BinaryString)data.get(2).getField(1, 
BinaryStringType.INSTANCE)).toString(), "a");
+        
+        assertEquals(data.get(3).getField(0, IntegerType.INSTANCE), 
Integer.valueOf(2));
+        assertEquals(((BinaryString)data.get(3).getField(1, 
BinaryStringType.INSTANCE)).toString(), "b");
+    }
+
+    @Test
+    public void testSortWithNullValues() {
+        List<Row> data = new ArrayList<>(3);
+        data.add(ObjectRow.create(new Object[]{null, 
BinaryString.fromString("b")}));
+        data.add(ObjectRow.create(new Object[]{2, 
BinaryString.fromString("a")}));
+        data.add(ObjectRow.create(new Object[]{1, 
BinaryString.fromString("c")}));
+        
+        SortInfo sortInfo = new SortInfo();
+        sortInfo.orderByFields = new ArrayList<>(1);
+        OrderByField intField = new OrderByField();
+        intField.expression = new FieldExpression(0, IntegerType.INSTANCE);
+        intField.order = ORDER.ASC;
+        sortInfo.orderByFields.add(intField);
+        
+        MultiFieldRadixSort.multiFieldRadixSort(data, sortInfo);
+        
+        assertEquals(data.size(), 3);
+        // Null values should appear first in ascending order
+        assertEquals(data.get(0).getField(0, IntegerType.INSTANCE), null);
+        assertEquals(data.get(1).getField(0, IntegerType.INSTANCE), 
Integer.valueOf(1));
+        assertEquals(data.get(2).getField(0, IntegerType.INSTANCE), 
Integer.valueOf(2));
+    }
+
+    @Test
+    public void testSortWithNegativeNumbers() {
+        List<Row> data = new ArrayList<>(4);
+        data.add(ObjectRow.create(new Object[]{-1, 
BinaryString.fromString("a")}));
+        data.add(ObjectRow.create(new Object[]{3, 
BinaryString.fromString("b")}));
+        data.add(ObjectRow.create(new Object[]{-5, 
BinaryString.fromString("c")}));
+        data.add(ObjectRow.create(new Object[]{0, 
BinaryString.fromString("d")}));
+        
+        SortInfo sortInfo = new SortInfo();
+        sortInfo.orderByFields = new ArrayList<>(1);
+        OrderByField intField = new OrderByField();
+        intField.expression = new FieldExpression(0, IntegerType.INSTANCE);
+        intField.order = ORDER.ASC;
+        sortInfo.orderByFields.add(intField);
+        
+        MultiFieldRadixSort.multiFieldRadixSort(data, sortInfo);
+        
+        assertEquals(data.size(), 4);
+        assertEquals(data.get(0).getField(0, IntegerType.INSTANCE), 
Integer.valueOf(-5));
+        assertEquals(data.get(1).getField(0, IntegerType.INSTANCE), 
Integer.valueOf(-1));
+        assertEquals(data.get(2).getField(0, IntegerType.INSTANCE), 
Integer.valueOf(0));
+        assertEquals(data.get(3).getField(0, IntegerType.INSTANCE), 
Integer.valueOf(3));
+    }
+
+    @Test
+    public void testSortWithEmptyStrings() {
+        List<Row> data = new ArrayList<>(3);
+        data.add(ObjectRow.create(new Object[]{1, 
BinaryString.fromString("")}));
+        data.add(ObjectRow.create(new Object[]{2, 
BinaryString.fromString("hello")}));
+        data.add(ObjectRow.create(new Object[]{3, 
BinaryString.fromString("a")}));
+        
+        SortInfo sortInfo = new SortInfo();
+        sortInfo.orderByFields = new ArrayList<>(1);
+        OrderByField stringField = new OrderByField();
+        stringField.expression = new FieldExpression(1, 
BinaryStringType.INSTANCE);
+        stringField.order = ORDER.ASC;
+        sortInfo.orderByFields.add(stringField);
+        
+        MultiFieldRadixSort.multiFieldRadixSort(data, sortInfo);
+        
+        assertEquals(data.size(), 3);
+        // Empty string should come first
+        assertEquals(((BinaryString)data.get(0).getField(1, 
BinaryStringType.INSTANCE)).toString(), "");
+        assertEquals(((BinaryString)data.get(1).getField(1, 
BinaryStringType.INSTANCE)).toString(), "a");
+        assertEquals(((BinaryString)data.get(2).getField(1, 
BinaryStringType.INSTANCE)).toString(), "hello");
+    }
+
+    @Test
+    public void testSortStability() {
+        List<Row> data = new ArrayList<>(3);
+        // Create rows with same sort key but different secondary values
+        data.add(ObjectRow.create(new Object[]{1, 
BinaryString.fromString("first")}));
+        data.add(ObjectRow.create(new Object[]{1, 
BinaryString.fromString("second")}));
+        data.add(ObjectRow.create(new Object[]{1, 
BinaryString.fromString("third")}));
+        
+        SortInfo sortInfo = new SortInfo();
+        sortInfo.orderByFields = new ArrayList<>(1);
+        OrderByField intField = new OrderByField();
+        intField.expression = new FieldExpression(0, IntegerType.INSTANCE);
+        intField.order = ORDER.ASC;
+        sortInfo.orderByFields.add(intField);
+        
+        MultiFieldRadixSort.multiFieldRadixSort(data, sortInfo);
+        
+        assertEquals(data.size(), 3);
+        // All should have the same integer value
+        for (Row row : data) {
+            assertEquals(row.getField(0, IntegerType.INSTANCE), 
Integer.valueOf(1));
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/SortTest.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/SortTest.java
index a0823f19..f5e95786 100644
--- 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/SortTest.java
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/SortTest.java
@@ -49,4 +49,13 @@ public class SortTest {
             .execute()
             .checkSinkResult();
     }
+
+    @Test
+    public void testSort_004() throws Exception {
+        QueryTester
+            .build()
+            .withQueryPath("/query/sort_004.sql")
+            .execute()
+            .checkSinkResult();
+    }
 }
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/sort_004.txt
 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/sort_004.txt
new file mode 100644
index 00000000..0767504c
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/expect/sort_004.txt
@@ -0,0 +1,22 @@
+1489576693883,2,2,1,mills
+1489576693894,8,10,1,penys
+1489576693864,7,7,2,jacks
+1489576693844,5,5,3,jacks
+1489576693834,4,4,4,mills
+1489576693884,1,1,10,hello
+1489576693884,1,19,10,hello
+1489576693874,8,8,12,jacks
+1489576693883,1,20,14,mills
+1489576693883,1,21,14,中国
+1489576693854,6,6,15,calls
+1489576693883,2,22,15,中国
+1489576693184,7,11,22,penys
+1489576693822,3,3,30,mills
+1489576693884,9,9,31,penys
+1489576693284,6,12,33,penys
+1489576693864,5,13,44,jacks
+1489576693874,4,14,51,jacks
+1489576693884,3,15,61,penys
+1489576693894,2,16,71,penys
+1489576693184,1,17,81,penys
+1489576693284,5,18,91,penys
\ No newline at end of file
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/SortInfo.java
 b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/sort_004.sql
similarity index 58%
copy from 
geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/SortInfo.java
copy to 
geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/sort_004.sql
index b7bbf238..eae350ab 100644
--- 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/org/apache/geaflow/dsl/runtime/function/table/order/SortInfo.java
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/sort_004.sql
@@ -17,23 +17,29 @@
  * under the License.
  */
 
-package org.apache.geaflow.dsl.runtime.function.table.order;
+CREATE TABLE orders(
+       createTime bigint,
+       productId bigint,
+       orderId bigint,
+       units int,
+       user_name VARCHAR(8)
+) WITH (
+       type='file',
+       geaflow.dsl.file.path = 'resource:///data/orders.txt'
+);
 
-import com.google.common.collect.Lists;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
+CREATE TABLE tbl_result (
+         createTime bigint,
+       productId bigint,
+       orderId bigint,
+       units int,
+       user_name VARCHAR(8)
+) WITH (
+       type='file',
+       geaflow.dsl.file.path='${target}'
+);
 
-public class SortInfo implements Serializable {
-
-    public List<OrderByField> orderByFields = new ArrayList<>();
-
-    public int fetch = -1;
-
-    public SortInfo copy(List<OrderByField> orderByFields) {
-        SortInfo sortInfo = new SortInfo();
-        sortInfo.orderByFields = Lists.newArrayList(orderByFields);
-        sortInfo.fetch = this.fetch;
-        return sortInfo;
-    }
-}
+INSERT INTO tbl_result
+SELECT *
+FROM orders o
+ORDER BY units, user_name
diff --git a/geaflow/geaflow-dsl/pom.xml b/geaflow/geaflow-dsl/pom.xml
index b8d58db2..f7bb8be6 100644
--- a/geaflow/geaflow-dsl/pom.xml
+++ b/geaflow/geaflow-dsl/pom.xml
@@ -44,7 +44,7 @@
     </modules>
 
     <properties>
-        <calcite.version>1.18.0-geaflow_1.0</calcite.version>
+        <calcite.version>1.18.0-geaflow_1.1</calcite.version>
         <kafka.version>2.4.1</kafka.version>
         <kafka.scala.binary.version>2.11</kafka.scala.binary.version>
         <h2.version>2.1.214</h2.version>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to