Working ExternalAppendOnlyMap for Aggregator, but not for CoGroupedRDD
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/97fbb3ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/97fbb3ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/97fbb3ec

Branch: refs/heads/master
Commit: 97fbb3ec52785883a0eee8644f9f4603c4c9df21
Parents: 5e69fc5
Author: Andrew Or <andrewo...@gmail.com>
Authored: Mon Dec 23 22:50:15 2013 -0800
Committer: Andrew Or <andrewo...@gmail.com>
Committed: Thu Dec 26 23:40:07 2013 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/Aggregator.scala     |  13 +-
 .../org/apache/spark/rdd/CoGroupedRDD.scala     |  43 ++++--
 .../org/apache/spark/rdd/PairRDDFunctions.scala |  11 +-
 .../spark/util/ExternalAppendOnlyMap.scala      | 136 +++++++++++++++++++
 4 files changed, 182 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/97fbb3ec/core/src/main/scala/org/apache/spark/Aggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 1a2ec55..ae16242 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark
 
-import org.apache.spark.util.AppendOnlyMap
+import org.apache.spark.util.{AppendOnlyMap, ExternalAppendOnlyMap}
 
 /**
  * A set of functions used to aggregate data.
@@ -32,7 +32,9 @@ case class Aggregator[K, V, C] (
     mergeCombiners: (C, C) => C) {
 
   def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
-    val combiners = new AppendOnlyMap[K, C]
+    println("Combining values by key!!")
+    //val combiners = new AppendOnlyMap[K, C]
+    val combiners = new ExternalAppendOnlyMap[K, C](mergeCombiners)
     var kv: Product2[K, V] = null
     val update = (hadValue: Boolean, oldValue: C) => {
       if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
@@ -45,7 +47,9 @@ case class Aggregator[K, V, C] (
   }
 
   def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
-    val combiners = new AppendOnlyMap[K, C]
+    println("Combining combiners by key!!")
+    //val combiners = new AppendOnlyMap[K, C]
+    val combiners = new ExternalAppendOnlyMap[K, C](mergeCombiners)
     var kc: (K, C) = null
     val update = (hadValue: Boolean, oldValue: C) => {
       if (hadValue) mergeCombiners(oldValue, kc._2) else kc._2
@@ -56,5 +60,4 @@ case class Aggregator[K, V, C] (
     }
     combiners.iterator
   }
-}
-
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/97fbb3ec/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 911a002..6283686 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv, TaskContext}
 import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
-import org.apache.spark.util.AppendOnlyMap
+import org.apache.spark.util.{AppendOnlyMap, ExternalAppendOnlyMap}
 
 private[spark] sealed trait CoGroupSplitDep extends Serializable
 
@@ -101,36 +101,49 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
   override val partitioner = Some(part)
 
   override def compute(s: Partition, context: TaskContext): Iterator[(K, Seq[Seq[_]])] = {
+    println("Computing in CoGroupedRDD!")
+    // e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs)
     val split = s.asInstanceOf[CoGroupPartition]
     val numRdds = split.deps.size
-    // e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs)
-    val map = new AppendOnlyMap[K, Seq[ArrayBuffer[Any]]]
-
-    val update: (Boolean, Seq[ArrayBuffer[Any]]) => Seq[ArrayBuffer[Any]] = (hadVal, oldVal) => {
-      if (hadVal) oldVal else Array.fill(numRdds)(new ArrayBuffer[Any])
-    }
-
-    val getSeq = (k: K) => {
-      map.changeValue(k, update)
-    }
+    val combineFunction: (Seq[ArrayBuffer[Any]], Seq[ArrayBuffer[Any]]) => Seq[ArrayBuffer[Any]] =
+      (x, y) => { x ++ y }
+    //val map = new AppendOnlyMap[K, Seq[ArrayBuffer[Any]]]
+    val map = new ExternalAppendOnlyMap[K, Seq[ArrayBuffer[Any]]](combineFunction)
 
     val ser = SparkEnv.get.serializerManager.get(serializerClass)
     for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
       case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
         // Read them from the parent
-        rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]].foreach { kv =>
-          getSeq(kv._1)(depNum) += kv._2
+        rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]].foreach {
+          kv => addToMap(kv._1, kv._2, depNum)
         }
       }
       case ShuffleCoGroupSplitDep(shuffleId) => {
         // Read map outputs of shuffle
         val fetcher = SparkEnv.get.shuffleFetcher
         fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context, ser).foreach {
-          kv => getSeq(kv._1)(depNum) += kv._2
+          kv => addToMap(kv._1, kv._2, depNum)
         }
       }
     }
-    new InterruptibleIterator(context, map.iterator)
+
+    def addToMap(key: K, value: Any, depNum: Int) {
+      val updateFunction: (Boolean, Seq[ArrayBuffer[Any]]) => Seq[ArrayBuffer[Any]] =
+        (hadVal, oldVal) => {
+          var newVal = oldVal
+          if (!hadVal){
+            newVal = Array.fill(numRdds)(new ArrayBuffer[Any])
+          }
+          newVal(depNum) += value
+          newVal
+        }
+      map.changeValue(key, updateFunction)
+    }
+
+    println("About to construct CoGroupedRDD iterator!")
+    val theIterator = map.iterator
+    println("Returning CoGroupedRDD iterator!")
+    new InterruptibleIterator(context, theIterator)
   }
 
   override def clearDependencies() {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/97fbb3ec/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 48168e1..6849703 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -85,10 +85,12 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     }
     val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
     if (self.partitioner == Some(partitioner)) {
+      println("Partitioner is some partitioner! In fact, it is " + self.partitioner.toString())
       self.mapPartitionsWithContext((context, iter) => {
         new InterruptibleIterator(context, aggregator.combineValuesByKey(iter))
       }, preservesPartitioning = true)
     } else if (mapSideCombine) {
+      println("Otherwise, combining on map side.")
       val combined = self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
       val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
         .setSerializer(serializerClass)
@@ -96,6 +98,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
         new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter))
       }, preservesPartitioning = true)
     } else {
+      println("Else. No combining on map side!")
       // Don't apply map-side combiner.
       // A sanity check to make sure mergeCombiners is not defined.
       assert(mergeCombiners == null)
@@ -647,6 +650,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * MapReduce job.
    */
   def saveAsHadoopDataset(conf: JobConf) {
+    println("SAVE AS HADOOP DATASET")
     val outputFormatClass = conf.getOutputFormat
     val keyClass = conf.getOutputKeyClass
     val valueClass = conf.getOutputValueClass
@@ -666,6 +670,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     writer.preSetup()
 
     def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) {
+      println("WRITE TO FILE")
      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
      // around by taking a mod. We expect that no task will be attempted 2 billion times.
      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
@@ -673,13 +678,17 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
       writer.setup(context.stageId, context.partitionId, attemptNumber)
       writer.open()
 
+      println("START LOOP\n\n\n")
       var count = 0
       while(iter.hasNext) {
+        println("Before next()")
         val record = iter.next()
         count += 1
+        println("Before write. Record = "+record)
         writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
+        println("After write. Record = "+record)
       }
-
+      println("ALL DONE! Woohoo.")
       writer.close()
       writer.commit()
     }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/97fbb3ec/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
