Repository: spark
Updated Branches:
  refs/heads/master e07baf141 -> 095862a3c


[SPARK-17271][SQL] Planner adds un-necessary Sort even if child ordering is 
semantically same as required ordering

## What changes were proposed in this pull request?

Jira : https://issues.apache.org/jira/browse/SPARK-17271

The planner adds an unneeded Sort operation because of a bug in the way 
`SortOrder` instances are compared in `EnsureRequirements`: 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala#L253
`SortOrder` needs to be compared semantically because the `Expression`s within 
two `SortOrder`s can be "semantically equal" without being literally equal objects.

E.g., in the case of `sql("SELECT * FROM table1 a JOIN table2 b ON a.col1=b.col1")`:

Expression in required SortOrder:
```
      AttributeReference(
        name = "col1",
        dataType = LongType,
        nullable = false
      ) (exprId = exprId,
        qualifier = Some("a")
      )
```

Expression in child SortOrder:
```
      AttributeReference(
        name = "col1",
        dataType = LongType,
        nullable = false
      ) (exprId = exprId)
```

Notice that the attribute in the required ordering has a qualifier while the 
child's attribute does not. The underlying expression is the same (both share 
the same `exprId`), however, so in this case the child satisfies the required 
sort order.

This PR includes the following changes:
- Added a `semanticEquals` method to `SortOrder` so that it can compare 
underlying child expressions semantically (and not using default Object.equals)
- Fixed `EnsureRequirements` to use semantic comparison of SortOrder

## How was this patch tested?

- Added a test case to `PlannerSuite`. Ran the remaining tests in `PlannerSuite`.

Author: Tejas Patil <[email protected]>

Closes #14841 from tejasapatil/SPARK-17271_sort_order_equals_bug.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/095862a3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/095862a3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/095862a3

Branch: refs/heads/master
Commit: 095862a3cff73fd88db9ed37a63e7629e664ff64
Parents: e07baf1
Author: Tejas Patil <[email protected]>
Authored: Sun Aug 28 19:14:58 2016 +0200
Committer: Herman van Hovell <[email protected]>
Committed: Sun Aug 28 19:14:58 2016 +0200

----------------------------------------------------------------------
 .../sql/catalyst/expressions/SortOrder.scala    |  3 ++
 .../execution/exchange/EnsureRequirements.scala | 11 +++++-
 .../spark/sql/execution/PlannerSuite.scala      | 40 +++++++++++++++++++-
 3 files changed, 52 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/095862a3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
index de779ed..f498f35 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
@@ -61,6 +61,9 @@ case class SortOrder(child: Expression, direction: 
SortDirection)
   override def sql: String = child.sql + " " + direction.sql
 
   def isAscending: Boolean = direction == Ascending
+
+  def semanticEquals(other: SortOrder): Boolean =
+    (direction == other.direction) && child.semanticEquals(other.child)
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/095862a3/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index 951051c..fee7010 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -250,7 +250,16 @@ case class EnsureRequirements(conf: SQLConf) extends 
Rule[SparkPlan] {
     children = children.zip(requiredChildOrderings).map { case (child, 
requiredOrdering) =>
       if (requiredOrdering.nonEmpty) {
         // If child.outputOrdering is [a, b] and requiredOrdering is [a], we 
do not need to sort.
-        if (requiredOrdering != 
child.outputOrdering.take(requiredOrdering.length)) {
+        val orderingMatched = if (requiredOrdering.length > 
child.outputOrdering.length) {
+          false
+        } else {
+          requiredOrdering.zip(child.outputOrdering).forall {
+            case (requiredOrder, childOutputOrder) =>
+              requiredOrder.semanticEquals(childOutputOrder)
+          }
+        }
+
+        if (!orderingMatched) {
           SortExec(requiredOrdering, global = false, child = child)
         } else {
           child

http://git-wip-us.apache.org/repos/asf/spark/blob/095862a3/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 436ff59..07efc72 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{execution, DataFrame, Row}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, 
Literal, SortOrder}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition}
 import org.apache.spark.sql.catalyst.plans.physical._
@@ -444,6 +444,44 @@ class PlannerSuite extends SharedSQLContext {
     }
   }
 
+  test("EnsureRequirements skips sort when required ordering is semantically 
equal to " +
+    "existing ordering") {
+    val exprId: ExprId = NamedExpression.newExprId
+    val attribute1 =
+      AttributeReference(
+        name = "col1",
+        dataType = LongType,
+        nullable = false
+      ) (exprId = exprId,
+        qualifier = Some("col1_qualifier")
+      )
+
+    val attribute2 =
+      AttributeReference(
+        name = "col1",
+        dataType = LongType,
+        nullable = false
+      ) (exprId = exprId)
+
+    val orderingA1 = SortOrder(attribute1, Ascending)
+    val orderingA2 = SortOrder(attribute2, Ascending)
+
+    assert(orderingA1 != orderingA2, s"$orderingA1 should NOT equal to 
$orderingA2")
+    assert(orderingA1.semanticEquals(orderingA2),
+      s"$orderingA1 should be semantically equal to $orderingA2")
+
+    val inputPlan = DummySparkPlan(
+      children = DummySparkPlan(outputOrdering = Seq(orderingA1)) :: Nil,
+      requiredChildOrdering = Seq(Seq(orderingA2)),
+      requiredChildDistribution = Seq(UnspecifiedDistribution)
+    )
+    val outputPlan = 
EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
+    assertDistributionRequirementsAreSatisfied(outputPlan)
+    if (outputPlan.collect { case s: SortExec => true }.nonEmpty) {
+      fail(s"No sorts should have been added:\n$outputPlan")
+    }
+  }
+
   // This is a regression test for SPARK-11135
   test("EnsureRequirements adds sort when required ordering isn't a prefix of 
existing ordering") {
     val orderingA = SortOrder(Literal(1), Ascending)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to