Repository: spark
Updated Branches:
  refs/heads/branch-1.4 760837341 -> 7c11ccf39


[SPARK-7284] [STREAMING] Updated streaming documentation

- Kinesis API updated
- Kafka version updated, and Python API for Direct Kafka added
- Added SQLContext.getOrCreate()
- Added information on how to get partitionId in foreachRDD

Author: Tathagata Das <[email protected]>

Closes #6781 from tdas/SPARK-7284 and squashes the following commits:

aac7be0 [Tathagata Das] Added information on how to get partition id
a66ec22 [Tathagata Das] Complete the line incomplete line,
a92ca39 [Tathagata Das] Updated streaming documentation

(cherry picked from commit e9471d3414d327c7d0853e18f1844ab1bd09c8ed)
Signed-off-by: Tathagata Das <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c11ccf3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c11ccf3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c11ccf3

Branch: refs/heads/branch-1.4
Commit: 7c11ccf3913ac6a5d178994704d8b0983829b43b
Parents: 7608373
Author: Tathagata Das <[email protected]>
Authored: Fri Jun 12 15:22:59 2015 -0700
Committer: Tathagata Das <[email protected]>
Committed: Fri Jun 12 15:23:09 2015 -0700

----------------------------------------------------------------------
 docs/streaming-kafka-integration.md   | 12 ++++-
 docs/streaming-kinesis-integration.md | 24 ++++++----
 docs/streaming-programming-guide.md   | 70 ++++++++++--------------------
 3 files changed, 50 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7c11ccf3/docs/streaming-kafka-integration.md
----------------------------------------------------------------------
diff --git a/docs/streaming-kafka-integration.md b/docs/streaming-kafka-integration.md
index d6d5605..eaec6a4 100644
--- a/docs/streaming-kafka-integration.md
+++ b/docs/streaming-kafka-integration.md
@@ -119,6 +119,13 @@ Next, we discuss how to use this approach in your streaming application.
        and the 
[example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java).
 
        </div>
+       <div data-lang="python" markdown="1">
+               from pyspark.streaming.kafka import KafkaUtils
+               directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
+
+       By default, the Python API will decode Kafka data as UTF8 encoded strings. You can specify your custom decoding function to decode the byte arrays in Kafka records to any arbitrary data type. See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils)
+       and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/direct_kafka_wordcount.py).
+       </div>
        </div>
 
        In the Kafka parameters, you must specify either `metadata.broker.list` 
or `bootstrap.servers`.
@@ -147,10 +154,13 @@ Next, we discuss how to use this approach in your streaming application.
                    }
                );
        </div>
+       <div data-lang="python" markdown="1">
+       Not supported
        </div>
+       </div>
 
        You can use this to update Zookeeper yourself if you want 
Zookeeper-based Kafka monitoring tools to show progress of the streaming 
application.
 
        Another thing to note is that since this approach does not use 
Receivers, the standard receiver-related (that is, 
[configurations](configuration.html) of the form `spark.streaming.receiver.*` ) 
will not apply to the input DStreams created by this approach (will apply to 
other input DStreams though). Instead, use the 
[configurations](configuration.html) `spark.streaming.kafka.*`. An important 
one is `spark.streaming.kafka.maxRatePerPartition` which is the maximum rate at 
which each Kafka partition will be read by this direct API. 
 
-3. **Deploying:** Similar to the first approach, you can package 
`spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` and its dependencies into 
the application JAR and the launch the application using `spark-submit`. Make 
sure `spark-core_{{site.SCALA_BINARY_VERSION}}` and 
`spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` 
dependencies as those are already present in a Spark installation.
+3. **Deploying:** This is the same as the first approach for Scala, Java, and Python.
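
A custom decoding function of the kind mentioned in the Python note above might look like the following minimal sketch. The name `json_decoder` is hypothetical, and the commented-out call assumes that `createDirectStream` accepts a `valueDecoder` keyword argument; check the linked API docs before relying on that.

```python
import json


def json_decoder(raw):
    """Decode a Kafka record's byte array into a Python object.

    Returns None for a missing key or value instead of raising.
    """
    if raw is None:
        return None
    return json.loads(raw.decode("utf-8"))


# Hypothetical use (assumes a valueDecoder keyword argument exists):
# directKafkaStream = KafkaUtils.createDirectStream(
#     ssc, [topic], {"metadata.broker.list": brokers},
#     valueDecoder=json_decoder)

record = json_decoder(b'{"word": "spark", "count": 3}')
print(record["word"], record["count"])
```

The decoder is an ordinary function from bytes to a value, so it can be tested without a running Kafka broker or StreamingContext.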

http://git-wip-us.apache.org/repos/asf/spark/blob/7c11ccf3/docs/streaming-kinesis-integration.md
----------------------------------------------------------------------
diff --git a/docs/streaming-kinesis-integration.md b/docs/streaming-kinesis-integration.md
index 379eb51..aa9749a 100644
--- a/docs/streaming-kinesis-integration.md
+++ b/docs/streaming-kinesis-integration.md
@@ -32,7 +32,8 @@ A Kinesis stream can be set up at one of the valid Kinesis 
endpoints with 1 or m
                import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
 
                val kinesisStream = KinesisUtils.createStream(
-               streamingContext, [Kinesis stream name], [endpoint URL], [checkpoint interval], [initial position])
+                       streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
+                       [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)
 
        See the [API 
docs](api/scala/index.html#org.apache.spark.streaming.kinesis.KinesisUtils$)
        and the 
[example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala).
 Refer to the Running the Example section for instructions on how to run the 
example.
@@ -44,7 +45,8 @@ A Kinesis stream can be set up at one of the valid Kinesis 
endpoints with 1 or m
                import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
 
                JavaReceiverInputDStream<byte[]> kinesisStream = KinesisUtils.createStream(
-               streamingContext, [Kinesis stream name], [endpoint URL], [checkpoint interval], [initial position]);
+                       streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
+                       [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2);
 
        See the [API 
docs](api/java/index.html?org/apache/spark/streaming/kinesis/KinesisUtils.html)
        and the 
[example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java).
 Refer to the next subsection for instructions to run the example.
@@ -54,19 +56,23 @@ A Kinesis stream can be set up at one of the valid Kinesis 
endpoints with 1 or m
 
     - `streamingContext`: StreamingContext containing an application name used by Kinesis to tie this Kinesis application to the Kinesis stream
 
-       - `[Kinesis stream name]`: The Kinesis stream that this streaming 
application receives from
-               - The application name used in the streaming context becomes 
the Kinesis application name
+       - `[Kinesis app name]`: The application name that will be used to checkpoint the Kinesis
+               sequence numbers in DynamoDB table.
                - The application name must be unique for a given account and 
region.
-               - The Kinesis backend automatically associates the application 
name to the Kinesis stream using a DynamoDB table (always in the us-east-1 
region) created during Kinesis Client Library initialization. 
-               - Changing the application name or stream name can lead to 
Kinesis errors in some cases.  If you see errors, you may need to manually 
delete the DynamoDB table.
+               - If the table exists but has incorrect checkpoint information (for a different stream, or
+                 old expired sequence numbers), then there may be temporary errors.
 
+       - `[Kinesis stream name]`: The Kinesis stream that this streaming 
application will pull data from.
 
        - `[endpoint URL]`: Valid Kinesis endpoints URL can be found 
[here](http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region).
 
+       - `[region name]`: Valid Kinesis region names can be found 
[here](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html).
+
        - `[checkpoint interval]`: The interval (e.g., Duration(2000) = 2 
seconds) at which the Kinesis Client Library saves its position in the stream.  
For starters, set it to the same as the batch interval of the streaming 
application.
 
        - `[initial position]`: Can be either 
`InitialPositionInStream.TRIM_HORIZON` or `InitialPositionInStream.LATEST` (see 
Kinesis Checkpointing section and Amazon Kinesis API documentation for more 
details).
 
+       In other versions of the API, you can also specify the AWS access key 
and secret key directly.
 
 3. **Deploying:** Package 
`spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}` and its 
dependencies (except `spark-core_{{site.SCALA_BINARY_VERSION}}` and 
`spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by 
`spark-submit`) into the application JAR. Then use `spark-submit` to launch 
your application (see [Deploying 
section](streaming-programming-guide.html#deploying-applications) in the main 
programming guide).
 
@@ -122,12 +128,12 @@ To run the example,
        <div class="codetabs">
        <div data-lang="scala" markdown="1">
 
-       bin/run-example streaming.KinesisWordCountASL [Kinesis stream name] [endpoint URL]
+        bin/run-example streaming.KinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]
 
        </div>
        <div data-lang="java" markdown="1">
 
-        bin/run-example streaming.JavaKinesisWordCountASL [Kinesis stream name] [endpoint URL]
+        bin/run-example streaming.JavaKinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]
 
        </div>
        </div>
@@ -136,7 +142,7 @@ To run the example,
 
 - To generate random string data to put onto the Kinesis stream, in another 
terminal, run the associated Kinesis data producer.
 
-               bin/run-example streaming.KinesisWordCountProducerASL [Kinesis stream name] [endpoint URL] 1000 10
+               bin/run-example streaming.KinesisWordProducerASL [Kinesis stream name] [endpoint URL] 1000 10
 
        This will push 1000 lines per second of 10 random numbers per line to 
the Kinesis stream.  This data should then be received and processed by the 
running example.
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7c11ccf3/docs/streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 42b3394..836f047 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -77,7 +77,7 @@ main entry point for all streaming functionality. We create a 
local StreamingCon
 {% highlight scala %}
 import org.apache.spark._
 import org.apache.spark.streaming._
-import org.apache.spark.streaming.StreamingContext._ // not necessary in Spark 1.3+
+import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
 
 // Create a local StreamingContext with two working threads and a batch interval of 1 second.
 // The master requires 2 cores to prevent a starvation scenario.
@@ -109,7 +109,7 @@ each line will be split into multiple words and the stream 
of words is represent
 `words` DStream.  Next, we want to count these words.
 
 {% highlight scala %}
-import org.apache.spark.streaming.StreamingContext._ // not necessary in Spark 1.3+
+import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
 // Count each word in each batch
 val pairs = words.map(word => (word, 1))
 val wordCounts = pairs.reduceByKey(_ + _)
@@ -682,7 +682,7 @@ for Java, and 
[StreamingContext](api/python/pyspark.streaming.html#pyspark.strea
 ### Advanced Sources
 {:.no_toc}
 
-<span class="badge" style="background-color: grey">Python API</span> As of 
Spark 1.3,
+<span class="badge" style="background-color: grey">Python API</span> As of 
Spark {{site.SPARK_VERSION_SHORT}},
 out of these sources, *only* Kafka is available in the Python API. We will add 
more advanced sources in the Python API in future.
 
 This category of sources require interfacing with external non-Spark 
libraries, some of them with
@@ -723,7 +723,7 @@ and it in the classpath.
 
 Some of these advanced sources are as follows.
 
-- **Kafka:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with 
Kafka 0.8.1.1. See the [Kafka Integration 
Guide](streaming-kafka-integration.html) for more details.
+- **Kafka:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with 
Kafka 0.8.2.1. See the [Kafka Integration 
Guide](streaming-kafka-integration.html) for more details.
 
 - **Flume:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with 
Flume 1.4.0. See the [Flume Integration 
Guide](streaming-flume-integration.html) for more details.
 
@@ -991,8 +991,9 @@ cleanedDStream = wordCounts.transform(lambda rdd: 
rdd.join(spamInfoRDD).filter(.
 </div>
 </div>
 
-In fact, you can also use [machine learning](mllib-guide.html) and
-[graph computation](graphx-programming-guide.html) algorithms in the 
`transform` method.
+Note that the supplied function gets called in every batch interval. This 
allows you to do
+time-varying RDD operations, that is, RDD operations, number of partitions, 
broadcast variables,
+etc. can be changed between batches.
 
 #### Window Operations
 {:.no_toc}
@@ -1427,26 +1428,6 @@ You can easily use [DataFrames and 
SQL](sql-programming-guide.html) operations o
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 
-/** Lazily instantiated singleton instance of SQLContext */
-object SQLContextSingleton {
-  @transient private var instance: SQLContext = null
-
-  // Instantiate SQLContext on demand
-  def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
-    if (instance == null) {
-      instance = new SQLContext(sparkContext)
-    }
-    instance
-  }
-}
-
-...
-
-/** Case class for converting RDD to DataFrame */
-case class Row(word: String)
-
-...
-
 /** DataFrame operations inside your streaming program */
 
 val words: DStream[String] = ...
@@ -1454,11 +1435,11 @@ val words: DStream[String] = ...
 words.foreachRDD { rdd =>
 
   // Get the singleton instance of SQLContext
-  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
+  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
   import sqlContext.implicits._
 
-  // Convert RDD[String] to RDD[case class] to DataFrame
-  val wordsDataFrame = rdd.map(w => Row(w)).toDF()
+  // Convert RDD[String] to DataFrame
+  val wordsDataFrame = rdd.toDF("word")
 
   // Register as table
   wordsDataFrame.registerTempTable("words")
@@ -1476,19 +1457,6 @@ See the full [source 
code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/ma
 <div data-lang="java" markdown="1">
 {% highlight java %}
 
-/** Lazily instantiated singleton instance of SQLContext */
-class JavaSQLContextSingleton {
-  static private transient SQLContext instance = null;
-  static public SQLContext getInstance(SparkContext sparkContext) {
-    if (instance == null) {
-      instance = new SQLContext(sparkContext);
-    }
-    return instance;
-  }
-}
-
-...
-
 /** Java Bean class for converting RDD to DataFrame */
 public class JavaRow implements java.io.Serializable {
   private String word;
@@ -1512,7 +1480,9 @@ words.foreachRDD(
   new Function2<JavaRDD<String>, Time, Void>() {
     @Override
     public Void call(JavaRDD<String> rdd, Time time) {
-      SQLContext sqlContext = JavaSQLContextSingleton.getInstance(rdd.context());
+
+      // Get the singleton instance of SQLContext
+      SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());
 
       // Convert RDD[String] to RDD[case class] to DataFrame
       JavaRDD<JavaRow> rowRDD = rdd.map(new Function<String, JavaRow>() {
@@ -2234,7 +2204,7 @@ The following table summarizes the semantics under 
failures:
 
 ### With Kafka Direct API
 {:.no_toc}
-In Spark 1.3, we have introduced a new Kafka Direct API, which can ensure that 
all the Kafka data is received by Spark Streaming exactly once. Along with 
this, if you implement exactly-once output operation, you can achieve 
end-to-end exactly-once guarantees. This approach (experimental as of Spark 
1.3) is further discussed in the [Kafka Integration 
Guide](streaming-kafka-integration.html).
+In Spark 1.3, we have introduced a new Kafka Direct API, which can ensure that 
all the Kafka data is received by Spark Streaming exactly once. Along with 
this, if you implement exactly-once output operation, you can achieve 
end-to-end exactly-once guarantees. This approach (experimental as of Spark 
{{site.SPARK_VERSION_SHORT}}) is further discussed in the [Kafka Integration 
Guide](streaming-kafka-integration.html).
 
 ## Semantics of output operations
 {:.no_toc}
@@ -2248,8 +2218,16 @@ additional effort may be necessary to achieve 
exactly-once semantics. There are
 
 - *Transactional updates*: All updates are made transactionally so that 
updates are made exactly once atomically. One way to do this would be the 
following.
 
-    - Use the batch time (available in `foreachRDD`) and the partition index 
of the transformed RDD to create an identifier. This identifier uniquely 
identifies a blob data in the streaming application.
-    - Update external system with this blob transactionally (that is, exactly 
once, atomically) using the identifier. That is, if the identifier is not 
already committed, commit the partition data and the identifier atomically. 
Else if this was already committed, skip the update. 
+    - Use the batch time (available in `foreachRDD`) and the partition index of the RDD to create an identifier. This identifier uniquely identifies a blob of data in the streaming application.
+    - Update external system with this blob transactionally (that is, exactly 
once, atomically) using the identifier. That is, if the identifier is not 
already committed, commit the partition data and the identifier atomically. 
Else if this was already committed, skip the update.
+
+          dstream.foreachRDD { (rdd, time) =>
+            rdd.foreachPartition { partitionIterator =>
+              val partitionId = TaskContext.get.partitionId()
+              val uniqueId = generateUniqueId(time.milliseconds, partitionId)
+              // use this uniqueId to transactionally commit the data in 
partitionIterator
+            }
+          }
 
 
 
***************************************************************************************************
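
The identifier-based commit described in the last hunk can be sketched in plain Python without Spark. This is only an illustration under stated assumptions: `InMemoryTransactionalStore`, `generate_unique_id`, and `write_batch` are hypothetical names, and the in-memory dictionary stands in for an external system that can commit data and identifier atomically.

```python
import threading


class InMemoryTransactionalStore:
    """Hypothetical stand-in for an external system with atomic commits."""

    def __init__(self):
        self._lock = threading.Lock()
        self._committed = {}  # unique id -> records committed under that id

    def commit_once(self, unique_id, records):
        """Commit records and id together; skip if the id was already committed."""
        with self._lock:
            if unique_id in self._committed:
                return False  # already committed, skip the update
            self._committed[unique_id] = list(records)
            return True

    def all_records(self):
        with self._lock:
            return [r for recs in self._committed.values() for r in recs]


def generate_unique_id(batch_time_ms, partition_id):
    """Combine batch time and partition index into one identifier."""
    return "{}-{}".format(batch_time_ms, partition_id)


def write_batch(store, batch_time_ms, partitions):
    """Commit every partition of one batch; return the per-partition outcomes."""
    return [
        store.commit_once(generate_unique_id(batch_time_ms, pid), records)
        for pid, records in sorted(partitions.items())
    ]


store = InMemoryTransactionalStore()
batch = {0: ["a", "b"], 1: ["c"]}

first_attempt = write_batch(store, 1434147779000, batch)
replayed_attempt = write_batch(store, 1434147779000, batch)  # simulated retry
```

Because the identifier depends only on the batch time and the partition index, a replayed batch maps to the same identifiers and its writes are skipped, so each record is stored once.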

