Repository: spark
Updated Branches:
  refs/heads/master 69350aa2f -> 8a837bf4f


[SPARK-24193] create TakeOrderedAndProjectExec only when the limit number is 
below spark.sql.execution.topKSortFallbackThreshold.

## What changes were proposed in this pull request?

The physical plan of `select colA from t order by colB limit M` is 
`TakeOrderedAndProject`.
Currently, `TakeOrderedAndProject` sorts the data in memory; see 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala#L158
We can add a config: if the limit (M) is too large, we fall back to a global 
sort that can spill to disk, which resolves the memory issue.

## How was this patch tested?

Added a test to PlannerSuite.

Author: jinxing <jinxing6...@126.com>

Closes #21252 from jinxing64/SPARK-24193.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8a837bf4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8a837bf4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8a837bf4

Branch: refs/heads/master
Commit: 8a837bf4f3f2758f7825d2362cf9de209026651a
Parents: 69350aa
Author: jinxing <jinxing6...@126.com>
Authored: Thu May 17 22:29:18 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Thu May 17 22:29:18 2018 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/internal/SQLConf.scala   | 11 +++++++++++
 .../apache/spark/sql/execution/SparkStrategies.scala    | 12 ++++++++----
 .../org/apache/spark/sql/execution/PlannerSuite.scala   | 12 ++++++++++++
 3 files changed, 31 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8a837bf4/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b00edca..2a673c6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1253,6 +1253,15 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val TOP_K_SORT_FALLBACK_THRESHOLD =
+    buildConf("spark.sql.execution.topKSortFallbackThreshold")
+      .internal()
+      .doc("In SQL queries with a SORT followed by a LIMIT like " +
+          "'SELECT x FROM t ORDER BY y LIMIT m', if m is under this threshold, 
do a top-K sort" +
+          " in memory, otherwise do a global sort which spills to disk if 
necessary.")
+      .intConf
+      .createWithDefault(Int.MaxValue)
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }
@@ -1424,6 +1433,8 @@ class SQLConf extends Serializable with Logging {
 
   def sortBeforeRepartition: Boolean = getConf(SORT_BEFORE_REPARTITION)
 
+  def topKSortFallbackThreshold: Int = getConf(TOP_K_SORT_FALLBACK_THRESHOLD)
+
   /**
    * Returns the [[Resolver]] for the current configuration, which can be used 
to determine if two
    * identifiers are equal.

http://git-wip-us.apache.org/repos/asf/spark/blob/8a837bf4/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 37a0b9d..b97a87a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -66,9 +66,11 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
   object SpecialLimits extends Strategy {
     override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case ReturnAnswer(rootPlan) => rootPlan match {
-        case Limit(IntegerLiteral(limit), Sort(order, true, child)) =>
+        case Limit(IntegerLiteral(limit), Sort(order, true, child))
+            if limit < conf.topKSortFallbackThreshold =>
           TakeOrderedAndProjectExec(limit, order, child.output, 
planLater(child)) :: Nil
-        case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, 
true, child))) =>
+        case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, 
true, child)))
+            if limit < conf.topKSortFallbackThreshold =>
           TakeOrderedAndProjectExec(limit, order, projectList, 
planLater(child)) :: Nil
         case Limit(IntegerLiteral(limit), child) =>
           // With whole stage codegen, Spark releases resources only when all 
the output data of the
@@ -79,9 +81,11 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
           CollectLimitExec(limit, LocalLimitExec(limit, planLater(child))) :: 
Nil
         case other => planLater(other) :: Nil
       }
-      case Limit(IntegerLiteral(limit), Sort(order, true, child)) =>
+      case Limit(IntegerLiteral(limit), Sort(order, true, child))
+          if limit < conf.topKSortFallbackThreshold =>
         TakeOrderedAndProjectExec(limit, order, child.output, 
planLater(child)) :: Nil
-      case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, 
child))) =>
+      case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, 
child)))
+          if limit < conf.topKSortFallbackThreshold =>
         TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) 
:: Nil
       case _ => Nil
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/8a837bf4/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index f0dfe6b..a375f88 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -197,6 +197,18 @@ class PlannerSuite extends SharedSQLContext {
     assert(planned.cachedPlan.isInstanceOf[CollectLimitExec])
   }
 
+  test("TakeOrderedAndProjectExec appears only when number of limit is below 
the threshold.") {
+    withSQLConf(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key -> "1000") {
+      val query0 = testData.select('value).orderBy('key).limit(100)
+      val planned0 = query0.queryExecution.executedPlan
+      
assert(planned0.find(_.isInstanceOf[TakeOrderedAndProjectExec]).isDefined)
+
+      val query1 = testData.select('value).orderBy('key).limit(2000)
+      val planned1 = query1.queryExecution.executedPlan
+      assert(planned1.find(_.isInstanceOf[TakeOrderedAndProjectExec]).isEmpty)
+    }
+  }
+
   test("SPARK-23375: Cached sorted data doesn't need to be re-sorted") {
     val query = testData.select('key, 'value).sort('key.desc).cache()
     assert(query.queryExecution.optimizedPlan.isInstanceOf[InMemoryRelation])


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to