Repository: spark
Updated Branches:
  refs/heads/master 9fa4a1ed3 -> 1055c94cd


[SPARK-24610] fix reading small files via wholeTextFiles

## What changes were proposed in this pull request?
The `WholeTextFileInputFormat` determines the `maxSplitSize` for the file(s)
being read by the `wholeTextFiles` method. While this works well for large
files, for smaller files the computed `maxSplitSize` can end up smaller than
the minimum split sizes picked up from defaults (e.g. a hive-site.xml) or
passed explicitly via `mapreduce.input.fileinputformat.split.minsize.per.node`
or `mapreduce.input.fileinputformat.split.minsize.per.rack`, in which case
`CombineFileInputFormat` throws an exception:

```java
java.io.IOException: Minimum split size pernode 123456 cannot be larger than maximum split size 9962
  at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:200)
  at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(WholeTextFileRDD.scala:50)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2096)
  at org.apache.spark.rdd.RDD.count(RDD.scala:1158)
  ... 48 elided
```
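
A minimal way to hit this (a sketch only; the directory path is hypothetical
and `sc` is an existing SparkContext, e.g. in spark-shell):

```scala
// Reproduction sketch: /tmp/small-files is assumed to hold a few tiny files.
// The per-node and per-rack minimum split sizes are set larger than the
// maxSplitSize that WholeTextFileInputFormat computes for such small input.
sc.hadoopConfiguration.setLong(
  "mapreduce.input.fileinputformat.split.minsize.per.node", 123456L)
sc.hadoopConfiguration.setLong(
  "mapreduce.input.fileinputformat.split.minsize.per.rack", 123456L)

// Without this fix, computing the partitions here throws the IOException above.
sc.wholeTextFiles("/tmp/small-files").count()
```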

This change checks `maxSplitSize` against `minSplitSizePerNode` and
`minSplitSizePerRack`, and lowers those minimums to `maxSplitSize` whenever
`maxSplitSize < minSplitSizePerNode/Rack`.
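
With the numbers from the stack trace above (a computed `maxSplitSize` of 9962
and a configured per-node minimum of 123456), the effect of the check is
roughly the following (an illustrative sketch; the real logic is in the diff
below):

```scala
// Values taken from the exception above; the file sizes themselves are hypothetical.
val maxSplitSize = 9962L            // Math.ceil(totalLen / minPartitions)
val configuredMinPerNode = 123456L  // ...split.minsize.per.node from the job conf
val configuredMinPerRack = 123456L  // ...split.minsize.per.rack from the job conf

// Before the change the configured minimums were used as-is, so
// CombineFileInputFormat.getSplits failed because 123456 > 9962.
// After the change the minimums are capped at maxSplitSize:
val effectiveMinPerNode = math.min(configuredMinPerNode, maxSplitSize)  // 9962
val effectiveMinPerRack = math.min(configuredMinPerRack, maxSplitSize)  // 9962
```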

## How was this patch tested?
Tested manually by setting the conf while launching the job, and added a unit
test.
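
For reference, a sketch of how the conf can be set when launching the job (the
app name is made up; this relies on Spark's `spark.hadoop.*` passthrough into
the Hadoop configuration):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical launch configuration: spark.hadoop.* keys are copied into the
// SparkContext's hadoopConfiguration when the context is created.
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("SPARK-24610-manual-test")
  .set("spark.hadoop.mapreduce.input.fileinputformat.split.minsize.per.node", "123456")
  .set("spark.hadoop.mapreduce.input.fileinputformat.split.minsize.per.rack", "123456")
val sc = new SparkContext(conf)
```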

Author: Dhruve Ashar <dhruveas...@gmail.com>

Closes #21601 from dhruve/bug/SPARK-24610.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1055c94c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1055c94c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1055c94c

Branch: refs/heads/master
Commit: 1055c94cdf072bfce5e36bb6552fe9b148bb9d17
Parents: 9fa4a1e
Author: Dhruve Ashar <dhruveas...@gmail.com>
Authored: Thu Jul 12 15:36:02 2018 -0500
Committer: Thomas Graves <tgra...@apache.org>
Committed: Thu Jul 12 15:36:02 2018 -0500

----------------------------------------------------------------------
 .../spark/input/WholeTextFileInputFormat.scala  | 13 +++
 .../input/WholeTextFileInputFormatSuite.scala   | 96 ++++++++++++++++++++
 2 files changed, 109 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1055c94c/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
index f47cd38..04c5c4b 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
@@ -53,6 +53,19 @@ private[spark] class WholeTextFileInputFormat
     val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum
     val maxSplitSize = Math.ceil(totalLen * 1.0 /
       (if (minPartitions == 0) 1 else minPartitions)).toLong
+
+    // For small files we need to ensure the min split size per node & rack <= maxSplitSize
+    val config = context.getConfiguration
+    val minSplitSizePerNode = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE, 0L)
+    val minSplitSizePerRack = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK, 0L)
+
+    if (maxSplitSize < minSplitSizePerNode) {
+      super.setMinSplitSizeNode(maxSplitSize)
+    }
+
+    if (maxSplitSize < minSplitSizePerRack) {
+      super.setMinSplitSizeRack(maxSplitSize)
+    }
     super.setMaxSplitSize(maxSplitSize)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1055c94c/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala
new file mode 100644
index 0000000..817dc08
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileInputFormatSuite.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.input
+
+import java.io.{DataOutputStream, File, FileOutputStream}
+
+import scala.collection.immutable.IndexedSeq
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Tests the correctness of
+ * [[org.apache.spark.input.WholeTextFileInputFormat WholeTextFileInputFormat]]. A temporary
+ * directory containing files is created as fake input which is deleted in the end.
+ */
+class WholeTextFileInputFormatSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
+  private var sc: SparkContext = _
+
+  override def beforeAll() {
+    super.beforeAll()
+    val conf = new SparkConf()
+    sc = new SparkContext("local", "test", conf)
+  }
+
+  override def afterAll() {
+    try {
+      sc.stop()
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  private def createNativeFile(inputDir: File, fileName: String, contents: Array[Byte],
+                               compress: Boolean) = {
+    val path = s"${inputDir.toString}/$fileName"
+    val out = new DataOutputStream(new FileOutputStream(path))
+    out.write(contents, 0, contents.length)
+    out.close()
+  }
+
+  test("for small files minimum split size per node and per rack should be less than or equal to " +
+    "maximum split size.") {
+    var dir : File = null;
+    try {
+      dir = Utils.createTempDir()
+      logInfo(s"Local disk address is ${dir.toString}.")
+
+      // Set the minsize per node and rack to be larger than the size of the input file.
+      sc.hadoopConfiguration.setLong(
+        "mapreduce.input.fileinputformat.split.minsize.per.node", 123456)
+      sc.hadoopConfiguration.setLong(
+        "mapreduce.input.fileinputformat.split.minsize.per.rack", 123456)
+
+      WholeTextFileInputFormatSuite.files.foreach { case (filename, contents) =>
+        createNativeFile(dir, filename, contents, false)
+      }
+      // ensure spark job runs successfully without exceptions from the CombineFileInputFormat
+      assert(sc.wholeTextFiles(dir.toString).count == 3)
+    } finally {
+      Utils.deleteRecursively(dir)
+    }
+  }
+}
+
+/**
+ * Files to be tested are defined here.
+ */
+object WholeTextFileInputFormatSuite {
+  private val testWords: IndexedSeq[Byte] = "Spark is easy to use.\n".map(_.toByte)
+
+  private val fileNames = Array("part-00000", "part-00001", "part-00002")
+  private val fileLengths = Array(10, 100, 1000)
+
+  private val files = fileLengths.zip(fileNames).map { case (upperBound, filename) =>
+    filename -> Stream.continually(testWords.toList.toStream).flatten.take(upperBound).toArray
+  }.toMap
+}

