Repository: spark Updated Branches: refs/heads/branch-1.3 f93d4d992 -> 25fae8e7e
[SPARK-5423][Core] Cleanup resources in DiskMapIterator.finalize to ensure deleting the temp file This PR adds a `finalize` method in DiskMapIterator to clean up the resources even if some exception happens during processing data. Author: zsxwing <[email protected]> Closes #4219 from zsxwing/SPARK-5423 and squashes the following commits: d4b2ca6 [zsxwing] Cleanup resources in DiskMapIterator.finalize to ensure deleting the temp file (cherry picked from commit 90095bf3ce9304d09a32ceffaa99069079071b59) Signed-off-by: Ubuntu <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/25fae8e7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/25fae8e7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/25fae8e7 Branch: refs/heads/branch-1.3 Commit: 25fae8e7e6c93b7817771342d370b73b40dcf92e Parents: f93d4d9 Author: zsxwing <[email protected]> Authored: Thu Feb 19 18:37:31 2015 +0000 Committer: Ubuntu <[email protected]> Committed: Thu Feb 19 18:37:47 2015 +0000 ---------------------------------------------------------------------- .../util/collection/ExternalAppendOnlyMap.scala | 52 ++++++++++++++++---- 1 file changed, 43 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/25fae8e7/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 8a0f5a6..fc7e86e 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -387,6 +387,15 @@ class ExternalAppendOnlyMap[K, V, C]( private var batchIndex = 
0 // Which batch we're in private var fileStream: FileInputStream = null + @volatile private var closed = false + + // A volatile variable to remember which DeserializationStream is in use. It needs to be set when we + // open a DeserializationStream. But we should use `deserializeStream` rather than + // `deserializeStreamToBeClosed` to read the content because touching a volatile variable will + // reduce the performance. It must be volatile so that we can see its correct value in the + // `finalize` method, which could run in any thread. + @volatile private var deserializeStreamToBeClosed: DeserializationStream = null + // An intermediate stream that reads from exactly one batch // This guards against pre-fetching and other arbitrary behavior of higher level streams private var deserializeStream = nextBatchStream() @@ -401,6 +410,7 @@ class ExternalAppendOnlyMap[K, V, C]( // we're still in a valid batch. if (batchIndex < batchOffsets.length - 1) { if (deserializeStream != null) { + deserializeStreamToBeClosed = null deserializeStream.close() fileStream.close() deserializeStream = null @@ -419,7 +429,11 @@ class ExternalAppendOnlyMap[K, V, C]( val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start)) val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream) - ser.deserializeStream(compressedStream) + // Before returning the stream, assign it to `deserializeStreamToBeClosed` so that we can + // close it in `finalize` and also avoid touching the volatile `deserializeStreamToBeClosed` + // while reading the (K, C) pairs. + deserializeStreamToBeClosed = ser.deserializeStream(compressedStream) + deserializeStreamToBeClosed } else { // No more batches left cleanup() @@ -468,14 +482,34 @@ class ExternalAppendOnlyMap[K, V, C]( item } - // TODO: Ensure this gets called even if the iterator isn't drained. 
- private def cleanup() { - batchIndex = batchOffsets.length // Prevent reading any other batch - val ds = deserializeStream - deserializeStream = null - fileStream = null - ds.close() - file.delete() + // TODO: For now, only `finalize` ensures that `cleanup` gets called to clean up the resources. In the + // future, we need some mechanism to ensure this gets called as soon as the resources are no longer used. + private def cleanup(): Unit = { + if (!closed) { + closed = true + batchIndex = batchOffsets.length // Prevent reading any other batch + fileStream = null + try { + val ds = deserializeStreamToBeClosed + deserializeStreamToBeClosed = null + deserializeStream = null + if (ds != null) { + ds.close() + } + } finally { + if (file.exists()) { + file.delete() + } + } + } + } + + override def finalize(): Unit = { + try { + cleanup() + } finally { + super.finalize() + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
