This is an automated email from the ASF dual-hosted git repository.
richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new 37509c71 [AURON #1739] Support LIMIT with OFFSET (#1740)
37509c71 is described below
commit 37509c7144941b5cebb726abf9178202fc50fab8
Author: yew1eb <[email protected]>
AuthorDate: Wed Jan 28 00:08:28 2026 +0800
[AURON #1739] Support LIMIT with OFFSET (#1740)
<!--
- Start the PR title with the related issue ID, e.g. '[AURON #XXXX]
Short summary...'.
-->
# Which issue does this PR close?
Closes #1739
# Rationale for this change
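# What changes are included in this PR?
Per the diffstat below, this touches the native plan definition and planner
(auron.proto, planner.rs), the native limit and sort operators (limit_exec.rs,
sort_exec.rs), the Spark-side shims, converters and Native*Limit* /
NativeTakeOrdered* plan nodes, and AuronExecSuite. The Spark 3.5 TPC-DS
plan-stability golden files are regenerated because NativeTakeOrdered now
prints one additional argument (presumably the offset) before the sort order.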
# Are there any user-facing changes?
# How was this patch tested?
---
.../tpcds-plan-stability/spark-3.5/q1.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q10.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q14a.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q14b.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q15.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q18.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q21.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q22.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q23b.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q26.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q27.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q3.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q30.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q33.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q35.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q36.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q40.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q41.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q42.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q43.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q44.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q45.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q47.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q49.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q5.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q50.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q52.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q55.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q56.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q57.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q59.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q6.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q60.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q62.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q66.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q67.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q69.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q7.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q70.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q72.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q76.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q77.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q78.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q79.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q8.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q80.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q81.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q82.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q83.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q84.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q85.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q86.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q93.txt | 2 +-
.../tpcds-plan-stability/spark-3.5/q99.txt | 2 +-
.../comparison/PlanStabilityChecker.scala | 1 +
native-engine/auron-planner/proto/auron.proto | 6 +-
native-engine/auron-planner/src/planner.rs | 22 +++--
.../datafusion-ext-plans/src/limit_exec.rs | 99 +++++++++++++++++++---
.../datafusion-ext-plans/src/sort_exec.rs | 85 +++++++++++++++++--
.../org/apache/spark/sql/auron/ShimsImpl.scala | 46 ++++++----
.../auron/plan/NativeCollectLimitExec.scala | 4 +-
.../auron/plan/NativeGlobalLimitExec.scala | 4 +-
.../auron/plan/NativeLocalLimitExec.scala | 2 +-
.../auron/plan/NativePartialTakeOrderedExec.scala | 2 +-
.../auron/plan/NativeTakeOrderedExec.scala | 5 +-
.../org/apache/auron/exec/AuronExecSuite.scala | 95 ++++++++++++++++++++-
.../apache/spark/sql/auron/AuronConverters.scala | 12 ++-
.../scala/org/apache/spark/sql/auron/Shims.scala | 26 ++++--
.../auron/plan/NativeCollectLimitBase.scala | 6 +-
.../auron/plan/NativeGlobalLimitBase.scala | 3 +-
.../auron/plan/NativeLocalLimitBase.scala | 2 +-
.../auron/plan/NativeTakeOrderedBase.scala | 59 +++++++------
72 files changed, 436 insertions(+), 151 deletions(-)
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q1.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q1.txt
index da2e4839..23544dbc 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q1.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q1.txt
@@ -566,7 +566,7 @@ Input [3]: [ctr_customer_sk#11, #30#30, #31#31]
(96) NativeTakeOrdered
Input [1]: [c_customer_id#31]
-Arguments: X, [c_customer_id#31 ASC NULLS FIRST]
+Arguments: X, X, [c_customer_id#31 ASC NULLS FIRST]
(97) Scan parquet
Output [4]: [sr_returned_date_sk#1, sr_customer_sk#2, sr_store_sk#3,
sr_return_amt#4]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q10.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q10.txt
index 65b0b56c..2e2154e8 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q10.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q10.txt
@@ -687,7 +687,7 @@ Input [9]: [cd_gender#20, cd_marital_status#21,
cd_education_status#22, cd_purch
(118) NativeTakeOrdered
Input [14]: [cd_gender#20, cd_marital_status#21, cd_education_status#22,
cnt1#31, cd_purchase_estimate#23, cnt2#32, cd_credit_rating#24, cnt3#33,
cd_dep_count#25, cnt4#34, cd_dep_employed_count#26, cnt5#35,
cd_dep_college_count#27, cnt6#36]
-Arguments: X, [cd_gender#20 ASC NULLS FIRST, cd_marital_status#21 ASC NULLS
FIRST, cd_education_status#22 ASC NULLS FIRST, cd_purchase_estimate#23 ASC
NULLS FIRST, cd_credit_rating#24 ASC NULLS FIRST, cd_dep_count#25 ASC NULLS
FIRST, cd_dep_employed_count#26 ASC NULLS FIRST, cd_dep_college_count#27 ASC
NULLS FIRST]
+Arguments: X, X, [cd_gender#20 ASC NULLS FIRST, cd_marital_status#21 ASC NULLS
FIRST, cd_education_status#22 ASC NULLS FIRST, cd_purchase_estimate#23 ASC
NULLS FIRST, cd_credit_rating#24 ASC NULLS FIRST, cd_dep_count#25 ASC NULLS
FIRST, cd_dep_employed_count#26 ASC NULLS FIRST, cd_dep_college_count#27 ASC
NULLS FIRST]
(119) Scan parquet
Output [3]: [c_customer_sk#3, c_current_cdemo_sk#4, c_current_addr_sk#5]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q14a.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q14a.txt
index 15c94b96..eae71c8d 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q14a.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q14a.txt
@@ -2161,7 +2161,7 @@ Input [7]: [channel#97, i_brand_id#98, i_class_id#99,
i_category_id#100, spark_g
(309) NativeTakeOrdered
Input [6]: [channel#97, i_brand_id#98, i_class_id#99, i_category_id#100,
sum(sales)#107, sum(number_sales)#108]
-Arguments: X, [channel#97 ASC NULLS FIRST, i_brand_id#98 ASC NULLS FIRST,
i_class_id#99 ASC NULLS FIRST, i_category_id#100 ASC NULLS FIRST]
+Arguments: X, X, [channel#97 ASC NULLS FIRST, i_brand_id#98 ASC NULLS FIRST,
i_class_id#99 ASC NULLS FIRST, i_category_id#100 ASC NULLS FIRST]
(310) Scan parquet
Output [4]: [ss_sold_date_sk#1, ss_item_sk#2, ss_quantity#3, ss_list_price#4]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q14b.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q14b.txt
index 6b891118..2ebf9c13 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q14b.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q14b.txt
@@ -1539,7 +1539,7 @@ Join condition: None
(223) NativeTakeOrdered
Input [12]: [channel#51, i_brand_id#36, i_class_id#37, i_category_id#38,
sales#52, number_sales#53, channel#72, i_brand_id#59, i_class_id#60,
i_category_id#61, sales#73, number_sales#74]
-Arguments: X, [i_brand_id#36 ASC NULLS FIRST, i_class_id#37 ASC NULLS FIRST,
i_category_id#38 ASC NULLS FIRST]
+Arguments: X, X, [i_brand_id#36 ASC NULLS FIRST, i_class_id#37 ASC NULLS
FIRST, i_category_id#38 ASC NULLS FIRST]
(224) NativeProject
Output [12]: [channel#51, i_brand_id#36, i_class_id#37, i_category_id#38,
sales#52, number_sales#53, channel#72, i_brand_id#59, i_class_id#60,
i_category_id#61, sales#73, number_sales#74]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q15.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q15.txt
index 832bc5fa..36e1a750 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q15.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q15.txt
@@ -344,7 +344,7 @@ Input [2]: [ca_zip#8,
sum(UnscaledValue(cs_sales_price#3))#15]
(59) NativeTakeOrdered
Input [2]: [ca_zip#8, sum(cs_sales_price)#16]
-Arguments: X, [ca_zip#8 ASC NULLS FIRST]
+Arguments: X, X, [ca_zip#8 ASC NULLS FIRST]
(60) Scan parquet
Output [3]: [cs_sold_date_sk#1, cs_bill_customer_sk#2, cs_sales_price#3]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q18.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q18.txt
index fa49d904..44c7f90b 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q18.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q18.txt
@@ -616,7 +616,7 @@ Input [12]: [i_item_id#28, ca_country#29, ca_state#30,
ca_county#31, spark_group
(106) NativeTakeOrdered
Input [11]: [i_item_id#28, ca_country#29, ca_state#30, ca_county#31, agg1#63,
agg2#64, agg3#65, agg4#66, agg5#67, agg6#68, agg7#69]
-Arguments: X, [ca_country#29 DESC NULLS LAST, ca_state#30 DESC NULLS LAST,
ca_county#31 DESC NULLS LAST, i_item_id#28 ASC NULLS FIRST]
+Arguments: X, X, [ca_country#29 DESC NULLS LAST, ca_state#30 DESC NULLS LAST,
ca_county#31 DESC NULLS LAST, i_item_id#28 ASC NULLS FIRST]
(107) Scan parquet
Output [9]: [cs_sold_date_sk#1, cs_bill_customer_sk#2, cs_bill_cdemo_sk#3,
cs_item_sk#4, cs_quantity#5, cs_list_price#6, cs_sales_price#7,
cs_coupon_amt#8, cs_net_profit#9]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q21.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q21.txt
index e2a1a086..ce1844ba 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q21.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q21.txt
@@ -346,7 +346,7 @@ Condition : (CASE WHEN (inv_before#19 > 0) THEN
((cast(inv_after#20 as double) /
(59) NativeTakeOrdered
Input [4]: [w_warehouse_name#6, i_item_id#8, inv_before#19, inv_after#20]
-Arguments: X, [w_warehouse_name#6 ASC NULLS FIRST, i_item_id#8 ASC NULLS FIRST]
+Arguments: X, X, [w_warehouse_name#6 ASC NULLS FIRST, i_item_id#8 ASC NULLS
FIRST]
(60) Scan parquet
Output [4]: [inv_date_sk#1, inv_item_sk#2, inv_warehouse_sk#3,
inv_quantity_on_hand#4]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q22.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q22.txt
index 69747fa7..3dd14c22 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q22.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q22.txt
@@ -341,7 +341,7 @@ Input [6]: [i_product_name#13, i_brand#14, i_class#15,
i_category#16, spark_grou
(58) NativeTakeOrdered
Input [5]: [i_product_name#13, i_brand#14, i_class#15, i_category#16, qoh#23]
-Arguments: X, [qoh#23 ASC NULLS FIRST, i_product_name#13 ASC NULLS FIRST,
i_brand#14 ASC NULLS FIRST, i_class#15 ASC NULLS FIRST, i_category#16 ASC NULLS
FIRST]
+Arguments: X, X, [qoh#23 ASC NULLS FIRST, i_product_name#13 ASC NULLS FIRST,
i_brand#14 ASC NULLS FIRST, i_class#15 ASC NULLS FIRST, i_category#16 ASC NULLS
FIRST]
(59) Scan parquet
Output [4]: [inv_date_sk#1, inv_item_sk#2, inv_warehouse_sk#3,
inv_quantity_on_hand#4]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q23b.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q23b.txt
index e28da843..6b2e09d6 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q23b.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q23b.txt
@@ -1480,7 +1480,7 @@ Arguments: [#33, #32, #42]
(248) NativeTakeOrdered
Input [3]: [#33#33, #32#32, #42#42]
-Arguments: X, [c_last_name#33 ASC NULLS FIRST, c_first_name#32 ASC NULLS
FIRST, sales#42 ASC NULLS FIRST]
+Arguments: X, X, [c_last_name#33 ASC NULLS FIRST, c_first_name#32 ASC NULLS
FIRST, sales#42 ASC NULLS FIRST]
(249) Scan parquet
Output [5]: [cs_sold_date_sk#1, cs_bill_customer_sk#2, cs_item_sk#3,
cs_quantity#4, cs_list_price#5]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q26.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q26.txt
index 49eb29bd..cde99228 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q26.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q26.txt
@@ -438,7 +438,7 @@ Input [5]: [i_item_id#16, avg(cs_quantity#5)#34,
avg(UnscaledValue(cs_list_price
(75) NativeTakeOrdered
Input [5]: [i_item_id#16, agg1#38, agg2#39, agg3#40, agg4#41]
-Arguments: X, [i_item_id#16 ASC NULLS FIRST]
+Arguments: X, X, [i_item_id#16 ASC NULLS FIRST]
(76) Scan parquet
Output [8]: [cs_sold_date_sk#1, cs_bill_cdemo_sk#2, cs_item_sk#3,
cs_promo_sk#4, cs_quantity#5, cs_list_price#6, cs_sales_price#7,
cs_coupon_amt#8]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q27.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q27.txt
index 3f33d987..ecd5cd0c 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q27.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q27.txt
@@ -438,7 +438,7 @@ Input [7]: [i_item_id#19, s_state#20, spark_grouping_id#21,
avg(ss_quantity#5)#3
(75) NativeTakeOrdered
Input [7]: [i_item_id#19, s_state#20, g_state#40, agg1#41, agg2#42, agg3#43,
agg4#44]
-Arguments: X, [i_item_id#19 ASC NULLS FIRST, s_state#20 ASC NULLS FIRST]
+Arguments: X, X, [i_item_id#19 ASC NULLS FIRST, s_state#20 ASC NULLS FIRST]
(76) Scan parquet
Output [8]: [ss_sold_date_sk#1, ss_item_sk#2, ss_cdemo_sk#3, ss_store_sk#4,
ss_quantity#5, ss_list_price#6, ss_sales_price#7, ss_coupon_amt#8]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q3.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q3.txt
index 085e6b27..dc1f8dee 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q3.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q3.txt
@@ -260,7 +260,7 @@ Input [4]: [d_year#2, i_brand#9, i_brand_id#8,
sum(UnscaledValue(ss_ext_sales_pr
(44) NativeTakeOrdered
Input [4]: [d_year#2, brand_id#15, brand#16, sum_agg#17]
-Arguments: X, [d_year#2 ASC NULLS FIRST, sum_agg#17 DESC NULLS LAST,
brand_id#15 ASC NULLS FIRST]
+Arguments: X, X, [d_year#2 ASC NULLS FIRST, sum_agg#17 DESC NULLS LAST,
brand_id#15 ASC NULLS FIRST]
(45) Scan parquet
Output [3]: [d_date_sk#1, d_year#2, d_moy#3]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q30.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q30.txt
index 903eec46..53ab1e28 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q30.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q30.txt
@@ -750,7 +750,7 @@ Input [15]: [ctr_total_return#15, c_customer_id#33,
c_current_addr_sk#34, c_salu
(129) NativeTakeOrdered
Input [13]: [c_customer_id#33, c_salutation#35, c_first_name#36,
c_last_name#37, c_preferred_cust_flag#38, c_birth_day#39, c_birth_month#40,
c_birth_year#41, c_birth_country#42, c_login#43, c_email_address#44,
c_last_review_date_sk#45, ctr_total_return#15]
-Arguments: X, [c_customer_id#33 ASC NULLS FIRST, c_salutation#35 ASC NULLS
FIRST, c_first_name#36 ASC NULLS FIRST, c_last_name#37 ASC NULLS FIRST,
c_preferred_cust_flag#38 ASC NULLS FIRST, c_birth_day#39 ASC NULLS FIRST,
c_birth_month#40 ASC NULLS FIRST, c_birth_year#41 ASC NULLS FIRST,
c_birth_country#42 ASC NULLS FIRST, c_login#43 ASC NULLS FIRST,
c_email_address#44 ASC NULLS FIRST, c_last_review_date_sk#45 ASC NULLS FIRST,
ctr_total_return#15 ASC NULLS FIRST]
+Arguments: X, X, [c_customer_id#33 ASC NULLS FIRST, c_salutation#35 ASC NULLS
FIRST, c_first_name#36 ASC NULLS FIRST, c_last_name#37 ASC NULLS FIRST,
c_preferred_cust_flag#38 ASC NULLS FIRST, c_birth_day#39 ASC NULLS FIRST,
c_birth_month#40 ASC NULLS FIRST, c_birth_year#41 ASC NULLS FIRST,
c_birth_country#42 ASC NULLS FIRST, c_login#43 ASC NULLS FIRST,
c_email_address#44 ASC NULLS FIRST, c_last_review_date_sk#45 ASC NULLS FIRST,
ctr_total_return#15 ASC NULLS FIRST]
(130) Scan parquet
Output [4]: [wr_returned_date_sk#1, wr_returning_customer_sk#2,
wr_returning_addr_sk#3, wr_return_amt#4]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q33.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q33.txt
index b69d9cf1..abf23aea 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q33.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q33.txt
@@ -1073,7 +1073,7 @@ Input [2]: [i_manufact_id#11, sum(total_sales#18)#46]
(183) NativeTakeOrdered
Input [2]: [i_manufact_id#11, total_sales#47]
-Arguments: X, [total_sales#47 ASC NULLS FIRST]
+Arguments: X, X, [total_sales#47 ASC NULLS FIRST]
(184) Scan parquet
Output [4]: [ss_sold_date_sk#1, ss_item_sk#2, ss_addr_sk#3,
ss_ext_sales_price#4]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q35.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q35.txt
index 47e8cd88..a78dc6fb 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q35.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q35.txt
@@ -682,7 +682,7 @@ Input [16]: [ca_state#18, cd_gender#20,
cd_marital_status#21, cd_dep_count#22, c
(117) NativeTakeOrdered
Input [18]: [ca_state#18, cd_gender#20, cd_marital_status#21, cnt1#50,
min(cd_dep_count)#51, max(cd_dep_count)#52, avg(cd_dep_count)#53,
cd_dep_employed_count#23, cnt2#54, min(cd_dep_employed_count)#55,
max(cd_dep_employed_count)#56, avg(cd_dep_employed_count)#57,
cd_dep_college_count#24, cnt3#58, min(cd_dep_college_count)#59,
max(cd_dep_college_count)#60, avg(cd_dep_college_count)#61, cd_dep_count#22]
-Arguments: X, [ca_state#18 ASC NULLS FIRST, cd_gender#20 ASC NULLS FIRST,
cd_marital_status#21 ASC NULLS FIRST, cd_dep_count#22 ASC NULLS FIRST,
cd_dep_employed_count#23 ASC NULLS FIRST, cd_dep_college_count#24 ASC NULLS
FIRST]
+Arguments: X, X, [ca_state#18 ASC NULLS FIRST, cd_gender#20 ASC NULLS FIRST,
cd_marital_status#21 ASC NULLS FIRST, cd_dep_count#22 ASC NULLS FIRST,
cd_dep_employed_count#23 ASC NULLS FIRST, cd_dep_college_count#24 ASC NULLS
FIRST]
(118) NativeProject
Output [17]: [ca_state#18, cd_gender#20, cd_marital_status#21, cnt1#50,
min(cd_dep_count)#51, max(cd_dep_count)#52, avg(cd_dep_count)#53,
cd_dep_employed_count#23, cnt2#54, min(cd_dep_employed_count)#55,
max(cd_dep_employed_count)#56, avg(cd_dep_employed_count)#57,
cd_dep_college_count#24, cnt3#58, min(cd_dep_college_count)#59,
max(cd_dep_college_count)#60, avg(cd_dep_college_count)#61]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q36.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q36.txt
index 23324918..e894fb79 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q36.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q36.txt
@@ -390,7 +390,7 @@ Input [8]: [gross_margin#23, i_category#13, i_class#14,
lochierarchy#24, _w0#25,
(67) NativeTakeOrdered
Input [5]: [gross_margin#23, i_category#13, i_class#14, lochierarchy#24,
rank_within_parent#28]
-Arguments: X, [lochierarchy#24 DESC NULLS LAST, CASE WHEN (lochierarchy#24 =
0) THEN i_category#13 END ASC NULLS FIRST, rank_within_parent#28 ASC NULLS
FIRST]
+Arguments: X, X, [lochierarchy#24 DESC NULLS LAST, CASE WHEN (lochierarchy#24
= 0) THEN i_category#13 END ASC NULLS FIRST, rank_within_parent#28 ASC NULLS
FIRST]
(68) Scan parquet
Output [5]: [ss_sold_date_sk#1, ss_item_sk#2, ss_store_sk#3,
ss_ext_sales_price#4, ss_net_profit#5]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q40.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q40.txt
index ddac6fb5..e2c67ceb 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q40.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q40.txt
@@ -426,7 +426,7 @@ Input [4]: [w_state#10, i_item_id#12, sum(CASE WHEN
(d_date#15 < 2000-03-11) THE
(73) NativeTakeOrdered
Input [4]: [w_state#10, i_item_id#12, sales_before#25, sales_after#26]
-Arguments: X, [w_state#10 ASC NULLS FIRST, i_item_id#12 ASC NULLS FIRST]
+Arguments: X, X, [w_state#10 ASC NULLS FIRST, i_item_id#12 ASC NULLS FIRST]
(74) Scan parquet
Output [5]: [cs_sold_date_sk#1, cs_warehouse_sk#2, cs_item_sk#3,
cs_order_number#4, cs_sales_price#5]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q41.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q41.txt
index c35acf49..7ca464d7 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q41.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q41.txt
@@ -204,7 +204,7 @@ Input [1]: [i_product_name#3]
(33) NativeTakeOrdered
Input [1]: [i_product_name#3]
-Arguments: X, [i_product_name#3 ASC NULLS FIRST]
+Arguments: X, X, [i_product_name#3 ASC NULLS FIRST]
(34) Scan parquet
Output [3]: [i_manufact_id#1, i_manufact#2, i_product_name#3]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q42.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q42.txt
index 05eaecba..42ce1050 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q42.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q42.txt
@@ -260,7 +260,7 @@ Input [4]: [d_year#2, i_category_id#8, i_category#9,
sum(UnscaledValue(ss_ext_sa
(44) NativeTakeOrdered
Input [4]: [d_year#2, i_category_id#8, i_category#9,
sum(ss_ext_sales_price)#15]
-Arguments: X, [sum(ss_ext_sales_price)#15 DESC NULLS LAST, d_year#2 ASC NULLS
FIRST, i_category_id#8 ASC NULLS FIRST, i_category#9 ASC NULLS FIRST]
+Arguments: X, X, [sum(ss_ext_sales_price)#15 DESC NULLS LAST, d_year#2 ASC
NULLS FIRST, i_category_id#8 ASC NULLS FIRST, i_category#9 ASC NULLS FIRST]
(45) Scan parquet
Output [3]: [d_date_sk#1, d_year#2, d_moy#3]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q43.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q43.txt
index 62b02fb1..78812804 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q43.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q43.txt
@@ -260,7 +260,7 @@ Input [9]: [s_store_name#9, s_store_id#8,
sum(UnscaledValue(CASE WHEN (d_day_nam
(44) NativeTakeOrdered
Input [9]: [s_store_name#9, s_store_id#8, sun_sales#33, mon_sales#34,
tue_sales#35, wed_sales#36, thu_sales#37, fri_sales#38, sat_sales#39]
-Arguments: X, [s_store_name#9 ASC NULLS FIRST, s_store_id#8 ASC NULLS FIRST,
sun_sales#33 ASC NULLS FIRST, mon_sales#34 ASC NULLS FIRST, tue_sales#35 ASC
NULLS FIRST, wed_sales#36 ASC NULLS FIRST, thu_sales#37 ASC NULLS FIRST,
fri_sales#38 ASC NULLS FIRST, sat_sales#39 ASC NULLS FIRST]
+Arguments: X, X, [s_store_name#9 ASC NULLS FIRST, s_store_id#8 ASC NULLS
FIRST, sun_sales#33 ASC NULLS FIRST, mon_sales#34 ASC NULLS FIRST, tue_sales#35
ASC NULLS FIRST, wed_sales#36 ASC NULLS FIRST, thu_sales#37 ASC NULLS FIRST,
fri_sales#38 ASC NULLS FIRST, sat_sales#39 ASC NULLS FIRST]
(45) Scan parquet
Output [3]: [d_date_sk#1, d_year#2, d_day_name#3]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q44.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q44.txt
index 8a11de5e..78f88221 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q44.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q44.txt
@@ -426,7 +426,7 @@ Input [5]: [rnk#16, item_sk#23, i_product_name#29, #30#30,
#31#31]
(72) NativeTakeOrdered
Input [3]: [rnk#16, best_performing#32, worst_performing#33]
-Arguments: X, [rnk#16 ASC NULLS FIRST]
+Arguments: X, X, [rnk#16 ASC NULLS FIRST]
(73) Scan parquet
Output [3]: [ss_item_sk#1, ss_store_sk#2, ss_net_profit#3]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q45.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q45.txt
index 56cbefa1..28242970 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q45.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q45.txt
@@ -524,7 +524,7 @@ Input [3]: [ca_zip#10, ca_city#9,
sum(UnscaledValue(ws_sales_price#5))#21]
(90) NativeTakeOrdered
Input [3]: [ca_zip#10, ca_city#9, sum(ws_sales_price)#22]
-Arguments: X, [ca_zip#10 ASC NULLS FIRST, ca_city#9 ASC NULLS FIRST]
+Arguments: X, X, [ca_zip#10 ASC NULLS FIRST, ca_city#9 ASC NULLS FIRST]
(91) Scan parquet
Output [4]: [ws_sold_date_sk#2, ws_item_sk#3, ws_bill_customer_sk#4,
ws_sales_price#5]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q47.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q47.txt
index 28a13858..8f643b80 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q47.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q47.txt
@@ -674,7 +674,7 @@ Input [16]: [i_category#3, i_brand#2, s_store_name#12,
s_company_name#13, d_year
(109) NativeTakeOrdered
Input [10]: [i_category#3, i_brand#2, s_store_name#12, s_company_name#13,
d_year#9, d_moy#10, avg_monthly_sales#21, sum_sales#18, psum#41, nsum#42]
-Arguments: X, [(sum_sales#18 - avg_monthly_sales#21) ASC NULLS FIRST,
s_store_name#12 ASC NULLS FIRST]
+Arguments: X, X, [(sum_sales#18 - avg_monthly_sales#21) ASC NULLS FIRST,
s_store_name#12 ASC NULLS FIRST]
(110) Scan parquet
Output [3]: [i_item_sk#1, i_brand#2, i_category#3]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q49.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q49.txt
index 75d831e3..e852b2b2 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q49.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q49.txt
@@ -920,7 +920,7 @@ Input [5]: [channel#35, item#30, return_ratio#31,
return_rank#33, currency_rank#
(157) NativeTakeOrdered
Input [5]: [channel#35, item#30, return_ratio#31, return_rank#33,
currency_rank#34]
-Arguments: X, [channel#35 ASC NULLS FIRST, return_rank#33 ASC NULLS FIRST,
currency_rank#34 ASC NULLS FIRST, item#30 ASC NULLS FIRST]
+Arguments: X, X, [channel#35 ASC NULLS FIRST, return_rank#33 ASC NULLS FIRST,
currency_rank#34 ASC NULLS FIRST, item#30 ASC NULLS FIRST]
(158) Scan parquet
Output [6]: [ws_sold_date_sk#1, ws_item_sk#2, ws_order_number#3,
ws_quantity#4, ws_net_paid#5, ws_net_profit#6]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q5.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q5.txt
index 12e94f13..4aadbc9d 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q5.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q5.txt
@@ -973,7 +973,7 @@ Input [6]: [channel#127, id#128, spark_grouping_id#129,
sum(sales#38)#136, sum(r
(166) NativeTakeOrdered
Input [5]: [channel#127, id#128, sales#139, returns#140, profit#141]
-Arguments: X, [channel#127 ASC NULLS FIRST, id#128 ASC NULLS FIRST]
+Arguments: X, X, [channel#127 ASC NULLS FIRST, id#128 ASC NULLS FIRST]
(167) Scan parquet
Output [4]: [ss_sold_date_sk#1, ss_store_sk#2, ss_ext_sales_price#3,
ss_net_profit#4]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q50.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q50.txt
index 2e1a6058..b93b243a 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q50.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q50.txt
@@ -426,7 +426,7 @@ Input [15]: [s_store_name#11, s_company_id#12,
s_street_number#13, s_street_name
(73) NativeTakeOrdered
Input [15]: [s_store_name#11, s_company_id#12, s_street_number#13,
s_street_name#14, s_street_type#15, s_suite_number#16, s_city#17, s_county#18,
s_state#19, s_zip#20, 30 days #41, 31 - 60 days #42, 61 - 90 days #43, 91 - 120
days #44, >120 days #45]
-Arguments: X, [s_store_name#11 ASC NULLS FIRST, s_company_id#12 ASC NULLS
FIRST, s_street_number#13 ASC NULLS FIRST, s_street_name#14 ASC NULLS FIRST,
s_street_type#15 ASC NULLS FIRST, s_suite_number#16 ASC NULLS FIRST, s_city#17
ASC NULLS FIRST, s_county#18 ASC NULLS FIRST, s_state#19 ASC NULLS FIRST,
s_zip#20 ASC NULLS FIRST]
+Arguments: X, X, [s_store_name#11 ASC NULLS FIRST, s_company_id#12 ASC NULLS
FIRST, s_street_number#13 ASC NULLS FIRST, s_street_name#14 ASC NULLS FIRST,
s_street_type#15 ASC NULLS FIRST, s_suite_number#16 ASC NULLS FIRST, s_city#17
ASC NULLS FIRST, s_county#18 ASC NULLS FIRST, s_state#19 ASC NULLS FIRST,
s_zip#20 ASC NULLS FIRST]
(74) Scan parquet
Output [5]: [ss_sold_date_sk#1, ss_item_sk#2, ss_customer_sk#3, ss_store_sk#4,
ss_ticket_number#5]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q52.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q52.txt
index 2e5625ad..2615c3ba 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q52.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q52.txt
@@ -260,7 +260,7 @@ Input [4]: [d_year#2, i_brand#9, i_brand_id#8,
sum(UnscaledValue(ss_ext_sales_pr
(44) NativeTakeOrdered
Input [4]: [d_year#2, brand_id#15, brand#16, ext_price#17]
-Arguments: X, [d_year#2 ASC NULLS FIRST, ext_price#17 DESC NULLS LAST,
brand_id#15 ASC NULLS FIRST]
+Arguments: X, X, [d_year#2 ASC NULLS FIRST, ext_price#17 DESC NULLS LAST,
brand_id#15 ASC NULLS FIRST]
(45) Scan parquet
Output [3]: [d_date_sk#1, d_year#2, d_moy#3]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q55.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q55.txt
index bc1a58f2..2da4331f 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q55.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q55.txt
@@ -260,7 +260,7 @@ Input [3]: [i_brand#9, i_brand_id#8,
sum(UnscaledValue(ss_ext_sales_price#6))#14
(44) NativeTakeOrdered
Input [3]: [brand_id#15, brand#16, ext_price#17]
-Arguments: X, [ext_price#17 DESC NULLS LAST, brand_id#15 ASC NULLS FIRST]
+Arguments: X, X, [ext_price#17 DESC NULLS LAST, brand_id#15 ASC NULLS FIRST]
(45) Scan parquet
Output [3]: [d_date_sk#1, d_year#2, d_moy#3]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q56.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q56.txt
index 885ce41e..e1aec1a9 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q56.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q56.txt
@@ -1073,7 +1073,7 @@ Input [2]: [i_item_id#11, sum(total_sales#18)#46]
(183) NativeTakeOrdered
Input [2]: [i_item_id#11, total_sales#47]
-Arguments: X, [total_sales#47 ASC NULLS FIRST, i_item_id#11 ASC NULLS FIRST]
+Arguments: X, X, [total_sales#47 ASC NULLS FIRST, i_item_id#11 ASC NULLS FIRST]
(184) Scan parquet
Output [4]: [ss_sold_date_sk#1, ss_item_sk#2, ss_addr_sk#3,
ss_ext_sales_price#4]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q57.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q57.txt
index 3ee7d5b0..31b9e645 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q57.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q57.txt
@@ -674,7 +674,7 @@ Input [14]: [i_category#3, i_brand#2, cc_name#12, d_year#9,
d_moy#10, sum_sales#
(109) NativeTakeOrdered
Input [9]: [i_category#3, i_brand#2, cc_name#12, d_year#9, d_moy#10,
avg_monthly_sales#20, sum_sales#17, psum#38, nsum#39]
-Arguments: X, [(sum_sales#17 - avg_monthly_sales#20) ASC NULLS FIRST,
cc_name#12 ASC NULLS FIRST]
+Arguments: X, X, [(sum_sales#17 - avg_monthly_sales#20) ASC NULLS FIRST,
cc_name#12 ASC NULLS FIRST]
(110) Scan parquet
Output [3]: [i_item_sk#1, i_brand#2, i_category#3]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q59.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q59.txt
index 06fef646..af91f9dd 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q59.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q59.txt
@@ -601,7 +601,7 @@ Input [19]: [s_store_name1#41, d_week_seq1#42,
s_store_id1#43, sun_sales1#44, mo
(102) NativeTakeOrdered
Input [10]: [s_store_name1#41, s_store_id1#43, d_week_seq1#42, (sun_sales1 /
sun_sales2)#73, (mon_sales1 / mon_sales2)#74, (tue_sales1 / tue_sales2)#75,
(wed_sales1 / wed_sales2)#76, (thu_sales1 / thu_sales2)#77, (fri_sales1 /
fri_sales2)#78, (sat_sales1 / sat_sales2)#79]
-Arguments: X, [s_store_name1#41 ASC NULLS FIRST, s_store_id1#43 ASC NULLS
FIRST, d_week_seq1#42 ASC NULLS FIRST]
+Arguments: X, X, [s_store_name1#41 ASC NULLS FIRST, s_store_id1#43 ASC NULLS
FIRST, d_week_seq1#42 ASC NULLS FIRST]
(103) Scan parquet
Output [3]: [ss_sold_date_sk#1, ss_store_sk#2, ss_sales_price#3]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q6.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q6.txt
index 16995838..94c71db2 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q6.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q6.txt
@@ -552,7 +552,7 @@ Condition : (cnt#27 >= 10)
(94) NativeTakeOrdered
Input [3]: [state#26, cnt#27, ca_state#2]
-Arguments: X, [cnt#27 ASC NULLS FIRST, ca_state#2 ASC NULLS FIRST]
+Arguments: X, X, [cnt#27 ASC NULLS FIRST, ca_state#2 ASC NULLS FIRST]
(95) NativeProject
Output [2]: [state#26, cnt#27]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q60.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q60.txt
index 52698607..1c0bbd80 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q60.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q60.txt
@@ -1073,7 +1073,7 @@ Input [2]: [i_item_id#11, sum(total_sales#18)#46]
(183) NativeTakeOrdered
Input [2]: [i_item_id#11, total_sales#47]
-Arguments: X, [i_item_id#11 ASC NULLS FIRST, total_sales#47 ASC NULLS FIRST]
+Arguments: X, X, [i_item_id#11 ASC NULLS FIRST, total_sales#47 ASC NULLS FIRST]
(184) Scan parquet
Output [4]: [ss_sold_date_sk#1, ss_item_sk#2, ss_addr_sk#3,
ss_ext_sales_price#4]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q62.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q62.txt
index 76af56eb..b5907e28 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q62.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q62.txt
@@ -426,7 +426,7 @@ Input [8]: [_groupingexpression#14, sm_type#9, web_name#11,
sum(CASE WHEN ((ws_s
(73) NativeTakeOrdered
Input [8]: [substr(w_warehouse_name, 1, 20)#31, sm_type#9, web_name#11, 30
days #32, 31 - 60 days #33, 61 - 90 days #34, 91 - 120 days #35, >120 days #36]
-Arguments: X, [substr(w_warehouse_name, 1, 20)#31 ASC NULLS FIRST, sm_type#9
ASC NULLS FIRST, web_name#11 ASC NULLS FIRST]
+Arguments: X, X, [substr(w_warehouse_name, 1, 20)#31 ASC NULLS FIRST,
sm_type#9 ASC NULLS FIRST, web_name#11 ASC NULLS FIRST]
(74) Scan parquet
Output [5]: [ws_sold_date_sk#1, ws_ship_date_sk#2, ws_web_site_sk#3,
ws_ship_mode_sk#4, ws_warehouse_sk#5]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q66.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q66.txt
index 9494adb9..6fa135c5 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q66.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q66.txt
@@ -848,7 +848,7 @@ Input [44]: [w_warehouse_name#9, w_warehouse_sq_ft#10,
w_city#11, w_county#12, w
(147) NativeTakeOrdered
Input [44]: [w_warehouse_name#9, w_warehouse_sq_ft#10, w_city#11, w_county#12,
w_state#13, w_country#14, ship_carriers#119, year#120, jan_sales#430,
feb_sales#431, mar_sales#432, apr_sales#433, may_sales#434, jun_sales#435,
jul_sales#436, aug_sales#437, sep_sales#438, oct_sales#439, nov_sales#440,
dec_sales#441, jan_sales_per_sq_foot#442, feb_sales_per_sq_foot#443,
mar_sales_per_sq_foot#444, apr_sales_per_sq_foot#445,
may_sales_per_sq_foot#446, jun_sales_per_sq_foot#447, jul_sales_per_sq [...]
-Arguments: X, [w_warehouse_name#9 ASC NULLS FIRST]
+Arguments: X, X, [w_warehouse_name#9 ASC NULLS FIRST]
(148) Scan parquet
Output [7]: [ws_sold_date_sk#1, ws_sold_time_sk#2, ws_ship_mode_sk#3,
ws_warehouse_sk#4, ws_quantity#5, ws_ext_sales_price#6, ws_net_paid#7]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q67.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q67.txt
index 403ecb39..780d38e4 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q67.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q67.txt
@@ -402,7 +402,7 @@ Condition : (rk#35 <= 100)
(69) NativeTakeOrdered
Input [10]: [i_category#18, i_class#19, i_brand#20, i_product_name#21,
d_year#22, d_qoy#23, d_moy#24, s_store_id#25, sumsales#32, rk#35]
-Arguments: X, [i_category#18 ASC NULLS FIRST, i_class#19 ASC NULLS FIRST,
i_brand#20 ASC NULLS FIRST, i_product_name#21 ASC NULLS FIRST, d_year#22 ASC
NULLS FIRST, d_qoy#23 ASC NULLS FIRST, d_moy#24 ASC NULLS FIRST, s_store_id#25
ASC NULLS FIRST, sumsales#32 ASC NULLS FIRST, rk#35 ASC NULLS FIRST]
+Arguments: X, X, [i_category#18 ASC NULLS FIRST, i_class#19 ASC NULLS FIRST,
i_brand#20 ASC NULLS FIRST, i_product_name#21 ASC NULLS FIRST, d_year#22 ASC
NULLS FIRST, d_qoy#23 ASC NULLS FIRST, d_moy#24 ASC NULLS FIRST, s_store_id#25
ASC NULLS FIRST, sumsales#32 ASC NULLS FIRST, rk#35 ASC NULLS FIRST]
(70) Scan parquet
Output [5]: [ss_sold_date_sk#1, ss_item_sk#2, ss_store_sk#3, ss_quantity#4,
ss_sales_price#5]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q69.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q69.txt
index ea41e4da..94a56b72 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q69.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q69.txt
@@ -681,7 +681,7 @@ Input [6]: [cd_gender#18, cd_marital_status#19,
cd_education_status#20, cd_purch
(117) NativeTakeOrdered
Input [8]: [cd_gender#18, cd_marital_status#19, cd_education_status#20,
cnt1#26, cd_purchase_estimate#21, cnt2#27, cd_credit_rating#22, cnt3#28]
-Arguments: X, [cd_gender#18 ASC NULLS FIRST, cd_marital_status#19 ASC NULLS
FIRST, cd_education_status#20 ASC NULLS FIRST, cd_purchase_estimate#21 ASC
NULLS FIRST, cd_credit_rating#22 ASC NULLS FIRST]
+Arguments: X, X, [cd_gender#18 ASC NULLS FIRST, cd_marital_status#19 ASC NULLS
FIRST, cd_education_status#20 ASC NULLS FIRST, cd_purchase_estimate#21 ASC
NULLS FIRST, cd_credit_rating#22 ASC NULLS FIRST]
(118) Scan parquet
Output [3]: [c_customer_sk#1, c_current_cdemo_sk#2, c_current_addr_sk#3]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q7.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q7.txt
index 7152bb2c..7d0c06f8 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q7.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q7.txt
@@ -438,7 +438,7 @@ Input [5]: [i_item_id#16, avg(ss_quantity#5)#34,
avg(UnscaledValue(ss_list_price
(75) NativeTakeOrdered
Input [5]: [i_item_id#16, agg1#38, agg2#39, agg3#40, agg4#41]
-Arguments: X, [i_item_id#16 ASC NULLS FIRST]
+Arguments: X, X, [i_item_id#16 ASC NULLS FIRST]
(76) Scan parquet
Output [8]: [ss_sold_date_sk#1, ss_item_sk#2, ss_cdemo_sk#3, ss_promo_sk#4,
ss_quantity#5, ss_list_price#6, ss_sales_price#7, ss_coupon_amt#8]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q70.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q70.txt
index 21c24af0..76084a31 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q70.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q70.txt
@@ -589,7 +589,7 @@ Input [8]: [total_sum#28, s_state#22, s_county#23,
lochierarchy#29, _w0#30, _w1#
(101) NativeTakeOrdered
Input [5]: [total_sum#28, s_state#22, s_county#23, lochierarchy#29,
rank_within_parent#33]
-Arguments: X, [lochierarchy#29 DESC NULLS LAST, CASE WHEN (lochierarchy#29 =
0) THEN s_state#22 END ASC NULLS FIRST, rank_within_parent#33 ASC NULLS FIRST]
+Arguments: X, X, [lochierarchy#29 DESC NULLS LAST, CASE WHEN (lochierarchy#29
= 0) THEN s_state#22 END ASC NULLS FIRST, rank_within_parent#33 ASC NULLS FIRST]
(102) Scan parquet
Output [3]: [ss_sold_date_sk#1, ss_store_sk#2, ss_net_profit#3]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q72.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q72.txt
index 537390c6..746601c5 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q72.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q72.txt
@@ -957,7 +957,7 @@ Input [4]: [i_item_desc#16, w_warehouse_name#14,
d_week_seq#23, count(1)#34]
(166) NativeTakeOrdered
Input [6]: [i_item_desc#16, w_warehouse_name#14, d_week_seq#23, no_promo#35,
promo#36, total_cnt#37]
-Arguments: X, [total_cnt#37 DESC NULLS LAST, i_item_desc#16 ASC NULLS FIRST,
w_warehouse_name#14 ASC NULLS FIRST, d_week_seq#23 ASC NULLS FIRST]
+Arguments: X, X, [total_cnt#37 DESC NULLS LAST, i_item_desc#16 ASC NULLS
FIRST, w_warehouse_name#14 ASC NULLS FIRST, d_week_seq#23 ASC NULLS FIRST]
(167) Scan parquet
Output [8]: [cs_sold_date_sk#1, cs_ship_date_sk#2, cs_bill_cdemo_sk#3,
cs_bill_hdemo_sk#4, cs_item_sk#5, cs_promo_sk#6, cs_order_number#7,
cs_quantity#8]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q76.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q76.txt
index 6ba794c5..c49c643d 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q76.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q76.txt
@@ -586,7 +586,7 @@ Input [7]: [channel#10, col_name#11, d_year#8, d_qoy#9,
i_category#6, count(1)#4
(102) NativeTakeOrdered
Input [7]: [channel#10, col_name#11, d_year#8, d_qoy#9, i_category#6,
sales_cnt#43, sales_amt#44]
-Arguments: X, [channel#10 ASC NULLS FIRST, col_name#11 ASC NULLS FIRST,
d_year#8 ASC NULLS FIRST, d_qoy#9 ASC NULLS FIRST, i_category#6 ASC NULLS FIRST]
+Arguments: X, X, [channel#10 ASC NULLS FIRST, col_name#11 ASC NULLS FIRST,
d_year#8 ASC NULLS FIRST, d_qoy#9 ASC NULLS FIRST, i_category#6 ASC NULLS FIRST]
(103) Scan parquet
Output [4]: [ss_sold_date_sk#1, ss_item_sk#2, ss_store_sk#3,
ss_ext_sales_price#4]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q77.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q77.txt
index a99931b9..3ac28684 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q77.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q77.txt
@@ -1227,7 +1227,7 @@ Input [6]: [channel#95, id#96, spark_grouping_id#97,
sum(sales#15)#105, sum(retu
(207) NativeTakeOrdered
Input [5]: [channel#95, id#96, sales#108, returns#109, profit#110]
-Arguments: X, [channel#95 ASC NULLS FIRST, id#96 ASC NULLS FIRST, sales#108
ASC NULLS FIRST]
+Arguments: X, X, [channel#95 ASC NULLS FIRST, id#96 ASC NULLS FIRST, sales#108
ASC NULLS FIRST]
(208) Scan parquet
Output [4]: [ss_sold_date_sk#1, ss_store_sk#2, ss_ext_sales_price#3,
ss_net_profit#4]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q78.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q78.txt
index 77e2735d..83737d60 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q78.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q78.txt
@@ -777,7 +777,7 @@ Input [15]: [ss_sold_year#22, ss_item_sk#2,
ss_customer_sk#3, ss_qty#23, ss_wc#2
(132) NativeTakeOrdered
Input [12]: [ratio#76, store_qty#77, store_wholesale_cost#78,
store_sales_price#79, other_chan_qty#80, other_chan_wholesale_cost#81,
other_chan_sales_price#82, ss_qty#23, ss_wc#24, ss_sp#25, ws_qty#48, cs_qty#73]
-Arguments: X, [ratio#76 ASC NULLS FIRST, ss_qty#23 DESC NULLS LAST, ss_wc#24
DESC NULLS LAST, ss_sp#25 DESC NULLS LAST, other_chan_qty#80 ASC NULLS FIRST,
other_chan_wholesale_cost#81 ASC NULLS FIRST, other_chan_sales_price#82 ASC
NULLS FIRST, round((cast(ss_qty#23 as double) / cast(coalesce((ws_qty#48 +
cs_qty#73), 1) as double)), 2) ASC NULLS FIRST]
+Arguments: X, X, [ratio#76 ASC NULLS FIRST, ss_qty#23 DESC NULLS LAST,
ss_wc#24 DESC NULLS LAST, ss_sp#25 DESC NULLS LAST, other_chan_qty#80 ASC NULLS
FIRST, other_chan_wholesale_cost#81 ASC NULLS FIRST, other_chan_sales_price#82
ASC NULLS FIRST, round((cast(ss_qty#23 as double) / cast(coalesce((ws_qty#48 +
cs_qty#73), 1) as double)), 2) ASC NULLS FIRST]
(133) NativeProject
Output [7]: [ratio#76, store_qty#77, store_wholesale_cost#78,
store_sales_price#79, other_chan_qty#80, other_chan_wholesale_cost#81,
other_chan_sales_price#82]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q79.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q79.txt
index c8d6794a..73af64b6 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q79.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q79.txt
@@ -439,7 +439,7 @@ Input [8]: [ss_ticket_number#6, ss_customer_sk#2,
s_city#14, amt#25, profit#26,
(75) NativeTakeOrdered
Input [7]: [c_last_name#29, c_first_name#28, substr(s_city, 1, 30)#30,
ss_ticket_number#6, amt#25, profit#26, s_city#14]
-Arguments: X, [c_last_name#29 ASC NULLS FIRST, c_first_name#28 ASC NULLS
FIRST, substr(s_city#14, 1, 30) ASC NULLS FIRST, ss_ticket_number#6 ASC NULLS
FIRST, profit#26 ASC NULLS FIRST]
+Arguments: X, X, [c_last_name#29 ASC NULLS FIRST, c_first_name#28 ASC NULLS
FIRST, substr(s_city#14, 1, 30) ASC NULLS FIRST, ss_ticket_number#6 ASC NULLS
FIRST, profit#26 ASC NULLS FIRST]
(76) NativeProject
Output [6]: [c_last_name#29, c_first_name#28, substr(s_city, 1, 30)#30,
ss_ticket_number#6, amt#25, profit#26]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q8.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q8.txt
index a4aa3c6b..225af5da 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q8.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q8.txt
@@ -615,7 +615,7 @@ Input [2]: [s_store_name#8,
sum(UnscaledValue(ss_net_profit#3))#23]
(105) NativeTakeOrdered
Input [2]: [s_store_name#8, sum(ss_net_profit)#24]
-Arguments: X, [s_store_name#8 ASC NULLS FIRST]
+Arguments: X, X, [s_store_name#8 ASC NULLS FIRST]
(106) Scan parquet
Output [3]: [ss_sold_date_sk#1, ss_store_sk#2, ss_net_profit#3]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q80.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q80.txt
index 5d1f214f..6f0affa2 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q80.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q80.txt
@@ -1501,7 +1501,7 @@ Input [6]: [channel#101, id#102, spark_grouping_id#103,
sum(sales#33)#110, sum(r
(260) NativeTakeOrdered
Input [5]: [channel#101, id#102, sales#113, returns#114, profit#115]
-Arguments: X, [channel#101 ASC NULLS FIRST, id#102 ASC NULLS FIRST]
+Arguments: X, X, [channel#101 ASC NULLS FIRST, id#102 ASC NULLS FIRST]
(261) Scan parquet
Output [7]: [ss_sold_date_sk#1, ss_item_sk#2, ss_store_sk#3, ss_promo_sk#4,
ss_ticket_number#5, ss_ext_sales_price#6, ss_net_profit#7]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q81.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q81.txt
index d59c9265..34ee7ca5 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q81.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q81.txt
@@ -744,7 +744,7 @@ Input [18]: [ctr_total_return#15, c_customer_id#33,
c_current_addr_sk#34, c_salu
(128) NativeTakeOrdered
Input [16]: [c_customer_id#33, c_salutation#35, c_first_name#36,
c_last_name#37, ca_street_number#39, ca_street_name#40, ca_street_type#41,
ca_suite_number#42, ca_city#43, ca_county#44, ca_state#45, ca_zip#46,
ca_country#47, ca_gmt_offset#48, ca_location_type#49, ctr_total_return#15]
-Arguments: X, [c_customer_id#33 ASC NULLS FIRST, c_salutation#35 ASC NULLS
FIRST, c_first_name#36 ASC NULLS FIRST, c_last_name#37 ASC NULLS FIRST,
ca_street_number#39 ASC NULLS FIRST, ca_street_name#40 ASC NULLS FIRST,
ca_street_type#41 ASC NULLS FIRST, ca_suite_number#42 ASC NULLS FIRST,
ca_city#43 ASC NULLS FIRST, ca_county#44 ASC NULLS FIRST, ca_state#45 ASC NULLS
FIRST, ca_zip#46 ASC NULLS FIRST, ca_country#47 ASC NULLS FIRST,
ca_gmt_offset#48 ASC NULLS FIRST, ca_location_type#49 ASC [...]
+Arguments: X, X, [c_customer_id#33 ASC NULLS FIRST, c_salutation#35 ASC NULLS
FIRST, c_first_name#36 ASC NULLS FIRST, c_last_name#37 ASC NULLS FIRST,
ca_street_number#39 ASC NULLS FIRST, ca_street_name#40 ASC NULLS FIRST,
ca_street_type#41 ASC NULLS FIRST, ca_suite_number#42 ASC NULLS FIRST,
ca_city#43 ASC NULLS FIRST, ca_county#44 ASC NULLS FIRST, ca_state#45 ASC NULLS
FIRST, ca_zip#46 ASC NULLS FIRST, ca_country#47 ASC NULLS FIRST,
ca_gmt_offset#48 ASC NULLS FIRST, ca_location_type#49 [...]
(129) Scan parquet
Output [4]: [cr_returned_date_sk#1, cr_returning_customer_sk#2,
cr_returning_addr_sk#3, cr_return_amt_inc_tax#4]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q82.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q82.txt
index d3cb3fe3..c15bf254 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q82.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q82.txt
@@ -347,7 +347,7 @@ Input [3]: [i_item_id#2, i_item_desc#3, i_current_price#4]
(59) NativeTakeOrdered
Input [3]: [i_item_id#2, i_item_desc#3, i_current_price#4]
-Arguments: X, [i_item_id#2 ASC NULLS FIRST]
+Arguments: X, X, [i_item_id#2 ASC NULLS FIRST]
(60) Scan parquet
Output [5]: [i_item_sk#1, i_item_id#2, i_item_desc#3, i_current_price#4,
i_manufact_id#5]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q83.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q83.txt
index 95bf1118..fe63cfc8 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q83.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q83.txt
@@ -905,7 +905,7 @@ Input [5]: [item_id#15, sr_item_qty#16, cr_item_qty#26,
item_id#35, wr_item_qty#
(150) NativeTakeOrdered
Input [8]: [item_id#15, sr_item_qty#16, sr_dev#37, cr_item_qty#26, cr_dev#38,
wr_item_qty#36, wr_dev#39, average#40]
-Arguments: X, [item_id#15 ASC NULLS FIRST, sr_item_qty#16 ASC NULLS FIRST]
+Arguments: X, X, [item_id#15 ASC NULLS FIRST, sr_item_qty#16 ASC NULLS FIRST]
(151) Scan parquet
Output [3]: [sr_returned_date_sk#1, sr_item_sk#2, sr_return_quantity#3]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q84.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q84.txt
index 7b435cee..552fa28c 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q84.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q84.txt
@@ -471,7 +471,7 @@ Input [5]: [c_customer_id#1, c_first_name#5, c_last_name#6,
cd_demo_sk#9, #15#15
(81) NativeTakeOrdered
Input [3]: [customer_id#16, customername#17, c_customer_id#1]
-Arguments: X, [c_customer_id#1 ASC NULLS FIRST]
+Arguments: X, X, [c_customer_id#1 ASC NULLS FIRST]
(82) NativeProject
Output [2]: [customer_id#16, customername#17]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q85.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q85.txt
index b027641d..8deea780 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q85.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q85.txt
@@ -698,7 +698,7 @@ Input [4]: [r_reason_desc#29, avg(ws_quantity#5)#41,
avg(UnscaledValue(wr_refund
(121) NativeTakeOrdered
Input [4]: [substr(r_reason_desc, 1, 20)#44, avg(ws_quantity)#45,
avg(wr_refunded_cash)#46, avg(wr_fee)#47]
-Arguments: X, [substr(r_reason_desc, 1, 20)#44 ASC NULLS FIRST,
avg(ws_quantity)#45 ASC NULLS FIRST, avg(wr_refunded_cash)#46 ASC NULLS FIRST,
avg(wr_fee)#47 ASC NULLS FIRST]
+Arguments: X, X, [substr(r_reason_desc, 1, 20)#44 ASC NULLS FIRST,
avg(ws_quantity)#45 ASC NULLS FIRST, avg(wr_refunded_cash)#46 ASC NULLS FIRST,
avg(wr_fee)#47 ASC NULLS FIRST]
(122) Scan parquet
Output [7]: [ws_sold_date_sk#1, ws_item_sk#2, ws_web_page_sk#3,
ws_order_number#4, ws_quantity#5, ws_sales_price#6, ws_net_profit#7]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q86.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q86.txt
index 4f73ab53..a97b8626 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q86.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q86.txt
@@ -298,7 +298,7 @@ Input [8]: [total_sum#16, i_category#9, i_class#10,
lochierarchy#17, _w0#18, _w1
(51) NativeTakeOrdered
Input [5]: [total_sum#16, i_category#9, i_class#10, lochierarchy#17,
rank_within_parent#21]
-Arguments: X, [lochierarchy#17 DESC NULLS LAST, CASE WHEN (lochierarchy#17 =
0) THEN i_category#9 END ASC NULLS FIRST, rank_within_parent#21 ASC NULLS FIRST]
+Arguments: X, X, [lochierarchy#17 DESC NULLS LAST, CASE WHEN (lochierarchy#17
= 0) THEN i_category#9 END ASC NULLS FIRST, rank_within_parent#21 ASC NULLS
FIRST]
(52) Scan parquet
Output [3]: [ws_sold_date_sk#1, ws_item_sk#2, ws_net_paid#3]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q93.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q93.txt
index bbf2f51f..970677c5 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q93.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q93.txt
@@ -242,7 +242,7 @@ Input [2]: [ss_customer_sk#2, sum(act_sales#12)#16]
(41) NativeTakeOrdered
Input [2]: [ss_customer_sk#2, sumsales#17]
-Arguments: X, [sumsales#17 ASC NULLS FIRST, ss_customer_sk#2 ASC NULLS FIRST]
+Arguments: X, X, [sumsales#17 ASC NULLS FIRST, ss_customer_sk#2 ASC NULLS
FIRST]
(42) Scan parquet
Output [5]: [ss_item_sk#1, ss_customer_sk#2, ss_ticket_number#3,
ss_quantity#4, ss_sales_price#5]
diff --git
a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q99.txt
b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q99.txt
index 406f3072..edceaec7 100644
--- a/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q99.txt
+++ b/dev/auron-it/src/main/resources/tpcds-plan-stability/spark-3.5/q99.txt
@@ -426,7 +426,7 @@ Input [8]: [_groupingexpression#14, sm_type#9, cc_name#11,
sum(CASE WHEN ((cs_sh
(73) NativeTakeOrdered
Input [8]: [substr(w_warehouse_name, 1, 20)#31, sm_type#9, cc_name#11, 30 days
#32, 31 - 60 days #33, 61 - 90 days #34, 91 - 120 days #35, >120 days #36]
-Arguments: X, [substr(w_warehouse_name, 1, 20)#31 ASC NULLS FIRST, sm_type#9
ASC NULLS FIRST, cc_name#11 ASC NULLS FIRST]
+Arguments: X, X, [substr(w_warehouse_name, 1, 20)#31 ASC NULLS FIRST,
sm_type#9 ASC NULLS FIRST, cc_name#11 ASC NULLS FIRST]
(74) Scan parquet
Output [5]: [cs_sold_date_sk#1, cs_ship_date_sk#2, cs_call_center_sk#3,
cs_ship_mode_sk#4, cs_warehouse_sk#5]
diff --git
a/dev/auron-it/src/main/scala/org/apache/auron/integration/comparison/PlanStabilityChecker.scala
b/dev/auron-it/src/main/scala/org/apache/auron/integration/comparison/PlanStabilityChecker.scala
index 3c7ecd53..89a205db 100644
---
a/dev/auron-it/src/main/scala/org/apache/auron/integration/comparison/PlanStabilityChecker.scala
+++
b/dev/auron-it/src/main/scala/org/apache/auron/integration/comparison/PlanStabilityChecker.scala
@@ -73,6 +73,7 @@ class PlanStabilityChecker(
|
|--- Actual ---
|$actualPlan
+ |----------------
|""".stripMargin)
false
}
diff --git a/native-engine/auron-planner/proto/auron.proto
b/native-engine/auron-planner/proto/auron.proto
index 99f8078b..2afe1cfa 100644
--- a/native-engine/auron-planner/proto/auron.proto
+++ b/native-engine/auron-planner/proto/auron.proto
@@ -635,7 +635,8 @@ message SortExecNode {
message FetchLimit {
// wrap into a message to make it optional
- uint64 limit = 1;
+ uint32 limit = 1;
+ uint32 offset = 2;
}
message PhysicalRepartition {
@@ -709,7 +710,8 @@ enum AggMode {
message LimitExecNode {
PhysicalPlanNode input = 1;
- uint64 limit = 2;
+ uint32 limit = 2;
+ uint32 offset = 3;
}
message FFIReaderExecNode {
diff --git a/native-engine/auron-planner/src/planner.rs
b/native-engine/auron-planner/src/planner.rs
index cfab99e1..5c79e571 100644
--- a/native-engine/auron-planner/src/planner.rs
+++ b/native-engine/auron-planner/src/planner.rs
@@ -357,12 +357,12 @@ impl PhysicalPlanner {
panic!("Failed to parse physical sort expressions:
{e}");
});
+ let fetch = sort.fetch_limit.as_ref();
+ let limit = fetch.map(|f| f.limit as usize);
+ let offset = fetch.map(|f| f.offset as usize).unwrap_or(0);
+
// always preserve partitioning
- Ok(Arc::new(SortExec::new(
- input,
- exprs,
- sort.fetch_limit.as_ref().map(|limit| limit.limit as
usize),
- )))
+ Ok(Arc::new(SortExec::new(input, exprs, limit, offset)))
}
PhysicalPlanType::BroadcastJoinBuildHashMap(bhm) => {
let input: Arc<dyn ExecutionPlan> =
convert_box_required!(self, bhm.input)?;
@@ -558,7 +558,11 @@ impl PhysicalPlanner {
}
PhysicalPlanType::Limit(limit) => {
let input: Arc<dyn ExecutionPlan> =
convert_box_required!(self, limit.input)?;
- Ok(Arc::new(LimitExec::new(input, limit.limit)))
+ Ok(Arc::new(LimitExec::new(
+ input,
+ limit.limit as usize,
+ limit.offset as usize,
+ )))
}
PhysicalPlanType::FfiReader(ffi_reader) => {
let schema = Arc::new(convert_required!(ffi_reader.schema)?);
@@ -571,7 +575,11 @@ impl PhysicalPlanner {
PhysicalPlanType::CoalesceBatches(coalesce_batches) => {
let input: Arc<dyn ExecutionPlan> =
convert_box_required!(self, coalesce_batches.input)?;
- Ok(Arc::new(LimitExec::new(input,
coalesce_batches.batch_size)))
+ Ok(Arc::new(LimitExec::new(
+ input,
+ coalesce_batches.batch_size as usize,
+ 0,
+ )))
}
PhysicalPlanType::Expand(expand) => {
let schema = Arc::new(convert_required!(expand.schema)?);
diff --git a/native-engine/datafusion-ext-plans/src/limit_exec.rs
b/native-engine/datafusion-ext-plans/src/limit_exec.rs
index dd1e40d9..dd45ed12 100644
--- a/native-engine/datafusion-ext-plans/src/limit_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/limit_exec.rs
@@ -41,16 +41,18 @@ use crate::common::execution_context::ExecutionContext;
#[derive(Debug)]
pub struct LimitExec {
input: Arc<dyn ExecutionPlan>,
- limit: u64,
+ limit: usize,
+ offset: usize,
pub metrics: ExecutionPlanMetricsSet,
props: OnceCell<PlanProperties>,
}
impl LimitExec {
- pub fn new(input: Arc<dyn ExecutionPlan>, limit: u64) -> Self {
+ pub fn new(input: Arc<dyn ExecutionPlan>, limit: usize, offset: usize) ->
Self {
Self {
input,
limit,
+ offset,
metrics: ExecutionPlanMetricsSet::new(),
props: OnceCell::new(),
}
@@ -59,7 +61,7 @@ impl LimitExec {
impl DisplayAs for LimitExec {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) ->
std::fmt::Result {
- write!(f, "LimitExec(limit={})", self.limit)
+ write!(f, "LimitExec(limit={},offset={})", self.limit, self.offset)
}
}
@@ -95,7 +97,11 @@ impl ExecutionPlan for LimitExec {
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- Ok(Arc::new(Self::new(children[0].clone(), self.limit)))
+ Ok(Arc::new(Self::new(
+ children[0].clone(),
+ self.limit,
+ self.offset,
+ )))
}
fn execute(
@@ -105,15 +111,19 @@ impl ExecutionPlan for LimitExec {
) -> Result<SendableRecordBatchStream> {
let exec_ctx = ExecutionContext::new(context, partition,
self.schema(), &self.metrics);
let input = exec_ctx.execute_with_input_stats(&self.input)?;
- execute_limit(input, self.limit, exec_ctx)
+ if self.offset == 0 {
+ execute_limit(input, self.limit, exec_ctx)
+ } else {
+ execute_limit_with_offset(input, self.limit, self.offset, exec_ctx)
+ }
}
fn statistics(&self) -> Result<Statistics> {
Statistics::with_fetch(
self.input.statistics()?,
self.schema(),
- Some(self.limit as usize),
- 0,
+ Some(self.limit),
+ self.offset,
1,
)
}
@@ -121,7 +131,7 @@ impl ExecutionPlan for LimitExec {
fn execute_limit(
mut input: SendableRecordBatchStream,
- limit: u64,
+ limit: usize,
exec_ctx: Arc<ExecutionContext>,
) -> Result<SendableRecordBatchStream> {
Ok(exec_ctx
@@ -131,11 +141,49 @@ fn execute_limit(
while remaining > 0
&& let Some(mut batch) = input.next().await.transpose()?
{
- if remaining < batch.num_rows() as u64 {
- batch = batch.slice(0, remaining as usize);
+ if remaining < batch.num_rows() {
+ batch = batch.slice(0, remaining);
+ remaining = 0;
+ } else {
+ remaining -= batch.num_rows();
+ }
+ exec_ctx.baseline_metrics().record_output(batch.num_rows());
+ sender.send(batch).await;
+ }
+ Ok(())
+ }))
+}
+
+fn execute_limit_with_offset(
+ mut input: SendableRecordBatchStream,
+ limit: usize,
+ offset: usize,
+ exec_ctx: Arc<ExecutionContext>,
+) -> Result<SendableRecordBatchStream> {
+ Ok(exec_ctx
+ .clone()
+ .output_with_sender("Limit", move |sender| async move {
+ let mut skip = offset;
+ let mut remaining = limit - skip;
+ while remaining > 0
+ && let Some(mut batch) = input.next().await.transpose()?
+ {
+ if skip > 0 {
+ let rows = batch.num_rows();
+ if skip >= rows {
+ skip -= rows;
+ continue;
+ }
+
+ batch = batch.slice(skip, rows - skip);
+ skip = 0;
+ }
+
+ if remaining < batch.num_rows() {
+ batch = batch.slice(0, remaining);
remaining = 0;
} else {
- remaining -= batch.num_rows() as u64;
+ remaining -= batch.num_rows();
}
exec_ctx.baseline_metrics().record_output(batch.num_rows());
sender.send(batch).await;
@@ -207,7 +255,7 @@ mod test {
("b", &vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
("c", &vec![5, 6, 7, 8, 9, 0, 1, 2, 3, 4]),
)?;
- let limit_exec = LimitExec::new(input, 2_u64);
+ let limit_exec = LimitExec::new(input, 2, 0);
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let output = limit_exec.execute(0, task_ctx)?;
@@ -226,4 +274,31 @@ mod test {
assert_eq!(row_count, Precision::Exact(2));
Ok(())
}
+
+    /// Runs `LimitExec` with limit = 7 and offset = 5 over a ten-row table and
+    /// expects exactly the two rows with `a = 5` and `a = 6` to come out.
+    #[tokio::test]
+    async fn test_limit_with_offset() -> Result<()> {
+        let table = build_table(
+            ("a", &vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
+            ("b", &vec![9, 8, 7, 6, 5, 4, 3, 2, 1, 0]),
+            ("c", &vec![5, 6, 7, 8, 9, 0, 1, 2, 3, 4]),
+        )?;
+        let plan = LimitExec::new(table, 7, 5);
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
+        let stream = plan.execute(0, task_ctx)?;
+        let batches = common::collect(stream).await?;
+
+        // Total number of rows across every emitted batch.
+        let total_rows = batches
+            .iter()
+            .fold(0usize, |acc, batch| acc + batch.num_rows());
+
+        let expected_rows = vec![
+            "+---+---+---+",
+            "| a | b | c |",
+            "+---+---+---+",
+            "| 5 | 4 | 0 |",
+            "| 6 | 3 | 1 |",
+            "+---+---+---+",
+        ];
+        assert_batches_eq!(expected_rows, &batches);
+        assert_eq!(total_rows, 2);
+        Ok(())
+    }
}
diff --git a/native-engine/datafusion-ext-plans/src/sort_exec.rs
b/native-engine/datafusion-ext-plans/src/sort_exec.rs
index 659eff36..a39487c1 100644
--- a/native-engine/datafusion-ext-plans/src/sort_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/sort_exec.rs
@@ -86,7 +86,8 @@ const NUM_MAX_MERGING_BATCHES: usize = 32;
pub struct SortExec {
input: Arc<dyn ExecutionPlan>,
exprs: Vec<PhysicalSortExpr>,
- fetch: Option<usize>,
+ limit: Option<usize>,
+ offset: usize,
metrics: ExecutionPlanMetricsSet,
record_output: bool,
props: OnceCell<PlanProperties>,
@@ -96,13 +97,15 @@ impl SortExec {
pub fn new(
input: Arc<dyn ExecutionPlan>,
exprs: Vec<PhysicalSortExpr>,
- fetch: Option<usize>,
+ limit: Option<usize>,
+ offset: usize,
) -> Self {
let metrics = ExecutionPlanMetricsSet::new();
Self {
input,
exprs,
- fetch,
+ limit,
+ offset,
metrics,
record_output: true,
props: OnceCell::new(),
@@ -130,6 +133,7 @@ pub fn create_default_ascending_sort_exec(
})
.collect(),
None,
+ 0,
);
if let Some(execution_plan_metrics) = execution_plan_metrics {
sort_exec.metrics = execution_plan_metrics;
@@ -185,7 +189,8 @@ impl ExecutionPlan for SortExec {
Ok(Arc::new(Self::new(
children[0].clone(),
self.exprs.clone(),
- self.fetch,
+ self.limit,
+ self.offset,
)))
}
@@ -203,7 +208,13 @@ impl ExecutionPlan for SortExec {
}
fn statistics(&self) -> Result<Statistics> {
- Statistics::with_fetch(self.input.statistics()?, self.schema(),
self.fetch, 0, 1)
+ Statistics::with_fetch(
+ self.input.statistics()?,
+ self.schema(),
+ self.limit,
+ self.offset,
+ 1,
+ )
}
}
@@ -223,7 +234,8 @@ impl SortExec {
mem_consumer_info: None,
weak: Weak::new(),
prune_sort_keys_from_batch: prune_sort_keys_from_batch.clone(),
- limit: self.fetch.unwrap_or(usize::MAX),
+ skip: self.offset,
+ limit: self.limit.unwrap_or(usize::MAX),
record_output: self.record_output,
in_mem_blocks: Default::default(),
spills: Default::default(),
@@ -336,6 +348,7 @@ struct ExternalSorter {
mem_consumer_info: Option<Weak<MemConsumerInfo>>,
weak: Weak<Self>,
prune_sort_keys_from_batch: Arc<PruneSortKeysFromBatch>,
+ skip: usize,
limit: usize,
record_output: bool,
in_mem_blocks: Arc<Mutex<Vec<InMemSortedBlock>>>,
@@ -704,6 +717,9 @@ impl ExternalSorter {
let in_mem_blocks = std::mem::take(&mut
*self.in_mem_blocks.lock());
if !in_mem_blocks.is_empty() {
let mut merger = Merger::try_new(self.clone(), in_mem_blocks)?;
+ if self.skip > 0 {
+ merger.skip_rows::<InMemRowsKeyCollector>(self.skip,
output_batch_size);
+ }
while let Some((key_collector, pruned_batch)) =
merger.next::<InMemRowsKeyCollector>(output_batch_size)?
{
@@ -727,6 +743,9 @@ impl ExternalSorter {
let spill_blocks = spills.into_iter().map(|spill|
spill.block).collect();
let mut merger = Merger::try_new(self.to_arc(), spill_blocks)?;
+ if self.skip > 0 {
+ merger.skip_rows::<InMemRowsKeyCollector>(self.skip,
output_batch_size);
+ }
while let Some((key_collector, pruned_batch)) =
merger.next::<InMemRowsKeyCollector>(output_batch_size)?
{
@@ -1023,6 +1042,22 @@ impl<B: SortedBlock> Merger<B> {
}
Ok(Some((key_collector, pruned_batch)))
}
+
+    /// Discards up to `skip` leading rows of the merged output.
+    ///
+    /// Rows are pulled through `next` in chunks of at most
+    /// `suggested_batch_size` and dropped; the loop stops early as soon as
+    /// `next` reports that no more data is available.
+    ///
+    /// Any error raised by `next` is returned to the caller, so call sites
+    /// should propagate the result (e.g. with `?`) instead of ignoring it.
+    ///
+    /// NOTE(review): the countdown subtracts the *requested* chunk size, i.e.
+    /// it assumes `next` yields exactly as many rows as asked for unless the
+    /// input is exhausted -- confirm against `next`.
+    pub fn skip_rows<KC: KeyCollector>(
+        &mut self,
+        skip: usize,
+        suggested_batch_size: usize,
+    ) -> Result<()> {
+        let mut rows_left = skip;
+        while rows_left != 0 {
+            let chunk = std::cmp::min(rows_left, suggested_batch_size);
+            match self.next::<KC>(chunk)? {
+                Some(_) => rows_left -= chunk,
+                None => break,
+            }
+        }
+        Ok(())
+    }
}
fn merge_blocks<B: SortedBlock, KC: KeyCollector>(
@@ -1472,7 +1507,7 @@ mod test {
options: SortOptions::default(),
}];
- let sort = SortExec::new(input, sort_exprs, Some(6));
+ let sort = SortExec::new(input, sort_exprs, Some(6), 0);
let output = sort.execute(0, task_ctx)?;
let batches = common::collect(output).await?;
let expected = vec![
@@ -1491,6 +1526,40 @@ mod test {
Ok(())
}
+
+    /// Sorts a ten-row table on column `a` with limit = 8 and offset = 3 and
+    /// expects the five rows `a = 3..=7` in the output.
+    #[tokio::test]
+    async fn test_sort_i32_with_skip() -> Result<()> {
+        MemManager::init(100);
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
+        let table = build_table(
+            ("a", &vec![9, 8, 7, 6, 5, 4, 3, 2, 1, 0]),
+            ("b", &vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
+            ("c", &vec![5, 6, 7, 8, 9, 0, 1, 2, 3, 4]),
+        )?;
+
+        // Single sort key: column `a` with the default sort options.
+        let order_by_a = vec![PhysicalSortExpr {
+            expr: Arc::new(Column::new("a", 0)),
+            options: SortOptions::default(),
+        }];
+
+        let sort_exec = SortExec::new(table, order_by_a, Some(8), 3);
+        let stream = sort_exec.execute(0, task_ctx)?;
+        let batches = common::collect(stream).await?;
+
+        let expected_rows = vec![
+            "+---+---+---+",
+            "| a | b | c |",
+            "+---+---+---+",
+            "| 3 | 6 | 1 |",
+            "| 4 | 5 | 0 |",
+            "| 5 | 4 | 9 |",
+            "| 6 | 3 | 8 |",
+            "| 7 | 2 | 7 |",
+            "+---+---+---+",
+        ];
+        assert_batches_eq!(expected_rows, &batches);
+
+        Ok(())
+    }
}
#[cfg(test)]
@@ -1585,7 +1654,7 @@ mod fuzztest {
schema.clone(),
None,
)?);
- let sort = Arc::new(SortExec::new(input, sort_exprs.clone(), None));
+ let sort = Arc::new(SortExec::new(input, sort_exprs.clone(), None, 0));
let output = datafusion::physical_plan::collect(sort.clone(),
task_ctx.clone()).await?;
let a = concat_batches(&schema, &output)?;
let a_row_count = sort.clone().statistics()?.num_rows;
diff --git
a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
index 3de651e1..839f5655 100644
---
a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
+++
b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
@@ -52,14 +52,7 @@ import
org.apache.spark.sql.catalyst.expressions.aggregate.First
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.execution.CoalescedPartitionSpec
-import org.apache.spark.sql.execution.FileSourceScanExec
-import org.apache.spark.sql.execution.PartialMapperPartitionSpec
-import org.apache.spark.sql.execution.PartialReducerPartitionSpec
-import org.apache.spark.sql.execution.ShuffledRowRDD
-import org.apache.spark.sql.execution.ShufflePartitionSpec
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.UnaryExecNode
+import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec,
BroadcastQueryStageExec, QueryStageExec, ShuffleQueryStageExec}
import org.apache.spark.sql.execution.auron.plan._
import org.apache.spark.sql.execution.auron.plan.ConvertToNativeExec
@@ -291,16 +284,38 @@ class ShimsImpl extends Shims with Logging {
child: SparkPlan): NativeGenerateBase =
NativeGenerateExec(generator, requiredChildOutput, outer, generatorOutput,
child)
- override def createNativeGlobalLimitExec(limit: Long, child: SparkPlan):
NativeGlobalLimitBase =
- NativeGlobalLimitExec(limit, child)
+  /**
+   * Normalizes a raw plan limit: the sentinel `-1` becomes `Int.MaxValue`, any other value is
+   * returned unchanged.
+   *
+   * NOTE(review): `-1` is presumably Spark's marker for "no limit" (offset-only plans) --
+   * confirm against the Spark limit operators.
+   */
+  private def effectiveLimit(rawLimit: Int): Int = rawLimit match {
+    case -1 => Int.MaxValue
+    case other => other
+  }
- override def createNativeLocalLimitExec(limit: Long, child: SparkPlan):
NativeLocalLimitBase =
+  /**
+   * Extracts `(limit, offset)` from a [[GlobalLimitExec]]; the limit is normalized through
+   * `effectiveLimit`. Compiled only for Spark 3.4 / 3.5.
+   */
+  @sparkver("3.4 / 3.5")
+  override def getLimitAndOffset(plan: GlobalLimitExec): (Int, Int) = {
+    val limit = effectiveLimit(plan.limit)
+    val offset = plan.offset
+    (limit, offset)
+  }
+
+  /**
+   * Extracts `(limit, offset)` from a [[TakeOrderedAndProjectExec]]; the limit is normalized
+   * through `effectiveLimit`. Compiled only for Spark 3.4 / 3.5.
+   */
+  @sparkver("3.4 / 3.5")
+  override def getLimitAndOffset(plan: TakeOrderedAndProjectExec): (Int, Int) = {
+    val limit = effectiveLimit(plan.limit)
+    val offset = plan.offset
+    (limit, offset)
+  }
+
+  /** Builds the shim-specific [[NativeGlobalLimitExec]] for `child` with the given limit/offset. */
+  override def createNativeGlobalLimitExec(
+      limit: Int,
+      offset: Int,
+      child: SparkPlan): NativeGlobalLimitBase = {
+    NativeGlobalLimitExec(limit = limit, offset = offset, child = child)
+  }
+
+ override def createNativeLocalLimitExec(limit: Int, child: SparkPlan):
NativeLocalLimitBase =
NativeLocalLimitExec(limit, child)
+  /**
+   * Extracts `(limit, offset)` from a [[CollectLimitExec]]; the limit is normalized through
+   * `effectiveLimit`. Compiled only for Spark 3.4 / 3.5.
+   */
+  @sparkver("3.4 / 3.5")
+  override def getLimitAndOffset(plan: CollectLimitExec): (Int, Int) = {
+    val limit = effectiveLimit(plan.limit)
+    val offset = plan.offset
+    (limit, offset)
+  }
+
override def createNativeCollectLimitExec(
limit: Int,
+ offset: Int,
child: SparkPlan): NativeCollectLimitBase =
- NativeCollectLimitExec(limit, child)
+ NativeCollectLimitExec(limit, offset, child)
override def createNativeParquetInsertIntoHiveTableExec(
cmd: InsertIntoHiveTable,
@@ -337,13 +352,14 @@ class ShimsImpl extends Shims with Logging {
NativeSortExec(sortOrder, global, child)
override def createNativeTakeOrderedExec(
- limit: Long,
+ limit: Int,
+ offset: Int,
sortOrder: Seq[SortOrder],
child: SparkPlan): NativeTakeOrderedBase =
- NativeTakeOrderedExec(limit, sortOrder, child)
+ NativeTakeOrderedExec(limit, offset, sortOrder, child)
override def createNativePartialTakeOrderedExec(
- limit: Long,
+ limit: Int,
sortOrder: Seq[SortOrder],
child: SparkPlan,
metrics: Map[String, SQLMetric]): NativePartialTakeOrderedBase =
diff --git
a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeCollectLimitExec.scala
b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeCollectLimitExec.scala
index ba514ab7..4ff7d804 100644
---
a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeCollectLimitExec.scala
+++
b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeCollectLimitExec.scala
@@ -20,8 +20,8 @@ import org.apache.spark.sql.execution.SparkPlan
import org.apache.auron.sparkver
-case class NativeCollectLimitExec(limit: Int, override val child: SparkPlan)
- extends NativeCollectLimitBase(limit, child) {
+case class NativeCollectLimitExec(limit: Int, offset: Int, override val child:
SparkPlan)
+ extends NativeCollectLimitBase(limit, offset, child) {
@sparkver("3.2 / 3.3 / 3.4 / 3.5")
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
diff --git
a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitExec.scala
b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitExec.scala
index 8aba2de4..1b493f43 100644
---
a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitExec.scala
+++
b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitExec.scala
@@ -20,8 +20,8 @@ import org.apache.spark.sql.execution.SparkPlan
import org.apache.auron.sparkver
-case class NativeGlobalLimitExec(limit: Long, override val child: SparkPlan)
- extends NativeGlobalLimitBase(limit, child) {
+case class NativeGlobalLimitExec(limit: Int, offset: Int, override val child:
SparkPlan)
+ extends NativeGlobalLimitBase(limit, offset, child) {
@sparkver("3.2 / 3.3 / 3.4 / 3.5")
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
diff --git
a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitExec.scala
b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitExec.scala
index ad74b02e..805408c8 100644
---
a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitExec.scala
+++
b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitExec.scala
@@ -20,7 +20,7 @@ import org.apache.spark.sql.execution.SparkPlan
import org.apache.auron.sparkver
-case class NativeLocalLimitExec(limit: Long, override val child: SparkPlan)
+case class NativeLocalLimitExec(limit: Int, override val child: SparkPlan)
extends NativeLocalLimitBase(limit, child) {
@sparkver("3.2 / 3.3 / 3.4 / 3.5")
diff --git
a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativePartialTakeOrderedExec.scala
b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativePartialTakeOrderedExec.scala
index e243e6f3..faf541a6 100644
---
a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativePartialTakeOrderedExec.scala
+++
b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativePartialTakeOrderedExec.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.auron.sparkver
case class NativePartialTakeOrderedExec(
- limit: Long,
+ limit: Int,
sortOrder: Seq[SortOrder],
override val child: SparkPlan,
override val metrics: Map[String, SQLMetric])
diff --git
a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedExec.scala
b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedExec.scala
index cec298b6..16548155 100644
---
a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedExec.scala
+++
b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedExec.scala
@@ -22,10 +22,11 @@ import org.apache.spark.sql.execution.SparkPlan
import org.apache.auron.sparkver
case class NativeTakeOrderedExec(
- limit: Long,
+ limit: Int,
+ offset: Int,
sortOrder: Seq[SortOrder],
override val child: SparkPlan)
- extends NativeTakeOrderedBase(limit, sortOrder, child) {
+ extends NativeTakeOrderedBase(limit, offset, sortOrder, child) {
@sparkver("3.2 / 3.3 / 3.4 / 3.5")
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
diff --git
a/spark-extension-shims-spark/src/test/scala/org/apache/auron/exec/AuronExecSuite.scala
b/spark-extension-shims-spark/src/test/scala/org/apache/auron/exec/AuronExecSuite.scala
index d7adf3a7..7f62dd52 100644
---
a/spark-extension-shims-spark/src/test/scala/org/apache/auron/exec/AuronExecSuite.scala
+++
b/spark-extension-shims-spark/src/test/scala/org/apache/auron/exec/AuronExecSuite.scala
@@ -17,13 +17,14 @@
package org.apache.auron.exec
import org.apache.spark.sql.AuronQueryTest
-import org.apache.spark.sql.execution.auron.plan.NativeCollectLimitExec
+import org.apache.spark.sql.execution.auron.plan.{NativeCollectLimitExec,
NativeGlobalLimitExec, NativeLocalLimitExec, NativeTakeOrderedExec}
import org.apache.auron.BaseAuronSQLSuite
+import org.apache.auron.util.AuronTestUtils
class AuronExecSuite extends AuronQueryTest with BaseAuronSQLSuite {
- test("Collect Limit") {
+ test("CollectLimit") {
withTable("t1") {
sql("create table t1(id INT) using parquet")
sql("insert into t1
values(1),(2),(3),(3),(3),(4),(5),(6),(7),(8),(9),(10)")
@@ -36,4 +37,94 @@ class AuronExecSuite extends AuronQueryTest with
BaseAuronSQLSuite {
}
}
+  // Runs `LIMIT n OFFSET m` queries through checkSparkAnswerAndOperator and asserts that the
+  // executed plan contains exactly one NativeCollectLimitExec. The body only runs when
+  // AuronTestUtils.isSparkV34OrGreater is true.
+  test("CollectLimit with offset") {
+    if (AuronTestUtils.isSparkV34OrGreater) {
+      // t1 is a persistent parquet table, so it has to be cleaned up with withTable;
+      // withTempView only drops temporary views and would leak the table (and the rows
+      // inserted here) into the tests that run afterwards.
+      withTable("t1") {
+        sql("create table if not exists t1(id INT) using parquet")
+        sql("insert into t1 values(1),(2),(3),(3),(3),(4),(5),(6),(7),(8),(9),(10)")
+        Seq((5, 0), (10, 0), (5, 2), (10, 2), (1, 5), (3, 8)).foreach { case (limit, offset) =>
+          val query = s"select * from t1 limit $limit offset $offset"
+          val df = checkSparkAnswerAndOperator(() => spark.sql(query))
+          val nativeLimits = collect(df.queryExecution.executedPlan) {
+            case e: NativeCollectLimitExec => e
+          }
+          assert(nativeLimits.size == 1)
+        }
+      }
+    }
+  }
+
+  // Runs a nested-limit query followed by an aggregation through checkSparkAnswerAndOperator
+  // and asserts that the executed plan contains at least two native global/local limit nodes.
+  test("GlobalLimit and LocalLimit") {
+    // t1 is a persistent parquet table, so it has to be cleaned up with withTable;
+    // withTempView only drops temporary views and would leak the table (and the rows
+    // inserted here) into the tests that run afterwards.
+    withTable("t1") {
+      sql("create table if not exists t1(id INT) using parquet")
+      sql("insert into t1 values(1),(2),(3),(3),(3),(4),(5),(6),(7),(8),(9),(10)")
+      val df = checkSparkAnswerAndOperator(() =>
+        spark
+          .sql(s"""
+              |select id from (
+              | select * from t1 limit 5
+              |) where id > 0 limit 10;
+              |""".stripMargin)
+          .groupBy("id")
+          .count())
+      val nativeLimits = collect(df.queryExecution.executedPlan) {
+        case e: NativeGlobalLimitExec => e
+        case e: NativeLocalLimitExec => e
+      }
+      assert(nativeLimits.size >= 2)
+    }
+  }
+
+  // Runs `LIMIT n OFFSET m` followed by an aggregation through checkSparkAnswerAndOperator and
+  // asserts that the executed plan contains exactly one NativeGlobalLimitExec. The body only
+  // runs when AuronTestUtils.isSparkV34OrGreater is true.
+  test("GlobalLimit with offset") {
+    if (AuronTestUtils.isSparkV34OrGreater) {
+      // t1 is a persistent parquet table, so it has to be cleaned up with withTable;
+      // withTempView only drops temporary views and would leak the table (and the rows
+      // inserted here) into the tests that run afterwards.
+      withTable("t1") {
+        sql("create table if not exists t1(id INT) using parquet")
+        sql("insert into t1 values(1),(2),(3),(3),(3),(4),(5),(6),(7),(8),(9),(10)")
+        Seq((5, 0), (10, 0), (5, 2), (10, 2), (1, 5), (3, 8)).foreach { case (limit, offset) =>
+          val query = s"select * from t1 limit $limit offset $offset"
+          val df = checkSparkAnswerAndOperator(() => spark.sql(query).groupBy("id").count())
+          val nativeLimits = collect(df.queryExecution.executedPlan) {
+            case e: NativeGlobalLimitExec => e
+          }
+          assert(nativeLimits.size == 1)
+        }
+      }
+    }
+  }
+
+  // Runs an `ORDER BY ... LIMIT` query followed by an aggregation through
+  // checkSparkAnswerAndOperator and asserts that the executed plan contains exactly one
+  // NativeTakeOrderedExec.
+  test("TakeOrderedAndProject") {
+    // t1 is a persistent parquet table, so it has to be cleaned up with withTable;
+    // withTempView only drops temporary views and would leak the table (and the rows
+    // inserted here) into the tests that run afterwards.
+    withTable("t1") {
+      sql("create table if not exists t1(id INT) using parquet")
+      sql("insert into t1 values(1),(2),(3),(3),(3),(4),(5),(6),(7),(8),(9),(10)")
+      val df = checkSparkAnswerAndOperator(() =>
+        spark
+          .sql(s"""
+              | select id from t1 order by id limit 5
+              |""".stripMargin)
+          .groupBy("id")
+          .count())
+      val nativeTakeOrdered = collect(df.queryExecution.executedPlan) {
+        case e: NativeTakeOrderedExec => e
+      }
+      assert(nativeTakeOrdered.size == 1)
+    }
+  }
+
+  // Runs `ORDER BY ... LIMIT n OFFSET m` followed by an aggregation through
+  // checkSparkAnswerAndOperator and asserts that the executed plan contains exactly one
+  // NativeTakeOrderedExec. The body only runs when AuronTestUtils.isSparkV34OrGreater is true.
+  test("TakeOrderedAndProject with offset") {
+    if (AuronTestUtils.isSparkV34OrGreater) {
+      // t1 is a persistent parquet table, so it has to be cleaned up with withTable;
+      // withTempView only drops temporary views and would leak the table (and the rows
+      // inserted here) into the tests that run afterwards.
+      withTable("t1") {
+        sql("create table if not exists t1(id INT) using parquet")
+        sql("insert into t1 values(1),(2),(3),(3),(3),(4),(5),(6),(7),(8),(9),(10)")
+        Seq((5, 0), (10, 0), (5, 2), (10, 2), (1, 5), (3, 8)).foreach { case (limit, offset) =>
+          val query = s"select * from t1 order by id limit $limit offset $offset"
+          val df = checkSparkAnswerAndOperator(() => spark.sql(query).groupBy("id").count())
+          val nativeTakeOrdered = collect(df.queryExecution.executedPlan) {
+            case e: NativeTakeOrderedExec => e
+          }
+          assert(nativeTakeOrdered.size == 1)
+        }
+      }
+    }
+  }
}
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
index 67f2c5b2..9b8bed91 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
@@ -732,18 +732,21 @@ object AuronConverters extends Logging {
def convertLocalLimitExec(exec: LocalLimitExec): SparkPlan = {
logDebugPlanConversion(exec)
- Shims.get.createNativeLocalLimitExec(exec.limit.toLong, exec.child)
+ Shims.get.createNativeLocalLimitExec(exec.limit, exec.child)
}
def convertGlobalLimitExec(exec: GlobalLimitExec): SparkPlan = {
logDebugPlanConversion(exec)
- Shims.get.createNativeGlobalLimitExec(exec.limit.toLong, exec.child)
+ val (limit, offset) = Shims.get.getLimitAndOffset(exec)
+ Shims.get.createNativeGlobalLimitExec(limit, offset, exec.child)
}
def convertTakeOrderedAndProjectExec(exec: TakeOrderedAndProjectExec):
SparkPlan = {
logDebugPlanConversion(exec)
+ val (limit, offset) = Shims.get.getLimitAndOffset(exec)
val nativeTakeOrdered = Shims.get.createNativeTakeOrderedExec(
- exec.limit,
+ limit,
+ offset,
exec.sortOrder,
addRenameColumnsExec(convertToNative(exec.child)))
@@ -757,7 +760,8 @@ object AuronConverters extends Logging {
def convertCollectLimitExec(exec: CollectLimitExec): SparkPlan = {
logDebugPlanConversion(exec)
- Shims.get.createNativeCollectLimitExec(exec.limit, exec.child)
+ val (limit, offset) = Shims.get.getLimitAndOffset(exec)
+ Shims.get.createNativeCollectLimitExec(limit, offset, exec.child)
}
def convertHashAggregateExec(exec: HashAggregateExec): SparkPlan = {
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala
index 2b221d36..85ab65d5 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala
@@ -40,8 +40,7 @@ import
org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.execution.FileSourceScanExec
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{CollectLimitExec, FileSourceScanExec,
GlobalLimitExec, SparkPlan, TakeOrderedAndProjectExec}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.auron.plan._
import org.apache.spark.sql.execution.auron.plan.NativeBroadcastJoinBase
@@ -122,11 +121,21 @@ abstract class Shims {
generatorOutput: Seq[Attribute],
child: SparkPlan): NativeGenerateBase
- def createNativeGlobalLimitExec(limit: Long, child: SparkPlan):
NativeGlobalLimitBase
+  /**
+   * Default `(limit, offset)` extraction for a [[GlobalLimitExec]]: the plan's limit paired
+   * with an offset of 0. Shim implementations may override this.
+   */
+  def getLimitAndOffset(plan: GlobalLimitExec): (Int, Int) = {
+    val noOffset = 0
+    (plan.limit, noOffset)
+  }
- def createNativeLocalLimitExec(limit: Long, child: SparkPlan):
NativeLocalLimitBase
+ /**
+  * Creates the native global-limit operator over `child`.
+  *
+  * @param limit  row limit passed to the native operator
+  * @param offset number of leading rows to skip
+  * @param child  input plan
+  */
+ def createNativeGlobalLimitExec(
+ limit: Int,
+ offset: Int,
+ child: SparkPlan): NativeGlobalLimitBase
- def createNativeCollectLimitExec(limit: Int, child: SparkPlan):
NativeCollectLimitBase
+ /** Creates the native local-limit operator over `child` with the given row limit. */
+ def createNativeLocalLimitExec(limit: Int, child: SparkPlan):
NativeLocalLimitBase
+
+  /**
+   * Default `(limit, offset)` extraction for a [[CollectLimitExec]]: the plan's limit paired
+   * with an offset of 0. Shim implementations may override this.
+   */
+  def getLimitAndOffset(plan: CollectLimitExec): (Int, Int) = {
+    val noOffset = 0
+    (plan.limit, noOffset)
+  }
+
+ /**
+  * Creates the native collect-limit operator over `child`.
+  *
+  * @param limit  row limit passed to the native operator
+  * @param offset number of leading rows to skip
+  * @param child  input plan
+  */
+ def createNativeCollectLimitExec(
+ limit: Int,
+ offset: Int,
+ child: SparkPlan): NativeCollectLimitBase
def createNativeParquetInsertIntoHiveTableExec(
cmd: InsertIntoHiveTable,
@@ -154,13 +163,16 @@ abstract class Shims {
global: Boolean,
child: SparkPlan): NativeSortBase
+  /**
+   * Default `(limit, offset)` extraction for a [[TakeOrderedAndProjectExec]]: the plan's limit
+   * paired with an offset of 0. Shim implementations may override this.
+   */
+  def getLimitAndOffset(plan: TakeOrderedAndProjectExec): (Int, Int) = {
+    val noOffset = 0
+    (plan.limit, noOffset)
+  }
+
def createNativeTakeOrderedExec(
- limit: Long,
+ limit: Int,
+ offset: Int,
sortOrder: Seq[SortOrder],
child: SparkPlan): NativeTakeOrderedBase
def createNativePartialTakeOrderedExec(
- limit: Long,
+ limit: Int,
sortOrder: Seq[SortOrder],
child: SparkPlan,
metrics: Map[String, SQLMetric]): NativePartialTakeOrderedBase
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeCollectLimitBase.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeCollectLimitBase.scala
index f8315ed1..d3366103 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeCollectLimitBase.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeCollectLimitBase.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.metric.{SQLMetric,
SQLMetrics}
import org.apache.auron.metric.SparkMetricNode
import org.apache.auron.protobuf.{LimitExecNode, PhysicalPlanNode}
-abstract class NativeCollectLimitBase(limit: Int, override val child:
SparkPlan)
+abstract class NativeCollectLimitBase(limit: Int, offset: Int, override val
child: SparkPlan)
extends UnaryExecNode
with NativeSupports {
override def output: Seq[Attribute] = child.output
@@ -51,7 +51,8 @@ abstract class NativeCollectLimitBase(limit: Int, override
val child: SparkPlan)
val row = it.next().copy()
buf += row
}
- buf.toArray
+ val rows = buf.toArray
+ if (offset > 0) rows.drop(offset) else rows
}
override def doExecuteNative(): NativeRDD = {
@@ -78,6 +79,7 @@ abstract class NativeCollectLimitBase(limit: Int, override
val child: SparkPlan)
.newBuilder()
.setInput(singlePartitionRDD.nativePlan(inputPartition, taskContext))
.setLimit(limit)
+ .setOffset(offset)
.build()
PhysicalPlanNode.newBuilder().setLimit(nativeLimitExec).build()
},
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitBase.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitBase.scala
index 83c1f7d8..e8c54d47 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitBase.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitBase.scala
@@ -35,7 +35,7 @@ import org.apache.auron.metric.SparkMetricNode
import org.apache.auron.protobuf.LimitExecNode
import org.apache.auron.protobuf.PhysicalPlanNode
-abstract class NativeGlobalLimitBase(limit: Long, override val child:
SparkPlan)
+abstract class NativeGlobalLimitBase(limit: Int, offset: Int, override val
child: SparkPlan)
extends UnaryExecNode
with NativeSupports {
@@ -67,6 +67,7 @@ abstract class NativeGlobalLimitBase(limit: Long, override
val child: SparkPlan)
.newBuilder()
.setInput(inputRDD.nativePlan(inputPartition, taskContext))
.setLimit(limit)
+ .setOffset(offset)
.build()
PhysicalPlanNode.newBuilder().setLimit(nativeLimitExec).build()
},
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitBase.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitBase.scala
index f0ba9e3b..10186728 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitBase.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitBase.scala
@@ -33,7 +33,7 @@ import org.apache.auron.metric.SparkMetricNode
import org.apache.auron.protobuf.LimitExecNode
import org.apache.auron.protobuf.PhysicalPlanNode
-abstract class NativeLocalLimitBase(limit: Long, override val child: SparkPlan)
+abstract class NativeLocalLimitBase(limit: Int, override val child: SparkPlan)
extends UnaryExecNode
with NativeSupports {
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedBase.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedBase.scala
index b3a5b7fe..dd7ed2f0 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedBase.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedBase.scala
@@ -47,7 +47,8 @@ import org.apache.auron.protobuf.PhysicalSortExprNode
import org.apache.auron.protobuf.SortExecNode
abstract class NativeTakeOrderedBase(
- limit: Long,
+ limit: Int,
+ offset: Int,
sortOrder: Seq[SortOrder],
override val child: SparkPlan)
extends UnaryExecNode
@@ -81,35 +82,37 @@ abstract class NativeTakeOrderedBase(
val ord = new LazilyGeneratedOrdering(sortOrder, output)
// all partitions are sorted, so perform a sorted-merge to achieve the
result
- partial
- .execute()
- .map(_.copy())
- .mapPartitions(iter => Iterator.single(iter.toArray))
- .reduce { case (array1, array2) =>
- val result = ArrayBuffer[InternalRow]()
- var i = 0
- var j = 0
-
- while (result.length < limit && (i < array1.length || j <
array2.length)) {
- 0 match {
- case _ if i == array1.length =>
- result.append(array2(j))
- j += 1
- case _ if j == array2.length =>
- result.append(array1(i))
- i += 1
- case _ =>
- if (ord.compare(array1(i), array2(j)) <= 0) {
- result.append(array1(i))
- i += 1
- } else {
+ val rows =
+ partial
+ .execute()
+ .map(_.copy())
+ .mapPartitions(iter => Iterator.single(iter.toArray))
+ .reduce { case (array1, array2) =>
+ val result = ArrayBuffer[InternalRow]()
+ var i = 0
+ var j = 0
+
+ while (result.length < limit && (i < array1.length || j <
array2.length)) {
+ 0 match {
+ case _ if i == array1.length =>
result.append(array2(j))
j += 1
- }
+ case _ if j == array2.length =>
+ result.append(array1(i))
+ i += 1
+ case _ =>
+ if (ord.compare(array1(i), array2(j)) <= 0) {
+ result.append(array1(i))
+ i += 1
+ } else {
+ result.append(array2(j))
+ j += 1
+ }
+ }
}
+ result.toArray
}
- result.toArray
- }
+ if (offset > 0) rows.drop(offset) else rows
}
// check whether native converting is supported
@@ -141,7 +144,7 @@ abstract class NativeTakeOrderedBase(
.newBuilder()
.setInput(shuffledRDD.nativePlan(inputPartition, taskContext))
.addAllExpr(nativeSortExprs.asJava)
- .setFetchLimit(FetchLimit.newBuilder().setLimit(limit))
+
.setFetchLimit(FetchLimit.newBuilder().setLimit(limit).setOffset(offset))
.build()
PhysicalPlanNode.newBuilder().setSort(nativeTakeOrderedExec).build()
},
@@ -150,7 +153,7 @@ abstract class NativeTakeOrderedBase(
}
abstract class NativePartialTakeOrderedBase(
- limit: Long,
+ limit: Int,
sortOrder: Seq[SortOrder],
override val child: SparkPlan,
override val metrics: Map[String, SQLMetric])