This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new aff5e2b  [SPARK-28650][SS][DOC] Correct explanation of guarantee for ForeachWriter
aff5e2b is described below

commit aff5e2bdca501fc24fb7d56f966d933c96a37b5b
Author: Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com>
AuthorDate: Tue Aug 20 00:56:53 2019 -0700

    [SPARK-28650][SS][DOC] Correct explanation of guarantee for ForeachWriter
    
    ## What changes were proposed in this pull request?
    
    This patch corrects the documented guarantee for ForeachWriter, which does not guarantee the same output for the same `(partitionId, epochId)`. Refer to the description of [SPARK-28650](https://issues.apache.org/jira/browse/SPARK-28650) for more details.
    
    Spark itself still guarantees the same output for the same epochId (batch) if two preconditions are met: 1) the source always provides the same input records for the same offset request, and 2) the query is idempotent overall (non-deterministic calculations such as now() or random() can break this).
    
    Treating a violation of these preconditions as an exceptional case (they were already implicitly required), we can still describe the guarantee in terms of `epochId`, though it is harder to leverage: 1) the ForeachWriter has to track whether all partitions were written successfully for a given `epochId`; 2) there is little chance to leverage the fact, as the chance for Spark to successfully write all partitions and fail to checkpoint the batch i [...]
    
    Credit to zsxwing on discovering the broken guarantee.
    
    ## How was this patch tested?
    
    This is just a documentation change, both on javadoc and guide doc.
    
    Closes #25407 from HeartSaVioR/SPARK-28650.
    
    Authored-by: Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com>
    Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>
    (cherry picked from commit b37c8d5cea2e31e7821d848e42277f8fb7b68f30)
    Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>
---
 docs/structured-streaming-programming-guide.md             | 14 ++++++--------
 .../main/scala/org/apache/spark/sql/ForeachWriter.scala    | 13 +++++--------
 2 files changed, 11 insertions(+), 16 deletions(-)

diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 2c3348a..fa5664d 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -1827,7 +1827,7 @@ Here are the details of all the sinks in Spark.
     <td><b>Foreach Sink</b></td>
     <td>Append, Update, Complete</td>
     <td>None</td>
-    <td>Depends on ForeachWriter implementation</td>
+    <td>Yes (at-least-once)</td>
     <td>More details in the <a href="#using-foreach-and-foreachbatch">next section</a></td>
   </tr>
   <tr>
@@ -2235,13 +2235,11 @@ When the streaming query is started, Spark calls the function or the object’s
 
 - The close() method (if it exists) is called if an open() method exists and returns successfully (irrespective of the return value), except if the JVM or Python process crashes in the middle.
 
-- **Note:** The partitionId and epochId in the open() method can be used to deduplicate generated data
-  when failures cause reprocessing of some input data. This depends on the execution mode of the query.
-  If the streaming query is being executed in the micro-batch mode, then every partition represented
-  by a unique tuple (partition_id, epoch_id) is guaranteed to have the same data.
-  Hence, (partition_id, epoch_id) can be used to deduplicate and/or transactionally commit
-  data and achieve exactly-once guarantees. However, if the streaming query is being executed
-  in the continuous mode, then this guarantee does not hold and therefore should not be used for deduplication.
+- **Note:** Spark does not guarantee same output for (partitionId, epochId), so deduplication
+  cannot be achieved with (partitionId, epochId). e.g. source provides different number of
+  partitions for some reasons, Spark optimization changes number of partitions, etc.
+  See [SPARK-28650](https://issues.apache.org/jira/browse/SPARK-28650) for more details.
+  If you need deduplication on output, try out `foreachBatch` instead.
 
 #### Triggers
 The trigger settings of a streaming query defines the timing of streaming data processing, whether
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala
index 52b8c83..5cf294e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala
@@ -50,14 +50,11 @@ import org.apache.spark.annotation.InterfaceStability
  *
  * Important points to note:
  * <ul>
- * <li>The `partitionId` and `epochId` can be used to deduplicate generated data when failures
- *     cause reprocessing of some input data. This depends on the execution mode of the query. If
- *     the streaming query is being executed in the micro-batch mode, then every partition
- *     represented by a unique tuple (partitionId, epochId) is guaranteed to have the same data.
- *     Hence, (partitionId, epochId) can be used to deduplicate and/or transactionally commit data
- *     and achieve exactly-once guarantees. However, if the streaming query is being executed in the
- *     continuous mode, then this guarantee does not hold and therefore should not be used for
- *     deduplication.
+ * <li>Spark doesn't guarantee same output for (partitionId, epochId), so deduplication
+ *     cannot be achieved with (partitionId, epochId). e.g. source provides different number of
+ *     partitions for some reason, Spark optimization changes number of partitions, etc.
+ *     Refer SPARK-28650 for more details. If you need deduplication on output, try out
+ *     `foreachBatch` instead.
  *
  * <li>The `close()` method will be called if `open()` method returns successfully (irrespective
  *     of the return value), except if the JVM crashes in the middle.
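
For readers of this notification, the reasoning behind the corrected note can be sketched with a small simulation. This is plain Python with no Spark APIs; every name in it (`split_into_partitions`, `PartitionKeyedSink`, `BatchKeyedSink`) is hypothetical and exists only for illustration. It assumes, as the commit message does, that a replayed batch contains the same records, while the number of partitions may differ between the failed attempt and the retry.

```python
# Plain-Python simulation only; nothing here is a Spark API.
from typing import Dict, List, Set, Tuple


def split_into_partitions(rows: List[int], num_partitions: int) -> List[List[int]]:
    """Round-robin split, standing in for however a batch gets partitioned."""
    return [rows[i::num_partitions] for i in range(num_partitions)]


class PartitionKeyedSink:
    """Hypothetical sink that skips any (partition_id, epoch_id) already written."""

    def __init__(self) -> None:
        self.seen: Set[Tuple[int, int]] = set()
        self.rows: List[int] = []

    def write(self, partition_id: int, epoch_id: int, rows: List[int]) -> None:
        key = (partition_id, epoch_id)
        if key in self.seen:
            return  # treated as a duplicate; only safe if partition contents are stable
        self.seen.add(key)
        self.rows.extend(rows)


class BatchKeyedSink:
    """Hypothetical sink that stores a whole batch under its id, so a replay overwrites."""

    def __init__(self) -> None:
        self.batches: Dict[int, List[int]] = {}

    def write(self, batch_id: int, rows: List[int]) -> None:
        self.batches[batch_id] = list(rows)

    def all_rows(self) -> List[int]:
        return [r for b in sorted(self.batches) for r in self.batches[b]]


batch_rows = [1, 2, 3, 4, 5, 6]
epoch_id = 0

# First attempt: 2 partitions. Partition 0 is written, then the attempt fails.
partition_sink = PartitionKeyedSink()
first_attempt = split_into_partitions(batch_rows, 2)
partition_sink.write(0, epoch_id, first_attempt[0])

# Retry: the same records now arrive split into 3 partitions.
for pid, rows in enumerate(split_into_partitions(batch_rows, 3)):
    partition_sink.write(pid, epoch_id, rows)

# Batch-keyed path: the whole batch is written once per attempt that reaches the sink.
batch_sink = BatchKeyedSink()
batch_sink.write(epoch_id, batch_rows)
batch_sink.write(epoch_id, batch_rows)  # replay of the same batch

partition_keyed_rows = sorted(partition_sink.rows)
batch_keyed_rows = batch_sink.all_rows()
print(partition_keyed_rows)  # [1, 2, 3, 3, 5, 5, 6]
print(batch_keyed_rows)      # [1, 2, 3, 4, 5, 6]
```

In this sketch the partition-keyed sink loses record 4 and duplicates records 3 and 5, because partition 0 of the retry no longer holds the records that partition 0 of the first attempt held. Keying on the batch id alone avoids that, which is the kind of deduplication the updated note points to `foreachBatch` for.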


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
