Repository: spark
Updated Branches:
refs/heads/master bcbe44440 -> 98ede4949
[SPARK-18198][DOC][STREAMING] Highlight code snippets
## What changes were proposed in this pull request?
This patch uses `{% highlight lang %}...{% endhighlight %}` to highlight code
snippets in the `Structured Streaming Kafka010 integration doc` and the `Spark
Streaming Kafka010 integration doc`.
This patch consists of two commits:
- the first commit fixes only the leading spaces -- this is large
- the second commit adds the highlight instructions -- this is much simpler and
easier to review
## How was this patch tested?
SKIP_API=1 jekyll build
## Screenshots
**Before**

**After**

Author: Liwei Lin <[email protected]>
Closes #15715 from lw-lin/doc-highlight-code-snippet.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/98ede494
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/98ede494
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/98ede494
Branch: refs/heads/master
Commit: 98ede49496d0d7b4724085083d4f24436b92a7bf
Parents: bcbe444
Author: Liwei Lin <[email protected]>
Authored: Wed Nov 2 09:10:34 2016 +0000
Committer: Sean Owen <[email protected]>
Committed: Wed Nov 2 09:10:34 2016 +0000
----------------------------------------------------------------------
docs/streaming-kafka-0-10-integration.md | 391 +++++++++++---------
docs/structured-streaming-kafka-integration.md | 156 ++++----
2 files changed, 287 insertions(+), 260 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/98ede494/docs/streaming-kafka-0-10-integration.md
----------------------------------------------------------------------
diff --git a/docs/streaming-kafka-0-10-integration.md b/docs/streaming-kafka-0-10-integration.md
index c1ef396..b645d3c 100644
--- a/docs/streaming-kafka-0-10-integration.md
+++ b/docs/streaming-kafka-0-10-integration.md
@@ -17,69 +17,72 @@ For Scala/Java applications using SBT/Maven project
definitions, link your strea
<div class="codetabs">
<div data-lang="scala" markdown="1">
- import org.apache.kafka.clients.consumer.ConsumerRecord
- import org.apache.kafka.common.serialization.StringDeserializer
- import org.apache.spark.streaming.kafka010._
- import
org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
- import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
-
- val kafkaParams = Map[String, Object](
- "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
- "key.deserializer" -> classOf[StringDeserializer],
- "value.deserializer" -> classOf[StringDeserializer],
- "group.id" -> "use_a_separate_group_id_for_each_stream",
- "auto.offset.reset" -> "latest",
- "enable.auto.commit" -> (false: java.lang.Boolean)
- )
-
- val topics = Array("topicA", "topicB")
- val stream = KafkaUtils.createDirectStream[String, String](
- streamingContext,
- PreferConsistent,
- Subscribe[String, String](topics, kafkaParams)
- )
-
- stream.map(record => (record.key, record.value))
-
+{% highlight scala %}
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.spark.streaming.kafka010._
+import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
+import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
+
+val kafkaParams = Map[String, Object](
+ "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
+ "key.deserializer" -> classOf[StringDeserializer],
+ "value.deserializer" -> classOf[StringDeserializer],
+ "group.id" -> "use_a_separate_group_id_for_each_stream",
+ "auto.offset.reset" -> "latest",
+ "enable.auto.commit" -> (false: java.lang.Boolean)
+)
+
+val topics = Array("topicA", "topicB")
+val stream = KafkaUtils.createDirectStream[String, String](
+ streamingContext,
+ PreferConsistent,
+ Subscribe[String, String](topics, kafkaParams)
+)
+
+stream.map(record => (record.key, record.value))
+{% endhighlight %}
Each item in the stream is a
[ConsumerRecord](http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html)
</div>
<div data-lang="java" markdown="1">
- import java.util.*;
- import org.apache.spark.SparkConf;
- import org.apache.spark.TaskContext;
- import org.apache.spark.api.java.*;
- import org.apache.spark.api.java.function.*;
- import org.apache.spark.streaming.api.java.*;
- import org.apache.spark.streaming.kafka010.*;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.common.TopicPartition;
- import org.apache.kafka.common.serialization.StringDeserializer;
- import scala.Tuple2;
-
- Map<String, Object> kafkaParams = new HashMap<>();
- kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
- kafkaParams.put("key.deserializer", StringDeserializer.class);
- kafkaParams.put("value.deserializer", StringDeserializer.class);
- kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
- kafkaParams.put("auto.offset.reset", "latest");
- kafkaParams.put("enable.auto.commit", false);
-
- Collection<String> topics = Arrays.asList("topicA", "topicB");
-
- final JavaInputDStream<ConsumerRecord<String, String>> stream =
- KafkaUtils.createDirectStream(
- streamingContext,
- LocationStrategies.PreferConsistent(),
- ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
- );
-
- stream.mapToPair(
- new PairFunction<ConsumerRecord<String, String>, String, String>() {
- @Override
- public Tuple2<String, String> call(ConsumerRecord<String, String>
record) {
- return new Tuple2<>(record.key(), record.value());
- }
- })
+{% highlight java %}
+import java.util.*;
+import org.apache.spark.SparkConf;
+import org.apache.spark.TaskContext;
+import org.apache.spark.api.java.*;
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.streaming.api.java.*;
+import org.apache.spark.streaming.kafka010.*;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import scala.Tuple2;
+
+Map<String, Object> kafkaParams = new HashMap<>();
+kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
+kafkaParams.put("key.deserializer", StringDeserializer.class);
+kafkaParams.put("value.deserializer", StringDeserializer.class);
+kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
+kafkaParams.put("auto.offset.reset", "latest");
+kafkaParams.put("enable.auto.commit", false);
+
+Collection<String> topics = Arrays.asList("topicA", "topicB");
+
+final JavaInputDStream<ConsumerRecord<String, String>> stream =
+ KafkaUtils.createDirectStream(
+ streamingContext,
+ LocationStrategies.PreferConsistent(),
+ ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
+ );
+
+stream.mapToPair(
+ new PairFunction<ConsumerRecord<String, String>, String, String>() {
+ @Override
+ public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
+ return new Tuple2<>(record.key(), record.value());
+ }
+ })
+{% endhighlight %}
</div>
</div>
@@ -109,32 +112,35 @@ If you have a use case that is better suited to batch
processing, you can create
<div class="codetabs">
<div data-lang="scala" markdown="1">
- // Import dependencies and create kafka params as in Create Direct
Stream above
-
- val offsetRanges = Array(
- // topic, partition, inclusive starting offset, exclusive ending
offset
- OffsetRange("test", 0, 0, 100),
- OffsetRange("test", 1, 0, 100)
- )
+{% highlight scala %}
+// Import dependencies and create kafka params as in Create Direct Stream above
- val rdd = KafkaUtils.createRDD[String, String](sparkContext,
kafkaParams, offsetRanges, PreferConsistent)
+val offsetRanges = Array(
+ // topic, partition, inclusive starting offset, exclusive ending offset
+ OffsetRange("test", 0, 0, 100),
+ OffsetRange("test", 1, 0, 100)
+)
+val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)
+{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
- // Import dependencies and create kafka params as in Create Direct
Stream above
-
- OffsetRange[] offsetRanges = {
- // topic, partition, inclusive starting offset, exclusive ending
offset
- OffsetRange.create("test", 0, 0, 100),
- OffsetRange.create("test", 1, 0, 100)
- };
-
- JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
- sparkContext,
- kafkaParams,
- offsetRanges,
- LocationStrategies.PreferConsistent()
- );
+{% highlight java %}
+// Import dependencies and create kafka params as in Create Direct Stream above
+
+OffsetRange[] offsetRanges = {
+ // topic, partition, inclusive starting offset, exclusive ending offset
+ OffsetRange.create("test", 0, 0, 100),
+ OffsetRange.create("test", 1, 0, 100)
+};
+
+JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
+ sparkContext,
+ kafkaParams,
+ offsetRanges,
+ LocationStrategies.PreferConsistent()
+);
+{% endhighlight %}
</div>
</div>
@@ -144,29 +150,33 @@ Note that you cannot use `PreferBrokers`, because without
the stream there is no
<div class="codetabs">
<div data-lang="scala" markdown="1">
- stream.foreachRDD { rdd =>
- val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
- rdd.foreachPartition { iter =>
- val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
- println(s"${o.topic} ${o.partition} ${o.fromOffset}
${o.untilOffset}")
- }
- }
+{% highlight scala %}
+stream.foreachRDD { rdd =>
+ val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+ rdd.foreachPartition { iter =>
+ val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
+ println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
+ }
+}
+{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
- stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String,
String>>>() {
- @Override
- public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
- final OffsetRange[] offsetRanges = ((HasOffsetRanges)
rdd.rdd()).offsetRanges();
- rdd.foreachPartition(new
VoidFunction<Iterator<ConsumerRecord<String, String>>>() {
- @Override
- public void call(Iterator<ConsumerRecord<String, String>>
consumerRecords) {
- OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
- System.out.println(
- o.topic() + " " + o.partition() + " " + o.fromOffset() + " "
+ o.untilOffset());
- }
- });
- }
- });
+{% highlight java %}
+stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
+ @Override
+ public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
+ final OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+ rdd.foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String, String>>>() {
+ @Override
+ public void call(Iterator<ConsumerRecord<String, String>> consumerRecords) {
+ OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
+ System.out.println(
+ o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
+ }
+ });
+ }
+});
+{% endhighlight %}
</div>
</div>
@@ -183,25 +193,28 @@ Kafka has an offset commit API that stores offsets in a
special Kafka topic. By
<div class="codetabs">
<div data-lang="scala" markdown="1">
- stream.foreachRDD { rdd =>
- val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
-
- // some time later, after outputs have completed
- stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
- }
-
+{% highlight scala %}
+stream.foreachRDD { rdd =>
+ val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+
+ // some time later, after outputs have completed
+ stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
+}
+{% endhighlight %}
As with HasOffsetRanges, the cast to CanCommitOffsets will only succeed if
called on the result of createDirectStream, not after transformations. The
commitAsync call is threadsafe, but must occur after outputs if you want
meaningful semantics.
</div>
<div data-lang="java" markdown="1">
- stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String,
String>>>() {
- @Override
- public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
- OffsetRange[] offsetRanges = ((HasOffsetRanges)
rdd.rdd()).offsetRanges();
-
- // some time later, after outputs have completed
- ((CanCommitOffsets)
stream.inputDStream()).commitAsync(offsetRanges);
- }
- });
+{% highlight java %}
+stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
+ @Override
+ public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
+ OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+
+ // some time later, after outputs have completed
+ ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
+ }
+});
+{% endhighlight %}
</div>
</div>
@@ -210,64 +223,68 @@ For data stores that support transactions, saving offsets
in the same transactio
<div class="codetabs">
<div data-lang="scala" markdown="1">
- // The details depend on your data store, but the general idea looks
like this
+{% highlight scala %}
+// The details depend on your data store, but the general idea looks like this
- // begin from the the offsets committed to the database
- val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
- new TopicPartition(resultSet.string("topic"),
resultSet.int("partition")) -> resultSet.long("offset")
- }.toMap
+// begin from the offsets committed to the database
+val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
+ new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
+}.toMap
- val stream = KafkaUtils.createDirectStream[String, String](
- streamingContext,
- PreferConsistent,
- Assign[String, String](fromOffsets.keys.toList, kafkaParams,
fromOffsets)
- )
+val stream = KafkaUtils.createDirectStream[String, String](
+ streamingContext,
+ PreferConsistent,
+ Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
+)
- stream.foreachRDD { rdd =>
- val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+stream.foreachRDD { rdd =>
+ val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
- val results = yourCalculation(rdd)
+ val results = yourCalculation(rdd)
- // begin your transaction
+ // begin your transaction
- // update results
- // update offsets where the end of existing offsets matches the
beginning of this batch of offsets
- // assert that offsets were updated correctly
+ // update results
+ // update offsets where the end of existing offsets matches the beginning of this batch of offsets
+ // assert that offsets were updated correctly
- // end your transaction
- }
+ // end your transaction
+}
+{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
- // The details depend on your data store, but the general idea looks
like this
-
- // begin from the the offsets committed to the database
- Map<TopicPartition, Long> fromOffsets = new HashMap<>();
- for (resultSet : selectOffsetsFromYourDatabase)
- fromOffsets.put(new TopicPartition(resultSet.string("topic"),
resultSet.int("partition")), resultSet.long("offset"));
- }
-
- JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
- streamingContext,
- LocationStrategies.PreferConsistent(),
- ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(),
kafkaParams, fromOffsets)
- );
-
- stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String,
String>>>() {
- @Override
- public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
- OffsetRange[] offsetRanges = ((HasOffsetRanges)
rdd.rdd()).offsetRanges();
-
- Object results = yourCalculation(rdd);
-
- // begin your transaction
-
- // update results
- // update offsets where the end of existing offsets matches the
beginning of this batch of offsets
- // assert that offsets were updated correctly
-
- // end your transaction
- }
- });
+{% highlight java %}
+// The details depend on your data store, but the general idea looks like this
+
+// begin from the offsets committed to the database
+Map<TopicPartition, Long> fromOffsets = new HashMap<>();
+for (resultSet : selectOffsetsFromYourDatabase)
+ fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset"));
+}
+
+JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
+ streamingContext,
+ LocationStrategies.PreferConsistent(),
+ ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
+);
+
+stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
+ @Override
+ public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
+ OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+
+ Object results = yourCalculation(rdd);
+
+ // begin your transaction
+
+ // update results
+ // update offsets where the end of existing offsets matches the beginning of this batch of offsets
+ // assert that offsets were updated correctly
+
+ // end your transaction
+ }
+});
+{% endhighlight %}
</div>
</div>
@@ -277,25 +294,29 @@ The new Kafka consumer [supports
SSL](http://kafka.apache.org/documentation.html
<div class="codetabs">
<div data-lang="scala" markdown="1">
- val kafkaParams = Map[String, Object](
- // the usual params, make sure to change the port in
bootstrap.servers if 9092 is not TLS
- "security.protocol" -> "SSL",
- "ssl.truststore.location" ->
"/some-directory/kafka.client.truststore.jks",
- "ssl.truststore.password" -> "test1234",
- "ssl.keystore.location" ->
"/some-directory/kafka.client.keystore.jks",
- "ssl.keystore.password" -> "test1234",
- "ssl.key.password" -> "test1234"
- )
+{% highlight scala %}
+val kafkaParams = Map[String, Object](
+ // the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
+ "security.protocol" -> "SSL",
+ "ssl.truststore.location" -> "/some-directory/kafka.client.truststore.jks",
+ "ssl.truststore.password" -> "test1234",
+ "ssl.keystore.location" -> "/some-directory/kafka.client.keystore.jks",
+ "ssl.keystore.password" -> "test1234",
+ "ssl.key.password" -> "test1234"
+)
+{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
- Map<String, Object> kafkaParams = new HashMap<String, Object>();
- // the usual params, make sure to change the port in bootstrap.servers
if 9092 is not TLS
- kafkaParams.put("security.protocol", "SSL");
- kafkaParams.put("ssl.truststore.location",
"/some-directory/kafka.client.truststore.jks");
- kafkaParams.put("ssl.truststore.password", "test1234");
- kafkaParams.put("ssl.keystore.location",
"/some-directory/kafka.client.keystore.jks");
- kafkaParams.put("ssl.keystore.password", "test1234");
- kafkaParams.put("ssl.key.password", "test1234");
+{% highlight java %}
+Map<String, Object> kafkaParams = new HashMap<String, Object>();
+// the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
+kafkaParams.put("security.protocol", "SSL");
+kafkaParams.put("ssl.truststore.location", "/some-directory/kafka.client.truststore.jks");
+kafkaParams.put("ssl.truststore.password", "test1234");
+kafkaParams.put("ssl.keystore.location", "/some-directory/kafka.client.keystore.jks");
+kafkaParams.put("ssl.keystore.password", "test1234");
+kafkaParams.put("ssl.key.password", "test1234");
+{% endhighlight %}
</div>
</div>
http://git-wip-us.apache.org/repos/asf/spark/blob/98ede494/docs/structured-streaming-kafka-integration.md
----------------------------------------------------------------------
diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index a6c3b3a..c4c9fb3 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -19,97 +19,103 @@ application. See the [Deploying](#deploying) subsection
below.
<div class="codetabs">
<div data-lang="scala" markdown="1">
+{% highlight scala %}
- // Subscribe to 1 topic
- val ds1 = spark
- .readStream
- .format("kafka")
- .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
- .option("subscribe", "topic1")
- .load()
- ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
- .as[(String, String)]
+// Subscribe to 1 topic
+val ds1 = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .option("subscribe", "topic1")
+ .load()
+ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
- // Subscribe to multiple topics
- val ds2 = spark
- .readStream
- .format("kafka")
- .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
- .option("subscribe", "topic1,topic2")
- .load()
- ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
- .as[(String, String)]
+// Subscribe to multiple topics
+val ds2 = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .option("subscribe", "topic1,topic2")
+ .load()
+ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
- // Subscribe to a pattern
- val ds3 = spark
- .readStream
- .format("kafka")
- .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
- .option("subscribePattern", "topic.*")
- .load()
- ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
- .as[(String, String)]
+// Subscribe to a pattern
+val ds3 = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .option("subscribePattern", "topic.*")
+ .load()
+ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
+{% highlight java %}
- // Subscribe to 1 topic
- Dataset<Row> ds1 = spark
- .readStream()
- .format("kafka")
- .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
- .option("subscribe", "topic1")
- .load()
- ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+// Subscribe to 1 topic
+Dataset<Row> ds1 = spark
+ .readStream()
+ .format("kafka")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .option("subscribe", "topic1")
+ .load()
+ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
- // Subscribe to multiple topics
- Dataset<Row> ds2 = spark
- .readStream()
- .format("kafka")
- .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
- .option("subscribe", "topic1,topic2")
- .load()
- ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+// Subscribe to multiple topics
+Dataset<Row> ds2 = spark
+ .readStream()
+ .format("kafka")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .option("subscribe", "topic1,topic2")
+ .load()
+ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
- // Subscribe to a pattern
- Dataset<Row> ds3 = spark
- .readStream()
- .format("kafka")
- .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
- .option("subscribePattern", "topic.*")
- .load()
- ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+// Subscribe to a pattern
+Dataset<Row> ds3 = spark
+ .readStream()
+ .format("kafka")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .option("subscribePattern", "topic.*")
+ .load()
+ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
+{% highlight python %}
- # Subscribe to 1 topic
- ds1 = spark
- .readStream()
- .format("kafka")
- .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
- .option("subscribe", "topic1")
- .load()
- ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+# Subscribe to 1 topic
+ds1 = spark
+ .readStream()
+ .format("kafka")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .option("subscribe", "topic1")
+ .load()
+ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
- # Subscribe to multiple topics
- ds2 = spark
- .readStream
- .format("kafka")
- .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
- .option("subscribe", "topic1,topic2")
- .load()
- ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+# Subscribe to multiple topics
+ds2 = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .option("subscribe", "topic1,topic2")
+ .load()
+ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
- # Subscribe to a pattern
- ds3 = spark
- .readStream()
- .format("kafka")
- .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
- .option("subscribePattern", "topic.*")
- .load()
- ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+# Subscribe to a pattern
+ds3 = spark
+ .readStream()
+ .format("kafka")
+ .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+ .option("subscribePattern", "topic.*")
+ .load()
+ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+{% endhighlight %}
</div>
</div>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]