http://git-wip-us.apache.org/repos/asf/spark/blob/89e67d66/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
new file mode 100644
index 0000000..edaafb9
--- /dev/null
+++ 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -0,0 +1,805 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.io.OutputStream
+import java.lang.{Integer => JInt, Long => JLong}
+import java.nio.charset.StandardCharsets
+import java.util.{List => JList, Map => JMap, Set => JSet}
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
+import kafka.serializer.{Decoder, DefaultDecoder, StringDecoder}
+import net.razorvine.pickle.{IObjectPickler, Opcodes, Pickler}
+
+import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
+import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.api.python.SerDeUtil
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java._
+import org.apache.spark.streaming.dstream.{DStream, InputDStream, 
ReceiverInputDStream}
+import org.apache.spark.streaming.util.WriteAheadLogUtils
+
+object KafkaUtils {
+  /**
+   * Create an input stream that pulls messages from Kafka Brokers.
+   * @param ssc       StreamingContext object
+   * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..)
+   * @param groupId   The group id for this consumer
+   * @param topics    Map of (topic_name -> numPartitions) to consume. Each 
partition is consumed
+   *                  in its own thread
+   * @param storageLevel  Storage level to use for storing the received objects
+   *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
+   * @return DStream of (Kafka message key, Kafka message value)
+   */
+  def createStream(
+      ssc: StreamingContext,
+      zkQuorum: String,
+      groupId: String,
+      topics: Map[String, Int],
+      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+    ): ReceiverInputDStream[(String, String)] = {
+    // Assemble the consumer configuration from the arguments; this convenience overload
+    // always sets "zookeeper.connection.timeout.ms" to "10000".
+    val consumerConfig: Map[String, String] = Map(
+      "zookeeper.connect" -> zkQuorum,
+      "group.id" -> groupId,
+      "zookeeper.connection.timeout.ms" -> "10000")
+    // Delegate to the generic overload, decoding both keys and values with StringDecoder.
+    createStream[String, String, StringDecoder, StringDecoder](
+      ssc, consumerConfig, topics, storageLevel)
+  }
+
+  /**
+   * Create an input stream that pulls messages from Kafka Brokers.
+   * @param ssc         StreamingContext object
+   * @param kafkaParams Map of kafka configuration parameters,
+   *                    see http://kafka.apache.org/08/configuration.html
+   * @param topics      Map of (topic_name -> numPartitions) to consume. Each 
partition is consumed
+   *                    in its own thread.
+   * @param storageLevel Storage level to use for storing the received objects
+   * @tparam K type of Kafka message key
+   * @tparam V type of Kafka message value
+   * @tparam U type of Kafka message key decoder
+   * @tparam T type of Kafka message value decoder
+   * @return DStream of (Kafka message key, Kafka message value)
+   */
+  def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
+      ssc: StreamingContext,
+      kafkaParams: Map[String, String],
+      topics: Map[String, Int],
+      storageLevel: StorageLevel
+    ): ReceiverInputDStream[(K, V)] = {
+    // The fourth constructor argument is the flag that WriteAheadLogUtils.enableReceiverLog
+    // reports for this context's configuration.
+    new KafkaInputDStream[K, V, U, T](
+      ssc,
+      kafkaParams,
+      topics,
+      WriteAheadLogUtils.enableReceiverLog(ssc.conf),
+      storageLevel)
+  }
+
+  /**
+   * Create an input stream that pulls messages from Kafka Brokers.
+   * Storage level of the data will be the default 
StorageLevel.MEMORY_AND_DISK_SER_2.
+   * @param jssc      JavaStreamingContext object
+   * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..)
+   * @param groupId   The group id for this consumer
+   * @param topics    Map of (topic_name -> numPartitions) to consume. Each 
partition is consumed
+   *                  in its own thread
+   * @return DStream of (Kafka message key, Kafka message value)
+   */
+  def createStream(
+      jssc: JavaStreamingContext,
+      zkQuorum: String,
+      groupId: String,
+      topics: JMap[String, JInt]
+    ): JavaPairReceiverInputDStream[String, String] = {
+    // Copy the Java map into an immutable Scala map, unboxing the per-topic counts.
+    val topicThreads: Map[String, Int] =
+      topics.asScala.map { case (topic, threads) => topic -> threads.intValue() }.toMap
+    // No storage level is passed, so the Scala overload's default applies.
+    createStream(jssc.ssc, zkQuorum, groupId, topicThreads)
+  }
+
+  /**
+   * Create an input stream that pulls messages from Kafka Brokers.
+   * @param jssc      JavaStreamingContext object
+   * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..).
+   * @param groupId   The group id for this consumer.
+   * @param topics    Map of (topic_name -> numPartitions) to consume. Each 
partition is consumed
+   *                  in its own thread.
+   * @param storageLevel RDD storage level.
+   * @return DStream of (Kafka message key, Kafka message value)
+   */
+  def createStream(
+      jssc: JavaStreamingContext,
+      zkQuorum: String,
+      groupId: String,
+      topics: JMap[String, JInt],
+      storageLevel: StorageLevel
+    ): JavaPairReceiverInputDStream[String, String] = {
+    // Copy the Java map into an immutable Scala map, unboxing the per-topic counts.
+    val topicThreads: Map[String, Int] =
+      topics.asScala.map { case (topic, threads) => topic -> threads.intValue() }.toMap
+    createStream(jssc.ssc, zkQuorum, groupId, topicThreads, storageLevel)
+  }
+
+  /**
+   * Create an input stream that pulls messages from Kafka Brokers.
+   * @param jssc      JavaStreamingContext object
+   * @param keyTypeClass Key type of DStream
+   * @param valueTypeClass Value type of DStream
+   * @param keyDecoderClass Type of kafka key decoder
+   * @param valueDecoderClass Type of kafka value decoder
+   * @param kafkaParams Map of kafka configuration parameters,
+   *                    see http://kafka.apache.org/08/configuration.html
+   * @param topics  Map of (topic_name -> numPartitions) to consume. Each 
partition is consumed
+   *                in its own thread
+   * @param storageLevel RDD storage level.
+   * @tparam K type of Kafka message key
+   * @tparam V type of Kafka message value
+   * @tparam U type of Kafka message key decoder
+   * @tparam T type of Kafka message value decoder
+   * @return DStream of (Kafka message key, Kafka message value)
+   */
+  def createStream[K, V, U <: Decoder[_], T <: Decoder[_]](
+      jssc: JavaStreamingContext,
+      keyTypeClass: Class[K],
+      valueTypeClass: Class[V],
+      keyDecoderClass: Class[U],
+      valueDecoderClass: Class[T],
+      kafkaParams: JMap[String, String],
+      topics: JMap[String, JInt],
+      storageLevel: StorageLevel
+    ): JavaPairReceiverInputDStream[K, V] = {
+    // The Scala overload requires ClassTags; build them from the Class objects passed in.
+    implicit val keyTag: ClassTag[K] = ClassTag(keyTypeClass)
+    implicit val valueTag: ClassTag[V] = ClassTag(valueTypeClass)
+    implicit val keyDecoderTag: ClassTag[U] = ClassTag(keyDecoderClass)
+    implicit val valueDecoderTag: ClassTag[T] = ClassTag(valueDecoderClass)
+
+    // Convert the Java collections to immutable Scala maps before delegating.
+    val scalaParams: Map[String, String] = kafkaParams.asScala.toMap
+    val topicThreads: Map[String, Int] =
+      topics.asScala.map { case (topic, threads) => topic -> threads.intValue() }.toMap
+    createStream[K, V, U, T](jssc.ssc, scalaParams, topicThreads, storageLevel)
+  }
+
+  /** get leaders for the given offset ranges, or throw an exception */
+  private def leadersForRanges(
+      kc: KafkaCluster,
+      offsetRanges: Array[OffsetRange]): Map[TopicAndPartition, (String, Int)] = {
+    // Several ranges may refer to the same topic/partition; look each one up only once.
+    val partitions = offsetRanges.map(_.topicAndPartition).toSet
+    // checkErrors unwraps the lookup result (presumably throwing on failure - confirm
+    // against KafkaCluster).
+    KafkaCluster.checkErrors(kc.findLeaders(partitions))
+  }
+
+  /** Make sure offsets are available in kafka, or throw an exception */
+  private def checkOffsets(
+      kc: KafkaCluster,
+      offsetRanges: Array[OffsetRange]): Unit = {
+    val partitions = offsetRanges.map(_.topicAndPartition).toSet
+    // Fetch earliest and then latest leader offsets; the first Left short-circuits the chain.
+    val outOfBounds = kc.getEarliestLeaderOffsets(partitions).right.flatMap { earliest =>
+      kc.getLatestLeaderOffsets(partitions).right.map { latest =>
+        // Keep the ranges that start before the earliest offset or end after the latest one.
+        offsetRanges.filter { range =>
+          val tp = range.topicAndPartition
+          range.fromOffset < earliest(tp).offset || latest(tp).offset < range.untilOffset
+        }
+      }
+    }
+    val badRanges = KafkaCluster.checkErrors(outOfBounds)
+    if (badRanges.nonEmpty) {
+      throw new SparkException(s"Offsets not available on leader: ${badRanges.mkString(",")}")
+    }
+  }
+
+  private[kafka] def getFromOffsets(
+      kc: KafkaCluster,
+      kafkaParams: Map[String, String],
+      topics: Set[String]
+    ): Map[TopicAndPartition, Long] = {
+    // Resolve the starting offset of every partition of `topics`.
+    //
+    // "auto.offset.reset" is compared case-insensitively. The value is lower-cased with
+    // Locale.ROOT because the no-arg toLowerCase uses the JVM default locale; under a Turkish
+    // locale "SMALLEST" would lower-case to a string with a dotless i, fail the comparison
+    // below, and silently fall through to the latest offsets.
+    val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase(java.util.Locale.ROOT))
+    val result = for {
+      topicPartitions <- kc.getPartitions(topics).right
+      // Only "smallest" selects the earliest offsets; any other value, or no value at all,
+      // selects the latest offsets.
+      leaderOffsets <- (if (reset == Some("smallest")) {
+        kc.getEarliestLeaderOffsets(topicPartitions)
+      } else {
+        kc.getLatestLeaderOffsets(topicPartitions)
+      }).right
+    } yield {
+      // Keep only the numeric offset of each leader-offset entry.
+      leaderOffsets.map { case (tp, lo) =>
+          (tp, lo.offset)
+      }
+    }
+    KafkaCluster.checkErrors(result)
+  }
+
+  /**
+   * Create a RDD from Kafka using offset ranges for each topic and partition.
+   *
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+   *    host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @tparam K type of Kafka message key
+   * @tparam V type of Kafka message value
+   * @tparam KD type of Kafka message key decoder
+   * @tparam VD type of Kafka message value decoder
+   * @return RDD of (Kafka message key, Kafka message value)
+   */
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    KD <: Decoder[K]: ClassTag,
+    VD <: Decoder[V]: ClassTag](
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange]
+    ): RDD[(K, V)] = sc.withScope {
+    val cluster = new KafkaCluster(kafkaParams)
+    // Look up the leaders first, then validate the requested ranges via checkOffsets.
+    val leaderByPartition = leadersForRanges(cluster, offsetRanges)
+    checkOffsets(cluster, offsetRanges)
+    // Each record is exposed as a plain (key, message) pair.
+    val toKeyValue: MessageAndMetadata[K, V] => (K, V) = mmd => (mmd.key, mmd.message)
+    new KafkaRDD[K, V, KD, VD, (K, V)](
+      sc, kafkaParams, offsetRanges, leaderByPartition, toKeyValue)
+  }
+
+  /**
+   * Create an RDD from Kafka using offset ranges for each topic and partition. This allows you
+   * to specify the Kafka leader to connect to (to optimize fetching) and access the message as
+   * well as the metadata.
+   *
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+   *    host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka brokers for each TopicAndPartition in offsetRanges.  
May be an empty map,
+   *   in which case leaders will be looked up on the driver.
+   * @param messageHandler Function for translating each message and metadata 
into the desired type
+   * @tparam K type of Kafka message key
+   * @tparam V type of Kafka message value
+   * @tparam KD type of Kafka message key decoder
+   * @tparam VD type of Kafka message value decoder
+   * @tparam R type returned by messageHandler
+   * @return RDD of R
+   */
+  def createRDD[
+    K: ClassTag,
+    V: ClassTag,
+    KD <: Decoder[K]: ClassTag,
+    VD <: Decoder[V]: ClassTag,
+    R: ClassTag](
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      offsetRanges: Array[OffsetRange],
+      leaders: Map[TopicAndPartition, Broker],
+      messageHandler: MessageAndMetadata[K, V] => R
+    ): RDD[R] = sc.withScope {
+    val cluster = new KafkaCluster(kafkaParams)
+    // Caller-supplied Brokers are unpacked into (host, port) pairs, the same shape that
+    // leadersForRanges returns; an empty map means the leaders are looked up here instead.
+    // (The unpacking could be avoided by refactoring KafkaRDD.leaders and KafkaCluster to
+    // use Broker.)
+    val leaderMap =
+      if (leaders.nonEmpty) {
+        leaders.map { case (tp: TopicAndPartition, Broker(host, port)) => (tp, (host, port)) }
+      } else {
+        leadersForRanges(cluster, offsetRanges)
+      }
+    val handler = sc.clean(messageHandler)
+    checkOffsets(cluster, offsetRanges)
+    new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, handler)
+  }
+
+  /**
+   * Create a RDD from Kafka using offset ranges for each topic and partition.
+   *
+   * @param jsc JavaSparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+   *    host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param keyClass type of Kafka message key
+   * @param valueClass type of Kafka message value
+   * @param keyDecoderClass type of Kafka message key decoder
+   * @param valueDecoderClass type of Kafka message value decoder
+   * @tparam K type of Kafka message key
+   * @tparam V type of Kafka message value
+   * @tparam KD type of Kafka message key decoder
+   * @tparam VD type of Kafka message value decoder
+   * @return RDD of (Kafka message key, Kafka message value)
+   */
+  def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]](
+      jsc: JavaSparkContext,
+      keyClass: Class[K],
+      valueClass: Class[V],
+      keyDecoderClass: Class[KD],
+      valueDecoderClass: Class[VD],
+      kafkaParams: JMap[String, String],
+      offsetRanges: Array[OffsetRange]
+    ): JavaPairRDD[K, V] = jsc.sc.withScope {
+    // The Scala overload requires ClassTags; build them from the Class objects passed in.
+    implicit val keyTag: ClassTag[K] = ClassTag(keyClass)
+    implicit val valueTag: ClassTag[V] = ClassTag(valueClass)
+    implicit val keyDecoderTag: ClassTag[KD] = ClassTag(keyDecoderClass)
+    implicit val valueDecoderTag: ClassTag[VD] = ClassTag(valueDecoderClass)
+    val scalaParams: Map[String, String] = kafkaParams.asScala.toMap
+    val pairs = createRDD[K, V, KD, VD](jsc.sc, scalaParams, offsetRanges)
+    new JavaPairRDD(pairs)
+  }
+
+  /**
+   * Create an RDD from Kafka using offset ranges for each topic and partition. This allows you
+   * to specify the Kafka leader to connect to (to optimize fetching) and access the message as
+   * well as the metadata.
+   *
+   * @param jsc JavaSparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+   *    host1:port1,host2:port2 form.
+   * @param offsetRanges Each OffsetRange in the batch corresponds to a
+   *   range of offsets for a given Kafka topic/partition
+   * @param leaders Kafka brokers for each TopicAndPartition in offsetRanges.  
May be an empty map,
+   *   in which case leaders will be looked up on the driver.
+   * @param messageHandler Function for translating each message and metadata 
into the desired type
+   * @tparam K type of Kafka message key
+   * @tparam V type of Kafka message value
+   * @tparam KD type of Kafka message key decoder
+   * @tparam VD type of Kafka message value decoder
+   * @tparam R type returned by messageHandler
+   * @return RDD of R
+   */
+  def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
+      jsc: JavaSparkContext,
+      keyClass: Class[K],
+      valueClass: Class[V],
+      keyDecoderClass: Class[KD],
+      valueDecoderClass: Class[VD],
+      recordClass: Class[R],
+      kafkaParams: JMap[String, String],
+      offsetRanges: Array[OffsetRange],
+      leaders: JMap[TopicAndPartition, Broker],
+      messageHandler: JFunction[MessageAndMetadata[K, V], R]
+    ): JavaRDD[R] = jsc.sc.withScope {
+    // The Scala overload requires ClassTags; build them from the Class objects passed in.
+    implicit val keyTag: ClassTag[K] = ClassTag(keyClass)
+    implicit val valueTag: ClassTag[V] = ClassTag(valueClass)
+    implicit val keyDecoderTag: ClassTag[KD] = ClassTag(keyDecoderClass)
+    implicit val valueDecoderTag: ClassTag[VD] = ClassTag(valueDecoderClass)
+    implicit val recordTag: ClassTag[R] = ClassTag(recordClass)
+    val leaderMap: Map[TopicAndPartition, Broker] = leaders.asScala.toMap
+    val scalaParams: Map[String, String] = kafkaParams.asScala.toMap
+    // Adapt the Java function object to the Scala function type the overload expects.
+    val handler: MessageAndMetadata[K, V] => R = mmd => messageHandler.call(mmd)
+    createRDD[K, V, KD, VD, R](jsc.sc, scalaParams, offsetRanges, leaderMap, handler)
+  }
+
+  /**
+   * Create an input stream that directly pulls messages from Kafka Brokers
+   * without using any receiver. This stream can guarantee that each message
+   * from Kafka is included in transformations exactly once (see points below).
+   *
+   * Points to note:
+   *  - No receivers: This stream does not use any receiver. It directly 
queries Kafka
+   *  - Offsets: This does not use Zookeeper to store offsets. The consumed 
offsets are tracked
+   *    by the stream itself. For interoperability with Kafka monitoring tools 
that depend on
+   *    Zookeeper, you have to update Kafka/Zookeeper yourself from the 
streaming application.
+   *    You can access the offsets used in each batch from the generated RDDs 
(see
+   *    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
+   *  - Failure Recovery: To recover from driver failures, you have to enable 
checkpointing
+   *    in the [[StreamingContext]]. The information on consumed offset can be
+   *    recovered from the checkpoint. See the programming guide for details 
(constraints, etc.).
+   *  - End-to-end semantics: This stream ensures that every record is effectively received and
+   *    transformed exactly once, but gives no guarantees on whether the 
transformed data are
+   *    outputted exactly once. For end-to-end exactly-once semantics, you 
have to either ensure
+   *    that the output operation is idempotent, or use transactions to output 
records atomically.
+   *    See the programming guide for more details.
+   *
+   * @param ssc StreamingContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
+   *    host1:port1,host2:port2 form.
+   * @param fromOffsets Per-topic/partition Kafka offsets defining the 
(inclusive)
+   *    starting point of the stream
+   * @param messageHandler Function for translating each message and metadata 
into the desired type
+   * @tparam K type of Kafka message key
+   * @tparam V type of Kafka message value
+   * @tparam KD type of Kafka message key decoder
+   * @tparam VD type of Kafka message value decoder
+   * @tparam R type returned by messageHandler
+   * @return DStream of R
+   */
+  def createDirectStream[
+    K: ClassTag,
+    V: ClassTag,
+    KD <: Decoder[K]: ClassTag,
+    VD <: Decoder[V]: ClassTag,
+    R: ClassTag] (
+      ssc: StreamingContext,
+      kafkaParams: Map[String, String],
+      fromOffsets: Map[TopicAndPartition, Long],
+      messageHandler: MessageAndMetadata[K, V] => R
+  ): InputDStream[R] = {
+    // The handler is passed through the SparkContext's clean() before it is handed to the
+    // stream.
+    new DirectKafkaInputDStream[K, V, KD, VD, R](
+      ssc,
+      kafkaParams,
+      fromOffsets,
+      ssc.sc.clean(messageHandler))
+  }
+
+  /**
+   * Create an input stream that directly pulls messages from Kafka Brokers
+   * without using any receiver. This stream can guarantee that each message
+   * from Kafka is included in transformations exactly once (see points below).
+   *
+   * Points to note:
+   *  - No receivers: This stream does not use any receiver. It directly 
queries Kafka
+   *  - Offsets: This does not use Zookeeper to store offsets. The consumed 
offsets are tracked
+   *    by the stream itself. For interoperability with Kafka monitoring tools 
that depend on
+   *    Zookeeper, you have to update Kafka/Zookeeper yourself from the 
streaming application.
+   *    You can access the offsets used in each batch from the generated RDDs 
(see
+   *    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
+   *  - Failure Recovery: To recover from driver failures, you have to enable 
checkpointing
+   *    in the [[StreamingContext]]. The information on consumed offset can be
+   *    recovered from the checkpoint. See the programming guide for details 
(constraints, etc.).
+   *  - End-to-end semantics: This stream ensures that every record is effectively received and
+   *    transformed exactly once, but gives no guarantees on whether the 
transformed data are
+   *    outputted exactly once. For end-to-end exactly-once semantics, you 
have to either ensure
+   *    that the output operation is idempotent, or use transactions to output 
records atomically.
+   *    See the programming guide for more details.
+   *
+   * @param ssc StreamingContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   *   configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+   *   to be set with Kafka broker(s) (NOT zookeeper servers), specified in
+   *   host1:port1,host2:port2 form.
+   *   If not starting from a checkpoint, "auto.offset.reset" may be set to 
"largest" or "smallest"
+   *   to determine where the stream starts (defaults to "largest")
+   * @param topics Names of the topics to consume
+   * @tparam K type of Kafka message key
+   * @tparam V type of Kafka message value
+   * @tparam KD type of Kafka message key decoder
+   * @tparam VD type of Kafka message value decoder
+   * @return DStream of (Kafka message key, Kafka message value)
+   */
+  def createDirectStream[
+    K: ClassTag,
+    V: ClassTag,
+    KD <: Decoder[K]: ClassTag,
+    VD <: Decoder[V]: ClassTag] (
+      ssc: StreamingContext,
+      kafkaParams: Map[String, String],
+      topics: Set[String]
+  ): InputDStream[(K, V)] = {
+    // Starting offsets are resolved up front by getFromOffsets, which consults
+    // "auto.offset.reset" in kafkaParams.
+    val cluster = new KafkaCluster(kafkaParams)
+    val startingOffsets = getFromOffsets(cluster, kafkaParams, topics)
+    // Each record is exposed as a plain (key, message) pair.
+    val toKeyValue: MessageAndMetadata[K, V] => (K, V) = mmd => (mmd.key, mmd.message)
+    new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
+      ssc, kafkaParams, startingOffsets, toKeyValue)
+  }
+
+  /**
+   * Create an input stream that directly pulls messages from Kafka Brokers
+   * without using any receiver. This stream can guarantee that each message
+   * from Kafka is included in transformations exactly once (see points below).
+   *
+   * Points to note:
+   *  - No receivers: This stream does not use any receiver. It directly 
queries Kafka
+   *  - Offsets: This does not use Zookeeper to store offsets. The consumed 
offsets are tracked
+   *    by the stream itself. For interoperability with Kafka monitoring tools 
that depend on
+   *    Zookeeper, you have to update Kafka/Zookeeper yourself from the 
streaming application.
+   *    You can access the offsets used in each batch from the generated RDDs 
(see
+   *    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
+   *  - Failure Recovery: To recover from driver failures, you have to enable 
checkpointing
+   *    in the [[StreamingContext]]. The information on consumed offset can be
+   *    recovered from the checkpoint. See the programming guide for details 
(constraints, etc.).
+   *  - End-to-end semantics: This stream ensures that every record is effectively received and
+   *    transformed exactly once, but gives no guarantees on whether the 
transformed data are
+   *    outputted exactly once. For end-to-end exactly-once semantics, you 
have to either ensure
+   *    that the output operation is idempotent, or use transactions to output 
records atomically.
+   *    See the programming guide for more details.
+   *
+   * @param jssc JavaStreamingContext object
+   * @param keyClass Class of the keys in the Kafka records
+   * @param valueClass Class of the values in the Kafka records
+   * @param keyDecoderClass Class of the key decoder
+   * @param valueDecoderClass Class of the value decoder
+   * @param recordClass Class of the records in DStream
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   *   configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+   *   to be set with Kafka broker(s) (NOT zookeeper servers), specified in
+   *   host1:port1,host2:port2 form.
+   * @param fromOffsets Per-topic/partition Kafka offsets defining the 
(inclusive)
+   *    starting point of the stream
+   * @param messageHandler Function for translating each message and metadata 
into the desired type
+   * @tparam K type of Kafka message key
+   * @tparam V type of Kafka message value
+   * @tparam KD type of Kafka message key decoder
+   * @tparam VD type of Kafka message value decoder
+   * @tparam R type returned by messageHandler
+   * @return DStream of R
+   */
+  def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
+      jssc: JavaStreamingContext,
+      keyClass: Class[K],
+      valueClass: Class[V],
+      keyDecoderClass: Class[KD],
+      valueDecoderClass: Class[VD],
+      recordClass: Class[R],
+      kafkaParams: JMap[String, String],
+      fromOffsets: JMap[TopicAndPartition, JLong],
+      messageHandler: JFunction[MessageAndMetadata[K, V], R]
+    ): JavaInputDStream[R] = {
+    // The Scala overload requires ClassTags; build them from the Class objects passed in.
+    implicit val keyTag: ClassTag[K] = ClassTag(keyClass)
+    implicit val valueTag: ClassTag[V] = ClassTag(valueClass)
+    implicit val keyDecoderTag: ClassTag[KD] = ClassTag(keyDecoderClass)
+    implicit val valueDecoderTag: ClassTag[VD] = ClassTag(valueDecoderClass)
+    implicit val recordTag: ClassTag[R] = ClassTag(recordClass)
+    // Adapt the Java function object to a Scala function and clean it.
+    val handler = jssc.sparkContext.clean(
+      (mmd: MessageAndMetadata[K, V]) => messageHandler.call(mmd))
+    // Convert the Java collections to immutable Scala maps, unboxing the offsets.
+    val scalaParams: Map[String, String] = kafkaParams.asScala.toMap
+    val startingOffsets: Map[TopicAndPartition, Long] =
+      fromOffsets.asScala.map { case (tp, offset) => tp -> offset.longValue() }.toMap
+    createDirectStream[K, V, KD, VD, R](jssc.ssc, scalaParams, startingOffsets, handler)
+  }
+
+  /**
+   * Create an input stream that directly pulls messages from Kafka Brokers
+   * without using any receiver. This stream can guarantee that each message
+   * from Kafka is included in transformations exactly once (see points below).
+   *
+   * Points to note:
+   *  - No receivers: This stream does not use any receiver. It directly 
queries Kafka
+   *  - Offsets: This does not use Zookeeper to store offsets. The consumed 
offsets are tracked
+   *    by the stream itself. For interoperability with Kafka monitoring tools 
that depend on
+   *    Zookeeper, you have to update Kafka/Zookeeper yourself from the 
streaming application.
+   *    You can access the offsets used in each batch from the generated RDDs 
(see
+   *    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
+   *  - Failure Recovery: To recover from driver failures, you have to enable 
checkpointing
+   *    in the [[StreamingContext]]. The information on consumed offset can be
+   *    recovered from the checkpoint. See the programming guide for details 
(constraints, etc.).
+   *  - End-to-end semantics: This stream ensures that every record is effectively received and
+   *    transformed exactly once, but gives no guarantees on whether the 
transformed data are
+   *    outputted exactly once. For end-to-end exactly-once semantics, you 
have to either ensure
+   *    that the output operation is idempotent, or use transactions to output 
records atomically.
+   *    See the programming guide for more details.
+   *
+   * @param jssc JavaStreamingContext object
+   * @param keyClass Class of the keys in the Kafka records
+   * @param valueClass Class of the values in the Kafka records
+   * @param keyDecoderClass Class of the key decoder
+   * @param valueDecoderClass Class type of the value decoder
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   *   configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
+   *   to be set with Kafka broker(s) (NOT zookeeper servers), specified in
+   *   host1:port1,host2:port2 form.
+   *   If not starting from a checkpoint, "auto.offset.reset" may be set to 
"largest" or "smallest"
+   *   to determine where the stream starts (defaults to "largest")
+   * @param topics Names of the topics to consume
+   * @tparam K type of Kafka message key
+   * @tparam V type of Kafka message value
+   * @tparam KD type of Kafka message key decoder
+   * @tparam VD type of Kafka message value decoder
+   * @return DStream of (Kafka message key, Kafka message value)
+   */
+  def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V]](
+      jssc: JavaStreamingContext,
+      keyClass: Class[K],
+      valueClass: Class[V],
+      keyDecoderClass: Class[KD],
+      valueDecoderClass: Class[VD],
+      kafkaParams: JMap[String, String],
+      topics: JSet[String]
+    ): JavaPairInputDStream[K, V] = {
+    // The Scala overload requires ClassTags; build them from the Class objects passed in.
+    implicit val keyTag: ClassTag[K] = ClassTag(keyClass)
+    implicit val valueTag: ClassTag[V] = ClassTag(valueClass)
+    implicit val keyDecoderTag: ClassTag[KD] = ClassTag(keyDecoderClass)
+    implicit val valueDecoderTag: ClassTag[VD] = ClassTag(valueDecoderClass)
+    // Convert the Java collections to their immutable Scala counterparts before delegating.
+    val scalaParams: Map[String, String] = kafkaParams.asScala.toMap
+    val topicSet: Set[String] = topics.asScala.toSet
+    createDirectStream[K, V, KD, VD](jssc.ssc, scalaParams, topicSet)
+  }
+}
+
+/**
+ * This is a helper class that wraps the KafkaUtils.createStream() into more
+ * Python-friendly class and function so that it can be easily
+ * instantiated and called from Python's KafkaUtils.
+ *
+ * The zero-arg constructor helps instantiate this class from the Class object
+ * classOf[KafkaUtilsPythonHelper].newInstance(), and the createStream()
+ * takes care of known parameters instead of passing them from Python
+ */
+private[kafka] class KafkaUtilsPythonHelper {
+  import KafkaUtilsPythonHelper._
+
+  def createStream(
+      jssc: JavaStreamingContext,
+      kafkaParams: JMap[String, String],
+      topics: JMap[String, JInt],
+      storageLevel: StorageLevel): JavaPairReceiverInputDStream[Array[Byte], Array[Byte]] = {
+    // Keys and values are both raw byte arrays decoded with DefaultDecoder, so the type and
+    // decoder classes are fixed here and callers do not need to supply them.
+    val bytesClass = classOf[Array[Byte]]
+    val decoderClass = classOf[DefaultDecoder]
+    KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
+      jssc, bytesClass, bytesClass, decoderClass, decoderClass, kafkaParams, topics, storageLevel)
+  }
+
+  def createRDDWithoutMessageHandler(
+      jsc: JavaSparkContext,
+      kafkaParams: JMap[String, String],
+      offsetRanges: JList[OffsetRange],
+      leaders: JMap[TopicAndPartition, Broker]): JavaRDD[(Array[Byte], Array[Byte])] = {
+    // Each record is exposed as a (key bytes, message bytes) pair.
+    val keyAndValue: MessageAndMetadata[Array[Byte], Array[Byte]] => (Array[Byte], Array[Byte]) =
+      mmd => (mmd.key, mmd.message)
+    val pairs = createRDD(jsc, kafkaParams, offsetRanges, leaders, keyAndValue)
+    new JavaRDD(pairs)
+  }
+
+  /**
+   * Builds a Kafka RDD whose records are wrapped as PythonMessageAndMetadata (topic, partition,
+   * offset, key, message) and then pickled per partition through picklerIterator.
+   */
+  def createRDDWithMessageHandler(
+      jsc: JavaSparkContext,
+      kafkaParams: JMap[String, String],
+      offsetRanges: JList[OffsetRange],
+      leaders: JMap[TopicAndPartition, Broker]): JavaRDD[Array[Byte]] = {
+    val toPythonRecord = { record: MessageAndMetadata[Array[Byte], Array[Byte]] =>
+      new PythonMessageAndMetadata(
+        record.topic, record.partition, record.offset, record.key(), record.message())
+    }
+    val kafkaRdd = createRDD(jsc, kafkaParams, offsetRanges, leaders, toPythonRecord)
+    new JavaRDD(kafkaRdd.mapPartitions(picklerIterator))
+  }
+
+  /**
+   * Shared implementation of the two public createRDD* helpers: converts the Java collections
+   * to Scala ones and delegates to KafkaUtils.createRDD with DefaultDecoder for key and value.
+   */
+  private def createRDD[V: ClassTag](
+      jsc: JavaSparkContext,
+      kafkaParams: JMap[String, String],
+      offsetRanges: JList[OffsetRange],
+      leaders: JMap[TopicAndPartition, Broker],
+      messageHandler: MessageAndMetadata[Array[Byte], Array[Byte]] => V): RDD[V] = {
+    val scalaParams = kafkaParams.asScala.toMap
+    val ranges = offsetRanges.asScala.toArray
+    val scalaLeaders = leaders.asScala.toMap
+    KafkaUtils.createRDD[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, V](
+      jsc.sc, scalaParams, ranges, scalaLeaders, messageHandler)
+  }
+
+  /** Builds a direct stream whose elements are the (key, message) byte-array pairs. */
+  def createDirectStreamWithoutMessageHandler(
+      jssc: JavaStreamingContext,
+      kafkaParams: JMap[String, String],
+      topics: JSet[String],
+      fromOffsets: JMap[TopicAndPartition, JLong]): JavaDStream[(Array[Byte], Array[Byte])] = {
+    val keyAndMessage = { record: MessageAndMetadata[Array[Byte], Array[Byte]] =>
+      (record.key, record.message)
+    }
+    val stream = createDirectStream(jssc, kafkaParams, topics, fromOffsets, keyAndMessage)
+    new JavaDStream(stream)
+  }
+
+  /**
+   * Builds a direct stream whose records are wrapped as PythonMessageAndMetadata and then
+   * pickled per partition through picklerIterator.
+   */
+  def createDirectStreamWithMessageHandler(
+      jssc: JavaStreamingContext,
+      kafkaParams: JMap[String, String],
+      topics: JSet[String],
+      fromOffsets: JMap[TopicAndPartition, JLong]): JavaDStream[Array[Byte]] = {
+    val toPythonRecord = { record: MessageAndMetadata[Array[Byte], Array[Byte]] =>
+      new PythonMessageAndMetadata(
+        record.topic, record.partition, record.offset, record.key(), record.message())
+    }
+    val records = createDirectStream(jssc, kafkaParams, topics, fromOffsets, toPythonRecord)
+    new JavaDStream(records.mapPartitions(picklerIterator))
+  }
+
+  /**
+   * Shared implementation of the two public createDirectStream* helpers.
+   *
+   * When `fromOffsets` is non-empty its topics must equal `topics` (otherwise an
+   * IllegalStateException is thrown) and those offsets are used as-is. When it is empty the
+   * starting offsets are obtained from KafkaUtils.getFromOffsets.
+   */
+  private def createDirectStream[V: ClassTag](
+      jssc: JavaStreamingContext,
+      kafkaParams: JMap[String, String],
+      topics: JSet[String],
+      fromOffsets: JMap[TopicAndPartition, JLong],
+      messageHandler: MessageAndMetadata[Array[Byte], Array[Byte]] => V): DStream[V] = {
+
+    val currentFromOffsets: Map[TopicAndPartition, Long] =
+      if (fromOffsets.isEmpty) {
+        // No explicit offsets supplied: let KafkaUtils.getFromOffsets work them out.
+        val params = kafkaParams.asScala.toMap
+        KafkaUtils.getFromOffsets(new KafkaCluster(params), params, topics.asScala.toSet)
+      } else {
+        val requestedTopics = topics.asScala.toSet
+        val topicsFromOffsets = fromOffsets.keySet().asScala.map(_.topic)
+        if (topicsFromOffsets != requestedTopics) {
+          throw new IllegalStateException(
+            s"The specified topics: ${requestedTopics.mkString(" ")} " +
+            s"do not equal to the topic from offsets: ${topicsFromOffsets.mkString(" ")}")
+        }
+        // Unbox the java.lang.Long offsets into an immutable Scala map.
+        fromOffsets.asScala.map { case (tp, offset) => tp -> offset.longValue() }.toMap
+      }
+
+    KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, V](
+      jssc.ssc, kafkaParams.asScala.toMap, currentFromOffsets, messageHandler)
+  }
+
+  /**
+   * Builds an OffsetRange via OffsetRange.create from boxed Java arguments.
+   * NOTE(review): the boxed values are passed where primitives are expected, so a null
+   * argument presumably fails on unboxing - confirm callers never pass null.
+   */
+  def createOffsetRange(topic: String, partition: JInt, fromOffset: JLong, 
untilOffset: JLong
+    ): OffsetRange = OffsetRange.create(topic, partition, fromOffset, 
untilOffset)
+
+  /** Builds a Kafka TopicAndPartition from a topic name and a boxed partition id. */
+  def createTopicAndPartition(topic: String, partition: JInt): 
TopicAndPartition =
+    TopicAndPartition(topic, partition)
+
+  /** Builds a Broker from a host name and a boxed port number. */
+  def createBroker(host: String, port: JInt): Broker = Broker(host, port)
+
+  /**
+   * Returns the offset ranges of the single KafkaRDD found among the narrow ancestors of `rdd`.
+   *
+   * @param rdd RDD whose narrow ancestors (as reported by `getNarrowAncestors`) are searched
+   * @return the offset ranges of that KafkaRDD as a Java list
+   * @throws IllegalArgumentException (from `require`) unless exactly one KafkaRDD is found
+   */
+  def offsetRangesOfKafkaRDD(rdd: RDD[_]): JList[OffsetRange] = {
+    // Keep only the ancestors that are KafkaRDDs; collect filters and narrows the type at once.
+    val kafkaRDDs = rdd.getNarrowAncestors.collect {
+      case kafkaRDD: KafkaRDD[_, _, _, _, _] => kafkaRDD
+    }
+
+    require(
+      kafkaRDDs.length == 1,
+      "Cannot get offset ranges, as there may be multiple Kafka RDDs or no Kafka RDD associated" +
+        " with this RDD, please call this method only on a Kafka RDD.")
+
+    kafkaRDDs.head.offsetRanges.toSeq.asJava
+  }
+}
+
+/**
+ * Companion of the KafkaUtilsPythonHelper class. Holds the one-time pickler registration, the
+ * per-partition pickling helper and the record type that is handed over to Python.
+ */
+private object KafkaUtilsPythonHelper {
+  // Set once the custom pickler has been registered; read and written under `synchronized`.
+  private var initialized = false
+
+  /**
+   * Calls SerDeUtil.initialize() and, the first time only, registers the
+   * PythonMessageAndMetadataPickler. Later calls skip the registration.
+   */
+  def initialize(): Unit = {
+    SerDeUtil.initialize()
+    synchronized {
+      if (!initialized) {
+        new PythonMessageAndMetadataPickler().register()
+        initialized = true
+      }
+    }
+  }
+
+  // Run the registration as part of this object's own initialization.
+  initialize()
+
+  /** Wraps `iter` in a SerDeUtil.AutoBatchedPickler, yielding pickled byte arrays. */
+  def picklerIterator(iter: Iterator[Any]): Iterator[Array[Byte]] = {
+    new SerDeUtil.AutoBatchedPickler(iter)
+  }
+
+  /** One Kafka record plus its topic, partition and offset, in the form pickled for Python. */
+  case class PythonMessageAndMetadata(
+      topic: String,
+      partition: JInt,
+      offset: JLong,
+      key: Array[Byte],
+      message: Array[Byte])
+
+  /** Custom pickler that writes PythonMessageAndMetadata values into the pickle stream. */
+  class PythonMessageAndMetadataPickler extends IObjectPickler {
+    // Python module named in the GLOBAL opcode written by pickle().
+    private val module = "pyspark.streaming.kafka"
+
+    /** Registers this instance as the pickler for PythonMessageAndMetadata and for itself. */
+    def register(): Unit = {
+      Pickler.registerCustomPickler(classOf[PythonMessageAndMetadata], this)
+      Pickler.registerCustomPickler(this.getClass, this)
+    }
+
+    /**
+     * Writes `obj` to `out`.
+     *
+     * When `obj` is this pickler, only a GLOBAL opcode naming
+     * `pyspark.streaming.kafka.KafkaMessageAndMetadata` is written. Otherwise the pickler
+     * saves itself first (which produces that GLOBAL reference), then a MARK-delimited tuple
+     * of topic, partition, offset, key and message, followed by REDUCE.
+     * NOTE(review): presumably this makes Python rebuild the record by calling
+     * KafkaMessageAndMetadata with the five fields - confirm against the Python side.
+     */
+    def pickle(obj: Object, out: OutputStream, pickler: Pickler) {
+      if (obj == this) {
+        out.write(Opcodes.GLOBAL)
+        
out.write(s"$module\nKafkaMessageAndMetadata\n".getBytes(StandardCharsets.UTF_8))
+      } else {
+        pickler.save(this)
+        val msgAndMetaData = obj.asInstanceOf[PythonMessageAndMetadata]
+        out.write(Opcodes.MARK)
+        pickler.save(msgAndMetaData.topic)
+        pickler.save(msgAndMetaData.partition)
+        pickler.save(msgAndMetaData.offset)
+        pickler.save(msgAndMetaData.key)
+        pickler.save(msgAndMetaData.message)
+        out.write(Opcodes.TUPLE)
+        out.write(Opcodes.REDUCE)
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/89e67d66/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
new file mode 100644
index 0000000..d9b856e
--- /dev/null
+++ 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import kafka.common.TopicAndPartition
+
+/**
+ * Represents any object that has a collection of [[OffsetRange]]s. This can be used to access
+ * the offset ranges in RDDs generated by the direct Kafka DStream (see
+ * [[KafkaUtils.createDirectStream()]]).
+ * {{{
+ *   KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
+ *      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+ *      ...
+ *   }
+ * }}}
+ */
+trait HasOffsetRanges {
+  /** The offset ranges associated with this object. */
+  def offsetRanges: Array[OffsetRange]
+}
+
+/**
+ * A span of offsets within a single Kafka TopicAndPartition. The constructor is private;
+ * create instances with `OffsetRange.create()`.
+ * @param topic Kafka topic name
+ * @param partition Kafka partition id
+ * @param fromOffset Inclusive starting offset
+ * @param untilOffset Exclusive ending offset
+ */
+final class OffsetRange private(
+    val topic: String,
+    val partition: Int,
+    val fromOffset: Long,
+    val untilOffset: Long) extends Serializable {
+  import OffsetRange.OffsetRangeTuple
+
+  /** Kafka TopicAndPartition object, for convenience */
+  def topicAndPartition(): TopicAndPartition = TopicAndPartition(topic, partition)
+
+  /** Number of messages this OffsetRange refers to */
+  def count(): Long = untilOffset - fromOffset
+
+  /** Two ranges are equal when topic, partition and both offsets are equal. */
+  override def equals(obj: Any): Boolean = obj match {
+    case other: OffsetRange => toTuple == other.toTuple
+    case _ => false
+  }
+
+  /** Hash of the same four fields that `equals` compares. */
+  override def hashCode(): Int = toTuple.hashCode()
+
+  override def toString(): String =
+    s"OffsetRange(topic: '$topic', partition: $partition, range: [$fromOffset -> $untilOffset])"
+
+  /** this is to avoid ClassNotFoundException during checkpoint restore */
+  private[streaming]
+  def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, untilOffset)
+}
+
+/**
+ * Companion object that provides methods to create instances of [[OffsetRange]].
+ */
+object OffsetRange {
+  /** Creates a range from an explicit topic name and partition id. */
+  def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
+    new OffsetRange(topic, partition, fromOffset, untilOffset)
+
+  /** Creates a range from a Kafka TopicAndPartition. */
+  def create(
+      topicAndPartition: TopicAndPartition,
+      fromOffset: Long,
+      untilOffset: Long): OffsetRange =
+    new OffsetRange(topicAndPartition.topic, topicAndPartition.partition, fromOffset, untilOffset)
+
+  /** Same as the matching `create` overload, usable as `OffsetRange(...)`. */
+  def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
+    new OffsetRange(topic, partition, fromOffset, untilOffset)
+
+  /** Same as the matching `create` overload, usable as `OffsetRange(...)`. */
+  def apply(
+      topicAndPartition: TopicAndPartition,
+      fromOffset: Long,
+      untilOffset: Long): OffsetRange =
+    new OffsetRange(topicAndPartition.topic, topicAndPartition.partition, fromOffset, untilOffset)
+
+  /** this is to avoid ClassNotFoundException during checkpoint restore */
+  private[kafka]
+  type OffsetRangeTuple = (String, Int, Long, Long)
+
+  /** Rebuilds a range from its tuple form (topic, partition, fromOffset, untilOffset). */
+  private[kafka]
+  def apply(t: OffsetRangeTuple): OffsetRange =
+    new OffsetRange(t._1, t._2, t._3, t._4)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/89e67d66/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
new file mode 100644
index 0000000..39abe3c
--- /dev/null
+++ 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.util.Properties
+import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor}
+
+import scala.collection.{mutable, Map}
+import scala.reflect.{classTag, ClassTag}
+
+import kafka.common.TopicAndPartition
+import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, 
KafkaStream}
+import kafka.message.MessageAndMetadata
+import kafka.serializer.Decoder
+import kafka.utils.{VerifiableProperties, ZKGroupTopicDirs, 
ZKStringSerializer, ZkUtils}
+import org.I0Itec.zkclient.ZkClient
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.{StorageLevel, StreamBlockId}
+import org.apache.spark.streaming.receiver.{BlockGenerator, 
BlockGeneratorListener, Receiver}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * ReliableKafkaReceiver offers the ability to reliably store data into 
BlockManager without loss.
+ * It is turned off by default and will be enabled when
+ * spark.streaming.receiver.writeAheadLog.enable is true. The difference 
compared to KafkaReceiver
+ * is that this receiver manages topic-partition/offset itself and updates the 
offset information
+ * after data is reliably stored as write-ahead log. Offsets will only be 
updated when data is
+ * reliably stored, so the potential data loss problem of KafkaReceiver can be 
eliminated.
+ *
+ * Note: ReliableKafkaReceiver will set auto.commit.enable to false to turn 
off automatic offset
+ * commit mechanism in Kafka consumer. So setting this configuration manually 
within kafkaParams
+ * will not take effect.
+ */
+private[streaming]
+class ReliableKafkaReceiver[
+  K: ClassTag,
+  V: ClassTag,
+  U <: Decoder[_]: ClassTag,
+  T <: Decoder[_]: ClassTag](
+    kafkaParams: Map[String, String],
+    topics: Map[String, Int],
+    storageLevel: StorageLevel)
+    extends Receiver[(K, V)](storageLevel) with Logging {
+
+  private val groupId = kafkaParams("group.id")
+  private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
+  private def conf = SparkEnv.get.conf
+
+  /** High level consumer to connect to Kafka. */
+  private var consumerConnector: ConsumerConnector = null
+
+  /** zkClient to connect to Zookeeper to commit the offsets. */
+  private var zkClient: ZkClient = null
+
+  /**
+   * A HashMap to manage the offset for each topic/partition, this HashMap is 
called in
+   * synchronized block, so mutable HashMap will not meet concurrency issue.
+   */
+  private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, 
Long] = null
+
+  /** A concurrent HashMap to store the stream block id and related offset 
snapshot. */
+  private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, 
Map[TopicAndPartition, Long]] = null
+
+  /**
+   * Manage the BlockGenerator in receiver itself for better managing block 
store and offset
+   * commit.
+   */
+  private var blockGenerator: BlockGenerator = null
+
+  /** Thread pool running the handlers for receiving message from multiple 
topics and partitions. */
+  private var messageHandlerThreadPool: ThreadPoolExecutor = null
+
+  /**
+   * Sets up the receiver: creates the offset maps and the block generator, builds a consumer
+   * configuration with auto commit forced off, connects the consumer and the Zookeeper
+   * client, and submits one MessageHandler per Kafka stream to a daemon thread pool.
+   */
+  override def onStart(): Unit = {
+    logInfo(s"Starting Kafka Consumer Stream with group: $groupId")
+
+    // Initialize the topic-partition / offset hash map.
+    topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]
+
+    // Initialize the stream block id / offset snapshot hash map.
+    blockOffsetMap = new ConcurrentHashMap[StreamBlockId, 
Map[TopicAndPartition, Long]]()
+
+    // Initialize the block generator for storing Kafka message.
+    blockGenerator = supervisor.createBlockGenerator(new GeneratedBlockHandler)
+
+    // Only warn here; the property is overridden unconditionally further down.
+    if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && 
kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
+      logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in 
ReliableKafkaReceiver, " +
+        "otherwise we will manually set it to false to turn off auto offset 
commit in Kafka")
+    }
+
+    val props = new Properties()
+    kafkaParams.foreach(param => props.put(param._1, param._2))
+    // Manually set "auto.commit.enable" to "false" no matter user explicitly set it to true,
+    // we have to make sure this property is set to false to turn off auto commit mechanism in
+    // Kafka.
+    props.setProperty(AUTO_OFFSET_COMMIT, "false")
+
+    val consumerConfig = new ConsumerConfig(props)
+
+    assert(!consumerConfig.autoCommitEnable)
+
+    logInfo(s"Connecting to Zookeeper: ${consumerConfig.zkConnect}")
+    consumerConnector = Consumer.create(consumerConfig)
+    logInfo(s"Connected to Zookeeper: ${consumerConfig.zkConnect}")
+
+    // Separate Zookeeper client, used by commitOffset to write offsets.
+    zkClient = new ZkClient(consumerConfig.zkConnect, 
consumerConfig.zkSessionTimeoutMs,
+      consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
+
+    // Pool size is the sum of the per-topic counts given in `topics`.
+    messageHandlerThreadPool = ThreadUtils.newDaemonFixedThreadPool(
+      topics.values.sum, "KafkaMessageHandler")
+
+    blockGenerator.start()
+
+    // Instantiate the decoders reflectively through their VerifiableProperties constructor.
+    val keyDecoder = 
classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
+      .newInstance(consumerConfig.props)
+      .asInstanceOf[Decoder[K]]
+
+    val valueDecoder = 
classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
+      .newInstance(consumerConfig.props)
+      .asInstanceOf[Decoder[V]]
+
+    val topicMessageStreams = consumerConnector.createMessageStreams(
+      topics, keyDecoder, valueDecoder)
+
+    // One handler task per stream returned by the consumer.
+    topicMessageStreams.values.foreach { streams =>
+      streams.foreach { stream =>
+        messageHandlerThreadPool.submit(new MessageHandler(stream))
+      }
+    }
+  }
+
+  /**
+   * Shuts down or clears every resource created in onStart() that is still set, then resets
+   * the corresponding field to null. Safe to call when some fields were never initialized.
+   */
+  override def onStop(): Unit = {
+    Option(messageHandlerThreadPool).foreach(_.shutdown())
+    messageHandlerThreadPool = null
+
+    Option(consumerConnector).foreach(_.shutdown())
+    consumerConnector = null
+
+    Option(zkClient).foreach(_.close())
+    zkClient = null
+
+    Option(blockGenerator).foreach(_.stop())
+    blockGenerator = null
+
+    Option(topicPartitionOffsetMap).foreach(_.clear())
+    topicPartitionOffsetMap = null
+
+    Option(blockOffsetMap).foreach(_.clear())
+    blockOffsetMap = null
+  }
+
+  /**
+   * Hand one Kafka record to the block generator: the (key, message) pair is the data and
+   * the (topic-partition, offset) pair travels along as callback metadata.
+   */
+  private def storeMessageAndMetadata(
+      msgAndMetadata: MessageAndMetadata[K, V]): Unit = {
+    val partition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition)
+    val keyAndMessage = (msgAndMetadata.key, msgAndMetadata.message)
+    blockGenerator.addDataWithCallback(keyAndMessage, (partition, msgAndMetadata.offset))
+  }
+
+  /** Record `offset` as the current offset of `topicAndPartition`, replacing any older one. */
+  private def updateOffset(topicAndPartition: TopicAndPartition, offset: Long): Unit = {
+    topicPartitionOffsetMap(topicAndPartition) = offset
+  }
+
+  /**
+   * Called when a block is generated: store an immutable snapshot of the current offsets
+   * under the block id, then clear the working offset map.
+   */
+  private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
+    blockOffsetMap.put(blockId, topicPartitionOffsetMap.toMap)
+    topicPartitionOffsetMap.clear()
+  }
+
+  /**
+   * Store a ready block and, on success, commit the offsets remembered for it to Zookeeper.
+   * Storing is attempted up to four times; if every attempt throws, the receiver is stopped
+   * with the last exception.
+   */
+  private def storeBlockAndCommitOffset(
+      blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
+    val maxAttempts = 4
+    var failures = 0
+    var stored = false
+    var lastError: Exception = null
+    while (!stored && failures < maxAttempts) {
+      try {
+        store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
+        stored = true
+      } catch {
+        case ex: Exception =>
+          failures += 1
+          lastError = ex
+      }
+    }
+    if (!stored) {
+      stop("Error while storing block into Spark", lastError)
+    } else {
+      // The snapshot may be absent; commit only when one was remembered for this block.
+      Option(blockOffsetMap.get(blockId)).foreach(commitOffset)
+      blockOffsetMap.remove(blockId)
+    }
+  }
+
+  /**
+   * Commit the offset of Kafka's topic/partition, the commit mechanism follows Kafka 0.8.x's
+   * metadata schema in Zookeeper.
+   *
+   * If the Zookeeper client is not set the receiver is stopped and nothing is committed.
+   * A failure for one topic/partition is logged as a warning and the remaining entries are
+   * still attempted; the "Committed offset" message is logged only after a successful update.
+   *
+   * @param offsetMap offset to write for each topic/partition
+   */
+  private def commitOffset(offsetMap: Map[TopicAndPartition, Long]): Unit = {
+    if (zkClient == null) {
+      val thrown = new IllegalStateException("Zookeeper client is unexpectedly null")
+      stop("Zookeeper client is not initialized before commit offsets to ZK", thrown)
+    } else {
+      for ((topicAndPart, offset) <- offsetMap) {
+        try {
+          val topicDirs = new ZKGroupTopicDirs(groupId, topicAndPart.topic)
+          val zkPath = s"${topicDirs.consumerOffsetDir}/${topicAndPart.partition}"
+
+          ZkUtils.updatePersistentPath(zkClient, zkPath, offset.toString)
+
+          // Report success only once the update above has returned without throwing.
+          logInfo(s"Committed offset $offset for topic ${topicAndPart.topic}, " +
+            s"partition ${topicAndPart.partition}")
+        } catch {
+          case e: Exception =>
+            logWarning(s"Exception during commit offset $offset for topic " +
+              s"${topicAndPart.topic}, partition ${topicAndPart.partition}", e)
+        }
+      }
+    }
+  }
+
+  /**
+   * Runnable that drains one Kafka stream into the block generator. Until the receiver is
+   * stopped it keeps re-reading the stream, reporting any exception instead of exiting.
+   */
+  private final class MessageHandler(stream: KafkaStream[K, V]) extends Runnable {
+    override def run(): Unit = {
+      while (!isStopped) {
+        try {
+          val messages = stream.iterator()
+          while (messages.hasNext) {
+            storeMessageAndMetadata(messages.next)
+          }
+        } catch {
+          case e: Exception =>
+            reportError("Error handling message", e)
+        }
+      }
+    }
+  }
+
+  /** Listener that ties the block generator's callbacks to this receiver's offset tracking. */
+  private final class GeneratedBlockHandler extends BlockGeneratorListener {
+
+    /** Record the offset carried in `metadata`, if any, for the data just added. */
+    def onAddData(data: Any, metadata: Any): Unit = {
+      Option(metadata).foreach { meta =>
+        val (partition, offset) = meta.asInstanceOf[(TopicAndPartition, Long)]
+        updateOffset(partition, offset)
+      }
+    }
+
+    /** Snapshot the current offsets for the block that has just been generated. */
+    def onGenerateBlock(blockId: StreamBlockId): Unit = {
+      rememberBlockOffsets(blockId)
+    }
+
+    /** Store the block and then commit the offsets remembered for it. */
+    def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
+      storeBlockAndCommitOffset(blockId, arrayBuffer)
+    }
+
+    /** Forward block generator errors to the receiver's error reporting. */
+    def onError(message: String, throwable: Throwable): Unit = {
+      reportError(message, throwable)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/89e67d66/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package-info.java
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package-info.java
 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package-info.java
new file mode 100644
index 0000000..2e5ab0f
--- /dev/null
+++ 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Kafka receiver for spark streaming.
+ */
+package org.apache.spark.streaming.kafka;

http://git-wip-us.apache.org/repos/asf/spark/blob/89e67d66/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package.scala
 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package.scala
new file mode 100644
index 0000000..47c5187
--- /dev/null
+++ 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/package.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+/**
+ * Kafka receiver for Spark Streaming.
+ */
+package object kafka

http://git-wip-us.apache.org/repos/asf/spark/blob/89e67d66/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
 
b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
new file mode 100644
index 0000000..fa6b0db
--- /dev/null
+++ 
b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka;
+
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+
+import scala.Tuple2;
+
+import kafka.common.TopicAndPartition;
+import kafka.message.MessageAndMetadata;
+import kafka.serializer.StringDecoder;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.VoidFunction;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+public class JavaDirectKafkaStreamSuite implements Serializable {
+  private transient JavaStreamingContext ssc = null;
+  private transient KafkaTestUtils kafkaTestUtils = null;
+
+  @Before
+  public void setUp() {
+    // Bring up the KafkaTestUtils fixture before creating the streaming context.
+    kafkaTestUtils = new KafkaTestUtils();
+    kafkaTestUtils.setup();
+
+    SparkConf sparkConf = new SparkConf();
+    sparkConf.setMaster("local[4]");
+    sparkConf.setAppName(this.getClass().getSimpleName());
+    ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(200));
+  }
+
+  // Stops the streaming context and tears down the Kafka fixture, skipping whichever
+  // of the two was never created; both fields are reset to null afterwards.
+  @After
+  public void tearDown() {
+    if (ssc != null) {
+      ssc.stop();
+      ssc = null;
+    }
+
+    if (kafkaTestUtils != null) {
+      kafkaTestUtils.teardown();
+      kafkaTestUtils = null;
+    }
+  }
+
+  // Sends three messages to each of two topics, reads them back through two direct streams
+  // (one created from a topic set, one from explicit offsets plus a message handler), unions
+  // the streams and checks that exactly the sent messages are received.
+  @Test
+  public void testKafkaStream() throws InterruptedException {
+    final String topic1 = "topic1";
+    final String topic2 = "topic2";
+    // hold a reference to the current offset ranges, so it can be used downstream
+    final AtomicReference<OffsetRange[]> offsetRanges = new 
AtomicReference<>();
+
+    String[] topic1data = createTopicAndSendData(topic1);
+    String[] topic2data = createTopicAndSendData(topic2);
+
+    // Everything that was sent; the received set is compared against this at the end.
+    Set<String> sent = new HashSet<>();
+    sent.addAll(Arrays.asList(topic1data));
+    sent.addAll(Arrays.asList(topic2data));
+
+    Map<String, String> kafkaParams = new HashMap<>();
+    kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress());
+    kafkaParams.put("auto.offset.reset", "smallest");
+
+    // Stream 1: created from a topic set; keeps only the message value of each pair.
+    JavaDStream<String> stream1 = KafkaUtils.createDirectStream(
+        ssc,
+        String.class,
+        String.class,
+        StringDecoder.class,
+        StringDecoder.class,
+        kafkaParams,
+        topicToSet(topic1)
+    ).transformToPair(
+        // Make sure you can get offset ranges from the rdd
+        new Function<JavaPairRDD<String, String>, JavaPairRDD<String, 
String>>() {
+          @Override
+          public JavaPairRDD<String, String> call(JavaPairRDD<String, String> 
rdd) {
+            OffsetRange[] offsets = ((HasOffsetRanges) 
rdd.rdd()).offsetRanges();
+            offsetRanges.set(offsets);
+            Assert.assertEquals(topic1, offsets[0].topic());
+            return rdd;
+          }
+        }
+    ).map(
+        new Function<Tuple2<String, String>, String>() {
+          @Override
+          public String call(Tuple2<String, String> kv) {
+            return kv._2();
+          }
+        }
+    );
+
+    // Stream 2: created from explicit starting offsets with a handler returning the message.
+    JavaDStream<String> stream2 = KafkaUtils.createDirectStream(
+        ssc,
+        String.class,
+        String.class,
+        StringDecoder.class,
+        StringDecoder.class,
+        String.class,
+        kafkaParams,
+        topicOffsetToMap(topic2, 0L),
+        new Function<MessageAndMetadata<String, String>, String>() {
+          @Override
+          public String call(MessageAndMetadata<String, String> msgAndMd) {
+            return msgAndMd.message();
+          }
+        }
+    );
+    JavaDStream<String> unifiedStream = stream1.union(stream2);
+
+    // Synchronized because it is filled from foreachRDD and polled by the test thread below.
+    final Set<String> result = Collections.synchronizedSet(new 
HashSet<String>());
+    unifiedStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
+          @Override
+          public void call(JavaRDD<String> rdd) {
+            result.addAll(rdd.collect());
+            for (OffsetRange o : offsetRanges.get()) {
+              System.out.println(
+                o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + 
o.untilOffset()
+              );
+            }
+          }
+        }
+    );
+    ssc.start();
+    // Poll every 50 ms, for at most 20 seconds, until as many results as sent messages arrive.
+    long startTime = System.currentTimeMillis();
+    boolean matches = false;
+    while (!matches && System.currentTimeMillis() - startTime < 20000) {
+      matches = sent.size() == result.size();
+      Thread.sleep(50);
+    }
+    Assert.assertEquals(sent, result);
+    ssc.stop();
+  }
+
+  // Returns a new HashSet containing only the given topic.
+  private static Set<String> topicToSet(String topic) {
+    return new HashSet<>(Collections.singleton(topic));
+  }
+
+  // Returns a new map with one entry: partition 0 of the topic mapped to the given offset.
+  private static Map<TopicAndPartition, Long> topicOffsetToMap(String topic, Long offsetToStart) {
+    TopicAndPartition firstPartition = new TopicAndPartition(topic, 0);
+    Map<TopicAndPartition, Long> startOffsets = new HashMap<>();
+    startOffsets.put(firstPartition, offsetToStart);
+    return startOffsets;
+  }
+
+  // Creates the topic, sends the messages "<topic>-1" .. "<topic>-3" to it and returns them.
+  private  String[] createTopicAndSendData(String topic) {
+    String[] messages = new String[3];
+    for (int i = 0; i < messages.length; i++) {
+      messages[i] = topic + "-" + (i + 1);
+    }
+    kafkaTestUtils.createTopic(topic, 1);
+    kafkaTestUtils.sendMessages(topic, messages);
+    return messages;
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/89e67d66/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
 
b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
new file mode 100644
index 0000000..c41b629
--- /dev/null
+++ 
b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import scala.Tuple2;
+
+import kafka.common.TopicAndPartition;
+import kafka.message.MessageAndMetadata;
+import kafka.serializer.StringDecoder;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+
+/**
+ * Smoke test for the Java-facing {@code KafkaUtils.createRDD} overloads, run against the Kafka
+ * test fixture provided by {@link KafkaTestUtils}.
+ */
+public class JavaKafkaRDDSuite implements Serializable {
+  private transient JavaSparkContext sc = null;
+  private transient KafkaTestUtils kafkaTestUtils = null;
+
+  /** Sets up the Kafka test fixture and a {@code local[4]} Spark context before each test. */
+  @Before
+  public void setUp() {
+    kafkaTestUtils = new KafkaTestUtils();
+    kafkaTestUtils.setup();
+    SparkConf sparkConf = new SparkConf()
+      .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
+    sc = new JavaSparkContext(sparkConf);
+  }
+
+  /**
+   * Stops the Spark context and tears down the Kafka test fixture.
+   *
+   * The fixture teardown runs in a {@code finally} block so that a failure while stopping the
+   * Spark context cannot leave the fixture alive for the tests that follow.
+   */
+  @After
+  public void tearDown() {
+    try {
+      if (sc != null) {
+        sc.stop();
+        sc = null;
+      }
+    } finally {
+      if (kafkaTestUtils != null) {
+        kafkaTestUtils.teardown();
+        kafkaTestUtils = null;
+      }
+    }
+  }
+
+  /**
+   * Creates the same RDD through three {@code createRDD} overloads (key/value pairs, message
+   * handler with an empty leader map, message handler with an explicit leader map) and checks
+   * that all three report the same non-zero record count.
+   */
+  @Test
+  public void testKafkaRDD() throws InterruptedException {
+    String topic1 = "topic1";
+    String topic2 = "topic2";
+
+    createTopicAndSendData(topic1);
+    createTopicAndSendData(topic2);
+
+    Map<String, String> kafkaParams = new HashMap<>();
+    kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress());
+
+    // NOTE(review): arguments are presumably (topic, partition, fromOffset, untilOffset);
+    // confirm against OffsetRange.create.
+    OffsetRange[] offsetRanges = {
+      OffsetRange.create(topic1, 0, 0, 1),
+      OffsetRange.create(topic2, 0, 0, 1)
+    };
+
+    Map<TopicAndPartition, Broker> emptyLeaders = new HashMap<>();
+    Map<TopicAndPartition, Broker> leaders = new HashMap<>();
+    // brokerAddress() is split on ':' into host and port to build the explicit leader entry.
+    String[] hostAndPort = kafkaTestUtils.brokerAddress().split(":");
+    Broker broker = Broker.create(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
+    leaders.put(new TopicAndPartition(topic1, 0), broker);
+    leaders.put(new TopicAndPartition(topic2, 0), broker);
+
+    JavaRDD<String> rdd1 = KafkaUtils.createRDD(
+        sc,
+        String.class,
+        String.class,
+        StringDecoder.class,
+        StringDecoder.class,
+        kafkaParams,
+        offsetRanges
+    ).map(
+        new Function<Tuple2<String, String>, String>() {
+          @Override
+          public String call(Tuple2<String, String> kv) {
+            return kv._2();
+          }
+        }
+    );
+
+    JavaRDD<String> rdd2 = KafkaUtils.createRDD(
+        sc,
+        String.class,
+        String.class,
+        StringDecoder.class,
+        StringDecoder.class,
+        String.class,
+        kafkaParams,
+        offsetRanges,
+        emptyLeaders,
+        new Function<MessageAndMetadata<String, String>, String>() {
+          @Override
+          public String call(MessageAndMetadata<String, String> msgAndMd) {
+            return msgAndMd.message();
+          }
+        }
+    );
+
+    JavaRDD<String> rdd3 = KafkaUtils.createRDD(
+        sc,
+        String.class,
+        String.class,
+        StringDecoder.class,
+        StringDecoder.class,
+        String.class,
+        kafkaParams,
+        offsetRanges,
+        leaders,
+        new Function<MessageAndMetadata<String, String>, String>() {
+          @Override
+          public String call(MessageAndMetadata<String, String> msgAndMd) {
+            return msgAndMd.message();
+          }
+        }
+    );
+
+    // just making sure the java user apis work; the scala tests handle logic corner cases
+    long count1 = rdd1.count();
+    long count2 = rdd2.count();
+    long count3 = rdd3.count();
+    Assert.assertTrue(count1 > 0);
+    Assert.assertEquals(count1, count2);
+    Assert.assertEquals(count1, count3);
+  }
+
+  /**
+   * Creates {@code topic} through the Kafka test fixture and sends it the three messages
+   * {@code topic-1}, {@code topic-2} and {@code topic-3}.
+   *
+   * @return the messages that were sent, in order
+   */
+  private  String[] createTopicAndSendData(String topic) {
+    String[] data = { topic + "-1", topic + "-2", topic + "-3"};
+    kafkaTestUtils.createTopic(topic, 1);
+    kafkaTestUtils.sendMessages(topic, data);
+    return data;
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/89e67d66/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
 
b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
new file mode 100644
index 0000000..868df64
--- /dev/null
+++ 
b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka;
+
+import java.io.Serializable;
+import java.util.*;
+
+import scala.Tuple2;
+
+import kafka.serializer.StringDecoder;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.VoidFunction;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+/**
+ * Smoke test for the Java-facing {@code KafkaUtils.createStream} API, run against the Kafka
+ * test fixture provided by {@link KafkaTestUtils}.
+ */
+public class JavaKafkaStreamSuite implements Serializable {
+  private transient JavaStreamingContext ssc = null;
+  private transient Random random = new Random();
+  private transient KafkaTestUtils kafkaTestUtils = null;
+
+  /**
+   * Sets up the Kafka test fixture and a {@code local[4]} streaming context built with
+   * {@code new Duration(500)}.
+   */
+  @Before
+  public void setUp() {
+    kafkaTestUtils = new KafkaTestUtils();
+    kafkaTestUtils.setup();
+    SparkConf sparkConf = new SparkConf()
+      .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
+    ssc = new JavaStreamingContext(sparkConf, new Duration(500));
+  }
+
+  /**
+   * Stops the streaming context and tears down the Kafka test fixture.
+   *
+   * The fixture teardown runs in a {@code finally} block so that a failure while stopping the
+   * streaming context cannot leave the fixture alive for the tests that follow.
+   */
+  @After
+  public void tearDown() {
+    try {
+      if (ssc != null) {
+        ssc.stop();
+        ssc = null;
+      }
+    } finally {
+      if (kafkaTestUtils != null) {
+        kafkaTestUtils.teardown();
+        kafkaTestUtils = null;
+      }
+    }
+  }
+
+  /**
+   * Sends a known multiset of messages, consumes them through {@code createStream}, and checks
+   * that the per-message counts accumulated over all batches equal what was sent.
+   */
+  @Test
+  public void testKafkaStream() throws InterruptedException {
+    String topic = "topic1";
+    Map<String, Integer> topics = new HashMap<>();
+    topics.put(topic, 1);
+
+    // Expected number of occurrences per message; compared against the stream's counts below.
+    Map<String, Integer> sent = new HashMap<>();
+    sent.put("a", 5);
+    sent.put("b", 3);
+    sent.put("c", 10);
+
+    kafkaTestUtils.createTopic(topic, 1);
+    kafkaTestUtils.sendMessages(topic, sent);
+
+    Map<String, String> kafkaParams = new HashMap<>();
+    kafkaParams.put("zookeeper.connect", kafkaTestUtils.zkAddress());
+    kafkaParams.put("group.id", "test-consumer-" + random.nextInt(10000));
+    kafkaParams.put("auto.offset.reset", "smallest");
+
+    JavaPairDStream<String, String> stream = KafkaUtils.createStream(ssc,
+      String.class,
+      String.class,
+      StringDecoder.class,
+      StringDecoder.class,
+      kafkaParams,
+      topics,
+      StorageLevel.MEMORY_ONLY_SER());
+
+    final Map<String, Long> result = Collections.synchronizedMap(new HashMap<String, Long>());
+
+    JavaDStream<String> words = stream.map(
+      new Function<Tuple2<String, String>, String>() {
+        @Override
+        public String call(Tuple2<String, String> tuple2) {
+          return tuple2._2();
+        }
+      }
+    );
+
+    // Each batch contributes its own per-message counts; sum them into `result`.
+    words.countByValue().foreachRDD(new VoidFunction<JavaPairRDD<String, Long>>() {
+        @Override
+        public void call(JavaPairRDD<String, Long> rdd) {
+          List<Tuple2<String, Long>> ret = rdd.collect();
+          for (Tuple2<String, Long> r : ret) {
+            if (result.containsKey(r._1())) {
+              result.put(r._1(), result.get(r._1()) + r._2());
+            } else {
+              result.put(r._1(), r._2());
+            }
+          }
+        }
+      }
+    );
+
+    ssc.start();
+
+    // Counts accumulate across batches, so wait for the full totals rather than only for every
+    // key to have appeared once; otherwise the assertions below can race a partial result.
+    long startTime = System.currentTimeMillis();
+    boolean countsMatch = false;
+    while (!countsMatch && System.currentTimeMillis() - startTime < 20000) {
+      countsMatch = hasExpectedCounts(sent, result);
+      Thread.sleep(200);
+    }
+    Assert.assertEquals(sent.size(), result.size());
+    for (Map.Entry<String, Integer> e : sent.entrySet()) {
+      Long actual = result.get(e.getKey());
+      Assert.assertNotNull("no count received for message " + e.getKey(), actual);
+      Assert.assertEquals(e.getValue().longValue(), actual.longValue());
+    }
+  }
+
+  /**
+   * Returns true when {@code actual} holds exactly the keys of {@code expected} and every
+   * count equals the expected one.
+   */
+  private static boolean hasExpectedCounts(
+      Map<String, Integer> expected, Map<String, Long> actual) {
+    if (expected.size() != actual.size()) {
+      return false;
+    }
+    for (Map.Entry<String, Integer> e : expected.entrySet()) {
+      Long count = actual.get(e.getKey());
+      if (count == null || count.longValue() != e.getValue().longValue()) {
+        return false;
+      }
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/89e67d66/external/kafka-0-8/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/external/kafka-0-8/src/test/resources/log4j.properties 
b/external/kafka-0-8/src/test/resources/log4j.properties
new file mode 100644
index 0000000..fd51f8f
--- /dev/null
+++ b/external/kafka-0-8/src/test/resources/log4j.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Ignore messages below warning level from Jetty, because it's a bit verbose
+log4j.logger.org.spark_project.jetty=WARN
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to