Repository: spark Updated Branches: refs/heads/master 197f9018a -> 6b68d61cf
[SPARK-20848][SQL][FOLLOW-UP] Shutdown the pool after reading parquet files ## What changes were proposed in this pull request? This is a follow-up to #18073. Taking a safer approach to shutdown the pool to prevent possible issue. Also using `ThreadUtils.newForkJoinPool` instead to set a better thread name. ## How was this patch tested? Manually test. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Liang-Chi Hsieh <[email protected]> Closes #18100 from viirya/SPARK-20848-followup. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6b68d61c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6b68d61c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6b68d61c Branch: refs/heads/master Commit: 6b68d61cf31748a088778dfdd66491b2f89a3c7b Parents: 197f901 Author: Liang-Chi Hsieh <[email protected]> Authored: Thu May 25 09:55:45 2017 +0800 Committer: Wenchen Fan <[email protected]> Committed: Thu May 25 09:55:45 2017 +0800 ---------------------------------------------------------------------- .../datasources/parquet/ParquetFileFormat.scala | 42 ++++++++++---------- 1 file changed, 22 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/6b68d61c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 29ed890..87fbf8b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -50,7 +50,7 @@ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ -import org.apache.spark.util.SerializableConfiguration +import org.apache.spark.util.{SerializableConfiguration, ThreadUtils} class ParquetFileFormat extends FileFormat @@ -479,27 +479,29 @@ object ParquetFileFormat extends Logging { partFiles: Seq[FileStatus], ignoreCorruptFiles: Boolean): Seq[Footer] = { val parFiles = partFiles.par - val pool = new ForkJoinPool(8) + val pool = ThreadUtils.newForkJoinPool("readingParquetFooters", 8) parFiles.tasksupport = new ForkJoinTaskSupport(pool) - parFiles.flatMap { currentFile => - try { - // Skips row group information since we only need the schema. - // ParquetFileReader.readFooter throws RuntimeException, instead of IOException, - // when it can't read the footer. - Some(new Footer(currentFile.getPath(), - ParquetFileReader.readFooter( - conf, currentFile, SKIP_ROW_GROUPS))) - } catch { case e: RuntimeException => - if (ignoreCorruptFiles) { - logWarning(s"Skipped the footer in the corrupted file: $currentFile", e) - None - } else { - throw new IOException(s"Could not read footer for file: $currentFile", e) + try { + parFiles.flatMap { currentFile => + try { + // Skips row group information since we only need the schema. + // ParquetFileReader.readFooter throws RuntimeException, instead of IOException, + // when it can't read the footer. + Some(new Footer(currentFile.getPath(), + ParquetFileReader.readFooter( + conf, currentFile, SKIP_ROW_GROUPS))) + } catch { case e: RuntimeException => + if (ignoreCorruptFiles) { + logWarning(s"Skipped the footer in the corrupted file: $currentFile", e) + None + } else { + throw new IOException(s"Could not read footer for file: $currentFile", e) + } } - } finally { - pool.shutdown() - } - }.seq + }.seq + } finally { + pool.shutdown() + } } /** --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
