Repository: spark
Updated Branches:
  refs/heads/branch-2.3 7f1708a44 -> d3255a571


revert [SPARK-21743][SQL] top-most limit should not cause memory leak

## What changes were proposed in this pull request?

There is a performance regression in Spark 2.3: when we read a big compressed 
text file that is unsplittable (e.g. gzip) and then take the first record, 
Spark scans all the data in the file, which is very slow. For example, after 
`spark.read.text("/tmp/test.csv.gz").head(1)`, the SQL UI shows that the file 
was fully scanned.

![image](https://user-images.githubusercontent.com/3182036/41445252-264b1e5a-6ffd-11e8-9a67-4c31d129a314.png)
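
A minimal way to reproduce (the path is a placeholder and `spark` is assumed to be an active `SparkSession`):

```scala
// Reproduction sketch for the regression described above (Spark 2.3).
// "/tmp/test.csv.gz" stands in for any large gzipped text file.
val df = spark.read.text("/tmp/test.csv.gz")
df.head(1)
// Expected: only the first record is read.
// Observed (with #18955 in place): the SQL UI shows the file fully scanned.
```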

This is introduced by #18955, which adds a LocalLimit to the query when 
executing `Dataset.head`. The fundamental problem is that `Limit` is not 
handled well by whole-stage codegen: it keeps consuming the input even after 
we have already hit the limit.
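
The plan shape in question can be inspected with `explain()`; the annotated output below is abbreviated by hand and only illustrative:

```scala
spark.read.text("/tmp/test.csv.gz").limit(1).explain()
// With #18955, the physical plan is roughly:
//   CollectLimit 1
//   +- *LocalLimit 1          <- runs inside whole-stage codegen
//      +- *FileScan text ...
// The generated loop for LocalLimit stops *emitting* rows after the limit,
// but keeps *consuming* rows from the scan, so the gzip stream is read to
// the end.
```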

However, if we only fix LIMIT in whole-stage codegen, the memory leak test 
will fail, because we would no longer fully consume the input, which is what 
triggers the resource cleanup.

To fix it completely, we should do the following:
1. fix LIMIT in whole-stage codegen so it stops consuming input after hitting 
the limit (see the sketch after this list);
2. in whole-stage codegen, provide a way to release the resources of the 
parent operator, and apply it to LIMIT;
3. automatically release resources when the task ends.
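
A conceptual sketch of step 1, as a hand-written analogue of what the generated loop should do (not actual Spark codegen output):

```scala
// The consume loop must check a stop-early flag that the limit operator sets
// once it has produced `limit` rows, instead of draining the child iterator.
object LimitLoopSketch {
  def main(args: Array[String]): Unit = {
    val input = Iterator.from(1)            // stands in for the child operator
    val limit = 1
    var count = 0
    var stopEarly = false
    val out = collection.mutable.ArrayBuffer.empty[Int]
    while (!stopEarly && input.hasNext) {   // the fix: consult stopEarly here
      out += input.next()
      count += 1
      if (count >= limit) stopEarly = true  // limit signals the loop to stop
    }
    println(out)                            // ArrayBuffer(1): input not drained
  }
}
```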

However, this is a non-trivial change and is risky to backport to Spark 2.3.

This PR proposes to revert #18955 in Spark 2.3. The memory leak is not a big 
issue: when a task ends, Spark releases all the pages allocated by that task, 
which covers most of the resources.
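
The task-end cleanup the revert relies on goes through `TaskContext`; a hedged sketch (the listener body is illustrative, and `openResource` is a hypothetical helper, not a Spark API):

```scala
import org.apache.spark.TaskContext

// Operators register cleanup with the running task, so even if a query stops
// consuming early, the task's allocated pages are freed when it completes.
// Intended to run inside a task on an executor, where TaskContext.get()
// returns a non-null context.
def openResource(): Unit = {
  val ctx = TaskContext.get()
  ctx.addTaskCompletionListener { _ =>
    println(s"task ${ctx.taskAttemptId()} finished, releasing resources")
  }
}
```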

I'll submit a complete fix to master later.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenc...@databricks.com>

Closes #21573 from cloud-fan/limit.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3255a57
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3255a57
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3255a57

Branch: refs/heads/branch-2.3
Commit: d3255a57109a5cea79948aa4192008b988961aa3
Parents: 7f1708a
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Fri Jun 15 14:33:17 2018 +0200
Committer: Herman van Hovell <hvanhov...@databricks.com>
Committed: Fri Jun 15 14:33:17 2018 +0200

----------------------------------------------------------------------
 .../org/apache/spark/sql/execution/SparkStrategies.scala      | 7 +------
 .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala   | 5 -----
 2 files changed, 1 insertion(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d3255a57/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index c6565fc..a0a641b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -70,12 +70,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child))) =>
           TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
         case Limit(IntegerLiteral(limit), child) =>
-          // With whole stage codegen, Spark releases resources only when all the output data of the
-          // query plan are consumed. It's possible that `CollectLimitExec` only consumes a little
-          // data from child plan and finishes the query without releasing resources. Here we wrap
-          // the child plan with `LocalLimitExec`, to stop the processing of whole stage codegen and
-          // trigger the resource releasing work, after we consume `limit` rows.
-          CollectLimitExec(limit, LocalLimitExec(limit, planLater(child))) :: Nil
+          CollectLimitExec(limit, planLater(child)) :: Nil
         case other => planLater(other) :: Nil
       }
       case Limit(IntegerLiteral(limit), Sort(order, true, child)) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/d3255a57/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index ebebf62..bc57efe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2720,11 +2720,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 
-  test("SPARK-21743: top-most limit should not cause memory leak") {
-    // In unit test, Spark will fail the query if memory leak detected.
-    spark.range(100).groupBy("id").count().limit(1).collect()
-  }
-
   test("SPARK-21652: rule confliction of InferFiltersFromConstraints and 
ConstantPropagation") {
     withTempView("t1", "t2") {
       Seq((1, 1)).toDF("col1", "col2").createOrReplaceTempView("t1")

