This is an automated email from the ASF dual-hosted git repository.
liujiayi771 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 3c88d1ead3 [VL] Fix broadcast hash table reuse for reused exchanges (#12264)
3c88d1ead3 is described below
commit 3c88d1ead34ff8fab9b144d14ed104c7c8c84831
Author: Wechar Yu <[email protected]>
AuthorDate: Mon Jun 22 16:11:26 2026 +0800
[VL] Fix broadcast hash table reuse for reused exchanges (#12264)
---
.../gluten/backendsapi/velox/VeloxBackend.scala | 4 -
.../backendsapi/velox/VeloxSparkPlanExecApi.scala | 9 ++-
.../gluten/execution/HashJoinExecTransformer.scala | 9 ++-
.../execution/VeloxBroadcastBuildSideCache.scala | 15 ++--
.../sql/execution/ColumnarBuildSideRelation.scala | 21 ++---
.../unsafe/UnsafeColumnarBuildSideRelation.scala | 21 ++---
.../gluten/execution/VeloxHashJoinSuite.scala | 93 +++++++++++++++++++++-
.../gluten/backendsapi/BackendSettingsApi.scala | 2 -
.../execution/ColumnarBroadcastExchangeExec.scala | 4 +-
9 files changed, 143 insertions(+), 35 deletions(-)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index aa3b94ebb2..01cae32c0c 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -491,10 +491,6 @@ object VeloxBackendSettings extends BackendSettingsApi {
(conf.isUseGlutenShuffleManager ||
conf.shuffleManagerSupportsColumnarShuffle)
}
- override def enableHashTableBuildOncePerExecutor(): Boolean = {
- VeloxConfig.get.enableBroadcastBuildOncePerExecutor
- }
-
override def supportHashBuildJoinTypeOnLeft: JoinType => Boolean = {
t =>
if (super.supportHashBuildJoinTypeOnLeft(t)) {
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index be21337d99..2448e04e0c 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -890,8 +890,13 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with Logging {
override def doCanonicalizeForBroadcastMode(mode: BroadcastMode):
BroadcastMode = {
mode match {
- case hash: HashedRelationBroadcastMode =>
- // Node: It's different with vanilla Spark.
+ case hash: HashedRelationBroadcastMode
+ // TODO: Build keys are significant when `enableBroadcastBuildOncePerExecutor` is enabled.
+ // For expression join keys, the build-side HashRelation needs a pre-projection, in which case
+ // reusing the broadcast exchange may produce incorrect results.
+ // Remove this limitation once join-key pre-projection plan rewriting is supported.
+ if !VeloxConfig.get.enableBroadcastBuildOncePerExecutor =>
+ // Note: It's different with vanilla Spark.
// Vanilla Spark build HashRelation at driver side, so it is build keys sensitive.
// But we broadcast byte array and build HashRelation at executor side,
// the build keys are actually meaningless for the broadcast value.
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala
index 1554c4ddd3..64a61598b4 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala
@@ -197,4 +197,11 @@ case class BroadcastHashJoinContext(
buildHashTableId: String,
isNullAwareAntiJoin: Boolean = false,
bloomFilterPushdownSize: Long,
- buildHashTableTimeMetric: Option[SQLMetric] = None)
+ buildHashTableTimeMetric: Option[SQLMetric] = None) {
+ def droppedDuplicates: Boolean = {
+ !hasMixedFiltCondition && (
+ substraitJoinType == JoinRel.JoinType.JOIN_TYPE_LEFT_SEMI ||
+ substraitJoinType == JoinRel.JoinType.JOIN_TYPE_LEFT_ANTI
+ )
+ }
+}
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala
index 535fd8900e..f394c7314a 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala
@@ -30,7 +30,10 @@ import com.github.benmanes.caffeine.cache.{Cache, Caffeine, RemovalCause, Remova
import java.util.concurrent.TimeUnit
-case class BroadcastHashTable(pointer: Long, relation: BuildSideRelation)
+case class BroadcastHashTable(
+ pointer: Long,
+ relation: BuildSideRelation,
+ droppedDuplicates: Boolean)
/**
* `VeloxBroadcastBuildSideCache` is used for controlling to build bhj hash table once.
@@ -62,15 +65,15 @@ object VeloxBroadcastBuildSideCache
buildSideRelationCache
.get(
broadcastContext.buildHashTableId,
- (broadcast_id: String) => {
- val (pointer, relation) = broadcast.value match {
+ (_: String) => {
+ val (pointer, relation, droppedDuplicates) = broadcast.value match {
case columnar: ColumnarBuildSideRelation =>
columnar.buildHashTable(broadcastContext)
case unsafe: UnsafeColumnarBuildSideRelation =>
unsafe.buildHashTable(broadcastContext)
}
- BroadcastHashTable(pointer, relation)
+ BroadcastHashTable(pointer, relation, droppedDuplicates)
}
)
}
@@ -97,9 +100,9 @@ object VeloxBroadcastBuildSideCache
if (value.relation != null) {
value.relation match {
case columnar: ColumnarBuildSideRelation =>
- columnar.reset()
+ columnar.reset(value.droppedDuplicates)
case unsafe: UnsafeColumnarBuildSideRelation =>
- unsafe.reset()
+ unsafe.reset(value.droppedDuplicates)
}
}
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala
index fea9f14974..7042f34e93 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala
@@ -42,7 +42,7 @@ import org.apache.arrow.c.ArrowSchema
import scala.collection.JavaConverters._
import scala.collection.JavaConverters.asScalaIteratorConverter
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, Map}
object ColumnarBuildSideRelation {
// Keep constructor with BroadcastMode for compatibility
@@ -153,12 +153,15 @@ case class ColumnarBuildSideRelation(
override def asReadOnlyCopy(): ColumnarBuildSideRelation = this
- private var hashTableData: Long = 0L
+ // One relation may back two native hash tables, keyed by `BroadcastHashJoinContext.droppedDuplicates`.
+ private val hashTableData = Map.empty[Boolean, Long]
def buildHashTable(
- broadcastContext: BroadcastHashJoinContext): (Long,
ColumnarBuildSideRelation) =
+ broadcastContext: BroadcastHashJoinContext)
+ : (Long, ColumnarBuildSideRelation, Boolean) =
synchronized {
- if (hashTableData == 0) {
+ val droppedDuplicates = broadcastContext.droppedDuplicates
+ if (!hashTableData.contains(droppedDuplicates)) {
val startTime = System.nanoTime()
val runtime = Runtimes.contextInstance(
BackendsApiManager.getBackendName,
@@ -210,7 +213,7 @@ case class ColumnarBuildSideRelation(
val hashJoinBuilder = HashJoinBuilder.create(runtime)
// Build the hash table
- hashTableData = hashJoinBuilder
+ hashTableData(droppedDuplicates) = hashJoinBuilder
.nativeBuild(
broadcastContext.buildHashTableId,
batchArray.toArray,
@@ -232,14 +235,14 @@ case class ColumnarBuildSideRelation(
val elapsedTime = System.nanoTime() - startTime
broadcastContext.buildHashTableTimeMetric.foreach(_ += elapsedTime /
1000000)
- (hashTableData, this)
+ (hashTableData(droppedDuplicates), this, droppedDuplicates)
} else {
- (HashJoinBuilder.cloneHashTable(hashTableData), null)
+ (HashJoinBuilder.cloneHashTable(hashTableData(droppedDuplicates)),
null, droppedDuplicates)
}
}
- def reset(): Unit = synchronized {
- hashTableData = 0
+ def reset(droppedDuplicates: Boolean): Unit = synchronized {
+ hashTableData.remove(droppedDuplicates)
}
/**
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala
index fbc329f360..fa3dcc6934 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala
@@ -48,7 +48,7 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
import scala.collection.JavaConverters._
import scala.collection.JavaConverters.asScalaIteratorConverter
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, Map}
object UnsafeColumnarBuildSideRelation {
def apply(
@@ -123,11 +123,14 @@ class UnsafeColumnarBuildSideRelation(
batches
}
- private var hashTableData: Long = 0L
+ // One relation may back two native hash tables, keyed by `BroadcastHashJoinContext.droppedDuplicates`.
+ private val hashTableData = Map.empty[Boolean, Long]
- def buildHashTable(broadcastContext: BroadcastHashJoinContext): (Long,
BuildSideRelation) =
+ def buildHashTable(broadcastContext: BroadcastHashJoinContext)
+ : (Long, BuildSideRelation, Boolean) =
synchronized {
- if (hashTableData == 0) {
+ val droppedDuplicates = broadcastContext.droppedDuplicates
+ if (!hashTableData.contains(droppedDuplicates)) {
val startTime = System.nanoTime()
val runtime = Runtimes.contextInstance(
BackendsApiManager.getBackendName,
@@ -180,7 +183,7 @@ class UnsafeColumnarBuildSideRelation(
val hashJoinBuilder = HashJoinBuilder.create(runtime)
// Build the hash table
- hashTableData = hashJoinBuilder
+ hashTableData(droppedDuplicates) = hashJoinBuilder
.nativeBuild(
broadcastContext.buildHashTableId,
batchArray.toArray,
@@ -202,14 +205,14 @@ class UnsafeColumnarBuildSideRelation(
val elapsedTime = System.nanoTime() - startTime
broadcastContext.buildHashTableTimeMetric.foreach(_ += elapsedTime /
1000000)
- (hashTableData, this)
+ (hashTableData(droppedDuplicates), this, droppedDuplicates)
} else {
- (HashJoinBuilder.cloneHashTable(hashTableData), null)
+ (HashJoinBuilder.cloneHashTable(hashTableData(droppedDuplicates)),
null, droppedDuplicates)
}
}
- def reset(): Unit = synchronized {
- hashTableData = 0
+ def reset(droppedDuplicates: Boolean): Unit = synchronized {
+ hashTableData.remove(droppedDuplicates)
}
override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException
{
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
index 9c3f4f6e4e..a83e0ebf0e 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
@@ -22,7 +22,9 @@ import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.execution.{ColumnarSubqueryBroadcastExec,
InputIteratorTransformer}
+import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec,
ColumnarSubqueryBroadcastExec, InputIteratorTransformer}
+import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
+import org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode
class VeloxHashJoinSuite extends VeloxWholeStageTransformerSuite {
override protected val resourcePath: String = "/tpch-data-parquet"
@@ -354,4 +356,93 @@ class VeloxHashJoinSuite extends VeloxWholeStageTransformerSuite {
}
}
}
+
+ test("Reuse broadcast exchange with different hash table") {
+ withSQLConf(
+ ("spark.sql.adaptive.enabled", "false")
+ ) {
+ withTable("t1", "t2") {
+ spark
+ .range(100)
+ .selectExpr("id as key", "id as value")
+ .write
+ .saveAsTable("t1")
+
+ spark
+ .range(100)
+ .selectExpr("id % 7 as key", "id as value")
+ .write
+ .saveAsTable("t2")
+
+ val query = """
+ SELECT /*+ BROADCAST(t2) */ t1.key, t1.value
+ FROM t1
+ LEFT SEMI JOIN t2 ON t1.key = t2.key
+ UNION ALL
+ SELECT /*+ BROADCAST(t2) */ t1.key, t1.value
+ from t1
+ JOIN t2 on t1.key = t2.key
+ """
+
+ runQueryAndCompare(query) {
+ df =>
+ // Check that columnar broadcast exchange is reused.
+ val plan = df.queryExecution.executedPlan
+ assert(collect(plan) { case b: ColumnarBroadcastExchangeExec => b
}.size == 1)
+ assert(collect(plan) {
+ case r @ ReusedExchangeExec(_, _: ColumnarBroadcastExchangeExec)
=> r
+ }.size == 1)
+ }
+ }
+ }
+ }
+
+ test("Do not reuse broadcast exchange for different null aware flag") {
+ Seq("true", "false").foreach {
+ enableBroadcastBuildOncePerExecutor =>
+ withSQLConf(
+ ("spark.sql.adaptive.enabled", "false"),
+ (
+ VeloxConfig.VELOX_BROADCAST_BUILD_HASHTABLE_ONCE_PER_EXECUTOR.key,
+ enableBroadcastBuildOncePerExecutor)
+ ) {
+ withTable("t1", "t2") {
+ spark
+ .range(100)
+ .selectExpr("id as key", "id as value")
+ .write
+ .saveAsTable("t1")
+
+ spark
+ .range(100)
+ .selectExpr("id % 7 as key", "id as value")
+ .write
+ .saveAsTable("t2")
+
+ val query = """
+ SELECT /*+ BROADCAST(t2) */ t1.key, t1.value
+ FROM t1
+ WHERE key not in (SELECT key FROM t2)
+ UNION ALL
+ SELECT /*+ BROADCAST(t2) */ t1.key, t1.value
+ from t1
+ JOIN t2 on t1.key = t2.key
+ """
+
+ runQueryAndCompare(query) {
+ df =>
+ val plan = df.queryExecution.executedPlan
+ // Columnar broadcast exchange is not reused because the
+ // HashedRelationBroadcastMode's isNullAware flag is different.
+ assert(collect(plan) { case b: ColumnarBroadcastExchangeExec
=> b }.size == 2)
+ val modes = collect(plan) {
+ case ColumnarBroadcastExchangeExec(mode:
HashedRelationBroadcastMode, _) => mode
+ }
+ assert(modes.size == 2)
+ assert(modes.exists(_.isNullAware) &&
modes.exists(!_.isNullAware))
+ }
+ }
+ }
+ }
+ }
}
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index ddd901447a..75c94be4d4 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -81,8 +81,6 @@ trait BackendSettingsApi {
def enableJoinKeysRewrite(): Boolean = true
- def enableHashTableBuildOncePerExecutor(): Boolean = true
-
def supportHashBuildJoinTypeOnLeft: JoinType => Boolean = {
case _: InnerLike | RightOuter | FullOuter => true
case _ => false
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
index 4b2890d51d..2ead6bf07d 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
@@ -132,7 +132,9 @@ case class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan)
override def rowType(): Convention.RowType = Convention.RowType.None
override def doCanonicalize(): SparkPlan = {
- ColumnarBroadcastExchangeExec(mode.canonicalized, child.canonicalized)
+ val canonicalized =
+
BackendsApiManager.getSparkPlanExecApiInstance.doCanonicalizeForBroadcastMode(mode)
+ ColumnarBroadcastExchangeExec(canonicalized, child.canonicalized)
}
override def doPrepare(): Unit = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]