Merge branch 'master' of github.com:apache/incubator-spark into kill

Conflicts:
    core/src/main/scala/org/apache/spark/CacheManager.scala
    core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
    core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/dbae7795
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/dbae7795
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/dbae7795

Branch: refs/heads/master
Commit: dbae7795ba489bfc1fedb88155bf42bb4992b006
Parents: 53895f9 320418f
Author: Reynold Xin <r...@apache.org>
Authored: Wed Oct 9 22:57:35 2013 -0700
Committer: Reynold Xin <r...@apache.org>
Committed: Wed Oct 9 22:57:35 2013 -0700

----------------------------------------------------------------------
 assembly/pom.xml                                 |  18 +-
 bagel/pom.xml                                    |   8 +-
 core/pom.xml                                     |  15 +-
 .../scala/org/apache/spark/CacheManager.scala    |  21 +-
 .../main/scala/org/apache/spark/FutureJob.scala  | 126 ------------
 .../scala/org/apache/spark/SparkContext.scala    |  41 +++-
 .../spark/api/python/PythonPartitioner.scala     |  10 +-
 .../org/apache/spark/api/python/PythonRDD.scala  |   6 +-
 .../org/apache/spark/deploy/JsonProtocol.scala   |   3 +-
 .../apache/spark/deploy/SparkHadoopUtil.scala    |  12 +-
 .../spark/deploy/master/ApplicationSource.scala  |   2 +-
 .../spark/deploy/master/MasterSource.scala       |   6 +-
 .../spark/deploy/worker/WorkerSource.scala       |  10 +-
 .../apache/spark/executor/ExecutorSource.scala   |  18 +-
 .../scala/org/apache/spark/rdd/HadoopRDD.scala   | 154 +++++++++++---
 .../apache/spark/scheduler/DAGScheduler.scala    |   2 +-
 .../spark/scheduler/DAGSchedulerSource.scala     |  10 +-
 .../spark/scheduler/SparkListenerBus.scala       |  18 ++
 .../spark/serializer/KryoSerializer.scala        |  22 +-
 .../org/apache/spark/storage/BlockManager.scala  |  12 +-
 .../spark/storage/BlockManagerSource.scala       |   8 +-
 .../scala/org/apache/spark/util/Utils.scala      |  13 ++
 .../spark/scheduler/SparkListenerSuite.scala     |  26 ++-
 .../spark/serializer/KryoSerializerSuite.scala   |  21 ++
 .../scala/org/apache/spark/ui/UISuite.scala      |   7 +-
 .../org/apache/spark/util/UtilsSuite.scala       |  11 +
 docs/_layouts/global.html                        |   4 +-
 docs/mllib-guide.md                              |  24 ++-
 docs/python-programming-guide.md                 |   2 +-
 docs/running-on-yarn.md                          |   1 +
 ec2/README                                       |   2 +-
 ec2/spark_ec2.py                                 |   4 +-
 examples/pom.xml                                 |  30 ++-
 make-distribution.sh                             |   2 +-
 mllib/pom.xml                                    |   8 +-
 .../apache/spark/mllib/recommendation/ALS.scala  | 199 ++++++++++++++++---
 .../mllib/recommendation/JavaALSSuite.java       |  85 ++++++--
 .../spark/mllib/recommendation/ALSSuite.scala    |  75 +++++--
 pom.xml                                          |   7 +-
 project/SparkBuild.scala                         |  16 +-
 python/pyspark/rdd.py                            |  70 ++++++-
 python/pyspark/serializers.py                    |   4 +
 repl-bin/pom.xml                                 |  10 +-
 repl/pom.xml                                     |  20 +-
 streaming/pom.xml                                |   9 +-
 tools/pom.xml                                    |   8 +-
 yarn/pom.xml                                     |   6 +-
 .../org/apache/spark/deploy/yarn/Client.scala    |   2 +-
 .../spark/deploy/yarn/ClientArguments.scala      |   6 +
 49 files changed, 813 insertions(+), 381 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/dbae7795/core/src/main/scala/org/apache/spark/CacheManager.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/CacheManager.scala
index 00217b8,4cf7eb9..17e0f05
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@@ -32,12 -34,11 +34,11 @@@ private[spark] class CacheManager(block
    def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
        : Iterator[T] = {
      val key = "rdd_%d_%d".format(rdd.id, split.index)
-     logInfo("Cache key is " + key)
+     logDebug("Looking for partition " + key)
      blockManager.get(key) match {
-       case Some(cachedValues) =>
-         // Partition is in cache, so just return its values
-         logInfo("Found partition in cache!")
-         return new InterruptibleIterator(context, cachedValues.asInstanceOf[Iterator[T]])
+       case Some(values) =>
+         // Partition is already materialized, so just return its values
-         return values.asInstanceOf[Iterator[T]]
++        return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
        case None =>
          // Mark the split as loading (unless someone else marks it first)
@@@ -55,9 -56,9 +56,9 @@@
          // downside of the current code is that threads wait serially if this does happen.
          blockManager.get(key) match {
            case Some(values) =>
-             return values.asInstanceOf[Iterator[T]]
+             return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
            case None =>
-             logInfo("Whoever was loading " + key + " failed; we'll try it ourselves")
+             logInfo("Whoever was loading %s failed; we'll try it ourselves".format(key))
              loading.add(key)
          }
        } else {
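
Both hunks above change getOrCompute so that cached partition values come back wrapped in an
InterruptibleIterator rather than as the raw iterator, which lets a task that is killed while
scanning a cached block stop early. The following is only a minimal sketch of that wrapping
pattern; KillableContext and the class name are invented stand-ins for the real TaskContext
plumbing and are not Spark's API.

    // Invented stand-in for the interruption flag a real TaskContext would carry.
    class KillableContext {
      @volatile var interrupted: Boolean = false
    }

    // Delegates to an underlying iterator but stops yielding elements once the
    // context has been flagged, which is why cached values are wrapped before
    // being handed back to the running task.
    class InterruptibleIteratorSketch[T](context: KillableContext, delegate: Iterator[T])
      extends Iterator[T] {
      override def hasNext: Boolean = !context.interrupted && delegate.hasNext
      override def next(): T = delegate.next()
    }

    object InterruptibleIteratorDemo extends App {
      val ctx = new KillableContext
      val it = new InterruptibleIteratorSketch(ctx, Iterator(1, 2, 3))
      println(it.next())   // 1
      ctx.interrupted = true
      println(it.hasNext)  // false: the remaining cached elements are no longer served
    }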
partition " + key) blockManager.get(key) match { - case Some(cachedValues) => - // Partition is in cache, so just return its values - logInfo("Found partition in cache!") - return new InterruptibleIterator(context, cachedValues.asInstanceOf[Iterator[T]]) + case Some(values) => + // Partition is already materialized, so just return its values - return values.asInstanceOf[Iterator[T]] ++ return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]]) case None => // Mark the split as loading (unless someone else marks it first) @@@ -55,9 -56,9 +56,9 @@@ // downside of the current code is that threads wait serially if this does happen. blockManager.get(key) match { case Some(values) => - return values.asInstanceOf[Iterator[T]] + return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]]) case None => - logInfo("Whoever was loading " + key + " failed; we'll try it ourselves") + logInfo("Whoever was loading %s failed; we'll try it ourselves".format(key)) loading.add(key) } } else { http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/dbae7795/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/dbae7795/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala ---------------------------------------------------------------------- diff --cc core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index e08255a,d3b3fff..18c4c01 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@@ -26,18 -27,54 +27,53 @@@ import org.apache.hadoop.mapred.RecordR import org.apache.hadoop.mapred.Reporter import org.apache.hadoop.util.ReflectionUtils - import org.apache.spark.{InterruptibleIterator, Logging, Partition, SerializableWritable, - SparkContext, SparkEnv, TaskContext} -import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv, - TaskContext} ++import org.apache.spark._ + import org.apache.spark.broadcast.Broadcast import org.apache.spark.util.NextIterator import org.apache.hadoop.conf.{Configuration, Configurable} + /** + * An RDD that reads a file (or multiple files) from Hadoop (e.g. files in HDFS, the local file + * system, or S3). + * This accepts a general, broadcasted Hadoop Configuration because those tend to remain the same + * across multiple reads; the 'path' is the only variable that is different across new JobConfs + * created from the Configuration. + */ + class HadoopFileRDD[K, V]( - sc: SparkContext, - path: String, - broadcastedConf: Broadcast[SerializableWritable[Configuration]], - inputFormatClass: Class[_ <: InputFormat[K, V]], - keyClass: Class[K], - valueClass: Class[V], - minSplits: Int) ++ sc: SparkContext, ++ path: String, ++ broadcastedConf: Broadcast[SerializableWritable[Configuration]], ++ inputFormatClass: Class[_ <: InputFormat[K, V]], ++ keyClass: Class[K], ++ valueClass: Class[V], ++ minSplits: Int) + extends HadoopRDD[K, V](sc, broadcastedConf, inputFormatClass, keyClass, valueClass, minSplits) { + + override def getJobConf(): JobConf = { + if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) { + // getJobConf() has been called previously, so there is already a local cache of the JobConf + // needed by this RDD. 
+ return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf] + } else { + // Create a new JobConf, set the input file/directory paths to read from, and cache the + // JobConf (i.e., in a shared hash map in the slave's JVM process that's accessible through + // HadoopRDD.putCachedMetadata()), so that we only create one copy across multiple + // getJobConf() calls for this RDD in the local process. + // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects. + val newJobConf = new JobConf(broadcastedConf.value.value) + FileInputFormat.setInputPaths(newJobConf, path) + HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf) + return newJobConf + } + } + } /** * A Spark split class that wraps around a Hadoop InputSplit. */ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSplit) extends Partition { -- ++ val inputSplit = new SerializableWritable[InputSplit](s) override def hashCode(): Int = (41 * (41 + rddId) + idx).toInt @@@ -46,29 -83,80 +82,80 @@@ } /** - * An RDD that reads a Hadoop dataset as specified by a JobConf (e.g. files in HDFS, the local file - * system, or S3, tables in HBase, etc). + * A base class that provides core functionality for reading data partitions stored in Hadoop. */ class HadoopRDD[K, V]( -- sc: SparkContext, - @transient conf: JobConf, - inputFormatClass: Class[_ <: InputFormat[K, V]], - keyClass: Class[K], - valueClass: Class[V], - minSplits: Int) - broadcastedConf: Broadcast[SerializableWritable[Configuration]], - inputFormatClass: Class[_ <: InputFormat[K, V]], - keyClass: Class[K], - valueClass: Class[V], - minSplits: Int) ++ sc: SparkContext, ++ broadcastedConf: Broadcast[SerializableWritable[Configuration]], ++ inputFormatClass: Class[_ <: InputFormat[K, V]], ++ keyClass: Class[K], ++ valueClass: Class[V], ++ minSplits: Int) extends RDD[(K, V)](sc, Nil) with Logging { - // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it - private val confBroadcast = sc.broadcast(new SerializableWritable(conf)) + def this( - sc: SparkContext, - conf: JobConf, - inputFormatClass: Class[_ <: InputFormat[K, V]], - keyClass: Class[K], - valueClass: Class[V], - minSplits: Int) = { ++ sc: SparkContext, ++ conf: JobConf, ++ inputFormatClass: Class[_ <: InputFormat[K, V]], ++ keyClass: Class[K], ++ valueClass: Class[V], ++ minSplits: Int) = { + this( + sc, + sc.broadcast(new SerializableWritable(conf)) + .asInstanceOf[Broadcast[SerializableWritable[Configuration]]], + inputFormatClass, + keyClass, + valueClass, + minSplits) + } + + protected val jobConfCacheKey = "rdd_%d_job_conf".format(id) + + protected val inputFormatCacheKey = "rdd_%d_input_format".format(id) + + // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads. + protected def getJobConf(): JobConf = { + val conf: Configuration = broadcastedConf.value.value + if (conf.isInstanceOf[JobConf]) { + // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it. + return conf.asInstanceOf[JobConf] + } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) { + // getJobConf() has been called previously, so there is already a local cache of the JobConf + // needed by this RDD. + return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf] + } else { + // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the + // local process. The local cache is accessed through HadoopRDD.putCachedMetadata(). 
+ // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects. + val newJobConf = new JobConf(broadcastedConf.value.value) + HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf) + return newJobConf + } + } + + protected def getInputFormat(conf: JobConf): InputFormat[K, V] = { + if (HadoopRDD.containsCachedMetadata(inputFormatCacheKey)) { + return HadoopRDD.getCachedMetadata(inputFormatCacheKey).asInstanceOf[InputFormat[K, V]] + } + // Once an InputFormat for this RDD is created, cache it so that only one reflection call is + // done in each local process. + val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf) + .asInstanceOf[InputFormat[K, V]] + if (newInputFormat.isInstanceOf[Configurable]) { + newInputFormat.asInstanceOf[Configurable].setConf(conf) + } + HadoopRDD.putCachedMetadata(inputFormatCacheKey, newInputFormat) + return newInputFormat + } override def getPartitions: Array[Partition] = { - val env = SparkEnv.get - env.hadoop.addCredentials(conf) - val inputFormat = createInputFormat(conf) + val jobConf = getJobConf() + val inputFormat = getInputFormat(jobConf) if (inputFormat.isInstanceOf[Configurable]) { - inputFormat.asInstanceOf[Configurable].setConf(conf) + inputFormat.asInstanceOf[Configurable].setConf(jobConf) } - val inputSplits = inputFormat.getSplits(conf, minSplits) + val inputSplits = inputFormat.getSplits(jobConf, minSplits) val array = new Array[Partition](inputSplits.size) for (i <- 0 until inputSplits.size) { array(i) = new HadoopPartition(id, i, inputSplits(i)) @@@ -76,49 -164,38 +163,41 @@@ array } - def createInputFormat(conf: JobConf): InputFormat[K, V] = { - ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf) - .asInstanceOf[InputFormat[K, V]] - } - - override def compute(theSplit: Partition, context: TaskContext) = new NextIterator[(K, V)] { - val split = theSplit.asInstanceOf[HadoopPartition] - logInfo("Input split: " + split.inputSplit) - var reader: RecordReader[K, V] = null - - val jobConf = getJobConf() - val inputFormat = getInputFormat(jobConf) - reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL) - - // Register an on-task-completion callback to close the input stream. - context.addOnCompleteCallback{ () => closeIfNeeded() } - - val key: K = reader.createKey() - val value: V = reader.createValue() - - override def getNext() = { - try { - finished = !reader.next(key, value) - } catch { - case eof: EOFException => - finished = true + override def compute(theSplit: Partition, context: TaskContext) = { + val iter = new NextIterator[(K, V)] { + val split = theSplit.asInstanceOf[HadoopPartition] + logInfo("Input split: " + split.inputSplit) + var reader: RecordReader[K, V] = null + - val conf = confBroadcast.value.value - val fmt = createInputFormat(conf) - if (fmt.isInstanceOf[Configurable]) { - fmt.asInstanceOf[Configurable].setConf(conf) - } - reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL) ++ val jobConf = getJobConf() ++ val inputFormat = getInputFormat(jobConf) ++ reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL) + + // Register an on-task-completion callback to close the input stream. 
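
The getJobConf()/getInputFormat() pair above memoizes per-RDD Hadoop metadata in a process-local
map (reached through HadoopRDD.getCachedMetadata/putCachedMetadata) so a roughly 10 KB JobConf and
a reflectively created InputFormat are built at most once per JVM instead of once per task. The
snippet below is only a rough sketch of that memoization pattern; the cache object and method names
are invented stand-ins for the real SparkEnv.get.hadoop.hadoopJobMetadata map.

    import java.util.concurrent.ConcurrentHashMap

    // Illustrative process-local cache: an expensive-to-build value is stored under a
    // per-RDD key (in the same spirit as "rdd_%d_job_conf") and reused within the JVM.
    object LocalMetadataCacheSketch {
      private val cache = new ConcurrentHashMap[String, AnyRef]()

      def getOrElseUpdate[T <: AnyRef](key: String)(build: => T): T = {
        val cached = cache.get(key)
        if (cached != null) {
          cached.asInstanceOf[T]
        } else {
          val built = build
          // Racing threads may each build one copy; the copies are equivalent, so last put wins.
          cache.put(key, built)
          built
        }
      }
    }

    // Hypothetical usage, mirroring the JobConf case:
    //   val jobConf = LocalMetadataCacheSketch.getOrElseUpdate("rdd_%d_job_conf".format(id)) {
    //     new JobConf(broadcastedConf.value.value)
    //   }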

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/dbae7795/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/dbae7795/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
index 9fe7002,1515148..7b5c0e2
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
@@@ -39,11 -39,11 +39,11 @@@ private[spark] class DAGSchedulerSource
      override def getValue: Int = dagScheduler.waiting.size
    })

-   metricRegistry.register(MetricRegistry.name("job", "allJobs", "number"), new Gauge[Int] {
+   metricRegistry.register(MetricRegistry.name("job", "allJobs"), new Gauge[Int] {
-     override def getValue: Int = dagScheduler.nextJobId.get()
+     override def getValue: Int = dagScheduler.numTotalJobs
    })

-   metricRegistry.register(MetricRegistry.name("job", "activeJobs", "number"), new Gauge[Int] {
+   metricRegistry.register(MetricRegistry.name("job", "activeJobs"), new Gauge[Int] {
      override def getValue: Int = dagScheduler.activeJobs.size
    })
  }
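
For reference, the gauge names above are composed by Codahale's MetricRegistry.name, which joins
its arguments with dots, so dropping the trailing "number" component simply shortens the exported
keys (for example "job.activeJobs.number" becomes "job.activeJobs"). A small sketch of the
registration pattern, with a constant standing in for the DAGScheduler-backed value:

    import com.codahale.metrics.{Gauge, MetricRegistry}

    // Registers a gauge under the dotted name "job.activeJobs"; the constant return
    // value is a placeholder where the real source reads dagScheduler.activeJobs.size.
    object GaugeNamingSketch extends App {
      val registry = new MetricRegistry()
      registry.register(MetricRegistry.name("job", "activeJobs"), new Gauge[Int] {
        override def getValue: Int = 0
      })
      println(registry.getGauges.keySet())  // prints [job.activeJobs]
    }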