This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 1f68dc73f69f [SPARK-46775][DOCS] Fix formatting of Kinesis docs
1f68dc73f69f is described below
commit 1f68dc73f69fa577619d9fdf99482c108e69dbe1
Author: Nicholas Chammas <[email protected]>
AuthorDate: Mon Jan 22 09:50:03 2024 +0900
[SPARK-46775][DOCS] Fix formatting of Kinesis docs
### What changes were proposed in this pull request?
- Convert the mixed indentation styles (tabs and spaces) to spaces only.
- Add syntax highlighting to the code blocks.
- Fix a couple of broken links to API docs.
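
As an illustration of the first item, here is a minimal sketch of how tab-indented lines could be normalized to spaces. The commit message does not say how the conversion was actually done, so this is not the method used in this commit. The function name `normalize_indentation` and the tab width of 4 are assumptions made for this example.

```python
def normalize_indentation(text: str, tab_width: int = 4) -> str:
    """Replace tabs in each line's leading whitespace with spaces.

    Tabs that appear after the first non-whitespace character are left alone,
    so only indentation is touched. The tab width is an assumed default.
    """
    fixed_lines = []
    for line in text.splitlines(keepends=True):
        # Split the line into its leading run of spaces/tabs and the rest.
        body = line.lstrip(" \t")
        indent = line[: len(line) - len(body)]
        # expandtabs() advances each tab to the next multiple of tab_width.
        fixed_lines.append(indent.expandtabs(tab_width) + body)
    return "".join(fixed_lines)


if __name__ == "__main__":
    # Hypothetical sample with mixed tab and space indentation.
    sample = "\t<div class=\"codetabs\">\n  \t<div data-lang=\"python\">\n"
    print(repr(normalize_indentation(sample)))
```

A helper like this only rewrites leading whitespace, so a diff produced after running it should still be reviewed by hand, as the commit author did.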
### Why are the changes needed?
This makes the docs a bit easier to read and edit.
### Does this PR introduce _any_ user-facing change?
Yes, it changes the formatting of this documentation.
### How was this patch tested?
I built the docs and manually reviewed them.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44802 from nchammas/kinesis-docs.
Authored-by: Nicholas Chammas <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
docs/streaming-kinesis-integration.md | 399 ++++++++++++++++++----------------
1 file changed, 208 insertions(+), 191 deletions(-)
diff --git a/docs/streaming-kinesis-integration.md b/docs/streaming-kinesis-integration.md
index ed19ddcc9b08..0396d3cc64d1 100644
--- a/docs/streaming-kinesis-integration.md
+++ b/docs/streaming-kinesis-integration.md
@@ -32,201 +32,216 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
1. **Linking:** For Scala/Java applications using SBT/Maven project
definitions, link your streaming application against the following artifact
(see [Linking section](streaming-programming-guide.html#linking) in the main
programming guide for further information).
- groupId = org.apache.spark
- artifactId =
spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}
- version = {{site.SPARK_VERSION_SHORT}}
+ groupId = org.apache.spark
+ artifactId = spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}
+ version = {{site.SPARK_VERSION_SHORT}}
- For Python applications, you will have to add this above library and
its dependencies when deploying your application. See the *Deploying*
subsection below.
- **Note that by linking to this library, you will include
[ASL](https://aws.amazon.com/asl/)-licensed code in your application.**
+ For Python applications, you will have to add this above library and its
dependencies when deploying your application. See the *Deploying* subsection
below.
+ **Note that by linking to this library, you will include
[ASL](https://aws.amazon.com/asl/)-licensed code in your application.**
2. **Programming:** In the streaming application code, import
`KinesisInputDStream` and create the input DStream of byte array as follows:
- <div class="codetabs">
+ <div class="codetabs">
<div data-lang="python" markdown="1">
- from pyspark.streaming.kinesis import KinesisUtils,
InitialPositionInStream
-
- kinesisStream = KinesisUtils.createStream(
- streamingContext, [Kinesis app name], [Kinesis stream name],
[endpoint URL],
- [region name], [initial position], [checkpoint interval],
[metricsLevel.DETAILED], StorageLevel.MEMORY_AND_DISK_2)
-
- See the [API docs](api/python/reference/pyspark.streaming.html#kinesis)
- and the
[example]({{site.SPARK_GITHUB_URL}}/tree/master/connector/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py).
Refer to the [Running the Example](#running-the-example) subsection for
instructions to run the example.
-
- - CloudWatch metrics level and dimensions. See [the AWS documentation
about monitoring
KCL](https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html)
for details. Default is MetricsLevel.DETAILED
-
- </div>
-
- <div data-lang="scala" markdown="1">
- import org.apache.spark.storage.StorageLevel
- import org.apache.spark.streaming.kinesis.KinesisInputDStream
- import org.apache.spark.streaming.{Seconds, StreamingContext}
- import org.apache.spark.streaming.kinesis.KinesisInitialPositions
-
- val kinesisStream = KinesisInputDStream.builder
- .streamingContext(streamingContext)
- .endpointUrl([endpoint URL])
- .regionName([region name])
- .streamName([streamName])
- .initialPosition([initial position])
- .checkpointAppName([Kinesis app name])
- .checkpointInterval([checkpoint interval])
- .metricsLevel([metricsLevel.DETAILED])
- .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
- .build()
-
- See the [API
docs](api/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.html)
- and the
[example]({{site.SPARK_GITHUB_URL}}/tree/master/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala).
Refer to the [Running the Example](#running-the-example) subsection for
instructions on how to run the example.
-
- </div>
-
- <div data-lang="java" markdown="1">
- import org.apache.spark.storage.StorageLevel;
- import org.apache.spark.streaming.kinesis.KinesisInputDStream;
- import org.apache.spark.streaming.Seconds;
- import org.apache.spark.streaming.StreamingContext;
- import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
-
- KinesisInputDStream<byte[]> kinesisStream =
KinesisInputDStream.builder()
- .streamingContext(streamingContext)
- .endpointUrl([endpoint URL])
- .regionName([region name])
- .streamName([streamName])
- .initialPosition([initial position])
- .checkpointAppName([Kinesis app name])
- .checkpointInterval([checkpoint interval])
- .metricsLevel([metricsLevel.DETAILED])
- .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
- .build();
-
- See the [API
docs](api/java/index.html?org/apache/spark/streaming/kinesis/KinesisInputDStream.html)
- and the
[example]({{site.SPARK_GITHUB_URL}}/tree/master/connector/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java).
Refer to the [Running the Example](#running-the-example) subsection for
instructions to run the example.
-
- </div>
-
- </div>
-
- You may also provide the following settings. This is currently only
supported in Scala and Java.
-
- - A "message handler function" that takes a Kinesis `Record` and
returns a generic object `T`, in case you would like to use other data included
in a `Record` such as partition key.
-
- <div class="codetabs">
- <div data-lang="scala" markdown="1">
- import collection.JavaConverters._
- import org.apache.spark.storage.StorageLevel
- import org.apache.spark.streaming.kinesis.KinesisInputDStream
- import org.apache.spark.streaming.{Seconds, StreamingContext}
- import
org.apache.spark.streaming.kinesis.KinesisInitialPositions
- import
com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
- import
com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
-
- val kinesisStream = KinesisInputDStream.builder
- .streamingContext(streamingContext)
- .endpointUrl([endpoint URL])
- .regionName([region name])
- .streamName([streamName])
- .initialPosition([initial position])
- .checkpointAppName([Kinesis app name])
- .checkpointInterval([checkpoint interval])
- .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
- .metricsLevel(MetricsLevel.DETAILED)
-
.metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
- .buildWithMessageHandler([message handler])
-
- </div>
- <div data-lang="java" markdown="1">
- import org.apache.spark.storage.StorageLevel;
- import org.apache.spark.streaming.kinesis.KinesisInputDStream;
- import org.apache.spark.streaming.Seconds;
- import org.apache.spark.streaming.StreamingContext;
- import
org.apache.spark.streaming.kinesis.KinesisInitialPositions;
- import
com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
- import
com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
- import scala.collection.JavaConverters;
-
- KinesisInputDStream<byte[]> kinesisStream =
KinesisInputDStream.builder()
- .streamingContext(streamingContext)
- .endpointUrl([endpoint URL])
- .regionName([region name])
- .streamName([streamName])
- .initialPosition([initial position])
- .checkpointAppName([Kinesis app name])
- .checkpointInterval([checkpoint interval])
- .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
- .metricsLevel(MetricsLevel.DETAILED)
-
.metricsEnabledDimensions(JavaConverters.asScalaSetConverter(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS).asScala().toSet())
- .buildWithMessageHandler([message handler]);
-
- </div>
- </div>
-
- - `streamingContext`: StreamingContext containing an application name
used by Kinesis to tie this Kinesis application to the Kinesis stream
-
- - `[Kinesis app name]`: The application name that will be used to
checkpoint the Kinesis
- sequence numbers in DynamoDB table.
- - The application name must be unique for a given account and
region.
- - If the table exists but has incorrect checkpoint information
(for a different stream, or
- old expired sequenced numbers), then there may be
temporary errors.
-
- - `[Kinesis stream name]`: The Kinesis stream that this streaming
application will pull data from.
-
- - `[endpoint URL]`: Valid Kinesis endpoints URL can be found
[here](http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region).
-
- - `[region name]`: Valid Kinesis region names can be found
[here](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html).
-
- - `[checkpoint interval]`: The interval (e.g., Duration(2000) = 2
seconds) at which the Kinesis Client Library saves its position in the stream.
For starters, set it to the same as the batch interval of the streaming
application.
-
- - `[initial position]`: Can be either
`KinesisInitialPositions.TrimHorizon` or `KinesisInitialPositions.Latest` or
`KinesisInitialPositions.AtTimestamp` (see [`Kinesis
Checkpointing`](#kinesis-checkpointing) section and [`Amazon Kinesis API
documentation`](http://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-sdk.html)
for more details).
-
- - `[message handler]`: A function that takes a Kinesis `Record` and
outputs generic `T`.
-
- In other versions of the API, you can also specify the AWS access key
and secret key directly.
+ ```python
+ from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
+
+ kinesisStream = KinesisUtils.createStream(
+ streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
+ [region name], [initial position], [checkpoint interval], [metricsLevel.DETAILED],
+ StorageLevel.MEMORY_AND_DISK_2)
+ ```
+
+ See the [API docs](api/python/reference/pyspark.streaming.html#kinesis)
+ and the
[example]({{site.SPARK_GITHUB_URL}}/tree/master/connector/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py).
Refer to the [Running the Example](#running-the-example) subsection for
instructions to run the example.
+
+ - CloudWatch metrics level and dimensions. See [the AWS documentation
about monitoring
KCL](https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html)
for details. Default is `MetricsLevel.DETAILED`.
+
+ </div>
+
+ <div data-lang="scala" markdown="1">
+ ```scala
+ import org.apache.spark.storage.StorageLevel
+ import org.apache.spark.streaming.kinesis.KinesisInputDStream
+ import org.apache.spark.streaming.{Seconds, StreamingContext}
+ import org.apache.spark.streaming.kinesis.KinesisInitialPositions
+
+ val kinesisStream = KinesisInputDStream.builder
+ .streamingContext(streamingContext)
+ .endpointUrl([endpoint URL])
+ .regionName([region name])
+ .streamName([streamName])
+ .initialPosition([initial position])
+ .checkpointAppName([Kinesis app name])
+ .checkpointInterval([checkpoint interval])
+ .metricsLevel([metricsLevel.DETAILED])
+ .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
+ .build()
+ ```
+
+ See the [API
docs](api/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream$.html)
+ and the
[example]({{site.SPARK_GITHUB_URL}}/tree/master/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala).
Refer to the [Running the Example](#running-the-example) subsection for
instructions on how to run the example.
+
+ </div>
+
+ <div data-lang="java" markdown="1">
+ ```java
+ import org.apache.spark.storage.StorageLevel;
+ import org.apache.spark.streaming.kinesis.KinesisInputDStream;
+ import org.apache.spark.streaming.Seconds;
+ import org.apache.spark.streaming.StreamingContext;
+ import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
+
+ KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder()
+ .streamingContext(streamingContext)
+ .endpointUrl([endpoint URL])
+ .regionName([region name])
+ .streamName([streamName])
+ .initialPosition([initial position])
+ .checkpointAppName([Kinesis app name])
+ .checkpointInterval([checkpoint interval])
+ .metricsLevel([metricsLevel.DETAILED])
+ .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
+ .build();
+ ```
+
+ See the [API
docs](api/java/org/apache/spark/streaming/kinesis/package-summary.html)
+ and the
[example]({{site.SPARK_GITHUB_URL}}/tree/master/connector/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java).
Refer to the [Running the Example](#running-the-example) subsection for
instructions to run the example.
+
+ </div>
+
+ </div>
+
+ You may also provide the following settings. This is currently only
supported in Scala and Java.
+
+ - A "message handler function" that takes a Kinesis `Record` and returns a
generic object `T`, in case you would like to use other data included in a
`Record` such as partition key.
+
+ <div class="codetabs">
+ <div data-lang="scala" markdown="1">
+ ```scala
+ import collection.JavaConverters._
+ import org.apache.spark.storage.StorageLevel
+ import org.apache.spark.streaming.kinesis.KinesisInputDStream
+ import org.apache.spark.streaming.{Seconds, StreamingContext}
+ import org.apache.spark.streaming.kinesis.KinesisInitialPositions
+ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
+ import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
+
+ val kinesisStream = KinesisInputDStream.builder
+ .streamingContext(streamingContext)
+ .endpointUrl([endpoint URL])
+ .regionName([region name])
+ .streamName([streamName])
+ .initialPosition([initial position])
+ .checkpointAppName([Kinesis app name])
+ .checkpointInterval([checkpoint interval])
+ .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
+ .metricsLevel(MetricsLevel.DETAILED)
+ .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
+ .buildWithMessageHandler([message handler])
+ ```
+
+ </div>
+ <div data-lang="java" markdown="1">
+ ```java
+ import org.apache.spark.storage.StorageLevel;
+ import org.apache.spark.streaming.kinesis.KinesisInputDStream;
+ import org.apache.spark.streaming.Seconds;
+ import org.apache.spark.streaming.StreamingContext;
+ import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
+ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
+ import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
+ import scala.collection.JavaConverters;
+
+ KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder()
+ .streamingContext(streamingContext)
+ .endpointUrl([endpoint URL])
+ .regionName([region name])
+ .streamName([streamName])
+ .initialPosition([initial position])
+ .checkpointAppName([Kinesis app name])
+ .checkpointInterval([checkpoint interval])
+ .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
+ .metricsLevel(MetricsLevel.DETAILED)
+ .metricsEnabledDimensions(
+ JavaConverters.asScalaSetConverter(
+ KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS
+ )
+ .asScala().toSet()
+ )
+ .buildWithMessageHandler([message handler]);
+ ```
+
+ </div>
+ </div>
+
+ - `streamingContext`: StreamingContext containing an application name used
by Kinesis to tie this Kinesis application to the Kinesis stream
+
+ - `[Kinesis app name]`: The application name that will be used to
checkpoint the Kinesis
+ sequence numbers in DynamoDB table.
+ - The application name must be unique for a given account and region.
+ - If the table exists but has incorrect checkpoint information (for a
different stream, or
+ old expired sequenced numbers), then there may be temporary errors.
+
+ - `[Kinesis stream name]`: The Kinesis stream that this streaming
application will pull data from.
+
+ - `[endpoint URL]`: Valid Kinesis endpoints URL can be found
[here](http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region).
+
+ - `[region name]`: Valid Kinesis region names can be found
[here](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html).
+
+ - `[checkpoint interval]`: The interval (e.g., Duration(2000) = 2 seconds)
at which the Kinesis Client Library saves its position in the stream. For
starters, set it to the same as the batch interval of the streaming application.
+
+ - `[initial position]`: Can be either
`KinesisInitialPositions.TrimHorizon` or `KinesisInitialPositions.Latest` or
`KinesisInitialPositions.AtTimestamp` (see [`Kinesis
Checkpointing`](#kinesis-checkpointing) section and [`Amazon Kinesis API
documentation`](http://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-sdk.html)
for more details).
+
+ - `[message handler]`: A function that takes a Kinesis `Record` and
outputs generic `T`.
+
+ In other versions of the API, you can also specify the AWS access key and
secret key directly.
3. **Deploying:** As with any Spark applications, `spark-submit` is used to
launch your application. However, the details are slightly different for
Scala/Java applications and Python applications.
- For Scala and Java applications, if you are using SBT or Maven for
project management, then package
`spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}` and its
dependencies into the application JAR. Make sure
`spark-core_{{site.SCALA_BINARY_VERSION}}` and
`spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided`
dependencies as those are already present in a Spark installation. Then use
`spark-submit` to launch your application (see [Deploying section](streaming-p
[...]
+ For Scala and Java applications, if you are using SBT or Maven for project
management, then package
`spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}` and its
dependencies into the application JAR. Make sure
`spark-core_{{site.SCALA_BINARY_VERSION}}` and
`spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided`
dependencies as those are already present in a Spark installation. Then use
`spark-submit` to launch your application (see [Deploying section](streamin
[...]
- For Python applications which lack SBT/Maven project management,
`spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}` and its
dependencies can be directly added to `spark-submit` using `--packages` (see
[Application Submission Guide](submitting-applications.html)). That is,
+ For Python applications which lack SBT/Maven project management,
`spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}` and its
dependencies can be directly added to `spark-submit` using `--packages` (see
[Application Submission Guide](submitting-applications.html)). That is,
- ./bin/spark-submit --packages
org.apache.spark:spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}}
...
+ ./bin/spark-submit --packages
org.apache.spark:spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}}
...
- Alternatively, you can also download the JAR of the Maven artifact
`spark-streaming-kinesis-asl-assembly` from the
- [Maven
repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kinesis-asl-assembly_{{site.SCALA_BINARY_VERSION}}%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
and add it to `spark-submit` with `--jars`.
+ Alternatively, you can also download the JAR of the Maven artifact
`spark-streaming-kinesis-asl-assembly` from the
+ [Maven
repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kinesis-asl-assembly_{{site.SCALA_BINARY_VERSION}}%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
and add it to `spark-submit` with `--jars`.
- <p style="text-align: center;">
- <img src="img/streaming-kinesis-arch.png"
- title="Spark Streaming Kinesis Architecture"
- alt="Spark Streaming Kinesis Architecture"
- width="60%"
+ <p style="text-align: center;">
+ <img src="img/streaming-kinesis-arch.png"
+ title="Spark Streaming Kinesis Architecture"
+ alt="Spark Streaming Kinesis Architecture"
+ width="60%"
/>
- <!-- Images are downsized intentionally to improve quality on
retina displays -->
- </p>
+ </p>
- *Points to remember at runtime:*
+ *Points to remember at runtime:*
- - Kinesis data processing is ordered per partition and occurs at-least
once per message.
+ - Kinesis data processing is ordered per partition and occurs at-least
once per message.
- - Multiple applications can read from the same Kinesis stream. Kinesis
will maintain the application-specific shard and checkpoint info in DynamoDB.
+ - Multiple applications can read from the same Kinesis stream. Kinesis
will maintain the application-specific shard and checkpoint info in DynamoDB.
- - A single Kinesis stream shard is processed by one input DStream at a
time.
+ - A single Kinesis stream shard is processed by one input DStream at a
time.
- - A single Kinesis input DStream can read from multiple shards of a
Kinesis stream by creating multiple KinesisRecordProcessor threads.
+ - A single Kinesis input DStream can read from multiple shards of a
Kinesis stream by creating multiple KinesisRecordProcessor threads.
- - Multiple input DStreams running in separate processes/instances can
read from a Kinesis stream.
+ - Multiple input DStreams running in separate processes/instances can read
from a Kinesis stream.
- - You never need more Kinesis input DStreams than the number of Kinesis
stream shards as each input DStream will create at least one
KinesisRecordProcessor thread that handles a single shard.
+ - You never need more Kinesis input DStreams than the number of Kinesis
stream shards as each input DStream will create at least one
KinesisRecordProcessor thread that handles a single shard.
- - Horizontal scaling is achieved by adding/removing Kinesis input
DStreams (within a single process or across multiple processes/instances) - up
to the total number of Kinesis stream shards per the previous point.
+ - Horizontal scaling is achieved by adding/removing Kinesis input
DStreams (within a single process or across multiple processes/instances) - up
to the total number of Kinesis stream shards per the previous point.
- - The Kinesis input DStream will balance the load between all DStreams
- even across processes/instances.
+ - The Kinesis input DStream will balance the load between all DStreams -
even across processes/instances.
- - The Kinesis input DStream will balance the load during re-shard
events (merging and splitting) due to changes in load.
+ - The Kinesis input DStream will balance the load during re-shard events
(merging and splitting) due to changes in load.
- - As a best practice, it's recommended that you avoid re-shard jitter
by over-provisioning when possible.
+ - As a best practice, it's recommended that you avoid re-shard jitter by
over-provisioning when possible.
- - Each Kinesis input DStream maintains its own checkpoint info. See
the Kinesis Checkpointing section for more details.
+ - Each Kinesis input DStream maintains its own checkpoint info. See the
Kinesis Checkpointing section for more details.
- - There is no correlation between the number of Kinesis stream shards
and the number of RDD partitions/shards created across the Spark cluster during
input DStream processing. These are 2 independent partitioning schemes.
+ - There is no correlation between the number of Kinesis stream shards and
the number of RDD partitions/shards created across the Spark cluster during
input DStream processing. These are 2 independent partitioning schemes.
#### Running the Example
To run the example,
@@ -239,37 +254,39 @@ To run the example,
- In the Spark root directory, run the example as
- <div class="codetabs">
+ <div class="codetabs">
<div data-lang="python" markdown="1">
-
- ./bin/spark-submit --jars
'connector/kinesis-asl-assembly/target/spark-streaming-kinesis-asl-assembly_*.jar'
\
-
connector/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py
\
- [Kinesis app name] [Kinesis stream name] [endpoint URL] [region
name]
-
- </div>
-
- <div data-lang="scala" markdown="1">
-
- ./bin/run-example --packages
org.apache.spark:spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}}
streaming.KinesisWordCountASL [Kinesis app name] [Kinesis stream name]
[endpoint URL]
-
- </div>
-
- <div data-lang="java" markdown="1">
-
- ./bin/run-example --packages
org.apache.spark:spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}}
streaming.JavaKinesisWordCountASL [Kinesis app name] [Kinesis stream name]
[endpoint URL]
-
- </div>
-
- </div>
+ ```sh
+ ./bin/spark-submit --jars 'connector/kinesis-asl-assembly/target/spark-streaming-kinesis-asl-assembly_*.jar' \
+ connector/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py \
+ [Kinesis app name] [Kinesis stream name] [endpoint URL] [region name]
+ ```
+ </div>
+
+ <div data-lang="scala" markdown="1">
+ ```sh
+ ./bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} streaming.KinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]
+ ```
+ </div>
+
+ <div data-lang="java" markdown="1">
+ ```sh
+ ./bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} streaming.JavaKinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]
+ ```
+ </div>
+
+ </div>
This will wait for data to be received from the Kinesis stream.
- To generate random string data to put onto the Kinesis stream, in another
terminal, run the associated Kinesis data producer.
- ./bin/run-example streaming.KinesisWordProducerASL [Kinesis
stream name] [endpoint URL] 1000 10
+ ```sh
+ ./bin/run-example streaming.KinesisWordProducerASL [Kinesis stream name] [endpoint URL] 1000 10
+ ```
- This will push 1000 lines per second of 10 random numbers per line to
the Kinesis stream. This data should then be received and processed by the
running example.
+ This will push 1000 lines per second of 10 random numbers per line to the
Kinesis stream. This data should then be received and processed by the running
example.
#### Record De-aggregation