This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 7dad958e4 [CORE] Reuse broadcast exchange for different build keys
with same table (#5563)
7dad958e4 is described below
commit 7dad958e4e87cde85683ae03dddafd84c6f7408a
Author: Xiduo You <[email protected]>
AuthorDate: Mon Apr 29 12:56:48 2024 +0800
[CORE] Reuse broadcast exchange for different build keys with same table
(#5563)
Vanilla Spark builds the HashRelation at the driver side, so it is build-keys
sensitive. But we broadcast a byte array and build the HashRelation at the executor side,
so the build keys are actually meaningless for the broadcast value.
This PR erases the HashedRelationBroadcastMode build keys when doing
canonicalization. This change allows us to reuse the broadcast exchange for different
build keys with the same table.
---
.../backendsapi/velox/VeloxSparkPlanExecApi.scala | 15 ++++++++-
.../gluten/execution/VeloxHashJoinSuite.scala | 39 +++++++++++++++++++++-
.../gluten/backendsapi/SparkPlanExecApi.scala | 4 +++
.../execution/ColumnarBroadcastExchangeExec.scala | 4 ++-
4 files changed, 59 insertions(+), 3 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 132b5d7dd..8318ac2d5 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -50,7 +50,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec,
ShuffleExchangeExec}
-import org.apache.spark.sql.execution.joins.BuildSideRelation
+import org.apache.spark.sql.execution.joins.{BuildSideRelation,
HashedRelationBroadcastMode}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.utils.ExecUtil
import org.apache.spark.sql.expression.{UDFExpression, UDFResolver,
UserDefinedAggregateFunction}
@@ -577,6 +577,19 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
ColumnarBuildSideRelation(child.output, serialized.map(_.getSerialized))
}
+ override def doCanonicalizeForBroadcastMode(mode: BroadcastMode):
BroadcastMode = {
+ mode match {
+ case hash: HashedRelationBroadcastMode =>
+ // Note: It's different from vanilla Spark.
+ // Vanilla Spark build HashRelation at driver side, so it is build
keys sensitive.
+ // But we broadcast byte array and build HashRelation at executor side,
+ // the build keys are actually meaningless for the broadcast value.
+ // This change allows us reuse broadcast exchange for different build
keys with same table.
+ hash.copy(key = Seq.empty)
+ case _ => mode.canonicalized
+ }
+ }
+
/**
* * Expressions.
*/
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
index 9239f90e1..4c1baf856 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxHashJoinSuite.scala
@@ -19,7 +19,9 @@ package org.apache.gluten.execution
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.SparkConf
-import org.apache.spark.sql.execution.InputIteratorTransformer
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec,
InputIteratorTransformer}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec,
ReusedExchangeExec}
class VeloxHashJoinSuite extends VeloxWholeStageTransformerSuite {
override protected val resourcePath: String = "/tpch-data-parquet-velox"
@@ -107,4 +109,39 @@ class VeloxHashJoinSuite extends
VeloxWholeStageTransformerSuite {
}
}
}
+
+ test("Reuse broadcast exchange for different build keys with same table") {
+ withTable("t1", "t2") {
+ spark.sql("""
+ |CREATE TABLE t1 USING PARQUET
+ |AS SELECT id as c1, id as c2 FROM range(10)
+ |""".stripMargin)
+
+ spark.sql("""
+ |CREATE TABLE t2 USING PARQUET
+ |AS SELECT id as c1, id as c2 FROM range(3)
+ |""".stripMargin)
+
+ val df = spark.sql("""
+ |SELECT * FROM t1
+ |JOIN t2 as tmp1 ON t1.c1 = tmp1.c1 and tmp1.c1 =
tmp1.c2
+ |JOIN t2 as tmp2 on t1.c2 = tmp2.c2 and tmp2.c1 =
tmp2.c2
+ |""".stripMargin)
+
+ assert(collect(df.queryExecution.executedPlan) {
+ case b: BroadcastExchangeExec => b
+ }.size == 2)
+
+ checkAnswer(
+ df,
+ Row(2, 2, 2, 2, 2, 2) :: Row(1, 1, 1, 1, 1, 1) :: Row(0, 0, 0, 0, 0,
0) :: Nil)
+
+ assert(collect(df.queryExecution.executedPlan) {
+ case b: ColumnarBroadcastExchangeExec => b
+ }.size == 1)
+ assert(collect(df.queryExecution.executedPlan) {
+ case r @ ReusedExchangeExec(_, _: ColumnarBroadcastExchangeExec) => r
+ }.size == 1)
+ }
+ }
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index b1ca34676..cfa1a4e53 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -350,6 +350,10 @@ trait SparkPlanExecApi {
numOutputRows: SQLMetric,
dataSize: SQLMetric): BuildSideRelation
+ def doCanonicalizeForBroadcastMode(mode: BroadcastMode): BroadcastMode = {
+ mode.canonicalized
+ }
+
/** Create ColumnarWriteFilesExec */
def createColumnarWriteFilesExec(
child: SparkPlan,
diff --git
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
index 34012ad09..df1c87cb0 100644
---
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
+++
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
@@ -128,7 +128,9 @@ case class ColumnarBroadcastExchangeExec(mode:
BroadcastMode, child: SparkPlan)
override def outputPartitioning: Partitioning = BroadcastPartitioning(mode)
override def doCanonicalize(): SparkPlan = {
- ColumnarBroadcastExchangeExec(mode.canonicalized, child.canonicalized)
+ val canonicalized =
+
BackendsApiManager.getSparkPlanExecApiInstance.doCanonicalizeForBroadcastMode(mode)
+ ColumnarBroadcastExchangeExec(canonicalized, child.canonicalized)
}
override protected def doValidateInternal(): ValidationResult = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]