[SPARK-12177][STREAMING][KAFKA] Update KafkaDStreams to new Kafka 0.10 Consumer API

## What changes were proposed in this pull request?

New Kafka consumer API for the released 0.10 version of Kafka
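
For context, a minimal usage sketch of the new integration (this assumes an existing `StreamingContext` named `ssc`; `createDirectStream` comes from the new `KafkaUtils`, whose contents are not reproduced in full in this excerpt):

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._

// Standard new-consumer configuration; brokers, group id and topics are placeholders.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "host1:9092,host2:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// PreferConsistent (a LocationStrategy) and Subscribe (a ConsumerStrategy) are added by this patch.
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](List("topicA", "topicB"), kafkaParams)
)

stream.map(record => (record.key, record.value)).print()
```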

## How was this patch tested?

Unit tests, manual tests

Author: cody koeninger <c...@koeninger.org>

Closes #11863 from koeninger/kafka-0.9.

(cherry picked from commit dedbceec1ef33ccd88101016de969a1ef3e3e142)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3134f116
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3134f116
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3134f116

Branch: refs/heads/branch-2.0
Commit: 3134f116a3565c3a299fa2e7094acd7304d64280
Parents: a548523
Author: cody koeninger <c...@koeninger.org>
Authored: Wed Jun 29 23:21:03 2016 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Wed Jun 29 23:21:15 2016 -0700

----------------------------------------------------------------------
 external/kafka-0-10-assembly/pom.xml            | 176 ++++++
 external/kafka-0-10/pom.xml                     |  98 +++
 .../kafka010/CachedKafkaConsumer.scala          | 189 ++++++
 .../streaming/kafka010/ConsumerStrategy.scala   | 314 ++++++++++
 .../kafka010/DirectKafkaInputDStream.scala      | 318 ++++++++++
 .../spark/streaming/kafka010/KafkaRDD.scala     | 232 +++++++
 .../streaming/kafka010/KafkaRDDPartition.scala  |  45 ++
 .../streaming/kafka010/KafkaTestUtils.scala     | 277 +++++++++
 .../spark/streaming/kafka010/KafkaUtils.scala   | 175 ++++++
 .../streaming/kafka010/LocationStrategy.scala   |  77 +++
 .../spark/streaming/kafka010/OffsetRange.scala  | 153 +++++
 .../spark/streaming/kafka010/package-info.java  |  21 +
 .../spark/streaming/kafka010/package.scala      |  23 +
 .../kafka010/JavaConsumerStrategySuite.java     |  84 +++
 .../kafka010/JavaDirectKafkaStreamSuite.java    | 180 ++++++
 .../streaming/kafka010/JavaKafkaRDDSuite.java   | 122 ++++
 .../kafka010/JavaLocationStrategySuite.java     |  58 ++
 .../src/test/resources/log4j.properties         |  28 +
 .../kafka010/DirectKafkaStreamSuite.scala       | 612 +++++++++++++++++++
 .../streaming/kafka010/KafkaRDDSuite.scala      | 169 +++++
 pom.xml                                         |   2 +
 project/SparkBuild.scala                        |  12 +-
 22 files changed, 3359 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-assembly/pom.xml b/external/kafka-0-10-assembly/pom.xml
new file mode 100644
index 0000000..f2468d1
--- /dev/null
+++ b/external/kafka-0-10-assembly/pom.xml
@@ -0,0 +1,176 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.11</artifactId>
+    <version>2.0.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-streaming-kafka-0-10-assembly_2.11</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Integration for Kafka 0.10 Assembly</name>
+  <url>http://spark.apache.org/</url>
+
+  <properties>
+    <sbt.project.name>streaming-kafka-0-10-assembly</sbt.project.name>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <!--
+      Demote already included in the Spark assembly.
+    -->
+    <dependency>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>net.jpountz.lz4</groupId>
+      <artifactId>lz4</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro-mapred</artifactId>
+      <classifier>${avro.mapred.classifier}</classifier>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-recipes</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>net.java.dev.jets3t</groupId>
+      <artifactId>jets3t</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.xerial.snappy</groupId>
+      <artifactId>snappy-java</artifactId>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+  <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+  <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  <plugins>
+    <plugin>
+      <groupId>org.apache.maven.plugins</groupId>
+      <artifactId>maven-shade-plugin</artifactId>
+      <configuration>
+        <shadedArtifactAttached>false</shadedArtifactAttached>
+        <artifactSet>
+          <includes>
+            <include>*:*</include>
+          </includes>
+        </artifactSet>
+        <filters>
+          <filter>
+            <artifact>*:*</artifact>
+            <excludes>
+              <exclude>META-INF/*.SF</exclude>
+              <exclude>META-INF/*.DSA</exclude>
+              <exclude>META-INF/*.RSA</exclude>
+            </excludes>
+          </filter>
+        </filters>
+      </configuration>
+      <executions>
+        <execution>
+          <phase>package</phase>
+          <goals>
+            <goal>shade</goal>
+          </goals>
+          <configuration>
+            <transformers>
+              <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+              <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                <resource>reference.conf</resource>
+              </transformer>
+              <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+                <resource>log4j.properties</resource>
+              </transformer>
+              <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
+              <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
+            </transformers>
+          </configuration>
+        </execution>
+      </executions>
+    </plugin>
+  </plugins>
+</build>
+</project>
+

http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10/pom.xml
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/pom.xml b/external/kafka-0-10/pom.xml
new file mode 100644
index 0000000..50395f6
--- /dev/null
+++ b/external/kafka-0-10/pom.xml
@@ -0,0 +1,98 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.11</artifactId>
+    <version>2.0.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
+  <properties>
+    <sbt.project.name>streaming-kafka-0-10</sbt.project.name>
+  </properties>
+  <packaging>jar</packaging>
+  <name>Spark Integration for Kafka 0.10</name>
+  <url>http://spark.apache.org/</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_${scala.binary.version}</artifactId>
+      <version>0.10.0.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.sun.jmx</groupId>
+          <artifactId>jmxri</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jdmk</groupId>
+          <artifactId>jmxtools</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>net.sf.jopt-simple</groupId>
+          <artifactId>jopt-simple</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-simple</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>net.sf.jopt-simple</groupId>
+      <artifactId>jopt-simple</artifactId>
+      <version>3.2</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scalacheck</groupId>
+      <artifactId>scalacheck_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-tags_${scala.binary.version}</artifactId>
+    </dependency>
+  </dependencies>
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  </build>
+</project>
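
To depend on the new module from an application build, the coordinates follow from the artifactId above; an sbt sketch (the version shown is this branch's 2.0.0-SNAPSHOT, substitute the release actually built against):

```scala
// sbt: corresponds to the spark-streaming-kafka-0-10_2.11 artifact defined in the pom above
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.0.0-SNAPSHOT"
```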

http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
new file mode 100644
index 0000000..fa3ea61
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{ util => ju }
+
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, KafkaConsumer }
+import org.apache.kafka.common.{ KafkaException, TopicPartition }
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+
+/**
+ * Consumer of single topicpartition, intended for cached reuse.
+ * Underlying consumer is not threadsafe, so neither is this,
+ * but processing the same topicpartition and group id in multiple threads is usually bad anyway.
+ */
+private[kafka010]
+class CachedKafkaConsumer[K, V] private(
+  val groupId: String,
+  val topic: String,
+  val partition: Int,
+  val kafkaParams: ju.Map[String, Object]) extends Logging {
+
+  assert(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG),
+    "groupId used for cache key must match the groupId in kafkaParams")
+
+  val topicPartition = new TopicPartition(topic, partition)
+
+  protected val consumer = {
+    val c = new KafkaConsumer[K, V](kafkaParams)
+    val tps = new ju.ArrayList[TopicPartition]()
+    tps.add(topicPartition)
+    c.assign(tps)
+    c
+  }
+
+  // TODO if the buffer was kept around as a random-access structure,
+  // could possibly optimize re-calculating of an RDD in the same batch
+  protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, V]]().iterator
+  protected var nextOffset = -2L
+
+  def close(): Unit = consumer.close()
+
+  /**
+   * Get the record for the given offset, waiting up to timeout ms if IO is necessary.
+   * Sequential forward access will use buffers, but random access will be horribly inefficient.
+   */
+  def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
+    logDebug(s"Get $groupId $topic $partition nextOffset $nextOffset requested $offset")
+    if (offset != nextOffset) {
+      logInfo(s"Initial fetch for $groupId $topic $partition $offset")
+      seek(offset)
+      poll(timeout)
+    }
+
+    if (!buffer.hasNext()) { poll(timeout) }
+    assert(buffer.hasNext(),
+      s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
+    var record = buffer.next()
+
+    if (record.offset != offset) {
+      logInfo(s"Buffer miss for $groupId $topic $partition $offset")
+      seek(offset)
+      poll(timeout)
+      assert(buffer.hasNext(),
+        s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
+      record = buffer.next()
+      assert(record.offset == offset,
+        s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset")
+    }
+
+    nextOffset = offset + 1
+    record
+  }
+
+  private def seek(offset: Long): Unit = {
+    logDebug(s"Seeking to $topicPartition $offset")
+    consumer.seek(topicPartition, offset)
+  }
+
+  private def poll(timeout: Long): Unit = {
+    val p = consumer.poll(timeout)
+    val r = p.records(topicPartition)
+    logDebug(s"Polled ${p.partitions()}  ${r.size}")
+    buffer = r.iterator
+  }
+
+}
+
+private[kafka010]
+object CachedKafkaConsumer extends Logging {
+
+  private case class CacheKey(groupId: String, topic: String, partition: Int)
+
+  // Don't want to depend on guava, don't want a cleanup thread, use a simple LinkedHashMap
+  private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] = null
+
+  /** Must be called before get, once per JVM, to configure the cache. Further calls are ignored */
+  def init(
+      initialCapacity: Int,
+      maxCapacity: Int,
+      loadFactor: Float): Unit = CachedKafkaConsumer.synchronized {
+    if (null == cache) {
+      logInfo(s"Initializing cache $initialCapacity $maxCapacity $loadFactor")
+      cache = new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]](
+        initialCapacity, loadFactor, true) {
+        override def removeEldestEntry(
+          entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer[_, _]]): Boolean = {
+          if (this.size > maxCapacity) {
+            try {
+              entry.getValue.consumer.close()
+            } catch {
+              case x: KafkaException =>
+                logError("Error closing oldest Kafka consumer", x)
+            }
+            true
+          } else {
+            false
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Get a cached consumer for groupId, assigned to topic and partition.
+   * If matching consumer doesn't already exist, will be created using kafkaParams.
+   */
+  def get[K, V](
+      groupId: String,
+      topic: String,
+      partition: Int,
+      kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer[K, V] =
+    CachedKafkaConsumer.synchronized {
+      val k = CacheKey(groupId, topic, partition)
+      val v = cache.get(k)
+      if (null == v) {
+        logInfo(s"Cache miss for $k")
+        logDebug(cache.keySet.toString)
+        val c = new CachedKafkaConsumer[K, V](groupId, topic, partition, kafkaParams)
+        cache.put(k, c)
+        c
+      } else {
+        // any given topicpartition should have a consistent key and value type
+        v.asInstanceOf[CachedKafkaConsumer[K, V]]
+      }
+    }
+
+  /**
+   * Get a fresh new instance, unassociated with the global cache.
+   * Caller is responsible for closing
+   */
+  def getUncached[K, V](
+      groupId: String,
+      topic: String,
+      partition: Int,
+      kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer[K, V] =
+    new CachedKafkaConsumer[K, V](groupId, topic, partition, kafkaParams)
+
+  /** remove consumer for given groupId, topic, and partition, if it exists */
+  def remove(groupId: String, topic: String, partition: Int): Unit = {
+    val k = CacheKey(groupId, topic, partition)
+    logInfo(s"Removing $k from cache")
+    val v = CachedKafkaConsumer.synchronized {
+      cache.remove(k)
+    }
+    if (null != v) {
+      v.close()
+      logInfo(s"Removed $k from cache")
+    }
+  }
+}
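
The consumer cache above is sized and tuned through Spark configuration that is read in KafkaRDD (shown further below); a hedged sketch of overriding the defaults (initialCapacity 16, maxCapacity 64, poll timeout 256 ms):

```scala
import org.apache.spark.SparkConf

// Values here are illustrative; the keys match those read in KafkaRDD below.
val sparkConf = new SparkConf()
  .set("spark.streaming.kafka.consumer.cache.initialCapacity", "32")
  .set("spark.streaming.kafka.consumer.cache.maxCapacity", "128")
  .set("spark.streaming.kafka.consumer.poll.ms", "512")
```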

http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
new file mode 100644
index 0000000..079a07d
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{ util => ju }
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.annotation.Experimental
+
+
+/**
+ * :: Experimental ::
+ * Choice of how to create and configure underlying Kafka Consumers on driver and executors.
+ * Kafka 0.10 consumers can require additional, sometimes complex, setup after object
+ *  instantiation. This interface encapsulates that process, and allows it to be checkpointed.
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+@Experimental
+trait ConsumerStrategy[K, V] {
+  /**
+   * Kafka <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+   * configuration parameters</a> to be used on executors. Requires "bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   */
+  def executorKafkaParams: ju.Map[String, Object]
+
+  /**
+   * Must return a fully configured Kafka Consumer, including subscribed or assigned topics.
+   * This consumer will be used on the driver to query for offsets only, not messages.
+   * @param currentOffsets A map from TopicPartition to offset, indicating how far the driver
+   * has successfully read.  Will be empty on initial start, possibly non-empty on restart from
+   * checkpoint.
+   */
+  def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V]
+}
+
+/**
+ * :: Experimental ::
+ * Subscribe to a collection of topics.
+ * @param topics collection of topics to subscribe
+ * @param kafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a> to be used on driver. The same params will be used on executors,
+ * with minor automatic modifications applied.
+ *  Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsets: offsets to begin at on initial startup.  If no offset is given for a
+ * TopicPartition, the committed offset (if applicable) or kafka param
+ * auto.offset.reset will be used.
+ */
+@Experimental
+case class Subscribe[K, V] private(
+    topics: ju.Collection[java.lang.String],
+    kafkaParams: ju.Map[String, Object],
+    offsets: ju.Map[TopicPartition, Long]
+  ) extends ConsumerStrategy[K, V] {
+
+  def executorKafkaParams: ju.Map[String, Object] = kafkaParams
+
+  def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = {
+    val consumer = new KafkaConsumer[K, V](kafkaParams)
+    consumer.subscribe(topics)
+    if (currentOffsets.isEmpty) {
+      offsets.asScala.foreach { case (topicPartition, offset) =>
+          consumer.seek(topicPartition, offset)
+      }
+    }
+
+    consumer
+  }
+}
+
+/**
+ * :: Experimental ::
+ * Companion object for creating [[Subscribe]] strategy
+ */
+@Experimental
+object Subscribe {
+  /**
+   *  :: Experimental ::
+   * Subscribe to a collection of topics.
+   * @param topics collection of topics to subscribe
+   * @param kafkaParams Kafka
+   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+   * configuration parameters</a> to be used on driver. The same params will be used on executors,
+   * with minor automatic modifications applied.
+   *  Requires "bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   * @param offsets: offsets to begin at on initial startup.  If no offset is given for a
+   * TopicPartition, the committed offset (if applicable) or kafka param
+   * auto.offset.reset will be used.
+   */
+  @Experimental
+  def apply[K, V](
+      topics: Iterable[java.lang.String],
+      kafkaParams: collection.Map[String, Object],
+      offsets: collection.Map[TopicPartition, Long]): Subscribe[K, V] = {
+    Subscribe[K, V](
+      new ju.ArrayList(topics.asJavaCollection),
+      new ju.HashMap[String, Object](kafkaParams.asJava),
+      new ju.HashMap[TopicPartition, Long](offsets.asJava))
+  }
+
+  /**
+   *  :: Experimental ::
+   * Subscribe to a collection of topics.
+   * @param topics collection of topics to subscribe
+   * @param kafkaParams Kafka
+   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+   * configuration parameters</a> to be used on driver. The same params will be used on executors,
+   * with minor automatic modifications applied.
+   *  Requires "bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   */
+  @Experimental
+  def apply[K, V](
+      topics: Iterable[java.lang.String],
+      kafkaParams: collection.Map[String, Object]): Subscribe[K, V] = {
+    Subscribe[K, V](
+      new ju.ArrayList(topics.asJavaCollection),
+      new ju.HashMap[String, Object](kafkaParams.asJava),
+      ju.Collections.emptyMap[TopicPartition, Long]())
+  }
+
+  /**
+   *  :: Experimental ::
+   * Subscribe to a collection of topics.
+   * @param topics collection of topics to subscribe
+   * @param kafkaParams Kafka
+   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+   * configuration parameters</a> to be used on driver. The same params will be used on executors,
+   * with minor automatic modifications applied.
+   *  Requires "bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   * @param offsets: offsets to begin at on initial startup.  If no offset is given for a
+   * TopicPartition, the committed offset (if applicable) or kafka param
+   * auto.offset.reset will be used.
+   */
+  @Experimental
+  def create[K, V](
+      topics: ju.Collection[java.lang.String],
+      kafkaParams: ju.Map[String, Object],
+      offsets: ju.Map[TopicPartition, Long]): Subscribe[K, V] = {
+    Subscribe[K, V](topics, kafkaParams, offsets)
+  }
+
+  /**
+   *  :: Experimental ::
+   * Subscribe to a collection of topics.
+   * @param topics collection of topics to subscribe
+   * @param kafkaParams Kafka
+   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+   * configuration parameters</a> to be used on driver. The same params will be used on executors,
+   * with minor automatic modifications applied.
+   *  Requires "bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   */
+  @Experimental
+  def create[K, V](
+      topics: ju.Collection[java.lang.String],
+      kafkaParams: ju.Map[String, Object]): Subscribe[K, V] = {
+    Subscribe[K, V](topics, kafkaParams, ju.Collections.emptyMap[TopicPartition, Long]())
+  }
+
+}
+
+/**
+ * :: Experimental ::
+ * Assign a fixed collection of TopicPartitions
+ * @param topicPartitions collection of TopicPartitions to assign
+ * @param kafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a> to be used on driver. The same params will be used on executors,
+ * with minor automatic modifications applied.
+ *  Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsets: offsets to begin at on initial startup.  If no offset is given for a
+ * TopicPartition, the committed offset (if applicable) or kafka param
+ * auto.offset.reset will be used.
+ */
+@Experimental
+case class Assign[K, V] private(
+    topicPartitions: ju.Collection[TopicPartition],
+    kafkaParams: ju.Map[String, Object],
+    offsets: ju.Map[TopicPartition, Long]
+  ) extends ConsumerStrategy[K, V] {
+
+  def executorKafkaParams: ju.Map[String, Object] = kafkaParams
+
+  def onStart(currentOffsets: Map[TopicPartition, Long]): Consumer[K, V] = {
+    val consumer = new KafkaConsumer[K, V](kafkaParams)
+    consumer.assign(topicPartitions)
+    if (currentOffsets.isEmpty) {
+      offsets.asScala.foreach { case (topicPartition, offset) =>
+          consumer.seek(topicPartition, offset)
+      }
+    }
+
+    consumer
+  }
+}
+
+/**
+ *  :: Experimental ::
+ * Companion object for creating [[Assign]] strategy
+ */
+@Experimental
+object Assign {
+  /**
+   *  :: Experimental ::
+   * Assign a fixed collection of TopicPartitions
+   * @param topicPartitions collection of TopicPartitions to assign
+   * @param kafkaParams Kafka
+   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+   * configuration parameters</a> to be used on driver. The same params will be used on executors,
+   * with minor automatic modifications applied.
+   *  Requires "bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   * @param offsets: offsets to begin at on initial startup.  If no offset is given for a
+   * TopicPartition, the committed offset (if applicable) or kafka param
+   * auto.offset.reset will be used.
+   */
+  @Experimental
+  def apply[K, V](
+      topicPartitions: Iterable[TopicPartition],
+      kafkaParams: collection.Map[String, Object],
+      offsets: collection.Map[TopicPartition, Long]): Assign[K, V] = {
+    Assign[K, V](
+      new ju.ArrayList(topicPartitions.asJavaCollection),
+      new ju.HashMap[String, Object](kafkaParams.asJava),
+      new ju.HashMap[TopicPartition, Long](offsets.asJava))
+  }
+
+  /**
+   *  :: Experimental ::
+   * Assign a fixed collection of TopicPartitions
+   * @param topicPartitions collection of TopicPartitions to assign
+   * @param kafkaParams Kafka
+   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+   * configuration parameters</a> to be used on driver. The same params will be used on executors,
+   * with minor automatic modifications applied.
+   *  Requires "bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   */
+  @Experimental
+  def apply[K, V](
+      topicPartitions: Iterable[TopicPartition],
+      kafkaParams: collection.Map[String, Object]): Assign[K, V] = {
+    Assign[K, V](
+      new ju.ArrayList(topicPartitions.asJavaCollection),
+      new ju.HashMap[String, Object](kafkaParams.asJava),
+      ju.Collections.emptyMap[TopicPartition, Long]())
+  }
+
+  /**
+   *  :: Experimental ::
+   * Assign a fixed collection of TopicPartitions
+   * @param topicPartitions collection of TopicPartitions to assign
+   * @param kafkaParams Kafka
+   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+   * configuration parameters</a> to be used on driver. The same params will be used on executors,
+   * with minor automatic modifications applied.
+   *  Requires "bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   * @param offsets: offsets to begin at on initial startup.  If no offset is given for a
+   * TopicPartition, the committed offset (if applicable) or kafka param
+   * auto.offset.reset will be used.
+   */
+  @Experimental
+  def create[K, V](
+      topicPartitions: ju.Collection[TopicPartition],
+      kafkaParams: ju.Map[String, Object],
+      offsets: ju.Map[TopicPartition, Long]): Assign[K, V] = {
+    Assign[K, V](topicPartitions, kafkaParams, offsets)
+  }
+
+  /**
+   *  :: Experimental ::
+   * Assign a fixed collection of TopicPartitions
+   * @param topicPartitions collection of TopicPartitions to assign
+   * @param kafkaParams Kafka
+   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+   * configuration parameters</a> to be used on driver. The same params will be used on executors,
+   * with minor automatic modifications applied.
+   *  Requires "bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   */
+  @Experimental
+  def create[K, V](
+      topicPartitions: ju.Collection[TopicPartition],
+      kafkaParams: ju.Map[String, Object]): Assign[K, V] = {
+    Assign[K, V](topicPartitions, kafkaParams, ju.Collections.emptyMap[TopicPartition, Long]())
+  }
+}
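
As a hedged illustration of the Assign strategy above, a fixed set of TopicPartitions can be combined with explicit starting offsets (reusing the `ssc` and `kafkaParams` from the earlier sketch); partitions without an offset entry fall back to the committed offset or auto.offset.reset, per the scaladoc:

```scala
import org.apache.kafka.common.TopicPartition

val partitions = List(new TopicPartition("topicA", 0), new TopicPartition("topicA", 1))
// Begin partition 0 at offset 100 and partition 1 at offset 0.
val fromOffsets = Map(
  new TopicPartition("topicA", 0) -> 100L,
  new TopicPartition("topicA", 1) -> 0L
)

val assigned = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Assign[String, String](partitions, kafkaParams, fromOffsets)
)
```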

http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
new file mode 100644
index 0000000..acd1841
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{ util => ju }
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.{ PartitionInfo, TopicPartition }
+
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ *  A DStream where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
+ *  of messages
+ * per second that each '''partition''' will accept.
+ * @param locationStrategy In most cases, pass in [[PreferConsistent]],
+ *   see [[LocationStrategy]] for more details.
+ * @param executorKafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a>.
+ *   Requires  "bootstrap.servers" to be set with Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param consumerStrategy In most cases, pass in [[Subscribe]],
+ *   see [[ConsumerStrategy]] for more details
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+private[spark] class DirectKafkaInputDStream[K, V](
+    _ssc: StreamingContext,
+    locationStrategy: LocationStrategy,
+    consumerStrategy: ConsumerStrategy[K, V]
+  ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with CanCommitOffsets {
+
+  val executorKafkaParams = {
+    val ekp = new ju.HashMap[String, Object](consumerStrategy.executorKafkaParams)
+    KafkaUtils.fixKafkaParams(ekp)
+    ekp
+  }
+
+  protected var currentOffsets = Map[TopicPartition, Long]()
+
+  @transient private var kc: Consumer[K, V] = null
+  def consumer(): Consumer[K, V] = this.synchronized {
+    if (null == kc) {
+      kc = consumerStrategy.onStart(currentOffsets)
+    }
+    kc
+  }
+
+  override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, V]] = {
+    logError("Kafka ConsumerRecord is not serializable. " +
+      "Use .map to extract fields before calling .persist or .window")
+    super.persist(newLevel)
+  }
+
+  protected def getBrokers = {
+    val c = consumer
+    val result = new ju.HashMap[TopicPartition, String]()
+    val hosts = new ju.HashMap[TopicPartition, String]()
+    val assignments = c.assignment().iterator()
+    while (assignments.hasNext()) {
+      val tp: TopicPartition = assignments.next()
+      if (null == hosts.get(tp)) {
+        val infos = c.partitionsFor(tp.topic).iterator()
+        while (infos.hasNext()) {
+          val i = infos.next()
+          hosts.put(new TopicPartition(i.topic(), i.partition()), i.leader.host())
+        }
+      }
+      result.put(tp, hosts.get(tp))
+    }
+    result
+  }
+
+  protected def getPreferredHosts: ju.Map[TopicPartition, String] = {
+    locationStrategy match {
+      case PreferBrokers => getBrokers
+      case PreferConsistent => ju.Collections.emptyMap[TopicPartition, String]()
+      case PreferFixed(hostMap) => hostMap
+    }
+  }
+
+  // Keep this consistent with how other streams are named (e.g. "Flume polling stream [2]")
+  private[streaming] override def name: String = s"Kafka 0.10 direct stream [$id]"
+
+  protected[streaming] override val checkpointData =
+    new DirectKafkaInputDStreamCheckpointData
+
+
+  /**
+   * Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
+   */
+  override protected[streaming] val rateController: Option[RateController] = {
+    if (RateController.isBackPressureEnabled(ssc.conf)) {
+      Some(new DirectKafkaRateController(id,
+        RateEstimator.create(ssc.conf, context.graph.batchDuration)))
+    } else {
+      None
+    }
+  }
+
+  private val maxRateLimitPerPartition: Int = context.sparkContext.getConf.getInt(
+    "spark.streaming.kafka.maxRatePerPartition", 0)
+
+  protected[streaming] def maxMessagesPerPartition(
+    offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
+    val estimatedRateLimit = rateController.map(_.getLatestRate().toInt)
+
+    // calculate a per-partition rate limit based on current lag
+    val effectiveRateLimitPerPartition = estimatedRateLimit.filter(_ > 0) match {
+      case Some(rate) =>
+        val lagPerPartition = offsets.map { case (tp, offset) =>
+          tp -> Math.max(offset - currentOffsets(tp), 0)
+        }
+        val totalLag = lagPerPartition.values.sum
+
+        lagPerPartition.map { case (tp, lag) =>
+          val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
+          tp -> (if (maxRateLimitPerPartition > 0) {
+            Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
+        }
+      case None => offsets.map { case (tp, offset) => tp -> maxRateLimitPerPartition }
+    }
+
+    if (effectiveRateLimitPerPartition.values.sum > 0) {
+      val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
+      Some(effectiveRateLimitPerPartition.map {
+        case (tp, limit) => tp -> (secsPerBatch * limit).toLong
+      })
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Returns the latest (highest) available offsets, taking new partitions into account.
+   */
+  protected def latestOffsets(): Map[TopicPartition, Long] = {
+    val c = consumer
+    c.poll(0)
+    val parts = c.assignment().asScala
+
+    // make sure new partitions are reflected in currentOffsets
+    val newPartitions = parts.diff(currentOffsets.keySet)
+    // position for new partitions determined by auto.offset.reset if no commit
+    currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap
+    // don't want to consume messages, so pause
+    c.pause(newPartitions.asJava)
+    // find latest available offsets
+    c.seekToEnd(currentOffsets.keySet.asJava)
+    parts.map(tp => tp -> c.position(tp)).toMap
+  }
+
+  // limits the maximum number of messages per partition
+  protected def clamp(
+    offsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
+
+    maxMessagesPerPartition(offsets).map { mmp =>
+      mmp.map { case (tp, messages) =>
+          val uo = offsets(tp)
+          tp -> Math.min(currentOffsets(tp) + messages, uo)
+      }
+    }.getOrElse(offsets)
+  }
+
+  override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
+    val untilOffsets = clamp(latestOffsets())
+    val offsetRanges = untilOffsets.map { case (tp, uo) =>
+      val fo = currentOffsets(tp)
+      OffsetRange(tp.topic, tp.partition, fo, uo)
+    }
+    val rdd = new KafkaRDD[K, V](
+      context.sparkContext, executorKafkaParams, offsetRanges.toArray, getPreferredHosts, true)
+
+    // Report the record number and metadata of this batch interval to InputInfoTracker.
+    val description = offsetRanges.filter { offsetRange =>
+      // Don't display empty ranges.
+      offsetRange.fromOffset != offsetRange.untilOffset
+    }.map { offsetRange =>
+      s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
+        s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
+    }.mkString("\n")
+    // Copy offsetRanges to immutable.List to prevent from being modified by the user
+    val metadata = Map(
+      "offsets" -> offsetRanges.toList,
+      StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
+    val inputInfo = StreamInputInfo(id, rdd.count, metadata)
+    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
+
+    currentOffsets = untilOffsets
+    commitAll()
+    Some(rdd)
+  }
+
+  override def start(): Unit = {
+    val c = consumer
+    c.poll(0)
+    if (currentOffsets.isEmpty) {
+      currentOffsets = c.assignment().asScala.map { tp =>
+        tp -> c.position(tp)
+      }.toMap
+    }
+
+    // don't actually want to consume any messages, so pause all partitions
+    c.pause(currentOffsets.keySet.asJava)
+  }
+
+  override def stop(): Unit = this.synchronized {
+    if (kc != null) {
+      kc.close()
+    }
+  }
+
+  protected val commitQueue = new ConcurrentLinkedQueue[OffsetRange]
+  protected val commitCallback = new AtomicReference[OffsetCommitCallback]
+
+  /**
+   * Queue up offset ranges for commit to Kafka at a future time.  Threadsafe.
+   * @param offsetRanges The maximum untilOffset for a given partition will be used at commit.
+   */
+  def commitAsync(offsetRanges: Array[OffsetRange]): Unit = {
+    commitAsync(offsetRanges, null)
+  }
+
+  /**
+   * Queue up offset ranges for commit to Kafka at a future time.  Threadsafe.
+   * @param offsetRanges The maximum untilOffset for a given partition will be used at commit.
+   * @param callback Only the most recently provided callback will be used at commit.
+   */
+  def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit = {
+    commitCallback.set(callback)
+    commitQueue.addAll(ju.Arrays.asList(offsetRanges: _*))
+  }
+
+  protected def commitAll(): Unit = {
+    val m = new ju.HashMap[TopicPartition, OffsetAndMetadata]()
+    val it = commitQueue.iterator()
+    while (it.hasNext) {
+      val osr = it.next
+      val tp = osr.topicPartition
+      val x = m.get(tp)
+      val offset = if (null == x) { osr.untilOffset } else { Math.max(x.offset, osr.untilOffset) }
+      m.put(tp, new OffsetAndMetadata(offset))
+    }
+    if (!m.isEmpty) {
+      consumer.commitAsync(m, commitCallback.get)
+    }
+  }
+
+  private[streaming]
+  class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
+    def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = {
+      data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
+    }
+
+    override def update(time: Time): Unit = {
+      batchForTime.clear()
+      generatedRDDs.foreach { kv =>
+        val a = kv._2.asInstanceOf[KafkaRDD[K, V]].offsetRanges.map(_.toTuple).toArray
+        batchForTime += kv._1 -> a
+      }
+    }
+
+    override def cleanup(time: Time): Unit = { }
+
+    override def restore(): Unit = {
+      batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
+         logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
+         generatedRDDs += t -> new KafkaRDD[K, V](
+           context.sparkContext,
+           executorKafkaParams,
+           b.map(OffsetRange(_)),
+           getPreferredHosts,
+           // during restore, it's possible same partition will be consumed from multiple
+           // threads, so don't use cache
+           false
+         )
+      }
+    }
+  }
+
+  /**
+   * A RateController to retrieve the rate from RateEstimator.
+   */
+  private[streaming] class DirectKafkaRateController(id: Int, estimator: RateEstimator)
+    extends RateController(id, estimator) {
+    override def publish(rate: Long): Unit = ()
+  }
+}
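
A hedged sketch of the commit path exposed by commitAsync above, using the usual cast pattern (the `stream` is the one from the earlier sketch; the cast to HasOffsetRanges has to happen on the first RDD handed to foreachRDD, before any transformation):

```scala
stream.foreachRDD { rdd =>
  // KafkaRDD mixes in HasOffsetRanges, so the ranges for this batch are recoverable here.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process rdd ...

  // DirectKafkaInputDStream implements CanCommitOffsets; the ranges are queued
  // and committed asynchronously from the driver on a later batch.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```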

http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
new file mode 100644
index 0000000..c15c163
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{ util => ju }
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord }
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.partial.{BoundedDouble, PartialResult}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * A batch-oriented interface for consuming from Kafka.
+ * Starting and ending offsets are specified in advance,
+ * so that you can control exactly-once semantics.
+ * @param kafkaParams Kafka
+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+ * configuration parameters</a>. Requires "bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
+ * @param preferredHosts map from TopicPartition to preferred host for processing that partition.
+ * In most cases, use [[DirectKafkaInputDStream.preferConsistent]]
+ * Use [[DirectKafkaInputDStream.preferBrokers]] if your executors are on same nodes as brokers.
+ * @param useConsumerCache whether to use a consumer from a per-jvm cache
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+private[spark] class KafkaRDD[K, V](
+    sc: SparkContext,
+    val kafkaParams: ju.Map[String, Object],
+    val offsetRanges: Array[OffsetRange],
+    val preferredHosts: ju.Map[TopicPartition, String],
+    useConsumerCache: Boolean
+) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with HasOffsetRanges {
+
+  assert("none" ==
+    kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).asInstanceOf[String],
+    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG +
+      " must be set to none for executor kafka params, else messages may not match offsetRange")
+
+  assert(false ==
+    kafkaParams.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).asInstanceOf[Boolean],
+    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG +
+      " must be set to false for executor kafka params, else offsets may commit before processing")
+
+  // TODO is it necessary to have separate configs for initial poll time vs ongoing poll time?
+  private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 256)
+  private val cacheInitialCapacity =
+    conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16)
+  private val cacheMaxCapacity =
+    conf.getInt("spark.streaming.kafka.consumer.cache.maxCapacity", 64)
+  private val cacheLoadFactor =
+    conf.getDouble("spark.streaming.kafka.consumer.cache.loadFactor", 0.75).toFloat
+
+  override def persist(newLevel: StorageLevel): this.type = {
+    logError("Kafka ConsumerRecord is not serializable. " +
+      "Use .map to extract fields before calling .persist or .window")
+    super.persist(newLevel)
+  }
+
+  override def getPartitions: Array[Partition] = {
+    offsetRanges.zipWithIndex.map { case (o, i) =>
+        new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset)
+    }.toArray
+  }
+
+  override def count(): Long = offsetRanges.map(_.count).sum
+
+  override def countApprox(
+      timeout: Long,
+      confidence: Double = 0.95
+  ): PartialResult[BoundedDouble] = {
+    val c = count
+    new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
+  }
+
+  override def isEmpty(): Boolean = count == 0L
+
+  override def take(num: Int): Array[ConsumerRecord[K, V]] = {
+    val nonEmptyPartitions = this.partitions
+      .map(_.asInstanceOf[KafkaRDDPartition])
+      .filter(_.count > 0)
+
+    if (num < 1 || nonEmptyPartitions.isEmpty) {
+      return new Array[ConsumerRecord[K, V]](0)
+    }
+
+    // Determine in advance how many messages need to be taken from each partition
+    val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) =>
+      val remain = num - result.values.sum
+      if (remain > 0) {
+        val taken = Math.min(remain, part.count)
+        result + (part.index -> taken.toInt)
+      } else {
+        result
+      }
+    }
+
+    val buf = new ArrayBuffer[ConsumerRecord[K, V]]
+    val res = context.runJob(
+      this,
+      (tc: TaskContext, it: Iterator[ConsumerRecord[K, V]]) =>
+      it.take(parts(tc.partitionId)).toArray, parts.keys.toArray
+    )
+    res.foreach(buf ++= _)
+    buf.toArray
+  }
+
+  private def executors(): Array[ExecutorCacheTaskLocation] = {
+    val bm = sparkContext.env.blockManager
+    bm.master.getPeers(bm.blockManagerId).toArray
+      .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
+      .sortWith(compareExecutors)
+  }
+
+  protected[kafka010] def compareExecutors(
+      a: ExecutorCacheTaskLocation,
+      b: ExecutorCacheTaskLocation): Boolean =
+    if (a.host == b.host) {
+      a.executorId > b.executorId
+    } else {
+      a.host > b.host
+    }
+
+  /**
+   * Non-negative modulus, from java 8 math
+   */
+  private def floorMod(a: Int, b: Int): Int = ((a % b) + b) % b
+
+  override def getPreferredLocations(thePart: Partition): Seq[String] = {
+    // The intention is best-effort consistent executor for a given topicpartition,
+    // so that caching consumers can be effective.
+    // TODO what about hosts specified by ip vs name
+    val part = thePart.asInstanceOf[KafkaRDDPartition]
+    val allExecs = executors()
+    val tp = part.topicPartition
+    val prefHost = preferredHosts.get(tp)
+    val prefExecs = if (null == prefHost) allExecs else allExecs.filter(_.host == prefHost)
+    val execs = if (prefExecs.isEmpty) allExecs else prefExecs
+    if (execs.isEmpty) {
+      Seq()
+    } else {
+      // execs is sorted, tp.hashCode depends only on topic and partition, so consistent index
+      val index = this.floorMod(tp.hashCode, execs.length)
+      val chosen = execs(index)
+      Seq(chosen.toString)
+    }
+  }
+
+  private def errBeginAfterEnd(part: KafkaRDDPartition): String =
+    s"Beginning offset ${part.fromOffset} is after the ending offset ${part.untilOffset} " +
+      s"for topic ${part.topic} partition ${part.partition}. " +
+      "You either provided an invalid fromOffset, or the Kafka topic has been damaged"
+
+  override def compute(thePart: Partition, context: TaskContext): Iterator[ConsumerRecord[K, V]] = {
+    val part = thePart.asInstanceOf[KafkaRDDPartition]
+    assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
+    if (part.fromOffset == part.untilOffset) {
+      logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
+        s"skipping ${part.topic} ${part.partition}")
+      Iterator.empty
+    } else {
+      new KafkaRDDIterator(part, context)
+    }
+  }
+
+  /**
+   * An iterator that fetches messages directly from Kafka for the offsets in partition.
+   * Uses a cached consumer where possible to take advantage of prefetching
+   */
+  private class KafkaRDDIterator(
+      part: KafkaRDDPartition,
+      context: TaskContext) extends Iterator[ConsumerRecord[K, V]] {
+
+    logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " +
+      s"offsets ${part.fromOffset} -> ${part.untilOffset}")
+
+    val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+
+    context.addTaskCompletionListener{ context => closeIfNeeded() }
+
+    val consumer = if (useConsumerCache) {
+      CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor)
+      if (context.attemptNumber > 1) {
+        // just in case the prior attempt failures were cache related
+        CachedKafkaConsumer.remove(groupId, part.topic, part.partition)
+      }
+      CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams)
+    } else {
+      CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams)
+    }
+
+    var requestOffset = part.fromOffset
+
+    def closeIfNeeded(): Unit = {
+      if (!useConsumerCache && consumer != null) {
+        consumer.close
+      }
+    }
+
+    override def hasNext(): Boolean = requestOffset < part.untilOffset
+
+    override def next(): ConsumerRecord[K, V] = {
+      assert(hasNext(), "Can't call next() once untilOffset has been reached")
+      val r = consumer.get(requestOffset, pollTimeout)
+      requestOffset += 1
+      r
+    }
+  }
+}
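
For illustration, the preferred-location logic above reduces to a stable modulo of the TopicPartition hash over the sorted executor list. A minimal standalone sketch of the same idea (not part of this patch; the name sortedExecs is hypothetical):

    import org.apache.kafka.common.TopicPartition

    object ConsistentAssignmentSketch {
      // same non-negative modulus as the floorMod helper above
      private def floorMod(a: Int, b: Int): Int = ((a % b) + b) % b

      // picks the same executor for a given topic/partition as long as the executor list is stable
      def assign(tp: TopicPartition, sortedExecs: IndexedSeq[String]): Option[String] =
        if (sortedExecs.isEmpty) None
        else Some(sortedExecs(floorMod(tp.hashCode, sortedExecs.length)))
    }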

http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDDPartition.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDDPartition.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDDPartition.scala
new file mode 100644
index 0000000..95569b1
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDDPartition.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.Partition
+
+
+/**
+ * @param topic kafka topic name
+ * @param partition kafka partition id
+ * @param fromOffset inclusive starting offset
+ * @param untilOffset exclusive ending offset
+ */
+private[kafka010]
+class KafkaRDDPartition(
+  val index: Int,
+  val topic: String,
+  val partition: Int,
+  val fromOffset: Long,
+  val untilOffset: Long
+) extends Partition {
+  /** Number of messages this partition refers to */
+  def count(): Long = untilOffset - fromOffset
+
+  /** Kafka TopicPartition object, for convenience */
+  def topicPartition(): TopicPartition = new TopicPartition(topic, partition)
+
+}
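
As a quick illustration of the half-open [fromOffset, untilOffset) convention, a hypothetical snippet (it would need to live in the kafka010 package, since the class is private[kafka010]):

    import org.apache.kafka.common.TopicPartition

    // a partition covering offsets 100 (inclusive) to 110 (exclusive) holds 10 messages
    val part = new KafkaRDDPartition(index = 0, topic = "topicA", partition = 0,
      fromOffset = 100L, untilOffset = 110L)
    assert(part.count() == 10L)
    assert(part.topicPartition() == new TopicPartition("topicA", 0))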

http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
new file mode 100644
index 0000000..13c0843
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.io.File
+import java.lang.{Integer => JInt}
+import java.net.InetSocketAddress
+import java.util.{Map => JMap, Properties}
+import java.util.concurrent.TimeoutException
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.language.postfixOps
+import scala.util.control.NonFatal
+
+import kafka.admin.AdminUtils
+import kafka.api.Request
+import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
+import kafka.serializer.StringEncoder
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.ZkUtils
+import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.streaming.Time
+import org.apache.spark.util.Utils
+
+/**
+ * This is a helper class for Kafka test suites. It has the functionality to set up
+ * and tear down local Kafka servers, and to push data using Kafka producers.
+ *
+ * The reason to put the Kafka test utility class in src is to test Python-related Kafka APIs.
+ */
+private[kafka010] class KafkaTestUtils extends Logging {
+
+  // Zookeeper related configurations
+  private val zkHost = "localhost"
+  private var zkPort: Int = 0
+  private val zkConnectionTimeout = 60000
+  private val zkSessionTimeout = 6000
+
+  private var zookeeper: EmbeddedZookeeper = _
+
+  private var zkUtils: ZkUtils = _
+
+  // Kafka broker related configurations
+  private val brokerHost = "localhost"
+  private var brokerPort = 9092
+  private var brokerConf: KafkaConfig = _
+
+  // Kafka broker server
+  private var server: KafkaServer = _
+
+  // Kafka producer
+  private var producer: Producer[String, String] = _
+
+  // Flag to test whether the system is correctly started
+  private var zkReady = false
+  private var brokerReady = false
+
+  def zkAddress: String = {
+    assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper address")
+    s"$zkHost:$zkPort"
+  }
+
+  def brokerAddress: String = {
+    assert(brokerReady, "Kafka not setup yet or already torn down, cannot get broker address")
+    s"$brokerHost:$brokerPort"
+  }
+
+  def zookeeperClient: ZkUtils = {
+    assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper client")
+    Option(zkUtils).getOrElse(
+      throw new IllegalStateException("Zookeeper client is not yet initialized"))
+  }
+
+  // Set up the Embedded Zookeeper server and get the proper Zookeeper port
+  private def setupEmbeddedZookeeper(): Unit = {
+    // Zookeeper server startup
+    zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
+    // Get the actual zookeeper binding port
+    zkPort = zookeeper.actualPort
+    zkUtils = ZkUtils(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout, false)
+    zkReady = true
+  }
+
+  // Set up the Embedded Kafka server
+  private def setupEmbeddedKafkaServer(): Unit = {
+    assert(zkReady, "Zookeeper should be set up beforehand")
+
+    // Kafka broker startup
+    Utils.startServiceOnPort(brokerPort, port => {
+      brokerPort = port
+      brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
+      server = new KafkaServer(brokerConf)
+      server.startup()
+      (server, port)
+    }, new SparkConf(), "KafkaBroker")
+
+    brokerReady = true
+  }
+
+  /** Set up all the embedded servers, including Zookeeper and the Kafka broker */
+  def setup(): Unit = {
+    setupEmbeddedZookeeper()
+    setupEmbeddedKafkaServer()
+  }
+
+  /** Tear down all the servers, including the Kafka broker and Zookeeper */
+  def teardown(): Unit = {
+    brokerReady = false
+    zkReady = false
+
+    if (producer != null) {
+      producer.close()
+      producer = null
+    }
+
+    if (server != null) {
+      server.shutdown()
+      server = null
+    }
+
+    brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
+
+    if (zkUtils != null) {
+      zkUtils.close()
+      zkUtils = null
+    }
+
+    if (zookeeper != null) {
+      zookeeper.shutdown()
+      zookeeper = null
+    }
+  }
+
+  /** Create a Kafka topic and wait until it is propagated to the whole cluster */
+  def createTopic(topic: String, partitions: Int): Unit = {
+    AdminUtils.createTopic(zkUtils, topic, partitions, 1)
+    // wait until metadata is propagated
+    (0 until partitions).foreach { p =>
+      waitUntilMetadataIsPropagated(topic, p)
+    }
+  }
+
+  /** Create a Kafka topic and wait until it is propagated to the whole cluster */
+  def createTopic(topic: String): Unit = {
+    createTopic(topic, 1)
+  }
+
+  /** Java-friendly function for sending messages to the Kafka broker */
+  def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = {
+    sendMessages(topic, Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*))
+  }
+
+  /** Send the messages to the Kafka broker */
+  def sendMessages(topic: String, messageToFreq: Map[String, Int]): Unit = {
+    val messages = messageToFreq.flatMap { case (s, freq) => Seq.fill(freq)(s) }.toArray
+    sendMessages(topic, messages)
+  }
+
+  /** Send the array of messages to the Kafka broker */
+  def sendMessages(topic: String, messages: Array[String]): Unit = {
+    producer = new Producer[String, String](new ProducerConfig(producerConfiguration))
+    producer.send(messages.map { new KeyedMessage[String, String](topic, _) }: _*)
+    producer.close()
+    producer = null
+  }
+
+  private def brokerConfiguration: Properties = {
+    val props = new Properties()
+    props.put("broker.id", "0")
+    props.put("host.name", "localhost")
+    props.put("port", brokerPort.toString)
+    props.put("log.dir", Utils.createTempDir().getAbsolutePath)
+    props.put("zookeeper.connect", zkAddress)
+    props.put("log.flush.interval.messages", "1")
+    props.put("replica.socket.timeout.ms", "1500")
+    props
+  }
+
+  private def producerConfiguration: Properties = {
+    val props = new Properties()
+    props.put("metadata.broker.list", brokerAddress)
+    props.put("serializer.class", classOf[StringEncoder].getName)
+    // wait for all in-sync replicas to ack sends
+    props.put("request.required.acks", "-1")
+    props
+  }
+
+  // A simplified version of scalatest eventually, rewritten here to avoid adding an extra
+  // test dependency
+  def eventually[T](timeout: Time, interval: Time)(func: => T): T = {
+    def makeAttempt(): Either[Throwable, T] = {
+      try {
+        Right(func)
+      } catch {
+        case e if NonFatal(e) => Left(e)
+      }
+    }
+
+    val startTime = System.currentTimeMillis()
+    @tailrec
+    def tryAgain(attempt: Int): T = {
+      makeAttempt() match {
+        case Right(result) => result
+        case Left(e) =>
+          val duration = System.currentTimeMillis() - startTime
+          if (duration < timeout.milliseconds) {
+            Thread.sleep(interval.milliseconds)
+          } else {
+            throw new TimeoutException(e.getMessage)
+          }
+
+          tryAgain(attempt + 1)
+      }
+    }
+
+    tryAgain(1)
+  }
+
+  private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
+    def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
+      case Some(partitionState) =>
+        val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
+
+        zkUtils.getLeaderForPartition(topic, partition).isDefined &&
+          Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
+          leaderAndInSyncReplicas.isr.size >= 1
+
+      case _ =>
+        false
+    }
+    eventually(Time(10000), Time(100)) {
+      assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout")
+    }
+  }
+
+  private class EmbeddedZookeeper(val zkConnect: String) {
+    val snapshotDir = Utils.createTempDir()
+    val logDir = Utils.createTempDir()
+
+    val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
+    val (ip, port) = {
+      val splits = zkConnect.split(":")
+      (splits(0), splits(1).toInt)
+    }
+    val factory = new NIOServerCnxnFactory()
+    factory.configure(new InetSocketAddress(ip, port), 16)
+    factory.startup(zookeeper)
+
+    val actualPort = factory.getLocalPort
+
+    def shutdown() {
+      factory.shutdown()
+      Utils.deleteRecursively(snapshotDir)
+      Utils.deleteRecursively(logDir)
+    }
+  }
+}
+
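
A minimal sketch of how a test suite might drive this utility (test-framework wiring omitted; topic name and message counts are arbitrary):

    val kafkaTestUtils = new KafkaTestUtils
    kafkaTestUtils.setup()
    try {
      kafkaTestUtils.createTopic("topicA", 2)
      kafkaTestUtils.sendMessages("topicA", Map("a" -> 3, "b" -> 2))
      // point the code under test at kafkaTestUtils.brokerAddress here
    } finally {
      kafkaTestUtils.teardown()
    }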

http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
new file mode 100644
index 0000000..c052499
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{ util => ju }
+
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkContext
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.java.{ JavaRDD, JavaSparkContext }
+import org.apache.spark.api.java.function.{ Function0 => JFunction0 }
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.{ JavaInputDStream, JavaStreamingContext }
+import org.apache.spark.streaming.dstream._
+
+/**
+ * :: Experimental ::
+ * Companion object for constructing Kafka streams and RDDs
+ */
+@Experimental
+object KafkaUtils extends Logging {
+  /**
+   * :: Experimental ::
+   * Scala constructor for a batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param kafkaParams Kafka
+   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+   * configuration parameters</a>. Requires "bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
+   * @param locationStrategy In most cases, pass in [[PreferConsistent]],
+   *   see [[LocationStrategy]] for more details.
+   * @tparam K type of Kafka message key
+   * @tparam V type of Kafka message value
+   */
+  @Experimental
+  def createRDD[K, V](
+      sc: SparkContext,
+      kafkaParams: ju.Map[String, Object],
+      offsetRanges: Array[OffsetRange],
+      locationStrategy: LocationStrategy
+    ): RDD[ConsumerRecord[K, V]] = {
+    val preferredHosts = locationStrategy match {
+      case PreferBrokers =>
+        throw new AssertionError(
+          "If you want to prefer brokers, you must provide a mapping using PreferFixed. " +
+          "A single KafkaRDD does not have a driver consumer and cannot look up brokers for you.")
+      case PreferConsistent => ju.Collections.emptyMap[TopicPartition, String]()
+      case PreferFixed(hostMap) => hostMap
+    }
+    val kp = new ju.HashMap[String, Object](kafkaParams)
+    fixKafkaParams(kp)
+    val osr = offsetRanges.clone()
+
+    new KafkaRDD[K, V](sc, kp, osr, preferredHosts, true)
+  }
+
+  /**
+   * :: Experimental ::
+   * Java constructor for a batch-oriented interface for consuming from Kafka.
+   * Starting and ending offsets are specified in advance,
+   * so that you can control exactly-once semantics.
+   * @param keyClass Class of the keys in the Kafka records
+   * @param valueClass Class of the values in the Kafka records
+   * @param kafkaParams Kafka
+   * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
+   * configuration parameters</a>. Requires "bootstrap.servers" to be set
+   * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+   * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
+   * @param locationStrategy In most cases, pass in [[PreferConsistent]],
+   *   see [[LocationStrategy]] for more details.
+   * @tparam K type of Kafka message key
+   * @tparam V type of Kafka message value
+   */
+  @Experimental
+  def createRDD[K, V](
+      jsc: JavaSparkContext,
+      kafkaParams: ju.Map[String, Object],
+      offsetRanges: Array[OffsetRange],
+      locationStrategy: LocationStrategy
+    ): JavaRDD[ConsumerRecord[K, V]] = {
+
+    new JavaRDD(createRDD[K, V](jsc.sc, kafkaParams, offsetRanges, locationStrategy))
+  }
+
+  /**
+   * :: Experimental ::
+   * Scala constructor for a DStream where
+   * each given Kafka topic/partition corresponds to an RDD partition.
+   * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum
+   * number of messages per second that each '''partition''' will accept.
+   * @param locationStrategy In most cases, pass in [[PreferConsistent]],
+   *   see [[LocationStrategy]] for more details.
+   * @param consumerStrategy In most cases, pass in [[Subscribe]],
+   *   see [[ConsumerStrategy]] for more details
+   * @tparam K type of Kafka message key
+   * @tparam V type of Kafka message value
+   */
+  @Experimental
+  def createDirectStream[K, V](
+      ssc: StreamingContext,
+      locationStrategy: LocationStrategy,
+      consumerStrategy: ConsumerStrategy[K, V]
+    ): InputDStream[ConsumerRecord[K, V]] = {
+    new DirectKafkaInputDStream[K, V](ssc, locationStrategy, consumerStrategy)
+  }
+
+  /**
+   * :: Experimental ::
+   * Java constructor for a DStream where
+   * each given Kafka topic/partition corresponds to an RDD partition.
+   * @param keyClass Class of the keys in the Kafka records
+   * @param valueClass Class of the values in the Kafka records
+   * @param locationStrategy In most cases, pass in [[PreferConsistent]],
+   *   see [[LocationStrategy]] for more details.
+   * @param consumerStrategy In most cases, pass in [[Subscribe]],
+   *   see [[ConsumerStrategy]] for more details
+   * @tparam K type of Kafka message key
+   * @tparam V type of Kafka message value
+   */
+  @Experimental
+  def createDirectStream[K, V](
+      jssc: JavaStreamingContext,
+      locationStrategy: LocationStrategy,
+      consumerStrategy: ConsumerStrategy[K, V]
+    ): JavaInputDStream[ConsumerRecord[K, V]] = {
+    new JavaInputDStream(
+      createDirectStream[K, V](
+        jssc.ssc, locationStrategy, consumerStrategy))
+  }
+
+  /**
+   * Tweak kafka params to prevent issues on executors
+   */
+  private[kafka010] def fixKafkaParams(kafkaParams: ju.HashMap[String, Object]): Unit = {
+    logWarning(s"overriding ${ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG} to false for executor")
+    kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false: java.lang.Boolean)
+
+    logWarning(s"overriding ${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} to none for executor")
+    kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
+
+    // driver and executor should be in different consumer groups
+    val groupId = "spark-executor-" + kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
+    logWarning(s"overriding executor ${ConsumerConfig.GROUP_ID_CONFIG} to ${groupId}")
+    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
+
+    // possible workaround for KAFKA-3135
+    val rbb = kafkaParams.get(ConsumerConfig.RECEIVE_BUFFER_CONFIG)
+    if (null == rbb || rbb.asInstanceOf[java.lang.Integer] < 65536) {
+      logWarning(s"overriding ${ConsumerConfig.RECEIVE_BUFFER_CONFIG} to 65536 see KAFKA-3135")
+      kafkaParams.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
+    }
+  }
+}
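
Putting the pieces together, a typical driver-side setup with this API might look like the following sketch (assuming an existing StreamingContext ssc, the Subscribe consumer strategy added in ConsumerStrategy.scala earlier in this patch, and placeholder broker, group, and topic names):

    import org.apache.kafka.common.serialization.StringDeserializer

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "host1:9092,host2:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example-group",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("topicA"), kafkaParams))

    stream.foreachRDD { rdd =>
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // process rdd, then optionally commit ranges via CanCommitOffsets
    }

Note that fixKafkaParams above will force enable.auto.commit to false, set auto.offset.reset to none, and prefix the group id with "spark-executor-" for the executor-side consumers, regardless of what is passed here.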

http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala
new file mode 100644
index 0000000..df62030
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import java.{ util => ju }
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.annotation.Experimental
+
+
+/**
+ *  :: Experimental ::
+ * Choice of how to schedule consumers for a given TopicPartition on an executor.
+ * Kafka 0.10 consumers prefetch messages, so it's important for performance
+ * to keep cached consumers on appropriate executors, not recreate them for every partition.
+ * Choice of location is only a preference, not an absolute; partitions may be scheduled elsewhere.
+ */
+@Experimental
+sealed trait LocationStrategy
+
+/**
+ *  :: Experimental ::
+ * Use this only if your executors are on the same nodes as your Kafka brokers.
+ */
+@Experimental
+case object PreferBrokers extends LocationStrategy {
+  def create: PreferBrokers.type = this
+}
+
+/**
+ *  :: Experimental ::
+ * Use this in most cases; it will consistently distribute partitions across all executors.
+ */
+@Experimental
+case object PreferConsistent extends LocationStrategy {
+  def create: PreferConsistent.type = this
+}
+
+/**
+ *  :: Experimental ::
+ * Use this to place particular TopicPartitions on particular hosts if your load is uneven.
+ * Any TopicPartition not specified in the map will use a consistent location.
+ */
+@Experimental
+case class PreferFixed private(hostMap: ju.Map[TopicPartition, String]) extends LocationStrategy
+
+/**
+ *  :: Experimental ::
+ * Use this to place particular TopicPartitions on particular hosts if your load is uneven.
+ * Any TopicPartition not specified in the map will use a consistent location.
+ */
+@Experimental
+object PreferFixed {
+  def apply(hostMap: collection.Map[TopicPartition, String]): PreferFixed = {
+    PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava))
+  }
+  def create(hostMap: ju.Map[TopicPartition, String]): PreferFixed =
+    PreferFixed(hostMap)
+}
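
A brief illustration of the three strategies (host and topic names are placeholders):

    import org.apache.kafka.common.TopicPartition

    // Default: spread partitions consistently over the available executors.
    val consistent: LocationStrategy = PreferConsistent

    // Only if executors are on the same nodes as the Kafka brokers.
    val brokers: LocationStrategy = PreferBrokers

    // Pin a hot partition to a specific host; unlisted partitions fall back to consistent placement.
    val fixed: LocationStrategy = PreferFixed(Map(new TopicPartition("topicA", 0) -> "host1"))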

http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala
new file mode 100644
index 0000000..c66d3c9
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import org.apache.kafka.clients.consumer.OffsetCommitCallback
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the
+ * offset ranges in RDDs generated by the direct Kafka DStream (see
+ * [[KafkaUtils.createDirectStream]]).
+ * {{{
+ *   KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
+ *      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+ *      ...
+ *   }
+ * }}}
+ */
+trait HasOffsetRanges {
+  def offsetRanges: Array[OffsetRange]
+}
+
+/**
+ *  :: Experimental ::
+ * Represents any object that can commit a collection of [[OffsetRange]]s.
+ * The direct Kafka DStream implements this interface (see
+ * [[KafkaUtils.createDirectStream]]).
+ * {{{
+ *   val stream = KafkaUtils.createDirectStream(...)
+ *     ...
+ *   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets, new OffsetCommitCallback() {
+ *     def onComplete(m: java.util.Map[TopicPartition, OffsetAndMetadata], e: Exception) {
+ *       if (null != e) {
+ *         // error
+ *       } else {
+ *         // success
+ *       }
+ *     }
+ *   })
+ * }}}
+ */
+@Experimental
+trait CanCommitOffsets {
+  /**
+   *  :: Experimental ::
+   * Queue up offset ranges for commit to Kafka at a future time.  Threadsafe.
+   * This is only needed if you intend to store offsets in Kafka, instead of your own store.
+   * @param offsetRanges The maximum untilOffset for a given partition will be used at commit.
+   */
+  @Experimental
+  def commitAsync(offsetRanges: Array[OffsetRange]): Unit
+
+  /**
+   *  :: Experimental ::
+   * Queue up offset ranges for commit to Kafka at a future time.  Threadsafe.
+   * This is only needed if you intend to store offsets in Kafka, instead of your own store.
+   * @param offsetRanges The maximum untilOffset for a given partition will be used at commit.
+   * @param callback Only the most recently provided callback will be used at commit.
+   */
+  @Experimental
+  def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit
+}
+
+/**
+ * Represents a range of offsets from a single Kafka TopicPartition. Instances of this class
+ * can be created with `OffsetRange.create()`.
+ * @param topic Kafka topic name
+ * @param partition Kafka partition id
+ * @param fromOffset Inclusive starting offset
+ * @param untilOffset Exclusive ending offset
+ */
+final class OffsetRange private(
+    val topic: String,
+    val partition: Int,
+    val fromOffset: Long,
+    val untilOffset: Long) extends Serializable {
+  import OffsetRange.OffsetRangeTuple
+
+  /** Kafka TopicPartition object, for convenience */
+  def topicPartition(): TopicPartition = new TopicPartition(topic, partition)
+
+  /** Number of messages this OffsetRange refers to */
+  def count(): Long = untilOffset - fromOffset
+
+  override def equals(obj: Any): Boolean = obj match {
+    case that: OffsetRange =>
+      this.topic == that.topic &&
+        this.partition == that.partition &&
+        this.fromOffset == that.fromOffset &&
+        this.untilOffset == that.untilOffset
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    toTuple.hashCode()
+  }
+
+  override def toString(): String = {
+    s"OffsetRange(topic: '$topic', partition: $partition, range: [$fromOffset 
-> $untilOffset])"
+  }
+
+  /** this is to avoid ClassNotFoundException during checkpoint restore */
+  private[streaming]
+  def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, untilOffset)
+}
+
+/**
+ * Companion object that provides methods to create instances of [[OffsetRange]].
+ */
+object OffsetRange {
+  def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
+    new OffsetRange(topic, partition, fromOffset, untilOffset)
+
+  def create(
+      topicPartition: TopicPartition,
+      fromOffset: Long,
+      untilOffset: Long): OffsetRange =
+    new OffsetRange(topicPartition.topic, topicPartition.partition, fromOffset, untilOffset)
+
+  def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
+    new OffsetRange(topic, partition, fromOffset, untilOffset)
+
+  def apply(
+      topicPartition: TopicPartition,
+      fromOffset: Long,
+      untilOffset: Long): OffsetRange =
+    new OffsetRange(topicPartition.topic, topicPartition.partition, fromOffset, untilOffset)
+
+  /** this is to avoid ClassNotFoundException during checkpoint restore */
+  private[kafka010]
+  type OffsetRangeTuple = (String, Int, Long, Long)
+
+  private[kafka010]
+  def apply(t: OffsetRangeTuple) =
+    new OffsetRange(t._1, t._2, t._3, t._4)
+}
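
For the batch path, offset ranges pair with KafkaUtils.createRDD from earlier in this patch; a hedged sketch with placeholder topics and offsets (sc is an existing SparkContext and kafkaParams a java.util.Map of consumer configs):

    // each range is half-open: [fromOffset, untilOffset)
    val offsetRanges = Array(
      OffsetRange("topicA", 0, 0L, 100L),
      OffsetRange("topicA", 1, 0L, 100L))

    val rdd = KafkaUtils.createRDD[String, String](sc, kafkaParams, offsetRanges, PreferConsistent)
    assert(offsetRanges.map(_.count()).sum == 200L)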

http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package-info.java
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package-info.java b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package-info.java
new file mode 100644
index 0000000..ebfcf87
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Spark Integration for Kafka 0.10
+ */
+package org.apache.spark.streaming.kafka010;

http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala
new file mode 100644
index 0000000..2bfc1e8
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+/**
+ * Spark Integration for Kafka 0.10
+ */
+package object kafka010

