Copilot commented on code in PR #11993:
URL: https://github.com/apache/gluten/pull/11993#discussion_r3419935177


##########
backends-velox/src/main/scala/org/apache/gluten/execution/VeloxSerializedBroadcastRDD.scala:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.{broadcast, SparkContext}

Review Comment:
   This import/type usage is not valid Scala for referencing the 
org.apache.spark.broadcast package (it will not compile as written). Use the 
proper Broadcast type import (e.g., org.apache.spark.broadcast.Broadcast) and 
declare broadcasted: Broadcast[BuildSideRelation], or import the package with 
`import org.apache.spark.broadcast` (without braces) if you want to reference 
broadcast.Broadcast.



##########
cpp/velox/jni/JniHashTable.cc:
##########
@@ -163,4 +164,89 @@ long getJoin(const std::string& hashTableId) {
   return JniHashTableContext::getInstance().callJavaGet(hashTableId);
 }
 
+size_t serializedHashTableSize(std::shared_ptr<HashTableBuilder> builder) {
+  VELOX_CHECK_NOT_NULL(builder, "Hash table builder cannot be null");
+
+  auto hashTable = builder->hashTable();
+  VELOX_CHECK_NOT_NULL(hashTable, "Hash table cannot be null");
+
+  auto* hashTableFalse = 
dynamic_cast<facebook::velox::exec::HashTable<false>*>(hashTable.get());
+  if (hashTableFalse != nullptr) {
+    return HashTableSerializer::serializedSize<false>(hashTableFalse);
+  }
+
+  auto* hashTableTrue = 
dynamic_cast<facebook::velox::exec::HashTable<true>*>(hashTable.get());
+  VELOX_CHECK_NOT_NULL(hashTableTrue, "Hash table must be either 
HashTable<false> or HashTable<true>");
+  return HashTableSerializer::serializedSize<true>(hashTableTrue);
+}
+
+void serializeHashTableTo(std::shared_ptr<HashTableBuilder> builder, uint8_t* 
data, size_t size) {
+  VELOX_CHECK_NOT_NULL(builder, "Hash table builder cannot be null");
+  VELOX_CHECK_NOT_NULL(data, "Serialized buffer cannot be null");
+
+  auto hashTable = builder->hashTable();
+  VELOX_CHECK_NOT_NULL(hashTable, "Hash table cannot be null");
+
+  auto* hashTableFalse = 
dynamic_cast<facebook::velox::exec::HashTable<false>*>(hashTable.get());
+  if (hashTableFalse != nullptr) {
+    HashTableSerializer::serializeTo<false>(hashTableFalse, data, size);
+    return;
+  }
+
+  auto* hashTableTrue = 
dynamic_cast<facebook::velox::exec::HashTable<true>*>(hashTable.get());
+  VELOX_CHECK_NOT_NULL(hashTableTrue, "Hash table must be either 
HashTable<false> or HashTable<true>");
+  HashTableSerializer::serializeTo<true>(hashTableTrue, data, size);
+}
+
+std::shared_ptr<HashTableBuilder> deserializeHashTable(
+    const uint8_t* data,
+    size_t size,
+    facebook::velox::memory::MemoryPool* memoryPool,
+    bool ignoreNullKeys,
+    bool joinHasNullKeys) {
+  VELOX_CHECK_NOT_NULL(data, "Serialized data cannot be null");
+  VELOX_CHECK_GT(size, 0, "Invalid data size");
+
+  auto pool = memoryPool ? memoryPool->addLeafChild("deserializeHashTable") : 
defaultLeafVeloxMemoryPool();
+
+  std::unique_ptr<facebook::velox::exec::BaseHashTable> hashTable;
+  if (ignoreNullKeys) {
+    auto derived = HashTableSerializer::deserialize<true>(data, size, 
pool.get());
+    hashTable = std::move(derived);
+  } else {
+    auto derived = HashTableSerializer::deserialize<false>(data, size, 
pool.get());
+    hashTable = std::move(derived);
+  }
+
+  std::vector<std::shared_ptr<const 
facebook::velox::core::FieldAccessTypedExpr>> emptyKeys;
+  std::vector<uint32_t> emptyChannels;
+
+  auto keyTypes = hashTable->rows()->keyTypes();
+  std::vector<std::string> names;
+  for (size_t i = 0; i < keyTypes.size(); ++i) {
+    names.push_back("key" + std::to_string(i));
+  }
+  auto rowType = facebook::velox::ROW(std::move(names), std::move(keyTypes));
+
+  auto builder = std::make_shared<HashTableBuilder>(
+      facebook::velox::core::JoinType::kInner,
+      false,
+      false,
+      -1,
+      emptyKeys,
+      emptyChannels,
+      false,
+      rowType,
+      pool.get(),

Review Comment:
   The deserialization path creates a local `pool` (shared_ptr) and passes 
`pool.get()` into HashTableBuilder, but the shared_ptr is destroyed when 
deserializeHashTable() returns. If HashTableBuilder stores the raw MemoryPool* 
(typical), this leaves a dangling pointer and can cause UAF in any later 
allocation/free. Ensure the memory pool has the same lifetime as the returned 
builder (e.g., store the shared_ptr alongside the builder in a holder saved to 
ObjectStore, pass an externally-owned pool from Runtime, or refactor 
HashTableBuilder construction to retain shared ownership).



##########
backends-velox/src/main/scala/org/apache/gluten/execution/SerializedBroadcastHashTable.scala:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.vectorized.HashJoinBuilder
+
+import org.apache.spark.sql.execution.joins.BuildSideRelation
+import org.apache.spark.sql.execution.unsafe.UnsafeByteArray
+
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
+
+/**
+ * Serialized broadcast hash table that can be efficiently broadcast to 
executors. This is built on
+ * the driver and contains the serialized hash table data.
+ */
+class SerializedBroadcastHashTable(
+    var serializedData: UnsafeByteArray,
+    var numRows: Long,
+    var ignoreNullKeys: Boolean,
+    var joinHasNullKeys: Boolean,
+    var bloomFilterBlocksByteSize: Long,
+    var hashProbeDynamicFiltersProduced: Long,
+    var buildSideRelation: BuildSideRelation)
+  extends Externalizable {
+
+  def this() = this(null, 0, false, false, 0, 0, null) // Required for 
Externalizable
+
+  override def writeExternal(out: ObjectOutput): Unit = {
+    out.writeLong(numRows)
+    out.writeBoolean(ignoreNullKeys)
+    out.writeBoolean(joinHasNullKeys)
+    out.writeLong(bloomFilterBlocksByteSize)
+    out.writeLong(hashProbeDynamicFiltersProduced)
+    serializedData.writeExternal(out)
+    out.writeObject(buildSideRelation)
+  }
+
+  override def readExternal(in: ObjectInput): Unit = {
+    numRows = in.readLong()
+    ignoreNullKeys = in.readBoolean()
+    joinHasNullKeys = in.readBoolean()
+    bloomFilterBlocksByteSize = in.readLong()
+    hashProbeDynamicFiltersProduced = in.readLong()
+    val data = new UnsafeByteArray()
+    data.readExternal(in)
+    serializedData = data
+    buildSideRelation = in.readObject().asInstanceOf[BuildSideRelation]
+  }
+
+  /**
+   * Deserialize the hash table on executor side. The serialized Velox hash 
table is already in a
+   * prepared, probe-ready form, so executor side only needs deserialization 
without re-running
+   * prepareJoinTable.
+   *
+   * @return
+   *   Hash table builder handle
+   */
+  def deserialize(): Long = {
+    HashJoinBuilder.deserializeHashTableDirect(
+      serializedData.address(),
+      Math.toIntExact(serializedData.size()),
+      ignoreNullKeys,
+      joinHasNullKeys)
+  }
+
+  /** Get the size of serialized data in bytes. */
+  def sizeInBytes: Long = serializedData.size()
+}
+
+object SerializedBroadcastHashTable {
+  def apply(
+      serializedData: UnsafeByteArray,
+      numRows: Long,
+      ignoreNullKeys: Boolean,
+      joinHasNullKeys: Boolean,
+      bloomFilterBlocksByteSize: Long,
+      hashProbeDynamicFiltersProduced: Long,
+      buildSideRelation: BuildSideRelation): SerializedBroadcastHashTable =
+    new SerializedBroadcastHashTable(
+      serializedData,
+      numRows,
+      ignoreNullKeys,
+      joinHasNullKeys,
+      bloomFilterBlocksByteSize,
+      hashProbeDynamicFiltersProduced,
+      buildSideRelation)
+
+  /**
+   * Build and serialize a hash table on the driver.
+   *
+   * @param hashTableHandle
+   *   Handle to the built hash table
+   * @param buildSideRelation
+   *   The build side relation for metadata
+   * @return
+   *   Serialized broadcast hash table
+   */
+  def fromHashTable(
+      hashTableHandle: Long,
+      buildSideRelation: BuildSideRelation,
+      numRows: Long): SerializedBroadcastHashTable = {
+    try {
+      val serializedData = HashJoinBuilder
+        .serializeHashTableDirect(hashTableHandle)
+        .toUnsafeByteArray()
+      val ignoreNullKeys = HashJoinBuilder
+        .getHashTableIgnoreNullKeys(hashTableHandle)
+      val joinHasNullKeys = HashJoinBuilder
+        .getHashTableJoinHasNullKeys(hashTableHandle)
+
+      val bloomFilterBlocksByteSize = HashJoinBuilder
+        .getHashTableBloomFilterBlocksByteSize(hashTableHandle)
+      val hashProbeDynamicFiltersProduced = if (bloomFilterBlocksByteSize > 0) 
1L else 0L

Review Comment:
   Setting `hashProbeDynamicFiltersProduced` to `1` whenever bloom filter data 
exists is not a reliable measure of how many dynamic filters were produced (and 
conflates 'bloom filter exists' with 'filters produced'). This will make the 
reported metrics inaccurate. Prefer retrieving the exact count from native (add 
a JNI getter) or leave it unset/0 and continue accumulating the probe-side 
metrics even when using driver-side serialized hash tables.



##########
docs/velox-configuration.md:
##########
@@ -26,7 +26,8 @@ nav_order: 16
 | spark.gluten.sql.columnar.backend.velox.cudf.memoryResource                  
    | ⚓ Static      | async             | GPU RMM memory resource.              
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                    
                 |
 | spark.gluten.sql.columnar.backend.velox.cudf.shuffleMaxPrefetchBytes         
    | 🔄 Dynamic    | 1028MB            | Maximum bytes to prefetch in CPU 
memory during GPU shuffle read while waiting for GPU available.                 
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                         
                 |
 | spark.gluten.sql.columnar.backend.velox.directorySizeGuess                   
    | ⚓ Static      | 32KB              | Deprecated, rename to 
spark.gluten.sql.columnar.backend.velox.footerEstimatedSize                     
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                    
                 |
-| spark.gluten.sql.columnar.backend.velox.enableTimestampNtzValidation         
    | 🔄 Dynamic    | true              | Enable validation fallback for 
TimestampNTZ type. When true (default), any plan containing TimestampNTZ will 
fall back to Spark execution. Set to false during development/testing of 
TimestampNTZ support to allow native execution.                                 
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                    
                 |
+| spark.gluten.sql.columnar.backend.velox.driverSideBroadcastHashTableBuild    
    | 🔄 Dynamic    | true              | Enable driver-side broadcast hash 
table build. When enabled, the hash table is built and serialized on the 
driver, then broadcast to executors. When disabled, each executor builds its 
own hash table from the broadcast data.                                         
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                  
                 |
+| spark.gluten.sql.columnar.backend.velox.enableTimestampNtzValidation         
    | 🔄 Dynamic    | false             | Enable validation fallback for 
TimestampNTZ type. When true, any plan containing TimestampNTZ will fall back 
to Spark execution. When false, allows native execution for TimestampNTZ scan.  
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                             
                 |

Review Comment:
   The documented default for 
`spark.gluten.sql.columnar.backend.velox.enableTimestampNtzValidation` is 
changed to `false` here, but the code in VeloxConfig still defines the default 
as `true` (per this PR’s diffs). Please align the doc default with the actual 
config default (or change the code default if that was intended), since this 
PR’s main scope is BHJ optimization and this mismatch will mislead users.



##########
cpp/velox/jni/JniHashTable.cc:
##########
@@ -163,4 +164,89 @@ long getJoin(const std::string& hashTableId) {
   return JniHashTableContext::getInstance().callJavaGet(hashTableId);
 }
 
+size_t serializedHashTableSize(std::shared_ptr<HashTableBuilder> builder) {
+  VELOX_CHECK_NOT_NULL(builder, "Hash table builder cannot be null");
+
+  auto hashTable = builder->hashTable();
+  VELOX_CHECK_NOT_NULL(hashTable, "Hash table cannot be null");
+
+  auto* hashTableFalse = 
dynamic_cast<facebook::velox::exec::HashTable<false>*>(hashTable.get());
+  if (hashTableFalse != nullptr) {
+    return HashTableSerializer::serializedSize<false>(hashTableFalse);
+  }
+
+  auto* hashTableTrue = 
dynamic_cast<facebook::velox::exec::HashTable<true>*>(hashTable.get());
+  VELOX_CHECK_NOT_NULL(hashTableTrue, "Hash table must be either 
HashTable<false> or HashTable<true>");
+  return HashTableSerializer::serializedSize<true>(hashTableTrue);
+}
+
+void serializeHashTableTo(std::shared_ptr<HashTableBuilder> builder, uint8_t* 
data, size_t size) {
+  VELOX_CHECK_NOT_NULL(builder, "Hash table builder cannot be null");
+  VELOX_CHECK_NOT_NULL(data, "Serialized buffer cannot be null");
+
+  auto hashTable = builder->hashTable();
+  VELOX_CHECK_NOT_NULL(hashTable, "Hash table cannot be null");
+
+  auto* hashTableFalse = 
dynamic_cast<facebook::velox::exec::HashTable<false>*>(hashTable.get());
+  if (hashTableFalse != nullptr) {
+    HashTableSerializer::serializeTo<false>(hashTableFalse, data, size);
+    return;
+  }
+
+  auto* hashTableTrue = 
dynamic_cast<facebook::velox::exec::HashTable<true>*>(hashTable.get());
+  VELOX_CHECK_NOT_NULL(hashTableTrue, "Hash table must be either 
HashTable<false> or HashTable<true>");
+  HashTableSerializer::serializeTo<true>(hashTableTrue, data, size);
+}
+
+std::shared_ptr<HashTableBuilder> deserializeHashTable(
+    const uint8_t* data,
+    size_t size,
+    facebook::velox::memory::MemoryPool* memoryPool,
+    bool ignoreNullKeys,
+    bool joinHasNullKeys) {
+  VELOX_CHECK_NOT_NULL(data, "Serialized data cannot be null");
+  VELOX_CHECK_GT(size, 0, "Invalid data size");
+
+  auto pool = memoryPool ? memoryPool->addLeafChild("deserializeHashTable") : 
defaultLeafVeloxMemoryPool();

Review Comment:
   The deserialization path creates a local `pool` (shared_ptr) and passes 
`pool.get()` into HashTableBuilder, but the shared_ptr is destroyed when 
deserializeHashTable() returns. If HashTableBuilder stores the raw MemoryPool* 
(typical), this leaves a dangling pointer and can cause UAF in any later 
allocation/free. Ensure the memory pool has the same lifetime as the returned 
builder (e.g., store the shared_ptr alongside the builder in a holder saved to 
ObjectStore, pass an externally-owned pool from Runtime, or refactor 
HashTableBuilder construction to retain shared ownership).



##########
backends-velox/src/main/scala/org/apache/gluten/execution/VeloxSerializedBroadcastRDD.scala:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.{broadcast, SparkContext}
+import org.apache.spark.sql.execution.SerializedHashTableBroadcastRelation
+import org.apache.spark.sql.execution.joins.BuildSideRelation
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+/**
+ * RDD for handling serialized broadcast hash tables built on the driver. This 
RDD deserializes the
+ * hash table on each executor.
+ */
+case class VeloxSerializedBroadcastRDD(
+    @transient private val sc: SparkContext,
+    broadcasted: broadcast.Broadcast[BuildSideRelation],

Review Comment:
   This import/type usage is not valid Scala for referencing the 
org.apache.spark.broadcast package (it will not compile as written). Use the 
proper Broadcast type import (e.g., org.apache.spark.broadcast.Broadcast) and 
declare broadcasted: Broadcast[BuildSideRelation], or import the package with 
`import org.apache.spark.broadcast` (without braces) if you want to reference 
broadcast.Broadcast.



##########
backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala:
##########
@@ -177,9 +178,73 @@ case class BroadcastHashJoinExecTransformer(
         buildBroadcastTableId,
         isNullAwareAntiJoin,
         bloomFilterPushdownSize,
-        metrics.get("buildHashTableTime")
+        metrics.get("buildHashTableTime"),
+        metrics.get("serializeHashTableTime"),
+        metrics.get("deserializeHashTableTime"),
+        metrics.get("serializedHashTableSize")

Review Comment:
   The join context requests metrics keys like `serializeHashTableTime` (and 
possibly `serializedHashTableSize`) but the Velox hash-join metrics list in 
this PR only adds `deserializeHashTableTime`. If these metrics are intended to 
be reported at the join operator level, add them to the corresponding metrics 
generator; otherwise remove/rename these keys to avoid permanently-None metrics 
wiring and confusion.



##########
ep/build-velox/src/get-velox.sh:
##########
@@ -17,9 +17,9 @@
 set -exu
 
 CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd)
-VELOX_REPO=https://github.com/IBM/velox.git
-VELOX_BRANCH=dft-2026_06_06
-VELOX_ENHANCED_BRANCH=ibm-2026_06_06
+VELOX_REPO=https://github.com/JkSelf/velox.git
+VELOX_BRANCH=dft-2026_06_06-serialize-hashtable
+VELOX_ENHANCED_BRANCH=ibm-2026_06_06-serialize-hashtable

Review Comment:
   Switching the build script to a personal fork (`JkSelf/velox.git`) reduces 
build reproducibility and can introduce supply-chain risk for downstream 
users/CI. Prefer referencing an official upstream/org fork (or an internal 
mirror) and/or make the fork/branch configurable via environment variables 
while keeping the default pointed at the project’s standard Velox source.



##########
cpp/velox/operators/hashjoin/HashTableBuilder.cc:
##########
@@ -181,7 +181,7 @@ bool HashTableBuilder::abandonHashBuildDedupEarly(int64_t 
numDistinct) const {
 
 void HashTableBuilder::abandonHashBuildDedup() {
   abandonHashBuildDedup_ = true;
-  uniqueTable_->setAllowDuplicates(true);
+  // uniqueTable_->setAllowDuplicates(true);

Review Comment:
   Commenting out `uniqueTable_->setAllowDuplicates(true)` changes behavior 
when abandoning dedup: the table may continue enforcing uniqueness or assume 
dedup invariants, which can lead to dropped build rows or incorrect join 
results. If dedup is abandoned, the builder should explicitly switch the 
underlying table to allow duplicates (or provide an equivalent safe fallback 
path) instead of silently skipping this step.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to