Repository: spark
Updated Branches:
  refs/heads/master 7a8412352 -> 0c2aee69b


[SPARK-22410][SQL] Remove unnecessary output from BatchEvalPython's children plans

## What changes were proposed in this pull request?

When we insert `BatchEvalPython` for Python UDFs into a query plan, if its 
child has outputs that the original parent node does not use, 
`BatchEvalPython` still takes those outputs and saves them into its queue. When 
the data for those outputs is large, this can easily cause large spills to disk.

For example, the following reproduction code is from the JIRA ticket.

```python
from pyspark.sql.functions import *
from pyspark.sql.types import *

# range works on both Python 2 and Python 3 (xrange is Python 2 only).
lines_of_file = ["this is a line" for x in range(10000)]
file_obj = ["this_is_a_foldername/this_is_a_filename", lines_of_file]
data = [file_obj for x in range(5)]

small_df = spark.sparkContext.parallelize(data) \
    .map(lambda x: (x[0], x[1])).toDF(["file", "lines"])
exploded = small_df.select("file", explode("lines"))

def split_key(s):
    return s.split("/")[1]

split_key_udf = udf(split_key, StringType())

with_filename = exploded.withColumn("filename", split_key_udf("file"))
with_filename.explain(True)
```
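
To give a rough sense of why this reproduction spills, the following back-of-envelope sketch counts the characters of `lines` that ride along with the exploded rows. It assumes every exploded row keeps its own copy of the full `lines` array (the `Generate` node in the plans below outputs `lines#1` alongside `col#5`), and it ignores serialization overhead, so the number is an order-of-magnitude estimate rather than a measurement.

```python
# Back-of-envelope size of the redundant `lines` payload in the repro above.
# Assumption: each exploded row carries its own copy of the full `lines` array.
num_files = 5
lines_per_file = 10000
chars_per_line = len("this is a line")

exploded_rows = num_files * lines_per_file
redundant_chars_per_row = lines_per_file * chars_per_line
total_redundant_chars = exploded_rows * redundant_chars_per_row

print(exploded_rows)            # 50000
print(redundant_chars_per_row)  # 140000
print(total_redundant_chars)    # 7000000000
```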

The physical plan before/after this change:

Before:

```
*Project [file#0, col#5, pythonUDF0#14 AS filename#9]
+- BatchEvalPython [split_key(file#0)], [file#0, lines#1, col#5, pythonUDF0#14]
   +- Generate explode(lines#1), true, false, [col#5]
      +- Scan ExistingRDD[file#0,lines#1]

```

After:

```
*Project [file#0, col#5, pythonUDF0#14 AS filename#9]
+- BatchEvalPython [split_key(file#0)], [col#5, file#0, pythonUDF0#14]
   +- *Project [col#5, file#0]
      +- Generate explode(lines#1), true, false, [col#5]
         +- Scan ExistingRDD[file#0,lines#1]
```

Before this change, `lines#1` was a redundant input to `BatchEvalPython`. This 
patch removes it by inserting a `Project` below `BatchEvalPython`.
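
The pruning idea can be modelled in a few lines of plain Python. This is only a sketch: `Node`, `Project`, and `prune_child` are hypothetical stand-ins, not Spark classes, and attributes are represented as plain strings. It also keeps the child's column order, whereas the plan above shows `[col#5, file#0]`.

```python
from collections import namedtuple

# Hypothetical stand-ins for plan nodes; these are not Spark's real classes.
Node = namedtuple("Node", ["name", "output"])
Project = namedtuple("Project", ["output", "child"])


def prune_child(needed, child):
    """Wrap child in a Project when it outputs attributes nobody needs."""
    kept = [attr for attr in child.output if attr in needed]
    if len(kept) != len(child.output):
        return Project(kept, child)
    return child


# Attributes the parent Project references or outputs, taken from the plan
# above: Project [file#0, col#5, split_key(file#0) AS filename#9].
needed = {"file#0", "col#5", "filename#9"}
generate = Node("Generate", ["file#0", "lines#1", "col#5"])

pruned = prune_child(needed, generate)
print(pruned.output)  # ['file#0', 'col#5']
```

A child whose output is already fully needed is returned unchanged, so no extra `Project` is added in that case.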

## How was this patch tested?

Manually tested.
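
As an illustration of how such a manual check could be scripted, the sketch below parses the `BatchEvalPython` lines copied from the plans above and lists their output attributes. The helper `batch_eval_outputs` is hypothetical, and the textual plan format is not a stable interface, so treat this as a convenience for eyeballing plans rather than a regression test.

```python
import re


def batch_eval_outputs(plan_text):
    """Return the output attribute names of the first BatchEvalPython line."""
    for line in plan_text.splitlines():
        if "BatchEvalPython" in line:
            # The last [...] group on the line lists the node's output.
            groups = re.findall(r"\[([^\]]*)\]", line)
            return [attr.strip() for attr in groups[-1].split(",")]
    return []


# Lines copied from the "Before" and "After" physical plans above.
before = "+- BatchEvalPython [split_key(file#0)], [file#0, lines#1, col#5, pythonUDF0#14]"
after = "+- BatchEvalPython [split_key(file#0)], [col#5, file#0, pythonUDF0#14]"

print(batch_eval_outputs(before))  # ['file#0', 'lines#1', 'col#5', 'pythonUDF0#14']
print(batch_eval_outputs(after))   # ['col#5', 'file#0', 'pythonUDF0#14']
```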

Author: Liang-Chi Hsieh <[email protected]>

Closes #19642 from viirya/SPARK-22410.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c2aee69
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c2aee69
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c2aee69

Branch: refs/heads/master
Commit: 0c2aee69b0efeea5ce8d39c0564e9e4511faf387
Parents: 7a84123
Author: Liang-Chi Hsieh <[email protected]>
Authored: Sat Nov 4 13:11:09 2017 +0100
Committer: Wenchen Fan <[email protected]>
Committed: Sat Nov 4 13:11:09 2017 +0100

----------------------------------------------------------------------
 .../spark/sql/execution/python/ExtractPythonUDFs.scala | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0c2aee69/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
index d682536..e15e760 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
@@ -127,8 +127,19 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
       // If there aren't any, we are done.
       plan
     } else {
+      val inputsForPlan = plan.references ++ plan.outputSet
+      val prunedChildren = plan.children.map { child =>
+        val allNeededOutput = inputsForPlan.intersect(child.outputSet).toSeq
+        if (allNeededOutput.length != child.output.length) {
+          ProjectExec(allNeededOutput, child)
+        } else {
+          child
+        }
+      }
+      val planWithNewChildren = plan.withNewChildren(prunedChildren)
+
       val attributeMap = mutable.HashMap[PythonUDF, Expression]()
-      val splitFilter = trySplitFilter(plan)
+      val splitFilter = trySplitFilter(planWithNewChildren)
       // Rewrite the child that has the input required for the UDF
       val newChildren = splitFilter.children.map { child =>
         // Pick the UDF we are going to evaluate

