Repository: spark Updated Branches: refs/heads/branch-2.1 13adc0fc0 -> 2f68631f5
[SPARK-20848][SQL] Shutdown the pool after reading parquet files ## What changes were proposed in this pull request? >From JIRA: On each call to spark.read.parquet, a new ForkJoinPool is created. >One of the threads in the pool is kept in the WAITING state, and never >stopped, which leads to unbounded growth in number of threads. We should shutdown the pool after reading parquet files. ## How was this patch tested? Added a test to ParquetFileFormatSuite. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Liang-Chi Hsieh <[email protected]> Closes #18073 from viirya/SPARK-20848. (cherry picked from commit f72ad303f05a6d99513ea3b121375726b177199c) Signed-off-by: Wenchen Fan <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2f68631f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2f68631f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2f68631f Branch: refs/heads/branch-2.1 Commit: 2f68631f523e4db08549497d9c3264a43137bbb1 Parents: 13adc0f Author: Liang-Chi Hsieh <[email protected]> Authored: Thu May 25 00:35:40 2017 +0800 Committer: Wenchen Fan <[email protected]> Committed: Thu May 25 00:36:22 2017 +0800 ---------------------------------------------------------------------- .../sql/execution/datasources/parquet/ParquetFileFormat.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/2f68631f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 2b4892e..70eb01c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -496,7 +496,8 @@ object ParquetFileFormat extends Logging { partFiles: Seq[FileStatus], ignoreCorruptFiles: Boolean): Seq[Footer] = { val parFiles = partFiles.par - parFiles.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8)) + val pool = new ForkJoinPool(8) + parFiles.tasksupport = new ForkJoinTaskSupport(pool) parFiles.flatMap { currentFile => try { // Skips row group information since we only need the schema. @@ -512,6 +513,8 @@ object ParquetFileFormat extends Logging { } else { throw new IOException(s"Could not read footer for file: $currentFile", e) } + } finally { + pool.shutdown() } }.seq } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
