Minor cleanup for Scala style

Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/fcc443b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/fcc443b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/fcc443b3

Branch: refs/heads/master
Commit: fcc443b3db3664987a6f863b59c06be7169175d5
Parents: 2a2ca2a
Author: Aaron Davidson <aa...@databricks.com>
Authored: Wed Dec 25 18:42:06 2013 -0800
Committer: Andrew Or <andrewo...@gmail.com>
Committed: Thu Dec 26 23:40:07 2013 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/Aggregator.scala     |  6 +-
 .../org/apache/spark/rdd/CoGroupedRDD.scala     | 28 ++++----
 .../spark/util/ExternalAppendOnlyMap.scala      | 76 ++++++++++----------
 3 files changed, 55 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/fcc443b3/core/src/main/scala/org/apache/spark/Aggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index ecaeb2d..5826255 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -67,10 +67,10 @@ case class Aggregator[K, V, C] (
       combiners.iterator
     } else {
       // Spilling
-      def combinerIdentity(combiner:C) = combiner
-      val combiners = new ExternalAppendOnlyMap[K, C, C](combinerIdentity, mergeCombiners, mergeCombiners)
+      val combiners =
+        new ExternalAppendOnlyMap[K, C, C](Predef.identity, mergeCombiners, mergeCombiners)
       iter.foreach { case(k, c) => combiners.insert(k, c) }
       combiners.iterator
     }
   }
-}
\ No newline at end of file
+}
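
A note on the Predef.identity swap above: when map-side combining has already run, the values arriving here are themselves combiners, so K's value type and C coincide and the create-combiner step is a no-op; the hand-rolled combinerIdentity can therefore be replaced by the standard identity function. A minimal usage sketch, assuming only the ExternalAppendOnlyMap constructor and the insert/iterator methods visible in this patch (the word-count key and value types are illustrative, not from the commit):

  // Reduce-side aggregation where incoming values are already combiners (C = Int).
  val counts = new ExternalAppendOnlyMap[String, Int, Int](
    Predef.identity,       // V => C is C => C here: a lone combiner is already a combiner
    (sum, v) => sum + v,   // mergeValue: fold one more value into a combiner
    (c1, c2) => c1 + c2)   // mergeCombiners: merge two (possibly spilled) combiners
  Seq(("a", 1), ("b", 2), ("a", 3)).foreach { case (k, v) => counts.insert(k, v) }
  counts.iterator.foreach(println)  // prints (a,4) and (b,2); order depends on key hashes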

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/fcc443b3/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index a7265f3..3af0376 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -25,7 +25,6 @@ import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv
 import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
 import org.apache.spark.util.{AppendOnlyMap, ExternalAppendOnlyMap}
 
-
 private[spark] sealed trait CoGroupSplitDep extends Serializable
 
 private[spark] case class NarrowCoGroupSplitDep(
@@ -62,6 +61,10 @@ class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep])
 class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
   extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
 
+  type CoGroup = ArrayBuffer[Any]
+  type CoGroupValue = (Any, Int)  // Int is dependency number
+  type CoGroupCombiner = Seq[CoGroup]
+
   private var serializerClass: String = null
 
   def setSerializer(cls: String): CoGroupedRDD[K] = {
@@ -125,7 +128,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
 
     if (!externalSorting) {
       val map = new AppendOnlyMap[K, CoGroupCombiner]
-      val update: (Boolean, Seq[ArrayBuffer[Any]]) => Seq[ArrayBuffer[Any]] = (hadVal, oldVal) => {
+      val update: (Boolean, CoGroupCombiner) => CoGroupCombiner = (hadVal, oldVal) => {
         if (hadVal) oldVal else Array.fill(numRdds)(new ArrayBuffer[Any])
       }
       val getSeq = (k: K) => map.changeValue(k, update)
@@ -147,30 +150,29 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
     }
   }
 
-  private def createExternalMap(numRdds:Int): ExternalAppendOnlyMap [K, CoGroupValue, CoGroupCombiner] = {
-    def createCombiner(v: CoGroupValue): CoGroupCombiner = {
+  private def createExternalMap(numRdds: Int)
+    : ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = {
+
+    val createCombiner: (CoGroupValue) => CoGroupCombiner = v => {
       val newCombiner = Array.fill(numRdds)(new CoGroup)
-      mergeValue(newCombiner, v)
+      v match { case (value, depNum) => newCombiner(depNum) += value }
+      newCombiner
     }
-    def mergeValue(c: CoGroupCombiner, v: CoGroupValue): CoGroupCombiner = {
+    val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner = (c, v) => {
       v match { case (value, depNum) => c(depNum) += value }
       c
     }
-    def mergeCombiners(c1: CoGroupCombiner, c2: CoGroupCombiner): CoGroupCombiner = {
+    val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner = (c1, c2) => {
       c1.zipAll(c2, new CoGroup, new CoGroup).map {
         case (v1, v2) => v1 ++ v2
       }
     }
-    new ExternalAppendOnlyMap [K, CoGroupValue, CoGroupCombiner] (createCombiner,mergeValue, mergeCombiners)
+    new ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner](
+      createCombiner, mergeValue, mergeCombiners)
   }
 
   override def clearDependencies() {
     super.clearDependencies()
     rdds = null
   }
-
-  type CoGroup = ArrayBuffer[Any]
-  type CoGroupValue = (Any, Int)  // Int is dependency number
-  type CoGroupCombiner = Seq[CoGroup]
 }
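
To see why mergeCombiners uses zipAll above: each combiner is a Seq holding one buffer per parent RDD, and merging two combiners concatenates the buffers position-wise, with zipAll's default empty CoGroups making the merge total even for combiners of unequal length. A REPL-style sketch using the type aliases from this patch (numRdds = 2 is an arbitrary illustrative value):

  import scala.collection.mutable.ArrayBuffer

  type CoGroup = ArrayBuffer[Any]
  type CoGroupValue = (Any, Int)  // Int is dependency number
  type CoGroupCombiner = Seq[CoGroup]

  val numRdds = 2
  def createCombiner(v: CoGroupValue): CoGroupCombiner = {
    val newCombiner = Array.fill(numRdds)(new CoGroup)
    v match { case (value, depNum) => newCombiner(depNum) += value }
    newCombiner
  }

  val c1 = createCombiner(("x", 0))  // a value from dependency 0
  val c2 = createCombiner(("y", 1))  // a value from dependency 1
  val merged = c1.zipAll(c2, new CoGroup, new CoGroup).map { case (b1, b2) => b1 ++ b2 }
  // merged == Seq(ArrayBuffer("x"), ArrayBuffer("y")): one buffer per parent RDD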

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/fcc443b3/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
index 790dcf0..c8c0534 100644
--- a/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
@@ -18,21 +18,27 @@
 package org.apache.spark.util
 
 import java.io._
+import java.text.DecimalFormat
+
+import scala.Some
+import scala.Predef._
 import scala.collection.mutable.{ArrayBuffer, PriorityQueue}
+import scala.util.Random
 
 /**
  * A wrapper for SpillableAppendOnlyMap that handles two cases:
  *
 * (1)  If a mergeCombiners function is specified, merge values into combiners before
- *      disk spill, as it is possible to merge the resulting combiners later
+ *      disk spill, as it is possible to merge the resulting combiners later.
  *
 * (2)  Otherwise, group values of the same key together before disk spill, and merge
- *      them into combiners only after reading them back from disk
+ *      them into combiners only after reading them back from disk.
  */
-class ExternalAppendOnlyMap[K, V, C] (createCombiner: V => C,
-                                      mergeValue: (C, V) => C,
-                                      mergeCombiners: (C, C) => C,
-                                      memoryThresholdMB: Int = 1024)
+class ExternalAppendOnlyMap[K, V, C](
+    createCombiner: V => C,
+    mergeValue: (C, V) => C,
+    mergeCombiners: (C, C) => C,
+    memoryThresholdMB: Long = 1024)
   extends Iterable[(K, C)] with Serializable {
 
   private val mergeBeforeSpill: Boolean = mergeCombiners != null
@@ -40,8 +46,9 @@ class ExternalAppendOnlyMap[K, V, C] (createCombiner: V => C,
   private val map: SpillableAppendOnlyMap[K, V, _, C] = {
     if (mergeBeforeSpill) {
       new SpillableAppendOnlyMap[K, V, C, C] (createCombiner,
-        mergeValue, mergeCombiners, combinerIdentity, memoryThresholdMB)
+        mergeValue, mergeCombiners, Predef.identity, memoryThresholdMB)
     } else {
+      val createGroup: (V => ArrayBuffer[V]) = value => ArrayBuffer[V](value)
       new SpillableAppendOnlyMap[K, V, ArrayBuffer[V], C] (createGroup,
         mergeValueIntoGroup, mergeGroups, combineGroup, memoryThresholdMB)
     }
@@ -51,8 +58,6 @@ class ExternalAppendOnlyMap[K, V, C] (createCombiner: V => C,
 
   override def iterator: Iterator[(K, C)] = map.iterator
 
-  private def combinerIdentity(combiner: C): C = combiner
-  private def createGroup(value: V): ArrayBuffer[V] = ArrayBuffer[V](value)
   private def mergeValueIntoGroup(group: ArrayBuffer[V], value: V): ArrayBuffer[V] = {
     group += value
     group
@@ -78,14 +83,16 @@ class ExternalAppendOnlyMap[K, V, C] (createCombiner: V => C,
  * is exceeded. A group with type M is an intermediate combiner, and shares the same
  * type as either C or ArrayBuffer[V].
  */
-class SpillableAppendOnlyMap[K, V, M, C] (createGroup: V => M,
-                                          mergeValue: (M, V) => M,
-                                          mergeGroups: (M, M) => M,
-                                          createCombiner: M => C,
-                                          memoryThresholdMB: Int = 1024)
+class SpillableAppendOnlyMap[K, V, M, C](
+    createGroup: V => M,
+    mergeValue: (M, V) => M,
+    mergeGroups: (M, M) => M,
+    createCombiner: M => C,
+    memoryThresholdMB: Long = 1024)
   extends Iterable[(K, C)] with Serializable {
 
   var currentMap = new AppendOnlyMap[K, M]
+  var sizeTracker = new SamplingSizeTracker(currentMap)
   var oldMaps = new ArrayBuffer[DiskIterator]
 
   def insert(key: K, value: V): Unit = {
@@ -93,9 +100,8 @@ class SpillableAppendOnlyMap[K, V, M, C] (createGroup: V => M,
       if (hadVal) mergeValue(oldVal, value) else createGroup(value)
     }
     currentMap.changeValue(key, update)
-    val mapSize = SizeEstimator.estimate(currentMap)
-    //if (mapSize > memoryThresholdMB * math.pow(1024, 2)) {
-    if (mapSize > 1024 * 10) {
+    sizeTracker.updateMade()
+    if (sizeTracker.estimateSize() > memoryThresholdMB * 1024 * 1024) {
       spill()
     }
   }
@@ -104,9 +110,10 @@ class SpillableAppendOnlyMap[K, V, M, C] (createGroup: V => M,
     val file = File.createTempFile("external_append_only_map", "")  // Add spill location
     val out = new ObjectOutputStream(new FileOutputStream(file))
     val sortedMap = currentMap.iterator.toList.sortBy(kv => kv._1.hashCode())
-    sortedMap.foreach { out.writeObject( _ ) }
+    sortedMap.foreach(out.writeObject)
     out.close()
     currentMap = new AppendOnlyMap[K, M]
+    sizeTracker = new SamplingSizeTracker(currentMap)
     oldMaps.append(new DiskIterator(file))
   }
 
@@ -115,13 +122,10 @@ class SpillableAppendOnlyMap[K, V, M, C] (createGroup: V => M,
   // An iterator that sort-merges (K, M) pairs from memory and disk into (K, C) pairs
   class ExternalIterator extends Iterator[(K, C)] {
 
-    // Order by increasing key hash value
-    implicit object KVOrdering extends Ordering[KMITuple] {
-      def compare(a:KMITuple, b:KMITuple) = -a.key.hashCode().compareTo(b.key.hashCode())
-    }
-    val pq = PriorityQueue[KMITuple]()
+    // Order by key hash value
+    val pq = PriorityQueue[KMITuple]()(Ordering.by(_.key.hashCode()))
     val inputStreams = Seq(new MemoryIterator(currentMap)) ++ oldMaps
-    inputStreams.foreach { readFromIterator( _ ) }
+    inputStreams.foreach(readFromIterator)
 
     // Read from the given iterator until a key of different hash is retrieved
     def readFromIterator(iter: Iterator[(K, M)]): Unit = {
@@ -131,10 +135,7 @@ class SpillableAppendOnlyMap[K, V, M, C] (createGroup: V => M,
         pq.enqueue(KMITuple(k, m, iter))
         minHash match {
           case None => minHash = Some(k.hashCode())
-          case Some(expectedHash) =>
-            if (k.hashCode() != expectedHash){
-              return
-            }
+          case Some(expectedHash) if k.hashCode() != expectedHash => return
         }
       }
     }
@@ -159,16 +160,16 @@ class SpillableAppendOnlyMap[K, V, M, C] (createGroup: V => M,
           collidedKMI += newKMI
         }
       }
-      collidedKMI.foreach { pq.enqueue( _ ) }
+      collidedKMI.foreach(pq.enqueue(_))
       (minKey, createCombiner(minGroup))
     }
 
-    case class KMITuple(key:K, group:M, iterator:Iterator[(K, M)])
+    case class KMITuple(key: K, group: M, iterator: Iterator[(K, M)])
   }
 
   // Iterate through (K, M) pairs in sorted order from the in-memory map
   class MemoryIterator(map: AppendOnlyMap[K, M]) extends Iterator[(K, M)] {
-    val sortedMap = currentMap.iterator.toList.sortBy(kc => kc._1.hashCode())
+    val sortedMap = currentMap.iterator.toList.sortBy(km => km._1.hashCode())
     val it = sortedMap.iterator
     override def hasNext: Boolean = it.hasNext
     override def next(): (K, M) = it.next()
@@ -180,21 +181,18 @@ class SpillableAppendOnlyMap[K, V, M, C] (createGroup: V => M,
     var nextItem: Option[(K, M)] = None
 
     override def hasNext: Boolean = {
-      try {
-        nextItem = Some(in.readObject().asInstanceOf[(K, M)])
-        true
+      nextItem = try {
+        Some(in.readObject().asInstanceOf[(K, M)])
       } catch {
-        case e: EOFException =>
-          nextItem = None
-          false
+        case e: EOFException => None
       }
+      nextItem.isDefined
     }
 
     override def next(): (K, M) = {
       nextItem match {
         case Some(item) => item
-        case None =>
-          throw new NoSuchElementException
+        case None => throw new NoSuchElementException
       }
     }
   }
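
The DiskIterator rewrite above keeps the same EOF-driven protocol: hasNext eagerly deserializes one record and next() returns it. A self-contained sketch of that pattern (hypothetical class name, not from the commit; this variant additionally caches the lookahead so that calling hasNext twice in a row cannot skip a record):

  import java.io.{EOFException, ObjectInputStream}

  class ObjectStreamIterator[T](in: ObjectInputStream) extends Iterator[T] {
    private var nextItem: Option[T] = None

    override def hasNext: Boolean = {
      if (nextItem.isEmpty) {
        // Reading ahead is the only way to detect the end of an object stream:
        // readObject throws EOFException once the stream is exhausted.
        nextItem = try Some(in.readObject().asInstanceOf[T]) catch {
          case _: EOFException => None
        }
      }
      nextItem.isDefined
    }

    override def next(): T = {
      if (!hasNext) throw new NoSuchElementException
      val item = nextItem.get
      nextItem = None  // consume the cached lookahead
      item
    }
  }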

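One note on the PriorityQueue change: scala.collection.mutable.PriorityQueue is a max-heap, so under Ordering.by(_.key.hashCode()) dequeue() returns the entry with the largest key hash; reversing the Ordering pops the smallest hash first instead. A minimal sketch of the idiom, with a hypothetical Entry class standing in for KMITuple:

  import scala.collection.mutable.PriorityQueue

  case class Entry(key: String, payload: Int)

  // Max-heap: dequeues the entry with the largest key hash first.
  val pq = PriorityQueue[Entry]()(Ordering.by(_.key.hashCode()))
  // For increasing hash order, reverse the Ordering:
  //   PriorityQueue[Entry]()(Ordering.by((e: Entry) => e.key.hashCode()).reverse)
  pq.enqueue(Entry("a", 1), Entry("b", 2), Entry("c", 3))
  while (pq.nonEmpty) println(pq.dequeue())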