This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d72f398  [SPARK-27254][SS] Cleanup complete but invalid output files in ManifestFileCommitProtocol if job is aborted
d72f398 is described below

commit d72f39897b00d0bbd7a4db9de281a1256fcf908d
Author: Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com>
AuthorDate: Fri Sep 27 12:35:26 2019 -0700

    [SPARK-27254][SS] Cleanup complete but invalid output files in ManifestFileCommitProtocol if job is aborted
    
    ## What changes were proposed in this pull request?
    
    SPARK-27210 enabled ManifestFileCommitProtocol to clean up incomplete output files at the task level when a task is aborted.
    
    This patch extends the scope of that cleanup: ManifestFileCommitProtocol now also removes, at the job level, output files that were written completely but became invalid because the job was aborted. Note that, as with HadoopMapReduceCommitProtocol, this cleanup is best-effort rather than guaranteed.
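    
    To illustrate the idea, here is a minimal sketch (not the code in the diff below; the class and method names are hypothetical) of the best-effort approach: remember the output paths reported by committed tasks, and delete them if the job is later aborted.
    
    ```scala
    import java.io.IOException
    
    import scala.collection.mutable.ArrayBuffer
    
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    
    // Hypothetical helper sketching best-effort cleanup of committed output files.
    class PendingFileTracker(conf: Configuration) {
      private val pending = new ArrayBuffer[Path]
    
      // Called when a task reports the files it has committed.
      def recordCommitted(paths: Seq[Path]): Unit = pending ++= paths
    
      // Once the batch is recorded in the metadata log, nothing may be deleted anymore.
      def markJobCommitted(): Unit = pending.clear()
    
      // Best-effort cleanup on job abort: ignore per-file failures and keep going.
      def abortJob(): Unit = {
        pending.foreach { path =>
          try {
            val fs = path.getFileSystem(conf)
            if (fs.exists(path)) fs.delete(path, false)
          } catch {
            case e: IOException => () // best effort: a real implementation would log and continue
          }
        }
        pending.clear()
      }
    }
    ```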
    
    ## How was this patch tested?
    
    Added UT.
    
    Closes #24186 from HeartSaVioR/SPARK-27254.
    
    Lead-authored-by: Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com>
    Co-authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com>
    Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>
---
 .../streaming/ManifestFileCommitProtocol.scala     | 37 ++++++++++-
 .../spark/sql/streaming/FileStreamSinkSuite.scala  | 74 ++++++++++++++++++++++
 2 files changed, 109 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
index 916bd2d..f6cc811 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.IOException
 import java.util.UUID
 
 import scala.collection.mutable.ArrayBuffer
@@ -43,6 +44,8 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
   @transient private var fileLog: FileStreamSinkLog = _
   private var batchId: Long = _
 
+  @transient private var pendingCommitFiles: ArrayBuffer[Path] = _
+
   /**
    * Sets up the manifest log output and the batch id for this job.
    * Must be called before any other function.
@@ -54,13 +57,21 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
 
   override def setupJob(jobContext: JobContext): Unit = {
     require(fileLog != null, "setupManifestOptions must be called before this function")
-    // Do nothing
+    pendingCommitFiles = new ArrayBuffer[Path]
   }
 
   override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
     require(fileLog != null, "setupManifestOptions must be called before this function")
     val fileStatuses = taskCommits.flatMap(_.obj.asInstanceOf[Seq[SinkFileStatus]]).toArray
 
+    // We must not delete the files once they have been written to the metadata log:
+    // `fileLog.add(batchId, fileStatuses)` could fail AFTER the files are recorded in the metadata,
+    // and there could also be races,
+    // so to stay safe we clear the pending list before calling anything that may throw.
+    // This case is uncommon and the cleanup is best effort rather than guaranteed, so the simple
+    // logic here is OK, and safe for dealing with unexpected situations.
+    pendingCommitFiles.clear()
+
     if (fileLog.add(batchId, fileStatuses)) {
       logInfo(s"Committed batch $batchId")
     } else {
@@ -70,7 +81,29 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
 
   override def abortJob(jobContext: JobContext): Unit = {
     require(fileLog != null, "setupManifestOptions must be called before this function")
-    // Do nothing
+    // Best-effort cleanup of completed files from the failed job.
+    // Since each file name contains a UUID, it is safe to try deleting these files:
+    // they will not conflict with files from another attempt of the same task.
+    if (pendingCommitFiles.nonEmpty) {
+      pendingCommitFiles.foreach { path =>
+        try {
+          val fs = path.getFileSystem(jobContext.getConfiguration)
+          // check existence first, to make sure the file is visible from the driver as well
+          if (fs.exists(path)) {
+            fs.delete(path, false)
+          }
+        } catch {
+          case e: IOException =>
+            logWarning(s"Failed to remove temporary file $path; continuing with the next file.", e)
+        }
+      }
+      pendingCommitFiles.clear()
+    }
+  }
+
+  override def onTaskCommit(taskCommit: TaskCommitMessage): Unit = {
+    pendingCommitFiles ++= taskCommit.obj.asInstanceOf[Seq[SinkFileStatus]]
+      .map(_.toFileStatus.getPath)
   }
 
   override def setupTask(taskContext: TaskAttemptContext): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 7d343bb..edeb416 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -22,10 +22,13 @@ import java.nio.file.Files
 import java.util.Locale
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.JobContext
 
 import org.apache.spark.SparkConf
+import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.{AnalysisException, DataFrame}
 import org.apache.spark.sql.execution.DataSourceScanExec
@@ -473,6 +476,77 @@ abstract class FileStreamSinkSuite extends StreamTest {
       assert(outputFiles.toList.isEmpty, "Incomplete files should be cleaned up.")
     }
   }
+
+  testQuietly("cleanup complete but invalid output for aborted job") {
+    withSQLConf(("spark.sql.streaming.commitProtocolClass",
+      classOf[PendingCommitFilesTrackingManifestFileCommitProtocol].getCanonicalName)) {
+      withTempDir { tempDir =>
+        val checkpointDir = new File(tempDir, "chk")
+        val outputDir = new File(tempDir, "output @#output")
+        val inputData = MemoryStream[Int]
+        inputData.addData(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
+        val q = inputData.toDS()
+          .repartition(10)
+          .map { value =>
+            // we intentionally fail this task after some other tasks have succeeded
+            if (value == 5) {
+              // add a short delay so the other tasks can commit before this task fails
+              Thread.sleep(100)
+              value / 0
+            } else {
+              value
+            }
+          }
+          .writeStream
+          .option("checkpointLocation", checkpointDir.getCanonicalPath)
+          .format("parquet")
+          .start(outputDir.getCanonicalPath)
+
+        intercept[StreamingQueryException] {
+          try {
+            q.processAllAvailable()
+          } finally {
+            q.stop()
+          }
+        }
+
+        import PendingCommitFilesTrackingManifestFileCommitProtocol._
+        val outputFileNames = Files.walk(outputDir.toPath).iterator().asScala
+          .filter(_.toString.endsWith(".parquet"))
+          .map(_.getFileName.toString)
+          .toSet
+        val trackingFileNames = tracking.map(new Path(_).getName).toSet
+
+        // There is a possible race condition: some tasks may complete while abortJob
+        // is being called, and we cannot delete the completed files for those tasks
+        // (which is OK, since this cleanup is best effort).
+        assert(outputFileNames.intersect(trackingFileNames).isEmpty,
+          "abortJob should clean up files reported as successful.")
+      }
+    }
+  }
+}
+
+object PendingCommitFilesTrackingManifestFileCommitProtocol {
+  val tracking: ArrayBuffer[String] = new ArrayBuffer[String]()
+
+  def cleanPendingCommitFiles(): Unit = tracking.clear()
+  def addPendingCommitFiles(paths: Seq[String]): Unit = tracking ++= paths
+}
+
+class PendingCommitFilesTrackingManifestFileCommitProtocol(jobId: String, path: String)
+  extends ManifestFileCommitProtocol(jobId, path) {
+  import PendingCommitFilesTrackingManifestFileCommitProtocol._
+
+  override def setupJob(jobContext: JobContext): Unit = {
+    super.setupJob(jobContext)
+    cleanPendingCommitFiles()
+  }
+
+  override def onTaskCommit(taskCommit: FileCommitProtocol.TaskCommitMessage): Unit = {
+    super.onTaskCommit(taskCommit)
+    addPendingCommitFiles(taskCommit.obj.asInstanceOf[Seq[SinkFileStatus]].map(_.path))
+  }
 }
 
 class FileStreamSinkV1Suite extends FileStreamSinkSuite {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
