This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new a86626a  fix: Reduce RowPartition memory allocation (#244)
a86626a is described below

commit a86626a057a5834b8efac78fae41eaf1c89ee003
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Sat Apr 6 17:07:09 2024 -0700

    fix: Reduce RowPartition memory allocation (#244)
---
 .../sql/comet/execution/shuffle/CometDiskBlockWriter.java | 15 +++++++--------
 .../org/apache/spark/shuffle/sort/RowPartition.scala      |  8 ++++----
 2 files changed, 11 insertions(+), 12 deletions(-)

diff --git 
a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometDiskBlockWriter.java
 
b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometDiskBlockWriter.java
index d1593f7..309fcaf 100644
--- 
a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometDiskBlockWriter.java
+++ 
b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometDiskBlockWriter.java
@@ -36,7 +36,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.spark.SparkConf;
 import org.apache.spark.TaskContext;
 import org.apache.spark.executor.ShuffleWriteMetrics;
-import org.apache.spark.internal.config.package$;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.serializer.SerializationStream;
 import org.apache.spark.serializer.SerializerInstance;
@@ -102,7 +101,7 @@ public final class CometDiskBlockWriter {
   private final File file;
   private long totalWritten = 0L;
   private boolean initialized = false;
-  private final int initialBufferSize;
+  private final int columnarBatchSize;
   private final boolean isAsync;
   private final int asyncThreadNum;
   private final ExecutorService threadPool;
@@ -152,8 +151,7 @@ public final class CometDiskBlockWriter {
     this.asyncThreadNum = asyncThreadNum;
     this.threadPool = threadPool;
 
-    this.initialBufferSize =
-        (int) (long) 
conf.get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE());
+    this.columnarBatchSize = (int) 
CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_BATCH_SIZE().get();
 
     this.numElementsForSpillThreshold =
         (int) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD().get();
@@ -264,10 +262,11 @@ public final class CometDiskBlockWriter {
     // While proceeding with possible spilling and inserting the record, we 
need to synchronize
     // it, because other threads may be spilling this writer at the same time.
     synchronized (CometDiskBlockWriter.this) {
-      if (activeWriter.numRecords() >= numElementsForSpillThreshold) {
+      if (activeWriter.numRecords() >= numElementsForSpillThreshold
+          || activeWriter.numRecords() >= columnarBatchSize) {
+        int threshold = Math.min(numElementsForSpillThreshold, 
columnarBatchSize);
         logger.info(
-            "Spilling data because number of spilledRecords crossed the 
threshold "
-                + numElementsForSpillThreshold);
+            "Spilling data because number of spilledRecords crossed the 
threshold " + threshold);
         // Spill the current writer
         doSpill(false);
         if (activeWriter.numRecords() != 0) {
@@ -347,7 +346,7 @@ public final class CometDiskBlockWriter {
     private final RowPartition rowPartition;
 
     ArrowIPCWriter() {
-      rowPartition = new RowPartition(initialBufferSize);
+      rowPartition = new RowPartition(columnarBatchSize);
 
       this.allocatedPages = new LinkedList<>();
       this.allocator = CometDiskBlockWriter.this.allocator;
diff --git 
a/spark/src/main/scala/org/apache/spark/shuffle/sort/RowPartition.scala 
b/spark/src/main/scala/org/apache/spark/shuffle/sort/RowPartition.scala
index 873e422..bce24be 100644
--- a/spark/src/main/scala/org/apache/spark/shuffle/sort/RowPartition.scala
+++ b/spark/src/main/scala/org/apache/spark/shuffle/sort/RowPartition.scala
@@ -22,8 +22,8 @@ package org.apache.spark.shuffle.sort
 import scala.collection.mutable.ArrayBuffer
 
 class RowPartition(initialSize: Int) {
-  private val rowAddresses: ArrayBuffer[Long] = new 
ArrayBuffer[Long](initialSize)
-  private val rowSizes: ArrayBuffer[Int] = new ArrayBuffer[Int](initialSize)
+  private var rowAddresses: ArrayBuffer[Long] = new 
ArrayBuffer[Long](initialSize)
+  private var rowSizes: ArrayBuffer[Int] = new ArrayBuffer[Int](initialSize)
 
   def addRow(addr: Long, size: Int): Unit = {
     rowAddresses += addr
@@ -36,7 +36,7 @@ class RowPartition(initialSize: Int) {
   def getRowSizes: Array[Int] = rowSizes.toArray
 
   def reset(): Unit = {
-    rowAddresses.clear()
-    rowSizes.clear()
+    rowAddresses = new ArrayBuffer[Long](initialSize)
+    rowSizes = new ArrayBuffer[Int](initialSize)
   }
 }

Reply via email to