This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 070022a9e [GLUTEN-6040][CH] Fix can't load part after restart spark session (#6041)
070022a9e is described below
commit 070022a9e5420cafb4688aab02abf6ad55ac0413
Author: Shuai li <[email protected]>
AuthorDate: Wed Jun 12 09:19:15 2024 +0800
[GLUTEN-6040][CH] Fix can't load part after restart spark session (#6041)
[CH] Fix can't load part after restart spark session
---
.../GlutenClickHouseMergeTreeWriteOnS3Suite.scala | 83 ++++++++++++++++++++--
cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp | 2 +-
2 files changed, 79 insertions(+), 6 deletions(-)
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
index 44c2af76f..c5dc3a237 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
@@ -635,29 +635,102 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
}
test("test mergetree insert with optimize basic") {
- val table_name = "lineitem_mergetree_insert_optimize_basic_s3"
- val dataPath = s"s3a://$BUCKET_NAME/$table_name"
+ val tableName = "lineitem_mergetree_insert_optimize_basic_s3"
+ val dataPath = s"s3a://$BUCKET_NAME/$tableName"
withSQLConf(
("spark.databricks.delta.optimize.minFileSize" -> "200000000"),
("spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert" -> "true")
) {
spark.sql(s"""
- |DROP TABLE IF EXISTS $table_name;
+ |DROP TABLE IF EXISTS $tableName;
|""".stripMargin)
spark.sql(s"""
- |CREATE TABLE IF NOT EXISTS $table_name
+ |CREATE TABLE IF NOT EXISTS $tableName
|USING clickhouse
|LOCATION '$dataPath'
| as select * from lineitem
|""".stripMargin)
- val ret = spark.sql(s"select count(*) from $table_name").collect()
+ val ret = spark.sql(s"select count(*) from $tableName").collect()
assert(ret.apply(0).get(0) == 600572)
assert(
!new File(s"$CH_DEFAULT_STORAGE_DIR/lineitem_mergetree_insert_optimize_basic").exists())
}
}
+
+ test("test mergetree with primary keys pruning by driver") {
+ val tableName = "lineitem_mergetree_pk_pruning_by_driver_s3"
+ val dataPath = s"s3a://$BUCKET_NAME/$tableName"
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS $tableName;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS $tableName
+ |(
+ | l_orderkey bigint,
+ | l_partkey bigint,
+ | l_suppkey bigint,
+ | l_linenumber bigint,
+ | l_quantity double,
+ | l_extendedprice double,
+ | l_discount double,
+ | l_tax double,
+ | l_returnflag string,
+ | l_linestatus string,
+ | l_shipdate date,
+ | l_commitdate date,
+ | l_receiptdate date,
+ | l_shipinstruct string,
+ | l_shipmode string,
+ | l_comment string
+ |)
+ |USING clickhouse
+ |TBLPROPERTIES (storage_policy='__s3_main', orderByKey='l_shipdate')
+ |LOCATION '$dataPath'
+ |""".stripMargin)
+
+ spark.sql(s"""
+ | insert into table $tableName
+ | select * from lineitem
+ |""".stripMargin)
+
+ FileUtils.forceDelete(new File(S3_METADATA_PATH))
+
+ val sqlStr =
+ s"""
+ |SELECT
+ | sum(l_extendedprice * l_discount) AS revenue
+ |FROM
+ | $tableName
+ |WHERE
+ | l_shipdate >= date'1994-01-01'
+ | AND l_shipdate < date'1994-01-01' + interval 1 year
+ | AND l_discount BETWEEN 0.06 - 0.01 AND 0.06 + 0.01
+ | AND l_quantity < 24
+ |""".stripMargin
+
+ withSQLConf(
+ ("spark.gluten.sql.columnar.backend.ch.runtime_settings.enabled_driver_filter_mergetree_index" -> "true")) {
+ runTPCHQueryBySQL(6, sqlStr) {
+ df =>
+ val scanExec = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ assert(scanExec.size == 1)
+
+ val mergetreeScan = scanExec(0)
+ assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+
+ val plans = collect(df.queryExecution.executedPlan) {
+ case scanExec: BasicScanExecTransformer => scanExec
+ }
+ assert(plans.size == 1)
+ assert(plans(0).getSplitInfos.size == 1)
+ }
+ }
+ }
}
// scalastyle:off line.size.limit
diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
index 9afa83973..c36db6b74 100644
--- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
@@ -398,7 +398,7 @@ String MergeTreeRelParser::filterRangesOnDriver(const substrait::ReadRel & read_
google::protobuf::StringValue table;
table.ParseFromString(read_rel.advanced_extension().enhancement().value());
auto merge_tree_table = parseMergeTreeTableString(table.value());
- auto custom_storage_mergetree = parseStorage(merge_tree_table, global_context);
+ auto custom_storage_mergetree = parseStorage(merge_tree_table, global_context, true);
auto input = TypeParser::buildBlockFromNamedStruct(read_rel.base_schema());
auto names_and_types_list = input.getNamesAndTypesList();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]