This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 15abafee71 [Bug](runtime filters) support late-arrival runtime filters (#11599)
15abafee71 is described below
commit 15abafee71cfd1e75963e6bf20d3927840239984
Author: Gabriel <[email protected]>
AuthorDate: Fri Aug 12 11:55:15 2022 +0800
[Bug](runtime filters) support late-arrival runtime filters (#11599)
---
be/src/vec/exec/volap_scan_node.cpp | 45 ++---------------------------
be/src/vec/exec/volap_scanner.cpp | 3 ++
be/src/vec/exec/volap_scanner.h | 8 +++++
be/src/vec/exprs/vbloom_predicate.cpp | 16 ++++++++--
be/src/vec/exprs/vruntimefilter_wrapper.cpp | 14 +++++++--
be/src/vec/exprs/vruntimefilter_wrapper.h | 2 ++
6 files changed, 41 insertions(+), 47 deletions(-)
diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp
index cd4b3bb42d..0b2e814e55 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -420,9 +420,6 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
scanner->set_opened();
}
- /*
- // the following code will cause coredump when running tpcds_sf1 sqls,
- // disable temporariy to avoid it, SHOULD BE FIX LATER
std::vector<VExpr*> vexprs;
auto& scanner_filter_apply_marks =
*scanner->mutable_runtime_filter_marks();
DCHECK(scanner_filter_apply_marks.size() == _runtime_filter_descs.size());
@@ -450,42 +447,7 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
if (!_runtime_filter_ready_flag[i]) {
// Use all conjuncts and new arrival runtime filters
to construct a new
// expression tree here.
- auto last_expr =
- _vconjunct_ctx_ptr ?
(*_vconjunct_ctx_ptr)->root() : vexprs[0];
- for (size_t j = _vconjunct_ctx_ptr ? 0 : 1; j <
vexprs.size(); j++) {
- if (_rf_vexpr_set.find(vexprs[j]) !=
_rf_vexpr_set.end()) {
- continue;
- }
- TExprNode texpr_node;
-
texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
-
texpr_node.__set_node_type(TExprNodeType::COMPOUND_PRED);
- texpr_node.__set_opcode(TExprOpcode::COMPOUND_AND);
- VExpr* new_node = _pool->add(new
VcompoundPred(texpr_node));
- new_node->add_child(last_expr);
- new_node->add_child(vexprs[j]);
- last_expr = new_node;
- _rf_vexpr_set.insert(vexprs[j]);
- }
- auto new_vconjunct_ctx_ptr = _pool->add(new
VExprContext(last_expr));
- auto expr_status =
new_vconjunct_ctx_ptr->prepare(state, row_desc());
- // If error occurs in `prepare` or `open` phase,
discard these runtime
- // filters directly.
- if (UNLIKELY(!expr_status.OK())) {
- LOG(WARNING) << "Something wrong for runtime
filters: " << expr_status;
- vexprs.clear();
- break;
- }
- expr_status = new_vconjunct_ctx_ptr->open(state);
- if (UNLIKELY(!expr_status.OK())) {
- LOG(WARNING) << "Something wrong for runtime
filters: " << expr_status;
- vexprs.clear();
- break;
- }
- if (_vconjunct_ctx_ptr) {
-
_stale_vexpr_ctxs.push_back(std::move(_vconjunct_ctx_ptr));
- }
- _vconjunct_ctx_ptr.reset(new VExprContext*);
- *(_vconjunct_ctx_ptr.get()) = new_vconjunct_ctx_ptr;
+ _append_rf_into_conjuncts(state, vexprs);
_runtime_filter_ready_flag[i] = true;
}
}
@@ -495,14 +457,11 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
if (!vexprs.empty()) {
if (*scanner->vconjunct_ctx_ptr()) {
- (*scanner->vconjunct_ctx_ptr())->close(state);
- *scanner->vconjunct_ctx_ptr() = nullptr;
+ scanner->discard_conjuncts();
}
WARN_IF_ERROR((*_vconjunct_ctx_ptr)->clone(state,
scanner->vconjunct_ctx_ptr()),
"Something wrong for runtime filters: ");
- scanner->set_use_pushdown_conjuncts(true);
}
- */
std::vector<Block*> blocks;
diff --git a/be/src/vec/exec/volap_scanner.cpp b/be/src/vec/exec/volap_scanner.cpp
index beb04ab0b8..9218fcac91 100644
--- a/be/src/vec/exec/volap_scanner.cpp
+++ b/be/src/vec/exec/volap_scanner.cpp
@@ -367,6 +367,9 @@ Status VOlapScanner::close(RuntimeState* state) {
if (_is_closed) {
return Status::OK();
}
+ for (auto& ctx : _stale_vexpr_ctxs) {
+ ctx->close(state);
+ }
if (_vconjunct_ctx) {
_vconjunct_ctx->close(state);
}
diff --git a/be/src/vec/exec/volap_scanner.h b/be/src/vec/exec/volap_scanner.h
index f36fd55adf..3708b5b45e 100644
--- a/be/src/vec/exec/volap_scanner.h
+++ b/be/src/vec/exec/volap_scanner.h
@@ -61,6 +61,12 @@ public:
VExprContext** vconjunct_ctx_ptr() { return &_vconjunct_ctx; }
+ void discard_conjuncts() {
+ _vconjunct_ctx->mark_as_stale();
+ _stale_vexpr_ctxs.push_back(_vconjunct_ctx);
+ _vconjunct_ctx = nullptr;
+ }
+
void mark_to_need_to_close() { _need_to_close = true; }
bool need_to_close() { return _need_to_close; }
@@ -143,6 +149,8 @@ private:
bool _need_to_close = false;
TabletSchemaSPtr _tablet_schema;
+
+ std::vector<VExprContext*> _stale_vexpr_ctxs;
};
} // namespace vectorized
diff --git a/be/src/vec/exprs/vbloom_predicate.cpp b/be/src/vec/exprs/vbloom_predicate.cpp
index c708efdda0..83e1443ec6 100644
--- a/be/src/vec/exprs/vbloom_predicate.cpp
+++ b/be/src/vec/exprs/vbloom_predicate.cpp
@@ -19,6 +19,8 @@
#include <string_view>
+#include "vec/data_types/data_type_nullable.h"
+
namespace doris::vectorized {
VBloomPredicate::VBloomPredicate(const TExprNode& node)
@@ -73,8 +75,18 @@ Status VBloomPredicate::execute(VExprContext* context, Block* block, int* result
size_t sz = argument_column->size();
res_data_column->resize(sz);
auto ptr =
((ColumnVector<UInt8>*)res_data_column.get())->get_data().data();
- for (size_t i = 0; i < sz; i++) {
- ptr[i] = _filter->find(reinterpret_cast<const
void*>(argument_column->get_data_at(i).data));
+ if
(WhichDataType(remove_nullable(block->get_by_position(arguments[0]).type))
+ .is_string_or_fixed_string()) {
+ for (size_t i = 0; i < sz; i++) {
+ auto ele = argument_column->get_data_at(i);
+ const StringValue v(ele.data, ele.size);
+ ptr[i] = _filter->find(reinterpret_cast<const void*>(&v));
+ }
+ } else {
+ for (size_t i = 0; i < sz; i++) {
+ ptr[i] = _filter->find(
+ reinterpret_cast<const
void*>(argument_column->get_data_at(i).data));
+ }
}
if (_data_type->is_nullable()) {
auto null_map = ColumnVector<UInt8>::create(block->rows(), 0);
diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.cpp b/be/src/vec/exprs/vruntimefilter_wrapper.cpp
index 2bd353e934..cdc3d2d5dc 100644
--- a/be/src/vec/exprs/vruntimefilter_wrapper.cpp
+++ b/be/src/vec/exprs/vruntimefilter_wrapper.cpp
@@ -29,14 +29,20 @@
namespace doris::vectorized {
VRuntimeFilterWrapper::VRuntimeFilterWrapper(const TExprNode& node, VExpr*
impl)
- : VExpr(node), _impl(impl), _always_true(false), _filtered_rows(0),
_scan_rows(0) {}
+ : VExpr(node),
+ _impl(impl),
+ _always_true(false),
+ _filtered_rows(0),
+ _scan_rows(0),
+ _is_closed(false) {}
VRuntimeFilterWrapper::VRuntimeFilterWrapper(const VRuntimeFilterWrapper&
vexpr)
: VExpr(vexpr),
_impl(vexpr._impl),
_always_true(vexpr._always_true),
_filtered_rows(vexpr._filtered_rows.load()),
- _scan_rows(vexpr._scan_rows.load()) {}
+ _scan_rows(vexpr._scan_rows.load()),
+ _is_closed(false) {}
Status VRuntimeFilterWrapper::prepare(RuntimeState* state, const
RowDescriptor& desc,
VExprContext* context) {
@@ -52,6 +58,10 @@ Status VRuntimeFilterWrapper::open(RuntimeState* state, VExprContext* context,
void VRuntimeFilterWrapper::close(RuntimeState* state, VExprContext* context,
FunctionContext::FunctionStateScope scope) {
+ if (_is_closed) {
+ return;
+ }
+ _is_closed = true;
_impl->close(state, context, scope);
}
diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.h b/be/src/vec/exprs/vruntimefilter_wrapper.h
index 91a6bdbcac..e04faba9ef 100644
--- a/be/src/vec/exprs/vruntimefilter_wrapper.h
+++ b/be/src/vec/exprs/vruntimefilter_wrapper.h
@@ -61,5 +61,7 @@ private:
constexpr static double EXPECTED_FILTER_RATE = 0.2;
std::string _expr_name;
+
+ bool _is_closed;
};
} // namespace doris::vectorized
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]