[ 
https://issues.apache.org/jira/browse/SPARK-21998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maryann Xue updated SPARK-21998:
--------------------------------
    Description: 
Right now the calculation of SortMergeJoinExec's outputOrdering relies on the 
assumption that its children have already been sorted on the join keys, which 
often does not hold until EnsureRequirements has been applied.
{code}
  /**
   * For SMJ, child's output must have been sorted on key or expressions with the same order as
   * key, so we can get ordering for key from child's output ordering.
   */
  private def getKeyOrdering(keys: Seq[Expression], childOutputOrdering: Seq[SortOrder])
    : Seq[SortOrder] = {
    keys.zip(childOutputOrdering).map { case (key, childOrder) =>
      SortOrder(key, Ascending, childOrder.sameOrderExpressions + childOrder.child - key)
    }
  }
{code}
Thus SortMergeJoinExec's outputOrdering is most likely not correct during the 
physical planning stage, and as a result, potential physical optimizations that 
rely on the required/output orderings, like SPARK-18591, will not work for 
SortMergeJoinExec.
The right behavior of {{getKeyOrdering(keys, childOutputOrdering)}} should be:
1. If the childOutputOrdering satisfies (is a superset of) the required child 
ordering => childOutputOrdering
2. Otherwise => required child ordering
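
The two cases above can be sketched roughly as follows. This is a minimal, 
self-contained illustration, not the actual Spark patch: expressions are 
modelled as plain strings, and the SortOrder, requiredOrdering and 
orderingSatisfies definitions here are simplified stand-ins assumed for the 
example rather than the Catalyst classes. In case 1 the sketch keeps the 
existing mapping of the child ordering onto the join keys, which is one 
possible reading of "=> childOutputOrdering".

```scala
// Simplified stand-ins for Catalyst types (assumptions for illustration only).
sealed trait SortDirection
case object Ascending extends SortDirection
case object Descending extends SortDirection

final case class SortOrder(
    child: String,
    direction: SortDirection,
    sameOrderExpressions: Set[String] = Set.empty) {

  // True if this ordering sorts the required expression, either directly or
  // through an expression known to have the same order, in the same direction.
  def satisfies(required: SortOrder): Boolean =
    direction == required.direction &&
      (sameOrderExpressions + child).contains(required.child)
}

object KeyOrderingSketch {

  // The ordering a sort merge join requires from a child: ascending on each key.
  def requiredOrdering(keys: Seq[String]): Seq[SortOrder] =
    keys.map(SortOrder(_, Ascending))

  // The actual ordering satisfies the required one if the required ordering
  // is matched, position by position, by a prefix of the actual ordering.
  def orderingSatisfies(actual: Seq[SortOrder], required: Seq[SortOrder]): Boolean =
    required.length <= actual.length &&
      required.zip(actual).forall { case (req, act) => act.satisfies(req) }

  def getKeyOrdering(keys: Seq[String], childOutputOrdering: Seq[SortOrder]): Seq[SortOrder] = {
    val required = requiredOrdering(keys)
    if (orderingSatisfies(childOutputOrdering, required)) {
      // Case 1: the child is already sorted on the keys, so reuse its ordering
      // and carry over the expressions known to share that order.
      keys.zip(childOutputOrdering).map { case (key, childOrder) =>
        SortOrder(key, Ascending, childOrder.sameOrderExpressions + childOrder.child - key)
      }
    } else {
      // Case 2: the child is not sorted yet (for example, before
      // EnsureRequirements), so fall back to the required ordering.
      required
    }
  }
}
```

With this shape, an unsorted child no longer yields an empty or truncated 
ordering: the result always covers every join key.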

  was:
Right now SortMergeJoinExec calculates its outputOrdering based on its 
children's outputOrdering, thus oftentimes the SortMergeJoinExec's 
outputOrdering is NOT correct until after EnsureRequirements, which happens at 
a rather late stage. As a result, potential optimizations that rely on the 
required/output orderings, like SPARK-18591, will not work for 
SortMergeJoinExec.
Unlike operators like Project or Filter, which simply preserve the ordering of 
their inputs, the SortMergeJoinExec has a behavior that generates a new 
ordering in its output regardless of the orderings of its children. I think the 
code below together with its comment is buggy.
{code}
  /**
   * For SMJ, child's output must have been sorted on key or expressions with the same order as
   * key, so we can get ordering for key from child's output ordering.
   */
  private def getKeyOrdering(keys: Seq[Expression], childOutputOrdering: Seq[SortOrder])
    : Seq[SortOrder] = {
    keys.zip(childOutputOrdering).map { case (key, childOrder) =>
      SortOrder(key, Ascending, childOrder.sameOrderExpressions + childOrder.child - key)
    }
  }
{code}


> SortMergeJoinExec did not calculate its outputOrdering correctly during 
> physical planning
> -----------------------------------------------------------------------------------------
>
>                 Key: SPARK-21998
>                 URL: https://issues.apache.org/jira/browse/SPARK-21998
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.2.0
>            Reporter: Maryann Xue
>            Priority: Minor
>


