Copilot commented on code in PR #11993:
URL: https://github.com/apache/gluten/pull/11993#discussion_r3509980953
##########
backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala:
##########
@@ -886,6 +912,167 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with Logging {
offload,
buildThreadsValue)
}
+
+ // Check if we should build hash table on driver (Spark-native approach)
+ // Only do this for HashedRelationBroadcastMode and when offload is enabled
+ val shouldBuildOnDriver = VeloxConfig.get.enableDriverSideBroadcastHashTableBuild &&
+ mode.isInstanceOf[HashedRelationBroadcastMode] &&
+ offload
+
+ if (shouldBuildOnDriver) {
+ // Try to get broadcast join context from logical plan tag
+ // In multi-join scenarios, there may be multiple contexts. Find the one that matches
+ // the current broadcast child's output.
+ val joinContextOpt: Option[BroadcastJoinContextInfo] =
+ findLogicalLink(child).flatMap {
+ logicalPlan =>
+ logicalPlan.getTagValue(
+ BroadcastJoinContextTag.BROADCAST_JOIN_CONTEXT
+ ).flatMap {
+ contexts =>
+ val childOutputSet = AttributeSet(newOutput)
+ // Find the context whose build output matches the child's output
+ contexts.find {
+ ctx =>
+ val buildOutputMatches = childOutputSet.subsetOf(ctx.buildOutputSet) &&
+ ctx.buildOutputSet.subsetOf(childOutputSet)
+ buildOutputMatches
+ }
Review Comment:
`contexts.find` selects the first matching `BroadcastJoinContextInfo` when
multiple joins share the same broadcast build-side output. With exchange reuse,
this can pick the wrong join context (different join keys or conditions). The
driver-built hash table would then be constructed with incorrect keys and
produce wrong results. Prefer disabling driver-side build when the context
match is ambiguous (zero matches or more than one match). Proceed only when
exactly one context matches the build-side output.
##########
backends-velox/src/main/scala/org/apache/gluten/execution/SerializedBroadcastHashTable.scala:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.vectorized.HashJoinBuilder
+
+import org.apache.spark.sql.execution.joins.BuildSideRelation
+import org.apache.spark.sql.execution.unsafe.JniUnsafeByteBuffer
+import org.apache.spark.sql.execution.unsafe.UnsafeByteArray
+
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
+
+/**
+ * Serialized broadcast hash table that can be efficiently broadcast to executors. This is built on
+ * the driver and contains the serialized hash table data.
+ */
+class SerializedBroadcastHashTable(
+ var serializedData: UnsafeByteArray,
+ var numRows: Long,
+ var ignoreNullKeys: Boolean,
+ var joinHasNullKeys: Boolean,
+ var droppedDuplicates: Boolean,
+ var bloomFilterBlocksByteSize: Long,
+ var hashProbeDynamicFiltersProduced: Long,
+ var buildSideRelation: BuildSideRelation)
+ extends Externalizable {
+
+ def this() = this(null, 0, false, false, false, 0, 0, null) // Required for Externalizable
+
+ override def writeExternal(out: ObjectOutput): Unit = {
+ out.writeLong(numRows)
+ out.writeBoolean(ignoreNullKeys)
+ out.writeBoolean(joinHasNullKeys)
+ out.writeBoolean(droppedDuplicates)
+ out.writeLong(bloomFilterBlocksByteSize)
+ out.writeLong(hashProbeDynamicFiltersProduced)
+ serializedData.writeExternal(out)
+ out.writeObject(buildSideRelation)
+ }
+
+ override def readExternal(in: ObjectInput): Unit = {
+ numRows = in.readLong()
+ ignoreNullKeys = in.readBoolean()
+ joinHasNullKeys = in.readBoolean()
+ droppedDuplicates = in.readBoolean()
+ bloomFilterBlocksByteSize = in.readLong()
+ hashProbeDynamicFiltersProduced = in.readLong()
+ val data = new UnsafeByteArray()
+ data.readExternal(in)
+ serializedData = data
+ buildSideRelation = in.readObject().asInstanceOf[BuildSideRelation]
+ }
+
+ /**
+ * Deserialize the hash table on executor side. The serialized Velox hash table is already in a
+ * prepared, probe-ready form, so executor side only needs deserialization without re-running
+ * prepareJoinTable.
+ *
+ * @return
+ * Hash table builder handle
+ */
+ def deserialize(): Long = {
+ HashJoinBuilder.deserializeHashTableDirect(
+ serializedData.address(),
+ Math.toIntExact(serializedData.size()),
+ ignoreNullKeys,
+ joinHasNullKeys)
+ }
+
+ /** Get the size of serialized data in bytes. */
+ def sizeInBytes: Long = serializedData.size()
+}
+
+object SerializedBroadcastHashTable {
+ def apply(
+ serializedData: UnsafeByteArray,
+ numRows: Long,
+ ignoreNullKeys: Boolean,
+ joinHasNullKeys: Boolean,
+ droppedDuplicates: Boolean,
+ bloomFilterBlocksByteSize: Long,
+ hashProbeDynamicFiltersProduced: Long,
+ buildSideRelation: BuildSideRelation): SerializedBroadcastHashTable =
+ new SerializedBroadcastHashTable(
+ serializedData,
+ numRows,
+ ignoreNullKeys,
+ joinHasNullKeys,
+ droppedDuplicates,
+ bloomFilterBlocksByteSize,
+ hashProbeDynamicFiltersProduced,
+ buildSideRelation)
+
+ /**
+ * Build and serialize a hash table on the driver.
+ *
+ * @param hashTableHandle
+ * Handle to the built hash table
+ * @param buildSideRelation
+ * The build side relation for metadata
+ * @return
+ * Serialized broadcast hash table
+ */
+ def fromHashTable(
+ hashTableHandle: Long,
+ buildSideRelation: BuildSideRelation,
+ droppedDuplicates: Boolean,
+ numRows: Long): SerializedBroadcastHashTable = {
+ try {
+ val serializedSize = HashJoinBuilder.serializedHashTableSizeDirect(hashTableHandle)
+ val byteBuffer = JniUnsafeByteBuffer.allocate(serializedSize)
+ HashJoinBuilder.serializeHashTableDirect(
+ hashTableHandle,
+ byteBuffer.address(),
+ byteBuffer.size())
Review Comment:
`SerializedBroadcastHashTable` ultimately deserializes using an `int` size
(see `deserialize()` calling `Math.toIntExact(serializedData.size())`), but
`fromHashTable` allows allocating/serializing arbitrarily large buffers via
`JniUnsafeByteBuffer.allocate(serializedSize)`. Add an explicit upper-bound
check in `fromHashTable` that fails fast on the driver with a clear error when
the serialized hash table exceeds `Int.MaxValue` bytes. Without it, the
oversized table is still built and broadcast. Then `Math.toIntExact` throws an
`ArithmeticException` on every executor at deserialization time, far from the
root cause.
##########
backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala:
##########
@@ -58,6 +66,23 @@ object VeloxBroadcastBuildSideCache
.removalListener(this)
.build[String, BroadcastHashTable]()
+ // Cache for driver-side serialized hash tables to avoid rebuilding for reuse exchange
+ private val driverSerializedCache: Cache[String, SerializedBroadcastHashTable] =
+ Caffeine.newBuilder
+ .expireAfterAccess(expiredTime, TimeUnit.SECONDS)
+ .removalListener(
+ new RemovalListener[String, SerializedBroadcastHashTable] {
+ override def onRemoval(
+ key: String,
+ value: SerializedBroadcastHashTable,
+ cause: RemovalCause): Unit = {
+ if (value != null && value.serializedData != null) {
+ value.serializedData.release()
+ }
+ }
+ }
Review Comment:
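`driverSerializedCache` evicts entries by calling
`value.serializedData.release()`, but the cached `SerializedBroadcastHashTable`
instance is also returned and embedded in the Spark broadcast variable.
Evicting the cache while the broadcast is still live will close the underlying
ArrowBuf and can trigger use-after-free / crashes when executors fetch or
deserialize the broadcast. Tie the buffer's lifetime to the broadcast instead:
for example, release it only when the broadcast is destroyed, or make the
buffer reference-counted. Then cache eviction cannot free memory that a live
broadcast still references.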
##########
backends-velox/src/main/scala/org/apache/gluten/execution/SerializedBroadcastHashTable.scala:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.vectorized.HashJoinBuilder
+
+import org.apache.spark.sql.execution.joins.BuildSideRelation
+import org.apache.spark.sql.execution.unsafe.JniUnsafeByteBuffer
+import org.apache.spark.sql.execution.unsafe.UnsafeByteArray
+
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
+
+/**
+ * Serialized broadcast hash table that can be efficiently broadcast to executors. This is built on
+ * the driver and contains the serialized hash table data.
+ */
+class SerializedBroadcastHashTable(
+ var serializedData: UnsafeByteArray,
+ var numRows: Long,
+ var ignoreNullKeys: Boolean,
+ var joinHasNullKeys: Boolean,
+ var droppedDuplicates: Boolean,
+ var bloomFilterBlocksByteSize: Long,
+ var hashProbeDynamicFiltersProduced: Long,
+ var buildSideRelation: BuildSideRelation)
+ extends Externalizable {
+
+ def this() = this(null, 0, false, false, false, 0, 0, null) // Required for Externalizable
+
+ override def writeExternal(out: ObjectOutput): Unit = {
+ out.writeLong(numRows)
+ out.writeBoolean(ignoreNullKeys)
+ out.writeBoolean(joinHasNullKeys)
+ out.writeBoolean(droppedDuplicates)
+ out.writeLong(bloomFilterBlocksByteSize)
+ out.writeLong(hashProbeDynamicFiltersProduced)
+ serializedData.writeExternal(out)
+ out.writeObject(buildSideRelation)
+ }
Review Comment:
`SerializedBroadcastHashTable.writeExternal` serializes both the serialized
hash table bytes and the full `buildSideRelation`. For
`ColumnarBuildSideRelation`/`UnsafeColumnarBuildSideRelation`, that relation
itself contains the full broadcasted build-side data, so this increases
broadcast payload size (raw data + serialized hash table) and can negate some
of the intended benefit / risk hitting broadcast size limits. Consider
replacing `buildSideRelation` with a lightweight metadata-only relation, or
persisting only what’s needed for `transform()`/DPP rather than the full
build-side batches.
##########
ep/build-velox/src/get-velox.sh:
##########
@@ -17,8 +17,8 @@
set -exu
CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd)
-VELOX_REPO=https://github.com/IBM/velox.git
-VELOX_BRANCH=dft-2026_06_30
+VELOX_REPO=https://github.com/JkSelf/velox.git
+VELOX_BRANCH=dft-2026_06_30_hashtable_ser
VELOX_ENHANCED_BRANCH=ibm-2026_06_30
Review Comment:
The default `VELOX_REPO`/`VELOX_BRANCH` is changed to a personal
fork/branch. This makes builds non-reproducible for other contributors and CI,
and is risky for an Apache project. Prefer keeping an upstream/default repo
(e.g., IBM/velox) and use `--velox_repo/--velox_branch` (or
`UPSTREAM_VELOX_PR_ID`) when testing a fork/PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]