[SPARK-25705][BUILD][STREAMING][TEST-MAVEN] Remove Kafka 0.8 integration ## What changes were proposed in this pull request?
Remove Kafka 0.8 integration ## How was this patch tested? Existing tests, build scripts Closes #22703 from srowen/SPARK-25705. Authored-by: Sean Owen <[email protected]> Signed-off-by: Sean Owen <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/703e6da1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/703e6da1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/703e6da1 Branch: refs/heads/master Commit: 703e6da1ecb52ab5b8f42b3b4cac39f27caa51d8 Parents: 2c664ed Author: Sean Owen <[email protected]> Authored: Tue Oct 16 09:10:24 2018 -0500 Committer: Sean Owen <[email protected]> Committed: Tue Oct 16 09:10:24 2018 -0500 ---------------------------------------------------------------------- .../org/apache/spark/deploy/SparkSubmit.scala | 6 +- dev/create-release/release-build.sh | 4 +- dev/mima | 2 +- dev/run-tests.py | 1 - dev/sbt-checkstyle | 1 - dev/scalastyle | 1 - dev/sparktestsupport/modules.py | 22 - dev/test-dependencies.sh | 2 +- docs/building-spark.md | 9 - docs/configuration.md | 12 +- docs/streaming-kafka-0-10-integration.md | 5 +- docs/streaming-kafka-0-8-integration.md | 196 ----- docs/streaming-kafka-integration.md | 53 +- docs/streaming-programming-guide.md | 8 +- docs/structured-streaming-programming-guide.md | 6 +- .../python/streaming/direct_kafka_wordcount.py | 56 -- .../main/python/streaming/kafka_wordcount.py | 56 -- .../streaming/kafka010/ConsumerStrategy.scala | 35 +- .../spark/streaming/kafka010/KafkaUtils.scala | 15 - .../streaming/kafka010/LocationStrategy.scala | 16 +- .../spark/streaming/kafka010/OffsetRange.scala | 8 - .../streaming/kafka010/PerPartitionConfig.scala | 3 - external/kafka-0-8-assembly/pom.xml | 170 ---- external/kafka-0-8/pom.xml | 109 --- .../apache/spark/streaming/kafka/Broker.scala | 68 -- .../kafka/DirectKafkaInputDStream.scala | 233 ------ .../spark/streaming/kafka/KafkaCluster.scala | 439 ---------- .../streaming/kafka/KafkaInputDStream.scala | 142 ---- .../apache/spark/streaming/kafka/KafkaRDD.scala | 273 ------- .../streaming/kafka/KafkaRDDPartition.scala | 42 - .../spark/streaming/kafka/KafkaTestUtils.scala | 299 ------- .../spark/streaming/kafka/KafkaUtils.scala | 806 ------------------- .../spark/streaming/kafka/OffsetRange.scala | 112 --- .../streaming/kafka/ReliableKafkaReceiver.scala | 302 ------- .../spark/streaming/kafka/package-info.java | 21 - .../apache/spark/streaming/kafka/package.scala | 23 - .../kafka/JavaDirectKafkaStreamSuite.java | 170 ---- .../streaming/kafka/JavaKafkaRDDSuite.java | 156 ---- .../streaming/kafka/JavaKafkaStreamSuite.java | 144 ---- .../src/test/resources/log4j.properties | 28 - .../kafka/DirectKafkaStreamSuite.scala | 636 --------------- .../streaming/kafka/KafkaClusterSuite.scala | 86 -- .../spark/streaming/kafka/KafkaRDDSuite.scala | 182 ----- .../streaming/kafka/KafkaStreamSuite.scala | 92 --- .../kafka/ReliableKafkaStreamSuite.scala | 153 ---- pom.xml | 8 - project/SparkBuild.scala | 12 +- python/docs/pyspark.streaming.rst | 7 - python/pyspark/streaming/dstream.py | 3 +- python/pyspark/streaming/kafka.py | 506 ------------ python/pyspark/streaming/tests.py | 287 +------ 51 files changed, 39 insertions(+), 5987 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 13fa6d0..88df732 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -992,9 +992,9 @@ private[spark] object SparkSubmitUtils { // Exposed for testing. // These components are used to make the default exclusion rules for Spark dependencies. - // We need to specify each component explicitly, otherwise we miss spark-streaming-kafka-0-8 and - // other spark-streaming utility components. Underscore is there to differentiate between - // spark-streaming_2.1x and spark-streaming-kafka-0-8-assembly_2.1x + // We need to specify each component explicitly, otherwise we miss + // spark-streaming utility components. Underscore is there to differentiate between + // spark-streaming_2.1x and spark-streaming-kafka-0-10-assembly_2.1x val IVY_DEFAULT_EXCLUDES = Seq("catalyst_", "core_", "graphx_", "kvstore_", "launcher_", "mllib_", "mllib-local_", "network-common_", "network-shuffle_", "repl_", "sketch_", "sql_", "streaming_", "tags_", "unsafe_") http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/dev/create-release/release-build.sh ---------------------------------------------------------------------- diff --git a/dev/create-release/release-build.sh b/dev/create-release/release-build.sh index b80f55d..a741a3b 100755 --- a/dev/create-release/release-build.sh +++ b/dev/create-release/release-build.sh @@ -115,7 +115,9 @@ SCALA_2_10_PROFILES="-Pscala-2.10" SCALA_2_11_PROFILES= if [[ $SPARK_VERSION > "2.3" ]]; then BASE_PROFILES="$BASE_PROFILES -Pkubernetes" - SCALA_2_11_PROFILES="-Pkafka-0-8" + if [[ $SPARK_VERSION < "3.0." ]]; then + SCALA_2_11_PROFILES="-Pkafka-0-8" + fi else PUBLISH_SCALA_2_10=1 fi http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/dev/mima ---------------------------------------------------------------------- diff --git a/dev/mima b/dev/mima index a9ac8af..dc7b085 100755 --- a/dev/mima +++ b/dev/mima @@ -24,7 +24,7 @@ set -e FWDIR="$(cd "`dirname "$0"`"/..; pwd)" cd "$FWDIR" -SPARK_PROFILES="-Pmesos -Pkafka-0-8 -Pkubernetes -Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive" +SPARK_PROFILES="-Pmesos -Pkubernetes -Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive" TOOLS_CLASSPATH="$(build/sbt -DcopyDependencies=false "export tools/fullClasspath" | tail -n1)" OLD_DEPS_CLASSPATH="$(build/sbt -DcopyDependencies=false $SPARK_PROFILES "export oldDeps/fullClasspath" | tail -n1)" http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/dev/run-tests.py ---------------------------------------------------------------------- diff --git a/dev/run-tests.py b/dev/run-tests.py index a125f5b..26045ee 100755 --- a/dev/run-tests.py +++ b/dev/run-tests.py @@ -332,7 +332,6 @@ def build_spark_sbt(hadoop_version): # Enable all of the profiles for the build: build_profiles = get_hadoop_profiles(hadoop_version) + modules.root.build_profile_flags sbt_goals = ["test:package", # Build test jars as some tests depend on them - "streaming-kafka-0-8-assembly/assembly", "streaming-kinesis-asl-assembly/assembly"] profiles_and_goals = build_profiles + sbt_goals http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/dev/sbt-checkstyle ---------------------------------------------------------------------- diff --git a/dev/sbt-checkstyle b/dev/sbt-checkstyle index 1e825db..18f3bd8 100755 --- a/dev/sbt-checkstyle +++ b/dev/sbt-checkstyle @@ -23,7 +23,6 @@ ERRORS=$(echo -e "q\n" \ | build/sbt \ -Pkinesis-asl \ -Pmesos \ - -Pkafka-0-8 \ -Pkubernetes \ -Pyarn \ -Phive \ http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/dev/scalastyle ---------------------------------------------------------------------- diff --git a/dev/scalastyle b/dev/scalastyle index 0448e1d..2d6ee0d 100755 --- a/dev/scalastyle +++ b/dev/scalastyle @@ -23,7 +23,6 @@ ERRORS=$(echo -e "q\n" \ | build/sbt \ -Pkinesis-asl \ -Pmesos \ - -Pkafka-0-8 \ -Pkubernetes \ -Pyarn \ -Phive \ http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/dev/sparktestsupport/modules.py ---------------------------------------------------------------------- diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index bd5f009..a63f9d8 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -252,24 +252,6 @@ streaming_kinesis_asl = Module( ) -streaming_kafka = Module( - name="streaming-kafka-0-8", - dependencies=[streaming], - source_file_regexes=[ - "external/kafka-0-8", - "external/kafka-0-8-assembly", - ], - build_profile_flags=[ - "-Pkafka-0-8", - ], - environ={ - "ENABLE_KAFKA_0_8_TESTS": "1" - }, - sbt_test_goals=[ - "streaming-kafka-0-8/test", - ] -) - streaming_kafka_0_10 = Module( name="streaming-kafka-0-10", dependencies=[streaming], @@ -374,15 +356,11 @@ pyspark_streaming = Module( dependencies=[ pyspark_core, streaming, - streaming_kafka, streaming_kinesis_asl ], source_file_regexes=[ "python/pyspark/streaming" ], - environ={ - "ENABLE_KAFKA_0_8_TESTS": "1" - }, python_test_goals=[ "pyspark.streaming.util", "pyspark.streaming.tests", http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/dev/test-dependencies.sh ---------------------------------------------------------------------- diff --git a/dev/test-dependencies.sh b/dev/test-dependencies.sh index cc8f5d3..63e01e1 100755 --- a/dev/test-dependencies.sh +++ b/dev/test-dependencies.sh @@ -29,7 +29,7 @@ export LC_ALL=C # TODO: This would be much nicer to do in SBT, once SBT supports Maven-style resolution. # NOTE: These should match those in the release publishing script -HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pmesos -Pkafka-0-8 -Pkubernetes -Pyarn -Phive" +HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pmesos -Pkubernetes -Pyarn -Phive" MVN="build/mvn" HADOOP_PROFILES=( hadoop-2.7 http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/docs/building-spark.md ---------------------------------------------------------------------- diff --git a/docs/building-spark.md b/docs/building-spark.md index b2775d2..6bcc30d 100644 --- a/docs/building-spark.md +++ b/docs/building-spark.md @@ -89,15 +89,6 @@ like ZooKeeper and Hadoop itself. ## Building with Kubernetes support ./build/mvn -Pkubernetes -DskipTests clean package - -## Building with Kafka 0.8 support - -Kafka 0.8 support must be explicitly enabled with the `kafka-0-8` profile. -Note: Kafka 0.8 support is deprecated as of Spark 2.3.0. - - ./build/mvn -Pkafka-0-8 -DskipTests clean package - -Kafka 0.10 support is still automatically built. ## Building submodules individually http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/docs/configuration.md ---------------------------------------------------------------------- diff --git a/docs/configuration.md b/docs/configuration.md index 613e214..432b4cd 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -2017,7 +2017,7 @@ showDF(properties, numRows = 200, truncate = FALSE) <td> Maximum rate (number of records per second) at which data will be read from each Kafka partition when using the new Kafka direct stream API. See the - <a href="streaming-kafka-integration.html">Kafka Integration guide</a> + <a href="streaming-kafka-0-10-integration.html">Kafka Integration guide</a> for more details. </td> </tr> @@ -2030,16 +2030,6 @@ showDF(properties, numRows = 200, truncate = FALSE) </td> </tr> <tr> - <td><code>spark.streaming.kafka.maxRetries</code></td> - <td>1</td> - <td> - Maximum number of consecutive retries the driver will make in order to find - the latest offsets on the leader of each partition (a default value of 1 - means that the driver will make a maximum of 2 attempts). Only applies to - the new Kafka direct stream API. - </td> -</tr> -<tr> <td><code>spark.streaming.ui.retainedBatches</code></td> <td>1000</td> <td> http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/docs/streaming-kafka-0-10-integration.md ---------------------------------------------------------------------- diff --git a/docs/streaming-kafka-0-10-integration.md b/docs/streaming-kafka-0-10-integration.md index 386066a..c78459c 100644 --- a/docs/streaming-kafka-0-10-integration.md +++ b/docs/streaming-kafka-0-10-integration.md @@ -3,7 +3,10 @@ layout: global title: Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) --- -The Spark Streaming integration for Kafka 0.10 is similar in design to the 0.8 [Direct Stream approach](streaming-kafka-0-8-integration.html#approach-2-direct-approach-no-receivers). It provides simple parallelism, 1:1 correspondence between Kafka partitions and Spark partitions, and access to offsets and metadata. However, because the newer integration uses the [new Kafka consumer API](http://kafka.apache.org/documentation.html#newconsumerapi) instead of the simple API, there are notable differences in usage. This version of the integration is marked as experimental, so the API is potentially subject to change. +The Spark Streaming integration for Kafka 0.10 provides simple parallelism, 1:1 correspondence between Kafka +partitions and Spark partitions, and access to offsets and metadata. However, because the newer integration uses +the [new Kafka consumer API](https://kafka.apache.org/documentation.html#newconsumerapi) instead of the simple API, +there are notable differences in usage. ### Linking For Scala/Java applications using SBT/Maven project definitions, link your streaming application with the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information). http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/docs/streaming-kafka-0-8-integration.md ---------------------------------------------------------------------- diff --git a/docs/streaming-kafka-0-8-integration.md b/docs/streaming-kafka-0-8-integration.md deleted file mode 100644 index becf217..0000000 --- a/docs/streaming-kafka-0-8-integration.md +++ /dev/null @@ -1,196 +0,0 @@ ---- -layout: global -title: Spark Streaming + Kafka Integration Guide (Kafka broker version 0.8.2.1 or higher) ---- - -**Note: Kafka 0.8 support is deprecated as of Spark 2.3.0.** - -Here we explain how to configure Spark Streaming to receive data from Kafka. There are two approaches to this - the old approach using Receivers and Kafka's high-level API, and a new approach (introduced in Spark 1.3) without using Receivers. They have different programming models, performance characteristics, and semantics guarantees, so read on for more details. Both approaches are considered stable APIs as of the current version of Spark. - -## Approach 1: Receiver-based Approach -This approach uses a Receiver to receive the data. The Receiver is implemented using the Kafka high-level consumer API. As with all receivers, the data received from Kafka through a Receiver is stored in Spark executors, and then jobs launched by Spark Streaming processes the data. - -However, under default configuration, this approach can lose data under failures (see [receiver reliability](streaming-programming-guide.html#receiver-reliability). To ensure zero-data loss, you have to additionally enable Write-Ahead Logs in Spark Streaming (introduced in Spark 1.2). This synchronously saves all the received Kafka data into write-ahead logs on a distributed file system (e.g HDFS), so that all the data can be recovered on failure. See [Deploying section](streaming-programming-guide.html#deploying-applications) in the streaming programming guide for more details on Write-Ahead Logs. - -Next, we discuss how to use this approach in your streaming application. - -1. **Linking:** For Scala/Java applications using SBT/Maven project definitions, link your streaming application with the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information). - - groupId = org.apache.spark - artifactId = spark-streaming-kafka-0-8_{{site.SCALA_BINARY_VERSION}} - version = {{site.SPARK_VERSION_SHORT}} - - For Python applications, you will have to add this above library and its dependencies when deploying your application. See the *Deploying* subsection below. - -2. **Programming:** In the streaming application code, import `KafkaUtils` and create an input DStream as follows. - - <div class="codetabs"> - <div data-lang="scala" markdown="1"> - import org.apache.spark.streaming.kafka._ - - val kafkaStream = KafkaUtils.createStream(streamingContext, - [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]) - - You can also specify the key and value classes and their corresponding decoder classes using variations of `createStream`. See the [API docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$). - </div> - <div data-lang="java" markdown="1"> - import org.apache.spark.streaming.kafka.*; - - JavaPairReceiverInputDStream<String, String> kafkaStream = - KafkaUtils.createStream(streamingContext, - [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]); - - You can also specify the key and value classes and their corresponding decoder classes using variations of `createStream`. See the [API docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html). - - </div> - <div data-lang="python" markdown="1"> - from pyspark.streaming.kafka import KafkaUtils - - kafkaStream = KafkaUtils.createStream(streamingContext, \ - [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]) - - By default, the Python API will decode Kafka data as UTF8 encoded strings. You can specify your custom decoding function to decode the byte arrays in Kafka records to any arbitrary data type. See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils). - </div> - </div> - - **Points to remember:** - - - Topic partitions in Kafka do not correlate to partitions of RDDs generated in Spark Streaming. So increasing the number of topic-specific partitions in the `KafkaUtils.createStream()` only increases the number of threads using which topics that are consumed within a single receiver. It does not increase the parallelism of Spark in processing the data. Refer to the main document for more information on that. - - - Multiple Kafka input DStreams can be created with different groups and topics for parallel receiving of data using multiple receivers. - - - If you have enabled Write-Ahead Logs with a replicated file system like HDFS, the received data is already being replicated in the log. Hence, the storage level in storage level for the input stream to `StorageLevel.MEMORY_AND_DISK_SER` (that is, use -`KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)`). - -3. **Deploying:** As with any Spark applications, `spark-submit` is used to launch your application. However, the details are slightly different for Scala/Java applications and Python applications. - - For Scala and Java applications, if you are using SBT or Maven for project management, then package `spark-streaming-kafka-0-8_{{site.SCALA_BINARY_VERSION}}` and its dependencies into the application JAR. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` dependencies as those are already present in a Spark installation. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide). - - For Python applications which lack SBT/Maven project management, `spark-streaming-kafka-0-8_{{site.SCALA_BINARY_VERSION}}` and its dependencies can be directly added to `spark-submit` using `--packages` (see [Application Submission Guide](submitting-applications.html)). That is, - - ./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ... - - Alternatively, you can also download the JAR of the Maven artifact `spark-streaming-kafka-0-8-assembly` from the - [Maven repository](https://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kafka-0-8-assembly_{{site.SCALA_BINARY_VERSION}}%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22) and add it to `spark-submit` with `--jars`. - -## Approach 2: Direct Approach (No Receivers) -This new receiver-less "direct" approach has been introduced in Spark 1.3 to ensure stronger end-to-end guarantees. Instead of using receivers to receive data, this approach periodically queries Kafka for the latest offsets in each topic+partition, and accordingly defines the offset ranges to process in each batch. When the jobs to process the data are launched, Kafka's simple consumer API is used to read the defined ranges of offsets from Kafka (similar to read files from a file system). Note that this feature was introduced in Spark 1.3 for the Scala and Java API, in Spark 1.4 for the Python API. - -This approach has the following advantages over the receiver-based approach (i.e. Approach 1). - -- *Simplified Parallelism:* No need to create multiple input Kafka streams and union them. With `directStream`, Spark Streaming will create as many RDD partitions as there are Kafka partitions to consume, which will all read data from Kafka in parallel. So there is a one-to-one mapping between Kafka and RDD partitions, which is easier to understand and tune. - -- *Efficiency:* Achieving zero-data loss in the first approach required the data to be stored in a Write-Ahead Log, which further replicated the data. This is actually inefficient as the data effectively gets replicated twice - once by Kafka, and a second time by the Write-Ahead Log. This second approach eliminates the problem as there is no receiver, and hence no need for Write-Ahead Logs. As long as you have sufficient Kafka retention, messages can be recovered from Kafka. - -- *Exactly-once semantics:* The first approach uses Kafka's high-level API to store consumed offsets in Zookeeper. This is traditionally the way to consume data from Kafka. While this approach (in combination with-write-ahead logs) can ensure zero data loss (i.e. at-least once semantics), there is a small chance some records may get consumed twice under some failures. This occurs because of inconsistencies between data reliably received by Spark Streaming and offsets tracked by Zookeeper. Hence, in this second approach, we use simple Kafka API that does not use Zookeeper. Offsets are tracked by Spark Streaming within its checkpoints. This eliminates inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each record is received by Spark Streaming effectively exactly once despite failures. In order to achieve exactly-once semantics for output of your results, your output operation that saves the data to an external data store must be either idempotent, or an atomic transa ction that saves results and offsets (see [Semantics of output operations](streaming-programming-guide.html#semantics-of-output-operations) in the main programming guide for further information). - -Note that one disadvantage of this approach is that it does not update offsets in Zookeeper, hence Zookeeper-based Kafka monitoring tools will not show progress. However, you can access the offsets processed by this approach in each batch and update Zookeeper yourself (see below). - -Next, we discuss how to use this approach in your streaming application. - -1. **Linking:** This approach is supported only in Scala/Java application. Link your SBT/Maven project with the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information). - - groupId = org.apache.spark - artifactId = spark-streaming-kafka-0-8_{{site.SCALA_BINARY_VERSION}} - version = {{site.SPARK_VERSION_SHORT}} - -2. **Programming:** In the streaming application code, import `KafkaUtils` and create an input DStream as follows. - - <div class="codetabs"> - <div data-lang="scala" markdown="1"> - import org.apache.spark.streaming.kafka._ - - val directKafkaStream = KafkaUtils.createDirectStream[ - [key class], [value class], [key decoder class], [value decoder class] ]( - streamingContext, [map of Kafka parameters], [set of topics to consume]) - - You can also pass a `messageHandler` to `createDirectStream` to access `MessageAndMetadata` that contains metadata about the current message and transform it to any desired type. - See the [API docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$). - </div> - <div data-lang="java" markdown="1"> - import org.apache.spark.streaming.kafka.*; - - JavaPairInputDStream<String, String> directKafkaStream = - KafkaUtils.createDirectStream(streamingContext, - [key class], [value class], [key decoder class], [value decoder class], - [map of Kafka parameters], [set of topics to consume]); - - You can also pass a `messageHandler` to `createDirectStream` to access `MessageAndMetadata` that contains metadata about the current message and transform it to any desired type. - See the [API docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html). - - </div> - <div data-lang="python" markdown="1"> - from pyspark.streaming.kafka import KafkaUtils - directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}) - - You can also pass a `messageHandler` to `createDirectStream` to access `KafkaMessageAndMetadata` that contains metadata about the current message and transform it to any desired type. - By default, the Python API will decode Kafka data as UTF8 encoded strings. You can specify your custom decoding function to decode the byte arrays in Kafka records to any arbitrary data type. See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils). - </div> - </div> - - In the Kafka parameters, you must specify either `metadata.broker.list` or `bootstrap.servers`. - By default, it will start consuming from the latest offset of each Kafka partition. If you set configuration `auto.offset.reset` in Kafka parameters to `smallest`, then it will start consuming from the smallest offset. - - You can also start consuming from any arbitrary offset using other variations of `KafkaUtils.createDirectStream`. Furthermore, if you want to access the Kafka offsets consumed in each batch, you can do the following. - - <div class="codetabs"> - <div data-lang="scala" markdown="1"> - // Hold a reference to the current offset ranges, so it can be used downstream - var offsetRanges = Array.empty[OffsetRange] - - directKafkaStream.transform { rdd => - offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges - rdd - }.map { - ... - }.foreachRDD { rdd => - for (o <- offsetRanges) { - println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}") - } - ... - } - </div> - <div data-lang="java" markdown="1"> - // Hold a reference to the current offset ranges, so it can be used downstream - AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>(); - - directKafkaStream.transformToPair(rdd -> { - OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); - offsetRanges.set(offsets); - return rdd; - }).map( - ... - ).foreachRDD(rdd -> { - for (OffsetRange o : offsetRanges.get()) { - System.out.println( - o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset() - ); - } - ... - }); - </div> - <div data-lang="python" markdown="1"> - offsetRanges = [] - - def storeOffsetRanges(rdd): - global offsetRanges - offsetRanges = rdd.offsetRanges() - return rdd - - def printOffsetRanges(rdd): - for o in offsetRanges: - print "%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset) - - directKafkaStream \ - .transform(storeOffsetRanges) \ - .foreachRDD(printOffsetRanges) - </div> - </div> - - You can use this to update Zookeeper yourself if you want Zookeeper-based Kafka monitoring tools to show progress of the streaming application. - - Note that the typecast to HasOffsetRanges will only succeed if it is done in the first method called on the directKafkaStream, not later down a chain of methods. You can use transform() instead of foreachRDD() as your first method call in order to access offsets, then call further Spark methods. However, be aware that the one-to-one mapping between RDD partition and Kafka partition does not remain after any methods that shuffle or repartition, e.g. reduceByKey() or window(). - - Another thing to note is that since this approach does not use Receivers, the standard receiver-related (that is, [configurations](configuration.html) of the form `spark.streaming.receiver.*` ) will not apply to the input DStreams created by this approach (will apply to other input DStreams though). Instead, use the [configurations](configuration.html) `spark.streaming.kafka.*`. An important one is `spark.streaming.kafka.maxRatePerPartition` which is the maximum rate (in messages per second) at which each Kafka partition will be read by this direct API. - -3. **Deploying:** This is same as the first approach. http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/docs/streaming-kafka-integration.md ---------------------------------------------------------------------- diff --git a/docs/streaming-kafka-integration.md b/docs/streaming-kafka-integration.md index 4aca391..0ec5a31 100644 --- a/docs/streaming-kafka-integration.md +++ b/docs/streaming-kafka-integration.md @@ -3,52 +3,9 @@ layout: global title: Spark Streaming + Kafka Integration Guide --- -[Apache Kafka](https://kafka.apache.org/) is publish-subscribe messaging rethought as a distributed, partitioned, replicated commit log service. Please read the [Kafka documentation](https://kafka.apache.org/documentation.html) thoroughly before starting an integration using Spark. +[Apache Kafka](https://kafka.apache.org/) is publish-subscribe messaging rethought as a distributed, partitioned, +replicated commit log service. Please read the [Kafka documentation](https://kafka.apache.org/documentation.html) +thoroughly before starting an integration using Spark. -The Kafka project introduced a new consumer API between versions 0.8 and 0.10, so there are 2 separate corresponding Spark Streaming packages available. Please choose the correct package for your brokers and desired features; note that the 0.8 integration is compatible with later 0.9 and 0.10 brokers, but the 0.10 integration is not compatible with earlier brokers. - -**Note: Kafka 0.8 support is deprecated as of Spark 2.3.0.** - -<table class="table"> -<tr><th></th><th><a href="streaming-kafka-0-8-integration.html">spark-streaming-kafka-0-8</a></th><th><a href="streaming-kafka-0-10-integration.html">spark-streaming-kafka-0-10</a></th></tr> -<tr> - <td>Broker Version</td> - <td>0.8.2.1 or higher</td> - <td>0.10.0 or higher</td> -</tr> -<tr> - <td>API Maturity</td> - <td>Deprecated</td> - <td>Stable</td> -</tr> -<tr> - <td>Language Support</td> - <td>Scala, Java, Python</td> - <td>Scala, Java</td> -</tr> -<tr> - <td>Receiver DStream</td> - <td>Yes</td> - <td>No</td> -</tr> -<tr> - <td>Direct DStream</td> - <td>Yes</td> - <td>Yes</td> -</tr> -<tr> - <td>SSL / TLS Support</td> - <td>No</td> - <td>Yes</td> -</tr> -<tr> - <td>Offset Commit API</td> - <td>No</td> - <td>Yes</td> -</tr> -<tr> - <td>Dynamic Topic Subscription</td> - <td>No</td> - <td>Yes</td> -</tr> -</table> +At the moment, Spark requires Kafka 0.10 and higher. See +<a href="streaming-kafka-0-10-integration.html">Kafka 0.10 integration documentation</a> for details. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/docs/streaming-programming-guide.md ---------------------------------------------------------------------- diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 1103d5c..70bee50 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -745,7 +745,7 @@ and add it to the classpath. Some of these advanced sources are as follows. -- **Kafka:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with Kafka broker versions 0.8.2.1 or higher. See the [Kafka Integration Guide](streaming-kafka-integration.html) for more details. +- **Kafka:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with Kafka broker versions 0.10 or higher. See the [Kafka Integration Guide](streaming-kafka-0-10-integration.html) for more details. - **Kinesis:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with Kinesis Client Library 1.2.1. See the [Kinesis Integration Guide](streaming-kinesis-integration.html) for more details. @@ -2172,7 +2172,7 @@ the input data stream (using `inputStream.repartition(<number of partitions>)`). This distributes the received batches of data across the specified number of machines in the cluster before further processing. -For direct stream, please refer to [Spark Streaming + Kafka Integration Guide](streaming-kafka-integration.html) +For direct stream, please refer to [Spark Streaming + Kafka Integration Guide](streaming-kafka-0-10-integration.html) ### Level of Parallelism in Data Processing {:.no_toc} @@ -2433,7 +2433,7 @@ The following table summarizes the semantics under failures: ### With Kafka Direct API {:.no_toc} -In Spark 1.3, we have introduced a new Kafka Direct API, which can ensure that all the Kafka data is received by Spark Streaming exactly once. Along with this, if you implement exactly-once output operation, you can achieve end-to-end exactly-once guarantees. This approach is further discussed in the [Kafka Integration Guide](streaming-kafka-integration.html). +In Spark 1.3, we have introduced a new Kafka Direct API, which can ensure that all the Kafka data is received by Spark Streaming exactly once. Along with this, if you implement exactly-once output operation, you can achieve end-to-end exactly-once guarantees. This approach is further discussed in the [Kafka Integration Guide](streaming-kafka-0-10-integration.html). ## Semantics of output operations {:.no_toc} @@ -2463,7 +2463,7 @@ additional effort may be necessary to achieve exactly-once semantics. There are # Where to Go from Here * Additional guides - - [Kafka Integration Guide](streaming-kafka-integration.html) + - [Kafka Integration Guide](streaming-kafka-0-10-integration.html) - [Kinesis Integration Guide](streaming-kinesis-integration.html) - [Custom Receiver Guide](streaming-custom-receivers.html) * Third-party DStream data sources can be found in [Third Party Projects](https://spark.apache.org/third-party-projects.html) http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/docs/structured-streaming-programming-guide.md ---------------------------------------------------------------------- diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index b6e4277..1cedbb2 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -497,7 +497,7 @@ There are a few built-in sources. - **File source** - Reads files written in a directory as a stream of data. Supported file formats are text, csv, json, orc, parquet. See the docs of the DataStreamReader interface for a more up-to-date list, and supported options for each file format. Note that the files must be atomically placed in the given directory, which in most file systems, can be achieved by file move operations. - - **Kafka source** - Reads data from Kafka. It's compatible with Kafka broker versions 0.10.0 or higher. See the [Kafka Integration Guide](structured-streaming-kafka-integration.html) for more details. + - **Kafka source** - Reads data from Kafka. It's compatible with Kafka broker versions 0.10.0 or higher. See the [Kafka Integration Guide](structured-streaming-kafka-0-10-integration.html) for more details. - **Socket source (for testing)** - Reads UTF8 text data from a socket connection. The listening server socket is at the driver. Note that this should be used only for testing as this does not provide end-to-end fault-tolerance guarantees. @@ -566,7 +566,7 @@ Here are the details of all the sources in Spark. <tr> <td><b>Kafka Source</b></td> <td> - See the <a href="structured-streaming-kafka-integration.html">Kafka Integration Guide</a>. + See the <a href="structured-streaming-kafka-0-10-integration.html">Kafka Integration Guide</a>. </td> <td>Yes</td> <td></td> @@ -1819,7 +1819,7 @@ Here are the details of all the sinks in Spark. <tr> <td><b>Kafka Sink</b></td> <td>Append, Update, Complete</td> - <td>See the <a href="structured-streaming-kafka-integration.html">Kafka Integration Guide</a></td> + <td>See the <a href="structured-streaming-kafka-0-10-integration.html">Kafka Integration Guide</a></td> <td>Yes (at-least-once)</td> <td>More details in the <a href="structured-streaming-kafka-integration.html">Kafka Integration Guide</a></td> </tr> http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/examples/src/main/python/streaming/direct_kafka_wordcount.py ---------------------------------------------------------------------- diff --git a/examples/src/main/python/streaming/direct_kafka_wordcount.py b/examples/src/main/python/streaming/direct_kafka_wordcount.py deleted file mode 100644 index c5c186c..0000000 --- a/examples/src/main/python/streaming/direct_kafka_wordcount.py +++ /dev/null @@ -1,56 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -r""" - Counts words in UTF8 encoded, '\n' delimited text directly received from Kafka in every 2 seconds. - Usage: direct_kafka_wordcount.py <broker_list> <topic> - - To run this on your local machine, you need to setup Kafka and create a producer first, see - http://kafka.apache.org/documentation.html#quickstart - - and then run the example - `$ bin/spark-submit --jars \ - external/kafka-assembly/target/scala-*/spark-streaming-kafka-assembly-*.jar \ - examples/src/main/python/streaming/direct_kafka_wordcount.py \ - localhost:9092 test` -""" -from __future__ import print_function - -import sys - -from pyspark import SparkContext -from pyspark.streaming import StreamingContext -from pyspark.streaming.kafka import KafkaUtils - -if __name__ == "__main__": - if len(sys.argv) != 3: - print("Usage: direct_kafka_wordcount.py <broker_list> <topic>", file=sys.stderr) - sys.exit(-1) - - sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount") - ssc = StreamingContext(sc, 2) - - brokers, topic = sys.argv[1:] - kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}) - lines = kvs.map(lambda x: x[1]) - counts = lines.flatMap(lambda line: line.split(" ")) \ - .map(lambda word: (word, 1)) \ - .reduceByKey(lambda a, b: a+b) - counts.pprint() - - ssc.start() - ssc.awaitTermination() http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/examples/src/main/python/streaming/kafka_wordcount.py ---------------------------------------------------------------------- diff --git a/examples/src/main/python/streaming/kafka_wordcount.py b/examples/src/main/python/streaming/kafka_wordcount.py deleted file mode 100644 index e9ee08b..0000000 --- a/examples/src/main/python/streaming/kafka_wordcount.py +++ /dev/null @@ -1,56 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -r""" - Counts words in UTF8 encoded, '\n' delimited text received from the network every second. - Usage: kafka_wordcount.py <zk> <topic> - - To run this on your local machine, you need to setup Kafka and create a producer first, see - http://kafka.apache.org/documentation.html#quickstart - - and then run the example - `$ bin/spark-submit --jars \ - external/kafka-assembly/target/scala-*/spark-streaming-kafka-assembly-*.jar \ - examples/src/main/python/streaming/kafka_wordcount.py \ - localhost:2181 test` -""" -from __future__ import print_function - -import sys - -from pyspark import SparkContext -from pyspark.streaming import StreamingContext -from pyspark.streaming.kafka import KafkaUtils - -if __name__ == "__main__": - if len(sys.argv) != 3: - print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr) - sys.exit(-1) - - sc = SparkContext(appName="PythonStreamingKafkaWordCount") - ssc = StreamingContext(sc, 1) - - zkQuorum, topic = sys.argv[1:] - kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1}) - lines = kvs.map(lambda x: x[1]) - counts = lines.flatMap(lambda line: line.split(" ")) \ - .map(lambda word: (word, 1)) \ - .reduceByKey(lambda a, b: a+b) - counts.pprint() - - ssc.start() - ssc.awaitTermination() http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala index d2100fc..cf283a5 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala @@ -26,11 +26,9 @@ import org.apache.kafka.clients.consumer._ import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener import org.apache.kafka.common.TopicPartition -import org.apache.spark.annotation.Experimental import org.apache.spark.internal.Logging /** - * :: Experimental :: * Choice of how to create and configure underlying Kafka Consumers on driver and executors. * See [[ConsumerStrategies]] to obtain instances. * Kafka 0.10 consumers can require additional, sometimes complex, setup after object @@ -38,7 +36,6 @@ import org.apache.spark.internal.Logging * @tparam K type of Kafka message key * @tparam V type of Kafka message value */ -@Experimental abstract class ConsumerStrategy[K, V] { /** * Kafka <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs"> @@ -208,13 +205,10 @@ private case class Assign[K, V]( } /** - * :: Experimental :: - * object for obtaining instances of [[ConsumerStrategy]] + * Object for obtaining instances of [[ConsumerStrategy]] */ -@Experimental object ConsumerStrategies { /** - * :: Experimental :: * Subscribe to a collection of topics. * @param topics collection of topics to subscribe * @param kafkaParams Kafka @@ -227,7 +221,6 @@ object ConsumerStrategies { * TopicPartition, the committed offset (if applicable) or kafka param * auto.offset.reset will be used. */ - @Experimental def Subscribe[K, V]( topics: Iterable[jl.String], kafkaParams: collection.Map[String, Object], @@ -239,7 +232,6 @@ object ConsumerStrategies { } /** - * :: Experimental :: * Subscribe to a collection of topics. * @param topics collection of topics to subscribe * @param kafkaParams Kafka @@ -249,7 +241,6 @@ object ConsumerStrategies { * Requires "bootstrap.servers" to be set * with Kafka broker(s) specified in host1:port1,host2:port2 form. */ - @Experimental def Subscribe[K, V]( topics: Iterable[jl.String], kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = { @@ -260,7 +251,6 @@ object ConsumerStrategies { } /** - * :: Experimental :: * Subscribe to a collection of topics. * @param topics collection of topics to subscribe * @param kafkaParams Kafka @@ -273,7 +263,6 @@ object ConsumerStrategies { * TopicPartition, the committed offset (if applicable) or kafka param * auto.offset.reset will be used. */ - @Experimental def Subscribe[K, V]( topics: ju.Collection[jl.String], kafkaParams: ju.Map[String, Object], @@ -282,7 +271,6 @@ object ConsumerStrategies { } /** - * :: Experimental :: * Subscribe to a collection of topics. * @param topics collection of topics to subscribe * @param kafkaParams Kafka @@ -292,14 +280,13 @@ object ConsumerStrategies { * Requires "bootstrap.servers" to be set * with Kafka broker(s) specified in host1:port1,host2:port2 form. */ - @Experimental def Subscribe[K, V]( topics: ju.Collection[jl.String], kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = { new Subscribe[K, V](topics, kafkaParams, ju.Collections.emptyMap[TopicPartition, jl.Long]()) } - /** :: Experimental :: + /** * Subscribe to all topics matching specified pattern to get dynamically assigned partitions. * The pattern matching will be done periodically against topics existing at the time of check. * @param pattern pattern to subscribe to @@ -313,7 +300,6 @@ object ConsumerStrategies { * TopicPartition, the committed offset (if applicable) or kafka param * auto.offset.reset will be used. */ - @Experimental def SubscribePattern[K, V]( pattern: ju.regex.Pattern, kafkaParams: collection.Map[String, Object], @@ -324,7 +310,7 @@ object ConsumerStrategies { new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava)) } - /** :: Experimental :: + /** * Subscribe to all topics matching specified pattern to get dynamically assigned partitions. * The pattern matching will be done periodically against topics existing at the time of check. * @param pattern pattern to subscribe to @@ -335,7 +321,6 @@ object ConsumerStrategies { * Requires "bootstrap.servers" to be set * with Kafka broker(s) specified in host1:port1,host2:port2 form. */ - @Experimental def SubscribePattern[K, V]( pattern: ju.regex.Pattern, kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = { @@ -345,7 +330,7 @@ object ConsumerStrategies { ju.Collections.emptyMap[TopicPartition, jl.Long]()) } - /** :: Experimental :: + /** * Subscribe to all topics matching specified pattern to get dynamically assigned partitions. * The pattern matching will be done periodically against topics existing at the time of check. * @param pattern pattern to subscribe to @@ -359,7 +344,6 @@ object ConsumerStrategies { * TopicPartition, the committed offset (if applicable) or kafka param * auto.offset.reset will be used. */ - @Experimental def SubscribePattern[K, V]( pattern: ju.regex.Pattern, kafkaParams: ju.Map[String, Object], @@ -367,7 +351,7 @@ object ConsumerStrategies { new SubscribePattern[K, V](pattern, kafkaParams, offsets) } - /** :: Experimental :: + /** * Subscribe to all topics matching specified pattern to get dynamically assigned partitions. * The pattern matching will be done periodically against topics existing at the time of check. * @param pattern pattern to subscribe to @@ -378,7 +362,6 @@ object ConsumerStrategies { * Requires "bootstrap.servers" to be set * with Kafka broker(s) specified in host1:port1,host2:port2 form. */ - @Experimental def SubscribePattern[K, V]( pattern: ju.regex.Pattern, kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = { @@ -389,7 +372,6 @@ object ConsumerStrategies { } /** - * :: Experimental :: * Assign a fixed collection of TopicPartitions * @param topicPartitions collection of TopicPartitions to assign * @param kafkaParams Kafka @@ -402,7 +384,6 @@ object ConsumerStrategies { * TopicPartition, the committed offset (if applicable) or kafka param * auto.offset.reset will be used. */ - @Experimental def Assign[K, V]( topicPartitions: Iterable[TopicPartition], kafkaParams: collection.Map[String, Object], @@ -414,7 +395,6 @@ object ConsumerStrategies { } /** - * :: Experimental :: * Assign a fixed collection of TopicPartitions * @param topicPartitions collection of TopicPartitions to assign * @param kafkaParams Kafka @@ -424,7 +404,6 @@ object ConsumerStrategies { * Requires "bootstrap.servers" to be set * with Kafka broker(s) specified in host1:port1,host2:port2 form. */ - @Experimental def Assign[K, V]( topicPartitions: Iterable[TopicPartition], kafkaParams: collection.Map[String, Object]): ConsumerStrategy[K, V] = { @@ -435,7 +414,6 @@ object ConsumerStrategies { } /** - * :: Experimental :: * Assign a fixed collection of TopicPartitions * @param topicPartitions collection of TopicPartitions to assign * @param kafkaParams Kafka @@ -448,7 +426,6 @@ object ConsumerStrategies { * TopicPartition, the committed offset (if applicable) or kafka param * auto.offset.reset will be used. */ - @Experimental def Assign[K, V]( topicPartitions: ju.Collection[TopicPartition], kafkaParams: ju.Map[String, Object], @@ -457,7 +434,6 @@ object ConsumerStrategies { } /** - * :: Experimental :: * Assign a fixed collection of TopicPartitions * @param topicPartitions collection of TopicPartitions to assign * @param kafkaParams Kafka @@ -467,7 +443,6 @@ object ConsumerStrategies { * Requires "bootstrap.servers" to be set * with Kafka broker(s) specified in host1:port1,host2:port2 form. */ - @Experimental def Assign[K, V]( topicPartitions: ju.Collection[TopicPartition], kafkaParams: ju.Map[String, Object]): ConsumerStrategy[K, V] = { http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala index e6bdef0..64b6ef6 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala @@ -23,7 +23,6 @@ import org.apache.kafka.clients.consumer._ import org.apache.kafka.common.TopicPartition import org.apache.spark.SparkContext -import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.{ JavaRDD, JavaSparkContext } import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD @@ -32,13 +31,10 @@ import org.apache.spark.streaming.api.java.{ JavaInputDStream, JavaStreamingCont import org.apache.spark.streaming.dstream._ /** - * :: Experimental :: * object for constructing Kafka streams and RDDs */ -@Experimental object KafkaUtils extends Logging { /** - * :: Experimental :: * Scala constructor for a batch-oriented interface for consuming from Kafka. * Starting and ending offsets are specified in advance, * so that you can control exactly-once semantics. @@ -52,7 +48,6 @@ object KafkaUtils extends Logging { * @tparam K type of Kafka message key * @tparam V type of Kafka message value */ - @Experimental def createRDD[K, V]( sc: SparkContext, kafkaParams: ju.Map[String, Object], @@ -75,7 +70,6 @@ object KafkaUtils extends Logging { } /** - * :: Experimental :: * Java constructor for a batch-oriented interface for consuming from Kafka. * Starting and ending offsets are specified in advance, * so that you can control exactly-once semantics. @@ -89,7 +83,6 @@ object KafkaUtils extends Logging { * @tparam K type of Kafka message key * @tparam V type of Kafka message value */ - @Experimental def createRDD[K, V]( jsc: JavaSparkContext, kafkaParams: ju.Map[String, Object], @@ -101,7 +94,6 @@ object KafkaUtils extends Logging { } /** - * :: Experimental :: * Scala constructor for a DStream where * each given Kafka topic/partition corresponds to an RDD partition. * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number @@ -114,7 +106,6 @@ object KafkaUtils extends Logging { * @tparam K type of Kafka message key * @tparam V type of Kafka message value */ - @Experimental def createDirectStream[K, V]( ssc: StreamingContext, locationStrategy: LocationStrategy, @@ -125,7 +116,6 @@ object KafkaUtils extends Logging { } /** - * :: Experimental :: * Scala constructor for a DStream where * each given Kafka topic/partition corresponds to an RDD partition. * @param locationStrategy In most cases, pass in [[LocationStrategies.PreferConsistent]], @@ -137,7 +127,6 @@ object KafkaUtils extends Logging { * @tparam K type of Kafka message key * @tparam V type of Kafka message value */ - @Experimental def createDirectStream[K, V]( ssc: StreamingContext, locationStrategy: LocationStrategy, @@ -148,7 +137,6 @@ object KafkaUtils extends Logging { } /** - * :: Experimental :: * Java constructor for a DStream where * each given Kafka topic/partition corresponds to an RDD partition. * @param locationStrategy In most cases, pass in [[LocationStrategies.PreferConsistent]], @@ -158,7 +146,6 @@ object KafkaUtils extends Logging { * @tparam K type of Kafka message key * @tparam V type of Kafka message value */ - @Experimental def createDirectStream[K, V]( jssc: JavaStreamingContext, locationStrategy: LocationStrategy, @@ -170,7 +157,6 @@ object KafkaUtils extends Logging { } /** - * :: Experimental :: * Java constructor for a DStream where * each given Kafka topic/partition corresponds to an RDD partition. * @param locationStrategy In most cases, pass in [[LocationStrategies.PreferConsistent]], @@ -182,7 +168,6 @@ object KafkaUtils extends Logging { * @tparam K type of Kafka message key * @tparam V type of Kafka message value */ - @Experimental def createDirectStream[K, V]( jssc: JavaStreamingContext, locationStrategy: LocationStrategy, http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala index c9a8a13..b4d9669 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/LocationStrategy.scala @@ -23,18 +23,14 @@ import scala.collection.JavaConverters._ import org.apache.kafka.common.TopicPartition -import org.apache.spark.annotation.Experimental - /** - * :: Experimental :: * Choice of how to schedule consumers for a given TopicPartition on an executor. * See [[LocationStrategies]] to obtain instances. * Kafka 0.10 consumers prefetch messages, so it's important for performance * to keep cached consumers on appropriate executors, not recreate them for every partition. * Choice of location is only a preference, not an absolute; partitions may be scheduled elsewhere. */ -@Experimental sealed abstract class LocationStrategy private case object PreferBrokers extends LocationStrategy @@ -44,42 +40,32 @@ private case object PreferConsistent extends LocationStrategy private case class PreferFixed(hostMap: ju.Map[TopicPartition, String]) extends LocationStrategy /** - * :: Experimental :: object to obtain instances of [[LocationStrategy]] - * + * Object to obtain instances of [[LocationStrategy]] */ -@Experimental object LocationStrategies { /** - * :: Experimental :: * Use this only if your executors are on the same nodes as your Kafka brokers. */ - @Experimental def PreferBrokers: LocationStrategy = org.apache.spark.streaming.kafka010.PreferBrokers /** - * :: Experimental :: * Use this in most cases, it will consistently distribute partitions across all executors. */ - @Experimental def PreferConsistent: LocationStrategy = org.apache.spark.streaming.kafka010.PreferConsistent /** - * :: Experimental :: * Use this to place particular TopicPartitions on particular hosts if your load is uneven. * Any TopicPartition not specified in the map will use a consistent location. */ - @Experimental def PreferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy = new PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava)) /** - * :: Experimental :: * Use this to place particular TopicPartitions on particular hosts if your load is uneven. * Any TopicPartition not specified in the map will use a consistent location. */ - @Experimental def PreferFixed(hostMap: ju.Map[TopicPartition, String]): LocationStrategy = new PreferFixed(hostMap) } http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala index c66d3c9..077f02e 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/OffsetRange.scala @@ -20,8 +20,6 @@ package org.apache.spark.streaming.kafka010 import org.apache.kafka.clients.consumer.OffsetCommitCallback import org.apache.kafka.common.TopicPartition -import org.apache.spark.annotation.Experimental - /** * Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the * offset ranges in RDDs generated by the direct Kafka DStream (see @@ -38,7 +36,6 @@ trait HasOffsetRanges { } /** - * :: Experimental :: * Represents any object that can commit a collection of [[OffsetRange]]s. * The direct Kafka DStream implements this interface (see * [[KafkaUtils.createDirectStream]]). @@ -56,25 +53,20 @@ trait HasOffsetRanges { * }) * }}} */ -@Experimental trait CanCommitOffsets { /** - * :: Experimental :: * Queue up offset ranges for commit to Kafka at a future time. Threadsafe. * This is only needed if you intend to store offsets in Kafka, instead of your own store. * @param offsetRanges The maximum untilOffset for a given partition will be used at commit. */ - @Experimental def commitAsync(offsetRanges: Array[OffsetRange]): Unit /** - * :: Experimental :: * Queue up offset ranges for commit to Kafka at a future time. Threadsafe. * This is only needed if you intend to store offsets in Kafka, instead of your own store. * @param offsetRanges The maximum untilOffset for a given partition will be used at commit. * @param callback Only the most recently provided callback will be used at commit. */ - @Experimental def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit } http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala index 4017fdb..77193e2 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala @@ -20,14 +20,11 @@ package org.apache.spark.streaming.kafka010 import org.apache.kafka.common.TopicPartition import org.apache.spark.SparkConf -import org.apache.spark.annotation.Experimental /** - * :: Experimental :: * Interface for user-supplied configurations that can't otherwise be set via Spark properties, * because they need tweaking on a per-partition basis, */ -@Experimental abstract class PerPartitionConfig extends Serializable { /** * Maximum rate (number of records per second) at which data will be read http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-8-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/external/kafka-0-8-assembly/pom.xml b/external/kafka-0-8-assembly/pom.xml deleted file mode 100644 index 83edb11..0000000 --- a/external/kafka-0-8-assembly/pom.xml +++ /dev/null @@ -1,170 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.spark</groupId> - <artifactId>spark-parent_2.11</artifactId> - <version>3.0.0-SNAPSHOT</version> - <relativePath>../../pom.xml</relativePath> - </parent> - - <artifactId>spark-streaming-kafka-0-8-assembly_2.11</artifactId> - <packaging>jar</packaging> - <name>Spark Project External Kafka Assembly</name> - <url>http://spark.apache.org/</url> - - <properties> - <sbt.project.name>streaming-kafka-0-8-assembly</sbt.project.name> - </properties> - - <dependencies> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming-kafka-0-8_${scala.binary.version}</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - <!-- - Demote already included in the Spark assembly. - --> - <dependency> - <groupId>commons-codec</groupId> - <artifactId>commons-codec</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>commons-lang</groupId> - <artifactId>commons-lang</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>com.google.protobuf</groupId> - <artifactId>protobuf-java</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.lz4</groupId> - <artifactId>lz4-java</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro-mapred</artifactId> - <classifier>${avro.mapred.classifier}</classifier> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.curator</groupId> - <artifactId>curator-recipes</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-library</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.xerial.snappy</groupId> - <artifactId>snappy-java</artifactId> - <scope>provided</scope> - </dependency> - </dependencies> - - <build> - <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> - <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <configuration> - <shadedArtifactAttached>false</shadedArtifactAttached> - <artifactSet> - <includes> - <include>*:*</include> - </includes> - </artifactSet> - <filters> - <filter> - <artifact>*:*</artifact> - <excludes> - <exclude>META-INF/*.SF</exclude> - <exclude>META-INF/*.DSA</exclude> - <exclude>META-INF/*.RSA</exclude> - </excludes> - </filter> - </filters> - </configuration> - <executions> - <execution> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - <configuration> - <transformers> - <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> - <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> - <resource>reference.conf</resource> - </transformer> - <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer"> - <resource>log4j.properties</resource> - </transformer> - <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/> - <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/> - </transformers> - </configuration> - </execution> - </executions> - </plugin> - </plugins> -</build> -</project> - http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-8/pom.xml ---------------------------------------------------------------------- diff --git a/external/kafka-0-8/pom.xml b/external/kafka-0-8/pom.xml deleted file mode 100644 index 4545877..0000000 --- a/external/kafka-0-8/pom.xml +++ /dev/null @@ -1,109 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.spark</groupId> - <artifactId>spark-parent_2.11</artifactId> - <version>3.0.0-SNAPSHOT</version> - <relativePath>../../pom.xml</relativePath> - </parent> - - <artifactId>spark-streaming-kafka-0-8_2.11</artifactId> - <properties> - <sbt.project.name>streaming-kafka-0-8</sbt.project.name> - </properties> - <packaging>jar</packaging> - <name>Spark Integration for Kafka 0.8</name> - <url>http://spark.apache.org/</url> - - <dependencies> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-core_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_${scala.binary.version}</artifactId> - <version>0.8.2.1</version> - <exclusions> - <exclusion> - <groupId>com.sun.jmx</groupId> - <artifactId>jmxri</artifactId> - </exclusion> - <exclusion> - <groupId>com.sun.jdmk</groupId> - <artifactId>jmxtools</artifactId> - </exclusion> - <exclusion> - <groupId>net.sf.jopt-simple</groupId> - <artifactId>jopt-simple</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-simple</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>net.sf.jopt-simple</groupId> - <artifactId>jopt-simple</artifactId> - <version>3.2</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.scalacheck</groupId> - <artifactId>scalacheck_${scala.binary.version}</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-tags_${scala.binary.version}</artifactId> - </dependency> - - <!-- - This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude - them will yield errors. - --> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-tags_${scala.binary.version}</artifactId> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - </dependencies> - <build> - <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> - <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> - </build> -</project> http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala deleted file mode 100644 index 89ccbe2..0000000 --- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kafka - -import org.apache.spark.annotation.Experimental - -/** - * Represents the host and port info for a Kafka broker. - * Differs from the Kafka project's internal kafka.cluster.Broker, which contains a server ID. - */ -@deprecated("Update to Kafka 0.10 integration", "2.3.0") -final class Broker private( - /** Broker's hostname */ - val host: String, - /** Broker's port */ - val port: Int) extends Serializable { - override def equals(obj: Any): Boolean = obj match { - case that: Broker => - this.host == that.host && - this.port == that.port - case _ => false - } - - override def hashCode: Int = { - 41 * (41 + host.hashCode) + port - } - - override def toString(): String = { - s"Broker($host, $port)" - } -} - -/** - * :: Experimental :: - * Companion object that provides methods to create instances of [[Broker]]. - */ -@Experimental -@deprecated("Update to Kafka 0.10 integration", "2.3.0") -object Broker { - def create(host: String, port: Int): Broker = - new Broker(host, port) - - def apply(host: String, port: Int): Broker = - new Broker(host, port) - - def unapply(broker: Broker): Option[(String, Int)] = { - if (broker == null) { - None - } else { - Some((broker.host, broker.port)) - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/703e6da1/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala deleted file mode 100644 index 2ec771e..0000000 --- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kafka - -import scala.annotation.tailrec -import scala.collection.mutable -import scala.reflect.ClassTag - -import kafka.common.TopicAndPartition -import kafka.message.MessageAndMetadata -import kafka.serializer.Decoder - -import org.apache.spark.SparkException -import org.apache.spark.internal.Logging -import org.apache.spark.streaming.{StreamingContext, Time} -import org.apache.spark.streaming.dstream._ -import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset -import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo} -import org.apache.spark.streaming.scheduler.rate.RateEstimator - -/** - * A stream of [[KafkaRDD]] where - * each given Kafka topic/partition corresponds to an RDD partition. - * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number - * of messages - * per second that each '''partition''' will accept. - * Starting offsets are specified in advance, - * and this DStream is not responsible for committing offsets, - * so that you can control exactly-once semantics. - * For an easy interface to Kafka-managed offsets, - * see [[KafkaCluster]] - * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> - * configuration parameters</a>. - * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s), - * NOT zookeeper servers, specified in host1:port1,host2:port2 form. - * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive) - * starting point of the stream - * @param messageHandler function for translating each message into the desired type - */ -private[streaming] -class DirectKafkaInputDStream[ - K: ClassTag, - V: ClassTag, - U <: Decoder[K]: ClassTag, - T <: Decoder[V]: ClassTag, - R: ClassTag]( - _ssc: StreamingContext, - val kafkaParams: Map[String, String], - val fromOffsets: Map[TopicAndPartition, Long], - messageHandler: MessageAndMetadata[K, V] => R - ) extends InputDStream[R](_ssc) with Logging { - val maxRetries = context.sparkContext.getConf.getInt( - "spark.streaming.kafka.maxRetries", 1) - - private[streaming] override def name: String = s"Kafka direct stream [$id]" - - protected[streaming] override val checkpointData = - new DirectKafkaInputDStreamCheckpointData - - - /** - * Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker. - */ - override protected[streaming] val rateController: Option[RateController] = { - if (RateController.isBackPressureEnabled(ssc.conf)) { - Some(new DirectKafkaRateController(id, - RateEstimator.create(ssc.conf, context.graph.batchDuration))) - } else { - None - } - } - - protected val kc = new KafkaCluster(kafkaParams) - - private val maxRateLimitPerPartition: Long = context.sparkContext.getConf.getLong( - "spark.streaming.kafka.maxRatePerPartition", 0) - - private val initialRate = context.sparkContext.getConf.getLong( - "spark.streaming.backpressure.initialRate", 0) - - protected[streaming] def maxMessagesPerPartition( - offsets: Map[TopicAndPartition, Long]): Option[Map[TopicAndPartition, Long]] = { - - val estimatedRateLimit = rateController.map { x => { - val lr = x.getLatestRate() - if (lr > 0) lr else initialRate - }} - - // calculate a per-partition rate limit based on current lag - val effectiveRateLimitPerPartition = estimatedRateLimit.filter(_ > 0) match { - case Some(rate) => - val lagPerPartition = offsets.map { case (tp, offset) => - tp -> Math.max(offset - currentOffsets(tp), 0) - } - val totalLag = lagPerPartition.values.sum - - lagPerPartition.map { case (tp, lag) => - val backpressureRate = lag / totalLag.toDouble * rate - tp -> (if (maxRateLimitPerPartition > 0) { - Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate) - } - case None => offsets.map { case (tp, offset) => tp -> maxRateLimitPerPartition.toDouble } - } - - if (effectiveRateLimitPerPartition.values.sum > 0) { - val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000 - Some(effectiveRateLimitPerPartition.map { - case (tp, limit) => tp -> Math.max((secsPerBatch * limit).toLong, 1L) - }) - } else { - None - } - } - - protected var currentOffsets = fromOffsets - - @tailrec - protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = { - val o = kc.getLatestLeaderOffsets(currentOffsets.keySet) - // Either.fold would confuse @tailrec, do it manually - if (o.isLeft) { - val err = o.left.get.toString - if (retries <= 0) { - throw new SparkException(err) - } else { - logError(err) - Thread.sleep(kc.config.refreshLeaderBackoffMs) - latestLeaderOffsets(retries - 1) - } - } else { - o.right.get - } - } - - // limits the maximum number of messages per partition - protected def clamp( - leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = { - val offsets = leaderOffsets.mapValues(lo => lo.offset) - - maxMessagesPerPartition(offsets).map { mmp => - mmp.map { case (tp, messages) => - val lo = leaderOffsets(tp) - tp -> lo.copy(offset = Math.min(currentOffsets(tp) + messages, lo.offset)) - } - }.getOrElse(leaderOffsets) - } - - override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = { - val untilOffsets = clamp(latestLeaderOffsets(maxRetries)) - val rdd = KafkaRDD[K, V, U, T, R]( - context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler) - - // Report the record number and metadata of this batch interval to InputInfoTracker. - val offsetRanges = currentOffsets.map { case (tp, fo) => - val uo = untilOffsets(tp) - OffsetRange(tp.topic, tp.partition, fo, uo.offset) - } - val description = offsetRanges.filter { offsetRange => - // Don't display empty ranges. - offsetRange.fromOffset != offsetRange.untilOffset - }.map { offsetRange => - s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" + - s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}" - }.mkString("\n") - // Copy offsetRanges to immutable.List to prevent from being modified by the user - val metadata = Map( - "offsets" -> offsetRanges.toList, - StreamInputInfo.METADATA_KEY_DESCRIPTION -> description) - val inputInfo = StreamInputInfo(id, rdd.count, metadata) - ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo) - - currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset) - Some(rdd) - } - - override def start(): Unit = { - } - - def stop(): Unit = { - } - - private[streaming] - class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) { - def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = { - data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]] - } - - override def update(time: Time): Unit = { - batchForTime.clear() - generatedRDDs.foreach { kv => - val a = kv._2.asInstanceOf[KafkaRDD[K, V, U, T, R]].offsetRanges.map(_.toTuple).toArray - batchForTime += kv._1 -> a - } - } - - override def cleanup(time: Time): Unit = { } - - override def restore(): Unit = { - // this is assuming that the topics don't change during execution, which is true currently - val topics = fromOffsets.keySet - val leaders = KafkaCluster.checkErrors(kc.findLeaders(topics)) - - batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) => - logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}") - generatedRDDs += t -> new KafkaRDD[K, V, U, T, R]( - context.sparkContext, kafkaParams, b.map(OffsetRange(_)), leaders, messageHandler) - } - } - } - - /** - * A RateController to retrieve the rate from RateEstimator. - */ - private[streaming] class DirectKafkaRateController(id: Int, estimator: RateEstimator) - extends RateController(id, estimator) { - override def publish(rate: Long): Unit = () - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
