This is an automated email from the ASF dual-hosted git repository.
liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 9d6f95dd9 [GLUTEN-5182] [CH] fix fail to parse post join filter (#5183)
9d6f95dd9 is described below
commit 9d6f95dd9c342b1e31ddbccb3411e6395edb1896
Author: shuai.xu <[email protected]>
AuthorDate: Sun Apr 7 18:10:17 2024 +0800
[GLUTEN-5182] [CH] fix fail to parse post join filter (#5183)
What changes were proposed in this pull request?
This PR fixes a bug where parsing a post-join filter may fail if it contains
complex expressions.
(Fixes: #5182)
How was this patch tested?
This patch was tested by unit tests.
---
.../local-engine/Parser/SerializedPlanParser.cpp | 30 ++++++++---------
.../hive/execution/GlutenHiveSQLQuerySuite.scala | 39 ++++++++++++++++++++++
.../hive/execution/GlutenHiveSQLQuerySuite.scala | 39 ++++++++++++++++++++++
.../hive/execution/GlutenHiveSQLQuerySuite.scala | 39 ++++++++++++++++++++++
4 files changed, 132 insertions(+), 15 deletions(-)
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
index f7926c8ab..8e0284c90 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -1927,23 +1927,23 @@ ActionsDAGPtr ASTParser::convertToActions(const
NamesAndTypesList & name_and_typ
ASTPtr ASTParser::parseToAST(const Names & names, const substrait::Expression
& rel)
{
LOG_DEBUG(&Poco::Logger::get("ASTParser"), "substrait plan:\n{}",
rel.DebugString());
- if (rel.has_singular_or_list())
- return parseArgumentToAST(names, rel);
- if (!rel.has_scalar_function())
- throw Exception(ErrorCodes::BAD_ARGUMENTS, "the root of expression
should be a scalar function:\n {}", rel.DebugString());
-
- const auto & scalar_function = rel.scalar_function();
- auto function_signature =
function_mapping.at(std::to_string(rel.scalar_function().function_reference()));
+ if (rel.has_scalar_function())
+ {
+ const auto & scalar_function = rel.scalar_function();
+ auto function_signature =
function_mapping.at(std::to_string(rel.scalar_function().function_reference()));
- auto substrait_name = function_signature.substr(0,
function_signature.find(':'));
- auto func_parser =
FunctionParserFactory::instance().tryGet(substrait_name, plan_parser);
- String function_name = func_parser ?
func_parser->getCHFunctionName(scalar_function)
- :
SerializedPlanParser::getFunctionName(function_signature, scalar_function);
+ auto substrait_name = function_signature.substr(0,
function_signature.find(':'));
+ auto func_parser =
FunctionParserFactory::instance().tryGet(substrait_name, plan_parser);
+ String function_name = func_parser ?
func_parser->getCHFunctionName(scalar_function)
+ :
SerializedPlanParser::getFunctionName(function_signature, scalar_function);
- ASTs ast_args;
- parseFunctionArgumentsToAST(names, scalar_function, ast_args);
+ ASTs ast_args;
+ parseFunctionArgumentsToAST(names, scalar_function, ast_args);
- return makeASTFunction(function_name, ast_args);
+ return makeASTFunction(function_name, ast_args);
+ }
+ else
+ return parseArgumentToAST(names, rel);
}
void ASTParser::parseFunctionArgumentsToAST(
@@ -2375,4 +2375,4 @@ std::string
NonNullableColumnsResolver::safeGetFunctionName(
return "";
}
}
-}
\ No newline at end of file
+}
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
index 99ed63393..84c83aae3 100644
---
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
+++
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
@@ -146,4 +146,43 @@ class GlutenHiveSQLQuerySuite extends GlutenSQLTestsTrait {
ignoreIfNotExists = true,
purge = false)
}
+
+ testGluten("5182: Fix failed to parse post join filters") {
+ withSQLConf(
+ "spark.sql.hive.convertMetastoreParquet" -> "false",
+ "spark.gluten.sql.complexType.scan.fallback.enabled" -> "false") {
+ sql("DROP TABLE IF EXISTS test_5128_0;")
+ sql("DROP TABLE IF EXISTS test_5128_1;")
+ sql(
+ "CREATE TABLE test_5128_0 (from_uid STRING, vgift_typeid int, vm_count
int, " +
+ "status bigint, ts bigint, vm_typeid int) " +
+ "USING hive OPTIONS(fileFormat 'parquet') PARTITIONED BY (`day`
STRING);")
+ sql(
+ "CREATE TABLE test_5128_1 (typeid int, groupid int, ss_id bigint, " +
+ "ss_start_time bigint, ss_end_time bigint) " +
+ "USING hive OPTIONS(fileFormat 'parquet');")
+ sql(
+ "INSERT INTO test_5128_0 partition(day='2024-03-31') " +
+ "VALUES('uid_1', 2, 10, 1, 11111111111, 2);")
+ sql("INSERT INTO test_5128_1 VALUES(2, 1, 1, 1000000000, 2111111111);")
+ val df = spark.sql(
+ "select ee.from_uid as uid,day, vgift_typeid, money from " +
+ "(select t_a.day, if(cast(substr(t_a.ts,1,10) as bigint) between " +
+ "t_b.ss_start_time and t_b.ss_end_time, t_b.ss_id, 0) ss_id, " +
+ "t_a.vgift_typeid, t_a.from_uid, vm_count money from " +
+ "(select from_uid,day,vgift_typeid,vm_count,ts from test_5128_0 " +
+ "where day between '2024-03-30' and '2024-03-31' and status=1 and
vm_typeid=2) t_a " +
+ "left join test_5128_1 t_b on t_a.vgift_typeid=t_b.typeid " +
+ "where t_b.groupid in (1,2)) ee where ss_id=1;")
+ checkAnswer(df, Seq(Row("uid_1", "2024-03-31", 2, 10)))
+ }
+ spark.sessionState.catalog.dropTable(
+ TableIdentifier("test_5128_0"),
+ ignoreIfNotExists = true,
+ purge = false)
+ spark.sessionState.catalog.dropTable(
+ TableIdentifier("test_5128_1"),
+ ignoreIfNotExists = true,
+ purge = false)
+ }
}
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
index 3b6472d2e..50cd93841 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
@@ -203,4 +203,43 @@ class GlutenHiveSQLQuerySuite extends GlutenSQLTestsTrait {
ignoreIfNotExists = true,
purge = false)
}
+
+ testGluten("5182: Fix failed to parse post join filters") {
+ withSQLConf(
+ "spark.sql.hive.convertMetastoreParquet" -> "false",
+ "spark.gluten.sql.complexType.scan.fallback.enabled" -> "false") {
+ sql("DROP TABLE IF EXISTS test_5128_0;")
+ sql("DROP TABLE IF EXISTS test_5128_1;")
+ sql(
+ "CREATE TABLE test_5128_0 (from_uid STRING, vgift_typeid int, vm_count
int, " +
+ "status bigint, ts bigint, vm_typeid int) " +
+ "USING hive OPTIONS(fileFormat 'parquet') PARTITIONED BY (`day`
STRING);")
+ sql(
+ "CREATE TABLE test_5128_1 (typeid int, groupid int, ss_id bigint, " +
+ "ss_start_time bigint, ss_end_time bigint) " +
+ "USING hive OPTIONS(fileFormat 'parquet');")
+ sql(
+ "INSERT INTO test_5128_0 partition(day='2024-03-31') " +
+ "VALUES('uid_1', 2, 10, 1, 11111111111, 2);")
+ sql("INSERT INTO test_5128_1 VALUES(2, 1, 1, 1000000000, 2111111111);")
+ val df = spark.sql(
+ "select ee.from_uid as uid,day, vgift_typeid, money from " +
+ "(select t_a.day, if(cast(substr(t_a.ts,1,10) as bigint) between " +
+ "t_b.ss_start_time and t_b.ss_end_time, t_b.ss_id, 0) ss_id, " +
+ "t_a.vgift_typeid, t_a.from_uid, vm_count money from " +
+ "(select from_uid,day,vgift_typeid,vm_count,ts from test_5128_0 " +
+ "where day between '2024-03-30' and '2024-03-31' and status=1 and
vm_typeid=2) t_a " +
+ "left join test_5128_1 t_b on t_a.vgift_typeid=t_b.typeid " +
+ "where t_b.groupid in (1,2)) ee where ss_id=1;")
+ checkAnswer(df, Seq(Row("uid_1", "2024-03-31", 2, 10)))
+ }
+ spark.sessionState.catalog.dropTable(
+ TableIdentifier("test_5128_0"),
+ ignoreIfNotExists = true,
+ purge = false)
+ spark.sessionState.catalog.dropTable(
+ TableIdentifier("test_5128_1"),
+ ignoreIfNotExists = true,
+ purge = false)
+ }
}
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
index dfcd9075c..15a649acf 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
@@ -118,4 +118,43 @@ class GlutenHiveSQLQuerySuite extends GlutenSQLTestsTrait {
ignoreIfNotExists = true,
purge = false)
}
+
+ testGluten("5182: Fix failed to parse post join filters") {
+ withSQLConf(
+ "spark.sql.hive.convertMetastoreParquet" -> "false",
+ "spark.gluten.sql.complexType.scan.fallback.enabled" -> "false") {
+ sql("DROP TABLE IF EXISTS test_5128_0;")
+ sql("DROP TABLE IF EXISTS test_5128_1;")
+ sql(
+ "CREATE TABLE test_5128_0 (from_uid STRING, vgift_typeid int, vm_count
int, " +
+ "status bigint, ts bigint, vm_typeid int) " +
+ "USING hive OPTIONS(fileFormat 'parquet') PARTITIONED BY (`day`
STRING);")
+ sql(
+ "CREATE TABLE test_5128_1 (typeid int, groupid int, ss_id bigint, " +
+ "ss_start_time bigint, ss_end_time bigint) " +
+ "USING hive OPTIONS(fileFormat 'parquet');")
+ sql(
+ "INSERT INTO test_5128_0 partition(day='2024-03-31') " +
+ "VALUES('uid_1', 2, 10, 1, 11111111111, 2);")
+ sql("INSERT INTO test_5128_1 VALUES(2, 1, 1, 1000000000, 2111111111);")
+ val df = spark.sql(
+ "select ee.from_uid as uid,day, vgift_typeid, money from " +
+ "(select t_a.day, if(cast(substr(t_a.ts,1,10) as bigint) between " +
+ "t_b.ss_start_time and t_b.ss_end_time, t_b.ss_id, 0) ss_id, " +
+ "t_a.vgift_typeid, t_a.from_uid, vm_count money from " +
+ "(select from_uid,day,vgift_typeid,vm_count,ts from test_5128_0 " +
+ "where day between '2024-03-30' and '2024-03-31' and status=1 and
vm_typeid=2) t_a " +
+ "left join test_5128_1 t_b on t_a.vgift_typeid=t_b.typeid " +
+ "where t_b.groupid in (1,2)) ee where ss_id=1;")
+ checkAnswer(df, Seq(Row("uid_1", "2024-03-31", 2, 10)))
+ }
+ spark.sessionState.catalog.dropTable(
+ TableIdentifier("test_5128_0"),
+ ignoreIfNotExists = true,
+ purge = false)
+ spark.sessionState.catalog.dropTable(
+ TableIdentifier("test_5128_1"),
+ ignoreIfNotExists = true,
+ purge = false)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]