[SPARK-25705][BUILD][STREAMING][TEST-MAVEN] Remove Kafka 0.8 integration

## What changes were proposed in this pull request?

Remove Kafka 0.8 integration

## How was this patch tested?

Existing tests, build scripts

Closes #22703 from srowen/SPARK-25705.

Authored-by: Sean Owen <[email protected]>
Signed-off-by: Sean Owen <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/703e6da1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/703e6da1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/703e6da1

Branch: refs/heads/master
Commit: 703e6da1ecb52ab5b8f42b3b4cac39f27caa51d8
Parents: 2c664ed
Author: Sean Owen <[email protected]>
Authored: Tue Oct 16 09:10:24 2018 -0500
Committer: Sean Owen <[email protected]>
Committed: Tue Oct 16 09:10:24 2018 -0500

----------------------------------------------------------------------
 .../org/apache/spark/deploy/SparkSubmit.scala   |   6 +-
 dev/create-release/release-build.sh             |   4 +-
 dev/mima                                        |   2 +-
 dev/run-tests.py                                |   1 -
 dev/sbt-checkstyle                              |   1 -
 dev/scalastyle                                  |   1 -
 dev/sparktestsupport/modules.py                 |  22 -
 dev/test-dependencies.sh                        |   2 +-
 docs/building-spark.md                          |   9 -
 docs/configuration.md                           |  12 +-
 docs/streaming-kafka-0-10-integration.md        |   5 +-
 docs/streaming-kafka-0-8-integration.md         | 196 -----
 docs/streaming-kafka-integration.md             |  53 +-
 docs/streaming-programming-guide.md             |   8 +-
 docs/structured-streaming-programming-guide.md  |   6 +-
 .../python/streaming/direct_kafka_wordcount.py  |  56 --
 .../main/python/streaming/kafka_wordcount.py    |  56 --
 .../streaming/kafka010/ConsumerStrategy.scala   |  35 +-
 .../spark/streaming/kafka010/KafkaUtils.scala   |  15 -
 .../streaming/kafka010/LocationStrategy.scala   |  16 +-
 .../spark/streaming/kafka010/OffsetRange.scala  |   8 -
 .../streaming/kafka010/PerPartitionConfig.scala |   3 -
 external/kafka-0-8-assembly/pom.xml             | 170 ----
 external/kafka-0-8/pom.xml                      | 109 ---
 .../apache/spark/streaming/kafka/Broker.scala   |  68 --
 .../kafka/DirectKafkaInputDStream.scala         | 233 ------
 .../spark/streaming/kafka/KafkaCluster.scala    | 439 ----------
 .../streaming/kafka/KafkaInputDStream.scala     | 142 ----
 .../apache/spark/streaming/kafka/KafkaRDD.scala | 273 -------
 .../streaming/kafka/KafkaRDDPartition.scala     |  42 -
 .../spark/streaming/kafka/KafkaTestUtils.scala  | 299 -------
 .../spark/streaming/kafka/KafkaUtils.scala      | 806 -------------------
 .../spark/streaming/kafka/OffsetRange.scala     | 112 ---
 .../streaming/kafka/ReliableKafkaReceiver.scala | 302 -------
 .../spark/streaming/kafka/package-info.java     |  21 -
 .../apache/spark/streaming/kafka/package.scala  |  23 -
 .../kafka/JavaDirectKafkaStreamSuite.java       | 170 ----
 .../streaming/kafka/JavaKafkaRDDSuite.java      | 156 ----
 .../streaming/kafka/JavaKafkaStreamSuite.java   | 144 ----
 .../src/test/resources/log4j.properties         |  28 -
 .../kafka/DirectKafkaStreamSuite.scala          | 636 ---------------
 .../streaming/kafka/KafkaClusterSuite.scala     |  86 --
 .../spark/streaming/kafka/KafkaRDDSuite.scala   | 182 -----
 .../streaming/kafka/KafkaStreamSuite.scala      |  92 ---
 .../kafka/ReliableKafkaStreamSuite.scala        | 153 ----
 pom.xml                                         |   8 -
 project/SparkBuild.scala                        |  12 +-
 python/docs/pyspark.streaming.rst               |   7 -
 python/pyspark/streaming/dstream.py             |   3 +-
 python/pyspark/streaming/kafka.py               | 506 ------------
 python/pyspark/streaming/tests.py               | 287 +------
 51 files changed, 39 insertions(+), 5987 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 13fa6d0..88df732 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -992,9 +992,9 @@ private[spark] object SparkSubmitUtils {
 
   // Exposed for testing.
   // These components are used to make the default exclusion rules for Spark 
dependencies.
-  // We need to specify each component explicitly, otherwise we miss 
spark-streaming-kafka-0-8 and
-  // other spark-streaming utility components. Underscore is there to 
differentiate between
-  // spark-streaming_2.1x and spark-streaming-kafka-0-8-assembly_2.1x
+  // We need to specify each component explicitly, otherwise we miss
+  // spark-streaming utility components. Underscore is there to differentiate 
between
+  // spark-streaming_2.1x and spark-streaming-kafka-0-10-assembly_2.1x
   val IVY_DEFAULT_EXCLUDES = Seq("catalyst_", "core_", "graphx_", "kvstore_", 
"launcher_", "mllib_",
     "mllib-local_", "network-common_", "network-shuffle_", "repl_", "sketch_", 
"sql_", "streaming_",
     "tags_", "unsafe_")

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/dev/create-release/release-build.sh
----------------------------------------------------------------------
diff --git a/dev/create-release/release-build.sh 
b/dev/create-release/release-build.sh
index b80f55d..a741a3b 100755
--- a/dev/create-release/release-build.sh
+++ b/dev/create-release/release-build.sh
@@ -115,7 +115,9 @@ SCALA_2_10_PROFILES="-Pscala-2.10"
 SCALA_2_11_PROFILES=
 if [[ $SPARK_VERSION > "2.3" ]]; then
   BASE_PROFILES="$BASE_PROFILES -Pkubernetes"
-  SCALA_2_11_PROFILES="-Pkafka-0-8"
+  if [[ $SPARK_VERSION < "3.0." ]]; then
+    SCALA_2_11_PROFILES="-Pkafka-0-8"
+  fi
 else
   PUBLISH_SCALA_2_10=1
 fi

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/dev/mima
----------------------------------------------------------------------
diff --git a/dev/mima b/dev/mima
index a9ac8af..dc7b085 100755
--- a/dev/mima
+++ b/dev/mima
@@ -24,7 +24,7 @@ set -e
 FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
 cd "$FWDIR"
 
-SPARK_PROFILES="-Pmesos -Pkafka-0-8 -Pkubernetes -Pyarn -Pspark-ganglia-lgpl 
-Pkinesis-asl -Phive-thriftserver -Phive"
+SPARK_PROFILES="-Pmesos -Pkubernetes -Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl 
-Phive-thriftserver -Phive"
 TOOLS_CLASSPATH="$(build/sbt -DcopyDependencies=false "export 
tools/fullClasspath" | tail -n1)"
 OLD_DEPS_CLASSPATH="$(build/sbt -DcopyDependencies=false $SPARK_PROFILES 
"export oldDeps/fullClasspath" | tail -n1)"
 

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/dev/run-tests.py
----------------------------------------------------------------------
diff --git a/dev/run-tests.py b/dev/run-tests.py
index a125f5b..26045ee 100755
--- a/dev/run-tests.py
+++ b/dev/run-tests.py
@@ -332,7 +332,6 @@ def build_spark_sbt(hadoop_version):
     # Enable all of the profiles for the build:
     build_profiles = get_hadoop_profiles(hadoop_version) + 
