Merge branch 'master' into wip-scala-2.10
Conflicts:
core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithContextRDD.scala
core/src/main/scala/org/apache/spark/rdd/RDD.scala
python/pyspark/rdd.py
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/17987778
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/17987778
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/17987778
Branch: refs/heads/master
Commit: 17987778daac140027b7a01c0ec22f0b3e4f3b83
Parents: 54862af fb6875d
Author: Prashant Sharma <[email protected]>
Authored: Wed Nov 27 14:44:12 2013 +0530
Committer: Prashant Sharma <[email protected]>
Committed: Wed Nov 27 14:44:12 2013 +0530
----------------------------------------------------------------------
.../scala/org/apache/spark/SparkContext.scala | 25 ++
.../apache/spark/api/java/JavaDoubleRDD.scala | 40 +++
.../org/apache/spark/api/python/PythonRDD.scala | 149 +++------
.../org/apache/spark/executor/TaskMetrics.scala | 23 +-
.../apache/spark/rdd/DoubleRDDFunctions.scala | 126 ++++++++
.../org/apache/spark/rdd/MapPartitionsRDD.scala | 11 +-
.../spark/rdd/MapPartitionsWithContextRDD.scala | 42 ---
.../main/scala/org/apache/spark/rdd/RDD.scala | 43 ++-
.../apache/spark/scheduler/DAGScheduler.scala | 15 +
.../org/apache/spark/scheduler/StageInfo.scala | 1 +
.../org/apache/spark/scheduler/TaskInfo.scala | 2 +
.../cluster/ClusterTaskSetManager.scala | 1 +
.../spark/util/collection/OpenHashSet.scala | 107 ++++---
.../org/apache/spark/CheckpointSuite.scala | 2 -
.../scala/org/apache/spark/JavaAPISuite.java | 14 +
.../org/apache/spark/rdd/DoubleRDDSuite.scala | 271 +++++++++++++++++
.../apache/spark/scheduler/JobLoggerSuite.scala | 7 +-
.../util/collection/OpenHashMapSuite.scala | 16 +-
.../util/collection/OpenHashSetSuite.scala | 20 +-
.../PrimitiveKeyOpenHashMapSuite.scala | 102 +++++++
.../PrimitiveKeyOpenHashSetSuite.scala | 90 ------
docs/running-on-yarn.md | 27 +-
docs/tuning.md | 3 +-
python/epydoc.conf | 2 +-
python/pyspark/accumulators.py | 6 +-
python/pyspark/context.py | 71 +++--
python/pyspark/rdd.py | 97 +++---
python/pyspark/serializers.py | 301 ++++++++++++++++---
python/pyspark/tests.py | 3 +-
python/pyspark/worker.py | 44 ++-
python/run-tests | 1 +
.../org/apache/spark/deploy/yarn/Client.scala | 13 +-
.../spark/deploy/yarn/ClientArguments.scala | 40 +--
.../spark/deploy/yarn/WorkerLauncher.scala | 246 +++++++++++++++
.../cluster/YarnClientClusterScheduler.scala | 47 +++
.../cluster/YarnClientSchedulerBackend.scala | 109 +++++++
36 files changed, 1619 insertions(+), 498 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/17987778/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/17987778/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
index e5e20db,9f02a9b..da30cf6
--- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
@@@ -29,9 -26,11 +29,11 @@@ import org.apache.spark.storage.Storage
import java.lang.Double
import org.apache.spark.Partitioner
+ import scala.collection.JavaConverters._
+
class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, JavaDoubleRDD] {
- override val classManifest: ClassManifest[Double] = implicitly[ClassManifest[Double]]
+ override val classTag: ClassTag[Double] = implicitly[ClassTag[Double]]
override val rdd: RDD[Double] = srdd.map(x => Double.valueOf(x))
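
The hunk above is one instance of the merge-wide move from ClassManifest (Scala 2.9) to ClassTag (Scala 2.10) as the context bound that carries a type's runtime class. A minimal sketch of the pattern, using an illustrative class that is not part of the repository:

    import scala.reflect.ClassTag

    // Illustrative only: a ClassTag context bound gives access to the
    // element's runtime class, e.g. for allocating a typed array.
    class TypedBuffer[T: ClassTag](capacity: Int) {
      private val data = new Array[T](capacity)   // needs the implicit ClassTag[T]
      def length: Int = data.length
      def tag: ClassTag[T] = implicitly[ClassTag[T]]
    }

    // The Scala 2.9 equivalent used ClassManifest:
    //   class TypedBuffer[T: ClassManifest](capacity: Int) { ... }
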
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/17987778/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 53b53df,132e4fb..2bf7ac2
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@@ -28,12 -27,12 +28,11 @@@ import org.apache.spark.api.java.{JavaS
import org.apache.spark.broadcast.Broadcast
import org.apache.spark._
import org.apache.spark.rdd.RDD
- import org.apache.spark.rdd.PipedRDD
import org.apache.spark.util.Utils
-
-private[spark] class PythonRDD[T: ClassManifest](
+private[spark] class PythonRDD[T: ClassTag](
parent: RDD[T],
- command: Seq[String],
+ command: Array[Byte],
envVars: JMap[String, String],
pythonIncludes: JList[String],
preservePartitoning: Boolean,
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/17987778/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
index cdb5946,ae70d55..db15baf
--- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
@@@ -18,13 -18,11 +18,11 @@@
package org.apache.spark.rdd
import org.apache.spark.{Partition, TaskContext}
+import scala.reflect.ClassTag
--
- private[spark]
- class MapPartitionsRDD[U: ClassTag, T: ClassTag](
-private[spark] class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
++private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
prev: RDD[T],
- f: Iterator[T] => Iterator[U],
+ f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
preservesPartitioning: Boolean = false)
extends RDD[U](prev) {
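
The merged MapPartitionsRDD now takes a three-argument partition function. As an illustrative sketch (the helper name is hypothetical, not Spark API), an Iterator-only function can be lifted into that shape by ignoring the TaskContext and partition index, which is what the RDD.scala hunk below does inline:

    import org.apache.spark.TaskContext

    object PartitionFuncAdapters {
      // Hypothetical adapter, shown only to illustrate the new signature;
      // it discards the TaskContext and the partition index.
      def liftToPartitionFunc[T, U](f: Iterator[T] => Iterator[U])
          : (TaskContext, Int, Iterator[T]) => Iterator[U] =
        (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter)
    }
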
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/17987778/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/rdd/RDD.scala
index da18d45,5b12853..f80d3d6
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@@ -443,29 -439,31 +442,31 @@@ abstract class RDD[T: ClassTag]
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
- def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],
- preservesPartitioning: Boolean = false): RDD[U] = {
- new MapPartitionsRDD(this, sc.clean(f), preservesPartitioning)
- def mapPartitions[U: ClassManifest](
++ def mapPartitions[U: ClassTag](
+ f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
+ val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter)
+ new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
}
/**
* Return a new RDD by applying a function to each partition of this RDD, while tracking the index
* of the original partition.
*/
- def mapPartitionsWithIndex[U: ClassManifest](
+ def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
- val func = (context: TaskContext, iter: Iterator[T]) => f(context.partitionId, iter)
- new MapPartitionsWithContextRDD(this, sc.clean(func), preservesPartitioning)
+ val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter)
+ new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
}
/**
* Return a new RDD by applying a function to each partition of this RDD. This is a variant of
* mapPartitions that also passes the TaskContext into the closure.
*/
- def mapPartitionsWithContext[U: ClassManifest](
+ def mapPartitionsWithContext[U: ClassTag](
f: (TaskContext, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = {
- new MapPartitionsWithContextRDD(this, sc.clean(f), preservesPartitioning)
+ val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(context, iter)
+ new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
}
/**
@@@ -483,14 -481,13 +484,13 @@@
* additional parameter is produced by constructA, which is called in each
* partition with the index of that partition.
*/
- def mapWith[A: ClassManifest, U: ClassManifest]
+ def mapWith[A: ClassTag, U: ClassTag]
(constructA: Int => A, preservesPartitioning: Boolean = false)
(f: (T, A) => U): RDD[U] = {
- def iterF(context: TaskContext, iter: Iterator[T]): Iterator[U] = {
- val a = constructA(context.partitionId)
+ mapPartitionsWithIndex((index, iter) => {
+ val a = constructA(index)
iter.map(t => f(t, a))
- }
- new MapPartitionsWithContextRDD(this, sc.clean(iterF _), preservesPartitioning)
+ }, preservesPartitioning)
}
/**
@@@ -498,14 -495,13 +498,13 @@@
* additional parameter is produced by constructA, which is called in each
* partition with the index of that partition.
*/
- def flatMapWith[A: ClassManifest, U: ClassManifest]
+ def flatMapWith[A: ClassTag, U: ClassTag]
(constructA: Int => A, preservesPartitioning: Boolean = false)
(f: (T, A) => Seq[U]): RDD[U] = {
- def iterF(context: TaskContext, iter: Iterator[T]): Iterator[U] = {
- val a = constructA(context.partitionId)
+ mapPartitionsWithIndex((index, iter) => {
+ val a = constructA(index)
iter.flatMap(t => f(t, a))
- }
- new MapPartitionsWithContextRDD(this, sc.clean(iterF _), preservesPartitioning)
+ }, preservesPartitioning)
}
/**
@@@ -513,12 -509,11 +512,11 @@@
* This additional parameter is produced by constructA, which is called in each
* partition with the index of that partition.
*/
- def foreachWith[A: ClassManifest](constructA: Int => A)(f: (T, A) => Unit) {
+ def foreachWith[A: ClassTag](constructA: Int => A)(f: (T, A) => Unit) {
- def iterF(context: TaskContext, iter: Iterator[T]): Iterator[T] = {
- val a = constructA(context.partitionId)
+ mapPartitionsWithIndex { (index, iter) =>
+ val a = constructA(index)
iter.map(t => {f(t, a); t})
- }
- new MapPartitionsWithContextRDD(this, sc.clean(iterF _), true).foreach(_ => {})
+ }.foreach(_ => {})
}
/**
@@@ -526,12 -521,11 +524,11 @@@
* additional parameter is produced by constructA, which is called in each
* partition with the index of that partition.
*/
- def filterWith[A: ClassManifest](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = {
+ def filterWith[A: ClassTag](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = {
- def iterF(context: TaskContext, iter: Iterator[T]): Iterator[T] = {
- val a = constructA(context.partitionId)
+ mapPartitionsWithIndex((index, iter) => {
+ val a = constructA(index)
iter.filter(t => p(t, a))
- }
- new MapPartitionsWithContextRDD(this, sc.clean(iterF _), true)
+ }, preservesPartitioning = true)
}
/**
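
In this hunk mapWith, flatMapWith, foreachWith and filterWith are all reimplemented on top of mapPartitionsWithIndex instead of the removed MapPartitionsWithContextRDD. A small usage sketch of that form (the local SparkContext and data are illustrative only):

    import org.apache.spark.SparkContext

    val sc = new SparkContext("local", "mapPartitionsWithIndex example")
    val rdd = sc.parallelize(1 to 8, 4)

    // Build one per-partition value from the partition index, then apply it
    // to every element of that partition (what mapWith does internally).
    val tagged = rdd.mapPartitionsWithIndex((index, iter) => {
      val prefix = "p" + index
      iter.map(x => prefix + ":" + x)
    }, preservesPartitioning = false)

    tagged.collect().foreach(println)   // e.g. p0:1, p0:2, p1:3, ...
    sc.stop()
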
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/17987778/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 773e9ec,4457525..201572d
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@@ -112,10 -110,12 +112,13 @@@ class DAGScheduler
// resubmit failed stages
val POLL_TIMEOUT = 10L
+ // Warns the user if a stage contains a task with size greater than this value (in KB)
+ val TASK_SIZE_TO_WARN = 100
+
private val eventProcessActor: ActorRef = env.actorSystem.actorOf(Props(new Actor {
override def preStart() {
- context.system.scheduler.schedule(RESUBMIT_TIMEOUT milliseconds, RESUBMIT_TIMEOUT milliseconds) {
+ import context.dispatcher
+ context.system.scheduler.schedule(RESUBMIT_TIMEOUT, RESUBMIT_TIMEOUT) {
if (failed.size > 0) {
resubmitFailedStages()
}
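
The added `import context.dispatcher` reflects the Akka 2.x requirement that scheduler.schedule receive an implicit ExecutionContext. A minimal standalone sketch of that pattern (actor name and interval are illustrative, not taken from the repository):

    import akka.actor.{Actor, ActorSystem, Props}
    import scala.concurrent.duration._

    class ResubmitActor extends Actor {
      override def preStart() {
        import context.dispatcher   // implicit ExecutionContext required by the scheduler
        context.system.scheduler.schedule(10.milliseconds, 10.milliseconds) {
          // periodic work, e.g. checking for failed stages to resubmit
        }
      }
      def receive = { case _ => }
    }

    val system = ActorSystem("schedulerExample")
    system.actorOf(Props(new ResubmitActor))
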
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/17987778/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/17987778/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/17987778/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --cc python/pyspark/rdd.py
index 245a132,957f3f8..d2cb5f1
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@@ -971,8 -981,9 +981,9 @@@ class PipelinedRDD(RDD)
includes = ListConverter().convert(self.ctx._python_includes,
self.ctx._gateway._gateway_client)
python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
- pipe_command, env, includes, self.preservesPartitioning, self.ctx.pythonExec,
- broadcast_vars, self.ctx._javaAccumulator, class_tag)
+ bytearray(pickled_command), env, includes, self.preservesPartitioning,
+ self.ctx.pythonExec, broadcast_vars, self.ctx._javaAccumulator,
- class_manifest)
++ class_tag)
self._jrdd_val = python_rdd.asJavaRDD()
return self._jrdd_val