This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ef9212ddc [enhancement](exceptionsafe) force check exec node method's return value  (#19538)
8ef9212ddc is described below

commit 8ef9212ddc34e5d209cf0432cdce2d267d2e0e6e
Author: yiguolei <[email protected]>
AuthorDate: Fri May 12 10:21:00 2023 +0800

    [enhancement](exceptionsafe) force check exec node method's return value  (#19538)
---
 be/src/exec/exec_node.cpp                 |  2 +-
 be/src/exec/exec_node.h                   | 30 +++++++++++++++++-------------
 be/src/runtime/plan_fragment_executor.cpp |  6 +++++-
 be/src/vec/exec/vaggregation_node.cpp     |  3 ++-
 be/src/vec/exec/vanalytic_eval_node.cpp   |  3 ++-
 be/src/vec/exec/vunion_node.cpp           | 16 ++++++++++------
 be/src/vec/exec/vunion_node.h             |  2 +-
 7 files changed, 38 insertions(+), 24 deletions(-)

diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index ba807bd77d..a1f29aaf8b 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -166,7 +166,7 @@ Status ExecNode::reset(RuntimeState* state) {
 Status ExecNode::collect_query_statistics(QueryStatistics* statistics) {
     DCHECK(statistics != nullptr);
     for (auto child_node : _children) {
-        child_node->collect_query_statistics(statistics);
+        RETURN_IF_ERROR(child_node->collect_query_statistics(statistics));
     }
     return Status::OK();
 }
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index edbec218d7..d9a7e5afd7 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -71,7 +71,7 @@ public:
     /// Initializes this object from the thrift tnode desc. The subclass should
     /// do any initialization that can fail in Init() rather than the ctor.
     /// If overridden in subclass, must first call superclass's Init().
-    virtual Status init(const TPlanNode& tnode, RuntimeState* state);
+    [[nodiscard]] virtual Status init(const TPlanNode& tnode, RuntimeState* state);
 
     // Sets up internal structures, etc., without doing any actual work.
     // Must be called prior to open(). Will only be called once in this
@@ -80,17 +80,17 @@ public:
     // in prepare().  Retrieving the jit compiled function pointer must happen in
     // open().
     // If overridden in subclass, must first call superclass's prepare().
-    virtual Status prepare(RuntimeState* state);
+    [[nodiscard]] virtual Status prepare(RuntimeState* state);
 
     // Performs any preparatory work prior to calling get_next().
     // Can be called repeatedly (after calls to close()).
     // Caller must not be holding any io buffers. This will cause deadlock.
-    virtual Status open(RuntimeState* state);
+    [[nodiscard]] virtual Status open(RuntimeState* state);
 
     // Alloc and open resource for the node
     // Only pipeline operator use exec node need to impl the virtual function
     // so only vectorized exec node need to impl
-    virtual Status alloc_resource(RuntimeState* state);
+    [[nodiscard]] virtual Status alloc_resource(RuntimeState* state);
 
     // Retrieves rows and returns them via row_batch. Sets eos to true
     // if subsequent calls will not retrieve any more rows.
@@ -105,9 +105,9 @@ public:
     // row_batch's tuple_data_pool.
     // Caller must not be holding any io buffers. This will cause deadlock.
     // TODO: AggregationNode and HashJoinNode cannot be "re-opened" yet.
-    virtual Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos);
+    [[nodiscard]] virtual Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos);
     // new interface to compatible new optimizers in FE
