Address Aaron's and Jerry's comments

Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/94ddc91d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/94ddc91d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/94ddc91d

Branch: refs/heads/master
Commit: 94ddc91d063f290a0e230a153f9e63b2f7357d4a
Parents: 347fafe
Author: Andrew Or <andrewo...@gmail.com>
Authored: Tue Dec 31 10:50:08 2013 -0800
Committer: Andrew Or <andrewo...@gmail.com>
Committed: Tue Dec 31 10:50:08 2013 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala   | 4 +---
 .../apache/spark/util/collection/ExternalAppendOnlyMap.scala  | 7 +++++--
 2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/94ddc91d/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 77a594a..1b2e541 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -169,10 +169,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
     }
     val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner =
       (combiner1, combiner2) => {
-      combiner1.zipAll(combiner2, new CoGroup, new CoGroup).map {
-        case (v1, v2) => v1 ++ v2
+        combiner1.zip(combiner2).map { case (v1, v2) => v1 ++ v2 }
       }
-    }
     new ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner](
       createCombiner, mergeValue, mergeCombiners)
   }
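
A note on the change above: plain zip is safe here because the two
combiners always have the same arity (one CoGroup slot per parent RDD,
as set up by createCombiner), so zipAll's padding elements were never
exercised. Below is a minimal standalone sketch of the slot-wise merge;
the simplified type aliases are assumptions for illustration, not the
actual definitions in CoGroupedRDD.scala:

    import scala.collection.mutable.ArrayBuffer

    object MergeSketch {
      // Simplified stand-ins for the real private types (assumed shapes).
      type CoGroup = ArrayBuffer[Any]
      type CoGroupCombiner = Seq[CoGroup]

      // Merge two combiners of equal length by concatenating slot-wise.
      def mergeCombiners(c1: CoGroupCombiner, c2: CoGroupCombiner): CoGroupCombiner =
        c1.zip(c2).map { case (v1, v2) => v1 ++ v2 }
    }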

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/94ddc91d/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 0e8f46c..680ebf9 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -257,14 +257,15 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
 
   // Iterate through (K, G) pairs in sorted order from an on-disk map
   private class DiskKGIterator(file: File) extends Iterator[(K, G)] {
-    val in = ser.deserializeStream(new FileInputStream(file))
+    val fstream = new FileInputStream(file)
+    val dstream = ser.deserializeStream(fstream)
     var nextItem: Option[(K, G)] = None
     var eof = false
 
     def readNextItem(): Option[(K, G)] = {
       if (!eof) {
         try {
-          return Some(in.readObject().asInstanceOf[(K, G)])
+          return Some(dstream.readObject().asInstanceOf[(K, G)])
         } catch {
           case e: EOFException =>
             eof = true
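
For context on the hunk above: DiskKGIterator follows a read-ahead
pattern, buffering one item in an Option and latching an eof flag when
the deserialization stream throws EOFException. A standalone sketch of
that pattern, with ReadAheadIterator and readOne as hypothetical names
(readOne stands in for dstream.readObject()):

    import java.io.EOFException

    // Buffer one item ahead; treat EOFException from the reader as
    // end-of-stream rather than as an error.
    class ReadAheadIterator[T](readOne: () => T) extends Iterator[T] {
      private var nextItem: Option[T] = None
      private var eof = false

      private def readNextItem(): Option[T] = {
        if (eof) return None
        try {
          Some(readOne())
        } catch {
          case _: EOFException => eof = true; None
        }
      }

      def hasNext: Boolean = {
        if (nextItem.isEmpty) nextItem = readNextItem()
        nextItem.isDefined
      }

      def next(): T = {
        if (!hasNext) throw new NoSuchElementException("end of stream")
        val item = nextItem.get
        nextItem = None
        item
      }
    }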
@@ -296,6 +297,8 @@ private[spark] class SpillableAppendOnlyMap[K, V, G: ClassTag, C: ClassTag](
 
     // TODO: Ensure this gets called even if the iterator isn't drained.
     def cleanup() {
+      fstream.close()
+      dstream.close()
       file.delete()
     }
   }
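
The fix above binds the FileInputStream to its own val so that cleanup()
can close both stream layers, not just the outer deserialization stream,
before deleting the file. A hypothetical standalone sketch of the same
close-every-layer pattern (readAndDelete and the BufferedInputStream
wrapper are illustrative, not part of the Spark code):

    import java.io.{BufferedInputStream, File, FileInputStream}

    object CleanupSketch {
      // Keep a reference to each stream layer so every layer is closed
      // explicitly before the backing file is deleted, instead of
      // relying on the outer wrapper's close() to delegate.
      def readAndDelete(file: File)(consume: BufferedInputStream => Unit): Unit = {
        val fstream = new FileInputStream(file)
        val bstream = new BufferedInputStream(fstream)
        try {
          consume(bstream)
        } finally {
          bstream.close()
          fstream.close() // harmless if bstream.close() already closed it
          file.delete()
        }
      }
    }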
