This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 17b809210a [Bug](runtime filter) fix bug for late-arrival runtime filters (#12049)
17b809210a is described below

commit 17b809210a1354f7218a2c211b39abde1075ec31
Author: Gabriel <[email protected]>
AuthorDate: Fri Aug 26 09:13:10 2022 +0800

    [Bug](runtime filter) fix bug for late-arrival runtime filters (#12049)
---
 be/src/vec/exec/volap_scan_node.cpp           | 12 ++++++++----
 be/src/vec/exprs/varray_literal.cpp           |  2 +-
 be/src/vec/exprs/vbloom_predicate.cpp         |  7 +------
 be/src/vec/exprs/vbloom_predicate.h           |  2 --
 be/src/vec/exprs/vcase_expr.cpp               |  8 +-------
 be/src/vec/exprs/vcase_expr.h                 |  1 -
 be/src/vec/exprs/vcast_expr.cpp               |  2 +-
 be/src/vec/exprs/vectorized_fn_call.cpp       |  2 +-
 be/src/vec/exprs/vexpr.cpp                    | 15 +++++----------
 be/src/vec/exprs/vexpr.h                      | 12 +++++++++---
 be/src/vec/exprs/vexpr_context.cpp            |  6 ------
 be/src/vec/exprs/vexpr_context.h              |  5 -----
 be/src/vec/exprs/vin_predicate.cpp            |  8 ++------
 be/src/vec/exprs/vin_predicate.h              |  3 ---
 be/src/vec/exprs/vruntimefilter_wrapper.cpp   | 16 +++-------------
 be/src/vec/exprs/vruntimefilter_wrapper.h     |  2 --
 be/src/vec/exprs/vslot_ref.cpp                |  1 +
 be/src/vec/exprs/vtuple_is_null_predicate.cpp |  2 +-
 18 files changed, 34 insertions(+), 72 deletions(-)

diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp
index 53e8baee92..881c53742d 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -285,7 +285,6 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) {
                 _status = status;
                 break;
             }
-            (*(scanner->vconjunct_ctx_ptr()))->debug_valid();
         }
     }
 
@@ -450,7 +449,6 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
             std::shared_lock<std::shared_mutex> l(_rf_lock);
             WARN_IF_ERROR((*_vconjunct_ctx_ptr)->clone(state, 
scanner->vconjunct_ctx_ptr()),
                           "Something wrong for runtime filters: ");
-            (*(scanner->vconjunct_ctx_ptr()))->debug_valid();
         }
     }
 
