This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new d762deb9b [GLUTEN-5041][CH] Fix primary not used when query with filter (#5045)
d762deb9b is described below
commit d762deb9bee73b21df48e5a3f51f3fe6cad4396b
Author: Shuai li <[email protected]>
AuthorDate: Wed Mar 20 19:00:59 2024 +0800
[GLUTEN-5041][CH] Fix primary not used when query with filter (#5045)
Fix the primary key not being used when querying with a filter.
---
.../java/io/glutenproject/metrics/MetricsStep.java | 24 ++++++
.../backendsapi/clickhouse/CHMetricsApi.scala | 4 +-
.../metrics/FileSourceScanMetricsUpdater.scala | 8 ++
.../GlutenClickHouseMergeTreeWriteSuite.scala | 89 ++++++++++++++++++++++
cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp | 8 ++
cpp-ch/local-engine/Parser/RelMetric.cpp | 12 +++
6 files changed, 144 insertions(+), 1 deletion(-)
diff --git a/backends-clickhouse/src/main/java/io/glutenproject/metrics/MetricsStep.java b/backends-clickhouse/src/main/java/io/glutenproject/metrics/MetricsStep.java
index de39f7679..c569cd2ee 100644
--- a/backends-clickhouse/src/main/java/io/glutenproject/metrics/MetricsStep.java
+++ b/backends-clickhouse/src/main/java/io/glutenproject/metrics/MetricsStep.java
@@ -16,6 +16,8 @@
*/
package io.glutenproject.metrics;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
import java.util.List;
public class MetricsStep {
@@ -24,6 +26,12 @@ public class MetricsStep {
protected String description;
protected List<MetricsProcessor> processors;
+ @JsonProperty("total_marks_pk")
+ protected long totalMarksPk;
+
+ @JsonProperty("selected_marks_pk")
+ protected long selectedMarksPk;
+
public String getName() {
return name;
}
@@ -47,4 +55,20 @@ public class MetricsStep {
public void setProcessors(List<MetricsProcessor> processors) {
this.processors = processors;
}
+
+ public void setTotalMarksPk(long totalMarksPk) {
+ this.totalMarksPk = totalMarksPk;
+ }
+
+ public void setSelectedMarksPk(long selectedMarksPk) {
+ this.selectedMarksPk = selectedMarksPk;
+ }
+
+ public long getTotalMarksPk() {
+ return totalMarksPk;
+ }
+
+ public long getSelectedMarksPk() {
+ return selectedMarksPk;
+ }
}
diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala
index cda406872..3012d5371 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala
@@ -125,7 +125,9 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
"pruningTime" ->
SQLMetrics.createTimingMetric(sparkContext, "dynamic partition pruning time"),
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
- "extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time")
+ "extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"),
+ "selectedMarksPk" -> SQLMetrics.createMetric(sparkContext, "selected marks primary"),
+ "totalMarksPk" -> SQLMetrics.createMetric(sparkContext, "total marks primary")
)
override def genFileSourceScanTransformerMetricsUpdater(
diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala
index 1c6da8dad..8c79536bd 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala
@@ -35,6 +35,8 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric
val extraTime: SQLMetric = metrics("extraTime")
val inputWaitTime: SQLMetric = metrics("inputWaitTime")
val outputWaitTime: SQLMetric = metrics("outputWaitTime")
+ val selected_marks_pk: SQLMetric = metrics("selectedMarksPk")
+ val total_marks_pk: SQLMetric = metrics("totalMarksPk")
override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
// inputMetrics.bridgeIncBytesRead(metrics("inputBytes").value)
@@ -51,6 +53,12 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric
outputWaitTime += (metricsData.outputWaitTime / 1000L).toLong
outputVectors += metricsData.outputVectors
+ metricsData.getSteps.forEach(
+ step => {
+ selected_marks_pk += step.selectedMarksPk
+ total_marks_pk += step.totalMarksPk
+ })
+
MetricsUtil.updateExtraTimeMetric(
metricsData,
extraTime,
diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteSuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteSuite.scala
index 2960b502b..f8aa2cfa4 100644
--- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteSuite.scala
+++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteSuite.scala
@@ -1290,5 +1290,94 @@ class GlutenClickHouseMergeTreeWriteSuite
)
}
+ test("test mergetree with primary keys filter") {
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_mergetree_orderbykey2;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS lineitem_mergetree_orderbykey2
+ |(
+ | l_orderkey bigint,
+ | l_partkey bigint,
+ | l_suppkey bigint,
+ | l_linenumber bigint,
+ | l_quantity double,
+ | l_extendedprice double,
+ | l_discount double,
+ | l_tax double,
+ | l_returnflag string,
+ | l_linestatus string,
+ | l_shipdate date,
+ | l_commitdate date,
+ | l_receiptdate date,
+ | l_shipinstruct string,
+ | l_shipmode string,
+ | l_comment string
+ |)
+ |USING clickhouse
+ |TBLPROPERTIES (orderByKey='l_shipdate,l_orderkey',
+ | primaryKey='l_shipdate')
+ |LOCATION '$basePath/lineitem_mergetree_orderbykey2'
+ |""".stripMargin)
+
+ spark.sql(s"""
+ | insert into table lineitem_mergetree_orderbykey2
+ | select * from lineitem
+ |""".stripMargin)
+
+ val sqlStr =
+ s"""
+ |SELECT
+ | sum(l_extendedprice * l_discount) AS revenue
+ |FROM
+ | lineitem_mergetree_orderbykey2
+ |WHERE
+ | l_shipdate >= date'1994-01-01'
+ | AND l_shipdate < date'1994-01-01' + interval 1 year
+ | AND l_discount BETWEEN 0.06 - 0.01 AND 0.06 + 0.01
+ | AND l_quantity < 24
+ |""".stripMargin
+ runTPCHQueryBySQL(6, sqlStr) {
+ df =>
+ val scanExec = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ assert(scanExec.size == 1)
+
+ val mergetreeScan = scanExec(0)
+ assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+
+ val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+ assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
+ assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
+ assert(
+ ClickHouseTableV2
+ .getTable(fileIndex.deltaLog)
+ .orderByKeyOption
+ .get
+ .mkString(",")
+ .equals("l_shipdate,l_orderkey"))
+ assert(
+ ClickHouseTableV2
+ .getTable(fileIndex.deltaLog)
+ .primaryKeyOption
+ .get
+ .mkString(",")
+ .equals("l_shipdate"))
+ assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
+ val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
+
+ assert(addFiles.size == 6)
+ assert(addFiles.map(_.rows).sum == 600572)
+
+ val plans = collect(df.queryExecution.executedPlan) {
+ case scanExec: BasicScanExecTransformer => scanExec
+ }
+ assert(plans.size == 1)
+ assert(plans(0).metrics("selectedMarksPk").value === 17)
+ assert(plans(0).metrics("totalMarksPk").value === 74)
+ }
+ }
}
// scalastyle:off line.size.limit
diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
index 8737ecb7d..34746217b 100644
--- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
@@ -171,6 +171,14 @@ MergeTreeRelParser::parseReadRel(
context,
context->getSettingsRef().max_block_size,
1);
+
+ auto * source_step_with_filter = static_cast<SourceStepWithFilter *>(read_step.get());
+ const auto & storage_prewhere_info = query_info->prewhere_info;
+ if (storage_prewhere_info)
+ {
+ source_step_with_filter->addFilter(storage_prewhere_info->prewhere_actions, storage_prewhere_info->prewhere_column_name);
+ source_step_with_filter->applyFilters();
+ }
query_context.custom_storage_merge_tree->wrapRangesInDataParts(*reinterpret_cast<ReadFromMergeTree *>(read_step.get()), ranges);
steps.emplace_back(read_step.get());
query_plan->addStep(std::move(read_step));
diff --git a/cpp-ch/local-engine/Parser/RelMetric.cpp b/cpp-ch/local-engine/Parser/RelMetric.cpp
index 10a49639b..2449fa4e5 100644
--- a/cpp-ch/local-engine/Parser/RelMetric.cpp
+++ b/cpp-ch/local-engine/Parser/RelMetric.cpp
@@ -17,6 +17,7 @@
#include "RelMetric.h"
#include <Processors/IProcessor.h>
#include <Processors/QueryPlan/AggregatingStep.h>
+#include <Processors/QueryPlan/ReadFromMergeTree.h>
using namespace rapidjson;
@@ -115,6 +116,17 @@ void RelMetric::serialize(Writer<StringBuffer> & writer, bool) const
writer.EndObject();
}
writer.EndArray();
+
+ if (auto read_mergetree = dynamic_cast<DB::ReadFromMergeTree*>(step))
+ {
+ auto selected_marks_pk = read_mergetree->getAnalysisResult().selected_marks_pk;
+ auto total_marks_pk = read_mergetree->getAnalysisResult().total_marks_pk;
+ writer.Key("selected_marks_pk");
+ writer.Uint64(selected_marks_pk);
+ writer.Key("total_marks_pk");
+ writer.Uint64(total_marks_pk);
+ }
+
writer.EndObject();
}
writer.EndArray();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]