This is an automated email from the ASF dual-hosted git repository.

philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 22d0d63998 [GLUTEN-11231][CORE] Improve `regeneratePartition` for `PartitionsUtil` (#11232)
22d0d63998 is described below

commit 22d0d63998730375625c18dd413f6cf491ccd2bd
Author: Jiaan Geng <[email protected]>
AuthorDate: Wed Dec 3 13:19:20 2025 +0800

    [GLUTEN-11231][CORE] Improve `regeneratePartition` for `PartitionsUtil` (#11232)
---
 .../org/apache/gluten/utils/PartitionsUtil.scala   | 85 +++++++++++-----------
 1 file changed, 41 insertions(+), 44 deletions(-)

diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/utils/PartitionsUtil.scala b/gluten-substrait/src/main/scala/org/apache/gluten/utils/PartitionsUtil.scala
index 0f9c5deb23..b960482490 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/utils/PartitionsUtil.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/utils/PartitionsUtil.scala
@@ -158,60 +158,57 @@ object PartitionsUtil {
       inputPartitions: Seq[FilePartition],
       smallFileThreshold: Double): Seq[FilePartition] = {
 
-    // Flatten and sort descending by file size.
-    val filesSorted: Seq[(PartitionedFile, Long)] =
-      inputPartitions
-        .flatMap(_.files)
-        .map(f => (f, f.length))
-        .sortBy(_._2)(Ordering.Long.reverse)
-
     val partitions = Array.fill(inputPartitions.size)(mutable.ArrayBuffer.empty[PartitionedFile])
 
     def addToBucket(
         heap: mutable.PriorityQueue[(Long, Int, Int)],
         file: PartitionedFile,
-        sz: Long): Unit = {
-      val (load, numFiles, idx) = heap.dequeue()
+        fileSize: Long): Unit = {
+      val (size, numFiles, idx) = heap.dequeue()
       partitions(idx) += file
-      heap.enqueue((load + sz, numFiles + 1, idx))
+      heap.enqueue((size + fileSize, numFiles + 1, idx))
     }
 
-    // First by load, then by numFiles.
-    val heapByFileSize =
-      mutable.PriorityQueue.empty[(Long, Int, Int)](
-        Ordering
-          .by[(Long, Int, Int), (Long, Int)] {
-            case (load, numFiles, _) =>
-              (load, numFiles)
-          }
-          .reverse
-      )
+    def initializeHeap(
+        ordering: Ordering[(Long, Int, Int)]): mutable.PriorityQueue[(Long, Int, Int)] = {
+      val heap = mutable.PriorityQueue.empty[(Long, Int, Int)](ordering)
+      inputPartitions.indices.foreach(i => heap.enqueue((0L, 0, i)))
+      heap
+    }
+
+    // Flatten and sort descending by file size.
+    val filesSorted: Seq[(PartitionedFile, Long)] =
+      inputPartitions
+        .flatMap(_.files)
+        .map(f => (f, f.length))
+        .sortBy(_._2)(Ordering.Long.reverse)
+
+    // First by size, then by number of files.
+    val sizeFirstOrdering = Ordering
+      .by[(Long, Int, Int), (Long, Int)] { case (size, numFiles, _) => (size, numFiles) }
+      .reverse
 
     if (smallFileThreshold > 0) {
       val smallFileTotalSize = filesSorted.map(_._2).sum * smallFileThreshold
-      // First by numFiles, then by load.
-      val heapByFileNum =
-        mutable.PriorityQueue.empty[(Long, Int, Int)](
-          Ordering
-            .by[(Long, Int, Int), (Int, Long)] {
-              case (load, numFiles, _) =>
-                (numFiles, load)
-            }
-            .reverse
-        )
-
-      inputPartitions.indices.foreach(i => heapByFileNum.enqueue((0L, 0, i)))
+      // First by number of files, then by size.
+      val numFirstOrdering = Ordering
+        .by[(Long, Int, Int), (Int, Long)] { case (size, numFiles, _) => (numFiles, size) }
+        .reverse
+      val heapByFileNum = initializeHeap(numFirstOrdering)
 
       var numSmallFiles = 0
       var smallFileSize = 0L
-      // Enqueue small files to the least number of files and the least load.
-      filesSorted.reverse.takeWhile(f => f._2 + smallFileSize <= smallFileTotalSize).foreach {
-        case (file, sz) =>
-          addToBucket(heapByFileNum, file, sz)
-          numSmallFiles += 1
-          smallFileSize += sz
-      }
+      // Distribute small files evenly across partitions to achieve load balancing of small files.
+      filesSorted.reverseIterator
+        .takeWhile(f => f._2 + smallFileSize <= smallFileTotalSize)
+        .foreach {
+          case (file, fileSize) =>
+            addToBucket(heapByFileNum, file, fileSize)
+            numSmallFiles += 1
+            smallFileSize += fileSize
+        }
 
+      val heapByFileSize = mutable.PriorityQueue.empty[(Long, Int, Int)](sizeFirstOrdering)
       // Move buckets from heapByFileNum to heapByFileSize.
       while (heapByFileNum.nonEmpty) {
         heapByFileSize.enqueue(heapByFileNum.dequeue())
@@ -219,15 +216,15 @@ object PartitionsUtil {
 
       // Finally, enqueue remaining files.
       filesSorted.take(filesSorted.size - numSmallFiles).foreach {
-        case (file, sz) =>
-          addToBucket(heapByFileSize, file, sz)
+        case (file, fileSize) =>
+          addToBucket(heapByFileSize, file, fileSize)
       }
     } else {
-      inputPartitions.indices.foreach(i => heapByFileSize.enqueue((0L, 0, i)))
+      val heapByFileSize = initializeHeap(sizeFirstOrdering)
 
       filesSorted.foreach {
-        case (file, sz) =>
-          addToBucket(heapByFileSize, file, sz)
+        case (file, fileSize) =>
+          addToBucket(heapByFileSize, file, fileSize)
       }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to