Repository: spark
Updated Branches:
  refs/heads/master 5cdb8a23d -> 5c27b0d4f


[SPARK-19355][SQL][FOLLOWUP] Remove the child.outputOrdering check in global limit

## What changes were proposed in this pull request?

This is based on the discussion 
https://github.com/apache/spark/pull/16677/files#r212805327.

As the SQL standard doesn't mandate that a nested ORDER BY followed by a LIMIT
respect that ordering clause, this patch removes the `child.outputOrdering`
check.
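
As a hedged illustration (the query and the `spark` session below are mine, not part of the patch): because the inner ORDER BY sits beneath the LIMIT, the standard allows the five returned rows to come from any partitions, in any order.

```scala
// Minimal sketch, assuming a running SparkSession named `spark`.
// Only a top-level ORDER BY fixes the output order; the nested one below
// may or may not be reflected in the limited result.
spark.sql(
  """
    |SELECT * FROM (
    |  SELECT id FROM range(100) ORDER BY id DESC
    |) LIMIT 5
  """.stripMargin).show()
```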

## How was this patch tested?

Unit tests.

Closes #22239 from viirya/improve-global-limit-parallelism-followup.

Authored-by: Liang-Chi Hsieh <vii...@gmail.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5c27b0d4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5c27b0d4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5c27b0d4

Branch: refs/heads/master
Commit: 5c27b0d4f8d378bd7889d26fb358f478479b9996
Parents: 5cdb8a2
Author: Liang-Chi Hsieh <vii...@gmail.com>
Authored: Mon Aug 27 14:02:50 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Mon Aug 27 14:02:50 2018 +0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/sql/execution/limit.scala | 10 +++++-----
 .../spark/sql/execution/TakeOrderedAndProjectSuite.scala  | 10 ++++++++++
 2 files changed, 15 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5c27b0d4/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 392ca13..fb46970 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -122,11 +122,11 @@ case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
       Nil
     }
 
-    // During global limit, try to evenly distribute limited rows across data
-    // partitions. If disabled, scanning data partitions sequentially until reaching limit number.
-    // Besides, if child output has certain ordering, we can't evenly pick up rows from
-    // each parititon.
-    val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && child.outputOrdering == Nil
+    // This is an optimization to evenly distribute limited rows across all partitions.
+    // When enabled, Spark repeatedly takes rows from each partition until it reaches the
+    // limit. When disabled, Spark takes all rows from the first partition, then from the
+    // second partition, and so on, until it reaches the limit.
+    val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit
 
     val shuffled = new ShuffledRowRDD(shuffleDependency)
 

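To make the two strategies described in the new comment concrete, here is a hedged, plain-Scala sketch (the function names and per-partition counts are illustrative, not Spark internals):

```scala
// Sketch: how many rows are taken from each partition under the two strategies.
// `partitionSizes(i)` is the number of rows available in partition i.

// Flat global limit disabled: drain partition 0, then partition 1, ...
def takeSequential(partitionSizes: Seq[Int], limit: Int): Seq[Int] =
  partitionSizes.foldLeft((Seq.empty[Int], limit)) { case ((taken, left), size) =>
    val n = math.min(size, left)
    (taken :+ n, left - n)
  }._1

// Flat global limit enabled: round-robin one row per non-empty partition,
// distributing the limited rows roughly evenly.
def takeFlat(partitionSizes: Seq[Int], limit: Int): Seq[Int] = {
  val taken = Array.fill(partitionSizes.length)(0)
  var left = limit
  while (left > 0 && taken.indices.exists(i => taken(i) < partitionSizes(i))) {
    for (i <- partitionSizes.indices if left > 0 && taken(i) < partitionSizes(i)) {
      taken(i) += 1
      left -= 1
    }
  }
  taken.toSeq
}

// With partitions of sizes 3, 4 and 5 and limit = 6:
// takeSequential(Seq(3, 4, 5), 6) == Seq(3, 3, 0)
// takeFlat(Seq(3, 4, 5), 6)       == Seq(2, 2, 2)
```

This also shows why the old `child.outputOrdering == Nil` guard existed: picking rows evenly cannot preserve an ordered child output, which this patch deems acceptable per the SQL standard.
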
http://git-wip-us.apache.org/repos/asf/spark/blob/5c27b0d4/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
index 7e317a4..0a1c94c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
@@ -22,6 +22,7 @@ import scala.util.Random
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 
@@ -31,10 +32,19 @@ class TakeOrderedAndProjectSuite extends SparkPlanTest with SharedSQLContext {
   private var rand: Random = _
   private var seed: Long = 0
 
+  private val originalLimitFlatGlobalLimit = SQLConf.get.getConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT)
+
   protected override def beforeAll(): Unit = {
     super.beforeAll()
     seed = System.currentTimeMillis()
     rand = new Random(seed)
+
+    // Disable the optimization to make Sort-Limit match `TakeOrderedAndProject` semantics.
+    SQLConf.get.setConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT, false)
+  }
+
+  protected override def afterAll(): Unit = {
+    SQLConf.get.setConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT, originalLimitFlatGlobalLimit)
   }
 
   private def generateRandomInputData(): DataFrame = {

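A hedged aside (the test name below is hypothetical, not from the patch): `SQLTestUtils` also offers `withSQLConf`, which scopes a config change to a single block and restores the previous value automatically; the patch instead flips the flag suite-wide in `beforeAll`/`afterAll` so every existing test sees it.

```scala
// Per-test scoping sketch; this snippet would live inside the suite, which
// already mixes in SQLTestUtils via SharedSQLContext.
test("sort-limit should match TakeOrderedAndProject") { // hypothetical name
  withSQLConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT.key -> "false") {
    // ... plan a Sort + Limit and compare it against TakeOrderedAndProjectExec ...
  }
}
```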
