This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 99d65db03b [VL] Support Velox's `preferred_output_batch_bytes` config (#10661)
99d65db03b is described below

commit 99d65db03b74383c3871ba7616e2393ea21da543
Author: kevinwilfong <[email protected]>
AuthorDate: Mon Sep 15 09:42:18 2025 -0700

    [VL] Support Velox's `preferred_output_batch_bytes` config (#10661)
---
 .../org/apache/gluten/config/VeloxConfig.scala     |  8 +++++
 .../gluten/datasource/ArrowCSVFileFormat.scala     |  4 ++-
 .../gluten/execution/RowToVeloxColumnarExec.scala  | 42 ++++++++++++++++------
 .../execution/ColumnarCachedBatchSerializer.scala  |  9 +++--
 .../gluten/columnarbatch/ColumnarBatchTest.java    |  3 +-
 .../gluten/execution/MiscOperatorSuite.scala       | 23 ++++++++++++
 cpp/velox/compute/WholeStageResultIterator.cc      |  2 ++
 cpp/velox/config/VeloxConfig.h                     |  3 ++
 .../org/apache/gluten/config/GlutenConfig.scala    |  3 +-
 9 files changed, 81 insertions(+), 16 deletions(-)

diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
index b821d2d35a..461e6f3543 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
@@ -80,6 +80,8 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {
 
   def enableEnhancedFeatures(): Boolean = 
ConfigJniWrapper.isEnhancedFeaturesEnabled &&
     getConf(ENABLE_ENHANCED_FEATURES)
+
+  def veloxPreferredBatchBytes: Long = 
getConf(COLUMNAR_VELOX_PREFERRED_BATCH_BYTES)
 }
 
 object VeloxConfig {
@@ -654,4 +656,10 @@ object VeloxConfig {
       .doc("Enable some features including iceberg native write and other 
features.")
       .booleanConf
       .createWithDefault(true)
+
+  val COLUMNAR_VELOX_PREFERRED_BATCH_BYTES =
+    buildConf("spark.gluten.sql.columnar.backend.velox.preferredBatchBytes")
+      .internal()
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("10MB")
 }
diff --git a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
index 52f7756381..10b7e7801f 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowCSVFileFormat.scala
@@ -17,6 +17,7 @@
 package org.apache.gluten.datasource
 
 import org.apache.gluten.columnarbatch.ColumnarBatches
+import org.apache.gluten.config.VeloxConfig
 import org.apache.gluten.exception.SchemaMismatchException
 import org.apache.gluten.execution.RowToVeloxColumnarExec
 import org.apache.gluten.iterator.Iterators
@@ -312,7 +313,8 @@ object ArrowCSVFileFormat {
     val veloxBatch = RowToVeloxColumnarExec.toColumnarBatchIterator(
       it,
       schema,
-      batchSize
+      batchSize,
+      VeloxConfig.get.veloxPreferredBatchBytes
     )
     veloxBatch
       .map(v => ColumnarBatches.load(ArrowBufferAllocators.contextInstance(), 
v))
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
index adb4aeca2a..f11de5894a 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
@@ -18,7 +18,7 @@ package org.apache.gluten.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.columnarbatch.ColumnarBatches
-import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.{GlutenConfig, VeloxConfig}
 import org.apache.gluten.iterator.Iterators
 import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
 import org.apache.gluten.runtime.Runtimes
@@ -49,6 +49,7 @@ case class RowToVeloxColumnarExec(child: SparkPlan) extends RowToColumnarExecBas
     val numOutputBatches = longMetric("numOutputBatches")
     val convertTime = longMetric("convertTime")
     val numRows = GlutenConfig.get.maxBatchSize
+    val numBytes = VeloxConfig.get.veloxPreferredBatchBytes
     // This avoids calling `schema` in the RDD closure, so that we don't need 
to include the entire
     // plan (this) in the closure.
     val localSchema = schema
@@ -60,7 +61,8 @@ case class RowToVeloxColumnarExec(child: SparkPlan) extends RowToColumnarExecBas
           numInputRows,
           numOutputBatches,
           convertTime,
-          numRows)
+          numRows,
+          numBytes)
     }
   }
 
@@ -69,6 +71,7 @@ case class RowToVeloxColumnarExec(child: SparkPlan) extends RowToColumnarExecBas
     val numOutputBatches = longMetric("numOutputBatches")
     val convertTime = longMetric("convertTime")
     val numRows = GlutenConfig.get.maxBatchSize
