This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 6058d36e85 [GLUTEN-7971][CH] Support using left side as the build 
table for the left anti/semi join (#7981)
6058d36e85 is described below

commit 6058d36e85563e7f17a24a930778be5157ab52ca
Author: Zhichao Zhang <[email protected]>
AuthorDate: Wed Nov 20 15:05:48 2024 +0800

    [GLUTEN-7971][CH] Support using left side as the build table for the left 
anti/semi join (#7981)
    
    * [GLUTEN-7971][CH] Support using left side as the build table for the left 
anti/semi join
    
    Vanilla Spark does not support the right anti/semi join, but the CH backend 
does. Based on the runtime statistics, `A left anti/semi join B` can be 
converted to `B right anti/semi join A` when AQE is on and the size of table A 
is smaller than that of table B.
    
    Close #7971.
---
 .../gluten/backendsapi/clickhouse/CHBackend.scala  | 11 +++
 .../execution/CHHashJoinExecTransformer.scala      | 17 +++--
 .../GlutenClickHouseTPCDSParquetAQESuite.scala     |  3 -
 ...kHouseTPCDSParquetColumnarShuffleAQESuite.scala | 79 +++++++++++++++++++++-
 ...lickHouseTPCDSParquetColumnarShuffleSuite.scala |  3 -
 .../tpcds/GlutenClickHouseTPCDSParquetSuite.scala  |  3 -
 .../tpch/GlutenClickHouseTPCHParquetAQESuite.scala | 18 +++++
 cpp-ch/local-engine/Common/CHUtil.cpp              |  4 ++
 .../org/apache/gluten/execution/JoinUtils.scala    |  3 +
 9 files changed, 124 insertions(+), 17 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 6e73ff6b29..966e953199 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -137,6 +137,10 @@ object CHBackendSettings extends BackendSettingsApi with 
Logging {
     CHConf.prefixOf("delta.metadata.optimize")
   val GLUTEN_CLICKHOUSE_DELTA_METADATA_OPTIMIZE_DEFAULT_VALUE: String = "true"
 
+  val GLUTEN_CLICKHOUSE_CONVERT_LEFT_ANTI_SEMI_TO_RIGHT: String =
+    CHConf.prefixOf("convert.left.anti_semi.to.right")
+  val GLUTEN_CLICKHOUSE_CONVERT_LEFT_ANTI_SEMI_TO_RIGHT_DEFAULT_VALUE: String 
= "false"
+
   def affinityMode: String = {
     SparkEnv.get.conf
       .get(
@@ -377,6 +381,13 @@ object CHBackendSettings extends BackendSettingsApi with 
Logging {
         true
       } else {
         t match {
+          case LeftAnti | LeftSemi
+              if (SQLConf.get
+                .getConfString(
+                  GLUTEN_CLICKHOUSE_CONVERT_LEFT_ANTI_SEMI_TO_RIGHT,
+                  
GLUTEN_CLICKHOUSE_CONVERT_LEFT_ANTI_SEMI_TO_RIGHT_DEFAULT_VALUE)
+                .toBoolean) =>
+            true
           case LeftOuter => true
           case _ => false
         }
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala
index 43f19c30e2..6bf2248ebe 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala
@@ -58,16 +58,23 @@ object JoinTypeTransform {
         } else {
           JoinRel.JoinType.JOIN_TYPE_RIGHT
         }
-      case LeftSemi | ExistenceJoin(_) =>
+      case LeftSemi =>
         if (!buildRight) {
-          throw new IllegalArgumentException("LeftSemi join should not switch 
children")
+          JoinRel.JoinType.JOIN_TYPE_RIGHT_SEMI
+        } else {
+          JoinRel.JoinType.JOIN_TYPE_LEFT_SEMI
         }
-        JoinRel.JoinType.JOIN_TYPE_LEFT_SEMI
       case LeftAnti =>
         if (!buildRight) {
-          throw new IllegalArgumentException("LeftAnti join should not switch 
children")
+          JoinRel.JoinType.JOIN_TYPE_RIGHT_ANTI
+        } else {
+          JoinRel.JoinType.JOIN_TYPE_LEFT_ANTI
+        }
+      case ExistenceJoin(_) =>
+        if (!buildRight) {
+          throw new IllegalArgumentException("Existence join should not switch 
children")
         }
-        JoinRel.JoinType.JOIN_TYPE_LEFT_ANTI
+        JoinRel.JoinType.JOIN_TYPE_LEFT_SEMI
       case _ =>
         // TODO: Support cross join with Cross Rel
         JoinRel.JoinType.UNRECOGNIZED
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetAQESuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetAQESuite.scala
index 389d617f10..52a72c4494 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetAQESuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetAQESuite.scala
@@ -35,9 +35,6 @@ class GlutenClickHouseTPCDSParquetAQESuite
       .set("spark.io.compression.codec", "snappy")
       .set("spark.sql.shuffle.partitions", "5")
       .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
-      // Currently, it can not support to read multiple partitioned file in 
one task.
-      //      .set("spark.sql.files.maxPartitionBytes", "134217728")
-      //      .set("spark.sql.files.openCostInBytes", "134217728")
       .set("spark.sql.adaptive.enabled", "true")
       .set("spark.memory.offHeap.size", "4g")
   }
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite.scala
index 3e965c67ea..e8fea04f2c 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite.scala
@@ -20,6 +20,8 @@ import org.apache.gluten.execution._
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.catalyst.expressions.DynamicPruningExpression
+import org.apache.spark.sql.catalyst.optimizer.BuildLeft
+import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
 import org.apache.spark.sql.execution.{ColumnarSubqueryBroadcastExec, 
ReusedSubqueryExec}
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, 
AdaptiveSparkPlanHelper}
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
@@ -35,9 +37,6 @@ class GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite
       .set("spark.io.compression.codec", "LZ4")
       .set("spark.sql.shuffle.partitions", "5")
       .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
-      // Currently, it can not support to read multiple partitioned file in 
one task.
-      //      .set("spark.sql.files.maxPartitionBytes", "134217728")
-      //      .set("spark.sql.files.openCostInBytes", "134217728")
       .set("spark.sql.adaptive.enabled", "true")
       .set("spark.memory.offHeap.size", "4g")
   }
@@ -265,4 +264,78 @@ class GlutenClickHouseTPCDSParquetColumnarShuffleAQESuite
         }
       })
   }
+
+  test("GLUTEN-7971: Support using left side as the build table for the left 
anti/semi join") {
+    withSQLConf(
+      ("spark.sql.autoBroadcastJoinThreshold", "-1"),
+      ("spark.gluten.sql.columnar.backend.ch.convert.left.anti_semi.to.right", 
"true")) {
+      val sql1 =
+        s"""
+           |select
+           |  cd_gender,
+           |  cd_marital_status,
+           |  cd_education_status,
+           |  count(*) cnt1
+           | from
+           |  customer c,customer_address ca,customer_demographics
+           | where
+           |  c.c_current_addr_sk = ca.ca_address_sk and
+           |  ca_county in ('Walker County','Richland County','Gaines 
County','Douglas County')
+           |  and cd_demo_sk = c.c_current_cdemo_sk and
+           |  exists (select *
+           |          from store_sales
+           |          where c.c_customer_sk = ss_customer_sk)
+           | group by cd_gender,
+           |          cd_marital_status,
+           |          cd_education_status
+           | order by cd_gender,
+           |          cd_marital_status,
+           |          cd_education_status
+           | LIMIT 100 ;
+           |""".stripMargin
+      runQueryAndCompare(sql1)(
+        df => {
+          
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
+          val shuffledHashJoinExecs = collect(df.queryExecution.executedPlan) {
+            case h: CHShuffledHashJoinExecTransformer if h.joinType == 
LeftSemi => h
+          }
+          assertResult(1)(shuffledHashJoinExecs.size)
+          assertResult(BuildLeft)(shuffledHashJoinExecs(0).buildSide)
+        })
+
+      val sql2 =
+        s"""
+           |select
+           |  cd_gender,
+           |  cd_marital_status,
+           |  cd_education_status,
+           |  count(*) cnt1
+           | from
+           |  customer c,customer_address ca,customer_demographics
+           | where
+           |  c.c_current_addr_sk = ca.ca_address_sk and
+           |  ca_county in ('Walker County','Richland County','Gaines 
County','Douglas County')
+           |  and cd_demo_sk = c.c_current_cdemo_sk and
+           |  not exists (select *
+           |          from store_sales
+           |          where c.c_customer_sk = ss_customer_sk)
+           | group by cd_gender,
+           |          cd_marital_status,
+           |          cd_education_status
+           | order by cd_gender,
+           |          cd_marital_status,
+           |          cd_education_status
+           | LIMIT 100 ;
+           |""".stripMargin
+      runQueryAndCompare(sql2)(
+        df => {
+          
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
+          val shuffledHashJoinExecs = collect(df.queryExecution.executedPlan) {
+            case h: CHShuffledHashJoinExecTransformer if h.joinType == 
LeftAnti => h
+          }
+          assertResult(1)(shuffledHashJoinExecs.size)
+          assertResult(BuildLeft)(shuffledHashJoinExecs(0).buildSide)
+        })
+    }
+  }
 }
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetColumnarShuffleSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetColumnarShuffleSuite.scala
index 4675de249c..24d8e9db60 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetColumnarShuffleSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetColumnarShuffleSuite.scala
@@ -32,9 +32,6 @@ class GlutenClickHouseTPCDSParquetColumnarShuffleSuite 
extends GlutenClickHouseT
       .set("spark.io.compression.codec", "LZ4")
       .set("spark.sql.shuffle.partitions", "5")
       .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
-      // Currently, it can not support to read multiple partitioned file in 
one task.
-      //      .set("spark.sql.files.maxPartitionBytes", "134217728")
-      //      .set("spark.sql.files.openCostInBytes", "134217728")
       .set("spark.memory.offHeap.size", "4g")
     // .set("spark.sql.planChangeLog.level", "error")
   }
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetSuite.scala
index 08f4522d9c..aa7c6b0f56 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetSuite.scala
@@ -35,9 +35,6 @@ class GlutenClickHouseTPCDSParquetSuite extends 
GlutenClickHouseTPCDSAbstractSui
       .set("spark.io.compression.codec", "snappy")
       .set("spark.sql.shuffle.partitions", "5")
       .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
