Repository: spark Updated Branches: refs/heads/master 71cfba04a -> 9841ae031
[SPARK-23345][SQL] Remove open stream record even closing it fails ## What changes were proposed in this pull request? When `DebugFilesystem` closes opened stream, if any exception occurs, we still need to remove the open stream record from `DebugFilesystem`. Otherwise, it goes to report leaked filesystem connection. ## How was this patch tested? Existing tests. Author: Liang-Chi Hsieh <vii...@gmail.com> Closes #20524 from viirya/SPARK-23345. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9841ae03 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9841ae03 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9841ae03 Branch: refs/heads/master Commit: 9841ae0313cbee1f083f131f9446808c90ed5a7b Parents: 71cfba0 Author: Liang-Chi Hsieh <vii...@gmail.com> Authored: Wed Feb 7 09:48:49 2018 -0800 Committer: gatorsmile <gatorsm...@gmail.com> Committed: Wed Feb 7 09:48:49 2018 -0800 ---------------------------------------------------------------------- core/src/test/scala/org/apache/spark/DebugFilesystem.scala | 7 +++++-- .../scala/org/apache/spark/sql/test/SharedSparkSession.scala | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/9841ae03/core/src/test/scala/org/apache/spark/DebugFilesystem.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/DebugFilesystem.scala b/core/src/test/scala/org/apache/spark/DebugFilesystem.scala index 91355f7..a5bdc95 100644 --- a/core/src/test/scala/org/apache/spark/DebugFilesystem.scala +++ b/core/src/test/scala/org/apache/spark/DebugFilesystem.scala @@ -103,8 +103,11 @@ class DebugFilesystem extends LocalFileSystem { override def markSupported(): Boolean = wrapped.markSupported() override def close(): Unit = { - wrapped.close() - removeOpenStream(wrapped) + try { + wrapped.close() + } finally { + removeOpenStream(wrapped) + } } override def read(): Int = wrapped.read() http://git-wip-us.apache.org/repos/asf/spark/blob/9841ae03/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala index 0b4629a..e758c86 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala @@ -111,7 +111,7 @@ trait SharedSparkSession spark.sharedState.cacheManager.clearCache() // files can be closed from other threads, so wait a bit // normally this doesn't take more than 1s - eventually(timeout(10.seconds)) { + eventually(timeout(10.seconds), interval(2.seconds)) { DebugFilesystem.assertNoOpenStreams() } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org