This is an automated email from the ASF dual-hosted git repository. zsxwing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 406c533 [SPARK-28025][SS] Fix FileContextBasedCheckpointFileManager leaking crc files 406c533 is described below commit 406c5331ff8937120af465219c8f443ee00a97fb Author: Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com> AuthorDate: Thu Aug 22 23:10:16 2019 -0700 [SPARK-28025][SS] Fix FileContextBasedCheckpointFileManager leaking crc files ### What changes were proposed in this pull request? This PR fixes the leak of crc files from CheckpointFileManager when FileContextBasedCheckpointFileManager is used. Spark hits the Hadoop bug [HADOOP-16255](https://issues.apache.org/jira/browse/HADOOP-16255), which appears to be a long-standing issue. The cause is that there are two `renameInternal` methods: ``` public void renameInternal(Path src, Path dst) public void renameInternal(final Path src, final Path dst, boolean overwrite) ``` Both should be overridden to handle all cases, but ChecksumFs only overrides the two-parameter method. When the three-parameter method is called, FilterFs.renameInternal(...) is invoked instead and performs the rename on the underlying RawLocalFs, which leaves the source file's checksum (crc) file behind. The bug is specific to FileContext, so FileSystemBasedCheckpointFileManager is not affected. [SPARK-17475](https://issues.apache.org/jira/browse/SPARK-17475) added a workaround for this bug, but [SPARK-23966](https://issues.apache.org/jira/browse/SPARK-23966) appears to have reintroduced it as a regression. This PR deletes the crc file on a "best-effort" basis after renaming, since failing to delete a crc file is not critical enough to fail the task. ### Why are the changes needed? Without this change, crc files are not cleaned up even after batches are purged. Too many files in the same directory often hurts performance. In addition, each crc file occupies more space than its own size, so leaked crc files can take up a nontrivial amount of space once batches reach 100000+. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Some unit tests were modified to check for leakage of crc files. 
Closes #25488 from HeartSaVioR/SPARK-28025. Authored-by: Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com> Signed-off-by: Shixiong Zhu <zsxw...@gmail.com> --- .../streaming/CheckpointFileManager.scala | 14 ++++++++++ .../streaming/CheckpointFileManagerSuite.scala | 16 ++++++++++++ .../execution/streaming/HDFSMetadataLogSuite.scala | 30 ++++++++++++++++++---- 3 files changed, 55 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala index fe6362d..26f42b6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala @@ -327,6 +327,8 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = { import Options.Rename._ fc.rename(srcPath, dstPath, if (overwriteIfPossible) OVERWRITE else NONE) + // TODO: this is a workaround of HADOOP-16255 - remove this when HADOOP-16255 is resolved + mayRemoveCrcFile(srcPath) } @@ -343,5 +345,17 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio case _: LocalFs | _: RawLocalFs => true // LocalFs = RawLocalFs + ChecksumFs case _ => false } + + private def mayRemoveCrcFile(path: Path): Unit = { + try { + val checksumFile = new Path(path.getParent, s".${path.getName}.crc") + if (exists(checksumFile)) { + // checksum file exists, deleting it + delete(checksumFile) + } + } catch { + case NonFatal(_) => // ignore, we are removing crc file as "best-effort" + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala index 
c57b40c..79bcd49 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala @@ -78,6 +78,22 @@ abstract class CheckpointFileManagerTests extends SparkFunSuite with SQLHelper { assert(fm.exists(path)) fm.createAtomic(path, overwriteIfPossible = true).close() // should not throw exception + // crc file should not be leaked when origin file doesn't exist. + // The implementation of Hadoop filesystem may filter out checksum file, so + // listing files from local filesystem. + val fileNames = new File(path.getParent.toString).listFiles().toSeq + .filter(p => p.isFile).map(p => p.getName) + val crcFiles = fileNames.filter(n => n.startsWith(".") && n.endsWith(".crc")) + val originFileNamesForExistingCrcFiles = crcFiles.map { name => + // remove first "." and last ".crc" + name.substring(1, name.length - 4) + } + + // Check all origin files exist for all crc files. 
+ assert(originFileNamesForExistingCrcFiles.toSet.subsetOf(fileNames.toSet), + s"Some of origin files for crc files don't exist - crc files: $crcFiles / " + + s"expected origin files: $originFileNamesForExistingCrcFiles / actual files: $fileNames") + // Open and delete fm.open(path).close() fm.delete(path) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala index c09756c..67dd88c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala @@ -25,6 +25,7 @@ import scala.language.implicitConversions import org.scalatest.concurrent.Waiters._ import org.scalatest.time.SpanSugar._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.util.UninterruptibleThread @@ -58,6 +59,21 @@ class HDFSMetadataLogSuite extends SharedSparkSession { } test("HDFSMetadataLog: purge") { + testPurge() + } + + Seq( + classOf[FileSystemBasedCheckpointFileManager], + classOf[FileContextBasedCheckpointFileManager] + ).map(_.getCanonicalName).foreach { cls => + test(s"HDFSMetadataLog: purge - explicit file manager - $cls") { + withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key -> cls) { + testPurge() + } + } + } + + private def testPurge(): Unit = { withTempDir { temp => val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath) assert(metadataLog.add(0, "batch0")) @@ -74,12 +90,16 @@ class HDFSMetadataLogSuite extends SharedSparkSession { assert(metadataLog.get(2).isDefined) assert(metadataLog.getLatest().get._1 == 2) - // There should be exactly one file, called "2", in the metadata directory. + // There should be at most two files, called "2", and optionally crc file, + // in the metadata directory. 
// This check also tests for regressions of SPARK-17475 - val allFiles = new File(metadataLog.metadataPath.toString).listFiles() - .filter(!_.getName.startsWith(".")).toSeq - assert(allFiles.size == 1) - assert(allFiles(0).getName() == "2") + val allFiles = new File(metadataLog.metadataPath.toString).listFiles().toSeq + assert(allFiles.size <= 2) + assert(allFiles.exists(_.getName == "2")) + if (allFiles.size == 2) { + // there's possibly crc file being left as well + assert(allFiles.exists(_.getName == ".2.crc")) + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org