This is an automated email from the ASF dual-hosted git repository.

liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 9d6f95dd9 [GLUTEN-5182] [CH] fix fail to parse post join filter (#5183)
9d6f95dd9 is described below

commit 9d6f95dd9c342b1e31ddbccb3411e6395edb1896
Author: shuai.xu <[email protected]>
AuthorDate: Sun Apr 7 18:10:17 2024 +0800

    [GLUTEN-5182] [CH] fix fail to parse post join filter (#5183)
    
    What changes were proposed in this pull request?
    This PR fixes a bug where parsing a post-join filter may fail if it contains 
complex expressions.
    
    (Fixes: #5182)
    
    How was this patch tested?
    This patch was tested by unit tests.
---
 .../local-engine/Parser/SerializedPlanParser.cpp   | 30 ++++++++---------
 .../hive/execution/GlutenHiveSQLQuerySuite.scala   | 39 ++++++++++++++++++++++
 .../hive/execution/GlutenHiveSQLQuerySuite.scala   | 39 ++++++++++++++++++++++
 .../hive/execution/GlutenHiveSQLQuerySuite.scala   | 39 ++++++++++++++++++++++
 4 files changed, 132 insertions(+), 15 deletions(-)

diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp 
b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
index f7926c8ab..8e0284c90 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -1927,23 +1927,23 @@ ActionsDAGPtr ASTParser::convertToActions(const 
NamesAndTypesList & name_and_typ
 ASTPtr ASTParser::parseToAST(const Names & names, const substrait::Expression 
& rel)
 {
     LOG_DEBUG(&Poco::Logger::get("ASTParser"), "substrait plan:\n{}", 
rel.DebugString());
-    if (rel.has_singular_or_list())
-        return parseArgumentToAST(names, rel);
-    if (!rel.has_scalar_function())
-        throw Exception(ErrorCodes::BAD_ARGUMENTS, "the root of expression 
should be a scalar function:\n {}", rel.DebugString());
-
-    const auto & scalar_function = rel.scalar_function();
-    auto function_signature = 
function_mapping.at(std::to_string(rel.scalar_function().function_reference()));
+    if (rel.has_scalar_function())
+    {
+        const auto & scalar_function = rel.scalar_function();
+        auto function_signature = 
function_mapping.at(std::to_string(rel.scalar_function().function_reference()));
 
-    auto substrait_name = function_signature.substr(0, 
function_signature.find(':'));
-    auto func_parser = 
FunctionParserFactory::instance().tryGet(substrait_name, plan_parser);
-    String function_name = func_parser ? 
func_parser->getCHFunctionName(scalar_function)
-                                       : 
SerializedPlanParser::getFunctionName(function_signature, scalar_function);
+        auto substrait_name = function_signature.substr(0, 
function_signature.find(':'));
+        auto func_parser = 
FunctionParserFactory::instance().tryGet(substrait_name, plan_parser);
+        String function_name = func_parser ? 
func_parser->getCHFunctionName(scalar_function)
+                                           : 
SerializedPlanParser::getFunctionName(function_signature, scalar_function);
 
-    ASTs ast_args;
-    parseFunctionArgumentsToAST(names, scalar_function, ast_args);
+        ASTs ast_args;
+        parseFunctionArgumentsToAST(names, scalar_function, ast_args);
 
-    return makeASTFunction(function_name, ast_args);
+        return makeASTFunction(function_name, ast_args);
+    }
+    else
+        return parseArgumentToAST(names, rel);
 }
 
 void ASTParser::parseFunctionArgumentsToAST(
@@ -2375,4 +2375,4 @@ std::string 
NonNullableColumnsResolver::safeGetFunctionName(
         return "";
     }
 }
-}
\ No newline at end of file
+}
diff --git 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
index 99ed63393..84c83aae3 100644
--- 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
+++ 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
@@ -146,4 +146,43 @@ class GlutenHiveSQLQuerySuite extends GlutenSQLTestsTrait {
       ignoreIfNotExists = true,
       purge = false)
   }
+
+  testGluten("5182: Fix failed to parse post join filters") {
+    withSQLConf(
+      "spark.sql.hive.convertMetastoreParquet" -> "false",
+      "spark.gluten.sql.complexType.scan.fallback.enabled" -> "false") {
+      sql("DROP TABLE IF EXISTS test_5128_0;")
+      sql("DROP TABLE IF EXISTS test_5128_1;")
+      sql(
+        "CREATE TABLE test_5128_0 (from_uid STRING, vgift_typeid int, vm_count 
int, " +
+          "status bigint, ts bigint, vm_typeid int) " +
+          "USING hive OPTIONS(fileFormat 'parquet') PARTITIONED BY (`day` 
STRING);")
+      sql(
+        "CREATE TABLE test_5128_1 (typeid int, groupid int, ss_id bigint, " +
+          "ss_start_time bigint, ss_end_time bigint) " +
+          "USING hive OPTIONS(fileFormat 'parquet');")
+      sql(
+        "INSERT INTO test_5128_0 partition(day='2024-03-31') " +
+          "VALUES('uid_1', 2, 10, 1, 11111111111, 2);")
+      sql("INSERT INTO test_5128_1 VALUES(2, 1, 1, 1000000000, 2111111111);")
+      val df = spark.sql(
+        "select ee.from_uid as uid,day, vgift_typeid, money from " +
+          "(select t_a.day, if(cast(substr(t_a.ts,1,10) as bigint) between " +
+          "t_b.ss_start_time and t_b.ss_end_time, t_b.ss_id, 0) ss_id, " +
+          "t_a.vgift_typeid, t_a.from_uid, vm_count money from " +
+          "(select from_uid,day,vgift_typeid,vm_count,ts from test_5128_0 " +
+          "where day between '2024-03-30' and '2024-03-31' and status=1 and 
vm_typeid=2) t_a " +
+          "left join test_5128_1 t_b on t_a.vgift_typeid=t_b.typeid " +
+          "where t_b.groupid in (1,2)) ee where ss_id=1;")
+      checkAnswer(df, Seq(Row("uid_1", "2024-03-31", 2, 10)))
+    }
+    spark.sessionState.catalog.dropTable(
+      TableIdentifier("test_5128_0"),
+      ignoreIfNotExists = true,
+      purge = false)
+    spark.sessionState.catalog.dropTable(
+      TableIdentifier("test_5128_1"),
+      ignoreIfNotExists = true,
+      purge = false)
+  }
 }
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
index 3b6472d2e..50cd93841 100644
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
@@ -203,4 +203,43 @@ class GlutenHiveSQLQuerySuite extends GlutenSQLTestsTrait {
       ignoreIfNotExists = true,
       purge = false)
   }
