This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 17b809210a [Bug](runtime filter) fix bug for late-arrival runtime filters (#12049)
17b809210a is described below
commit 17b809210a1354f7218a2c211b39abde1075ec31
Author: Gabriel <[email protected]>
AuthorDate: Fri Aug 26 09:13:10 2022 +0800
[Bug](runtime filter) fix bug for late-arrival runtime filters (#12049)
---
be/src/vec/exec/volap_scan_node.cpp | 12 ++++++++----
be/src/vec/exprs/varray_literal.cpp | 2 +-
be/src/vec/exprs/vbloom_predicate.cpp | 7 +------
be/src/vec/exprs/vbloom_predicate.h | 2 --
be/src/vec/exprs/vcase_expr.cpp | 8 +-------
be/src/vec/exprs/vcase_expr.h | 1 -
be/src/vec/exprs/vcast_expr.cpp | 2 +-
be/src/vec/exprs/vectorized_fn_call.cpp | 2 +-
be/src/vec/exprs/vexpr.cpp | 15 +++++----------
be/src/vec/exprs/vexpr.h | 12 +++++++++---
be/src/vec/exprs/vexpr_context.cpp | 6 ------
be/src/vec/exprs/vexpr_context.h | 5 -----
be/src/vec/exprs/vin_predicate.cpp | 8 ++------
be/src/vec/exprs/vin_predicate.h | 3 ---
be/src/vec/exprs/vruntimefilter_wrapper.cpp | 16 +++-------------
be/src/vec/exprs/vruntimefilter_wrapper.h | 2 --
be/src/vec/exprs/vslot_ref.cpp | 1 +
be/src/vec/exprs/vtuple_is_null_predicate.cpp | 2 +-
18 files changed, 34 insertions(+), 72 deletions(-)
diff --git a/be/src/vec/exec/volap_scan_node.cpp
b/be/src/vec/exec/volap_scan_node.cpp
index 53e8baee92..881c53742d 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -285,7 +285,6 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) {
_status = status;
break;
}
- (*(scanner->vconjunct_ctx_ptr()))->debug_valid();
}
}
@@ -450,7 +449,6 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
std::shared_lock<std::shared_mutex> l(_rf_lock);
WARN_IF_ERROR((*_vconjunct_ctx_ptr)->clone(state,
scanner->vconjunct_ctx_ptr()),
"Something wrong for runtime filters: ");
- (*(scanner->vconjunct_ctx_ptr()))->debug_valid();
}
}
@@ -1789,7 +1787,14 @@ VExpr* VOlapScanNode::_normalize_predicate(RuntimeState*
state, VExpr* conjunct_
Status VOlapScanNode::_append_rf_into_conjuncts(RuntimeState* state,
std::vector<VExpr*>& vexprs) {
if (!vexprs.empty()) {
- auto last_expr = _vconjunct_ctx_ptr ? (*_vconjunct_ctx_ptr)->root() :
vexprs[0];
+ VExpr* last_expr = nullptr;
+ if (_vconjunct_ctx_ptr) {
+ last_expr = (*_vconjunct_ctx_ptr)->root();
+ } else {
+ DCHECK(_rf_vexpr_set.find(vexprs[0]) == _rf_vexpr_set.end());
+ last_expr = vexprs[0];
+ _rf_vexpr_set.insert(vexprs[0]);
+ }
for (size_t j = _vconjunct_ctx_ptr ? 0 : 1; j < vexprs.size(); j++) {
if (_rf_vexpr_set.find(vexprs[j]) != _rf_vexpr_set.end()) {
continue;
@@ -1832,7 +1837,6 @@ Status
VOlapScanNode::_append_rf_into_conjuncts(RuntimeState* state, std::vector
}
_vconjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*);
*(_vconjunct_ctx_ptr.get()) = new_vconjunct_ctx_ptr;
- new_vconjunct_ctx_ptr->debug_valid();
}
return Status::OK();
}
diff --git a/be/src/vec/exprs/varray_literal.cpp
b/be/src/vec/exprs/varray_literal.cpp
index d289c17988..f2a2c7fa09 100644
--- a/be/src/vec/exprs/varray_literal.cpp
+++ b/be/src/vec/exprs/varray_literal.cpp
@@ -23,7 +23,7 @@ Status VArrayLiteral::prepare(RuntimeState* state, const
RowDescriptor& row_desc
VExprContext* context) {
DCHECK_EQ(type().children.size(), 1) << "array children type not 1";
- RETURN_IF_ERROR(VExpr::prepare(state, row_desc, context));
+ RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, row_desc, context));
bool is_null = (_node_type == TExprNodeType::NULL_LITERAL);
Field array = is_null ? Field() : Array();
for (const auto child : _children) {
diff --git a/be/src/vec/exprs/vbloom_predicate.cpp
b/be/src/vec/exprs/vbloom_predicate.cpp
index 1613d839e7..dc3be41027 100644
--- a/be/src/vec/exprs/vbloom_predicate.cpp
+++ b/be/src/vec/exprs/vbloom_predicate.cpp
@@ -29,17 +29,12 @@ VBloomPredicate::VBloomPredicate(const TExprNode& node)
Status VBloomPredicate::prepare(RuntimeState* state, const RowDescriptor& desc,
VExprContext* context) {
- RETURN_IF_ERROR(VExpr::prepare(state, desc, context));
+ RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context));
- if (_prepared) {
- return Status::OK();
- }
if (_children.size() != 1) {
return Status::InternalError("Invalid argument for VBloomPredicate.");
}
- _prepared = true;
-
ColumnsWithTypeAndName argument_template;
argument_template.reserve(_children.size());
for (auto child : _children) {
diff --git a/be/src/vec/exprs/vbloom_predicate.h
b/be/src/vec/exprs/vbloom_predicate.h
index 0f00d6ca55..b4e9f54a31 100644
--- a/be/src/vec/exprs/vbloom_predicate.h
+++ b/be/src/vec/exprs/vbloom_predicate.h
@@ -44,7 +44,5 @@ public:
private:
std::shared_ptr<IBloomFilterFuncBase> _filter;
std::string _expr_name;
-
- bool _prepared;
};
} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exprs/vcase_expr.cpp b/be/src/vec/exprs/vcase_expr.cpp
index 9058aa1710..d8f067f6b8 100644
--- a/be/src/vec/exprs/vcase_expr.cpp
+++ b/be/src/vec/exprs/vcase_expr.cpp
@@ -24,7 +24,6 @@ namespace doris::vectorized {
VCaseExpr::VCaseExpr(const TExprNode& node)
: VExpr(node),
- _is_prepare(false),
_has_case_expr(node.case_expr.has_case_expr),
_has_else_expr(node.case_expr.has_else_expr) {
if (_has_case_expr) {
@@ -37,12 +36,7 @@ VCaseExpr::VCaseExpr(const TExprNode& node)
Status VCaseExpr::prepare(doris::RuntimeState* state, const
doris::RowDescriptor& desc,
VExprContext* context) {
- RETURN_IF_ERROR(VExpr::prepare(state, desc, context));
-
- if (_is_prepare) {
- return Status::OK();
- }
- _is_prepare = true;
+ RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context));
ColumnsWithTypeAndName argument_template;
DataTypes arguments;
diff --git a/be/src/vec/exprs/vcase_expr.h b/be/src/vec/exprs/vcase_expr.h
index a6ca9d6808..6b056f9431 100644
--- a/be/src/vec/exprs/vcase_expr.h
+++ b/be/src/vec/exprs/vcase_expr.h
@@ -41,7 +41,6 @@ public:
virtual std::string debug_string() const override;
private:
- bool _is_prepare;
bool _has_case_expr;
bool _has_else_expr;
diff --git a/be/src/vec/exprs/vcast_expr.cpp b/be/src/vec/exprs/vcast_expr.cpp
index e257e6fe8c..173df0365d 100644
--- a/be/src/vec/exprs/vcast_expr.cpp
+++ b/be/src/vec/exprs/vcast_expr.cpp
@@ -29,7 +29,7 @@ namespace doris::vectorized {
doris::Status VCastExpr::prepare(doris::RuntimeState* state, const
doris::RowDescriptor& desc,
VExprContext* context) {
- RETURN_IF_ERROR(VExpr::prepare(state, desc, context));
+ RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context));
DCHECK_EQ(_children.size(), 1);
auto child = _children[0];
diff --git a/be/src/vec/exprs/vectorized_fn_call.cpp
b/be/src/vec/exprs/vectorized_fn_call.cpp
index f2fd7bcb5f..f92ec8bc3a 100644
--- a/be/src/vec/exprs/vectorized_fn_call.cpp
+++ b/be/src/vec/exprs/vectorized_fn_call.cpp
@@ -37,7 +37,7 @@ VectorizedFnCall::VectorizedFnCall(const doris::TExprNode&
node) : VExpr(node) {
doris::Status VectorizedFnCall::prepare(doris::RuntimeState* state,
const doris::RowDescriptor& desc,
VExprContext* context) {
- RETURN_IF_ERROR(VExpr::prepare(state, desc, context));
+ RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context));
ColumnsWithTypeAndName argument_template;
argument_template.reserve(_children.size());
std::vector<std::string_view> child_expr_name;
diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp
index 183406926f..6d0de17dac 100644
--- a/be/src/vec/exprs/vexpr.cpp
+++ b/be/src/vec/exprs/vexpr.cpp
@@ -46,7 +46,8 @@ using doris::TypeDescriptor;
VExpr::VExpr(const doris::TExprNode& node)
: _node_type(node.node_type),
_type(TypeDescriptor::from_thrift(node.type)),
- _fn_context_index(-1) {
+ _fn_context_index(-1),
+ _prepared(false) {
if (node.__isset.fn) {
_fn = node.fn;
}
@@ -65,10 +66,11 @@ VExpr::VExpr(const VExpr& vexpr)
_children(vexpr._children),
_fn(vexpr._fn),
_fn_context_index(vexpr._fn_context_index),
- _constant_col(vexpr._constant_col) {}
+ _constant_col(vexpr._constant_col),
+ _prepared(vexpr._prepared) {}
VExpr::VExpr(const TypeDescriptor& type, bool is_slotref, bool is_nullable)
- : _type(type), _fn_context_index(-1) {
+ : _type(type), _fn_context_index(-1), _prepared(false) {
if (is_slotref) {
_node_type = TExprNodeType::SLOT_REF;
}
@@ -363,11 +365,4 @@ void VExpr::close_function_context(VExprContext* context,
FunctionContext::Funct
}
}
-void VExpr::debug_valid(VExprContext* context) {
- DCHECK_LT(_fn_context_index, context->get_fn_context_size());
- for (int i = 0; i < _children.size(); ++i) {
- _children[i]->debug_valid(context);
- }
-}
-
} // namespace doris::vectorized
diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h
index b19a415368..e0e0defac4 100644
--- a/be/src/vec/exprs/vexpr.h
+++ b/be/src/vec/exprs/vexpr.h
@@ -32,6 +32,14 @@
namespace doris {
namespace vectorized {
+#define RETURN_IF_ERROR_OR_PREPARED(stmt) \
+ if (_prepared) { \
+ return Status::OK(); \
+ } else { \
+ _prepared = true; \
+ RETURN_IF_ERROR(stmt); \
+ }
+
class VExpr {
public:
// resize inserted param column to make sure column size equal to
block.rows()
@@ -157,9 +165,6 @@ public:
return nullptr;
}
- // Just for debug. Should be removed after fixing #11995
- void debug_valid(VExprContext* context);
-
protected:
/// Simple debug string that provides no expr subclass-specific information
std::string debug_string(const std::string& expr_name) const {
@@ -198,6 +203,7 @@ protected:
// If this expr is constant, this will store and cache the value generated
by
// get_const_col()
std::shared_ptr<ColumnPtrWrapper> _constant_col;
+ bool _prepared;
};
} // namespace vectorized
diff --git a/be/src/vec/exprs/vexpr_context.cpp
b/be/src/vec/exprs/vexpr_context.cpp
index da0deda56d..d6a64c9d9b 100644
--- a/be/src/vec/exprs/vexpr_context.cpp
+++ b/be/src/vec/exprs/vexpr_context.cpp
@@ -150,10 +150,4 @@ Block VExprContext::get_output_block_after_execute_exprs(
return {result_columns};
}
-void VExprContext::debug_valid() {
-#ifndef NDEBUG
- _root->debug_valid(this);
-#endif
-}
-
} // namespace doris::vectorized
diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h
index 53cfbdb389..b454833ed3 100644
--- a/be/src/vec/exprs/vexpr_context.h
+++ b/be/src/vec/exprs/vexpr_context.h
@@ -76,11 +76,6 @@ public:
_stale = true;
}
- int get_fn_context_size() const { return _fn_contexts.size(); }
-
- // Just for debug. Should be removed after fixing #11995
- void debug_valid();
-
private:
friend class VExpr;
diff --git a/be/src/vec/exprs/vin_predicate.cpp
b/be/src/vec/exprs/vin_predicate.cpp
index fb36206a11..b0aa2c2f65 100644
--- a/be/src/vec/exprs/vin_predicate.cpp
+++ b/be/src/vec/exprs/vin_predicate.cpp
@@ -30,22 +30,18 @@
namespace doris::vectorized {
VInPredicate::VInPredicate(const TExprNode& node)
- : VExpr(node), _is_not_in(node.in_predicate.is_not_in),
_is_prepare(false) {}
+ : VExpr(node), _is_not_in(node.in_predicate.is_not_in) {}
Status VInPredicate::prepare(RuntimeState* state, const RowDescriptor& desc,
VExprContext* context) {
- RETURN_IF_ERROR(VExpr::prepare(state, desc, context));
+ RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context));
- if (_is_prepare) {
- return Status::OK();
- }
if (_children.size() < 1) {
return Status::InternalError("no Function operator in.");
}
_expr_name =
fmt::format("({} {} set)", _children[0]->expr_name(), _is_not_in ?
"not_in" : "in");
- _is_prepare = true;
DCHECK(_children.size() >= 1);
ColumnsWithTypeAndName argument_template;
diff --git a/be/src/vec/exprs/vin_predicate.h b/be/src/vec/exprs/vin_predicate.h
index 6b2264d40a..556c724471 100644
--- a/be/src/vec/exprs/vin_predicate.h
+++ b/be/src/vec/exprs/vin_predicate.h
@@ -50,9 +50,6 @@ private:
std::string _expr_name;
const bool _is_not_in;
- bool _is_prepare;
-
-private:
static const constexpr char* function_name = "in";
};
} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.cpp
b/be/src/vec/exprs/vruntimefilter_wrapper.cpp
index cdc3d2d5dc..869de36a26 100644
--- a/be/src/vec/exprs/vruntimefilter_wrapper.cpp
+++ b/be/src/vec/exprs/vruntimefilter_wrapper.cpp
@@ -29,24 +29,18 @@
namespace doris::vectorized {
VRuntimeFilterWrapper::VRuntimeFilterWrapper(const TExprNode& node, VExpr*
impl)
- : VExpr(node),
- _impl(impl),
- _always_true(false),
- _filtered_rows(0),
- _scan_rows(0),
- _is_closed(false) {}
+ : VExpr(node), _impl(impl), _always_true(false), _filtered_rows(0),
_scan_rows(0) {}
VRuntimeFilterWrapper::VRuntimeFilterWrapper(const VRuntimeFilterWrapper&
vexpr)
: VExpr(vexpr),
_impl(vexpr._impl),
_always_true(vexpr._always_true),
_filtered_rows(vexpr._filtered_rows.load()),
- _scan_rows(vexpr._scan_rows.load()),
- _is_closed(false) {}
+ _scan_rows(vexpr._scan_rows.load()) {}
Status VRuntimeFilterWrapper::prepare(RuntimeState* state, const
RowDescriptor& desc,
VExprContext* context) {
- RETURN_IF_ERROR(_impl->prepare(state, desc, context));
+ RETURN_IF_ERROR_OR_PREPARED(_impl->prepare(state, desc, context));
_expr_name = fmt::format("VRuntimeFilterWrapper({})", _impl->expr_name());
return Status::OK();
}
@@ -58,10 +52,6 @@ Status VRuntimeFilterWrapper::open(RuntimeState* state,
VExprContext* context,
void VRuntimeFilterWrapper::close(RuntimeState* state, VExprContext* context,
FunctionContext::FunctionStateScope scope) {
- if (_is_closed) {
- return;
- }
- _is_closed = true;
_impl->close(state, context, scope);
}
diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.h
b/be/src/vec/exprs/vruntimefilter_wrapper.h
index e04faba9ef..91a6bdbcac 100644
--- a/be/src/vec/exprs/vruntimefilter_wrapper.h
+++ b/be/src/vec/exprs/vruntimefilter_wrapper.h
@@ -61,7 +61,5 @@ private:
constexpr static double EXPECTED_FILTER_RATE = 0.2;
std::string _expr_name;
-
- bool _is_closed;
};
} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exprs/vslot_ref.cpp b/be/src/vec/exprs/vslot_ref.cpp
index 7ef39786da..f53bcf2900 100644
--- a/be/src/vec/exprs/vslot_ref.cpp
+++ b/be/src/vec/exprs/vslot_ref.cpp
@@ -41,6 +41,7 @@ VSlotRef::VSlotRef(const SlotDescriptor* desc)
Status VSlotRef::prepare(doris::RuntimeState* state, const
doris::RowDescriptor& desc,
VExprContext* context) {
+ RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context));
DCHECK_EQ(_children.size(), 0);
if (_slot_id == -1) {
return Status::OK();
diff --git a/be/src/vec/exprs/vtuple_is_null_predicate.cpp
b/be/src/vec/exprs/vtuple_is_null_predicate.cpp
index 6342dd32dc..acb41f73b0 100644
--- a/be/src/vec/exprs/vtuple_is_null_predicate.cpp
+++ b/be/src/vec/exprs/vtuple_is_null_predicate.cpp
@@ -36,7 +36,7 @@ VTupleIsNullPredicate::VTupleIsNullPredicate(const TExprNode&
node)
Status VTupleIsNullPredicate::prepare(RuntimeState* state, const
RowDescriptor& desc,
VExprContext* context) {
- RETURN_IF_ERROR(VExpr::prepare(state, desc, context));
+ RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context));
DCHECK_EQ(0, _children.size());
_column_to_check =
_is_left_null_side ? desc.num_materialized_slots() :
desc.num_materialized_slots() + 1;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]