This is an automated email from the ASF dual-hosted git repository.

liujiayi771 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 3c88d1ead3 [VL] Fix broadcast hash table reuse for reused exchanges (#12264)
3c88d1ead3 is described below

commit 3c88d1ead34ff8fab9b144d14ed104c7c8c84831
Author: Wechar Yu <[email protected]>
AuthorDate: Mon Jun 22 16:11:26 2026 +0800

    [VL] Fix broadcast hash table reuse for reused exchanges (#12264)
---
 .../gluten/backendsapi/velox/VeloxBackend.scala    |  4 -
 .../backendsapi/velox/VeloxSparkPlanExecApi.scala  |  9 ++-
 .../gluten/execution/HashJoinExecTransformer.scala |  9 ++-
 .../execution/VeloxBroadcastBuildSideCache.scala   | 15 ++--
 .../sql/execution/ColumnarBuildSideRelation.scala  | 21 ++---
 .../unsafe/UnsafeColumnarBuildSideRelation.scala   | 21 ++---
 .../gluten/execution/VeloxHashJoinSuite.scala      | 93 +++++++++++++++++++++-
 .../gluten/backendsapi/BackendSettingsApi.scala    |  2 -
 .../execution/ColumnarBroadcastExchangeExec.scala  |  4 +-
 9 files changed, 143 insertions(+), 35 deletions(-)

diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index aa3b94ebb2..01cae32c0c 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -491,10 +491,6 @@ object VeloxBackendSettings extends BackendSettingsApi {
     (conf.isUseGlutenShuffleManager || conf.shuffleManagerSupportsColumnarShuffle)
   }
 
-  override def enableHashTableBuildOncePerExecutor(): Boolean = {
-    VeloxConfig.get.enableBroadcastBuildOncePerExecutor
-  }
-
   override def supportHashBuildJoinTypeOnLeft: JoinType => Boolean = {
     t =>
       if (super.supportHashBuildJoinTypeOnLeft(t)) {
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index be21337d99..2448e04e0c 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -890,8 +890,13 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with Logging {
 
   override def doCanonicalizeForBroadcastMode(mode: BroadcastMode): BroadcastMode = {
     mode match {
-      case hash: HashedRelationBroadcastMode =>
-        // Node: It's different with vanilla Spark.
+      case hash: HashedRelationBroadcastMode
+          // TODO: Build keys are sensitive when `enableBroadcastBuildOncePerExecutor` is enabled.
+          // For expression join keys, the build HashRelation needs pre-projection, in which case
+          // the reuse of broadcast exchange may cause incorrect results.
+          // Remove this limitation after supporting join-key pre-projection plan rewriting.
+          if !VeloxConfig.get.enableBroadcastBuildOncePerExecutor =>
+        // Note: It's different with vanilla Spark.
         // Vanilla Spark build HashRelation at driver side, so it is build keys sensitive.
         // But we broadcast byte array and build HashRelation at executor side,
         // the build keys are actually meaningless for the broadcast value.
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala
index 1554c4ddd3..64a61598b4 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala
@@ -197,4 +197,11 @@ case class BroadcastHashJoinContext(
     buildHashTableId: String,
     isNullAwareAntiJoin: Boolean = false,
     bloomFilterPushdownSize: Long,
-    buildHashTableTimeMetric: Option[SQLMetric] = None)
+    buildHashTableTimeMetric: Option[SQLMetric] = None) {
+  def droppedDuplicates: Boolean = {
+    !hasMixedFiltCondition && (
+      substraitJoinType == JoinRel.JoinType.JOIN_TYPE_LEFT_SEMI ||
+        substraitJoinType == JoinRel.JoinType.JOIN_TYPE_LEFT_ANTI
+    )
+  }
+}
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala
index 535fd8900e..f394c7314a 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala
@@ -30,7 +30,10 @@ import com.github.benmanes.caffeine.cache.{Cache, Caffeine, RemovalCause, Remova
 
 import java.util.concurrent.TimeUnit
 
-case class BroadcastHashTable(pointer: Long, relation: BuildSideRelation)
+case class BroadcastHashTable(
+    pointer: Long,
+    relation: BuildSideRelation,
+    droppedDuplicates: Boolean)
 
 /**
  * `VeloxBroadcastBuildSideCache` is used for controlling to build bhj hash table once.
@@ -62,15 +65,15 @@ object VeloxBroadcastBuildSideCache
     buildSideRelationCache
       .get(
         broadcastContext.buildHashTableId,
-        (broadcast_id: String) => {
-          val (pointer, relation) = broadcast.value match {
+        (_: String) => {
+          val (pointer, relation, droppedDuplicates) = broadcast.value match {
             case columnar: ColumnarBuildSideRelation =>
               columnar.buildHashTable(broadcastContext)
             case unsafe: UnsafeColumnarBuildSideRelation =>
               unsafe.buildHashTable(broadcastContext)
           }
 
-          BroadcastHashTable(pointer, relation)
+          BroadcastHashTable(pointer, relation, droppedDuplicates)
         }
       )
   }
@@ -97,9 +100,9 @@ object VeloxBroadcastBuildSideCache
       if (value.relation != null) {
         value.relation match {
           case columnar: ColumnarBuildSideRelation =>
-            columnar.reset()
+            columnar.reset(value.droppedDuplicates)
           case unsafe: UnsafeColumnarBuildSideRelation =>
-            unsafe.reset()
+            unsafe.reset(value.droppedDuplicates)
         }
       }
 
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala
index fea9f14974..7042f34e93 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala
@@ -42,7 +42,7 @@ import org.apache.arrow.c.ArrowSchema
 
 import scala.collection.JavaConverters._
 import scala.collection.JavaConverters.asScalaIteratorConverter
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, Map}
 
 object ColumnarBuildSideRelation {
   // Keep constructor with BroadcastMode for compatibility
@@ -153,12 +153,15 @@ case class ColumnarBuildSideRelation(
 
   override def asReadOnlyCopy(): ColumnarBuildSideRelation = this
 
-  private var hashTableData: Long = 0L
+  // The relation can be reused as different hash tables by whether dropping duplicates or not.
+  private val hashTableData = Map.empty[Boolean, Long]
 
   def buildHashTable(
-      broadcastContext: BroadcastHashJoinContext): (Long, ColumnarBuildSideRelation) =
+      broadcastContext: BroadcastHashJoinContext)
+      : (Long, ColumnarBuildSideRelation, Boolean) =
     synchronized {
-      if (hashTableData == 0) {
+      val droppedDuplicates = broadcastContext.droppedDuplicates
+      if (!hashTableData.contains(droppedDuplicates)) {
         val startTime = System.nanoTime()
         val runtime = Runtimes.contextInstance(
           BackendsApiManager.getBackendName,
@@ -210,7 +213,7 @@ case class ColumnarBuildSideRelation(
         val hashJoinBuilder = HashJoinBuilder.create(runtime)
 
         // Build the hash table
-        hashTableData = hashJoinBuilder
+        hashTableData(droppedDuplicates) = hashJoinBuilder
           .nativeBuild(
             broadcastContext.buildHashTableId,
             batchArray.toArray,
@@ -232,14 +235,14 @@ case class ColumnarBuildSideRelation(
         val elapsedTime = System.nanoTime() - startTime
         broadcastContext.buildHashTableTimeMetric.foreach(_ += elapsedTime / 1000000)
 
-        (hashTableData, this)
+        (hashTableData(droppedDuplicates), this, droppedDuplicates)
       } else {
-        (HashJoinBuilder.cloneHashTable(hashTableData), null)
+        (HashJoinBuilder.cloneHashTable(hashTableData(droppedDuplicates)), null, droppedDuplicates)
       }
     }
 
-  def reset(): Unit = synchronized {
-    hashTableData = 0
+  def reset(droppedDuplicates: Boolean): Unit = synchronized {
+    hashTableData.remove(droppedDuplicates)
   }
 
   /**
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala
index fbc329f360..fa3dcc6934 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala
@@ -48,7 +48,7 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
 
 import scala.collection.JavaConverters._
 import scala.collection.JavaConverters.asScalaIteratorConverter
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, Map}
 
 object UnsafeColumnarBuildSideRelation {
   def apply(
@@ -123,11 +123,14 @@ class UnsafeColumnarBuildSideRelation(
     batches
   }
 
-  private var hashTableData: Long = 0L
+  // The relation can be reused as different hash tables by whether dropping duplicates or not.
+  private val hashTableData = Map.empty[Boolean, Long]
 
-  def buildHashTable(broadcastContext: BroadcastHashJoinContext): (Long, BuildSideRelation) =
+  def buildHashTable(broadcastContext: BroadcastHashJoinContext)
+      : (Long, BuildSideRelation, Boolean) =
     synchronized {
-      if (hashTableData == 0) {
+      val droppedDuplicates = broadcastContext.droppedDuplicates
+      if (!hashTableData.contains(droppedDuplicates)) {
         val startTime = System.nanoTime()
         val runtime = Runtimes.contextInstance(
           BackendsApiManager.getBackendName,
@@ -180,7 +183,7 @@ class UnsafeColumnarBuildSideRelation(
         val hashJoinBuilder = HashJoinBuilder.create(runtime)
 
         // Build the hash table
-        hashTableData = hashJoinBuilder
+        hashTableData(droppedDuplicates) = hashJoinBuilder
           .nativeBuild(
             broadcastContext.buildHashTableId,
             batchArray.toArray,
@@ -202,14 +205,14 @@ class UnsafeColumnarBuildSideRelation(
         val elapsedTime = System.nanoTime() - startTime
         broadcastContext.buildHashTableTimeMetric.foreach(_ += elapsedTime / 1000000)
 
-        (hashTableData, this)
+        (hashTableData(droppedDuplicates), this, droppedDuplicates)
       } else {
-        (HashJoinBuilder.cloneHashTable(hashTableData), null)
+        (HashJoinBuilder.cloneHashTable(hashTableData(droppedDuplicates)), null, droppedDuplicates)
       }
     }
 
-  def reset(): Unit = synchronized {
-    hashTableData = 0
+  def reset(droppedDuplicates: Boolean): Unit = synchronized {
+    hashTableData.remove(droppedDuplicates)
   }
 
   override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
index 9c3f4f6e4e..a83e0ebf0e 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
@@ -22,7 +22,9 @@ import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.execution.{ColumnarSubqueryBroadcastExec, InputIteratorTransformer}
+import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec, ColumnarSubqueryBroadcastExec, InputIteratorTransformer}
+import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
+import org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode
 
 class VeloxHashJoinSuite extends VeloxWholeStageTransformerSuite {
   override protected val resourcePath: String = "/tpch-data-parquet"
@@ -354,4 +356,93 @@ class VeloxHashJoinSuite extends VeloxWholeStageTransformerSuite {
       }
     }
   }
+
+  test("Reuse broadcast exchange with different hash table") {
+    withSQLConf(
+      ("spark.sql.adaptive.enabled", "false")
+    ) {
+      withTable("t1", "t2") {
+        spark
+          .range(100)
+          .selectExpr("id as key", "id as value")
+          .write
+          .saveAsTable("t1")
+
+        spark
+          .range(100)
+          .selectExpr("id % 7 as key", "id as value")
+          .write
+          .saveAsTable("t2")
+
+        val query = """
+          SELECT /*+ BROADCAST(t2) */ t1.key, t1.value
+          FROM t1
+          LEFT SEMI JOIN t2 ON t1.key = t2.key
+          UNION ALL
+          SELECT /*+ BROADCAST(t2) */ t1.key, t1.value
+          from t1
+          JOIN t2 on t1.key = t2.key
+        """
+
+        runQueryAndCompare(query) {
+          df =>
+            // Check that columnar broadcast exchange is reused.
+            val plan = df.queryExecution.executedPlan
+            assert(collect(plan) { case b: ColumnarBroadcastExchangeExec => b }.size == 1)
+            assert(collect(plan) {
+              case r @ ReusedExchangeExec(_, _: ColumnarBroadcastExchangeExec) => r
+            }.size == 1)
+        }
+      }
+    }
+  }
+
+  test("Do not reuse broadcast exchange for different null aware flag") {
+    Seq("true", "false").foreach {
+      enableBroadcastBuildOncePerExecutor =>
+        withSQLConf(
+          ("spark.sql.adaptive.enabled", "false"),
+          (
+            VeloxConfig.VELOX_BROADCAST_BUILD_HASHTABLE_ONCE_PER_EXECUTOR.key,
+            enableBroadcastBuildOncePerExecutor)
+        ) {
+          withTable("t1", "t2") {
+            spark
+              .range(100)
+              .selectExpr("id as key", "id as value")
+              .write
+              .saveAsTable("t1")
+
+            spark
+              .range(100)
+              .selectExpr("id % 7 as key", "id as value")
+              .write
+              .saveAsTable("t2")
+
+            val query = """
+              SELECT /*+ BROADCAST(t2) */ t1.key, t1.value
+              FROM t1
+              WHERE key not in (SELECT key FROM t2)
+              UNION ALL
+              SELECT /*+ BROADCAST(t2) */ t1.key, t1.value
+              from t1
+              JOIN t2 on t1.key = t2.key
+            """
+
+            runQueryAndCompare(query) {
+              df =>
+                val plan = df.queryExecution.executedPlan
+                // Columnar broadcast exchange is not reused because the
+                // HashedRelationBroadcastMode's isNullAware flag is different.
+                assert(collect(plan) { case b: ColumnarBroadcastExchangeExec => b }.size == 2)
+                val modes = collect(plan) {
+                  case ColumnarBroadcastExchangeExec(mode: HashedRelationBroadcastMode, _) => mode
+                }
+                assert(modes.size == 2)
+                assert(modes.exists(_.isNullAware) && modes.exists(!_.isNullAware))
+            }
+          }
+        }
+    }
+  }
 }
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index ddd901447a..75c94be4d4 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -81,8 +81,6 @@ trait BackendSettingsApi {
 
   def enableJoinKeysRewrite(): Boolean = true
 
-  def enableHashTableBuildOncePerExecutor(): Boolean = true
-
   def supportHashBuildJoinTypeOnLeft: JoinType => Boolean = {
     case _: InnerLike | RightOuter | FullOuter => true
     case _ => false
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
index 4b2890d51d..2ead6bf07d 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
@@ -132,7 +132,9 @@ case class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan)
   override def rowType(): Convention.RowType = Convention.RowType.None
 
   override def doCanonicalize(): SparkPlan = {
-    ColumnarBroadcastExchangeExec(mode.canonicalized, child.canonicalized)
+    val canonicalized =
+      BackendsApiManager.getSparkPlanExecApiInstance.doCanonicalizeForBroadcastMode(mode)
+    ColumnarBroadcastExchangeExec(canonicalized, child.canonicalized)
   }
 
   override def doPrepare(): Unit = {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to