Yang Jie created SPARK-33424:
--------------------------------

             Summary: Doubts about the use of the 
"DiskBlockObjectWriter#revertPartialWritesAndClose" method in Spark Code
                 Key: SPARK-33424
                 URL: https://issues.apache.org/jira/browse/SPARK-33424
             Project: Spark
          Issue Type: Question
          Components: Spark Core
    Affects Versions: 3.1.0
            Reporter: Yang Jie


Although there are some similar discussions in SPARK-17562, but I still have 
some questions.

I found "DiskBlockObjectWriter#revertPartialWritesAndClose" method is called in 
5 places,

Two of the call points are in the 
"ExternalAppendOnlyMap#spillMemoryIteratorToDisk" method, and two similar call 
points are in the "ExternalSorter#spillMemoryIteratorToDisk" method, and the 
last  is in the "BypassMergeSortShuffleWriter#stop" method.

Let's take the use of "ExternalAppendOnlyMap#spillMemoryIteratorToDisk" as an 
example:

 
{code:java}
val (blockId, file) = diskBlockManager.createTempLocalBlock()
val writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, 
writeMetrics)
var objectsWritten = 0

// List of batch sizes (bytes) in the order they are written to disk
val batchSizes = new ArrayBuffer[Long]

// Flush the disk writer's contents to disk, and update relevant variables
def flush(): Unit = {
  val segment = writer.commitAndGet()
  batchSizes += segment.length
  _diskBytesSpilled += segment.length
  objectsWritten = 0
}

var success = false
try {
  while (inMemoryIterator.hasNext) {
    val kv = inMemoryIterator.next()
    writer.write(kv._1, kv._2)
    objectsWritten += 1

    if (objectsWritten == serializerBatchSize) {
      flush()
    }
  }
  if (objectsWritten > 0) {
    flush()
    writer.close()
  } else {
    writer.revertPartialWritesAndClose() // The first call point 
  }
  success = true
} finally {
  if (!success) {
    // This code path only happens if an exception was thrown above before we 
set success;
    // close our stuff and let the exception be thrown further
    writer.revertPartialWritesAndClose() // The second call point
    if (file.exists()) {
      if (!file.delete()) {
        logWarning(s"Error deleting ${file}")
      }
    }
  }
}

new DiskMapIterator(file, blockId, batchSizes)
{code}
 

There are two questions about the above code:

1. Can the first call "writer.revertPartialWritesAndClose() " be replaced by 
"writer.close()"?

I think there are two possibilities to get into this branch:
 * One possibility is all data has been flush(), I think we can call 
"writer.close()" directly because all data has been flushed, 
"committedPosition"  of DiskBlockObjectWriter should eq file.length.
 * Another is inMemoryIterator is empty, in this scenario whether calling 
"revertPartialWritesAndClose()" or calling "close()", the file.length is both 0.

And if use "writer.close()"  instead of "writer.revertPartialWritesAndClose() " 
, all UTs will passed, so what is the specific scenario that must call the 
"revertPartialWritesAndClose() " method?

2. For the 2nd call point, the main goal is to roll back writeMetrics in 
DiskBlockObjectWriter? 
If we want to delete this file,  Is the truncate operation in the 
"revertPartialWritesAndClose() " method really necessary?In this scenario, 
should we just roll back writeMetrics without truncate file to reduce one disk 
operation?
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to