Repository: spark
Updated Branches:
  refs/heads/branch-2.1 13adc0fc0 -> 2f68631f5


[SPARK-20848][SQL] Shutdown the pool after reading parquet files

## What changes were proposed in this pull request?

>From JIRA: On each call to spark.read.parquet, a new ForkJoinPool is created. 
>One of the threads in the pool is kept in the WAITING state, and never 
>stopped, which leads to unbounded growth in number of threads.

We should shut down the pool after reading parquet files.
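
As a rough illustration of the idea, the sketch below lends a dedicated pool to a block of work and shuts it down in a `finally`, so that idle workers are not left behind on each call. It is not the code in this patch: `PoolShutdownSketch`, `withPool`, and `lengths` are hypothetical names, and plain `java.util.concurrent` task submission is swapped in for the parallel collection so the example does not depend on a particular Scala version.

```scala
import java.util.concurrent.{Callable, ForkJoinPool}

object PoolShutdownSketch {
  // Hypothetical helper (not part of Spark): lends a dedicated pool to `body`
  // and shuts it down afterwards, even if `body` throws.
  def withPool[T](parallelism: Int)(body: ForkJoinPool => T): T = {
    val pool = new ForkJoinPool(parallelism)
    try {
      body(pool)
    } finally {
      pool.shutdown() // no new tasks are accepted; workers can terminate
    }
  }

  // Stand-in for reading one footer per file: here it only measures the name.
  def lengths(names: Seq[String]): Seq[Int] =
    withPool(8) { pool =>
      // Submit every task first, then wait for all results inside the `try`.
      val tasks = names.toVector.map { name =>
        pool.submit(new Callable[Int] { override def call(): Int = name.length })
      }
      tasks.map(_.get())
    }
}
```

Calling `PoolShutdownSketch.lengths(Seq("a.parquet", "bb"))` creates one pool, waits for both tasks, and shuts the pool down before returning.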

## How was this patch tested?

Added a test to ParquetFileFormatSuite.


Author: Liang-Chi Hsieh <[email protected]>

Closes #18073 from viirya/SPARK-20848.

(cherry picked from commit f72ad303f05a6d99513ea3b121375726b177199c)
Signed-off-by: Wenchen Fan <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2f68631f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2f68631f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2f68631f

Branch: refs/heads/branch-2.1
Commit: 2f68631f523e4db08549497d9c3264a43137bbb1
Parents: 13adc0f
Author: Liang-Chi Hsieh <[email protected]>
Authored: Thu May 25 00:35:40 2017 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Thu May 25 00:36:22 2017 +0800

----------------------------------------------------------------------
 .../sql/execution/datasources/parquet/ParquetFileFormat.scala   | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2f68631f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 2b4892e..70eb01c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -496,7 +496,8 @@ object ParquetFileFormat extends Logging {
       partFiles: Seq[FileStatus],
       ignoreCorruptFiles: Boolean): Seq[Footer] = {
     val parFiles = partFiles.par
-    parFiles.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
+    val pool = new ForkJoinPool(8)
+    parFiles.tasksupport = new ForkJoinTaskSupport(pool)
     parFiles.flatMap { currentFile =>
       try {
         // Skips row group information since we only need the schema.
@@ -512,6 +513,8 @@ object ParquetFileFormat extends Logging {
         } else {
           throw new IOException(s"Could not read footer for file: $currentFile", e)
         }
+      } finally {
+        pool.shutdown()
       }
     }.seq
   }
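
In the hunk above, the added `finally` closes the `try` that sits inside the `parFiles.flatMap { currentFile => ... }` closure, so `pool.shutdown()` is reached once per file rather than once per call. `ForkJoinPool.shutdown()` is documented as accepting no new tasks afterwards, so the placement may matter. The sketch below is an illustration only, not code from this commit: the names are hypothetical, the loop is sequential, and a counter stands in for `pool.shutdown()` to contrast the two placements.

```scala
object FinallyPlacementSketch {
  // `finally` attached to a `try` inside the per-element closure.
  def perElement(items: Seq[String]): Int = {
    var cleanups = 0
    items.foreach { item =>
      try {
        item.length
      } finally {
        cleanups += 1 // stand-in for pool.shutdown(); runs after every element
      }
    }
    cleanups
  }

  // `finally` attached to a `try` that wraps the whole traversal.
  def aroundTraversal(items: Seq[String]): Int = {
    var cleanups = 0
    try {
      items.foreach(_.length)
    } finally {
      cleanups += 1 // stand-in for pool.shutdown(); runs once at the end
    }
    cleanups
  }
}
```

With three items, `perElement` runs the cleanup three times and `aroundTraversal` runs it once.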

