Make serializer a parameter to ExternalAppendOnlyMap
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/4a014dc5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/4a014dc5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/4a014dc5

Branch: refs/heads/master
Commit: 4a014dc59c15bd35e025a754cf436629117e581a
Parents: 8fbff9f
Author: Andrew Or <andrewo...@gmail.com>
Authored: Sun Dec 29 21:55:53 2013 -0800
Committer: Andrew Or <andrewo...@gmail.com>
Committed: Sun Dec 29 21:55:53 2013 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala      | 4 ++--
 .../org/apache/spark/util/collection/ExternalAppendOnlyMap.scala | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4a014dc5/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 77a594a..9066215 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -173,8 +173,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
           case (v1, v2) => v1 ++ v2
         }
       }
-    new ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner](
-      createCombiner, mergeValue, mergeCombiners)
+    new ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner](createCombiner, mergeValue,
+      mergeCombiners, SparkEnv.get.serializerManager.get(serializerClass))
   }
 
   override def clearDependencies() {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4a014dc5/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 1de545c..b15cae1 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -41,10 +41,10 @@ import org.apache.spark.serializer.Serializer
 private[spark] class ExternalAppendOnlyMap[K, V, C: ClassTag](
     createCombiner: V => C,
     mergeValue: (C, V) => C,
-    mergeCombiners: (C, C) => C)
+    mergeCombiners: (C, C) => C,
+    serializer: Serializer = SparkEnv.get.serializerManager.default)
   extends Iterable[(K, C)] with Serializable {
 
-  private val serializer = SparkEnv.get.serializerManager.default
   private val mergeBeforeSpill: Boolean = mergeCombiners != null
 
   private val map: SpillableAppendOnlyMap[K, V, _, C] = {