Yang Jie created SPARK-33424:
--------------------------------
Summary: Doubts about the use of the
"DiskBlockObjectWriter#revertPartialWritesAndClose" method in Spark Code
Key: SPARK-33424
URL: https://issues.apache.org/jira/browse/SPARK-33424
Project: Spark
Issue Type: Question
Components: Spark Core
Affects Versions: 3.1.0
Reporter: Yang Jie
Although there are some similar discussions in SPARK-17562, I still have some questions.
I found that the "DiskBlockObjectWriter#revertPartialWritesAndClose" method is called in 5 places: two of the call points are in the "ExternalAppendOnlyMap#spillMemoryIteratorToDisk" method, two similar call points are in the "ExternalSorter#spillMemoryIteratorToDisk" method, and the last is in the "BypassMergeSortShuffleWriter#stop" method.
Let's take the use in "ExternalAppendOnlyMap#spillMemoryIteratorToDisk" as an example:
{code:java}
val (blockId, file) = diskBlockManager.createTempLocalBlock()
val writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, writeMetrics)
var objectsWritten = 0

// List of batch sizes (bytes) in the order they are written to disk
val batchSizes = new ArrayBuffer[Long]

// Flush the disk writer's contents to disk, and update relevant variables
def flush(): Unit = {
  val segment = writer.commitAndGet()
  batchSizes += segment.length
  _diskBytesSpilled += segment.length
  objectsWritten = 0
}

var success = false
try {
  while (inMemoryIterator.hasNext) {
    val kv = inMemoryIterator.next()
    writer.write(kv._1, kv._2)
    objectsWritten += 1

    if (objectsWritten == serializerBatchSize) {
      flush()
    }
  }
  if (objectsWritten > 0) {
    flush()
    writer.close()
  } else {
    writer.revertPartialWritesAndClose() // The first call point
  }
  success = true
} finally {
  if (!success) {
    // This code path only happens if an exception was thrown above before we set success;
    // close our stuff and let the exception be thrown further
    writer.revertPartialWritesAndClose() // The second call point
    if (file.exists()) {
      if (!file.delete()) {
        logWarning(s"Error deleting ${file}")
      }
    }
  }
}
new DiskMapIterator(file, blockId, batchSizes)
{code}
There are two questions about the above code:
1. Can the first call to "writer.revertPartialWritesAndClose()" be replaced by "writer.close()"?
I think there are two possibilities for reaching this branch:
* One possibility is that all data has already been flush()ed. In that case I think we can call "writer.close()" directly, because all data has been flushed and the "committedPosition" of DiskBlockObjectWriter should equal file.length.
* The other is that inMemoryIterator is empty. In this scenario, whether we call "revertPartialWritesAndClose()" or "close()", file.length is 0 either way.
If "writer.close()" is used instead of "writer.revertPartialWritesAndClose()", all UTs pass, so what is the specific scenario that requires calling "revertPartialWritesAndClose()"?
2. For the second call point, is the main goal to roll back writeMetrics in DiskBlockObjectWriter?
If we are going to delete this file anyway, is the truncate operation in the "revertPartialWritesAndClose()" method really necessary? In this scenario, should we just roll back writeMetrics without truncating the file, to save one disk operation?
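To make the truncate behavior both questions turn on concrete, here is a minimal Java sketch (an illustration of the "truncate back to the last committed position" idea, not Spark's actual DiskBlockObjectWriter code; the committedPosition variable stands in for what commitAndGet() tracks):

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class RevertSketch {
    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("spill", ".tmp");

        // One committed batch (flush() + commitAndGet() in Spark's code) ...
        Files.write(file, new byte[128]);
        long committedPosition = Files.size(file); // 128

        // ... followed by a partial write that is never committed.
        Files.write(file, new byte[64], StandardOpenOption.APPEND);

        // revertPartialWritesAndClose() truncates the file back to the
        // last committed position, discarding the uncommitted 64 bytes.
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.WRITE)) {
            ch.truncate(committedPosition);
        }

        System.out.println(Files.size(file)); // prints 128
        Files.delete(file);
    }
}
```

Note that when committedPosition already equals the file length (the "all data flushed" case in question 1), the truncate is a no-op, and when the file is about to be deleted (question 2), the truncate is an extra disk operation on a file that is discarded anyway.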
--
This message was sent by Atlassian Jira
(v8.3.4#803005)