-    Status get_next_after_projects(
+    [[nodiscard]] Status get_next_after_projects(
             RuntimeState* state, vectorized::Block* block, bool* eos,
             const std::function<Status(RuntimeState*, vectorized::Block*, bool*)>& fn,
             bool clear_data = true);
@@ -125,11 +125,13 @@ public:
 
     // Emit data, both need impl with method: sink
     // Eg: Aggregation, Sort, Scan
-    virtual Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) {
+    [[nodiscard]] virtual Status pull(RuntimeState* state, vectorized::Block* output_block,
+                                      bool* eos) {
         return get_next(state, output_block, eos);
     }
 
-    virtual Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) {
+    [[nodiscard]] virtual Status push(RuntimeState* state, vectorized::Block* input_block,
+                                      bool eos) {
         return Status::OK();
     }
 
@@ -138,7 +140,8 @@ public:
     // Sink Data to ExecNode to do some stock work, both need impl with method: get_result
     // `eos` means source is exhausted, exec node should do some finalize work
     // Eg: Aggregation, Sort
-    virtual Status sink(RuntimeState* state, vectorized::Block* input_block, bool eos);
+    [[nodiscard]] virtual Status sink(RuntimeState* state, vectorized::Block* input_block,
+                                      bool eos);
 
     // Resets the stream of row batches to be retrieved by subsequent GetNext() calls.
     // Clears all internal state, returning this node to the state it was in after calling
@@ -153,12 +156,12 @@ public:
     // implementation calls Reset() on children.
     // Note that this function may be called many times (proportional to the input data),
     // so should be fast.
-    virtual Status reset(RuntimeState* state);
+    [[nodiscard]] virtual Status reset(RuntimeState* state);
 
     // This should be called before close() and after get_next(), it is responsible for
     // collecting statistics sent with row batch, it can't be called when prepare() returns
     // error.
-    virtual Status collect_query_statistics(QueryStatistics* statistics);
+    [[nodiscard]] virtual Status collect_query_statistics(QueryStatistics* statistics);
 
     // close() will get called for every exec node, regardless of what else is called and
     // the status of these calls (i.e. prepare() may never have been called, or
@@ -183,8 +186,9 @@ public:
     // Creates exec node tree from list of nodes contained in plan via depth-first
     // traversal. All nodes are placed in pool.
     // Returns error if 'plan' is corrupted, otherwise success.
-    static Status create_tree(RuntimeState* state, ObjectPool* pool, const TPlan& plan,
-                              const DescriptorTbl& descs, ExecNode** root);
+    [[nodiscard]] static Status create_tree(RuntimeState* state, ObjectPool* pool,
+                                            const TPlan& plan, const DescriptorTbl& descs,
+                                            ExecNode** root);
 
     // Collect all nodes of given 'node_type' that are part of this subtree, and return in
     // 'nodes'.
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 0b6655193d..ddad12e176 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -371,7 +371,11 @@ Status PlanFragmentExecutor::get_vectorized_internal(::doris::vectorized::Block*
 
 void PlanFragmentExecutor::_collect_query_statistics() {
     _query_statistics->clear();
-    _plan->collect_query_statistics(_query_statistics.get());
+    Status status = _plan->collect_query_statistics(_query_statistics.get());
+    if (!status.ok()) {
+        LOG(INFO) << "collect query statistics failed, st=" << status;
+        return;
+    }
     _query_statistics->add_cpu_ms(_fragment_cpu_timer->value() / NANOS_PER_MILLIS);
     if (_runtime_state->backend_id() != -1) {
         _collect_node_statistics();
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index 3c6dd33483..18ae7f42f2 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -502,7 +502,8 @@ Status AggregationNode::alloc_resource(doris::RuntimeState* state) {
     // because during prepare and open thread is not the same one,
     // this could cause unable to get JVM
     if (_probe_expr_ctxs.empty()) {
-        _create_agg_status(_agg_data->without_key);
+        // _create_agg_status may acquire a lot of memory, may allocate failed when memory is very few
+        RETURN_IF_CATCH_EXCEPTION(_create_agg_status(_agg_data->without_key));
         _agg_data_created_without_key = true;
     }
 
diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp
index 8710099039..9ee4b424e2 100644
--- a/be/src/vec/exec/vanalytic_eval_node.cpp
+++ b/be/src/vec/exec/vanalytic_eval_node.cpp
@@ -28,6 +28,7 @@
 
 // IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
 #include "common/compiler_util.h" // IWYU pragma: keep
+#include "common/exception.h"
 #include "common/logging.h"
 #include "runtime/descriptors.h"
 #include "runtime/memory/mem_tracker.h"
@@ -207,7 +208,7 @@ Status VAnalyticEvalNode::prepare(RuntimeState* state) {
     }
     _fn_place_ptr = _agg_arena_pool->aligned_alloc(_total_size_of_aggregate_states,
                                                    _align_aggregate_states);
-    _create_agg_status();
+    RETURN_IF_CATCH_EXCEPTION(_create_agg_status());
     _executor.insert_result =
             std::bind<void>(&VAnalyticEvalNode::_insert_result_info, this, std::placeholders::_1);
     _executor.execute =
diff --git a/be/src/vec/exec/vunion_node.cpp b/be/src/vec/exec/vunion_node.cpp
index a66e8bceec..d3b73a7ae1 100644
--- a/be/src/vec/exec/vunion_node.cpp
+++ b/be/src/vec/exec/vunion_node.cpp
@@ -189,7 +189,9 @@ Status VUnionNode::get_next_materialized(RuntimeState* state, Block* block) {
                 child(_child_idx)->get_next_span(), _child_eos);
         SCOPED_TIMER(_materialize_exprs_evaluate_timer);
         if (child_block.rows() > 0) {
-            RETURN_IF_ERROR(mblock.merge(materialize_block(&child_block, _child_idx)));
+            Block res;
+            RETURN_IF_ERROR(materialize_block(&child_block, _child_idx, &res));
+            RETURN_IF_ERROR(mblock.merge(res));
         }
         // It shouldn't be the case that we reached the limit because we shouldn't have
         // incremented '_num_rows_returned' yet.
@@ -267,7 +269,9 @@ Status VUnionNode::materialize_child_block(RuntimeState* state, int child_id,
                                 _row_descriptor)));
 
     if (input_block->rows() > 0) {
-        RETURN_IF_ERROR(mblock.merge(materialize_block(input_block, child_id)));
+        Block res;
+        RETURN_IF_ERROR(materialize_block(input_block, child_id, &res));
+        RETURN_IF_ERROR(mblock.merge(res));
         if (!mem_reuse) {
             output_block->swap(mblock.to_block());
         }
@@ -341,17 +345,17 @@ void VUnionNode::debug_string(int indentation_level, std::stringstream* out) con
     *out << ")" << std::endl;
 }
 
-Block VUnionNode::materialize_block(Block* src_block, int child_idx) {
+Status VUnionNode::materialize_block(Block* src_block, int child_idx, Block* res_block) {
     const std::vector<VExprContext*>& child_exprs = _child_expr_lists[child_idx];
     ColumnsWithTypeAndName colunms;
     for (size_t i = 0; i < child_exprs.size(); ++i) {
         int result_column_id = -1;
-        auto state = child_exprs[i]->execute(src_block, &result_column_id);
-        CHECK(state.ok()) << state.to_string();
+        RETURN_IF_ERROR(child_exprs[i]->execute(src_block, &result_column_id));
         colunms.emplace_back(src_block->get_by_position(result_column_id));
     }
     _child_row_idx += src_block->rows();
-    return {colunms};
+    *res_block = {colunms};
+    return Status::OK();
 }
 
 } // namespace vectorized
diff --git a/be/src/vec/exec/vunion_node.h b/be/src/vec/exec/vunion_node.h
index 7532c00875..79ef72106c 100644
--- a/be/src/vec/exec/vunion_node.h
+++ b/be/src/vec/exec/vunion_node.h
@@ -104,7 +104,7 @@ private:
     /// Evaluates exprs for the current child and materializes the results into 'tuple_buf',
     /// which is attached to 'dst_block'. Runs until 'dst_block' is at capacity, or all rows
     /// have been consumed from the current child block. Updates '_child_row_idx'.
-    Block materialize_block(Block* dst_block, int child_idx);
+    Status materialize_block(Block* dst_block, int child_idx, Block* res_block);
 
     Status get_error_msg(const std::vector<VExprContext*>& exprs);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to