+
+  testGluten("5182: Fix failed to parse post join filters") {
+    withSQLConf(
+      "spark.sql.hive.convertMetastoreParquet" -> "false",
+      "spark.gluten.sql.complexType.scan.fallback.enabled" -> "false") {
+      sql("DROP TABLE IF EXISTS test_5128_0;")
+      sql("DROP TABLE IF EXISTS test_5128_1;")
+      sql(
+        "CREATE TABLE test_5128_0 (from_uid STRING, vgift_typeid int, vm_count 
int, " +
+          "status bigint, ts bigint, vm_typeid int) " +
+          "USING hive OPTIONS(fileFormat 'parquet') PARTITIONED BY (`day` 
STRING);")
+      sql(
+        "CREATE TABLE test_5128_1 (typeid int, groupid int, ss_id bigint, " +
+          "ss_start_time bigint, ss_end_time bigint) " +
+          "USING hive OPTIONS(fileFormat 'parquet');")
+      sql(
+        "INSERT INTO test_5128_0 partition(day='2024-03-31') " +
+          "VALUES('uid_1', 2, 10, 1, 11111111111, 2);")
+      sql("INSERT INTO test_5128_1 VALUES(2, 1, 1, 1000000000, 2111111111);")
+      val df = spark.sql(
+        "select ee.from_uid as uid,day, vgift_typeid, money from " +
+          "(select t_a.day, if(cast(substr(t_a.ts,1,10) as bigint) between " +
+          "t_b.ss_start_time and t_b.ss_end_time, t_b.ss_id, 0) ss_id, " +
+          "t_a.vgift_typeid, t_a.from_uid, vm_count money from " +
+          "(select from_uid,day,vgift_typeid,vm_count,ts from test_5128_0 " +
+          "where day between '2024-03-30' and '2024-03-31' and status=1 and 
vm_typeid=2) t_a " +
+          "left join test_5128_1 t_b on t_a.vgift_typeid=t_b.typeid " +
+          "where t_b.groupid in (1,2)) ee where ss_id=1;")
+      checkAnswer(df, Seq(Row("uid_1", "2024-03-31", 2, 10)))
+    }
+    spark.sessionState.catalog.dropTable(
+      TableIdentifier("test_5128_0"),
+      ignoreIfNotExists = true,
+      purge = false)
+    spark.sessionState.catalog.dropTable(
+      TableIdentifier("test_5128_1"),
+      ignoreIfNotExists = true,
+      purge = false)
+  }
 }
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
index dfcd9075c..15a649acf 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/hive/execution/GlutenHiveSQLQuerySuite.scala
@@ -118,4 +118,43 @@ class GlutenHiveSQLQuerySuite extends GlutenSQLTestsTrait {
       ignoreIfNotExists = true,
       purge = false)
   }
