Repository: spark
Updated Branches:
  refs/heads/master d913db16a -> a9339db99
[SPARK-21137][CORE] Spark reads many small files slowly

## What changes were proposed in this pull request?

Parallelize FileInputFormat.listStatus in Hadoop API via LIST_STATUS_NUM_THREADS to speed up examination of file sizes for wholeTextFiles et al.

## How was this patch tested?

Existing tests, which will exercise the key path here: using a local file system.

Author: Sean Owen <[email protected]>

Closes #18441 from srowen/SPARK-21137.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a9339db9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a9339db9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a9339db9

Branch: refs/heads/master
Commit: a9339db99f0620d4828eb903523be55dfbf2fb64
Parents: d913db1
Author: Sean Owen <[email protected]>
Authored: Mon Jul 3 19:52:39 2017 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Mon Jul 3 19:52:39 2017 +0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala | 7 ++++++-
 .../main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala   | 7 ++++++-
 2 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
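For reference: on Spark versions that predate this patch, the same parallel listing can be enabled by hand, because FileInputFormat.listStatus() reads the thread count from the job's Hadoop configuration. A minimal sketch of that workaround follows; the application name, master, and input path are illustrative, not part of the patch:

    import org.apache.spark.{SparkConf, SparkContext}

    object ParallelListingExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("parallel-listing").setMaster("local[*]"))
        // FileInputFormat.listStatus() honors this property and scans the input
        // directories with a thread pool instead of a single thread. The string
        // key is the value of FileInputFormat.LIST_STATUS_NUM_THREADS in
        // org.apache.hadoop.mapreduce.lib.input.FileInputFormat.
        sc.hadoopConfiguration.set(
          "mapreduce.input.fileinputformat.list-status.num-threads",
          Runtime.getRuntime.availableProcessors().toString)
        // Hypothetical directory containing many small files.
        val files = sc.wholeTextFiles("/data/many-small-files")
        println(files.count())
        sc.stop()
      }
    }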
http://git-wip-us.apache.org/repos/asf/spark/blob/a9339db9/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
index 50d977a..a14bad4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -20,6 +20,7 @@ package org.apache.spark.rdd
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.hadoop.mapreduce.task.JobContextImpl
 
 import org.apache.spark.{Partition, SparkContext}
@@ -35,8 +36,12 @@ private[spark] class BinaryFileRDD[T](
   extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, conf) {
 
   override def getPartitions: Array[Partition] = {
-    val inputFormat = inputFormatClass.newInstance
     val conf = getConf
+    // setMinPartitions below will call FileInputFormat.listStatus(), which can be quite slow when
+    // traversing a large number of directories and files. Parallelize it.
+    conf.setIfUnset(FileInputFormat.LIST_STATUS_NUM_THREADS,
+      Runtime.getRuntime.availableProcessors().toString)
+    val inputFormat = inputFormatClass.newInstance
     inputFormat match {
       case configurable: Configurable =>
         configurable.setConf(conf)

http://git-wip-us.apache.org/repos/asf/spark/blob/a9339db9/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala
index 8e1baae..9f3d074 100644
--- a/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala
@@ -20,6 +20,7 @@ package org.apache.spark.rdd
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.{Text, Writable}
 import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.hadoop.mapreduce.task.JobContextImpl
 
 import org.apache.spark.{Partition, SparkContext}
@@ -38,8 +39,12 @@ private[spark] class WholeTextFileRDD(
   extends NewHadoopRDD[Text, Text](sc, inputFormatClass, keyClass, valueClass, conf) {
 
   override def getPartitions: Array[Partition] = {
-    val inputFormat = inputFormatClass.newInstance
     val conf = getConf
+    // setMinPartitions below will call FileInputFormat.listStatus(), which can be quite slow when
+    // traversing a large number of directories and files. Parallelize it.
+    conf.setIfUnset(FileInputFormat.LIST_STATUS_NUM_THREADS,
+      Runtime.getRuntime.availableProcessors().toString)
+    val inputFormat = inputFormatClass.newInstance
     inputFormat match {
       case configurable: Configurable =>
        configurable.setConf(conf)
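Because both files use Configuration.setIfUnset() rather than set(), a thread count the user has already configured takes precedence over the availableProcessors() default introduced here. A small sketch of that behavior; the object name is illustrative, and the Configuration instances are constructed without loading default resources:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

    object SetIfUnsetDemo {
      def main(args: Array[String]): Unit = {
        // A user-supplied value survives the setIfUnset() call made by the patch.
        val conf = new Configuration(false)
        conf.set(FileInputFormat.LIST_STATUS_NUM_THREADS, "4")
        conf.setIfUnset(FileInputFormat.LIST_STATUS_NUM_THREADS,
          Runtime.getRuntime.availableProcessors().toString)
        assert(conf.get(FileInputFormat.LIST_STATUS_NUM_THREADS) == "4")

        // With no prior setting, the patch's default is applied.
        val fresh = new Configuration(false)
        fresh.setIfUnset(FileInputFormat.LIST_STATUS_NUM_THREADS,
          Runtime.getRuntime.availableProcessors().toString)
        println(fresh.get(FileInputFormat.LIST_STATUS_NUM_THREADS))
      }
    }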
