Repository: spark
Updated Branches:
  refs/heads/master 197f9018a -> 6b68d61cf


[SPARK-20848][SQL][FOLLOW-UP] Shutdown the pool after reading parquet files

## What changes were proposed in this pull request?

This is a follow-up to #18073. Taking a safer approach to shutdown the pool to 
prevent possible issue. Also using `ThreadUtils.newForkJoinPool` instead to set 
a better thread name.

## How was this patch tested?

Manually test.

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Liang-Chi Hsieh <[email protected]>

Closes #18100 from viirya/SPARK-20848-followup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6b68d61c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6b68d61c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6b68d61c

Branch: refs/heads/master
Commit: 6b68d61cf31748a088778dfdd66491b2f89a3c7b
Parents: 197f901
Author: Liang-Chi Hsieh <[email protected]>
Authored: Thu May 25 09:55:45 2017 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Thu May 25 09:55:45 2017 +0800

----------------------------------------------------------------------
 .../datasources/parquet/ParquetFileFormat.scala | 42 ++++++++++----------
 1 file changed, 22 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6b68d61c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 29ed890..87fbf8b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -50,7 +50,7 @@ import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
 
 class ParquetFileFormat
   extends FileFormat
@@ -479,27 +479,29 @@ object ParquetFileFormat extends Logging {
       partFiles: Seq[FileStatus],
       ignoreCorruptFiles: Boolean): Seq[Footer] = {
     val parFiles = partFiles.par
-    val pool = new ForkJoinPool(8)
+    val pool = ThreadUtils.newForkJoinPool("readingParquetFooters", 8)
     parFiles.tasksupport = new ForkJoinTaskSupport(pool)
-    parFiles.flatMap { currentFile =>
-      try {
-        // Skips row group information since we only need the schema.
-        // ParquetFileReader.readFooter throws RuntimeException, instead of 
IOException,
-        // when it can't read the footer.
-        Some(new Footer(currentFile.getPath(),
-          ParquetFileReader.readFooter(
-            conf, currentFile, SKIP_ROW_GROUPS)))
-      } catch { case e: RuntimeException =>
-        if (ignoreCorruptFiles) {
-          logWarning(s"Skipped the footer in the corrupted file: 
$currentFile", e)
-          None
-        } else {
-          throw new IOException(s"Could not read footer for file: 
$currentFile", e)
+    try {
+      parFiles.flatMap { currentFile =>
+        try {
+          // Skips row group information since we only need the schema.
+          // ParquetFileReader.readFooter throws RuntimeException, instead of 
IOException,
+          // when it can't read the footer.
+          Some(new Footer(currentFile.getPath(),
+            ParquetFileReader.readFooter(
+              conf, currentFile, SKIP_ROW_GROUPS)))
+        } catch { case e: RuntimeException =>
+          if (ignoreCorruptFiles) {
+            logWarning(s"Skipped the footer in the corrupted file: 
$currentFile", e)
+            None
+          } else {
+            throw new IOException(s"Could not read footer for file: 
$currentFile", e)
+          }
         }
-      } finally {
-        pool.shutdown()
-      }
-    }.seq
+      }.seq
+    } finally {
+      pool.shutdown()
+    }
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to