http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
deleted file mode 100644
index 570affa..0000000
--- 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
+++ /dev/null
@@ -1,439 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import java.util.Properties
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-import scala.util.Random
-import scala.util.control.NonFatal
-
-import kafka.api._
-import kafka.common.{ErrorMapping, OffsetAndMetadata, OffsetMetadataAndError, 
TopicAndPartition}
-import kafka.consumer.{ConsumerConfig, SimpleConsumer}
-
-import org.apache.spark.SparkException
-import org.apache.spark.annotation.DeveloperApi
-
-/**
- * :: DeveloperApi ::
- * Convenience methods for interacting with a Kafka cluster.
- * See <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol";>
- * A Guide To The Kafka Protocol</a> for more details on individual api calls.
- * @param kafkaParams Kafka <a 
href="http://kafka.apache.org/documentation.html#configuration";>
- * configuration parameters</a>.
- *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with 
Kafka broker(s),
- *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
- */
-@DeveloperApi
-@deprecated("Update to Kafka 0.10 integration", "2.3.0")
-class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
-  import KafkaCluster.{Err, LeaderOffset, SimpleConsumerConfig}
-
-  // ConsumerConfig isn't serializable
-  @transient private var _config: SimpleConsumerConfig = null
-
-  def config: SimpleConsumerConfig = this.synchronized {
-    if (_config == null) {
-      _config = SimpleConsumerConfig(kafkaParams)
-    }
-    _config
-  }
-
-  def connect(host: String, port: Int): SimpleConsumer =
-    new SimpleConsumer(host, port, config.socketTimeoutMs,
-      config.socketReceiveBufferBytes, config.clientId)
-
-  def connectLeader(topic: String, partition: Int): Either[Err, 
SimpleConsumer] =
-    findLeader(topic, partition).right.map(hp => connect(hp._1, hp._2))
-
-  // Metadata api
-  // scalastyle:off
-  // 
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
-  // scalastyle:on
-
-  def findLeader(topic: String, partition: Int): Either[Err, (String, Int)] = {
-    val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
-      0, config.clientId, Seq(topic))
-    val errs = new Err
-    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
-      val resp: TopicMetadataResponse = consumer.send(req)
-      resp.topicsMetadata.find(_.topic == topic).flatMap { tm: TopicMetadata =>
-        tm.partitionsMetadata.find(_.partitionId == partition)
-      }.foreach { pm: PartitionMetadata =>
-        pm.leader.foreach { leader =>
-          return Right((leader.host, leader.port))
-        }
-      }
-    }
-    Left(errs)
-  }
-
-  def findLeaders(
-      topicAndPartitions: Set[TopicAndPartition]
-    ): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
-    val topics = topicAndPartitions.map(_.topic)
-    val response = getPartitionMetadata(topics).right
-    val answer = response.flatMap { tms: Set[TopicMetadata] =>
-      val leaderMap = tms.flatMap { tm: TopicMetadata =>
-        tm.partitionsMetadata.flatMap { pm: PartitionMetadata =>
-          val tp = TopicAndPartition(tm.topic, pm.partitionId)
-          if (topicAndPartitions(tp)) {
-            pm.leader.map { l =>
-              tp -> (l.host -> l.port)
-            }
-          } else {
-            None
-          }
-        }
-      }.toMap
-
-      if (leaderMap.keys.size == topicAndPartitions.size) {
-        Right(leaderMap)
-      } else {
-        val missing = topicAndPartitions.diff(leaderMap.keySet)
-        val err = new Err
-        err += new SparkException(s"Couldn't find leaders for ${missing}")
-        Left(err)
-      }
-    }
-    answer
-  }
-
-  def getPartitions(topics: Set[String]): Either[Err, Set[TopicAndPartition]] 
= {
-    getPartitionMetadata(topics).right.map { r =>
-      r.flatMap { tm: TopicMetadata =>
-        tm.partitionsMetadata.map { pm: PartitionMetadata =>
-          TopicAndPartition(tm.topic, pm.partitionId)
-        }
-      }
-    }
-  }
-
-  def getPartitionMetadata(topics: Set[String]): Either[Err, 
Set[TopicMetadata]] = {
-    val req = TopicMetadataRequest(
-      TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics.toSeq)
-    val errs = new Err
-    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
-      val resp: TopicMetadataResponse = consumer.send(req)
-      val respErrs = resp.topicsMetadata.filter(m => m.errorCode != 
ErrorMapping.NoError)
-
-      if (respErrs.isEmpty) {
-        return Right(resp.topicsMetadata.toSet)
-      } else {
-        respErrs.foreach { m =>
-          val cause = ErrorMapping.exceptionFor(m.errorCode)
-          val msg = s"Error getting partition metadata for '${m.topic}'. Does 
the topic exist?"
-          errs += new SparkException(msg, cause)
-        }
-      }
-    }
-    Left(errs)
-  }
-
-  // Leader offset api
-  // scalastyle:off
-  // 
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
-  // scalastyle:on
-
-  def getLatestLeaderOffsets(
-      topicAndPartitions: Set[TopicAndPartition]
-    ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
-    getLeaderOffsets(topicAndPartitions, OffsetRequest.LatestTime)
-
-  def getEarliestLeaderOffsets(
-      topicAndPartitions: Set[TopicAndPartition]
-    ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
-    getLeaderOffsets(topicAndPartitions, OffsetRequest.EarliestTime)
-
-  def getLeaderOffsets(
-      topicAndPartitions: Set[TopicAndPartition],
-      before: Long
-    ): Either[Err, Map[TopicAndPartition, LeaderOffset]] = {
-    getLeaderOffsets(topicAndPartitions, before, 1).right.map { r =>
-      r.map { kv =>
-        // mapValues isn't serializable, see SI-7005
-        kv._1 -> kv._2.head
-      }
-    }
-  }
-
-  private def flip[K, V](m: Map[K, V]): Map[V, Seq[K]] =
-    m.groupBy(_._2).map { kv =>
-      kv._1 -> kv._2.keys.toSeq
-    }
-
-  def getLeaderOffsets(
-      topicAndPartitions: Set[TopicAndPartition],
-      before: Long,
-      maxNumOffsets: Int
-    ): Either[Err, Map[TopicAndPartition, Seq[LeaderOffset]]] = {
-    findLeaders(topicAndPartitions).right.flatMap { tpToLeader =>
-      val leaderToTp: Map[(String, Int), Seq[TopicAndPartition]] = 
flip(tpToLeader)
-      val leaders = leaderToTp.keys
-      var result = Map[TopicAndPartition, Seq[LeaderOffset]]()
-      val errs = new Err
-      withBrokers(leaders, errs) { consumer =>
-        val partitionsToGetOffsets: Seq[TopicAndPartition] =
-          leaderToTp((consumer.host, consumer.port))
-        val reqMap = partitionsToGetOffsets.map { tp: TopicAndPartition =>
-          tp -> PartitionOffsetRequestInfo(before, maxNumOffsets)
-        }.toMap
-        val req = OffsetRequest(reqMap)
-        val resp = consumer.getOffsetsBefore(req)
-        val respMap = resp.partitionErrorAndOffsets
-        partitionsToGetOffsets.foreach { tp: TopicAndPartition =>
-          respMap.get(tp).foreach { por: PartitionOffsetsResponse =>
-            if (por.error == ErrorMapping.NoError) {
-              if (por.offsets.nonEmpty) {
-                result += tp -> por.offsets.map { off =>
-                  LeaderOffset(consumer.host, consumer.port, off)
-                }
-              } else {
-                errs += new SparkException(
-                  s"Empty offsets for ${tp}, is ${before} before log 
beginning?")
-              }
-            } else {
-              errs += ErrorMapping.exceptionFor(por.error)
-            }
-          }
-        }
-        if (result.keys.size == topicAndPartitions.size) {
-          return Right(result)
-        }
-      }
-      val missing = topicAndPartitions.diff(result.keySet)
-      errs += new SparkException(s"Couldn't find leader offsets for 
${missing}")
-      Left(errs)
-    }
-  }
-
-  // Consumer offset api
-  // scalastyle:off
-  // 
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
-  // scalastyle:on
-
-  // this 0 here indicates api version, in this case the original ZK backed 
api.
-  private def defaultConsumerApiVersion: Short = 0
-
-  /**
-   * Requires Kafka 0.8.1.1 or later.
-   * Defaults to the original ZooKeeper backed API version.
-   */
-  def getConsumerOffsets(
-      groupId: String,
-      topicAndPartitions: Set[TopicAndPartition]
-    ): Either[Err, Map[TopicAndPartition, Long]] =
-    getConsumerOffsets(groupId, topicAndPartitions, defaultConsumerApiVersion)
-
-  def getConsumerOffsets(
-      groupId: String,
-      topicAndPartitions: Set[TopicAndPartition],
-      consumerApiVersion: Short
-    ): Either[Err, Map[TopicAndPartition, Long]] = {
-    getConsumerOffsetMetadata(groupId, topicAndPartitions, 
consumerApiVersion).right.map { r =>
-      r.map { kv =>
-        kv._1 -> kv._2.offset
-      }
-    }
-  }
-
-  /**
-   * Requires Kafka 0.8.1.1 or later.
-   * Defaults to the original ZooKeeper backed API version.
-   */
-  def getConsumerOffsetMetadata(
-      groupId: String,
-      topicAndPartitions: Set[TopicAndPartition]
-    ): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] =
-    getConsumerOffsetMetadata(groupId, topicAndPartitions, 
defaultConsumerApiVersion)
-
-  def getConsumerOffsetMetadata(
-      groupId: String,
-      topicAndPartitions: Set[TopicAndPartition],
-      consumerApiVersion: Short
-    ): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] = {
-    var result = Map[TopicAndPartition, OffsetMetadataAndError]()
-    val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq, 
consumerApiVersion)
-    val errs = new Err
-    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
-      val resp = consumer.fetchOffsets(req)
-      val respMap = resp.requestInfo
-      val needed = topicAndPartitions.diff(result.keySet)
-      needed.foreach { tp: TopicAndPartition =>
-        respMap.get(tp).foreach { ome: OffsetMetadataAndError =>
-          if (ome.error == ErrorMapping.NoError) {
-            result += tp -> ome
-          } else {
-            errs += ErrorMapping.exceptionFor(ome.error)
-          }
-        }
-      }
-      if (result.keys.size == topicAndPartitions.size) {
-        return Right(result)
-      }
-    }
-    val missing = topicAndPartitions.diff(result.keySet)
-    errs += new SparkException(s"Couldn't find consumer offsets for 
${missing}")
-    Left(errs)
-  }
-
-  /**
-   * Requires Kafka 0.8.1.1 or later.
-   * Defaults to the original ZooKeeper backed API version.
-   */
-  def setConsumerOffsets(
-      groupId: String,
-      offsets: Map[TopicAndPartition, Long]
-    ): Either[Err, Map[TopicAndPartition, Short]] =
-    setConsumerOffsets(groupId, offsets, defaultConsumerApiVersion)
-
-  def setConsumerOffsets(
-      groupId: String,
-      offsets: Map[TopicAndPartition, Long],
-      consumerApiVersion: Short
-    ): Either[Err, Map[TopicAndPartition, Short]] = {
-    val meta = offsets.map { kv =>
-      kv._1 -> OffsetAndMetadata(kv._2)
-    }
-    setConsumerOffsetMetadata(groupId, meta, consumerApiVersion)
-  }
-
-  /**
-   * Requires Kafka 0.8.1.1 or later.
-   * Defaults to the original ZooKeeper backed API version.
-   */
-  def setConsumerOffsetMetadata(
-      groupId: String,
-      metadata: Map[TopicAndPartition, OffsetAndMetadata]
-    ): Either[Err, Map[TopicAndPartition, Short]] =
-    setConsumerOffsetMetadata(groupId, metadata, defaultConsumerApiVersion)
-
-  def setConsumerOffsetMetadata(
-      groupId: String,
-      metadata: Map[TopicAndPartition, OffsetAndMetadata],
-      consumerApiVersion: Short
-    ): Either[Err, Map[TopicAndPartition, Short]] = {
-    var result = Map[TopicAndPartition, Short]()
-    val req = OffsetCommitRequest(groupId, metadata, consumerApiVersion)
-    val errs = new Err
-    val topicAndPartitions = metadata.keySet
-    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
-      val resp = consumer.commitOffsets(req)
-      val respMap = resp.commitStatus
-      val needed = topicAndPartitions.diff(result.keySet)
-      needed.foreach { tp: TopicAndPartition =>
-        respMap.get(tp).foreach { err: Short =>
-          if (err == ErrorMapping.NoError) {
-            result += tp -> err
-          } else {
-            errs += ErrorMapping.exceptionFor(err)
-          }
-        }
-      }
-      if (result.keys.size == topicAndPartitions.size) {
-        return Right(result)
-      }
-    }
-    val missing = topicAndPartitions.diff(result.keySet)
-    errs += new SparkException(s"Couldn't set offsets for ${missing}")
-    Left(errs)
-  }
-
-  // Try a call against potentially multiple brokers, accumulating errors
-  private def withBrokers(brokers: Iterable[(String, Int)], errs: Err)
-    (fn: SimpleConsumer => Any): Unit = {
-    brokers.foreach { hp =>
-      var consumer: SimpleConsumer = null
-      try {
-        consumer = connect(hp._1, hp._2)
-        fn(consumer)
-      } catch {
-        case NonFatal(e) =>
-          errs += e
-      } finally {
-        if (consumer != null) {
-          consumer.close()
-        }
-      }
-    }
-  }
-}
-
-@DeveloperApi
-@deprecated("Update to Kafka 0.10 integration", "2.3.0")
-object KafkaCluster {
-  type Err = ArrayBuffer[Throwable]
-
-  /** If the result is right, return it, otherwise throw SparkException */
-  def checkErrors[T](result: Either[Err, T]): T = {
-    result.fold(
-      errs => throw new SparkException(errs.mkString("\n")),
-      ok => ok
-    )
-  }
-
-  case class LeaderOffset(host: String, port: Int, offset: Long)
-
-  /**
-   * High-level kafka consumers connect to ZK.  ConsumerConfig assumes this 
use case.
-   * Simple consumers connect directly to brokers, but need many of the same 
configs.
-   * This subclass won't warn about missing ZK params, or presence of broker 
params.
-   */
-  class SimpleConsumerConfig private(brokers: String, originalProps: 
Properties)
-      extends ConsumerConfig(originalProps) {
-    val seedBrokers: Array[(String, Int)] = brokers.split(",").map { hp =>
-      val hpa = hp.split(":")
-      if (hpa.size == 1) {
-        throw new SparkException(s"Broker not in the correct format of 
<host>:<port> [$brokers]")
-      }
-      (hpa(0), hpa(1).toInt)
-    }
-  }
-
-  object SimpleConsumerConfig {
-    /**
-     * Make a consumer config without requiring group.id or zookeeper.connect,
-     * since communicating with brokers also needs common settings such as 
timeout
-     */
-    def apply(kafkaParams: Map[String, String]): SimpleConsumerConfig = {
-      // These keys are from other pre-existing kafka configs for specifying 
brokers, accept either
-      val brokers = kafkaParams.get("metadata.broker.list")
-        .orElse(kafkaParams.get("bootstrap.servers"))
-        .getOrElse(throw new SparkException(
-          "Must specify metadata.broker.list or bootstrap.servers"))
-
-      val props = new Properties()
-      kafkaParams.foreach { case (key, value) =>
-        // prevent warnings on parameters ConsumerConfig doesn't know about
-        if (key != "metadata.broker.list" && key != "bootstrap.servers") {
-          props.put(key, value)
-        }
-      }
-
-      Seq("zookeeper.connect", "group.id").foreach { s =>
-        if (!props.containsKey(s)) {
-          props.setProperty(s, "")
-        }
-      }
-
-      new SimpleConsumerConfig(brokers, props)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
deleted file mode 100644
index 7ff3a98..0000000
--- 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import java.util.Properties
-
-import scala.collection.Map
-import scala.reflect.{classTag, ClassTag}
-
-import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, 
KafkaStream}
-import kafka.serializer.Decoder
-import kafka.utils.VerifiableProperties
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.util.ThreadUtils
-
-/**
- * Input stream that pulls messages from a Kafka Broker.
- *
- * @param kafkaParams Map of kafka configuration parameters.
- *                    See: http://kafka.apache.org/configuration.html
- * @param topics Map of (topic_name to numPartitions) to consume. Each 
partition is consumed
- * in its own thread.
- * @param storageLevel RDD storage level.
- */
-private[streaming]
-class KafkaInputDStream[
-  K: ClassTag,
-  V: ClassTag,
-  U <: Decoder[_]: ClassTag,
-  T <: Decoder[_]: ClassTag](
-    _ssc: StreamingContext,
-    kafkaParams: Map[String, String],
-    topics: Map[String, Int],
-    useReliableReceiver: Boolean,
-    storageLevel: StorageLevel
-  ) extends ReceiverInputDStream[(K, V)](_ssc) with Logging {
-
-  def getReceiver(): Receiver[(K, V)] = {
-    if (!useReliableReceiver) {
-      new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
-    } else {
-      new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
-    }
-  }
-}
-
-private[streaming]
-class KafkaReceiver[
-  K: ClassTag,
-  V: ClassTag,
-  U <: Decoder[_]: ClassTag,
-  T <: Decoder[_]: ClassTag](
-    kafkaParams: Map[String, String],
-    topics: Map[String, Int],
-    storageLevel: StorageLevel
-  ) extends Receiver[(K, V)](storageLevel) with Logging {
-
-  // Connection to Kafka
-  var consumerConnector: ConsumerConnector = null
-
-  def onStop() {
-    if (consumerConnector != null) {
-      consumerConnector.shutdown()
-      consumerConnector = null
-    }
-  }
-
-  def onStart() {
-
-    logInfo("Starting Kafka Consumer Stream with group: " + 
kafkaParams("group.id"))
-
-    // Kafka connection properties
-    val props = new Properties()
-    kafkaParams.foreach(param => props.put(param._1, param._2))
-
-    val zkConnect = kafkaParams("zookeeper.connect")
-    // Create the connection to the cluster
-    logInfo("Connecting to Zookeeper: " + zkConnect)
-    val consumerConfig = new ConsumerConfig(props)
-    consumerConnector = Consumer.create(consumerConfig)
-    logInfo("Connected to " + zkConnect)
-
-    val keyDecoder = 
classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
-      .newInstance(consumerConfig.props)
-      .asInstanceOf[Decoder[K]]
-    val valueDecoder = 
classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
-      .newInstance(consumerConfig.props)
-      .asInstanceOf[Decoder[V]]
-
-    // Create threads for each topic/message Stream we are listening
-    val topicMessageStreams = consumerConnector.createMessageStreams(
-      topics, keyDecoder, valueDecoder)
-
-    val executorPool =
-      ThreadUtils.newDaemonFixedThreadPool(topics.values.sum, 
"KafkaMessageHandler")
-    try {
-      // Start the messages handler for each partition
-      topicMessageStreams.values.foreach { streams =>
-        streams.foreach { stream => executorPool.submit(new 
MessageHandler(stream)) }
-      }
-    } finally {
-      executorPool.shutdown() // Just causes threads to terminate after work 
is done
-    }
-  }
-
-  // Handles Kafka messages
-  private class MessageHandler(stream: KafkaStream[K, V])
-    extends Runnable {
-    def run() {
-      logInfo("Starting MessageHandler.")
-      try {
-        val streamIterator = stream.iterator()
-        while (streamIterator.hasNext()) {
-          val msgAndMetadata = streamIterator.next()
-          store((msgAndMetadata.key, msgAndMetadata.message))
-        }
-      } catch {
-        case e: Throwable => reportError("Error handling message; exiting", e)
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
deleted file mode 100644
index 791cf0e..0000000
--- 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import scala.collection.mutable.ArrayBuffer
-import scala.reflect.{classTag, ClassTag}
-
-import kafka.api.{FetchRequestBuilder, FetchResponse}
-import kafka.common.{ErrorMapping, TopicAndPartition}
-import kafka.consumer.SimpleConsumer
-import kafka.message.{MessageAndMetadata, MessageAndOffset}
-import kafka.serializer.Decoder
-import kafka.utils.VerifiableProperties
-
-import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext}
-import org.apache.spark.internal.Logging
-import org.apache.spark.partial.{BoundedDouble, PartialResult}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.util.NextIterator
-
-/**
- * A batch-oriented interface for consuming from Kafka.
- * Starting and ending offsets are specified in advance,
- * so that you can control exactly-once semantics.
- * @param kafkaParams Kafka <a 
href="http://kafka.apache.org/documentation.html#configuration";>
- * configuration parameters</a>. Requires "metadata.broker.list" or 
"bootstrap.servers" to be set
- * with Kafka broker(s) specified in host1:port1,host2:port2 form.
- * @param offsetRanges offset ranges that define the Kafka data belonging to 
this RDD
- * @param messageHandler function for translating each message into the 
desired type
- */
-private[kafka]
-class KafkaRDD[
-  K: ClassTag,
-  V: ClassTag,
-  U <: Decoder[_]: ClassTag,
-  T <: Decoder[_]: ClassTag,
-  R: ClassTag] private[spark] (
-    sc: SparkContext,
-    kafkaParams: Map[String, String],
-    val offsetRanges: Array[OffsetRange],
-    leaders: Map[TopicAndPartition, (String, Int)],
-    messageHandler: MessageAndMetadata[K, V] => R
-  ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {
-  override def getPartitions: Array[Partition] = {
-    offsetRanges.zipWithIndex.map { case (o, i) =>
-        val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
-        new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, 
o.untilOffset, host, port)
-    }.toArray
-  }
-
-  override def count(): Long = offsetRanges.map(_.count).sum
-
-  override def countApprox(
-      timeout: Long,
-      confidence: Double = 0.95
-  ): PartialResult[BoundedDouble] = {
-    val c = count
-    new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
-  }
-
-  override def isEmpty(): Boolean = count == 0L
-
-  override def take(num: Int): Array[R] = {
-    val nonEmptyPartitions = this.partitions
-      .map(_.asInstanceOf[KafkaRDDPartition])
-      .filter(_.count > 0)
-
-    if (num < 1 || nonEmptyPartitions.isEmpty) {
-      return new Array[R](0)
-    }
-
-    // Determine in advance how many messages need to be taken from each 
partition
-    val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) 
=>
-      val remain = num - result.values.sum
-      if (remain > 0) {
-        val taken = Math.min(remain, part.count)
-        result + (part.index -> taken.toInt)
-      } else {
-        result
-      }
-    }
-
-    val buf = new ArrayBuffer[R]
-    val res = context.runJob(
-      this,
-      (tc: TaskContext, it: Iterator[R]) => 
it.take(parts(tc.partitionId)).toArray,
-      parts.keys.toArray)
-    res.foreach(buf ++= _)
-    buf.toArray
-  }
-
-  override def getPreferredLocations(thePart: Partition): Seq[String] = {
-    val part = thePart.asInstanceOf[KafkaRDDPartition]
-    // TODO is additional hostname resolution necessary here
-    Seq(part.host)
-  }
-
-  private def errBeginAfterEnd(part: KafkaRDDPartition): String =
-    s"Beginning offset ${part.fromOffset} is after the ending offset 
${part.untilOffset} " +
-      s"for topic ${part.topic} partition ${part.partition}. " +
-      "You either provided an invalid fromOffset, or the Kafka topic has been 
damaged"
-
-  private def errRanOutBeforeEnd(part: KafkaRDDPartition): String =
-    s"Ran out of messages before reaching ending offset ${part.untilOffset} " +
-    s"for topic ${part.topic} partition ${part.partition} start 
${part.fromOffset}." +
-    " This should not happen, and indicates that messages may have been lost"
-
-  private def errOvershotEnd(itemOffset: Long, part: KafkaRDDPartition): 
String =
-    s"Got ${itemOffset} > ending offset ${part.untilOffset} " +
-    s"for topic ${part.topic} partition ${part.partition} start 
${part.fromOffset}." +
-    " This should not happen, and indicates a message may have been skipped"
-
-  override def compute(thePart: Partition, context: TaskContext): Iterator[R] 
= {
-    val part = thePart.asInstanceOf[KafkaRDDPartition]
-    assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
-    if (part.fromOffset == part.untilOffset) {
-      logInfo(s"Beginning offset ${part.fromOffset} is the same as ending 
offset " +
-        s"skipping ${part.topic} ${part.partition}")
-      Iterator.empty
-    } else {
-      new KafkaRDDIterator(part, context)
-    }
-  }
-
-  /**
-   * An iterator that fetches messages directly from Kafka for the offsets in 
partition.
-   */
-  private class KafkaRDDIterator(
-      part: KafkaRDDPartition,
-      context: TaskContext) extends NextIterator[R] {
-
-    context.addTaskCompletionListener{ context => closeIfNeeded() }
-
-    logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " +
-      s"offsets ${part.fromOffset} -> ${part.untilOffset}")
-
-    val kc = new KafkaCluster(kafkaParams)
-    val keyDecoder = 
classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
-      .newInstance(kc.config.props)
-      .asInstanceOf[Decoder[K]]
-    val valueDecoder = 
classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
-      .newInstance(kc.config.props)
-      .asInstanceOf[Decoder[V]]
-    val consumer = connectLeader
-    var requestOffset = part.fromOffset
-    var iter: Iterator[MessageAndOffset] = null
-
-    // The idea is to use the provided preferred host, except on task retry 
attempts,
-    // to minimize number of kafka metadata requests
-    private def connectLeader: SimpleConsumer = {
-      if (context.attemptNumber > 0) {
-        kc.connectLeader(part.topic, part.partition).fold(
-          errs => throw new SparkException(
-            s"Couldn't connect to leader for topic ${part.topic} 
${part.partition}: " +
-              errs.mkString("\n")),
-          consumer => consumer
-        )
-      } else {
-        kc.connect(part.host, part.port)
-      }
-    }
-
-    private def handleFetchErr(resp: FetchResponse) {
-      if (resp.hasError) {
-        val err = resp.errorCode(part.topic, part.partition)
-        if (err == ErrorMapping.LeaderNotAvailableCode ||
-          err == ErrorMapping.NotLeaderForPartitionCode) {
-          logError(s"Lost leader for topic ${part.topic} partition 
${part.partition}, " +
-            s" sleeping for ${kc.config.refreshLeaderBackoffMs}ms")
-          Thread.sleep(kc.config.refreshLeaderBackoffMs)
-        }
-        // Let normal rdd retry sort out reconnect attempts
-        throw ErrorMapping.exceptionFor(err)
-      }
-    }
-
-    private def fetchBatch: Iterator[MessageAndOffset] = {
-      val req = new FetchRequestBuilder()
-        .clientId(consumer.clientId)
-        .addFetch(part.topic, part.partition, requestOffset, 
kc.config.fetchMessageMaxBytes)
-        .build()
-      val resp = consumer.fetch(req)
-      handleFetchErr(resp)
-      // kafka may return a batch that starts before the requested offset
-      resp.messageSet(part.topic, part.partition)
-        .iterator
-        .dropWhile(_.offset < requestOffset)
-    }
-
-    override def close(): Unit = {
-      if (consumer != null) {
-        consumer.close()
-      }
-    }
-
-    override def getNext(): R = {
-      if (iter == null || !iter.hasNext) {
-        iter = fetchBatch
-      }
-      if (!iter.hasNext) {
-        assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
-        finished = true
-        null.asInstanceOf[R]
-      } else {
-        val item = iter.next()
-        if (item.offset >= part.untilOffset) {
-          assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, 
part))
-          finished = true
-          null.asInstanceOf[R]
-        } else {
-          requestOffset = item.nextOffset
-          messageHandler(new MessageAndMetadata(
-            part.topic, part.partition, item.message, item.offset, keyDecoder, 
valueDecoder))
-        }
-      }
-    }
-  }
-}
-
-private[kafka]
-object KafkaRDD {
-  import KafkaCluster.LeaderOffset
-
-  /**
-   * @param kafkaParams Kafka <a 
href="http://kafka.apache.org/documentation.html#configuration";>
-   * configuration parameters</a>.
-   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with 
Kafka broker(s),
-   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
-   * @param fromOffsets per-topic/partition Kafka offsets defining the 
(inclusive)
-   *  starting point of the batch
-   * @param untilOffsets per-topic/partition Kafka offsets defining the 
(exclusive)
-   *  ending point of the batch
-   * @param messageHandler function for translating each message into the 
desired type
-   */
-  def apply[
-    K: ClassTag,
-    V: ClassTag,
-    U <: Decoder[_]: ClassTag,
-    T <: Decoder[_]: ClassTag,
-    R: ClassTag](
-      sc: SparkContext,
-      kafkaParams: Map[String, String],
-      fromOffsets: Map[TopicAndPartition, Long],
-      untilOffsets: Map[TopicAndPartition, LeaderOffset],
-      messageHandler: MessageAndMetadata[K, V] => R
-    ): KafkaRDD[K, V, U, T, R] = {
-    val leaders = untilOffsets.map { case (tp, lo) =>
-        tp -> ((lo.host, lo.port))
-    }
-
-    val offsetRanges = fromOffsets.map { case (tp, fo) =>
-        val uo = untilOffsets(tp)
-        OffsetRange(tp.topic, tp.partition, fo, uo.offset)
-    }.toArray
-
-    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaders, 
messageHandler)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala
 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala
deleted file mode 100644
index 02917be..0000000
--- 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import org.apache.spark.Partition
-
-/**
- * @param topic kafka topic name
- * @param partition kafka partition id
- * @param fromOffset inclusive starting offset
- * @param untilOffset exclusive ending offset
- * @param host preferred kafka host, i.e. the leader at the time the rdd was 
created
- * @param port preferred kafka host's port
- */
-private[kafka]
-class KafkaRDDPartition(
-  val index: Int,
-  val topic: String,
-  val partition: Int,
-  val fromOffset: Long,
-  val untilOffset: Long,
-  val host: String,
-  val port: Int
-) extends Partition {
-  /** Number of messages this partition refers to */
-  def count(): Long = untilOffset - fromOffset
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
deleted file mode 100644
index ef19685..0000000
--- 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
+++ /dev/null
@@ -1,299 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import java.io.{File, IOException}
-import java.lang.{Integer => JInt}
-import java.net.InetSocketAddress
-import java.util.{Map => JMap, Properties}
-import java.util.concurrent.TimeoutException
-
-import scala.annotation.tailrec
-import scala.collection.JavaConverters._
-import scala.util.control.NonFatal
-
-import kafka.admin.AdminUtils
-import kafka.api.Request
-import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
-import kafka.serializer.StringEncoder
-import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{ZKStringSerializer, ZkUtils}
-import org.I0Itec.zkclient.ZkClient
-import org.apache.commons.lang3.RandomUtils
-import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
-
-import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
-import org.apache.spark.streaming.Time
-import org.apache.spark.util.Utils
-
-/**
- * This is a helper class for Kafka test suites. This has the functionality to 
set up
- * and tear down local Kafka servers, and to push data using Kafka producers.
- *
- * The reason to put Kafka test utility class in src is to test Python related 
Kafka APIs.
- */
-private[kafka] class KafkaTestUtils extends Logging {
-
-  // Zookeeper related configurations
-  private val zkHost = "localhost"
-  private var zkPort: Int = 0
-  private val zkConnectionTimeout = 60000
-  private val zkSessionTimeout = 6000
-
-  private var zookeeper: EmbeddedZookeeper = _
-
-  private var zkClient: ZkClient = _
-
-  // Kafka broker related configurations
-  private val brokerHost = "localhost"
-  // 0.8.2 server doesn't have a boundPort method, so can't use 0 for a random 
port
-  private var brokerPort = RandomUtils.nextInt(1024, 65536)
-  private var brokerConf: KafkaConfig = _
-
-  // Kafka broker server
-  private var server: KafkaServer = _
-
-  // Kafka producer
-  private var producer: Producer[String, String] = _
-
-  // Flag to test whether the system is correctly started
-  private var zkReady = false
-  private var brokerReady = false
-
-  def zkAddress: String = {
-    assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get 
zookeeper address")
-    s"$zkHost:$zkPort"
-  }
-
-  def brokerAddress: String = {
-    assert(brokerReady, "Kafka not setup yet or already torn down, cannot get 
broker address")
-    s"$brokerHost:$brokerPort"
-  }
-
-  def zookeeperClient: ZkClient = {
-    assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get 
zookeeper client")
-    Option(zkClient).getOrElse(
-      throw new IllegalStateException("Zookeeper client is not yet 
initialized"))
-  }
-
-  // Set up the Embedded Zookeeper server and get the proper Zookeeper port
-  private def setupEmbeddedZookeeper(): Unit = {
-    // Zookeeper server startup
-    zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
-    // Get the actual zookeeper binding port
-    zkPort = zookeeper.actualPort
-    zkClient = new ZkClient(s"$zkHost:$zkPort", zkSessionTimeout, 
zkConnectionTimeout,
-      ZKStringSerializer)
-    zkReady = true
-  }
-
-  // Set up the Embedded Kafka server
-  private def setupEmbeddedKafkaServer(): Unit = {
-    assert(zkReady, "Zookeeper should be set up beforehand")
-
-    // Kafka broker startup
-    Utils.startServiceOnPort(brokerPort, port => {
-      brokerPort = port
-      brokerConf = new KafkaConfig(brokerConfiguration)
-      server = new KafkaServer(brokerConf)
-      server.startup()
-      (server, brokerPort)
-    }, new SparkConf(), "KafkaBroker")
-
-    brokerReady = true
-  }
-
-  /** setup the whole embedded servers, including Zookeeper and Kafka brokers 
*/
-  def setup(): Unit = {
-    setupEmbeddedZookeeper()
-    setupEmbeddedKafkaServer()
-  }
-
-  /** Teardown the whole servers, including Kafka broker and Zookeeper */
-  def teardown(): Unit = {
-    brokerReady = false
-    zkReady = false
-
-    if (producer != null) {
-      producer.close()
-      producer = null
-    }
-
-    if (server != null) {
-      server.shutdown()
-      server.awaitShutdown()
-      server = null
-    }
-
-    // On Windows, `logDirs` is left open even after Kafka server above is 
completely shut down
-    // in some cases. It leads to test failures on Windows if the directory 
deletion failure
-    // throws an exception.
-    brokerConf.logDirs.foreach { f =>
-      try {
-        Utils.deleteRecursively(new File(f))
-      } catch {
-        case e: IOException if Utils.isWindows =>
-          logWarning(e.getMessage)
-      }
-    }
-
-    if (zkClient != null) {
-      zkClient.close()
-      zkClient = null
-    }
-
-    if (zookeeper != null) {
-      zookeeper.shutdown()
-      zookeeper = null
-    }
-  }
-
-  /** Create a Kafka topic and wait until it is propagated to the whole 
cluster */
-  def createTopic(topic: String, partitions: Int): Unit = {
-    AdminUtils.createTopic(zkClient, topic, partitions, 1)
-    // wait until metadata is propagated
-    (0 until partitions).foreach { p => waitUntilMetadataIsPropagated(topic, 
p) }
-  }
-
-  /** Single-argument version for backwards compatibility */
-  def createTopic(topic: String): Unit = createTopic(topic, 1)
-
-  /** Java-friendly function for sending messages to the Kafka broker */
-  def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = {
-    sendMessages(topic, 
Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*))
-  }
-
-  /** Send the messages to the Kafka broker */
-  def sendMessages(topic: String, messageToFreq: Map[String, Int]): Unit = {
-    val messages = messageToFreq.flatMap { case (s, freq) => Seq.fill(freq)(s) 
}.toArray
-    sendMessages(topic, messages)
-  }
-
-  /** Send the array of messages to the Kafka broker */
-  def sendMessages(topic: String, messages: Array[String]): Unit = {
-    producer = new Producer[String, String](new 
ProducerConfig(producerConfiguration))
-    producer.send(messages.map { new KeyedMessage[String, String](topic, _ ) 
}: _*)
-    producer.close()
-    producer = null
-  }
-
-  private def brokerConfiguration: Properties = {
-    val props = new Properties()
-    props.put("broker.id", "0")
-    props.put("host.name", "localhost")
-    props.put("port", brokerPort.toString)
-    props.put("log.dir", Utils.createTempDir().getAbsolutePath)
-    props.put("zookeeper.connect", zkAddress)
-    props.put("log.flush.interval.messages", "1")
-    props.put("replica.socket.timeout.ms", "1500")
-    props
-  }
-
-  private def producerConfiguration: Properties = {
-    val props = new Properties()
-    props.put("metadata.broker.list", brokerAddress)
-    props.put("serializer.class", classOf[StringEncoder].getName)
-    // wait for all in-sync replicas to ack sends
-    props.put("request.required.acks", "-1")
-    props
-  }
-
-  // A simplified version of scalatest eventually, rewritten here to avoid 
adding extra test
-  // dependency
-  def eventually[T](timeout: Time, interval: Time)(func: => T): T = {
-    def makeAttempt(): Either[Throwable, T] = {
-      try {
-        Right(func)
-      } catch {
-        case e if NonFatal(e) => Left(e)
-      }
-    }
-
-    val startTime = System.currentTimeMillis()
-    @tailrec
-    def tryAgain(attempt: Int): T = {
-      makeAttempt() match {
-        case Right(result) => result
-        case Left(e) =>
-          val duration = System.currentTimeMillis() - startTime
-          if (duration < timeout.milliseconds) {
-            Thread.sleep(interval.milliseconds)
-          } else {
-            throw new TimeoutException(e.getMessage)
-          }
-
-          tryAgain(attempt + 1)
-      }
-    }
-
-    tryAgain(1)
-  }
-
-  private def waitUntilMetadataIsPropagated(topic: String, partition: Int): 
Unit = {
-    def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, 
partition) match {
-      case Some(partitionState) =>
-        val leaderAndInSyncReplicas = 
partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
-
-        ZkUtils.getLeaderForPartition(zkClient, topic, partition).isDefined &&
-          Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
-          leaderAndInSyncReplicas.isr.size >= 1
-
-      case _ =>
-        false
-    }
-    eventually(Time(10000), Time(100)) {
-      assert(isPropagated, s"Partition [$topic, $partition] metadata not 
propagated after timeout")
-    }
-  }
-
-  private class EmbeddedZookeeper(val zkConnect: String) {
-    val snapshotDir = Utils.createTempDir()
-    val logDir = Utils.createTempDir()
-
-    val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
-    val (ip, port) = {
-      val splits = zkConnect.split(":")
-      (splits(0), splits(1).toInt)
-    }
-    val factory = new NIOServerCnxnFactory()
-    factory.configure(new InetSocketAddress(ip, port), 16)
-    factory.startup(zookeeper)
-
-    val actualPort = factory.getLocalPort
-
-    def shutdown() {
-      factory.shutdown()
-      // The directories are not closed even if the ZooKeeper server is shut 
down.
-      // Please see ZOOKEEPER-1844, which is fixed in 3.4.6+. It leads to test 
failures
-      // on Windows if the directory deletion failure throws an exception.
-      try {
-        Utils.deleteRecursively(snapshotDir)
-      } catch {
-        case e: IOException if Utils.isWindows =>
-          logWarning(e.getMessage)
-      }
-      try {
-        Utils.deleteRecursively(logDir)
-      } catch {
-        case e: IOException if Utils.isWindows =>
-          logWarning(e.getMessage)
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
deleted file mode 100644
index 36082e9..0000000
--- 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ /dev/null
@@ -1,806 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import java.io.OutputStream
-import java.lang.{Integer => JInt, Long => JLong, Number => JNumber}
-import java.nio.charset.StandardCharsets
-import java.util.{List => JList, Locale, Map => JMap, Set => JSet}
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import kafka.common.TopicAndPartition
-import kafka.message.MessageAndMetadata
-import kafka.serializer.{Decoder, DefaultDecoder, StringDecoder}
-import net.razorvine.pickle.{IObjectPickler, Opcodes, Pickler}
-
-import org.apache.spark.{SparkContext, SparkException}
-import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
-import org.apache.spark.api.java.function.{Function => JFunction}
-import org.apache.spark.api.python.SerDeUtil
-import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java._
-import org.apache.spark.streaming.dstream.{DStream, InputDStream, 
ReceiverInputDStream}
-import org.apache.spark.streaming.util.WriteAheadLogUtils
-
-@deprecated("Update to Kafka 0.10 integration", "2.3.0")
-object KafkaUtils {
-  /**
-   * Create an input stream that pulls messages from Kafka Brokers.
-   * @param ssc       StreamingContext object
-   * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..)
-   * @param groupId   The group id for this consumer
-   * @param topics    Map of (topic_name to numPartitions) to consume. Each 
partition is consumed
-   *                  in its own thread
-   * @param storageLevel  Storage level to use for storing the received objects
-   *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
-   * @return DStream of (Kafka message key, Kafka message value)
-   */
-  def createStream(
-      ssc: StreamingContext,
-      zkQuorum: String,
-      groupId: String,
-      topics: Map[String, Int],
-      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): ReceiverInputDStream[(String, String)] = {
-    val kafkaParams = Map[String, String](
-      "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
-      "zookeeper.connection.timeout.ms" -> "10000")
-    createStream[String, String, StringDecoder, StringDecoder](
-      ssc, kafkaParams, topics, storageLevel)
-  }
-
-  /**
-   * Create an input stream that pulls messages from Kafka Brokers.
-   * @param ssc         StreamingContext object
-   * @param kafkaParams Map of kafka configuration parameters,
-   *                    see http://kafka.apache.org/08/configuration.html
-   * @param topics      Map of (topic_name to numPartitions) to consume. Each 
partition is consumed
-   *                    in its own thread.
-   * @param storageLevel Storage level to use for storing the received objects
-   * @tparam K type of Kafka message key
-   * @tparam V type of Kafka message value
-   * @tparam U type of Kafka message key decoder
-   * @tparam T type of Kafka message value decoder
-   * @return DStream of (Kafka message key, Kafka message value)
-   */
-  def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: 
Decoder[_]: ClassTag](
-      ssc: StreamingContext,
-      kafkaParams: Map[String, String],
-      topics: Map[String, Int],
-      storageLevel: StorageLevel
-    ): ReceiverInputDStream[(K, V)] = {
-    val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf)
-    new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, 
storageLevel)
-  }
-
-  /**
-   * Create an input stream that pulls messages from Kafka Brokers.
-   * Storage level of the data will be the default 
StorageLevel.MEMORY_AND_DISK_SER_2.
-   * @param jssc      JavaStreamingContext object
-   * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..)
-   * @param groupId   The group id for this consumer
-   * @param topics    Map of (topic_name to numPartitions) to consume. Each 
partition is consumed
-   *                  in its own thread
-   * @return DStream of (Kafka message key, Kafka message value)
-   */
-  def createStream(
-      jssc: JavaStreamingContext,
-      zkQuorum: String,
-      groupId: String,
-      topics: JMap[String, JInt]
-    ): JavaPairReceiverInputDStream[String, String] = {
-    createStream(jssc.ssc, zkQuorum, groupId, 
Map(topics.asScala.mapValues(_.intValue()).toSeq: _*))
-  }
-
-  /**
-   * Create an input stream that pulls messages from Kafka Brokers.
-   * @param jssc      JavaStreamingContext object
-   * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..).
-   * @param groupId   The group id for this consumer.
-   * @param topics    Map of (topic_name to numPartitions) to consume. Each 
partition is consumed
-   *                  in its own thread.
-   * @param storageLevel RDD storage level.
-   * @return DStream of (Kafka message key, Kafka message value)
-   */
-  def createStream(
-      jssc: JavaStreamingContext,
-      zkQuorum: String,
-      groupId: String,
-      topics: JMap[String, JInt],
-      storageLevel: StorageLevel
-    ): JavaPairReceiverInputDStream[String, String] = {
-    createStream(jssc.ssc, zkQuorum, groupId, 
Map(topics.asScala.mapValues(_.intValue()).toSeq: _*),
-      storageLevel)
-  }
-
-  /**
-   * Create an input stream that pulls messages from Kafka Brokers.
-   * @param jssc      JavaStreamingContext object
-   * @param keyTypeClass Key type of DStream
-   * @param valueTypeClass value type of Dstream
-   * @param keyDecoderClass Type of kafka key decoder
-   * @param valueDecoderClass Type of kafka value decoder
-   * @param kafkaParams Map of kafka configuration parameters,
-   *                    see http://kafka.apache.org/08/configuration.html
-   * @param topics  Map of (topic_name to numPartitions) to consume. Each 
partition is consumed
-   *                in its own thread
-   * @param storageLevel RDD storage level.
-   * @tparam K type of Kafka message key
-   * @tparam V type of Kafka message value
-   * @tparam U type of Kafka message key decoder
-   * @tparam T type of Kafka message value decoder
-   * @return DStream of (Kafka message key, Kafka message value)
-   */
-  def createStream[K, V, U <: Decoder[_], T <: Decoder[_]](
-      jssc: JavaStreamingContext,
-      keyTypeClass: Class[K],
-      valueTypeClass: Class[V],
-      keyDecoderClass: Class[U],
-      valueDecoderClass: Class[T],
-      kafkaParams: JMap[String, String],
-      topics: JMap[String, JInt],
-      storageLevel: StorageLevel
-    ): JavaPairReceiverInputDStream[K, V] = {
-    implicit val keyCmt: ClassTag[K] = ClassTag(keyTypeClass)
-    implicit val valueCmt: ClassTag[V] = ClassTag(valueTypeClass)
-
-    implicit val keyCmd: ClassTag[U] = ClassTag(keyDecoderClass)
-    implicit val valueCmd: ClassTag[T] = ClassTag(valueDecoderClass)
-
-    createStream[K, V, U, T](
-      jssc.ssc,
-      kafkaParams.asScala.toMap,
-      Map(topics.asScala.mapValues(_.intValue()).toSeq: _*),
-      storageLevel)
-  }
-
-  /** get leaders for the given offset ranges, or throw an exception */
-  private def leadersForRanges(
-      kc: KafkaCluster,
-      offsetRanges: Array[OffsetRange]): Map[TopicAndPartition, (String, Int)] 
= {
-    val topics = offsetRanges.map(o => TopicAndPartition(o.topic, 
o.partition)).toSet
-    val leaders = kc.findLeaders(topics)
-    KafkaCluster.checkErrors(leaders)
-  }
-
-  /** Make sure offsets are available in kafka, or throw an exception */
-  private def checkOffsets(
-      kc: KafkaCluster,
-      offsetRanges: Array[OffsetRange]): Unit = {
-    val topics = offsetRanges.map(_.topicAndPartition).toSet
-    val result = for {
-      low <- kc.getEarliestLeaderOffsets(topics).right
-      high <- kc.getLatestLeaderOffsets(topics).right
-    } yield {
-      offsetRanges.filterNot { o =>
-        low(o.topicAndPartition).offset <= o.fromOffset &&
-        o.untilOffset <= high(o.topicAndPartition).offset
-      }
-    }
-    val badRanges = KafkaCluster.checkErrors(result)
-    if (!badRanges.isEmpty) {
-      throw new SparkException("Offsets not available on leader: " + 
badRanges.mkString(","))
-    }
-  }
-
-  private[kafka] def getFromOffsets(
-      kc: KafkaCluster,
-      kafkaParams: Map[String, String],
-      topics: Set[String]
-    ): Map[TopicAndPartition, Long] = {
-    val reset = 
kafkaParams.get("auto.offset.reset").map(_.toLowerCase(Locale.ROOT))
-    val result = for {
-      topicPartitions <- kc.getPartitions(topics).right
-      leaderOffsets <- (if (reset == Some("smallest")) {
-        kc.getEarliestLeaderOffsets(topicPartitions)
-      } else {
-        kc.getLatestLeaderOffsets(topicPartitions)
-      }).right
-    } yield {
-      leaderOffsets.map { case (tp, lo) =>
-          (tp, lo.offset)
-      }
-    }
-    KafkaCluster.checkErrors(result)
-  }
-
-  /**
-   * Create an RDD from Kafka using offset ranges for each topic and partition.
-   *
-   * @param sc SparkContext object
-   * @param kafkaParams Kafka <a 
href="http://kafka.apache.org/documentation.html#configuration";>
-   *    configuration parameters</a>. Requires "metadata.broker.list" or 
"bootstrap.servers"
-   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
-   *    host1:port1,host2:port2 form.
-   * @param offsetRanges Each OffsetRange in the batch corresponds to a
-   *   range of offsets for a given Kafka topic/partition
-   * @tparam K type of Kafka message key
-   * @tparam V type of Kafka message value
-   * @tparam KD type of Kafka message key decoder
-   * @tparam VD type of Kafka message value decoder
-   * @return RDD of (Kafka message key, Kafka message value)
-   */
-  def createRDD[
-    K: ClassTag,
-    V: ClassTag,
-    KD <: Decoder[K]: ClassTag,
-    VD <: Decoder[V]: ClassTag](
-      sc: SparkContext,
-      kafkaParams: Map[String, String],
-      offsetRanges: Array[OffsetRange]
-    ): RDD[(K, V)] = sc.withScope {
-    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, 
mmd.message)
-    val kc = new KafkaCluster(kafkaParams)
-    val leaders = leadersForRanges(kc, offsetRanges)
-    checkOffsets(kc, offsetRanges)
-    new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, 
messageHandler)
-  }
-
-  /**
-   * Create an RDD from Kafka using offset ranges for each topic and 
partition. This allows you
-   * specify the Kafka leader to connect to (to optimize fetching) and access 
the message as well
-   * as the metadata.
-   *
-   * @param sc SparkContext object
-   * @param kafkaParams Kafka <a 
href="http://kafka.apache.org/documentation.html#configuration";>
-   *    configuration parameters</a>. Requires "metadata.broker.list" or 
"bootstrap.servers"
-   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
-   *    host1:port1,host2:port2 form.
-   * @param offsetRanges Each OffsetRange in the batch corresponds to a
-   *   range of offsets for a given Kafka topic/partition
-   * @param leaders Kafka brokers for each TopicAndPartition in offsetRanges.  
May be an empty map,
-   *   in which case leaders will be looked up on the driver.
-   * @param messageHandler Function for translating each message and metadata 
into the desired type
-   * @tparam K type of Kafka message key
-   * @tparam V type of Kafka message value
-   * @tparam KD type of Kafka message key decoder
-   * @tparam VD type of Kafka message value decoder
-   * @tparam R type returned by messageHandler
-   * @return RDD of R
-   */
-  def createRDD[
-    K: ClassTag,
-    V: ClassTag,
-    KD <: Decoder[K]: ClassTag,
-    VD <: Decoder[V]: ClassTag,
-    R: ClassTag](
-      sc: SparkContext,
-      kafkaParams: Map[String, String],
-      offsetRanges: Array[OffsetRange],
-      leaders: Map[TopicAndPartition, Broker],
-      messageHandler: MessageAndMetadata[K, V] => R
-    ): RDD[R] = sc.withScope {
-    val kc = new KafkaCluster(kafkaParams)
-    val leaderMap = if (leaders.isEmpty) {
-      leadersForRanges(kc, offsetRanges)
-    } else {
-      // This could be avoided by refactoring KafkaRDD.leaders and 
KafkaCluster to use Broker
-      leaders.map {
-        case (tp: TopicAndPartition, Broker(host, port)) => (tp, (host, port))
-      }
-    }
-    val cleanedHandler = sc.clean(messageHandler)
-    checkOffsets(kc, offsetRanges)
-    new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, 
cleanedHandler)
-  }
-
-  /**
-   * Create an RDD from Kafka using offset ranges for each topic and partition.
-   *
-   * @param jsc JavaSparkContext object
-   * @param kafkaParams Kafka <a 
href="http://kafka.apache.org/documentation.html#configuration";>
-   *    configuration parameters</a>. Requires "metadata.broker.list" or 
"bootstrap.servers"
-   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
-   *    host1:port1,host2:port2 form.
-   * @param offsetRanges Each OffsetRange in the batch corresponds to a
-   *   range of offsets for a given Kafka topic/partition
-   * @param keyClass type of Kafka message key
-   * @param valueClass type of Kafka message value
-   * @param keyDecoderClass type of Kafka message key decoder
-   * @param valueDecoderClass type of Kafka message value decoder
-   * @tparam K type of Kafka message key
-   * @tparam V type of Kafka message value
-   * @tparam KD type of Kafka message key decoder
-   * @tparam VD type of Kafka message value decoder
-   * @return RDD of (Kafka message key, Kafka message value)
-   */
-  def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]](
-      jsc: JavaSparkContext,
-      keyClass: Class[K],
-      valueClass: Class[V],
-      keyDecoderClass: Class[KD],
-      valueDecoderClass: Class[VD],
-      kafkaParams: JMap[String, String],
-      offsetRanges: Array[OffsetRange]
-    ): JavaPairRDD[K, V] = jsc.sc.withScope {
-    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
-    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
-    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
-    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
-    new JavaPairRDD(createRDD[K, V, KD, VD](
-      jsc.sc, Map(kafkaParams.asScala.toSeq: _*), offsetRanges))
-  }
-
-  /**
-   * Create an RDD from Kafka using offset ranges for each topic and 
partition. This allows you
-   * specify the Kafka leader to connect to (to optimize fetching) and access 
the message as well
-   * as the metadata.
-   *
-   * @param jsc JavaSparkContext object
-   * @param kafkaParams Kafka <a 
href="http://kafka.apache.org/documentation.html#configuration";>
-   *    configuration parameters</a>. Requires "metadata.broker.list" or 
"bootstrap.servers"
-   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
-   *    host1:port1,host2:port2 form.
-   * @param offsetRanges Each OffsetRange in the batch corresponds to a
-   *   range of offsets for a given Kafka topic/partition
-   * @param leaders Kafka brokers for each TopicAndPartition in offsetRanges.  
May be an empty map,
-   *   in which case leaders will be looked up on the driver.
-   * @param messageHandler Function for translating each message and metadata 
into the desired type
-   * @tparam K type of Kafka message key
-   * @tparam V type of Kafka message value
-   * @tparam KD type of Kafka message key decoder
-   * @tparam VD type of Kafka message value decoder
-   * @tparam R type returned by messageHandler
-   * @return RDD of R
-   */
-  def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
-      jsc: JavaSparkContext,
-      keyClass: Class[K],
-      valueClass: Class[V],
-      keyDecoderClass: Class[KD],
-      valueDecoderClass: Class[VD],
-      recordClass: Class[R],
-      kafkaParams: JMap[String, String],
-      offsetRanges: Array[OffsetRange],
-      leaders: JMap[TopicAndPartition, Broker],
-      messageHandler: JFunction[MessageAndMetadata[K, V], R]
-    ): JavaRDD[R] = jsc.sc.withScope {
-    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
-    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
-    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
-    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
-    implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
-    val leaderMap = Map(leaders.asScala.toSeq: _*)
-    createRDD[K, V, KD, VD, R](
-      jsc.sc, Map(kafkaParams.asScala.toSeq: _*), offsetRanges, leaderMap, 
messageHandler.call(_))
-  }
-
-  /**
-   * Create an input stream that directly pulls messages from Kafka Brokers
-   * without using any receiver. This stream can guarantee that each message
-   * from Kafka is included in transformations exactly once (see points below).
-   *
-   * Points to note:
-   *  - No receivers: This stream does not use any receiver. It directly 
queries Kafka
-   *  - Offsets: This does not use Zookeeper to store offsets. The consumed 
offsets are tracked
-   *    by the stream itself. For interoperability with Kafka monitoring tools 
that depend on
-   *    Zookeeper, you have to update Kafka/Zookeeper yourself from the 
streaming application.
-   *    You can access the offsets used in each batch from the generated RDDs 
(see
-   *    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
-   *  - Failure Recovery: To recover from driver failures, you have to enable 
checkpointing
-   *    in the `StreamingContext`. The information on consumed offset can be
-   *    recovered from the checkpoint. See the programming guide for details 
(constraints, etc.).
-   *  - End-to-end semantics: This stream ensures that every records is 
effectively received and
-   *    transformed exactly once, but gives no guarantees on whether the 
transformed data are
-   *    outputted exactly once. For end-to-end exactly-once semantics, you 
have to either ensure
-   *    that the output operation is idempotent, or use transactions to output 
records atomically.
-   *    See the programming guide for more details.
-   *
-   * @param ssc StreamingContext object
-   * @param kafkaParams Kafka <a 
href="http://kafka.apache.org/documentation.html#configuration";>
-   *    configuration parameters</a>. Requires "metadata.broker.list" or 
"bootstrap.servers"
-   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
-   *    host1:port1,host2:port2 form.
-   * @param fromOffsets Per-topic/partition Kafka offsets defining the 
(inclusive)
-   *    starting point of the stream
-   * @param messageHandler Function for translating each message and metadata 
into the desired type
-   * @tparam K type of Kafka message key
-   * @tparam V type of Kafka message value
-   * @tparam KD type of Kafka message key decoder
-   * @tparam VD type of Kafka message value decoder
-   * @tparam R type returned by messageHandler
-   * @return DStream of R
-   */
-  def createDirectStream[
-    K: ClassTag,
-    V: ClassTag,
-    KD <: Decoder[K]: ClassTag,
-    VD <: Decoder[V]: ClassTag,
-    R: ClassTag] (
-      ssc: StreamingContext,
-      kafkaParams: Map[String, String],
-      fromOffsets: Map[TopicAndPartition, Long],
-      messageHandler: MessageAndMetadata[K, V] => R
-  ): InputDStream[R] = {
-    val cleanedHandler = ssc.sc.clean(messageHandler)
-    new DirectKafkaInputDStream[K, V, KD, VD, R](
-      ssc, kafkaParams, fromOffsets, cleanedHandler)
-  }
-
-  /**
-   * Create an input stream that directly pulls messages from Kafka Brokers
-   * without using any receiver. This stream can guarantee that each message
-   * from Kafka is included in transformations exactly once (see points below).
-   *
-   * Points to note:
-   *  - No receivers: This stream does not use any receiver. It directly 
queries Kafka
-   *  - Offsets: This does not use Zookeeper to store offsets. The consumed 
offsets are tracked
-   *    by the stream itself. For interoperability with Kafka monitoring tools 
that depend on
-   *    Zookeeper, you have to update Kafka/Zookeeper yourself from the 
streaming application.
-   *    You can access the offsets used in each batch from the generated RDDs 
(see
-   *    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
-   *  - Failure Recovery: To recover from driver failures, you have to enable 
checkpointing
-   *    in the `StreamingContext`. The information on consumed offset can be
-   *    recovered from the checkpoint. See the programming guide for details 
(constraints, etc.).
-   *  - End-to-end semantics: This stream ensures that every records is 
effectively received and
-   *    transformed exactly once, but gives no guarantees on whether the 
transformed data are
-   *    outputted exactly once. For end-to-end exactly-once semantics, you 
have to either ensure
-   *    that the output operation is idempotent, or use transactions to output 
records atomically.
-   *    See the programming guide for more details.
-   *
-   * @param ssc StreamingContext object
-   * @param kafkaParams Kafka <a 
href="http://kafka.apache.org/documentation.html#configuration";>
-   *   configuration parameters</a>. Requires "metadata.broker.list" or 
"bootstrap.servers"
-   *   to be set with Kafka broker(s) (NOT zookeeper servers), specified in
-   *   host1:port1,host2:port2 form.
-   *   If not starting from a checkpoint, "auto.offset.reset" may be set to 
"largest" or "smallest"
-   *   to determine where the stream starts (defaults to "largest")
-   * @param topics Names of the topics to consume
-   * @tparam K type of Kafka message key
-   * @tparam V type of Kafka message value
-   * @tparam KD type of Kafka message key decoder
-   * @tparam VD type of Kafka message value decoder
-   * @return DStream of (Kafka message key, Kafka message value)
-   */
-  def createDirectStream[
-    K: ClassTag,
-    V: ClassTag,
-    KD <: Decoder[K]: ClassTag,
-    VD <: Decoder[V]: ClassTag] (
-      ssc: StreamingContext,
-      kafkaParams: Map[String, String],
-      topics: Set[String]
-  ): InputDStream[(K, V)] = {
-    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, 
mmd.message)
-    val kc = new KafkaCluster(kafkaParams)
-    val fromOffsets = getFromOffsets(kc, kafkaParams, topics)
-    new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
-      ssc, kafkaParams, fromOffsets, messageHandler)
-  }
-
-  /**
-   * Create an input stream that directly pulls messages from Kafka Brokers
-   * without using any receiver. This stream can guarantee that each message
-   * from Kafka is included in transformations exactly once (see points below).
-   *
-   * Points to note:
-   *  - No receivers: This stream does not use any receiver. It directly 
queries Kafka
-   *  - Offsets: This does not use Zookeeper to store offsets. The consumed 
offsets are tracked
-   *    by the stream itself. For interoperability with Kafka monitoring tools 
that depend on
-   *    Zookeeper, you have to update Kafka/Zookeeper yourself from the 
streaming application.
-   *    You can access the offsets used in each batch from the generated RDDs 
(see
-   *    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
-   *  - Failure Recovery: To recover from driver failures, you have to enable 
checkpointing
-   *    in the `StreamingContext`. The information on consumed offset can be
-   *    recovered from the checkpoint. See the programming guide for details 
(constraints, etc.).
-   *  - End-to-end semantics: This stream ensures that every records is 
effectively received and
-   *    transformed exactly once, but gives no guarantees on whether the 
transformed data are
-   *    outputted exactly once. For end-to-end exactly-once semantics, you 
have to either ensure
-   *    that the output operation is idempotent, or use transactions to output 
records atomically.
-   *    See the programming guide for more details.
-   *
-   * @param jssc JavaStreamingContext object
-   * @param keyClass Class of the keys in the Kafka records
-   * @param valueClass Class of the values in the Kafka records
-   * @param keyDecoderClass Class of the key decoder
-   * @param valueDecoderClass Class of the value decoder
-   * @param recordClass Class of the records in DStream
-   * @param kafkaParams Kafka <a 
href="http://kafka.apache.org/documentation.html#configuration";>
-   *   configuration parameters</a>. Requires "metadata.broker.list" or 
"bootstrap.servers"
-   *   to be set with Kafka broker(s) (NOT zookeeper servers), specified in
-   *   host1:port1,host2:port2 form.
-   * @param fromOffsets Per-topic/partition Kafka offsets defining the 
(inclusive)
-   *    starting point of the stream
-   * @param messageHandler Function for translating each message and metadata 
into the desired type
-   * @tparam K type of Kafka message key
-   * @tparam V type of Kafka message value
-   * @tparam KD type of Kafka message key decoder
-   * @tparam VD type of Kafka message value decoder
-   * @tparam R type returned by messageHandler
-   * @return DStream of R
-   */
-  def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
-      jssc: JavaStreamingContext,
-      keyClass: Class[K],
-      valueClass: Class[V],
-      keyDecoderClass: Class[KD],
-      valueDecoderClass: Class[VD],
-      recordClass: Class[R],
-      kafkaParams: JMap[String, String],
-      fromOffsets: JMap[TopicAndPartition, JLong],
-      messageHandler: JFunction[MessageAndMetadata[K, V], R]
-    ): JavaInputDStream[R] = {
-    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
-    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
-    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
-    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
-    implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
-    val cleanedHandler = jssc.sparkContext.clean(messageHandler.call _)
-    createDirectStream[K, V, KD, VD, R](
-      jssc.ssc,
-      Map(kafkaParams.asScala.toSeq: _*),
-      Map(fromOffsets.asScala.mapValues(_.longValue()).toSeq: _*),
-      cleanedHandler
-    )
-  }
-
-  /**
-   * Create an input stream that directly pulls messages from Kafka Brokers
-   * without using any receiver. This stream can guarantee that each message
-   * from Kafka is included in transformations exactly once (see points below).
-   *
-   * Points to note:
-   *  - No receivers: This stream does not use any receiver. It directly 
queries Kafka
-   *  - Offsets: This does not use Zookeeper to store offsets. The consumed 
offsets are tracked
-   *    by the stream itself. For interoperability with Kafka monitoring tools 
that depend on
-   *    Zookeeper, you have to update Kafka/Zookeeper yourself from the 
streaming application.
-   *    You can access the offsets used in each batch from the generated RDDs 
(see
-   *    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
-   *  - Failure Recovery: To recover from driver failures, you have to enable 
checkpointing
-   *    in the `StreamingContext`. The information on consumed offset can be
-   *    recovered from the checkpoint. See the programming guide for details 
(constraints, etc.).
-   *  - End-to-end semantics: This stream ensures that every records is 
effectively received and
-   *    transformed exactly once, but gives no guarantees on whether the 
transformed data are
-   *    outputted exactly once. For end-to-end exactly-once semantics, you 
have to either ensure
-   *    that the output operation is idempotent, or use transactions to output 
records atomically.
-   *    See the programming guide for more details.
-   *
-   * @param jssc JavaStreamingContext object
-   * @param keyClass Class of the keys in the Kafka records
-   * @param valueClass Class of the values in the Kafka records
-   * @param keyDecoderClass Class of the key decoder
-   * @param valueDecoderClass Class type of the value decoder
-   * @param kafkaParams Kafka <a 
href="http://kafka.apache.org/documentation.html#configuration";>
-   *   configuration parameters</a>. Requires "metadata.broker.list" or 
"bootstrap.servers"
-   *   to be set with Kafka broker(s) (NOT zookeeper servers), specified in
-   *   host1:port1,host2:port2 form.
-   *   If not starting from a checkpoint, "auto.offset.reset" may be set to 
"largest" or "smallest"
-   *   to determine where the stream starts (defaults to "largest")
-   * @param topics Names of the topics to consume
-   * @tparam K type of Kafka message key
-   * @tparam V type of Kafka message value
-   * @tparam KD type of Kafka message key decoder
-   * @tparam VD type of Kafka message value decoder
-   * @return DStream of (Kafka message key, Kafka message value)
-   */
-  def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V]](
-      jssc: JavaStreamingContext,
-      keyClass: Class[K],
-      valueClass: Class[V],
-      keyDecoderClass: Class[KD],
-      valueDecoderClass: Class[VD],
-      kafkaParams: JMap[String, String],
-      topics: JSet[String]
-    ): JavaPairInputDStream[K, V] = {
-    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
-    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
-    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
-    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
-    createDirectStream[K, V, KD, VD](
-      jssc.ssc,
-      Map(kafkaParams.asScala.toSeq: _*),
-      Set(topics.asScala.toSeq: _*)
-    )
-  }
-}
-
-/**
- * This is a helper class that wraps the KafkaUtils.createStream() into more
- * Python-friendly class and function so that it can be easily
- * instantiated and called from Python's KafkaUtils.
- *
- * The zero-arg constructor helps instantiate this class from the Class object
- * classOf[KafkaUtilsPythonHelper].newInstance(), and the createStream()
- * takes care of known parameters instead of passing them from Python
- */
-private[kafka] class KafkaUtilsPythonHelper {
-  import KafkaUtilsPythonHelper._
-
-  def createStream(
-      jssc: JavaStreamingContext,
-      kafkaParams: JMap[String, String],
-      topics: JMap[String, JInt],
-      storageLevel: StorageLevel): JavaPairReceiverInputDStream[Array[Byte], 
Array[Byte]] = {
-    KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, 
DefaultDecoder](
-      jssc,
-      classOf[Array[Byte]],
-      classOf[Array[Byte]],
-      classOf[DefaultDecoder],
-      classOf[DefaultDecoder],
-      kafkaParams,
-      topics,
-      storageLevel)
-  }
-
-  def createRDDWithoutMessageHandler(
-      jsc: JavaSparkContext,
-      kafkaParams: JMap[String, String],
-      offsetRanges: JList[OffsetRange],
-      leaders: JMap[TopicAndPartition, Broker]): JavaRDD[(Array[Byte], 
Array[Byte])] = {
-    val messageHandler =
-      (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => (mmd.key, 
mmd.message)
-    new JavaRDD(createRDD(jsc, kafkaParams, offsetRanges, leaders, 
messageHandler))
-  }
-
-  def createRDDWithMessageHandler(
-      jsc: JavaSparkContext,
-      kafkaParams: JMap[String, String],
-      offsetRanges: JList[OffsetRange],
-      leaders: JMap[TopicAndPartition, Broker]): JavaRDD[Array[Byte]] = {
-    val messageHandler = (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) =>
-      new PythonMessageAndMetadata(
-        mmd.topic, mmd.partition, mmd.offset, mmd.key(), mmd.message())
-    val rdd = createRDD(jsc, kafkaParams, offsetRanges, leaders, 
messageHandler).
-        mapPartitions(picklerIterator)
-    new JavaRDD(rdd)
-  }
-
-  private def createRDD[V: ClassTag](
-      jsc: JavaSparkContext,
-      kafkaParams: JMap[String, String],
-      offsetRanges: JList[OffsetRange],
-      leaders: JMap[TopicAndPartition, Broker],
-      messageHandler: MessageAndMetadata[Array[Byte], Array[Byte]] => V): 
RDD[V] = {
-    KafkaUtils.createRDD[Array[Byte], Array[Byte], DefaultDecoder, 
DefaultDecoder, V](
-      jsc.sc,
-      kafkaParams.asScala.toMap,
-      offsetRanges.toArray(new Array[OffsetRange](offsetRanges.size())),
-      leaders.asScala.toMap,
-      messageHandler
-    )
-  }
-
-  def createDirectStreamWithoutMessageHandler(
-      jssc: JavaStreamingContext,
-      kafkaParams: JMap[String, String],
-      topics: JSet[String],
-      fromOffsets: JMap[TopicAndPartition, JNumber]): 
JavaDStream[(Array[Byte], Array[Byte])] = {
-    val messageHandler =
-      (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => (mmd.key, 
mmd.message)
-    new JavaDStream(createDirectStream(jssc, kafkaParams, topics, fromOffsets, 
messageHandler))
-  }
-
-  def createDirectStreamWithMessageHandler(
-      jssc: JavaStreamingContext,
-      kafkaParams: JMap[String, String],
-      topics: JSet[String],
-      fromOffsets: JMap[TopicAndPartition, JNumber]): JavaDStream[Array[Byte]] 
= {
-    val messageHandler = (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) =>
-      new PythonMessageAndMetadata(mmd.topic, mmd.partition, mmd.offset, 
mmd.key(), mmd.message())
-    val stream = createDirectStream(jssc, kafkaParams, topics, fromOffsets, 
messageHandler).
-      mapPartitions(picklerIterator)
-    new JavaDStream(stream)
-  }
-
-  private def createDirectStream[V: ClassTag](
-      jssc: JavaStreamingContext,
-      kafkaParams: JMap[String, String],
-      topics: JSet[String],
-      fromOffsets: JMap[TopicAndPartition, JNumber],
-      messageHandler: MessageAndMetadata[Array[Byte], Array[Byte]] => V): 
DStream[V] = {
-
-    val currentFromOffsets = if (!fromOffsets.isEmpty) {
-      val topicsFromOffsets = fromOffsets.keySet().asScala.map(_.topic)
-      if (topicsFromOffsets != topics.asScala.toSet) {
-        throw new IllegalStateException(
-          s"The specified topics: ${topics.asScala.toSet.mkString(" ")} " +
-          s"do not equal to the topic from offsets: 
${topicsFromOffsets.mkString(" ")}")
-      }
-      Map(fromOffsets.asScala.mapValues { _.longValue() }.toSeq: _*)
-    } else {
-      val kc = new KafkaCluster(Map(kafkaParams.asScala.toSeq: _*))
-      KafkaUtils.getFromOffsets(
-        kc, Map(kafkaParams.asScala.toSeq: _*), Set(topics.asScala.toSeq: _*))
-    }
-
-    KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, 
DefaultDecoder, V](
-      jssc.ssc,
-      Map(kafkaParams.asScala.toSeq: _*),
-      Map(currentFromOffsets.toSeq: _*),
-      messageHandler)
-  }
-
-  def createOffsetRange(topic: String, partition: JInt, fromOffset: JLong, 
untilOffset: JLong
-    ): OffsetRange = OffsetRange.create(topic, partition, fromOffset, 
untilOffset)
-
-  def createTopicAndPartition(topic: String, partition: JInt): 
TopicAndPartition =
-    TopicAndPartition(topic, partition)
-
-  def createBroker(host: String, port: JInt): Broker = Broker(host, port)
-
-  def offsetRangesOfKafkaRDD(rdd: RDD[_]): JList[OffsetRange] = {
-    val parentRDDs = rdd.getNarrowAncestors
-    val kafkaRDDs = parentRDDs.filter(rdd => rdd.isInstanceOf[KafkaRDD[_, _, 
_, _, _]])
-
-    require(
-      kafkaRDDs.length == 1,
-      "Cannot get offset ranges, as there may be multiple Kafka RDDs or no 
Kafka RDD associated" +
-        "with this RDD, please call this method only on a Kafka RDD.")
-
-    val kafkaRDD = kafkaRDDs.head.asInstanceOf[KafkaRDD[_, _, _, _, _]]
-    kafkaRDD.offsetRanges.toSeq.asJava
-  }
-}
-
-private object KafkaUtilsPythonHelper {
-  private var initialized = false
-
-  def initialize(): Unit = {
-    SerDeUtil.initialize()
-    synchronized {
-      if (!initialized) {
-        new PythonMessageAndMetadataPickler().register()
-        initialized = true
-      }
-    }
-  }
-
-  initialize()
-
-  def picklerIterator(iter: Iterator[Any]): Iterator[Array[Byte]] = {
-    new SerDeUtil.AutoBatchedPickler(iter)
-  }
-
-  case class PythonMessageAndMetadata(
-      topic: String,
-      partition: JInt,
-      offset: JLong,
-      key: Array[Byte],
-      message: Array[Byte])
-
-  class PythonMessageAndMetadataPickler extends IObjectPickler {
-    private val module = "pyspark.streaming.kafka"
-
-    def register(): Unit = {
-      Pickler.registerCustomPickler(classOf[PythonMessageAndMetadata], this)
-      Pickler.registerCustomPickler(this.getClass, this)
-    }
-
-    def pickle(obj: Object, out: OutputStream, pickler: Pickler) {
-      if (obj == this) {
-        out.write(Opcodes.GLOBAL)
-        
out.write(s"$module\nKafkaMessageAndMetadata\n".getBytes(StandardCharsets.UTF_8))
-      } else {
-        pickler.save(this)
-        val msgAndMetaData = obj.asInstanceOf[PythonMessageAndMetadata]
-        out.write(Opcodes.MARK)
-        pickler.save(msgAndMetaData.topic)
-        pickler.save(msgAndMetaData.partition)
-        pickler.save(msgAndMetaData.offset)
-        pickler.save(msgAndMetaData.key)
-        pickler.save(msgAndMetaData.message)
-        out.write(Opcodes.TUPLE)
-        out.write(Opcodes.REDUCE)
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
deleted file mode 100644
index 6dab5f9..0000000
--- 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import kafka.common.TopicAndPartition
-
-/**
- * Represents any object that has a collection of [[OffsetRange]]s. This can 
be used to access the
- * offset ranges in RDDs generated by the direct Kafka DStream (see
- * `KafkaUtils.createDirectStream()`).
- * {{{
- *   KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
- *      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
- *      ...
- *   }
- * }}}
- */
-@deprecated("Update to Kafka 0.10 integration", "2.3.0")
-trait HasOffsetRanges {
-  def offsetRanges: Array[OffsetRange]
-}
-
-/**
- * Represents a range of offsets from a single Kafka TopicAndPartition. 
Instances of this class
- * can be created with `OffsetRange.create()`.
- * @param topic Kafka topic name
- * @param partition Kafka partition id
- * @param fromOffset Inclusive starting offset
- * @param untilOffset Exclusive ending offset
- */
-@deprecated("Update to Kafka 0.10 integration", "2.3.0")
-final class OffsetRange private(
-    val topic: String,
-    val partition: Int,
-    val fromOffset: Long,
-    val untilOffset: Long) extends Serializable {
-  import OffsetRange.OffsetRangeTuple
-
-  /** Kafka TopicAndPartition object, for convenience */
-  def topicAndPartition(): TopicAndPartition = TopicAndPartition(topic, 
partition)
-
-  /** Number of messages this OffsetRange refers to */
-  def count(): Long = untilOffset - fromOffset
-
-  override def equals(obj: Any): Boolean = obj match {
-    case that: OffsetRange =>
-      this.topic == that.topic &&
-        this.partition == that.partition &&
-        this.fromOffset == that.fromOffset &&
-        this.untilOffset == that.untilOffset
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    toTuple.hashCode()
-  }
-
-  override def toString(): String = {
-    s"OffsetRange(topic: '$topic', partition: $partition, range: [$fromOffset 
-> $untilOffset])"
-  }
-
-  /** this is to avoid ClassNotFoundException during checkpoint restore */
-  private[streaming]
-  def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, untilOffset)
-}
-
-/**
- * Companion object the provides methods to create instances of 
[[OffsetRange]].
- */
-@deprecated("Update to Kafka 0.10 integration", "2.3.0")
-object OffsetRange {
-  def create(topic: String, partition: Int, fromOffset: Long, untilOffset: 
Long): OffsetRange =
-    new OffsetRange(topic, partition, fromOffset, untilOffset)
-
-  def create(
-      topicAndPartition: TopicAndPartition,
-      fromOffset: Long,
-      untilOffset: Long): OffsetRange =
-    new OffsetRange(topicAndPartition.topic, topicAndPartition.partition, 
fromOffset, untilOffset)
-
-  def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: 
Long): OffsetRange =
-    new OffsetRange(topic, partition, fromOffset, untilOffset)
-
-  def apply(
-      topicAndPartition: TopicAndPartition,
-      fromOffset: Long,
-      untilOffset: Long): OffsetRange =
-    new OffsetRange(topicAndPartition.topic, topicAndPartition.partition, 
fromOffset, untilOffset)
-
-  /** this is to avoid ClassNotFoundException during checkpoint restore */
-  private[kafka]
-  type OffsetRangeTuple = (String, Int, Long, Long)
-
-  private[kafka]
-  def apply(t: OffsetRangeTuple) =
-    new OffsetRange(t._1, t._2, t._3, t._4)
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to