This is an automated email from the ASF dual-hosted git repository.

richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new 37509c71 [AURON #1739] Support LIMIT with OFFSET (#1740)
37509c71 is described below

commit 37509c7144941b5cebb726abf9178202fc50fab8
Author: yew1eb <[email protected]>
AuthorDate: Wed Jan 28 00:08:28 2026 +0800

    [AURON #1739] Support LIMIT with OFFSET (#1740)
    
    <!--
    - Start the PR title with the related issue ID, e.g. '[AURON #XXXX]
    Short summary...'.
    -->
    # Which issue does this PR close?
    
    Closes #1739
    
    # Rationale for this change
    
    # What changes are included in this PR?
    
    # Are there any user-facing changes?
    
    # How was this patch tested?
---
 .../tpcds-plan-stability/spark-3.5/q1.txt          |  2 +-
 .../tpcds-plan-stability/spark-3.5/q10.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q14a.txt        |  2 +-
 .../tpcds-plan-stability/spark-3.5/q14b.txt        |  2 +-
 .../tpcds-plan-stability/spark-3.5/q15.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q18.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q21.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q22.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q23b.txt        |  2 +-
 .../tpcds-plan-stability/spark-3.5/q26.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q27.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q3.txt          |  2 +-
 .../tpcds-plan-stability/spark-3.5/q30.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q33.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q35.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q36.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q40.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q41.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q42.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q43.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q44.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q45.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q47.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q49.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q5.txt          |  2 +-
 .../tpcds-plan-stability/spark-3.5/q50.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q52.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q55.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q56.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q57.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q59.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q6.txt          |  2 +-
 .../tpcds-plan-stability/spark-3.5/q60.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q62.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q66.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q67.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q69.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q7.txt          |  2 +-
 .../tpcds-plan-stability/spark-3.5/q70.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q72.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q76.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q77.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q78.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q79.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q8.txt          |  2 +-
 .../tpcds-plan-stability/spark-3.5/q80.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q81.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q82.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q83.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q84.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q85.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q86.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q93.txt         |  2 +-
 .../tpcds-plan-stability/spark-3.5/q99.txt         |  2 +-
 .../comparison/PlanStabilityChecker.scala          |  1 +
 native-engine/auron-planner/proto/auron.proto      |  6 +-
 native-engine/auron-planner/src/planner.rs         | 22 +++--
 .../datafusion-ext-plans/src/limit_exec.rs         | 99 +++++++++++++++++++---
 .../datafusion-ext-plans/src/sort_exec.rs          | 85 +++++++++++++++++--
 .../org/apache/spark/sql/auron/ShimsImpl.scala     | 46 ++++++----
 .../auron/plan/NativeCollectLimitExec.scala        |  4 +-
 .../auron/plan/NativeGlobalLimitExec.scala         |  4 +-
 .../auron/plan/NativeLocalLimitExec.scala          |  2 +-
 .../auron/plan/NativePartialTakeOrderedExec.scala  |  2 +-
 .../auron/plan/NativeTakeOrderedExec.scala         |  5 +-
 .../org/apache/auron/exec/AuronExecSuite.scala     | 95 ++++++++++++++++++++-
 .../apache/spark/sql/auron/AuronConverters.scala   | 12 ++-
 .../scala/org/apache/spark/sql/auron/Shims.scala   | 26 ++++--
 .../auron/plan/NativeCollectLimitBase.scala        |  6 +-
 .../auron/plan/NativeGlobalLimitBase.scala         |  3 +-
 .../auron/plan/NativeLocalLimitBase.scala          |  2 +-
 .../auron/plan/NativeTakeOrderedBase.scala         | 59 +++++++------
 72 files changed, 436 insertions(+), 151 deletions(-)

diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q1.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q1.txt
index da2e4839..23544dbc 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q1.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q1.txt
@@ -566,7 +566,7 @@ Input [3]: [ctr_customer_sk#11, #30#30, #31#31]
 
 (96) NativeTakeOrdered
 Input [1]: [c_customer_id#31]
-Arguments: X, [c_customer_id#31 ASC NULLS FIRST]
+Arguments: X, X, [c_customer_id#31 ASC NULLS FIRST]
 
 (97) Scan parquet
 Output [4]: [sr_returned_date_sk#1, sr_customer_sk#2, sr_store_sk#3, 
sr_return_amt#4]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q10.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q10.txt
index 65b0b56c..2e2154e8 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q10.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q10.txt
@@ -687,7 +687,7 @@ Input [9]: [cd_gender#20, cd_marital_status#21, 
cd_education_status#22, cd_purch
 
 (118) NativeTakeOrdered
 Input [14]: [cd_gender#20, cd_marital_status#21, cd_education_status#22, 
cnt1#31, cd_purchase_estimate#23, cnt2#32, cd_credit_rating#24, cnt3#33, 
cd_dep_count#25, cnt4#34, cd_dep_employed_count#26, cnt5#35, 
cd_dep_college_count#27, cnt6#36]
-Arguments: X, [cd_gender#20 ASC NULLS FIRST, cd_marital_status#21 ASC NULLS 
FIRST, cd_education_status#22 ASC NULLS FIRST, cd_purchase_estimate#23 ASC 
NULLS FIRST, cd_credit_rating#24 ASC NULLS FIRST, cd_dep_count#25 ASC NULLS 
FIRST, cd_dep_employed_count#26 ASC NULLS FIRST, cd_dep_college_count#27 ASC 
NULLS FIRST]
+Arguments: X, X, [cd_gender#20 ASC NULLS FIRST, cd_marital_status#21 ASC NULLS 
FIRST, cd_education_status#22 ASC NULLS FIRST, cd_purchase_estimate#23 ASC 
NULLS FIRST, cd_credit_rating#24 ASC NULLS FIRST, cd_dep_count#25 ASC NULLS 
FIRST, cd_dep_employed_count#26 ASC NULLS FIRST, cd_dep_college_count#27 ASC 
NULLS FIRST]
 
 (119) Scan parquet
 Output [3]: [c_customer_sk#3, c_current_cdemo_sk#4, c_current_addr_sk#5]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q14a.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q14a.txt
index 15c94b96..eae71c8d 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q14a.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q14a.txt
@@ -2161,7 +2161,7 @@ Input [7]: [channel#97, i_brand_id#98, i_class_id#99, 
i_category_id#100, spark_g
 
 (309) NativeTakeOrdered
 Input [6]: [channel#97, i_brand_id#98, i_class_id#99, i_category_id#100, 
sum(sales)#107, sum(number_sales)#108]
-Arguments: X, [channel#97 ASC NULLS FIRST, i_brand_id#98 ASC NULLS FIRST, 
i_class_id#99 ASC NULLS FIRST, i_category_id#100 ASC NULLS FIRST]
+Arguments: X, X, [channel#97 ASC NULLS FIRST, i_brand_id#98 ASC NULLS FIRST, 
i_class_id#99 ASC NULLS FIRST, i_category_id#100 ASC NULLS FIRST]
 
 (310) Scan parquet
 Output [4]: [ss_sold_date_sk#1, ss_item_sk#2, ss_quantity#3, ss_list_price#4]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q14b.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q14b.txt
index 6b891118..2ebf9c13 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q14b.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q14b.txt
@@ -1539,7 +1539,7 @@ Join condition: None
 
 (223) NativeTakeOrdered
 Input [12]: [channel#51, i_brand_id#36, i_class_id#37, i_category_id#38, 
sales#52, number_sales#53, channel#72, i_brand_id#59, i_class_id#60, 
i_category_id#61, sales#73, number_sales#74]
-Arguments: X, [i_brand_id#36 ASC NULLS FIRST, i_class_id#37 ASC NULLS FIRST, 
i_category_id#38 ASC NULLS FIRST]
+Arguments: X, X, [i_brand_id#36 ASC NULLS FIRST, i_class_id#37 ASC NULLS 
FIRST, i_category_id#38 ASC NULLS FIRST]
 
 (224) NativeProject
 Output [12]: [channel#51, i_brand_id#36, i_class_id#37, i_category_id#38, 
sales#52, number_sales#53, channel#72, i_brand_id#59, i_class_id#60, 
i_category_id#61, sales#73, number_sales#74]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q15.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q15.txt
index 832bc5fa..36e1a750 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q15.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q15.txt
@@ -344,7 +344,7 @@ Input [2]: [ca_zip#8, 
sum(UnscaledValue(cs_sales_price#3))#15]
 
 (59) NativeTakeOrdered
 Input [2]: [ca_zip#8, sum(cs_sales_price)#16]
-Arguments: X, [ca_zip#8 ASC NULLS FIRST]
+Arguments: X, X, [ca_zip#8 ASC NULLS FIRST]
 
 (60) Scan parquet
 Output [3]: [cs_sold_date_sk#1, cs_bill_customer_sk#2, cs_sales_price#3]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q18.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q18.txt
index fa49d904..44c7f90b 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q18.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q18.txt
@@ -616,7 +616,7 @@ Input [12]: [i_item_id#28, ca_country#29, ca_state#30, 
ca_county#31, spark_group
 
 (106) NativeTakeOrdered
 Input [11]: [i_item_id#28, ca_country#29, ca_state#30, ca_county#31, agg1#63, 
agg2#64, agg3#65, agg4#66, agg5#67, agg6#68, agg7#69]
-Arguments: X, [ca_country#29 DESC NULLS LAST, ca_state#30 DESC NULLS LAST, 
ca_county#31 DESC NULLS LAST, i_item_id#28 ASC NULLS FIRST]
+Arguments: X, X, [ca_country#29 DESC NULLS LAST, ca_state#30 DESC NULLS LAST, 
ca_county#31 DESC NULLS LAST, i_item_id#28 ASC NULLS FIRST]
 
 (107) Scan parquet
 Output [9]: [cs_sold_date_sk#1, cs_bill_customer_sk#2, cs_bill_cdemo_sk#3, 
cs_item_sk#4, cs_quantity#5, cs_list_price#6, cs_sales_price#7, 
cs_coupon_amt#8, cs_net_profit#9]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q21.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q21.txt
index e2a1a086..ce1844ba 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q21.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q21.txt
@@ -346,7 +346,7 @@ Condition : (CASE WHEN (inv_before#19 > 0) THEN 
((cast(inv_after#20 as double) /
 
 (59) NativeTakeOrdered
 Input [4]: [w_warehouse_name#6, i_item_id#8, inv_before#19, inv_after#20]
-Arguments: X, [w_warehouse_name#6 ASC NULLS FIRST, i_item_id#8 ASC NULLS FIRST]
+Arguments: X, X, [w_warehouse_name#6 ASC NULLS FIRST, i_item_id#8 ASC NULLS 
FIRST]
 
 (60) Scan parquet
 Output [4]: [inv_date_sk#1, inv_item_sk#2, inv_warehouse_sk#3, 
inv_quantity_on_hand#4]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q22.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q22.txt
index 69747fa7..3dd14c22 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q22.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q22.txt
@@ -341,7 +341,7 @@ Input [6]: [i_product_name#13, i_brand#14, i_class#15, 
i_category#16, spark_grou
 
 (58) NativeTakeOrdered
 Input [5]: [i_product_name#13, i_brand#14, i_class#15, i_category#16, qoh#23]
-Arguments: X, [qoh#23 ASC NULLS FIRST, i_product_name#13 ASC NULLS FIRST, 
i_brand#14 ASC NULLS FIRST, i_class#15 ASC NULLS FIRST, i_category#16 ASC NULLS 
FIRST]
+Arguments: X, X, [qoh#23 ASC NULLS FIRST, i_product_name#13 ASC NULLS FIRST, 
i_brand#14 ASC NULLS FIRST, i_class#15 ASC NULLS FIRST, i_category#16 ASC NULLS 
FIRST]
 
 (59) Scan parquet
 Output [4]: [inv_date_sk#1, inv_item_sk#2, inv_warehouse_sk#3, 
inv_quantity_on_hand#4]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q23b.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q23b.txt
index e28da843..6b2e09d6 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q23b.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q23b.txt
@@ -1480,7 +1480,7 @@ Arguments: [#33, #32, #42]
 
 (248) NativeTakeOrdered
 Input [3]: [#33#33, #32#32, #42#42]
-Arguments: X, [c_last_name#33 ASC NULLS FIRST, c_first_name#32 ASC NULLS 
FIRST, sales#42 ASC NULLS FIRST]
+Arguments: X, X, [c_last_name#33 ASC NULLS FIRST, c_first_name#32 ASC NULLS 
FIRST, sales#42 ASC NULLS FIRST]
 
 (249) Scan parquet
 Output [5]: [cs_sold_date_sk#1, cs_bill_customer_sk#2, cs_item_sk#3, 
cs_quantity#4, cs_list_price#5]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q26.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q26.txt
index 49eb29bd..cde99228 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q26.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q26.txt
@@ -438,7 +438,7 @@ Input [5]: [i_item_id#16, avg(cs_quantity#5)#34, 
avg(UnscaledValue(cs_list_price
 
 (75) NativeTakeOrdered
 Input [5]: [i_item_id#16, agg1#38, agg2#39, agg3#40, agg4#41]
-Arguments: X, [i_item_id#16 ASC NULLS FIRST]
+Arguments: X, X, [i_item_id#16 ASC NULLS FIRST]
 
 (76) Scan parquet
 Output [8]: [cs_sold_date_sk#1, cs_bill_cdemo_sk#2, cs_item_sk#3, 
cs_promo_sk#4, cs_quantity#5, cs_list_price#6, cs_sales_price#7, 
cs_coupon_amt#8]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q27.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q27.txt
index 3f33d987..ecd5cd0c 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q27.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q27.txt
@@ -438,7 +438,7 @@ Input [7]: [i_item_id#19, s_state#20, spark_grouping_id#21, 
avg(ss_quantity#5)#3
 
 (75) NativeTakeOrdered
 Input [7]: [i_item_id#19, s_state#20, g_state#40, agg1#41, agg2#42, agg3#43, 
agg4#44]
-Arguments: X, [i_item_id#19 ASC NULLS FIRST, s_state#20 ASC NULLS FIRST]
+Arguments: X, X, [i_item_id#19 ASC NULLS FIRST, s_state#20 ASC NULLS FIRST]
 
 (76) Scan parquet
 Output [8]: [ss_sold_date_sk#1, ss_item_sk#2, ss_cdemo_sk#3, ss_store_sk#4, 
ss_quantity#5, ss_list_price#6, ss_sales_price#7, ss_coupon_amt#8]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q3.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q3.txt
index 085e6b27..dc1f8dee 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q3.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q3.txt
@@ -260,7 +260,7 @@ Input [4]: [d_year#2, i_brand#9, i_brand_id#8, 
sum(UnscaledValue(ss_ext_sales_pr
 
 (44) NativeTakeOrdered
 Input [4]: [d_year#2, brand_id#15, brand#16, sum_agg#17]
-Arguments: X, [d_year#2 ASC NULLS FIRST, sum_agg#17 DESC NULLS LAST, 
brand_id#15 ASC NULLS FIRST]
+Arguments: X, X, [d_year#2 ASC NULLS FIRST, sum_agg#17 DESC NULLS LAST, 
brand_id#15 ASC NULLS FIRST]
 
 (45) Scan parquet
 Output [3]: [d_date_sk#1, d_year#2, d_moy#3]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q30.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q30.txt
index 903eec46..53ab1e28 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q30.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q30.txt
@@ -750,7 +750,7 @@ Input [15]: [ctr_total_return#15, c_customer_id#33, 
c_current_addr_sk#34, c_salu
 
 (129) NativeTakeOrdered
 Input [13]: [c_customer_id#33, c_salutation#35, c_first_name#36, 
c_last_name#37, c_preferred_cust_flag#38, c_birth_day#39, c_birth_month#40, 
c_birth_year#41, c_birth_country#42, c_login#43, c_email_address#44, 
c_last_review_date_sk#45, ctr_total_return#15]
-Arguments: X, [c_customer_id#33 ASC NULLS FIRST, c_salutation#35 ASC NULLS 
FIRST, c_first_name#36 ASC NULLS FIRST, c_last_name#37 ASC NULLS FIRST, 
c_preferred_cust_flag#38 ASC NULLS FIRST, c_birth_day#39 ASC NULLS FIRST, 
c_birth_month#40 ASC NULLS FIRST, c_birth_year#41 ASC NULLS FIRST, 
c_birth_country#42 ASC NULLS FIRST, c_login#43 ASC NULLS FIRST, 
c_email_address#44 ASC NULLS FIRST, c_last_review_date_sk#45 ASC NULLS FIRST, 
ctr_total_return#15 ASC NULLS FIRST]
+Arguments: X, X, [c_customer_id#33 ASC NULLS FIRST, c_salutation#35 ASC NULLS 
FIRST, c_first_name#36 ASC NULLS FIRST, c_last_name#37 ASC NULLS FIRST, 
c_preferred_cust_flag#38 ASC NULLS FIRST, c_birth_day#39 ASC NULLS FIRST, 
c_birth_month#40 ASC NULLS FIRST, c_birth_year#41 ASC NULLS FIRST, 
c_birth_country#42 ASC NULLS FIRST, c_login#43 ASC NULLS FIRST, 
c_email_address#44 ASC NULLS FIRST, c_last_review_date_sk#45 ASC NULLS FIRST, 
ctr_total_return#15 ASC NULLS FIRST]
 
 (130) Scan parquet
 Output [4]: [wr_returned_date_sk#1, wr_returning_customer_sk#2, 
wr_returning_addr_sk#3, wr_return_amt#4]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q33.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q33.txt
index b69d9cf1..abf23aea 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q33.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q33.txt
@@ -1073,7 +1073,7 @@ Input [2]: [i_manufact_id#11, sum(total_sales#18)#46]
 
 (183) NativeTakeOrdered
 Input [2]: [i_manufact_id#11, total_sales#47]
-Arguments: X, [total_sales#47 ASC NULLS FIRST]
+Arguments: X, X, [total_sales#47 ASC NULLS FIRST]
 
 (184) Scan parquet
 Output [4]: [ss_sold_date_sk#1, ss_item_sk#2, ss_addr_sk#3, 
ss_ext_sales_price#4]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q35.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q35.txt
index 47e8cd88..a78dc6fb 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q35.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q35.txt
@@ -682,7 +682,7 @@ Input [16]: [ca_state#18, cd_gender#20, 
cd_marital_status#21, cd_dep_count#22, c
 
 (117) NativeTakeOrdered
 Input [18]: [ca_state#18, cd_gender#20, cd_marital_status#21, cnt1#50, 
min(cd_dep_count)#51, max(cd_dep_count)#52, avg(cd_dep_count)#53, 
cd_dep_employed_count#23, cnt2#54, min(cd_dep_employed_count)#55, 
max(cd_dep_employed_count)#56, avg(cd_dep_employed_count)#57, 
cd_dep_college_count#24, cnt3#58, min(cd_dep_college_count)#59, 
max(cd_dep_college_count)#60, avg(cd_dep_college_count)#61, cd_dep_count#22]
-Arguments: X, [ca_state#18 ASC NULLS FIRST, cd_gender#20 ASC NULLS FIRST, 
cd_marital_status#21 ASC NULLS FIRST, cd_dep_count#22 ASC NULLS FIRST, 
cd_dep_employed_count#23 ASC NULLS FIRST, cd_dep_college_count#24 ASC NULLS 
FIRST]
+Arguments: X, X, [ca_state#18 ASC NULLS FIRST, cd_gender#20 ASC NULLS FIRST, 
cd_marital_status#21 ASC NULLS FIRST, cd_dep_count#22 ASC NULLS FIRST, 
cd_dep_employed_count#23 ASC NULLS FIRST, cd_dep_college_count#24 ASC NULLS 
FIRST]
 
 (118) NativeProject
 Output [17]: [ca_state#18, cd_gender#20, cd_marital_status#21, cnt1#50, 
min(cd_dep_count)#51, max(cd_dep_count)#52, avg(cd_dep_count)#53, 
cd_dep_employed_count#23, cnt2#54, min(cd_dep_employed_count)#55, 
max(cd_dep_employed_count)#56, avg(cd_dep_employed_count)#57, 
cd_dep_college_count#24, cnt3#58, min(cd_dep_college_count)#59, 
max(cd_dep_college_count)#60, avg(cd_dep_college_count)#61]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q36.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q36.txt
index 23324918..e894fb79 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q36.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q36.txt
@@ -390,7 +390,7 @@ Input [8]: [gross_margin#23, i_category#13, i_class#14, 
lochierarchy#24, _w0#25,
 
 (67) NativeTakeOrdered
 Input [5]: [gross_margin#23, i_category#13, i_class#14, lochierarchy#24, 
rank_within_parent#28]
-Arguments: X, [lochierarchy#24 DESC NULLS LAST, CASE WHEN (lochierarchy#24 = 
0) THEN i_category#13 END ASC NULLS FIRST, rank_within_parent#28 ASC NULLS 
FIRST]
+Arguments: X, X, [lochierarchy#24 DESC NULLS LAST, CASE WHEN (lochierarchy#24 
= 0) THEN i_category#13 END ASC NULLS FIRST, rank_within_parent#28 ASC NULLS 
FIRST]
 
 (68) Scan parquet
 Output [5]: [ss_sold_date_sk#1, ss_item_sk#2, ss_store_sk#3, 
ss_ext_sales_price#4, ss_net_profit#5]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q40.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q40.txt
index ddac6fb5..e2c67ceb 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q40.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q40.txt
@@ -426,7 +426,7 @@ Input [4]: [w_state#10, i_item_id#12, sum(CASE WHEN 
(d_date#15 < 2000-03-11) THE
 
 (73) NativeTakeOrdered
 Input [4]: [w_state#10, i_item_id#12, sales_before#25, sales_after#26]
-Arguments: X, [w_state#10 ASC NULLS FIRST, i_item_id#12 ASC NULLS FIRST]
+Arguments: X, X, [w_state#10 ASC NULLS FIRST, i_item_id#12 ASC NULLS FIRST]
 
 (74) Scan parquet
 Output [5]: [cs_sold_date_sk#1, cs_warehouse_sk#2, cs_item_sk#3, 
cs_order_number#4, cs_sales_price#5]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q41.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q41.txt
index c35acf49..7ca464d7 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q41.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q41.txt
@@ -204,7 +204,7 @@ Input [1]: [i_product_name#3]
 
 (33) NativeTakeOrdered
 Input [1]: [i_product_name#3]
-Arguments: X, [i_product_name#3 ASC NULLS FIRST]
+Arguments: X, X, [i_product_name#3 ASC NULLS FIRST]
 
 (34) Scan parquet
 Output [3]: [i_manufact_id#1, i_manufact#2, i_product_name#3]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q42.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q42.txt
index 05eaecba..42ce1050 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q42.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q42.txt
@@ -260,7 +260,7 @@ Input [4]: [d_year#2, i_category_id#8, i_category#9, 
sum(UnscaledValue(ss_ext_sa
 
 (44) NativeTakeOrdered
 Input [4]: [d_year#2, i_category_id#8, i_category#9, 
sum(ss_ext_sales_price)#15]
-Arguments: X, [sum(ss_ext_sales_price)#15 DESC NULLS LAST, d_year#2 ASC NULLS 
FIRST, i_category_id#8 ASC NULLS FIRST, i_category#9 ASC NULLS FIRST]
+Arguments: X, X, [sum(ss_ext_sales_price)#15 DESC NULLS LAST, d_year#2 ASC 
NULLS FIRST, i_category_id#8 ASC NULLS FIRST, i_category#9 ASC NULLS FIRST]
 
 (45) Scan parquet
 Output [3]: [d_date_sk#1, d_year#2, d_moy#3]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q43.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q43.txt
index 62b02fb1..78812804 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q43.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q43.txt
@@ -260,7 +260,7 @@ Input [9]: [s_store_name#9, s_store_id#8, 
sum(UnscaledValue(CASE WHEN (d_day_nam
 
 (44) NativeTakeOrdered
 Input [9]: [s_store_name#9, s_store_id#8, sun_sales#33, mon_sales#34, 
tue_sales#35, wed_sales#36, thu_sales#37, fri_sales#38, sat_sales#39]
-Arguments: X, [s_store_name#9 ASC NULLS FIRST, s_store_id#8 ASC NULLS FIRST, 
sun_sales#33 ASC NULLS FIRST, mon_sales#34 ASC NULLS FIRST, tue_sales#35 ASC 
NULLS FIRST, wed_sales#36 ASC NULLS FIRST, thu_sales#37 ASC NULLS FIRST, 
fri_sales#38 ASC NULLS FIRST, sat_sales#39 ASC NULLS FIRST]
+Arguments: X, X, [s_store_name#9 ASC NULLS FIRST, s_store_id#8 ASC NULLS 
FIRST, sun_sales#33 ASC NULLS FIRST, mon_sales#34 ASC NULLS FIRST, tue_sales#35 
ASC NULLS FIRST, wed_sales#36 ASC NULLS FIRST, thu_sales#37 ASC NULLS FIRST, 
fri_sales#38 ASC NULLS FIRST, sat_sales#39 ASC NULLS FIRST]
 
 (45) Scan parquet
 Output [3]: [d_date_sk#1, d_year#2, d_day_name#3]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q44.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q44.txt
index 8a11de5e..78f88221 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q44.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q44.txt
@@ -426,7 +426,7 @@ Input [5]: [rnk#16, item_sk#23, i_product_name#29, #30#30, 
#31#31]
 
 (72) NativeTakeOrdered
 Input [3]: [rnk#16, best_performing#32, worst_performing#33]
-Arguments: X, [rnk#16 ASC NULLS FIRST]
+Arguments: X, X, [rnk#16 ASC NULLS FIRST]
 
 (73) Scan parquet
 Output [3]: [ss_item_sk#1, ss_store_sk#2, ss_net_profit#3]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q45.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q45.txt
index 56cbefa1..28242970 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q45.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q45.txt
@@ -524,7 +524,7 @@ Input [3]: [ca_zip#10, ca_city#9, 
sum(UnscaledValue(ws_sales_price#5))#21]
 
 (90) NativeTakeOrdered
 Input [3]: [ca_zip#10, ca_city#9, sum(ws_sales_price)#22]
-Arguments: X, [ca_zip#10 ASC NULLS FIRST, ca_city#9 ASC NULLS FIRST]
+Arguments: X, X, [ca_zip#10 ASC NULLS FIRST, ca_city#9 ASC NULLS FIRST]
 
 (91) Scan parquet
 Output [4]: [ws_sold_date_sk#2, ws_item_sk#3, ws_bill_customer_sk#4, 
ws_sales_price#5]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q47.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q47.txt
index 28a13858..8f643b80 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q47.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q47.txt
@@ -674,7 +674,7 @@ Input [16]: [i_category#3, i_brand#2, s_store_name#12, 
s_company_name#13, d_year
 
 (109) NativeTakeOrdered
 Input [10]: [i_category#3, i_brand#2, s_store_name#12, s_company_name#13, 
d_year#9, d_moy#10, avg_monthly_sales#21, sum_sales#18, psum#41, nsum#42]
-Arguments: X, [(sum_sales#18 - avg_monthly_sales#21) ASC NULLS FIRST, 
s_store_name#12 ASC NULLS FIRST]
+Arguments: X, X, [(sum_sales#18 - avg_monthly_sales#21) ASC NULLS FIRST, 
s_store_name#12 ASC NULLS FIRST]
 
 (110) Scan parquet
 Output [3]: [i_item_sk#1, i_brand#2, i_category#3]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q49.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q49.txt
index 75d831e3..e852b2b2 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q49.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q49.txt
@@ -920,7 +920,7 @@ Input [5]: [channel#35, item#30, return_ratio#31, 
return_rank#33, currency_rank#
 
 (157) NativeTakeOrdered
 Input [5]: [channel#35, item#30, return_ratio#31, return_rank#33, 
currency_rank#34]
-Arguments: X, [channel#35 ASC NULLS FIRST, return_rank#33 ASC NULLS FIRST, 
currency_rank#34 ASC NULLS FIRST, item#30 ASC NULLS FIRST]
+Arguments: X, X, [channel#35 ASC NULLS FIRST, return_rank#33 ASC NULLS FIRST, 
currency_rank#34 ASC NULLS FIRST, item#30 ASC NULLS FIRST]
 
 (158) Scan parquet
 Output [6]: [ws_sold_date_sk#1, ws_item_sk#2, ws_order_number#3, 
ws_quantity#4, ws_net_paid#5, ws_net_profit#6]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q5.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q5.txt
index 12e94f13..4aadbc9d 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q5.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q5.txt
@@ -973,7 +973,7 @@ Input [6]: [channel#127, id#128, spark_grouping_id#129, 
sum(sales#38)#136, sum(r
 
 (166) NativeTakeOrdered
 Input [5]: [channel#127, id#128, sales#139, returns#140, profit#141]
-Arguments: X, [channel#127 ASC NULLS FIRST, id#128 ASC NULLS FIRST]
+Arguments: X, X, [channel#127 ASC NULLS FIRST, id#128 ASC NULLS FIRST]
 
 (167) Scan parquet
 Output [4]: [ss_sold_date_sk#1, ss_store_sk#2, ss_ext_sales_price#3, 
ss_net_profit#4]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q50.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q50.txt
index 2e1a6058..b93b243a 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q50.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q50.txt
@@ -426,7 +426,7 @@ Input [15]: [s_store_name#11, s_company_id#12, 
s_street_number#13, s_street_name
 
 (73) NativeTakeOrdered
 Input [15]: [s_store_name#11, s_company_id#12, s_street_number#13, 
s_street_name#14, s_street_type#15, s_suite_number#16, s_city#17, s_county#18, 
s_state#19, s_zip#20, 30 days #41, 31 - 60 days #42, 61 - 90 days #43, 91 - 120 
days #44, >120 days #45]
-Arguments: X, [s_store_name#11 ASC NULLS FIRST, s_company_id#12 ASC NULLS 
FIRST, s_street_number#13 ASC NULLS FIRST, s_street_name#14 ASC NULLS FIRST, 
s_street_type#15 ASC NULLS FIRST, s_suite_number#16 ASC NULLS FIRST, s_city#17 
ASC NULLS FIRST, s_county#18 ASC NULLS FIRST, s_state#19 ASC NULLS FIRST, 
s_zip#20 ASC NULLS FIRST]
+Arguments: X, X, [s_store_name#11 ASC NULLS FIRST, s_company_id#12 ASC NULLS 
FIRST, s_street_number#13 ASC NULLS FIRST, s_street_name#14 ASC NULLS FIRST, 
s_street_type#15 ASC NULLS FIRST, s_suite_number#16 ASC NULLS FIRST, s_city#17 
ASC NULLS FIRST, s_county#18 ASC NULLS FIRST, s_state#19 ASC NULLS FIRST, 
s_zip#20 ASC NULLS FIRST]
 
 (74) Scan parquet
 Output [5]: [ss_sold_date_sk#1, ss_item_sk#2, ss_customer_sk#3, ss_store_sk#4, 
ss_ticket_number#5]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q52.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q52.txt
index 2e5625ad..2615c3ba 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q52.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q52.txt
@@ -260,7 +260,7 @@ Input [4]: [d_year#2, i_brand#9, i_brand_id#8, 
sum(UnscaledValue(ss_ext_sales_pr
 
 (44) NativeTakeOrdered
 Input [4]: [d_year#2, brand_id#15, brand#16, ext_price#17]
-Arguments: X, [d_year#2 ASC NULLS FIRST, ext_price#17 DESC NULLS LAST, 
brand_id#15 ASC NULLS FIRST]
+Arguments: X, X, [d_year#2 ASC NULLS FIRST, ext_price#17 DESC NULLS LAST, 
brand_id#15 ASC NULLS FIRST]
 
 (45) Scan parquet
 Output [3]: [d_date_sk#1, d_year#2, d_moy#3]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q55.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q55.txt
index bc1a58f2..2da4331f 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q55.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q55.txt
@@ -260,7 +260,7 @@ Input [3]: [i_brand#9, i_brand_id#8, 
sum(UnscaledValue(ss_ext_sales_price#6))#14
 
 (44) NativeTakeOrdered
 Input [3]: [brand_id#15, brand#16, ext_price#17]
-Arguments: X, [ext_price#17 DESC NULLS LAST, brand_id#15 ASC NULLS FIRST]
+Arguments: X, X, [ext_price#17 DESC NULLS LAST, brand_id#15 ASC NULLS FIRST]
 
 (45) Scan parquet
 Output [3]: [d_date_sk#1, d_year#2, d_moy#3]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q56.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q56.txt
index 885ce41e..e1aec1a9 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q56.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q56.txt
@@ -1073,7 +1073,7 @@ Input [2]: [i_item_id#11, sum(total_sales#18)#46]
 
 (183) NativeTakeOrdered
 Input [2]: [i_item_id#11, total_sales#47]
-Arguments: X, [total_sales#47 ASC NULLS FIRST, i_item_id#11 ASC NULLS FIRST]
+Arguments: X, X, [total_sales#47 ASC NULLS FIRST, i_item_id#11 ASC NULLS FIRST]
 
 (184) Scan parquet
 Output [4]: [ss_sold_date_sk#1, ss_item_sk#2, ss_addr_sk#3, 
ss_ext_sales_price#4]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q57.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q57.txt
index 3ee7d5b0..31b9e645 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q57.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q57.txt
@@ -674,7 +674,7 @@ Input [14]: [i_category#3, i_brand#2, cc_name#12, d_year#9, 
d_moy#10, sum_sales#
 
 (109) NativeTakeOrdered
 Input [9]: [i_category#3, i_brand#2, cc_name#12, d_year#9, d_moy#10, 
avg_monthly_sales#20, sum_sales#17, psum#38, nsum#39]
-Arguments: X, [(sum_sales#17 - avg_monthly_sales#20) ASC NULLS FIRST, 
cc_name#12 ASC NULLS FIRST]
+Arguments: X, X, [(sum_sales#17 - avg_monthly_sales#20) ASC NULLS FIRST, 
cc_name#12 ASC NULLS FIRST]
 
 (110) Scan parquet
 Output [3]: [i_item_sk#1, i_brand#2, i_category#3]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q59.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q59.txt
index 06fef646..af91f9dd 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q59.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q59.txt
@@ -601,7 +601,7 @@ Input [19]: [s_store_name1#41, d_week_seq1#42, 
s_store_id1#43, sun_sales1#44, mo
 
 (102) NativeTakeOrdered
 Input [10]: [s_store_name1#41, s_store_id1#43, d_week_seq1#42, (sun_sales1 / 
sun_sales2)#73, (mon_sales1 / mon_sales2)#74, (tue_sales1 / tue_sales2)#75, 
(wed_sales1 / wed_sales2)#76, (thu_sales1 / thu_sales2)#77, (fri_sales1 / 
fri_sales2)#78, (sat_sales1 / sat_sales2)#79]
-Arguments: X, [s_store_name1#41 ASC NULLS FIRST, s_store_id1#43 ASC NULLS 
FIRST, d_week_seq1#42 ASC NULLS FIRST]
+Arguments: X, X, [s_store_name1#41 ASC NULLS FIRST, s_store_id1#43 ASC NULLS 
FIRST, d_week_seq1#42 ASC NULLS FIRST]
 
 (103) Scan parquet
 Output [3]: [ss_sold_date_sk#1, ss_store_sk#2, ss_sales_price#3]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q6.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q6.txt
index 16995838..94c71db2 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q6.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q6.txt
@@ -552,7 +552,7 @@ Condition : (cnt#27 >= 10)
 
 (94) NativeTakeOrdered
 Input [3]: [state#26, cnt#27, ca_state#2]
-Arguments: X, [cnt#27 ASC NULLS FIRST, ca_state#2 ASC NULLS FIRST]
+Arguments: X, X, [cnt#27 ASC NULLS FIRST, ca_state#2 ASC NULLS FIRST]
 
 (95) NativeProject
 Output [2]: [state#26, cnt#27]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q60.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q60.txt
index 52698607..1c0bbd80 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q60.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q60.txt
@@ -1073,7 +1073,7 @@ Input [2]: [i_item_id#11, sum(total_sales#18)#46]
 
 (183) NativeTakeOrdered
 Input [2]: [i_item_id#11, total_sales#47]
-Arguments: X, [i_item_id#11 ASC NULLS FIRST, total_sales#47 ASC NULLS FIRST]
+Arguments: X, X, [i_item_id#11 ASC NULLS FIRST, total_sales#47 ASC NULLS FIRST]
 
 (184) Scan parquet
 Output [4]: [ss_sold_date_sk#1, ss_item_sk#2, ss_addr_sk#3, 
ss_ext_sales_price#4]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q62.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q62.txt
index 76af56eb..b5907e28 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q62.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q62.txt
@@ -426,7 +426,7 @@ Input [8]: [_groupingexpression#14, sm_type#9, web_name#11, 
sum(CASE WHEN ((ws_s
 
 (73) NativeTakeOrdered
 Input [8]: [substr(w_warehouse_name, 1, 20)#31, sm_type#9, web_name#11, 30 
days #32, 31 - 60 days #33, 61 - 90 days #34, 91 - 120 days #35, >120 days #36]
-Arguments: X, [substr(w_warehouse_name, 1, 20)#31 ASC NULLS FIRST, sm_type#9 
ASC NULLS FIRST, web_name#11 ASC NULLS FIRST]
+Arguments: X, X, [substr(w_warehouse_name, 1, 20)#31 ASC NULLS FIRST, 
sm_type#9 ASC NULLS FIRST, web_name#11 ASC NULLS FIRST]
 
 (74) Scan parquet
 Output [5]: [ws_sold_date_sk#1, ws_ship_date_sk#2, ws_web_site_sk#3, 
ws_ship_mode_sk#4, ws_warehouse_sk#5]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q66.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q66.txt
index 9494adb9..6fa135c5 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q66.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q66.txt
@@ -848,7 +848,7 @@ Input [44]: [w_warehouse_name#9, w_warehouse_sq_ft#10, 
w_city#11, w_county#12, w
 
 (147) NativeTakeOrdered
 Input [44]: [w_warehouse_name#9, w_warehouse_sq_ft#10, w_city#11, w_county#12, 
w_state#13, w_country#14, ship_carriers#119, year#120, jan_sales#430, 
feb_sales#431, mar_sales#432, apr_sales#433, may_sales#434, jun_sales#435, 
jul_sales#436, aug_sales#437, sep_sales#438, oct_sales#439, nov_sales#440, 
dec_sales#441, jan_sales_per_sq_foot#442, feb_sales_per_sq_foot#443, 
mar_sales_per_sq_foot#444, apr_sales_per_sq_foot#445, 
may_sales_per_sq_foot#446, jun_sales_per_sq_foot#447, jul_sales_per_sq [...]
-Arguments: X, [w_warehouse_name#9 ASC NULLS FIRST]
+Arguments: X, X, [w_warehouse_name#9 ASC NULLS FIRST]
 
 (148) Scan parquet
 Output [7]: [ws_sold_date_sk#1, ws_sold_time_sk#2, ws_ship_mode_sk#3, 
ws_warehouse_sk#4, ws_quantity#5, ws_ext_sales_price#6, ws_net_paid#7]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q67.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q67.txt
index 403ecb39..780d38e4 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q67.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q67.txt
@@ -402,7 +402,7 @@ Condition : (rk#35 <= 100)
 
 (69) NativeTakeOrdered
 Input [10]: [i_category#18, i_class#19, i_brand#20, i_product_name#21, 
d_year#22, d_qoy#23, d_moy#24, s_store_id#25, sumsales#32, rk#35]
-Arguments: X, [i_category#18 ASC NULLS FIRST, i_class#19 ASC NULLS FIRST, 
i_brand#20 ASC NULLS FIRST, i_product_name#21 ASC NULLS FIRST, d_year#22 ASC 
NULLS FIRST, d_qoy#23 ASC NULLS FIRST, d_moy#24 ASC NULLS FIRST, s_store_id#25 
ASC NULLS FIRST, sumsales#32 ASC NULLS FIRST, rk#35 ASC NULLS FIRST]
+Arguments: X, X, [i_category#18 ASC NULLS FIRST, i_class#19 ASC NULLS FIRST, 
i_brand#20 ASC NULLS FIRST, i_product_name#21 ASC NULLS FIRST, d_year#22 ASC 
NULLS FIRST, d_qoy#23 ASC NULLS FIRST, d_moy#24 ASC NULLS FIRST, s_store_id#25 
ASC NULLS FIRST, sumsales#32 ASC NULLS FIRST, rk#35 ASC NULLS FIRST]
 
 (70) Scan parquet
 Output [5]: [ss_sold_date_sk#1, ss_item_sk#2, ss_store_sk#3, ss_quantity#4, 
ss_sales_price#5]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q69.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q69.txt
index ea41e4da..94a56b72 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q69.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q69.txt
@@ -681,7 +681,7 @@ Input [6]: [cd_gender#18, cd_marital_status#19, 
cd_education_status#20, cd_purch
 
 (117) NativeTakeOrdered
 Input [8]: [cd_gender#18, cd_marital_status#19, cd_education_status#20, 
cnt1#26, cd_purchase_estimate#21, cnt2#27, cd_credit_rating#22, cnt3#28]
-Arguments: X, [cd_gender#18 ASC NULLS FIRST, cd_marital_status#19 ASC NULLS 
FIRST, cd_education_status#20 ASC NULLS FIRST, cd_purchase_estimate#21 ASC 
NULLS FIRST, cd_credit_rating#22 ASC NULLS FIRST]
+Arguments: X, X, [cd_gender#18 ASC NULLS FIRST, cd_marital_status#19 ASC NULLS 
FIRST, cd_education_status#20 ASC NULLS FIRST, cd_purchase_estimate#21 ASC 
NULLS FIRST, cd_credit_rating#22 ASC NULLS FIRST]
 
 (118) Scan parquet
 Output [3]: [c_customer_sk#1, c_current_cdemo_sk#2, c_current_addr_sk#3]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q7.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q7.txt
index 7152bb2c..7d0c06f8 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q7.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q7.txt
@@ -438,7 +438,7 @@ Input [5]: [i_item_id#16, avg(ss_quantity#5)#34, 
avg(UnscaledValue(ss_list_price
 
 (75) NativeTakeOrdered
 Input [5]: [i_item_id#16, agg1#38, agg2#39, agg3#40, agg4#41]
-Arguments: X, [i_item_id#16 ASC NULLS FIRST]
+Arguments: X, X, [i_item_id#16 ASC NULLS FIRST]
 
 (76) Scan parquet
 Output [8]: [ss_sold_date_sk#1, ss_item_sk#2, ss_cdemo_sk#3, ss_promo_sk#4, 
ss_quantity#5, ss_list_price#6, ss_sales_price#7, ss_coupon_amt#8]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q70.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q70.txt
index 21c24af0..76084a31 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q70.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q70.txt
@@ -589,7 +589,7 @@ Input [8]: [total_sum#28, s_state#22, s_county#23, 
lochierarchy#29, _w0#30, _w1#
 
 (101) NativeTakeOrdered
 Input [5]: [total_sum#28, s_state#22, s_county#23, lochierarchy#29, 
rank_within_parent#33]
-Arguments: X, [lochierarchy#29 DESC NULLS LAST, CASE WHEN (lochierarchy#29 = 
0) THEN s_state#22 END ASC NULLS FIRST, rank_within_parent#33 ASC NULLS FIRST]
+Arguments: X, X, [lochierarchy#29 DESC NULLS LAST, CASE WHEN (lochierarchy#29 
= 0) THEN s_state#22 END ASC NULLS FIRST, rank_within_parent#33 ASC NULLS FIRST]
 
 (102) Scan parquet
 Output [3]: [ss_sold_date_sk#1, ss_store_sk#2, ss_net_profit#3]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q72.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q72.txt
index 537390c6..746601c5 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q72.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q72.txt
@@ -957,7 +957,7 @@ Input [4]: [i_item_desc#16, w_warehouse_name#14, 
d_week_seq#23, count(1)#34]
 
 (166) NativeTakeOrdered
 Input [6]: [i_item_desc#16, w_warehouse_name#14, d_week_seq#23, no_promo#35, 
promo#36, total_cnt#37]
-Arguments: X, [total_cnt#37 DESC NULLS LAST, i_item_desc#16 ASC NULLS FIRST, 
w_warehouse_name#14 ASC NULLS FIRST, d_week_seq#23 ASC NULLS FIRST]
+Arguments: X, X, [total_cnt#37 DESC NULLS LAST, i_item_desc#16 ASC NULLS 
FIRST, w_warehouse_name#14 ASC NULLS FIRST, d_week_seq#23 ASC NULLS FIRST]
 
 (167) Scan parquet
 Output [8]: [cs_sold_date_sk#1, cs_ship_date_sk#2, cs_bill_cdemo_sk#3, 
cs_bill_hdemo_sk#4, cs_item_sk#5, cs_promo_sk#6, cs_order_number#7, 
cs_quantity#8]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q76.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q76.txt
index 6ba794c5..c49c643d 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q76.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q76.txt
@@ -586,7 +586,7 @@ Input [7]: [channel#10, col_name#11, d_year#8, d_qoy#9, 
i_category#6, count(1)#4
 
 (102) NativeTakeOrdered
 Input [7]: [channel#10, col_name#11, d_year#8, d_qoy#9, i_category#6, 
sales_cnt#43, sales_amt#44]
-Arguments: X, [channel#10 ASC NULLS FIRST, col_name#11 ASC NULLS FIRST, 
d_year#8 ASC NULLS FIRST, d_qoy#9 ASC NULLS FIRST, i_category#6 ASC NULLS FIRST]
+Arguments: X, X, [channel#10 ASC NULLS FIRST, col_name#11 ASC NULLS FIRST, 
d_year#8 ASC NULLS FIRST, d_qoy#9 ASC NULLS FIRST, i_category#6 ASC NULLS FIRST]
 
 (103) Scan parquet
 Output [4]: [ss_sold_date_sk#1, ss_item_sk#2, ss_store_sk#3, 
ss_ext_sales_price#4]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q77.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q77.txt
index a99931b9..3ac28684 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q77.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q77.txt
@@ -1227,7 +1227,7 @@ Input [6]: [channel#95, id#96, spark_grouping_id#97, 
sum(sales#15)#105, sum(retu
 
 (207) NativeTakeOrdered
 Input [5]: [channel#95, id#96, sales#108, returns#109, profit#110]
-Arguments: X, [channel#95 ASC NULLS FIRST, id#96 ASC NULLS FIRST, sales#108 
ASC NULLS FIRST]
+Arguments: X, X, [channel#95 ASC NULLS FIRST, id#96 ASC NULLS FIRST, sales#108 
ASC NULLS FIRST]
 
 (208) Scan parquet
 Output [4]: [ss_sold_date_sk#1, ss_store_sk#2, ss_ext_sales_price#3, 
ss_net_profit#4]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q78.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q78.txt
index 77e2735d..83737d60 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q78.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q78.txt
@@ -777,7 +777,7 @@ Input [15]: [ss_sold_year#22, ss_item_sk#2, 
ss_customer_sk#3, ss_qty#23, ss_wc#2
 
 (132) NativeTakeOrdered
 Input [12]: [ratio#76, store_qty#77, store_wholesale_cost#78, 
store_sales_price#79, other_chan_qty#80, other_chan_wholesale_cost#81, 
other_chan_sales_price#82, ss_qty#23, ss_wc#24, ss_sp#25, ws_qty#48, cs_qty#73]
-Arguments: X, [ratio#76 ASC NULLS FIRST, ss_qty#23 DESC NULLS LAST, ss_wc#24 
DESC NULLS LAST, ss_sp#25 DESC NULLS LAST, other_chan_qty#80 ASC NULLS FIRST, 
other_chan_wholesale_cost#81 ASC NULLS FIRST, other_chan_sales_price#82 ASC 
NULLS FIRST, round((cast(ss_qty#23 as double) / cast(coalesce((ws_qty#48 + 
cs_qty#73), 1) as double)), 2) ASC NULLS FIRST]
+Arguments: X, X, [ratio#76 ASC NULLS FIRST, ss_qty#23 DESC NULLS LAST, 
ss_wc#24 DESC NULLS LAST, ss_sp#25 DESC NULLS LAST, other_chan_qty#80 ASC NULLS 
FIRST, other_chan_wholesale_cost#81 ASC NULLS FIRST, other_chan_sales_price#82 
ASC NULLS FIRST, round((cast(ss_qty#23 as double) / cast(coalesce((ws_qty#48 + 
cs_qty#73), 1) as double)), 2) ASC NULLS FIRST]
 
 (133) NativeProject
 Output [7]: [ratio#76, store_qty#77, store_wholesale_cost#78, 
store_sales_price#79, other_chan_qty#80, other_chan_wholesale_cost#81, 
other_chan_sales_price#82]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q79.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q79.txt
index c8d6794a..73af64b6 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q79.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q79.txt
@@ -439,7 +439,7 @@ Input [8]: [ss_ticket_number#6, ss_customer_sk#2, 
s_city#14, amt#25, profit#26,
 
 (75) NativeTakeOrdered
 Input [7]: [c_last_name#29, c_first_name#28, substr(s_city, 1, 30)#30, 
ss_ticket_number#6, amt#25, profit#26, s_city#14]
-Arguments: X, [c_last_name#29 ASC NULLS FIRST, c_first_name#28 ASC NULLS 
FIRST, substr(s_city#14, 1, 30) ASC NULLS FIRST, ss_ticket_number#6 ASC NULLS 
FIRST, profit#26 ASC NULLS FIRST]
+Arguments: X, X, [c_last_name#29 ASC NULLS FIRST, c_first_name#28 ASC NULLS 
FIRST, substr(s_city#14, 1, 30) ASC NULLS FIRST, ss_ticket_number#6 ASC NULLS 
FIRST, profit#26 ASC NULLS FIRST]
 
 (76) NativeProject
 Output [6]: [c_last_name#29, c_first_name#28, substr(s_city, 1, 30)#30, 
ss_ticket_number#6, amt#25, profit#26]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q8.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q8.txt
index a4aa3c6b..225af5da 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q8.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q8.txt
@@ -615,7 +615,7 @@ Input [2]: [s_store_name#8, 
sum(UnscaledValue(ss_net_profit#3))#23]
 
 (105) NativeTakeOrdered
 Input [2]: [s_store_name#8, sum(ss_net_profit)#24]
-Arguments: X, [s_store_name#8 ASC NULLS FIRST]
+Arguments: X, X, [s_store_name#8 ASC NULLS FIRST]
 
 (106) Scan parquet
 Output [3]: [ss_sold_date_sk#1, ss_store_sk#2, ss_net_profit#3]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q80.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q80.txt
index 5d1f214f..6f0affa2 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q80.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q80.txt
@@ -1501,7 +1501,7 @@ Input [6]: [channel#101, id#102, spark_grouping_id#103, 
sum(sales#33)#110, sum(r
 
 (260) NativeTakeOrdered
 Input [5]: [channel#101, id#102, sales#113, returns#114, profit#115]
-Arguments: X, [channel#101 ASC NULLS FIRST, id#102 ASC NULLS FIRST]
+Arguments: X, X, [channel#101 ASC NULLS FIRST, id#102 ASC NULLS FIRST]
 
 (261) Scan parquet
 Output [7]: [ss_sold_date_sk#1, ss_item_sk#2, ss_store_sk#3, ss_promo_sk#4, 
ss_ticket_number#5, ss_ext_sales_price#6, ss_net_profit#7]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q81.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q81.txt
index d59c9265..34ee7ca5 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q81.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q81.txt
@@ -744,7 +744,7 @@ Input [18]: [ctr_total_return#15, c_customer_id#33, 
c_current_addr_sk#34, c_salu
 
 (128) NativeTakeOrdered
 Input [16]: [c_customer_id#33, c_salutation#35, c_first_name#36, 
c_last_name#37, ca_street_number#39, ca_street_name#40, ca_street_type#41, 
ca_suite_number#42, ca_city#43, ca_county#44, ca_state#45, ca_zip#46, 
ca_country#47, ca_gmt_offset#48, ca_location_type#49, ctr_total_return#15]
-Arguments: X, [c_customer_id#33 ASC NULLS FIRST, c_salutation#35 ASC NULLS 
FIRST, c_first_name#36 ASC NULLS FIRST, c_last_name#37 ASC NULLS FIRST, 
ca_street_number#39 ASC NULLS FIRST, ca_street_name#40 ASC NULLS FIRST, 
ca_street_type#41 ASC NULLS FIRST, ca_suite_number#42 ASC NULLS FIRST, 
ca_city#43 ASC NULLS FIRST, ca_county#44 ASC NULLS FIRST, ca_state#45 ASC NULLS 
FIRST, ca_zip#46 ASC NULLS FIRST, ca_country#47 ASC NULLS FIRST, 
ca_gmt_offset#48 ASC NULLS FIRST, ca_location_type#49 ASC [...]
+Arguments: X, X, [c_customer_id#33 ASC NULLS FIRST, c_salutation#35 ASC NULLS 
FIRST, c_first_name#36 ASC NULLS FIRST, c_last_name#37 ASC NULLS FIRST, 
ca_street_number#39 ASC NULLS FIRST, ca_street_name#40 ASC NULLS FIRST, 
ca_street_type#41 ASC NULLS FIRST, ca_suite_number#42 ASC NULLS FIRST, 
ca_city#43 ASC NULLS FIRST, ca_county#44 ASC NULLS FIRST, ca_state#45 ASC NULLS 
FIRST, ca_zip#46 ASC NULLS FIRST, ca_country#47 ASC NULLS FIRST, 
ca_gmt_offset#48 ASC NULLS FIRST, ca_location_type#49  [...]
 
 (129) Scan parquet
 Output [4]: [cr_returned_date_sk#1, cr_returning_customer_sk#2, 
cr_returning_addr_sk#3, cr_return_amt_inc_tax#4]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q82.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q82.txt
index d3cb3fe3..c15bf254 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q82.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q82.txt
@@ -347,7 +347,7 @@ Input [3]: [i_item_id#2, i_item_desc#3, i_current_price#4]
 
 (59) NativeTakeOrdered
 Input [3]: [i_item_id#2, i_item_desc#3, i_current_price#4]
-Arguments: X, [i_item_id#2 ASC NULLS FIRST]
+Arguments: X, X, [i_item_id#2 ASC NULLS FIRST]
 
 (60) Scan parquet
 Output [5]: [i_item_sk#1, i_item_id#2, i_item_desc#3, i_current_price#4, 
i_manufact_id#5]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q83.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q83.txt
index 95bf1118..fe63cfc8 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q83.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q83.txt
@@ -905,7 +905,7 @@ Input [5]: [item_id#15, sr_item_qty#16, cr_item_qty#26, 
item_id#35, wr_item_qty#
 
 (150) NativeTakeOrdered
 Input [8]: [item_id#15, sr_item_qty#16, sr_dev#37, cr_item_qty#26, cr_dev#38, 
wr_item_qty#36, wr_dev#39, average#40]
-Arguments: X, [item_id#15 ASC NULLS FIRST, sr_item_qty#16 ASC NULLS FIRST]
+Arguments: X, X, [item_id#15 ASC NULLS FIRST, sr_item_qty#16 ASC NULLS FIRST]
 
 (151) Scan parquet
 Output [3]: [sr_returned_date_sk#1, sr_item_sk#2, sr_return_quantity#3]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q84.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q84.txt
index 7b435cee..552fa28c 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q84.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q84.txt
@@ -471,7 +471,7 @@ Input [5]: [c_customer_id#1, c_first_name#5, c_last_name#6, 
cd_demo_sk#9, #15#15
 
 (81) NativeTakeOrdered
 Input [3]: [customer_id#16, customername#17, c_customer_id#1]
-Arguments: X, [c_customer_id#1 ASC NULLS FIRST]
+Arguments: X, X, [c_customer_id#1 ASC NULLS FIRST]
 
 (82) NativeProject
 Output [2]: [customer_id#16, customername#17]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q85.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q85.txt
index b027641d..8deea780 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q85.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q85.txt
@@ -698,7 +698,7 @@ Input [4]: [r_reason_desc#29, avg(ws_quantity#5)#41, 
avg(UnscaledValue(wr_refund
 
 (121) NativeTakeOrdered
 Input [4]: [substr(r_reason_desc, 1, 20)#44, avg(ws_quantity)#45, 
avg(wr_refunded_cash)#46, avg(wr_fee)#47]
-Arguments: X, [substr(r_reason_desc, 1, 20)#44 ASC NULLS FIRST, 
avg(ws_quantity)#45 ASC NULLS FIRST, avg(wr_refunded_cash)#46 ASC NULLS FIRST, 
avg(wr_fee)#47 ASC NULLS FIRST]
+Arguments: X, X, [substr(r_reason_desc, 1, 20)#44 ASC NULLS FIRST, 
avg(ws_quantity)#45 ASC NULLS FIRST, avg(wr_refunded_cash)#46 ASC NULLS FIRST, 
avg(wr_fee)#47 ASC NULLS FIRST]
 
 (122) Scan parquet
 Output [7]: [ws_sold_date_sk#1, ws_item_sk#2, ws_web_page_sk#3, 
ws_order_number#4, ws_quantity#5, ws_sales_price#6, ws_net_profit#7]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q86.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q86.txt
index 4f73ab53..a97b8626 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q86.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q86.txt
@@ -298,7 +298,7 @@ Input [8]: [total_sum#16, i_category#9, i_class#10, 
lochierarchy#17, _w0#18, _w1
 
 (51) NativeTakeOrdered
 Input [5]: [total_sum#16, i_category#9, i_class#10, lochierarchy#17, 
rank_within_parent#21]
-Arguments: X, [lochierarchy#17 DESC NULLS LAST, CASE WHEN (lochierarchy#17 = 
0) THEN i_category#9 END ASC NULLS FIRST, rank_within_parent#21 ASC NULLS FIRST]
+Arguments: X, X, [lochierarchy#17 DESC NULLS LAST, CASE WHEN (lochierarchy#17 
= 0) THEN i_category#9 END ASC NULLS FIRST, rank_within_parent#21 ASC NULLS 
FIRST]
 
 (52) Scan parquet
 Output [3]: [ws_sold_date_sk#1, ws_item_sk#2, ws_net_paid#3]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q93.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q93.txt
index bbf2f51f..970677c5 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q93.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q93.txt
@@ -242,7 +242,7 @@ Input [2]: [ss_customer_sk#2, sum(act_sales#12)#16]
 
 (41) NativeTakeOrdered
 Input [2]: [ss_customer_sk#2, sumsales#17]
-Arguments: X, [sumsales#17 ASC NULLS FIRST, ss_customer_sk#2 ASC NULLS FIRST]
+Arguments: X, X, [sumsales#17 ASC NULLS FIRST, ss_customer_sk#2 ASC NULLS 
FIRST]
 
 (42) Scan parquet
 Output [5]: [ss_item_sk#1, ss_customer_sk#2, ss_ticket_number#3, 
ss_quantity#4, ss_sales_price#5]
diff --git 
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q99.txt 
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q99.txt
index 406f3072..edceaec7 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q99.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q99.txt
@@ -426,7 +426,7 @@ Input [8]: [_groupingexpression#14, sm_type#9, cc_name#11, 
sum(CASE WHEN ((cs_sh
 
 (73) NativeTakeOrdered
 Input [8]: [substr(w_warehouse_name, 1, 20)#31, sm_type#9, cc_name#11, 30 days 
#32, 31 - 60 days #33, 61 - 90 days #34, 91 - 120 days #35, >120 days #36]
-Arguments: X, [substr(w_warehouse_name, 1, 20)#31 ASC NULLS FIRST, sm_type#9 
ASC NULLS FIRST, cc_name#11 ASC NULLS FIRST]
+Arguments: X, X, [substr(w_warehouse_name, 1, 20)#31 ASC NULLS FIRST, 
sm_type#9 ASC NULLS FIRST, cc_name#11 ASC NULLS FIRST]
 
 (74) Scan parquet
 Output [5]: [cs_sold_date_sk#1, cs_ship_date_sk#2, cs_call_center_sk#3, 
cs_ship_mode_sk#4, cs_warehouse_sk#5]
diff --git 
a/dev/auron-it/src/main/scala/org/apache/auron/integration/comparison/PlanStabilityChecker.scala
 
b/dev/auron-it/src/main/scala/org/apache/auron/integration/comparison/PlanStabilityChecker.scala
index 3c7ecd53..89a205db 100644
--- 
a/dev/auron-it/src/main/scala/org/apache/auron/integration/comparison/PlanStabilityChecker.scala
+++ 
b/dev/auron-it/src/main/scala/org/apache/auron/integration/comparison/PlanStabilityChecker.scala
@@ -73,6 +73,7 @@ class PlanStabilityChecker(
                  |
                  |--- Actual ---
                  |$actualPlan
+                 |----------------
                  |""".stripMargin)
       false
     }
diff --git a/native-engine/auron-planner/proto/auron.proto 
b/native-engine/auron-planner/proto/auron.proto
index 99f8078b..2afe1cfa 100644
--- a/native-engine/auron-planner/proto/auron.proto
+++ b/native-engine/auron-planner/proto/auron.proto
@@ -635,7 +635,8 @@ message SortExecNode {
 
 message FetchLimit {
   // wrap into a message to make it optional
-  uint64 limit = 1;
+  uint32 limit = 1;
+  uint32 offset = 2;
 }
 
 message PhysicalRepartition {
@@ -709,7 +710,8 @@ enum AggMode {
 
 message LimitExecNode {
   PhysicalPlanNode input = 1;
-  uint64 limit = 2;
+  uint32 limit = 2;
+  uint32 offset = 3;
 }
 
 message FFIReaderExecNode {
diff --git a/native-engine/auron-planner/src/planner.rs 
b/native-engine/auron-planner/src/planner.rs
index cfab99e1..5c79e571 100644
--- a/native-engine/auron-planner/src/planner.rs
+++ b/native-engine/auron-planner/src/planner.rs
@@ -357,12 +357,12 @@ impl PhysicalPlanner {
                         panic!("Failed to parse physical sort expressions: 
{e}");
                     });
 
+                let fetch = sort.fetch_limit.as_ref();
+                let limit = fetch.map(|f| f.limit as usize);
+                let offset = fetch.map(|f| f.offset as usize).unwrap_or(0);
+
                 // always preserve partitioning
-                Ok(Arc::new(SortExec::new(
-                    input,
-                    exprs,
-                    sort.fetch_limit.as_ref().map(|limit| limit.limit as 
usize),
-                )))
+                Ok(Arc::new(SortExec::new(input, exprs, limit, offset)))
             }
             PhysicalPlanType::BroadcastJoinBuildHashMap(bhm) => {
                 let input: Arc<dyn ExecutionPlan> = 
convert_box_required!(self, bhm.input)?;
@@ -558,7 +558,11 @@ impl PhysicalPlanner {
             }
             PhysicalPlanType::Limit(limit) => {
                 let input: Arc<dyn ExecutionPlan> = 
convert_box_required!(self, limit.input)?;
-                Ok(Arc::new(LimitExec::new(input, limit.limit)))
+                Ok(Arc::new(LimitExec::new(
+                    input,
+                    limit.limit as usize,
+                    limit.offset as usize,
+                )))
             }
             PhysicalPlanType::FfiReader(ffi_reader) => {
                 let schema = Arc::new(convert_required!(ffi_reader.schema)?);
@@ -571,7 +575,11 @@ impl PhysicalPlanner {
             PhysicalPlanType::CoalesceBatches(coalesce_batches) => {
                 let input: Arc<dyn ExecutionPlan> =
                     convert_box_required!(self, coalesce_batches.input)?;
-                Ok(Arc::new(LimitExec::new(input, 
coalesce_batches.batch_size)))
+                Ok(Arc::new(LimitExec::new(
+                    input,
+                    coalesce_batches.batch_size as usize,
+                    0,
+                )))
             }
             PhysicalPlanType::Expand(expand) => {
                 let schema = Arc::new(convert_required!(expand.schema)?);
diff --git a/native-engine/datafusion-ext-plans/src/limit_exec.rs 
b/native-engine/datafusion-ext-plans/src/limit_exec.rs
index dd1e40d9..dd45ed12 100644
--- a/native-engine/datafusion-ext-plans/src/limit_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/limit_exec.rs
@@ -41,16 +41,18 @@ use crate::common::execution_context::ExecutionContext;
 #[derive(Debug)]
 pub struct LimitExec {
     input: Arc<dyn ExecutionPlan>,
-    limit: u64,
+    limit: usize,
+    offset: usize,
     pub metrics: ExecutionPlanMetricsSet,
     props: OnceCell<PlanProperties>,
 }
 
 impl LimitExec {
-    pub fn new(input: Arc<dyn ExecutionPlan>, limit: u64) -> Self {
+    pub fn new(input: Arc<dyn ExecutionPlan>, limit: usize, offset: usize) -> 
Self {
         Self {
             input,
             limit,
+            offset,
             metrics: ExecutionPlanMetricsSet::new(),
             props: OnceCell::new(),
         }
@@ -59,7 +61,7 @@ impl LimitExec {
 
 impl DisplayAs for LimitExec {
     fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> 
std::fmt::Result {
-        write!(f, "LimitExec(limit={})", self.limit)
+        write!(f, "LimitExec(limit={},offset={})", self.limit, self.offset)
     }
 }
 
@@ -95,7 +97,11 @@ impl ExecutionPlan for LimitExec {
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        Ok(Arc::new(Self::new(children[0].clone(), self.limit)))
+        Ok(Arc::new(Self::new(
+            children[0].clone(),
+            self.limit,
+            self.offset,
+        )))
     }
 
     fn execute(
@@ -105,15 +111,19 @@ impl ExecutionPlan for LimitExec {
     ) -> Result<SendableRecordBatchStream> {
         let exec_ctx = ExecutionContext::new(context, partition, 
self.schema(), &self.metrics);
         let input = exec_ctx.execute_with_input_stats(&self.input)?;
-        execute_limit(input, self.limit, exec_ctx)
+        if self.offset == 0 {
+            execute_limit(input, self.limit, exec_ctx)
+        } else {
+            execute_limit_with_offset(input, self.limit, self.offset, exec_ctx)
+        }
     }
 
     fn statistics(&self) -> Result<Statistics> {
         Statistics::with_fetch(
             self.input.statistics()?,
             self.schema(),
-            Some(self.limit as usize),
-            0,
+            Some(self.limit),
+            self.offset,
             1,
         )
     }
@@ -121,7 +131,7 @@ impl ExecutionPlan for LimitExec {
 
 fn execute_limit(
     mut input: SendableRecordBatchStream,
-    limit: u64,
+    limit: usize,
     exec_ctx: Arc<ExecutionContext>,
 ) -> Result<SendableRecordBatchStream> {
     Ok(exec_ctx
@@ -131,11 +141,49 @@ fn execute_limit(
             while remaining > 0
                 && let Some(mut batch) = input.next().await.transpose()?
             {
-                if remaining < batch.num_rows() as u64 {
-                    batch = batch.slice(0, remaining as usize);
+                if remaining < batch.num_rows() {
+                    batch = batch.slice(0, remaining);
+                    remaining = 0;
+                } else {
+                    remaining -= batch.num_rows();
+                }
+                exec_ctx.baseline_metrics().record_output(batch.num_rows());
+                sender.send(batch).await;
+            }
+            Ok(())
+        }))
+}
+
+fn execute_limit_with_offset(
+    mut input: SendableRecordBatchStream,
+    limit: usize,
+    offset: usize,
+    exec_ctx: Arc<ExecutionContext>,
+) -> Result<SendableRecordBatchStream> {
+    Ok(exec_ctx
+        .clone()
+        .output_with_sender("Limit", move |sender| async move {
+            let mut skip = offset;
+            let mut remaining = limit.saturating_sub(skip);
+            while remaining > 0
+                && let Some(mut batch) = input.next().await.transpose()?
+            {
+                if skip > 0 {
+                    let rows = batch.num_rows();
+                    if skip >= rows {
+                        skip -= rows;
+                        continue;
+                    }
+
+                    batch = batch.slice(skip, rows - skip);
+                    skip = 0;
+                }
+
+                if remaining < batch.num_rows() {
+                    batch = batch.slice(0, remaining);
                     remaining = 0;
                 } else {
-                    remaining -= batch.num_rows() as u64;
+                    remaining -= batch.num_rows();
                 }
                 exec_ctx.baseline_metrics().record_output(batch.num_rows());
                 sender.send(batch).await;
@@ -207,7 +255,7 @@ mod test {
             ("b", &vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
             ("c", &vec![5, 6, 7, 8, 9, 0, 1, 2, 3, 4]),
         )?;
-        let limit_exec = LimitExec::new(input, 2_u64);
+        let limit_exec = LimitExec::new(input, 2, 0);
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let output = limit_exec.execute(0, task_ctx)?;
@@ -226,4 +274,31 @@ mod test {
         assert_eq!(row_count, Precision::Exact(2));
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_limit_with_offset() -> Result<()> {
+        let input = build_table(
+            ("a", &vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
+            ("b", &vec![9, 8, 7, 6, 5, 4, 3, 2, 1, 0]),
+            ("c", &vec![5, 6, 7, 8, 9, 0, 1, 2, 3, 4]),
+        )?;
+        let limit_exec = LimitExec::new(input, 7, 5);
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
+        let output = limit_exec.execute(0, task_ctx)?;
+        let batches = common::collect(output).await?;
+        let row_count: usize = batches.iter().map(|batch| 
batch.num_rows()).sum();
+
+        let expected = vec![
+            "+---+---+---+",
+            "| a | b | c |",
+            "+---+---+---+",
+            "| 5 | 4 | 0 |",
+            "| 6 | 3 | 1 |",
+            "+---+---+---+",
+        ];
+        assert_batches_eq!(expected, &batches);
+        assert_eq!(row_count, 2);
+        Ok(())
+    }
 }
diff --git a/native-engine/datafusion-ext-plans/src/sort_exec.rs 
b/native-engine/datafusion-ext-plans/src/sort_exec.rs
index 659eff36..a39487c1 100644
--- a/native-engine/datafusion-ext-plans/src/sort_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/sort_exec.rs
@@ -86,7 +86,8 @@ const NUM_MAX_MERGING_BATCHES: usize = 32;
 pub struct SortExec {
     input: Arc<dyn ExecutionPlan>,
     exprs: Vec<PhysicalSortExpr>,
-    fetch: Option<usize>,
+    limit: Option<usize>,
+    offset: usize,
     metrics: ExecutionPlanMetricsSet,
     record_output: bool,
     props: OnceCell<PlanProperties>,
@@ -96,13 +97,15 @@ impl SortExec {
     pub fn new(
         input: Arc<dyn ExecutionPlan>,
         exprs: Vec<PhysicalSortExpr>,
-        fetch: Option<usize>,
+        limit: Option<usize>,
+        offset: usize,
     ) -> Self {
         let metrics = ExecutionPlanMetricsSet::new();
         Self {
             input,
             exprs,
-            fetch,
+            limit,
+            offset,
             metrics,
             record_output: true,
             props: OnceCell::new(),
@@ -130,6 +133,7 @@ pub fn create_default_ascending_sort_exec(
             })
             .collect(),
         None,
+        0,
     );
     if let Some(execution_plan_metrics) = execution_plan_metrics {
         sort_exec.metrics = execution_plan_metrics;
@@ -185,7 +189,8 @@ impl ExecutionPlan for SortExec {
         Ok(Arc::new(Self::new(
             children[0].clone(),
             self.exprs.clone(),
-            self.fetch,
+            self.limit,
+            self.offset,
         )))
     }
 
@@ -203,7 +208,13 @@ impl ExecutionPlan for SortExec {
     }
 
     fn statistics(&self) -> Result<Statistics> {
-        Statistics::with_fetch(self.input.statistics()?, self.schema(), 
self.fetch, 0, 1)
+        Statistics::with_fetch(
+            self.input.statistics()?,
+            self.schema(),
+            self.limit,
+            self.offset,
+            1,
+        )
     }
 }
 
@@ -223,7 +234,8 @@ impl SortExec {
             mem_consumer_info: None,
             weak: Weak::new(),
             prune_sort_keys_from_batch: prune_sort_keys_from_batch.clone(),
-            limit: self.fetch.unwrap_or(usize::MAX),
+            skip: self.offset,
+            limit: self.limit.unwrap_or(usize::MAX),
             record_output: self.record_output,
             in_mem_blocks: Default::default(),
             spills: Default::default(),
@@ -336,6 +348,7 @@ struct ExternalSorter {
     mem_consumer_info: Option<Weak<MemConsumerInfo>>,
     weak: Weak<Self>,
     prune_sort_keys_from_batch: Arc<PruneSortKeysFromBatch>,
+    skip: usize,
     limit: usize,
     record_output: bool,
     in_mem_blocks: Arc<Mutex<Vec<InMemSortedBlock>>>,
@@ -704,6 +717,9 @@ impl ExternalSorter {
             let in_mem_blocks = std::mem::take(&mut 
*self.in_mem_blocks.lock());
             if !in_mem_blocks.is_empty() {
                 let mut merger = Merger::try_new(self.clone(), in_mem_blocks)?;
+                if self.skip > 0 {
+                    merger.skip_rows::<InMemRowsKeyCollector>(self.skip, output_batch_size)?;
+                }
                 while let Some((key_collector, pruned_batch)) =
                     merger.next::<InMemRowsKeyCollector>(output_batch_size)?
                 {
@@ -727,6 +743,9 @@ impl ExternalSorter {
 
         let spill_blocks = spills.into_iter().map(|spill| 
spill.block).collect();
         let mut merger = Merger::try_new(self.to_arc(), spill_blocks)?;
+        if self.skip > 0 {
+            merger.skip_rows::<InMemRowsKeyCollector>(self.skip, output_batch_size)?;
+        }
         while let Some((key_collector, pruned_batch)) =
             merger.next::<InMemRowsKeyCollector>(output_batch_size)?
         {
@@ -1023,6 +1042,22 @@ impl<B: SortedBlock> Merger<B> {
         }
         Ok(Some((key_collector, pruned_batch)))
     }
+
+    pub fn skip_rows<KC: KeyCollector>(
+        &mut self,
+        skip: usize,
+        suggested_batch_size: usize,
+    ) -> Result<()> {
+        let mut remaining = skip;
+        while remaining > 0 {
+            let batch_size = remaining.min(suggested_batch_size);
+            match self.next::<KC>(batch_size)? {
+                Some((_keys, batch)) => remaining = remaining.saturating_sub(batch.num_rows().max(1)),
+                None => break,
+            }
+        }
+        Ok(())
+    }
 }
 
 fn merge_blocks<B: SortedBlock, KC: KeyCollector>(
@@ -1472,7 +1507,7 @@ mod test {
             options: SortOptions::default(),
         }];
 
-        let sort = SortExec::new(input, sort_exprs, Some(6));
+        let sort = SortExec::new(input, sort_exprs, Some(6), 0);
         let output = sort.execute(0, task_ctx)?;
         let batches = common::collect(output).await?;
         let expected = vec![
@@ -1491,6 +1526,40 @@ mod test {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_sort_i32_with_skip() -> Result<()> {
+        MemManager::init(100);
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
+        let input = build_table(
+            ("a", &vec![9, 8, 7, 6, 5, 4, 3, 2, 1, 0]),
+            ("b", &vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
+            ("c", &vec![5, 6, 7, 8, 9, 0, 1, 2, 3, 4]),
+        )?;
+        let sort_exprs = vec![PhysicalSortExpr {
+            expr: Arc::new(Column::new("a", 0)),
+            options: SortOptions::default(),
+        }];
+
+        let sort = SortExec::new(input, sort_exprs, Some(8), 3);
+        let output = sort.execute(0, task_ctx)?;
+        let batches = common::collect(output).await?;
+        let expected = vec![
+            "+---+---+---+",
+            "| a | b | c |",
+            "+---+---+---+",
+            "| 3 | 6 | 1 |",
+            "| 4 | 5 | 0 |",
+            "| 5 | 4 | 9 |",
+            "| 6 | 3 | 8 |",
+            "| 7 | 2 | 7 |",
+            "+---+---+---+",
+        ];
+        assert_batches_eq!(expected, &batches);
+
+        Ok(())
+    }
 }
 
 #[cfg(test)]
@@ -1585,7 +1654,7 @@ mod fuzztest {
             schema.clone(),
             None,
         )?);
-        let sort = Arc::new(SortExec::new(input, sort_exprs.clone(), None));
+        let sort = Arc::new(SortExec::new(input, sort_exprs.clone(), None, 0));
         let output = datafusion::physical_plan::collect(sort.clone(), 
task_ctx.clone()).await?;
         let a = concat_batches(&schema, &output)?;
         let a_row_count = sort.clone().statistics()?.num_rows;
diff --git 
a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
 
b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
index 3de651e1..839f5655 100644
--- 
a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
+++ 
b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
@@ -52,14 +52,7 @@ import 
org.apache.spark.sql.catalyst.expressions.aggregate.First
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.execution.CoalescedPartitionSpec
-import org.apache.spark.sql.execution.FileSourceScanExec
-import org.apache.spark.sql.execution.PartialMapperPartitionSpec
-import org.apache.spark.sql.execution.PartialReducerPartitionSpec
-import org.apache.spark.sql.execution.ShuffledRowRDD
-import org.apache.spark.sql.execution.ShufflePartitionSpec
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.UnaryExecNode
+import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, 
BroadcastQueryStageExec, QueryStageExec, ShuffleQueryStageExec}
 import org.apache.spark.sql.execution.auron.plan._
 import org.apache.spark.sql.execution.auron.plan.ConvertToNativeExec
@@ -291,16 +284,38 @@ class ShimsImpl extends Shims with Logging {
       child: SparkPlan): NativeGenerateBase =
     NativeGenerateExec(generator, requiredChildOutput, outer, generatorOutput, 
child)
 
-  override def createNativeGlobalLimitExec(limit: Long, child: SparkPlan): 
NativeGlobalLimitBase =
-    NativeGlobalLimitExec(limit, child)
+  private def effectiveLimit(rawLimit: Int): Int =
+    if (rawLimit == -1) Int.MaxValue else rawLimit
 
-  override def createNativeLocalLimitExec(limit: Long, child: SparkPlan): 
NativeLocalLimitBase =
+  @sparkver("3.4 / 3.5")
+  override def getLimitAndOffset(plan: GlobalLimitExec): (Int, Int) = {
+    (effectiveLimit(plan.limit), plan.offset)
+  }
+
+  @sparkver("3.4 / 3.5")
+  override def getLimitAndOffset(plan: TakeOrderedAndProjectExec): (Int, Int) 
= {
+    (effectiveLimit(plan.limit), plan.offset)
+  }
+
+  override def createNativeGlobalLimitExec(
+      limit: Int,
+      offset: Int,
+      child: SparkPlan): NativeGlobalLimitBase =
+    NativeGlobalLimitExec(limit, offset, child)
+
+  override def createNativeLocalLimitExec(limit: Int, child: SparkPlan): 
NativeLocalLimitBase =
     NativeLocalLimitExec(limit, child)
 
+  @sparkver("3.4 / 3.5")
+  override def getLimitAndOffset(plan: CollectLimitExec): (Int, Int) = {
+    (effectiveLimit(plan.limit), plan.offset)
+  }
+
   override def createNativeCollectLimitExec(
       limit: Int,
+      offset: Int,
       child: SparkPlan): NativeCollectLimitBase =
-    NativeCollectLimitExec(limit, child)
+    NativeCollectLimitExec(limit, offset, child)
 
   override def createNativeParquetInsertIntoHiveTableExec(
       cmd: InsertIntoHiveTable,
@@ -337,13 +352,14 @@ class ShimsImpl extends Shims with Logging {
     NativeSortExec(sortOrder, global, child)
 
   override def createNativeTakeOrderedExec(
-      limit: Long,
+      limit: Int,
+      offset: Int,
       sortOrder: Seq[SortOrder],
       child: SparkPlan): NativeTakeOrderedBase =
-    NativeTakeOrderedExec(limit, sortOrder, child)
+    NativeTakeOrderedExec(limit, offset, sortOrder, child)
 
   override def createNativePartialTakeOrderedExec(
-      limit: Long,
+      limit: Int,
       sortOrder: Seq[SortOrder],
       child: SparkPlan,
       metrics: Map[String, SQLMetric]): NativePartialTakeOrderedBase =
diff --git 
a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeCollectLimitExec.scala
 
b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeCollectLimitExec.scala
index ba514ab7..4ff7d804 100644
--- 
a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeCollectLimitExec.scala
+++ 
b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeCollectLimitExec.scala
@@ -20,8 +20,8 @@ import org.apache.spark.sql.execution.SparkPlan
 
 import org.apache.auron.sparkver
 
-case class NativeCollectLimitExec(limit: Int, override val child: SparkPlan)
-    extends NativeCollectLimitBase(limit, child) {
+case class NativeCollectLimitExec(limit: Int, offset: Int, override val child: 
SparkPlan)
+    extends NativeCollectLimitBase(limit, offset, child) {
 
   @sparkver("3.2 / 3.3 / 3.4 / 3.5")
   override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
diff --git 
a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitExec.scala
 
b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitExec.scala
index 8aba2de4..1b493f43 100644
--- 
a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitExec.scala
+++ 
b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitExec.scala
@@ -20,8 +20,8 @@ import org.apache.spark.sql.execution.SparkPlan
 
 import org.apache.auron.sparkver
 
-case class NativeGlobalLimitExec(limit: Long, override val child: SparkPlan)
-    extends NativeGlobalLimitBase(limit, child) {
+case class NativeGlobalLimitExec(limit: Int, offset: Int, override val child: 
SparkPlan)
+    extends NativeGlobalLimitBase(limit, offset, child) {
 
   @sparkver("3.2 / 3.3 / 3.4 / 3.5")
   override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
diff --git 
a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitExec.scala
 
b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitExec.scala
index ad74b02e..805408c8 100644
--- 
a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitExec.scala
+++ 
b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitExec.scala
@@ -20,7 +20,7 @@ import org.apache.spark.sql.execution.SparkPlan
 
 import org.apache.auron.sparkver
 
-case class NativeLocalLimitExec(limit: Long, override val child: SparkPlan)
+case class NativeLocalLimitExec(limit: Int, override val child: SparkPlan)
     extends NativeLocalLimitBase(limit, child) {
 
   @sparkver("3.2 / 3.3 / 3.4 / 3.5")
diff --git 
a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativePartialTakeOrderedExec.scala
 
b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativePartialTakeOrderedExec.scala
index e243e6f3..faf541a6 100644
--- 
a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativePartialTakeOrderedExec.scala
+++ 
b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativePartialTakeOrderedExec.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.auron.sparkver
 
 case class NativePartialTakeOrderedExec(
-    limit: Long,
+    limit: Int,
     sortOrder: Seq[SortOrder],
     override val child: SparkPlan,
     override val metrics: Map[String, SQLMetric])
diff --git 
a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedExec.scala
 
b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedExec.scala
index cec298b6..16548155 100644
--- 
a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedExec.scala
+++ 
b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedExec.scala
@@ -22,10 +22,11 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.auron.sparkver
 
 case class NativeTakeOrderedExec(
-    limit: Long,
+    limit: Int,
+    offset: Int,
     sortOrder: Seq[SortOrder],
     override val child: SparkPlan)
-    extends NativeTakeOrderedBase(limit, sortOrder, child) {
+    extends NativeTakeOrderedBase(limit, offset, sortOrder, child) {
 
   @sparkver("3.2 / 3.3 / 3.4 / 3.5")
   override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
diff --git 
a/spark-extension-shims-spark/src/test/scala/org/apache/auron/exec/AuronExecSuite.scala
 
b/spark-extension-shims-spark/src/test/scala/org/apache/auron/exec/AuronExecSuite.scala
index d7adf3a7..7f62dd52 100644
--- 
a/spark-extension-shims-spark/src/test/scala/org/apache/auron/exec/AuronExecSuite.scala
+++ 
b/spark-extension-shims-spark/src/test/scala/org/apache/auron/exec/AuronExecSuite.scala
@@ -17,13 +17,14 @@
 package org.apache.auron.exec
 
 import org.apache.spark.sql.AuronQueryTest
-import org.apache.spark.sql.execution.auron.plan.NativeCollectLimitExec
+import org.apache.spark.sql.execution.auron.plan.{NativeCollectLimitExec, 
NativeGlobalLimitExec, NativeLocalLimitExec, NativeTakeOrderedExec}
 
 import org.apache.auron.BaseAuronSQLSuite
+import org.apache.auron.util.AuronTestUtils
 
 class AuronExecSuite extends AuronQueryTest with BaseAuronSQLSuite {
 
-  test("Collect Limit") {
+  test("CollectLimit") {
     withTable("t1") {
       sql("create table t1(id INT) using parquet")
       sql("insert into t1 
values(1),(2),(3),(3),(3),(4),(5),(6),(7),(8),(9),(10)")
@@ -36,4 +37,94 @@ class AuronExecSuite extends AuronQueryTest with 
BaseAuronSQLSuite {
     }
   }
 
+  test("CollectLimit with offset") {
+    if (AuronTestUtils.isSparkV34OrGreater) {
+      withTempView("t1") {
+        sql("create table if not exists t1(id INT) using parquet")
+        sql("insert into t1 
values(1),(2),(3),(3),(3),(4),(5),(6),(7),(8),(9),(10)")
+        Seq((5, 0), (10, 0), (5, 2), (10, 2), (1, 5), (3, 8)).foreach {
+          case (limit, offset) => {
+            val query = s"select * from t1 limit $limit offset $offset"
+            val df = checkSparkAnswerAndOperator(() => spark.sql(query))
+            assert(collect(df.queryExecution.executedPlan) { case e: 
NativeCollectLimitExec =>
+              e
+            }.size == 1)
+          }
+        }
+      }
+    }
+  }
+
+  test("GlobalLimit and LocalLimit") {
+    withTable("t1") {
+      sql("create table if not exists t1(id INT) using parquet")
+      sql("insert into t1 
values(1),(2),(3),(3),(3),(4),(5),(6),(7),(8),(9),(10)")
+      val df = checkSparkAnswerAndOperator(() =>
+        spark
+          .sql(s"""
+               |select id from (
+               |  select * from t1 limit 5
+               |) where id > 0 limit 10;
+               |""".stripMargin)
+          .groupBy("id")
+          .count())
+      assert(collect(df.queryExecution.executedPlan) {
+        case e: NativeGlobalLimitExec => e
+        case e: NativeLocalLimitExec => e
+      }.size >= 2)
+    }
+  }
+
+  test("GlobalLimit with offset") {
+    if (AuronTestUtils.isSparkV34OrGreater) {
+      withTable("t1") {
+        sql("create table if not exists t1(id INT) using parquet")
+        sql("insert into t1 
values(1),(2),(3),(3),(3),(4),(5),(6),(7),(8),(9),(10)")
+        Seq((5, 0), (10, 0), (5, 2), (10, 2), (1, 5), (3, 8)).foreach {
+          case (limit, offset) => {
+            val query = s"select * from t1 limit $limit offset $offset"
+            val df = checkSparkAnswerAndOperator(() => 
spark.sql(query).groupBy("id").count())
+            assert(collect(df.queryExecution.executedPlan) { case e: 
NativeGlobalLimitExec =>
+              e
+            }.size == 1)
+          }
+        }
+      }
+    }
+  }
+
+  test("TakeOrderedAndProject") {
+    withTable("t1") {
+      sql("create table if not exists t1(id INT) using parquet")
+      sql("insert into t1 
values(1),(2),(3),(3),(3),(4),(5),(6),(7),(8),(9),(10)")
+      val df = checkSparkAnswerAndOperator(() =>
+        spark
+          .sql(s"""
+               | select id from t1 order by id limit 5
+               |""".stripMargin)
+          .groupBy("id")
+          .count())
+      assert(collect(df.queryExecution.executedPlan) { case e: 
NativeTakeOrderedExec =>
+        e
+      }.size == 1)
+    }
+  }
+
+  test("TakeOrderedAndProject with offset") {
+    if (AuronTestUtils.isSparkV34OrGreater) {
+      withTable("t1") {
+        sql("create table if not exists t1(id INT) using parquet")
+        sql("insert into t1 
values(1),(2),(3),(3),(3),(4),(5),(6),(7),(8),(9),(10)")
+        Seq((5, 0), (10, 0), (5, 2), (10, 2), (1, 5), (3, 8)).foreach {
+          case (limit, offset) => {
+            val query = s"select * from t1 order by id limit $limit offset 
$offset"
+            val df = checkSparkAnswerAndOperator(() => 
spark.sql(query).groupBy("id").count())
+            assert(collect(df.queryExecution.executedPlan) { case e: 
NativeTakeOrderedExec =>
+              e
+            }.size == 1)
+          }
+        }
+      }
+    }
+  }
 }
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
index 67f2c5b2..9b8bed91 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
@@ -732,18 +732,21 @@ object AuronConverters extends Logging {
 
   def convertLocalLimitExec(exec: LocalLimitExec): SparkPlan = {
     logDebugPlanConversion(exec)
-    Shims.get.createNativeLocalLimitExec(exec.limit.toLong, exec.child)
+    Shims.get.createNativeLocalLimitExec(exec.limit, exec.child)
   }
 
   def convertGlobalLimitExec(exec: GlobalLimitExec): SparkPlan = {
     logDebugPlanConversion(exec)
-    Shims.get.createNativeGlobalLimitExec(exec.limit.toLong, exec.child)
+    val (limit, offset) = Shims.get.getLimitAndOffset(exec)
+    Shims.get.createNativeGlobalLimitExec(limit, offset, exec.child)
   }
 
   def convertTakeOrderedAndProjectExec(exec: TakeOrderedAndProjectExec): 
SparkPlan = {
     logDebugPlanConversion(exec)
+    val (limit, offset) = Shims.get.getLimitAndOffset(exec)
     val nativeTakeOrdered = Shims.get.createNativeTakeOrderedExec(
-      exec.limit,
+      limit,
+      offset,
       exec.sortOrder,
       addRenameColumnsExec(convertToNative(exec.child)))
 
@@ -757,7 +760,8 @@ object AuronConverters extends Logging {
 
   def convertCollectLimitExec(exec: CollectLimitExec): SparkPlan = {
     logDebugPlanConversion(exec)
-    Shims.get.createNativeCollectLimitExec(exec.limit, exec.child)
+    val (limit, offset) = Shims.get.getLimitAndOffset(exec)
+    Shims.get.createNativeCollectLimitExec(limit, offset, exec.child)
   }
 
   def convertHashAggregateExec(exec: HashAggregateExec): SparkPlan = {
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala 
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala
index 2b221d36..85ab65d5 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala
@@ -40,8 +40,7 @@ import 
org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.execution.FileSourceScanExec
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{CollectLimitExec, FileSourceScanExec, 
GlobalLimitExec, SparkPlan, TakeOrderedAndProjectExec}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.auron.plan._
 import org.apache.spark.sql.execution.auron.plan.NativeBroadcastJoinBase
@@ -122,11 +121,21 @@ abstract class Shims {
       generatorOutput: Seq[Attribute],
       child: SparkPlan): NativeGenerateBase
 
-  def createNativeGlobalLimitExec(limit: Long, child: SparkPlan): 
NativeGlobalLimitBase
+  def getLimitAndOffset(plan: GlobalLimitExec): (Int, Int) = (plan.limit, 0)
 
-  def createNativeLocalLimitExec(limit: Long, child: SparkPlan): 
NativeLocalLimitBase
+  def createNativeGlobalLimitExec(
+      limit: Int,
+      offset: Int,
+      child: SparkPlan): NativeGlobalLimitBase
 
-  def createNativeCollectLimitExec(limit: Int, child: SparkPlan): 
NativeCollectLimitBase
+  def createNativeLocalLimitExec(limit: Int, child: SparkPlan): 
NativeLocalLimitBase
+
+  def getLimitAndOffset(plan: CollectLimitExec): (Int, Int) = (plan.limit, 0)
+
+  def createNativeCollectLimitExec(
+      limit: Int,
+      offset: Int,
+      child: SparkPlan): NativeCollectLimitBase
 
   def createNativeParquetInsertIntoHiveTableExec(
       cmd: InsertIntoHiveTable,
@@ -154,13 +163,16 @@ abstract class Shims {
       global: Boolean,
       child: SparkPlan): NativeSortBase
 
+  def getLimitAndOffset(plan: TakeOrderedAndProjectExec): (Int, Int) = 
(plan.limit, 0)
+
   def createNativeTakeOrderedExec(
-      limit: Long,
+      limit: Int,
+      offset: Int,
       sortOrder: Seq[SortOrder],
       child: SparkPlan): NativeTakeOrderedBase
 
   def createNativePartialTakeOrderedExec(
-      limit: Long,
+      limit: Int,
       sortOrder: Seq[SortOrder],
       child: SparkPlan,
       metrics: Map[String, SQLMetric]): NativePartialTakeOrderedBase
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeCollectLimitBase.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeCollectLimitBase.scala
index f8315ed1..d3366103 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeCollectLimitBase.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeCollectLimitBase.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.metric.{SQLMetric, 
SQLMetrics}
 import org.apache.auron.metric.SparkMetricNode
 import org.apache.auron.protobuf.{LimitExecNode, PhysicalPlanNode}
 
-abstract class NativeCollectLimitBase(limit: Int, override val child: 
SparkPlan)
+abstract class NativeCollectLimitBase(limit: Int, offset: Int, override val 
child: SparkPlan)
     extends UnaryExecNode
     with NativeSupports {
   override def output: Seq[Attribute] = child.output
@@ -51,7 +51,8 @@ abstract class NativeCollectLimitBase(limit: Int, override 
val child: SparkPlan)
       val row = it.next().copy()
       buf += row
     }
-    buf.toArray
+    val rows = buf.toArray
+    if (offset > 0) rows.drop(offset) else rows
   }
 
   override def doExecuteNative(): NativeRDD = {
@@ -78,6 +79,7 @@ abstract class NativeCollectLimitBase(limit: Int, override 
val child: SparkPlan)
           .newBuilder()
           .setInput(singlePartitionRDD.nativePlan(inputPartition, taskContext))
           .setLimit(limit)
+          .setOffset(offset)
           .build()
         PhysicalPlanNode.newBuilder().setLimit(nativeLimitExec).build()
       },
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitBase.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitBase.scala
index 83c1f7d8..e8c54d47 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitBase.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitBase.scala
@@ -35,7 +35,7 @@ import org.apache.auron.metric.SparkMetricNode
 import org.apache.auron.protobuf.LimitExecNode
 import org.apache.auron.protobuf.PhysicalPlanNode
 
-abstract class NativeGlobalLimitBase(limit: Long, override val child: 
SparkPlan)
+abstract class NativeGlobalLimitBase(limit: Int, offset: Int, override val 
child: SparkPlan)
     extends UnaryExecNode
     with NativeSupports {
 
@@ -67,6 +67,7 @@ abstract class NativeGlobalLimitBase(limit: Long, override 
val child: SparkPlan)
           .newBuilder()
           .setInput(inputRDD.nativePlan(inputPartition, taskContext))
           .setLimit(limit)
+          .setOffset(offset)
           .build()
         PhysicalPlanNode.newBuilder().setLimit(nativeLimitExec).build()
       },
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitBase.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitBase.scala
index f0ba9e3b..10186728 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitBase.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitBase.scala
@@ -33,7 +33,7 @@ import org.apache.auron.metric.SparkMetricNode
 import org.apache.auron.protobuf.LimitExecNode
 import org.apache.auron.protobuf.PhysicalPlanNode
 
-abstract class NativeLocalLimitBase(limit: Long, override val child: SparkPlan)
+abstract class NativeLocalLimitBase(limit: Int, override val child: SparkPlan)
     extends UnaryExecNode
     with NativeSupports {
 
diff --git 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedBase.scala
 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedBase.scala
index b3a5b7fe..dd7ed2f0 100644
--- 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedBase.scala
+++ 
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedBase.scala
@@ -47,7 +47,8 @@ import org.apache.auron.protobuf.PhysicalSortExprNode
 import org.apache.auron.protobuf.SortExecNode
 
 abstract class NativeTakeOrderedBase(
-    limit: Long,
+    limit: Int,
+    offset: Int,
     sortOrder: Seq[SortOrder],
     override val child: SparkPlan)
     extends UnaryExecNode
@@ -81,35 +82,37 @@ abstract class NativeTakeOrderedBase(
     val ord = new LazilyGeneratedOrdering(sortOrder, output)
 
     // all partitions are sorted, so perform a sorted-merge to achieve the result
-    partial
-      .execute()
-      .map(_.copy())
-      .mapPartitions(iter => Iterator.single(iter.toArray))
-      .reduce { case (array1, array2) =>
-        val result = ArrayBuffer[InternalRow]()
-        var i = 0
-        var j = 0
-
-        while (result.length < limit && (i < array1.length || j < array2.length)) {
-          0 match {
-            case _ if i == array1.length =>
-              result.append(array2(j))
-              j += 1
-            case _ if j == array2.length =>
-              result.append(array1(i))
-              i += 1
-            case _ =>
-              if (ord.compare(array1(i), array2(j)) <= 0) {
-                result.append(array1(i))
-                i += 1
-              } else {
+    val rows =
+      partial
+        .execute()
+        .map(_.copy())
+        .mapPartitions(iter => Iterator.single(iter.toArray))
+        .reduce { case (array1, array2) =>
+          val result = ArrayBuffer[InternalRow]()
+          var i = 0
+          var j = 0
+
+          while (result.length < limit && (i < array1.length || j < array2.length)) {
+            0 match {
+              case _ if i == array1.length =>
                 result.append(array2(j))
                 j += 1
-              }
+              case _ if j == array2.length =>
+                result.append(array1(i))
+                i += 1
+              case _ =>
+                if (ord.compare(array1(i), array2(j)) <= 0) {
+                  result.append(array1(i))
+                  i += 1
+                } else {
+                  result.append(array2(j))
+                  j += 1
+                }
+            }
           }
+          result.toArray
         }
-        result.toArray
-      }
+    if (offset > 0) rows.drop(offset) else rows
   }
 
   // check whether native converting is supported
@@ -141,7 +144,7 @@ abstract class NativeTakeOrderedBase(
           .newBuilder()
           .setInput(shuffledRDD.nativePlan(inputPartition, taskContext))
           .addAllExpr(nativeSortExprs.asJava)
-          .setFetchLimit(FetchLimit.newBuilder().setLimit(limit))
+          .setFetchLimit(FetchLimit.newBuilder().setLimit(limit).setOffset(offset))
           .build()
         PhysicalPlanNode.newBuilder().setSort(nativeTakeOrderedExec).build()
       },
@@ -150,7 +153,7 @@ abstract class NativeTakeOrderedBase(
 }
 
 abstract class NativePartialTakeOrderedBase(
-    limit: Long,
+    limit: Int,
     sortOrder: Seq[SortOrder],
     override val child: SparkPlan,
     override val metrics: Map[String, SQLMetric])

Reply via email to