Updated Branches: refs/heads/master 2182aa3c5 -> 919bd7f66
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index 62cfa0a..4dcd0e4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -151,8 +151,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are - * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control the - * partitioning of each RDD. + * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control + * the partitioning of each RDD. */ def reduceByKey(func: JFunction2[V, V, V], partitioner: Partitioner): JavaPairDStream[K, V] = { dstream.reduceByKey(func, partitioner) @@ -160,8 +160,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Combine elements of each key in DStream's RDDs using custom function. This is similar to the - * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.rdd.PairRDDFunctions]] for more - * information. + * combineByKey for RDDs. Please refer to combineByKey in + * [[org.apache.spark.rdd.PairRDDFunctions]] for more information. */ def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C], @@ -175,8 +175,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Combine elements of each key in DStream's RDDs using custom function. This is similar to the - * combineByKey for RDDs. 
Please refer to combineByKey in [[org.apache.spark.rdd.PairRDDFunctions]] for more - * information. + * combineByKey for RDDs. Please refer to combineByKey in + * [[org.apache.spark.rdd.PairRDDFunctions]] for more information. */ def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C], @@ -241,7 +241,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * @param slideDuration sliding interval of the window (i.e., the interval after which * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval - * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new + * DStream. */ def groupByKeyAndWindow( windowDuration: Duration, @@ -315,7 +316,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * @param slideDuration sliding interval of the window (i.e., the interval after which * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval - * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new + * DStream. */ def reduceByKeyAndWindow( reduceFunc: Function2[V, V, V], @@ -403,7 +405,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * @param slideDuration sliding interval of the window (i.e., the interval after which * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval - * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new + * DStream. 
* @param filterFunc function to filter expired key-value pairs; * only pairs that satisfy the function are retained * set this to null if you do not want to filter @@ -479,7 +482,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. * @param updateFunc State update function. If `this` function returns None, then * corresponding state key-value pair will be eliminated. - * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new + * DStream. * @tparam S State type */ def updateStateByKey[S]( http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 921b561..2268160 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -65,8 +65,8 @@ class JavaStreamingContext(val ssc: StreamingContext) { * @param appName Name to be used when registering with the scheduler * @param batchDuration The time interval at which streaming data will be divided into batches * @param sparkHome The SPARK_HOME directory on the slave nodes - * @param jarFile JAR file containing job code, to ship to cluster. This can be a path on the local - * file system or an HDFS, HTTP, HTTPS, or FTP URL. + * @param jarFile JAR file containing job code, to ship to cluster. This can be a path on the + * local file system or an HDFS, HTTP, HTTPS, or FTP URL. 
*/ def this( master: String, http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala index 906a16e..903e3f3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala @@ -114,7 +114,8 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) } override def toString() = { - "[\n" + currentCheckpointFiles.size + " checkpoint files \n" + currentCheckpointFiles.mkString("\n") + "\n]" + "[\n" + currentCheckpointFiles.size + " checkpoint files \n" + + currentCheckpointFiles.mkString("\n") + "\n]" } @throws(classOf[IOException]) http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala index 2730339..226844c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala @@ -53,7 +53,8 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext) } else { // Time is valid, but check if it is more than lastValidTime if (lastValidTime != null && time < lastValidTime) { - logWarning("isTimeValid called with " + time + " where as last valid time is " + lastValidTime) + logWarning("isTimeValid called with " + time + " 
where as last valid time is " + + lastValidTime) } lastValidTime = time true http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala index ce153f0..0dc6704 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala @@ -80,7 +80,8 @@ abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingConte private[streaming] sealed trait NetworkReceiverMessage private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage -private[streaming] case class ReportBlock(blockId: BlockId, metadata: Any) extends NetworkReceiverMessage +private[streaming] case class ReportBlock(blockId: BlockId, metadata: Any) + extends NetworkReceiverMessage private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage /** @@ -202,8 +203,8 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging } /** - * Batches objects created by a [[org.apache.spark.streaming.dstream.NetworkReceiver]] and puts them into - * appropriately named blocks at regular intervals. This class starts two threads, + * Batches objects created by a [[org.apache.spark.streaming.dstream.NetworkReceiver]] and puts + * them into appropriately named blocks at regular intervals. This class starts two threads, * one to periodically start a new batch and prepare the previous batch as a block, * the other to push the blocks into the block manager. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala index fb9df2f..f3c58ae 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala @@ -38,11 +38,12 @@ import org.apache.spark.streaming.{Time, Duration} * these functions. */ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) -extends Serializable { + extends Serializable { private[streaming] def ssc = self.ssc - private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = { + private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) + = { new HashPartitioner(numPartitions) } @@ -63,8 +64,8 @@ extends Serializable { } /** - * Return a new DStream by applying `groupByKey` on each RDD. The supplied [[org.apache.spark.Partitioner]] - * is used to control the partitioning of each RDD. + * Return a new DStream by applying `groupByKey` on each RDD. The supplied + * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. */ def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = { val createCombiner = (v: V) => ArrayBuffer[V](v) @@ -94,8 +95,8 @@ extends Serializable { /** * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are - * merged using the supplied reduce function. [[org.apache.spark.Partitioner]] is used to control the - * partitioning of each RDD. + * merged using the supplied reduce function. 
[[org.apache.spark.Partitioner]] is used to control + * the partitioning of each RDD. */ def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = { val cleanedReduceFunc = ssc.sc.clean(reduceFunc) @@ -113,7 +114,8 @@ extends Serializable { mergeCombiner: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true): DStream[(K, C)] = { - new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine) + new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner, + mapSideCombine) } /** @@ -138,7 +140,8 @@ extends Serializable { * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval */ - def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] = { + def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] = + { groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner()) } @@ -170,7 +173,8 @@ extends Serializable { * @param slideDuration sliding interval of the window (i.e., the interval after which * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval - * @param partitioner partitioner for controlling the partitioning of each RDD in the new DStream. + * @param partitioner partitioner for controlling the partitioning of each RDD in the new + * DStream. 
*/ def groupByKeyAndWindow( windowDuration: Duration, @@ -239,7 +243,8 @@ extends Serializable { slideDuration: Duration, numPartitions: Int ): DStream[(K, V)] = { - reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions)) + reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, + defaultPartitioner(numPartitions)) } /** @@ -315,7 +320,8 @@ extends Serializable { * @param slideDuration sliding interval of the window (i.e., the interval after which * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval - * @param partitioner partitioner for controlling the partitioning of each RDD in the new DStream. + * @param partitioner partitioner for controlling the partitioning of each RDD in the new + * DStream. * @param filterFunc Optional function to filter expired key-value pairs; * only pairs that satisfy the function are retained */ @@ -373,7 +379,8 @@ extends Serializable { * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. * @param updateFunc State update function. If `this` function returns None, then * corresponding state key-value pair will be eliminated. - * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new + * DStream. * @tparam S State type */ def updateStateByKey[S: ClassTag]( @@ -395,7 +402,8 @@ extends Serializable { * this function may generate a different tuple with a different key * than the input key. It is up to the developer to decide whether to * remember the partitioner despite the key being changed. - * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new + * DStream * @param rememberPartitioner Whether to remember the partitioner object in the generated RDDs. 
* @tparam S State type */ @@ -438,7 +446,8 @@ extends Serializable { * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. */ - def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Seq[V], Seq[W]))] = { + def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int) + : DStream[(K, (Seq[V], Seq[W]))] = { cogroup(other, defaultPartitioner(numPartitions)) } @@ -566,7 +575,8 @@ extends Serializable { prefix: String, suffix: String )(implicit fm: ClassTag[F]) { - saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]]) + saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, + fm.runtimeClass.asInstanceOf[Class[F]]) } /** @@ -580,7 +590,7 @@ extends Serializable { valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf = new JobConf - ) { + ) { val saveFunc = (rdd: RDD[(K, V)], time: Time) => { val file = rddToFileName(prefix, suffix, time) rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf) @@ -596,7 +606,8 @@ extends Serializable { prefix: String, suffix: String )(implicit fm: ClassTag[F]) { - saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]]) + saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, + fm.runtimeClass.asInstanceOf[Class[F]]) } /** http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala index 7a6b1ea..ca0a8ae 100644 --- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala @@ -87,7 +87,8 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag]( val invReduceF = invReduceFunc val currentTime = validTime - val currentWindow = new Interval(currentTime - windowDuration + parent.slideDuration, currentTime) + val currentWindow = new Interval(currentTime - windowDuration + parent.slideDuration, + currentTime) val previousWindow = currentWindow - slideDuration logDebug("Window time = " + windowDuration) @@ -125,7 +126,8 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag]( val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs // Cogroup the reduced RDDs and merge the reduced values - val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(K, _)]]], partitioner) + val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(K, _)]]], + partitioner) //val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _ val numOldValues = oldRDDs.size http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala index 4ecba03..57429a1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala @@ -48,7 +48,8 @@ class UnionDStream[T: ClassTag](parents: Array[DStream[T]]) val rdds = new ArrayBuffer[RDD[T]]() parents.map(_.getOrCompute(validTime)).foreach(_ match { case Some(rdd) => rdds += rdd - case None => throw new Exception("Could not generate RDD from 
a parent for unifying at time " + validTime) + case None => throw new Exception("Could not generate RDD from a parent for unifying at time " + + validTime) }) if (rdds.size > 0) { Some(new UnionRDD(ssc.sc, rdds)) http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala index 6301772..24289b7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala @@ -31,13 +31,15 @@ class WindowedDStream[T: ClassTag]( _slideDuration: Duration) extends DStream[T](parent.ssc) { - if (!_windowDuration.isMultipleOf(parent.slideDuration)) + if (!_windowDuration.isMultipleOf(parent.slideDuration)) { throw new Exception("The window duration of windowed DStream (" + _slideDuration + ") " + "must be a multiple of the slide duration of parent DStream (" + parent.slideDuration + ")") + } - if (!_slideDuration.isMultipleOf(parent.slideDuration)) + if (!_slideDuration.isMultipleOf(parent.slideDuration)) { throw new Exception("The slide duration of windowed DStream (" + _slideDuration + ") " + "must be a multiple of the slide duration of parent DStream (" + parent.slideDuration + ")") + } // Persist parent level by default, as those RDDs are going to be obviously reused. 
parent.persist(StorageLevel.MEMORY_ONLY_SER) http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index b5f11d3..c730624 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -46,8 +46,9 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { } private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, longTime => eventActor ! GenerateJobs(new Time(longTime))) - private lazy val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) { - new CheckpointWriter(this, ssc.conf, ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration) + private lazy val checkpointWriter = + if (ssc.checkpointDuration != null && ssc.checkpointDir != null) { + new CheckpointWriter(this, ssc.conf, ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration) } else { null } http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala index 0d9733f..e4fa163 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala @@ -34,9 +34,12 @@ import org.apache.spark.streaming.{Time, 
StreamingContext} import org.apache.spark.util.AkkaUtils private[streaming] sealed trait NetworkInputTrackerMessage -private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage -private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any) extends NetworkInputTrackerMessage -private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage +private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) + extends NetworkInputTrackerMessage +private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any) + extends NetworkInputTrackerMessage +private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) + extends NetworkInputTrackerMessage /** * This class manages the execution of the receivers of NetworkInputDStreams. Instance of @@ -66,7 +69,8 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging { } if (!networkInputStreams.isEmpty) { - actor = ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor), "NetworkInputTracker") + actor = ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor), + "NetworkInputTracker") receiverExecutor.start() logInfo("NetworkInputTracker started") } @@ -102,7 +106,8 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging { throw new Exception("Register received for unexpected id " + streamId) } receiverInfo += ((streamId, receiverActor)) - logInfo("Registered receiver for network stream " + streamId + " from " + sender.path.address) + logInfo("Registered receiver for network stream " + streamId + " from " + + sender.path.address) sender ! 
true } case AddBlocks(streamId, blockIds, metadata) => { @@ -153,12 +158,14 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging { }) // Right now, we only honor preferences if all receivers have them - val hasLocationPreferences = receivers.map(_.getLocationPreference().isDefined).reduce(_ && _) + val hasLocationPreferences = receivers.map(_.getLocationPreference().isDefined) + .reduce(_ && _) // Create the parallel collection of receivers to distribute them on the worker nodes val tempRDD = if (hasLocationPreferences) { - val receiversWithPreferences = receivers.map(r => (r, Seq(r.getLocationPreference().toString))) + val receiversWithPreferences = + receivers.map(r => (r, Seq(r.getLocationPreference().toString))) ssc.sc.makeRDD[NetworkReceiver[_]](receiversWithPreferences) } else { http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala index 3063cf1..18811fc 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala @@ -23,7 +23,8 @@ import java.util.concurrent.LinkedBlockingQueue /** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. 
*/ private[spark] class StreamingListenerBus() extends Logging { - private val listeners = new ArrayBuffer[StreamingListener]() with SynchronizedBuffer[StreamingListener] + private val listeners = new ArrayBuffer[StreamingListener]() + with SynchronizedBuffer[StreamingListener] /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */ http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala index 6a45bc2..2bb616c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala @@ -407,10 +407,11 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) } } } - if (!done) + if (!done) { logError("Could not generate file " + hadoopFile) - else + } else { logInfo("Generated file " + hadoopFile + " at " + System.currentTimeMillis) + } Thread.sleep(interval) localFile.delete() } http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/919bd7f6/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala index 179fd75..2b8cdb7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala +++ 
b/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala @@ -71,8 +71,12 @@ class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends Outpu } } else { // Calculate how much time we should sleep to bring ourselves to the desired rate. - // Based on throttler in Kafka (https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/utils/Throttler.scala) - val sleepTime = MILLISECONDS.convert((bytesWrittenSinceSync / bytesPerSec - elapsedSecs), SECONDS) + // Based on throttler in Kafka + // scalastyle:off + // (https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/utils/Throttler.scala) + // scalastyle:on + val sleepTime = MILLISECONDS.convert((bytesWrittenSinceSync / bytesPerSec - elapsedSecs), + SECONDS) if (sleepTime > 0) Thread.sleep(sleepTime) waitToWrite(numBytes) }