This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 318df2415 [GLUTEN-6463][CH] Enable cartesian product (#6510)
318df2415 is described below
commit 318df2415c69c6c912e25ce425b0e789e7393f37
Author: lgbo <[email protected]>
AuthorDate: Fri Jul 19 18:28:41 2024 +0800
[GLUTEN-6463][CH] Enable cartesian product (#6510)
[CH] Enable cartesian product
---
.../gluten/backendsapi/clickhouse/CHBackend.scala | 2 +
.../clickhouse/CHSparkPlanExecApi.scala | 11 +++-
.../CHBroadcastNestedLoopJoinExecTransformer.scala | 3 +
.../GlutenClickHouseTPCDSAbstractSuite.scala | 10 ----
...nClickHouseTPCDSParquetSortMergeJoinSuite.scala | 2 +
.../GlutenClickHouseTPCHSaltNullParquetSuite.scala | 12 ++++
cpp-ch/local-engine/Parser/CrossRelParser.cpp | 70 ++++++++++++++--------
cpp-ch/local-engine/Parser/JoinRelParser.cpp | 4 +-
8 files changed, 76 insertions(+), 38 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 07129e69a..320483beb 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -290,4 +290,6 @@ object CHBackendSettings extends BackendSettingsApi with
Logging {
override def mergeTwoPhasesHashBaseAggregateIfNeed(): Boolean = true
+ override def supportCartesianProductExec(): Boolean = true
+
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 920c61cd4..32f372956 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -362,8 +362,15 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
left: SparkPlan,
right: SparkPlan,
condition: Option[Expression]): CartesianProductExecTransformer =
- throw new GlutenNotSupportException(
- "CartesianProductExecTransformer is not supported in ch backend.")
+ if (!condition.isEmpty) {
+ throw new GlutenNotSupportException(
+ "CartesianProductExecTransformer with condition is not supported in ch
backend.")
+ } else {
+ CartesianProductExecTransformer(
+ ColumnarCartesianProductBridge(left),
+ ColumnarCartesianProductBridge(right),
+ condition)
+ }
override def genBroadcastNestedLoopJoinExecTransformer(
left: SparkPlan,
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHBroadcastNestedLoopJoinExecTransformer.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHBroadcastNestedLoopJoinExecTransformer.scala
index 9c0f41361..d1dc76045 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHBroadcastNestedLoopJoinExecTransformer.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHBroadcastNestedLoopJoinExecTransformer.scala
@@ -83,6 +83,9 @@ case class CHBroadcastNestedLoopJoinExecTransformer(
// for ch
val joinParametersStr = new StringBuffer("JoinParameters:")
joinParametersStr
+ .append("isBHJ=")
+ .append(1)
+ .append("\n")
.append("buildHashTableId=")
.append(buildBroadcastTableId)
.append("\n")
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSAbstractSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSAbstractSuite.scala
index ccde594df..f0712bf5a 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSAbstractSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSAbstractSuite.scala
@@ -65,16 +65,6 @@ abstract class GlutenClickHouseTPCDSAbstractSuite
// Q45 BroadcastHashJoin, ExistenceJoin
// Q94 BroadcastHashJoin, LeftSemi, NOT condition
(false, false)
- case j if j == 38 || j == 87 =>
- // Q38 and Q87 : Hash shuffle is not supported for expression in
some case
- if (isAqe) {
- (true, true)
- } else {
- (false, true)
- }
- case q77 if q77 == 77 && !isAqe =>
- // Q77 CartesianProduct
- (false, false)
case other => (true, false)
}
sqlNums.map((_, noFallBack._1, noFallBack._2))
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetSortMergeJoinSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetSortMergeJoinSuite.scala
index 54734b72d..3f7816cb8 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetSortMergeJoinSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetSortMergeJoinSuite.scala
@@ -29,10 +29,12 @@ class GlutenClickHouseTPCDSParquetSortMergeJoinSuite
extends GlutenClickHouseTPC
"q14b",
"q23a",
"q23b",
+ "q38",
"q51",
"q69",
"q70",
"q78",
+ "q87",
"q95",
"q97"
) ++ super.excludedTpcdsQueries
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSaltNullParquetSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSaltNullParquetSuite.scala
index 36dd3778c..6d5c9c463 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSaltNullParquetSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSaltNullParquetSuite.scala
@@ -2769,5 +2769,17 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends
GlutenClickHouseTPCHAbstr
spark.sql("drop table tb_date")
}
+
+ test("test CartesianProductExec") {
+ withSQLConf(("spark.sql.autoBroadcastJoinThreshold", "-1")) {
+ val sql = """
+ |select t1.n_regionkey, t2.n_regionkey from
+ |(select n_regionkey from nation) t1
+ |cross join
+ |(select n_regionkey from nation) t2
+ |""".stripMargin
+ compareResultsAgainstVanillaSpark(sql, true, { _ => })
+ }
+ }
}
// scalastyle:on line.size.limit
diff --git a/cpp-ch/local-engine/Parser/CrossRelParser.cpp
b/cpp-ch/local-engine/Parser/CrossRelParser.cpp
index 4405a57cb..9d6252f66 100644
--- a/cpp-ch/local-engine/Parser/CrossRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/CrossRelParser.cpp
@@ -52,7 +52,6 @@ using namespace DB;
namespace local_engine
{
-
std::shared_ptr<DB::TableJoin>
createCrossTableJoin(substrait::CrossRel_JoinType join_type)
{
auto & global_context = SerializedPlanParser::global_context;
@@ -148,16 +147,19 @@ DB::QueryPlanPtr CrossRelParser::parseJoin(const
substrait::CrossRel & join, DB:
optimization_info.ParseFromString(join.advanced_extension().optimization().value());
auto join_opt_info =
JoinOptimizationInfo::parse(optimization_info.value());
const auto & storage_join_key = join_opt_info.storage_join_key;
- auto storage_join = BroadCastJoinBuilder::getJoin(storage_join_key) ;
- renamePlanColumns(*left, *right, *storage_join);
+ auto storage_join = join_opt_info.is_broadcast ?
BroadCastJoinBuilder::getJoin(storage_join_key) : nullptr;
+ if (storage_join)
+ renamePlanColumns(*left, *right, *storage_join);
auto table_join = createCrossTableJoin(join.type());
DB::Block right_header_before_convert_step =
right->getCurrentDataStream().header;
addConvertStep(*table_join, *left, *right);
// Add a check to find error easily.
- if(!blocksHaveEqualStructure(right_header_before_convert_step,
right->getCurrentDataStream().header))
+ if (!blocksHaveEqualStructure(right_header_before_convert_step,
right->getCurrentDataStream().header))
{
- throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "For broadcast
join, we must not change the columns name in the right table.\nleft
header:{},\nright header: {} -> {}",
+ throw DB::Exception(
+ DB::ErrorCodes::LOGICAL_ERROR,
+ "For broadcast join, we must not change the columns name in the
right table.\nleft header:{},\nright header: {} -> {}",
left->getCurrentDataStream().header.dumpNames(),
right_header_before_convert_step.dumpNames(),
right->getCurrentDataStream().header.dumpNames());
@@ -173,28 +175,48 @@ DB::QueryPlanPtr CrossRelParser::parseJoin(const
substrait::CrossRel & join, DB:
auto right_header = right->getCurrentDataStream().header;
QueryPlanPtr query_plan;
- table_join->addDisjunct();
- auto broadcast_hash_join = storage_join->getJoinLocked(table_join,
context);
- // table_join->resetKeys();
- QueryPlanStepPtr join_step =
std::make_unique<FilledJoinStep>(left->getCurrentDataStream(),
broadcast_hash_join, 8192);
-
- join_step->setStepDescription("STORAGE_JOIN");
- steps.emplace_back(join_step.get());
- left->addStep(std::move(join_step));
- query_plan = std::move(left);
- /// hold right plan for profile
- extra_plan_holder.emplace_back(std::move(right));
-
- addPostFilter(*query_plan, join);
- Names cols;
- for (auto after_join_name : after_join_names)
+ if (storage_join)
{
- if (BlockUtil::VIRTUAL_ROW_COUNT_COLUMN == after_join_name)
- continue;
+ /// FIXME: There is mistake in
HashJoin::needUsedFlagsForPerRightTableRow which returns true when
+ /// join clauses is empty. But in fact there should not be any join
clause in cross join.
+ table_join->addDisjunct();
+
+ auto broadcast_hash_join = storage_join->getJoinLocked(table_join,
context);
+ // table_join->resetKeys();
+ QueryPlanStepPtr join_step =
std::make_unique<FilledJoinStep>(left->getCurrentDataStream(),
broadcast_hash_join, 8192);
+
+ join_step->setStepDescription("STORAGE_JOIN");
+ steps.emplace_back(join_step.get());
+ left->addStep(std::move(join_step));
+ query_plan = std::move(left);
+ /// hold right plan for profile
+ extra_plan_holder.emplace_back(std::move(right));
+
+ addPostFilter(*query_plan, join);
+ Names cols;
+ for (auto after_join_name : after_join_names)
+ {
+ if (BlockUtil::VIRTUAL_ROW_COUNT_COLUMN == after_join_name)
+ continue;
- cols.emplace_back(after_join_name);
+ cols.emplace_back(after_join_name);
+ }
+ JoinUtil::reorderJoinOutput(*query_plan, cols);
+ }
+ else
+ {
+ JoinPtr hash_join = std::make_shared<HashJoin>(table_join,
right->getCurrentDataStream().header.cloneEmpty());
+ QueryPlanStepPtr join_step =
std::make_unique<DB::JoinStep>(left->getCurrentDataStream(),
right->getCurrentDataStream(), hash_join, 8192, 1, false);
+ join_step->setStepDescription("CROSS_JOIN");
+ steps.emplace_back(join_step.get());
+ std::vector<QueryPlanPtr> plans;
+ plans.emplace_back(std::move(left));
+ plans.emplace_back(std::move(right));
+
+ query_plan = std::make_unique<QueryPlan>();
+ query_plan->unitePlans(std::move(join_step), {std::move(plans)});
+ JoinUtil::reorderJoinOutput(*query_plan, after_join_names);
}
- JoinUtil::reorderJoinOutput(*query_plan, cols);
return query_plan;
}
diff --git a/cpp-ch/local-engine/Parser/JoinRelParser.cpp
b/cpp-ch/local-engine/Parser/JoinRelParser.cpp
index 1141f4796..09a515217 100644
--- a/cpp-ch/local-engine/Parser/JoinRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/JoinRelParser.cpp
@@ -261,8 +261,6 @@ DB::QueryPlanPtr JoinRelParser::parseJoin(const
substrait::JoinRel & join, DB::Q
QueryPlanPtr query_plan;
- /// Support only one join clause.
- table_join->addDisjunct();
/// some examples to explain when the post_join_filter is not empty
/// - on t1.key = t2.key and t1.v1 > 1 and t2.v1 > 1, 't1.v1> 1' is in the
post filter. but 't2.v1 > 1'
/// will be pushed down into right table by spark and is not in the post
filter. 't1.key = t2.key ' is
@@ -430,6 +428,8 @@ void JoinRelParser::collectJoinKeys(
{
if (!join_rel.has_expression())
return;
+ /// Support only one join clause.
+ table_join.addDisjunct();
const auto & expr = join_rel.expression();
auto & join_clause = table_join.getClauses().back();
std::list<const const substrait::Expression *> expressions_stack;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]