Repository: spark
Updated Branches:
  refs/heads/master 5784c8d95 -> dec8f5371
[SPARK-7116] [SQL] [PYSPARK] Remove cache() causing memory leak

This patch simply removes a `cache()` on an intermediate RDD when evaluating Python UDFs.

Author: ksonj <[email protected]>

Closes #5973 from ksonj/udf and squashes the following commits:

db5b564 [ksonj] removed TODO about cleaning up
fe70c54 [ksonj] Remove cache() causing memory leak

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dec8f537
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dec8f537
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dec8f537

Branch: refs/heads/master
Commit: dec8f53719597119034dffbe43b2a9e5fd963083
Parents: 5784c8d
Author: ksonj <[email protected]>
Authored: Thu May 7 12:04:19 2015 -0700
Committer: Michael Armbrust <[email protected]>
Committed: Thu May 7 12:04:43 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/execution/pythonUdfs.scala | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/dec8f537/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
index 7a43bfd..58cb198 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
@@ -219,8 +219,8 @@ case class EvaluatePython(
 
 /**
  * :: DeveloperApi ::
- * Uses PythonRDD to evaluate a [[PythonUDF]], one partition of tuples at a time. The input
- * data is cached and zipped with the result of the udf evaluation.
+ * Uses PythonRDD to evaluate a [[PythonUDF]], one partition of tuples at a time.
+ * The input data is zipped with the result of the udf evaluation.
  */
 @DeveloperApi
 case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child: SparkPlan)
@@ -229,8 +229,7 @@ case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child:
 
   def children: Seq[SparkPlan] = child :: Nil
 
   def execute(): RDD[Row] = {
-    // TODO: Clean up after ourselves?
-    val childResults = child.execute().map(_.copy()).cache()
+    val childResults = child.execute().map(_.copy())
     val parent = childResults.mapPartitions { iter =>
       val pickle = new Pickler

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
