This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 51615b62007 [SPARK-41596][SS][DOCS] Document the new feature "Async 
Progress Tracking" to Structured Streaming guide doc
51615b62007 is described below

commit 51615b62007add3eca004f33890d8f39b3b58698
Author: Jerry Peng <[email protected]>
AuthorDate: Wed Jan 18 10:31:43 2023 +0900

    [SPARK-41596][SS][DOCS] Document the new feature "Async Progress Tracking" 
to Structured Streaming guide doc
    
    ### What changes were proposed in this pull request?
    
    Add documentation on how async progress tracking should be used
    
    ### Why are the changes needed?
    
    Documentation on async progress tracking is missing
    
    ### Does this PR introduce _any_ user-facing change?
    no
    
    ### How was this patch tested?
    
    n/a
    
    Closes #39538 from jerrypeng/SPARK-41596.
    
    Authored-by: Jerry Peng <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 docs/img/async-progress.png                    | Bin 0 -> 29770 bytes
 docs/structured-streaming-programming-guide.md |  56 +++++++++++++++++++++++++
 2 files changed, 56 insertions(+)

diff --git a/docs/img/async-progress.png b/docs/img/async-progress.png
new file mode 100644
index 00000000000..ebefc8e4aba
Binary files /dev/null and b/docs/img/async-progress.png differ
diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index 447e08bcb7f..78176c9cb8e 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -3570,6 +3570,62 @@ the effect of the change is not well-defined. For all of 
them:
     if you save your state as Avro-encoded bytes, then you are free to change 
the Avro-state-schema between query
     restarts as the binary state will always be restored successfully.
 
+# Asynchronous Progress Tracking
+## What is it?
+
+Asynchronous progress tracking allows streaming queries to checkpoint progress 
asynchronously and in parallel to the actual data processing within a 
micro-batch, reducing latency associated with maintaining the offset log and 
commit log.
+
+![Async Progress Tracking](img/async-progress.png)
+
+## How does it work?
+
+Structured Streaming relies on persisting and managing offsets as progress 
indicators for query processing. Offset management operation directly impacts 
processing latency, because no data processing can occur until these operations 
are complete. Asynchronous progress tracking enables streaming queries to 
checkpoint progress without being impacted by these offset management 
operations.
+
+## How to use it?
+
+The code snippet below provides an example of how to use this feature:
+```scala
+val stream = spark.readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+      .option("subscribe", "in")
+      .load()
+val query = stream.writeStream
+     .format("kafka")
+       .option("topic", "out")
+     .option("checkpointLocation", "/tmp/checkpoint")
+       .option("asyncProgressTrackingEnabled", "true")
+     .start()
+```
+
+The table below describes the configurations for this feature and default 
values associated with them.
+
+| Option    | Value           | Default | Description       | 
+|-------------|-----------------|------------|---------------------|
+|asyncProgressTrackingEnabled|true/false|false|enable or disable asynchronous 
progress tracking|
+|asyncProgressCheckpointingInterval|minutes|1|the interval in which we commit 
offsets and completion commits|
+
+## Limitations
+The initial version of the feature has the following limitations:
+
+* Asynchronous progress tracking is only supported in stateless queries using 
Kafka Sink
+* Exactly once end-to-end processing will not be supported with this 
asynchronous progress tracking because offset ranges for batch can be changed 
in case of failure. Though many sinks, such as Kafka sink, do not support 
writing exactly once anyways.
+
+## Switching the setting off
+Turning the async progress tracking off may cause the following exception to 
be thrown
+
+```scala
+java.lang.IllegalStateException: batch x doesn't exist
+```
+
+Also the following error message may be printed in the driver logs:
+
+```
+The offset log for batch x doesn't exist, which is required to restart the 
query from the latest batch x from the offset log. Please ensure there are two 
subsequent offset logs available for the latest batch via manually deleting the 
offset file(s). Please also ensure the latest batch for commit log is equal or 
one batch earlier than the latest batch for offset log.
+```
+
+This is caused by the fact that when async progress tracking is enabled, the 
framework will not checkpoint progress for every batch as would be done if 
async progress tracking is not used. To solve this problem simply re-enable 
“asyncProgressTrackingEnabled” and set “asyncProgressCheckpointingInterval” to 
0 and run the streaming query until at least two micro-batches have been 
processed. Async progress tracking can be now safely disabled and restarting 
query should proceed normally.
+
 # Continuous Processing
 ## [Experimental]
 {:.no_toc}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to