Repository: spark
Updated Branches:
  refs/heads/branch-2.2 3f82d65bf -> e0aa23939


[SPARK-20848][SQL][FOLLOW-UP] Shutdown the pool after reading parquet files

## What changes were proposed in this pull request?

This is a follow-up to #18073. It takes a safer approach to shutting down the pool 
to prevent possible issues, and uses `ThreadUtils.newForkJoinPool` instead to set 
a better thread name.
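
As a rough sketch of the pattern (not the exact patch), the idea is to move the 
`pool.shutdown()` call out of the per-file closure and into a `finally` block that 
runs once, after the whole parallel traversal has completed. The helper `readAll` 
below is hypothetical; the actual change uses Spark's internal 
`ThreadUtils.newForkJoinPool` so the pool threads get a descriptive name prefix:

```scala
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool // Scala 2.11, as used by Spark 2.2

// Hypothetical helper illustrating the try/finally shutdown pattern.
def readAll[T, R](items: Seq[T], parallelism: Int = 8)(read: T => R): Seq[R] = {
  // The patch builds this via ThreadUtils.newForkJoinPool("readingParquetFooters", 8)
  // so the worker threads carry a recognizable name.
  val pool = new ForkJoinPool(parallelism)
  val parItems = items.par
  parItems.tasksupport = new ForkJoinTaskSupport(pool)
  try {
    parItems.map(read).seq
  } finally {
    // Shut down exactly once, after all parallel tasks have finished,
    // instead of calling shutdown() from inside the task closure.
    pool.shutdown()
  }
}
```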

## How was this patch tested?

Manually tested.


Author: Liang-Chi Hsieh <[email protected]>

Closes #18100 from viirya/SPARK-20848-followup.

(cherry picked from commit 6b68d61cf31748a088778dfdd66491b2f89a3c7b)
Signed-off-by: Wenchen Fan <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e0aa2393
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e0aa2393
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e0aa2393

Branch: refs/heads/branch-2.2
Commit: e0aa23939a4cbf95f2cc83a7f5adee841b491358
Parents: 3f82d65
Author: Liang-Chi Hsieh <[email protected]>
Authored: Thu May 25 09:55:45 2017 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Thu May 25 09:55:59 2017 +0800

----------------------------------------------------------------------
 .../datasources/parquet/ParquetFileFormat.scala | 42 ++++++++++----------
 1 file changed, 22 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e0aa2393/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 29ed890..87fbf8b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -50,7 +50,7 @@ import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
 
 class ParquetFileFormat
   extends FileFormat
@@ -479,27 +479,29 @@ object ParquetFileFormat extends Logging {
       partFiles: Seq[FileStatus],
       ignoreCorruptFiles: Boolean): Seq[Footer] = {
     val parFiles = partFiles.par
-    val pool = new ForkJoinPool(8)
+    val pool = ThreadUtils.newForkJoinPool("readingParquetFooters", 8)
     parFiles.tasksupport = new ForkJoinTaskSupport(pool)
-    parFiles.flatMap { currentFile =>
-      try {
-        // Skips row group information since we only need the schema.
-        // ParquetFileReader.readFooter throws RuntimeException, instead of IOException,
-        // when it can't read the footer.
-        Some(new Footer(currentFile.getPath(),
-          ParquetFileReader.readFooter(
-            conf, currentFile, SKIP_ROW_GROUPS)))
-      } catch { case e: RuntimeException =>
-        if (ignoreCorruptFiles) {
-          logWarning(s"Skipped the footer in the corrupted file: 
$currentFile", e)
-          None
-        } else {
-          throw new IOException(s"Could not read footer for file: 
$currentFile", e)
+    try {
+      parFiles.flatMap { currentFile =>
+        try {
+          // Skips row group information since we only need the schema.
+          // ParquetFileReader.readFooter throws RuntimeException, instead of IOException,
+          // when it can't read the footer.
+          Some(new Footer(currentFile.getPath(),
+            ParquetFileReader.readFooter(
+              conf, currentFile, SKIP_ROW_GROUPS)))
+        } catch { case e: RuntimeException =>
+          if (ignoreCorruptFiles) {
+            logWarning(s"Skipped the footer in the corrupted file: 
$currentFile", e)
+            None
+          } else {
+            throw new IOException(s"Could not read footer for file: 
$currentFile", e)
+          }
         }
-      } finally {
-        pool.shutdown()
-      }
-    }.seq
+      }.seq
+    } finally {
+      pool.shutdown()
+    }
   }
 
   /**

