This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f0b1c9  [SPARK-36406][CORE] Avoid unnecessary file operations before deleting a write-failed file held by DiskBlockObjectWriter
6f0b1c9 is described below

commit 6f0b1c9c9c7aec600358ee15f6d6ac8cf67864e9
Author: yangjie01 <[email protected]>
AuthorDate: Mon Dec 20 09:52:38 2021 +0100

    [SPARK-36406][CORE] Avoid unnecessary file operations before deleting a write-failed file held by DiskBlockObjectWriter
    
    ### What changes were proposed in this pull request?
    We always perform a file truncate operation before deleting a write-failed file held by `DiskBlockObjectWriter`; a typical code path is as follows:
    
    ```
    if (!success) {
      // This code path only happens if an exception was thrown above before we set success;
      // close our stuff and let the exception be thrown further
      writer.revertPartialWritesAndClose()
      if (file.exists()) {
        if (!file.delete()) {
          logWarning(s"Error deleting ${file}")
        }
      }
    }
    ```
    The `revertPartialWritesAndClose` method reverts writes that haven't been committed yet by truncating the file, but that truncation is unnecessary here because the file is deleted immediately afterwards.
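    For reference, a minimal sketch of the truncation that `revertPartialWritesAndClose` performs before the caller deletes the file (an illustration using the writer's `file` and `committedPosition` fields, not the exact Spark source):
    
    ```scala
    // Truncate the file back to the last committed position. Since the caller
    // is about to delete the whole file anyway, this truncate is wasted work.
    val truncateStream = new FileOutputStream(file, true)
    try {
      truncateStream.getChannel.truncate(committedPosition)
    } finally {
      truncateStream.close()
    }
    ```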
    
    So this PR adds a new method to `DiskBlockObjectWriter` named `closeAndDelete()`; the new method just reverts the write metrics and deletes the write-failed file.
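    After this change, the caller-side cleanup collapses into a single call, as the diff below shows; a sketch of the resulting pattern:
    
    ```scala
    if (!success) {
      // Revert the write metrics and delete the partially written file
      // in one step, with no intermediate truncate or exists() check.
      writer.closeAndDelete()
    }
    ```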
    
    ### Why are the changes needed?
    Avoid unnecessary file operations.
    
    ### Does this PR introduce _any_ user-facing change?
    A new method named `closeAndDelete()` is added to `DiskBlockObjectWriter`.
    
    ### How was this patch tested?
    Passes the Jenkins or GitHub Actions tests.
    
    Closes #33628 from LuciferYang/SPARK-36406.
    
    Authored-by: yangjie01 <[email protected]>
    Signed-off-by: attilapiros <[email protected]>
---
 .../shuffle/sort/BypassMergeSortShuffleWriter.java |  5 +----
 .../spark/storage/DiskBlockObjectWriter.scala      | 26 ++++++++++++++++++++++
 .../util/collection/ExternalAppendOnlyMap.scala    |  7 +-----
 .../spark/util/collection/ExternalSorter.scala     |  7 +-----
 .../spark/storage/DiskBlockObjectWriterSuite.scala | 19 ++++++++++++++++
 5 files changed, 48 insertions(+), 16 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index 9a5ac6f..da7a518 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -289,10 +289,7 @@ final class BypassMergeSortShuffleWriter<K, V>
           try {
             for (DiskBlockObjectWriter writer : partitionWriters) {
               // This method explicitly does _not_ throw exceptions:
-              File file = writer.revertPartialWritesAndClose();
-              if (!file.delete()) {
-              logger.error("Error while deleting file {}", file.getAbsolutePath());
-              }
+              writer.closeAndDelete();
             }
           } finally {
             partitionWriters = null;
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
index 4170609..3bdae2f 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage
 
 import java.io.{BufferedOutputStream, File, FileOutputStream, IOException, OutputStream}
 import java.nio.channels.{ClosedByInterruptException, FileChannel}
+import java.nio.file.Files
 import java.util.zip.Checksum
 
 import org.apache.spark.errors.SparkCoreErrors
@@ -119,6 +120,11 @@ private[spark] class DiskBlockObjectWriter(
   private var numRecordsWritten = 0
 
   /**
+   * Keeps track of the number of records committed so far.
+   */
+  private var numRecordsCommitted = 0L
+
+  /**
    * Set the checksum that the checksumOutputStream should use
    */
   def setChecksum(checksum: Checksum): Unit = {
@@ -223,6 +229,7 @@ private[spark] class DiskBlockObjectWriter(
       // In certain compression codecs, more bytes are written after streams are closed
       writeMetrics.incBytesWritten(committedPosition - reportedPosition)
       reportedPosition = committedPosition
+      numRecordsCommitted += numRecordsWritten
       numRecordsWritten = 0
       fileSegment
     } else {
@@ -273,6 +280,25 @@ private[spark] class DiskBlockObjectWriter(
   }
 
   /**
+   * Reverts the write metrics and deletes the file held by the current
+   * `DiskBlockObjectWriter`. Callers should invoke this function when a runtime
+   * exception occurs during the file writing process and the file is no longer needed.
+   */
+  def closeAndDelete(): Unit = {
+    Utils.tryWithSafeFinally {
+      if (initialized) {
+        writeMetrics.decBytesWritten(reportedPosition)
+        writeMetrics.decRecordsWritten(numRecordsCommitted + numRecordsWritten)
+        closeResources()
+      }
+    } {
+      if (!Files.deleteIfExists(file.toPath)) {
+        logWarning(s"Error deleting $file")
+      }
+    }
+  }
+
+  /**
    * Writes a key-value pair.
    */
   override def write(key: Any, value: Any): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 731131b..f24c44b 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -250,12 +250,7 @@ class ExternalAppendOnlyMap[K, V, C](
       if (!success) {
         // This code path only happens if an exception was thrown above before we set success;
         // close our stuff and let the exception be thrown further
-        writer.revertPartialWritesAndClose()
-        if (file.exists()) {
-          if (!file.delete()) {
-            logWarning(s"Error deleting ${file}")
-          }
-        }
+        writer.closeAndDelete()
       }
     }
 
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index eda408a..284e70e 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -321,12 +321,7 @@ private[spark] class ExternalSorter[K, V, C](
       if (!success) {
        // This code path only happens if an exception was thrown above before we set success;
         // close our stuff and let the exception be thrown further
-        writer.revertPartialWritesAndClose()
-        if (file.exists()) {
-          if (!file.delete()) {
-            logWarning(s"Error deleting ${file}")
-          }
-        }
+        writer.closeAndDelete()
       }
     }
 
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
index cea5501..b1f9032 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
@@ -184,4 +184,23 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
     writer.close()
     assert(segment.length === 0)
   }
+
+  test("calling closeAndDelete() on a partial write file") {
+    val (writer, file, writeMetrics) = createWriter()
+
+    for (i <- 1 to 1000) {
+      writer.write(i, i)
+    }
+    val firstSegment = writer.commitAndGet()
+    assert(firstSegment.length === file.length())
+    assert(writeMetrics.bytesWritten === file.length())
+
+    for (i <- 1 to 500) {
+      writer.write(i, i)
+    }
+    writer.closeAndDelete()
+    assert(!file.exists())
+    assert(writeMetrics.bytesWritten == 0)
+    assert(writeMetrics.recordsWritten == 0)
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
