Repository: spark
Updated Branches:
  refs/heads/branch-1.6 bd33d4ee8 -> eca401ee5


[SPARK-11985][STREAMING][KINESIS][DOCS] Update Kinesis docs

 - Provide example on `message handler`
 - Provide bit on KPL record de-aggregation
 - Fix typos

Author: Burak Yavuz <brk...@gmail.com>

Closes #9970 from brkyvz/kinesis-docs.

(cherry picked from commit 2377b707f25449f4557bf048bb384c743d9008e5)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eca401ee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eca401ee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eca401ee

Branch: refs/heads/branch-1.6
Commit: eca401ee5d3ae683cbee531c1f8bc981f9603fc8
Parents: bd33d4e
Author: Burak Yavuz <brk...@gmail.com>
Authored: Fri Dec 18 15:24:41 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Fri Dec 18 15:24:49 2015 -0800

----------------------------------------------------------------------
 docs/streaming-kinesis-integration.md | 54 +++++++++++++++++++++++++-----
 1 file changed, 45 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/eca401ee/docs/streaming-kinesis-integration.md
----------------------------------------------------------------------
diff --git a/docs/streaming-kinesis-integration.md 
b/docs/streaming-kinesis-integration.md
index 238a911..07194b0 100644
--- a/docs/streaming-kinesis-integration.md
+++ b/docs/streaming-kinesis-integration.md
@@ -23,7 +23,7 @@ A Kinesis stream can be set up at one of the valid Kinesis 
endpoints with 1 or m
 
        **Note that by linking to this library, you will include 