+    val numBytes = VeloxConfig.get.veloxPreferredBatchBytes
     val mode = BroadcastUtils.getBroadcastMode(outputPartitioning)
     val relation = child.executeBroadcast()
     BroadcastUtils.sparkToVeloxUnsafe(
@@ -83,7 +86,9 @@ case class RowToVeloxColumnarExec(child: SparkPlan) extends RowToColumnarExecBas
           numInputRows,
           numOutputBatches,
           convertTime,
-          numRows))
+          numRows,
+          numBytes)
+    )
   }
 
   // For spark 3.2.
@@ -96,7 +101,8 @@ object RowToVeloxColumnarExec {
   def toColumnarBatchIterator(
       it: Iterator[InternalRow],
       schema: StructType,
-      columnBatchSize: Int): Iterator[ColumnarBatch] = {
+      columnBatchSize: Int,
+      columnBatchBytes: Long): Iterator[ColumnarBatch] = {
     val numInputRows = new SQLMetric("numInputRows")
     val numOutputBatches = new SQLMetric("numOutputBatches")
     val convertTime = new SQLMetric("convertTime")
@@ -106,7 +112,8 @@ object RowToVeloxColumnarExec {
       numInputRows,
       numOutputBatches,
       convertTime,
-      columnBatchSize)
+      columnBatchSize,
+      columnBatchBytes)
   }
 
   def toColumnarBatchIterator(
@@ -115,7 +122,8 @@ object RowToVeloxColumnarExec {
       numInputRows: SQLMetric,
       numOutputBatches: SQLMetric,
       convertTime: SQLMetric,
-      columnBatchSize: Int): Iterator[ColumnarBatch] = {
+      columnBatchSize: Int,
+      columnBatchBytes: Long): Iterator[ColumnarBatch] = {
     if (it.isEmpty) {
       return Iterator.empty
     }
@@ -165,7 +173,7 @@ object RowToVeloxColumnarExec {
         val rowLength = new ListBuffer[Long]()
         var rowCount = 0
         var offset = 0L
-        while (rowCount < columnBatchSize && !finished) {
+        while (rowCount < columnBatchSize && offset < columnBatchBytes && 
!finished) {
           if (!it.hasNext) {
             finished = true
           } else {
@@ -180,14 +188,26 @@ object RowToVeloxColumnarExec {
               // maybe we should optimize to list ArrayBuf to native to avoid 
buf close and allocate
               // 31760L origins from 
BaseVariableWidthVector.lastValueAllocationSizeInBytes
               // experimental value
-              val estimatedBufSize = Math.max(
-                Math.min(sizeInBytes.toDouble * columnBatchSize * 1.2, 31760L 
* columnBatchSize),
-                sizeInBytes.toDouble * 10)
+              val estimatedBufSize = Math.min(
+                Math.max(
+                  Math.min(sizeInBytes.toDouble * columnBatchSize * 1.2, 
31760L * columnBatchSize),
+                  sizeInBytes.toDouble * 10),
+                // Limit the size of the buffer to columnBatchBytes or the 
size of the first row,
+                // whichever is greater so we always have enough space for the 
first row.
+                Math.max(columnBatchBytes, sizeInBytes)
+              )
               arrowBuf = arrowAllocator.buffer(estimatedBufSize.toLong)
             }
 
             if ((offset + sizeInBytes) > arrowBuf.capacity()) {
-              val tmpBuf = arrowAllocator.buffer((offset + sizeInBytes) * 2)
+              val bufSize = if (offset + sizeInBytes > columnBatchBytes) {
+                // If adding the current row causes the batch size to exceed 
columnBatchBytes add
+                // just enough space to add the current row.
+                offset + sizeInBytes
+              } else {
+                Math.min((offset + sizeInBytes * 2), columnBatchBytes)
+              }
+              val tmpBuf = arrowAllocator.buffer(bufSize)
               tmpBuf.setBytes(0, arrowBuf, 0, offset)
               arrowBuf.close()
               arrowBuf = tmpBuf
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
index b5533695a0..f8d6bd886b 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.columnarbatch.{ColumnarBatches, VeloxColumnarBatches}
-import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.{GlutenConfig, VeloxConfig}
 import org.apache.gluten.execution.{RowToVeloxColumnarExec, 
VeloxColumnarToRowExec}
 import org.apache.gluten.iterator.Iterators
 import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
@@ -125,7 +125,12 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with Logging {
 
     val numRows = conf.columnBatchSize
     val rddColumnarBatch = input.mapPartitions {
-      it => RowToVeloxColumnarExec.toColumnarBatchIterator(it, localSchema, 
numRows)
+      it =>
+        RowToVeloxColumnarExec.toColumnarBatchIterator(
+          it,
+          localSchema,
+          numRows,
+          VeloxConfig.get.veloxPreferredBatchBytes)
     }
     convertColumnarBatchToCachedBatch(rddColumnarBatch, schema, storageLevel, 
conf)
   }
diff --git a/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java b/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java
index f02caf8f2d..fba8458a4b 100644
--- a/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java
+++ b/backends-velox/src/test/java/org/apache/gluten/columnarbatch/ColumnarBatchTest.java
@@ -229,7 +229,8 @@ public class ColumnarBatchTest extends VeloxBackendTestBase {
               RowToVeloxColumnarExec.toColumnarBatchIterator(
                       
JavaConverters.<InternalRow>asScalaIterator(batch.rowIterator()),
                       structType,
-                      numRows)
+                      numRows,
+                      Integer.MAX_VALUE)
                   .next();
           Assert.assertEquals("[true,15]\n[false,14]", 
ColumnarBatches.toString(veloxBatch, 0, 2));
           Assert.assertEquals(
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
index e14dd1fcb5..58a81af4a9 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
@@ -2158,4 +2158,27 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
         }
       })
   }
+
+  test("RowToVeloxColumnar preferredBatchBytes") {
+    Seq("1", "80", "100000000").foreach(
+      preferredBatchBytes => {
+        withSQLConf(
+          VeloxConfig.COLUMNAR_VELOX_PREFERRED_BATCH_BYTES.key -> 
preferredBatchBytes
+        ) {
+          val df = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 
10).toDF("Col").select($"Col".plus(1))
+          assert(df.collect().length == 10)
+          val ops = collect(df.queryExecution.executedPlan) { case p: 
RowToVeloxColumnarExec => p }
+          assert(ops.size == 1)
+          val op = ops.head
+          val metrics = op.metrics
+          // Each row consumes 16 bytes as an UnsafeRow.
+          val expectedNumBatches = preferredBatchBytes match {
+            case "1" => 10
+            case "80" => 2
+            case _ => 1
+          }
+          assert(metrics("numOutputBatches").value == expectedNumBatches)
+        }
+      })
+  }
 }
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc
index 9b07e4f826..7e3af2a3fe 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -486,6 +486,8 @@ std::unordered_map<std::string, std::string> WholeStageResultIterator::getQueryC
       std::to_string(veloxCfg_->get<uint32_t>(kSparkBatchSize, 4096));
   configs[velox::core::QueryConfig::kMaxOutputBatchRows] =
       std::to_string(veloxCfg_->get<uint32_t>(kSparkBatchSize, 4096));
+  configs[velox::core::QueryConfig::kPreferredOutputBatchBytes] =
+      std::to_string(veloxCfg_->get<uint64_t>(kVeloxPreferredBatchBytes, 10L 
<< 20));
   try {
     configs[velox::core::QueryConfig::kSparkAnsiEnabled] = 
veloxCfg_->get<std::string>(kAnsiEnabled, "false");
     configs[velox::core::QueryConfig::kSessionTimezone] = 
veloxCfg_->get<std::string>(kSessionTimezone, "");
diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h
index ca85cc65ec..e37c99987e 100644
--- a/cpp/velox/config/VeloxConfig.h
+++ b/cpp/velox/config/VeloxConfig.h
@@ -177,4 +177,7 @@ const std::string kCudfMemoryResourceDefault =
 const std::string kCudfMemoryPercent = 
"spark.gluten.sql.columnar.backend.velox.cudf.memoryPercent";
 const int32_t kCudfMemoryPercentDefault = 50;
 
+/// Preferred size of batches in bytes to be returned by operators.
+const std::string kVeloxPreferredBatchBytes = 
"spark.gluten.sql.columnar.backend.velox.preferredBatchBytes";
+
 } // namespace gluten
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index c2339dda43..3b8e7c6568 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -486,7 +486,8 @@ object GlutenConfig {
     "spark.gluten.sql.columnar.backend.velox.enableSystemExceptionStacktrace",
     "spark.gluten.sql.columnar.backend.velox.memoryUseHugePages",
     "spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct",
-    
"spark.gluten.sql.columnar.backend.velox.memoryPoolCapacityTransferAcrossTasks"
+    
"spark.gluten.sql.columnar.backend.velox.memoryPoolCapacityTransferAcrossTasks",
+    "spark.gluten.sql.columnar.backend.velox.preferredBatchBytes"
   )
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to