@@ -1789,7 +1787,14 @@ VExpr* VOlapScanNode::_normalize_predicate(RuntimeState* state, VExpr* conjunct_
 
 Status VOlapScanNode::_append_rf_into_conjuncts(RuntimeState* state, 
std::vector<VExpr*>& vexprs) {
     if (!vexprs.empty()) {
-        auto last_expr = _vconjunct_ctx_ptr ? (*_vconjunct_ctx_ptr)->root() : 
vexprs[0];
+        VExpr* last_expr = nullptr;
+        if (_vconjunct_ctx_ptr) {
+            last_expr = (*_vconjunct_ctx_ptr)->root();
+        } else {
+            DCHECK(_rf_vexpr_set.find(vexprs[0]) == _rf_vexpr_set.end());
+            last_expr = vexprs[0];
+            _rf_vexpr_set.insert(vexprs[0]);
+        }
         for (size_t j = _vconjunct_ctx_ptr ? 0 : 1; j < vexprs.size(); j++) {
             if (_rf_vexpr_set.find(vexprs[j]) != _rf_vexpr_set.end()) {
                 continue;
@@ -1832,7 +1837,6 @@ Status VOlapScanNode::_append_rf_into_conjuncts(RuntimeState* state, std::vector
         }
         _vconjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*);
         *(_vconjunct_ctx_ptr.get()) = new_vconjunct_ctx_ptr;
-        new_vconjunct_ctx_ptr->debug_valid();
     }
     return Status::OK();
 }
diff --git a/be/src/vec/exprs/varray_literal.cpp b/be/src/vec/exprs/varray_literal.cpp
index d289c17988..f2a2c7fa09 100644
--- a/be/src/vec/exprs/varray_literal.cpp
+++ b/be/src/vec/exprs/varray_literal.cpp
@@ -23,7 +23,7 @@ Status VArrayLiteral::prepare(RuntimeState* state, const RowDescriptor& row_desc
                               VExprContext* context) {
     DCHECK_EQ(type().children.size(), 1) << "array children type not 1";
 
-    RETURN_IF_ERROR(VExpr::prepare(state, row_desc, context));
+    RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, row_desc, context));
     bool is_null = (_node_type == TExprNodeType::NULL_LITERAL);
     Field array = is_null ? Field() : Array();
     for (const auto child : _children) {
diff --git a/be/src/vec/exprs/vbloom_predicate.cpp b/be/src/vec/exprs/vbloom_predicate.cpp
index 1613d839e7..dc3be41027 100644
--- a/be/src/vec/exprs/vbloom_predicate.cpp
+++ b/be/src/vec/exprs/vbloom_predicate.cpp
@@ -29,17 +29,12 @@ VBloomPredicate::VBloomPredicate(const TExprNode& node)
 
 Status VBloomPredicate::prepare(RuntimeState* state, const RowDescriptor& desc,
                                 VExprContext* context) {
-    RETURN_IF_ERROR(VExpr::prepare(state, desc, context));
+    RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context));
 
-    if (_prepared) {
-        return Status::OK();
-    }
     if (_children.size() != 1) {
         return Status::InternalError("Invalid argument for VBloomPredicate.");
     }
 
-    _prepared = true;
-
     ColumnsWithTypeAndName argument_template;
     argument_template.reserve(_children.size());
     for (auto child : _children) {
diff --git a/be/src/vec/exprs/vbloom_predicate.h b/be/src/vec/exprs/vbloom_predicate.h
index 0f00d6ca55..b4e9f54a31 100644
--- a/be/src/vec/exprs/vbloom_predicate.h
+++ b/be/src/vec/exprs/vbloom_predicate.h
@@ -44,7 +44,5 @@ public:
 private:
     std::shared_ptr<IBloomFilterFuncBase> _filter;
     std::string _expr_name;
-
-    bool _prepared;
 };
 } // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exprs/vcase_expr.cpp b/be/src/vec/exprs/vcase_expr.cpp
