This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 7dad958e4 [CORE] Reuse broadcast exchange for different build keys 
with same table (#5563)
7dad958e4 is described below

commit 7dad958e4e87cde85683ae03dddafd84c6f7408a
Author: Xiduo You <[email protected]>
AuthorDate: Mon Apr 29 12:56:48 2024 +0800

    [CORE] Reuse broadcast exchange for different build keys with same table 
(#5563)
    
    Vanilla Spark builds the HashRelation at the driver side, so it is build-key sensitive. But we broadcast a byte array and build the HashRelation at the executor side, so the build keys are actually meaningless for the broadcast value.
    
    This PR erases the HashedRelationBroadcastMode build keys when doing canonicalization. This change allows us to reuse the broadcast exchange for different build keys on the same table.
---
 .../backendsapi/velox/VeloxSparkPlanExecApi.scala  | 15 ++++++++-
 .../gluten/execution/VeloxHashJoinSuite.scala      | 39 +++++++++++++++++++++-
 .../gluten/backendsapi/SparkPlanExecApi.scala      |  4 +++
 .../execution/ColumnarBroadcastExchangeExec.scala  |  4 ++-
 4 files changed, 59 insertions(+), 3 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 132b5d7dd..8318ac2d5 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -50,7 +50,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, 
ShuffleExchangeExec}
-import org.apache.spark.sql.execution.joins.BuildSideRelation
+import org.apache.spark.sql.execution.joins.{BuildSideRelation, 
HashedRelationBroadcastMode}
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.utils.ExecUtil
 import org.apache.spark.sql.expression.{UDFExpression, UDFResolver, 
UserDefinedAggregateFunction}
@@ -577,6 +577,19 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
     ColumnarBuildSideRelation(child.output, serialized.map(_.getSerialized))
   }
 
+  override def doCanonicalizeForBroadcastMode(mode: BroadcastMode): 
BroadcastMode = {
+    mode match {
+      case hash: HashedRelationBroadcastMode =>
+        // Note: It's different from vanilla Spark.
+        // Vanilla Spark builds the HashRelation at the driver side, so it is build-key sensitive.
+        // But we broadcast a byte array and build the HashRelation at the executor side,
+        // so the build keys are actually meaningless for the broadcast value.
+        // This change allows us to reuse the broadcast exchange for different build keys on the same table.
+        hash.copy(key = Seq.empty)
+      case _ => mode.canonicalized
+    }
+  }
+
   /**
    * * Expressions.
    */
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
index 9239f90e1..4c1baf856 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
@@ -19,7 +19,9 @@ package org.apache.gluten.execution
 import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.execution.InputIteratorTransformer
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec, 
InputIteratorTransformer}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, 
ReusedExchangeExec}
 
 class VeloxHashJoinSuite extends VeloxWholeStageTransformerSuite {
   override protected val resourcePath: String = "/tpch-data-parquet-velox"
@@ -107,4 +109,39 @@ class VeloxHashJoinSuite extends 
VeloxWholeStageTransformerSuite {
       }
     }
   }
+
+  test("Reuse broadcast exchange for different build keys with same table") {
+    withTable("t1", "t2") {
+      spark.sql("""
+                  |CREATE TABLE t1 USING PARQUET
+                  |AS SELECT id as c1, id as c2 FROM range(10)
+                  |""".stripMargin)
+
+      spark.sql("""
+                  |CREATE TABLE t2 USING PARQUET
+                  |AS SELECT id as c1, id as c2 FROM range(3)
+                  |""".stripMargin)
+
+      val df = spark.sql("""
+                           |SELECT * FROM t1
+                           |JOIN t2 as tmp1 ON t1.c1 = tmp1.c1 and tmp1.c1 = 
tmp1.c2
+                           |JOIN t2 as tmp2 on t1.c2 = tmp2.c2 and tmp2.c1 = 
tmp2.c2
+                           |""".stripMargin)
+
+      assert(collect(df.queryExecution.executedPlan) {
+        case b: BroadcastExchangeExec => b
+      }.size == 2)
+
+      checkAnswer(
+        df,
+        Row(2, 2, 2, 2, 2, 2) :: Row(1, 1, 1, 1, 1, 1) :: Row(0, 0, 0, 0, 0, 
0) :: Nil)
+
+      assert(collect(df.queryExecution.executedPlan) {
+        case b: ColumnarBroadcastExchangeExec => b
+      }.size == 1)
+      assert(collect(df.queryExecution.executedPlan) {
+        case r @ ReusedExchangeExec(_, _: ColumnarBroadcastExchangeExec) => r
+      }.size == 1)
+    }
+  }
 }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index b1ca34676..cfa1a4e53 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -350,6 +350,10 @@ trait SparkPlanExecApi {
       numOutputRows: SQLMetric,
       dataSize: SQLMetric): BuildSideRelation
 
+  def doCanonicalizeForBroadcastMode(mode: BroadcastMode): BroadcastMode = {
+    mode.canonicalized
+  }
+
   /** Create ColumnarWriteFilesExec */
   def createColumnarWriteFilesExec(
       child: SparkPlan,
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
 
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
index 34012ad09..df1c87cb0 100644
--- 
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
+++ 
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
@@ -128,7 +128,9 @@ case class ColumnarBroadcastExchangeExec(mode: 
BroadcastMode, child: SparkPlan)
   override def outputPartitioning: Partitioning = BroadcastPartitioning(mode)
 
   override def doCanonicalize(): SparkPlan = {
-    ColumnarBroadcastExchangeExec(mode.canonicalized, child.canonicalized)
+    val canonicalized =
+      
BackendsApiManager.getSparkPlanExecApiInstance.doCanonicalizeForBroadcastMode(mode)
+    ColumnarBroadcastExchangeExec(canonicalized, child.canonicalized)
   }
 
   override protected def doValidateInternal(): ValidationResult = {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to