This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 65d822c44d93 [SPARK-45435][DOC] Document that lazy checkpoint may not be a consistent snapshot
65d822c44d93 is described below

commit 65d822c44d93077b68d8738c261fcaf9288cc960
Author: Juliusz Sompolski <ju...@databricks.com>
AuthorDate: Mon Jan 15 14:55:39 2024 +0800

    [SPARK-45435][DOC] Document that lazy checkpoint may not be a consistent snapshot

    ### What changes were proposed in this pull request?

    Some may want to use checkpoint to get a consistent snapshot of the Dataset / RDD.
    Warn that this is not the case with lazy checkpoint: the checkpoint is computed only
    at the end of the first action, and the data used during the first action may be
    different because of non-determinism and retries. `doCheckpoint` is only called at the
    end of [SparkContext.runJob](https://github.com/apache/spark/blob/5446f548bbc8a93414f1c773a8daf714b57b7d1a/core/src/main/scala/org/apache/spark/SparkContext.scala#L2426).
    This may cause recomputation of both [local checkpoint data](https://github.com/apache/spark/blob/5446f548bbc8a93414f1c773a8daf714b57b7d1a/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala#L54)
    and [reliable checkpoint data](https://github.com/apache/sp [...]

    ### Why are the changes needed?

    Document a gnarly edge case.

    ### Does this PR introduce _any_ user-facing change?

    Yes, change to documentation of public APIs.

    ### How was this patch tested?

    Doc only change.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #43247 from juliuszsompolski/SPARK-45435-doc.
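The lazy-vs-eager distinction this patch documents can be sketched with a toy model in plain Scala. This is illustrative only: `Checkpointed`, `firstAction`, and `compute` are hypothetical stand-ins for an RDD's non-deterministic lineage and its actions, not Spark APIs. The `retry` flag models a task being recomputed between the first action using the data and `doCheckpoint` writing it.

```scala
// Toy model of eager vs. lazy checkpointing of a non-deterministic
// computation (plain Scala, not Spark).
object CheckpointSnapshotSketch {
  // Stand-in for a non-deterministic lineage: each evaluation (e.g. a
  // task retry, or rand()) produces a different value.
  private var evaluations = 0
  private def compute(): Int = { evaluations += 1; evaluations }

  final class Checkpointed(eager: Boolean) {
    // eager = true: the snapshot is materialized immediately.
    private var snapshot: Option[Int] = if (eager) Some(compute()) else None

    // A first action that, like Spark's doCheckpoint, finalizes the
    // checkpoint only at the end. Returns (data the action used,
    // data that ended up in the checkpoint).
    def firstAction(retry: Boolean): (Int, Int) = snapshot match {
      case Some(v) => (v, v) // eager: a fixed snapshot, always consistent
      case None =>
        val used = compute()                         // data the job observed
        val written = if (retry) compute() else used // recomputed on retry
        snapshot = Some(written)
        (used, written)
    }
  }

  def main(args: Array[String]): Unit = {
    val (lu, lw) = new Checkpointed(eager = false).firstAction(retry = true)
    println(s"lazy:  used=$lu written=$lw") // values differ: not a snapshot
    val (eu, ew) = new Checkpointed(eager = true).firstAction(retry = true)
    println(s"eager: used=$eu written=$ew") // values equal: stable snapshot
  }
}
```

In real Spark code the corresponding guard is what the new docs suggest: use `Dataset.checkpoint(eager = true)` (eager is the default), or run an eager action on the RDD first, before relying on the checkpointed data as a snapshot.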
Authored-by: Juliusz Sompolski <ju...@databricks.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 core/src/main/scala/org/apache/spark/rdd/RDD.scala    | 14 ++++++++++++++
 .../src/main/scala/org/apache/spark/sql/Dataset.scala | 16 ++++++++++++++++
 2 files changed, 30 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 9518433a7f69..d73fb1b9bc3b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1649,6 +1649,13 @@ abstract class RDD[T: ClassTag](
    * RDDs will be removed. This function must be called before any job has been
    * executed on this RDD. It is strongly recommended that this RDD is persisted in
    * memory, otherwise saving it on a file will require recomputation.
+   *
+   * The data is only checkpointed when `doCheckpoint()` is called, and this only happens at the
+   * end of the first action execution on this RDD. The final data that is checkpointed after the
+   * first action may be different from the data that was used during the action, due to
+   * non-determinism of the underlying operation and retries. If the purpose of the checkpoint is
+   * to achieve saving a deterministic snapshot of the data, an eager action may need to be called
+   * first on the RDD to trigger the checkpoint.
    */
   def checkpoint(): Unit = RDDCheckpointData.synchronized {
     // NOTE: we use a global lock here due to complexities downstream with ensuring
@@ -1678,6 +1685,13 @@ abstract class RDD[T: ClassTag](
    * `spark.dynamicAllocation.cachedExecutorIdleTimeout` to a high value.
    *
    * The checkpoint directory set through `SparkContext#setCheckpointDir` is not used.
+   *
+   * The data is only checkpointed when `doCheckpoint()` is called, and this only happens at the
+   * end of the first action execution on this RDD. The final data that is checkpointed after the
+   * first action may be different from the data that was used during the action, due to
+   * non-determinism of the underlying operation and retries. If the purpose of the checkpoint is
+   * to achieve saving a deterministic snapshot of the data, an eager action may need to be called
+   * first on the RDD to trigger the checkpoint.
    */
   def localCheckpoint(): this.type = RDDCheckpointData.synchronized {
     if (Utils.isDynamicAllocationEnabled(conf) &&
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index d792cdbcf865..0038f1a510b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -688,6 +688,14 @@ class Dataset[T] private[sql](
    * plan may grow exponentially. It will be saved to files inside the checkpoint
    * directory set with `SparkContext#setCheckpointDir`.
    *
+   * @param eager Whether to checkpoint this dataframe immediately
+   *
+   * @note When checkpoint is used with eager = false, the final data that is checkpointed after
+   *       the first action may be different from the data that was used during the job due to
+   *       non-determinism of the underlying operation and retries. If checkpoint is used to
+   *       achieve saving a deterministic snapshot of the data, eager = true should be used.
+   *       Otherwise, it is only deterministic after the first execution, after the checkpoint was finalized.
+   *
    * @group basic
    * @since 2.1.0
    */
@@ -710,6 +718,14 @@ class Dataset[T] private[sql](
    * plan may grow exponentially. Local checkpoints are written to executor storage and despite
    * potentially faster they are unreliable and may compromise job completion.
    *
+   * @param eager Whether to checkpoint this dataframe immediately
+   *
+   * @note When checkpoint is used with eager = false, the final data that is checkpointed after
+   *       the first action may be different from the data that was used during the job due to
+   *       non-determinism of the underlying operation and retries. If checkpoint is used to
+   *       achieve saving a deterministic snapshot of the data, eager = true should be used.
+   *       Otherwise, it is only deterministic after the first execution, after the checkpoint was finalized.
+   *
    * @group basic
    * @since 2.3.0
    */

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org