Repository: spark
Updated Branches:
  refs/heads/branch-2.3 7f1708a44 -> d3255a571
revert [SPARK-21743][SQL] top-most limit should not cause memory leak

## What changes were proposed in this pull request?

There is a performance regression in Spark 2.3. When we read a big compressed text file that is un-splittable (e.g. gz) and then take the first record, Spark scans all the data in the file, which is very slow. For example, with `spark.read.text("/tmp/test.csv.gz").head(1)`, the SQL UI shows that the file is fully scanned.

![image](https://user-images.githubusercontent.com/3182036/41445252-264b1e5a-6ffd-11e8-9a67-4c31d129a314.png)

This was introduced by #18955, which adds a LocalLimit to the query when executing `Dataset.head`. The fundamental problem is that `Limit` is not well whole-stage-codegened: it keeps consuming the input even after the limit has been reached.

However, if we only fix the LIMIT whole-stage-codegen, the memory leak test will fail, because we no longer fully consume the inputs to trigger the resource cleanup.

To fix it completely, we should do the following:
1. Fix LIMIT whole-stage-codegen so that it stops consuming inputs after reaching the limit.
2. In whole-stage-codegen, provide a way to release the resources of the parent operator, and apply it in LIMIT.
3. Automatically release resources when the task ends.

However, this is a non-trivial change and is risky to backport to Spark 2.3. This PR proposes to revert #18955 in Spark 2.3. The memory leak is not a big issue: when a task ends, Spark releases all the pages allocated by that task, which releases most of the resources.

I'll submit an exhaustive fix to master later.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenc...@databricks.com>

Closes #21573 from cloud-fan/limit.

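The difference between a limit that keeps consuming its input and one that stops at the limit can be illustrated with a minimal sketch. This is a toy model and not Spark's generated code: `CountingSource`, `eagerLimit` and `shortCircuitLimit` are hypothetical names introduced only for this illustration.

```scala
// Toy model only (not Spark code): all names below are hypothetical.
object LimitSketch {
  // An iterator over 0 until n that records how many rows were pulled from it.
  final class CountingSource(n: Int) extends Iterator[Int] {
    private var nextValue = 0
    var rowsRead = 0
    def hasNext: Boolean = nextValue < n
    def next(): Int = {
      rowsRead += 1
      val v = nextValue
      nextValue += 1
      v
    }
  }

  // Mimics the regression described above: every input row is consumed,
  // and rows past the limit are simply dropped.
  def eagerLimit(src: Iterator[Int], limit: Int): Vector[Int] = {
    val out = Vector.newBuilder[Int]
    var count = 0
    while (src.hasNext) {
      val row = src.next()
      if (count < limit) {
        out += row
        count += 1
      }
    }
    out.result()
  }

  // Stops pulling from the source as soon as `limit` rows have been emitted.
  def shortCircuitLimit(src: Iterator[Int], limit: Int): Vector[Int] = {
    val out = Vector.newBuilder[Int]
    var count = 0
    while (count < limit && src.hasNext) {
      out += src.next()
      count += 1
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val a = new CountingSource(1000000)
    val b = new CountingSource(1000000)
    println(s"eager:         rows=${eagerLimit(a, 1)} read=${a.rowsRead}")
    println(s"short-circuit: rows=${shortCircuitLimit(b, 1)} read=${b.rowsRead}")
  }
}
```

Both variants return the same single row, but the eager one reads all 1,000,000 rows from the source while the short-circuit one reads only 1. The short-circuit variant also never exhausts its input, which is why, in the real system, stopping early requires a separate way to trigger resource cleanup.
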
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3255a57
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3255a57
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3255a57

Branch: refs/heads/branch-2.3
Commit: d3255a57109a5cea79948aa4192008b988961aa3
Parents: 7f1708a
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Fri Jun 15 14:33:17 2018 +0200
Committer: Herman van Hovell <hvanhov...@databricks.com>
Committed: Fri Jun 15 14:33:17 2018 +0200

----------------------------------------------------------------------
 .../org/apache/spark/sql/execution/SparkStrategies.scala    | 7 +------
 .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 5 -----
 2 files changed, 1 insertion(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d3255a57/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index c6565fc..a0a641b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -70,12 +70,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child))) =>
           TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
         case Limit(IntegerLiteral(limit), child) =>
-          // With whole stage codegen, Spark releases resources only when all the output data of the
-          // query plan are consumed. It's possible that `CollectLimitExec` only consumes a little
-          // data from child plan and finishes the query without releasing resources. Here we wrap
-          // the child plan with `LocalLimitExec`, to stop the processing of whole stage codegen and
-          // trigger the resource releasing work, after we consume `limit` rows.
-          CollectLimitExec(limit, LocalLimitExec(limit, planLater(child))) :: Nil
+          CollectLimitExec(limit, planLater(child)) :: Nil
         case other => planLater(other) :: Nil
       }
     case Limit(IntegerLiteral(limit), Sort(order, true, child)) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/d3255a57/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index ebebf62..bc57efe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2720,11 +2720,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 
-  test("SPARK-21743: top-most limit should not cause memory leak") {
-    // In unit test, Spark will fail the query if memory leak detected.
-    spark.range(100).groupBy("id").count().limit(1).collect()
-  }
-
   test("SPARK-21652: rule confliction of InferFiltersFromConstraints and ConstantPropagation") {
     withTempView("t1", "t2") {
       Seq((1, 1)).toDF("col1", "col2").createOrReplaceTempView("t1")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org