index 9058aa1710..d8f067f6b8 100644
--- a/be/src/vec/exprs/vcase_expr.cpp
+++ b/be/src/vec/exprs/vcase_expr.cpp
@@ -24,7 +24,6 @@ namespace doris::vectorized {
 
 VCaseExpr::VCaseExpr(const TExprNode& node)
         : VExpr(node),
-          _is_prepare(false),
           _has_case_expr(node.case_expr.has_case_expr),
           _has_else_expr(node.case_expr.has_else_expr) {
     if (_has_case_expr) {
@@ -37,12 +36,7 @@ VCaseExpr::VCaseExpr(const TExprNode& node)
 
 Status VCaseExpr::prepare(doris::RuntimeState* state, const 
doris::RowDescriptor& desc,
                           VExprContext* context) {
-    RETURN_IF_ERROR(VExpr::prepare(state, desc, context));
-
-    if (_is_prepare) {
-        return Status::OK();
-    }
-    _is_prepare = true;
+    RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context));
 
     ColumnsWithTypeAndName argument_template;
     DataTypes arguments;
diff --git a/be/src/vec/exprs/vcase_expr.h b/be/src/vec/exprs/vcase_expr.h
index a6ca9d6808..6b056f9431 100644
--- a/be/src/vec/exprs/vcase_expr.h
+++ b/be/src/vec/exprs/vcase_expr.h
@@ -41,7 +41,6 @@ public:
     virtual std::string debug_string() const override;
 
 private:
-    bool _is_prepare;
     bool _has_case_expr;
     bool _has_else_expr;
 
diff --git a/be/src/vec/exprs/vcast_expr.cpp b/be/src/vec/exprs/vcast_expr.cpp
index e257e6fe8c..173df0365d 100644
--- a/be/src/vec/exprs/vcast_expr.cpp
+++ b/be/src/vec/exprs/vcast_expr.cpp
@@ -29,7 +29,7 @@ namespace doris::vectorized {
 
 doris::Status VCastExpr::prepare(doris::RuntimeState* state, const 
doris::RowDescriptor& desc,
                                  VExprContext* context) {
-    RETURN_IF_ERROR(VExpr::prepare(state, desc, context));
+    RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context));
 
     DCHECK_EQ(_children.size(), 1);
     auto child = _children[0];
diff --git a/be/src/vec/exprs/vectorized_fn_call.cpp b/be/src/vec/exprs/vectorized_fn_call.cpp
index f2fd7bcb5f..f92ec8bc3a 100644
--- a/be/src/vec/exprs/vectorized_fn_call.cpp
+++ b/be/src/vec/exprs/vectorized_fn_call.cpp
@@ -37,7 +37,7 @@ VectorizedFnCall::VectorizedFnCall(const doris::TExprNode& node) : VExpr(node) {
 
 doris::Status VectorizedFnCall::prepare(doris::RuntimeState* state,
                                         const doris::RowDescriptor& desc, 
VExprContext* context) {
-    RETURN_IF_ERROR(VExpr::prepare(state, desc, context));
+    RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context));
     ColumnsWithTypeAndName argument_template;
     argument_template.reserve(_children.size());
     std::vector<std::string_view> child_expr_name;
diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp
index 183406926f..6d0de17dac 100644
--- a/be/src/vec/exprs/vexpr.cpp
+++ b/be/src/vec/exprs/vexpr.cpp
@@ -46,7 +46,8 @@ using doris::TypeDescriptor;
 VExpr::VExpr(const doris::TExprNode& node)
         : _node_type(node.node_type),
           _type(TypeDescriptor::from_thrift(node.type)),
-          _fn_context_index(-1) {
+          _fn_context_index(-1),
+          _prepared(false) {
     if (node.__isset.fn) {
         _fn = node.fn;
     }
@@ -65,10 +66,11 @@ VExpr::VExpr(const VExpr& vexpr)
           _children(vexpr._children),
           _fn(vexpr._fn),
           _fn_context_index(vexpr._fn_context_index),
-          _constant_col(vexpr._constant_col) {}
+          _constant_col(vexpr._constant_col),
+          _prepared(vexpr._prepared) {}
 
 VExpr::VExpr(const TypeDescriptor& type, bool is_slotref, bool is_nullable)
-        : _type(type), _fn_context_index(-1) {
+        : _type(type), _fn_context_index(-1), _prepared(false) {
     if (is_slotref) {
         _node_type = TExprNodeType::SLOT_REF;
     }
@@ -363,11 +365,4 @@ void VExpr::close_function_context(VExprContext* context, FunctionContext::Funct
     }
 }
 
-void VExpr::debug_valid(VExprContext* context) {
-    DCHECK_LT(_fn_context_index, context->get_fn_context_size());
-    for (int i = 0; i < _children.size(); ++i) {
-        _children[i]->debug_valid(context);
-    }
-}
-
 } // namespace doris::vectorized
diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h
index b19a415368..e0e0defac4 100644
--- a/be/src/vec/exprs/vexpr.h
+++ b/be/src/vec/exprs/vexpr.h
@@ -32,6 +32,14 @@
 namespace doris {
 namespace vectorized {
 
+#define RETURN_IF_ERROR_OR_PREPARED(stmt) \
+    if (_prepared) {                      \
+        return Status::OK();              \
+    } else {                              \
+        _prepared = true;                 \
+        RETURN_IF_ERROR(stmt);            \
+    }
+
 class VExpr {
 public:
     // resize inserted param column to make sure column size equal to block.rows()
@@ -157,9 +165,6 @@ public:
         return nullptr;
     }
 
-    // Just for debug. Should be removed after fixing #11995
-    void debug_valid(VExprContext* context);
-
 protected:
     /// Simple debug string that provides no expr subclass-specific information
     std::string debug_string(const std::string& expr_name) const {
@@ -198,6 +203,7 @@ protected:
     // If this expr is constant, this will store and cache the value generated by
     // get_const_col()
     std::shared_ptr<ColumnPtrWrapper> _constant_col;
+    bool _prepared;
 };
 
 } // namespace vectorized
diff --git a/be/src/vec/exprs/vexpr_context.cpp b/be/src/vec/exprs/vexpr_context.cpp
index da0deda56d..d6a64c9d9b 100644
--- a/be/src/vec/exprs/vexpr_context.cpp
+++ b/be/src/vec/exprs/vexpr_context.cpp
@@ -150,10 +150,4 @@ Block VExprContext::get_output_block_after_execute_exprs(
     return {result_columns};
 }
 
-void VExprContext::debug_valid() {
-#ifndef NDEBUG
-    _root->debug_valid(this);
-#endif
-}
-
 } // namespace doris::vectorized
diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h
index 53cfbdb389..b454833ed3 100644
--- a/be/src/vec/exprs/vexpr_context.h
+++ b/be/src/vec/exprs/vexpr_context.h
@@ -76,11 +76,6 @@ public:
         _stale = true;
     }
 
-    int get_fn_context_size() const { return _fn_contexts.size(); }
-
-    // Just for debug. Should be removed after fixing #11995
-    void debug_valid();
-
 private:
     friend class VExpr;
 
diff --git a/be/src/vec/exprs/vin_predicate.cpp b/be/src/vec/exprs/vin_predicate.cpp
index fb36206a11..b0aa2c2f65 100644
--- a/be/src/vec/exprs/vin_predicate.cpp
+++ b/be/src/vec/exprs/vin_predicate.cpp
@@ -30,22 +30,18 @@
 namespace doris::vectorized {
 
 VInPredicate::VInPredicate(const TExprNode& node)
-        : VExpr(node), _is_not_in(node.in_predicate.is_not_in), 
_is_prepare(false) {}
+        : VExpr(node), _is_not_in(node.in_predicate.is_not_in) {}
 
 Status VInPredicate::prepare(RuntimeState* state, const RowDescriptor& desc,
                              VExprContext* context) {
-    RETURN_IF_ERROR(VExpr::prepare(state, desc, context));
+    RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context));
 
-    if (_is_prepare) {
-        return Status::OK();
-    }
     if (_children.size() < 1) {
         return Status::InternalError("no Function operator in.");
     }
 
     _expr_name =
             fmt::format("({} {} set)", _children[0]->expr_name(), _is_not_in ? 
"not_in" : "in");
-    _is_prepare = true;
 
     DCHECK(_children.size() >= 1);
     ColumnsWithTypeAndName argument_template;
diff --git a/be/src/vec/exprs/vin_predicate.h b/be/src/vec/exprs/vin_predicate.h
index 6b2264d40a..556c724471 100644
--- a/be/src/vec/exprs/vin_predicate.h
+++ b/be/src/vec/exprs/vin_predicate.h
@@ -50,9 +50,6 @@ private:
     std::string _expr_name;
 
     const bool _is_not_in;
-    bool _is_prepare;
-
-private:
     static const constexpr char* function_name = "in";
 };
 } // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.cpp b/be/src/vec/exprs/vruntimefilter_wrapper.cpp
index cdc3d2d5dc..869de36a26 100644
--- a/be/src/vec/exprs/vruntimefilter_wrapper.cpp
+++ b/be/src/vec/exprs/vruntimefilter_wrapper.cpp
@@ -29,24 +29,18 @@
 namespace doris::vectorized {
 
 VRuntimeFilterWrapper::VRuntimeFilterWrapper(const TExprNode& node, VExpr* 
impl)
-        : VExpr(node),
-          _impl(impl),
-          _always_true(false),
-          _filtered_rows(0),
-          _scan_rows(0),
-          _is_closed(false) {}
+        : VExpr(node), _impl(impl), _always_true(false), _filtered_rows(0), 
_scan_rows(0) {}
 
 VRuntimeFilterWrapper::VRuntimeFilterWrapper(const VRuntimeFilterWrapper& 
vexpr)
         : VExpr(vexpr),
           _impl(vexpr._impl),
           _always_true(vexpr._always_true),
           _filtered_rows(vexpr._filtered_rows.load()),
-          _scan_rows(vexpr._scan_rows.load()),
-          _is_closed(false) {}
+          _scan_rows(vexpr._scan_rows.load()) {}
 
 Status VRuntimeFilterWrapper::prepare(RuntimeState* state, const 
RowDescriptor& desc,
                                       VExprContext* context) {
-    RETURN_IF_ERROR(_impl->prepare(state, desc, context));
+    RETURN_IF_ERROR_OR_PREPARED(_impl->prepare(state, desc, context));
     _expr_name = fmt::format("VRuntimeFilterWrapper({})", _impl->expr_name());
     return Status::OK();
 }
@@ -58,10 +52,6 @@ Status VRuntimeFilterWrapper::open(RuntimeState* state, VExprContext* context,
 
 void VRuntimeFilterWrapper::close(RuntimeState* state, VExprContext* context,
                                   FunctionContext::FunctionStateScope scope) {
-    if (_is_closed) {
-        return;
-    }
-    _is_closed = true;
     _impl->close(state, context, scope);
 }
 
diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.h b/be/src/vec/exprs/vruntimefilter_wrapper.h
index e04faba9ef..91a6bdbcac 100644
--- a/be/src/vec/exprs/vruntimefilter_wrapper.h
+++ b/be/src/vec/exprs/vruntimefilter_wrapper.h
@@ -61,7 +61,5 @@ private:
     constexpr static double EXPECTED_FILTER_RATE = 0.2;
 
     std::string _expr_name;
-
-    bool _is_closed;
 };
 } // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exprs/vslot_ref.cpp b/be/src/vec/exprs/vslot_ref.cpp
index 7ef39786da..f53bcf2900 100644
--- a/be/src/vec/exprs/vslot_ref.cpp
+++ b/be/src/vec/exprs/vslot_ref.cpp
@@ -41,6 +41,7 @@ VSlotRef::VSlotRef(const SlotDescriptor* desc)
 
 Status VSlotRef::prepare(doris::RuntimeState* state, const 
doris::RowDescriptor& desc,
                          VExprContext* context) {
+    RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context));
     DCHECK_EQ(_children.size(), 0);
     if (_slot_id == -1) {
         return Status::OK();
diff --git a/be/src/vec/exprs/vtuple_is_null_predicate.cpp b/be/src/vec/exprs/vtuple_is_null_predicate.cpp
index 6342dd32dc..acb41f73b0 100644
--- a/be/src/vec/exprs/vtuple_is_null_predicate.cpp
+++ b/be/src/vec/exprs/vtuple_is_null_predicate.cpp
@@ -36,7 +36,7 @@ VTupleIsNullPredicate::VTupleIsNullPredicate(const TExprNode& node)
 
 Status VTupleIsNullPredicate::prepare(RuntimeState* state, const 
RowDescriptor& desc,
                                       VExprContext* context) {
-    RETURN_IF_ERROR(VExpr::prepare(state, desc, context));
+    RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context));
     DCHECK_EQ(0, _children.size());
     _column_to_check =
             _is_left_null_side ? desc.num_materialized_slots() : 
desc.num_materialized_slots() + 1;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to