This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6064368  [SPARK-27018][CORE] Fix incorrect removal of checkpointed file in PeriodicCheckpointer
6064368 is described below

commit 6064368415636e8668d8db835a218872f6846d98
Author: zhengruifeng <[email protected]>
AuthorDate: Mon Jun 24 09:34:01 2019 -0500

    [SPARK-27018][CORE] Fix incorrect removal of checkpointed file in PeriodicCheckpointer
    
    ## What changes were proposed in this pull request?
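    Remove the oldest checkpointed file only if the next checkpoint exists.
    Previously the cleanup loop tested `checkpointQueue.head` (the checkpoint about to be deleted) instead of `checkpointQueue(1)` (the next one), so the oldest checkpoint could be removed even when its successor had not been checkpointed yet.
    I think this patch needs backporting.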
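    
    A minimal sketch of the corrected rule, using a hypothetical `Ckpt` record and a plain `mutable.Queue` in place of the real `PeriodicCheckpointer` state. Here `materialized` stands in for `isCheckpointed`, and dequeuing stands in for `removeCheckpointFile()`; the `checkNext = false` path reproduces the old head-based check for comparison.
    
    ```scala
    import scala.collection.mutable
    
    // Hypothetical stand-in for a checkpointed Dataset/Graph; `materialized`
    // models isCheckpointed(data), i.e. whether its checkpoint files exist.
    final case class Ckpt(id: Int, materialized: Boolean)
    
    object CheckpointCleanupSketch {
      // Drops the oldest entries while at least two remain. With checkNext = true
      // (the fixed behaviour) an entry is dropped only if the one after it,
      // queue(1), is materialized. With checkNext = false (the old behaviour)
      // the head itself is tested, which can leave only an unmaterialized entry.
      def removeOldCheckpoints(queue: mutable.Queue[Ckpt], checkNext: Boolean = true): List[Int] = {
        val removed = mutable.ListBuffer.empty[Int]
        var canDelete = true
        while (queue.size > 1 && canDelete) {
          val probe = if (checkNext) queue(1) else queue.head
          if (probe.materialized) {
            removed += queue.dequeue().id // stands in for removeCheckpointFile()
          } else {
            canDelete = false
          }
        }
        removed.toList
      }
    
      def main(args: Array[String]): Unit = {
        // The newest checkpoint (id 3) has not been materialized yet.
        def fresh() = mutable.Queue(Ckpt(1, true), Ckpt(2, true), Ckpt(3, false))
        println(removeOldCheckpoints(fresh()))                    // List(1)
        println(removeOldCheckpoints(fresh(), checkNext = false)) // List(1, 2)
      }
    }
    ```
    
    With the fixed check, checkpoint 2 survives because checkpoint 3 is not yet usable; the old check also deletes checkpoint 2, leaving only the unmaterialized checkpoint 3 in the queue.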
    
    ## How was this patch tested?
    Existing tests.
    
    Local check in spark-shell with the following snippet:
    ```
    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.ml.classification.GBTClassifier
    
    case class Row(features: org.apache.spark.ml.linalg.Vector, label: Int)
    
    sc.setCheckpointDir("/checkpoints")
    val trainingData = sc.parallelize(1 to 2426874, 256)
      .map(x => Row(Vectors.dense(x, x + 1, x * 2 % 10), if (x % 5 == 0) 1 else 0))
      .toDF
    val classifier = new GBTClassifier()
      .setLabelCol("label")
      .setFeaturesCol("features")
      .setProbabilityCol("probability")
      .setMaxIter(100)
      .setMaxDepth(10)
      .setCheckpointInterval(2)
    
    classifier.fit(trainingData)
    ```
    
    Closes #24870 from zhengruifeng/ck_update.
    
    Authored-by: zhengruifeng <[email protected]>
    Signed-off-by: Sean Owen <[email protected]>
---
 core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala b/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala
index ce06e18..c105f32 100644
--- a/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala
+++ b/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala
@@ -100,7 +100,7 @@ private[spark] abstract class PeriodicCheckpointer[T](
       var canDelete = true
       while (checkpointQueue.size > 1 && canDelete) {
         // Delete the oldest checkpoint only if the next checkpoint exists.
-        if (isCheckpointed(checkpointQueue.head)) {
+        if (isCheckpointed(checkpointQueue(1))) {
           removeCheckpointFile()
         } else {
           canDelete = false

