Repository: spark
Updated Branches:
  refs/heads/master bc66a77bb -> f72ad303f


[SPARK-20848][SQL] Shutdown the pool after reading parquet files

## What changes were proposed in this pull request?

> From the JIRA: On each call to `spark.read.parquet`, a new ForkJoinPool is created. One of the threads in the pool is kept in the WAITING state and never stopped, which leads to unbounded growth in the number of threads.

We should shut down the pool after reading the Parquet files.
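For illustration only, here is a minimal, self-contained sketch of the pattern (the `PoolShutdownSketch`/`readInParallel` names are hypothetical, and the imports assume Scala 2.12, where `ForkJoinTaskSupport` takes a `java.util.concurrent.ForkJoinPool`). Unlike the actual patch below, which calls `pool.shutdown()` in a `finally` inside the per-file closure, the sketch wraps the whole parallel operation:

```scala
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport

object PoolShutdownSketch {
  // Hypothetical stand-in for the per-file footer reads: run the work on a
  // dedicated, bounded pool and shut the pool down when the operation is done.
  def readInParallel(paths: Seq[String]): Seq[String] = {
    val parPaths = paths.par
    val pool = new ForkJoinPool(8)
    parPaths.tasksupport = new ForkJoinTaskSupport(pool)
    try {
      parPaths.map(p => s"footer-of:$p").seq // placeholder for the real footer read
    } finally {
      // Without this call, the pool's worker threads stay parked in WAITING
      // and a fresh pool leaks on every invocation.
      pool.shutdown()
    }
  }
}
```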

## How was this patch tested?

Added a test to ParquetFileFormatSuite.
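The test itself is not reproduced in this mail. As a rough illustration only (not the PR's actual test), a regression check of this kind could count live ForkJoinPool worker threads before and after repeated reads and assert that the count stays bounded; the object and method names below are hypothetical:

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.SparkSession

object FooterPoolLeakCheck {
  // Default ForkJoinPool worker threads are named "ForkJoinPool-N-worker-M".
  private def forkJoinWorkerCount(): Int =
    Thread.getAllStackTraces.keySet.asScala.count(_.getName.contains("ForkJoinPool"))

  /** Returns how many ForkJoinPool worker threads appeared after `iterations` reads. */
  def threadGrowth(spark: SparkSession, path: String, iterations: Int = 10): Int = {
    val before = forkJoinWorkerCount()
    (1 to iterations).foreach(_ => spark.read.parquet(path).schema)
    forkJoinWorkerCount() - before
  }
}
```

A test could then assert that `threadGrowth(spark, path)` stays small instead of growing by one pool's worth of workers per iteration.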


Author: Liang-Chi Hsieh <[email protected]>

Closes #18073 from viirya/SPARK-20848.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f72ad303
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f72ad303
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f72ad303

Branch: refs/heads/master
Commit: f72ad303f05a6d99513ea3b121375726b177199c
Parents: bc66a77
Author: Liang-Chi Hsieh <[email protected]>
Authored: Thu May 25 00:35:40 2017 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Thu May 25 00:35:40 2017 +0800

----------------------------------------------------------------------
 .../sql/execution/datasources/parquet/ParquetFileFormat.scala   | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f72ad303/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 2f3a2c6..29ed890 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -479,7 +479,8 @@ object ParquetFileFormat extends Logging {
       partFiles: Seq[FileStatus],
       ignoreCorruptFiles: Boolean): Seq[Footer] = {
     val parFiles = partFiles.par
-    parFiles.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
+    val pool = new ForkJoinPool(8)
+    parFiles.tasksupport = new ForkJoinTaskSupport(pool)
     parFiles.flatMap { currentFile =>
       try {
         // Skips row group information since we only need the schema.
@@ -495,6 +496,8 @@ object ParquetFileFormat extends Logging {
         } else {
           throw new IOException(s"Could not read footer for file: $currentFile", e)
         }
+      } finally {
+        pool.shutdown()
       }
     }.seq
   }
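A note on the design choice: per the ForkJoinPool javadoc, `shutdown()` initiates an orderly shutdown in which previously submitted tasks are executed but no new tasks are accepted, so placing the call in the per-file `finally` block does not abort footer reads that are already in flight on other workers.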

