This is an automated email from the ASF dual-hosted git repository.
attilapiros pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6f0b1c9 [SPARK-36406][CORE] Avoid unnecessary file operations before deleting a write-failed file held by DiskBlockObjectWriter
6f0b1c9 is described below
commit 6f0b1c9c9c7aec600358ee15f6d6ac8cf67864e9
Author: yangjie01 <[email protected]>
AuthorDate: Mon Dec 20 09:52:38 2021 +0100
[SPARK-36406][CORE] Avoid unnecessary file operations before deleting a write-failed file held by DiskBlockObjectWriter
### What changes were proposed in this pull request?
We always perform a file truncate operation before deleting a write-failed file held by `DiskBlockObjectWriter`; a typical code path looks like this:
```
if (!success) {
  // This code path only happens if an exception was thrown above before we set success;
  // close our stuff and let the exception be thrown further
  writer.revertPartialWritesAndClose()
  if (file.exists()) {
    if (!file.delete()) {
      logWarning(s"Error deleting ${file}")
    }
  }
}
```
The `revertPartialWritesAndClose` method reverts writes that haven't been committed yet, but that truncation isn't necessary in this scenario because the file is about to be deleted anyway. So this PR adds a new method to `DiskBlockObjectWriter` named `closeAndDelete()`; the new method just reverts the write metrics and deletes the write-failed file.
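With `closeAndDelete()`, each of those call sites collapses to a single call. A minimal sketch of the resulting pattern, mirroring the `ExternalSorter` and `ExternalAppendOnlyMap` hunks below:
```
if (!success) {
  // This code path only happens if an exception was thrown above before we set success;
  // reverting the write metrics and deleting the write-failed file is now one call
  writer.closeAndDelete()
}
```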
### Why are the changes needed?
Avoid unnecessary file operations.
### Does this PR introduce _any_ user-facing change?
A new method named `closeAndDelete()` was added to `DiskBlockObjectWriter`.
### How was this patch tested?
Pass the Jenkins or GitHub Actions pipelines.
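For reference, one common way to run the new suite locally in a Spark checkout (the exact invocation depends on your build setup):
```
build/sbt "core/testOnly org.apache.spark.storage.DiskBlockObjectWriterSuite"
```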
Closes #33628 from LuciferYang/SPARK-36406.
Authored-by: yangjie01 <[email protected]>
Signed-off-by: attilapiros <[email protected]>
---
.../shuffle/sort/BypassMergeSortShuffleWriter.java | 5 +----
.../spark/storage/DiskBlockObjectWriter.scala | 26 ++++++++++++++++++++++
.../util/collection/ExternalAppendOnlyMap.scala | 7 +-----
.../spark/util/collection/ExternalSorter.scala | 7 +-----
.../spark/storage/DiskBlockObjectWriterSuite.scala | 19 ++++++++++++++++
5 files changed, 48 insertions(+), 16 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index 9a5ac6f..da7a518 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -289,10 +289,7 @@ final class BypassMergeSortShuffleWriter<K, V>
       try {
         for (DiskBlockObjectWriter writer : partitionWriters) {
           // This method explicitly does _not_ throw exceptions:
-          File file = writer.revertPartialWritesAndClose();
-          if (!file.delete()) {
-            logger.error("Error while deleting file {}", file.getAbsolutePath());
-          }
+          writer.closeAndDelete();
         }
       } finally {
         partitionWriters = null;
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
index 4170609..3bdae2f 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage

 import java.io.{BufferedOutputStream, File, FileOutputStream, IOException, OutputStream}
 import java.nio.channels.{ClosedByInterruptException, FileChannel}
+import java.nio.file.Files
 import java.util.zip.Checksum

 import org.apache.spark.errors.SparkCoreErrors
@@ -119,6 +120,11 @@ private[spark] class DiskBlockObjectWriter(
   private var numRecordsWritten = 0

   /**
+   * Keep track of the number of written records committed.
+   */
+  private var numRecordsCommitted = 0L
+
+  /**
    * Set the checksum that the checksumOutputStream should use
    */
  def setChecksum(checksum: Checksum): Unit = {
@@ -223,6 +229,7 @@ private[spark] class DiskBlockObjectWriter(
       // In certain compression codecs, more bytes are written after streams are closed
       writeMetrics.incBytesWritten(committedPosition - reportedPosition)
       reportedPosition = committedPosition
+      numRecordsCommitted += numRecordsWritten
       numRecordsWritten = 0
       fileSegment
     } else {
@@ -273,6 +280,25 @@
   }

   /**
+   * Reverts write metrics and deletes the file held by the current `DiskBlockObjectWriter`.
+   * Callers should invoke this function when there are runtime exceptions in the file
+   * writing process and the file is no longer needed.
+   */
+  def closeAndDelete(): Unit = {
+    Utils.tryWithSafeFinally {
+      if (initialized) {
+        writeMetrics.decBytesWritten(reportedPosition)
+        writeMetrics.decRecordsWritten(numRecordsCommitted + numRecordsWritten)
+        closeResources()
+      }
+    } {
+      if (!Files.deleteIfExists(file.toPath)) {
+        logWarning(s"Error deleting $file")
+      }
+    }
+  }
+
+  /**
    * Writes a key-value pair.
    */
   override def write(key: Any, value: Any): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 731131b..f24c44b 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -250,12 +250,7 @@ class ExternalAppendOnlyMap[K, V, C](
       if (!success) {
         // This code path only happens if an exception was thrown above before we set success;
         // close our stuff and let the exception be thrown further
-        writer.revertPartialWritesAndClose()
-        if (file.exists()) {
-          if (!file.delete()) {
-            logWarning(s"Error deleting ${file}")
-          }
-        }
+        writer.closeAndDelete()
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index eda408a..284e70e 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -321,12 +321,7 @@ private[spark] class ExternalSorter[K, V, C](
       if (!success) {
         // This code path only happens if an exception was thrown above before we set success;
         // close our stuff and let the exception be thrown further
-        writer.revertPartialWritesAndClose()
-        if (file.exists()) {
-          if (!file.delete()) {
-            logWarning(s"Error deleting ${file}")
-          }
-        }
+        writer.closeAndDelete()
       }
     }
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
index cea5501..b1f9032 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala
@@ -184,4 +184,23 @@ class DiskBlockObjectWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
     writer.close()
     assert(segment.length === 0)
   }
+
+  test("calling closeAndDelete() on a partial write file") {
+    val (writer, file, writeMetrics) = createWriter()
+
+    for (i <- 1 to 1000) {
+      writer.write(i, i)
+    }
+    val firstSegment = writer.commitAndGet()
+    assert(firstSegment.length === file.length())
+    assert(writeMetrics.bytesWritten === file.length())
+
+    for (i <- 1 to 500) {
+      writer.write(i, i)
+    }
+    writer.closeAndDelete()
+    assert(!file.exists())
+    assert(writeMetrics.bytesWritten == 0)
+    assert(writeMetrics.recordsWritten == 0)
+  }
 }