[SPARK-15085][STREAMING][KAFKA] Rename streaming-kafka artifact

## What changes were proposed in this pull request?
Renaming the streaming-kafka artifact to include kafka version, in anticipation 
of needing a different artifact for later kafka versions

## How was this patch tested?
Unit tests

Author: cody koeninger <[email protected]>

Closes #12946 from koeninger/SPARK-15085.

(cherry picked from commit 89e67d6667d5f8be9c6fb6c120fbcd350ae2950d)
Signed-off-by: Reynold Xin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/56e1e2f1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/56e1e2f1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/56e1e2f1

Branch: refs/heads/branch-2.0
Commit: 56e1e2f1706c857f72f519e51c51e39e30638eb6
Parents: e3703c4
Author: cody koeninger <[email protected]>
Authored: Wed May 11 12:15:41 2016 -0700
Committer: Reynold Xin <[email protected]>
Committed: Wed May 11 12:15:48 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/SparkSubmit.scala   |   4 +-
 dev/audit-release/audit_release.py              |   2 +-
 dev/run-tests.py                                |   2 +-
 dev/sparktestsupport/modules.py                 |   8 +-
 docs/streaming-kafka-integration.md             |  14 +-
 docs/streaming-programming-guide.md             |   4 +-
 examples/pom.xml                                |   2 +-
 external/kafka-0-8-assembly/pom.xml             | 176 ++++
 external/kafka-0-8/pom.xml                      |  98 +++
 .../apache/spark/streaming/kafka/Broker.scala   |  66 ++
 .../kafka/DirectKafkaInputDStream.scala         | 227 ++++++
 .../spark/streaming/kafka/KafkaCluster.scala    | 425 ++++++++++
 .../streaming/kafka/KafkaInputDStream.scala     | 142 ++++
 .../apache/spark/streaming/kafka/KafkaRDD.scala | 269 +++++++
 .../streaming/kafka/KafkaRDDPartition.scala     |  42 +
 .../spark/streaming/kafka/KafkaTestUtils.scala  | 275 +++++++
 .../spark/streaming/kafka/KafkaUtils.scala      | 805 +++++++++++++++++++
 .../spark/streaming/kafka/OffsetRange.scala     | 109 +++
 .../streaming/kafka/ReliableKafkaReceiver.scala | 302 +++++++
 .../spark/streaming/kafka/package-info.java     |  21 +
 .../apache/spark/streaming/kafka/package.scala  |  23 +
 .../kafka/JavaDirectKafkaStreamSuite.java       | 175 ++++
 .../streaming/kafka/JavaKafkaRDDSuite.java      | 156 ++++
 .../streaming/kafka/JavaKafkaStreamSuite.java   | 135 ++++
 .../src/test/resources/log4j.properties         |  28 +
 .../kafka/DirectKafkaStreamSuite.scala          | 531 ++++++++++++
 .../streaming/kafka/KafkaClusterSuite.scala     |  81 ++
 .../spark/streaming/kafka/KafkaRDDSuite.scala   | 175 ++++
 .../streaming/kafka/KafkaStreamSuite.scala      |  84 ++
 .../kafka/ReliableKafkaStreamSuite.scala        | 148 ++++
 external/kafka-assembly/pom.xml                 | 176 ----
 external/kafka/pom.xml                          |  98 ---
 .../apache/spark/streaming/kafka/Broker.scala   |  66 --
 .../kafka/DirectKafkaInputDStream.scala         | 227 ------
 .../spark/streaming/kafka/KafkaCluster.scala    | 425 ----------
 .../streaming/kafka/KafkaInputDStream.scala     | 142 ----
 .../apache/spark/streaming/kafka/KafkaRDD.scala | 269 -------
 .../streaming/kafka/KafkaRDDPartition.scala     |  42 -
 .../spark/streaming/kafka/KafkaTestUtils.scala  | 275 -------
 .../spark/streaming/kafka/KafkaUtils.scala      | 805 -------------------
 .../spark/streaming/kafka/OffsetRange.scala     | 109 ---
 .../streaming/kafka/ReliableKafkaReceiver.scala | 302 -------
 .../spark/streaming/kafka/package-info.java     |  21 -
 .../apache/spark/streaming/kafka/package.scala  |  23 -
 .../kafka/JavaDirectKafkaStreamSuite.java       | 175 ----
 .../streaming/kafka/JavaKafkaRDDSuite.java      | 156 ----
 .../streaming/kafka/JavaKafkaStreamSuite.java   | 135 ----
 .../kafka/src/test/resources/log4j.properties   |  28 -
 .../kafka/DirectKafkaStreamSuite.scala          | 531 ------------
 .../streaming/kafka/KafkaClusterSuite.scala     |  81 --
 .../spark/streaming/kafka/KafkaRDDSuite.scala   | 175 ----
 .../streaming/kafka/KafkaStreamSuite.scala      |  84 --
 .../kafka/ReliableKafkaStreamSuite.scala        | 148 ----
 pom.xml                                         |   4 +-
 project/MimaBuild.scala                         |  10 +-
 project/SparkBuild.scala                        |   8 +-
 python/pyspark/streaming/kafka.py               |   6 +-
 python/pyspark/streaming/tests.py               |   6 +-
 58 files changed, 4532 insertions(+), 4524 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 9075e3e..78606e0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -955,9 +955,9 @@ private[spark] object SparkSubmitUtils {
     // Add scala exclusion rule
     md.addExcludeRule(createExclusion("*:scala-library:*", ivySettings, 
ivyConfName))
 
-    // We need to specify each component explicitly, otherwise we miss 
spark-streaming-kafka and
+    // We need to specify each component explicitly, otherwise we miss 
spark-streaming-kafka-0-8 and
     // other spark-streaming utility components. Underscore is there to 
differentiate between
-    // spark-streaming_2.1x and spark-streaming-kafka-assembly_2.1x
+    // spark-streaming_2.1x and spark-streaming-kafka-0-8-assembly_2.1x
     val components = Seq("catalyst_", "core_", "graphx_", "hive_", "mllib_", 
"repl_",
       "sql_", "streaming_", "yarn_", "network-common_", "network-shuffle_", 
"network-yarn_")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/dev/audit-release/audit_release.py
----------------------------------------------------------------------
diff --git a/dev/audit-release/audit_release.py 
b/dev/audit-release/audit_release.py
index ee72da4..b28e7a4 100755
--- a/dev/audit-release/audit_release.py
+++ b/dev/audit-release/audit_release.py
@@ -116,7 +116,7 @@ original_dir = os.getcwd()
 # dependencies within those projects.
 modules = [
     "spark-core", "spark-mllib", "spark-streaming", "spark-repl",
-    "spark-graphx", "spark-streaming-flume", "spark-streaming-kafka",
+    "spark-graphx", "spark-streaming-flume", "spark-streaming-kafka-0-8",
     "spark-catalyst", "spark-sql", "spark-hive", "spark-streaming-kinesis-asl"
 ]
 modules = map(lambda m: "%s_%s" % (m, SCALA_BINARY_VERSION), modules)

http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/dev/run-tests.py
----------------------------------------------------------------------
diff --git a/dev/run-tests.py b/dev/run-tests.py
index 291f821..7b32697 100755
--- a/dev/run-tests.py
+++ b/dev/run-tests.py
@@ -336,7 +336,7 @@ def build_spark_sbt(hadoop_version):
     # Enable all of the profiles for the build:
     build_profiles = get_hadoop_profiles(hadoop_version) + 
modules.root.build_profile_flags
     sbt_goals = ["package",
-                 "streaming-kafka-assembly/assembly",
+                 "streaming-kafka-0-8-assembly/assembly",
                  "streaming-flume-assembly/assembly",
                  "streaming-kinesis-asl-assembly/assembly"]
     profiles_and_goals = build_profiles + sbt_goals

http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/dev/sparktestsupport/modules.py
----------------------------------------------------------------------
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 5640928..f81db8e 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -223,14 +223,14 @@ streaming_kinesis_asl = Module(
 
 
 streaming_kafka = Module(
-    name="streaming-kafka",
+    name="streaming-kafka-0-8",
     dependencies=[streaming],
     source_file_regexes=[
-        "external/kafka",
-        "external/kafka-assembly",
+        "external/kafka-0-8",
+        "external/kafka-0-8-assembly",
     ],
     sbt_test_goals=[
-        "streaming-kafka/test",
+        "streaming-kafka-0-8/test",
     ]
 )
 

http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/docs/streaming-kafka-integration.md
----------------------------------------------------------------------
diff --git a/docs/streaming-kafka-integration.md 
b/docs/streaming-kafka-integration.md
index 015a2f1..0f1e322 100644
--- a/docs/streaming-kafka-integration.md
+++ b/docs/streaming-kafka-integration.md
@@ -14,7 +14,7 @@ Next, we discuss how to use this approach in your streaming 
application.
 1. **Linking:** For Scala/Java applications using SBT/Maven project 
definitions, link your streaming application with the following artifact (see 
[Linking section](streaming-programming-guide.html#linking) in the main 
programming guide for further information).
 
                groupId = org.apache.spark
-               artifactId = spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}
+               artifactId = 
spark-streaming-kafka-0-8_{{site.SCALA_BINARY_VERSION}}
                version = {{site.SPARK_VERSION_SHORT}}
 
        For Python applications, you will have to add this above library and 
its dependencies when deploying your application. See the *Deploying* 
subsection below.
@@ -64,14 +64,14 @@ Next, we discuss how to use this approach in your streaming 
application.
 
 3. **Deploying:** As with any Spark applications, `spark-submit` is used to 
launch your application. However, the details are slightly different for 
Scala/Java applications and Python applications.
 
-       For Scala and Java applications, if you are using SBT or Maven for 
project management, then package 
`spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` and its dependencies into 
the application JAR. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` and 
`spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` 
dependencies as those are already present in a Spark installation. Then use 
`spark-submit` to launch your application (see [Deploying 
section](streaming-programming-guide.html#deploying-applications) in the main 
programming guide). 
+       For Scala and Java applications, if you are using SBT or Maven for 
project management, then package 
`spark-streaming-kafka-0-8_{{site.SCALA_BINARY_VERSION}}` and its dependencies 
into the application JAR. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` 
and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` 
dependencies as those are already present in a Spark installation. Then use 
`spark-submit` to launch your application (see [Deploying 
section](streaming-programming-guide.html#deploying-applications) in the main 
programming guide).
 
-       For Python applications which lack SBT/Maven project management, 
`spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` and its dependencies can 
be directly added to `spark-submit` using `--packages` (see [Application 
Submission Guide](submitting-applications.html)). That is, 
+       For Python applications which lack SBT/Maven project management, 
`spark-streaming-kafka-0-8_{{site.SCALA_BINARY_VERSION}}` and its dependencies 
can be directly added to `spark-submit` using `--packages` (see [Application 
Submission Guide](submitting-applications.html)). That is,
 
-           ./bin/spark-submit --packages 
org.apache.spark:spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}}
 ...
+           ./bin/spark-submit --packages 
org.apache.spark:spark-streaming-kafka-0-8_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}}
 ...
 
-       Alternatively, you can also download the JAR of the Maven artifact 
`spark-streaming-kafka-assembly` from the 
-       [Maven 
repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kafka-assembly_{{site.SCALA_BINARY_VERSION}}%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
 and add it to `spark-submit` with `--jars`.
+       Alternatively, you can also download the JAR of the Maven artifact 
`spark-streaming-kafka-0-8-assembly` from the
+       [Maven 
repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kafka-0-8-assembly_{{site.SCALA_BINARY_VERSION}}%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
 and add it to `spark-submit` with `--jars`.
 
 ## Approach 2: Direct Approach (No Receivers)
 This new receiver-less "direct" approach has been introduced in Spark 1.3 to 
ensure stronger end-to-end guarantees. Instead of using receivers to receive 
data, this approach periodically queries Kafka for the latest offsets in each 
topic+partition, and accordingly defines the offset ranges to process in each 
batch. When the jobs to process the data are launched, Kafka's simple consumer 
API is used to read the defined ranges of offsets from Kafka (similar to read 
files from a file system). Note that this is an experimental feature introduced 
in Spark 1.3 for the Scala and Java API, in Spark 1.4 for the Python API.
@@ -91,7 +91,7 @@ Next, we discuss how to use this approach in your streaming 
application.
 1. **Linking:** This approach is supported only in Scala/Java application. 
Link your SBT/Maven project with the following artifact (see [Linking 
section](streaming-programming-guide.html#linking) in the main programming 
guide for further information).
 
                groupId = org.apache.spark
-               artifactId = spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}
+               artifactId = 
spark-streaming-kafka-0-8_{{site.SCALA_BINARY_VERSION}}
                version = {{site.SPARK_VERSION_SHORT}}
 
 2. **Programming:** In the streaming application code, import `KafkaUtils` and 
create an input DStream as follows.

http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/docs/streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-programming-guide.md 
b/docs/streaming-programming-guide.md
index 7f6c0ed..9ca9b18 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -416,7 +416,7 @@ some of the common ones are as follows.
 
 <table class="table">
 <tr><th>Source</th><th>Artifact</th></tr>
-<tr><td> Kafka </td><td> spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}} 
</td></tr>
+<tr><td> Kafka </td><td> 
spark-streaming-kafka-0-8_{{site.SCALA_BINARY_VERSION}} </td></tr>
 <tr><td> Flume </td><td> spark-streaming-flume_{{site.SCALA_BINARY_VERSION}} 
</td></tr>
 <tr><td> 
Kinesis<br/></td><td>spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}} 
[Amazon Software License] </td></tr>
 <tr><td></td><td></td></tr>
@@ -1892,7 +1892,7 @@ To run a Spark Streaming applications, you need to have 
the following.
   if your application uses [advanced sources](#advanced-sources) (e.g. Kafka, 
Flume),
   then you will have to package the extra artifact they link to, along with 
their dependencies,
   in the JAR that is used to deploy the application. For example, an 
application using `KafkaUtils`
-  will have to include `spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` 
and all its
+  will have to include 
`spark-streaming-kafka-0-8_{{site.SCALA_BINARY_VERSION}}` and all its
   transitive dependencies in the application JAR.
 
 - *Configuring sufficient memory for the executors* - Since the received data 
must be stored in

http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index 4423d0f..771da5b 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -87,7 +87,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId>
+      
<artifactId>spark-streaming-kafka-0-8_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
       <scope>provided</scope>
     </dependency>

http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka-0-8-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/external/kafka-0-8-assembly/pom.xml 
b/external/kafka-0-8-assembly/pom.xml
new file mode 100644
index 0000000..3cc288a
--- /dev/null
+++ b/external/kafka-0-8-assembly/pom.xml
@@ -0,0 +1,176 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.11</artifactId>
+    <version>2.0.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-streaming-kafka-0-8-assembly_2.11</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project External Kafka Assembly</name>
+  <url>http://spark.apache.org/</url>
+
+  <properties>
+    <sbt.project.name>streaming-kafka-0-8-assembly</sbt.project.name>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      
<artifactId>spark-streaming-kafka-0-8_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <!--
+      Demote already included in the Spark assembly.
+    -->
+    <dependency>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>net.jpountz.lz4</groupId>
+      <artifactId>lz4</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro-mapred</artifactId>
+      <classifier>${avro.mapred.classifier}</classifier>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-recipes</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>net.java.dev.jets3t</groupId>
+      <artifactId>jets3t</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.xerial.snappy</groupId>
+      <artifactId>snappy-java</artifactId>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+  
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+  
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  <plugins>
+    <plugin>
+      <groupId>org.apache.maven.plugins</groupId>
+      <artifactId>maven-shade-plugin</artifactId>
+      <configuration>
+        <shadedArtifactAttached>false</shadedArtifactAttached>
+        <artifactSet>
+          <includes>
+            <include>*:*</include>
+          </includes>
+        </artifactSet>
+        <filters>
+          <filter>
+            <artifact>*:*</artifact>
+            <excludes>
+              <exclude>META-INF/*.SF</exclude>
+              <exclude>META-INF/*.DSA</exclude>
+              <exclude>META-INF/*.RSA</exclude>
+            </excludes>
+          </filter>
+        </filters>
+      </configuration>
+      <executions>
+        <execution>
+          <phase>package</phase>
+          <goals>
+            <goal>shade</goal>
+          </goals>
+          <configuration>
+            <transformers>
+              <transformer 
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+              <transformer 
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                <resource>reference.conf</resource>
+              </transformer>
+              <transformer 
implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+                <resource>log4j.properties</resource>
+              </transformer>
+              <transformer 
implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
+              <transformer 
implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
+            </transformers>
+          </configuration>
+        </execution>
+      </executions>
+    </plugin>
+  </plugins>
+</build>
+</project>
+

http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka-0-8/pom.xml
----------------------------------------------------------------------
diff --git a/external/kafka-0-8/pom.xml b/external/kafka-0-8/pom.xml
new file mode 100644
index 0000000..cccfda3
--- /dev/null
+++ b/external/kafka-0-8/pom.xml
@@ -0,0 +1,98 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.11</artifactId>
+    <version>2.0.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
+  <properties>
+    <sbt.project.name>streaming-kafka-0-8</sbt.project.name>
+  </properties>
+  <packaging>jar</packaging>
+  <name>Spark Integration for Kafka 0.8</name>
+  <url>http://spark.apache.org/</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_${scala.binary.version}</artifactId>
+      <version>0.8.2.1</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.sun.jmx</groupId>
+          <artifactId>jmxri</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jdmk</groupId>
+          <artifactId>jmxtools</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>net.sf.jopt-simple</groupId>
+          <artifactId>jopt-simple</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-simple</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>net.sf.jopt-simple</groupId>
+      <artifactId>jopt-simple</artifactId>
+      <version>3.2</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scalacheck</groupId>
+      <artifactId>scalacheck_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
+    </dependency>
+  </dependencies>
+  <build>
+    
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
new file mode 100644
index 0000000..9159051
--- /dev/null
+++ 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * Represents the host and port info for a Kafka broker.
+ * Differs from the Kafka project's internal kafka.cluster.Broker, which 
contains a server ID.
+ */
+final class Broker private(
+    /** Broker's hostname */
+    val host: String,
+    /** Broker's port */
+    val port: Int) extends Serializable {
+  override def equals(obj: Any): Boolean = obj match {
+    case that: Broker =>
+      this.host == that.host &&
+      this.port == that.port
+    case _ => false
+  }
+
+  override def hashCode: Int = {
+    41 * (41 + host.hashCode) + port
+  }
+
+  override def toString(): String = {
+    s"Broker($host, $port)"
+  }
+}
+
+/**
+ * :: Experimental ::
+ * Companion object that provides methods to create instances of [[Broker]].
+ */
+@Experimental
+object Broker {
+  def create(host: String, port: Int): Broker =
+    new Broker(host, port)
+
+  def apply(host: String, port: Int): Broker =
+    new Broker(host, port)
+
+  def unapply(broker: Broker): Option[(String, Int)] = {
+    if (broker == null) {
+      None
+    } else {
+      Some((broker.host, broker.port))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
new file mode 100644
index 0000000..fb58ed7
--- /dev/null
+++ 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
+import kafka.serializer.Decoder
+
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
+import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ *  A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the 
maximum number
+ *  of messages
+ * per second that each '''partition''' will accept.
+ * Starting offsets are specified in advance,
+ * and this DStream is not responsible for committing offsets,
+ * so that you can control exactly-once semantics.
+ * For an easy interface to Kafka-managed offsets,
+ *  see {@link org.apache.spark.streaming.kafka.KafkaCluster}
+ * @param kafkaParams Kafka <a 
href="http://kafka.apache.org/documentation.html#configuration";>
+ * configuration parameters</a>.
+ *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with 
Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param fromOffsets per-topic/partition Kafka offsets defining the 
(inclusive)
+ *  starting point of the stream
+ * @param messageHandler function for translating each message into the 
desired type
+ */
+private[streaming]
+class DirectKafkaInputDStream[
+  K: ClassTag,
+  V: ClassTag,
+  U <: Decoder[K]: ClassTag,
+  T <: Decoder[V]: ClassTag,
+  R: ClassTag](
+    _ssc: StreamingContext,
+    val kafkaParams: Map[String, String],
+    val fromOffsets: Map[TopicAndPartition, Long],
+    messageHandler: MessageAndMetadata[K, V] => R
+  ) extends InputDStream[R](_ssc) with Logging {
+  val maxRetries = context.sparkContext.getConf.getInt(
+    "spark.streaming.kafka.maxRetries", 1)
+
+  // Keep this consistent with how other streams are named (e.g. "Flume 
polling stream [2]")
+  private[streaming] override def name: String = s"Kafka direct stream [$id]"
+
+  protected[streaming] override val checkpointData =
+    new DirectKafkaInputDStreamCheckpointData
+
+
+  /**
+   * Asynchronously maintains & sends new rate limits to the receiver through 
the receiver tracker.
+   */
+  override protected[streaming] val rateController: Option[RateController] = {
+    if (RateController.isBackPressureEnabled(ssc.conf)) {
+      Some(new DirectKafkaRateController(id,
+        RateEstimator.create(ssc.conf, context.graph.batchDuration)))
+    } else {
+      None
+    }
+  }
+
+  protected val kc = new KafkaCluster(kafkaParams)
+
+  private val maxRateLimitPerPartition: Int = 
context.sparkContext.getConf.getInt(
+      "spark.streaming.kafka.maxRatePerPartition", 0)
+
+  protected[streaming] def maxMessagesPerPartition(
+      offsets: Map[TopicAndPartition, Long]): Option[Map[TopicAndPartition, 
Long]] = {
+    val estimatedRateLimit = rateController.map(_.getLatestRate().toInt)
+
+    // calculate a per-partition rate limit based on current lag
+    val effectiveRateLimitPerPartition = estimatedRateLimit.filter(_ > 0) 
match {
+      case Some(rate) =>
+        val lagPerPartition = offsets.map { case (tp, offset) =>
+          tp -> Math.max(offset - currentOffsets(tp), 0)
+        }
+        val totalLag = lagPerPartition.values.sum
+
+        lagPerPartition.map { case (tp, lag) =>
+          val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
+          tp -> (if (maxRateLimitPerPartition > 0) {
+            Math.min(backpressureRate, maxRateLimitPerPartition)} else 
backpressureRate)
+        }
+      case None => offsets.map { case (tp, offset) => tp -> 
maxRateLimitPerPartition }
+    }
+
+    if (effectiveRateLimitPerPartition.values.sum > 0) {
+      val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 
1000
+      Some(effectiveRateLimitPerPartition.map {
+        case (tp, limit) => tp -> (secsPerBatch * limit).toLong
+      })
+    } else {
+      None
+    }
+  }
+
+  protected var currentOffsets = fromOffsets
+
+  @tailrec
+  protected final def latestLeaderOffsets(retries: Int): 
Map[TopicAndPartition, LeaderOffset] = {
+    val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
+    // Either.fold would confuse @tailrec, do it manually
+    if (o.isLeft) {
+      val err = o.left.get.toString
+      if (retries <= 0) {
+        throw new SparkException(err)
+      } else {
+        log.error(err)
+        Thread.sleep(kc.config.refreshLeaderBackoffMs)
+        latestLeaderOffsets(retries - 1)
+      }
+    } else {
+      o.right.get
+    }
+  }
+
+  // limits the maximum number of messages per partition
+  protected def clamp(
+    leaderOffsets: Map[TopicAndPartition, LeaderOffset]): 
Map[TopicAndPartition, LeaderOffset] = {
+    val offsets = leaderOffsets.mapValues(lo => lo.offset)
+
+    maxMessagesPerPartition(offsets).map { mmp =>
+      mmp.map { case (tp, messages) =>
+        val lo = leaderOffsets(tp)
+        tp -> lo.copy(offset = Math.min(currentOffsets(tp) + messages, 
lo.offset))
+      }
+    }.getOrElse(leaderOffsets)
+  }
+
+  override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
+    val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
+    val rdd = KafkaRDD[K, V, U, T, R](
+      context.sparkContext, kafkaParams, currentOffsets, untilOffsets, 
messageHandler)
+
+    // Report the record number and metadata of this batch interval to 
InputInfoTracker.
+    val offsetRanges = currentOffsets.map { case (tp, fo) =>
+      val uo = untilOffsets(tp)
+      OffsetRange(tp.topic, tp.partition, fo, uo.offset)
+    }
+    val description = offsetRanges.filter { offsetRange =>
+      // Don't display empty ranges.
+      offsetRange.fromOffset != offsetRange.untilOffset
+    }.map { offsetRange =>
+      s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
+        s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
+    }.mkString("\n")
+    // Copy offsetRanges to immutable.List to prevent from being modified by 
the user
+    val metadata = Map(
+      "offsets" -> offsetRanges.toList,
+      StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
+    val inputInfo = StreamInputInfo(id, rdd.count, metadata)
+    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
+
+    currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
+    Some(rdd)
+  }
+
+  override def start(): Unit = {
+  }
+
+  def stop(): Unit = {
+  }
+
+  private[streaming]
+  class DirectKafkaInputDStreamCheckpointData extends 
DStreamCheckpointData(this) {
+    def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] 
= {
+      data.asInstanceOf[mutable.HashMap[Time, 
Array[OffsetRange.OffsetRangeTuple]]]
+    }
+
+    override def update(time: Time) {
+      batchForTime.clear()
+      generatedRDDs.foreach { kv =>
+        val a = kv._2.asInstanceOf[KafkaRDD[K, V, U, T, 
R]].offsetRanges.map(_.toTuple).toArray
+        batchForTime += kv._1 -> a
+      }
+    }
+
+    override def cleanup(time: Time) { }
+
+    override def restore() {
+      // this is assuming that the topics don't change during execution, which 
is true currently
+      val topics = fromOffsets.keySet
+      val leaders = KafkaCluster.checkErrors(kc.findLeaders(topics))
+
+      batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
+         logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", 
"]")}")
+         generatedRDDs += t -> new KafkaRDD[K, V, U, T, R](
+           context.sparkContext, kafkaParams, b.map(OffsetRange(_)), leaders, 
messageHandler)
+      }
+    }
+  }
+
+  /**
+   * A RateController to retrieve the rate from RateEstimator.
+   */
+  private[streaming] class DirectKafkaRateController(id: Int, estimator: 
RateEstimator)
+    extends RateController(id, estimator) {
+    override def publish(rate: Long): Unit = ()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
new file mode 100644
index 0000000..726b5d8
--- /dev/null
+++ 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+import scala.util.control.NonFatal
+
+import kafka.api._
+import kafka.common.{ErrorMapping, OffsetAndMetadata, OffsetMetadataAndError, 
TopicAndPartition}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+
+import org.apache.spark.SparkException
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * Convenience methods for interacting with a Kafka cluster.
+ * See <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol";>
+ * A Guide To The Kafka Protocol</a> for more details on individual api calls.
+ * @param kafkaParams Kafka <a 
href="http://kafka.apache.org/documentation.html#configuration";>
+ * configuration parameters</a>.
+ *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with 
Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
+ */
+@DeveloperApi
+class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
+  import KafkaCluster.{Err, LeaderOffset, SimpleConsumerConfig}
+
+  // ConsumerConfig isn't serializable
+  @transient private var _config: SimpleConsumerConfig = null
+
+  def config: SimpleConsumerConfig = this.synchronized {
+    if (_config == null) {
+      _config = SimpleConsumerConfig(kafkaParams)
+    }
+    _config
+  }
+
+  def connect(host: String, port: Int): SimpleConsumer =
+    new SimpleConsumer(host, port, config.socketTimeoutMs,
+      config.socketReceiveBufferBytes, config.clientId)
+
+  def connectLeader(topic: String, partition: Int): Either[Err, 
SimpleConsumer] =
+    findLeader(topic, partition).right.map(hp => connect(hp._1, hp._2))
+
+  // Metadata api
+  // scalastyle:off
+  // 
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
+  // scalastyle:on
+
+  def findLeader(topic: String, partition: Int): Either[Err, (String, Int)] = {
+    val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
+      0, config.clientId, Seq(topic))
+    val errs = new Err
+    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
+      val resp: TopicMetadataResponse = consumer.send(req)
+      resp.topicsMetadata.find(_.topic == topic).flatMap { tm: TopicMetadata =>
+        tm.partitionsMetadata.find(_.partitionId == partition)
+      }.foreach { pm: PartitionMetadata =>
+        pm.leader.foreach { leader =>
+          return Right((leader.host, leader.port))
+        }
+      }
+    }
+    Left(errs)
+  }
+
+  def findLeaders(
+      topicAndPartitions: Set[TopicAndPartition]
+    ): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
+    val topics = topicAndPartitions.map(_.topic)
+    val response = getPartitionMetadata(topics).right
+    val answer = response.flatMap { tms: Set[TopicMetadata] =>
+      val leaderMap = tms.flatMap { tm: TopicMetadata =>
+        tm.partitionsMetadata.flatMap { pm: PartitionMetadata =>
+          val tp = TopicAndPartition(tm.topic, pm.partitionId)
+          if (topicAndPartitions(tp)) {
+            pm.leader.map { l =>
+              tp -> (l.host -> l.port)
+            }
+          } else {
+            None
+          }
+        }
+      }.toMap
+
+      if (leaderMap.keys.size == topicAndPartitions.size) {
+        Right(leaderMap)
+      } else {
+        val missing = topicAndPartitions.diff(leaderMap.keySet)
+        val err = new Err
+        err.append(new SparkException(s"Couldn't find leaders for ${missing}"))
+        Left(err)
+      }
+    }
+    answer
+  }
+
+  def getPartitions(topics: Set[String]): Either[Err, Set[TopicAndPartition]] 
= {
+    getPartitionMetadata(topics).right.map { r =>
+      r.flatMap { tm: TopicMetadata =>
+        tm.partitionsMetadata.map { pm: PartitionMetadata =>
+          TopicAndPartition(tm.topic, pm.partitionId)
+        }
+      }
+    }
+  }
+
+  def getPartitionMetadata(topics: Set[String]): Either[Err, 
Set[TopicMetadata]] = {
+    val req = TopicMetadataRequest(
+      TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics.toSeq)
+    val errs = new Err
+    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
+      val resp: TopicMetadataResponse = consumer.send(req)
+      val respErrs = resp.topicsMetadata.filter(m => m.errorCode != 
ErrorMapping.NoError)
+
+      if (respErrs.isEmpty) {
+        return Right(resp.topicsMetadata.toSet)
+      } else {
+        respErrs.foreach { m =>
+          val cause = ErrorMapping.exceptionFor(m.errorCode)
+          val msg = s"Error getting partition metadata for '${m.topic}'. Does 
the topic exist?"
+          errs.append(new SparkException(msg, cause))
+        }
+      }
+    }
+    Left(errs)
+  }
+
+  // Leader offset api
+  // scalastyle:off
+  // 
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
+  // scalastyle:on
+
+  def getLatestLeaderOffsets(
+      topicAndPartitions: Set[TopicAndPartition]
+    ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
+    getLeaderOffsets(topicAndPartitions, OffsetRequest.LatestTime)
+
+  def getEarliestLeaderOffsets(
+      topicAndPartitions: Set[TopicAndPartition]
+    ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
+    getLeaderOffsets(topicAndPartitions, OffsetRequest.EarliestTime)
+
+  def getLeaderOffsets(
+      topicAndPartitions: Set[TopicAndPartition],
+      before: Long
+    ): Either[Err, Map[TopicAndPartition, LeaderOffset]] = {
+    getLeaderOffsets(topicAndPartitions, before, 1).right.map { r =>
+      r.map { kv =>
+        // mapValues isn't serializable, see SI-7005
+        kv._1 -> kv._2.head
+      }
+    }
+  }
+
+  private def flip[K, V](m: Map[K, V]): Map[V, Seq[K]] =
+    m.groupBy(_._2).map { kv =>
+      kv._1 -> kv._2.keys.toSeq
+    }
+
+  def getLeaderOffsets(
+      topicAndPartitions: Set[TopicAndPartition],
+      before: Long,
+      maxNumOffsets: Int
+    ): Either[Err, Map[TopicAndPartition, Seq[LeaderOffset]]] = {
+    findLeaders(topicAndPartitions).right.flatMap { tpToLeader =>
+      val leaderToTp: Map[(String, Int), Seq[TopicAndPartition]] = 
flip(tpToLeader)
+      val leaders = leaderToTp.keys
+      var result = Map[TopicAndPartition, Seq[LeaderOffset]]()
+      val errs = new Err
+      withBrokers(leaders, errs) { consumer =>
+        val partitionsToGetOffsets: Seq[TopicAndPartition] =
+          leaderToTp((consumer.host, consumer.port))
+        val reqMap = partitionsToGetOffsets.map { tp: TopicAndPartition =>
+          tp -> PartitionOffsetRequestInfo(before, maxNumOffsets)
+        }.toMap
+        val req = OffsetRequest(reqMap)
+        val resp = consumer.getOffsetsBefore(req)
+        val respMap = resp.partitionErrorAndOffsets
+        partitionsToGetOffsets.foreach { tp: TopicAndPartition =>
+          respMap.get(tp).foreach { por: PartitionOffsetsResponse =>
+            if (por.error == ErrorMapping.NoError) {
+              if (por.offsets.nonEmpty) {
+                result += tp -> por.offsets.map { off =>
+                  LeaderOffset(consumer.host, consumer.port, off)
+                }
+              } else {
+                errs.append(new SparkException(
+                  s"Empty offsets for ${tp}, is ${before} before log 
beginning?"))
+              }
+            } else {
+              errs.append(ErrorMapping.exceptionFor(por.error))
+            }
+          }
+        }
+        if (result.keys.size == topicAndPartitions.size) {
+          return Right(result)
+        }
+      }
+      val missing = topicAndPartitions.diff(result.keySet)
+      errs.append(new SparkException(s"Couldn't find leader offsets for 
${missing}"))
+      Left(errs)
+    }
+  }
+
+  // Consumer offset api
+  // scalastyle:off
+  // 
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
+  // scalastyle:on
+
+  // this 0 here indicates api version, in this case the original ZK backed 
api.
+  private def defaultConsumerApiVersion: Short = 0
+
+  /** Requires Kafka >= 0.8.1.1.  Defaults to the original ZooKeeper backed 
api version. */
+  def getConsumerOffsets(
+      groupId: String,
+      topicAndPartitions: Set[TopicAndPartition]
+    ): Either[Err, Map[TopicAndPartition, Long]] =
+    getConsumerOffsets(groupId, topicAndPartitions, defaultConsumerApiVersion)
+
+  def getConsumerOffsets(
+      groupId: String,
+      topicAndPartitions: Set[TopicAndPartition],
+      consumerApiVersion: Short
+    ): Either[Err, Map[TopicAndPartition, Long]] = {
+    getConsumerOffsetMetadata(groupId, topicAndPartitions, 
consumerApiVersion).right.map { r =>
+      r.map { kv =>
+        kv._1 -> kv._2.offset
+      }
+    }
+  }
+
+  /** Requires Kafka >= 0.8.1.1.  Defaults to the original ZooKeeper backed 
api version. */
+  def getConsumerOffsetMetadata(
+      groupId: String,
+      topicAndPartitions: Set[TopicAndPartition]
+    ): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] =
+    getConsumerOffsetMetadata(groupId, topicAndPartitions, 
defaultConsumerApiVersion)
+
+  def getConsumerOffsetMetadata(
+      groupId: String,
+      topicAndPartitions: Set[TopicAndPartition],
+      consumerApiVersion: Short
+    ): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] = {
+    var result = Map[TopicAndPartition, OffsetMetadataAndError]()
+    val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq, 
consumerApiVersion)
+    val errs = new Err
+    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
+      val resp = consumer.fetchOffsets(req)
+      val respMap = resp.requestInfo
+      val needed = topicAndPartitions.diff(result.keySet)
+      needed.foreach { tp: TopicAndPartition =>
+        respMap.get(tp).foreach { ome: OffsetMetadataAndError =>
+          if (ome.error == ErrorMapping.NoError) {
+            result += tp -> ome
+          } else {
+            errs.append(ErrorMapping.exceptionFor(ome.error))
+          }
+        }
+      }
+      if (result.keys.size == topicAndPartitions.size) {
+        return Right(result)
+      }
+    }
+    val missing = topicAndPartitions.diff(result.keySet)
+    errs.append(new SparkException(s"Couldn't find consumer offsets for 
${missing}"))
+    Left(errs)
+  }
+
+  /** Requires Kafka >= 0.8.1.1.  Defaults to the original ZooKeeper backed 
api version. */
+  def setConsumerOffsets(
+      groupId: String,
+      offsets: Map[TopicAndPartition, Long]
+    ): Either[Err, Map[TopicAndPartition, Short]] =
+    setConsumerOffsets(groupId, offsets, defaultConsumerApiVersion)
+
+  def setConsumerOffsets(
+      groupId: String,
+      offsets: Map[TopicAndPartition, Long],
+      consumerApiVersion: Short
+    ): Either[Err, Map[TopicAndPartition, Short]] = {
+    val meta = offsets.map { kv =>
+      kv._1 -> OffsetAndMetadata(kv._2)
+    }
+    setConsumerOffsetMetadata(groupId, meta, consumerApiVersion)
+  }
+
+  /** Requires Kafka >= 0.8.1.1.  Defaults to the original ZooKeeper backed 
api version. */
+  def setConsumerOffsetMetadata(
+      groupId: String,
+      metadata: Map[TopicAndPartition, OffsetAndMetadata]
+    ): Either[Err, Map[TopicAndPartition, Short]] =
+    setConsumerOffsetMetadata(groupId, metadata, defaultConsumerApiVersion)
+
+  def setConsumerOffsetMetadata(
+      groupId: String,
+      metadata: Map[TopicAndPartition, OffsetAndMetadata],
+      consumerApiVersion: Short
+    ): Either[Err, Map[TopicAndPartition, Short]] = {
+    var result = Map[TopicAndPartition, Short]()
+    val req = OffsetCommitRequest(groupId, metadata, consumerApiVersion)
+    val errs = new Err
+    val topicAndPartitions = metadata.keySet
+    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
+      val resp = consumer.commitOffsets(req)
+      val respMap = resp.commitStatus
+      val needed = topicAndPartitions.diff(result.keySet)
+      needed.foreach { tp: TopicAndPartition =>
+        respMap.get(tp).foreach { err: Short =>
+          if (err == ErrorMapping.NoError) {
+            result += tp -> err
+          } else {
+            errs.append(ErrorMapping.exceptionFor(err))
+          }
+        }
+      }
+      if (result.keys.size == topicAndPartitions.size) {
+        return Right(result)
+      }
+    }
+    val missing = topicAndPartitions.diff(result.keySet)
+    errs.append(new SparkException(s"Couldn't set offsets for ${missing}"))
+    Left(errs)
+  }
+
+  // Try a call against potentially multiple brokers, accumulating errors
+  private def withBrokers(brokers: Iterable[(String, Int)], errs: Err)
+    (fn: SimpleConsumer => Any): Unit = {
+    brokers.foreach { hp =>
+      var consumer: SimpleConsumer = null
+      try {
+        consumer = connect(hp._1, hp._2)
+        fn(consumer)
+      } catch {
+        case NonFatal(e) =>
+          errs.append(e)
+      } finally {
+        if (consumer != null) {
+          consumer.close()
+        }
+      }
+    }
+  }
+}
+
+@DeveloperApi
+object KafkaCluster {
+  type Err = ArrayBuffer[Throwable]
+
+  /** If the result is right, return it, otherwise throw SparkException */
+  def checkErrors[T](result: Either[Err, T]): T = {
+    result.fold(
+      errs => throw new SparkException(errs.mkString("\n")),
+      ok => ok
+    )
+  }
+
+  case class LeaderOffset(host: String, port: Int, offset: Long)
+
+  /**
+   * High-level kafka consumers connect to ZK.  ConsumerConfig assumes this 
use case.
+   * Simple consumers connect directly to brokers, but need many of the same 
configs.
+   * This subclass won't warn about missing ZK params, or presence of broker 
params.
+   */
+  class SimpleConsumerConfig private(brokers: String, originalProps: 
Properties)
+      extends ConsumerConfig(originalProps) {
+    val seedBrokers: Array[(String, Int)] = brokers.split(",").map { hp =>
+      val hpa = hp.split(":")
+      if (hpa.size == 1) {
+        throw new SparkException(s"Broker not in the correct format of 
<host>:<port> [$brokers]")
+      }
+      (hpa(0), hpa(1).toInt)
+    }
+  }
+
+  object SimpleConsumerConfig {
+    /**
+     * Make a consumer config without requiring group.id or zookeeper.connect,
+     * since communicating with brokers also needs common settings such as 
timeout
+     */
+    def apply(kafkaParams: Map[String, String]): SimpleConsumerConfig = {
+      // These keys are from other pre-existing kafka configs for specifying 
brokers, accept either
+      val brokers = kafkaParams.get("metadata.broker.list")
+        .orElse(kafkaParams.get("bootstrap.servers"))
+        .getOrElse(throw new SparkException(
+          "Must specify metadata.broker.list or bootstrap.servers"))
+
+      val props = new Properties()
+      kafkaParams.foreach { case (key, value) =>
+        // prevent warnings on parameters ConsumerConfig doesn't know about
+        if (key != "metadata.broker.list" && key != "bootstrap.servers") {
+          props.put(key, value)
+        }
+      }
+
+      Seq("zookeeper.connect", "group.id").foreach { s =>
+        if (!props.containsKey(s)) {
+          props.setProperty(s, "")
+        }
+      }
+
+      new SimpleConsumerConfig(brokers, props)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
new file mode 100644
index 0000000..3713bda
--- /dev/null
+++ 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.util.Properties
+
+import scala.collection.Map
+import scala.reflect.{classTag, ClassTag}
+
+import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, 
KafkaStream}
+import kafka.serializer.Decoder
+import kafka.utils.VerifiableProperties
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Input stream that pulls messages from a Kafka Broker.
+ *
+ * @param kafkaParams Map of kafka configuration parameters.
+ *                    See: http://kafka.apache.org/configuration.html
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each 
partition is consumed
+ * in its own thread.
+ * @param storageLevel RDD storage level.
+ */
+private[streaming]
+class KafkaInputDStream[
+  K: ClassTag,
+  V: ClassTag,
+  U <: Decoder[_]: ClassTag,
+  T <: Decoder[_]: ClassTag](
+    _ssc: StreamingContext,
+    kafkaParams: Map[String, String],
+    topics: Map[String, Int],
+    useReliableReceiver: Boolean,
+    storageLevel: StorageLevel
+  ) extends ReceiverInputDStream[(K, V)](_ssc) with Logging {
+
+  def getReceiver(): Receiver[(K, V)] = {
+    if (!useReliableReceiver) {
+      new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
+    } else {
+      new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
+    }
+  }
+}
+
+private[streaming]
+class KafkaReceiver[
+  K: ClassTag,
+  V: ClassTag,
+  U <: Decoder[_]: ClassTag,
+  T <: Decoder[_]: ClassTag](
+    kafkaParams: Map[String, String],
+    topics: Map[String, Int],
+    storageLevel: StorageLevel
+  ) extends Receiver[(K, V)](storageLevel) with Logging {
+
+  // Connection to Kafka
+  var consumerConnector: ConsumerConnector = null
+
+  def onStop() {
+    if (consumerConnector != null) {
+      consumerConnector.shutdown()
+      consumerConnector = null
+    }
+  }
+
+  def onStart() {
+
+    logInfo("Starting Kafka Consumer Stream with group: " + 
kafkaParams("group.id"))
+
+    // Kafka connection properties
+    val props = new Properties()
+    kafkaParams.foreach(param => props.put(param._1, param._2))
+
+    val zkConnect = kafkaParams("zookeeper.connect")
+    // Create the connection to the cluster
+    logInfo("Connecting to Zookeeper: " + zkConnect)
+    val consumerConfig = new ConsumerConfig(props)
+    consumerConnector = Consumer.create(consumerConfig)
+    logInfo("Connected to " + zkConnect)
+
+    val keyDecoder = 
classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
+      .newInstance(consumerConfig.props)
+      .asInstanceOf[Decoder[K]]
+    val valueDecoder = 
classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
+      .newInstance(consumerConfig.props)
+      .asInstanceOf[Decoder[V]]
+
+    // Create threads for each topic/message Stream we are listening
+    val topicMessageStreams = consumerConnector.createMessageStreams(
+      topics, keyDecoder, valueDecoder)
+
+    val executorPool =
+      ThreadUtils.newDaemonFixedThreadPool(topics.values.sum, 
"KafkaMessageHandler")
+    try {
+      // Start the messages handler for each partition
+      topicMessageStreams.values.foreach { streams =>
+        streams.foreach { stream => executorPool.submit(new 
MessageHandler(stream)) }
+      }
+    } finally {
+      executorPool.shutdown() // Just causes threads to terminate after work 
is done
+    }
+  }
+
+  // Handles Kafka messages
+  private class MessageHandler(stream: KafkaStream[K, V])
+    extends Runnable {
+    def run() {
+      logInfo("Starting MessageHandler.")
+      try {
+        val streamIterator = stream.iterator()
+        while (streamIterator.hasNext()) {
+          val msgAndMetadata = streamIterator.next()
+          store((msgAndMetadata.key, msgAndMetadata.message))
+        }
+      } catch {
+        case e: Throwable => reportError("Error handling message; exiting", e)
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
new file mode 100644
index 0000000..d4881b1
--- /dev/null
+++ 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.{classTag, ClassTag}
+
+import kafka.api.{FetchRequestBuilder, FetchResponse}
+import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.consumer.SimpleConsumer
+import kafka.message.{MessageAndMetadata, MessageAndOffset}
+import kafka.serializer.Decoder
+import kafka.utils.VerifiableProperties
+
+import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.partial.{BoundedDouble, PartialResult}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.NextIterator
+
+/**
+ * A batch-oriented interface for consuming from Kafka.
+ * Starting and ending offsets are specified in advance,
+ * so that you can control exactly-once semantics.
+ * @param kafkaParams Kafka <a 
href="http://kafka.apache.org/documentation.html#configuration";>
+ * configuration parameters</a>. Requires "metadata.broker.list" or 
"bootstrap.servers" to be set
+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+ * @param offsetRanges offset ranges that define the Kafka data belonging to 
this RDD
+ * @param messageHandler function for translating each message into the 
desired type
+ */
+private[kafka]
+class KafkaRDD[
+  K: ClassTag,
+  V: ClassTag,
+  U <: Decoder[_]: ClassTag,
+  T <: Decoder[_]: ClassTag,
+  R: ClassTag] private[spark] (
+    sc: SparkContext,
+    kafkaParams: Map[String, String],
+    val offsetRanges: Array[OffsetRange],
+    leaders: Map[TopicAndPartition, (String, Int)],
+    messageHandler: MessageAndMetadata[K, V] => R
+  ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {
+  override def getPartitions: Array[Partition] = {
+    offsetRanges.zipWithIndex.map { case (o, i) =>
+        val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
+        new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, 
o.untilOffset, host, port)
+    }.toArray
+  }
+
+  override def count(): Long = offsetRanges.map(_.count).sum
+
+  override def countApprox(
+      timeout: Long,
+      confidence: Double = 0.95
+  ): PartialResult[BoundedDouble] = {
+    val c = count
+    new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
+  }
+
+  override def isEmpty(): Boolean = count == 0L
+
+  override def take(num: Int): Array[R] = {
+    val nonEmptyPartitions = this.partitions
+      .map(_.asInstanceOf[KafkaRDDPartition])
+      .filter(_.count > 0)
+
+    if (num < 1 || nonEmptyPartitions.isEmpty) {
+      return new Array[R](0)
+    }
+
+    // Determine in advance how many messages need to be taken from each 
partition
+    val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) 
=>
+      val remain = num - result.values.sum
+      if (remain > 0) {
+        val taken = Math.min(remain, part.count)
+        result + (part.index -> taken.toInt)
+      } else {
+        result
+      }
+    }
+
+    val buf = new ArrayBuffer[R]
+    val res = context.runJob(
+      this,
+      (tc: TaskContext, it: Iterator[R]) => 
it.take(parts(tc.partitionId)).toArray,
+      parts.keys.toArray)
+    res.foreach(buf ++= _)
+    buf.toArray
+  }
+
+  override def getPreferredLocations(thePart: Partition): Seq[String] = {
+    val part = thePart.asInstanceOf[KafkaRDDPartition]
+    // TODO is additional hostname resolution necessary here
+    Seq(part.host)
+  }
+
+  private def errBeginAfterEnd(part: KafkaRDDPartition): String =
+    s"Beginning offset ${part.fromOffset} is after the ending offset 
${part.untilOffset} " +
+      s"for topic ${part.topic} partition ${part.partition}. " +
+      "You either provided an invalid fromOffset, or the Kafka topic has been 
damaged"
+
+  private def errRanOutBeforeEnd(part: KafkaRDDPartition): String =
+    s"Ran out of messages before reaching ending offset ${part.untilOffset} " +
+    s"for topic ${part.topic} partition ${part.partition} start 
${part.fromOffset}." +
+    " This should not happen, and indicates that messages may have been lost"
+
+  private def errOvershotEnd(itemOffset: Long, part: KafkaRDDPartition): 
String =
+    s"Got ${itemOffset} > ending offset ${part.untilOffset} " +
+    s"for topic ${part.topic} partition ${part.partition} start 
${part.fromOffset}." +
+    " This should not happen, and indicates a message may have been skipped"
+
+  override def compute(thePart: Partition, context: TaskContext): Iterator[R] 
= {
+    val part = thePart.asInstanceOf[KafkaRDDPartition]
+    assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
+    if (part.fromOffset == part.untilOffset) {
+      log.info(s"Beginning offset ${part.fromOffset} is the same as ending 
offset " +
+        s"skipping ${part.topic} ${part.partition}")
+      Iterator.empty
+    } else {
+      new KafkaRDDIterator(part, context)
+    }
+  }
+
+  private class KafkaRDDIterator(
+      part: KafkaRDDPartition,
+      context: TaskContext) extends NextIterator[R] {
+
+    context.addTaskCompletionListener{ context => closeIfNeeded() }
+
+    log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
+      s"offsets ${part.fromOffset} -> ${part.untilOffset}")
+
+    val kc = new KafkaCluster(kafkaParams)
+    val keyDecoder = 
classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
+      .newInstance(kc.config.props)
+      .asInstanceOf[Decoder[K]]
+    val valueDecoder = 
classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
+      .newInstance(kc.config.props)
+      .asInstanceOf[Decoder[V]]
+    val consumer = connectLeader
+    var requestOffset = part.fromOffset
+    var iter: Iterator[MessageAndOffset] = null
+
+    // The idea is to use the provided preferred host, except on task retry 
attempts,
+    // to minimize number of kafka metadata requests
+    private def connectLeader: SimpleConsumer = {
+      if (context.attemptNumber > 0) {
+        kc.connectLeader(part.topic, part.partition).fold(
+          errs => throw new SparkException(
+            s"Couldn't connect to leader for topic ${part.topic} 
${part.partition}: " +
+              errs.mkString("\n")),
+          consumer => consumer
+        )
+      } else {
+        kc.connect(part.host, part.port)
+      }
+    }
+
+    private def handleFetchErr(resp: FetchResponse) {
+      if (resp.hasError) {
+        val err = resp.errorCode(part.topic, part.partition)
+        if (err == ErrorMapping.LeaderNotAvailableCode ||
+          err == ErrorMapping.NotLeaderForPartitionCode) {
+          log.error(s"Lost leader for topic ${part.topic} partition 
${part.partition}, " +
+            s" sleeping for ${kc.config.refreshLeaderBackoffMs}ms")
+          Thread.sleep(kc.config.refreshLeaderBackoffMs)
+        }
+        // Let normal rdd retry sort out reconnect attempts
+        throw ErrorMapping.exceptionFor(err)
+      }
+    }
+
+    private def fetchBatch: Iterator[MessageAndOffset] = {
+      val req = new FetchRequestBuilder()
+        .addFetch(part.topic, part.partition, requestOffset, 
kc.config.fetchMessageMaxBytes)
+        .build()
+      val resp = consumer.fetch(req)
+      handleFetchErr(resp)
+      // kafka may return a batch that starts before the requested offset
+      resp.messageSet(part.topic, part.partition)
+        .iterator
+        .dropWhile(_.offset < requestOffset)
+    }
+
+    override def close(): Unit = {
+      if (consumer != null) {
+        consumer.close()
+      }
+    }
+
+    override def getNext(): R = {
+      if (iter == null || !iter.hasNext) {
+        iter = fetchBatch
+      }
+      if (!iter.hasNext) {
+        assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
+        finished = true
+        null.asInstanceOf[R]
+      } else {
+        val item = iter.next()
+        if (item.offset >= part.untilOffset) {
+          assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, 
part))
+          finished = true
+          null.asInstanceOf[R]
+        } else {
+          requestOffset = item.nextOffset
+          messageHandler(new MessageAndMetadata(
+            part.topic, part.partition, item.message, item.offset, keyDecoder, 
valueDecoder))
+        }
+      }
+    }
+  }
+}
+
+private[kafka]
+object KafkaRDD {
+  import KafkaCluster.LeaderOffset
+
+  /**
+   * @param kafkaParams Kafka <a 
href="http://kafka.apache.org/documentation.html#configuration";>
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with 
Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param fromOffsets per-topic/partition Kafka offsets defining the 
(inclusive)
+   *  starting point of the batch
+   * @param untilOffsets per-topic/partition Kafka offsets defining the 
(exclusive)
+   *  ending point of the batch
+   * @param messageHandler function for translating each message into the 
desired type
+   */
+  def apply[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag,
+    R: ClassTag](
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      fromOffsets: Map[TopicAndPartition, Long],
+      untilOffsets: Map[TopicAndPartition, LeaderOffset],
+      messageHandler: MessageAndMetadata[K, V] => R
+    ): KafkaRDD[K, V, U, T, R] = {
+    val leaders = untilOffsets.map { case (tp, lo) =>
+        tp -> (lo.host, lo.port)
+    }.toMap
+
+    val offsetRanges = fromOffsets.map { case (tp, fo) =>
+        val uo = untilOffsets(tp)
+        OffsetRange(tp.topic, tp.partition, fo, uo.offset)
+    }.toArray
+
+    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaders, 
messageHandler)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala
 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala
new file mode 100644
index 0000000..02917be
--- /dev/null
+++ 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import org.apache.spark.Partition
+
+/**
+ * @param topic kafka topic name
+ * @param partition kafka partition id
+ * @param fromOffset inclusive starting offset
+ * @param untilOffset exclusive ending offset
+ * @param host preferred kafka host, i.e. the leader at the time the rdd was 
created
+ * @param port preferred kafka host's port
+ */
+private[kafka]
+class KafkaRDDPartition(
+  val index: Int,
+  val topic: String,
+  val partition: Int,
+  val fromOffset: Long,
+  val untilOffset: Long,
+  val host: String,
+  val port: Int
+) extends Partition {
+  /** Number of messages this partition refers to */
+  def count(): Long = untilOffset - fromOffset
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/56e1e2f1/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
new file mode 100644
index 0000000..d9d4240
--- /dev/null
+++ 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.io.File
+import java.lang.{Integer => JInt}
+import java.net.InetSocketAddress
+import java.util.{Map => JMap, Properties}
+import java.util.concurrent.TimeoutException
+
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.language.postfixOps
+import scala.util.control.NonFatal
+
+import kafka.admin.AdminUtils
+import kafka.api.Request
+import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
+import kafka.serializer.StringEncoder
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.{ZKStringSerializer, ZkUtils}
+import org.I0Itec.zkclient.ZkClient
+import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.streaming.Time
+import org.apache.spark.util.Utils
+
+/**
+ * This is a helper class for Kafka test suites. This has the functionality to 
set up
+ * and tear down local Kafka servers, and to push data using Kafka producers.
+ *
+ * The reason to put Kafka test utility class in src is to test Python related 
Kafka APIs.
+ */
+private[kafka] class KafkaTestUtils extends Logging {
+
+  // Zookeeper related configurations
+  private val zkHost = "localhost"
+  private var zkPort: Int = 0
+  private val zkConnectionTimeout = 60000
+  private val zkSessionTimeout = 6000
+
+  private var zookeeper: EmbeddedZookeeper = _
+
+  private var zkClient: ZkClient = _
+
+  // Kafka broker related configurations
+  private val brokerHost = "localhost"
+  private var brokerPort = 9092
+  private var brokerConf: KafkaConfig = _
+
+  // Kafka broker server
+  private var server: KafkaServer = _
+
+  // Kafka producer
+  private var producer: Producer[String, String] = _
+
+  // Flag to test whether the system is correctly started
+  private var zkReady = false
+  private var brokerReady = false
+
+  def zkAddress: String = {
+    assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get 
zookeeper address")
+    s"$zkHost:$zkPort"
+  }
+
+  def brokerAddress: String = {
+    assert(brokerReady, "Kafka not setup yet or already torn down, cannot get 
broker address")
+    s"$brokerHost:$brokerPort"
+  }
+
+  def zookeeperClient: ZkClient = {
+    assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get 
zookeeper client")
+    Option(zkClient).getOrElse(
+      throw new IllegalStateException("Zookeeper client is not yet 
initialized"))
+  }
+
+  // Set up the Embedded Zookeeper server and get the proper Zookeeper port
+  private def setupEmbeddedZookeeper(): Unit = {
+    // Zookeeper server startup
+    zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
+    // Get the actual zookeeper binding port
+    zkPort = zookeeper.actualPort
+    zkClient = new ZkClient(s"$zkHost:$zkPort", zkSessionTimeout, 
zkConnectionTimeout,
+      ZKStringSerializer)
+    zkReady = true
+  }
+
+  // Set up the Embedded Kafka server
+  private def setupEmbeddedKafkaServer(): Unit = {
+    assert(zkReady, "Zookeeper should be set up beforehand")
+
+    // Kafka broker startup
+    Utils.startServiceOnPort(brokerPort, port => {
+      brokerPort = port
+      brokerConf = new KafkaConfig(brokerConfiguration)
+      server = new KafkaServer(brokerConf)
+      server.startup()
+      (server, port)
+    }, new SparkConf(), "KafkaBroker")
+
+    brokerReady = true
+  }
+
+  /** setup the whole embedded servers, including Zookeeper and Kafka brokers 
*/
+  def setup(): Unit = {
+    setupEmbeddedZookeeper()
+    setupEmbeddedKafkaServer()
+  }
+
+  /** Teardown the whole servers, including Kafka broker and Zookeeper */
+  def teardown(): Unit = {
+    brokerReady = false
+    zkReady = false
+
+    if (producer != null) {
+      producer.close()
+      producer = null
+    }
+
+    if (server != null) {
+      server.shutdown()
+      server = null
+    }
+
+    brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
+
+    if (zkClient != null) {
+      zkClient.close()
+      zkClient = null
+    }
+
+    if (zookeeper != null) {
+      zookeeper.shutdown()
+      zookeeper = null
+    }
+  }
+
+  /** Create a Kafka topic and wait until it is propagated to the whole 
cluster */
+  def createTopic(topic: String, partitions: Int): Unit = {
+    AdminUtils.createTopic(zkClient, topic, partitions, 1)
+    // wait until metadata is propagated
+    (0 until partitions).foreach { p => waitUntilMetadataIsPropagated(topic, 
p) }
+  }
+
+  /** Single-argument version for backwards compatibility */
+  def createTopic(topic: String): Unit = createTopic(topic, 1)
+
+  /** Java-friendly function for sending messages to the Kafka broker */
+  def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = {
+    sendMessages(topic, 
Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*))
+  }
+
+  /** Send the messages to the Kafka broker */
+  def sendMessages(topic: String, messageToFreq: Map[String, Int]): Unit = {
+    val messages = messageToFreq.flatMap { case (s, freq) => Seq.fill(freq)(s) 
}.toArray
+    sendMessages(topic, messages)
+  }
+
+  /** Send the array of messages to the Kafka broker */
+  def sendMessages(topic: String, messages: Array[String]): Unit = {
+    producer = new Producer[String, String](new 
ProducerConfig(producerConfiguration))
+    producer.send(messages.map { new KeyedMessage[String, String](topic, _ ) 
}: _*)
+    producer.close()
+    producer = null
+  }
+
+  private def brokerConfiguration: Properties = {
+    val props = new Properties()
+    props.put("broker.id", "0")
+    props.put("host.name", "localhost")
+    props.put("port", brokerPort.toString)
+    props.put("log.dir", Utils.createTempDir().getAbsolutePath)
+    props.put("zookeeper.connect", zkAddress)
+    props.put("log.flush.interval.messages", "1")
+    props.put("replica.socket.timeout.ms", "1500")
+    props
+  }
+
+  private def producerConfiguration: Properties = {
+    val props = new Properties()
+    props.put("metadata.broker.list", brokerAddress)
+    props.put("serializer.class", classOf[StringEncoder].getName)
+    // wait for all in-sync replicas to ack sends
+    props.put("request.required.acks", "-1")
+    props
+  }
+
+  // A simplified version of scalatest eventually, rewritten here to avoid 
adding extra test
+  // dependency
+  def eventually[T](timeout: Time, interval: Time)(func: => T): T = {
+    def makeAttempt(): Either[Throwable, T] = {
+      try {
+        Right(func)
+      } catch {
+        case e if NonFatal(e) => Left(e)
+      }
+    }
+
+    val startTime = System.currentTimeMillis()
+    @tailrec
+    def tryAgain(attempt: Int): T = {
+      makeAttempt() match {
+        case Right(result) => result
+        case Left(e) =>
+          val duration = System.currentTimeMillis() - startTime
+          if (duration < timeout.milliseconds) {
+            Thread.sleep(interval.milliseconds)
+          } else {
+            throw new TimeoutException(e.getMessage)
+          }
+
+          tryAgain(attempt + 1)
+      }
+    }
+
+    tryAgain(1)
+  }
+
+  private def waitUntilMetadataIsPropagated(topic: String, partition: Int): 
Unit = {
+    def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, 
partition) match {
+      case Some(partitionState) =>
+        val leaderAndInSyncReplicas = 
partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
+
+        ZkUtils.getLeaderForPartition(zkClient, topic, partition).isDefined &&
+          Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
+          leaderAndInSyncReplicas.isr.size >= 1
+
+      case _ =>
+        false
+    }
+    eventually(Time(10000), Time(100)) {
+      assert(isPropagated, s"Partition [$topic, $partition] metadata not 
propagated after timeout")
+    }
+  }
+
+  private class EmbeddedZookeeper(val zkConnect: String) {
+    val snapshotDir = Utils.createTempDir()
+    val logDir = Utils.createTempDir()
+
+    val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
+    val (ip, port) = {
+      val splits = zkConnect.split(":")
+      (splits(0), splits(1).toInt)
+    }
+    val factory = new NIOServerCnxnFactory()
+    factory.configure(new InetSocketAddress(ip, port), 16)
+    factory.startup(zookeeper)
+
+    val actualPort = factory.getLocalPort
+
+    def shutdown() {
+      factory.shutdown()
+      Utils.deleteRecursively(snapshotDir)
+      Utils.deleteRecursively(logDir)
+    }
+  }
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to