[ https://issues.apache.org/jira/browse/SPARK-44804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wan Kun updated SPARK-44804:
----------------------------
    Description: 
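Within each partition, SortMergeJoin processes the streamed side one row at a time, in order. Its output therefore keeps the streamed side's ordering, and SortMergeJoin could report that ordering in order to remove an unnecessary Sort.
For example, consider the query below when REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION (spark.sql.requireAllClusterKeysForCoPartition) is false. In that case the second join, on (a, b) = (d, e), can reuse the existing hash partitioning on a, so only t3 is shuffled (on d):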


{code:java}
SELECT *
FROM (
    SELECT t1.*, row_number() over(partition by a order by b) rn
    FROM values(1, 1) t1(a, b)
) t1
JOIN values(1) t2(c)
ON a = c
JOIN values(1, 1) t3(d, e)
ON a = d
AND b = e
{code}


Plan:

{code:java}
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [a#220, b#221], [d#223, e#224], Inner
   :- Sort [a#220 ASC NULLS FIRST, b#221 ASC NULLS FIRST], false, 0
   :  +- SortMergeJoin [a#220], [c#222], Inner
   :     :- Window [row_number() windowspecdefinition(a#220, b#221 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#218], [a#220], [b#221 ASC NULLS FIRST]
   :     :  +- Sort [a#220 ASC NULLS FIRST, b#221 ASC NULLS FIRST], false, 0
   :     :     +- Exchange hashpartitioning(a#220, 5), ENSURE_REQUIREMENTS, [plan_id=93]
   :     :        +- LocalTableScan [a#220, b#221]
   :     +- Sort [c#222 ASC NULLS FIRST], false, 0
   :        +- Exchange hashpartitioning(c#222, 5), ENSURE_REQUIREMENTS, [plan_id=98]
   :           +- LocalTableScan [c#222]
   +- Sort [d#223 ASC NULLS FIRST, e#224 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(d#223, 5), ENSURE_REQUIREMENTS, [plan_id=104]
         +- LocalTableScan [d#223, e#224]
{code}


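The second *Sort [a#220 ASC NULLS FIRST, b#221 ASC NULLS FIRST], false, 0* in execution order could be removed. This is the upper one, directly above the inner SortMergeJoin [a#220], [c#222]. The streamed side of that join is already sorted by a, b for the Window, so the join output is already ordered by a, b.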
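The effect described above can be illustrated with a toy, single-partition model in plain Python. It is a sketch under stated assumptions, not Spark's SortMergeJoinExec code. The names sort_merge_inner_join and ordering_satisfies are hypothetical. Keys are ascending ints with no nulls. An ordering is modeled as a list of column names, and a required ordering counts as satisfied when it is a prefix of the provided one, which simplifies the planner's real check.

```python
from typing import Callable, List, Sequence, Tuple

Row = Tuple[int, ...]


def sort_merge_inner_join(
    streamed: List[Row],
    buffered: List[Row],
    s_key: Callable[[Row], int],
    b_key: Callable[[Row], int],
) -> List[Row]:
    """Hypothetical toy inner sort-merge join over ONE partition.

    Both inputs must already be sorted ascending by their join key.
    The streamed side is consumed strictly in order, one row at a time,
    so output rows appear in the same order as their streamed rows.
    """
    out: List[Row] = []
    j = 0  # first buffered row whose key may still match
    for row in streamed:
        k = s_key(row)
        # Skip buffered rows with smaller keys: they can never match again
        # because streamed keys are non-decreasing.
        while j < len(buffered) and b_key(buffered[j]) < k:
            j += 1
        # Emit every buffered match for this streamed row. j itself is not
        # advanced, so a later streamed row with the same key sees the same matches.
        m = j
        while m < len(buffered) and b_key(buffered[m]) == k:
            out.append(row + buffered[m])
            m += 1
    return out


def ordering_satisfies(provided: Sequence[str], required: Sequence[str]) -> bool:
    """Hypothetical, simplified check: `required` is met if it is a prefix of `provided`."""
    return list(provided[: len(required)]) == list(required)


# t1 rows (a, b) in one partition, already sorted by (a, b) for the Window.
t1 = [(1, 1), (1, 2), (2, 1), (2, 3)]
# t2 rows (c,) sorted by c.
t2 = [(1,), (2,), (2,)]

joined = sort_merge_inner_join(t1, t2, s_key=lambda r: r[0], b_key=lambda r: r[0])
print(joined)
# [(1, 1, 1), (1, 2, 1), (2, 1, 2), (2, 1, 2), (2, 3, 2), (2, 3, 2)]

# The output is still sorted by (a, b), not just by the join key a.
print(joined == sorted(joined, key=lambda r: (r[0], r[1])))  # True

# Reporting only the join-key ordering cannot satisfy the outer join's
# requirement on (a, b); reporting the streamed side ordering can.
print(ordering_satisfies(["a"], ["a", "b"]))       # False
print(ordering_satisfies(["a", "b"], ["a", "b"]))  # True
```

In this model, reporting only the join key (a) as the output ordering leaves the outer join's (a, b) requirement unmet, so a Sort would be inserted. Propagating the streamed side's (a, b) ordering would make that Sort redundant.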

  was:
In each partition, SortMergeJoin processes the streamed side one row at a time, so we could respect the streamed side ordering to remove an unneeded sort.
For example, when REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION is false:


{code:java}
SELECT *
FROM (
    SELECT t1.*, row_number() over(partition by a order by b) rn
    FROM values(1, 1) t1(a, b)
) t1
JOIN values(1) t2(c)
ON a = c
JOIN values(1, 1) t3(d, e)
ON a = d
AND b = e
{code}


Plan:

{code:java}
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [a#220, b#221], [d#223, e#224], Inner
   :- Sort [a#220 ASC NULLS FIRST, b#221 ASC NULLS FIRST], false, 0
   :  +- SortMergeJoin [a#220], [c#222], Inner
   :     :- Window [row_number() windowspecdefinition(a#220, b#221 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#218], [a#220], [b#221 ASC NULLS FIRST]
   :     :  +- Sort [a#220 ASC NULLS FIRST, b#221 ASC NULLS FIRST], false, 0
   :     :     +- Exchange hashpartitioning(a#220, 5), ENSURE_REQUIREMENTS, [plan_id=93]
   :     :        +- LocalTableScan [a#220, b#221]
   :     +- Sort [c#222 ASC NULLS FIRST], false, 0
   :        +- Exchange hashpartitioning(c#222, 5), ENSURE_REQUIREMENTS, [plan_id=98]
   :           +- LocalTableScan [c#222]
   +- Sort [d#223 ASC NULLS FIRST, e#224 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(d#223, 5), ENSURE_REQUIREMENTS, [plan_id=104]
         +- LocalTableScan [d#223, e#224]
{code}


The second *Sort [a#220 ASC NULLS FIRST, b#221 ASC NULLS FIRST], false, 0* in the plan could be removed.


> SortMergeJoin should respect the streamed side ordering
> -------------------------------------------------------
>
>                 Key: SPARK-44804
>                 URL: https://issues.apache.org/jira/browse/SPARK-44804
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.5.0
>            Reporter: Wan Kun
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
