Copilot commented on code in PR #11993:
URL: https://github.com/apache/gluten/pull/11993#discussion_r3425204154


##########
cpp/velox/jni/JniHashTable.cc:
##########
@@ -163,4 +164,89 @@ long getJoin(const std::string& hashTableId) {
   return JniHashTableContext::getInstance().callJavaGet(hashTableId);
 }
 
+size_t serializedHashTableSize(std::shared_ptr<HashTableBuilder> builder) {
+  VELOX_CHECK_NOT_NULL(builder, "Hash table builder cannot be null");
+
+  auto hashTable = builder->hashTable();
+  VELOX_CHECK_NOT_NULL(hashTable, "Hash table cannot be null");
+
+  auto* hashTableFalse = 
dynamic_cast<facebook::velox::exec::HashTable<false>*>(hashTable.get());
+  if (hashTableFalse != nullptr) {
+    return HashTableSerializer::serializedSize<false>(hashTableFalse);
+  }
+
+  auto* hashTableTrue = 
dynamic_cast<facebook::velox::exec::HashTable<true>*>(hashTable.get());
+  VELOX_CHECK_NOT_NULL(hashTableTrue, "Hash table must be either 
HashTable<false> or HashTable<true>");
+  return HashTableSerializer::serializedSize<true>(hashTableTrue);
+}
+
+void serializeHashTableTo(std::shared_ptr<HashTableBuilder> builder, uint8_t* 
data, size_t size) {
+  VELOX_CHECK_NOT_NULL(builder, "Hash table builder cannot be null");
+  VELOX_CHECK_NOT_NULL(data, "Serialized buffer cannot be null");
+
+  auto hashTable = builder->hashTable();
+  VELOX_CHECK_NOT_NULL(hashTable, "Hash table cannot be null");
+
+  auto* hashTableFalse = 
dynamic_cast<facebook::velox::exec::HashTable<false>*>(hashTable.get());
+  if (hashTableFalse != nullptr) {
+    HashTableSerializer::serializeTo<false>(hashTableFalse, data, size);
+    return;
+  }
+
+  auto* hashTableTrue = 
dynamic_cast<facebook::velox::exec::HashTable<true>*>(hashTable.get());
+  VELOX_CHECK_NOT_NULL(hashTableTrue, "Hash table must be either 
HashTable<false> or HashTable<true>");
+  HashTableSerializer::serializeTo<true>(hashTableTrue, data, size);
+}
+
+std::shared_ptr<HashTableBuilder> deserializeHashTable(
+    const uint8_t* data,
+    size_t size,
+    facebook::velox::memory::MemoryPool* memoryPool,
+    bool ignoreNullKeys,
+    bool joinHasNullKeys) {
+  VELOX_CHECK_NOT_NULL(data, "Serialized data cannot be null");
+  VELOX_CHECK_GT(size, 0, "Invalid data size");
+
+  auto pool = memoryPool ? memoryPool->addLeafChild("deserializeHashTable") : 
defaultLeafVeloxMemoryPool();
+
+  std::unique_ptr<facebook::velox::exec::BaseHashTable> hashTable;
+  if (ignoreNullKeys) {
+    auto derived = HashTableSerializer::deserialize<true>(data, size, 
pool.get());
+    hashTable = std::move(derived);
+  } else {
+    auto derived = HashTableSerializer::deserialize<false>(data, size, 
pool.get());
+    hashTable = std::move(derived);
+  }
+
+  std::vector<std::shared_ptr<const 
facebook::velox::core::FieldAccessTypedExpr>> emptyKeys;
+  std::vector<uint32_t> emptyChannels;
+
+  auto keyTypes = hashTable->rows()->keyTypes();
+  std::vector<std::string> names;
+  for (size_t i = 0; i < keyTypes.size(); ++i) {
+    names.push_back("key" + std::to_string(i));
+  }
+  auto rowType = facebook::velox::ROW(std::move(names), std::move(keyTypes));
+
+  auto builder = std::make_shared<HashTableBuilder>(
+      facebook::velox::core::JoinType::kInner,
+      false,
+      false,
+      -1,
+      emptyKeys,
+      emptyChannels,
+      false,
+      rowType,
+      pool.get(),

Review Comment:
   After changing `pool` to a raw `MemoryPool*` (to avoid a short-lived child 
pool), the HashTableBuilder should also be constructed with that raw pointer 
(not `pool.get()`). Otherwise the code either won’t compile or will 
reintroduce the lifetime issue.



##########
cpp/velox/jni/JniHashTable.cc:
##########
@@ -163,4 +164,89 @@ long getJoin(const std::string& hashTableId) {
   return JniHashTableContext::getInstance().callJavaGet(hashTableId);
 }
 
+size_t serializedHashTableSize(std::shared_ptr<HashTableBuilder> builder) {
+  VELOX_CHECK_NOT_NULL(builder, "Hash table builder cannot be null");
+
+  auto hashTable = builder->hashTable();
+  VELOX_CHECK_NOT_NULL(hashTable, "Hash table cannot be null");
+
+  auto* hashTableFalse = 
dynamic_cast<facebook::velox::exec::HashTable<false>*>(hashTable.get());
+  if (hashTableFalse != nullptr) {
+    return HashTableSerializer::serializedSize<false>(hashTableFalse);
+  }
+
+  auto* hashTableTrue = 
dynamic_cast<facebook::velox::exec::HashTable<true>*>(hashTable.get());
+  VELOX_CHECK_NOT_NULL(hashTableTrue, "Hash table must be either 
HashTable<false> or HashTable<true>");
+  return HashTableSerializer::serializedSize<true>(hashTableTrue);
+}
+
+void serializeHashTableTo(std::shared_ptr<HashTableBuilder> builder, uint8_t* 
data, size_t size) {
+  VELOX_CHECK_NOT_NULL(builder, "Hash table builder cannot be null");
+  VELOX_CHECK_NOT_NULL(data, "Serialized buffer cannot be null");
+
+  auto hashTable = builder->hashTable();
+  VELOX_CHECK_NOT_NULL(hashTable, "Hash table cannot be null");
+
+  auto* hashTableFalse = 
dynamic_cast<facebook::velox::exec::HashTable<false>*>(hashTable.get());
+  if (hashTableFalse != nullptr) {
+    HashTableSerializer::serializeTo<false>(hashTableFalse, data, size);
+    return;
+  }
+
+  auto* hashTableTrue = 
dynamic_cast<facebook::velox::exec::HashTable<true>*>(hashTable.get());
+  VELOX_CHECK_NOT_NULL(hashTableTrue, "Hash table must be either 
HashTable<false> or HashTable<true>");
+  HashTableSerializer::serializeTo<true>(hashTableTrue, data, size);
+}
+
+std::shared_ptr<HashTableBuilder> deserializeHashTable(
+    const uint8_t* data,
+    size_t size,
+    facebook::velox::memory::MemoryPool* memoryPool,
+    bool ignoreNullKeys,
+    bool joinHasNullKeys) {
+  VELOX_CHECK_NOT_NULL(data, "Serialized data cannot be null");
+  VELOX_CHECK_GT(size, 0, "Invalid data size");
+
+  auto pool = memoryPool ? memoryPool->addLeafChild("deserializeHashTable") : 
defaultLeafVeloxMemoryPool();
+
+  std::unique_ptr<facebook::velox::exec::BaseHashTable> hashTable;
+  if (ignoreNullKeys) {
+    auto derived = HashTableSerializer::deserialize<true>(data, size, 
pool.get());
+    hashTable = std::move(derived);
+  } else {
+    auto derived = HashTableSerializer::deserialize<false>(data, size, 
pool.get());
+    hashTable = std::move(derived);
+  }

Review Comment:
   `deserializeHashTable` creates a leaf child MemoryPool and only keeps it in 
a local shared_ptr. `HashTableBuilder` stores the pool as a raw pointer 
(`pool_`), so when this function returns the child pool is destroyed and the 
builder/hash table hold a dangling pointer (use-after-free). Use the provided 
`memoryPool` directly (or the global default pool) without creating a 
short-lived child pool.



##########
ep/build-velox/src/get-velox.sh:
##########
@@ -17,9 +17,9 @@
 set -exu
 
 CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd)
-VELOX_REPO=https://github.com/IBM/velox.git
-VELOX_BRANCH=dft-2026_06_06
-VELOX_ENHANCED_BRANCH=ibm-2026_06_06
+VELOX_REPO=https://github.com/JkSelf/velox.git
+VELOX_BRANCH=dft-2026_06_06-serialize-hashtable
+VELOX_ENHANCED_BRANCH=ibm-2026_06_06-serialize-hashtable

Review Comment:
   The default Velox source is now fetched from a personal GitHub fork 
(JkSelf/velox). This makes builds non-reproducible and risks supply-chain 
issues for downstream users/CI. The default should point to the project’s 
maintained upstream/fork (and keep personal forks only via the existing 
--velox_repo override).



##########
backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala:
##########
@@ -886,6 +912,167 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with 
Logging {
         offload,
         buildThreadsValue)
     }
+
+    // Check if we should build hash table on driver (Spark-native approach)
+    // Only do this for HashedRelationBroadcastMode and when offload is enabled
+    val shouldBuildOnDriver = 
VeloxConfig.get.enableDriverSideBroadcastHashTableBuild &&
+      mode.isInstanceOf[HashedRelationBroadcastMode] &&
+      offload
+
+    if (shouldBuildOnDriver) {
+      // Try to get broadcast join context from logical plan tag
+      // In multi-join scenarios, there may be multiple contexts. Find the one 
that matches
+      // the current broadcast child's output.
+      val joinContextOpt: Option[BroadcastJoinContextInfo] =
+        findLogicalLink(child).flatMap {
+          logicalPlan =>
+            logicalPlan.getTagValue(
+              BroadcastJoinContextTag.BROADCAST_JOIN_CONTEXT
+            ).flatMap {
+              contexts =>
+                val childOutputSet = AttributeSet(newOutput)
+                // Find the context whose build output matches the child's 
output
+                contexts.find {
+                  ctx =>
+                    val buildOutputMatches = 
childOutputSet.subsetOf(ctx.buildOutputSet) &&
+                      ctx.buildOutputSet.subsetOf(childOutputSet)
+                    buildOutputMatches
+                }
+            }
+        }
+
+      joinContextOpt match {
+        case Some(joinContext) =>
+          // We have join context information - build hash table on driver
+          logInfo(
+            s"Building hash table on driver in BroadcastExchangeExec " +
+              s"with join context: $joinContext")
+
+          // Create a broadcast ID for this hash table
+          val broadcastId = 
s"broadcast_exchange_${child.id}_${System.identityHashCode(mode)}"
+
+          // Convert Spark JoinType to Substrait JoinType
+          val substraitJoinType = joinContext.joinType match {
+            case _: InnerLike =>
+              JoinRel.JoinType.JOIN_TYPE_INNER
+            case FullOuter =>
+              JoinRel.JoinType.JOIN_TYPE_OUTER
+            case LeftOuter |
+                RightOuter =>
+              JoinRel.JoinType.JOIN_TYPE_LEFT

Review Comment:
   The Spark JoinType → Substrait JoinType mapping is incorrect for 
`RightOuter` (currently mapped to `JOIN_TYPE_LEFT`). This can lead to building 
the hash table with the wrong join semantics when driver-side build is enabled.



##########
backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala:
##########
@@ -37,6 +38,10 @@ case class BroadcastHashTable(pointer: Long, relation: 
BuildSideRelation)
  *
  * The complicated part is due to reuse exchange, where multiple BHJ IDs 
correspond to a
  * `BuildSideRelation`.
+ *
+ * This implementation supports two modes:
+ *   1. Driver-side build (new): Hash table is built and serialized on driver, 
then broadcast 2.

Review Comment:
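   The mode description is garbled: "then broadcast 2." looks like the marker 
for list item 2 was merged onto the end of item 1, which leaves the sentence 
unclear. Item 1 should say "broadcast to executors" (or similar), with item 2 
starting on its own line.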



##########
gluten-core/src/main/scala/org/apache/gluten/extension/GlutenJoinKeysCapture.scala:
##########
@@ -35,28 +35,103 @@ case class GlutenJoinKeysCapture() extends SparkStrategy {
 
     plan match {
 
-      case ExtractEquiJoinKeys(_, leftKeys, rightKeys, _, _, left, right, _) =>
-        if (leftKeys.nonEmpty) {
-          left.setTagValue(JoinKeysTag.ORIGINAL_JOIN_KEYS, leftKeys)
-        }
-        if (rightKeys.nonEmpty) {
-          right.setTagValue(JoinKeysTag.ORIGINAL_JOIN_KEYS, rightKeys)
-        }
+      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, _, 
left, right, _) =>
+        // Set broadcast join context for the build side
+        // This information will be used by BroadcastExchangeExec to build 
hash table
+        val buildRight = chooseBuildRight(joinType, left, right)
+        val buildPlan = if (buildRight) right else left

Review Comment:
   `ExtractEquiJoinKeys` includes join hint information, but this strategy 
ignores it (`... condition, _, left, right, _`) and then infers `buildRight` 
from stats. For queries using BROADCAST hints (or other join strategy hints), 
this can record the wrong build side context and prevent driver-side build from 
triggering for the intended build side.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to