This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 65d822c44d93 [SPARK-45435][DOC] Document that lazy checkpoint may not 
be a consistent snapshot
65d822c44d93 is described below

commit 65d822c44d93077b68d8738c261fcaf9288cc960
Author: Juliusz Sompolski <ju...@databricks.com>
AuthorDate: Mon Jan 15 14:55:39 2024 +0800

    [SPARK-45435][DOC] Document that lazy checkpoint may not be a consistent 
snapshot
    
    ### What changes were proposed in this pull request?
    
    Some may want to use checkpoint to get a consistent snapshot of the Dataset 
/ RDD. Warn that this is not the case with lazy checkpoint, because checkpoint 
is computed only at the end of the first action, and the data used during the 
first action may be different because of non-determinism and retries.
    
    `doCheckpoint` is only called at the end of 
[SparkContext.runJob](https://github.com/apache/spark/blob/5446f548bbc8a93414f1c773a8daf714b57b7d1a/core/src/main/scala/org/apache/spark/SparkContext.scala#L2426).
 This may cause recomputation both of data of [local checkpoint 
data](https://github.com/apache/spark/blob/5446f548bbc8a93414f1c773a8daf714b57b7d1a/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala#L54)
 and [reliable checkpoint data](https://github.com/apache/sp [...]
    
    ### Why are the changes needed?
    
    Document a gnarly edge case.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, change to documentation of public APIs.
    
    ### How was this patch tested?
    
    Doc only change.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #43247 from juliuszsompolski/SPARK-45435-doc.
    
    Authored-by: Juliusz Sompolski <ju...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 core/src/main/scala/org/apache/spark/rdd/RDD.scala       | 14 ++++++++++++++
 .../src/main/scala/org/apache/spark/sql/Dataset.scala    | 16 ++++++++++++++++
 2 files changed, 30 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 9518433a7f69..d73fb1b9bc3b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1649,6 +1649,13 @@ abstract class RDD[T: ClassTag](
    * RDDs will be removed. This function must be called before any job has been
    * executed on this RDD. It is strongly recommended that this RDD is 
persisted in
    * memory, otherwise saving it on a file will require recomputation.
+   *
+   * The data is only checkpointed when `doCheckpoint()` is called, and this 
only happens at the
+   * end of the first action execution on this RDD. The final data that is 
checkpointed after the
+   * first action may be different from the data that was used during the 
action, due to
+   * non-determinism of the underlying operation and retries. If the purpose 
of the checkpoint is
+   * to achieve saving a deterministic snapshot of the data, an eager action 
may need to be called
+   * first on the RDD to trigger the checkpoint.
    */
   def checkpoint(): Unit = RDDCheckpointData.synchronized {
     // NOTE: we use a global lock here due to complexities downstream with 
ensuring
@@ -1678,6 +1685,13 @@ abstract class RDD[T: ClassTag](
    * `spark.dynamicAllocation.cachedExecutorIdleTimeout` to a high value.
    *
    * The checkpoint directory set through `SparkContext#setCheckpointDir` is 
not used.
+   *
+   * The data is only checkpointed when `doCheckpoint()` is called, and this 
only happens at the
+   * end of the first action execution on this RDD. The final data that is 
checkpointed after the
+   * first action may be different from the data that was used during the 
action, due to
+   * non-determinism of the underlying operation and retries. If the purpose 
of the checkpoint is
+   * to achieve saving a deterministic snapshot of the data, an eager action 
may need to be called
+   * first on the RDD to trigger the checkpoint.
    */
   def localCheckpoint(): this.type = RDDCheckpointData.synchronized {
     if (Utils.isDynamicAllocationEnabled(conf) &&
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index d792cdbcf865..0038f1a510b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -688,6 +688,14 @@ class Dataset[T] private[sql](
    * plan may grow exponentially. It will be saved to files inside the 
checkpoint
    * directory set with `SparkContext#setCheckpointDir`.
    *
+   * @param eager Whether to checkpoint this dataframe immediately
+   *
+   * @note When checkpoint is used with eager = false, the final data that is 
checkpointed after
+   *       the first action may be different from the data that was used 
during the job due to non
+   *       deteminism of the underlying operation and retries. If checkpoint 
is used to achieve
+   *       saving a deterministic snapshot of the data, eager = true should be 
used. Otherwise,
+   *       it is only deterministic after the first execution, after the 
checkpoint was finalized.
+   *
    * @group basic
    * @since 2.1.0
    */
@@ -710,6 +718,14 @@ class Dataset[T] private[sql](
    * plan may grow exponentially. Local checkpoints are written to executor 
storage and despite
    * potentially faster they are unreliable and may compromise job completion.
    *
+   * @param eager Whether to checkpoint this dataframe immediately
+   *
+   * @note When checkpoint is used with eager = false, the final data that is 
checkpointed after
+   *       the first action may be different from the data that was used 
during the job due to non
+   *       deteminism of the underlying operation and retries. If checkpoint 
is used to achieve
+   *       saving a deterministic snapshot of the data, eager = true should be 
used. Otherwise,
+   *       it is only deterministic after the first execution, after the 
checkpoint was finalized.
+   *
    * @group basic
    * @since 2.3.0
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to