http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala ---------------------------------------------------------------------- diff --cc streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index c80545b,c6cd635..3ba37be --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@@ -37,8 -36,8 +37,8 @@@ import org.apache.spark.rdd.RD import org.apache.spark.rdd.PairRDDFunctions class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( - implicit val kTag: ClassTag[K], - implicit val vTag: ClassTag[V]) - implicit val kManifest: ClassManifest[K], - implicit val vManifest: ClassManifest[V]) ++ implicit val kManifest: ClassTag[K], ++ implicit val vManifest: ClassTag[V]) extends JavaDStreamLike[(K, V), JavaPairDStream[K, V], JavaPairRDD[K, V]] { override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd) @@@ -441,6 -446,8 +447,8 @@@ updateFunc: JFunction2[JList[V], Optional[S], Optional[S]], numPartitions: Int) : JavaPairDStream[K, S] = { - implicit val cm: ClassManifest[S] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[S]] ++ implicit val cm: ClassTag[S] = ++ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]] dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), numPartitions) } @@@ -457,12 -464,19 +465,19 @@@ updateFunc: JFunction2[JList[V], Optional[S], Optional[S]], partitioner: Partitioner ): JavaPairDStream[K, S] = { - implicit val cm: ClassManifest[S] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[S]] ++ implicit val cm: ClassTag[S] = ++ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]] dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), partitioner) } + + /** + * Return a new DStream by applying a map function to the value of each key-value pairs in + * 'this' DStream without changing the key. + */ def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = { - implicit val cm: ClassManifest[U] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] + implicit val cm: ClassTag[U] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]] dstream.mapValues(f) } @@@ -487,41 -504,150 +505,150 @@@ } /** - * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this` - * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that - * key in both RDDs. Partitioner is used to partition each generated RDD. + * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. */ - def cogroup[W](other: JavaPairDStream[K, W], partitioner: Partitioner) - : JavaPairDStream[K, (JList[V], JList[W])] = { + def cogroup[W]( + other: JavaPairDStream[K, W], + numPartitions: Int + ): JavaPairDStream[K, (JList[V], JList[W])] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] ++ implicit val cm: ClassTag[W] = ++ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + dstream.cogroup(other.dstream, numPartitions) + .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) + } + + /** + * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. 
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + */ + def cogroup[W]( + other: JavaPairDStream[K, W], + partitioner: Partitioner + ): JavaPairDStream[K, (JList[V], JList[W])] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + implicit val cm: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] dstream.cogroup(other.dstream, partitioner) - .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) + .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) } /** - * Join `this` DStream with `other` DStream. HashPartitioner is used - * to partition each generated RDD into default number of partitions. + * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. + * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. */ def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + implicit val cm: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] dstream.join(other.dstream) } /** - * Join `this` DStream with `other` DStream, that is, each RDD of the new DStream will - * be generated by joining RDDs from `this` and other DStream. Uses the given - * Partitioner to partition each generated RDD. + * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + */ + def join[W](other: JavaPairDStream[K, W], numPartitions: Int): JavaPairDStream[K, (V, W)] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] ++ implicit val cm: ClassTag[W] = ++ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + dstream.join(other.dstream, numPartitions) + } + + /** + * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. + * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. */ - def join[W](other: JavaPairDStream[K, W], partitioner: Partitioner) - : JavaPairDStream[K, (V, W)] = { + def join[W]( + other: JavaPairDStream[K, W], + partitioner: Partitioner + ): JavaPairDStream[K, (V, W)] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + implicit val cm: ClassTag[W] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] dstream.join(other.dstream, partitioner) } /** + * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default + * number of partitions. + */ + def leftOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, Optional[W])] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] ++ implicit val cm: ClassTag[W] = ++ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + val joinResult = dstream.leftOuterJoin(other.dstream) + joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))} + } + + /** + * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` + * partitions. 
+ */ + def leftOuterJoin[W]( + other: JavaPairDStream[K, W], + numPartitions: Int + ): JavaPairDStream[K, (V, Optional[W])] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] ++ implicit val cm: ClassTag[W] = ++ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + val joinResult = dstream.leftOuterJoin(other.dstream, numPartitions) + joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))} + } + + /** + * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. + * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. + */ + def leftOuterJoin[W]( + other: JavaPairDStream[K, W], + partitioner: Partitioner + ): JavaPairDStream[K, (V, Optional[W])] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] ++ implicit val cm: ClassTag[W] = ++ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + val joinResult = dstream.leftOuterJoin(other.dstream, partitioner) + joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))} + } + + /** + * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default + * number of partitions. + */ + def rightOuterJoin[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (Optional[V], W)] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] ++ implicit val cm: ClassTag[W] = ++ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + val joinResult = dstream.rightOuterJoin(other.dstream) + joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)} + } + + /** + * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and + * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` + * partitions. + */ + def rightOuterJoin[W]( + other: JavaPairDStream[K, W], + numPartitions: Int + ): JavaPairDStream[K, (Optional[V], W)] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] ++ implicit val cm: ClassTag[W] = ++ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + val joinResult = dstream.rightOuterJoin(other.dstream, numPartitions) + joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)} + } + + /** + * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and + * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control + * the partitioning of each RDD. + */ + def rightOuterJoin[W]( + other: JavaPairDStream[K, W], + partitioner: Partitioner + ): JavaPairDStream[K, (Optional[V], W)] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] ++ implicit val cm: ClassTag[W] = ++ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]] + val joinResult = dstream.rightOuterJoin(other.dstream, partitioner) + joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)} + } + + /** * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". 
*/ @@@ -591,20 -717,25 +718,25 @@@ dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf) } + /** Convert to a JavaDStream */ + def toJavaDStream(): JavaDStream[(K, V)] = { + new JavaDStream[(K, V)](dstream) + } + - override val classManifest: ClassManifest[(K, V)] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]] + override val classTag: ClassTag[(K, V)] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K, V]]] } object JavaPairDStream { - implicit def fromPairDStream[K: ClassTag, V: ClassTag](dstream: DStream[(K, V)]) - :JavaPairDStream[K, V] = - implicit def fromPairDStream[K: ClassManifest, V: ClassManifest](dstream: DStream[(K, V)]) = { ++ implicit def fromPairDStream[K: ClassTag, V: ClassTag](dstream: DStream[(K, V)]) : JavaPairDStream[K, V] = { new JavaPairDStream[K, V](dstream) + } def fromJavaDStream[K, V](dstream: JavaDStream[(K, V)]): JavaPairDStream[K, V] = { - implicit val cmk: ClassManifest[K] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] - implicit val cmv: ClassManifest[V] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + implicit val cmk: ClassTag[K] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] + implicit val cmv: ClassTag[V] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] new JavaPairDStream[K, V](dstream.dstream) }
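A minimal Scala sketch of the recurring idiom in the Java-facing methods above: a Java caller cannot supply a ClassTag, so each JavaPairDStream method materializes ClassTag[AnyRef] and casts it to the requested type parameter, trading precise array element types for Java interoperability. The FakeClassTag wrapper below is hypothetical; the commit simply inlines the expression at every call site.

import scala.reflect.ClassTag

// Illustrative helper only -- not part of this commit.
object FakeClassTag {
  // Materialize a ClassTag for AnyRef and treat it as a ClassTag[T].
  def of[T]: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
}

// What join[W](other) effectively does before delegating to dstream.join:
//   implicit val cm: ClassTag[W] = FakeClassTag.of[W]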
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala ---------------------------------------------------------------------- diff --cc streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 8242af6,cf30b54..ca0c905 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@@ -19,10 -19,9 +19,10 @@@ package org.apache.spark.streaming.api. import java.lang.{Long => JLong, Integer => JInt} import java.io.InputStream - import java.util.{Map => JMap} + import java.util.{Map => JMap, List => JList} import scala.collection.JavaConversions._ +import scala.reflect.ClassTag import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import twitter4j.Status @@@ -144,12 -141,11 +144,12 @@@ class JavaStreamingContext(val ssc: Str zkQuorum: String, groupId: String, topics: JMap[String, JInt]) - : JavaDStream[String] = { + : JavaPairDStream[String, String] = { - implicit val cmt: ClassManifest[String] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[String]] + implicit val cmt: ClassTag[String] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), StorageLevel.MEMORY_ONLY_SER_2) + } /** @@@ -166,9 -162,9 +166,9 @@@ groupId: String, topics: JMap[String, JInt], storageLevel: StorageLevel) - : JavaDStream[String] = { + : JavaPairDStream[String, String] = { - implicit val cmt: ClassManifest[String] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[String]] + implicit val cmt: ClassTag[String] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]] ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) } @@@ -189,11 -189,16 +193,16 @@@ kafkaParams: JMap[String, String], topics: JMap[String, JInt], storageLevel: StorageLevel) - : JavaDStream[T] = { - implicit val cmt: ClassTag[T] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - implicit val cmd: Manifest[D] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[D]] - ssc.kafkaStream[T, D]( + : JavaPairDStream[K, V] = { - implicit val keyCmt: ClassManifest[K] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] - implicit val valueCmt: ClassManifest[V] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] ++ implicit val keyCmt: ClassTag[K] = ++ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] ++ implicit val valueCmt: ClassTag[V] = ++ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] + + implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]] + implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]] + + ssc.kafkaStream[K, V, U, T]( kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) @@@ -589,6 -594,77 +598,77 @@@ } /** + * Create a unified DStream from multiple DStreams of the same type and same slide duration. 
+ */ + def union[T](first: JavaDStream[T], rest: JList[JavaDStream[T]]): JavaDStream[T] = { + val dstreams: Seq[DStream[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.dstream) - implicit val cm: ClassManifest[T] = first.classManifest ++ implicit val cm: ClassTag[T] = first.classTag + ssc.union(dstreams)(cm) + } + + /** + * Create a unified DStream from multiple DStreams of the same type and same slide duration. + */ + def union[K, V]( + first: JavaPairDStream[K, V], + rest: JList[JavaPairDStream[K, V]] + ): JavaPairDStream[K, V] = { + val dstreams: Seq[DStream[(K, V)]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.dstream) - implicit val cm: ClassManifest[(K, V)] = first.classManifest - implicit val kcm: ClassManifest[K] = first.kManifest - implicit val vcm: ClassManifest[V] = first.vManifest ++ implicit val cm: ClassTag[(K, V)] = first.classTag ++ implicit val kcm: ClassTag[K] = first.kManifest ++ implicit val vcm: ClassTag[V] = first.vManifest + new JavaPairDStream[K, V](ssc.union(dstreams)(cm))(kcm, vcm) + } + + /** + * Create a new DStream in which each RDD is generated by applying a function on RDDs of + * the DStreams. The order of the JavaRDDs in the transform function parameter will be the + * same as the order of corresponding DStreams in the list. Note that for adding a + * JavaPairDStream in the list of JavaDStreams, convert it to a JavaDStream using + * [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream(). + * In the transform function, convert the JavaRDD corresponding to that JavaDStream to + * a JavaPairRDD using [[org.apache.spark.api.java.JavaPairRDD]].fromJavaRDD(). + */ + def transform[T]( + dstreams: JList[JavaDStream[_]], + transformFunc: JFunction2[JList[JavaRDD[_]], Time, JavaRDD[T]] + ): JavaDStream[T] = { - implicit val cmt: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] ++ implicit val cmt: ClassTag[T] = ++ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + val scalaDStreams = dstreams.map(_.dstream).toSeq + val scalaTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { + val jrdds = rdds.map(rdd => JavaRDD.fromRDD[AnyRef](rdd.asInstanceOf[RDD[AnyRef]])).toList + transformFunc.call(jrdds, time).rdd + } + ssc.transform(scalaDStreams, scalaTransformFunc) + } + + /** + * Create a new DStream in which each RDD is generated by applying a function on RDDs of + * the DStreams. The order of the JavaRDDs in the transform function parameter will be the + * same as the order of corresponding DStreams in the list. Note that for adding a + * JavaPairDStream in the list of JavaDStreams, convert it to a JavaDStream using + * [[org.apache.spark.streaming.api.java.JavaPairDStream]].toJavaDStream(). + * In the transform function, convert the JavaRDD corresponding to that JavaDStream to + * a JavaPairRDD using [[org.apache.spark.api.java.JavaPairRDD]].fromJavaRDD(). 
+ */ + def transform[K, V]( + dstreams: JList[JavaDStream[_]], + transformFunc: JFunction2[JList[JavaRDD[_]], Time, JavaPairRDD[K, V]] + ): JavaPairDStream[K, V] = { - implicit val cmk: ClassManifest[K] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] - implicit val cmv: ClassManifest[V] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] ++ implicit val cmk: ClassTag[K] = ++ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]] ++ implicit val cmv: ClassTag[V] = ++ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] + val scalaDStreams = dstreams.map(_.dstream).toSeq + val scalaTransformFunc = (rdds: Seq[RDD[_]], time: Time) => { + val jrdds = rdds.map(rdd => JavaRDD.fromRDD[AnyRef](rdd.asInstanceOf[RDD[AnyRef]])).toList + transformFunc.call(jrdds, time).rdd + } + ssc.transform(scalaDStreams, scalaTransformFunc) + } + + /** * Sets the context to periodically checkpoint the DStream operations for master * fault-tolerance. The graph will be checkpointed every batch interval. * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala ---------------------------------------------------------------------- diff --cc streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala index 9613486,a5de5e1..ec0096c --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala @@@ -33,20 -31,22 +31,23 @@@ import kafka.utils.ZKStringSerialize import org.I0Itec.zkclient._ import scala.collection.Map - import scala.collection.mutable.HashMap - import scala.collection.JavaConversions._ +import scala.reflect.ClassTag + /** * Input stream that pulls messages from a Kafka Broker. - * + * * @param kafkaParams Map of kafka configuration paramaters. See: http://kafka.apache.org/configuration.html * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed * in its own thread. * @param storageLevel RDD storage level. */ private[streaming] - class KafkaInputDStream[T: ClassTag, D <: Decoder[_]: Manifest]( + class KafkaInputDStream[ - K: ClassManifest, - V: ClassManifest, ++ K: ClassTag, ++ V: ClassTag, + U <: Decoder[_]: Manifest, + T <: Decoder[_]: Manifest]( @transient ssc_ : StreamingContext, kafkaParams: Map[String, String], topics: Map[String, Int], @@@ -61,10 -60,14 +61,14 @@@ } private[streaming] - class KafkaReceiver[T: ClassTag, D <: Decoder[_]: Manifest]( - kafkaParams: Map[String, String], - topics: Map[String, Int], - storageLevel: StorageLevel + class KafkaReceiver[ - K: ClassManifest, - V: ClassManifest, ++ K: ClassTag, ++ V: ClassTag, + U <: Decoder[_]: Manifest, + T <: Decoder[_]: Manifest]( + kafkaParams: Map[String, String], + topics: Map[String, Int], + storageLevel: StorageLevel ) extends NetworkReceiver[Any] { // Handles pushing data into the BlockManager @@@ -97,14 -100,21 +101,22 @@@ // When autooffset.reset is defined, it is our responsibility to try and whack the // consumer group zk node. 
- if (kafkaParams.contains("autooffset.reset")) { - tryZookeeperConsumerGroupCleanup(kafkaParams("zk.connect"), kafkaParams("groupid")) + if (kafkaParams.contains("auto.offset.reset")) { + tryZookeeperConsumerGroupCleanup(kafkaParams("zookeeper.connect"), kafkaParams("group.id")) } - // Create Threads for each Topic/Message Stream we are listening - val keyDecoder = manifest[U].erasure.getConstructor(classOf[VerifiableProperties]) ++ val keyDecoder = manifest[U].runtimeClass.getConstructor(classOf[VerifiableProperties]) + .newInstance(consumerConfig.props) + .asInstanceOf[Decoder[K]] - val valueDecoder = manifest[T].erasure.getConstructor(classOf[VerifiableProperties]) ++ val valueDecoder = manifest[T].runtimeClass.getConstructor(classOf[VerifiableProperties]) + .newInstance(consumerConfig.props) + .asInstanceOf[Decoder[V]] + + // Create Threads for each Topic/Message Stream we are listening - val decoder = manifest[D].runtimeClass.newInstance.asInstanceOf[Decoder[T]] - val topicMessageStreams = consumerConnector.createMessageStreams(topics, decoder) + val topicMessageStreams = consumerConnector.createMessageStreams( + topics, keyDecoder, valueDecoder) + + // Start the messages handler for each partition topicMessageStreams.values.foreach { streams => streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) } @@@ -112,7 -122,8 +124,8 @@@ } // Handles Kafka Messages - private class MessageHandler[T: ClassTag](stream: KafkaStream[T]) extends Runnable { - private class MessageHandler[K: ClassManifest, V: ClassManifest](stream: KafkaStream[K, V]) ++ private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V]) + extends Runnable { def run() { logInfo("Starting MessageHandler.") for (msgAndMetadata <- stream) { http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala ---------------------------------------------------------------------- diff --cc streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala index 0000000,ac05282..ef4a737 mode 000000,100644..100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MQTTInputDStream.scala @@@ -1,0 -1,109 +1,110 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.spark.streaming.dstream + + import org.apache.spark.Logging + import org.apache.spark.storage.StorageLevel + import org.apache.spark.streaming.{ Time, DStreamCheckpointData, StreamingContext } + + import java.util.Properties + import java.util.concurrent.Executors + import java.io.IOException + + import org.eclipse.paho.client.mqttv3.MqttCallback + import org.eclipse.paho.client.mqttv3.MqttClient + import org.eclipse.paho.client.mqttv3.MqttClientPersistence + import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence + import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken + import org.eclipse.paho.client.mqttv3.MqttException + import org.eclipse.paho.client.mqttv3.MqttMessage + import org.eclipse.paho.client.mqttv3.MqttTopic + + import scala.collection.Map + import scala.collection.mutable.HashMap + import scala.collection.JavaConversions._ ++import scala.reflect.ClassTag + + /** + * Input stream that subscribe messages from a Mqtt Broker. + * Uses eclipse paho as MqttClient http://www.eclipse.org/paho/ + * @param brokerUrl Url of remote mqtt publisher + * @param topic topic name to subscribe to + * @param storageLevel RDD storage level. + */ + + private[streaming] -class MQTTInputDStream[T: ClassManifest]( ++class MQTTInputDStream[T: ClassTag]( + @transient ssc_ : StreamingContext, + brokerUrl: String, + topic: String, + storageLevel: StorageLevel + ) extends NetworkInputDStream[T](ssc_) with Logging { + + def getReceiver(): NetworkReceiver[T] = { + new MQTTReceiver(brokerUrl, topic, storageLevel) + .asInstanceOf[NetworkReceiver[T]] + } + } + + private[streaming] + class MQTTReceiver(brokerUrl: String, + topic: String, + storageLevel: StorageLevel + ) extends NetworkReceiver[Any] { + lazy protected val blockGenerator = new BlockGenerator(storageLevel) + + def onStop() { + blockGenerator.stop() + } + + def onStart() { + + blockGenerator.start() + + // Set up persistence for messages + var peristance: MqttClientPersistence = new MemoryPersistence() + + // Initializing Mqtt Client specifying brokerUrl, clientID and MqttClientPersistance + var client: MqttClient = new MqttClient(brokerUrl, "MQTTSub", peristance) + + // Connect to MqttBroker + client.connect() + + // Subscribe to Mqtt topic + client.subscribe(topic) + + // Callback automatically triggers as and when new message arrives on specified topic + var callback: MqttCallback = new MqttCallback() { + + // Handles Mqtt message + override def messageArrived(arg0: String, arg1: MqttMessage) { + blockGenerator += new String(arg1.getPayload()) + } + + override def deliveryComplete(arg0: IMqttDeliveryToken) { + } + + override def connectionLost(arg0: Throwable) { + logInfo("Connection lost " + arg0) + } + } + + // Set up callback for MqttClient + client.setCallback(callback) + } + } http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala ---------------------------------------------------------------------- diff --cc streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala index a4746f0,10ed4ef..dea0f26 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala +++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala @@@ -18,11 -18,9 +18,11 @@@ package org.apache.spark.streaming.dstream import org.apache.spark.Logging - import org.apache.spark.storage.StorageLevel + import org.apache.spark.storage.{StorageLevel, StreamBlockId} import org.apache.spark.streaming.StreamingContext +import scala.reflect.ClassTag + import java.net.InetSocketAddress import java.nio.ByteBuffer import java.nio.channels.{ReadableByteChannel, SocketChannel} http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala ---------------------------------------------------------------------- diff --cc streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala index 73e1ddf,71bcb2b..aeea060 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala @@@ -19,19 -19,24 +19,25 @@@ package org.apache.spark.streaming.dstr import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Duration, DStream, Time} +import scala.reflect.ClassTag private[streaming] - class TransformedDStream[T: ClassTag, U: ClassTag] ( - parent: DStream[T], - transformFunc: (RDD[T], Time) => RDD[U] - ) extends DStream[U](parent.ssc) { -class TransformedDStream[U: ClassManifest] ( ++class TransformedDStream[U: ClassTag] ( + parents: Seq[DStream[_]], + transformFunc: (Seq[RDD[_]], Time) => RDD[U] + ) extends DStream[U](parents.head.ssc) { - override def dependencies = List(parent) + require(parents.length > 0, "List of DStreams to transform is empty") + require(parents.map(_.ssc).distinct.size == 1, "Some of the DStreams have different contexts") + require(parents.map(_.slideDuration).distinct.size == 1, + "Some of the DStreams have different slide durations") - override def slideDuration: Duration = parent.slideDuration + override def dependencies = parents.toList + + override def slideDuration: Duration = parents.head.slideDuration override def compute(validTime: Time): Option[RDD[U]] = { - parent.getOrCompute(validTime).map(transformFunc(_, validTime)) + val parentRDDs = parents.map(_.getOrCompute(validTime).orNull).toSeq + Some(transformFunc(parentRDDs, validTime)) } } http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala ---------------------------------------------------------------------- diff --cc streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala index ee087a1,ef0f85a..fdf5371 --- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala @@@ -20,12 -20,8 +20,12 @@@ package org.apache.spark.streaming.rece import akka.actor.{ Actor, PoisonPill, Props, SupervisorStrategy } import akka.actor.{ actorRef2Scala, ActorRef } import akka.actor.{ PossiblyHarmful, OneForOneStrategy } +import akka.actor.SupervisorStrategy._ + +import scala.concurrent.duration._ +import scala.reflect.ClassTag - import org.apache.spark.storage.StorageLevel + import org.apache.spark.storage.{StorageLevel, StreamBlockId} import org.apache.spark.streaming.dstream.NetworkReceiver import java.util.concurrent.atomic.AtomicInteger 
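A minimal sketch of the reflective decoder construction the reworked KafkaReceiver performs above: Manifest.runtimeClass (the Scala 2.10 replacement for the deprecated erasure) locates the one-argument constructor taking kafka.utils.VerifiableProperties and instantiates the key and value decoders. The buildDecoder helper is hypothetical packaging of that inline code, assuming the same Kafka 0.8 classes the diff imports.

import kafka.serializer.{Decoder, StringDecoder}
import kafka.utils.VerifiableProperties

object KafkaDecoders {
  // Mirrors the keyDecoder/valueDecoder construction in KafkaReceiver.onStart.
  def buildDecoder[D <: Decoder[_]: Manifest](props: VerifiableProperties): D =
    manifest[D].runtimeClass
      .getConstructor(classOf[VerifiableProperties])
      .newInstance(props)
      .asInstanceOf[D]
}

// e.g. KafkaDecoders.buildDecoder[StringDecoder](consumerConfig.props)
// yields the Decoder[String] handed to consumerConnector.createMessageStreams.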
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java ---------------------------------------------------------------------- diff --cc streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 076fb53,ad4a8b9..daeb99f --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@@ -21,10 -21,9 +21,11 @@@ import com.google.common.base.Optional import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.Files; + import kafka.serializer.StringDecoder; + import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; + import org.apache.spark.streaming.api.java.JavaDStreamLike; import org.junit.After; import org.junit.Assert; import org.junit.Before; http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala ---------------------------------------------------------------------- diff --cc streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala index d5cdad4,5e384ee..42ab959 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala @@@ -33,15 -31,15 +33,15 @@@ trait JavaTestBase extends TestSuiteBas /** * Create a [[org.apache.spark.streaming.TestInputStream]] and attach it to the supplied context. * The stream will be derived from the supplied lists of Java objects. - **/ + */ def attachTestInputStream[T]( - ssc: JavaStreamingContext, - data: JList[JList[T]], - numPartitions: Int) = { + ssc: JavaStreamingContext, + data: JList[JList[T]], + numPartitions: Int) = { val seqData = data.map(Seq(_:_*)) - implicit val cm: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val dstream = new TestInputStream[T](ssc.ssc, seqData, numPartitions) ssc.ssc.registerInputStream(dstream) new JavaDStream[T](dstream) @@@ -52,12 -50,11 +52,11 @@@ * [[org.apache.spark.streaming.TestOutputStream]]. **/ def attachTestOutputStream[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]]( - dstream: JavaDStreamLike[T, This, R]) = + dstream: JavaDStreamLike[T, This, R]) = { - implicit val cm: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - val ostream = new TestOutputStream(dstream.dstream, - new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]]) + val ostream = new TestOutputStreamWithPartitions(dstream.dstream) dstream.dstream.ssc.registerOutputStream(ostream) } @@@ -65,16 -62,39 +64,39 @@@ * Process all registered streams for a numBatches batches, failing if * numExpectedOutput RDD's are not generated. Generated RDD's are collected * and returned, represented as a list for each batch interval. + * + * Returns a list of items for each RDD. 
*/ def runStreams[V]( - ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = { + ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = { - implicit val cm: ClassManifest[V] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + implicit val cm: ClassTag[V] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput) val out = new ArrayList[JList[V]]() res.map(entry => out.append(new ArrayList[V](entry))) out } + + /** + * Process all registered streams for a numBatches batches, failing if + * numExpectedOutput RDD's are not generated. Generated RDD's are collected + * and returned, represented as a list for each batch interval. + * + * Returns a sequence of RDD's. Each RDD is represented as several sequences of items, each + * representing one partition. + */ + def runStreamsWithPartitions[V](ssc: JavaStreamingContext, numBatches: Int, + numExpectedOutput: Int): JList[JList[JList[V]]] = { - implicit val cm: ClassManifest[V] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] ++ implicit val cm: ClassTag[V] = ++ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] + val res = runStreamsWithPartitions[V](ssc.ssc, numBatches, numExpectedOutput) + val out = new ArrayList[JList[JList[V]]]() + res.map{entry => + val lists = entry.map(new ArrayList[V](_)) + out.append(new ArrayList[JList[V]](lists)) + } + out + } } object JavaTestUtils extends JavaTestBase { http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala ---------------------------------------------------------------------- diff --cc streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index c91f9ba,be14069..126915a --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@@ -61,8 -60,11 +61,11 @@@ class TestInputStream[T: ClassTag](ssc /** * This is a output stream just for the testsuites. All the output is collected into a * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint. + * + * The buffer contains a sequence of RDD's, each containing a sequence of items */ - class TestOutputStream[T: ClassTag](parent: DStream[T], val output: ArrayBuffer[Seq[T]]) -class TestOutputStream[T: ClassManifest](parent: DStream[T], ++class TestOutputStream[T: ClassTag](parent: DStream[T], + val output: ArrayBuffer[Seq[T]] = ArrayBuffer[Seq[T]]()) extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => { val collected = rdd.collect() output += collected @@@ -77,6 -79,30 +80,30 @@@ } /** + * This is a output stream just for the testsuites. All the output is collected into a + * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint. + * + * The buffer contains a sequence of RDD's, each containing a sequence of partitions, each + * containing a sequence of items. 
+ */ -class TestOutputStreamWithPartitions[T: ClassManifest](parent: DStream[T], ++class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T], + val output: ArrayBuffer[Seq[Seq[T]]] = ArrayBuffer[Seq[Seq[T]]]()) + extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => { + val collected = rdd.glom().collect().map(_.toSeq) + output += collected + }) { + + // This is to clear the output buffer every it is read from a checkpoint + @throws(classOf[IOException]) + private def readObject(ois: ObjectInputStream) { + ois.defaultReadObject() + output.clear() + } + + def toTestOutputStream = new TestOutputStream[T](this.parent, this.output.map(_.flatten)) + } + + /** * This is the base trait for Spark Streaming testsuites. This provides basic functionality * to run user-defined set of input on user-defined stream operations, and verify the output. */ @@@ -107,9 -133,10 +134,10 @@@ trait TestSuiteBase extends FunSuite wi * Set up required DStreams to test the DStream operation using the two sequences * of input collections. */ - def setupStreams[U: ClassManifest, V: ClassManifest]( + def setupStreams[U: ClassTag, V: ClassTag]( input: Seq[Seq[U]], - operation: DStream[U] => DStream[V] + operation: DStream[U] => DStream[V], + numPartitions: Int = numInputPartitions ): StreamingContext = { // Create StreamingContext @@@ -158,12 -187,31 +188,31 @@@ * Runs the streams set up in `ssc` on manual clock for `numBatches` batches and * returns the collected output. It will wait until `numExpectedOutput` number of * output data has been collected or timeout (set by `maxWaitTimeMillis`) is reached. + * + * Returns a sequence of items for each RDD. */ - def runStreams[V: ClassManifest]( + def runStreams[V: ClassTag]( ssc: StreamingContext, numBatches: Int, numExpectedOutput: Int ): Seq[Seq[V]] = { + // Flatten each RDD into a single Seq + runStreamsWithPartitions(ssc, numBatches, numExpectedOutput).map(_.flatten.toSeq) + } + + /** + * Runs the streams set up in `ssc` on manual clock for `numBatches` batches and + * returns the collected output. It will wait until `numExpectedOutput` number of + * output data has been collected or timeout (set by `maxWaitTimeMillis`) is reached. + * + * Returns a sequence of RDD's. Each RDD is represented as several sequences of items, each + * representing one partition. 
+ */ - def runStreamsWithPartitions[V: ClassManifest]( ++ def runStreamsWithPartitions[V: ClassTag]( + ssc: StreamingContext, + numBatches: Int, + numExpectedOutput: Int + ): Seq[Seq[Seq[V]]] = { assert(numBatches > 0, "Number of batches to run stream computation is zero") assert(numExpectedOutput > 0, "Number of expected outputs after " + numBatches + " is zero") logInfo("numBatches = " + numBatches + ", numExpectedOutput = " + numExpectedOutput) http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala ---------------------------------------------------------------------- diff --cc tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala index f824c47,f824c47..f670f65 --- a/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala +++ b/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala @@@ -199,7 -199,7 +199,7 @@@ object JavaAPICompletenessChecker private def toJavaMethod(method: SparkMethod): SparkMethod = { val params = method.parameters -- .filterNot(_.name == "scala.reflect.ClassManifest") ++ .filterNot(_.name == "scala.reflect.ClassTag") .map(toJavaType(_, isReturnType = false)) SparkMethod(method.name, toJavaType(method.returnType, isReturnType = true), params) } @@@ -212,7 -212,7 +212,7 @@@ // internal Spark components. val excludedNames = Seq( "org.apache.spark.rdd.RDD.origin", -- "org.apache.spark.rdd.RDD.elementClassManifest", ++ "org.apache.spark.rdd.RDD.elementClassTag", "org.apache.spark.rdd.RDD.checkpointData", "org.apache.spark.rdd.RDD.partitioner", "org.apache.spark.rdd.RDD.partitions", http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala ---------------------------------------------------------------------- diff --cc yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index d222f41,25da9aa..4beb522 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@@ -209,9 -209,9 +209,9 @@@ private[yarn] class YarnAllocationHandl else { // deallocate + allocate can result in reusing id's wrongly - so use a different counter (workerIdCounter) val workerId = workerIdCounter.incrementAndGet().toString - val driverUrl = "akka://spark@%s:%s/user/%s".format( + val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"), - StandaloneSchedulerBackend.ACTOR_NAME) + CoarseGrainedSchedulerBackend.ACTOR_NAME) logInfo("launching container on " + containerId + " host " + workerHostname) // just to be safe, simply remove it from pendingReleaseContainers. Should not be there, but ..
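A minimal, self-contained sketch (master URL and app name are placeholders) of the rdd.glom() technique that the new TestOutputStreamWithPartitions and runStreamsWithPartitions rely on above: glom() turns each partition into a single array, so collecting the result preserves partition boundaries instead of flattening everything into one sequence.

import org.apache.spark.SparkContext

object GlomDemo {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[2]", "GlomDemo")
    // Two partitions; each inner Seq below corresponds to one of them,
    // e.g. Seq(1, 2, 3) and Seq(4, 5, 6).
    val byPartition: Array[Seq[Int]] =
      sc.parallelize(1 to 6, 2).glom().collect().map(_.toSeq)
    byPartition.foreach(p => println(p.mkString("[", ", ", "]")))
    sc.stop()
  }
}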