Repository: spark
Updated Branches:
  refs/heads/master 6a2359ff1 -> eb0b4d6e2


[SPARK-11135] [SQL] Exchange incorrectly skips sorts when existing ordering is 
non-empty subset of required ordering

In Spark SQL, the Exchange planner tries to avoid unnecessary sorts in cases 
where the data is already sorted by an ordering whose leading columns match the 
requested ordering. For instance, say that a query calls for an operator's input to 
be sorted by `a.asc` and the input happens to already be sorted by `[a.asc, 
b.asc]`. In this case, we do not need to re-sort the input. The converse, 
however, is not true: if the query calls for `[a.asc, b.asc]`, then `a.asc` 
alone will not satisfy the ordering requirements, requiring an additional sort 
to be planned by Exchange.

The current Exchange code gets this wrong, however: it incorrectly skips 
sorting when the existing output ordering is a non-empty prefix of the required 
ordering (for example, existing `[a.asc]` versus required `[a.asc, b.asc]`). 
Fortunately, this is simple to fix.

This bug was introduced in https://github.com/apache/spark/pull/7458, so it 
affects 1.5.0+.

This patch fixes the bug and significantly improves the unit test coverage of 
Exchange's sort-planning logic.

Author: Josh Rosen <[email protected]>

Closes #9140 from JoshRosen/SPARK-11135.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eb0b4d6e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eb0b4d6e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eb0b4d6e

Branch: refs/heads/master
Commit: eb0b4d6e2ddfb765f082d0d88472626336ad2609
Parents: 6a2359f
Author: Josh Rosen <[email protected]>
Authored: Thu Oct 15 17:36:55 2015 -0700
Committer: Michael Armbrust <[email protected]>
Committed: Thu Oct 15 17:36:55 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/execution/Exchange.scala   |  5 +-
 .../spark/sql/execution/PlannerSuite.scala      | 49 ++++++++++++++++++++
 2 files changed, 52 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/eb0b4d6e/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 2894537..1d3379a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -219,6 +219,8 @@ private[sql] case class EnsureRequirements(sqlContext: 
SQLContext) extends Rule[
     val requiredChildDistributions: Seq[Distribution] = 
operator.requiredChildDistribution
     val requiredChildOrderings: Seq[Seq[SortOrder]] = 
operator.requiredChildOrdering
     var children: Seq[SparkPlan] = operator.children
+    assert(requiredChildDistributions.length == children.length)
+    assert(requiredChildOrderings.length == children.length)
 
     // Ensure that the operator's children satisfy their output distribution 
requirements:
     children = children.zip(requiredChildDistributions).map { case (child, 
distribution) =>
@@ -248,8 +250,7 @@ private[sql] case class EnsureRequirements(sqlContext: 
SQLContext) extends Rule[
     children = children.zip(requiredChildOrderings).map { case (child, 
requiredOrdering) =>
       if (requiredOrdering.nonEmpty) {
         // If child.outputOrdering is [a, b] and requiredOrdering is [a], we 
do not need to sort.
-        val minSize = Seq(requiredOrdering.size, child.outputOrdering.size).min
-        if (minSize == 0 || requiredOrdering.take(minSize) != 
child.outputOrdering.take(minSize)) {
+        if (requiredOrdering != 
child.outputOrdering.take(requiredOrdering.length)) {
           sqlContext.planner.BasicOperators.getSortOperator(requiredOrdering, 
global = false, child)
         } else {
           child

http://git-wip-us.apache.org/repos/asf/spark/blob/eb0b4d6e/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index cafa1d5..ebdab1c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -354,6 +354,55 @@ class PlannerSuite extends SharedSQLContext {
     }
   }
 
+  test("EnsureRequirements adds sort when there is no existing ordering") {
+    val orderingA = SortOrder(Literal(1), Ascending)
+    val orderingB = SortOrder(Literal(2), Ascending)
+    assert(orderingA != orderingB)
+    val inputPlan = DummySparkPlan(
+      children = DummySparkPlan(outputOrdering = Seq.empty) :: Nil,
+      requiredChildOrdering = Seq(Seq(orderingB)),
+      requiredChildDistribution = Seq(UnspecifiedDistribution)
+    )
+    val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan)
+    assertDistributionRequirementsAreSatisfied(outputPlan)
+    if (outputPlan.collect { case s: TungstenSort => true; case s: Sort => 
true }.isEmpty) {
+      fail(s"Sort should have been added:\n$outputPlan")
+    }
+  }
+
+  test("EnsureRequirements skips sort when required ordering is prefix of 
existing ordering") {
+    val orderingA = SortOrder(Literal(1), Ascending)
+    val orderingB = SortOrder(Literal(2), Ascending)
+    assert(orderingA != orderingB)
+    val inputPlan = DummySparkPlan(
+      children = DummySparkPlan(outputOrdering = Seq(orderingA, orderingB)) :: 
Nil,
+      requiredChildOrdering = Seq(Seq(orderingA)),
+      requiredChildDistribution = Seq(UnspecifiedDistribution)
+    )
+    val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan)
+    assertDistributionRequirementsAreSatisfied(outputPlan)
+    if (outputPlan.collect { case s: TungstenSort => true; case s: Sort => 
true }.nonEmpty) {
+      fail(s"No sorts should have been added:\n$outputPlan")
+    }
+  }
+
+  // This is a regression test for SPARK-11135
+  test("EnsureRequirements adds sort when required ordering isn't a prefix of 
existing ordering") {
+    val orderingA = SortOrder(Literal(1), Ascending)
+    val orderingB = SortOrder(Literal(2), Ascending)
+    assert(orderingA != orderingB)
+    val inputPlan = DummySparkPlan(
+      children = DummySparkPlan(outputOrdering = Seq(orderingA)) :: Nil,
+      requiredChildOrdering = Seq(Seq(orderingA, orderingB)),
+      requiredChildDistribution = Seq(UnspecifiedDistribution)
+    )
+    val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan)
+    assertDistributionRequirementsAreSatisfied(outputPlan)
+    if (outputPlan.collect { case s: TungstenSort => true; case s: Sort => 
true }.isEmpty) {
+      fail(s"Sort should have been added:\n$outputPlan")
+    }
+  }
+
   // 
---------------------------------------------------------------------------------------------
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to