new file mode 100644
index 0000000..28a3b7e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/ExternalAppendOnlyMap.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io._
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+/**
+ * A simple map that spills sorted content to disk when the memory threshold is exceeded. A combiner
+ * function must be specified to merge values back into memory during read.
+ */
+class ExternalAppendOnlyMap[K,V](combinerFunction: (V, V) => V,
+                                 memoryThresholdMB: Int = 1024)
+  extends Iterable[(K,V)] with Serializable {
+
+  var currentMap = new AppendOnlyMap[K,V]
+  var oldMaps = new ArrayBuffer[DiskKVIterator]
+
+  def changeValue(key: K, updateFunc: (Boolean, V) => V): Unit = {
+    currentMap.changeValue(key, updateFunc)
+    val mapSize = SizeEstimator.estimate(currentMap)
+    //if (mapSize > memoryThresholdMB * math.pow(1024, 2)) {
+    if (mapSize > 1024 * 10) {
+      spill()
+    }
+  }
+
+  def spill(): Unit = {
+    println("SPILL")
+    val file = File.createTempFile("external_append_only_map", "")  // Add spill location
+    val out = new ObjectOutputStream(new FileOutputStream(file))
+    val sortedMap = currentMap.iterator.toList.sortBy(kv => kv._1.hashCode())
+    sortedMap foreach {
+      out.writeObject( _ )
+    }
+    out.close()
+    currentMap = new AppendOnlyMap[K,V]
+    oldMaps.append(new DiskKVIterator(file))
+  }
+
+  override def iterator: Iterator[(K,V)] = new ExternalIterator()
+
+  /**
+   * An iterator that merges KV pairs from memory and disk in sorted order
+   */
+  class ExternalIterator extends Iterator[(K, V)] {
+
+    // Order by increasing key hash value
+    implicit object KVOrdering extends Ordering[KVITuple] {
+      def compare(a:KVITuple, b:KVITuple) = -a.key.hashCode().compareTo(b.key.hashCode())
+    }
+    val pq = mutable.PriorityQueue[KVITuple]()
+    val inputStreams = Seq(new MemoryKVIterator(currentMap)) ++ oldMaps
+    inputStreams foreach { readFromIterator }
+
+    override def hasNext: Boolean = !pq.isEmpty
+
+    override def next(): (K,V) = {
+      println("ExternalIterator.next - How many left? "+pq.length)
+      val minKVI = pq.dequeue()
+      var (minKey, minValue, minIter) = (minKVI.key, minKVI.value, minKVI.iter)
+//      println("Min key = "+minKey)
+      readFromIterator(minIter)
+      while (!pq.isEmpty && pq.head.key == minKey) {
+        val newKVI = pq.dequeue()
+        val (newValue, newIter) = (newKVI.value, newKVI.iter)
+//        println("\tfound new value to merge! "+newValue)
+//        println("\tcombinerFunction("+minValue+" <====> "+newValue+")")
+        minValue = combinerFunction(minValue, newValue)
+//        println("\tCombine complete! New value = "+minValue)
+        readFromIterator(newIter)
+      }
+      println("Returning minKey = "+minKey+", minValue = "+minValue)
+      (minKey, minValue)
+    }
+
+    def readFromIterator(iter: Iterator[(K,V)]): Unit = {
+      if (iter.hasNext) {
+        val (k, v) = iter.next()
+        pq.enqueue(KVITuple(k, v, iter))
+      }
+    }
+
+    case class KVITuple(key:K, value:V, iter:Iterator[(K,V)])
+  }
+
+  class MemoryKVIterator(map: AppendOnlyMap[K,V]) extends Iterator[(K,V)] {
+    val sortedMap = currentMap.iterator.toList.sortBy(kv => kv._1.hashCode())
+    val it = sortedMap.iterator
+    override def hasNext: Boolean = it.hasNext
+    override def next(): (K,V) = it.next()
+  }
+
+  class DiskKVIterator(file: File) extends Iterator[(K,V)] {
+    val in = new ObjectInputStream(new FileInputStream(file))
+    var nextItem:(K,V) = _
+    var eof = false
+
+    override def hasNext: Boolean = {
+      if (eof) {
+        return false
+      }
+      try {
+        nextItem = in.readObject().asInstanceOf[(K,V)]
+      } catch {
+        case e: EOFException =>
+          eof = true
+          return false
+      }
+      true
+    }
+
+    override def next(): (K,V) = {
+      if (eof) {
+        throw new NoSuchElementException
+      }
+      nextItem
+    }
+  }
+}