Copilot commented on code in PR #11993:
URL: https://github.com/apache/gluten/pull/11993#discussion_r3410635311


##########
cpp/velox/jni/VeloxJniWrapper.cc:
##########
@@ -1162,6 +1171,115 @@ JNIEXPORT void JNICALL 
Java_org_apache_gluten_vectorized_HashJoinBuilder_clearHa
   ObjectStore::release(tableHandler);
   JNI_METHOD_END()
 }
+
+JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_vectorized_HashJoinBuilder_serializeHashTable( // NOLINT
+    JNIEnv* env,
+    jclass,
+    jlong hashTableHandle) {
+  JNI_METHOD_START
+  auto builder = 
ObjectStore::retrieve<gluten::HashTableBuilder>(hashTableHandle);
+  auto serialized = gluten::serializeHashTable(builder);
+  return gluten::getHashTableObjStore()->save(serialized);
+  JNI_METHOD_END(kInvalidObjectHandle)
+}
+
+JNIEXPORT jlong JNICALL
+Java_org_apache_gluten_vectorized_HashJoinBuilder_deserializeHashTableWithIgnoreNullKeys(
 // NOLINT
+    JNIEnv* env,
+    jclass,
+    jbyteArray serializedData,
+    jboolean ignoreNullKeys,
+    jboolean joinHasNullKeys) {
+  JNI_METHOD_START
+
+  jsize dataSize = env->GetArrayLength(serializedData);
+  jbyte* dataPtr = env->GetByteArrayElements(serializedData, nullptr);
+
+  if (dataPtr == nullptr) {
+    throw gluten::GlutenException("Failed to get serialized data");
+  }
+
+  auto builder = gluten::deserializeHashTable(
+      reinterpret_cast<const uint8_t*>(dataPtr),
+      static_cast<size_t>(dataSize),
+      nullptr,
+      static_cast<bool>(ignoreNullKeys),
+      static_cast<bool>(joinHasNullKeys));
+
+  env->ReleaseByteArrayElements(serializedData, dataPtr, JNI_ABORT);
+
+  return gluten::getHashTableObjStore()->save(builder);

Review Comment:
   `GetByteArrayElements` needs to be paired with a release even if 
`deserializeHashTable(...)` throws; otherwise the JVM byte[] stays pinned (or 
its native copy is leaked), because only the matching 
`ReleaseByteArrayElements` call frees it, not GC. This file already uses 
`getByteArrayElementsSafe` elsewhere and can reuse it here for RAII-style 
release.



##########
cpp/velox/operators/hashjoin/HashTableSerializer.cc:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "operators/hashjoin/HashTableSerializer.h"
+#include <cstring>
+#include <sstream>
+#include "velox/common/base/Exceptions.h"
+
+namespace gluten {
+
+template <bool ignoreNullKeys>
+HashTableSerializer::SerializedHashTable HashTableSerializer::serialize(
+    const facebook::velox::exec::HashTable<ignoreNullKeys>* hashTable) {
+  VELOX_CHECK_NOT_NULL(hashTable, "Hash table cannot be null");
+
+  std::ostringstream oss(std::ios::binary);
+
+  hashTable->serialize(oss);

Review Comment:
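   `std::ostringstream` is constructed with `std::ios::binary` only, which 
omits `std::ios::out`. The stream is still writable, because the 
`std::ostringstream` constructor always ORs `std::ios::out` into the mode it 
is given, but passing `std::ios::out | std::ios::binary` explicitly makes the 
intent clear.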



##########
backends-velox/src/main/scala/org/apache/gluten/execution/SerializedBroadcastHashTable.scala:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.vectorized.HashJoinBuilder
+
+import org.apache.spark.sql.execution.joins.BuildSideRelation
+
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
+
+/**
+ * Serialized broadcast hash table that can be efficiently broadcast to 
executors. This is built on
+ * the driver and contains the serialized hash table data.
+ */
+case class SerializedBroadcastHashTable(
+    serializedData: Array[Byte],
+    numRows: Long,
+    ignoreNullKeys: Boolean,
+    joinHasNullKeys: Boolean,
+    bloomFilterBlocksByteSize: Long,
+    hashProbeDynamicFiltersProduced: Long,
+    buildSideRelation: BuildSideRelation)
+  extends Externalizable {
+
+  def this() = this(null, 0, false, false, 0, 0, null) // Required for 
Externalizable
+
+  override def writeExternal(out: ObjectOutput): Unit = {
+    out.writeLong(numRows)
+    out.writeBoolean(ignoreNullKeys)
+    out.writeBoolean(joinHasNullKeys)
+    out.writeLong(bloomFilterBlocksByteSize)
+    out.writeLong(hashProbeDynamicFiltersProduced)
+    out.writeInt(serializedData.length)
+    out.write(serializedData)
+    out.writeObject(buildSideRelation)
+  }
+
+  override def readExternal(in: ObjectInput): Unit = {
+    val numRows = in.readLong()
+    val ignoreNullKeys = in.readBoolean()
+    val joinHasNullKeys = in.readBoolean()
+    val bloomFilterBlocksByteSize = in.readLong()
+    val hashProbeDynamicFiltersProduced = in.readLong()
+    val dataLength = in.readInt()
+    val data = new Array[Byte](dataLength)
+    in.readFully(data)
+    val relation = in.readObject().asInstanceOf[BuildSideRelation]
+
+    // Use reflection to set final fields
+    val numRowsField = 
classOf[SerializedBroadcastHashTable].getDeclaredField("numRows")
+    numRowsField.setAccessible(true)
+    numRowsField.set(this, numRows)
+
+    val dataField = 
classOf[SerializedBroadcastHashTable].getDeclaredField("serializedData")
+    dataField.setAccessible(true)
+    dataField.set(this, data)
+
+    val relationField = 
classOf[SerializedBroadcastHashTable].getDeclaredField("buildSideRelation")
+    relationField.setAccessible(true)
+    relationField.set(this, relation)
+
+    val ignoreNullKeysField =
+      classOf[SerializedBroadcastHashTable].getDeclaredField("ignoreNullKeys")
+    ignoreNullKeysField.setAccessible(true)
+    ignoreNullKeysField.set(this, ignoreNullKeys)
+
+    val joinHasNullKeysField =
+      classOf[SerializedBroadcastHashTable].getDeclaredField("joinHasNullKeys")
+    joinHasNullKeysField.setAccessible(true)
+    joinHasNullKeysField.set(this, joinHasNullKeys)
+
+    val bloomFilterBlocksByteSizeField =
+      
classOf[SerializedBroadcastHashTable].getDeclaredField("bloomFilterBlocksByteSize")
+    bloomFilterBlocksByteSizeField.setAccessible(true)
+    bloomFilterBlocksByteSizeField.set(this, bloomFilterBlocksByteSize)
+
+    val hashProbeDynamicFiltersProducedField =
+      
classOf[SerializedBroadcastHashTable].getDeclaredField("hashProbeDynamicFiltersProduced")
+    hashProbeDynamicFiltersProducedField.setAccessible(true)
+    hashProbeDynamicFiltersProducedField.set(this, 
hashProbeDynamicFiltersProduced)
+  }
+
+  /**
+   * Deserialize the hash table on executor side. The serialized Velox hash 
table is already in a
+   * prepared, probe-ready form, so executor side only needs deserialization 
without re-running
+   * prepareJoinTable.
+   *
+   * @return
+   *   Hash table builder handle
+   */
+  def deserialize(): Long = {
+    HashJoinBuilder.deserializeHashTableWithIgnoreNullKeys(
+      serializedData,
+      ignoreNullKeys,
+      joinHasNullKeys)
+  }
+
+  /** Get the size of serialized data in bytes. */
+  def sizeInBytes: Long = serializedData.length.toLong
+}
+
+object SerializedBroadcastHashTable {
+
+  /**
+   * Build and serialize a hash table on the driver.
+   *
+   * @param hashTableHandle
+   *   Handle to the built hash table
+   * @param buildSideRelation
+   *   The build side relation for metadata
+   * @return
+   *   Serialized broadcast hash table
+   */
+  def fromHashTable(
+      hashTableHandle: Long,
+      buildSideRelation: BuildSideRelation): SerializedBroadcastHashTable = {
+
+    // Serialize the hash table
+    val serializedHandle = HashJoinBuilder.serializeHashTable(hashTableHandle)
+
+    try {
+      // Get serialized data
+      val serializedData = HashJoinBuilder
+        .getSerializedData(serializedHandle)
+      val numRows = HashJoinBuilder
+        .getSerializedSize(serializedHandle)
+      val ignoreNullKeys = HashJoinBuilder
+        .getSerializedIgnoreNullKeys(serializedHandle)
+      val joinHasNullKeys = HashJoinBuilder
+        .getSerializedJoinHasNullKeys(serializedHandle)

Review Comment:
   `numRows` is populated using `HashJoinBuilder.getSerializedSize`, but the 
native method returns the serialized byte size (see JNI implementation). This 
misreports bytes as row count and makes logs/metrics misleading.



##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala:
##########
@@ -238,6 +238,91 @@ case class ColumnarBuildSideRelation(
       }
     }
 
+  /**
+   * Build hash table with provided runtime (for driver-side build). This 
version doesn't rely on
+   * TaskContext and can be called from the driver.
+   */
+  def buildHashTableWithRuntime(
+      broadcastContext: BroadcastHashJoinContext,
+      runtime: org.apache.gluten.runtime.Runtime): (Long, 
ColumnarBuildSideRelation) =
+    synchronized {
+      if (hashTableData == 0) {
+        val startTime = System.nanoTime()
+        val jniWrapper = ColumnarBatchSerializerJniWrapper.create(runtime)
+        val serializeHandle: Long = {
+          val allocator = ArrowBufferAllocators.globalInstance()
+          val cSchema = ArrowSchema.allocateNew(allocator)
+          val arrowSchema = SparkArrowUtil.toArrowSchema(
+            SparkShimLoader.getSparkShims.structFromAttributes(output),
+            SQLConf.get.sessionLocalTimeZone)
+          ArrowAbiUtil.exportSchema(allocator, arrowSchema, cSchema)
+          val handle = jniWrapper
+            .init(cSchema.memoryAddress())
+          cSchema.close()
+          handle
+        }
+
+        val batchArray = new ArrayBuffer[Long]
+
+        var batchId = 0
+        while (batchId < batches.size) {
+          batchArray.append(jniWrapper.deserialize(serializeHandle, 
batches(batchId)))
+          batchId += 1
+        }
+
+        logDebug(
+          s"BHJ value size: " +
+            s"${broadcastContext.buildHashTableId} = ${batches.length}")
+
+        val (keys, newOutput) = if (newBuildKeys.isEmpty) {
+          (
+            broadcastContext.buildSideJoinKeys.asJava,
+            broadcastContext.buildSideStructure.asJava
+          )
+        } else {
+          (
+            newBuildKeys.asJava,
+            output.asJava
+          )
+        }
+
+        val joinKeys = keys.asScala.map {
+          key =>
+            val attr = ConverterUtils.getAttrFromExpr(key)
+            ConverterUtils.genColumnNameWithExprId(attr)
+        }.toArray
+
+        val hashJoinBuilder = HashJoinBuilder.create(runtime)
+
+        // Build the hash table
+        hashTableData = hashJoinBuilder
+          .nativeBuild(
+            broadcastContext.buildHashTableId,
+            batchArray.toArray,
+            joinKeys,
+            broadcastContext.filterBuildColumns,
+            broadcastContext.filterPropagatesNulls,
+            broadcastContext.substraitJoinType.ordinal(),
+            broadcastContext.hasMixedFiltCondition,
+            broadcastContext.isExistenceJoin,
+            SubstraitUtil.toNameStruct(newOutput).toByteArray,
+            broadcastContext.isNullAwareAntiJoin,
+            broadcastContext.bloomFilterPushdownSize,
+            buildThreads
+          )
+
+        jniWrapper.close(serializeHandle)
+

Review Comment:
   `serializeHandle` is closed only on the success path. If `nativeBuild(...)` 
throws, the serializer handle (and potentially native resources) will leak. 
Wrap the build in a try/finally to ensure `jniWrapper.close(serializeHandle)` 
always runs.



##########
backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala:
##########
@@ -75,6 +86,108 @@ object VeloxBroadcastBuildSideCache
       )
   }
 
+  /**
+   * Build hash table on driver and serialize for broadcasting. This version 
is called from
+   * BroadcastExchangeExec and doesn't need a broadcast variable.
+   *
+   * This is the Spark-native approach where hash table is built in 
BroadcastExchangeExec.
+   */
+  def buildAndSerializeOnDriverInBroadcastExchange(
+      relation: BuildSideRelation,
+      broadcastContext: BroadcastHashJoinContext): 
SerializedBroadcastHashTable = {
+
+    val broadcastId = broadcastContext.buildHashTableId
+
+    val cached = driverSerializedCache.getIfPresent(broadcastId)
+    if (cached != null) {
+      logInfo(s"Reusing cached serialized hash table for broadcast ID: 
$broadcastId")
+      return cached
+    }
+
+    def resetRelation(): Unit = relation match {
+      case r: ColumnarBuildSideRelation => r.reset()
+      case r: UnsafeColumnarBuildSideRelation => r.reset()
+      case _ =>
+    }
+
+    relation.synchronized {
+      val cachedAfterLock = driverSerializedCache.getIfPresent(broadcastId)
+      if (cachedAfterLock != null) {
+        logInfo(s"Reusing cached serialized hash table for broadcast ID: 
$broadcastId (after lock)")
+        return cachedAfterLock
+      }
+
+      logInfo(
+        s"Building hash table on driver in BroadcastExchangeExec " +
+          s"for broadcast ID: $broadcastId")
+
+      val backendName = BackendsApiManager.getBackendName
+
+      val runtime = org.apache.gluten.runtime.Runtime.createStandalone(
+        backendName,
+        "DriverBroadcastHashTableBuild"
+      )
+
+      try {
+        resetRelation()
+        val startBuildTime = System.currentTimeMillis()
+        val (hashTableHandle, _) = relation match {
+          case r: ColumnarBuildSideRelation =>
+            r.buildHashTableWithRuntime(broadcastContext, runtime)
+          case r: UnsafeColumnarBuildSideRelation =>
+            r.buildHashTableWithRuntime(broadcastContext, runtime)
+          case other =>
+            throw new IllegalArgumentException(
+              s"Unsupported relation type for driver-side build: 
${other.getClass.getName}")
+        }
+        try {
+          val buildTimeMs = System.currentTimeMillis() - startBuildTime
+          broadcastContext.buildHashTableTimeMetric.foreach(_ += buildTimeMs)
+          val startSerializeTime = System.currentTimeMillis()

Review Comment:
   `buildHashTableTimeMetric` is updated inside 
`buildHashTableWithRuntime(...)` already; updating it again here double-counts 
driver build time.



##########
cpp/velox/operators/hashjoin/HashTableSerializer.cc:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "operators/hashjoin/HashTableSerializer.h"
+#include <cstring>
+#include <sstream>
+#include "velox/common/base/Exceptions.h"
+
+namespace gluten {
+
+template <bool ignoreNullKeys>
+HashTableSerializer::SerializedHashTable HashTableSerializer::serialize(
+    const facebook::velox::exec::HashTable<ignoreNullKeys>* hashTable) {
+  VELOX_CHECK_NOT_NULL(hashTable, "Hash table cannot be null");
+
+  std::ostringstream oss(std::ios::binary);
+
+  hashTable->serialize(oss);
+
+  SerializedHashTable result;
+  std::string str = oss.str();
+  result.size = str.size();
+  result.data = std::make_unique<uint8_t[]>(result.size);
+  std::memcpy(result.data.get(), str.data(), result.size);
+
+  return result;
+}
+
+template <bool ignoreNullKeys>
+std::unique_ptr<facebook::velox::exec::HashTable<ignoreNullKeys>>
+HashTableSerializer::deserialize(const uint8_t* data, size_t size, 
facebook::velox::memory::MemoryPool* pool) {
+  VELOX_CHECK_NOT_NULL(data, "Serialized data cannot be null");
+  VELOX_CHECK_GT(size, 0, "Invalid serialized data size");
+  VELOX_CHECK_NOT_NULL(pool, "Memory pool cannot be null");
+
+  std::string str(reinterpret_cast<const char*>(data), size);
+  std::istringstream iss(str, std::ios::binary);
+
+  return facebook::velox::exec::HashTable<ignoreNullKeys>::deserialize(iss, 
pool);

Review Comment:
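   `std::istringstream` is constructed with `std::ios::binary` only, which 
omits `std::ios::in`. The stream is still readable, because the 
`std::istringstream` constructor always ORs `std::ios::in` into the mode it is 
given, but spelling out `std::ios::in | std::ios::binary` makes the intent 
explicit.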



##########
backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala:
##########
@@ -129,8 +130,16 @@ class HashJoinMetricsUpdater(override val metrics: 
Map[String, SQLMetric])
     hashProbeSpilledPartitions += hashProbeMetrics.spilledPartitions
     hashProbeSpilledFiles += hashProbeMetrics.spilledFiles
     hashProbeReplacedWithDynamicFilterRows += 
hashProbeMetrics.numReplacedWithDynamicFilterRows
-    hashProbeDynamicFiltersProduced += 
hashProbeMetrics.numDynamicFiltersProduced
-    bloomFilterBlocksByteSize += hashProbeMetrics.bloomFilterBlocksByteSize
+
+    // Only accumulate dynamic filter metrics when driver-side build is 
disabled.
+    // When driver-side build is enabled, these metrics are set directly from 
the
+    // serialized hash table in HashJoinExecTransformer to avoid double 
counting.
+    val isDriverSideBuildEnabled =
+      VeloxConfig.get.enableDriverSideBroadcastHashTableBuild
+    if (!isDriverSideBuildEnabled) {
+      hashProbeDynamicFiltersProduced += 
hashProbeMetrics.numDynamicFiltersProduced
+      bloomFilterBlocksByteSize += hashProbeMetrics.bloomFilterBlocksByteSize
+    }

Review Comment:
   Dynamic filter metrics are skipped based solely on the global config flag. 
If driver-side build is enabled but this particular join falls back to 
executor-side build (e.g. offload=false, non-hash broadcast mode, missing join 
context), the metrics will incorrectly remain 0.



##########
backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala:
##########
@@ -886,6 +911,161 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with 
Logging {
         offload,
         buildThreadsValue)
     }
+
+    // Check if we should build hash table on driver (Spark-native approach)
+    // Only do this for HashedRelationBroadcastMode and when offload is enabled
+    val shouldBuildOnDriver = 
VeloxConfig.get.enableDriverSideBroadcastHashTableBuild &&
+      mode.isInstanceOf[HashedRelationBroadcastMode] &&
+      offload
+
+    if (shouldBuildOnDriver) {
+      // Try to get broadcast join context from logical plan tag
+      // In multi-join scenarios, there may be multiple contexts. Find the one 
that matches
+      // the current broadcast child's output.
+      val joinContextOpt: 
Option[org.apache.gluten.extension.BroadcastJoinContextInfo] =
+        findLogicalLink(child).flatMap {
+          logicalPlan =>
+            logicalPlan.getTagValue(
+              
org.apache.gluten.extension.BroadcastJoinContextTag.BROADCAST_JOIN_CONTEXT
+            ).flatMap {
+              contexts =>
+                val childOutputSet = AttributeSet(newOutput)
+                // Find the context whose build output matches the child's 
output
+                contexts.find {
+                  ctx =>
+                    val buildOutputMatches = 
childOutputSet.subsetOf(ctx.buildOutputSet) &&
+                      ctx.buildOutputSet.subsetOf(childOutputSet)
+                    buildOutputMatches
+                }
+            }
+        }
+
+      joinContextOpt match {
+        case Some(joinContext) =>
+          // We have join context information - build hash table on driver
+          logInfo(
+            s"Building hash table on driver in BroadcastExchangeExec " +
+              s"with join context: $joinContext")
+
+          // Create a broadcast ID for this hash table
+          val broadcastId = 
s"broadcast_exchange_${child.id}_${System.identityHashCode(mode)}"
+
+          // Convert Spark JoinType to Substrait JoinType
+          val substraitJoinType = joinContext.joinType match {
+            case _: org.apache.spark.sql.catalyst.plans.InnerLike =>
+              JoinRel.JoinType.JOIN_TYPE_INNER
+            case org.apache.spark.sql.catalyst.plans.FullOuter =>
+              JoinRel.JoinType.JOIN_TYPE_OUTER
+            case org.apache.spark.sql.catalyst.plans.LeftOuter |
+                org.apache.spark.sql.catalyst.plans.RightOuter =>
+              JoinRel.JoinType.JOIN_TYPE_LEFT
+            case org.apache.spark.sql.catalyst.plans.LeftSemi |
+                org.apache.spark.sql.catalyst.plans.ExistenceJoin(_) =>
+              JoinRel.JoinType.JOIN_TYPE_LEFT_SEMI
+            case org.apache.spark.sql.catalyst.plans.LeftAnti =>
+              JoinRel.JoinType.JOIN_TYPE_LEFT_ANTI
+            case _ =>
+              JoinRel.JoinType.UNRECOGNIZED
+          }

Review Comment:
   Join type conversion for driver-side hash table build currently maps both 
LeftOuter and RightOuter to `JOIN_TYPE_LEFT`. This diverges from the existing 
`needSwitchChildren`-aware conversion in `HashJoinExecTransformer` and can pass 
the wrong join type into native hash table build when the build side is 
switched.



##########
ep/build-velox/src/get-velox.sh:
##########
@@ -17,9 +17,9 @@
 set -exu
 
 CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd)
-VELOX_REPO=https://github.com/IBM/velox.git
-VELOX_BRANCH=dft-2026_06_06
-VELOX_ENHANCED_BRANCH=ibm-2026_06_06
+VELOX_REPO=https://github.com/JkSelf/velox.git
+VELOX_BRANCH=dft-2026_06_06-serialize-hashtable
+VELOX_ENHANCED_BRANCH=ibm-2026_06_06-serialize-hashtable

Review Comment:
   The build script now defaults to cloning Velox from a personal fork 
(`github.com/JkSelf/velox.git`). This is risky for reproducibility and 
supply-chain/security; project build scripts should default to an 
official/upstream repo (or clearly vendor the required patch), and only use 
forks via explicit overrides.



##########
cpp/velox/operators/hashjoin/HashTableBuilder.cc:
##########
@@ -181,7 +181,7 @@ bool HashTableBuilder::abandonHashBuildDedupEarly(int64_t 
numDistinct) const {
 
 void HashTableBuilder::abandonHashBuildDedup() {
   abandonHashBuildDedup_ = true;
-  uniqueTable_->setAllowDuplicates(true);
+  // uniqueTable_->setAllowDuplicates(true);
   lookup_.reset();
 }

Review Comment:
   `abandonHashBuildDedup()` resets `lookup_` so the build path will start 
inserting rows directly. Commenting out `setAllowDuplicates(true)` leaves the 
table configured for de-dup, which can keep paying duplicate-check costs (and 
may violate assumptions of the non-dedup insert path).



##########
backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala:
##########
@@ -208,6 +210,92 @@ class UnsafeColumnarBuildSideRelation(
       }
     }
 
+  /**
+   * Build hash table with provided runtime (for driver-side build). This 
version doesn't rely on
+   * TaskContext and can be called from the driver.
+   */
+  def buildHashTableWithRuntime(
+      broadcastContext: BroadcastHashJoinContext,
+      runtime: org.apache.gluten.runtime.Runtime): (Long, BuildSideRelation) =
+    synchronized {
+      if (hashTableData == 0) {
+        val startTime = System.nanoTime()
+        val jniWrapper = ColumnarBatchSerializerJniWrapper.create(runtime)
+        val serializeHandle: Long = {
+          val allocator = ArrowBufferAllocators.globalInstance()
+          val cSchema = ArrowSchema.allocateNew(allocator)
+          val arrowSchema = SparkArrowUtil.toArrowSchema(
+            SparkShimLoader.getSparkShims.structFromAttributes(output),
+            SQLConf.get.sessionLocalTimeZone)
+          ArrowAbiUtil.exportSchema(allocator, arrowSchema, cSchema)
+          val handle = jniWrapper
+            .init(cSchema.memoryAddress())
+          cSchema.close()
+          handle
+        }
+
+        val batchArray = new ArrayBuffer[Long]
+
+        var batchId = 0
+        while (batchId < batches.size) {
+          val (offset, length) = (batches(batchId).address(), 
batches(batchId).size())
+          batchArray.append(jniWrapper.deserializeDirect(serializeHandle, 
offset, length.toInt))
+          batchId += 1
+        }
+
+        logDebug(
+          s"BHJ value size: " +
+            s"${broadcastContext.buildHashTableId} = ${batches.size}")
+
+        val (keys, newOutput) = if (newBuildKeys.isEmpty) {
+          (
+            broadcastContext.buildSideJoinKeys.asJava,
+            broadcastContext.buildSideStructure.asJava
+          )
+        } else {
+          (
+            newBuildKeys.asJava,
+            output.asJava
+          )
+        }
+
+        val joinKeys = keys.asScala.map {
+          key =>
+            val attr = ConverterUtils.getAttrFromExpr(key)
+            ConverterUtils.genColumnNameWithExprId(attr)
+        }.toArray
+
+        val hashJoinBuilder = HashJoinBuilder.create(runtime)
+
+        // Build the hash table
+        hashTableData = hashJoinBuilder
+          .nativeBuild(
+            broadcastContext.buildHashTableId,
+            batchArray.toArray,
+            joinKeys,
+            broadcastContext.filterBuildColumns,
+            broadcastContext.filterPropagatesNulls,
+            broadcastContext.substraitJoinType.ordinal(),
+            broadcastContext.hasMixedFiltCondition,
+            broadcastContext.isExistenceJoin,
+            SubstraitUtil.toNameStruct(newOutput).toByteArray,
+            broadcastContext.isNullAwareAntiJoin,
+            broadcastContext.bloomFilterPushdownSize,
+            buildThreads
+          )
+
+        jniWrapper.close(serializeHandle)
+

Review Comment:
   `serializeHandle` is closed only on the success path. If `nativeBuild(...)` 
throws, the serializer handle (and potentially native resources) will leak. 
Wrap the build in a try/finally to ensure `jniWrapper.close(serializeHandle)` 
always runs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to