Make serializer a parameter to ExternalAppendOnlyMap

Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/4a014dc5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/4a014dc5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/4a014dc5

Branch: refs/heads/master
Commit: 4a014dc59c15bd35e025a754cf436629117e581a
Parents: 8fbff9f
Author: Andrew Or <andrewo...@gmail.com>
Authored: Sun Dec 29 21:55:53 2013 -0800
Committer: Andrew Or <andrewo...@gmail.com>
Committed: Sun Dec 29 21:55:53 2013 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala      | 4 ++--
 .../org/apache/spark/util/collection/ExternalAppendOnlyMap.scala | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4a014dc5/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 77a594a..9066215 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -173,8 +173,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
         case (v1, v2) => v1 ++ v2
       }
     }
-    new ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner](
-      createCombiner, mergeValue, mergeCombiners)
+    new ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner](createCombiner, mergeValue,
+      mergeCombiners, SparkEnv.get.serializerManager.get(serializerClass))
   }
 
   override def clearDependencies() {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4a014dc5/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 1de545c..b15cae1 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -41,10 +41,10 @@ import org.apache.spark.serializer.Serializer
 private[spark] class ExternalAppendOnlyMap[K, V, C: ClassTag](
     createCombiner: V => C,
     mergeValue: (C, V) => C,
-    mergeCombiners: (C, C) => C)
+    mergeCombiners: (C, C) => C,
+    serializer: Serializer = SparkEnv.get.serializerManager.default)
   extends Iterable[(K, C)] with Serializable {
 
-  private val serializer = SparkEnv.get.serializerManager.default
   private val mergeBeforeSpill: Boolean = mergeCombiners != null
 
   private val map: SpillableAppendOnlyMap[K, V, _, C] = {

Reply via email to