This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ab554bee2c4 [Feature](topn) BE adaptive choose whether push topn
filter down to storage layer (#34713)
ab554bee2c4 is described below
commit ab554bee2c43e0064dde17526ab5355ecdbb4510
Author: Pxl <[email protected]>
AuthorDate: Mon May 13 11:29:50 2024 +0800
[Feature](topn) BE adaptive choose whether push topn filter down to storage
layer (#34713)
Support deciding at runtime whether the topn filter should be pushed down.
The topn filter is pushed down to the storage layer in either of two cases:
- the filter target is a key column, or
- the table data model is duplicate-key (dup) or merge-on-write.
---
be/src/pipeline/exec/olap_scan_operator.h | 7 ++++++-
be/src/pipeline/exec/scan_operator.cpp | 9 ++-------
be/src/pipeline/exec/scan_operator.h | 16 +++++++++++++---
be/src/runtime/runtime_predicate.h | 4 +++-
be/src/vec/exec/scan/new_olap_scanner.cpp | 15 ++++++---------
5 files changed, 30 insertions(+), 21 deletions(-)
diff --git a/be/src/pipeline/exec/olap_scan_operator.h
b/be/src/pipeline/exec/olap_scan_operator.h
index 15dfd821772..daff2167f7f 100644
--- a/be/src/pipeline/exec/olap_scan_operator.h
+++ b/be/src/pipeline/exec/olap_scan_operator.h
@@ -80,7 +80,12 @@ private:
bool _storage_no_merge() override;
- bool _push_down_topn() override { return true; }
+ bool _push_down_topn(const vectorized::RuntimePredicate& predicate)
override {
+ if (!predicate.target_is_slot()) {
+ return false;
+ }
+ return _is_key_column(predicate.get_col_name()) || _storage_no_merge();
+ }
Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners)
override;
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index ddb379e0977..19a3911c6a7 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -189,9 +189,7 @@ Status
ScanLocalState<Derived>::_normalize_conjuncts(RuntimeState* state) {
init_value_range(_slot_id_to_slot_desc[_colname_to_slot_id[colname]],
type);
}
- if (!_push_down_topn()) {
- RETURN_IF_ERROR(_get_topn_filters(state));
- }
+ RETURN_IF_ERROR(_get_topn_filters(state));
for (auto it = _conjuncts.begin(); it != _conjuncts.end();) {
auto& conjunct = *it;
@@ -1269,11 +1267,8 @@ Status ScanLocalState<Derived>::_init_profile() {
template <typename Derived>
Status ScanLocalState<Derived>::_get_topn_filters(RuntimeState* state) {
- for (auto id : get_topn_filter_source_node_ids()) {
+ for (auto id : get_topn_filter_source_node_ids(state, false)) {
const auto& pred = state->get_query_ctx()->get_runtime_predicate(id);
- if (!pred.inited()) {
- continue;
- }
SlotDescriptor* slot_desc =
_slot_id_to_slot_desc[_colname_to_slot_id[pred.get_col_name()]];
vectorized::VExprSPtr topn_pred;
diff --git a/be/src/pipeline/exec/scan_operator.h
b/be/src/pipeline/exec/scan_operator.h
index 3ebd573fc71..e7ce9b31d19 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -158,8 +158,18 @@ class ScanLocalState : public ScanLocalStateBase {
std::vector<Dependency*> dependencies() const override { return
{_scan_dependency.get()}; }
- std::vector<int> get_topn_filter_source_node_ids() {
- return _parent->cast<typename
Derived::Parent>().topn_filter_source_node_ids;
+ std::vector<int> get_topn_filter_source_node_ids(RuntimeState* state, bool
push_down) {
+ std::vector<int> result;
+ for (int id : _parent->cast<typename
Derived::Parent>().topn_filter_source_node_ids) {
+ const auto& pred =
state->get_query_ctx()->get_runtime_predicate(id);
+ if (!pred.inited()) {
+ continue;
+ }
+ if (_push_down_topn(pred) == push_down) {
+ result.push_back(id);
+ }
+ }
+ return result;
}
protected:
@@ -176,7 +186,7 @@ protected:
virtual bool _should_push_down_common_expr() { return false; }
virtual bool _storage_no_merge() { return false; }
- virtual bool _push_down_topn() { return false; }
+ virtual bool _push_down_topn(const vectorized::RuntimePredicate&
predicate) { return false; }
virtual bool _is_key_column(const std::string& col_name) { return false; }
virtual vectorized::VScanNode::PushDownType
_should_push_down_bloom_filter() {
return vectorized::VScanNode::PushDownType::UNACCEPTABLE;
diff --git a/be/src/runtime/runtime_predicate.h
b/be/src/runtime/runtime_predicate.h
index 00fbd62dd88..0305994e0fc 100644
--- a/be/src/runtime/runtime_predicate.h
+++ b/be/src/runtime/runtime_predicate.h
@@ -47,6 +47,7 @@ public:
Status init(PrimitiveType type, bool nulls_first, bool is_asc, const
std::string& col_name);
bool inited() const {
+ // when sort node and scan node are not in the same fragment,
predicate will not be initialized
std::shared_lock<std::shared_mutex> rlock(_rwlock);
return _inited;
}
@@ -58,7 +59,6 @@ public:
Status set_tablet_schema(TabletSchemaSPtr tablet_schema) {
std::unique_lock<std::shared_mutex> wlock(_rwlock);
- // when sort node and scan node are not in the same backend, predicate
will not be initialized
if (_tablet_schema || !_inited) {
return Status::OK();
}
@@ -92,6 +92,8 @@ public:
bool nulls_first() const { return _nulls_first; }
+ bool target_is_slot() const { return true; }
+
private:
mutable std::shared_mutex _rwlock;
Field _orderby_extrem {Field::Types::Null};
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index c058c8f3299..56593ae61e5 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -401,24 +401,21 @@ Status NewOlapScanner::_init_tablet_reader_params(
_tablet_reader_params.filter_block_conjuncts = _conjuncts;
}
- // runtime predicate push down optimization for topn
- if (!_parent && !((pipeline::OlapScanLocalState*)_local_state)
- ->get_topn_filter_source_node_ids()
- .empty()) {
- // the new topn whitch support external table
+ if (!_parent) {
_tablet_reader_params.topn_filter_source_node_ids =
((pipeline::OlapScanLocalState*)_local_state)
- ->get_topn_filter_source_node_ids();
- } else {
+ ->get_topn_filter_source_node_ids(_state, true);
+ }
+
+ if (_tablet_reader_params.topn_filter_source_node_ids.empty()) {
+ // old topn logic
_tablet_reader_params.use_topn_opt = olap_scan_node.use_topn_opt;
if (_tablet_reader_params.use_topn_opt) {
if (olap_scan_node.__isset.topn_filter_source_node_ids) {
- // the 2.1 new multiple topn
_tablet_reader_params.topn_filter_source_node_ids =
olap_scan_node.topn_filter_source_node_ids;
} else {
- // the 2.0 old topn
_tablet_reader_params.topn_filter_source_node_ids = {0};
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]