Repository: spark
Updated Branches:
  refs/heads/master d2d438d1d -> 505b927cb


[SPARK-16312][FOLLOW-UP][STREAMING][KAFKA][DOC] Add java code snippet for Kafka 0.10 integration doc

## What changes were proposed in this pull request?

Added a Java code snippet for the Kafka 0.10 integration doc.
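
For quick reference, the core of the added snippet creates a direct stream with `KafkaUtils.createDirectStream`, `LocationStrategies.PreferConsistent()`, and a `Subscribe` consumer strategy. A condensed sketch of that pattern is below (a `JavaStreamingContext` named `streamingContext` is assumed to exist already; see the diff for the exact code added to the doc):

    import java.util.*;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.spark.streaming.api.java.*;
    import org.apache.spark.streaming.kafka010.*;

    // Consumer configuration handed straight to the underlying Kafka consumer
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "localhost:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");

    // Subscribe to the topics; PreferConsistent spreads partitions evenly across executors
    JavaInputDStream<ConsumerRecord<String, String>> stream =
      KafkaUtils.createDirectStream(
        streamingContext,  // assumed: an existing JavaStreamingContext
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(Arrays.asList("topicA", "topicB"), kafkaParams));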

## How was this patch tested?

SKIP_API=1 jekyll build

## Screenshot

![kafka-doc](https://cloud.githubusercontent.com/assets/15843379/19826272/bf0d8a4c-9db8-11e6-9e40-1396723df4bc.png)

Author: Liwei Lin <[email protected]>

Closes #15679 from lw-lin/kafka-010-examples.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/505b927c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/505b927c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/505b927c

Branch: refs/heads/master
Commit: 505b927cb7ff037adb797b9c3b9ecac3f885b7c8
Parents: d2d438d
Author: Liwei Lin <[email protected]>
Authored: Sun Oct 30 09:32:19 2016 +0000
Committer: Sean Owen <[email protected]>
Committed: Sun Oct 30 09:32:19 2016 +0000

----------------------------------------------------------------------
 docs/streaming-kafka-0-10-integration.md | 133 +++++++++++++++++++++++---
 1 file changed, 122 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/505b927c/docs/streaming-kafka-0-10-integration.md
----------------------------------------------------------------------
diff --git a/docs/streaming-kafka-0-10-integration.md b/docs/streaming-kafka-0-10-integration.md
index de95ea9..c1ef396 100644
--- a/docs/streaming-kafka-0-10-integration.md
+++ b/docs/streaming-kafka-0-10-integration.md
@@ -8,9 +8,9 @@ The Spark Streaming integration for Kafka 0.10 is similar in design to the 0.8 [
 ### Linking
 For Scala/Java applications using SBT/Maven project definitions, link your streaming application with the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information).
 
-               groupId = org.apache.spark
-               artifactId = spark-streaming-kafka-0-10_{{site.SCALA_BINARY_VERSION}}
-               version = {{site.SPARK_VERSION_SHORT}}
+       groupId = org.apache.spark
+       artifactId = spark-streaming-kafka-0-10_{{site.SCALA_BINARY_VERSION}}
+       version = {{site.SPARK_VERSION_SHORT}}
 
 ### Creating a Direct Stream
  Note that the namespace for the import includes the version, org.apache.spark.streaming.kafka010
@@ -44,6 +44,42 @@ For Scala/Java applications using SBT/Maven project definitions, link your strea
 Each item in the stream is a [ConsumerRecord](http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html)
 </div>
 <div data-lang="java" markdown="1">
+       import java.util.*;
+       import org.apache.spark.SparkConf;
+       import org.apache.spark.TaskContext;
+       import org.apache.spark.api.java.*;
+       import org.apache.spark.api.java.function.*;
+       import org.apache.spark.streaming.api.java.*;
+       import org.apache.spark.streaming.kafka010.*;
+       import org.apache.kafka.clients.consumer.ConsumerRecord;
+       import org.apache.kafka.common.TopicPartition;
+       import org.apache.kafka.common.serialization.StringDeserializer;
+       import scala.Tuple2;
+       
+       Map<String, Object> kafkaParams = new HashMap<>();
+       kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
+       kafkaParams.put("key.deserializer", StringDeserializer.class);
+       kafkaParams.put("value.deserializer", StringDeserializer.class);
+       kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
+       kafkaParams.put("auto.offset.reset", "latest");
+       kafkaParams.put("enable.auto.commit", false);
+       
+       Collection<String> topics = Arrays.asList("topicA", "topicB");
+       
+       final JavaInputDStream<ConsumerRecord<String, String>> stream =
+         KafkaUtils.createDirectStream(
+           streamingContext,
+           LocationStrategies.PreferConsistent(),
+           ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
+         );
+       
+       stream.mapToPair(
+         new PairFunction<ConsumerRecord<String, String>, String, String>() {
+           @Override
+           public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
+             return new Tuple2<>(record.key(), record.value());
+           }
+         });
 </div>
 </div>
 
@@ -85,6 +121,20 @@ If you have a use case that is better suited to batch processing, you can create
 
 </div>
 <div data-lang="java" markdown="1">
+       // Import dependencies and create kafka params as in Create Direct Stream above
+
+       OffsetRange[] offsetRanges = {
+         // topic, partition, inclusive starting offset, exclusive ending offset
+         OffsetRange.create("test", 0, 0, 100),
+         OffsetRange.create("test", 1, 0, 100)
+       };
+
+       JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
+         sparkContext,
+         kafkaParams,
+         offsetRanges,
+         LocationStrategies.PreferConsistent()
+       );
 </div>
 </div>
 
@@ -103,6 +153,20 @@ Note that you cannot use `PreferBrokers`, because without the stream there is no
        }
 </div>
 <div data-lang="java" markdown="1">
+       stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
+         @Override
+         public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
+           final OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+           rdd.foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String, String>>>() {
+             @Override
+             public void call(Iterator<ConsumerRecord<String, String>> consumerRecords) {
+               OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
+               System.out.println(
+                 o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
+             }
+           });
+         }
+       });
 </div>
 </div>
 
@@ -120,15 +184,24 @@ Kafka has an offset commit API that stores offsets in a special Kafka topic.  By
 <div class="codetabs">
 <div data-lang="scala" markdown="1">
        stream.foreachRDD { rdd =>
-         val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+         val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
 
          // some time later, after outputs have completed
-         stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
+         stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
        }
 
 As with HasOffsetRanges, the cast to CanCommitOffsets will only succeed if called on the result of createDirectStream, not after transformations.  The commitAsync call is threadsafe, but must occur after outputs if you want meaningful semantics.
 </div>
 <div data-lang="java" markdown="1">
+       stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
+         @Override
+         public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
+           OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+
+           // some time later, after outputs have completed
+           ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
+         }
+       });
 </div>
 </div>
 
@@ -141,7 +214,7 @@ For data stores that support transactions, saving offsets in the same transactio
 
       // begin from the offsets committed to the database
        val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
-         new TopicPartition(resultSet.string("topic")), resultSet.int("partition")) -> resultSet.long("offset")
+         new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
        }.toMap
 
        val stream = KafkaUtils.createDirectStream[String, String](
@@ -155,16 +228,46 @@ For data stores that support transactions, saving offsets in the same transactio
 
          val results = yourCalculation(rdd)
 
-         yourTransactionBlock {
-           // update results
+         // begin your transaction
 
-           // update offsets where the end of existing offsets matches the beginning of this batch of offsets
+         // update results
+         // update offsets where the end of existing offsets matches the beginning of this batch of offsets
+         // assert that offsets were updated correctly
 
-           // assert that offsets were updated correctly
-         }
+         // end your transaction
        }
 </div>
 <div data-lang="java" markdown="1">
+       // The details depend on your data store, but the general idea looks like this
+
+       // begin from the offsets committed to the database
+       Map<TopicPartition, Long> fromOffsets = new HashMap<>();
+       for (resultSet : selectOffsetsFromYourDatabase) {
+         fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset"));
+       }
+
+       JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
+         streamingContext,
+         LocationStrategies.PreferConsistent(),
+         ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
+       );
+
+       stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
+         @Override
+         public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
+           OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+           
+           Object results = yourCalculation(rdd);
+
+           // begin your transaction
+
+           // update results
+           // update offsets where the end of existing offsets matches the beginning of this batch of offsets
+           // assert that offsets were updated correctly
+
+           // end your transaction
+         }
+       });
 </div>
 </div>
 
@@ -185,6 +288,14 @@ The new Kafka consumer [supports SSL](http://kafka.apache.org/documentation.html
        )
 </div>
 <div data-lang="java" markdown="1">
+       Map<String, Object> kafkaParams = new HashMap<String, Object>();
+       // the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
+       kafkaParams.put("security.protocol", "SSL");
+       kafkaParams.put("ssl.truststore.location", "/some-directory/kafka.client.truststore.jks");
+       kafkaParams.put("ssl.truststore.password", "test1234");
+       kafkaParams.put("ssl.keystore.location", "/some-directory/kafka.client.keystore.jks");
+       kafkaParams.put("ssl.keystore.password", "test1234");
+       kafkaParams.put("ssl.key.password", "test1234");
 </div>
 </div>
 


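As a usage note, the stream-creation and offset-commit snippets in the diff above compose into a small driver roughly like the sketch below. This is not part of the patch; `kafkaParams` and `topics` are assumed to be built as in the added snippets, and the output step is left as a placeholder.

    import java.util.*;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.*;
    import org.apache.spark.streaming.kafka010.*;

    JavaStreamingContext streamingContext =
      new JavaStreamingContext(new SparkConf().setAppName("Kafka010Example"), Durations.seconds(5));

    final JavaInputDStream<ConsumerRecord<String, String>> stream =
      KafkaUtils.createDirectStream(
        streamingContext,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));  // topics/kafkaParams as above

    stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
      @Override
      public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
        // Grab offset ranges from the RDD returned by createDirectStream, before any transformation
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

        // ... write the batch to your output sink here ...

        // Commit only after the output has completed, as the doc recommends
        ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
      }
    });

    streamingContext.start();
    streamingContext.awaitTermination();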
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
