Copilot commented on code in PR #12259:
URL: https://github.com/apache/gluten/pull/12259#discussion_r3377769934
##########
docs/velox-configuration.md:
##########
@@ -81,6 +81,7 @@ nav_order: 16
| spark.gluten.sql.enable.enhancedFeatures
| Dynamic | true | Enable some features including iceberg
native write and other features.
|
| spark.gluten.sql.rewrite.castArrayToString
| Dynamic | true | When true, rewrite `cast(array as
String)` to `concat('[', array_join(array, ', ', null), ']')` to allow
offloading to Velox.
|
| spark.gluten.velox.broadcast.build.targetBytesPerThread
| Static | 32MB | It is used to calculate the number of
hash table build threads. Based on our testing across various thresholds (1MB
to 128MB), we recommend a value of 32MB or 64MB, as these consistently provided
the most significant performance gains.
|
+| spark.gluten.velox.broadcastBuild.mergeBatches
| Dynamic | false | If enabled, all columnar batches in a
broadcast build relation will be serialized into a single buffer to reduce the
number of addInput calls in HashBuild operator. This can significantly improve
BHJ performance when the broadcast table has many small batches.
|
Review Comment:
The new config description highlights the performance benefit but doesn't
mention the trade-off that merging batches increases driver-side peak memory
(and can reintroduce OOM risk) compared to per-batch serialization. It would be
helpful to document this explicitly since it's the main reason the default
remains false.
##########
backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala:
##########
@@ -617,6 +620,16 @@ object VeloxConfig extends ConfigRegistry {
.booleanConf
.createWithDefault(false)
+ val VELOX_BROADCAST_BUILD_MERGE_BATCHES =
+ buildConf("spark.gluten.velox.broadcastBuild.mergeBatches")
+ .doc(
+ "If enabled, all columnar batches in a broadcast build relation will
be " +
+ "serialized into a single buffer to reduce the number of addInput
calls in " +
+ "HashBuild operator. This can significantly improve BHJ performance
when " +
+ "the broadcast table has many small batches.")
Review Comment:
This config doc should also call out the key trade-off (higher peak driver
memory / potential OOM) so users understand why the default is false and when
not to enable it.
##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/BroadcastUtils.scala:
##########
@@ -150,13 +150,56 @@ object BroadcastUtils {
}
def serializeStream(batches: Iterator[ColumnarBatch]):
ColumnarBatchSerializeResult = {
+ val mergeBatches = VeloxConfig.get.broadcastBuildMergeBatches
val filtered = batches
.filter(_.numRows() != 0)
.map(
b => {
ColumnarBatches.retain(b)
b
})
+
+ if (mergeBatches) {
+ serializeStreamMerged(filtered)
+ } else {
+ serializeStreamPerBatch(filtered)
+ }
+ }
+
+ private def serializeStreamMerged(
+ filtered: Iterator[ColumnarBatch]): ColumnarBatchSerializeResult = {
+ var numRows: Long = 0
+ val handles = new ArrayBuffer[Long]()
+ val retainedBatches = new ArrayBuffer[ColumnarBatch]()
+ filtered.foreach {
+ b =>
+ numRows += b.numRows()
+ handles +=
ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, b)
+ retainedBatches += b
+ }
+ if (handles.nonEmpty) {
+ try {
+ val jniWrapper = ColumnarBatchSerializerJniWrapper
+ .create(
+ Runtimes
+ .contextInstance(BackendsApiManager.getBackendName,
"BroadcastUtils#serializeStream"))
+ val merged = jniWrapper.serializeAll(handles.toArray)
+ val useOffheapBroadcastBuildRelation =
+ VeloxConfig.get.enableBroadcastBuildRelationInOffheap
+ new ColumnarBatchSerializeResult(
+ useOffheapBroadcastBuildRelation,
+ numRows,
+ java.util.Collections.singletonList(merged))
Review Comment:
When mergeBatches is enabled, serializeAll can produce a single buffer
larger than 2GB. Downstream broadcast serialization paths (byte[] /
UnsafeByteArray) have an Integer.MAX_VALUE size limit, so this merged path can
fail even when the per-batch path would succeed. Consider falling back to
per-batch serialization when the merged payload exceeds the 2GB limit (and free
the merged buffer to avoid leaking the ArrowBuf).
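As a rough illustration of the size guard suggested above, the following sketch decides whether a set of batches can be merged into one buffer without crossing the Integer.MAX_VALUE limit. It is language-agnostic logic shown in C++. The helper name `fitsInSingleBuffer`, the constant `kMaxJvmArrayBytes`, and the idea that per-batch serialized sizes are known before merging are all assumptions made for the example, not part of the PR.

```cpp
#include <cstdint>
#include <limits>
#include <vector>

// Upper bound taken from the review comment: Integer.MAX_VALUE bytes.
// (Hypothetical constant name, not from the Gluten code base.)
constexpr int64_t kMaxJvmArrayBytes = std::numeric_limits<int32_t>::max();

// Hypothetical helper: returns true if the batches could be merged into a
// single buffer that stays within the limit. Each entry is the serialized
// size of one batch in bytes and is assumed to be non-negative.
bool fitsInSingleBuffer(const std::vector<int64_t>& serializedSizes) {
  int64_t total = 0;
  for (int64_t size : serializedSizes) {
    // Compare before adding so the running total never overflows int64_t.
    if (size > kMaxJvmArrayBytes - total) {
      return false;
    }
    total += size;
  }
  return true;
}
```

A caller would take the merged path only when this returns true and otherwise fall back to per-batch serialization. If the size is only known after merging, the merged buffer would still have to be released before falling back.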
##########
cpp/core/jni/JniWrapper.cc:
##########
@@ -1300,6 +1300,33 @@ JNIEXPORT jobject JNICALL
Java_org_apache_gluten_vectorized_ColumnarBatchSeriali
JNI_METHOD_END(nullptr)
}
+JNIEXPORT jobject JNICALL
Java_org_apache_gluten_vectorized_ColumnarBatchSerializerJniWrapper_serializeAll(
// NOLINT
+ JNIEnv* env,
+ jobject wrapper,
+ jlongArray handles) {
+ JNI_METHOD_START
+ auto ctx = getRuntime(env, wrapper);
+ int32_t numBatches = env->GetArrayLength(handles);
+ GLUTEN_DCHECK(numBatches > 0, "serializeAll requires at least one batch");
Review Comment:
serializeAll currently relies on GLUTEN_DCHECK to enforce a non-empty
handles array. In release builds DCHECKs may be compiled out, which could lead
to undefined behavior if serializeAll is ever called with an empty (or null)
array. Prefer GLUTEN_CHECK here to fail fast with a clear exception.
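To make the difference concrete, here is a minimal self-contained sketch. `MY_CHECK` and `MY_DCHECK` are hypothetical stand-ins, not Gluten's actual macros (their definitions are not shown in this thread). The sketch assumes the debug variant behaves like the standard `assert`, which is compiled out when `NDEBUG` is defined.

```cpp
#include <cassert>
#include <cstdint>
#include <stdexcept>
#include <string>

// Hypothetical always-on check: throws in every build type.
#define MY_CHECK(cond, msg)                       \
  do {                                            \
    if (!(cond)) {                                \
      throw std::runtime_error(std::string(msg)); \
    }                                             \
  } while (0)

// Hypothetical debug-only check, modelled on the standard assert:
// it expands to nothing when NDEBUG is defined (typical release builds).
#define MY_DCHECK(cond, msg) assert((cond) && (msg))

// Validated only in debug builds; a release build lets 0 pass through.
int32_t batchCountDebugOnly(int32_t numBatches) {
  MY_DCHECK(numBatches > 0, "serializeAll requires at least one batch");
  return numBatches;
}

// Validated in every build; an empty input fails fast with an exception.
int32_t batchCountAlways(int32_t numBatches) {
  MY_CHECK(numBatches > 0, "serializeAll requires at least one batch");
  return numBatches;
}
```

With the always-on variant, calling `batchCountAlways(0)` throws a `std::runtime_error` regardless of build flags, which a JNI wrapper can translate into a Java exception instead of continuing with an empty input.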
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]