Repository: spark Updated Branches: refs/heads/master ca458618d -> dadf0138b
[SPARK-14259][SQL] Add a FileSourceStrategy option for limiting #files in a partition ## What changes were proposed in this pull request? This PR adds a config to control the maximum number of files packed into a single partition, as even small files have a non-trivial fixed cost. The current packing can put a lot of small files together, which causes straggler tasks. ## How was this patch tested? I added tests to check if many files get split into partitions in FileSourceStrategySuite. Author: Takeshi YAMAMURO <[email protected]> Closes #12068 from maropu/SPARK-14259. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dadf0138 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dadf0138 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dadf0138 Branch: refs/heads/master Commit: dadf0138b3f6fd618677a2c26f40ab66b7a1139d Parents: ca45861 Author: Takeshi YAMAMURO <[email protected]> Authored: Wed Mar 30 16:02:48 2016 -0700 Committer: Yin Huai <[email protected]> Committed: Wed Mar 30 16:02:48 2016 -0700 ---------------------------------------------------------------------- .../datasources/FileSourceStrategy.scala | 7 ++- .../org/apache/spark/sql/internal/SQLConf.scala | 7 +++ .../datasources/FileSourceStrategySuite.scala | 47 ++++++++++++++++++++ 3 files changed, 59 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/dadf0138/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index 4448796..d653408 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -136,7 +136,9 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { case _ => val maxSplitBytes = files.sqlContext.conf.filesMaxPartitionBytes - logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes") + val maxFileNumInPartition = files.sqlContext.conf.filesMaxNumInPartition + logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " + + s"max #files: $maxFileNumInPartition") val splitFiles = selectedPartitions.flatMap { partition => partition.files.flatMap { file => @@ -174,7 +176,8 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { // Assign files to partitions using "First Fit Decreasing" (FFD) // TODO: consider adding a slop factor here? splitFiles.foreach { file => - if (currentSize + file.length > maxSplitBytes) { + if (currentSize + file.length > maxSplitBytes || + currentFiles.length >= maxFileNumInPartition) { closePartition() addFile(file) } else { http://git-wip-us.apache.org/repos/asf/spark/blob/dadf0138/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index ca6ba4c..d06e908 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -524,6 +524,11 @@ object SQLConf { doc = "The maximum number of bytes to pack into a single partition when reading files.", isPublic = true) + val FILES_MAX_NUM_IN_PARTITION = longConf("spark.sql.files.maxNumInPartition", + defaultValue = Some(32), + doc = "The maximum number of files to pack into a single partition when reading files.", + isPublic = true) + val EXCHANGE_REUSE_ENABLED = 
booleanConf("spark.sql.exchange.reuse", defaultValue = Some(true), doc = "When true, the planner will try to find out duplicated exchanges and re-use them.", @@ -581,6 +586,8 @@ class SQLConf extends Serializable with CatalystConf with ParserConf with Loggin def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES) + def filesMaxNumInPartition: Long = getConf(FILES_MAX_NUM_IN_PARTITION) + def useCompression: Boolean = getConf(COMPRESS_CACHED) def useFileScan: Boolean = getConf(USE_FILE_SCAN) http://git-wip-us.apache.org/repos/asf/spark/blob/dadf0138/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index 1fa1573..45620bc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -121,6 +121,53 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi } } + test("Unpartitioned table, many files that get split") { + val table = + createTable( + files = Seq( + "file1" -> 2, + "file2" -> 2, + "file3" -> 1, + "file4" -> 1, + "file5" -> 1, + "file6" -> 1)) + + withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "3", + SQLConf.FILES_MAX_NUM_IN_PARTITION.key -> "2") { + checkScan(table.select('c1)) { partitions => + // Files should be laid out [(file1), (file2, file3), (file4, file5), (file6)] + assert(partitions.size == 4, "when checking partitions") + assert(partitions(0).files.size == 1, "when checking partition 1") + assert(partitions(1).files.size == 2, "when checking partition 2") + assert(partitions(2).files.size == 2, "when checking partition 3") + 
assert(partitions(3).files.size == 1, "when checking partition 4") + + // First partition reads (file1) + assert(partitions(0).files(0).start == 0) + assert(partitions(0).files(0).length == 2) + + // Second partition reads (file2, file3) + assert(partitions(1).files(0).start == 0) + assert(partitions(1).files(0).length == 2) + assert(partitions(1).files(1).start == 0) + assert(partitions(1).files(1).length == 1) + + // Third partition reads (file4, file5) + assert(partitions(2).files(0).start == 0) + assert(partitions(2).files(0).length == 1) + assert(partitions(2).files(1).start == 0) + assert(partitions(2).files(1).length == 1) + + // Final partition reads (file6) + assert(partitions(3).files(0).start == 0) + assert(partitions(3).files(0).length == 1) + } + + checkPartitionSchema(StructType(Nil)) + checkDataSchema(StructType(Nil).add("c1", IntegerType)) + } + } + test("partitioned table") { val table = createTable( --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
