Repository: spark
Updated Branches:
  refs/heads/master 51a79a770 -> cd3b68d93


[SPARK-6128][Streaming][Documentation] Updates to Spark Streaming Programming 
Guide

Updates to the documentation are as follows:

- Added information on Kafka Direct API and Kafka Python API
- Added joins to the main streaming guide
- Improved details on the fault-tolerance semantics

Generated docs located here
http://people.apache.org/~tdas/spark-1.3.0-temp-docs/streaming-programming-guide.html#fault-tolerance-semantics

More things to add:
- Configuration for Kafka receive rate
- Maybe add concurrentJobs

Author: Tathagata Das <[email protected]>

Closes #4956 from tdas/streaming-guide-update-1.3 and squashes the following 
commits:

819408c [Tathagata Das] Minor fixes.
debe484 [Tathagata Das] Added DataFrames and MLlib
380cf8d [Tathagata Das] Fix link
04167a6 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' 
into streaming-guide-update-1.3
0b77486 [Tathagata Das] Updates based on Josh's comments.
86c4c2a [Tathagata Das] Updated streaming guides
82de92a [Tathagata Das] Add Kafka to Python api docs


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cd3b68d9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cd3b68d9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cd3b68d9

Branch: refs/heads/master
Commit: cd3b68d93a01f11bd3d5a441b341cb33d227e900
Parents: 51a79a7
Author: Tathagata Das <[email protected]>
Authored: Wed Mar 11 18:48:21 2015 -0700
Committer: Tathagata Das <[email protected]>
Committed: Wed Mar 11 18:48:21 2015 -0700

----------------------------------------------------------------------
 docs/configuration.md               |  14 +-
 docs/streaming-flume-integration.md |   2 +
 docs/streaming-kafka-integration.md | 151 ++++++++--
 docs/streaming-programming-guide.md | 470 +++++++++++++++++++++++++------
 4 files changed, 528 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cd3b68d9/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index ae90fe1..a7116fb 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1345,9 +1345,9 @@ Apart from these, the following properties are also 
available, and may be useful
 </tr>
 <tr>
   <td><code>spark.streaming.receiver.maxRate</code></td>
-  <td>infinite</td>
+  <td>not set</td>
   <td>
-    Maximum number records per second at which each receiver will receive data.
+    Maximum rate (number of records per second) at which each receiver will 
receive data.
     Effectively, each stream will consume at most this number of records per 
second.
     Setting this configuration to 0 or a negative number will put no limit on 
the rate.
     See the <a 
href="streaming-programming-guide.html#deploying-applications">deployment 
guide</a>
@@ -1375,6 +1375,16 @@ Apart from these, the following properties are also 
available, and may be useful
     higher memory usage in Spark.
   </td>
 </tr>
+<tr>
+  <td><code>spark.streaming.kafka.maxRatePerPartition</code></td>
+  <td>not set</td>
+  <td>
+    Maximum rate (number of records per second) at which data will be read 
from each Kafka
+    partition when using the new Kafka direct stream API. See the
+    <a href="streaming-kafka-integration.html">Kafka Integration guide</a>
+    for more details.
+  </td>
+</tr>
 </table>
 
 #### Cluster Managers
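
For a rough sense of what the `spark.streaming.kafka.maxRatePerPartition` property documented in the hunk above implies, the sketch below computes an upper bound on the number of records in one batch from the per-partition rate, the partition count, and the batch interval. It is plain Python arithmetic derived from the property description, not Spark code; the function name, the treatment of non-positive values, and the sample numbers are illustrative assumptions.

```python
def max_records_per_batch(max_rate_per_partition, num_partitions, batch_interval_seconds):
    """Upper bound on records one batch can contain under a per-partition rate cap.

    Hypothetical helper (not part of Spark): the cap is expressed in records
    per second per Kafka partition, so the bound scales with both the
    partition count and the batch interval.
    """
    if max_rate_per_partition <= 0:
        # Assumption: a non-positive value means "no limit", mirroring the
        # documented behaviour of spark.streaming.receiver.maxRate.
        return None
    return max_rate_per_partition * num_partitions * batch_interval_seconds


# Example: 1000 records/sec/partition, 8 partitions, 2-second batches.
limit = max_records_per_batch(1000, 8, 2)
print(limit)  # 16000
```

Working backwards from the largest batch the application can process within one batch interval is one way to pick a starting value for the property.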

http://git-wip-us.apache.org/repos/asf/spark/blob/cd3b68d9/docs/streaming-flume-integration.md
----------------------------------------------------------------------
diff --git a/docs/streaming-flume-integration.md 
b/docs/streaming-flume-integration.md
index 40e1724..c8ab146 100644
--- a/docs/streaming-flume-integration.md
+++ b/docs/streaming-flume-integration.md
@@ -5,6 +5,8 @@ title: Spark Streaming + Flume Integration Guide
 
 [Apache Flume](https://flume.apache.org/) is a distributed, reliable, and 
available service for efficiently collecting, aggregating, and moving large 
amounts of log data. Here we explain how to configure Flume and Spark Streaming 
to receive data from Flume. There are two approaches to this.
 
+<span class="badge" style="background-color: grey">Python API</span> Flume is 
not yet available in the Python API.
+
 ## Approach 1: Flume-style Push-based Approach
 Flume is designed to push data between Flume agents. In this approach, Spark 
Streaming essentially sets up a receiver that acts as an Avro agent for Flume, to 
which Flume can push the data. Here are the configuration steps.
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cd3b68d9/docs/streaming-kafka-integration.md
----------------------------------------------------------------------
diff --git a/docs/streaming-kafka-integration.md 
b/docs/streaming-kafka-integration.md
index 77c0abb..64714f0 100644
--- a/docs/streaming-kafka-integration.md
+++ b/docs/streaming-kafka-integration.md
@@ -2,58 +2,155 @@
 layout: global
 title: Spark Streaming + Kafka Integration Guide
 ---
-[Apache Kafka](http://kafka.apache.org/) is publish-subscribe messaging 
rethought as a distributed, partitioned, replicated commit log service.  Here 
we explain how to configure Spark Streaming to receive data from Kafka.
+[Apache Kafka](http://kafka.apache.org/) is publish-subscribe messaging 
rethought as a distributed, partitioned, replicated commit log service.  Here 
we explain how to configure Spark Streaming to receive data from Kafka. There 
are two approaches to this - the old approach using Receivers and Kafka's 
high-level API, and a new experimental approach (introduced in Spark 1.3) 
without using Receivers. They have different programming models, performance 
characteristics, and semantics guarantees, so read on for more details.  
 
-1. **Linking:** In your SBT/Maven project definition, link your streaming 
application against the following artifact (see [Linking 
section](streaming-programming-guide.html#linking) in the main programming 
guide for further information).
+## Approach 1: Receiver-based Approach
+This approach uses a Receiver to receive the data. The Receiver is implemented 
using the Kafka high-level consumer API. As with all receivers, the data 
received from Kafka through a Receiver is stored in Spark executors, and then 
jobs launched by Spark Streaming process the data. 
+
+However, under the default configuration, this approach can lose data under 
failures (see [receiver 
reliability](streaming-programming-guide.html#receiver-reliability)). To ensure 
zero data loss, you have to additionally enable Write Ahead Logs in Spark 
Streaming (introduced in Spark 1.2). This synchronously saves all the received 
Kafka data into write ahead logs on a distributed file system (e.g., HDFS), so 
that all the data can be recovered on failure. See the [Deploying 
section](streaming-programming-guide.html#deploying-applications) in the 
streaming programming guide for more details on Write Ahead Logs.
+
+Next, we discuss how to use this approach in your streaming application.
+
+1. **Linking:** For Scala/Java applications using SBT/Maven project 
definitions, link your streaming application with the following artifact (see 
[Linking section](streaming-programming-guide.html#linking) in the main 
programming guide for further information).
 
                groupId = org.apache.spark
                artifactId = spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}
                version = {{site.SPARK_VERSION_SHORT}}
 
-2. **Programming:** In the streaming application code, import `KafkaUtils` and 
create input DStream as follows.
+       For Python applications, you will have to add the above library and 
its dependencies when deploying your application. See the *Deploying* 
subsection below.
+
+2. **Programming:** In the streaming application code, import `KafkaUtils` and 
create an input DStream as follows.
 
        <div class="codetabs">
        <div data-lang="scala" markdown="1">
                import org.apache.spark.streaming.kafka._
 
-               val kafkaStream = KafkaUtils.createStream(
-               streamingContext, [zookeeperQuorum], [group id of the 
consumer], [per-topic number of Kafka partitions to consume])
+               val kafkaStream = KafkaUtils.createStream(streamingContext, 
+            [ZK quorum], [consumer group id], [per-topic number of Kafka 
partitions to consume])
 
-       See the [API 
docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$)
+    You can also specify the key and value classes and their corresponding 
decoder classes using variations of `createStream`. See the [API 
docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$)
        and the 
[example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala).
        </div>
        <div data-lang="java" markdown="1">
                import org.apache.spark.streaming.kafka.*;
 
-               JavaPairReceiverInputDStream<String, String> kafkaStream = 
KafkaUtils.createStream(
-               streamingContext, [zookeeperQuorum], [group id of the 
consumer], [per-topic number of Kafka partitions to consume]);
+               JavaPairReceiverInputDStream<String, String> kafkaStream = 
+                       KafkaUtils.createStream(streamingContext,
+            [ZK quorum], [consumer group id], [per-topic number of Kafka 
partitions to consume]);
 
-       See the [API 
docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html)
+    You can also specify the key and value classes and their corresponding 
decoder classes using variations of `createStream`. See the [API 
docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html)
        and the 
[example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java).
+
+       </div>
+       <div data-lang="python" markdown="1">
+               from pyspark.streaming.kafka import KafkaUtils
+
+               kafkaStream = KafkaUtils.createStream(streamingContext, \
+                       [ZK quorum], [consumer group id], [per-topic number of 
Kafka partitions to consume])
+
+       By default, the Python API will decode Kafka data as UTF8 encoded 
strings. You can specify your custom decoding function to decode the byte 
arrays in Kafka records to any arbitrary data type. See the [API 
docs](api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils)
+       and the 
[example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/kafka_wordcount.py).
 
        </div>
        </div>
 
-       *Points to remember:*
+       **Points to remember:**
 
        - Topic partitions in Kafka do not correlate to partitions of RDDs 
generated in Spark Streaming. So increasing the number of topic-specific 
partitions in `KafkaUtils.createStream()` only increases the number of 
threads used to consume topics within a single receiver. It does 
not increase the parallelism of Spark in processing the data. Refer to the main 
document for more information on that.
 
        - Multiple Kafka input DStreams can be created with different groups 
and topics for parallel receiving of data using multiple receivers.
 
-3. **Deploying:** Package 
`spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` and its dependencies 
(except `spark-core_{{site.SCALA_BINARY_VERSION}}` and 
`spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by 
`spark-submit`) into the application JAR. Then use `spark-submit` to launch 
your application (see [Deploying 
section](streaming-programming-guide.html#deploying-applications) in the main 
programming guide).
-
-Note that the Kafka receiver used by default is an
-[*unreliable* receiver](streaming-programming-guide.html#receiver-reliability) 
section in the
-programming guide). In Spark 1.2, we have added an experimental *reliable* 
Kafka receiver that
-provides stronger
-[fault-tolerance 
guarantees](streaming-programming-guide.html#fault-tolerance-semantics) of zero
-data loss on failures. This receiver is automatically used when the write 
ahead log
-(also introduced in Spark 1.2) is enabled
-(see [Deployment](#deploying-applications.html) section in the programming 
guide). This
-may reduce the receiving throughput of individual Kafka receivers compared to 
the unreliable
-receivers, but this can be corrected by running
-[more receivers in 
parallel](streaming-programming-guide.html#level-of-parallelism-in-data-receiving)
-to increase aggregate throughput. Additionally, it is recommended that the 
replication of the
-received data within Spark be disabled when the write ahead log is enabled as 
the log is already stored
-in a replicated storage system. This can be done by setting the storage level 
for the input
-stream to `StorageLevel.MEMORY_AND_DISK_SER` (that is, use
+       - If you have enabled Write Ahead Logs with a replicated file system 
like HDFS, the received data is already being replicated in the log. Hence, set 
the storage level for the input stream to 
`StorageLevel.MEMORY_AND_DISK_SER` (that is, use
 `KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)`).
+
+3. **Deploying:** As with any Spark application, `spark-submit` is used to 
launch your application. However, the details are slightly different for 
Scala/Java applications and Python applications.
+
+       For Scala and Java applications, if you are using SBT or Maven for 
project management, then package 
`spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` and its dependencies into 
the application JAR. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` and 
`spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` 
dependencies as those are already present in a Spark installation. Then use 
`spark-submit` to launch your application (see [Deploying 
section](streaming-programming-guide.html#deploying-applications) in the main 
programming guide). 
+
+       For Python applications which lack SBT/Maven project management, 
`spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` and its dependencies can 
be directly added to `spark-submit` using `--packages` (see [Application 
Submission Guide](submitting-applications.html)). That is, 
+
+           ./bin/spark-submit --packages 
org.apache.spark:spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}}
 ...
+
+       Alternatively, you can also download the JAR of the Maven artifact 
`spark-streaming-kafka-assembly` from the 
+       [Maven 
repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kafka-assembly_2.10%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
 and add it to `spark-submit` with `--jars`.
+
+## Approach 2: Direct Approach (No Receivers)
+This new receiver-less "direct" approach has been introduced in Spark 1.3 
to ensure stronger end-to-end guarantees. Instead of using receivers to receive 
data, this approach periodically queries Kafka for the latest offsets in each 
topic+partition, and accordingly defines the offset ranges to process in each 
batch. When the jobs to process the data are launched, Kafka's simple consumer 
API is used to read the defined ranges of offsets from Kafka (similar to reading 
files from a file system). Note that this is an experimental feature in Spark 
1.3 and is only available in the Scala and Java API.
+
+This approach has the following advantages over the receiver-based approach 
(i.e. Approach 1).
+
+- *Simplified Parallelism:* No need to create multiple input Kafka streams and 
union them. With `directStream`, Spark Streaming will create as many RDD 
partitions as there are Kafka partitions to consume, which will all read data 
from Kafka in parallel. So there is a one-to-one mapping between Kafka and RDD 
partitions, which is easier to understand and tune.
+
+- *Efficiency:* Achieving zero data loss in the first approach required the 
data to be stored in a Write Ahead Log, which further replicated the data. This 
is actually inefficient as the data effectively gets replicated twice - once by 
Kafka, and a second time by the Write Ahead Log. This second approach eliminates 
the problem as there is no receiver, and hence no need for Write Ahead Logs.
+
+- *Exactly-once semantics:* The first approach uses Kafka's high level API to 
store consumed offsets in Zookeeper. This is traditionally the way to consume 
data from Kafka. While this approach (in combination with write ahead logs) can 
ensure zero data loss (i.e. at-least once semantics), there is a small chance 
some records may get consumed twice under some failures. This occurs because of 
inconsistencies between data reliably received by Spark Streaming and offsets 
tracked by Zookeeper. Hence, in this second approach, we use the simple Kafka API 
that does not use Zookeeper; offsets are tracked only by Spark Streaming within 
its checkpoints. This eliminates inconsistencies between Spark Streaming and 
Zookeeper/Kafka, and so each record is received by Spark Streaming effectively 
exactly once despite failures.
+
+Note that one disadvantage of this approach is that it does not update offsets 
in Zookeeper, hence Zookeeper-based Kafka monitoring tools will not show 
progress. However, you can access the offsets processed by this approach in 
each batch and update Zookeeper yourself (see below).
+
+Next, we discuss how to use this approach in your streaming application.
+
+1. **Linking:** This approach is supported only in Scala/Java applications. 
Link your SBT/Maven project with the following artifact (see [Linking 
section](streaming-programming-guide.html#linking) in the main programming 
guide for further information).
+
+               groupId = org.apache.spark
+               artifactId = spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}
+               version = {{site.SPARK_VERSION_SHORT}}
+
+2. **Programming:** In the streaming application code, import `KafkaUtils` and 
create an input DStream as follows.
+
+       <div class="codetabs">
+       <div data-lang="scala" markdown="1">
+               import org.apache.spark.streaming.kafka._
+
+               val directKafkaStream = KafkaUtils.createDirectStream[
+                       [key class], [value class], [key decoder class], [value 
decoder class] ](
+                       streamingContext, [map of Kafka parameters], [set of 
topics to consume])
+
+       See the [API 
docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$)
+       and the 
[example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala).
+       </div>
+       <div data-lang="java" markdown="1">
+               import org.apache.spark.streaming.kafka.*;
+
+               JavaPairInputDStream<String, String> directKafkaStream = 
+                       KafkaUtils.createDirectStream(streamingContext,
+                               [key class], [value class], [key decoder 
class], [value decoder class],
+                               [map of Kafka parameters], [set of topics to 
consume]);
+
+       See the [API 
docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html)
+       and the 
[example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java).
+
+       </div>
+       </div>
+
+       In the Kafka parameters, you must specify either `metadata.broker.list` 
or `bootstrap.servers`.
+       By default, it will start consuming from the latest offset of each 
Kafka partition. If you set configuration `auto.offset.reset` in Kafka 
parameters to `smallest`, then it will start consuming from the smallest 
offset. 
+
+       You can also start consuming from any arbitrary offset using other 
variations of `KafkaUtils.createDirectStream`. Furthermore, if you want to 
access the Kafka offsets consumed in each batch, you can do the following. 
+
+       <div class="codetabs">
+       <div data-lang="scala" markdown="1">
+               directKafkaStream.foreachRDD { rdd => 
+                   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+                   // offsetRanges.length = # of Kafka partitions being 
consumed
+                   ...
+               }
+       </div>
+       <div data-lang="java" markdown="1">
+               directKafkaStream.foreachRDD(
+                   new Function<JavaPairRDD<String, String>, Void>() {
+                       @Override
+                       public Void call(JavaPairRDD<String, String> rdd) 
throws IOException {
+                           OffsetRange[] offsetRanges = 
((HasOffsetRanges) rdd.rdd()).offsetRanges();
+                           // offsetRanges.length = # of Kafka partitions 
being consumed
+                           ...
+                           return null;
+                       }
+                   }
+               );
+       </div>
+       </div>
+
+       You can use this to update Zookeeper yourself if you want 
Zookeeper-based Kafka monitoring tools to show progress of the streaming 
application.
+
+       Another thing to note is that since this approach does not use 
Receivers, the standard receiver-related 
[configurations](configuration.html) (that is, those of the form 
`spark.streaming.receiver.*`) will not apply to the input DStreams created by 
this approach (they will apply to other input DStreams though). Instead, use the 
[configurations](configuration.html) `spark.streaming.kafka.*`. An important 
one is `spark.streaming.kafka.maxRatePerPartition`, which is the maximum rate at 
which each Kafka partition will be read by this direct API. 
+
+3. **Deploying:** Similar to the first approach, you can package 
`spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` and its dependencies into 
the application JAR and then launch the application using `spark-submit`. Make 
sure `spark-core_{{site.SCALA_BINARY_VERSION}}` and 
`spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` 
dependencies as those are already present in a Spark installation.
\ No newline at end of file
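
The batch planning that the direct approach describes (query the latest offsets, then define an offset range per topic+partition) can be pictured with a small model. The sketch below is plain Python and does not use Spark or Kafka; `OffsetRange`, `define_offset_ranges`, `advance`, and the sample offsets are hypothetical names and values chosen for illustration, and the optional cap stands in for a per-partition limit on records per batch.

```python
from collections import namedtuple

# Hypothetical stand-in for an offset range; field names are illustrative.
OffsetRange = namedtuple("OffsetRange", ["topic", "partition", "from_offset", "until_offset"])


def define_offset_ranges(consumed, latest, max_per_partition=None):
    """Plan one batch: for each (topic, partition), read from the last consumed
    offset up to the latest offset, optionally capped per partition."""
    ranges = []
    for (topic, partition), until in sorted(latest.items()):
        # A partition not seen before starts at its latest offset.
        start = consumed.get((topic, partition), until)
        if max_per_partition is not None:
            until = min(until, start + max_per_partition)
        ranges.append(OffsetRange(topic, partition, start, until))
    return ranges


def advance(ranges):
    """Offsets to remember (for example in a checkpoint) once the batch is done."""
    return {(r.topic, r.partition): r.until_offset for r in ranges}


consumed = {("logs", 0): 100, ("logs", 1): 250}
latest = {("logs", 0): 180, ("logs", 1): 250}

batch = define_offset_ranges(consumed, latest, max_per_partition=50)
for r in batch:
    print(r.topic, r.partition, r.from_offset, r.until_offset)
# logs 0 100 150
# logs 1 250 250

consumed = advance(batch)
```

The model yields exactly one range per Kafka partition, which mirrors the one-to-one mapping between Kafka partitions and RDD partitions described under *Simplified Parallelism*. Because the consumed offsets live only in the planner's own state, there is no second copy (such as Zookeeper) that could drift out of step with them.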

http://git-wip-us.apache.org/repos/asf/spark/blob/cd3b68d9/docs/streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-programming-guide.md 
b/docs/streaming-programming-guide.md
index 062ac26..6d62296 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -432,7 +432,7 @@ some of the common ones are as follows.
 </table>
 
 For an up-to-date list, please refer to the
-[Apache 
repository](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.spark%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
+[Maven 
repository](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.spark%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
 for the full list of supported sources and artifacts.
 
 ***
@@ -662,8 +662,7 @@ methods for creating DStreams from files and Akka actors as 
input sources.
 
        For simple text files, there is an easier method 
`streamingContext.textFileStream(dataDirectory)`. File streams do not 
require running a receiver, and hence do not require allocating cores.
 
-       <span class="badge" style="background-color: grey">Python API</span>    
As of Spark 1.2,
-       `fileStream` is not available in the Python API, only   
`textFileStream` is     available.
+       <span class="badge" style="background-color: grey">Python API</span> 
`fileStream` is not available in the Python API; only `textFileStream` is 
available.
 
 - **Streams based on Custom Actors:** DStreams can be created with data 
streams received through Akka
   actors by using `streamingContext.actorStream(actorProps, actor-name)`. See 
the [Custom Receiver
@@ -682,8 +681,9 @@ for Java, and 
[StreamingContext](api/python/pyspark.streaming.html#pyspark.strea
 
 ### Advanced Sources
 {:.no_toc}
-<span class="badge" style="background-color: grey">Python API</span> As of 
Spark 1.2,
-these sources are not available in the Python API.
+
+<span class="badge" style="background-color: grey">Python API</span> As of 
Spark 1.3,
+out of these sources, *only* Kafka is available in the Python API. We will add 
more advanced sources in the Python API in the future.
 
 This category of sources require interfacing with external non-Spark 
libraries, some of them with
 complex dependencies (e.g., Kafka and Flume). Hence, to minimize issues 
related to version conflicts
@@ -723,6 +723,12 @@ and it in the classpath.
 
 Some of these advanced sources are as follows.
 
+- **Kafka:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with 
Kafka 0.8.1.1. See the [Kafka Integration 
Guide](streaming-kafka-integration.html) for more details.
+
+- **Flume:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with 
Flume 1.4.0. See the [Flume Integration 
Guide](streaming-flume-integration.html) for more details.
+
+- **Kinesis:** See the [Kinesis Integration 
Guide](streaming-kinesis-integration.html) for more details.
+
 - **Twitter:** Spark Streaming's TwitterUtils uses Twitter4j 3.0.3 to get the 
public stream of tweets using
   [Twitter's Streaming API](https://dev.twitter.com/docs/streaming-apis). 
Authentication information
   can be provided by any of the 
[methods](http://twitter4j.org/en/configuration.html) supported by
@@ -732,17 +738,10 @@ Some of these advanced sources are as follows.
   
([TwitterPopularTags]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala)
   and 
[TwitterAlgebirdCMS]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala)).
 
-- **Flume:** Spark Streaming {{site.SPARK_VERSION_SHORT}} can received data 
from Flume 1.4.0. See the [Flume Integration 
Guide](streaming-flume-integration.html) for more details.
-
-- **Kafka:** Spark Streaming {{site.SPARK_VERSION_SHORT}} can receive data 
from Kafka 0.8.0. See the [Kafka Integration 
Guide](streaming-kafka-integration.html) for more details.
-
-- **Kinesis:** See the [Kinesis Integration 
Guide](streaming-kinesis-integration.html) for more details.
-
 ### Custom Sources
 {:.no_toc}
 
-<span class="badge" style="background-color: grey">Python API</span> As of 
Spark 1.2,
-these sources are not available in the Python API.
+<span class="badge" style="background-color: grey">Python API</span> This is 
not yet supported in Python.
 
 Input DStreams can also be created out of custom data sources. All you have to 
do is implement a
 user-defined **receiver** (see next section to understand what that is) that 
can receive data from
@@ -846,7 +845,7 @@ Some of the common ones are as follows.
 <tr><td></td><td></td></tr>
 </table>
 
-The last two transformations are worth highlighting again.
+A few of these transformations are worth discussing in more detail.
 
 #### UpdateStateByKey Operation
 {:.no_toc}
@@ -997,7 +996,7 @@ In fact, you can also use [machine 
learning](mllib-guide.html) and
 
 #### Window Operations
 {:.no_toc}
-Finally, Spark Streaming also provides *windowed computations*, which allow 
you to apply
+Spark Streaming also provides *windowed computations*, which allow you to apply
 transformations over a sliding window of data. The following figure 
illustrates this sliding
 window.
 
@@ -1120,6 +1119,100 @@ said two parameters - <i>windowLength</i> and 
<i>slideInterval</i>.
 <tr><td></td><td></td></tr>
 </table>
 
+#### Join Operations
+{:.no_toc}
+Finally, it's worth highlighting how easily you can perform different kinds of 
joins in Spark Streaming.
+
+
+##### Stream-stream joins
+{:.no_toc}
+Streams can be very easily joined with other streams.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val stream1: DStream[(String, String)] = ...
+val stream2: DStream[(String, String)] = ...
+val joinedStream = stream1.join(stream2)
+{% endhighlight %}
+</div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+JavaPairDStream<String, String> stream1 = ...
+JavaPairDStream<String, String> stream2 = ...
+JavaPairDStream<String, Tuple2<String, String>> joinedStream = stream1.join(stream2);
+{% endhighlight %}
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+stream1 = ...
+stream2 = ...
+joinedStream = stream1.join(stream2)
+{% endhighlight %}
+</div>
+</div>
+Here, in each batch interval, the RDD generated by `stream1` will be joined 
with the RDD generated by `stream2`. You can also do `leftOuterJoin`, 
`rightOuterJoin`, `fullOuterJoin`. Furthermore, it is often very useful to do 
joins over windows of the streams. That is pretty easy as well. 
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val windowedStream1 = stream1.window(Seconds(20))
+val windowedStream2 = stream2.window(Minutes(1))
+val joinedStream = windowedStream1.join(windowedStream2)
+{% endhighlight %}
+</div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+JavaPairDStream<String, String> windowedStream1 = 
stream1.window(Durations.seconds(20));
+JavaPairDStream<String, String> windowedStream2 = 
stream2.window(Durations.minutes(1));
+JavaPairDStream<String, Tuple2<String, String>> joinedStream = 
windowedStream1.join(windowedStream2);
+{% endhighlight %}
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+windowedStream1 = stream1.window(20)
+windowedStream2 = stream2.window(60)
+joinedStream = windowedStream1.join(windowedStream2)
+{% endhighlight %}
+</div>
+</div>
+
+##### Stream-dataset joins
+{:.no_toc}
+This has already been shown earlier while explaining the `DStream.transform` 
operation. Here is yet another example of joining a windowed stream with a 
dataset.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val dataset: RDD[(String, String)] = ...
+val windowedStream = stream.window(Seconds(20))...
+val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
+{% endhighlight %}
+</div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final JavaPairRDD<String, String> dataset = ...
+JavaPairDStream<String, String> windowedStream = 
stream.window(Durations.seconds(20));
+JavaPairDStream<String, Tuple2<String, String>> joinedStream = 
windowedStream.transformToPair(
+    new Function<JavaPairRDD<String, String>, JavaPairRDD<String, 
Tuple2<String, String>>>() {
+        @Override 
+        public JavaPairRDD<String, Tuple2<String, String>> 
call(JavaPairRDD<String, String> rdd) {
+            return rdd.join(dataset);
+        }
+    }
+);
+{% endhighlight %}
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+dataset = ... # some RDD
+windowedStream = stream.window(20)
+joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))
+{% endhighlight %}
+</div>
+</div>
+
+In fact, you can also dynamically change the dataset you want to join against. 
The function provided to `transform` is evaluated every batch interval and 
therefore will use the current dataset that the `dataset` reference points to.
 
 The complete list of DStream transformations is available in the API 
documentation. For the Scala API,
 see [DStream](api/scala/index.html#org.apache.spark.streaming.dstream.DStream)
@@ -1327,6 +1420,178 @@ Note that the connections in the pool should be lazily 
created on demand and tim
 
 ***
 
+## DataFrame and SQL Operations
+You can easily use [DataFrames and SQL](sql-programming-guide.html) operations 
on streaming data. You have to create a SQLContext using the SparkContext that 
the StreamingContext is using. Furthermore, this has to be done such that it can 
be restarted on driver failures. This is done by creating a lazily instantiated 
singleton instance of SQLContext. This is shown in the following example. It 
modifies the earlier [word count example](#a-quick-example) to generate word 
counts using DataFrames and SQL. Each RDD is converted to a DataFrame, 
registered as a temporary table and then queried using SQL.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+/** Lazily instantiated singleton instance of SQLContext */
+object SQLContextSingleton {
+  @transient private var instance: SQLContext = null
+
+  // Instantiate SQLContext on demand
+  def getInstance(sparkContext: SparkContext): SQLContext = synchronized {
+    if (instance == null) {
+      instance = new SQLContext(sparkContext)
+    }
+    instance
+  }
+}
+
+...
+
+/** Case class for converting RDD to DataFrame */
+case class Row(word: String)
+
+...
+
+/** DataFrame operations inside your streaming program */
+
+val words: DStream[String] = ...
+
+words.foreachRDD { rdd =>
+
+  // Get the singleton instance of SQLContext
+  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
+  import sqlContext.implicits._
+
+  // Convert RDD[String] to RDD[case class] to DataFrame
+  val wordsDataFrame = rdd.map(w => Row(w)).toDF()
+
+  // Register as table
+  wordsDataFrame.registerTempTable("words")
+
+  // Do word count on DataFrame using SQL and print it
+  val wordCountsDataFrame = 
+    sqlContext.sql("select word, count(*) as total from words group by word")
+  wordCountsDataFrame.show()
+}
+
+{% endhighlight %}
+
+See the full [source 
code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala).
+</div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+/** Lazily instantiated singleton instance of SQLContext */
+class JavaSQLContextSingleton {
+  static private transient SQLContext instance = null;
+  static public SQLContext getInstance(SparkContext sparkContext) {
+    if (instance == null) {
+      instance = new SQLContext(sparkContext);
+    }
+    return instance;
+  }
+}
+
+...
+
+/** Java Bean class for converting RDD to DataFrame */
+public class JavaRow implements java.io.Serializable {
+  private String word;
+
+  public String getWord() {
+    return word;
+  }
+
+  public void setWord(String word) {
+    this.word = word;
+  }
+}
+
+...
+
+/** DataFrame operations inside your streaming program */
+
+JavaDStream<String> words = ... 
+
+words.foreachRDD(
+  new Function2<JavaRDD<String>, Time, Void>() {
+    @Override
+    public Void call(JavaRDD<String> rdd, Time time) {
+      SQLContext sqlContext = 
JavaSQLContextSingleton.getInstance(rdd.context());
+
+      // Convert RDD[String] to RDD[Java Bean] to DataFrame
+      JavaRDD<JavaRow> rowRDD = rdd.map(new Function<String, JavaRow>() {
+        public JavaRow call(String word) {
+          JavaRow record = new JavaRow();
+          record.setWord(word);
+          return record;
+        }
+      });
+      DataFrame wordsDataFrame = sqlContext.createDataFrame(rowRDD, 
JavaRow.class);
+
+      // Register as table
+      wordsDataFrame.registerTempTable("words");
+
+      // Do word count on table using SQL and print it
+      DataFrame wordCountsDataFrame =
+          sqlContext.sql("select word, count(*) as total from words group by 
word");
+      wordCountsDataFrame.show();
+      return null;
+    }
+  }
+);
+{% endhighlight %}
+
+See the full [source 
code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java).
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+
+# Lazily instantiated global instance of SQLContext
+def getSqlContextInstance(sparkContext):
+    if ('sqlContextSingletonInstance' not in globals()):
+        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
+    return globals()['sqlContextSingletonInstance']
+
+...
+
+# DataFrame operations inside your streaming program
+
+words = ... # DStream of strings
+
+def process(time, rdd):
+    print "========= %s =========" % str(time)
+    try:
+        # Get the singleton instance of SQLContext
+        sqlContext = getSqlContextInstance(rdd.context)
+
+        # Convert RDD[String] to RDD[Row] to DataFrame
+        rowRdd = rdd.map(lambda w: Row(word=w))
+        wordsDataFrame = sqlContext.createDataFrame(rowRdd)
+
+        # Register as table
+        wordsDataFrame.registerTempTable("words")
+
+        # Do word count on table using SQL and print it
+        wordCountsDataFrame = sqlContext.sql("select word, count(*) as total 
from words group by word")
+        wordCountsDataFrame.show()
+    except:
+        pass
+
+words.foreachRDD(process)
+{% endhighlight %}
+
+See the full [source 
code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/sql_network_wordcount.py).
+
+</div>
+</div>
+
+You can also run SQL queries on tables defined on streaming data from a different thread (that is, asynchronous to the running StreamingContext). Just make sure that you set the StreamingContext to remember a sufficient amount of streaming data such that the query can run. Otherwise the StreamingContext, which is unaware of any asynchronous SQL queries, will delete old streaming data before the query can complete. For example, if you want to query the last batch, but your query can take 5 minutes to run, then call `streamingContext.remember(Minutes(5))` (in Scala, or equivalent in other languages).
+
+See the [DataFrames and SQL](sql-programming-guide.html) guide to learn more 
about DataFrames.
+
+***
+
+## MLlib Operations
+You can also easily use machine learning algorithms provided by [MLlib](mllib-guide.html). First of all, there are streaming machine learning algorithms (e.g. [Streaming Linear Regression](mllib-linear-methods.html#streaming-linear-regression), [Streaming KMeans](mllib-clustering.html#streaming-k-means), etc.) which can simultaneously learn from the streaming data as well as apply the model on the streaming data. Beyond these, for a much larger class of machine learning algorithms, you can train a model offline (i.e. using historical data) and then apply the model online on streaming data. See the [MLlib](mllib-guide.html) guide for more details.
+
+***
+
 ## Caching / Persistence
 Similar to RDDs, DStreams also allow developers to persist the stream's data 
in memory. That is,
 using `persist()` method on a DStream will automatically persist every RDD of 
that DStream in
@@ -1580,9 +1845,8 @@ To run a Spark Streaming applications, you need to have 
the following.
     + *Mesos* - [Marathon](https://github.com/mesosphere/marathon) has been 
used to achieve this
       with Mesos.
 
-
-- *[Experimental in Spark 1.2] Configuring write ahead logs* - In Spark 1.2,
-  we have introduced a new experimental feature of write ahead logs for 
achieving strong
+- *[Since Spark 1.2] Configuring write ahead logs* - Since Spark 1.2,
+  we have introduced _write ahead logs_ for achieving strong
   fault-tolerance guarantees. If enabled,  all the data received from a 
receiver gets written into
   a write ahead log in the configuration checkpoint directory. This prevents 
data loss on driver
   recovery, thus ensuring zero data loss (discussed in detail in the
@@ -1668,7 +1932,7 @@ improve the performance of you application. At a high 
level, you need to conside
 2. Setting the right batch size such that the batches of data can be processed 
as fast as they
        are received (that is, data processing keeps up with the data 
ingestion).
 
-## Reducing the Processing Time of each Batch
+## Reducing the Batch Processing Times
 There are a number of optimizations that can be done in Spark to minimize the 
processing time of
 each batch. These have been discussed in detail in [Tuning 
Guide](tuning.html). This section
 highlights some of the most important ones.
@@ -1740,16 +2004,15 @@ documentation), or set the `spark.default.parallelism`
 
 ### Data Serialization
 {:.no_toc}
-The overhead of data serialization can be significant, especially when 
sub-second batch sizes are
- to be achieved. There are two aspects to it.
+The overheads of data serialization can be reduced by tuning the serialization formats. In the case of streaming, there are two types of data that are being serialized.
+
+* **Input data**: By default, the input data received through Receivers is stored in the executors' memory with [StorageLevel.MEMORY_AND_DISK_SER_2](api/scala/index.html#org.apache.spark.storage.StorageLevel$). That is, the data is serialized into bytes to reduce GC overheads, and replicated for tolerating executor failures. Also, the data is kept first in memory, and spilled over to disk only if the memory is insufficient to hold all the input data necessary for the streaming computation. This serialization obviously has overheads -- the receiver must deserialize the received data and re-serialize it using Spark's serialization format.
 
-* **Serialization of RDD data in Spark**: Please refer to the detailed 
discussion on data
-  serialization in the [Tuning Guide](tuning.html). However, note that unlike 
Spark, by default
-  RDDs are persisted as serialized byte arrays to minimize pauses related to 
GC.
+* **Persisted RDDs generated by Streaming Operations**: RDDs generated by streaming computations may be persisted in memory. For example, window operations persist data in memory as it would be processed multiple times. However, unlike Spark, by default RDDs are persisted with [StorageLevel.MEMORY_ONLY_SER](api/scala/index.html#org.apache.spark.storage.StorageLevel$) (i.e. serialized) to minimize GC overheads.
 
-* **Serialization of input data**: To ingest external data into Spark, data 
received as bytes
-  (say, from the network) needs to deserialized from bytes and re-serialized 
into Spark's
-  serialization format. Hence, the deserialization overhead of input data may 
be a bottleneck.
+In both cases, using Kryo serialization can reduce both CPU and memory overheads. See the [Spark Tuning Guide](tuning.html#data-serialization) for more details. Consider registering custom classes, and disabling object reference tracking for Kryo (see Kryo-related configurations in the [Configuration Guide](configuration.html#compression-and-serialization)).
+
+In specific cases where the amount of data that needs to be retained for the streaming application is not large, it may be feasible to persist data (both types) as deserialized objects without incurring excessive GC overheads. For example, if you are using batch intervals of a few seconds and no window operations, then you can try disabling serialization in persisted data by explicitly setting the storage level accordingly. This would reduce the CPU overheads due to serialization, potentially improving performance without too much GC overhead.
 
 ### Task Launching Overheads
 {:.no_toc}
@@ -1769,7 +2032,7 @@ thus allowing sub-second batch size to be viable.
 
 ***
 
-## Setting the Right Batch Size
+## Setting the Right Batch Interval
 For a Spark Streaming application running on a cluster to be stable, the 
system should be able to
 process data as fast as it is being received. In other words, batches of data 
should be processed
 as fast as they are being generated. Whether this is true for an application 
can be found by
@@ -1801,40 +2064,40 @@ temporary data rate increases maybe fine as long as the 
delay reduces back to a
 
 ## Memory Tuning
 Tuning the memory usage and GC behavior of Spark applications have been 
discussed in great detail
-in the [Tuning Guide](tuning.html). It is recommended that you read that. In 
this section,
-we highlight a few customizations that are strongly recommended to minimize GC 
related pauses
-in Spark Streaming applications and achieving more consistent batch processing 
times.
-
-* **Default persistence level of DStreams**: Unlike RDDs, the default 
persistence level of DStreams
-serializes the data in memory (that is,
-[StorageLevel.MEMORY_ONLY_SER](api/scala/index.html#org.apache.spark.storage.StorageLevel$)
 for
-DStream compared to
-[StorageLevel.MEMORY_ONLY](api/scala/index.html#org.apache.spark.storage.StorageLevel$)
 for RDDs).
-Even though keeping the data serialized incurs higher 
serialization/deserialization overheads,
-it significantly reduces GC pauses.
-
-* **Clearing persistent RDDs**: By default, all persistent RDDs generated by 
Spark Streaming will
- be cleared from memory based on Spark's built-in policy (LRU). If 
`spark.cleaner.ttl` is set,
- then persistent RDDs that are older than that value are periodically cleared. 
As mentioned
- [earlier](#operation), this needs to be careful set based on operations used 
in the Spark
- Streaming program. However, a smarter unpersisting of RDDs can be enabled by 
setting the
- [configuration property](configuration.html#spark-properties) 
`spark.streaming.unpersist` to
- `true`. This makes the system to figure out which RDDs are not necessary to 
be kept around and
- unpersists them. This is likely to reduce
- the RDD memory usage of Spark, potentially improving GC behavior as well.
-
-* **Concurrent garbage collector**: Using the concurrent mark-and-sweep GC 
further
-minimizes the variability of GC pauses. Even though concurrent GC is known to 
reduce the
+in the [Tuning Guide](tuning.html#memory-tuning). It is strongly recommended 
that you read that. In this section, we discuss a few tuning parameters 
specifically in the context of Spark Streaming applications.
+
+The amount of cluster memory required by a Spark Streaming application depends heavily on the type of transformations used. For example, if you want to use a window operation on the last 10 minutes of data, then your cluster should have sufficient memory to hold 10 minutes' worth of data in memory. Or if you want to use `updateStateByKey` with a large number of keys, then the necessary memory will be high. On the contrary, if you want to do a simple map-filter-store operation, then the necessary memory will be low.
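
As a rough illustration of the window example above, the memory needed to hold a window can be approximated from the ingest rate. This is only a back-of-the-envelope sketch: the function name, its parameters, and the example numbers are assumptions for illustration, and it ignores serialization format, per-object overhead, and any state kept by the transformations themselves.

```python
def estimate_window_memory_bytes(records_per_sec, bytes_per_record,
                                 window_secs, replication=1):
    """Approximate bytes needed to keep `window_secs` of data in memory.

    `replication` is a hypothetical knob for replicated storage levels
    (for example 2 for a level ending in `_2`).
    """
    return records_per_sec * bytes_per_record * window_secs * replication

# 10,000 records/s of ~200 bytes each, kept for a 10 minute window.
single_copy = estimate_window_memory_bytes(10000, 200, 10 * 60)
two_copies = estimate_window_memory_bytes(10000, 200, 10 * 60, replication=2)
```

With these assumed numbers a single copy of the window is about 1.2 GB, and a replicated copy doubles that. Measuring actual usage on a small scale remains the more reliable approach.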
+
+In general, since the data received through receivers is stored with StorageLevel.MEMORY_AND_DISK_SER_2, the data that does not fit in memory will spill over to the disk. This may reduce the performance of the streaming application, and hence it is advised to provide sufficient memory as required by your streaming application. It's best to try and see the memory usage on a small scale and estimate accordingly.
+
+Another aspect of memory tuning is garbage collection. For a streaming application that requires low latency, it is undesirable to have large pauses caused by JVM Garbage Collection.
+
+There are a few parameters that can help you tune the memory usage and GC 
overheads.
+
+* **Persistence Level of DStreams**: As mentioned earlier in the [Data Serialization](#data-serialization) section, the input data and RDDs are by default persisted as serialized bytes. This reduces both the memory usage and GC overheads, compared to deserialized persistence. Enabling Kryo serialization further reduces serialized sizes and memory usage. Further reduction in memory usage can be achieved with compression (see the Spark configuration `spark.rdd.compress`), at the cost of CPU time.
+
+* **Clearing old data**: By default, all input data and persisted RDDs generated by DStream transformations are automatically cleared. Spark Streaming decides when to clear the data based on the transformations that are used. For example, if you are using a window operation of 10 minutes, then Spark Streaming will keep around the last 10 minutes of data, and actively throw away older data.
+Data can be retained for a longer duration (e.g. for interactively querying older data) by setting `streamingContext.remember`.
+
+* **CMS Garbage Collector**: Use of the concurrent mark-and-sweep GC is 
strongly recommended for keeping GC-related pauses consistently low. Even 
though concurrent GC is known to reduce the
 overall processing throughput of the system, its use is still recommended to 
achieve more
-consistent batch processing times.
+consistent batch processing times. Make sure you set the CMS GC on both the 
driver (using `--driver-java-options` in `spark-submit`) and the executors 
(using [Spark configuration](configuration.html#runtime-environment) 
`spark.executor.extraJavaOptions`).
+
+* **Other tips**: To further reduce GC overheads, here are some more tips to 
try.
+    - Use Tachyon for off-heap storage of persisted RDDs. See more detail in 
the [Spark Programming Guide](programming-guide.html#rdd-persistence).
+    - Use more executors with smaller heap sizes. This will reduce the GC 
pressure within each JVM heap.
+
 
 
***************************************************************************************************
 
***************************************************************************************************
 
 # Fault-tolerance Semantics
 In this section, we will discuss the behavior of Spark Streaming applications 
in the event
-of node failures. To understand this, let us remember the basic 
fault-tolerance semantics of
-Spark's RDDs.
+of failures. 
+
+## Background
+{:.no_toc}
+To understand the semantics provided by Spark Streaming, let us remember the 
basic fault-tolerance semantics of Spark's RDDs.
 
 1. An RDD is an immutable, deterministically re-computable, distributed 
dataset. Each RDD
 remembers the lineage of deterministic operations that were used on a 
fault-tolerant input
@@ -1868,13 +2131,43 @@ Furthermore, there are two kinds of failures that we 
should be concerned about:
 
 With this basic knowledge, let us understand the fault-tolerance semantics of 
Spark Streaming.
 
-## Semantics with files as input source
+## Definitions
+{:.no_toc}
+The semantics of streaming systems are often captured in terms of how many times each record can be processed by the system. There are three types of guarantees that a system can provide under all possible operating conditions (despite failures, etc.):
+
+1. *At most once*: Each record will be either processed once or not processed 
at all.
+2. *At least once*: Each record will be processed one or more times. This is stronger than *at-most once* as it ensures that no data will be lost. But there may be duplicates.
+3. *Exactly once*: Each record will be processed exactly once - no data will 
be lost and no data will be processed multiple times. This is obviously the 
strongest guarantee of the three.
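
To make the difference between these guarantees concrete, here is a toy plain-Python simulation of replay after a failure. It is not how Spark Streaming is implemented: `run_with_replay`, `fail_after`, and `checkpoint_every` are hypothetical names chosen for this sketch. Records processed after the last checkpoint but before the crash are processed again on restart, which is exactly the duplication that *at least once* permits.

```python
from collections import Counter

def run_with_replay(records, fail_after, checkpoint_every):
    """Process records in order, crash once, then replay from the last checkpoint.

    Returns a Counter of how many times each record was processed.
    """
    processed = Counter()
    checkpoint = 0      # index of the first record not yet covered by a checkpoint
    crashed = False
    i = 0
    while i < len(records):
        processed[records[i]] += 1
        i += 1
        if not crashed and i == fail_after:
            crashed = True
            i = checkpoint      # restart: resume from the last durable position
            continue
        if i % checkpoint_every == 0:
            checkpoint = i
    return processed

counts = run_with_replay(["r0", "r1", "r2", "r3", "r4"],
                         fail_after=3, checkpoint_every=2)
```

Every record is counted at least once and `r2` is counted twice, so this run is at-least-once but not exactly-once.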
+
+## Basic Semantics
+{:.no_toc}
+In any stream processing system, broadly speaking, there are three steps in 
processing the data.
+
+1. *Receiving the data*: The data is received from sources using Receivers or 
otherwise.
+
+1. *Transforming the data*: The received data is transformed using DStream and RDD transformations.
+
+1. *Pushing out the data*: The final transformed data is pushed out to 
external systems like file systems, databases, dashboards, etc.
+
+If a streaming application has to achieve end-to-end exactly-once guarantees, then each step has to provide an exactly-once guarantee. That is, each record must be received exactly once, transformed exactly once, and pushed to downstream systems exactly once. Let's understand the semantics of these steps in the context of Spark Streaming.
+
+1. *Receiving the data*: Different input sources provide different guarantees. This is discussed in detail in the next subsection.
+
+1. *Transforming the data*: All data that has been received will be processed 
_exactly once_, thanks to the guarantees that RDDs provide. Even if there are 
failures, as long as the received input data is accessible, the final 
transformed RDDs will always have the same contents.
+
+1. *Pushing out the data*: Output operations by default ensure _at-least once_ semantics; whether anything stronger holds depends on the type of output operation (idempotent, or not) and the semantics of the downstream system (supports transactions or not). But users can implement their own transaction mechanisms to achieve _exactly-once_ semantics. This is discussed in more detail later in the section.
+
+## Semantics of Received Data
+{:.no_toc}
+Different input sources provide different guarantees, ranging from _at-least once_ to _exactly once_. Read on for more details.
+
+### With Files
 {:.no_toc}
 If all of the input data is already present in a fault-tolerant files system 
like
 HDFS, Spark Streaming can always recover from any failure and process all the 
data. This gives
 *exactly-once* semantics, that all the data will be processed exactly once no 
matter what fails.
 
-## Semantics with input sources based on receivers
+### With Receiver-based Sources
 {:.no_toc}
 For input sources based on receivers, the fault-tolerance semantics depend on 
both the failure
 scenario and the type of receiver.
@@ -1893,10 +2186,9 @@ receivers, data received but not replicated can get 
lost. If the driver node fai
 then besides these losses, all the past data that was received and replicated 
in memory will be
 lost. This will affect the results of the stateful transformations.
 
-To avoid this loss of past received data, Spark 1.2 introduces an experimental 
feature of _write
+To avoid this loss of past received data, Spark 1.2 introduced _write
 ahead logs_ which saves the received data to fault-tolerant storage. With the 
[write ahead logs
-enabled](#deploying-applications) and reliable receivers, there is zero data 
loss and
-exactly-once semantics.
+enabled](#deploying-applications) and reliable receivers, there is zero data loss. In terms of semantics, it provides an at-least once guarantee.
 
 The following table summarizes the semantics under failures:
 
@@ -1908,23 +2200,30 @@ The following table summarizes the semantics under 
failures:
   </tr>
   <tr>
     <td>
-      <b>Spark 1.1 or earlier, or</b><br/>
-      <b>Spark 1.2 without write ahead log</b>
+      <i>Spark 1.1 or earlier,</i> OR<br/>
+      <i>Spark 1.2 or later without write ahead logs</i>
     </td>
     <td>
       Buffered data lost with unreliable receivers<br/>
-      Zero data loss with reliable receivers and files<br/>
+      Zero data loss with reliable receivers<br/>
+      At-least once semantics
     </td>
     <td>
       Buffered data lost with unreliable receivers<br/>
       Past data lost with all receivers<br/>
-      Zero data loss with files
-      </td>
+      Undefined semantics
+    </td>
   </tr>
   <tr>
-    <td><b>Spark 1.2 with write ahead log</b></td>
-    <td>Zero data loss with reliable receivers and files</td>
-    <td>Zero data loss with reliable receivers and files</td>
+    <td><i>Spark 1.2 or later with write ahead logs</i></td>
+    <td>
+        Zero data loss with reliable receivers<br/>
+        At-least once semantics
+    </td>
+    <td>
+        Zero data loss with reliable receivers and files<br/>
+        At-least once semantics
+    </td>
   </tr>
   <tr>
     <td></td>
@@ -1933,17 +2232,24 @@ The following table summarizes the semantics under 
failures:
   </tr>
 </table>
 
+### With Kafka Direct API
+{:.no_toc}
+In Spark 1.3, we have introduced a new Kafka Direct API, which can ensure that all the Kafka data is received by Spark Streaming exactly once. Along with this, if you implement an exactly-once output operation, you can achieve end-to-end exactly-once guarantees. This approach (experimental as of Spark 1.3) is further discussed in the [Kafka Integration Guide](streaming-kafka-integration.html).
+
 ## Semantics of output operations
 {:.no_toc}
-Since all data is modeled as RDDs with their lineage of deterministic 
operations, any recomputation
- always leads to the same result. As a result, all DStream transformations are 
guaranteed to have
- _exactly-once_ semantics. That is, the final transformed result will be same 
even if there were
- was a worker node failure. However, output operations (like `foreachRDD`) 
have _at-least once_
- semantics, that is, the transformed data may get written to an external 
entity more than once in
- the event of a worker failure. While this is acceptable for saving to HDFS 
using the
- `saveAs***Files` operations (as the file will simply get over-written by the 
same data),
- additional transactions-like mechanisms may be necessary to achieve 
exactly-once semantics
- for output operations.
+Output operations (like `foreachRDD`) have _at-least once_ semantics, that is, 
+the transformed data may get written to an external entity more than once in
+the event of a worker failure. While this is acceptable for saving to file 
systems using the
+`saveAs***Files` operations (as the file will simply get overwritten with the 
same data),
+additional effort may be necessary to achieve exactly-once semantics. There 
are two approaches.
+
+- *Idempotent updates*: Multiple attempts always write the same data. For 
example, `saveAs***Files` always writes the same data to the generated files.
+
+- *Transactional updates*: All updates are made transactionally so that 
updates are made exactly once atomically. One way to do this would be the 
following.
+
+    - Use the batch time (available in `foreachRDD`) and the partition index of the transformed RDD to create an identifier. This identifier uniquely identifies a blob of data in the streaming application.
+    - Update the external system with this blob transactionally (that is, exactly once, atomically) using the identifier. That is, if the identifier is not already committed, commit the partition data and the identifier atomically. Else if this was already committed, skip the update.
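
The two steps above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the guide's prescribed implementation: SQLite from the standard library stands in for the external system, and `commit_partition` plus the `committed` and `output` tables are hypothetical names invented for this sketch. In a real job the batch time and partition index would come from `foreachRDD` and the partition being written.

```python
import sqlite3

def commit_partition(conn, batch_time, partition_index, records):
    """Write one partition's records at most once.

    The (batch_time, partition_index) identifier and the data are written
    in a single transaction, so a retried attempt becomes a no-op.
    Returns True if this call wrote the data, False if it was skipped.
    """
    uid = "%s-%s" % (batch_time, partition_index)
    with conn:  # commits on success, rolls back if an exception is raised
        seen = conn.execute(
            "SELECT 1 FROM committed WHERE id = ?", (uid,)).fetchone()
        if seen:
            return False
        conn.executemany(
            "INSERT INTO output (word, total) VALUES (?, ?)", records)
        conn.execute("INSERT INTO committed (id) VALUES (?)", (uid,))
    return True

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE committed (id TEXT PRIMARY KEY)")
conn.execute("CREATE TABLE output (word TEXT, total INTEGER)")

data = [("spark", 3), ("kafka", 1)]
first = commit_partition(conn, 1426124901000, 0, data)   # original attempt
retry = commit_partition(conn, 1426124901000, 0, data)   # re-executed task
row_count = conn.execute("SELECT COUNT(*) FROM output").fetchone()[0]
```

The retried attempt finds the identifier already committed and skips the write, so the output table holds each record once. The primary key on `committed` also makes a concurrent duplicate attempt fail and roll back rather than write twice.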
 
 
 
***************************************************************************************************
@@ -2001,7 +2307,11 @@ package and renamed for better clarity.
 
***************************************************************************************************
 
 # Where to Go from Here
-
+* Additional guides
+    - [Kafka Integration Guide](streaming-kafka-integration.html)
+    - [Flume Integration Guide](streaming-flume-integration.html)
+    - [Kinesis Integration Guide](streaming-kinesis-integration.html)
+    - [Custom Receiver Guide](streaming-custom-receivers.html)
 * API documentation
   - Scala docs
     * 
[StreamingContext](api/scala/index.html#org.apache.spark.streaming.StreamingContext)
 and
@@ -2023,8 +2333,8 @@ package and renamed for better clarity.
     
[ZeroMQUtils](api/java/index.html?org/apache/spark/streaming/zeromq/ZeroMQUtils.html),
 and
     
[MQTTUtils](api/java/index.html?org/apache/spark/streaming/mqtt/MQTTUtils.html)
   - Python docs
-    * 
[StreamingContext](api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext)
-    * [DStream](api/python/pyspark.streaming.html#pyspark.streaming.DStream)
+    * 
[StreamingContext](api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext)
 and [DStream](api/python/pyspark.streaming.html#pyspark.streaming.DStream)
+    * 
[KafkaUtils](api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils)
 
 * More examples in 
[Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming)
   and 
[Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming)

