Merge branch 'master' of github.com:apache/incubator-spark into kill

Conflicts:
        core/src/main/scala/org/apache/spark/CacheManager.scala
        core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
        core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/dbae7795
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/dbae7795
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/dbae7795

Branch: refs/heads/master
Commit: dbae7795ba489bfc1fedb88155bf42bb4992b006
Parents: 53895f9 320418f
Author: Reynold Xin <r...@apache.org>
Authored: Wed Oct 9 22:57:35 2013 -0700
Committer: Reynold Xin <r...@apache.org>
Committed: Wed Oct 9 22:57:35 2013 -0700

----------------------------------------------------------------------
 assembly/pom.xml                                |  18 +-
 bagel/pom.xml                                   |   8 +-
 core/pom.xml                                    |  15 +-
 .../scala/org/apache/spark/CacheManager.scala   |  21 +-
 .../main/scala/org/apache/spark/FutureJob.scala | 126 ------------
 .../scala/org/apache/spark/SparkContext.scala   |  41 +++-
 .../spark/api/python/PythonPartitioner.scala    |  10 +-
 .../org/apache/spark/api/python/PythonRDD.scala |   6 +-
 .../org/apache/spark/deploy/JsonProtocol.scala  |   3 +-
 .../apache/spark/deploy/SparkHadoopUtil.scala   |  12 +-
 .../spark/deploy/master/ApplicationSource.scala |   2 +-
 .../spark/deploy/master/MasterSource.scala      |   6 +-
 .../spark/deploy/worker/WorkerSource.scala      |  10 +-
 .../apache/spark/executor/ExecutorSource.scala  |  18 +-
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  | 154 +++++++++++---
 .../apache/spark/scheduler/DAGScheduler.scala   |   2 +-
 .../spark/scheduler/DAGSchedulerSource.scala    |  10 +-
 .../spark/scheduler/SparkListenerBus.scala      |  18 ++
 .../spark/serializer/KryoSerializer.scala       |  22 +-
 .../org/apache/spark/storage/BlockManager.scala |  12 +-
 .../spark/storage/BlockManagerSource.scala      |   8 +-
 .../scala/org/apache/spark/util/Utils.scala     |  13 ++
 .../spark/scheduler/SparkListenerSuite.scala    |  26 ++-
 .../spark/serializer/KryoSerializerSuite.scala  |  21 ++
 .../scala/org/apache/spark/ui/UISuite.scala     |   7 +-
 .../org/apache/spark/util/UtilsSuite.scala      |  11 +
 docs/_layouts/global.html                       |   4 +-
 docs/mllib-guide.md                             |  24 ++-
 docs/python-programming-guide.md                |   2 +-
 docs/running-on-yarn.md                         |   1 +
 ec2/README                                      |   2 +-
 ec2/spark_ec2.py                                |   4 +-
 examples/pom.xml                                |  30 ++-
 make-distribution.sh                            |   2 +-
 mllib/pom.xml                                   |   8 +-
 .../apache/spark/mllib/recommendation/ALS.scala | 199 ++++++++++++++++---
 .../mllib/recommendation/JavaALSSuite.java      |  85 ++++++--
 .../spark/mllib/recommendation/ALSSuite.scala   |  75 +++++--
 pom.xml                                         |   7 +-
 project/SparkBuild.scala                        |  16 +-
 python/pyspark/rdd.py                           |  70 ++++++-
 python/pyspark/serializers.py                   |   4 +
 repl-bin/pom.xml                                |  10 +-
 repl/pom.xml                                    |  20 +-
 streaming/pom.xml                               |   9 +-
 tools/pom.xml                                   |   8 +-
 yarn/pom.xml                                    |   6 +-
 .../org/apache/spark/deploy/yarn/Client.scala   |   2 +-
 .../spark/deploy/yarn/ClientArguments.scala     |   6 +
 49 files changed, 813 insertions(+), 381 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/dbae7795/core/src/main/scala/org/apache/spark/CacheManager.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/CacheManager.scala
index 00217b8,4cf7eb9..17e0f05
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@@ -32,12 -34,11 +34,11 @@@ private[spark] class CacheManager(block
    def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
        : Iterator[T] = {
      val key = "rdd_%d_%d".format(rdd.id, split.index)
-     logInfo("Cache key is " + key)
+     logDebug("Looking for partition " + key)
      blockManager.get(key) match {
-       case Some(cachedValues) =>
-         // Partition is in cache, so just return its values
-         logInfo("Found partition in cache!")
-         return new InterruptibleIterator(context, cachedValues.asInstanceOf[Iterator[T]])
+       case Some(values) =>
+         // Partition is already materialized, so just return its values
 -        return values.asInstanceOf[Iterator[T]]
++        return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
  
        case None =>
          // Mark the split as loading (unless someone else marks it first)
@@@ -55,9 -56,9 +56,9 @@@
            // downside of the current code is that threads wait serially if this does happen.
              blockManager.get(key) match {
                case Some(values) =>
 -                return values.asInstanceOf[Iterator[T]]
 +                return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
                case None =>
-                 logInfo("Whoever was loading " + key + " failed; we'll try it ourselves")
+                 logInfo("Whoever was loading %s failed; we'll try it ourselves".format(key))
                  loading.add(key)
              }
            } else {

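A note on the CacheManager hunks above: both branches of the match now wrap the
iterator returned by the block manager in an InterruptibleIterator, presumably so
a killed task stops consuming cached values promptly. A minimal, illustrative
sketch of that wrapper pattern (the isInterrupted flag and class name below are
assumed stand-ins, not Spark's actual types):

  // Illustrative only: delegate to an iterator while checking a kill flag.
  class KillableIterator[T](isInterrupted: () => Boolean, delegate: Iterator[T])
    extends Iterator[T] {
    override def hasNext: Boolean = {
      // Stop handing out elements as soon as the task is flagged as killed.
      if (isInterrupted()) throw new InterruptedException("task was killed")
      delegate.hasNext
    }
    override def next(): T = delegate.next()
  }
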
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/dbae7795/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/dbae7795/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index e08255a,d3b3fff..18c4c01
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@@ -26,18 -27,54 +27,53 @@@ import org.apache.hadoop.mapred.RecordR
  import org.apache.hadoop.mapred.Reporter
  import org.apache.hadoop.util.ReflectionUtils
  
- import org.apache.spark.{InterruptibleIterator, Logging, Partition, SerializableWritable,
-   SparkContext, SparkEnv, TaskContext}
 -import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv,
 -  TaskContext}
++import org.apache.spark._
+ import org.apache.spark.broadcast.Broadcast
  import org.apache.spark.util.NextIterator
  import org.apache.hadoop.conf.{Configuration, Configurable}
  
+ /**
+  * An RDD that reads a file (or multiple files) from Hadoop (e.g. files in HDFS, the local file
+  * system, or S3).
+  * This accepts a general, broadcasted Hadoop Configuration because those tend to remain the same
+  * across multiple reads; the 'path' is the only variable that is different across new JobConfs
+  * created from the Configuration.
+  */
+ class HadoopFileRDD[K, V](
 -    sc: SparkContext,
 -    path: String,
 -    broadcastedConf: Broadcast[SerializableWritable[Configuration]],
 -    inputFormatClass: Class[_ <: InputFormat[K, V]],
 -    keyClass: Class[K],
 -    valueClass: Class[V],
 -    minSplits: Int)
++                           sc: SparkContext,
++                           path: String,
++                           broadcastedConf: Broadcast[SerializableWritable[Configuration]],
++                           inputFormatClass: Class[_ <: InputFormat[K, V]],
++                           keyClass: Class[K],
++                           valueClass: Class[V],
++                           minSplits: Int)
+   extends HadoopRDD[K, V](sc, broadcastedConf, inputFormatClass, keyClass, valueClass, minSplits) {
+ 
+   override def getJobConf(): JobConf = {
+     if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+       // getJobConf() has been called previously, so there is already a local cache of the JobConf
+       // needed by this RDD.
+       return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
+     } else {
+       // Create a new JobConf, set the input file/directory paths to read from, and cache the
+       // JobConf (i.e., in a shared hash map in the slave's JVM process that's accessible through
+       // HadoopRDD.putCachedMetadata()), so that we only create one copy across multiple
+       // getJobConf() calls for this RDD in the local process.
+       // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
+       val newJobConf = new JobConf(broadcastedConf.value.value)
+       FileInputFormat.setInputPaths(newJobConf, path)
+       HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+       return newJobConf
+     }
+   }
+ }
  
  /**
   * A Spark split class that wraps around a Hadoop InputSplit.
   */
  private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSplit)
    extends Partition {
--  
++
    val inputSplit = new SerializableWritable[InputSplit](s)
  
    override def hashCode(): Int = (41 * (41 + rddId) + idx).toInt
@@@ -46,29 -83,80 +82,80 @@@
  }
  
  /**
-  * An RDD that reads a Hadoop dataset as specified by a JobConf (e.g. files in HDFS, the local file
-  * system, or S3, tables in HBase, etc).
+  * A base class that provides core functionality for reading data partitions stored in Hadoop.
   */
  class HadoopRDD[K, V](
--    sc: SparkContext,
-     @transient conf: JobConf,
-     inputFormatClass: Class[_ <: InputFormat[K, V]],
-     keyClass: Class[K],
-     valueClass: Class[V],
-     minSplits: Int)
 -    broadcastedConf: Broadcast[SerializableWritable[Configuration]],
 -    inputFormatClass: Class[_ <: InputFormat[K, V]],
 -    keyClass: Class[K],
 -    valueClass: Class[V],
 -    minSplits: Int)
++                       sc: SparkContext,
++                       broadcastedConf: Broadcast[SerializableWritable[Configuration]],
++                       inputFormatClass: Class[_ <: InputFormat[K, V]],
++                       keyClass: Class[K],
++                       valueClass: Class[V],
++                       minSplits: Int)
    extends RDD[(K, V)](sc, Nil) with Logging {
  
-   // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it
-   private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
+   def this(
 -      sc: SparkContext,
 -      conf: JobConf,
 -      inputFormatClass: Class[_ <: InputFormat[K, V]],
 -      keyClass: Class[K],
 -      valueClass: Class[V],
 -      minSplits: Int) = {
++            sc: SparkContext,
++            conf: JobConf,
++            inputFormatClass: Class[_ <: InputFormat[K, V]],
++            keyClass: Class[K],
++            valueClass: Class[V],
++            minSplits: Int) = {
+     this(
+       sc,
+       sc.broadcast(new SerializableWritable(conf))
+         .asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
+       inputFormatClass,
+       keyClass,
+       valueClass,
+       minSplits)
+   }
+ 
+   protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
+ 
+   protected val inputFormatCacheKey = "rdd_%d_input_format".format(id)
+ 
+   // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
+   protected def getJobConf(): JobConf = {
+     val conf: Configuration = broadcastedConf.value.value
+     if (conf.isInstanceOf[JobConf]) {
+       // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it.
+       return conf.asInstanceOf[JobConf]
+     } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+       // getJobConf() has been called previously, so there is already a local cache of the JobConf
+       // needed by this RDD.
+       return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
+     } else {
+       // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
+       // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
+       // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
+       val newJobConf = new JobConf(broadcastedConf.value.value)
+       HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+       return newJobConf
+     }
+   }
+ 
+   protected def getInputFormat(conf: JobConf): InputFormat[K, V] = {
+     if (HadoopRDD.containsCachedMetadata(inputFormatCacheKey)) {
+       return HadoopRDD.getCachedMetadata(inputFormatCacheKey).asInstanceOf[InputFormat[K, V]]
+     }
+     // Once an InputFormat for this RDD is created, cache it so that only one reflection call is
+     // done in each local process.
+     val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
+       .asInstanceOf[InputFormat[K, V]]
+     if (newInputFormat.isInstanceOf[Configurable]) {
+       newInputFormat.asInstanceOf[Configurable].setConf(conf)
+     }
+     HadoopRDD.putCachedMetadata(inputFormatCacheKey, newInputFormat)
+     return newInputFormat
+   }
  
    override def getPartitions: Array[Partition] = {
-     val env = SparkEnv.get
-     env.hadoop.addCredentials(conf)
-     val inputFormat = createInputFormat(conf)
+     val jobConf = getJobConf()
+     val inputFormat = getInputFormat(jobConf)
      if (inputFormat.isInstanceOf[Configurable]) {
-       inputFormat.asInstanceOf[Configurable].setConf(conf)
+       inputFormat.asInstanceOf[Configurable].setConf(jobConf)
      }
-     val inputSplits = inputFormat.getSplits(conf, minSplits)
+     val inputSplits = inputFormat.getSplits(jobConf, minSplits)
      val array = new Array[Partition](inputSplits.size)
      for (i <- 0 until inputSplits.size) {
        array(i) = new HadoopPartition(id, i, inputSplits(i))
@@@ -76,49 -164,38 +163,41 @@@
      array
    }
  
-   def createInputFormat(conf: JobConf): InputFormat[K, V] = {
-     ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
-       .asInstanceOf[InputFormat[K, V]]
-   }
- 
 -  override def compute(theSplit: Partition, context: TaskContext) = new NextIterator[(K, V)] {
 -    val split = theSplit.asInstanceOf[HadoopPartition]
 -    logInfo("Input split: " + split.inputSplit)
 -    var reader: RecordReader[K, V] = null
 -
 -    val jobConf = getJobConf()
 -    val inputFormat = getInputFormat(jobConf)
 -    reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
 -
 -    // Register an on-task-completion callback to close the input stream.
 -    context.addOnCompleteCallback{ () => closeIfNeeded() }
 -
 -    val key: K = reader.createKey()
 -    val value: V = reader.createValue()
 -
 -    override def getNext() = {
 -      try {
 -        finished = !reader.next(key, value)
 -      } catch {
 -        case eof: EOFException =>
 -          finished = true
 +  override def compute(theSplit: Partition, context: TaskContext) = {
 +    val iter = new NextIterator[(K, V)] {
 +      val split = theSplit.asInstanceOf[HadoopPartition]
 +      logInfo("Input split: " + split.inputSplit)
 +      var reader: RecordReader[K, V] = null
 +
-       val conf = confBroadcast.value.value
-       val fmt = createInputFormat(conf)
-       if (fmt.isInstanceOf[Configurable]) {
-         fmt.asInstanceOf[Configurable].setConf(conf)
-       }
-       reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
++      val jobConf = getJobConf()
++      val inputFormat = getInputFormat(jobConf)
++      reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
 +
 +      // Register an on-task-completion callback to close the input stream.
 +      context.addOnCompleteCallback{ () => closeIfNeeded() }
 +
 +      val key: K = reader.createKey()
 +      val value: V = reader.createValue()
 +
 +      override def getNext() = {
 +        try {
 +          finished = !reader.next(key, value)
 +        } catch {
 +          case eof: EOFException =>
 +            finished = true
 +        }
 +        (key, value)
        }
 -      (key, value)
 -    }
  
 -    override def close() {
 -      try {
 -        reader.close()
 -      } catch {
 -        case e: Exception => logWarning("Exception in RecordReader.close()", e)
 +      override def close() {
 +        try {
 +          reader.close()
 +        } catch {
 +          case e: Exception => logWarning("Exception in RecordReader.close()", e)
 +        }
        }
      }
-     new InterruptibleIterator(context, iter)
++    new InterruptibleIterator[(K, V)](context, iter)
    }
  
    override def getPreferredLocations(split: Partition): Seq[String] = {
@@@ -131,5 -208,18 +210,18 @@@
      // Do nothing. Hadoop RDD should not be checkpointed.
    }
  
-   def getConf: Configuration = confBroadcast.value.value
+   def getConf: Configuration = getJobConf()
  }
+ 
+ private[spark] object HadoopRDD {
+   /**
+    * The three methods below are helpers for accessing the local map, a property of the SparkEnv of
+    * the local process.
+    */
+   def getCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.get(key)
+ 
+   def containsCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.containsKey(key)
+ 
+   def putCachedMetadata(key: String, value: Any) =
+     SparkEnv.get.hadoop.hadoopJobMetadata.put(key, value)
 -}
++}

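A note on the HadoopRDD hunks above: the merged code caches the JobConf built from
the broadcasted Configuration, plus the reflectively created InputFormat, in a
process-local map keyed by "rdd_%d_job_conf" / "rdd_%d_input_format", so each
executor JVM pays the JobConf construction and reflection cost only once per RDD.
A standalone sketch of that get-or-create pattern, using a plain ConcurrentHashMap
in place of the SparkEnv-held hadoopJobMetadata map (names below are illustrative):

  import java.util.concurrent.ConcurrentHashMap

  object MetadataCacheSketch {
    // Shared within one JVM, like the map reached through SparkEnv in the diff.
    private val cache = new ConcurrentHashMap[String, AnyRef]()

    // Return the cached value for `key`, creating and caching it on first use.
    def getOrCreate[T <: AnyRef](key: String)(create: => T): T = {
      val existing = cache.get(key)
      if (existing != null) existing.asInstanceOf[T]
      else {
        val created = create
        cache.put(key, created)
        created
      }
    }
  }

  // Usage sketch, mirroring getJobConf() above (JobConf construction assumed):
  //   val jobConf = MetadataCacheSketch.getOrCreate("rdd_%d_job_conf".format(id)) {
  //     new JobConf(broadcastedConf.value.value)
  //   }
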
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/dbae7795/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/dbae7795/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
index 9fe7002,1515148..7b5c0e2
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
@@@ -39,11 -39,11 +39,11 @@@ private[spark] class DAGSchedulerSource
      override def getValue: Int = dagScheduler.waiting.size
    })
  
-   metricRegistry.register(MetricRegistry.name("job", "allJobs", "number"), new Gauge[Int] {
+   metricRegistry.register(MetricRegistry.name("job", "allJobs"), new Gauge[Int] {
 -    override def getValue: Int = dagScheduler.nextJobId.get()
 +    override def getValue: Int = dagScheduler.numTotalJobs
    })
  
-   metricRegistry.register(MetricRegistry.name("job", "activeJobs", "number"), new Gauge[Int] {
+   metricRegistry.register(MetricRegistry.name("job", "activeJobs"), new Gauge[Int] {
      override def getValue: Int = dagScheduler.activeJobs.size
    })
  }
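
A note on the DAGSchedulerSource hunk above: the merged result keeps the shorter
metric names (MetricRegistry.name("job", "allJobs") rather than the older
("job", "allJobs", "number") form) and reports dagScheduler.numTotalJobs for the
allJobs gauge. A small standalone sketch of registering such a gauge with the
Codahale metrics-core API these sources use; the jobCounter below is an assumed
stand-in for the scheduler state:

  import java.util.concurrent.atomic.AtomicInteger
  import com.codahale.metrics.{Gauge, MetricRegistry}

  object GaugeSketch {
    val registry = new MetricRegistry()
    val jobCounter = new AtomicInteger(0)  // stand-in for dagScheduler.numTotalJobs

    // Registers a gauge named "job.allJobs"; sinks poll getValue when they report.
    registry.register(MetricRegistry.name("job", "allJobs"), new Gauge[Int] {
      override def getValue: Int = jobCounter.get()
    })
  }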
