Address Aaron's and Jerry's comments
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/94ddc91d Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/94ddc91d Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/94ddc91d Branch: refs/heads/master Commit: 94ddc91d063f290a0e230a153f9e63b2f7357d4a Parents: 347fafe Author: Andrew Or <andrewo...@gmail.com> Authored: Tue Dec 31 10:50:08 2013 -0800 Committer: Andrew Or <andrewo...@gmail.com> Committed: Tue Dec 31 10:50:08 2013 -0800 ---------------------------------------------------------------------- core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala | 4 +--- .../apache/spark/util/collection/ExternalAppendOnlyMap.scala | 7 +++++-- 2 files changed, 6 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/94ddc91d/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala index 77a594a..1b2e541 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala @@ -169,10 +169,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: } val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner = (combiner1, combiner2) => { - combiner1.zipAll(combiner2, new CoGroup, new CoGroup).map { - case (v1, v2) => v1 ++ v2 + combiner1.zip(combiner2).map { case (v1, v2) => v1 ++ v2 } } - } new ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner]( createCombiner, mergeValue, mergeCombiners) } http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/94ddc91d/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 0e8f46c..680ebf9 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -257,14 +257,15 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag]( // Iterate through (K, G) pairs in sorted order from an on-disk map private class DiskKGIterator(file: File) extends Iterator[(K, G)] { - val in = ser.deserializeStream(new FileInputStream(file)) + val fstream = new FileInputStream(file) + val dstream = ser.deserializeStream(fstream) var nextItem: Option[(K, G)] = None var eof = false def readNextItem(): Option[(K, G)] = { if (!eof) { try { - return Some(in.readObject().asInstanceOf[(K, G)]) + return Some(dstream.readObject().asInstanceOf[(K, G)]) } catch { case e: EOFException => eof = true @@ -296,6 +297,8 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag]( // TODO: Ensure this gets called even if the iterator isn't drained. def cleanup() { + fstream.close() + dstream.close() file.delete() } }