Repository: spark
Updated Branches:
  refs/heads/master a333a72e0 -> b305e377f


[SPARK-8390] [STREAMING] [KAFKA] fix docs related to HasOffsetRanges

Author: cody koeninger <[email protected]>

Closes #6863 from koeninger/SPARK-8390 and squashes the following commits:

26a06bd [cody koeninger] Merge branch 'master' into SPARK-8390
3744492 [cody koeninger] [Streaming][Kafka][SPARK-8390] doc changes per TD, 
test to make sure approach shown in docs actually compiles + runs
b108c9d [cody koeninger] [Streaming][Kafka][SPARK-8390] further doc fixes, 
clean up spacing
bb4336b [cody koeninger] [Streaming][Kafka][SPARK-8390] fix docs related to 
HasOffsetRanges, cleanup
3f3c57a [cody koeninger] [Streaming][Kafka][SPARK-8389] Example of getting 
offset ranges out of the existing java direct stream api


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b305e377
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b305e377
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b305e377

Branch: refs/heads/master
Commit: b305e377fb0a2ca67d9924b995c51e483a4944ad
Parents: a333a72
Author: cody koeninger <[email protected]>
Authored: Fri Jun 19 17:16:56 2015 -0700
Committer: Tathagata Das <[email protected]>
Committed: Fri Jun 19 17:18:31 2015 -0700

----------------------------------------------------------------------
 docs/streaming-kafka-integration.md             | 70 ++++++++++++++------
 .../kafka/JavaDirectKafkaStreamSuite.java       | 11 ++-
 .../kafka/DirectKafkaStreamSuite.scala          | 16 +++--
 3 files changed, 71 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b305e377/docs/streaming-kafka-integration.md
----------------------------------------------------------------------
diff --git a/docs/streaming-kafka-integration.md 
b/docs/streaming-kafka-integration.md
index 02bc95d..775d508 100644
--- a/docs/streaming-kafka-integration.md
+++ b/docs/streaming-kafka-integration.md
@@ -2,7 +2,7 @@
 layout: global
 title: Spark Streaming + Kafka Integration Guide
 ---
