This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 406c533  [SPARK-28025][SS] Fix FileContextBasedCheckpointFileManager leaking crc files
406c533 is described below

commit 406c5331ff8937120af465219c8f443ee00a97fb
Author: Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com>
AuthorDate: Thu Aug 22 23:10:16 2019 -0700

    [SPARK-28025][SS] Fix FileContextBasedCheckpointFileManager leaking crc files
    
    ### What changes were proposed in this pull request?
    
    This PR fixes the leak of crc files from CheckpointFileManager when FileContextBasedCheckpointFileManager is used.
    
    Spark hits the Hadoop bug [HADOOP-16255](https://issues.apache.org/jira/browse/HADOOP-16255), which appears to be a long-standing issue.
    
    This happens because there are two `renameInternal` methods:
    
    ```
    public void renameInternal(Path src, Path dst)
    public void renameInternal(final Path src, final Path dst, boolean overwrite)
    ```
    
    Both should be overridden to handle all cases, but ChecksumFs only overrides the two-parameter method. When the three-parameter variant is called, FilterFs.renameInternal(...) runs instead and performs the rename with RawLocalFs as the underlying filesystem, leaving the crc file behind.
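    
    To illustrate the dispatch gap, here is a simplified, hypothetical sketch (the real classes are Hadoop's ChecksumFs and FilterFs): a subclass that overrides only the two-parameter method never intercepts calls made through the three-parameter variant.
    
    ```
    abstract class BaseFs {
      def renameInternal(src: String, dst: String): Unit =
        println(s"base rename: $src -> $dst")
      // note: this variant does not delegate to the two-parameter one
      def renameInternal(src: String, dst: String, overwrite: Boolean): Unit =
        println(s"base rename: $src -> $dst (overwrite=$overwrite)")
    }
    
    class ChecksumLikeFs extends BaseFs {
      // like ChecksumFs, only the two-parameter method is overridden
      override def renameInternal(src: String, dst: String): Unit = {
        println(s"also renaming .$src.crc")
        super.renameInternal(src, dst)
      }
    }
    
    object Demo extends App {
      val fs = new ChecksumLikeFs
      fs.renameInternal("a", "b")                   // crc handled
      fs.renameInternal("a", "b", overwrite = true) // base method runs; crc leaked
    }
    ```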
    
    The bug is related to FileContext, so FileSystemBasedCheckpointFileManager is not affected.
    
    [SPARK-17475](https://issues.apache.org/jira/browse/SPARK-17475) added a workaround for this bug, but [SPARK-23966](https://issues.apache.org/jira/browse/SPARK-23966) appears to have brought the regression back.
    
    This PR deletes the crc file on a best-effort basis when renaming, since failing to delete a crc file is not critical enough to fail the task.
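    
    As a rough, standalone sketch of the same best-effort cleanup (using plain java.nio here rather than the Hadoop FileContext API that the actual patch uses; see mayRemoveCrcFile in the diff below): ChecksumFs stores the checksum for a file "foo" next to it as ".foo.crc".
    
    ```
    import java.nio.file.{Files, Path}
    import scala.util.control.NonFatal
    
    def bestEffortRemoveCrc(renamedSrc: Path): Unit = {
      // the checksum for "foo" lives in the same directory as ".foo.crc"
      val crc = renamedSrc.resolveSibling(s".${renamedSrc.getFileName}.crc")
      try Files.deleteIfExists(crc)
      catch { case NonFatal(_) => () } // best-effort: a leftover crc is not fatal
    }
    ```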
    
    ### Why are the changes needed?
    
    This PR prevents crc files from being left behind even after batches are purged. Too many files in the same directory often hurts performance, and each crc file occupies more disk space than its own size, so the leak can consume a nontrivial amount of space when batch counts reach 100000+ (for example, at a 4 KB filesystem allocation unit, 100000 tiny crc files still occupy roughly 400 MB).
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Some unit tests are modified to check leakage of crc files.
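    
    For reference, here is a compact, standalone version of the kind of check those tests add (the real assertions live in CheckpointFileManagerSuite in the diff below): every ".X.crc" file in a directory must have a matching origin file "X".
    
    ```
    import java.io.File
    
    // returns the names of crc files whose origin file is missing
    def leakedCrcFiles(dir: File): Seq[String] = {
      val names = dir.listFiles().filter(_.isFile).map(_.getName).toSet
      names.filter(n => n.startsWith(".") && n.endsWith(".crc"))
        .filter(crc => !names.contains(crc.substring(1, crc.length - 4)))
        .toSeq
    }
    ```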
    
    Closes #25488 from HeartSaVioR/SPARK-28025.
    
    Authored-by: Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com>
    Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>
---
 .../streaming/CheckpointFileManager.scala          | 14 ++++++++++
 .../streaming/CheckpointFileManagerSuite.scala     | 16 ++++++++++++
 .../execution/streaming/HDFSMetadataLogSuite.scala | 30 ++++++++++++++++++----
 3 files changed, 55 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index fe6362d..26f42b6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -327,6 +327,8 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
   override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
     import Options.Rename._
     fc.rename(srcPath, dstPath, if (overwriteIfPossible) OVERWRITE else NONE)
+    // TODO: this is a workaround of HADOOP-16255 - remove this when HADOOP-16255 is resolved
+    mayRemoveCrcFile(srcPath)
   }
 
 
@@ -343,5 +345,17 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
     case _: LocalFs | _: RawLocalFs => true // LocalFs = RawLocalFs + ChecksumFs
     case _ => false
   }
+
+  private def mayRemoveCrcFile(path: Path): Unit = {
+    try {
+      val checksumFile = new Path(path.getParent, s".${path.getName}.crc")
+      if (exists(checksumFile)) {
+        // checksum file exists, deleting it
+        delete(checksumFile)
+      }
+    } catch {
+      case NonFatal(_) => // ignore, we are removing crc file as "best-effort"
+    }
+  }
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
index c57b40c..79bcd49 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
@@ -78,6 +78,22 @@ abstract class CheckpointFileManagerTests extends SparkFunSuite with SQLHelper {
       assert(fm.exists(path))
       fm.createAtomic(path, overwriteIfPossible = true).close()  // should not throw exception
 
+      // crc file should not be leaked when origin file doesn't exist.
+      // The implementation of Hadoop filesystem may filter out the checksum file,
+      // so list files via the local filesystem directly.
+      val fileNames = new File(path.getParent.toString).listFiles().toSeq
+        .filter(p => p.isFile).map(p => p.getName)
+      val crcFiles = fileNames.filter(n => n.startsWith(".") && n.endsWith(".crc"))
+      val originFileNamesForExistingCrcFiles = crcFiles.map { name =>
+        // remove first "." and last ".crc"
+        name.substring(1, name.length - 4)
+      }
+
+      // Check all origin files exist for all crc files.
+      assert(originFileNamesForExistingCrcFiles.toSet.subsetOf(fileNames.toSet),
+        s"Some of origin files for crc files don't exist - crc files: $crcFiles / " +
+          s"expected origin files: $originFileNamesForExistingCrcFiles / actual files: $fileNames")
+
       // Open and delete
       fm.open(path).close()
       fm.delete(path)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index c09756c..67dd88c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -25,6 +25,7 @@ import scala.language.implicitConversions
 import org.scalatest.concurrent.Waiters._
 import org.scalatest.time.SpanSugar._
 
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.util.UninterruptibleThread
 
@@ -58,6 +59,21 @@ class HDFSMetadataLogSuite extends SharedSparkSession {
   }
 
   test("HDFSMetadataLog: purge") {
+    testPurge()
+  }
+
+  Seq(
+    classOf[FileSystemBasedCheckpointFileManager],
+    classOf[FileContextBasedCheckpointFileManager]
+  ).map(_.getCanonicalName).foreach { cls =>
+    test(s"HDFSMetadataLog: purge - explicit file manager - $cls") {
+      withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key -> cls) {
+        testPurge()
+      }
+    }
+  }
+
+  private def testPurge(): Unit = {
     withTempDir { temp =>
       val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
       assert(metadataLog.add(0, "batch0"))
@@ -74,12 +90,16 @@ class HDFSMetadataLogSuite extends SharedSparkSession {
       assert(metadataLog.get(2).isDefined)
       assert(metadataLog.getLatest().get._1 == 2)
 
-      // There should be exactly one file, called "2", in the metadata directory.
+      // There should be at most two files, called "2", and optionally a crc file,
+      // in the metadata directory.
       // This check also tests for regressions of SPARK-17475
-      val allFiles = new File(metadataLog.metadataPath.toString).listFiles()
-        .filter(!_.getName.startsWith(".")).toSeq
-      assert(allFiles.size == 1)
-      assert(allFiles(0).getName() == "2")
+      val allFiles = new File(metadataLog.metadataPath.toString).listFiles().toSeq
+      assert(allFiles.size <= 2)
+      assert(allFiles.exists(_.getName == "2"))
+      if (allFiles.size == 2) {
+        // there's possibly crc file being left as well
+        assert(allFiles.exists(_.getName == ".2.crc"))
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
