This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 318df2415 [GLUTEN-6463][CH] Enable Cartesian product (#6510)
318df2415 is described below

commit 318df2415c69c6c912e25ce425b0e789e7393f37
Author: lgbo <[email protected]>
AuthorDate: Fri Jul 19 18:28:41 2024 +0800

    [GLUTEN-6463][CH] Enable Cartesian product (#6510)
    
    [CH] Enable Cartesian product
---
 .../gluten/backendsapi/clickhouse/CHBackend.scala  |  2 +
 .../clickhouse/CHSparkPlanExecApi.scala            | 11 +++-
 .../CHBroadcastNestedLoopJoinExecTransformer.scala |  3 +
 .../GlutenClickHouseTPCDSAbstractSuite.scala       | 10 ----
 ...nClickHouseTPCDSParquetSortMergeJoinSuite.scala |  2 +
 .../GlutenClickHouseTPCHSaltNullParquetSuite.scala | 12 ++++
 cpp-ch/local-engine/Parser/CrossRelParser.cpp      | 70 ++++++++++++++--------
 cpp-ch/local-engine/Parser/JoinRelParser.cpp       |  4 +-
 8 files changed, 76 insertions(+), 38 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 07129e69a..320483beb 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -290,4 +290,6 @@ object CHBackendSettings extends BackendSettingsApi with 
Logging {
 
   override def mergeTwoPhasesHashBaseAggregateIfNeed(): Boolean = true
 
+  override def supportCartesianProductExec(): Boolean = true
+
 }
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 920c61cd4..32f372956 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -362,8 +362,15 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
       left: SparkPlan,
       right: SparkPlan,
       condition: Option[Expression]): CartesianProductExecTransformer =
-    throw new GlutenNotSupportException(
-      "CartesianProductExecTransformer is not supported in ch backend.")
+    if (!condition.isEmpty) {
+      throw new GlutenNotSupportException(
+        "CartesianProductExecTransformer with condition is not supported in ch 
backend.")
+    } else {
+      CartesianProductExecTransformer(
+        ColumnarCartesianProductBridge(left),
+        ColumnarCartesianProductBridge(right),
+        condition)
+    }
 
   override def genBroadcastNestedLoopJoinExecTransformer(
       left: SparkPlan,
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHBroadcastNestedLoopJoinExecTransformer.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHBroadcastNestedLoopJoinExecTransformer.scala
index 9c0f41361..d1dc76045 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHBroadcastNestedLoopJoinExecTransformer.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHBroadcastNestedLoopJoinExecTransformer.scala
@@ -83,6 +83,9 @@ case class CHBroadcastNestedLoopJoinExecTransformer(
     // for ch
     val joinParametersStr = new StringBuffer("JoinParameters:")
     joinParametersStr
+      .append("isBHJ=")
+      .append(1)
+      .append("\n")
       .append("buildHashTableId=")
       .append(buildBroadcastTableId)
       .append("\n")
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSAbstractSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSAbstractSuite.scala
index ccde594df..f0712bf5a 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSAbstractSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSAbstractSuite.scala
@@ -65,16 +65,6 @@ abstract class GlutenClickHouseTPCDSAbstractSuite
               // Q45 BroadcastHashJoin, ExistenceJoin
               // Q94 BroadcastHashJoin, LeftSemi, NOT condition
               (false, false)
-            case j if j == 38 || j == 87 =>
-              // Q38 and Q87 : Hash shuffle is not supported for expression in 
some case
-              if (isAqe) {
-                (true, true)
-              } else {
-                (false, true)
-              }
-            case q77 if q77 == 77 && !isAqe =>
-              // Q77 CartesianProduct
-              (false, false)
             case other => (true, false)
           }
           sqlNums.map((_, noFallBack._1, noFallBack._2))
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetSortMergeJoinSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetSortMergeJoinSuite.scala
index 54734b72d..3f7816cb8 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetSortMergeJoinSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetSortMergeJoinSuite.scala
@@ -29,10 +29,12 @@ class GlutenClickHouseTPCDSParquetSortMergeJoinSuite 
extends GlutenClickHouseTPC
     "q14b",
     "q23a",
     "q23b",
+    "q38",
     "q51",
     "q69",
     "q70",
     "q78",
+    "q87",
     "q95",
     "q97"
   ) ++ super.excludedTpcdsQueries
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSaltNullParquetSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSaltNullParquetSuite.scala
index 36dd3778c..6d5c9c463 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSaltNullParquetSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSaltNullParquetSuite.scala
@@ -2769,5 +2769,17 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends 
GlutenClickHouseTPCHAbstr
 
     spark.sql("drop table tb_date")
   }
+
+  test("test CartesianProductExec") {
+    withSQLConf(("spark.sql.autoBroadcastJoinThreshold", "-1")) {
+      val sql = """
+                  |select t1.n_regionkey, t2.n_regionkey from
+                  |(select n_regionkey from nation) t1
+                  |cross join
+                  |(select n_regionkey from nation) t2
+                  |""".stripMargin
+      compareResultsAgainstVanillaSpark(sql, true, { _ => })
+    }
+  }
 }
 // scalastyle:on line.size.limit