-[Apache Kafka](http://kafka.apache.org/) is publish-subscribe messaging 
rethought as a distributed, partitioned, replicated commit log service.  Here 
we explain how to configure Spark Streaming to receive data from Kafka. There 
are two approaches to this - the old approach using Receivers and Kafka's 
high-level API, and a new experimental approach (introduced in Spark 1.3) 
without using Receivers. They have different programming models, performance 
characteristics, and semantics guarantees, so read on for more details.  
+[Apache Kafka](http://kafka.apache.org/) is publish-subscribe messaging 
rethought as a distributed, partitioned, replicated commit log service. Here we 
explain how to configure Spark Streaming to receive data from Kafka. There are 
two approaches to this - the old approach using Receivers and Kafka's 
high-level API, and a new experimental approach (introduced in Spark 1.3) 
without using Receivers. They have different programming models, performance 
characteristics, and semantics guarantees, so read on for more details.
 
 ## Approach 1: Receiver-based Approach
 This approach uses a Receiver to receive the data. The Received is implemented 
using the Kafka high-level consumer API. As with all receivers, the data 
received from Kafka through a Receiver is stored in Spark executors, and then 
jobs launched by Spark Streaming processes the data. 
@@ -74,15 +74,15 @@ Next, we discuss how to use this approach in your streaming 
application.
        [Maven 
repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kafka-assembly_2.10%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
 and add it to `spark-submit` with `--jars`.
 
 ## Approach 2: Direct Approach (No Receivers)
-This is a new receiver-less "direct" approach has been introduced in Spark 1.3 
to ensure stronger end-to-end guarantees. Instead of using receivers to receive 
data, this approach periodically queries Kafka for the latest offsets in each 
topic+partition, and accordingly defines the offset ranges to process in each 
batch. When the jobs to process the data are launched, Kafka's simple consumer 
API is used to read the defined ranges of offsets from Kafka (similar to read 
files from a file system). Note that this is an experimental feature in Spark 
1.3 and is only available in the Scala and Java API.
+This new receiver-less "direct" approach has been introduced in Spark 1.3 to 
ensure stronger end-to-end guarantees. Instead of using receivers to receive 
data, this approach periodically queries Kafka for the latest offsets in each 
topic+partition, and accordingly defines the offset ranges to process in each 
batch. When the jobs to process the data are launched, Kafka's simple consumer 
API is used to read the defined ranges of offsets from Kafka (similar to read 
files from a file system). Note that this is an experimental feature introduced 
in Spark 1.3 for the Scala and Java API. Spark 1.4 added a Python API, but it 
is not yet at full feature parity.
 
-This approach has the following advantages over the received-based approach 
(i.e. Approach 1).
+This approach has the following advantages over the receiver-based approach 
(i.e. Approach 1).
 
-- *Simplified Parallelism:* No need to create multiple input Kafka streams and 
union-ing them. With `directStream`, Spark Streaming will create as many RDD 
partitions as there is Kafka partitions to consume, which will all read data 
from Kafka in parallel. So there is one-to-one mapping between Kafka and RDD 
partitions, which is easier to understand and tune.
+- *Simplified Parallelism:* No need to create multiple input Kafka streams and 
union them. With `directStream`, Spark Streaming will create as many RDD 
partitions as there are Kafka partitions to consume, which will all read data 
from Kafka in parallel. So there is a one-to-one mapping between Kafka and RDD 
partitions, which is easier to understand and tune.
 
-- *Efficiency:* Achieving zero-data loss in the first approach required the 
data to be stored in a Write Ahead Log, which further replicated the data. This 
is actually inefficient as the data effectively gets replicated twice - once by 
Kafka, and a second time by the Write Ahead Log. This second approach eliminate 
the problem as there is no receiver, and hence no need for Write Ahead Logs.
+- *Efficiency:* Achieving zero-data loss in the first approach required the 
data to be stored in a Write Ahead Log, which further replicated the data. This 
is actually inefficient as the data effectively gets replicated twice - once by 
Kafka, and a second time by the Write Ahead Log. This second approach 
eliminates the problem as there is no receiver, and hence no need for Write 
Ahead Logs. As long as you have sufficient Kafka retention, messages can be 
recovered from Kafka.
 
-- *Exactly-once semantics:* The first approach uses Kafka's high level API to 
store consumed offsets in Zookeeper. This is traditionally the way to consume 
data from Kafka. While this approach (in combination with write ahead logs) can 
ensure zero data loss (i.e. at-least once semantics), there is a small chance 
some records may get consumed twice under some failures. This occurs because of 
inconsistencies between data reliably received by Spark Streaming and offsets 
tracked by Zookeeper. Hence, in this second approach, we use simple Kafka API 
that does not use Zookeeper and offsets tracked only by Spark Streaming within 
its checkpoints. This eliminates inconsistencies between Spark Streaming and 
Zookeeper/Kafka, and so each record is received by Spark Streaming effectively 
exactly once despite failures.
+- *Exactly-once semantics:* The first approach uses Kafka's high level API to 
store consumed offsets in Zookeeper. This is traditionally the way to consume 
data from Kafka. While this approach (in combination with write ahead logs) can 
ensure zero data loss (i.e. at-least once semantics), there is a small chance 
some records may get consumed twice under some failures. This occurs because of 
inconsistencies between data reliably received by Spark Streaming and offsets 
tracked by Zookeeper. Hence, in this second approach, we use simple Kafka API 
that does not use Zookeeper. Offsets are tracked by Spark Streaming within its 
checkpoints. This eliminates inconsistencies between Spark Streaming and 
Zookeeper/Kafka, and so each record is received by Spark Streaming effectively 
exactly once despite failures. In order to achieve exactly-once semantics for 
output of your results, your output operation that saves the data to an 
external data store must be either idempotent, or an atomic transa
 ction that saves results and offsets (see [Semanitcs of output 
operations](streaming-programming-guide.html#semantics-of-output-operations) in 
the main programming guide for further information).
 
 Note that one disadvantage of this approach is that it does not update offsets 
in Zookeeper, hence Zookeeper-based Kafka monitoring tools will not show 
progress. However, you can access the offsets processed by this approach in 
each batch and update Zookeeper yourself (see below).
 
@@ -135,32 +135,60 @@ Next, we discuss how to use this approach in your 
streaming application.
 
        <div class="codetabs">
        <div data-lang="scala" markdown="1">
-               directKafkaStream.foreachRDD { rdd => 
-                   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges]
-                   // offsetRanges.length = # of Kafka partitions being 
consumed
-                   ...
+               // Hold a reference to the current offset ranges, so it can be 
used downstream
+               var offsetRanges = Array[OffsetRange]()
+               
+               directKafkaStream.transform { rdd =>
+                 offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+                 rdd
+               }.map {
+                  ...
+               }.foreachRDD { rdd =>
+                 for (o <- offsetRanges) {
+                   println(s"${o.topic} ${o.partition} ${o.fromOffset} 
${o.untilOffset}")
+                 }
+                 ...
                }
        </div>
        <div data-lang="java" markdown="1">
-               directKafkaStream.foreachRDD(
-                   new Function<JavaPairRDD<String, String>, Void>() {
-                       @Override
-                       public Void call(JavaPairRDD<String, Integer> rdd) 
throws IOException {
-                           OffsetRange[] offsetRanges = 
((HasOffsetRanges)rdd).offsetRanges
-                               // offsetRanges.length = # of Kafka partitions 
being consumed
-                           ...
-                           return null;
-                       }
+               // Hold a reference to the current offset ranges, so it can be 
used downstream
+               final AtomicReference<OffsetRange[]> offsetRanges = new 
AtomicReference();
+               
+               directKafkaStream.transformToPair(
+                 new Function<JavaPairRDD<String, String>, JavaPairRDD<String, 
String>>() {
+                   @Override
+                   public JavaPairRDD<String, String> call(JavaPairRDD<String, 
String> rdd) throws Exception {
+                     OffsetRange[] offsets = ((HasOffsetRanges) 
rdd.rdd()).offsetRanges();
+                     offsetRanges.set(offsets);
+                     return rdd;
                    }
+                 }
+               ).map(
+                 ...
+               ).foreachRDD(
+                 new Function<JavaPairRDD<String, String>, Void>() {
+                   @Override
+                   public Void call(JavaPairRDD<String, String> rdd) throws 
IOException {
+                     for (OffsetRange o : offsetRanges.get()) {
+                       System.out.println(
+                         o.topic() + " " + o.partition() + " " + 
o.fromOffset() + " " + o.untilOffset()
+                       );
+                     }
+                     ...
+                     return null;
+                   }
+                 }
                );
        </div>
        <div data-lang="python" markdown="1">
-       Not supported
+               Not supported yet
        </div>
        </div>
 
        You can use this to update Zookeeper yourself if you want 
Zookeeper-based Kafka monitoring tools to show progress of the streaming 
application.
 
-       Another thing to note is that since this approach does not use 
Receivers, the standard receiver-related (that is, 
[configurations](configuration.html) of the form `spark.streaming.receiver.*` ) 
will not apply to the input DStreams created by this approach (will apply to 
other input DStreams though). Instead, use the 
[configurations](configuration.html) `spark.streaming.kafka.*`. An important 
one is `spark.streaming.kafka.maxRatePerPartition` which is the maximum rate at 
which each Kafka partition will be read by this direct API. 
+       Note that the typecast to HasOffsetRanges will only succeed if it is 
done in the first method called on the directKafkaStream, not later down a 
chain of methods. You can use transform() instead of foreachRDD() as your first 
method call in order to access offsets, then call further Spark methods. 
However, be aware that the one-to-one mapping between RDD partition and Kafka 
partition does not remain after any methods that shuffle or repartition, e.g. 
reduceByKey() or window().
+
+       Another thing to note is that since this approach does not use 
Receivers, the standard receiver-related (that is, 
[configurations](configuration.html) of the form `spark.streaming.receiver.*` ) 
will not apply to the input DStreams created by this approach (will apply to 
other input DStreams though). Instead, use the 
[configurations](configuration.html) `spark.streaming.kafka.*`. An important 
one is `spark.streaming.kafka.maxRatePerPartition` which is the maximum rate 
(in messages per second) at which each Kafka partition will be read by this 
direct API.
 
 3. **Deploying:** This is same as the first approach, for Scala, Java and 
Python.

http://git-wip-us.apache.org/repos/asf/spark/blob/b305e377/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
 
b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
index 3913b71..02cd24a 100644
--- 
a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
+++ 
b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.kafka;
 
 import java.io.Serializable;
 import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
 
 import scala.Tuple2;
 
@@ -68,6 +69,8 @@ public class JavaDirectKafkaStreamSuite implements 
Serializable {
   public void testKafkaStream() throws InterruptedException {
     final String topic1 = "topic1";
     final String topic2 = "topic2";
+    // hold a reference to the current offset ranges, so it can be used 
downstream
+    final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference();
 
     String[] topic1data = createTopicAndSendData(topic1);
     String[] topic2data = createTopicAndSendData(topic2);
@@ -93,7 +96,8 @@ public class JavaDirectKafkaStreamSuite implements 
Serializable {
         new Function<JavaPairRDD<String, String>, JavaPairRDD<String, 
String>>() {
           @Override
           public JavaPairRDD<String, String> call(JavaPairRDD<String, String> 
rdd) throws Exception {
-            OffsetRange[] offsets = 
((HasOffsetRanges)rdd.rdd()).offsetRanges();
+            OffsetRange[] offsets = ((HasOffsetRanges) 
rdd.rdd()).offsetRanges();
+            offsetRanges.set(offsets);
             Assert.assertEquals(offsets[0].topic(), topic1);
             return rdd;
           }
@@ -131,6 +135,11 @@ public class JavaDirectKafkaStreamSuite implements 
Serializable {
           @Override
           public Void call(JavaRDD<String> rdd) throws Exception {
             result.addAll(rdd.collect());
+            for (OffsetRange o : offsetRanges.get()) {
+              System.out.println(
+                o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + 
o.untilOffset()
+              );
+            }
             return null;
           }
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/b305e377/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
 
b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index 212eb35..8e1715f 100644
--- 
a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ 
b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -102,13 +102,21 @@ class DirectKafkaStreamSuite
     val allReceived =
       new ArrayBuffer[(String, String)] with 
mutable.SynchronizedBuffer[(String, String)]
 
-    stream.foreachRDD { rdd =>
-    // Get the offset ranges in the RDD
-      val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+    // hold a reference to the current offset ranges, so it can be used 
downstream
+    var offsetRanges = Array[OffsetRange]()
+
+    stream.transform { rdd =>
+      // Get the offset ranges in the RDD
+      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+      rdd
+    }.foreachRDD { rdd =>
+      for (o <- offsetRanges) {
+        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
+      }
       val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
       // For each partition, get size of the range in the partition,
       // and the number of items in the partition
-        val off = offsets(i)
+        val off = offsetRanges(i)
         val all = iter.toSeq
         val partSize = all.size
         val rangeSize = off.untilOffset - off.fromOffset


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to