[ASL](https://aws.amazon.com/asl/)-licensed code in your application.**
 
-2. **Programming:** In the streaming application code, import `KinesisUtils` 
and create the input DStream as follows:
+2. **Programming:** In the streaming application code, import `KinesisUtils` 
and create the input DStream of byte array as follows:
 
        <div class="codetabs">
        <div data-lang="scala" markdown="1">
@@ -36,7 +36,7 @@ A Kinesis stream can be set up at one of the valid Kinesis 
endpoints with 1 or m
                        [region name], [initial position], [checkpoint 
interval], StorageLevel.MEMORY_AND_DISK_2)
 
        See the [API 
docs](api/scala/index.html#org.apache.spark.streaming.kinesis.KinesisUtils$)
-       and the 
[example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala).
 Refer to the Running the Example section for instructions on how to run the 
example.
+       and the 
[example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala).
 Refer to the [Running the Example](#running-the-example) subsection for 
instructions on how to run the example.
 
        </div>
        <div data-lang="java" markdown="1">
@@ -49,7 +49,7 @@ A Kinesis stream can be set up at one of the valid Kinesis 
endpoints with 1 or m
                        [region name], [initial position], [checkpoint 
interval], StorageLevel.MEMORY_AND_DISK_2);
 
        See the [API 
docs](api/java/index.html?org/apache/spark/streaming/kinesis/KinesisUtils.html)
-       and the 
[example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java).
 Refer to the next subsection for instructions to run the example.
+       and the 
[example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java).
 Refer to the [Running the Example](#running-the-example) subsection for 
instructions to run the example.
 
        </div>
        <div data-lang="python" markdown="1">
@@ -60,18 +60,47 @@ A Kinesis stream can be set up at one of the valid Kinesis 
endpoints with 1 or m
                        [region name], [initial position], [checkpoint 
interval], StorageLevel.MEMORY_AND_DISK_2)
 
        See the [API 
docs](api/python/pyspark.streaming.html#pyspark.streaming.kinesis.KinesisUtils)
-       and the 
[example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py).
 Refer to the next subsection for instructions to run the example.
+       and the 
[example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py).
 Refer to the [Running the Example](#running-the-example) subsection for 
instructions to run the example.
 
        </div>
        </div>
 
-    - `streamingContext`: StreamingContext containg an application name used 
by Kinesis to tie this Kinesis application to the Kinesis stream
+       You may also provide a "message handler function" that takes a Kinesis 
`Record` and returns a generic object `T`, in case you would like to use other 
data included in a `Record` such as partition key. This is currently only 
supported in Scala and Java.
 
-       - `[Kineiss app name]`: The application name that will be used to 
checkpoint the Kinesis
+       <div class="codetabs">
+       <div data-lang="scala" markdown="1">
+
+               import org.apache.spark.streaming.Duration
+               import org.apache.spark.streaming.kinesis._
+               import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+               val kinesisStream = KinesisUtils.createStream[T](
+                       streamingContext, [Kinesis app name], [Kinesis stream 
name], [endpoint URL],
+                       [region name], [initial position], [checkpoint 
interval], StorageLevel.MEMORY_AND_DISK_2,
+                       [message handler])
+
+       </div>
+       <div data-lang="java" markdown="1">
+
+               import org.apache.spark.streaming.Duration;
+               import org.apache.spark.streaming.kinesis.*;
+               import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+
+               JavaReceiverInputDStream<T> kinesisStream = 
KinesisUtils.createStream(
+                       streamingContext, [Kinesis app name], [Kinesis stream 
name], [endpoint URL],
+                       [region name], [initial position], [checkpoint 
interval], StorageLevel.MEMORY_AND_DISK_2,
+                       [message handler], [class T]);
+
+       </div>
+       </div>
+
+       - `streamingContext`: StreamingContext containg an application name 
used by Kinesis to tie this Kinesis application to the Kinesis stream
+
+       - `[Kinesis app name]`: The application name that will be used to 
checkpoint the Kinesis
                sequence numbers in DynamoDB table.
                - The application name must be unique for a given account and 
region.
                - If the table exists but has incorrect checkpoint information 
(for a different stream, or
-                 old expired sequenced numbers), then there may be temporary 
errors.
+                       old expired sequenced numbers), then there may be 
temporary errors.
 
        - `[Kinesis stream name]`: The Kinesis stream that this streaming 
application will pull data from.
 
@@ -83,6 +112,8 @@ A Kinesis stream can be set up at one of the valid Kinesis 
endpoints with 1 or m
 
        - `[initial position]`: Can be either 
`InitialPositionInStream.TRIM_HORIZON` or `InitialPositionInStream.LATEST` (see 
Kinesis Checkpointing section and Amazon Kinesis API documentation for more 
details).
 
+       - `[message handler]`: A function that takes a Kinesis `Record` and 
outputs generic `T`.
+
        In other versions of the API, you can also specify the AWS access key 
and secret key directly.
 
 3. **Deploying:** Package 
`spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}` and its 
dependencies (except `spark-core_{{site.SCALA_BINARY_VERSION}}` and 
`spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by 
`spark-submit`) into the application JAR. Then use `spark-submit` to launch 
your application (see [Deploying 
section](streaming-programming-guide.html#deploying-applications) in the main 
programming guide).
@@ -99,7 +130,7 @@ A Kinesis stream can be set up at one of the valid Kinesis 
endpoints with 1 or m
                <img src="img/streaming-kinesis-arch.png"
                        title="Spark Streaming Kinesis Architecture"
                        alt="Spark Streaming Kinesis Architecture"
-              width="60%" 
+              width="60%"
         />
                <!-- Images are downsized intentionally to improve quality on 
retina displays -->
        </p>
@@ -165,11 +196,16 @@ To run the example,
 
        This will push 1000 lines per second of 10 random numbers per line to 
the Kinesis stream.  This data should then be received and processed by the 
running example.
 
+#### Record De-aggregation
+
+When data is generated using the [Kinesis Producer Library 
(KPL)](http://docs.aws.amazon.com/kinesis/latest/dev/developing-producers-with-kpl.html),
 messages may be aggregated for cost savings. Spark Streaming will automatically
+de-aggregate records during consumption.
+
 #### Kinesis Checkpointing
 - Each Kinesis input DStream periodically stores the current position of the 
stream in the backing DynamoDB table.  This allows the system to recover from 
failures and continue processing where the DStream left off.
 
 - Checkpointing too frequently will cause excess load on the AWS checkpoint 
storage layer and may lead to AWS throttling.  The provided example handles 
this throttling with a random-backoff-retry strategy.
 
 - If no Kinesis checkpoint info exists when the input DStream starts, it will 
start either from the oldest record available 
(InitialPositionInStream.TRIM_HORIZON) or from the latest tip 
(InitialPostitionInStream.LATEST).  This is configurable.
-- InitialPositionInStream.LATEST could lead to missed records if data is added 
to the stream while no input DStreams are running (and no checkpoint info is 
being stored). 
+- InitialPositionInStream.LATEST could lead to missed records if data is added 
to the stream while no input DStreams are running (and no checkpoint info is 
being stored).
 - InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of 
records where the impact is dependent on checkpoint frequency and processing 
idempotency.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to