Various merge corrections

I've diffed this patch against my own -- since the two were created
independently, two sets of eyes have gone over every merge conflict,
so I'm significantly more confident in the resulting PR.

@rxin has looked at the changes to the repl and is resoundingly
confident that they are correct.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/f629ba95
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/f629ba95
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/f629ba95

Branch: refs/heads/master
Commit: f629ba95b6a1a3508463bfdcb03efcfaa3327cb5
Parents: d4cd323
Author: Aaron Davidson <[email protected]>
Authored: Thu Nov 14 22:13:09 2013 -0800
Committer: Aaron Davidson <[email protected]>
Committed: Thu Nov 14 22:13:09 2013 -0800

----------------------------------------------------------------------
 .../spark/api/java/function/Function.java       |  2 -
 .../spark/api/java/function/Function2.java      |  2 -
 .../org/apache/spark/deploy/client/Client.scala |  4 +-
 .../spark/deploy/master/ApplicationState.scala  |  3 +-
 .../org/apache/spark/deploy/master/Master.scala | 14 +----
 .../org/apache/spark/deploy/worker/Worker.scala | 17 +-----
 .../executor/CoarseGrainedExecutorBackend.scala |  1 -
 .../org/apache/spark/executor/Executor.scala    |  6 +-
 .../org/apache/spark/rdd/AsyncRDDActions.scala  |  2 +-
 .../scala/org/apache/spark/rdd/BlockRDD.scala   |  3 +-
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  |  2 +-
 .../apache/spark/scheduler/DAGScheduler.scala   |  7 ++-
 .../scheduler/cluster/ClusterScheduler.scala    |  2 +-
 .../mesos/CoarseMesosSchedulerBackend.scala     |  1 +
 .../scala/org/apache/spark/util/Utils.scala     |  5 +-
 .../spark/util/collection/OpenHashMap.scala     |  1 -
 docs/hadoop-third-party-distributions.md        |  4 +-
 project/SparkBuild.scala                        | 11 +---
 .../org/apache/spark/repl/SparkILoop.scala      | 14 +----
 .../scala/org/apache/spark/repl/ReplSuite.scala |  2 +-
 .../spark/streaming/NetworkInputTracker.scala   |  2 +-
 .../spark/streaming/PairDStreamFunctions.scala  |  1 -
 .../streaming/api/java/JavaPairDStream.scala    |  2 +-
 .../streaming/dstream/CoGroupedDStream.scala    | 59 --------------------
 .../streaming/dstream/KafkaInputDStream.scala   |  1 -
 25 files changed, 33 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/core/src/main/scala/org/apache/spark/api/java/function/Function.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function.java b/core/src/main/scala/org/apache/spark/api/java/function/Function.java
index 49e661a..537439e 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function.java
+++ b/core/src/main/scala/org/apache/spark/api/java/function/Function.java
@@ -29,8 +29,6 @@ import java.io.Serializable;
  * when mapping RDDs of other types.
  */
 public abstract class Function<T, R> extends WrappedFunction1<T, R> implements Serializable {
-  public abstract R call(T t) throws Exception;
-
   public ClassTag<R> returnType() {
     return ClassTag$.MODULE$.apply(Object.class);
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/core/src/main/scala/org/apache/spark/api/java/function/Function2.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function2.java b/core/src/main/scala/org/apache/spark/api/java/function/Function2.java
index cf77bb6..a2d1214 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function2.java
+++ b/core/src/main/scala/org/apache/spark/api/java/function/Function2.java
@@ -28,8 +28,6 @@ import java.io.Serializable;
 public abstract class Function2<T1, T2, R> extends WrappedFunction2<T1, T2, R>
   implements Serializable {
 
-  public abstract R call(T1 t1, T2 t2) throws Exception;
-
   public ClassTag<R> returnType() {
     return (ClassTag<R>) ClassTag$.MODULE$.apply(Object.class);
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 3953a3e..572fc34 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -145,11 +145,11 @@ private[spark] class Client(
         markDisconnected()
 
       case DisassociatedEvent(_, address, _) if address == masterAddress =>
-        logError("Connection to master failed; stopping client")
+        logWarning("Connection to master failed; waiting for master to reconnect...")
         markDisconnected()
 
       case AssociationErrorEvent(_, _, address, _) if address == masterAddress =>
-        logError("Connection to master failed; stopping client")
+        logWarning("Connection to master failed; waiting for master to reconnect...")
         markDisconnected()
 
       case StopClient =>

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
index a74d7be..67e6c5d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
@@ -17,8 +17,7 @@
 
 package org.apache.spark.deploy.master
 
-private[spark] object ApplicationState
-  extends Enumeration {
+private[spark] object ApplicationState extends Enumeration {
 
   type ApplicationState = Value
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 0545ad1..7db5097 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -23,7 +23,7 @@ import java.text.SimpleDateFormat
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.concurrent.Await
 import scala.concurrent.duration._
-import scala.concurrent.duration.{ Duration, FiniteDuration }
+import scala.concurrent.duration.{Duration, FiniteDuration}
 
 import akka.actor._
 import akka.pattern.ask
@@ -41,16 +41,6 @@ import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
 import org.apache.spark.deploy.DeployMessages.KillExecutor
 import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
 import scala.Some
-import org.apache.spark.deploy.DeployMessages.LaunchExecutor
-import org.apache.spark.deploy.DeployMessages.RegisteredApplication
-import org.apache.spark.deploy.DeployMessages.RegisterWorker
-import org.apache.spark.deploy.DeployMessages.ExecutorUpdated
-import org.apache.spark.deploy.DeployMessages.MasterStateResponse
-import org.apache.spark.deploy.DeployMessages.ExecutorAdded
-import org.apache.spark.deploy.DeployMessages.RegisterApplication
-import org.apache.spark.deploy.DeployMessages.ApplicationRemoved
-import org.apache.spark.deploy.DeployMessages.Heartbeat
-import org.apache.spark.deploy.DeployMessages.RegisteredWorker
 import akka.actor.Terminated
 import akka.serialization.SerializationExtension
 import java.util.concurrent.TimeUnit
@@ -571,7 +561,7 @@ private[spark] object Master {
   def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int, Int) = {
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
     val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), name = actorName)
-    val timeoutDuration : FiniteDuration = Duration.create(
+    val timeoutDuration: FiniteDuration = Duration.create(
       System.getProperty("spark.akka.askTimeout", "10").toLong, TimeUnit.SECONDS)
     implicit val timeout = Timeout(timeoutDuration)
     val respFuture = actor ? RequestWebUIPort   // ask pattern

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 98c57ca..07189ac 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable.HashMap
 import scala.concurrent.duration._
 
 import akka.actor._
-import akka.remote.{RemotingLifecycleEvent, AssociationErrorEvent, DisassociatedEvent}
+import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 
 import org.apache.spark.Logging
 import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
@@ -34,19 +34,6 @@ import org.apache.spark.deploy.master.Master
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.deploy.DeployMessages.WorkerStateResponse
-import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
-import org.apache.spark.deploy.DeployMessages.KillExecutor
-import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
-import scala.Some
-import akka.remote.DisassociatedEvent
-import org.apache.spark.deploy.DeployMessages.LaunchExecutor
-import org.apache.spark.deploy.DeployMessages.RegisterWorker
-import org.apache.spark.deploy.DeployMessages.WorkerSchedulerStateResponse
-import org.apache.spark.deploy.DeployMessages.MasterChanged
-import org.apache.spark.deploy.DeployMessages.Heartbeat
-import org.apache.spark.deploy.DeployMessages.RegisteredWorker
-import akka.actor.Terminated
 
 /**
   * @param masterUrls Each url should look like spark://host:port.
@@ -248,7 +235,7 @@ private[spark] class Worker(
         }
       }
 
-    case DisassociatedEvent(_, _, _) =>
+    case DisassociatedEvent(_, address, _) if address == master.path.address =>
       masterDisconnected()
 
     case RequestWorkerState => {

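The Worker change above narrows the DisassociatedEvent handler so that only a
drop of the master's own endpoint is treated as "master disconnected". A
minimal, self-contained sketch of that guarded-match pattern (the actor and
names here are hypothetical; only the address guard comes from the hunk above):

    import akka.actor.{Actor, ActorPath}
    import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}

    // Hypothetical actor illustrating the guard used in Worker.scala above:
    // react only when the lost remote endpoint is the master's address.
    class MasterWatcher(masterPath: ActorPath) extends Actor {
      override def preStart() {
        // Subscribe so remoting lifecycle events (incl. DisassociatedEvent) are delivered.
        context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
      }

      def receive = {
        case DisassociatedEvent(_, remoteAddress, _) if remoteAddress == masterPath.address =>
          // Only the master's endpoint dropping counts as losing the master.
          println("Connection to master lost: " + remoteAddress)
        case _: DisassociatedEvent =>
          // Disconnects from other peers are ignored here.
      }
    }
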
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 73fa7d6..50302fc 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -107,7 +107,6 @@ private[spark] object CoarseGrainedExecutorBackend {
     // set it
     val sparkHostPort = hostname + ":" + boundPort
     System.setProperty("spark.hostPort", sparkHostPort)
-
     actorSystem.actorOf(
       Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
       name = "Executor")

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index de45404..0b0a60e 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -118,7 +118,11 @@ private[spark] class Executor(
     }
   }
 
-  private val akkaFrameSize = env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size")
+  // Akka's message frame size. If task result is bigger than this, we use the block manager
+  // to send the result back.
+  private val akkaFrameSize = {
+    env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size")
+  }
 
   // Start worker thread pool
   val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")

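As a rough usage sketch of the frame-size value read above (the helper name is
hypothetical, and the block-manager fallback itself is only described by the
new comment, not implemented here):

    import akka.actor.ActorSystem

    // Hypothetical helper: a serialized result small enough to fit in one Akka frame
    // can be sent directly; anything larger would go through the block manager.
    def fitsInAkkaFrame(actorSystem: ActorSystem, serializedResultSize: Long): Boolean = {
      val akkaFrameSize: Long =
        actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size")
      serializedResultSize < akkaFrameSize
    }
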
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index 44c5078..d1c74a5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -21,9 +21,9 @@ import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.ExecutionContext.Implicits.global
+import scala.reflect.ClassTag
 
 import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
-import scala.reflect.ClassTag
 
 /**
  * A set of asynchronous RDD actions available through an implicit conversion.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index 63b9fe1..424354a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -17,9 +17,10 @@
 
 package org.apache.spark.rdd
 
+import scala.reflect.ClassTag
+
 import org.apache.spark.{SparkContext, SparkEnv, Partition, TaskContext}
 import org.apache.spark.storage.{BlockId, BlockManager}
-import scala.reflect.ClassTag
 
 private[spark] class BlockRDDPartition(val blockId: BlockId, idx: Int) extends Partition {
   val index = idx

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 47e958b..53f77a3 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -52,7 +52,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
  * sources in HBase, or S3).
  *
  * @param sc The SparkContext to associate the RDD with.
- * @param broadCastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
+ * @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
  *     variabe references an instance of JobConf, then that JobConf will be used for the Hadoop job.
  *     Otherwise, a new JobConf will be created on each slave using the enclosed Configuration.
  * @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf that HadoopRDD

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7b4fc6b..fdea3f6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -20,13 +20,14 @@ package org.apache.spark.scheduler
 import java.io.NotSerializableException
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicInteger
-import scala.concurrent.duration._
-import scala.concurrent.ExecutionContext.Implicits.global
 
-import akka.actor._
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration._
 import scala.reflect.ClassTag
 
+import akka.actor._
+
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.executor.TaskMetrics

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 2d8a0a6..9975ec1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -25,8 +25,8 @@ import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.HashSet
 
-import scala.concurrent.duration._
 import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration._
 
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 8de9b72..84fe309 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -181,6 +181,7 @@ private[spark] class CoarseMesosSchedulerBackend(
             !slaveIdsWithExecutors.contains(slaveId)) {
           // Launch an executor on the slave
           val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
+          totalCoresAcquired += cpusToUse
           val taskId = newMesosTaskId()
           taskIdToSlaveId(taskId) = slaveId
           slaveIdsWithExecutors += slaveId

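The one-line addition above matters because cpusToUse is capped by
maxCores - totalCoresAcquired: without bumping the counter, the cap never
tightens and the backend would keep accepting cores on every offer. A
stripped-down sketch of that accounting (standalone object with an arbitrary
maxCores, not the actual backend):

    object CoreAccountingSketch {
      val maxCores = 8            // arbitrary example budget
      var totalCoresAcquired = 0

      def coresToAcceptFromOffer(offeredCpus: Int): Int = {
        // Never take more cores than the remaining budget allows.
        val cpusToUse = math.min(offeredCpus, maxCores - totalCoresAcquired)
        totalCoresAcquired += cpusToUse  // the line added in the hunk above
        cpusToUse
      }
    }
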
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 7557dda..02adcb4 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -22,14 +22,11 @@ import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address}
 import java.util.{Locale, Random, UUID}
 import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
 
-
-import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConversions._
 import scala.collection.Map
+import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
 import scala.reflect.ClassTag
-import scala.Some
-
 
 import com.google.common.io.Files
 import com.google.common.util.concurrent.ThreadFactoryBuilder

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
index 45849b3..c26f23d 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
@@ -19,7 +19,6 @@ package org.apache.spark.util.collection
 
 import scala.reflect.ClassTag
 
-
 /**
  * A fast hash map implementation for nullable keys. This hash map supports insertions and updates,
  * but not deletions. This map is about 5X faster than java.util.HashMap, while using much less

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/docs/hadoop-third-party-distributions.md
----------------------------------------------------------------------
diff --git a/docs/hadoop-third-party-distributions.md b/docs/hadoop-third-party-distributions.md
index f706625..b33af2c 100644
--- a/docs/hadoop-third-party-distributions.md
+++ b/docs/hadoop-third-party-distributions.md
@@ -25,8 +25,8 @@ the _exact_ Hadoop version you are running to avoid any compatibility errors.
       <h3>CDH Releases</h3>
       <table class="table" style="width:350px; margin-right: 20px;">
         <tr><th>Release</th><th>Version code</th></tr>
-        <tr><td>CDH 4.X.X (YARN mode)</td><td>2.0.0-chd4.X.X</td></tr>
-        <tr><td>CDH 4.X.X</td><td>2.0.0-mr1-chd4.X.X</td></tr>
+        <tr><td>CDH 4.X.X (YARN mode)</td><td>2.0.0-cdh4.X.X</td></tr>
+        <tr><td>CDH 4.X.X</td><td>2.0.0-mr1-cdh4.X.X</td></tr>
         <tr><td>CDH 3u6</td><td>0.20.2-cdh3u6</td></tr>
         <tr><td>CDH 3u5</td><td>0.20.2-cdh3u5</td></tr>
         <tr><td>CDH 3u4</td><td>0.20.2-cdh3u4</td></tr>

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 26e6a83..476e7c5 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -105,12 +105,6 @@ object SparkBuild extends Build {
     // also check the local Maven repository ~/.m2
     resolvers ++= Seq(Resolver.file("Local Maven Repo", file(Path.userHome + "/.m2/repository"))),
 
-    // Shared between both core and streaming.
-    resolvers ++= Seq("Akka Repository" at "http://repo.akka.io/releases/"),
-
-    // Shared between both examples and streaming.
-    resolvers ++= Seq("Mqtt Repository" at "https://repo.eclipse.org/content/repositories/paho-releases/"),
-
    // For Sonatype publishing
     resolvers ++= Seq("sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots",
       "sonatype-staging" at "https://oss.sonatype.org/service/local/staging/deploy/maven2/"),
@@ -292,11 +286,10 @@ object SparkBuild extends Build {
 
     libraryDependencies ++= Seq(
       "org.apache.flume"      % "flume-ng-sdk"     % "1.2.0" % "compile"  excludeAll(excludeNetty, excludeSnappy),
-      "com.sksamuel.kafka"    %% "kafka"            % "0.8.0-beta1"
+      "org.apache.kafka"      % "kafka_2.9.2"      % "0.8.0-beta1"
         exclude("com.sun.jdmk", "jmxtools")
         exclude("com.sun.jmx", "jmxri")
-        exclude("net.sf.jopt-simple", "jopt-simple")
-        excludeAll(excludeNetty),
+        exclude("net.sf.jopt-simple", "jopt-simple"),
       "org.eclipse.paho"      % "mqtt-client"      % "0.4.0",
       "com.github.sgroschupf" % "zkclient"         % "0.1"                excludeAll(excludeNetty),
       "org.twitter4j"         % "twitter4j-stream" % "3.0.3"              excludeAll(excludeNetty),

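Reassembled from the hunk above, the Kafka dependency now reads as below (the
excludeNetty rule that previously applied to it is assumed to be defined
elsewhere in SparkBuild.scala and is no longer attached here):

    // Artifact now comes from org.apache.kafka, with the Scala-version suffix
    // written out explicitly ("kafka_2.9.2") instead of derived via %%.
    "org.apache.kafka" % "kafka_2.9.2" % "0.8.0-beta1"
      exclude("com.sun.jdmk", "jmxtools")
      exclude("com.sun.jmx", "jmxri")
      exclude("net.sf.jopt-simple", "jopt-simple")
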
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 43e504c..523fd12 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -940,17 +940,9 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
         if (prop != null) prop else "local"
       }
     }
-    val jars = Option(System.getenv("ADD_JARS")).map(_.split(','))
-      .getOrElse(new Array[String](0))
-      .map(new java.io.File(_).getAbsolutePath)
-    try {
-      sparkContext = new SparkContext(master, "Spark shell", System.getenv("SPARK_HOME"), jars)
-    } catch {
-      case e: Exception =>
-        e.printStackTrace()
-        echo("Failed to create SparkContext, exiting...")
-        sys.exit(1)
-    }
+    val jars = SparkILoop.getAddedJars.map(new java.io.File(_).getAbsolutePath)
+    sparkContext = new SparkContext(master, "Spark shell", System.getenv("SPARK_HOME"), jars)
+    echo("Created spark context..")
     sparkContext
   }
 

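The deleted lines above hint at what SparkILoop.getAddedJars does; as an
assumption inferred from them (not the actual helper, which lives elsewhere in
the repl), it would look roughly like this REPL-pasteable snippet:

    // Assumed shape of SparkILoop.getAddedJars, reconstructed from the removed lines:
    // split the ADD_JARS environment variable on commas, defaulting to no jars.
    def getAddedJars: Array[String] =
      Option(System.getenv("ADD_JARS")).map(_.split(',')).getOrElse(new Array[String](0))
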
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 418c31e..c230a03 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -78,7 +78,7 @@ class ReplSuite extends FunSuite {
     System.clearProperty("spark.hostPort")
   }
 
-  test ("simple foreach with accumulator") {
+  test("simple foreach with accumulator") {
     val output = runInterpreter("local", """
                                            |val accum = sc.accumulator(0)
                                            |sc.parallelize(1 to 10).foreach(x => accum += x)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
index 66fe6e7..6e9a781 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
@@ -25,10 +25,10 @@ import org.apache.spark.SparkContext._
 
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.Queue
+import scala.concurrent.duration._
 
 import akka.actor._
 import akka.pattern.ask
-import scala.concurrent.duration._
 import akka.dispatch._
 import org.apache.spark.storage.BlockId
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
index ea5c165..80af96c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
@@ -33,7 +33,6 @@ import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
 import org.apache.hadoop.mapred.OutputFormat
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.conf.Configuration
-import scala.Some
 
 class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
 extends Serializable {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 3ba37be..dfd6e27 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -728,7 +728,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 }
 
 object JavaPairDStream {
-  implicit def fromPairDStream[K: ClassTag, V: ClassTag](dstream: DStream[(K, V)]) : JavaPairDStream[K, V] = {
+  implicit def fromPairDStream[K: ClassTag, V: ClassTag](dstream: DStream[(K, V)]) = {
     new JavaPairDStream[K, V](dstream)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala
deleted file mode 100644
index 16c1567..0000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.dstream
-
-import org.apache.spark.Partitioner
-import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.CoGroupedRDD
-import org.apache.spark.streaming.{Time, DStream, Duration}
-import scala.reflect.ClassTag
-
-private[streaming]
-class CoGroupedDStream[K : ClassTag](
-    parents: Seq[DStream[(K, _)]],
-    partitioner: Partitioner
-  ) extends DStream[(K, Seq[Seq[_]])](parents.head.ssc) {
-
-  if (parents.length == 0) {
-    throw new IllegalArgumentException("Empty array of parents")
-  }
-
-  if (parents.map(_.ssc).distinct.size > 1) {
-    throw new IllegalArgumentException("Array of parents have different StreamingContexts")
-  }
-
-  if (parents.map(_.slideDuration).distinct.size > 1) {
-    throw new IllegalArgumentException("Array of parents have different slide times")
-  }
-
-  override def dependencies = parents.toList
-
-  override def slideDuration: Duration = parents.head.slideDuration
-
-  override def compute(validTime: Time): Option[RDD[(K, Seq[Seq[_]])]] = {
-    val part = partitioner
-    val rdds = parents.flatMap(_.getOrCompute(validTime))
-    if (rdds.size > 0) {
-      val q = new CoGroupedRDD[K](rdds, part)
-      Some(q)
-    } else {
-      None
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f629ba95/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
index ec0096c..526f556 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
@@ -33,7 +33,6 @@ import org.I0Itec.zkclient._
 import scala.collection.Map
 import scala.reflect.ClassTag
 
-
 /**
  * Input stream that pulls messages from a Kafka Broker.
  *
