This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 6058d36e85 [GLUTEN-7971][CH] Support using left side as the build
table for the left anti/semi join (#7981)
6058d36e85 is described below
commit 6058d36e85563e7f17a24a930778be5157ab52ca
Author: Zhichao Zhang <[email protected]>
AuthorDate: Wed Nov 20 15:05:48 2024 +0800
[GLUTEN-7971][CH] Support using left side as the build table for the left
anti/semi join (#7981)
* [GLUTEN-7971][CH] Support using left side as the build table for the left
anti/semi join
Vanilla Spark does not currently support right anti/semi joins, but the CH backend
does. Based on runtime statistics, when AQE is enabled and table A is smaller
than table B, `A LEFT ANTI/SEMI JOIN B` can be converted to
`B RIGHT ANTI/SEMI JOIN A`, so that the smaller table A is used as the build side.
Closes #7971.
---
.../gluten/backendsapi/clickhouse/CHBackend.scala | 11 +++
.../execution/CHHashJoinExecTransformer.scala | 17 +++--
.../GlutenClickHouseTPCDSParquetAQESuite.scala | 3 -
...kHouseTPCDSParquetColumnarShuffleAQESuite.scala | 79 +++++++++++++++++++++-
...lickHouseTPCDSParquetColumnarShuffleSuite.scala | 3 -
.../tpcds/GlutenClickHouseTPCDSParquetSuite.scala | 3 -
.../tpch/GlutenClickHouseTPCHParquetAQESuite.scala | 18 +++++
cpp-ch/local-engine/Common/CHUtil.cpp | 4 ++
.../org/apache/gluten/execution/JoinUtils.scala | 3 +
9 files changed, 124 insertions(+), 17 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 6e73ff6b29..966e953199 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -137,6 +137,10 @@ object CHBackendSettings extends BackendSettingsApi with
Logging {
CHConf.prefixOf("delta.metadata.optimize")
val GLUTEN_CLICKHOUSE_DELTA_METADATA_OPTIMIZE_DEFAULT_VALUE: String = "true"
+ val GLUTEN_CLICKHOUSE_CONVERT_LEFT_ANTI_SEMI_TO_RIGHT: String =
+ CHConf.prefixOf("convert.left.anti_semi.to.right")
+ val GLUTEN_CLICKHOUSE_CONVERT_LEFT_ANTI_SEMI_TO_RIGHT_DEFAULT_VALUE: String
= "false"
+
def affinityMode: String = {
SparkEnv.get.conf
.get(
@@ -377,6 +381,13 @@ object CHBackendSettings extends BackendSettingsApi with
Logging {
true
} else {
t match {
+ case LeftAnti | LeftSemi
+ if (SQLConf.get
+ .getConfString(
+ GLUTEN_CLICKHOUSE_CONVERT_LEFT_ANTI_SEMI_TO_RIGHT,
+
GLUTEN_CLICKHOUSE_CONVERT_LEFT_ANTI_SEMI_TO_RIGHT_DEFAULT_VALUE)
+ .toBoolean) =>
+ true
case LeftOuter => true
case _ => false
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala
index 43f19c30e2..6bf2248ebe 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala
@@ -58,16 +58,23 @@ object JoinTypeTransform {
} else {
JoinRel.JoinType.JOIN_TYPE_RIGHT
}
- case LeftSemi | ExistenceJoin(_) =>
+ case LeftSemi =>
if (!buildRight) {
- throw new IllegalArgumentException("LeftSemi join should not switch
children")
+ JoinRel.JoinType.JOIN_TYPE_RIGHT_SEMI
+ } else {
+ JoinRel.JoinType.JOIN_TYPE_LEFT_SEMI
}
- JoinRel.JoinType.JOIN_TYPE_LEFT_SEMI
case LeftAnti =>
if (!buildRight) {
- throw new IllegalArgumentException("LeftAnti join should not switch
children")
+ JoinRel.JoinType.JOIN_TYPE_RIGHT_ANTI
+ } else {
+ JoinRel.JoinType.JOIN_TYPE_LEFT_ANTI
+ }
+ case ExistenceJoin(_) =>
+ if (!buildRight) {
+ throw new IllegalArgumentException("Existence join should not switch
children")
}
- JoinRel.JoinType.JOIN_TYPE_LEFT_ANTI
+ JoinRel.JoinType.JOIN_TYPE_LEFT_SEMI
case _ =>
// TODO: Support cross join with Cross Rel
JoinRel.JoinType.UNRECOGNIZED
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetAQESuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetAQESuite.scala
index 389d617f10..52a72c4494 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetAQESuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetAQESuite.scala
@@ -35,9 +35,6 @@ class GlutenClickHouseTPCDSParquetAQESuite
.set("spark.io.compression.codec", "snappy")
.set("spark.sql.shuffle.partitions", "5")
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
- // Currently, it can not support to read multiple partitioned file in
one task.
- // .set("spark.sql.files.maxPartitionBytes", "134217728")
- // .set("spark.sql.files.openCostInBytes", "134217728")
.set("spark.sql.adaptive.enabled", "true")
.set("spark.memory.offHeap.size", "4g")
}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite.scala
index 3e965c67ea..e8fea04f2c 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite.scala
@@ -20,6 +20,8 @@ import org.apache.gluten.execution._
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.expressions.DynamicPruningExpression
+import org.apache.spark.sql.catalyst.optimizer.BuildLeft
+import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
import org.apache.spark.sql.execution.{ColumnarSubqueryBroadcastExec,
ReusedSubqueryExec}
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec,
AdaptiveSparkPlanHelper}
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
@@ -35,9 +37,6 @@ class GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite
.set("spark.io.compression.codec", "LZ4")
.set("spark.sql.shuffle.partitions", "5")
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
- // Currently, it can not support to read multiple partitioned file in
one task.
- // .set("spark.sql.files.maxPartitionBytes", "134217728")
- // .set("spark.sql.files.openCostInBytes", "134217728")
.set("spark.sql.adaptive.enabled", "true")
.set("spark.memory.offHeap.size", "4g")
}
@@ -265,4 +264,78 @@ class GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite
}
})
}
+
+ test("GLUTEN-7971: Support using left side as the build table for the left
anti/semi join") {
+ withSQLConf(
+ ("spark.sql.autoBroadcastJoinThreshold", "-1"),
+ ("spark.gluten.sql.columnar.backend.ch.convert.left.anti_semi.to.right",
"true")) {
+ val sql1 =
+ s"""
+ |select
+ | cd_gender,
+ | cd_marital_status,
+ | cd_education_status,
+ | count(*) cnt1
+ | from
+ | customer c,customer_address ca,customer_demographics
+ | where
+ | c.c_current_addr_sk = ca.ca_address_sk and
+ | ca_county in ('Walker County','Richland County','Gaines
County','Douglas County')
+ | and cd_demo_sk = c.c_current_cdemo_sk and
+ | exists (select *
+ | from store_sales
+ | where c.c_customer_sk = ss_customer_sk)
+ | group by cd_gender,
+ | cd_marital_status,
+ | cd_education_status
+ | order by cd_gender,
+ | cd_marital_status,
+ | cd_education_status
+ | LIMIT 100 ;
+ |""".stripMargin
+ runQueryAndCompare(sql1)(
+ df => {
+
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
+ val shuffledHashJoinExecs = collect(df.queryExecution.executedPlan) {
+ case h: CHShuffledHashJoinExecTransformer if h.joinType ==
LeftSemi => h
+ }
+ assertResult(1)(shuffledHashJoinExecs.size)
+ assertResult(BuildLeft)(shuffledHashJoinExecs(0).buildSide)
+ })
+
+ val sql2 =
+ s"""
+ |select
+ | cd_gender,
+ | cd_marital_status,
+ | cd_education_status,
+ | count(*) cnt1
+ | from
+ | customer c,customer_address ca,customer_demographics
+ | where
+ | c.c_current_addr_sk = ca.ca_address_sk and
+ | ca_county in ('Walker County','Richland County','Gaines
County','Douglas County')
+ | and cd_demo_sk = c.c_current_cdemo_sk and
+ | not exists (select *
+ | from store_sales
+ | where c.c_customer_sk = ss_customer_sk)
+ | group by cd_gender,
+ | cd_marital_status,
+ | cd_education_status
+ | order by cd_gender,
+ | cd_marital_status,
+ | cd_education_status
+ | LIMIT 100 ;
+ |""".stripMargin
+ runQueryAndCompare(sql2)(
+ df => {
+
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
+ val shuffledHashJoinExecs = collect(df.queryExecution.executedPlan) {
+ case h: CHShuffledHashJoinExecTransformer if h.joinType ==
LeftAnti => h
+ }
+ assertResult(1)(shuffledHashJoinExecs.size)
+ assertResult(BuildLeft)(shuffledHashJoinExecs(0).buildSide)
+ })
+ }
+ }
}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetColumnarShuffleSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetColumnarShuffleSuite.scala
index 4675de249c..24d8e9db60 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetColumnarShuffleSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetColumnarShuffleSuite.scala
@@ -32,9 +32,6 @@ class GlutenClickHouseTPCDSParquetColumnarShuffleSuite
extends GlutenClickHouseT
.set("spark.io.compression.codec", "LZ4")
.set("spark.sql.shuffle.partitions", "5")
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
- // Currently, it can not support to read multiple partitioned file in
one task.
- // .set("spark.sql.files.maxPartitionBytes", "134217728")
- // .set("spark.sql.files.openCostInBytes", "134217728")
.set("spark.memory.offHeap.size", "4g")
// .set("spark.sql.planChangeLog.level", "error")
}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetSuite.scala
index 08f4522d9c..aa7c6b0f56 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetSuite.scala
@@ -35,9 +35,6 @@ class GlutenClickHouseTPCDSParquetSuite extends
GlutenClickHouseTPCDSAbstractSui
.set("spark.io.compression.codec", "snappy")
.set("spark.sql.shuffle.partitions", "5")
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
- // Currently, it can not support to read multiple partitioned file in
one task.
- // .set("spark.sql.files.maxPartitionBytes", "134217728")
- // .set("spark.sql.files.openCostInBytes", "134217728")
.set("spark.memory.offHeap.size", "4g")
.set("spark.gluten.sql.validation.logLevel", "ERROR")
.set("spark.gluten.sql.validation.printStackOnFailure", "true")
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetAQESuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetAQESuite.scala
index af72ba84a6..1c627140b6 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetAQESuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetAQESuite.scala
@@ -20,6 +20,7 @@ import org.apache.gluten.execution._
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.optimizer.BuildLeft
+import org.apache.spark.sql.catalyst.plans.LeftSemi
import org.apache.spark.sql.execution.{ReusedSubqueryExec, SubqueryExec}
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec,
AdaptiveSparkPlanHelper}
@@ -213,6 +214,23 @@ class GlutenClickHouseTPCHParquetAQESuite
runTPCHQuery(21) { df => }
}
+ test(
+ "TPCH Q21 with GLUTEN-7971: Support using left side as the build table for
the left anti/semi join") {
+ withSQLConf(
+ ("spark.sql.autoBroadcastJoinThreshold", "-1"),
+ ("spark.gluten.sql.columnar.backend.ch.convert.left.anti_semi.to.right",
"true")) {
+ runTPCHQuery(21, compareResult = false) {
+ df =>
+
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
+ val shuffledHashJoinExecs = collect(df.queryExecution.executedPlan) {
+ case h: CHShuffledHashJoinExecTransformer if h.joinType ==
LeftSemi => h
+ }
+ assertResult(1)(shuffledHashJoinExecs.size)
+ assertResult(BuildLeft)(shuffledHashJoinExecs(0).buildSide)
+ }
+ }
+ }
+
test("TPCH Q22") {
runTPCHQuery(22) {
df =>
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp
b/cpp-ch/local-engine/Common/CHUtil.cpp
index 6e907266ff..03df93c851 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -1081,8 +1081,12 @@
JoinUtil::getJoinKindAndStrictness(substrait::JoinRel_JoinType join_type, bool i
return {DB::JoinKind::Left, DB::JoinStrictness::Any};
return {DB::JoinKind::Left, DB::JoinStrictness::Semi};
}
+ case substrait::JoinRel_JoinType_JOIN_TYPE_RIGHT_SEMI:
+ return {DB::JoinKind::Right, DB::JoinStrictness::Semi};
case substrait::JoinRel_JoinType_JOIN_TYPE_LEFT_ANTI:
return {DB::JoinKind::Left, DB::JoinStrictness::Anti};
+ case substrait::JoinRel_JoinType_JOIN_TYPE_RIGHT_ANTI:
+ return {DB::JoinKind::Right, DB::JoinStrictness::Anti};
case substrait::JoinRel_JoinType_JOIN_TYPE_LEFT:
return {DB::JoinKind::Left, DB::JoinStrictness::All};
case substrait::JoinRel_JoinType_JOIN_TYPE_RIGHT:
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinUtils.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinUtils.scala
index dae5d51af8..303c9e818f 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinUtils.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinUtils.scala
@@ -265,6 +265,9 @@ object JoinUtils {
case _: ExistenceJoin =>
inputBuildOutput.indices.map(ExpressionBuilder.makeSelection(_)) :+
ExpressionBuilder.makeSelection(buildOutput.size)
+ case LeftSemi | LeftAnti =>
+ // When the left semi/anti join support the BuildLeft
+ leftOutput.indices.map(idx => ExpressionBuilder.makeSelection(idx +
streamedOutput.size))
case LeftExistence(_) =>
leftOutput.indices.map(ExpressionBuilder.makeSelection(_))
case _ =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]