-      // Currently, it can not support to read multiple partitioned file in 
one task.
-      //      .set("spark.sql.files.maxPartitionBytes", "134217728")
-      //      .set("spark.sql.files.openCostInBytes", "134217728")
       .set("spark.memory.offHeap.size", "4g")
       .set("spark.gluten.sql.validation.logLevel", "ERROR")
       .set("spark.gluten.sql.validation.printStackOnFailure", "true")
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetAQESuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetAQESuite.scala
index af72ba84a6..1c627140b6 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetAQESuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetAQESuite.scala
@@ -20,6 +20,7 @@ import org.apache.gluten.execution._
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.catalyst.optimizer.BuildLeft
+import org.apache.spark.sql.catalyst.plans.LeftSemi
 import org.apache.spark.sql.execution.{ReusedSubqueryExec, SubqueryExec}
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, 
AdaptiveSparkPlanHelper}
 
@@ -213,6 +214,23 @@ class GlutenClickHouseTPCHParquetAQESuite
     runTPCHQuery(21) { df => }
   }
 
+  test(
+    "TPCH Q21 with GLUTEN-7971: Support using left side as the build table for 
the left anti/semi join") {
+    withSQLConf(
+      ("spark.sql.autoBroadcastJoinThreshold", "-1"),
+      ("spark.gluten.sql.columnar.backend.ch.convert.left.anti_semi.to.right", 
"true")) {
+      runTPCHQuery(21, compareResult = false) {
+        df =>
+          
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
+          val shuffledHashJoinExecs = collect(df.queryExecution.executedPlan) {
+            case h: CHShuffledHashJoinExecTransformer if h.joinType == 
LeftSemi => h
+          }
+          assertResult(1)(shuffledHashJoinExecs.size)
+          assertResult(BuildLeft)(shuffledHashJoinExecs(0).buildSide)
+      }
+    }
+  }
+
   test("TPCH Q22") {
     runTPCHQuery(22) {
       df =>
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp 
b/cpp-ch/local-engine/Common/CHUtil.cpp
index 6e907266ff..03df93c851 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -1081,8 +1081,12 @@ 
JoinUtil::getJoinKindAndStrictness(substrait::JoinRel_JoinType join_type, bool i
                 return {DB::JoinKind::Left, DB::JoinStrictness::Any};
             return {DB::JoinKind::Left, DB::JoinStrictness::Semi};
         }
+        case substrait::JoinRel_JoinType_JOIN_TYPE_RIGHT_SEMI:
+            return {DB::JoinKind::Right, DB::JoinStrictness::Semi};
         case substrait::JoinRel_JoinType_JOIN_TYPE_LEFT_ANTI:
             return {DB::JoinKind::Left, DB::JoinStrictness::Anti};
+        case substrait::JoinRel_JoinType_JOIN_TYPE_RIGHT_ANTI:
+            return {DB::JoinKind::Right, DB::JoinStrictness::Anti};
         case substrait::JoinRel_JoinType_JOIN_TYPE_LEFT:
             return {DB::JoinKind::Left, DB::JoinStrictness::All};
         case substrait::JoinRel_JoinType_JOIN_TYPE_RIGHT:
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinUtils.scala 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinUtils.scala
index dae5d51af8..303c9e818f 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinUtils.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinUtils.scala
@@ -265,6 +265,9 @@ object JoinUtils {
         case _: ExistenceJoin =>
           inputBuildOutput.indices.map(ExpressionBuilder.makeSelection(_)) :+
             ExpressionBuilder.makeSelection(buildOutput.size)
+        case LeftSemi | LeftAnti =>
+          // When the left semi/anti join support the BuildLeft
+          leftOutput.indices.map(idx => ExpressionBuilder.makeSelection(idx + 
streamedOutput.size))
         case LeftExistence(_) =>
           leftOutput.indices.map(ExpressionBuilder.makeSelection(_))
         case _ =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to