Updated Branches:
  refs/heads/master 2182aa3c5 -> 919bd7f66

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 62cfa0a..4dcd0e4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -151,8 +151,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 
   /**
   * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
-   * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control the
-   * partitioning of each RDD.
+   * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control
+   * the partitioning of each RDD.
    */
   def reduceByKey(func: JFunction2[V, V, V], partitioner: Partitioner): JavaPairDStream[K, V] = {
     dstream.reduceByKey(func, partitioner)
@@ -160,8 +160,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 
   /**
   * Combine elements of each key in DStream's RDDs using custom function. This is similar to the
-   * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.rdd.PairRDDFunctions]] for more
-   * information.
+   * combineByKey for RDDs. Please refer to combineByKey in
+   * [[org.apache.spark.rdd.PairRDDFunctions]] for more information.
    */
   def combineByKey[C](createCombiner: JFunction[V, C],
       mergeValue: JFunction2[C, V, C],
@@ -175,8 +175,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 
   /**
   * Combine elements of each key in DStream's RDDs using custom function. This is similar to the
-   * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.rdd.PairRDDFunctions]] for more
-   * information.
+   * combineByKey for RDDs. Please refer to combineByKey in
+   * [[org.apache.spark.rdd.PairRDDFunctions]] for more information.
    */
   def combineByKey[C](createCombiner: JFunction[V, C],
       mergeValue: JFunction2[C, V, C],
@@ -241,7 +241,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   * @param slideDuration  sliding interval of the window (i.e., the interval after which
   *                       the new DStream will generate RDDs); must be a multiple of this
   *                       DStream's batching interval
-   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
+   *                    DStream.
    */
   def groupByKeyAndWindow(
       windowDuration: Duration,
@@ -315,7 +316,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   * @param slideDuration  sliding interval of the window (i.e., the interval after which
   *                       the new DStream will generate RDDs); must be a multiple of this
   *                       DStream's batching interval
-   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
+   *                    DStream.
    */
   def reduceByKeyAndWindow(
       reduceFunc: Function2[V, V, V],
@@ -403,7 +405,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   * @param slideDuration  sliding interval of the window (i.e., the interval after which
   *                       the new DStream will generate RDDs); must be a multiple of this
   *                       DStream's batching interval
-   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
+   *                    DStream.
    * @param filterFunc     function to filter expired key-value pairs;
    *                       only pairs that satisfy the function are retained
    *                       set this to null if you do not want to filter
@@ -479,7 +482,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
   * @param updateFunc State update function. If `this` function returns None, then
   *                   corresponding state key-value pair will be eliminated.
-   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
+   *                    DStream.
    * @tparam S State type
    */
   def updateStateByKey[S](
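
For reference, the pair operations whose docs are reflowed above are used along these lines; a minimal Scala sketch (the socket source, durations and HashPartitioner(4) are illustrative choices, not part of this commit):

    import org.apache.spark.HashPartitioner
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._

    val ssc = new StreamingContext("local[2]", "PairOpsExample", Seconds(1))
    val pairs = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" ")).map((_, 1))

    // reduceByKey with an explicit Partitioner controlling the partitioning of each RDD
    val counts = pairs.reduceByKey(_ + _, new HashPartitioner(4))

    // Windowed variant: window and slide must be multiples of the batch interval (1s here)
    val windowedCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))

    counts.print()
    windowedCounts.print()
    ssc.start()
    ssc.awaitTermination()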

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 921b561..2268160 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -65,8 +65,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    * @param appName Name to be used when registering with the scheduler
   * @param batchDuration The time interval at which streaming data will be divided into batches
   * @param sparkHome The SPARK_HOME directory on the slave nodes
-   * @param jarFile JAR file containing job code, to ship to cluster. This can be a path on the local
-   *                file system or an HDFS, HTTP, HTTPS, or FTP URL.
+   * @param jarFile JAR file containing job code, to ship to cluster. This can be a path on the
+   *                local file system or an HDFS, HTTP, HTTPS, or FTP URL.
+   *                local file system or an HDFS, HTTP, HTTPS, or FTP URL.
    */
   def this(
       master: String,
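
The constructor parameters documented above map onto the Scala-side StreamingContext in the same way; a hedged sketch, assuming the analogous Scala constructor (master URL, Spark home and jar path are placeholders):

    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val ssc = new StreamingContext(
      "spark://master:7077",               // master URL registered with the scheduler
      "MyStreamingApp",                    // appName
      Seconds(2),                          // batchDuration: how input is divided into batches
      "/opt/spark",                        // sparkHome on the worker nodes
      Seq("target/my-streaming-app.jar"))  // job code shipped to the cluster (local path or HDFS/HTTP/HTTPS/FTP URL)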

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index 906a16e..903e3f3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -114,7 +114,8 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
   }
 
   override def toString() = {
-    "[\n" + currentCheckpointFiles.size + " checkpoint files \n" + currentCheckpointFiles.mkString("\n") + "\n]"
+    "[\n" + currentCheckpointFiles.size + " checkpoint files \n" +
+      currentCheckpointFiles.mkString("\n") + "\n]"
   }
 
   @throws(classOf[IOException])

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index 2730339..226844c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -53,7 +53,8 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
     } else {
       // Time is valid, but check it it is more than lastValidTime
       if (lastValidTime != null && time < lastValidTime) {
-        logWarning("isTimeValid called with " + time + " where as last valid time is " + lastValidTime)
+        logWarning("isTimeValid called with " + time + " where as last valid time is " +
+          lastValidTime)
+          lastValidTime)
       }
       lastValidTime = time
       true

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index ce153f0..0dc6704 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -80,7 +80,8 @@ abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingConte
 
 private[streaming] sealed trait NetworkReceiverMessage
 private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage
-private[streaming] case class ReportBlock(blockId: BlockId, metadata: Any) extends NetworkReceiverMessage
+private[streaming] case class ReportBlock(blockId: BlockId, metadata: Any)
+  extends NetworkReceiverMessage
 private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage
 
 /**
@@ -202,8 +203,8 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
   }
 
   /**
-   * Batches objects created by a [[org.apache.spark.streaming.dstream.NetworkReceiver]] and puts them into
-   * appropriately named blocks at regular intervals. This class starts two threads,
+   * Batches objects created by a [[org.apache.spark.streaming.dstream.NetworkReceiver]] and puts
+   * them into appropriately named blocks at regular intervals. This class starts two threads,
   * one to periodically start a new batch and prepare the previous batch of as a block,
    * the other to push the blocks into the block manager.
    */
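
The scaladoc above describes a timer thread that seals incoming objects into named blocks plus a second thread that pushes them onward. The following self-contained Scala sketch illustrates only that pattern; it is not the actual Spark block generator, and every name in it is invented for illustration:

    import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
    import scala.collection.mutable.ArrayBuffer

    class SimpleBlockBatcher[T](intervalMs: Long, pushBlock: (String, Seq[T]) => Unit) {
      private var currentBuffer = new ArrayBuffer[T]
      private val blocksForPushing = new ArrayBlockingQueue[(String, Seq[T])](100)
      @volatile private var stopped = false

      // Thread 1: every interval, seal the current buffer into a named block
      private val blockIntervalTimer = new Thread("block-interval-timer") {
        override def run() {
          while (!stopped) {
            Thread.sleep(intervalMs)
            val batch = SimpleBlockBatcher.this.synchronized {
              val old = currentBuffer; currentBuffer = new ArrayBuffer[T]; old
            }
            if (batch.nonEmpty) blocksForPushing.put(("block-" + System.currentTimeMillis, batch))
          }
        }
      }

      // Thread 2: drain sealed blocks and hand them to the caller (e.g. a block manager)
      private val blockPushingThread = new Thread("block-pushing-thread") {
        override def run() {
          while (!stopped) {
            val block = blocksForPushing.poll(100, TimeUnit.MILLISECONDS)
            if (block != null) pushBlock(block._1, block._2)
          }
        }
      }

      def += (obj: T): Unit = SimpleBlockBatcher.this.synchronized { currentBuffer += obj }
      def start() { blockIntervalTimer.start(); blockPushingThread.start() }
      def stop() { stopped = true }
    }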

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index fb9df2f..f3c58ae 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -38,11 +38,12 @@ import org.apache.spark.streaming.{Time, Duration}
  * these functions.
  */
 class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
-extends Serializable {
+  extends Serializable {
 
   private[streaming] def ssc = self.ssc
 
-  private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = {
+  private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism)
+  = {
     new HashPartitioner(numPartitions)
   }
 
@@ -63,8 +64,8 @@ extends Serializable {
   }
 
   /**
-   * Return a new DStream by applying `groupByKey` on each RDD. The supplied [[org.apache.spark.Partitioner]]
-   * is used to control the partitioning of each RDD.
+   * Return a new DStream by applying `groupByKey` on each RDD. The supplied
+   * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
    */
   def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = {
     val createCombiner = (v: V) => ArrayBuffer[V](v)
@@ -94,8 +95,8 @@ extends Serializable {
 
   /**
   * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
-   * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control the
-   * partitioning of each RDD.
+   * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control
+   * the partitioning of each RDD.
    */
   def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = {
     val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
@@ -113,7 +114,8 @@ extends Serializable {
     mergeCombiner: (C, C) => C,
     partitioner: Partitioner,
     mapSideCombine: Boolean = true): DStream[(K, C)] = {
-    new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine)
+    new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner,
+      mapSideCombine)
   }
 
   /**
@@ -138,7 +140,8 @@ extends Serializable {
   *                       the new DStream will generate RDDs); must be a multiple of this
   *                       DStream's batching interval
   */
-  def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] = {
+  def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] =
+  {
     groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner())
   }
 
@@ -170,7 +173,8 @@ extends Serializable {
   * @param slideDuration  sliding interval of the window (i.e., the interval after which
   *                       the new DStream will generate RDDs); must be a multiple of this
   *                       DStream's batching interval
-   * @param partitioner    partitioner for controlling the partitioning of each RDD in the new DStream.
+   * @param partitioner    partitioner for controlling the partitioning of each RDD in the new
+   *                       DStream.
    */
   def groupByKeyAndWindow(
       windowDuration: Duration,
@@ -239,7 +243,8 @@ extends Serializable {
       slideDuration: Duration,
       numPartitions: Int
     ): DStream[(K, V)] = {
-    reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions))
+    reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration,
+      defaultPartitioner(numPartitions))
   }
 
   /**
@@ -315,7 +320,8 @@ extends Serializable {
   * @param slideDuration  sliding interval of the window (i.e., the interval after which
   *                       the new DStream will generate RDDs); must be a multiple of this
   *                       DStream's batching interval
-   * @param partitioner    partitioner for controlling the partitioning of each RDD in the new DStream.
+   * @param partitioner    partitioner for controlling the partitioning of each RDD in the new
+   *                       DStream.
    * @param filterFunc     Optional function to filter expired key-value pairs;
    *                       only pairs that satisfy the function are retained
    */
@@ -373,7 +379,8 @@ extends Serializable {
   * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
   * @param updateFunc State update function. If `this` function returns None, then
   *                   corresponding state key-value pair will be eliminated.
-   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
+   *                    DStream.
    * @tparam S State type
    */
   def updateStateByKey[S: ClassTag](
@@ -395,7 +402,8 @@ extends Serializable {
   *                   this function may generate a different a tuple with a different key
   *                   than the input key. It is up to the developer to decide whether to
   *                   remember the partitioner despite the key being changed.
-   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
   *                    DStream
   * @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs.
    * @tparam S State type
    */
@@ -438,7 +446,8 @@ extends Serializable {
   * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
   */
-  def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Seq[V], Seq[W]))] = {
+  def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int)
+  : DStream[(K, (Seq[V], Seq[W]))] = {
     cogroup(other, defaultPartitioner(numPartitions))
   }
 
@@ -566,7 +575,8 @@ extends Serializable {
       prefix: String,
       suffix: String
     )(implicit fm: ClassTag[F]) {
-    saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]])
+    saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass,
+      fm.runtimeClass.asInstanceOf[Class[F]])
   }
 
   /**
@@ -580,7 +590,7 @@ extends Serializable {
       valueClass: Class[_],
       outputFormatClass: Class[_ <: OutputFormat[_, _]],
       conf: JobConf = new JobConf
-    ) {  
+    ) {
     val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
       val file = rddToFileName(prefix, suffix, time)
       rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
@@ -596,7 +606,8 @@ extends Serializable {
       prefix: String,
       suffix: String
     )(implicit fm: ClassTag[F])  {
-    saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]])
+    saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass,
+      fm.runtimeClass.asInstanceOf[Class[F]])
   }
 
   /**
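
For the stateful, partitioner-aware operations whose docs are reflowed in this file, a minimal usage sketch (checkpoint directory, socket source and partition count are placeholders, not part of this commit):

    import org.apache.spark.HashPartitioner
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._

    val ssc = new StreamingContext("local[2]", "RunningCounts", Seconds(1))
    ssc.checkpoint("/tmp/streaming-checkpoints")  // updateStateByKey needs a checkpoint directory

    val pairs = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" ")).map((_, 1))

    // Running count per key; returning None instead would eliminate that key's state.
    val updateFunc = (values: Seq[Int], state: Option[Int]) => Option(state.getOrElse(0) + values.sum)
    val runningCounts = pairs.updateStateByKey(updateFunc, new HashPartitioner(4))

    runningCounts.print()
    ssc.start()
    ssc.awaitTermination()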

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
index 7a6b1ea..ca0a8ae 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -87,7 +87,8 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
     val invReduceF = invReduceFunc
 
     val currentTime = validTime
-    val currentWindow = new Interval(currentTime - windowDuration + parent.slideDuration, currentTime)
+    val currentWindow = new Interval(currentTime - windowDuration + parent.slideDuration,
+      currentTime)
     val previousWindow = currentWindow - slideDuration
 
     logDebug("Window time = " + windowDuration)
@@ -125,7 +126,8 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
     val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs
 
     // Cogroup the reduced RDDs and merge the reduced values
-    val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(K, _)]]], partitioner)
+    val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(K, _)]]],
+      partitioner)
     //val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _
 
     val numOldValues = oldRDDs.size

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
index 4ecba03..57429a1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
@@ -48,7 +48,8 @@ class UnionDStream[T: ClassTag](parents: Array[DStream[T]])
     val rdds = new ArrayBuffer[RDD[T]]()
     parents.map(_.getOrCompute(validTime)).foreach(_ match {
       case Some(rdd) => rdds += rdd
-      case None => throw new Exception("Could not generate RDD from a parent for unifying at time " + validTime)
+      case None => throw new Exception("Could not generate RDD from a parent for unifying at time "
+        + validTime)
     })
     if (rdds.size > 0) {
       Some(new UnionRDD(ssc.sc, rdds))

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
index 6301772..24289b7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -31,13 +31,15 @@ class WindowedDStream[T: ClassTag](
     _slideDuration: Duration)
   extends DStream[T](parent.ssc) {
 
-  if (!_windowDuration.isMultipleOf(parent.slideDuration))
+  if (!_windowDuration.isMultipleOf(parent.slideDuration)) {
     throw new Exception("The window duration of windowed DStream (" + _slideDuration + ") " +
     "must be a multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
+  }
 
-  if (!_slideDuration.isMultipleOf(parent.slideDuration))
+  if (!_slideDuration.isMultipleOf(parent.slideDuration)) {
     throw new Exception("The slide duration of windowed DStream (" + _slideDuration + ") " +
     "must be a multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
+  }
 
   // Persist parent level by default, as those RDDs are going to be obviously reused.
   parent.persist(StorageLevel.MEMORY_ONLY_SER)
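
The checks above require both the window duration and the slide duration to be multiples of the parent DStream's slide duration (the batch interval for an input stream). A small illustration; the durations are arbitrary:

    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val ssc = new StreamingContext("local[2]", "WindowExample", Seconds(5))
    val lines = ssc.socketTextStream("localhost", 9999)

    // Valid: 30s and 10s are both multiples of the 5s batch (parent slide) duration.
    val windowed = lines.window(Seconds(30), Seconds(10))

    // Invalid: would fail the checks above, since 12s is not a multiple of 5s.
    // val bad = lines.window(Seconds(12), Seconds(12))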

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index b5f11d3..c730624 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -46,8 +46,9 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
   }
   private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
     longTime => eventActor ! GenerateJobs(new Time(longTime)))
-  private lazy val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
-    new CheckpointWriter(this, ssc.conf, ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration)
+  private lazy val checkpointWriter =
+    if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
+      new CheckpointWriter(this, ssc.conf, ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration)
   } else {
     null
   }
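
For context, the two fields this guard inspects come from the user-facing checkpoint configuration, roughly as sketched below (directory and interval are placeholders; exactly how ssc.checkpointDuration gets populated is internal to the streaming context):

    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val ssc = new StreamingContext("local[2]", "CheckpointedApp", Seconds(2))
    // Sets the checkpoint directory; without it the lazy checkpointWriter above stays null.
    ssc.checkpoint("hdfs://namenode:8020/user/spark/checkpoints")

    val lines = ssc.socketTextStream("localhost", 9999)
    lines.checkpoint(Seconds(10))  // ask for this DStream's RDDs to be checkpointed every 10s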

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
index 0d9733f..e4fa163 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
@@ -34,9 +34,12 @@ import org.apache.spark.streaming.{Time, StreamingContext}
 import org.apache.spark.util.AkkaUtils
 
 private[streaming] sealed trait NetworkInputTrackerMessage
-private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage
-private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any) extends NetworkInputTrackerMessage
-private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage
+private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef)
+  extends NetworkInputTrackerMessage
+private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any)
+  extends NetworkInputTrackerMessage
+private[streaming] case class DeregisterReceiver(streamId: Int, msg: String)
+  extends NetworkInputTrackerMessage
 
 /**
  * This class manages the execution of the receivers of NetworkInputDStreams. Instance of
@@ -66,7 +69,8 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
     }
 
     if (!networkInputStreams.isEmpty) {
-      actor = ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor), "NetworkInputTracker")
+      actor = ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor),
+        "NetworkInputTracker")
       receiverExecutor.start()
       logInfo("NetworkInputTracker started")
     }
@@ -102,7 +106,8 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
           throw new Exception("Register received for unexpected id " + streamId)
         }
         receiverInfo += ((streamId, receiverActor))
-        logInfo("Registered receiver for network stream " + streamId + " from " + sender.path.address)
+        logInfo("Registered receiver for network stream " + streamId + " from "
+          + sender.path.address)
         sender ! true
       }
       case AddBlocks(streamId, blockIds, metadata) => {
@@ -153,12 +158,14 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
       })
 
       // Right now, we only honor preferences if all receivers have them
-      val hasLocationPreferences = receivers.map(_.getLocationPreference().isDefined).reduce(_ && _)
+      val hasLocationPreferences = receivers.map(_.getLocationPreference().isDefined)
+        .reduce(_ && _)
 
       // Create the parallel collection of receivers to distributed them on the worker nodes
       val tempRDD =
         if (hasLocationPreferences) {
-          val receiversWithPreferences = receivers.map(r => (r, Seq(r.getLocationPreference().toString)))
+          val receiversWithPreferences =
+            receivers.map(r => (r, Seq(r.getLocationPreference().toString)))
           ssc.sc.makeRDD[NetworkReceiver[_]](receiversWithPreferences)
         }
         else {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
index 3063cf1..18811fc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@ -23,7 +23,8 @@ import java.util.concurrent.LinkedBlockingQueue
 
 /** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
 private[spark] class StreamingListenerBus() extends Logging {
-  private val listeners = new ArrayBuffer[StreamingListener]() with SynchronizedBuffer[StreamingListener]
+  private val listeners = new ArrayBuffer[StreamingListener]()
+    with SynchronizedBuffer[StreamingListener]
 
   /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
   * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
index 6a45bc2..2bb616c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
@@ -407,10 +407,11 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
             }
           }
         }
-    if (!done)
+        if (!done) {
           logError("Could not generate file " + hadoopFile)
-        else
+        } else {
           logInfo("Generated file " + hadoopFile + " at " + System.currentTimeMillis)
+        }
         Thread.sleep(interval)
         localFile.delete()
       }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
index 179fd75..2b8cdb7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
@@ -71,8 +71,12 @@ class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends Outpu
       }
     } else {
       // Calculate how much time we should sleep to bring ourselves to the desired rate.
-      // Based on throttler in Kafka (https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/utils/Throttler.scala)
-      val sleepTime = MILLISECONDS.convert((bytesWrittenSinceSync / bytesPerSec - elapsedSecs), SECONDS)
+      // Based on throttler in Kafka
+      // scalastyle:off
+      // (https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/utils/Throttler.scala)
+      // scalastyle:on
+      val sleepTime = MILLISECONDS.convert((bytesWrittenSinceSync / bytesPerSec - elapsedSecs),
+        SECONDS)
       if (sleepTime > 0) Thread.sleep(sleepTime)
       waitToWrite(numBytes)
     }
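
To make the throttling arithmetic concrete, a worked instance of the formula used above; the byte counts and elapsed time are arbitrary:

    import java.util.concurrent.TimeUnit.{MILLISECONDS, SECONDS}

    // 4 MB written since the last sync at a target of 1 MB/s should have taken 4 s;
    // with only 1 s elapsed, the stream should sleep for roughly 3000 ms before writing more.
    val bytesWrittenSinceSync = 4L * 1024 * 1024
    val bytesPerSec = 1024 * 1024
    val elapsedSecs = 1L

    val sleepTime = MILLISECONDS.convert(bytesWrittenSinceSync / bytesPerSec - elapsedSecs, SECONDS)
    assert(sleepTime == 3000)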
