This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new a149a5c748 [Feature](inverted index) push count on index down to scan node #22687 (#23055)
a149a5c748 is described below
commit a149a5c7481afea8028efab14853552dbaa855dc
Author: airborne12 <[email protected]>
AuthorDate: Fri Aug 18 18:28:14 2023 +0800
[Feature](inverted index) push count on index down to scan node #22687 (#23055)
---
be/src/olap/rowset/segment_v2/segment.cpp | 3 +-
be/src/olap/rowset/segment_v2/segment_iterator.cpp | 14 ++++-
be/src/vec/exec/scan/new_olap_scan_node.cpp | 3 +-
.../glue/translator/PhysicalPlanTranslator.java | 3 +
.../org/apache/doris/nereids/rules/RuleType.java | 1 +
.../rules/implementation/AggregateStrategies.java | 73 ++++++++++++++++++++++
.../physical/PhysicalStorageLayerAggregate.java | 3 +-
.../java/org/apache/doris/qe/SessionVariable.java | 17 ++++-
gensrc/thrift/PlanNodes.thrift | 3 +-
9 files changed, 113 insertions(+), 7 deletions(-)
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp
b/be/src/olap/rowset/segment_v2/segment.cpp
index c6d472d035..f70d1bd1d5 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -160,7 +160,8 @@ Status Segment::new_iterator(SchemaSPtr schema, const
StorageReadOptions& read_o
RETURN_IF_ERROR(load_index());
if (read_options.delete_condition_predicates->num_of_column_predicate() ==
0 &&
- read_options.push_down_agg_type_opt != TPushAggOp::NONE) {
+ read_options.push_down_agg_type_opt != TPushAggOp::NONE &&
+ read_options.push_down_agg_type_opt != TPushAggOp::COUNT_ON_INDEX) {
iter->reset(vectorized::new_vstatistics_iterator(this->shared_from_this(),
*schema));
} else {
iter->reset(new SegmentIterator(this->shared_from_this(), schema));
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index f1c5b15bce..cc5d4b9fdb 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -956,9 +956,19 @@ bool SegmentIterator::_need_read_data(ColumnId cid) {
// occurring, return true here that column data needs to be read
return true;
}
+ // Decide whether reading this column's data can be skipped. The column is
+ // looked up by its unique ID; if '_need_read_data_indices' has an entry for it
+ // and that entry is false (the inverted-index case), skip the read when either:
+ //   1. the column is not in '_output_columns' (it is not projected), or
+ //   2. the column is in '_output_columns' but the push-down aggregation type
+ //      is 'COUNT_ON_INDEX'.
+ // When skipping, log a debug message and return false.
int32_t unique_id = _opts.tablet_schema->column(cid).unique_id();
- if (_need_read_data_indices.count(unique_id) > 0 &&
!_need_read_data_indices[unique_id] &&
- _output_columns.count(unique_id) < 1) {
+ if ((_need_read_data_indices.count(unique_id) > 0 &&
!_need_read_data_indices[unique_id] &&
+ _output_columns.count(unique_id) < 1) ||
+ (_need_read_data_indices.count(unique_id) > 0 &&
!_need_read_data_indices[unique_id] &&
+ _output_columns.count(unique_id) == 1 &&
+ _opts.push_down_agg_type_opt == TPushAggOp::COUNT_ON_INDEX)) {
VLOG_DEBUG << "SegmentIterator no need read data for column: "
<< _opts.tablet_schema->column_by_uid(unique_id).name();
return false;
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp
b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index 3af5bb9f89..c5f9e75586 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -249,7 +249,8 @@ Status NewOlapScanNode::_process_conjuncts() {
}
Status NewOlapScanNode::_build_key_ranges_and_filters() {
- if (_push_down_agg_type == TPushAggOp::NONE) {
+ if (_push_down_agg_type == TPushAggOp::NONE ||
+ _push_down_agg_type == TPushAggOp::COUNT_ON_INDEX) {
const std::vector<std::string>& column_names =
_olap_scan_node.key_column_name;
const std::vector<TPrimitiveType::type>& column_types =
_olap_scan_node.key_column_type;
DCHECK(column_types.size() == column_names.size());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 9fda9b99f9..ed9b30f93e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -787,6 +787,9 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
case COUNT:
pushAggOp = TPushAggOp.COUNT;
break;
+ case COUNT_ON_MATCH:
+ pushAggOp = TPushAggOp.COUNT_ON_INDEX;
+ break;
case MIN_MAX:
pushAggOp = TPushAggOp.MINMAX;
break;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
index 4015a30ef6..3be748fe0c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
@@ -319,6 +319,7 @@ public enum RuleType {
STORAGE_LAYER_AGGREGATE_WITHOUT_PROJECT(RuleTypeClass.IMPLEMENTATION),
STORAGE_LAYER_AGGREGATE_WITH_PROJECT(RuleTypeClass.IMPLEMENTATION),
STORAGE_LAYER_AGGREGATE_WITH_PROJECT_FOR_FILE_SCAN(RuleTypeClass.IMPLEMENTATION),
+ COUNT_ON_INDEX(RuleTypeClass.IMPLEMENTATION),
ONE_PHASE_AGGREGATE_WITHOUT_DISTINCT(RuleTypeClass.IMPLEMENTATION),
TWO_PHASE_AGGREGATE_WITHOUT_DISTINCT(RuleTypeClass.IMPLEMENTATION),
TWO_PHASE_AGGREGATE_WITH_COUNT_DISTINCT_MULTI(RuleTypeClass.IMPLEMENTATION),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
index 03962ff752..48e975b00c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
@@ -36,6 +36,7 @@ import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.Cast;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.IsNull;
+import org.apache.doris.nereids.trees.expressions.Match;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.functions.ExpressionTrait;
@@ -56,6 +57,7 @@ import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.Project;
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalRelation;
@@ -97,6 +99,28 @@ public class AggregateStrategies implements
ImplementationRuleFactory {
PatternDescriptor<LogicalAggregate<GroupPlan>> basePattern =
logicalAggregate();
return ImmutableList.of(
+ RuleType.COUNT_ON_INDEX.build(
+ logicalAggregate(
+ logicalProject(
+ logicalFilter(
+ logicalOlapScan()
+ ).when(filter ->
containsMatchExpression(filter.getExpressions())
+ && filter.getExpressions().size() == 1)
+ ))
+ .when(agg -> enablePushDownCountOnIndex())
+ .when(agg -> agg.getGroupByExpressions().size() == 0)
+ .when(agg -> {
+ Set<AggregateFunction> funcs =
agg.getAggregateFunctions();
+ return !funcs.isEmpty() && funcs.stream().allMatch(f
-> f instanceof Count && !f.isDistinct());
+ })
+ .thenApply(ctx -> {
+
LogicalAggregate<LogicalProject<LogicalFilter<LogicalOlapScan>>> agg = ctx.root;
+ LogicalProject<LogicalFilter<LogicalOlapScan>> project
= agg.child();
+ LogicalFilter<LogicalOlapScan> filter =
project.child();
+ LogicalOlapScan olapScan = filter.child();
+ return pushdownCountOnIndex(agg, project, filter,
olapScan, ctx.cascadesContext);
+ })
+ ),
RuleType.STORAGE_LAYER_AGGREGATE_WITHOUT_PROJECT.build(
logicalAggregate(
logicalOlapScan()
@@ -188,6 +212,55 @@ public class AggregateStrategies implements
ImplementationRuleFactory {
);
}
+ private boolean containsMatchExpression(List<Expression> expressions) {
+ return expressions.stream().allMatch(expr -> expr instanceof Match);
+ }
+
+ private boolean enablePushDownCountOnIndex() {
+ ConnectContext connectContext = ConnectContext.get();
+ return connectContext != null &&
connectContext.getSessionVariable().isEnablePushDownCountOnIndex();
+ }
+
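+ /**
+ * sql: select count(*) from tbl where column match 'token'
+ * <p>
+ * Applies only when the session variable enable_count_on_index_pushdown is on,
+ * there is no GROUP BY, every aggregate function is a non-distinct COUNT, and
+ * the filter holds exactly one expression, which is a MATCH.
+ * <p>
+ * before:
+ * <p>
+ * LogicalAggregate(groupBy=[], output=[count(*)])
+ * |
+ * LogicalProject
+ * |
+ * LogicalFilter(column match 'token')
+ * |
+ * LogicalOlapScan(table=tbl)
+ * <p>
+ * after:
+ * <p>
+ * LogicalAggregate(groupBy=[], output=[count(*)])
+ * |
+ * LogicalProject
+ * |
+ * LogicalFilter(column match 'token')
+ * |
+ * PhysicalStorageLayerAggregate(pushAggOp=COUNT_ON_MATCH, table=PhysicalOlapScan(table=tbl))
+ * <p>
+ * COUNT_ON_MATCH is translated to TPushAggOp.COUNT_ON_INDEX when the plan is sent to the backend.
+ */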
+ private LogicalAggregate<? extends Plan> pushdownCountOnIndex(
+ LogicalAggregate<? extends Plan> agg,
+ LogicalProject<? extends Plan> project,
+ LogicalFilter<? extends Plan> filter,
+ LogicalOlapScan olapScan,
+ CascadesContext cascadesContext) {
+ PhysicalOlapScan physicalOlapScan
+ = (PhysicalOlapScan) new LogicalOlapScanToPhysicalOlapScan()
+ .build()
+ .transform(olapScan, cascadesContext)
+ .get(0);
+ return agg.withChildren(ImmutableList.of(
+ project.withChildren(ImmutableList.of(
+ filter.withChildren(ImmutableList.of(
+ new PhysicalStorageLayerAggregate(
+ physicalOlapScan,
+ PushDownAggOp.COUNT_ON_MATCH)))))
+ ));
+ }
+
/**
* sql: select count(*) from tbl
* <p>
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalStorageLayerAggregate.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalStorageLayerAggregate.java
index 7a9550adc3..73fdfa7305 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalStorageLayerAggregate.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalStorageLayerAggregate.java
@@ -108,8 +108,9 @@ public class PhysicalStorageLayerAggregate extends
PhysicalCatalogRelation {
/** PushAggOp */
public enum PushDownAggOp {
- COUNT, MIN_MAX, MIX;
+ COUNT, MIN_MAX, MIX, COUNT_ON_MATCH;
+ /** Returns the mapping from each aggregate function class that can be pushed down to its PushDownAggOp. */
public static Map<Class<? extends AggregateFunction>, PushDownAggOp>
supportedFunctions() {
return ImmutableMap.<Class<? extends AggregateFunction>,
PushDownAggOp>builder()
.put(Count.class, PushDownAggOp.COUNT)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 36afda4d3b..d570dc3f7f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -318,6 +318,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String ENABLE_INVERTED_INDEX_QUERY =
"enable_inverted_index_query";
+ public static final String ENABLE_PUSHDOWN_COUNT_ON_INDEX =
"enable_count_on_index_pushdown";
+
public static final String GROUP_BY_AND_HAVING_USE_ALIAS_FIRST =
"group_by_and_having_use_alias_first";
public static final String DROP_TABLE_IF_CTAS_FAILED =
"drop_table_if_ctas_failed";
@@ -958,9 +960,14 @@ public class SessionVariable implements Serializable,
Writable {
// Whether enable query with inverted index.
@VariableMgr.VarAttr(name = ENABLE_INVERTED_INDEX_QUERY, needForward =
true, description = {
- "是否启用inverted index query。", "Set wether to use inverted index
query."})
+ "是否启用inverted index query。", "Set whether to use inverted index
query."})
public boolean enableInvertedIndexQuery = true;
+ // Whether to push a COUNT aggregation down to the scan node when the query filters with an inverted index MATCH.
+ @VariableMgr.VarAttr(name = ENABLE_PUSHDOWN_COUNT_ON_INDEX, needForward =
true, description = {
+ "是否启用count_on_index pushdown。", "Set whether to pushdown
count_on_index."})
+ public boolean enablePushDownCountOnIndex = true;
+
// Whether drop table when create table as select insert data appear error.
@VariableMgr.VarAttr(name = DROP_TABLE_IF_CTAS_FAILED, needForward = true)
public boolean dropTableIfCtasFailed = true;
@@ -2019,6 +2026,14 @@ public class SessionVariable implements Serializable,
Writable {
this.enableInvertedIndexQuery = enableInvertedIndexQuery;
}
+ public boolean isEnablePushDownCountOnIndex() {
+ return enablePushDownCountOnIndex;
+ }
+
+ public void setEnablePushDownCountOnIndex(boolean
enablePushDownCountOnIndex) {
+ this.enablePushDownCountOnIndex = enablePushDownCountOnIndex;
+ }
+
public int getMaxTableCountUseCascadesJoinReorder() {
return this.maxTableCountUseCascadesJoinReorder;
}
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 614017d66a..72cd772cfc 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -622,7 +622,8 @@ enum TPushAggOp {
NONE = 0,
MINMAX = 1,
COUNT = 2,
- MIX = 3
+ MIX = 3,
+ COUNT_ON_INDEX = 4
}
struct TOlapScanNode {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]