Copilot commented on code in PR #11993:
URL: https://github.com/apache/gluten/pull/11993#discussion_r3424636587


##########
cpp/velox/operators/hashjoin/HashTableBuilder.cc:
##########
@@ -181,7 +181,7 @@ bool HashTableBuilder::abandonHashBuildDedupEarly(int64_t numDistinct) const {
 
 void HashTableBuilder::abandonHashBuildDedup() {
   abandonHashBuildDedup_ = true;
-  uniqueTable_->setAllowDuplicates(true);
+  // uniqueTable_->setAllowDuplicates(true);

Review Comment:
   `abandonHashBuildDedup()` marks dedup as abandoned, but commenting out 
`uniqueTable_->setAllowDuplicates(true)` leaves the hash table in a 
no-duplicates mode. That can silently drop duplicate build rows for the same 
key, producing wrong join results once dedup is abandoned.
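
To illustrate the failure mode, here is a minimal sketch using a toy table. `ToyBuildTable` and `matchesForDuplicateKey` are hypothetical names for illustration only and are not the Velox `HashTable` API; the sketch only assumes that a table in no-duplicates mode keeps one row per key.

```cpp
#include <cstddef>
#include <cstdint>
#include <unordered_map>
#include <vector>

// Toy stand-in for a build-side hash table; NOT the Velox HashTable API.
class ToyBuildTable {
 public:
  void setAllowDuplicates(bool allow) { allowDuplicates_ = allow; }

  // Inserts a (key, rowId) pair. In no-duplicates mode, a row whose key is
  // already present is silently discarded.
  void insert(int64_t key, int64_t rowId) {
    auto& rows = rows_[key];
    if (!allowDuplicates_ && !rows.empty()) {
      return;
    }
    rows.push_back(rowId);
  }

  // Number of build rows a probe for `key` would match.
  size_t matches(int64_t key) const {
    auto it = rows_.find(key);
    return it == rows_.end() ? 0 : it->second.size();
  }

 private:
  bool allowDuplicates_{false};
  std::unordered_map<int64_t, std::vector<int64_t>> rows_;
};

// Builds a table from two rows that share key 7 and returns how many rows a
// probe for that key would see.
size_t matchesForDuplicateKey(bool restoreDuplicates) {
  ToyBuildTable table; // starts in dedup (no-duplicates) mode
  if (restoreDuplicates) {
    table.setAllowDuplicates(true); // the role of the commented-out call
  }
  table.insert(7, 0);
  table.insert(7, 1);
  return table.matches(7);
}
```

In this sketch, skipping `setAllowDuplicates(true)` leaves a single match for key 7 even though two build rows were inserted, which is the silent row loss described above.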



##########
backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala:
##########
@@ -886,6 +912,167 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with Logging {
         offload,
         buildThreadsValue)
     }
+
+    // Check if we should build hash table on driver (Spark-native approach)
+    // Only do this for HashedRelationBroadcastMode and when offload is enabled
+    val shouldBuildOnDriver = VeloxConfig.get.enableDriverSideBroadcastHashTableBuild &&
+      mode.isInstanceOf[HashedRelationBroadcastMode] &&
+      offload
+
+    if (shouldBuildOnDriver) {
+      // Try to get broadcast join context from logical plan tag
+      // In multi-join scenarios, there may be multiple contexts. Find the one that matches
+      // the current broadcast child's output.
+      val joinContextOpt: Option[BroadcastJoinContextInfo] =
+        findLogicalLink(child).flatMap {
+          logicalPlan =>
+            logicalPlan.getTagValue(
+              BroadcastJoinContextTag.BROADCAST_JOIN_CONTEXT
+            ).flatMap {
+              contexts =>
+                val childOutputSet = AttributeSet(newOutput)
+                // Find the context whose build output matches the child's output
+                contexts.find {
+                  ctx =>
+                    val buildOutputMatches = childOutputSet.subsetOf(ctx.buildOutputSet) &&
+                      ctx.buildOutputSet.subsetOf(childOutputSet)
+                    buildOutputMatches
+                }
+            }
+        }
+
+      joinContextOpt match {
+        case Some(joinContext) =>
+          // We have join context information - build hash table on driver
+          logInfo(
+            s"Building hash table on driver in BroadcastExchangeExec " +
+              s"with join context: $joinContext")
+
+          // Create a broadcast ID for this hash table
+          val broadcastId = s"broadcast_exchange_${child.id}_${System.identityHashCode(mode)}"
+
+          // Convert Spark JoinType to Substrait JoinType
+          val substraitJoinType = joinContext.joinType match {
+            case _: InnerLike =>
+              JoinRel.JoinType.JOIN_TYPE_INNER
+            case FullOuter =>
+              JoinRel.JoinType.JOIN_TYPE_OUTER
+            case LeftOuter |
+                RightOuter =>
+              JoinRel.JoinType.JOIN_TYPE_LEFT
+            case LeftSemi |

Review Comment:
   `RightOuter` is currently mapped to `JOIN_TYPE_LEFT`, but the native build 
path converts `JOIN_TYPE_RIGHT` to `velox::core::JoinType::kRight` (see 
JniHashTable.cc). Using `JOIN_TYPE_LEFT` for right outer joins can build a hash 
table with the wrong join semantics and lead to incorrect results when 
driver-side build is enabled.
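
A minimal sketch of the intended mapping, written in C++ to mirror the native side. The enums `SparkJoin` and `SubstraitJoin` and the function `toSubstrait` are hypothetical stand-ins for the Scala join types and the generated `JoinRel.JoinType` values; only the left/right distinction is taken from the comment above.

```cpp
#include <stdexcept>

// Hypothetical mirrors of the Spark-side and Substrait-side join kinds.
enum class SparkJoin { kInner, kFullOuter, kLeftOuter, kRightOuter };
enum class SubstraitJoin { kInner, kOuter, kLeft, kRight };

// Maps each outer join to its own Substrait kind instead of folding
// RightOuter into the left-join case.
SubstraitJoin toSubstrait(SparkJoin join) {
  switch (join) {
    case SparkJoin::kInner:
      return SubstraitJoin::kInner;
    case SparkJoin::kFullOuter:
      return SubstraitJoin::kOuter;
    case SparkJoin::kLeftOuter:
      return SubstraitJoin::kLeft;
    case SparkJoin::kRightOuter:
      return SubstraitJoin::kRight;
  }
  throw std::invalid_argument("unsupported join type");
}
```

Giving `RightOuter` its own case keeps the value that reaches the native build path distinct from the left-join value.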



##########
cpp/velox/jni/JniHashTable.cc:
##########
@@ -163,4 +164,89 @@ long getJoin(const std::string& hashTableId) {
   return JniHashTableContext::getInstance().callJavaGet(hashTableId);
 }
 
+size_t serializedHashTableSize(std::shared_ptr<HashTableBuilder> builder) {
+  VELOX_CHECK_NOT_NULL(builder, "Hash table builder cannot be null");
+
+  auto hashTable = builder->hashTable();
+  VELOX_CHECK_NOT_NULL(hashTable, "Hash table cannot be null");
+
+  auto* hashTableFalse = dynamic_cast<facebook::velox::exec::HashTable<false>*>(hashTable.get());
+  if (hashTableFalse != nullptr) {
+    return HashTableSerializer::serializedSize<false>(hashTableFalse);
+  }
+
+  auto* hashTableTrue = dynamic_cast<facebook::velox::exec::HashTable<true>*>(hashTable.get());
+  VELOX_CHECK_NOT_NULL(hashTableTrue, "Hash table must be either HashTable<false> or HashTable<true>");
+  return HashTableSerializer::serializedSize<true>(hashTableTrue);
+}
+
+void serializeHashTableTo(std::shared_ptr<HashTableBuilder> builder, uint8_t* data, size_t size) {
+  VELOX_CHECK_NOT_NULL(builder, "Hash table builder cannot be null");
+  VELOX_CHECK_NOT_NULL(data, "Serialized buffer cannot be null");
+
+  auto hashTable = builder->hashTable();
+  VELOX_CHECK_NOT_NULL(hashTable, "Hash table cannot be null");
+
+  auto* hashTableFalse = dynamic_cast<facebook::velox::exec::HashTable<false>*>(hashTable.get());
+  if (hashTableFalse != nullptr) {
+    HashTableSerializer::serializeTo<false>(hashTableFalse, data, size);
+    return;
+  }
+
+  auto* hashTableTrue = dynamic_cast<facebook::velox::exec::HashTable<true>*>(hashTable.get());
+  VELOX_CHECK_NOT_NULL(hashTableTrue, "Hash table must be either HashTable<false> or HashTable<true>");
+  HashTableSerializer::serializeTo<true>(hashTableTrue, data, size);
+}
+
+std::shared_ptr<HashTableBuilder> deserializeHashTable(
+    const uint8_t* data,
+    size_t size,
+    facebook::velox::memory::MemoryPool* memoryPool,
+    bool ignoreNullKeys,
+    bool joinHasNullKeys) {
+  VELOX_CHECK_NOT_NULL(data, "Serialized data cannot be null");
+  VELOX_CHECK_GT(size, 0, "Invalid data size");
+
+  auto pool = memoryPool ? memoryPool->addLeafChild("deserializeHashTable") : defaultLeafVeloxMemoryPool();

Review Comment:
   `HashTableBuilder` stores the provided memory pool as a raw pointer 
(`pool_`). When `memoryPool` is non-null, `addLeafChild()` returns a new pool 
owned only by the returned `shared_ptr`; if that `shared_ptr` doesn't outlive 
the builder, this becomes a potential use-after-free. Prefer using the provided 
`memoryPool` directly (caller-owned), or ensure the child pool's lifetime is 
tied to the returned builder.
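
One way to tie the child pool's lifetime to the builder is to have the builder share ownership of it. The sketch below uses toy types (`ToyPool`, `ToyBuilder`, `poolSurvivesFactoryScope`, all hypothetical) rather than the Velox `MemoryPool` or Gluten `HashTableBuilder` classes, so treat it as an illustration of the ownership pattern only.

```cpp
#include <memory>
#include <string>
#include <utility>

// Toy stand-ins; NOT the Velox MemoryPool or Gluten HashTableBuilder types.
struct ToyPool {
  explicit ToyPool(std::string n) : name(std::move(n)) {}
  std::string name;
};

class ToyBuilder {
 public:
  // Takes shared ownership, so the pool cannot be destroyed before the
  // builder that allocates from it.
  explicit ToyBuilder(std::shared_ptr<ToyPool> pool) : pool_(std::move(pool)) {}

  const std::string& poolName() const { return pool_->name; }

 private:
  std::shared_ptr<ToyPool> pool_; // a raw pointer here would risk dangling
};

// Creates a child pool in an inner scope, hands it to a builder, and checks
// that the pool is still alive after the local shared_ptr goes away.
bool poolSurvivesFactoryScope() {
  std::weak_ptr<ToyPool> observer;
  std::unique_ptr<ToyBuilder> builder;
  {
    auto child = std::make_shared<ToyPool>("deserializeHashTable");
    observer = child;
    builder = std::make_unique<ToyBuilder>(child);
  } // the local shared_ptr is released here
  return !observer.expired() && builder->poolName() == "deserializeHashTable";
}
```

With a raw `pool_` member, the same inner scope would leave the builder pointing at a destroyed pool; holding a `shared_ptr` (or using the caller-owned pool directly) avoids that.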



##########
ep/build-velox/src/get-velox.sh:
##########
@@ -17,9 +17,9 @@
 set -exu
 
 CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd)
-VELOX_REPO=https://github.com/IBM/velox.git
-VELOX_BRANCH=dft-2026_06_06
-VELOX_ENHANCED_BRANCH=ibm-2026_06_06
+VELOX_REPO=https://github.com/JkSelf/velox.git
+VELOX_BRANCH=dft-2026_06_06-serialize-hashtable
+VELOX_ENHANCED_BRANCH=ibm-2026_06_06-serialize-hashtable

Review Comment:
   This switches the default Velox repo/branch to a personal fork 
(`JkSelf/velox`). That makes builds non-reproducible and risks breakage if the 
fork/branch is deleted or rebased. Defaults should point to an 
official/upstream repo/branch; contributors can still override via 
`--velox_repo/--velox_branch` when testing.



##########
backends-velox/src/main/scala/org/apache/gluten/execution/SerializedBroadcastHashTable.scala:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.vectorized.HashJoinBuilder
+
+import org.apache.spark.sql.execution.joins.BuildSideRelation
+import org.apache.spark.sql.execution.unsafe.UnsafeByteArray
+
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
+
+/**
+ * Serialized broadcast hash table that can be efficiently broadcast to executors. This is built on
+ * the driver and contains the serialized hash table data.
+ */
+class SerializedBroadcastHashTable(
+    var serializedData: UnsafeByteArray,
+    var numRows: Long,
+    var ignoreNullKeys: Boolean,
+    var joinHasNullKeys: Boolean,
+    var bloomFilterBlocksByteSize: Long,
+    var hashProbeDynamicFiltersProduced: Long,
+    var buildSideRelation: BuildSideRelation)
+  extends Externalizable {
+
+  def this() = this(null, 0, false, false, 0, 0, null) // Required for Externalizable
+
+  override def writeExternal(out: ObjectOutput): Unit = {
+    out.writeLong(numRows)
+    out.writeBoolean(ignoreNullKeys)
+    out.writeBoolean(joinHasNullKeys)
+    out.writeLong(bloomFilterBlocksByteSize)
+    out.writeLong(hashProbeDynamicFiltersProduced)
+    serializedData.writeExternal(out)
+    out.writeObject(buildSideRelation)
+  }

Review Comment:
   `SerializedBroadcastHashTable` serializes and broadcasts `buildSideRelation` 
along with the serialized hash table bytes. For `ColumnarBuildSideRelation` / 
`UnsafeColumnarBuildSideRelation`, that likely includes the original 
broadcasted batch data, which can substantially inflate broadcast size and 
memory (raw data + serialized hash table). Consider minimizing what gets 
serialized here (e.g., store only what DPP needs, or gate serializing the 
build-side payload behind a config).



##########
backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala:
##########
@@ -55,6 +60,23 @@ object VeloxBroadcastBuildSideCache
       .removalListener(this)
       .build[String, BroadcastHashTable]()
 
+  // Cache for driver-side serialized hash tables to avoid rebuilding for reuse exchange
+  private val driverSerializedCache: Cache[String, SerializedBroadcastHashTable] =
+    Caffeine.newBuilder
+      .expireAfterAccess(expiredTime, TimeUnit.SECONDS)
+      .removalListener(
+        new RemovalListener[String, SerializedBroadcastHashTable] {
+          override def onRemoval(
+              key: String,
+              value: SerializedBroadcastHashTable,
+              cause: RemovalCause): Unit = {
+            if (value != null && value.serializedData != null) {
+              value.serializedData.release()
+            }

Review Comment:
   Releasing `value.serializedData` from the Caffeine eviction listener is 
unsafe because the cached `SerializedBroadcastHashTable` instance is also 
returned and embedded in the broadcast value. If the cache entry expires/evicts 
while the broadcast is still in use (or in local mode where tasks may share the 
same JVM object), this will null out the buffer and break 
deserialization/probing.
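
The hazard can be sketched with a reference-counted buffer. This is one possible mitigation and not something the comment above prescribes; `ToyBuffer` and `usableAfterEviction` are hypothetical, written in C++ for illustration rather than against `UnsafeByteArray` or Caffeine, and the counter is not thread-safe.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Toy reference-counted buffer; NOT the UnsafeByteArray API.
class ToyBuffer {
 public:
  // The creator (here: the cache) holds the first reference.
  explicit ToyBuffer(size_t size) : data_(size, 0), refs_(1) {}

  void retain() { ++refs_; }

  // Frees the bytes only when the last holder lets go.
  void release() {
    if (refs_ > 0 && --refs_ == 0) {
      data_.clear();
      data_.shrink_to_fit();
    }
  }

  bool usable() const { return refs_ > 0; }

 private:
  std::vector<uint8_t> data_;
  int refs_;
};

// Simulates a cache eviction while a broadcast value may still need the
// buffer, and reports whether the buffer is still usable afterwards.
bool usableAfterEviction(bool broadcastRetains) {
  ToyBuffer buffer(16); // reference held by the cache
  if (broadcastRetains) {
    buffer.retain(); // the broadcast value takes its own reference
  }
  buffer.release(); // what the eviction listener does
  return buffer.usable();
}
```

When the broadcast value does not hold its own reference, the eviction frees the bytes out from under it, which corresponds to the failure described above.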



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

