This is an automated email from the ASF dual-hosted git repository.
liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new e25ab2e7a [GLUTEN-4451] [CH] fix header maybe changed by FilterTransform (#6166)
e25ab2e7a is described below
commit e25ab2e7adc197ada9285e18f4e35544fec4e3fe
Author: shuai.xu <[email protected]>
AuthorDate: Fri Jun 21 18:10:16 2024 +0800
[GLUTEN-4451] [CH] fix header maybe changed by FilterTransform (#6166)
What changes were proposed in this pull request?
Roll back the header if it was changed by FilterTransform.
(Fixes: #4451)
How was this patch tested?
This patch was tested by integration tests.
---
.../GlutenClickHouseTPCHSaltNullParquetSuite.scala | 50 ++++++++++++++++++++++
cpp-ch/local-engine/Parser/FilterRelParser.cpp | 7 ++-
.../local-engine/Parser/SerializedPlanParser.cpp | 13 ++++++
cpp-ch/local-engine/Parser/SerializedPlanParser.h | 1 +
4 files changed, 70 insertions(+), 1 deletion(-)
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSaltNullParquetSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSaltNullParquetSuite.scala
index 1d3bbec84..504015332 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSaltNullParquetSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSaltNullParquetSuite.scala
@@ -2638,5 +2638,55 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends GlutenClickHouseTPCHAbstr
spark.sql("drop table test_tbl_5910_0")
spark.sql("drop table test_tbl_5910_1")
}
+
+ test("GLUTEN-4451: Fix schema may be changed by filter") {
+ val create_sql =
+ """
+ |create table if not exists test_tbl_4451(
+ | month_day string,
+ | month_dif int,
+ | is_month_new string,
+ | country string,
+ | os string,
+ | mr bigint
+ |) using parquet
+ |PARTITIONED BY (
+ | day string,
+ | app_name string)
+ |""".stripMargin
+ val insert_sql1 =
+ "INSERT into test_tbl_4451 partition (day='2024-06-01', app_name='abc')
" +
+ "values('2024-06-01', 0, '1', 'CN', 'iOS', 100)"
+ val insert_sql2 =
+ "INSERT into test_tbl_4451 partition (day='2024-06-01', app_name='abc')
" +
+ "values('2024-06-01', 0, '1', 'CN', 'iOS', 50)"
+ val insert_sql3 =
+ "INSERT into test_tbl_4451 partition (day='2024-06-01', app_name='abc')
" +
+ "values('2024-06-01', 1, '1', 'CN', 'iOS', 80)"
+ spark.sql(create_sql)
+ spark.sql(insert_sql1)
+ spark.sql(insert_sql2)
+ spark.sql(insert_sql3)
+ val select_sql =
+ """
+ |SELECT * FROM (
+ | SELECT
+ | month_day,
+ | country,
+ | if(os = 'ALite','Android',os) AS os,
+ | is_month_new,
+ | nvl(sum(if(month_dif = 0, mr, 0)),0) AS `month0_n`,
+ | nvl(sum(if(month_dif = 1, mr, 0)) / sum(if(month_dif = 0, mr,
0)),0) AS `month1_rate`,
+ | '2024-06-18' as day,
+ | app_name
+ | FROM test_tbl_4451
+ | GROUP BY month_day,country,if(os =
'ALite','Android',os),is_month_new,app_name
+ |) tt
+ |WHERE month0_n > 0 AND month1_rate <= 1 AND os IN
('all','Android','iOS')
+ | AND app_name IS NOT NULL
+ |""".stripMargin
+ compareResultsAgainstVanillaSpark(select_sql, true, { _ => })
+ spark.sql("drop table test_tbl_4451")
+ }
}
// scalastyle:on line.size.limit
diff --git a/cpp-ch/local-engine/Parser/FilterRelParser.cpp b/cpp-ch/local-engine/Parser/FilterRelParser.cpp
index 4c71cc312..e0098f747 100644
--- a/cpp-ch/local-engine/Parser/FilterRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/FilterRelParser.cpp
@@ -59,7 +59,12 @@ DB::QueryPlanPtr FilterRelParser::parse(DB::QueryPlanPtr query_plan, const subst
filter_step->setStepDescription("WHERE");
steps.emplace_back(filter_step.get());
query_plan->addStep(std::move(filter_step));
-
+
+ // header maybe changed, need to rollback it
+ if (!blocksHaveEqualStructure(input_header,
query_plan->getCurrentDataStream().header)) {
+
steps.emplace_back(getPlanParser()->addRollbackFilterHeaderStep(query_plan,
input_header));
+ }
+
// remove nullable
auto * remove_null_step =
getPlanParser()->addRemoveNullableStep(*query_plan, non_nullable_columns);
if (remove_null_step)
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
index 5f2c9cc33..40e01e305 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -335,6 +335,19 @@ IQueryPlanStep * SerializedPlanParser::addRemoveNullableStep(QueryPlan & plan, c
return step_ptr;
}
+IQueryPlanStep *
SerializedPlanParser::addRollbackFilterHeaderStep(QueryPlanPtr & query_plan,
const Block & input_header)
+{
+ auto convert_actions_dag = ActionsDAG::makeConvertingActions(
+ query_plan->getCurrentDataStream().header.getColumnsWithTypeAndName(),
+ input_header.getColumnsWithTypeAndName(),
+ ActionsDAG::MatchColumnsMode::Name);
+ auto expression_step =
std::make_unique<ExpressionStep>(query_plan->getCurrentDataStream(),
convert_actions_dag);
+ expression_step->setStepDescription("Generator for rollback filter");
+ auto * step_ptr = expression_step.get();
+ query_plan->addStep(std::move(expression_step));
+ return step_ptr;
+}
+
DataTypePtr wrapNullableType(substrait::Type_Nullability nullable, DataTypePtr
nested_type)
{
return wrapNullableType(nullable ==
substrait::Type_Nullability_NULLABILITY_NULLABLE, nested_type);
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.h b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
index ccd5c0fdc..45ff5a20b 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.h
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
@@ -299,6 +299,7 @@ public:
static std::string getFunctionName(const std::string & function_sig, const
substrait::Expression_ScalarFunction & function);
IQueryPlanStep * addRemoveNullableStep(QueryPlan & plan, const
std::set<String> & columns);
+ IQueryPlanStep * addRollbackFilterHeaderStep(QueryPlanPtr & query_plan,
const Block & input_header);
static ContextMutablePtr global_context;
static Context::ConfigurationPtr config;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]