This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8ef9212ddc [enhancement](exceptionsafe) force check exec node method's return value (#19538)
8ef9212ddc is described below
commit 8ef9212ddc34e5d209cf0432cdce2d267d2e0e6e
Author: yiguolei <[email protected]>
AuthorDate: Fri May 12 10:21:00 2023 +0800
[enhancement](exceptionsafe) force check exec node method's return value (#19538)
---
be/src/exec/exec_node.cpp | 2 +-
be/src/exec/exec_node.h | 30 +++++++++++++++++-------------
be/src/runtime/plan_fragment_executor.cpp | 6 +++++-
be/src/vec/exec/vaggregation_node.cpp | 3 ++-
be/src/vec/exec/vanalytic_eval_node.cpp | 3 ++-
be/src/vec/exec/vunion_node.cpp | 16 ++++++++++------
be/src/vec/exec/vunion_node.h | 2 +-
7 files changed, 38 insertions(+), 24 deletions(-)
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index ba807bd77d..a1f29aaf8b 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -166,7 +166,7 @@ Status ExecNode::reset(RuntimeState* state) {
Status ExecNode::collect_query_statistics(QueryStatistics* statistics) {
DCHECK(statistics != nullptr);
for (auto child_node : _children) {
- child_node->collect_query_statistics(statistics);
+ RETURN_IF_ERROR(child_node->collect_query_statistics(statistics));
}
return Status::OK();
}
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index edbec218d7..d9a7e5afd7 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -71,7 +71,7 @@ public:
/// Initializes this object from the thrift tnode desc. The subclass should
/// do any initialization that can fail in Init() rather than the ctor.
/// If overridden in subclass, must first call superclass's Init().
- virtual Status init(const TPlanNode& tnode, RuntimeState* state);
+ [[nodiscard]] virtual Status init(const TPlanNode& tnode, RuntimeState*
state);
// Sets up internal structures, etc., without doing any actual work.
// Must be called prior to open(). Will only be called once in this
@@ -80,17 +80,17 @@ public:
// in prepare(). Retrieving the jit compiled function pointer must happen
in
// open().
// If overridden in subclass, must first call superclass's prepare().
- virtual Status prepare(RuntimeState* state);
+ [[nodiscard]] virtual Status prepare(RuntimeState* state);
// Performs any preparatory work prior to calling get_next().
// Can be called repeatedly (after calls to close()).
// Caller must not be holding any io buffers. This will cause deadlock.
- virtual Status open(RuntimeState* state);
+ [[nodiscard]] virtual Status open(RuntimeState* state);
// Alloc and open resource for the node
// Only pipeline operator use exec node need to impl the virtual function
// so only vectorized exec node need to impl
- virtual Status alloc_resource(RuntimeState* state);
+ [[nodiscard]] virtual Status alloc_resource(RuntimeState* state);
// Retrieves rows and returns them via row_batch. Sets eos to true
// if subsequent calls will not retrieve any more rows.
@@ -105,9 +105,9 @@ public:
// row_batch's tuple_data_pool.
// Caller must not be holding any io buffers. This will cause deadlock.
// TODO: AggregationNode and HashJoinNode cannot be "re-opened" yet.
- virtual Status get_next(RuntimeState* state, vectorized::Block* block,
bool* eos);
+ [[nodiscard]] virtual Status get_next(RuntimeState* state,
vectorized::Block* block, bool* eos);
// new interface to compatible new optimizers in FE
- Status get_next_after_projects(
+ [[nodiscard]] Status get_next_after_projects(
RuntimeState* state, vectorized::Block* block, bool* eos,
const std::function<Status(RuntimeState*, vectorized::Block*,
bool*)>& fn,
bool clear_data = true);
@@ -125,11 +125,13 @@ public:
// Emit data, both need impl with method: sink
// Eg: Aggregation, Sort, Scan
- virtual Status pull(RuntimeState* state, vectorized::Block* output_block,
bool* eos) {
+ [[nodiscard]] virtual Status pull(RuntimeState* state, vectorized::Block*
output_block,
+ bool* eos) {
return get_next(state, output_block, eos);
}
- virtual Status push(RuntimeState* state, vectorized::Block* input_block,
bool eos) {
+ [[nodiscard]] virtual Status push(RuntimeState* state, vectorized::Block*
input_block,
+ bool eos) {
return Status::OK();
}
@@ -138,7 +140,8 @@ public:
// Sink Data to ExecNode to do some stock work, both need impl with
method: get_result
// `eos` means source is exhausted, exec node should do some finalize work
// Eg: Aggregation, Sort
- virtual Status sink(RuntimeState* state, vectorized::Block* input_block,
bool eos);
+ [[nodiscard]] virtual Status sink(RuntimeState* state, vectorized::Block*
input_block,
+ bool eos);
// Resets the stream of row batches to be retrieved by subsequent
GetNext() calls.
// Clears all internal state, returning this node to the state it was in
after calling
@@ -153,12 +156,12 @@ public:
// implementation calls Reset() on children.
// Note that this function may be called many times (proportional to the
input data),
// so should be fast.
- virtual Status reset(RuntimeState* state);
+ [[nodiscard]] virtual Status reset(RuntimeState* state);
// This should be called before close() and after get_next(), it is
responsible for
// collecting statistics sent with row batch, it can't be called when
prepare() returns
// error.
- virtual Status collect_query_statistics(QueryStatistics* statistics);
+ [[nodiscard]] virtual Status collect_query_statistics(QueryStatistics*
statistics);
// close() will get called for every exec node, regardless of what else is
called and
// the status of these calls (i.e. prepare() may never have been called, or
@@ -183,8 +186,9 @@ public:
// Creates exec node tree from list of nodes contained in plan via
depth-first
// traversal. All nodes are placed in pool.
// Returns error if 'plan' is corrupted, otherwise success.
- static Status create_tree(RuntimeState* state, ObjectPool* pool, const
TPlan& plan,
- const DescriptorTbl& descs, ExecNode** root);
+ [[nodiscard]] static Status create_tree(RuntimeState* state, ObjectPool*
pool,
+ const TPlan& plan, const
DescriptorTbl& descs,
+ ExecNode** root);
// Collect all nodes of given 'node_type' that are part of this subtree,
and return in
// 'nodes'.
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 0b6655193d..ddad12e176 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -371,7 +371,11 @@ Status PlanFragmentExecutor::get_vectorized_internal(::doris::vectorized::Block*
void PlanFragmentExecutor::_collect_query_statistics() {
_query_statistics->clear();
- _plan->collect_query_statistics(_query_statistics.get());
+ Status status = _plan->collect_query_statistics(_query_statistics.get());
+ if (!status.ok()) {
+ LOG(INFO) << "collect query statistics failed, st=" << status;
+ return;
+ }
_query_statistics->add_cpu_ms(_fragment_cpu_timer->value() /
NANOS_PER_MILLIS);
if (_runtime_state->backend_id() != -1) {
_collect_node_statistics();
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index 3c6dd33483..18ae7f42f2 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -502,7 +502,8 @@ Status AggregationNode::alloc_resource(doris::RuntimeState* state) {
// because during prepare and open thread is not the same one,
// this could cause unable to get JVM
if (_probe_expr_ctxs.empty()) {
- _create_agg_status(_agg_data->without_key);
+ // _create_agg_status may acquire a lot of memory, may allocate failed
when memory is very few
+ RETURN_IF_CATCH_EXCEPTION(_create_agg_status(_agg_data->without_key));
_agg_data_created_without_key = true;
}
diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp
index 8710099039..9ee4b424e2 100644
--- a/be/src/vec/exec/vanalytic_eval_node.cpp
+++ b/be/src/vec/exec/vanalytic_eval_node.cpp
@@ -28,6 +28,7 @@
// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
#include "common/compiler_util.h" // IWYU pragma: keep
+#include "common/exception.h"
#include "common/logging.h"
#include "runtime/descriptors.h"
#include "runtime/memory/mem_tracker.h"
@@ -207,7 +208,7 @@ Status VAnalyticEvalNode::prepare(RuntimeState* state) {
}
_fn_place_ptr =
_agg_arena_pool->aligned_alloc(_total_size_of_aggregate_states,
_align_aggregate_states);
- _create_agg_status();
+ RETURN_IF_CATCH_EXCEPTION(_create_agg_status());
_executor.insert_result =
std::bind<void>(&VAnalyticEvalNode::_insert_result_info, this,
std::placeholders::_1);
_executor.execute =
diff --git a/be/src/vec/exec/vunion_node.cpp b/be/src/vec/exec/vunion_node.cpp
index a66e8bceec..d3b73a7ae1 100644
--- a/be/src/vec/exec/vunion_node.cpp
+++ b/be/src/vec/exec/vunion_node.cpp
@@ -189,7 +189,9 @@ Status VUnionNode::get_next_materialized(RuntimeState* state, Block* block) {
child(_child_idx)->get_next_span(), _child_eos);
SCOPED_TIMER(_materialize_exprs_evaluate_timer);
if (child_block.rows() > 0) {
- RETURN_IF_ERROR(mblock.merge(materialize_block(&child_block,
_child_idx)));
+ Block res;
+ RETURN_IF_ERROR(materialize_block(&child_block, _child_idx, &res));
+ RETURN_IF_ERROR(mblock.merge(res));
}
// It shouldn't be the case that we reached the limit because we
shouldn't have
// incremented '_num_rows_returned' yet.
@@ -267,7 +269,9 @@ Status VUnionNode::materialize_child_block(RuntimeState* state, int child_id,
_row_descriptor)));
if (input_block->rows() > 0) {
- RETURN_IF_ERROR(mblock.merge(materialize_block(input_block,
child_id)));
+ Block res;
+ RETURN_IF_ERROR(materialize_block(input_block, child_id, &res));
+ RETURN_IF_ERROR(mblock.merge(res));
if (!mem_reuse) {
output_block->swap(mblock.to_block());
}
@@ -341,17 +345,17 @@ void VUnionNode::debug_string(int indentation_level, std::stringstream* out) con
*out << ")" << std::endl;
}
-Block VUnionNode::materialize_block(Block* src_block, int child_idx) {
+Status VUnionNode::materialize_block(Block* src_block, int child_idx, Block*
res_block) {
const std::vector<VExprContext*>& child_exprs =
_child_expr_lists[child_idx];
ColumnsWithTypeAndName colunms;
for (size_t i = 0; i < child_exprs.size(); ++i) {
int result_column_id = -1;
- auto state = child_exprs[i]->execute(src_block, &result_column_id);
- CHECK(state.ok()) << state.to_string();
+ RETURN_IF_ERROR(child_exprs[i]->execute(src_block, &result_column_id));
colunms.emplace_back(src_block->get_by_position(result_column_id));
}
_child_row_idx += src_block->rows();
- return {colunms};
+ *res_block = {colunms};
+ return Status::OK();
}
} // namespace vectorized
diff --git a/be/src/vec/exec/vunion_node.h b/be/src/vec/exec/vunion_node.h
index 7532c00875..79ef72106c 100644
--- a/be/src/vec/exec/vunion_node.h
+++ b/be/src/vec/exec/vunion_node.h
@@ -104,7 +104,7 @@ private:
/// Evaluates exprs for the current child and materializes the results
into 'tuple_buf',
/// which is attached to 'dst_block'. Runs until 'dst_block' is at
capacity, or all rows
/// have been consumed from the current child block. Updates
'_child_row_idx'.
- Block materialize_block(Block* dst_block, int child_idx);
+ Status materialize_block(Block* dst_block, int child_idx, Block*
res_block);
Status get_error_msg(const std::vector<VExprContext*>& exprs);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]