yiguolei commented on code in PR #10103:
URL: https://github.com/apache/incubator-doris/pull/10103#discussion_r897425517
##########
be/src/vec/exec/volap_scan_node.cpp:
##########
@@ -162,29 +163,63 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner)
{
scanner->set_opened();
}
- std::vector<ExprContext*> contexts;
+ std::vector<VExpr*> vexprs;
auto& scanner_filter_apply_marks =
*scanner->mutable_runtime_filter_marks();
DCHECK(scanner_filter_apply_marks.size() == _runtime_filter_descs.size());
for (size_t i = 0; i < scanner_filter_apply_marks.size(); i++) {
if (!scanner_filter_apply_marks[i] &&
!_runtime_filter_ctxs[i].apply_mark) {
+ /// When runtime filters are ready during running, we should use
them to filter data
+ /// in VOlapScanner.
+ /// New arrival rf will be processed as below:
+ /// 1. convert these runtime filters to vectorized expressions
+ /// 2. if this is the first scanner thread to receive this rf,
construct a new
+ /// VExprContext and update `_vconjunct_ctx_ptr` in scan node.
Notice that we use
+ /// `_runtime_filter_ready_flag` to ensure `_vconjunct_ctx_ptr`
will be updated only
+ /// once after any runtime_filters are ready.
+ /// 3. finally, just copy this new VExprContext to scanner and use
it to filter data.
IRuntimeFilter* runtime_filter = nullptr;
state->runtime_filter_mgr()->get_consume_filter(_runtime_filter_descs[i].filter_id,
&runtime_filter);
DCHECK(runtime_filter != nullptr);
bool ready = runtime_filter->is_ready();
if (ready) {
- runtime_filter->get_prepared_context(&contexts, row_desc(),
_expr_mem_tracker);
+ runtime_filter->get_prepared_vexprs(&vexprs, row_desc(),
_expr_mem_tracker);
scanner_filter_apply_marks[i] = true;
+ {
+ std::unique_lock<std::mutex> l(*(_rf_locks[i]));
+ if (!_runtime_filter_ready_flag[i]) {
+ // Use all conjuncts and new arrival runtime filters
to construct a new
+ // expression tree here.
+ auto last_expr =
+ _vconjunct_ctx_ptr ?
(*_vconjunct_ctx_ptr)->root() : vexprs[0];
+ for (size_t j = _vconjunct_ctx_ptr ? 0 : 1; j <
vexprs.size(); j++) {
+ TExprNode texpr_node;
+
texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
+
texpr_node.__set_node_type(TExprNodeType::COMPOUND_PRED);
+ texpr_node.__set_opcode(TExprOpcode::COMPOUND_AND);
+ VExpr* new_node = _pool->add(new
VcompoundPred(texpr_node));
+ new_node->add_child(last_expr);
+ new_node->add_child(vexprs[j]);
+ last_expr = new_node;
+ }
+ _vconjunct_ctx_ptr.reset(new
doris::vectorized::VExprContext*);
+ auto new_vconjunct_ctx_ptr = _pool->add(new
VExprContext(last_expr));
+ WARN_IF_ERROR(new_vconjunct_ctx_ptr->prepare(state,
row_desc(),
Review Comment:
If prepare or open failed, the _vcontjunct_ctx_ptr is reset, the result is
wrong?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]