This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 070022a9e [GLUTEN-6040][CH] Fix can't not load part after restart 
spark session (#6041)
070022a9e is described below

commit 070022a9e5420cafb4688aab02abf6ad55ac0413
Author: Shuai li <[email protected]>
AuthorDate: Wed Jun 12 09:19:15 2024 +0800

    [GLUTEN-6040][CH] Fix can't not load part after restart spark session 
(#6041)
    
    [CH] Fix can't not load part after restart spark session
---
 .../GlutenClickHouseMergeTreeWriteOnS3Suite.scala  | 83 ++++++++++++++++++++--
 cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp  |  2 +-
 2 files changed, 79 insertions(+), 6 deletions(-)

diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
index 44c2af76f..c5dc3a237 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
@@ -635,29 +635,102 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
   }
 
   test("test mergetree insert with optimize basic") {
-    val table_name = "lineitem_mergetree_insert_optimize_basic_s3"
-    val dataPath = s"s3a://$BUCKET_NAME/$table_name"
+    val tableName = "lineitem_mergetree_insert_optimize_basic_s3"
+    val dataPath = s"s3a://$BUCKET_NAME/$tableName"
 
     withSQLConf(
       ("spark.databricks.delta.optimize.minFileSize" -> "200000000"),
       
("spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert"
 -> "true")
     ) {
       spark.sql(s"""
-                   |DROP TABLE IF EXISTS $table_name;
+                   |DROP TABLE IF EXISTS $tableName;
                    |""".stripMargin)
 
       spark.sql(s"""
-                   |CREATE TABLE IF NOT EXISTS $table_name
+                   |CREATE TABLE IF NOT EXISTS $tableName
                    |USING clickhouse
                    |LOCATION '$dataPath'
                    | as select * from lineitem
                    |""".stripMargin)
 
-      val ret = spark.sql(s"select count(*) from $table_name").collect()
+      val ret = spark.sql(s"select count(*) from $tableName").collect()
       assert(ret.apply(0).get(0) == 600572)
       assert(
         !new 
File(s"$CH_DEFAULT_STORAGE_DIR/lineitem_mergetree_insert_optimize_basic").exists())
     }
   }
+
+  test("test mergetree with primary keys pruning by driver") {
+    val tableName = "lineitem_mergetree_pk_pruning_by_driver_s3"
+    val dataPath = s"s3a://$BUCKET_NAME/$tableName"
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS $tableName;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS $tableName
+                 |(
+                 | l_orderkey      bigint,
+                 | l_partkey       bigint,
+                 | l_suppkey       bigint,
+                 | l_linenumber    bigint,
+                 | l_quantity      double,
+                 | l_extendedprice double,
+                 | l_discount      double,
+                 | l_tax           double,
+                 | l_returnflag    string,
+                 | l_linestatus    string,
+                 | l_shipdate      date,
+                 | l_commitdate    date,
+                 | l_receiptdate   date,
+                 | l_shipinstruct  string,
+                 | l_shipmode      string,
+                 | l_comment       string
+                 |)
+                 |USING clickhouse
+                 |TBLPROPERTIES (storage_policy='__s3_main', 
orderByKey='l_shipdate')
+                 |LOCATION '$dataPath'
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table $tableName
+                 | select * from lineitem
+                 |""".stripMargin)
+
+    FileUtils.forceDelete(new File(S3_METADATA_PATH))
+
+    val sqlStr =
+      s"""
+         |SELECT
+         |    sum(l_extendedprice * l_discount) AS revenue
+         |FROM
+         |    $tableName
+         |WHERE
+         |    l_shipdate >= date'1994-01-01'
+         |    AND l_shipdate < date'1994-01-01' + interval 1 year
+         |    AND l_discount BETWEEN 0.06 - 0.01 AND 0.06 + 0.01
+         |    AND l_quantity < 24
+         |""".stripMargin
+
+    withSQLConf(
+      
("spark.gluten.sql.columnar.backend.ch.runtime_settings.enabled_driver_filter_mergetree_index"
 -> "true")) {
+      runTPCHQueryBySQL(6, sqlStr) {
+        df =>
+          val scanExec = collect(df.queryExecution.executedPlan) {
+            case f: FileSourceScanExecTransformer => f
+          }
+          assert(scanExec.size == 1)
+
+          val mergetreeScan = scanExec(0)
+          assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+
+          val plans = collect(df.queryExecution.executedPlan) {
+            case scanExec: BasicScanExecTransformer => scanExec
+          }
+          assert(plans.size == 1)
+          assert(plans(0).getSplitInfos.size == 1)
+      }
+    }
+  }
 }
 // scalastyle:off line.size.limit
diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp 
b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
index 9afa83973..c36db6b74 100644
--- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
@@ -398,7 +398,7 @@ String MergeTreeRelParser::filterRangesOnDriver(const 
substrait::ReadRel & read_
     google::protobuf::StringValue table;
     table.ParseFromString(read_rel.advanced_extension().enhancement().value());
     auto merge_tree_table = parseMergeTreeTableString(table.value());
-    auto custom_storage_mergetree = parseStorage(merge_tree_table, 
global_context);
+    auto custom_storage_mergetree = parseStorage(merge_tree_table, 
global_context, true);
 
     auto input = TypeParser::buildBlockFromNamedStruct(read_rel.base_schema());
     auto names_and_types_list = input.getNamesAndTypesList();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to