Repository: spark
Updated Branches:
  refs/heads/master bc66a77bb -> f72ad303f

[SPARK-20848][SQL] Shutdown the pool after reading parquet files

## What changes were proposed in this pull request?

From JIRA: On each call to spark.read.parquet, a new ForkJoinPool is created. One of the threads in the pool is kept in the WAITING state and is never stopped, which leads to unbounded growth in the number of threads.

We should shut down the pool after reading parquet files.

## How was this patch tested?

Added a test to ParquetFileFormatSuite.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <[email protected]>

Closes #18073 from viirya/SPARK-20848.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f72ad303
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f72ad303
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f72ad303

Branch: refs/heads/master
Commit: f72ad303f05a6d99513ea3b121375726b177199c
Parents: bc66a77
Author: Liang-Chi Hsieh <[email protected]>
Authored: Thu May 25 00:35:40 2017 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Thu May 25 00:35:40 2017 +0800

----------------------------------------------------------------------
 .../sql/execution/datasources/parquet/ParquetFileFormat.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f72ad303/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 2f3a2c6..29ed890 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -479,7 +479,8 @@ object ParquetFileFormat extends Logging {
       partFiles: Seq[FileStatus],
       ignoreCorruptFiles: Boolean): Seq[Footer] = {
     val parFiles = partFiles.par
-    parFiles.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
+    val pool = new ForkJoinPool(8)
+    parFiles.tasksupport = new ForkJoinTaskSupport(pool)
     parFiles.flatMap { currentFile =>
       try {
         // Skips row group information since we only need the schema.
@@ -495,6 +496,8 @@ object ParquetFileFormat extends Logging {
         } else {
           throw new IOException(s"Could not read footer for file: $currentFile", e)
         }
+      } finally {
+        pool.shutdown()
       }
     }.seq
   }
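
For readers unfamiliar with the idiom, here is a minimal, self-contained sketch of the general pattern the description refers to: give a parallel collection its own ForkJoinPool and release the pool once the work is done. It is not the code from this patch. The names `PoolShutdownSketch` and `parMapWithPool` are hypothetical, the sketch assumes Scala 2.11 or 2.12 (where `.par` and `scala.concurrent.forkjoin.ForkJoinPool` are part of the standard library), and it places the `finally` around the whole parallel operation so that `shutdown()` runs once rather than inside the per-element closure.

```scala
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

object PoolShutdownSketch {
  // Hypothetical helper (not part of Spark): maps `f` over `items` on a
  // dedicated pool and always shuts that pool down afterwards.
  def parMapWithPool[A, B](items: Seq[A], parallelism: Int)(f: A => B): Seq[B] = {
    val pool = new ForkJoinPool(parallelism)
    try {
      val parItems = items.par
      // Route the parallel collection's tasks to the dedicated pool.
      parItems.tasksupport = new ForkJoinTaskSupport(pool)
      parItems.map(f).seq
    } finally {
      // Runs once, after the whole parallel map has completed or failed,
      // so the pool's worker threads do not linger.
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit = {
    val lengths = parMapWithPool(Seq("a", "bb", "ccc"), parallelism = 2)(_.length)
    println(lengths)
  }
}
```

Without the `shutdown()` call, each invocation would leave a pool behind, which matches the thread growth described in the JIRA report.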