modules.root.build_profile_flags
     sbt_goals = ["test:package",  # Build test jars as some tests depend on 
them
-                 "streaming-kafka-0-8-assembly/assembly",
                  "streaming-kinesis-asl-assembly/assembly"]
     profiles_and_goals = build_profiles + sbt_goals
 

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/dev/sbt-checkstyle
----------------------------------------------------------------------
diff --git a/dev/sbt-checkstyle b/dev/sbt-checkstyle
index 1e825db..18f3bd8 100755
--- a/dev/sbt-checkstyle
+++ b/dev/sbt-checkstyle
@@ -23,7 +23,6 @@ ERRORS=$(echo -e "q\n" \
     | build/sbt \
         -Pkinesis-asl \
         -Pmesos \
-        -Pkafka-0-8 \
         -Pkubernetes \
         -Pyarn \
         -Phive \

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/dev/scalastyle
----------------------------------------------------------------------
diff --git a/dev/scalastyle b/dev/scalastyle
index 0448e1d..2d6ee0d 100755
--- a/dev/scalastyle
+++ b/dev/scalastyle
@@ -23,7 +23,6 @@ ERRORS=$(echo -e "q\n" \
     | build/sbt \
         -Pkinesis-asl \
         -Pmesos \
-        -Pkafka-0-8 \
         -Pkubernetes \
         -Pyarn \
         -Phive \

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/dev/sparktestsupport/modules.py
----------------------------------------------------------------------
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index bd5f009..a63f9d8 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -252,24 +252,6 @@ streaming_kinesis_asl = Module(
 )
 
 
-streaming_kafka = Module(
-    name="streaming-kafka-0-8",
-    dependencies=[streaming],
-    source_file_regexes=[
-        "external/kafka-0-8",
-        "external/kafka-0-8-assembly",
-    ],
-    build_profile_flags=[
-        "-Pkafka-0-8",
-    ],
-    environ={
-        "ENABLE_KAFKA_0_8_TESTS": "1"
-    },
-    sbt_test_goals=[
-        "streaming-kafka-0-8/test",
-    ]
-)
-
 streaming_kafka_0_10 = Module(
     name="streaming-kafka-0-10",
     dependencies=[streaming],
@@ -374,15 +356,11 @@ pyspark_streaming = Module(
     dependencies=[
         pyspark_core,
         streaming,
-        streaming_kafka,
         streaming_kinesis_asl
     ],
     source_file_regexes=[
         "python/pyspark/streaming"
     ],
-    environ={
-        "ENABLE_KAFKA_0_8_TESTS": "1"
-    },
     python_test_goals=[
         "pyspark.streaming.util",
         "pyspark.streaming.tests",

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/dev/test-dependencies.sh
----------------------------------------------------------------------
diff --git a/dev/test-dependencies.sh b/dev/test-dependencies.sh
index cc8f5d3..63e01e1 100755
--- a/dev/test-dependencies.sh
+++ b/dev/test-dependencies.sh
@@ -29,7 +29,7 @@ export LC_ALL=C
 # TODO: This would be much nicer to do in SBT, once SBT supports Maven-style 
resolution.
 
 # NOTE: These should match those in the release publishing script
-HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pmesos -Pkafka-0-8 -Pkubernetes 
-Pyarn -Phive"
+HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pmesos -Pkubernetes -Pyarn 
-Phive"
 MVN="build/mvn"
 HADOOP_PROFILES=(
     hadoop-2.7

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/docs/building-spark.md
----------------------------------------------------------------------
diff --git a/docs/building-spark.md b/docs/building-spark.md
index b2775d2..6bcc30d 100644
--- a/docs/building-spark.md
+++ b/docs/building-spark.md
@@ -89,15 +89,6 @@ like ZooKeeper and Hadoop itself.
 ## Building with Kubernetes support
 
     ./build/mvn -Pkubernetes -DskipTests clean package
-    
-## Building with Kafka 0.8 support
-
-Kafka 0.8 support must be explicitly enabled with the `kafka-0-8` profile.
-Note: Kafka 0.8 support is deprecated as of Spark 2.3.0.
-
-    ./build/mvn -Pkafka-0-8 -DskipTests clean package
-
-Kafka 0.10 support is still automatically built.
 
 ## Building submodules individually
 

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 613e214..432b4cd 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -2017,7 +2017,7 @@ showDF(properties, numRows = 200, truncate = FALSE)
   <td>
     Maximum rate (number of records per second) at which data will be read 
from each Kafka
     partition when using the new Kafka direct stream API. See the
-    <a href="streaming-kafka-integration.html">Kafka Integration guide</a>
+    <a href="streaming-kafka-0-10-integration.html">Kafka Integration guide</a>
     for more details.
   </td>
 </tr>
@@ -2030,16 +2030,6 @@ showDF(properties, numRows = 200, truncate = FALSE)
     </td>
 </tr>
 <tr>
-  <td><code>spark.streaming.kafka.maxRetries</code></td>
-  <td>1</td>
-  <td>
-    Maximum number of consecutive retries the driver will make in order to find
-    the latest offsets on the leader of each partition (a default value of 1
-    means that the driver will make a maximum of 2 attempts). Only applies to
-    the new Kafka direct stream API.
-  </td>
-</tr>
-<tr>
   <td><code>spark.streaming.ui.retainedBatches</code></td>
   <td>1000</td>
   <td>

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/docs/streaming-kafka-0-10-integration.md
----------------------------------------------------------------------
diff --git a/docs/streaming-kafka-0-10-integration.md 
b/docs/streaming-kafka-0-10-integration.md
index 386066a..c78459c 100644
--- a/docs/streaming-kafka-0-10-integration.md
+++ b/docs/streaming-kafka-0-10-integration.md
@@ -3,7 +3,10 @@ layout: global
 title: Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 
or higher)
 ---
 
-The Spark Streaming integration for Kafka 0.10 is similar in design to the 0.8 
[Direct Stream 
approach](streaming-kafka-0-8-integration.html#approach-2-direct-approach-no-receivers).
  It provides simple parallelism,  1:1 correspondence between Kafka partitions 
and Spark partitions, and access to offsets and metadata. However, because the 
newer integration uses the [new Kafka consumer 
API](http://kafka.apache.org/documentation.html#newconsumerapi) instead of the 
simple API, there are notable differences in usage. This version of the 
integration is marked as experimental, so the API is potentially subject to 
change.
+The Spark Streaming integration for Kafka 0.10 provides simple parallelism, 
1:1 correspondence between Kafka 
+partitions and Spark partitions, and access to offsets and metadata. However, 
because the newer integration uses 
+the [new Kafka consumer 
API](https://kafka.apache.org/documentation.html#newconsumerapi) instead of the 
simple API, 
+there are notable differences in usage.
 
 ### Linking
 For Scala/Java applications using SBT/Maven project definitions, link your 
streaming application with the following artifact (see [Linking 
section](streaming-programming-guide.html#linking) in the main programming 
guide for further information).

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/docs/streaming-kafka-0-8-integration.md
----------------------------------------------------------------------
diff --git a/docs/streaming-kafka-0-8-integration.md 
b/docs/streaming-kafka-0-8-integration.md
deleted file mode 100644
index becf217..0000000
--- a/docs/streaming-kafka-0-8-integration.md
+++ /dev/null
@@ -1,196 +0,0 @@
----
-layout: global
-title: Spark Streaming + Kafka Integration Guide (Kafka broker version 0.8.2.1 
or higher)
----
-
-**Note: Kafka 0.8 support is deprecated as of Spark 2.3.0.**
-
-Here we explain how to configure Spark Streaming to receive data from Kafka. 
There are two approaches to this - the old approach using Receivers and Kafka's 
high-level API, and a new approach (introduced in Spark 1.3) without using 
Receivers. They have different programming models, performance characteristics, 
and semantics guarantees, so read on for more details.  Both approaches are 
considered stable APIs as of the current version of Spark.
-
-## Approach 1: Receiver-based Approach
-This approach uses a Receiver to receive the data. The Receiver is implemented 
using the Kafka high-level consumer API. As with all receivers, the data 
received from Kafka through a Receiver is stored in Spark executors, and then 
jobs launched by Spark Streaming processes the data.
-
-However, under default configuration, this approach can lose data under 
failures (see [receiver 
reliability](streaming-programming-guide.html#receiver-reliability). To ensure 
zero-data loss, you have to additionally enable Write-Ahead Logs in Spark 
Streaming (introduced in Spark 1.2). This synchronously saves all the received 
Kafka data into write-ahead logs on a distributed file system (e.g HDFS), so 
that all the data can be recovered on failure. See [Deploying 
section](streaming-programming-guide.html#deploying-applications) in the 
streaming programming guide for more details on Write-Ahead Logs.
-
-Next, we discuss how to use this approach in your streaming application.
-
-1. **Linking:** For Scala/Java applications using SBT/Maven project 
definitions, link your streaming application with the following artifact (see 
[Linking section](streaming-programming-guide.html#linking) in the main 
programming guide for further information).
-
-               groupId = org.apache.spark
-               artifactId = 
spark-streaming-kafka-0-8_{{site.SCALA_BINARY_VERSION}}
-               version = {{site.SPARK_VERSION_SHORT}}
-
-       For Python applications, you will have to add this above library and 
its dependencies when deploying your application. See the *Deploying* 
subsection below.
-
-2. **Programming:** In the streaming application code, import `KafkaUtils` and 
create an input DStream as follows.
-
-       <div class="codetabs">
-       <div data-lang="scala" markdown="1">
-               import org.apache.spark.streaming.kafka._
-
-               val kafkaStream = KafkaUtils.createStream(streamingContext,
-            [ZK quorum], [consumer group id], [per-topic number of Kafka 
partitions to consume])
-
-    You can also specify the key and value classes and their corresponding 
decoder classes using variations of `createStream`. See the [API 
docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$).
-       </div>
-       <div data-lang="java" markdown="1">
-               import org.apache.spark.streaming.kafka.*;
-
-               JavaPairReceiverInputDStream<String, String> kafkaStream =
-                       KafkaUtils.createStream(streamingContext,
-            [ZK quorum], [consumer group id], [per-topic number of Kafka 
partitions to consume]);
-
-    You can also specify the key and value classes and their corresponding 
decoder classes using variations of `createStream`. See the [API 
docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html).
-
-       </div>
-       <div data-lang="python" markdown="1">
-               from pyspark.streaming.kafka import KafkaUtils
-
-               kafkaStream = KafkaUtils.createStream(streamingContext, \
-                       [ZK quorum], [consumer group id], [per-topic number of 
Kafka partitions to consume])
-
-       By default, the Python API will decode Kafka data as UTF8 encoded 
strings. You can specify your custom decoding function to decode the byte 
arrays in Kafka records to any arbitrary data type. See the [API 
docs](api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils).
-       </div>
-       </div>
-
-       **Points to remember:**
-
-       - Topic partitions in Kafka do not correlate to partitions of RDDs 
generated in Spark Streaming. So increasing the number of topic-specific 
partitions in the `KafkaUtils.createStream()` only increases the number of 
threads using which topics that are consumed within a single receiver. It does 
not increase the parallelism of Spark in processing the data. Refer to the main 
document for more information on that.
-
-       - Multiple Kafka input DStreams can be created with different groups 
and topics for parallel receiving of data using multiple receivers.
-
-       - If you have enabled Write-Ahead Logs with a replicated file system 
like HDFS, the received data is already being replicated in the log. Hence, the 
storage level in storage level for the input stream to 
`StorageLevel.MEMORY_AND_DISK_SER` (that is, use
-`KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)`).
-
-3. **Deploying:** As with any Spark applications, `spark-submit` is used to 
launch your application. However, the details are slightly different for 
Scala/Java applications and Python applications.
-
-       For Scala and Java applications, if you are using SBT or Maven for 
project management, then package 
`spark-streaming-kafka-0-8_{{site.SCALA_BINARY_VERSION}}` and its dependencies 
into the application JAR. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` 
and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` 
dependencies as those are already present in a Spark installation. Then use 
`spark-submit` to launch your application (see [Deploying 
section](streaming-programming-guide.html#deploying-applications) in the main 
programming guide).
-
-       For Python applications which lack SBT/Maven project management, 
`spark-streaming-kafka-0-8_{{site.SCALA_BINARY_VERSION}}` and its dependencies 
can be directly added to `spark-submit` using `--packages` (see [Application 
Submission Guide](submitting-applications.html)). That is,
-
-           ./bin/spark-submit --packages 
org.apache.spark:spark-streaming-kafka-0-8_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}}
 ...
-
-       Alternatively, you can also download the JAR of the Maven artifact 
`spark-streaming-kafka-0-8-assembly` from the
-       [Maven 
repository](https://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kafka-0-8-assembly_{{site.SCALA_BINARY_VERSION}}%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
 and add it to `spark-submit` with `--jars`.
-
-## Approach 2: Direct Approach (No Receivers)
-This new receiver-less "direct" approach has been introduced in Spark 1.3 to 
ensure stronger end-to-end guarantees. Instead of using receivers to receive 
data, this approach periodically queries Kafka for the latest offsets in each 
topic+partition, and accordingly defines the offset ranges to process in each 
batch. When the jobs to process the data are launched, Kafka's simple consumer 
API is used to read the defined ranges of offsets from Kafka (similar to read 
files from a file system). Note that this feature was introduced in Spark 1.3 
for the Scala and Java API, in Spark 1.4 for the Python API.
-
-This approach has the following advantages over the receiver-based approach 
(i.e. Approach 1).
-
-- *Simplified Parallelism:* No need to create multiple input Kafka streams and 
union them. With `directStream`, Spark Streaming will create as many RDD 
partitions as there are Kafka partitions to consume, which will all read data 
from Kafka in parallel. So there is a one-to-one mapping between Kafka and RDD 
partitions, which is easier to understand and tune.
-
-- *Efficiency:* Achieving zero-data loss in the first approach required the 
data to be stored in a Write-Ahead Log, which further replicated the data. This 
is actually inefficient as the data effectively gets replicated twice - once by 
Kafka, and a second time by the Write-Ahead Log. This second approach 
eliminates the problem as there is no receiver, and hence no need for 
Write-Ahead Logs. As long as you have sufficient Kafka retention, messages can 
be recovered from Kafka.
-
-- *Exactly-once semantics:* The first approach uses Kafka's high-level API to 
store consumed offsets in Zookeeper. This is traditionally the way to consume 
data from Kafka. While this approach (in combination with-write-ahead logs) can 
ensure zero data loss (i.e. at-least once semantics), there is a small chance 
some records may get consumed twice under some failures. This occurs because of 
inconsistencies between data reliably received by Spark Streaming and offsets 
tracked by Zookeeper. Hence, in this second approach, we use simple Kafka API 
that does not use Zookeeper. Offsets are tracked by Spark Streaming within its 
checkpoints. This eliminates inconsistencies between Spark Streaming and 
Zookeeper/Kafka, and so each record is received by Spark Streaming effectively 
exactly once despite failures. In order to achieve exactly-once semantics for 
output of your results, your output operation that saves the data to an 
external data store must be either idempotent, or an atomic transa
 ction that saves results and offsets (see [Semantics of output 
operations](streaming-programming-guide.html#semantics-of-output-operations) in 
the main programming guide for further information).
-
-Note that one disadvantage of this approach is that it does not update offsets 
in Zookeeper, hence Zookeeper-based Kafka monitoring tools will not show 
progress. However, you can access the offsets processed by this approach in 
each batch and update Zookeeper yourself (see below).
-
-Next, we discuss how to use this approach in your streaming application.
-
-1. **Linking:** This approach is supported only in Scala/Java application. 
Link your SBT/Maven project with the following artifact (see [Linking 
section](streaming-programming-guide.html#linking) in the main programming 
guide for further information).
-
-               groupId = org.apache.spark
-               artifactId = 
spark-streaming-kafka-0-8_{{site.SCALA_BINARY_VERSION}}
-               version = {{site.SPARK_VERSION_SHORT}}
-
-2. **Programming:** In the streaming application code, import `KafkaUtils` and 
create an input DStream as follows.
-
-       <div class="codetabs">
-       <div data-lang="scala" markdown="1">
-               import org.apache.spark.streaming.kafka._
-
-               val directKafkaStream = KafkaUtils.createDirectStream[
-                       [key class], [value class], [key decoder class], [value 
decoder class] ](
-                       streamingContext, [map of Kafka parameters], [set of 
topics to consume])
-
-       You can also pass a `messageHandler` to `createDirectStream` to access 
`MessageAndMetadata` that contains metadata about the current message and 
transform it to any desired type.
-       See the [API 
docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$).
-       </div>
-       <div data-lang="java" markdown="1">
-               import org.apache.spark.streaming.kafka.*;
-
-               JavaPairInputDStream<String, String> directKafkaStream =
-                       KafkaUtils.createDirectStream(streamingContext,
-                               [key class], [value class], [key decoder 
class], [value decoder class],
-                               [map of Kafka parameters], [set of topics to 
consume]);
-
-       You can also pass a `messageHandler` to `createDirectStream` to access 
`MessageAndMetadata` that contains metadata about the current message and 
transform it to any desired type.
-       See the [API 
docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html).
-
-       </div>
-       <div data-lang="python" markdown="1">
-               from pyspark.streaming.kafka import KafkaUtils
-               directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], 
{"metadata.broker.list": brokers})
-
-       You can also pass a `messageHandler` to `createDirectStream` to access 
`KafkaMessageAndMetadata` that contains metadata about the current message and 
transform it to any desired type.
-       By default, the Python API will decode Kafka data as UTF8 encoded 
strings. You can specify your custom decoding function to decode the byte 
arrays in Kafka records to any arbitrary data type. See the [API 
docs](api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils).
-       </div>
-       </div>
-
-       In the Kafka parameters, you must specify either `metadata.broker.list` 
or `bootstrap.servers`.
-       By default, it will start consuming from the latest offset of each 
Kafka partition. If you set configuration `auto.offset.reset` in Kafka 
parameters to `smallest`, then it will start consuming from the smallest offset.
-
-       You can also start consuming from any arbitrary offset using other 
variations of `KafkaUtils.createDirectStream`. Furthermore, if you want to 
access the Kafka offsets consumed in each batch, you can do the following.
-
-       <div class="codetabs">
-       <div data-lang="scala" markdown="1">
-               // Hold a reference to the current offset ranges, so it can be 
used downstream
-               var offsetRanges = Array.empty[OffsetRange]
-
-               directKafkaStream.transform { rdd =>
-                 offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
-                 rdd
-               }.map {
-                  ...
-               }.foreachRDD { rdd =>
-                 for (o <- offsetRanges) {
-                   println(s"${o.topic} ${o.partition} ${o.fromOffset} 
${o.untilOffset}")
-                 }
-                 ...
-               }
-       </div>
-       <div data-lang="java" markdown="1">
-               // Hold a reference to the current offset ranges, so it can be 
used downstream
-               AtomicReference<OffsetRange[]> offsetRanges = new 
AtomicReference<>();
-
-               directKafkaStream.transformToPair(rdd -> {
-      OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
-      offsetRanges.set(offsets);
-      return rdd;
-               }).map(
-                 ...
-               ).foreachRDD(rdd -> {
-      for (OffsetRange o : offsetRanges.get()) {
-        System.out.println(
-          o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + 
o.untilOffset()
-        );
-      }
-      ...
-               });
-       </div>
-       <div data-lang="python" markdown="1">
-               offsetRanges = []
-
-               def storeOffsetRanges(rdd):
-                   global offsetRanges
-                   offsetRanges = rdd.offsetRanges()
-                   return rdd
-
-               def printOffsetRanges(rdd):
-                   for o in offsetRanges:
-                       print "%s %s %s %s" % (o.topic, o.partition, 
o.fromOffset, o.untilOffset)
-
-               directKafkaStream \
-                   .transform(storeOffsetRanges) \
-                   .foreachRDD(printOffsetRanges)
-       </div>
-       </div>
-
-       You can use this to update Zookeeper yourself if you want 
Zookeeper-based Kafka monitoring tools to show progress of the streaming 
application.
-
-       Note that the typecast to HasOffsetRanges will only succeed if it is 
done in the first method called on the directKafkaStream, not later down a 
chain of methods. You can use transform() instead of foreachRDD() as your first 
method call in order to access offsets, then call further Spark methods. 
However, be aware that the one-to-one mapping between RDD partition and Kafka 
partition does not remain after any methods that shuffle or repartition, e.g. 
reduceByKey() or window().
-
-       Another thing to note is that since this approach does not use 
Receivers, the standard receiver-related (that is, 
[configurations](configuration.html) of the form `spark.streaming.receiver.*` ) 
will not apply to the input DStreams created by this approach (will apply to 
other input DStreams though). Instead, use the 
[configurations](configuration.html) `spark.streaming.kafka.*`. An important 
one is `spark.streaming.kafka.maxRatePerPartition` which is the maximum rate 
(in messages per second) at which each Kafka partition will be read by this 
direct API.
-
-3. **Deploying:** This is same as the first approach.

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/docs/streaming-kafka-integration.md
----------------------------------------------------------------------
diff --git a/docs/streaming-kafka-integration.md 
b/docs/streaming-kafka-integration.md
index 4aca391..0ec5a31 100644
--- a/docs/streaming-kafka-integration.md
+++ b/docs/streaming-kafka-integration.md
@@ -3,52 +3,9 @@ layout: global
 title: Spark Streaming + Kafka Integration Guide
 ---
 
-[Apache Kafka](https://kafka.apache.org/) is publish-subscribe messaging 
rethought as a distributed, partitioned, replicated commit log service.  Please 
read the [Kafka documentation](https://kafka.apache.org/documentation.html) 
thoroughly before starting an integration using Spark.
+[Apache Kafka](https://kafka.apache.org/) is publish-subscribe messaging 
rethought as a distributed, partitioned, 
+replicated commit log service.  Please read the [Kafka 
documentation](https://kafka.apache.org/documentation.html) 
+thoroughly before starting an integration using Spark.
 
-The Kafka project introduced a new consumer API between versions 0.8 and 0.10, 
so there are 2 separate corresponding Spark Streaming packages available.  
Please choose the correct package for your brokers and desired features; note 
that the 0.8 integration is compatible with later 0.9 and 0.10 brokers, but the 
0.10 integration is not compatible with earlier brokers.
-
-**Note: Kafka 0.8 support is deprecated as of Spark 2.3.0.**
-
-<table class="table">
-<tr><th></th><th><a 
href="streaming-kafka-0-8-integration.html">spark-streaming-kafka-0-8</a></th><th><a
 
href="streaming-kafka-0-10-integration.html">spark-streaming-kafka-0-10</a></th></tr>
-<tr>
-  <td>Broker Version</td>
-  <td>0.8.2.1 or higher</td>
-  <td>0.10.0 or higher</td>
-</tr>
-<tr>
-  <td>API Maturity</td>
-  <td>Deprecated</td>
-  <td>Stable</td>
-</tr>
-<tr>
-  <td>Language Support</td>
-  <td>Scala, Java, Python</td>
-  <td>Scala, Java</td>
-</tr>
-<tr>
-  <td>Receiver DStream</td>
-  <td>Yes</td>
-  <td>No</td>
-</tr>
-<tr>
-  <td>Direct DStream</td>
-  <td>Yes</td>
-  <td>Yes</td>
-</tr>
-<tr>
-  <td>SSL / TLS Support</td>
-  <td>No</td>
-  <td>Yes</td>
-</tr>
-<tr>
-  <td>Offset Commit API</td>
-  <td>No</td>
-  <td>Yes</td>
-</tr>
-<tr>
-  <td>Dynamic Topic Subscription</td>
-  <td>No</td>
-  <td>Yes</td>
-</tr>
-</table>
+At the moment, Spark requires Kafka 0.10 and higher. See 
+<a href="streaming-kafka-0-10-integration.html">Kafka 0.10 integration 
documentation</a> for details.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/docs/streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-programming-guide.md 
b/docs/streaming-programming-guide.md
index 1103d5c..70bee50 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -745,7 +745,7 @@ and add it to the classpath.
 
 Some of these advanced sources are as follows.
 
-- **Kafka:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with 
Kafka broker versions 0.8.2.1 or higher. See the [Kafka Integration 
Guide](streaming-kafka-integration.html) for more details.
+- **Kafka:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with 
Kafka broker versions 0.10 or higher. See the [Kafka Integration 
Guide](streaming-kafka-0-10-integration.html) for more details.
 
 - **Kinesis:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with 
Kinesis Client Library 1.2.1. See the [Kinesis Integration 
Guide](streaming-kinesis-integration.html) for more details.
 
@@ -2172,7 +2172,7 @@ the input data stream (using 
`inputStream.repartition(<number of partitions>)`).
 This distributes the received batches of data across the specified number of 
machines in the cluster
 before further processing.
 
-For direct stream, please refer to [Spark Streaming + Kafka Integration 
Guide](streaming-kafka-integration.html)
+For direct stream, please refer to [Spark Streaming + Kafka Integration 
Guide](streaming-kafka-0-10-integration.html)
 
 ### Level of Parallelism in Data Processing
 {:.no_toc}
@@ -2433,7 +2433,7 @@ The following table summarizes the semantics under 
failures:
 
 ### With Kafka Direct API
 {:.no_toc}
-In Spark 1.3, we have introduced a new Kafka Direct API, which can ensure that 
all the Kafka data is received by Spark Streaming exactly once. Along with 
this, if you implement exactly-once output operation, you can achieve 
end-to-end exactly-once guarantees. This approach is further discussed in the 
[Kafka Integration Guide](streaming-kafka-integration.html).
+In Spark 1.3, we have introduced a new Kafka Direct API, which can ensure that 
all the Kafka data is received by Spark Streaming exactly once. Along with 
this, if you implement exactly-once output operation, you can achieve 
end-to-end exactly-once guarantees. This approach is further discussed in the 
[Kafka Integration Guide](streaming-kafka-0-10-integration.html).
 
 ## Semantics of output operations
 {:.no_toc}
@@ -2463,7 +2463,7 @@ additional effort may be necessary to achieve 
exactly-once semantics. There are
 
 # Where to Go from Here
 * Additional guides
-    - [Kafka Integration Guide](streaming-kafka-integration.html)
+    - [Kafka Integration Guide](streaming-kafka-0-10-integration.html)
     - [Kinesis Integration Guide](streaming-kinesis-integration.html)
     - [Custom Receiver Guide](streaming-custom-receivers.html)
 * Third-party DStream data sources can be found in [Third Party 
Projects](https://spark.apache.org/third-party-projects.html)

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/docs/structured-streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index b6e4277..1cedbb2 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -497,7 +497,7 @@ There are a few built-in sources.
 
   - **File source** - Reads files written in a directory as a stream of data. 
Supported file formats are text, csv, json, orc, parquet. See the docs of the 
DataStreamReader interface for a more up-to-date list, and supported options 
for each file format. Note that the files must be atomically placed in the 
given directory, which in most file systems, can be achieved by file move 
operations.
 
-  - **Kafka source** - Reads data from Kafka. It's compatible with Kafka 
broker versions 0.10.0 or higher. See the [Kafka Integration 
Guide](structured-streaming-kafka-integration.html) for more details.
+  - **Kafka source** - Reads data from Kafka. It's compatible with Kafka 
broker versions 0.10.0 or higher. See the [Kafka Integration 
Guide](structured-streaming-kafka-0-10-integration.html) for more details.
 
   - **Socket source (for testing)** - Reads UTF8 text data from a socket 
connection. The listening server socket is at the driver. Note that this should 
be used only for testing as this does not provide end-to-end fault-tolerance 
guarantees. 
 
@@ -566,7 +566,7 @@ Here are the details of all the sources in Spark.
   <tr>
     <td><b>Kafka Source</b></td>
     <td>
-        See the <a href="structured-streaming-kafka-integration.html">Kafka 
Integration Guide</a>.
+        See the <a 
href="structured-streaming-kafka-0-10-integration.html">Kafka Integration 
Guide</a>.
     </td>
     <td>Yes</td>
     <td></td>
@@ -1819,7 +1819,7 @@ Here are the details of all the sinks in Spark.
   <tr>
     <td><b>Kafka Sink</b></td>
     <td>Append, Update, Complete</td>
-    <td>See the <a href="structured-streaming-kafka-integration.html">Kafka 
Integration Guide</a></td>
+    <td>See the <a 
href="structured-streaming-kafka-0-10-integration.html">Kafka Integration 
Guide</a></td>
     <td>Yes (at-least-once)</td>
     <td>More details in the <a 
href="structured-streaming-kafka-integration.html">Kafka Integration 
Guide</a></td>
   </tr>

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/examples/src/main/python/streaming/direct_kafka_wordcount.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/streaming/direct_kafka_wordcount.py 
b/examples/src/main/python/streaming/direct_kafka_wordcount.py
deleted file mode 100644
index c5c186c..0000000
--- a/examples/src/main/python/streaming/direct_kafka_wordcount.py
+++ /dev/null
@@ -1,56 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-r"""
- Counts words in UTF8 encoded, '\n' delimited text directly received from 
Kafka in every 2 seconds.
- Usage: direct_kafka_wordcount.py <broker_list> <topic>
-
- To run this on your local machine, you need to setup Kafka and create a 
producer first, see
- http://kafka.apache.org/documentation.html#quickstart
-
- and then run the example
-    `$ bin/spark-submit --jars \
-      
external/kafka-assembly/target/scala-*/spark-streaming-kafka-assembly-*.jar \
-      examples/src/main/python/streaming/direct_kafka_wordcount.py \
-      localhost:9092 test`
-"""
-from __future__ import print_function
-
-import sys
-
-from pyspark import SparkContext
-from pyspark.streaming import StreamingContext
-from pyspark.streaming.kafka import KafkaUtils
-
-if __name__ == "__main__":
-    if len(sys.argv) != 3:
-        print("Usage: direct_kafka_wordcount.py <broker_list> <topic>", 
file=sys.stderr)
-        sys.exit(-1)
-
-    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
-    ssc = StreamingContext(sc, 2)
-
-    brokers, topic = sys.argv[1:]
-    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": 
brokers})
-    lines = kvs.map(lambda x: x[1])
-    counts = lines.flatMap(lambda line: line.split(" ")) \
-        .map(lambda word: (word, 1)) \
-        .reduceByKey(lambda a, b: a+b)
-    counts.pprint()
-
-    ssc.start()
-    ssc.awaitTermination()

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/examples/src/main/python/streaming/kafka_wordcount.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/streaming/kafka_wordcount.py 
b/examples/src/main/python/streaming/kafka_wordcount.py
deleted file mode 100644
index e9ee08b..0000000
--- a/examples/src/main/python/streaming/kafka_wordcount.py
+++ /dev/null
@@ -1,56 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-r"""
- Counts words in UTF8 encoded, '\n' delimited text received from the network 
every second.
- Usage: kafka_wordcount.py <zk> <topic>
-
- To run this on your local machine, you need to setup Kafka and create a 
producer first, see
- http://kafka.apache.org/documentation.html#quickstart
-
- and then run the example
-    `$ bin/spark-submit --jars \
-      
external/kafka-assembly/target/scala-*/spark-streaming-kafka-assembly-*.jar \
-      examples/src/main/python/streaming/kafka_wordcount.py \
-      localhost:2181 test`
-"""
-from __future__ import print_function
-
-import sys
-
-from pyspark import SparkContext
-from pyspark.streaming import StreamingContext
-from pyspark.streaming.kafka import KafkaUtils
-
-if __name__ == "__main__":
-    if len(sys.argv) != 3:
-        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
-        sys.exit(-1)
-
-    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
-    ssc = StreamingContext(sc, 1)
-
-    zkQuorum, topic = sys.argv[1:]
-    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", 
{topic: 1})
-    lines = kvs.map(lambda x: x[1])
-    counts = lines.flatMap(lambda line: line.split(" ")) \
-        .map(lambda word: (word, 1)) \
-        .reduceByKey(lambda a, b: a+b)
-    counts.pprint()
-
-    ssc.start()
-    ssc.awaitTermination()

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
 
b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
index d2100fc..cf283a5 100644
--- 
a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
+++ 
b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
@@ -26,11 +26,9 @@ import org.apache.kafka.clients.consumer._
 import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.annotation.Experimental
 import org.apache.spark.internal.Logging
 
 /**
- * :: Experimental ::
  * Choice of how to create and configure underlying Kafka Consumers on driver 
and executors.
  * See [[ConsumerStrategies]] to obtain instances.
  * Kafka 0.10 consumers can require additional, sometimes complex, setup after 
object
@@ -38,7 +36,6 @@ import org.apache.spark.internal.Logging
  * @tparam K type of Kafka message key
  * @tparam V type of Kafka message value
  */
-@Experimental
 abstract class ConsumerStrategy[K, V] {
   /**
    * Kafka <a 
href="http://kafka.apache.org/documentation.html#newconsumerconfigs";>
@@ -208,13 +205,10 @@ private case class Assign[K, V](
 }
 
 /**
- * :: Experimental ::
- * object for obtaining instances of [[ConsumerStrategy]]
+ * Object for obtaining instances of [[ConsumerStrategy]]
  */
-@Experimental
 object ConsumerStrategies {
   /**
-   *  :: Experimental ::
    * Subscribe to a collection of topics.
    * @param topics collection of topics to subscribe
    * @param kafkaParams Kafka
@@ -227,7 +221,6 @@ object ConsumerStrategies {
    * TopicPartition, the committed offset (if applicable) or kafka param
    * auto.offset.reset will be used.
    */
-  @Experimental
   def Subscribe[K, V](
       topics: Iterable[jl.String],
       kafkaParams: collection.Map[String, Object],
@@ -239,7 +232,6 @@ object ConsumerStrategies {
   }
 
   /**
-   *  :: Experimental ::
    * Subscribe to a collection of topics.
    * @param topics collection of topics to subscribe
    * @param kafkaParams Kafka
@@ -249,7 +241,6 @@ object ConsumerStrategies {
    *  Requires "bootstrap.servers" to be set
    * with Kafka broker(s) specified in host1:port1,host2:port2 form.
    */
-  @Experimental
   def Subscribe[K, V](
       topics: Iterable[jl.String],
       kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = {
@@ -260,7 +251,6 @@ object ConsumerStrategies {
   }
 
   /**
-   *  :: Experimental ::
    * Subscribe to a collection of topics.
    * @param topics collection of topics to subscribe
    * @param kafkaParams Kafka
@@ -273,7 +263,6 @@ object ConsumerStrategies {
    * TopicPartition, the committed offset (if applicable) or kafka param
    * auto.offset.reset will be used.
    */
-  @Experimental
   def Subscribe[K, V](
       topics: ju.Collection[jl.String],
       kafkaParams: ju.Map[String, Object],
@@ -282,7 +271,6 @@ object ConsumerStrategies {
   }
 
   /**
-   *  :: Experimental ::
    * Subscribe to a collection of topics.
    * @param topics collection of topics to subscribe
    * @param kafkaParams Kafka
@@ -292,14 +280,13 @@ object ConsumerStrategies {
    *  Requires "bootstrap.servers" to be set
    * with Kafka broker(s) specified in host1:port1,host2:port2 form.
    */
-  @Experimental
   def Subscribe[K, V](
       topics: ju.Collection[jl.String],
       kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = {
     new Subscribe[K, V](topics, kafkaParams, 
ju.Collections.emptyMap[TopicPartition, jl.Long]())
   }
 
-  /** :: Experimental ::
+  /**
    * Subscribe to all topics matching specified pattern to get dynamically 
assigned partitions.
    * The pattern matching will be done periodically against topics existing at 
the time of check.
    * @param pattern pattern to subscribe to
@@ -313,7 +300,6 @@ object ConsumerStrategies {
    * TopicPartition, the committed offset (if applicable) or kafka param
    * auto.offset.reset will be used.
    */
-  @Experimental
   def SubscribePattern[K, V](
       pattern: ju.regex.Pattern,
       kafkaParams: collection.Map[String, Object],
@@ -324,7 +310,7 @@ object ConsumerStrategies {
       new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new 
jl.Long(l)).asJava))
   }
 
-  /** :: Experimental ::
+  /**
    * Subscribe to all topics matching specified pattern to get dynamically 
assigned partitions.
    * The pattern matching will be done periodically against topics existing at 
the time of check.
    * @param pattern pattern to subscribe to
@@ -335,7 +321,6 @@ object ConsumerStrategies {
    *  Requires "bootstrap.servers" to be set
    * with Kafka broker(s) specified in host1:port1,host2:port2 form.
    */
-  @Experimental
   def SubscribePattern[K, V](
       pattern: ju.regex.Pattern,
       kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = {
@@ -345,7 +330,7 @@ object ConsumerStrategies {
       ju.Collections.emptyMap[TopicPartition, jl.Long]())
   }
 
-  /** :: Experimental ::
+  /**
    * Subscribe to all topics matching specified pattern to get dynamically 
assigned partitions.
    * The pattern matching will be done periodically against topics existing at 
the time of check.
    * @param pattern pattern to subscribe to
@@ -359,7 +344,6 @@ object ConsumerStrategies {
    * TopicPartition, the committed offset (if applicable) or kafka param
    * auto.offset.reset will be used.
    */
-  @Experimental
   def SubscribePattern[K, V](
       pattern: ju.regex.Pattern,
       kafkaParams: ju.Map[String, Object],
@@ -367,7 +351,7 @@ object ConsumerStrategies {
     new SubscribePattern[K, V](pattern, kafkaParams, offsets)
   }
 
-  /** :: Experimental ::
+  /**
    * Subscribe to all topics matching specified pattern to get dynamically 
assigned partitions.
    * The pattern matching will be done periodically against topics existing at 
the time of check.
    * @param pattern pattern to subscribe to
@@ -378,7 +362,6 @@ object ConsumerStrategies {
    *  Requires "bootstrap.servers" to be set
    * with Kafka broker(s) specified in host1:port1,host2:port2 form.
    */
-  @Experimental
   def SubscribePattern[K, V](
       pattern: ju.regex.Pattern,
       kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = {
@@ -389,7 +372,6 @@ object ConsumerStrategies {
   }
 
   /**
-   *  :: Experimental ::
    * Assign a fixed collection of TopicPartitions
    * @param topicPartitions collection of TopicPartitions to assign
    * @param kafkaParams Kafka
@@ -402,7 +384,6 @@ object ConsumerStrategies {
    * TopicPartition, the committed offset (if applicable) or kafka param
    * auto.offset.reset will be used.
    */
-  @Experimental
   def Assign[K, V](
       topicPartitions: Iterable[TopicPartition],
       kafkaParams: collection.Map[String, Object],
@@ -414,7 +395,6 @@ object ConsumerStrategies {
   }
 
   /**
-   *  :: Experimental ::
    * Assign a fixed collection of TopicPartitions
    * @param topicPartitions collection of TopicPartitions to assign
    * @param kafkaParams Kafka
@@ -424,7 +404,6 @@ object ConsumerStrategies {
    *  Requires "bootstrap.servers" to be set
    * with Kafka broker(s) specified in host1:port1,host2:port2 form.
    */
-  @Experimental
   def Assign[K, V](
       topicPartitions: Iterable[TopicPartition],
       kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = {
@@ -435,7 +414,6 @@ object ConsumerStrategies {
   }
 
   /**
-   *  :: Experimental ::
    * Assign a fixed collection of TopicPartitions
    * @param topicPartitions collection of TopicPartitions to assign
    * @param kafkaParams Kafka
@@ -448,7 +426,6 @@ object ConsumerStrategies {
    * TopicPartition, the committed offset (if applicable) or kafka param
    * auto.offset.reset will be used.
    */
-  @Experimental
   def Assign[K, V](
       topicPartitions: ju.Collection[TopicPartition],
       kafkaParams: ju.Map[String, Object],
@@ -457,7 +434,6 @@ object ConsumerStrategies {
   }
 
   /**
-   *  :: Experimental ::
    * Assign a fixed collection of TopicPartitions
    * @param topicPartitions collection of TopicPartitions to assign
    * @param kafkaParams Kafka
@@ -467,7 +443,6 @@ object ConsumerStrategies {
    *  Requires "bootstrap.servers" to be set
    * with Kafka broker(s) specified in host1:port1,host2:port2 form.
    */
-  @Experimental
   def Assign[K, V](
       topicPartitions: ju.Collection[TopicPartition],
       kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
 
b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
index e6bdef0..64b6ef6 100644
--- 
a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
+++ 
b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
@@ -23,7 +23,6 @@ import org.apache.kafka.clients.consumer._
 import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.SparkContext
-import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.{ JavaRDD, JavaSparkContext }
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
@@ -32,13 +31,10 @@ import org.apache.spark.streaming.api.java.{ 
JavaInputDStream, JavaStreamingCont
 import org.apache.spark.streaming.dstream._
 
 /**
- * :: Experimental ::
  * object for constructing Kafka streams and RDDs
  */
-@Experimental
 object KafkaUtils extends Logging {
   /**
-   * :: Experimental ::
    * Scala constructor for a batch-oriented interface for consuming from Kafka.
    * Starting and ending offsets are specified in advance,
    * so that you can control exactly-once semantics.
@@ -52,7 +48,6 @@ object KafkaUtils extends Logging {
    * @tparam K type of Kafka message key
    * @tparam V type of Kafka message value
    */
-  @Experimental
   def createRDD[K, V](
       sc: SparkContext,
       kafkaParams: ju.Map[String, Object],
@@ -75,7 +70,6 @@ object KafkaUtils extends Logging {
   }
 
   /**
-   * :: Experimental ::
    * Java constructor for a batch-oriented interface for consuming from Kafka.
    * Starting and ending offsets are specified in advance,
    * so that you can control exactly-once semantics.
@@ -89,7 +83,6 @@ object KafkaUtils extends Logging {
    * @tparam K type of Kafka message key
    * @tparam V type of Kafka message value
    */
-  @Experimental
   def createRDD[K, V](
       jsc: JavaSparkContext,
       kafkaParams: ju.Map[String, Object],
@@ -101,7 +94,6 @@ object KafkaUtils extends Logging {
   }
 
   /**
-   * :: Experimental ::
    * Scala constructor for a DStream where
    * each given Kafka topic/partition corresponds to an RDD partition.
    * The spark configuration spark.streaming.kafka.maxRatePerPartition gives 
the maximum number
@@ -114,7 +106,6 @@ object KafkaUtils extends Logging {
    * @tparam K type of Kafka message key
    * @tparam V type of Kafka message value
    */
-  @Experimental
   def createDirectStream[K, V](
       ssc: StreamingContext,
       locationStrategy: LocationStrategy,
@@ -125,7 +116,6 @@ object KafkaUtils extends Logging {
   }
 
   /**
-   * :: Experimental ::
    * Scala constructor for a DStream where
    * each given Kafka topic/partition corresponds to an RDD partition.
    * @param locationStrategy In most cases, pass in 
[[LocationStrategies.PreferConsistent]],
@@ -137,7 +127,6 @@ object KafkaUtils extends Logging {
    * @tparam K type of Kafka message key
    * @tparam V type of Kafka message value
    */
-  @Experimental
   def createDirectStream[K, V](
       ssc: StreamingContext,
       locationStrategy: LocationStrategy,
@@ -148,7 +137,6 @@ object KafkaUtils extends Logging {
   }
 
   /**
-   * :: Experimental ::
    * Java constructor for a DStream where
    * each given Kafka topic/partition corresponds to an RDD partition.
    * @param locationStrategy In most cases, pass in 
[[LocationStrategies.PreferConsistent]],
@@ -158,7 +146,6 @@ object KafkaUtils extends Logging {
    * @tparam K type of Kafka message key
    * @tparam V type of Kafka message value
    */
-  @Experimental
   def createDirectStream[K, V](
       jssc: JavaStreamingContext,
       locationStrategy: LocationStrategy,
@@ -170,7 +157,6 @@ object KafkaUtils extends Logging {
   }
 
   /**
-   * :: Experimental ::
    * Java constructor for a DStream where
    * each given Kafka topic/partition corresponds to an RDD partition.
    * @param locationStrategy In most cases, pass in 
[[LocationStrategies.PreferConsistent]],
@@ -182,7 +168,6 @@ object KafkaUtils extends Logging {
    * @tparam K type of Kafka message key
    * @tparam V type of Kafka message value
    */
-  @Experimental
   def createDirectStream[K, V](
       jssc: JavaStreamingContext,
       locationStrategy: LocationStrategy,

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala
 
b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala
index c9a8a13..b4d9669 100644
--- 
a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala
+++ 
b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala
@@ -23,18 +23,14 @@ import scala.collection.JavaConverters._
 
 import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.annotation.Experimental
-
 
 /**
- *  :: Experimental ::
  * Choice of how to schedule consumers for a given TopicPartition on an 
executor.
  * See [[LocationStrategies]] to obtain instances.
  * Kafka 0.10 consumers prefetch messages, so it's important for performance
  * to keep cached consumers on appropriate executors, not recreate them for 
every partition.
  * Choice of location is only a preference, not an absolute; partitions may be 
scheduled elsewhere.
  */
-@Experimental
 sealed abstract class LocationStrategy
 
 private case object PreferBrokers extends LocationStrategy
@@ -44,42 +40,32 @@ private case object PreferConsistent extends 
LocationStrategy
 private case class PreferFixed(hostMap: ju.Map[TopicPartition, String]) 
extends LocationStrategy
 
 /**
- * :: Experimental :: object to obtain instances of [[LocationStrategy]]
- *
+ * Object to obtain instances of [[LocationStrategy]]
  */
-@Experimental
 object LocationStrategies {
   /**
-   *  :: Experimental ::
    * Use this only if your executors are on the same nodes as your Kafka 
brokers.
    */
-  @Experimental
   def PreferBrokers: LocationStrategy =
     org.apache.spark.streaming.kafka010.PreferBrokers
 
   /**
-   *  :: Experimental ::
    * Use this in most cases, it will consistently distribute partitions across 
all executors.
    */
-  @Experimental
   def PreferConsistent: LocationStrategy =
     org.apache.spark.streaming.kafka010.PreferConsistent
 
   /**
-   *  :: Experimental ::
    * Use this to place particular TopicPartitions on particular hosts if your 
load is uneven.
    * Any TopicPartition not specified in the map will use a consistent 
location.
    */
-  @Experimental
   def PreferFixed(hostMap: collection.Map[TopicPartition, String]): 
LocationStrategy =
     new PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava))
 
   /**
-   *  :: Experimental ::
    * Use this to place particular TopicPartitions on particular hosts if your 
load is uneven.
    * Any TopicPartition not specified in the map will use a consistent 
location.
    */
-  @Experimental
   def PreferFixed(hostMap: ju.Map[TopicPartition, String]): LocationStrategy =
     new PreferFixed(hostMap)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala
 
b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala
index c66d3c9..077f02e 100644
--- 
a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala
+++ 
b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala
@@ -20,8 +20,6 @@ package org.apache.spark.streaming.kafka010
 import org.apache.kafka.clients.consumer.OffsetCommitCallback
 import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.annotation.Experimental
-
 /**
  * Represents any object that has a collection of [[OffsetRange]]s. This can 
be used to access the
  * offset ranges in RDDs generated by the direct Kafka DStream (see
@@ -38,7 +36,6 @@ trait HasOffsetRanges {
 }
 
 /**
- *  :: Experimental ::
  * Represents any object that can commit a collection of [[OffsetRange]]s.
  * The direct Kafka DStream implements this interface (see
  * [[KafkaUtils.createDirectStream]]).
@@ -56,25 +53,20 @@ trait HasOffsetRanges {
  *   })
  * }}}
  */
-@Experimental
 trait CanCommitOffsets {
   /**
-   *  :: Experimental ::
    * Queue up offset ranges for commit to Kafka at a future time.  Threadsafe.
    * This is only needed if you intend to store offsets in Kafka, instead of 
your own store.
    * @param offsetRanges The maximum untilOffset for a given partition will be 
used at commit.
    */
-  @Experimental
   def commitAsync(offsetRanges: Array[OffsetRange]): Unit
 
   /**
-   *  :: Experimental ::
    * Queue up offset ranges for commit to Kafka at a future time.  Threadsafe.
    * This is only needed if you intend to store offsets in Kafka, instead of 
your own store.
    * @param offsetRanges The maximum untilOffset for a given partition will be 
used at commit.
    * @param callback Only the most recently provided callback will be used at 
commit.
    */
-  @Experimental
   def commitAsync(offsetRanges: Array[OffsetRange], callback: 
OffsetCommitCallback): Unit
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala
 
b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala
index 4017fdb..77193e2 100644
--- 
a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala
+++ 
b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala
@@ -20,14 +20,11 @@ package org.apache.spark.streaming.kafka010
 import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.SparkConf
-import org.apache.spark.annotation.Experimental
 
 /**
- * :: Experimental ::
  * Interface for user-supplied configurations that can't otherwise be set via 
Spark properties,
  * because they need tweaking on a per-partition basis,
  */
-@Experimental
 abstract class PerPartitionConfig extends Serializable {
   /**
    *  Maximum rate (number of records per second) at which data will be read

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-8-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/external/kafka-0-8-assembly/pom.xml 
b/external/kafka-0-8-assembly/pom.xml
deleted file mode 100644
index 83edb11..0000000
--- a/external/kafka-0-8-assembly/pom.xml
+++ /dev/null
@@ -1,170 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent_2.11</artifactId>
-    <version>3.0.0-SNAPSHOT</version>
-    <relativePath>../../pom.xml</relativePath>
-  </parent>
-
-  <artifactId>spark-streaming-kafka-0-8-assembly_2.11</artifactId>
-  <packaging>jar</packaging>
-  <name>Spark Project External Kafka Assembly</name>
-  <url>http://spark.apache.org/</url>
-
-  <properties>
-    <sbt.project.name>streaming-kafka-0-8-assembly</sbt.project.name>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      
<artifactId>spark-streaming-kafka-0-8_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <!--
-      Demote already included in the Spark assembly.
-    -->
-    <dependency>
-      <groupId>commons-codec</groupId>
-      <artifactId>commons-codec</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>commons-lang</groupId>
-      <artifactId>commons-lang</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>com.google.protobuf</groupId>
-      <artifactId>protobuf-java</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.lz4</groupId>
-      <artifactId>lz4-java</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-client</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.avro</groupId>
-      <artifactId>avro-mapred</artifactId>
-      <classifier>${avro.mapred.classifier}</classifier>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.curator</groupId>
-      <artifactId>curator-recipes</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.zookeeper</groupId>
-      <artifactId>zookeeper</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>log4j</groupId>
-      <artifactId>log4j</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-library</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-log4j12</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.xerial.snappy</groupId>
-      <artifactId>snappy-java</artifactId>
-      <scope>provided</scope>
-    </dependency>
-  </dependencies>
-
-  <build>
-  
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
-  
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
-  <plugins>
-    <plugin>
-      <groupId>org.apache.maven.plugins</groupId>
-      <artifactId>maven-shade-plugin</artifactId>
-      <configuration>
-        <shadedArtifactAttached>false</shadedArtifactAttached>
-        <artifactSet>
-          <includes>
-            <include>*:*</include>
-          </includes>
-        </artifactSet>
-        <filters>
-          <filter>
-            <artifact>*:*</artifact>
-            <excludes>
-              <exclude>META-INF/*.SF</exclude>
-              <exclude>META-INF/*.DSA</exclude>
-              <exclude>META-INF/*.RSA</exclude>
-            </excludes>
-          </filter>
-        </filters>
-      </configuration>
-      <executions>
-        <execution>
-          <phase>package</phase>
-          <goals>
-            <goal>shade</goal>
-          </goals>
-          <configuration>
-            <transformers>
-              <transformer 
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
-              <transformer 
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
-                <resource>reference.conf</resource>
-              </transformer>
-              <transformer 
implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
-                <resource>log4j.properties</resource>
-              </transformer>
-              <transformer 
implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
-              <transformer 
implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
-            </transformers>
-          </configuration>
-        </execution>
-      </executions>
-    </plugin>
-  </plugins>
-</build>
-</project>
-

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-8/pom.xml
----------------------------------------------------------------------
diff --git a/external/kafka-0-8/pom.xml b/external/kafka-0-8/pom.xml
deleted file mode 100644
index 4545877..0000000
--- a/external/kafka-0-8/pom.xml
+++ /dev/null
@@ -1,109 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent_2.11</artifactId>
-    <version>3.0.0-SNAPSHOT</version>
-    <relativePath>../../pom.xml</relativePath>
-  </parent>
-
-  <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
-  <properties>
-    <sbt.project.name>streaming-kafka-0-8</sbt.project.name>
-  </properties>
-  <packaging>jar</packaging>
-  <name>Spark Integration for Kafka 0.8</name>
-  <url>http://spark.apache.org/</url>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka_${scala.binary.version}</artifactId>
-      <version>0.8.2.1</version>
-      <exclusions>
-        <exclusion>
-          <groupId>com.sun.jmx</groupId>
-          <artifactId>jmxri</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.sun.jdmk</groupId>
-          <artifactId>jmxtools</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>net.sf.jopt-simple</groupId>
-          <artifactId>jopt-simple</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-simple</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.zookeeper</groupId>
-          <artifactId>zookeeper</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>net.sf.jopt-simple</groupId>
-      <artifactId>jopt-simple</artifactId>
-      <version>3.2</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.scalacheck</groupId>
-      <artifactId>scalacheck_${scala.binary.version}</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-tags_${scala.binary.version}</artifactId>
-    </dependency>
-
-    <!--
-      This spark-tags test-dep is needed even though it isn't used in this 
module, otherwise testing-cmds that exclude
-      them will yield errors.
-    -->
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-tags_${scala.binary.version}</artifactId>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
-
-  </dependencies>
-  <build>
-    
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
-    
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
-  </build>
-</project>

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
deleted file mode 100644
index 89ccbe2..0000000
--- 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import org.apache.spark.annotation.Experimental
-
-/**
- * Represents the host and port info for a Kafka broker.
- * Differs from the Kafka project's internal kafka.cluster.Broker, which 
contains a server ID.
- */
-@deprecated("Update to Kafka 0.10 integration", "2.3.0")
-final class Broker private(
-    /** Broker's hostname */
-    val host: String,
-    /** Broker's port */
-    val port: Int) extends Serializable {
-  override def equals(obj: Any): Boolean = obj match {
-    case that: Broker =>
-      this.host == that.host &&
-      this.port == that.port
-    case _ => false
-  }
-
-  override def hashCode: Int = {
-    41 * (41 + host.hashCode) + port
-  }
-
-  override def toString(): String = {
-    s"Broker($host, $port)"
-  }
-}
-
-/**
- * :: Experimental ::
- * Companion object that provides methods to create instances of [[Broker]].
- */
-@Experimental
-@deprecated("Update to Kafka 0.10 integration", "2.3.0")
-object Broker {
-  def create(host: String, port: Int): Broker =
-    new Broker(host, port)
-
-  def apply(host: String, port: Int): Broker =
-    new Broker(host, port)
-
-  def unapply(broker: Broker): Option[(String, Int)] = {
-    if (broker == null) {
-      None
-    } else {
-      Some((broker.host, broker.port))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
deleted file mode 100644
index 2ec771e..0000000
--- 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import scala.annotation.tailrec
-import scala.collection.mutable
-import scala.reflect.ClassTag
-
-import kafka.common.TopicAndPartition
-import kafka.message.MessageAndMetadata
-import kafka.serializer.Decoder
-
-import org.apache.spark.SparkException
-import org.apache.spark.internal.Logging
-import org.apache.spark.streaming.{StreamingContext, Time}
-import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
-import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
-import org.apache.spark.streaming.scheduler.rate.RateEstimator
-
-/**
- *  A stream of [[KafkaRDD]] where
- * each given Kafka topic/partition corresponds to an RDD partition.
- * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the 
maximum number
- *  of messages
- * per second that each '''partition''' will accept.
- * Starting offsets are specified in advance,
- * and this DStream is not responsible for committing offsets,
- * so that you can control exactly-once semantics.
- * For an easy interface to Kafka-managed offsets,
- *  see [[KafkaCluster]]
- * @param kafkaParams Kafka <a 
href="http://kafka.apache.org/documentation.html#configuration";>
- * configuration parameters</a>.
- *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with 
Kafka broker(s),
- *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
- * @param fromOffsets per-topic/partition Kafka offsets defining the 
(inclusive)
- *  starting point of the stream
- * @param messageHandler function for translating each message into the 
desired type
- */
-private[streaming]
-class DirectKafkaInputDStream[
-  K: ClassTag,
-  V: ClassTag,
-  U <: Decoder[K]: ClassTag,
-  T <: Decoder[V]: ClassTag,
-  R: ClassTag](
-    _ssc: StreamingContext,
-    val kafkaParams: Map[String, String],
-    val fromOffsets: Map[TopicAndPartition, Long],
-    messageHandler: MessageAndMetadata[K, V] => R
-  ) extends InputDStream[R](_ssc) with Logging {
-  val maxRetries = context.sparkContext.getConf.getInt(
-    "spark.streaming.kafka.maxRetries", 1)
-
-  private[streaming] override def name: String = s"Kafka direct stream [$id]"
-
-  protected[streaming] override val checkpointData =
-    new DirectKafkaInputDStreamCheckpointData
-
-
-  /**
-   * Asynchronously maintains & sends new rate limits to the receiver through 
the receiver tracker.
-   */
-  override protected[streaming] val rateController: Option[RateController] = {
-    if (RateController.isBackPressureEnabled(ssc.conf)) {
-      Some(new DirectKafkaRateController(id,
-        RateEstimator.create(ssc.conf, context.graph.batchDuration)))
-    } else {
-      None
-    }
-  }
-
-  protected val kc = new KafkaCluster(kafkaParams)
-
-  private val maxRateLimitPerPartition: Long = 
context.sparkContext.getConf.getLong(
-      "spark.streaming.kafka.maxRatePerPartition", 0)
-
-  private val initialRate = context.sparkContext.getConf.getLong(
-    "spark.streaming.backpressure.initialRate", 0)
-
-  protected[streaming] def maxMessagesPerPartition(
-      offsets: Map[TopicAndPartition, Long]): Option[Map[TopicAndPartition, 
Long]] = {
-
-    val estimatedRateLimit = rateController.map { x => {
-      val lr = x.getLatestRate()
-      if (lr > 0) lr else initialRate
-    }}
-
-    // calculate a per-partition rate limit based on current lag
-    val effectiveRateLimitPerPartition = estimatedRateLimit.filter(_ > 0) 
match {
-      case Some(rate) =>
-        val lagPerPartition = offsets.map { case (tp, offset) =>
-          tp -> Math.max(offset - currentOffsets(tp), 0)
-        }
-        val totalLag = lagPerPartition.values.sum
-
-        lagPerPartition.map { case (tp, lag) =>
-          val backpressureRate = lag / totalLag.toDouble * rate
-          tp -> (if (maxRateLimitPerPartition > 0) {
-            Math.min(backpressureRate, maxRateLimitPerPartition)} else 
backpressureRate)
-        }
-      case None => offsets.map { case (tp, offset) => tp -> 
maxRateLimitPerPartition.toDouble }
-    }
-
-    if (effectiveRateLimitPerPartition.values.sum > 0) {
-      val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 
1000
-      Some(effectiveRateLimitPerPartition.map {
-        case (tp, limit) => tp -> Math.max((secsPerBatch * limit).toLong, 1L)
-      })
-    } else {
-      None
-    }
-  }
-
-  protected var currentOffsets = fromOffsets
-
-  @tailrec
-  protected final def latestLeaderOffsets(retries: Int): 
Map[TopicAndPartition, LeaderOffset] = {
-    val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
-    // Either.fold would confuse @tailrec, do it manually
-    if (o.isLeft) {
-      val err = o.left.get.toString
-      if (retries <= 0) {
-        throw new SparkException(err)
-      } else {
-        logError(err)
-        Thread.sleep(kc.config.refreshLeaderBackoffMs)
-        latestLeaderOffsets(retries - 1)
-      }
-    } else {
-      o.right.get
-    }
-  }
-
-  // limits the maximum number of messages per partition
-  protected def clamp(
-    leaderOffsets: Map[TopicAndPartition, LeaderOffset]): 
Map[TopicAndPartition, LeaderOffset] = {
-    val offsets = leaderOffsets.mapValues(lo => lo.offset)
-
-    maxMessagesPerPartition(offsets).map { mmp =>
-      mmp.map { case (tp, messages) =>
-        val lo = leaderOffsets(tp)
-        tp -> lo.copy(offset = Math.min(currentOffsets(tp) + messages, 
lo.offset))
-      }
-    }.getOrElse(leaderOffsets)
-  }
-
-  override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
-    val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
-    val rdd = KafkaRDD[K, V, U, T, R](
-      context.sparkContext, kafkaParams, currentOffsets, untilOffsets, 
messageHandler)
-
-    // Report the record number and metadata of this batch interval to 
InputInfoTracker.
-    val offsetRanges = currentOffsets.map { case (tp, fo) =>
-      val uo = untilOffsets(tp)
-      OffsetRange(tp.topic, tp.partition, fo, uo.offset)
-    }
-    val description = offsetRanges.filter { offsetRange =>
-      // Don't display empty ranges.
-      offsetRange.fromOffset != offsetRange.untilOffset
-    }.map { offsetRange =>
-      s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
-        s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
-    }.mkString("\n")
-    // Copy offsetRanges to immutable.List to prevent from being modified by 
the user
-    val metadata = Map(
-      "offsets" -> offsetRanges.toList,
-      StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
-    val inputInfo = StreamInputInfo(id, rdd.count, metadata)
-    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
-
-    currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
-    Some(rdd)
-  }
-
-  override def start(): Unit = {
-  }
-
-  def stop(): Unit = {
-  }
-
-  private[streaming]
-  class DirectKafkaInputDStreamCheckpointData extends 
DStreamCheckpointData(this) {
-    def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] 
= {
-      data.asInstanceOf[mutable.HashMap[Time, 
Array[OffsetRange.OffsetRangeTuple]]]
-    }
-
-    override def update(time: Time): Unit = {
-      batchForTime.clear()
-      generatedRDDs.foreach { kv =>
-        val a = kv._2.asInstanceOf[KafkaRDD[K, V, U, T, 
R]].offsetRanges.map(_.toTuple).toArray
-        batchForTime += kv._1 -> a
-      }
-    }
-
-    override def cleanup(time: Time): Unit = { }
-
-    override def restore(): Unit = {
-      // this is assuming that the topics don't change during execution, which 
is true currently
-      val topics = fromOffsets.keySet
-      val leaders = KafkaCluster.checkErrors(kc.findLeaders(topics))
-
-      batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
-         logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", 
"]")}")
-         generatedRDDs += t -> new KafkaRDD[K, V, U, T, R](
-           context.sparkContext, kafkaParams, b.map(OffsetRange(_)), leaders, 
messageHandler)
-      }
-    }
-  }
-
-  /**
-   * A RateController to retrieve the rate from RateEstimator.
-   */
-  private[streaming] class DirectKafkaRateController(id: Int, estimator: 
RateEstimator)
-    extends RateController(id, estimator) {
-    override def publish(rate: Long): Unit = ()
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to