Merge branch 'master' into wip-scala-2.10

Conflicts:
        core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
        core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
        core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithContextRDD.scala
        core/src/main/scala/org/apache/spark/rdd/RDD.scala
        python/pyspark/rdd.py


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/17987778
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/17987778
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/17987778

Branch: refs/heads/master
Commit: 17987778daac140027b7a01c0ec22f0b3e4f3b83
Parents: 54862af fb6875d
Author: Prashant Sharma <[email protected]>
Authored: Wed Nov 27 14:44:12 2013 +0530
Committer: Prashant Sharma <[email protected]>
Committed: Wed Nov 27 14:44:12 2013 +0530

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  25 ++
 .../apache/spark/api/java/JavaDoubleRDD.scala   |  40 +++
 .../org/apache/spark/api/python/PythonRDD.scala | 149 +++------
 .../org/apache/spark/executor/TaskMetrics.scala |  23 +-
 .../apache/spark/rdd/DoubleRDDFunctions.scala   | 126 ++++++++
 .../org/apache/spark/rdd/MapPartitionsRDD.scala |  11 +-
 .../spark/rdd/MapPartitionsWithContextRDD.scala |  42 ---
 .../main/scala/org/apache/spark/rdd/RDD.scala   |  43 ++-
 .../apache/spark/scheduler/DAGScheduler.scala   |  15 +
 .../org/apache/spark/scheduler/StageInfo.scala  |   1 +
 .../org/apache/spark/scheduler/TaskInfo.scala   |   2 +
 .../cluster/ClusterTaskSetManager.scala         |   1 +
 .../spark/util/collection/OpenHashSet.scala     | 107 ++++---
 .../org/apache/spark/CheckpointSuite.scala      |   2 -
 .../scala/org/apache/spark/JavaAPISuite.java    |  14 +
 .../org/apache/spark/rdd/DoubleRDDSuite.scala   | 271 +++++++++++++++++
 .../apache/spark/scheduler/JobLoggerSuite.scala |   7 +-
 .../util/collection/OpenHashMapSuite.scala      |  16 +-
 .../util/collection/OpenHashSetSuite.scala      |  20 +-
 .../PrimitiveKeyOpenHashMapSuite.scala          | 102 +++++++
 .../PrimitiveKeyOpenHashSetSuite.scala          |  90 ------
 docs/running-on-yarn.md                         |  27 +-
 docs/tuning.md                                  |   3 +-
 python/epydoc.conf                              |   2 +-
 python/pyspark/accumulators.py                  |   6 +-
 python/pyspark/context.py                       |  71 +++--
 python/pyspark/rdd.py                           |  97 +++---
 python/pyspark/serializers.py                   | 301 ++++++++++++++++---
 python/pyspark/tests.py                         |   3 +-
 python/pyspark/worker.py                        |  44 ++-
 python/run-tests                                |   1 +
 .../org/apache/spark/deploy/yarn/Client.scala   |  13 +-
 .../spark/deploy/yarn/ClientArguments.scala     |  40 +--
 .../spark/deploy/yarn/WorkerLauncher.scala      | 246 +++++++++++++++
 .../cluster/YarnClientClusterScheduler.scala    |  47 +++
 .../cluster/YarnClientSchedulerBackend.scala    | 109 +++++++
 36 files changed, 1619 insertions(+), 498 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/17987778/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/17987778/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
index e5e20db,9f02a9b..da30cf6
--- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
@@@ -29,9 -26,11 +29,11 @@@ import org.apache.spark.storage.Storage
  import java.lang.Double
  import org.apache.spark.Partitioner
  
+ import scala.collection.JavaConverters._
+ 
  class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, JavaDoubleRDD] {
  
 -  override val classManifest: ClassManifest[Double] = implicitly[ClassManifest[Double]]
 +  override val classTag: ClassTag[Double] = implicitly[ClassTag[Double]]
  
    override val rdd: RDD[Double] = srdd.map(x => Double.valueOf(x))
  

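The hunk above is typical of the Scala 2.10 migration in this merge: ClassManifest is
replaced by ClassTag wherever runtime type information is needed. A minimal sketch of
the idiom (illustrative only, not Spark code; TypedBuffer is a made-up name):

import scala.reflect.ClassTag

// A ClassTag context bound supplies the runtime class information (e.g. for
// array creation) that ClassManifest provided before Scala 2.10.
class TypedBuffer[T: ClassTag] {
  def newArray(size: Int): Array[T] = new Array[T](size)
}

object TypedBufferExample extends App {
  println(new TypedBuffer[Double].newArray(4).length)  // prints 4
}
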
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/17987778/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 53b53df,132e4fb..2bf7ac2
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@@ -28,12 -27,12 +28,11 @@@ import org.apache.spark.api.java.{JavaS
  import org.apache.spark.broadcast.Broadcast
  import org.apache.spark._
  import org.apache.spark.rdd.RDD
- import org.apache.spark.rdd.PipedRDD
  import org.apache.spark.util.Utils
  
 -
 -private[spark] class PythonRDD[T: ClassManifest](
 +private[spark] class PythonRDD[T: ClassTag](
      parent: RDD[T],
-     command: Seq[String],
+     command: Array[Byte],
      envVars: JMap[String, String],
      pythonIncludes: JList[String],
      preservePartitoning: Boolean,

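In the hunk above, the command passed from Python to the JVM changes from Seq[String] to
Array[Byte]: the driver now ships a single pickled command payload rather than a list of
strings. A hedged sketch of forwarding such a payload (CommandWriter/writeCommand are
hypothetical names for illustration, not Spark's API):

import java.io.DataOutputStream

object CommandWriter {
  // Hypothetical helper: length-prefix the pickled command and pass the raw
  // bytes through to the worker unchanged.
  def writeCommand(out: DataOutputStream, command: Array[Byte]) {
    out.writeInt(command.length)
    out.write(command)
  }
}
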
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/17987778/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
index cdb5946,ae70d55..db15baf
--- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
@@@ -18,13 -18,11 +18,11 @@@
  package org.apache.spark.rdd
  
  import org.apache.spark.{Partition, TaskContext}
 +import scala.reflect.ClassTag
  
--
- private[spark]
- class MapPartitionsRDD[U: ClassTag, T: ClassTag](
 -private[spark] class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
++private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
      prev: RDD[T],
-     f: Iterator[T] => Iterator[U],
+     f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
      preservesPartitioning: Boolean = false)
    extends RDD[U](prev) {
  

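The widened function type above, (TaskContext, Int, Iterator[T]) => Iterator[U], is what
allows a single MapPartitionsRDD to stand in for the removed MapPartitionsWithContextRDD:
callers that need only the iterator, only the partition index, or only the context simply
ignore the other arguments. A standalone illustration (Ctx is a stand-in for TaskContext,
not Spark code):

object PartitionFnShapes {
  case class Ctx(partitionId: Int)  // stand-in for org.apache.spark.TaskContext

  // mapPartitions-style: ignores both context and index
  val plain: (Ctx, Int, Iterator[Int]) => Iterator[Int] =
    (_, _, it) => it.map(_ + 1)

  // mapPartitionsWithIndex-style: uses only the partition index
  val withIndex: (Ctx, Int, Iterator[Int]) => Iterator[String] =
    (_, i, it) => it.map(x => i + ":" + x)

  // mapPartitionsWithContext-style: uses only the context
  val withContext: (Ctx, Int, Iterator[Int]) => Iterator[String] =
    (c, _, it) => it.map(x => c.partitionId + ":" + x)
}
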
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/17987778/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/rdd/RDD.scala
index da18d45,5b12853..f80d3d6
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@@ -443,29 -439,31 +442,31 @@@ abstract class RDD[T: ClassTag]
    /**
     * Return a new RDD by applying a function to each partition of this RDD.
     */
-   def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],
-     preservesPartitioning: Boolean = false): RDD[U] = {
-     new MapPartitionsRDD(this, sc.clean(f), preservesPartitioning)
 -  def mapPartitions[U: ClassManifest](
++  def mapPartitions[U: ClassTag](
+       f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
+     val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter)
+     new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
    }
  
    /**
    * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
     * of the original partition.
     */
 -  def mapPartitionsWithIndex[U: ClassManifest](
 +  def mapPartitionsWithIndex[U: ClassTag](
        f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
-     val func = (context: TaskContext, iter: Iterator[T]) => f(context.partitionId, iter)
-     new MapPartitionsWithContextRDD(this, sc.clean(func), preservesPartitioning)
+     val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter)
+     new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
    }
  
    /**
    * Return a new RDD by applying a function to each partition of this RDD. This is a variant of
     * mapPartitions that also passes the TaskContext into the closure.
     */
 -  def mapPartitionsWithContext[U: ClassManifest](
 +  def mapPartitionsWithContext[U: ClassTag](
        f: (TaskContext, Iterator[T]) => Iterator[U],
        preservesPartitioning: Boolean = false): RDD[U] = {
-     new MapPartitionsWithContextRDD(this, sc.clean(f), preservesPartitioning)
+     val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(context, iter)
+     new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
    }
  
    /**
@@@ -483,14 -481,13 +484,13 @@@
     * additional parameter is produced by constructA, which is called in each
     * partition with the index of that partition.
     */
 -  def mapWith[A: ClassManifest, U: ClassManifest]
 +  def mapWith[A: ClassTag, U: ClassTag]
        (constructA: Int => A, preservesPartitioning: Boolean = false)
        (f: (T, A) => U): RDD[U] = {
-     def iterF(context: TaskContext, iter: Iterator[T]): Iterator[U] = {
-       val a = constructA(context.partitionId)
+     mapPartitionsWithIndex((index, iter) => {
+       val a = constructA(index)
        iter.map(t => f(t, a))
-     }
-     new MapPartitionsWithContextRDD(this, sc.clean(iterF _), preservesPartitioning)
+     }, preservesPartitioning)
    }
  
    /**
@@@ -498,14 -495,13 +498,13 @@@
     * additional parameter is produced by constructA, which is called in each
     * partition with the index of that partition.
     */
 -  def flatMapWith[A: ClassManifest, U: ClassManifest]
 +  def flatMapWith[A: ClassTag, U: ClassTag]
        (constructA: Int => A, preservesPartitioning: Boolean = false)
        (f: (T, A) => Seq[U]): RDD[U] = {
-     def iterF(context: TaskContext, iter: Iterator[T]): Iterator[U] = {
-       val a = constructA(context.partitionId)
+     mapPartitionsWithIndex((index, iter) => {
+       val a = constructA(index)
        iter.flatMap(t => f(t, a))
-     }
-     new MapPartitionsWithContextRDD(this, sc.clean(iterF _), preservesPartitioning)
+     }, preservesPartitioning)
    }
  
    /**
@@@ -513,12 -509,11 +512,11 @@@
    * This additional parameter is produced by constructA, which is called in each
     * partition with the index of that partition.
     */
 -  def foreachWith[A: ClassManifest](constructA: Int => A)(f: (T, A) => Unit) {
 +  def foreachWith[A: ClassTag](constructA: Int => A)(f: (T, A) => Unit) {
-     def iterF(context: TaskContext, iter: Iterator[T]): Iterator[T] = {
-       val a = constructA(context.partitionId)
+     mapPartitionsWithIndex { (index, iter) =>
+       val a = constructA(index)
        iter.map(t => {f(t, a); t})
-     }
-     new MapPartitionsWithContextRDD(this, sc.clean(iterF _), true).foreach(_ => {})
+     }.foreach(_ => {})
    }
  
    /**
@@@ -526,12 -521,11 +524,11 @@@
     * additional parameter is produced by constructA, which is called in each
     * partition with the index of that partition.
     */
 -  def filterWith[A: ClassManifest](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = {
 +  def filterWith[A: ClassTag](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = {
-     def iterF(context: TaskContext, iter: Iterator[T]): Iterator[T] = {
-       val a = constructA(context.partitionId)
+     mapPartitionsWithIndex((index, iter) => {
+       val a = constructA(index)
        iter.filter(t => p(t, a))
-     }
-     new MapPartitionsWithContextRDD(this, sc.clean(iterF _), true)
+     }, preservesPartitioning = true)
    }
  
    /**

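Caller-facing behaviour is unchanged by the rewrites above: mapPartitionsWithIndex,
mapPartitionsWithContext, mapWith, flatMapWith, foreachWith and filterWith now all route
through MapPartitionsRDD. A small usage sketch against the API of this era (assumes a
local SparkContext; the app name and data are arbitrary):

import org.apache.spark.SparkContext

object MapPartitionsWithIndexExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "mapPartitionsWithIndex-example")
    // Tag each element with the index of the partition it lives in.
    val tagged = sc.parallelize(1 to 8, 4)
      .mapPartitionsWithIndex((index, iter) => iter.map(x => "partition " + index + " -> " + x))
    tagged.collect().foreach(println)
    sc.stop()
  }
}
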
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/17987778/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 773e9ec,4457525..201572d
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@@ -112,10 -110,12 +112,13 @@@ class DAGScheduler
    // resubmit failed stages
    val POLL_TIMEOUT = 10L
  
+   // Warns the user if a stage contains a task with size greater than this value (in KB)
+   val TASK_SIZE_TO_WARN = 100
+ 
    private val eventProcessActor: ActorRef = env.actorSystem.actorOf(Props(new Actor {
      override def preStart() {
 -      context.system.scheduler.schedule(RESUBMIT_TIMEOUT milliseconds, RESUBMIT_TIMEOUT milliseconds) {
 +      import context.dispatcher
 +      context.system.scheduler.schedule(RESUBMIT_TIMEOUT, RESUBMIT_TIMEOUT) {
          if (failed.size > 0) {
            resubmitFailedStages()
          }

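The scheduling change above follows from the newer Akka used with the Scala 2.10 build:
schedule takes FiniteDuration arguments and needs an implicit ExecutionContext, which is
what import context.dispatcher supplies inside an actor. A minimal sketch (not the
DAGScheduler code; the actor name and interval are arbitrary):

import akka.actor.Actor
import scala.concurrent.duration._

class ResubmitTicker extends Actor {
  override def preStart() {
    import context.dispatcher  // implicit ExecutionContext required by schedule
    context.system.scheduler.schedule(10.milliseconds, 10.milliseconds) {
      // periodic work, e.g. checking whether any failed stages need resubmitting
    }
  }
  def receive = { case _ => }
}
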
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/17987778/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/17987778/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/17987778/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --cc python/pyspark/rdd.py
index 245a132,957f3f8..d2cb5f1
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@@ -971,8 -981,9 +981,9 @@@ class PipelinedRDD(RDD)
          includes = ListConverter().convert(self.ctx._python_includes,
                                       self.ctx._gateway._gateway_client)
          python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
-             pipe_command, env, includes, self.preservesPartitioning, self.ctx.pythonExec,
-             broadcast_vars, self.ctx._javaAccumulator, class_tag)
+             bytearray(pickled_command), env, includes, self.preservesPartitioning,
+             self.ctx.pythonExec, broadcast_vars, self.ctx._javaAccumulator,
 -            class_manifest)
++            class_tag)
          self._jrdd_val = python_rdd.asJavaRDD()
          return self._jrdd_val
  