+
+  testGluten("5182: Fix failed to parse post join filters") {
+    withSQLConf(
+      "spark.sql.hive.convertMetastoreParquet" -> "false",
+      "spark.gluten.sql.complexType.scan.fallback.enabled" -> "false") {
+      sql("DROP TABLE IF EXISTS test_5128_0;")
+      sql("DROP TABLE IF EXISTS test_5128_1;")
+      sql(
+        "CREATE TABLE test_5128_0 (from_uid STRING, vgift_typeid int, vm_count 
int, " +
+          "status bigint, ts bigint, vm_typeid int) " +
+          "USING hive OPTIONS(fileFormat 'parquet') PARTITIONED BY (`day` 
STRING);")
+      sql(
+        "CREATE TABLE test_5128_1 (typeid int, groupid int, ss_id bigint, " +
+          "ss_start_time bigint, ss_end_time bigint) " +
+          "USING hive OPTIONS(fileFormat 'parquet');")
+      sql(
+        "INSERT INTO test_5128_0 partition(day='2024-03-31') " +
+          "VALUES('uid_1', 2, 10, 1, 11111111111, 2);")
+      sql("INSERT INTO test_5128_1 VALUES(2, 1, 1, 1000000000, 2111111111);")
+      val df = spark.sql(
+        "select ee.from_uid as uid,day, vgift_typeid, money from " +
+          "(select t_a.day, if(cast(substr(t_a.ts,1,10) as bigint) between " +
+          "t_b.ss_start_time and t_b.ss_end_time, t_b.ss_id, 0) ss_id, " +
+          "t_a.vgift_typeid, t_a.from_uid, vm_count money from " +
+          "(select from_uid,day,vgift_typeid,vm_count,ts from test_5128_0 " +
+          "where day between '2024-03-30' and '2024-03-31' and status=1 and 
vm_typeid=2) t_a " +
+          "left join test_5128_1 t_b on t_a.vgift_typeid=t_b.typeid " +
+          "where t_b.groupid in (1,2)) ee where ss_id=1;")
+      checkAnswer(df, Seq(Row("uid_1", "2024-03-31", 2, 10)))
+    }
+    spark.sessionState.catalog.dropTable(
+      TableIdentifier("test_5128_0"),
+      ignoreIfNotExists = true,
+      purge = false)
+    spark.sessionState.catalog.dropTable(
+      TableIdentifier("test_5128_1"),
+      ignoreIfNotExists = true,
+      purge = false)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to