Repository: spark
Updated Branches:
  refs/heads/master d913db16a -> a9339db99


[SPARK-21137][CORE] Spark reads many small files slowly

## What changes were proposed in this pull request?

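Parallelize FileInputFormat.listStatus in the Hadoop API by setting
LIST_STATUS_NUM_THREADS (if it is not already set) to the number of available
processors. This speeds up the examination of file sizes for wholeTextFiles
and the other APIs backed by WholeTextFileRDD and BinaryFileRDD.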

## How was this patch tested?

Existing tests, which already exercise the key code path here using a local
file system.

Author: Sean Owen <[email protected]>

Closes #18441 from srowen/SPARK-21137.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a9339db9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a9339db9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a9339db9

Branch: refs/heads/master
Commit: a9339db99f0620d4828eb903523be55dfbf2fb64
Parents: d913db1
Author: Sean Owen <[email protected]>
Authored: Mon Jul 3 19:52:39 2017 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Mon Jul 3 19:52:39 2017 +0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala  | 7 ++++++-
 .../main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala    | 7 ++++++-
 2 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a9339db9/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
index 50d977a..a14bad4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -20,6 +20,7 @@ package org.apache.spark.rdd
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.hadoop.mapreduce.task.JobContextImpl
 
 import org.apache.spark.{Partition, SparkContext}
@@ -35,8 +36,12 @@ private[spark] class BinaryFileRDD[T](
   extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, 
conf) {
 
   override def getPartitions: Array[Partition] = {
-    val inputFormat = inputFormatClass.newInstance
     val conf = getConf
+    // setMinPartitions below will call FileInputFormat.listStatus(), which can be quite slow when
+    // traversing a large number of directories and files. Parallelize it.
+    conf.setIfUnset(FileInputFormat.LIST_STATUS_NUM_THREADS,
+      Runtime.getRuntime.availableProcessors().toString)
+    val inputFormat = inputFormatClass.newInstance
     inputFormat match {
       case configurable: Configurable =>
         configurable.setConf(conf)

http://git-wip-us.apache.org/repos/asf/spark/blob/a9339db9/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala
index 8e1baae..9f3d074 100644
--- a/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala
@@ -20,6 +20,7 @@ package org.apache.spark.rdd
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.{Text, Writable}
 import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.hadoop.mapreduce.task.JobContextImpl
 
 import org.apache.spark.{Partition, SparkContext}
@@ -38,8 +39,12 @@ private[spark] class WholeTextFileRDD(
   extends NewHadoopRDD[Text, Text](sc, inputFormatClass, keyClass, valueClass, 
conf) {
 
   override def getPartitions: Array[Partition] = {
-    val inputFormat = inputFormatClass.newInstance
     val conf = getConf
+    // setMinPartitions below will call FileInputFormat.listStatus(), which can be quite slow when
+    // traversing a large number of directories and files. Parallelize it.
+    conf.setIfUnset(FileInputFormat.LIST_STATUS_NUM_THREADS,
+      Runtime.getRuntime.availableProcessors().toString)
+    val inputFormat = inputFormatClass.newInstance
     inputFormat match {
       case configurable: Configurable =>
         configurable.setConf(conf)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to