Repository: spark
Updated Branches:
  refs/heads/master 7f295059c -> 4f77c0623


[SPARK-20855][Docs][DStream] Update the Spark kinesis docs to use the KinesisInputDStream builder instead of deprecated KinesisUtils

## What changes were proposed in this pull request?

The examples and docs for the Spark-Kinesis integration use the deprecated
KinesisUtils API. This patch updates the docs and examples to create DStreams
with the KinesisInputDStream builder instead.

## How was this patch tested?

The patch primarily updates documentation. It also updates the Spark-Kinesis
examples accordingly; the updated examples need to be tested.

Author: Yash Sharma <[email protected]>

Closes #18071 from yssharma/ysharma/kinesis_docs.
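For context, the migration replaces the single `KinesisUtils.createStream` call with a fluent builder. A minimal before/after sketch (illustrative only, not taken from the patch; assumes a `StreamingContext` named `ssc`, and the stream name, app name, endpoint, and region shown are placeholder values):

    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.Seconds
    import org.apache.spark.streaming.kinesis.KinesisInputDStream

    // Deprecated style being removed from the docs:
    // val stream = KinesisUtils.createStream(ssc, "myApp", "myStream",
    //   "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
    //   InitialPositionInStream.LATEST, Seconds(10), StorageLevel.MEMORY_AND_DISK_2)

    // Builder style the docs now recommend:
    val stream = KinesisInputDStream.builder
      .streamingContext(ssc)
      .streamName("myStream")
      .endpointUrl("https://kinesis.us-east-1.amazonaws.com")
      .regionName("us-east-1")
      .initialPositionInStream(InitialPositionInStream.LATEST)
      .checkpointAppName("myApp")
      .checkpointInterval(Seconds(10))
      .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
      .build()

The builder makes each parameter explicit by name rather than by position, which is the main readability win of the change.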


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f77c062
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f77c062
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f77c062

Branch: refs/heads/master
Commit: 4f77c0623885d4a7455f9841a888d9f6e098e7f0
Parents: 7f29505
Author: Yash Sharma <[email protected]>
Authored: Tue Jul 25 08:27:03 2017 +0100
Committer: Sean Owen <[email protected]>
Committed: Tue Jul 25 08:27:03 2017 +0100

----------------------------------------------------------------------
 docs/streaming-kinesis-integration.md           | 108 ++++++++++++-------
 .../streaming/KinesisWordCountASL.scala         |  15 ++-
 2 files changed, 80 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4f77c062/docs/streaming-kinesis-integration.md
----------------------------------------------------------------------
diff --git a/docs/streaming-kinesis-integration.md b/docs/streaming-kinesis-integration.md
index 9709bd3..678b064 100644
--- a/docs/streaming-kinesis-integration.md
+++ b/docs/streaming-kinesis-integration.md
@@ -24,41 +24,58 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
        For Python applications, you will have to add this above library and its dependencies when deploying your application. See the *Deploying* subsection below.
        **Note that by linking to this library, you will include [ASL](https://aws.amazon.com/asl/)-licensed code in your application.**
 
-2. **Programming:** In the streaming application code, import `KinesisUtils` and create the input DStream of byte array as follows:
+2. **Programming:** In the streaming application code, import `KinesisInputDStream` and create the input DStream of byte array as follows:
 
        <div class="codetabs">
        <div data-lang="scala" markdown="1">
-               import org.apache.spark.streaming.Duration
-               import org.apache.spark.streaming.kinesis._
-               import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
-
-               val kinesisStream = KinesisUtils.createStream(
-                       streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
-                       [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)
-
-       See the [API docs](api/scala/index.html#org.apache.spark.streaming.kinesis.KinesisUtils$)
+            import org.apache.spark.storage.StorageLevel
+            import org.apache.spark.streaming.kinesis.KinesisInputDStream
+            import org.apache.spark.streaming.{Seconds, StreamingContext}
+            import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+            val kinesisStream = KinesisInputDStream.builder
+                .streamingContext(streamingContext)
+                .endpointUrl([endpoint URL])
+                .regionName([region name])
+                .streamName([streamName])
+                .initialPositionInStream([initial position])
+                .checkpointAppName([Kinesis app name])
+                .checkpointInterval([checkpoint interval])
+                .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
+                .build()
+
+       See the [API docs](api/scala/index.html#org.apache.spark.streaming.kinesis.KinesisInputDStream)
        and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala). Refer to the [Running the Example](#running-the-example) subsection for instructions on how to run the example.
 
        </div>
        <div data-lang="java" markdown="1">
-               import org.apache.spark.streaming.Duration;
-               import org.apache.spark.streaming.kinesis.*;
-               import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
-
-               JavaReceiverInputDStream<byte[]> kinesisStream = KinesisUtils.createStream(
-                       streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
-                       [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2);
+            import org.apache.spark.storage.StorageLevel;
+            import org.apache.spark.streaming.kinesis.KinesisInputDStream;
+            import org.apache.spark.streaming.Seconds;
+            import org.apache.spark.streaming.StreamingContext;
+            import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+
+            KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder()
+                .streamingContext(streamingContext)
+                .endpointUrl([endpoint URL])
+                .regionName([region name])
+                .streamName([streamName])
+                .initialPositionInStream([initial position])
+                .checkpointAppName([Kinesis app name])
+                .checkpointInterval([checkpoint interval])
+                .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
+                .build();
 
        See the [API docs](api/java/index.html?org/apache/spark/streaming/kinesis/KinesisUtils.html)
        and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java). Refer to the [Running the Example](#running-the-example) subsection for instructions to run the example.
 
        </div>
        <div data-lang="python" markdown="1">
-               from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
+            from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
 
-               kinesisStream = KinesisUtils.createStream(
-                       streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
-                       [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)
+            kinesisStream = KinesisUtils.createStream(
+                streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
+                [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)
 
        See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.kinesis.KinesisUtils)
        and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/external/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py). Refer to the [Running the Example](#running-the-example) subsection for instructions to run the example.
@@ -70,27 +87,40 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
 
        <div class="codetabs">
        <div data-lang="scala" markdown="1">
-
-               import org.apache.spark.streaming.Duration
-               import org.apache.spark.streaming.kinesis._
-               import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
-
-               val kinesisStream = KinesisUtils.createStream[T](
-                       streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
-                       [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2,
-                       [message handler])
+                import org.apache.spark.storage.StorageLevel
+                import org.apache.spark.streaming.kinesis.KinesisInputDStream
+                import org.apache.spark.streaming.{Seconds, StreamingContext}
+                import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+                val kinesisStream = KinesisInputDStream.builder
+                    .streamingContext(streamingContext)
+                    .endpointUrl([endpoint URL])
+                    .regionName([region name])
+                    .streamName([streamName])
+                    .initialPositionInStream([initial position])
+                    .checkpointAppName([Kinesis app name])
+                    .checkpointInterval([checkpoint interval])
+                    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
+                    .buildWithMessageHandler([message handler])
 
        </div>
        <div data-lang="java" markdown="1">
-
-               import org.apache.spark.streaming.Duration;
-               import org.apache.spark.streaming.kinesis.*;
-               import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
-
-               JavaReceiverInputDStream<T> kinesisStream = KinesisUtils.createStream(
-                       streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
-                       [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2,
-                       [message handler], [class T]);
+                import org.apache.spark.storage.StorageLevel;
+                import org.apache.spark.streaming.kinesis.KinesisInputDStream;
+                import org.apache.spark.streaming.Seconds;
+                import org.apache.spark.streaming.StreamingContext;
+                import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+
+                KinesisInputDStream<T> kinesisStream = KinesisInputDStream.builder()
+                    .streamingContext(streamingContext)
+                    .endpointUrl([endpoint URL])
+                    .regionName([region name])
+                    .streamName([streamName])
+                    .initialPositionInStream([initial position])
+                    .checkpointAppName([Kinesis app name])
+                    .checkpointInterval([checkpoint interval])
+                    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
+                    .buildWithMessageHandler([message handler]);
 
        </div>
        </div>

http://git-wip-us.apache.org/repos/asf/spark/blob/4f77c062/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
----------------------------------------------------------------------
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
index f14117b..cde2c4b 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer
 import scala.util.Random
 
 import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
-import com.amazonaws.regions.RegionUtils
 import com.amazonaws.services.kinesis.AmazonKinesisClient
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
 import com.amazonaws.services.kinesis.model.PutRecordRequest
@@ -34,7 +33,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
-import org.apache.spark.streaming.kinesis.KinesisUtils
+import org.apache.spark.streaming.kinesis.KinesisInputDStream
 
 
 /**
@@ -135,8 +134,16 @@ object KinesisWordCountASL extends Logging {
 
     // Create the Kinesis DStreams
     val kinesisStreams = (0 until numStreams).map { i =>
-      KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName,
-        InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)
+      KinesisInputDStream.builder
+        .streamingContext(ssc)
+        .streamName(streamName)
+        .endpointUrl(endpointUrl)
+        .regionName(regionName)
+        .initialPositionInStream(InitialPositionInStream.LATEST)
+        .checkpointAppName(appName)
+        .checkpointInterval(kinesisCheckpointInterval)
+        .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
+        .build()
     }
 
     // Union all the streams

