Repository: spark Updated Branches: refs/heads/master 174e72cec -> b67385203
[SPARK-12678][CORE] MapPartitionsRDD clearDependencies MapPartitionsRDD was keeping a reference to `prev` after a call to `clearDependencies`, which could lead to a memory leak. Author: Guillaume Poulin <[email protected]> Closes #10623 from gpoulin/map_partition_deps. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b6738520 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b6738520 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b6738520 Branch: refs/heads/master Commit: b6738520374637347ab5ae6c801730cdb6b35daa Parents: 174e72c Author: Guillaume Poulin <[email protected]> Authored: Wed Jan 6 21:34:46 2016 -0800 Committer: Reynold Xin <[email protected]> Committed: Wed Jan 6 21:34:46 2016 -0800 ---------------------------------------------------------------------- .../main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/b6738520/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala index 4312d3a..e4587c9 100644 --- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala @@ -25,7 +25,7 @@ import org.apache.spark.{Partition, TaskContext} * An RDD that applies the provided function to every partition of the parent RDD. 
*/ private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag]( - prev: RDD[T], + var prev: RDD[T], f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator) preservesPartitioning: Boolean = false) extends RDD[U](prev) { @@ -36,4 +36,9 @@ private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag]( override def compute(split: Partition, context: TaskContext): Iterator[U] = f(context, split.index, firstParent[T].iterator(split, context)) + + override def clearDependencies() { + super.clearDependencies() + prev = null + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
