Copilot commented on code in PR #12259:
URL: https://github.com/apache/gluten/pull/12259#discussion_r3377769934


##########
docs/velox-configuration.md:
##########
@@ -81,6 +81,7 @@ nav_order: 16
 | spark.gluten.sql.enable.enhancedFeatures | 🔄 Dynamic | true | Enable some features including iceberg native write and other features. |
 | spark.gluten.sql.rewrite.castArrayToString | 🔄 Dynamic | true | When true, rewrite `cast(array as String)` to `concat('[', array_join(array, ', ', null), ']')` to allow offloading to Velox. |
 | spark.gluten.velox.broadcast.build.targetBytesPerThread | ⚓ Static | 32MB | It is used to calculate the number of hash table build threads. Based on our testing across various thresholds (1MB to 128MB), we recommend a value of 32MB or 64MB, as these consistently provided the most significant performance gains. |
+| spark.gluten.velox.broadcastBuild.mergeBatches | 🔄 Dynamic | false | If enabled, all columnar batches in a broadcast build relation will be serialized into a single buffer to reduce the number of addInput calls in HashBuild operator. This can significantly improve BHJ performance when the broadcast table has many small batches. |

Review Comment:
   The new config description highlights the performance benefit but doesn't 
mention the trade-off that merging batches increases driver-side peak memory 
(and can reintroduce OOM risk) compared to per-batch serialization. It would be 
helpful to document this explicitly since it's the main reason the default 
remains false.



##########
backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala:
##########
@@ -617,6 +620,16 @@ object VeloxConfig extends ConfigRegistry {
       .booleanConf
       .createWithDefault(false)
 
+  val VELOX_BROADCAST_BUILD_MERGE_BATCHES =
+    buildConf("spark.gluten.velox.broadcastBuild.mergeBatches")
+      .doc(
+        "If enabled, all columnar batches in a broadcast build relation will be " +
+          "serialized into a single buffer to reduce the number of addInput calls in " +
+          "HashBuild operator. This can significantly improve BHJ performance when " +
+          "the broadcast table has many small batches.")

Review Comment:
   This config doc should also call out the key trade-off (higher peak driver 
memory / potential OOM) so users understand why the default is false and when 
not to enable it.



##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/BroadcastUtils.scala:
##########
@@ -150,13 +150,56 @@ object BroadcastUtils {
   }
 
   def serializeStream(batches: Iterator[ColumnarBatch]): ColumnarBatchSerializeResult = {
+    val mergeBatches = VeloxConfig.get.broadcastBuildMergeBatches
     val filtered = batches
       .filter(_.numRows() != 0)
       .map(
         b => {
           ColumnarBatches.retain(b)
           b
         })
+
+    if (mergeBatches) {
+      serializeStreamMerged(filtered)
+    } else {
+      serializeStreamPerBatch(filtered)
+    }
+  }
+
+  private def serializeStreamMerged(
+      filtered: Iterator[ColumnarBatch]): ColumnarBatchSerializeResult = {
+    var numRows: Long = 0
+    val handles = new ArrayBuffer[Long]()
+    val retainedBatches = new ArrayBuffer[ColumnarBatch]()
+    filtered.foreach {
+      b =>
+        numRows += b.numRows()
+        handles += ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, b)
+        retainedBatches += b
+    }
+    if (handles.nonEmpty) {
+      try {
+        val jniWrapper = ColumnarBatchSerializerJniWrapper
+          .create(
+            Runtimes
+              .contextInstance(BackendsApiManager.getBackendName, "BroadcastUtils#serializeStream"))
+        val merged = jniWrapper.serializeAll(handles.toArray)
+        val useOffheapBroadcastBuildRelation =
+          VeloxConfig.get.enableBroadcastBuildRelationInOffheap
+        new ColumnarBatchSerializeResult(
+          useOffheapBroadcastBuildRelation,
+          numRows,
+          java.util.Collections.singletonList(merged))

Review Comment:
   When mergeBatches is enabled, serializeAll can produce a single buffer 
larger than 2GB. Downstream broadcast serialization paths (byte[] / 
UnsafeByteArray) have an Integer.MAX_VALUE size limit, so this merged path can 
fail even when the per-batch path would succeed. Consider falling back to 
per-batch serialization when the merged payload exceeds the 2GB limit (and free 
the merged buffer to avoid leaking the ArrowBuf).
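
   The size guard could be sketched roughly as below. This is only an illustration, written in C++ for consistency with the native side of this PR: `chooseSerializeMode`, `SerializeMode`, and `kMaxJvmArrayBytes` are hypothetical names, and it assumes the serialized size of each batch (or an upper bound) is known before merging and is non-negative. The JVM-side equivalent would compare against `Integer.MAX_VALUE`.

```cpp
#include <cstdint>
#include <limits>
#include <vector>

// Largest payload a JVM byte[] can hold (Integer.MAX_VALUE).
constexpr int64_t kMaxJvmArrayBytes = std::numeric_limits<int32_t>::max();

// Hypothetical result type: which serialization path to take.
enum class SerializeMode { kMerged, kPerBatch };

// Hypothetical helper, not part of Gluten. Picks the merged path only when
// the sum of the (non-negative) batch sizes fits in a single JVM byte[].
SerializeMode chooseSerializeMode(const std::vector<int64_t>& batchSizes) {
  int64_t total = 0;
  for (int64_t size : batchSizes) {
    // Compare against the remaining headroom so the running sum
    // never exceeds the limit and cannot overflow.
    if (size > kMaxJvmArrayBytes - total) {
      return SerializeMode::kPerBatch;
    }
    total += size;
  }
  return SerializeMode::kMerged;
}
```

   If the size is only known after `serializeAll` returns, the same comparison would apply to the merged buffer's length, with the merged buffer released before falling back.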



##########
cpp/core/jni/JniWrapper.cc:
##########
@@ -1300,6 +1300,33 @@ JNIEXPORT jobject JNICALL 
Java_org_apache_gluten_vectorized_ColumnarBatchSeriali
   JNI_METHOD_END(nullptr)
 }
 
+JNIEXPORT jobject JNICALL Java_org_apache_gluten_vectorized_ColumnarBatchSerializerJniWrapper_serializeAll( // NOLINT
+    JNIEnv* env,
+    jobject wrapper,
+    jlongArray handles) {
+  JNI_METHOD_START
+  auto ctx = getRuntime(env, wrapper);
+  int32_t numBatches = env->GetArrayLength(handles);
+  GLUTEN_DCHECK(numBatches > 0, "serializeAll requires at least one batch");

Review Comment:
   serializeAll currently relies on GLUTEN_DCHECK to enforce a non-empty 
handles array. In release builds DCHECKs may be compiled out, which could lead 
to undefined behavior if serializeAll is ever called with an empty (or null) 
array. Prefer GLUTEN_CHECK here to fail fast with a clear exception.
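
   To illustrate the difference, here is a minimal sketch of a check that stays active in release builds. `checkAlways` and `validateBatchCount` are hypothetical stand-ins, not Gluten's actual `GLUTEN_CHECK` macro, whose definition is not shown in this diff. A debug-only assertion in the same position would be compiled out when `NDEBUG` is defined, so an empty array would pass through unchecked.

```cpp
#include <cstdint>
#include <stdexcept>

// Hypothetical stand-in for a release-safe check (not Gluten's macro).
// Unlike a debug-only assertion, this runs in every build type.
inline void checkAlways(bool condition, const char* message) {
  if (!condition) {
    throw std::invalid_argument(message);
  }
}

// Hypothetical helper: validates the batch count before it is used.
int32_t validateBatchCount(int32_t numBatches) {
  checkAlways(numBatches > 0, "serializeAll requires at least one batch");
  return numBatches;
}
```

   A null `handles` array would still need its own guard before `GetArrayLength` is called, since this sketch only covers the empty case.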




