Repository: spark
Updated Branches:
  refs/heads/branch-2.3 fd66a3b7b -> a5a8a86e2


Revert "[SPARK-23249][SQL] Improved block merging logic for partitions"

This reverts commit f5f21e8c4261c0dfe8e3e788a30b38b188a18f67.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a5a8a86e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a5a8a86e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a5a8a86e

Branch: refs/heads/branch-2.3
Commit: a5a8a86e213c34d6fb32f0ae52db24d8f1ef0905
Parents: fd66a3b
Author: gatorsmile <gatorsm...@gmail.com>
Authored: Wed Feb 14 10:59:36 2018 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Wed Feb 14 10:59:36 2018 -0800

----------------------------------------------------------------------
 .../sql/execution/DataSourceScanExec.scala      | 29 ++++++--------------
 .../datasources/FileSourceStrategySuite.scala   | 15 ++++++----
 2 files changed, 17 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a5a8a86e/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index f7732e2..aa66ee7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -445,29 +445,16 @@ case class FileSourceScanExec(
       currentSize = 0
     }
 
-    def addFile(file: PartitionedFile): Unit = {
-        currentFiles += file
-        currentSize += file.length + openCostInBytes
-    }
-
-    var frontIndex = 0
-    var backIndex = splitFiles.length - 1
-
-    while (frontIndex <= backIndex) {
-      addFile(splitFiles(frontIndex))
-      frontIndex += 1
-      while (frontIndex <= backIndex &&
-             currentSize + splitFiles(frontIndex).length <= maxSplitBytes) {
-        addFile(splitFiles(frontIndex))
-        frontIndex += 1
-      }
-      while (backIndex > frontIndex &&
-             currentSize + splitFiles(backIndex).length <= maxSplitBytes) {
-        addFile(splitFiles(backIndex))
-        backIndex -= 1
+    // Assign files to partitions using "Next Fit Decreasing"
+    splitFiles.foreach { file =>
+      if (currentSize + file.length > maxSplitBytes) {
+        closePartition()
       }
-      closePartition()
+      // Add the given file to the current partition.
+      currentSize += file.length + openCostInBytes
+      currentFiles += file
     }
+    closePartition()
 
     new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
   }
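
For context, the restored packing is a single "Next Fit" pass over files already sorted in decreasing size order (hence "Next Fit Decreasing"): walk the sorted files once and close the current partition whenever the next file would push it past maxSplitBytes. A minimal standalone sketch follows; packFiles, the (name, sizeInBytes) tuple representation, and the helper shape are illustrative only -- the real code operates on PartitionedFile and builds FilePartitions.

  import scala.collection.mutable.ArrayBuffer

  // Pack (name, sizeInBytes) files into partitions of roughly maxSplitBytes,
  // charging a fixed openCostInBytes for every file opened.
  def packFiles(
      files: Seq[(String, Long)],
      maxSplitBytes: Long,
      openCostInBytes: Long): List[List[(String, Long)]] = {
    val partitions = ArrayBuffer.empty[List[(String, Long)]]
    val currentFiles = ArrayBuffer.empty[(String, Long)]
    var currentSize = 0L

    def closePartition(): Unit = {
      if (currentFiles.nonEmpty) partitions += currentFiles.toList
      currentFiles.clear()
      currentSize = 0
    }

    // "Decreasing": largest files first, as Spark pre-sorts splitFiles.
    files.sortBy { case (_, size) => -size }.foreach { case file @ (_, length) =>
      // "Next Fit": only the current partition is considered; if the file
      // does not fit, close it and start a fresh one.
      if (currentSize + length > maxSplitBytes) {
        closePartition()
      }
      currentSize += length + openCostInBytes
      currentFiles += file
    }
    closePartition()
    partitions.toList
  }

Note that, as in the reverted-to code, the overflow check compares only the raw file length against maxSplitBytes, while the open cost is counted toward the partition's accumulated size afterwards.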

http://git-wip-us.apache.org/repos/asf/spark/blob/a5a8a86e/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index bfccc93..c1d61b8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -141,17 +141,16 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
     withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "4",
         SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
       checkScan(table.select('c1)) { partitions =>
-        // Files should be laid out [(file1, file6), (file2, file3), (file4, file5)]
-        assert(partitions.size == 3, "when checking partitions")
-        assert(partitions(0).files.size == 2, "when checking partition 1")
+        // Files should be laid out [(file1), (file2, file3), (file4, file5), (file6)]
+        assert(partitions.size == 4, "when checking partitions")
+        assert(partitions(0).files.size == 1, "when checking partition 1")
         assert(partitions(1).files.size == 2, "when checking partition 2")
         assert(partitions(2).files.size == 2, "when checking partition 3")
+        assert(partitions(3).files.size == 1, "when checking partition 4")
 
-        // First partition reads (file1, file6)
+        // First partition reads (file1)
         assert(partitions(0).files(0).start == 0)
         assert(partitions(0).files(0).length == 2)
-        assert(partitions(0).files(1).start == 0)
-        assert(partitions(0).files(1).length == 1)
 
         // Second partition reads (file2, file3)
         assert(partitions(1).files(0).start == 0)
@@ -164,6 +163,10 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
         assert(partitions(2).files(0).length == 1)
         assert(partitions(2).files(1).start == 0)
         assert(partitions(2).files(1).length == 1)
+
+        // Final partition reads (file6)
+        assert(partitions(3).files(0).start == 0)
+        assert(partitions(3).files(0).length == 1)
       }
 
       checkPartitionSchema(StructType(Nil))
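
As a sanity check of the restored expectations, running the hypothetical packFiles sketch above with this suite's configuration (FILES_MAX_PARTITION_BYTES = 4, FILES_OPEN_COST_IN_BYTES = 1) and six files whose sizes are inferred from the assertions (file1 and file2 of length 2, file3 through file6 of length 1) reproduces the commented layout:

  val layout = packFiles(
    files = Seq("file1" -> 2L, "file2" -> 2L, "file3" -> 1L,
                "file4" -> 1L, "file5" -> 1L, "file6" -> 1L),
    maxSplitBytes = 4L,
    openCostInBytes = 1L)

  // Print each partition's file names.
  layout.map(_.map(_._1)).foreach(p => println(p.mkString("(", ", ", ")")))
  // (file1)
  // (file2, file3)
  // (file4, file5)
  // (file6)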

