Repository: spark Updated Branches: refs/heads/master 8e674331d -> 25c01c548
[STREAMING] [MINOR] Close files correctly when iterator is finished in streaming WAL recovery Currently there's no chance to close the file correctly after the iteration is finished. Change to `CompletionIterator` to avoid resource leakage. Author: jerryshao <[email protected]> Closes #6050 from jerryshao/close-file-correctly and squashes the following commits: 52dfaf5 [jerryshao] Close files correctly when iterator is finished Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/25c01c54 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/25c01c54 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/25c01c54 Branch: refs/heads/master Commit: 25c01c54840a9ab768f8b917de7edc2bc2d61b9e Parents: 8e67433 Author: jerryshao <[email protected]> Authored: Mon May 11 14:38:58 2015 -0700 Committer: Tathagata Das <[email protected]> Committed: Mon May 11 14:38:58 2015 -0700 ---------------------------------------------------------------------- .../apache/spark/streaming/util/FileBasedWriteAheadLog.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/25c01c54/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala index 9985fed..87ba4f8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala @@ -26,7 +26,7 @@ import scala.language.postfixOps import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.spark.util.ThreadUtils 
+import org.apache.spark.util.{CompletionIterator, ThreadUtils} import org.apache.spark.{Logging, SparkConf} /** @@ -124,7 +124,8 @@ private[streaming] class FileBasedWriteAheadLog( logFilesToRead.iterator.map { file => logDebug(s"Creating log reader with $file") - new FileBasedWriteAheadLogReader(file, hadoopConf) + val reader = new FileBasedWriteAheadLogReader(file, hadoopConf) + CompletionIterator[ByteBuffer, Iterator[ByteBuffer]](reader, reader.close _) } flatMap { x => x } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
