Various merge corrections

I've diff'd this patch against my own -- since the two were created independently,
two sets of eyes have gone over all the merge conflicts, so I'm feeling
significantly more confident in the resulting PR.
@rxin has looked at the changes to the repl and is resoundingly confident that they are correct.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/f629ba95
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/f629ba95
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/f629ba95

Branch: refs/heads/master
Commit: f629ba95b6a1a3508463bfdcb03efcfaa3327cb5
Parents: d4cd323
Author: Aaron Davidson <[email protected]>
Authored: Thu Nov 14 22:13:09 2013 -0800
Committer: Aaron Davidson <[email protected]>
Committed: Thu Nov 14 22:13:09 2013 -0800

----------------------------------------------------------------------
 .../spark/api/java/function/Function.java        |  2 -
 .../spark/api/java/function/Function2.java       |  2 -
 .../org/apache/spark/deploy/client/Client.scala  |  4 +-
 .../spark/deploy/master/ApplicationState.scala   |  3 +-
 .../org/apache/spark/deploy/master/Master.scala  | 14 +----
 .../org/apache/spark/deploy/worker/Worker.scala  | 17 +-----
 .../executor/CoarseGrainedExecutorBackend.scala  |  1 -
 .../org/apache/spark/executor/Executor.scala     |  6 +-
 .../org/apache/spark/rdd/AsyncRDDActions.scala   |  2 +-
 .../scala/org/apache/spark/rdd/BlockRDD.scala    |  3 +-
 .../scala/org/apache/spark/rdd/HadoopRDD.scala   |  2 +-
 .../apache/spark/scheduler/DAGScheduler.scala    |  7 ++-
 .../scheduler/cluster/ClusterScheduler.scala     |  2 +-
 .../mesos/CoarseMesosSchedulerBackend.scala      |  1 +
 .../scala/org/apache/spark/util/Utils.scala      |  5 +-
 .../spark/util/collection/OpenHashMap.scala      |  1 -
 docs/hadoop-third-party-distributions.md         |  4 +-
 project/SparkBuild.scala                         | 11 +---
 .../org/apache/spark/repl/SparkILoop.scala       | 14 +----
 .../scala/org/apache/spark/repl/ReplSuite.scala  |  2 +-
 .../spark/streaming/NetworkInputTracker.scala    |  2 +-
 .../spark/streaming/PairDStreamFunctions.scala   |  1 -
 .../streaming/api/java/JavaPairDStream.scala     |  2 +-
 .../streaming/dstream/CoGroupedDStream.scala     | 59 --------------------
 .../streaming/dstream/KafkaInputDStream.scala    |  1 -
 25 files changed, 33 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/core/src/main/scala/org/apache/spark/api/java/function/Function.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function.java b/core/src/main/scala/org/apache/spark/api/java/function/Function.java
index 49e661a..537439e 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function.java
+++ b/core/src/main/scala/org/apache/spark/api/java/function/Function.java
@@ -29,8 +29,6 @@ import java.io.Serializable;
  * when mapping RDDs of other types.
  */
 public abstract class Function<T, R> extends WrappedFunction1<T, R> implements Serializable {
-  public abstract R call(T t) throws Exception;
-
   public ClassTag<R> returnType() {
     return ClassTag$.MODULE$.apply(Object.class);
   }


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/core/src/main/scala/org/apache/spark/api/java/function/Function2.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function2.java b/core/src/main/scala/org/apache/spark/api/java/function/Function2.java
index cf77bb6..a2d1214 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function2.java
+++ b/core/src/main/scala/org/apache/spark/api/java/function/Function2.java
@@ -28,8 +28,6 @@ import java.io.Serializable;
 
 public abstract class Function2<T1, T2, R> extends WrappedFunction2<T1, T2, R> implements Serializable {
-  public abstract R call(T1 t1, T2 t2) throws Exception;
-
   public ClassTag<R> returnType() {
     return (ClassTag<R>) ClassTag$.MODULE$.apply(Object.class);
   }


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 3953a3e..572fc34 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -145,11 +145,11 @@ private[spark] class Client(
         markDisconnected()
 
       case DisassociatedEvent(_, address, _) if address == masterAddress =>
-        logError("Connection to master failed; stopping client")
+        logWarning("Connection to master failed; waiting for master to reconnect...")
         markDisconnected()
 
       case AssociationErrorEvent(_, _, address, _) if address == masterAddress =>
-        logError("Connection to master failed; stopping client")
+        logWarning("Connection to master failed; waiting for master to reconnect...")
         markDisconnected()
 
       case StopClient =>


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
index a74d7be..67e6c5d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
@@ -17,8 +17,7 @@
 
 package org.apache.spark.deploy.master
 
-private[spark] object ApplicationState
-  extends Enumeration {
+private[spark] object ApplicationState extends Enumeration {
 
   type ApplicationState = Value


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 0545ad1..7db5097 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -23,7 +23,7 @@ import java.text.SimpleDateFormat
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.concurrent.Await
 import scala.concurrent.duration._
-import scala.concurrent.duration.{ Duration, FiniteDuration }
+import scala.concurrent.duration.{Duration, FiniteDuration}
 
 import akka.actor._
 import akka.pattern.ask
@@ -41,16 +41,6 @@ import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
 import org.apache.spark.deploy.DeployMessages.KillExecutor
 import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
 import scala.Some
-import org.apache.spark.deploy.DeployMessages.LaunchExecutor
-import org.apache.spark.deploy.DeployMessages.RegisteredApplication
-import org.apache.spark.deploy.DeployMessages.RegisterWorker
-import org.apache.spark.deploy.DeployMessages.ExecutorUpdated
-import org.apache.spark.deploy.DeployMessages.MasterStateResponse
-import org.apache.spark.deploy.DeployMessages.ExecutorAdded
-import org.apache.spark.deploy.DeployMessages.RegisterApplication
-import org.apache.spark.deploy.DeployMessages.ApplicationRemoved
-import org.apache.spark.deploy.DeployMessages.Heartbeat
-import org.apache.spark.deploy.DeployMessages.RegisteredWorker
 import akka.actor.Terminated
 import akka.serialization.SerializationExtension
 import java.util.concurrent.TimeUnit
@@ -571,7 +561,7 @@ private[spark] object Master {
   def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int, Int) = {
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
     val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), name = actorName)
-    val timeoutDuration : FiniteDuration = Duration.create(
+    val timeoutDuration: FiniteDuration = Duration.create(
       System.getProperty("spark.akka.askTimeout", "10").toLong, TimeUnit.SECONDS)
     implicit val timeout = Timeout(timeoutDuration)
     val respFuture = actor ? RequestWebUIPort // ask pattern


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 98c57ca..07189ac 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable.HashMap
 import scala.concurrent.duration._
 
 import akka.actor._
-import akka.remote.{RemotingLifecycleEvent, AssociationErrorEvent, DisassociatedEvent}
+import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 
 import org.apache.spark.Logging
 import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
@@ -34,19 +34,6 @@ import org.apache.spark.deploy.master.Master
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.deploy.DeployMessages.WorkerStateResponse
-import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
-import org.apache.spark.deploy.DeployMessages.KillExecutor
-import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
-import scala.Some
-import akka.remote.DisassociatedEvent
-import org.apache.spark.deploy.DeployMessages.LaunchExecutor
-import org.apache.spark.deploy.DeployMessages.RegisterWorker
-import org.apache.spark.deploy.DeployMessages.WorkerSchedulerStateResponse
-import org.apache.spark.deploy.DeployMessages.MasterChanged
-import org.apache.spark.deploy.DeployMessages.Heartbeat
-import org.apache.spark.deploy.DeployMessages.RegisteredWorker
-import akka.actor.Terminated
 
 /**
  * @param masterUrls Each url should look like spark://host:port.
@@ -248,7 +235,7 @@ private[spark] class Worker(
       }
     }
 
-    case DisassociatedEvent(_, _, _) =>
+    case DisassociatedEvent(_, address, _) if address == master.path.address =>
       masterDisconnected()
 
     case RequestWorkerState => {


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 73fa7d6..50302fc 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -107,7 +107,6 @@ private[spark] object CoarseGrainedExecutorBackend {
     // set it
     val sparkHostPort = hostname + ":" + boundPort
     System.setProperty("spark.hostPort", sparkHostPort)
-
     actorSystem.actorOf(
       Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
       name = "Executor")


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index de45404..0b0a60e 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -118,7 +118,11 @@ private[spark] class Executor(
     }
   }
 
-  private val akkaFrameSize = env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size")
+  // Akka's message frame size. If task result is bigger than this, we use the block manager
+  // to send the result back.
+  private val akkaFrameSize = {
+    env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size")
+  }
 
   // Start worker thread pool
   val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index 44c5078..d1c74a5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -21,9 +21,9 @@ import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.ExecutionContext.Implicits.global
+import scala.reflect.ClassTag
 
 import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
-import scala.reflect.ClassTag
 
 /**
  * A set of asynchronous RDD actions available through an implicit conversion.
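
The Worker hunk above replaces a catch-all DisassociatedEvent match with one guarded
on the master's address, mirroring the guards Client.scala already uses. A minimal
sketch of the pattern, using illustrative stand-ins (WorkerLike, masterAddress,
masterDisconnected) rather than the real Worker members:

    import akka.actor.{Actor, Address}
    import akka.remote.DisassociatedEvent

    class WorkerLike(masterAddress: Address) extends Actor {
      // Assumes the actor subscribed to remoting lifecycle events via
      // context.system.eventStream.subscribe(self, classOf[DisassociatedEvent]).
      def receive = {
        // An unguarded "case DisassociatedEvent(_, _, _)" fires for *any* dropped
        // remote connection (e.g. a crashed executor), which the old code treated
        // as a lost master. Guarding on the remote address removes that false positive.
        case DisassociatedEvent(_, remoteAddress, _) if remoteAddress == masterAddress =>
          masterDisconnected()
      }

      private def masterDisconnected(): Unit = {
        // Mark the master as unreachable and attempt to re-register.
      }
    }
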
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index 63b9fe1..424354a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -17,9 +17,10 @@
 
 package org.apache.spark.rdd
 
+import scala.reflect.ClassTag
+
 import org.apache.spark.{SparkContext, SparkEnv, Partition, TaskContext}
 import org.apache.spark.storage.{BlockId, BlockManager}
-import scala.reflect.ClassTag
 
 private[spark] class BlockRDDPartition(val blockId: BlockId, idx: Int) extends Partition {
   val index = idx


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 47e958b..53f77a3 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -52,7 +52,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
  * sources in HBase, or S3).
 *
 * @param sc The SparkContext to associate the RDD with.
- * @param broadCastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
+ * @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
 *   variabe references an instance of JobConf, then that JobConf will be used for the Hadoop job.
 *   Otherwise, a new JobConf will be created on each slave using the enclosed Configuration.
 * @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf that HadoopRDD


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7b4fc6b..fdea3f6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -20,13 +20,14 @@ package org.apache.spark.scheduler
 import java.io.NotSerializableException
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicInteger
-import scala.concurrent.duration._
-import scala.concurrent.ExecutionContext.Implicits.global
-import akka.actor._
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration._
 import scala.reflect.ClassTag
 
+import akka.actor._
+
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.executor.TaskMetrics


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 2d8a0a6..9975ec1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -25,8 +25,8 @@ import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.HashSet
 
-import scala.concurrent.duration._
 import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration._
 
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 8de9b72..84fe309 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -181,6 +181,7 @@ private[spark] class CoarseMesosSchedulerBackend(
             !slaveIdsWithExecutors.contains(slaveId)) {
           // Launch an executor on the slave
           val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
+          totalCoresAcquired += cpusToUse
           val taskId = newMesosTaskId()
           taskIdToSlaveId(taskId) = slaveId
           slaveIdsWithExecutors += slaveId


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 7557dda..02adcb4 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -22,14 +22,11 @@ import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address}
 import java.util.{Locale, Random, UUID}
 import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
-
-import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConversions._
 import scala.collection.Map
+import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
 import scala.reflect.ClassTag
-import scala.Some
-
 import com.google.common.io.Files
 import com.google.common.util.concurrent.ThreadFactoryBuilder


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
index 45849b3..c26f23d 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
@@ -19,7 +19,6 @@ package org.apache.spark.util.collection
 
 import scala.reflect.ClassTag
 
-
 /**
  * A fast hash map implementation for nullable keys. This hash map supports insertions and updates,
  * but not deletions. This map is about 5X faster than java.util.HashMap, while using much less


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/docs/hadoop-third-party-distributions.md
----------------------------------------------------------------------
diff --git a/docs/hadoop-third-party-distributions.md b/docs/hadoop-third-party-distributions.md
index f706625..b33af2c 100644
--- a/docs/hadoop-third-party-distributions.md
+++ b/docs/hadoop-third-party-distributions.md
@@ -25,8 +25,8 @@ the _exact_ Hadoop version you are running to avoid any compatibility errors.
 <h3>CDH Releases</h3>
 <table class="table" style="width:350px; margin-right: 20px;">
   <tr><th>Release</th><th>Version code</th></tr>
-  <tr><td>CDH 4.X.X (YARN mode)</td><td>2.0.0-chd4.X.X</td></tr>
-  <tr><td>CDH 4.X.X</td><td>2.0.0-mr1-chd4.X.X</td></tr>
+  <tr><td>CDH 4.X.X (YARN mode)</td><td>2.0.0-cdh4.X.X</td></tr>
+  <tr><td>CDH 4.X.X</td><td>2.0.0-mr1-cdh4.X.X</td></tr>
   <tr><td>CDH 3u6</td><td>0.20.2-cdh3u6</td></tr>
   <tr><td>CDH 3u5</td><td>0.20.2-cdh3u5</td></tr>
   <tr><td>CDH 3u4</td><td>0.20.2-cdh3u4</td></tr>


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 26e6a83..476e7c5 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -105,12 +105,6 @@ object SparkBuild extends Build {
     // also check the local Maven repository ~/.m2
     resolvers ++= Seq(Resolver.file("Local Maven Repo", file(Path.userHome + "/.m2/repository"))),
 
-    // Shared between both core and streaming.
-    resolvers ++= Seq("Akka Repository" at "http://repo.akka.io/releases/"),
-
-    // Shared between both examples and streaming.
-    resolvers ++= Seq("Mqtt Repository" at "https://repo.eclipse.org/content/repositories/paho-releases/"),
-
     // For Sonatype publishing
     resolvers ++= Seq("sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots",
       "sonatype-staging" at "https://oss.sonatype.org/service/local/staging/deploy/maven2/"),
@@ -292,11 +286,10 @@ object SparkBuild extends Build {
 
     libraryDependencies ++= Seq(
       "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy),
-      "com.sksamuel.kafka" %% "kafka" % "0.8.0-beta1"
+      "org.apache.kafka" % "kafka_2.9.2" % "0.8.0-beta1"
         exclude("com.sun.jdmk", "jmxtools")
         exclude("com.sun.jmx", "jmxri")
-        exclude("net.sf.jopt-simple", "jopt-simple")
-        excludeAll(excludeNetty),
+        exclude("net.sf.jopt-simple", "jopt-simple"),
       "org.eclipse.paho" % "mqtt-client" % "0.4.0",
       "com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty),
       "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty),


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 43e504c..523fd12 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -940,17 +940,9 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
         if (prop != null) prop else "local"
       }
     }
-    val jars = Option(System.getenv("ADD_JARS")).map(_.split(','))
-      .getOrElse(new Array[String](0))
-      .map(new java.io.File(_).getAbsolutePath)
-    try {
-      sparkContext = new SparkContext(master, "Spark shell", System.getenv("SPARK_HOME"), jars)
-    } catch {
-      case e: Exception =>
-        e.printStackTrace()
-        echo("Failed to create SparkContext, exiting...")
-        sys.exit(1)
-    }
+    val jars = SparkILoop.getAddedJars.map(new java.io.File(_).getAbsolutePath)
+    sparkContext = new SparkContext(master, "Spark shell", System.getenv("SPARK_HOME"), jars)
+    echo("Created spark context..")
     sparkContext
   }


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 418c31e..c230a03 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -78,7 +78,7 @@ class ReplSuite extends FunSuite {
     System.clearProperty("spark.hostPort")
   }
 
-  test ("simple foreach with accumulator") {
+  test("simple foreach with accumulator") {
     val output = runInterpreter("local", """
       |val accum = sc.accumulator(0)
       |sc.parallelize(1 to 10).foreach(x => accum += x)


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
index 66fe6e7..6e9a781 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
@@ -25,10 +25,10 @@ import org.apache.spark.SparkContext._
 
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.Queue
+import scala.concurrent.duration._
 
 import akka.actor._
 import akka.pattern.ask
-import scala.concurrent.duration._
 import akka.dispatch._
 
 import org.apache.spark.storage.BlockId


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
index ea5c165..80af96c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
@@ -33,7 +33,6 @@ import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
 import org.apache.hadoop.mapred.OutputFormat
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.conf.Configuration
-import scala.Some
 
 class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
 extends Serializable {


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 3ba37be..dfd6e27 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -728,7 +728,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 }
 
 object JavaPairDStream {
-  implicit def fromPairDStream[K: ClassTag, V: ClassTag](dstream: DStream[(K, V)]) : JavaPairDStream[K, V] = {
+  implicit def fromPairDStream[K: ClassTag, V: ClassTag](dstream: DStream[(K, V)]) = {
     new JavaPairDStream[K, V](dstream)
   }


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala
deleted file mode 100644
index 16c1567..0000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.dstream
-
-import org.apache.spark.Partitioner
-import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.CoGroupedRDD
-import org.apache.spark.streaming.{Time, DStream, Duration}
-import scala.reflect.ClassTag
-
-private[streaming]
-class CoGroupedDStream[K : ClassTag](
-    parents: Seq[DStream[(K, _)]],
-    partitioner: Partitioner
-  ) extends DStream[(K, Seq[Seq[_]])](parents.head.ssc) {
-
-  if (parents.length == 0) {
-    throw new IllegalArgumentException("Empty array of parents")
-  }
-
-  if (parents.map(_.ssc).distinct.size > 1) {
-    throw new IllegalArgumentException("Array of parents have different StreamingContexts")
-  }
-
-  if (parents.map(_.slideDuration).distinct.size > 1) {
-    throw new IllegalArgumentException("Array of parents have different slide times")
-  }
-
-  override def dependencies = parents.toList
-
-  override def slideDuration: Duration = parents.head.slideDuration
-
-  override def compute(validTime: Time): Option[RDD[(K, Seq[Seq[_]])]] = {
-    val part = partitioner
-    val rdds = parents.flatMap(_.getOrCompute(validTime))
-    if (rdds.size > 0) {
-      val q = new CoGroupedRDD[K](rdds, part)
-      Some(q)
-    } else {
-      None
-    }
-  }
-
-}


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
index ec0096c..526f556 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
@@ -33,7 +33,6 @@ import org.I0Itec.zkclient._
 import scala.collection.Map
 import scala.reflect.ClassTag
 
-
 /**
  * Input stream that pulls messages from a Kafka Broker.
 *
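
For context on the one-line CoarseMesosSchedulerBackend change (totalCoresAcquired
+= cpusToUse): without the update, the cap on acquired cores never tightens, so every
Mesos resource offer launches another executor. A self-contained sketch of the failure
mode, using illustrative names (CoreAccountingSketch, handleOffer, offeredCpus) that
are not part of the real backend:

    object CoreAccountingSketch {
      val maxCores = 8             // stands in for the configured core limit
      var totalCoresAcquired = 0   // the counter this commit starts updating

      def handleOffer(offeredCpus: Int): Unit = {
        if (totalCoresAcquired < maxCores) {
          val cpusToUse = math.min(offeredCpus, maxCores - totalCoresAcquired)
          totalCoresAcquired += cpusToUse  // drop this line and the guard never trips
          println("launching executor with " + cpusToUse + " cores")
        }
      }

      def main(args: Array[String]): Unit = {
        // With the accounting fix the offers yield 6 cores, then the remaining 2,
        // then nothing; without it, all three offers would launch 6-core executors.
        Seq(6, 6, 6).foreach(handleOffer)
      }
    }
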
