Repository: spark
Updated Branches:
  refs/heads/master 6d398c05c -> 601d653bf


[SPARK-23454][SS][DOCS] Added trigger information to the Structured Streaming 
programming guide

## What changes were proposed in this pull request?

- Added clear information about triggers
- Made the semantics guarantees of watermarks more clear for streaming 
aggregations and stream-stream joins.
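
As a rough illustration of the one-directional watermark guarantee documented in the patch below, here is a minimal sketch in plain Python (no Spark required). The function name `within_watermark_guarantee` and the sample timestamps are hypothetical and only mirror the wording of the guide; this is not how the engine is implemented.

```python
from datetime import datetime, timedelta


def within_watermark_guarantee(event_time, max_event_time_seen, watermark_delay):
    """Return True if a record is covered by the watermark guarantee.

    True means the record is less than `watermark_delay` behind the latest
    event time processed so far, so it is guaranteed to be aggregated.
    False means only that there is no guarantee: the record may or may not
    be dropped.
    """
    return max_event_time_seen - event_time < watermark_delay


delay = timedelta(hours=2)              # same value as the "2 hours" example
latest = datetime(2018, 2, 20, 12, 0)   # hypothetical max event time seen

# 90 minutes behind the latest event time: covered by the guarantee.
on_time = within_watermark_guarantee(datetime(2018, 2, 20, 10, 30), latest, delay)

# 3 hours behind: not covered, so it may or may not be aggregated.
too_late = within_watermark_guarantee(datetime(2018, 2, 20, 9, 0), latest, delay)

print(on_time, too_late)
```

The asymmetry is the point: a `False` result is not a promise that the record is dropped.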

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, 
manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, 
remove this)

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #20631 from tdas/SPARK-23454.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/601d653b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/601d653b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/601d653b

Branch: refs/heads/master
Commit: 601d653bff9160db8477f86d961e609fc2190237
Parents: 6d398c0
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Tue Feb 20 18:16:10 2018 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Tue Feb 20 18:16:10 2018 -0800

----------------------------------------------------------------------
 docs/structured-streaming-programming-guide.md | 214 +++++++++++++++++++-
 1 file changed, 207 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/601d653b/docs/structured-streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index 48d6d0b..9a83f15 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -904,7 +904,7 @@ windowedCounts <- count(
 </div>
 
 
-### Handling Late Data and Watermarking
+#### Handling Late Data and Watermarking
 Now consider what happens if one of the events arrives late to the application.
 For example, say, a word generated at 12:04 (i.e. event time) could be 
received by 
 the application at 12:11. The application should use the time 12:04 instead of 
12:11
@@ -925,7 +925,9 @@ specifying the event time column and the threshold on how 
late the data is expec
 event time. For a specific window starting at time `T`, the engine will 
maintain state and allow late
 data to update the state until `(max event time seen by the engine - late 
threshold > T)`. 
 In other words, late data within the threshold will be aggregated, 
-but data later than the threshold will be dropped. Let's understand this with 
an example. We can 
+but data later than the threshold will start getting dropped
+(see [later](#semantic-guarantees-of-aggregation-with-watermarking)
+in the section for the exact guarantees). Let's understand this with an 
example. We can
 easily define watermarking on the previous example using `withWatermark()` as 
shown below.
 
 <div class="codetabs">
@@ -1031,7 +1033,9 @@ then drops intermediate state of a window < watermark, 
and appends the final
 counts to the Result Table/sink. For example, the final counts of window 
`12:00 - 12:10` is 
 appended to the Result Table only after the watermark is updated to `12:11`. 
 
-**Conditions for watermarking to clean aggregation state**
+##### Conditions for watermarking to clean aggregation state
+{:.no_toc}
+
 It is important to note that the following conditions must be satisfied for 
the watermarking to 
 clean the state in aggregation queries *(as of Spark 2.1.1, subject to change 
in the future)*.
 
@@ -1051,6 +1055,16 @@ from the aggregation column.
 For example, `df.groupBy("time").count().withWatermark("time", "1 min")` is 
invalid in Append 
 output mode.
 
+##### Semantic Guarantees of Aggregation with Watermarking
+{:.no_toc}
+
+- A watermark delay (set with `withWatermark`) of "2 hours" guarantees that 
the engine will never
+drop any data that is less than 2 hours delayed. In other words, any data less 
than 2 hours behind
+(in terms of event-time) the latest data processed till then is guaranteed to 
be aggregated.
+
+- However, the guarantee is strict only in one direction. Data delayed by more 
than 2 hours is
+not guaranteed to be dropped; it may or may not get aggregated. The more delayed
+the data is, the less likely the engine is to process it.
 
 ### Join Operations
 Structured Streaming supports joining a streaming Dataset/DataFrame with a 
static Dataset/DataFrame
@@ -1062,7 +1076,7 @@ Dataset/DataFrame will be the exactly the same as if it 
was with a static Datase
 containing the same data in the stream.
 
 
-#### Stream-static joins
+#### Stream-static Joins
 
 Since the introduction in Spark 2.0, Structured Streaming has supported joins 
(inner join and some
 type of outer joins) between a streaming and a static DataFrame/Dataset. Here 
is a simple example.
@@ -1269,6 +1283,12 @@ joined <- join(
 </div>
 </div>
 
+###### Semantic Guarantees of Stream-stream Inner Joins with Watermarking
+{:.no_toc}
+This is similar to the [guarantees provided by watermarking on 
aggregations](#semantic-guarantees-of-aggregation-with-watermarking).
+A watermark delay of "2 hours" guarantees that the engine will never drop any 
data that is less than
+ 2 hours delayed. But data delayed by more than 2 hours may or may not get 
processed.
+
 ##### Outer Joins with Watermarking
 While the watermark + event-time constraints are optional for inner joins, for 
left and right outer
 joins they must be specified. This is because for generating the NULL results 
in outer join, the
@@ -1347,7 +1367,14 @@ joined <- join(
 </div>
 
 
-There are a few points to note regarding outer joins.
+###### Semantic Guarantees of Stream-stream Outer Joins with Watermarking
+{:.no_toc}
+Outer joins have the same guarantees as [inner 
joins](#semantic-guarantees-of-stream-stream-inner-joins-with-watermarking)
+regarding watermark delays and whether data will be dropped or not.
+
+###### Caveats
+{:.no_toc}
+There are a few important characteristics to note regarding how the outer 
results are generated.
 
 - *The outer NULL results will be generated with a delay that depends on the 
specified watermark
 delay and the time range condition.* This is because the engine has to wait 
for that long to ensure
@@ -1962,7 +1989,7 @@ head(sql("select * from aggregates"))
 </div>
 </div>
 
-#### Using Foreach
+##### Using Foreach
 The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`
 
([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
 which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
@@ -1979,6 +2006,172 @@ which has methods that get called whenever there is a 
sequence of rows generated
 
 - Whenever `open` is called, `close` will also be called (unless the JVM exits 
due to some error). This is true even if `open` returns false. If there is any 
error in processing and writing the data, `close` will be called with the 
error. It is your responsibility to clean up state (e.g. connections, 
transactions, etc.) that have been created in `open` such that there are no 
resource leaks.
 
+#### Triggers
+The trigger settings of a streaming query define the timing of streaming data processing, whether
+the query is going to be executed as a micro-batch query with a fixed batch interval or as a continuous processing query.
+Here are the different kinds of triggers that are supported.
+
+<table class="table">
+  <tr>
+    <th>Trigger Type</th>
+    <th>Description</th>
+  </tr>
+  <tr>
+    <td><i>unspecified (default)</i></td>
+    <td>
+        If no trigger setting is explicitly specified, then by default, the 
query will be
+        executed in micro-batch mode, where micro-batches will be generated as 
soon as
+        the previous micro-batch has completed processing.
+    </td>
+  </tr>
+  <tr>
+    <td><b>Fixed interval micro-batches</b></td>
+    <td>
+        The query will be executed in micro-batch mode, where micro-batches will be kicked off
+        at the user-specified intervals.
+        <ul>
+          <li>If the previous micro-batch completes within the interval, then 
the engine will wait until
+          the interval is over before kicking off the next micro-batch.</li>
+
+          <li>If the previous micro-batch takes longer than the interval to 
complete (i.e. if an
+          interval boundary is missed), then the next micro-batch will start 
as soon as the
+          previous one completes (i.e., it will not wait for the next interval 
boundary).</li>
+
+          <li>If no new data is available, then no micro-batch will be kicked 
off.</li>
+        </ul>
+    </td>
+  </tr>
+  <tr>
+    <td><b>One-time micro-batch</b></td>
+    <td>
+        The query will execute <i>only one</i> micro-batch to process all the available data and then
+        stop on its own. This is useful in scenarios where you want to periodically spin up a cluster,
+        process everything that is available since the last period, and then shut down the
+        cluster. In some cases, this may lead to significant cost savings.
+    </td>
+  </tr>
+  <tr>
+    <td><b>Continuous with fixed checkpoint 
interval</b><br/><i>(experimental)</i></td>
+    <td>
+        The query will be executed in the new low-latency, continuous 
processing mode. Read more
+        about this in the <a 
href="#continuous-processing-experimental">Continuous Processing section</a> 
below.
+    </td>
+  </tr>
+</table>
+
+Here are a few code examples.
+
+<div class="codetabs">
+<div data-lang="scala"  markdown="1">
+
+{% highlight scala %}
+import org.apache.spark.sql.streaming.Trigger
+
+// Default trigger (runs micro-batch as soon as it can)
+df.writeStream
+  .format("console")
+  .start()
+
+// ProcessingTime trigger with two-seconds micro-batch interval
+df.writeStream
+  .format("console")
+  .trigger(Trigger.ProcessingTime("2 seconds"))
+  .start()
+
+// One-time trigger
+df.writeStream
+  .format("console")
+  .trigger(Trigger.Once())
+  .start()
+
+// Continuous trigger with one-second checkpointing interval
+df.writeStream
+  .format("console")
+  .trigger(Trigger.Continuous("1 second"))
+  .start()
+
+{% endhighlight %}
+
+
+</div>
+<div data-lang="java"  markdown="1">
+
+{% highlight java %}
+import org.apache.spark.sql.streaming.Trigger;
+
+// Default trigger (runs micro-batch as soon as it can)
+df.writeStream()
+  .format("console")
+  .start();
+
+// ProcessingTime trigger with two-seconds micro-batch interval
+df.writeStream()
+  .format("console")
+  .trigger(Trigger.ProcessingTime("2 seconds"))
+  .start();
+
+// One-time trigger
+df.writeStream()
+  .format("console")
+  .trigger(Trigger.Once())
+  .start();
+
+// Continuous trigger with one-second checkpointing interval
+df.writeStream()
+  .format("console")
+  .trigger(Trigger.Continuous("1 second"))
+  .start();
+
+{% endhighlight %}
+
+</div>
+<div data-lang="python"  markdown="1">
+
+{% highlight python %}
+
+# Default trigger (runs micro-batch as soon as it can)
+df.writeStream \
+  .format("console") \
+  .start()
+
+# ProcessingTime trigger with two-seconds micro-batch interval
+df.writeStream \
+  .format("console") \
+  .trigger(processingTime='2 seconds') \
+  .start()
+
+# One-time trigger
+df.writeStream \
+  .format("console") \
+  .trigger(once=True) \
+  .start()
+
+# Continuous trigger with one-second checkpointing interval
+df.writeStream \
+  .format("console") \
+  .trigger(continuous='1 second') \
+  .start()
+
+{% endhighlight %}
+</div>
+<div data-lang="r"  markdown="1">
+
+{% highlight r %}
+# Default trigger (runs micro-batch as soon as it can)
+write.stream(df, "console")
+
+# ProcessingTime trigger with two-seconds micro-batch interval
+write.stream(df, "console", trigger.processingTime = "2 seconds")
+
+# One-time trigger
+write.stream(df, "console", trigger.once = TRUE)
+
+# Continuous trigger is not yet supported
+{% endhighlight %}
+</div>
+</div>
+
+
 ## Managing Streaming Queries
 The `StreamingQuery` object created when a query is started can be used to 
monitor and manage the query. 
 
@@ -2516,7 +2709,10 @@ write.stream(aggDF, "memory", outputMode = "complete", 
checkpointLocation = "pat
 </div>
 </div>
 
-# Continuous Processing [Experimental]
+# Continuous Processing
+## [Experimental]
+{:.no_toc}
+
 **Continuous processing** is a new, experimental streaming execution mode 
introduced in Spark 2.3 that enables low (~1 ms) end-to-end latency with 
at-least-once fault-tolerance guarantees. Compare this with the default 
*micro-batch processing* engine which can achieve exactly-once guarantees but 
achieve latencies of ~100ms at best. For some types of queries (discussed 
below), you can choose which mode to execute them in without modifying the 
application logic (i.e. without changing the DataFrame/Dataset operations). 
 
 To run a supported query in continuous processing mode, all you need to do is 
specify a **continuous trigger** with the desired checkpoint interval as a 
parameter. For example, 
@@ -2589,6 +2785,8 @@ spark \
 A checkpoint interval of 1 second means that the continuous processing engine 
will record the progress of the query every second. The resulting checkpoints 
are in a format compatible with the micro-batch engine, hence any query can be 
restarted with any trigger. For example, a supported query started with the 
micro-batch mode can be restarted in continuous mode, and vice versa. Note that 
any time you switch to continuous mode, you will get at-least-once 
fault-tolerance guarantees.
 
 ## Supported Queries
+{:.no_toc}
+
 As of Spark 2.3, only the following type of queries are supported in the 
continuous processing mode.
 
 - *Operations*: Only map-like Dataset/DataFrame operations are supported in 
continuous mode, that is, only projections (`select`, `map`, `flatMap`, 
`mapPartitions`, etc.) and selections (`where`, `filter`, etc.).
@@ -2606,6 +2804,8 @@ As of Spark 2.3, only the following type of queries are 
supported in the continu
 See [Input Sources](#input-sources) and [Output Sinks](#output-sinks) sections 
for more details on them. While the console sink is good for testing, the 
end-to-end low-latency processing can be best observed with Kafka as the source 
and sink, as this allows the engine to process the data and make the results 
available in the output topic within milliseconds of the input data being 
available in the input topic.
 
 ## Caveats
+{:.no_toc}
+
 - Continuous processing engine launches multiple long-running tasks that 
continuously read data from sources, process it and continuously write to 
sinks. The number of tasks required by the query depends on how many partitions 
the query can read from the sources in parallel. Therefore, before starting a 
continuous processing query, you must ensure there are enough cores in the 
cluster to run all the tasks in parallel. For example, if you are reading from a 
Kafka topic that has 10 partitions, then the cluster must have at least 10 
cores for the query to make progress.
 - Stopping a continuous processing stream may produce spurious task 
termination warnings. These can be safely ignored.
 - There are currently no automatic retries of failed tasks. Any failure will 
lead to the query being stopped and it needs to be manually restarted from the 
checkpoint.
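
The fixed-interval rules in the Triggers table of this patch can be sketched outside of Spark in plain Python. This is a simplified model under stated assumptions, not the engine's scheduler: the helper name `batch_start_times` is hypothetical, the interval is measured from the start of the previous micro-batch, and the "no new data, no micro-batch" rule is ignored.

```python
def batch_start_times(batch_durations, interval):
    """Return the start time (in seconds) of each micro-batch.

    The first batch starts at time 0. Each later batch starts either when
    the interval is over (the previous batch finished early) or as soon as
    the previous batch completes (the interval boundary was missed).
    """
    starts = []
    start = 0.0
    for duration in batch_durations:
        starts.append(start)
        finished = start + duration        # when this batch completes
        next_boundary = start + interval   # assumed interval boundary
        # Wait for the boundary, unless the batch already ran past it.
        start = max(finished, next_boundary)
    return starts


# Hypothetical batch durations with a 2-second interval, as in
# Trigger.ProcessingTime("2 seconds").
starts = batch_start_times([0.5, 3.0, 1.0], interval=2.0)
print(starts)  # [0.0, 2.0, 5.0]
```

The second batch waits for the 2-second boundary because the first one finished early. The third starts at 5.0, immediately after the 3-second batch completes, rather than waiting for a later boundary.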

