Repository: spark
Updated Branches:
refs/heads/master fc64e83f9 -> 81a305dd0
[SPARK-25753][CORE] fix reading small files via BinaryFileRDD
## What changes were proposed in this pull request?
This is a follow-up of #21601: `StreamFileInputFormat` and
`WholeTextFileInputFormat` have the same problem.
`Minimum split size pernode 5123456 cannot be larger than maximum split size 4194304
java.io.IOException: Minimum split size pernode 5123456 cannot be larger than maximum split size 4194304
    at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:201)
    at org.apache.spark.rdd.BinaryFileRDD.getPartitions(BinaryFileRDD.scala:52)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:254)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:252)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2138)`
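
For context, a minimal reproduction sketch (paste into spark-shell, which provides `sc`; the file path is illustrative, and the split-size values mirror the new unit test):

```scala
// With default settings and a small input, maxSplitSize falls back to the open
// cost (4194304 bytes), which is smaller than the configured per-node/per-rack
// minimum of 5123456 below.
sc.hadoopConfiguration.setLong(
  "mapreduce.input.fileinputformat.split.minsize.per.node", 5123456L)
sc.hadoopConfiguration.setLong(
  "mapreduce.input.fileinputformat.split.minsize.per.rack", 5123456L)

// Before this fix, this failed in CombineFileInputFormat.getSplits with the
// IOException shown above; after the fix it returns the file contents.
sc.binaryFiles("/path/to/small-file").collect()
```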
## How was this patch tested?
Added a unit test
Closes #22725 from 10110346/maxSplitSize_node_rack.
Authored-by: liuxian <[email protected]>
Signed-off-by: Thomas Graves <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/81a305dd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/81a305dd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/81a305dd
Branch: refs/heads/master
Commit: 81a305dd0418f6e0136b4e38ffe91e0b76c8806e
Parents: fc64e83
Author: liuxian <[email protected]>
Authored: Mon Oct 22 08:53:18 2018 -0500
Committer: Thomas Graves <[email protected]>
Committed: Mon Oct 22 08:53:18 2018 -0500
----------------------------------------------------------------------
.../org/apache/spark/input/PortableDataStream.scala | 12 ++++++++++++
core/src/test/scala/org/apache/spark/FileSuite.scala | 13 +++++++++++++
2 files changed, 25 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/81a305dd/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
index ab020aa..5b33c11 100644
--- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
+++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -52,6 +52,18 @@ private[spark] abstract class StreamFileInputFormat[T]
     val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum
     val bytesPerCore = totalBytes / defaultParallelism
     val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+
+    // For small files we need to ensure the min split size per node & rack <= maxSplitSize
+    val jobConfig = context.getConfiguration
+    val minSplitSizePerNode = jobConfig.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE, 0L)
+    val minSplitSizePerRack = jobConfig.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK, 0L)
+
+    if (maxSplitSize < minSplitSizePerNode) {
+      super.setMinSplitSizeNode(maxSplitSize)
+    }
+    if (maxSplitSize < minSplitSizePerRack) {
+      super.setMinSplitSizeRack(maxSplitSize)
+    }
     super.setMaxSplitSize(maxSplitSize)
   }
http://git-wip-us.apache.org/repos/asf/spark/blob/81a305dd/core/src/test/scala/org/apache/spark/FileSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 81b18c7..34efcdf 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -320,6 +320,19 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
+  test("minimum split size per node and per rack should be less than or equal to maxSplitSize") {
+    sc = new SparkContext("local", "test")
+    val testOutput = Array[Byte](1, 2, 3, 4, 5)
+    val outFile = writeBinaryData(testOutput, 1)
+    sc.hadoopConfiguration.setLong(
+      "mapreduce.input.fileinputformat.split.minsize.per.node", 5123456)
+    sc.hadoopConfiguration.setLong(
+      "mapreduce.input.fileinputformat.split.minsize.per.rack", 5123456)
+
+    val (_, data) = sc.binaryFiles(outFile.getAbsolutePath).collect().head
+    assert(data.toArray === testOutput)
+  }
+
   test("fixed record length binary file as byte array") {
     sc = new SparkContext("local", "test")
     val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]