diff --git a/cpp-ch/local-engine/Parser/CrossRelParser.cpp 
b/cpp-ch/local-engine/Parser/CrossRelParser.cpp
index 4405a57cb..9d6252f66 100644
--- a/cpp-ch/local-engine/Parser/CrossRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/CrossRelParser.cpp
@@ -52,7 +52,6 @@ using namespace DB;
 
 namespace local_engine
 {
-
 std::shared_ptr<DB::TableJoin> 
createCrossTableJoin(substrait::CrossRel_JoinType join_type)
 {
     auto & global_context = SerializedPlanParser::global_context;
@@ -148,16 +147,19 @@ DB::QueryPlanPtr CrossRelParser::parseJoin(const 
substrait::CrossRel & join, DB:
     
optimization_info.ParseFromString(join.advanced_extension().optimization().value());
     auto join_opt_info = 
JoinOptimizationInfo::parse(optimization_info.value());
     const auto & storage_join_key = join_opt_info.storage_join_key;
-    auto storage_join = BroadCastJoinBuilder::getJoin(storage_join_key) ;
-    renamePlanColumns(*left, *right, *storage_join);
+    auto storage_join = join_opt_info.is_broadcast ? 
BroadCastJoinBuilder::getJoin(storage_join_key) : nullptr;
+    if (storage_join)
+        renamePlanColumns(*left, *right, *storage_join);
     auto table_join = createCrossTableJoin(join.type());
     DB::Block right_header_before_convert_step = 
right->getCurrentDataStream().header;
     addConvertStep(*table_join, *left, *right);
 
     // Add a check to find error easily.
-    if(!blocksHaveEqualStructure(right_header_before_convert_step, 
right->getCurrentDataStream().header))
+    if (!blocksHaveEqualStructure(right_header_before_convert_step, 
right->getCurrentDataStream().header))
     {
-        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "For broadcast 
join, we must not change the columns name in the right table.\nleft 
header:{},\nright header: {} -> {}",
+        throw DB::Exception(
+            DB::ErrorCodes::LOGICAL_ERROR,
+            "For broadcast join, we must not change the columns name in the 
right table.\nleft header:{},\nright header: {} -> {}",
             left->getCurrentDataStream().header.dumpNames(),
             right_header_before_convert_step.dumpNames(),
             right->getCurrentDataStream().header.dumpNames());
@@ -173,28 +175,48 @@ DB::QueryPlanPtr CrossRelParser::parseJoin(const 
substrait::CrossRel & join, DB:
     auto right_header = right->getCurrentDataStream().header;
 
     QueryPlanPtr query_plan;
-    table_join->addDisjunct();
-    auto broadcast_hash_join = storage_join->getJoinLocked(table_join, 
context);
-    // table_join->resetKeys();
-    QueryPlanStepPtr join_step = 
std::make_unique<FilledJoinStep>(left->getCurrentDataStream(), 
broadcast_hash_join, 8192);
-
-    join_step->setStepDescription("STORAGE_JOIN");
-    steps.emplace_back(join_step.get());
-    left->addStep(std::move(join_step));
-    query_plan = std::move(left);
-    /// hold right plan for profile
-    extra_plan_holder.emplace_back(std::move(right));
-
-    addPostFilter(*query_plan, join);
-    Names cols;
-    for (auto after_join_name : after_join_names)
+    if (storage_join)
     {
-        if (BlockUtil::VIRTUAL_ROW_COUNT_COLUMN == after_join_name)
-            continue;
+        /// FIXME: There is mistake in 
HashJoin::needUsedFlagsForPerRightTableRow which returns true when
+        /// join clauses is empty. But in fact there should not be any join 
clause in cross join.
+        table_join->addDisjunct();
+
+        auto broadcast_hash_join = storage_join->getJoinLocked(table_join, 
context);
+        // table_join->resetKeys();
+        QueryPlanStepPtr join_step = 
std::make_unique<FilledJoinStep>(left->getCurrentDataStream(), 
broadcast_hash_join, 8192);
+
+        join_step->setStepDescription("STORAGE_JOIN");
+        steps.emplace_back(join_step.get());
+        left->addStep(std::move(join_step));
+        query_plan = std::move(left);
+        /// hold right plan for profile
+        extra_plan_holder.emplace_back(std::move(right));
+
+        addPostFilter(*query_plan, join);
+        Names cols;
+        for (auto after_join_name : after_join_names)
+        {
+            if (BlockUtil::VIRTUAL_ROW_COUNT_COLUMN == after_join_name)
+                continue;
 
-        cols.emplace_back(after_join_name);
+            cols.emplace_back(after_join_name);
+        }
+        JoinUtil::reorderJoinOutput(*query_plan, cols);
+    }
+    else
+    {
+        JoinPtr hash_join = std::make_shared<HashJoin>(table_join, 
right->getCurrentDataStream().header.cloneEmpty());
+        QueryPlanStepPtr join_step = 
std::make_unique<DB::JoinStep>(left->getCurrentDataStream(), 
right->getCurrentDataStream(), hash_join, 8192, 1, false);
+        join_step->setStepDescription("CROSS_JOIN");
+        steps.emplace_back(join_step.get());
+        std::vector<QueryPlanPtr> plans;
+        plans.emplace_back(std::move(left));
+        plans.emplace_back(std::move(right));
+
+        query_plan = std::make_unique<QueryPlan>();
+        query_plan->unitePlans(std::move(join_step), {std::move(plans)});
+        JoinUtil::reorderJoinOutput(*query_plan, after_join_names);
     }
-    JoinUtil::reorderJoinOutput(*query_plan, cols);
 
     return query_plan;
 }
diff --git a/cpp-ch/local-engine/Parser/JoinRelParser.cpp 
b/cpp-ch/local-engine/Parser/JoinRelParser.cpp
index 1141f4796..09a515217 100644
--- a/cpp-ch/local-engine/Parser/JoinRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/JoinRelParser.cpp
@@ -261,8 +261,6 @@ DB::QueryPlanPtr JoinRelParser::parseJoin(const 
substrait::JoinRel & join, DB::Q
 
     QueryPlanPtr query_plan;
 
-    /// Support only one join clause.
-    table_join->addDisjunct();
     /// some examples to explain when the post_join_filter is not empty
     /// - on t1.key = t2.key and t1.v1 > 1 and t2.v1 > 1, 't1.v1> 1' is in the 
 post filter. but 't2.v1 > 1'
     ///   will be pushed down into right table by spark and is not in the post 
filter. 't1.key = t2.key ' is
@@ -430,6 +428,8 @@ void JoinRelParser::collectJoinKeys(
 {
     if (!join_rel.has_expression())
         return;
+    /// Support only one join clause.
+    table_join.addDisjunct();
     const auto & expr = join_rel.expression();
     auto & join_clause = table_join.getClauses().back();
     std::list<const const substrait::Expression *> expressions_stack;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to