Minor cleanup for Scala style
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/fcc443b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/fcc443b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/fcc443b3

Branch: refs/heads/master
Commit: fcc443b3db3664987a6f863b59c06be7169175d5
Parents: 2a2ca2a
Author: Aaron Davidson <aa...@databricks.com>
Authored: Wed Dec 25 18:42:06 2013 -0800
Committer: Andrew Or <andrewo...@gmail.com>
Committed: Thu Dec 26 23:40:07 2013 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/Aggregator.scala     |  6 +-
 .../org/apache/spark/rdd/CoGroupedRDD.scala     | 28 ++++----
 .../spark/util/ExternalAppendOnlyMap.scala      | 76 ++++++++++----------
 3 files changed, 55 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/fcc443b3/core/src/main/scala/org/apache/spark/Aggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index ecaeb2d..5826255 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -67,10 +67,10 @@ case class Aggregator[K, V, C] (
       combiners.iterator
     } else {
       // Spilling
-      def combinerIdentity(combiner:C) = combiner
-      val combiners = new ExternalAppendOnlyMap[K, C, C](combinerIdentity, mergeCombiners, mergeCombiners)
+      val combiners =
+        new ExternalAppendOnlyMap[K, C, C](Predef.identity, mergeCombiners, mergeCombiners)
       iter.foreach { case(k, c) => combiners.insert(k, c) }
       combiners.iterator
     }
   }
-}
\ No newline at end of file
+}
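Context on the Aggregator change: by the time the spilling branch runs, map-side combine has already produced values of the combiner type C, so the external map's createCombiner is just the identity, and Predef.identity replaces the hand-rolled combinerIdentity with no behavior change. A minimal sketch of the resulting usage, with C = Int and illustrative names (mergeCounts, wordCounts) that are not part of this patch:

    // Hypothetical word-count example; insert(k, c) and iterator are the same
    // ExternalAppendOnlyMap calls the patched Aggregator code exercises.
    val mergeCounts: (Int, Int) => Int = _ + _
    val wordCounts =
      new ExternalAppendOnlyMap[String, Int, Int](Predef.identity, mergeCounts, mergeCounts)
    Seq(("a", 1), ("b", 2), ("a", 3)).foreach { case (k, c) => wordCounts.insert(k, c) }
    wordCounts.iterator.foreach(println)  // (a,4) and (b,2), in no guaranteed order
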
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/fcc443b3/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index a7265f3..3af0376 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -25,7 +25,6 @@ import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv
 import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
 import org.apache.spark.util.{AppendOnlyMap, ExternalAppendOnlyMap}
 
-
 private[spark] sealed trait CoGroupSplitDep extends Serializable
 
 private[spark] case class NarrowCoGroupSplitDep(
@@ -62,6 +61,10 @@ class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep])
 class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
   extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
 
+  type CoGroup = ArrayBuffer[Any]
+  type CoGroupValue = (Any, Int)  // Int is dependency number
+  type CoGroupCombiner = Seq[CoGroup]
+
   private var serializerClass: String = null
 
   def setSerializer(cls: String): CoGroupedRDD[K] = {
@@ -125,7 +128,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
 
     if (!externalSorting) {
       val map = new AppendOnlyMap[K, CoGroupCombiner]
-      val update: (Boolean, Seq[ArrayBuffer[Any]]) => Seq[ArrayBuffer[Any]] = (hadVal, oldVal) => {
+      val update: (Boolean, CoGroupCombiner) => CoGroupCombiner = (hadVal, oldVal) => {
         if (hadVal) oldVal else Array.fill(numRdds)(new ArrayBuffer[Any])
       }
       val getSeq = (k: K) => map.changeValue(k, update)
@@ -147,30 +150,29 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
       }
     }
 
-  private def createExternalMap(numRdds:Int): ExternalAppendOnlyMap [K, CoGroupValue, CoGroupCombiner] = {
-    def createCombiner(v: CoGroupValue): CoGroupCombiner = {
+  private def createExternalMap(numRdds: Int)
+    : ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = {
+
+    val createCombiner: (CoGroupValue) => CoGroupCombiner = v => {
       val newCombiner = Array.fill(numRdds)(new CoGroup)
-      mergeValue(newCombiner, v)
+      v match { case (value, depNum) => newCombiner(depNum) += value }
+      newCombiner
     }
-    def mergeValue(c: CoGroupCombiner, v: CoGroupValue): CoGroupCombiner = {
+    val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner = (c, v) => {
       v match { case (value, depNum) => c(depNum) += value }
       c
     }
-    def mergeCombiners(c1: CoGroupCombiner, c2: CoGroupCombiner): CoGroupCombiner = {
+    val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner = (c1, c2) => {
       c1.zipAll(c2, new CoGroup, new CoGroup).map { case (v1, v2) => v1 ++ v2 }
     }
-    new ExternalAppendOnlyMap [K, CoGroupValue, CoGroupCombiner] (
-      createCombiner,mergeValue, mergeCombiners)
+    new ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner](
+      createCombiner, mergeValue, mergeCombiners)
   }
 
   override def clearDependencies() {
     super.clearDependencies()
     rdds = null
   }
-
-  type CoGroup = ArrayBuffer[Any]
-  type CoGroupValue = (Any, Int)  // Int is dependency number
-  type CoGroupCombiner = Seq[CoGroup]
 }
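The three merge functions above are pure, so they are easy to check outside Spark. A standalone sketch (paste into a Scala REPL) with local copies of the type aliases and a fixed numRdds = 2; every name here is local to the example, not part of the patch:

    import scala.collection.mutable.ArrayBuffer

    type CoGroup = ArrayBuffer[Any]
    type CoGroupValue = (Any, Int)  // Int is dependency number
    type CoGroupCombiner = Seq[CoGroup]
    val numRdds = 2

    // One buffer per parent RDD; each value lands in the buffer of its dependency.
    val createCombiner: CoGroupValue => CoGroupCombiner = v => {
      val newCombiner = Array.fill(numRdds)(new CoGroup)
      v match { case (value, depNum) => newCombiner(depNum) += value }
      newCombiner
    }
    val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner = (c, v) => {
      v match { case (value, depNum) => c(depNum) += value }
      c
    }
    // Merging two combiners zips the per-dependency buffers together.
    val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner = (c1, c2) =>
      c1.zipAll(c2, new CoGroup, new CoGroup).map { case (v1, v2) => v1 ++ v2 }

    val left  = mergeValue(createCombiner(("x", 0)), ("y", 1))  // Seq(ArrayBuffer(x), ArrayBuffer(y))
    val right = createCombiner(("z", 0))                        // Seq(ArrayBuffer(z), ArrayBuffer())
    mergeCombiners(left, right)                                 // Seq(ArrayBuffer(x, z), ArrayBuffer(y))
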
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/fcc443b3/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
index 790dcf0..c8c0534 100644
--- a/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
@@ -18,21 +18,27 @@
 package org.apache.spark.util
 
 import java.io._
+import java.text.DecimalFormat
+
+import scala.Some
+import scala.Predef._
 import scala.collection.mutable.{ArrayBuffer, PriorityQueue}
+import scala.util.Random
 
 /**
  * A wrapper for SpillableAppendOnlyMap that handles two cases:
  *
  * (1) If a mergeCombiners function is specified, merge values into combiners before
- *     disk spill, as it is possible to merge the resulting combiners later
+ *     disk spill, as it is possible to merge the resulting combiners later.
  *
  * (2) Otherwise, group values of the same key together before disk spill, and merge
- *     them into combiners only after reading them back from disk
+ *     them into combiners only after reading them back from disk.
  */
-class ExternalAppendOnlyMap[K, V, C] (createCombiner: V => C,
-                                      mergeValue: (C, V) => C,
-                                      mergeCombiners: (C, C) => C,
-                                      memoryThresholdMB: Int = 1024)
+class ExternalAppendOnlyMap[K, V, C](
+    createCombiner: V => C,
+    mergeValue: (C, V) => C,
+    mergeCombiners: (C, C) => C,
+    memoryThresholdMB: Long = 1024)
   extends Iterable[(K, C)] with Serializable {
 
   private val mergeBeforeSpill: Boolean = mergeCombiners != null
 
@@ -40,8 +46,9 @@ class ExternalAppendOnlyMap[K, V, C] (createCombiner: V => C,
   private val map: SpillableAppendOnlyMap[K, V, _, C] = {
     if (mergeBeforeSpill) {
       new SpillableAppendOnlyMap[K, V, C, C] (createCombiner,
-        mergeValue, mergeCombiners, combinerIdentity, memoryThresholdMB)
+        mergeValue, mergeCombiners, Predef.identity, memoryThresholdMB)
     } else {
+      val createGroup: (V => ArrayBuffer[V]) = value => ArrayBuffer[V](value)
       new SpillableAppendOnlyMap[K, V, ArrayBuffer[V], C] (createGroup,
         mergeValueIntoGroup, mergeGroups, combineGroup, memoryThresholdMB)
     }
@@ -51,8 +58,6 @@ class ExternalAppendOnlyMap[K, V, C] (createCombiner: V => C,
 
   override def iterator: Iterator[(K, C)] = map.iterator
 
-  private def combinerIdentity(combiner: C): C = combiner
-  private def createGroup(value: V): ArrayBuffer[V] = ArrayBuffer[V](value)
   private def mergeValueIntoGroup(group: ArrayBuffer[V], value: V): ArrayBuffer[V] = {
     group += value
     group
@@ -78,14 +83,16 @@ class ExternalAppendOnlyMap[K, V, C] (createCombiner: V => C,
  * is exceeded. A group with type M is an intermediate combiner, and shares the same
  * type as either C or ArrayBuffer[V].
  */
-class SpillableAppendOnlyMap[K, V, M, C] (createGroup: V => M,
-                                          mergeValue: (M, V) => M,
-                                          mergeGroups: (M, M) => M,
-                                          createCombiner: M => C,
-                                          memoryThresholdMB: Int = 1024)
+class SpillableAppendOnlyMap[K, V, M, C](
+    createGroup: V => M,
+    mergeValue: (M, V) => M,
+    mergeGroups: (M, M) => M,
+    createCombiner: M => C,
+    memoryThresholdMB: Long = 1024)
   extends Iterable[(K, C)] with Serializable {
 
   var currentMap = new AppendOnlyMap[K, M]
+  var sizeTracker = new SamplingSizeTracker(currentMap)
   var oldMaps = new ArrayBuffer[DiskIterator]
 
   def insert(key: K, value: V): Unit = {
@@ -93,9 +100,8 @@ class SpillableAppendOnlyMap[K, V, M, C] (createGroup: V => M,
       if (hadVal) mergeValue(oldVal, value) else createGroup(value)
     }
     currentMap.changeValue(key, update)
-    val mapSize = SizeEstimator.estimate(currentMap)
-    //if (mapSize > memoryThresholdMB * math.pow(1024, 2)) {
-    if (mapSize > 1024 * 10) {
+    sizeTracker.updateMade()
+    if (sizeTracker.estimateSize() > memoryThresholdMB * 1024 * 1024) {
      spill()
    }
  }
@@ -104,9 +110,10 @@
     val file = File.createTempFile("external_append_only_map", "") // Add spill location
     val out = new ObjectOutputStream(new FileOutputStream(file))
     val sortedMap = currentMap.iterator.toList.sortBy(kv => kv._1.hashCode())
-    sortedMap.foreach { out.writeObject( _ ) }
+    sortedMap.foreach(out.writeObject)
     out.close()
     currentMap = new AppendOnlyMap[K, M]
+    sizeTracker = new SamplingSizeTracker(currentMap)
     oldMaps.append(new DiskIterator(file))
   }
 
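The insert() hunk above replaces a full SizeEstimator.estimate pass on every update (previously spilling past a hardcoded 10KB debug threshold, with the real threshold commented out) with a SamplingSizeTracker, whose implementation is not part of this diff. A rough sketch of the general technique its call sites suggest, illustrative only and assuming SizeEstimator is visible as it is inside org.apache.spark.util; the real class's internals may differ:

    // Toy amortized size tracker with the same call surface used above:
    // constructor(map), updateMade(), estimateSize().
    class ToySizeTracker(map: AnyRef) {
      private var numUpdates = 0L
      private var nextSampleAt = 1L
      private var cachedEstimate = SizeEstimator.estimate(map)

      def updateMade() {
        numUpdates += 1
        if (numUpdates >= nextSampleAt) {
          cachedEstimate = SizeEstimator.estimate(map)  // expensive full scan, done rarely
          nextSampleAt = numUpdates * 2                 // back off exponentially between scans
        }
      }

      def estimateSize(): Long = cachedEstimate
    }
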
@@ -115,13 +122,10 @@
 
   // An iterator that sort-merges (K, M) pairs from memory and disk into (K, C) pairs
   class ExternalIterator extends Iterator[(K, C)] {
-    // Order by increasing key hash value
-    implicit object KVOrdering extends Ordering[KMITuple] {
-      def compare(a:KMITuple, b:KMITuple) = -a.key.hashCode().compareTo(b.key.hashCode())
-    }
-    val pq = PriorityQueue[KMITuple]()
+    // Order by key hash value
+    val pq = PriorityQueue[KMITuple]()(Ordering.by(_.key.hashCode()))
     val inputStreams = Seq(new MemoryIterator(currentMap)) ++ oldMaps
-    inputStreams.foreach { readFromIterator( _ ) }
+    inputStreams.foreach(readFromIterator)
 
     // Read from the given iterator until a key of different hash is retrieved
     def readFromIterator(iter: Iterator[(K, M)]): Unit = {
@@ -131,10 +135,7 @@
         pq.enqueue(KMITuple(k, m, iter))
         minHash match {
           case None => minHash = Some(k.hashCode())
-          case Some(expectedHash) =>
-            if (k.hashCode() != expectedHash){
-              return
-            }
+          case Some(expectedHash) if k.hashCode() != expectedHash => return
         }
       }
     }
@@ -159,16 +160,16 @@
           collidedKMI += newKMI
         }
       }
-      collidedKMI.foreach { pq.enqueue( _ ) }
+      collidedKMI.foreach(pq.enqueue(_))
       (minKey, createCombiner(minGroup))
     }
 
-    case class KMITuple(key:K, group:M, iterator:Iterator[(K, M)])
+    case class KMITuple(key: K, group: M, iterator: Iterator[(K, M)])
   }
 
   // Iterate through (K, M) pairs in sorted order from the in-memory map
   class MemoryIterator(map: AppendOnlyMap[K, M]) extends Iterator[(K, M)] {
-    val sortedMap = currentMap.iterator.toList.sortBy(kc => kc._1.hashCode())
+    val sortedMap = currentMap.iterator.toList.sortBy(km => km._1.hashCode())
     val it = sortedMap.iterator
     override def hasNext: Boolean = it.hasNext
     override def next(): (K, M) = it.next()
@@ -180,21 +181,18 @@
     var nextItem: Option[(K, M)] = None
 
     override def hasNext: Boolean = {
-      try {
-        nextItem = Some(in.readObject().asInstanceOf[(K, M)])
-        true
+      nextItem = try {
+        Some(in.readObject().asInstanceOf[(K, M)])
       } catch {
-        case e: EOFException =>
-          nextItem = None
-          false
+        case e: EOFException => None
      }
+      nextItem.isDefined
     }
 
     override def next(): (K, M) = {
       nextItem match {
         case Some(item) => item
-        case None =>
-          throw new NoSuchElementException
+        case None => throw new NoSuchElementException
       }
     }
   }
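For readers following ExternalIterator: every input stream (the in-memory map plus each spill file) yields pairs already sorted by key hash, and the priority queue drives a k-way merge so that all pairs sharing the smallest hash can be combined before moving on. A standalone sketch of that merge pattern, simplified to String keys and Int values with no hash-collision grouping; merging in ascending hash order is this sketch's choice, and no identifier below comes from the patch:

    import scala.collection.mutable.PriorityQueue

    // The head of one sorted stream, plus the rest of that stream.
    case class Head(key: String, value: Int, rest: Iterator[(String, Int)])

    def mergeSorted(streams: Seq[Iterator[(String, Int)]]): Iterator[(String, Int)] = {
      // Min-heap on key hash: reverse Scala's default max-heap ordering.
      val pq = PriorityQueue.empty[Head](Ordering.by[Head, Int](_.key.hashCode()).reverse)
      streams.foreach { it =>
        if (it.hasNext) { val (k, v) = it.next(); pq.enqueue(Head(k, v, it)) }
      }
      new Iterator[(String, Int)] {
        def hasNext = pq.nonEmpty
        def next() = {
          val h = pq.dequeue()
          // Refill from the stream just consumed, keeping the heap ordered.
          if (h.rest.hasNext) { val (k, v) = h.rest.next(); pq.enqueue(Head(k, v, h.rest)) }
          (h.key, h.value)
        }
      }
    }

    // Each input must already be hash-sorted, as the in-memory map and spill files are.
    mergeSorted(Seq(Iterator("a" -> 1, "b" -> 2), Iterator("a" -> 3))).foreach(println)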