This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new bf741a2b671 [Fix](Status) Handle returned Status correctly #31434
bf741a2b671 is described below
commit bf741a2b6717303c41e5ebe3c53c1f2279492f91
Author: Uniqueyou <[email protected]>
AuthorDate: Thu Feb 29 23:47:33 2024 +0800
[Fix](Status) Handle returned Status correctly #31434
---
be/src/agent/task_worker_pool.cpp | 2 +-
be/src/exec/odbc_connector.cpp | 2 +-
be/src/exprs/runtime_filter.cpp | 14 ++++++++------
be/src/exprs/runtime_filter.h | 2 +-
be/src/exprs/runtime_filter_slots.h | 2 +-
be/src/http/action/restore_tablet_action.cpp | 3 +--
be/src/http/action/tablet_migration_action.cpp | 8 ++++----
be/src/index-tools/index_tool.cpp | 7 ++++++-
be/src/olap/tablet_manager.cpp | 3 +--
be/src/olap/tablet_manager.h | 2 +-
be/src/pipeline/exec/analytic_source_operator.cpp | 7 +++----
be/src/pipeline/exec/analytic_source_operator.h | 2 +-
be/src/pipeline/exec/result_file_sink_operator.cpp | 4 ++--
be/src/pipeline/exec/result_sink_operator.cpp | 4 ++--
be/src/pipeline/pipeline_x/operator.cpp | 2 +-
be/src/runtime/result_buffer_mgr.cpp | 5 ++---
be/src/runtime/result_buffer_mgr.h | 2 +-
be/src/vec/exec/vanalytic_eval_node.cpp | 15 +++++++--------
be/src/vec/exec/vanalytic_eval_node.h | 2 +-
be/src/vec/exec/vsort_node.cpp | 2 +-
be/src/vec/exec/vunion_node.cpp | 2 +-
be/src/vec/functions/function_rpc.cpp | 10 ++++++----
be/src/vec/functions/function_rpc.h | 6 +++---
be/src/vec/olap/vertical_merge_iterator.cpp | 4 ++--
be/src/vec/olap/vgeneric_iterators.cpp | 4 ++--
be/src/vec/sink/async_writer_sink.h | 2 +-
be/src/vec/sink/autoinc_buffer.cpp | 10 ++++++----
be/src/vec/sink/autoinc_buffer.h | 2 +-
be/src/vec/sink/vdata_stream_sender.cpp | 2 +-
be/src/vec/sink/vresult_file_sink.cpp | 6 +++---
be/src/vec/sink/vresult_sink.cpp | 6 +++---
be/src/vec/sink/writer/async_result_writer.cpp | 8 +++++---
be/src/vec/sink/writer/async_result_writer.h | 2 +-
be/src/vec/sink/writer/vfile_result_writer.cpp | 5 ++---
be/src/vec/sink/writer/vfile_result_writer.h | 2 +-
35 files changed, 84 insertions(+), 77 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index b47f79734e9..c4fc258552b 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -997,7 +997,7 @@ void report_tablet_callback(StorageEngine& engine, const
TMasterInfo& master_inf
request.__isset.tablets = true;
uint64_t report_version = s_report_version;
-
static_cast<void>(engine.tablet_manager()->build_all_report_tablets_info(&request.tablets));
+ engine.tablet_manager()->build_all_report_tablets_info(&request.tablets);
if (report_version < s_report_version) {
// TODO llj This can only reduce the possibility for report error, but
can't avoid it.
// If FE create a tablet in FE meta and send CREATE task to this BE,
the tablet may not be included in this
diff --git a/be/src/exec/odbc_connector.cpp b/be/src/exec/odbc_connector.cpp
index e25d6ff62f2..0a0f240cec7 100644
--- a/be/src/exec/odbc_connector.cpp
+++ b/be/src/exec/odbc_connector.cpp
@@ -151,7 +151,7 @@ Status ODBCConnector::open(RuntimeState* state, bool read) {
LOG(INFO) << "connect success:" << _connect_string.substr(0,
_connect_string.find("Pwd="));
_is_open = true;
- static_cast<void>(begin_trans());
+ RETURN_IF_ERROR(begin_trans());
return Status::OK();
}
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 786876cc796..24c41613be4 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -331,7 +331,7 @@ public:
return Status::OK();
}
- void change_to_bloom_filter(bool need_init_bf = false) {
+ Status change_to_bloom_filter(bool need_init_bf = false) {
CHECK(_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER)
<< "Can not change to bloom filter because of runtime filter
type is "
<< IRuntimeFilter::to_string(_filter_type);
@@ -339,11 +339,12 @@ public:
BloomFilterFuncBase* bf = _context.bloom_filter_func.get();
if (need_init_bf) {
// BloomFilter may be not init
- static_cast<void>(bf->init_with_fixed_length());
+ RETURN_IF_ERROR(bf->init_with_fixed_length());
insert_to_bloom_filter(bf);
}
// release in filter
_context.hybrid_set.reset();
+ return Status::OK();
}
Status init_bloom_filter(const size_t build_bf_cardinality) {
@@ -508,12 +509,12 @@ public:
VLOG_DEBUG << " change runtime filter to bloom
filter(id=" << _filter_id
<< ") because: in_num(" <<
_context.hybrid_set->size()
<< ") >= max_in_num(" << _max_in_num << ")";
- change_to_bloom_filter(true);
+ RETURN_IF_ERROR(change_to_bloom_filter(true));
}
} else {
VLOG_DEBUG << " change runtime filter to bloom filter(id="
<< _filter_id
<< ") because: already exist a bloom filter";
- change_to_bloom_filter();
+ RETURN_IF_ERROR(change_to_bloom_filter());
RETURN_IF_ERROR(_context.bloom_filter_func->merge(
wrapper->_context.bloom_filter_func.get()));
}
@@ -1307,8 +1308,9 @@ Status IRuntimeFilter::create_wrapper(const
UpdateRuntimeFilterParamsV2* param,
}
}
-void IRuntimeFilter::change_to_bloom_filter() {
- _wrapper->change_to_bloom_filter();
+Status IRuntimeFilter::change_to_bloom_filter() {
+ RETURN_IF_ERROR(_wrapper->change_to_bloom_filter());
+ return Status::OK();
}
Status IRuntimeFilter::init_bloom_filter(const size_t build_bf_cardinality) {
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 74b2580a4e6..91456cccced 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -278,7 +278,7 @@ public:
static Status create_wrapper(const UpdateRuntimeFilterParamsV2* param,
RuntimePredicateWrapper** wrapper);
- void change_to_bloom_filter();
+ Status change_to_bloom_filter();
Status init_bloom_filter(const size_t build_bf_cardinality);
Status update_filter(const UpdateRuntimeFilterParams* param);
void update_filter(RuntimePredicateWrapper* filter_wrapper, int64_t
merge_time,
diff --git a/be/src/exprs/runtime_filter_slots.h
b/be/src/exprs/runtime_filter_slots.h
index c9c1a996064..1fadf81e809 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -105,7 +105,7 @@ public:
if (over_max_in_num &&
runtime_filter->type() ==
RuntimeFilterType::IN_OR_BLOOM_FILTER) {
- runtime_filter->change_to_bloom_filter();
+ RETURN_IF_ERROR(runtime_filter->change_to_bloom_filter());
}
if (runtime_filter->is_bloomfilter()) {
diff --git a/be/src/http/action/restore_tablet_action.cpp
b/be/src/http/action/restore_tablet_action.cpp
index 9b2428befaa..0413a94dd66 100644
--- a/be/src/http/action/restore_tablet_action.cpp
+++ b/be/src/http/action/restore_tablet_action.cpp
@@ -183,8 +183,7 @@ Status RestoreTabletAction::_restore(const std::string&
key, int64_t tablet_id,
Status s = _create_hard_link_recursive(latest_tablet_path,
restore_schema_hash_path);
if (!s.ok()) {
// do not check the status of delete_directory, return status of link
operation
- static_cast<void>(
-
io::global_local_filesystem()->delete_directory(restore_schema_hash_path));
+
RETURN_IF_ERROR(io::global_local_filesystem()->delete_directory(restore_schema_hash_path));
return s;
}
std::string restore_shard_path =
store->get_absolute_shard_path(tablet_meta.shard_id());
diff --git a/be/src/http/action/tablet_migration_action.cpp
b/be/src/http/action/tablet_migration_action.cpp
index b24aebdfffd..3e44d985881 100644
--- a/be/src/http/action/tablet_migration_action.cpp
+++ b/be/src/http/action/tablet_migration_action.cpp
@@ -40,10 +40,10 @@ const static std::string HEADER_JSON = "application/json";
void TabletMigrationAction::_init_migration_action() {
int32_t max_thread_num = config::max_tablet_migration_threads;
int32_t min_thread_num = config::min_tablet_migration_threads;
- static_cast<void>(ThreadPoolBuilder("MigrationTaskThreadPool")
- .set_min_threads(min_thread_num)
- .set_max_threads(max_thread_num)
- .build(&_migration_thread_pool));
+ THROW_IF_ERROR(ThreadPoolBuilder("MigrationTaskThreadPool")
+ .set_min_threads(min_thread_num)
+ .set_max_threads(max_thread_num)
+ .build(&_migration_thread_pool));
}
void TabletMigrationAction::handle(HttpRequest* req) {
diff --git a/be/src/index-tools/index_tool.cpp
b/be/src/index-tools/index_tool.cpp
index ade72ae6809..91beb905ae2 100644
--- a/be/src/index-tools/index_tool.cpp
+++ b/be/src/index-tools/index_tool.cpp
@@ -265,7 +265,12 @@ int main(int argc, char** argv) {
std::vector<FileInfo> files;
bool exists = false;
std::filesystem::path root_dir(FLAGS_directory);
- static_cast<void>(fs->list(root_dir, true, &files, &exists));
+            doris::Status status = fs->list(root_dir, true, &files, &exists);
+            if (!status.ok()) {
+                std::cerr << "can't search from directory's all files,err : " << status
+                          << std::endl;
+                return -1;
+            }
if (!exists) {
std::cerr << FLAGS_directory << " is not exists" <<
std::endl;
return -1;
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index df9a45b03c8..248acf2270d 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -993,7 +993,7 @@ Status TabletManager::report_tablet_info(TTabletInfo*
tablet_info) {
return res;
}
-Status TabletManager::build_all_report_tablets_info(std::map<TTabletId,
TTablet>* tablets_info) {
+void TabletManager::build_all_report_tablets_info(std::map<TTabletId,
TTablet>* tablets_info) {
DCHECK(tablets_info != nullptr);
VLOG_NOTICE << "begin to build all report tablets info";
@@ -1032,7 +1032,6 @@ Status
TabletManager::build_all_report_tablets_info(std::map<TTabletId, TTablet>
DorisMetrics::instance()->tablet_version_num_distribution->set_histogram(
tablet_version_num_hist);
LOG(INFO) << "success to build all report tablets info. tablet_count=" <<
tablets_info->size();
- return Status::OK();
}
Status TabletManager::start_trash_sweep() {
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index 04f79b4f0f0..178ba50d9eb 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -132,7 +132,7 @@ public:
// Status::Error<INVALID_ARGUMENT>(), if tables is null
Status report_tablet_info(TTabletInfo* tablet_info);
- Status build_all_report_tablets_info(std::map<TTabletId, TTablet>*
tablets_info);
+ void build_all_report_tablets_info(std::map<TTabletId, TTablet>*
tablets_info);
Status start_trash_sweep();
diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp
b/be/src/pipeline/exec/analytic_source_operator.cpp
index 2f0f827d088..cb98b2e6466 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -240,13 +240,12 @@ Status AnalyticLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
return Status::OK();
}
-Status AnalyticLocalState::_reset_agg_status() {
+void AnalyticLocalState::_reset_agg_status() {
for (size_t i = 0; i < _agg_functions_size; ++i) {
_agg_functions[i]->reset(
_fn_place_ptr +
_parent->cast<AnalyticSourceOperatorX>()._offsets_of_aggregate_states[i]);
}
- return Status::OK();
}
Status AnalyticLocalState::_create_agg_status() {
@@ -359,7 +358,7 @@ Status AnalyticLocalState::_get_next_for_rows(size_t
current_block_rows) {
range_end = _shared_state->current_row_position +
1; //going on calculate,add up data, no need to reset
state
} else {
- static_cast<void>(_reset_agg_status());
+ _reset_agg_status();
if (!_parent->cast<AnalyticSourceOperatorX>()
._window.__isset
.window_start) { //[preceding, offset]
--unbound: [preceding, following]
@@ -437,7 +436,7 @@ bool
AnalyticLocalState::init_next_partition(vectorized::BlockRowPos found_parti
_partition_by_start = _shared_state->partition_by_end;
_shared_state->partition_by_end = found_partition_end;
_shared_state->current_row_position = _partition_by_start.pos;
- static_cast<void>(_reset_agg_status());
+ _reset_agg_status();
return true;
}
return false;
diff --git a/be/src/pipeline/exec/analytic_source_operator.h
b/be/src/pipeline/exec/analytic_source_operator.h
index b2ab5b24b3c..eeb790ebf94 100644
--- a/be/src/pipeline/exec/analytic_source_operator.h
+++ b/be/src/pipeline/exec/analytic_source_operator.h
@@ -88,7 +88,7 @@ private:
bool need_check_first =
false);
bool _whether_need_next_partition(vectorized::BlockRowPos&
found_partition_end);
- Status _reset_agg_status();
+ void _reset_agg_status();
Status _create_agg_status();
Status _destroy_agg_status();
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp
b/be/src/pipeline/exec/result_file_sink_operator.cpp
index 88646ba4db3..4ed414a0774 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -187,9 +187,9 @@ Status ResultFileSinkLocalState::close(RuntimeState* state,
Status exec_status)
_sender->update_return_rows(_writer == nullptr ? 0 :
_writer->get_written_rows());
static_cast<void>(_sender->close(final_status));
}
- static_cast<void>(state->exec_env()->result_mgr()->cancel_at_time(
+ state->exec_env()->result_mgr()->cancel_at_time(
time(nullptr) + config::result_buffer_cancelled_interval_time,
- state->fragment_instance_id()));
+ state->fragment_instance_id());
} else {
if (final_status.ok()) {
bool all_receiver_eof = true;
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp
b/be/src/pipeline/exec/result_sink_operator.cpp
index 11a208e1392..ddaf1cd4a2b 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -182,9 +182,9 @@ Status ResultSinkLocalState::close(RuntimeState* state,
Status exec_status) {
}
static_cast<void>(_sender->close(final_status));
}
- static_cast<void>(state->exec_env()->result_mgr()->cancel_at_time(
+ state->exec_env()->result_mgr()->cancel_at_time(
time(nullptr) + config::result_buffer_cancelled_interval_time,
- state->fragment_instance_id()));
+ state->fragment_instance_id());
RETURN_IF_ERROR(Base::close(state, exec_status));
return final_status;
}
diff --git a/be/src/pipeline/pipeline_x/operator.cpp
b/be/src/pipeline/pipeline_x/operator.cpp
index 30ff16dde80..a29516a29af 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -490,7 +490,7 @@ template <typename Writer, typename Parent>
requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
Status AsyncWriterSink<Writer, Parent>::open(RuntimeState* state) {
RETURN_IF_ERROR(Base::open(state));
- _writer->start_writer(state, _profile);
+ RETURN_IF_ERROR(_writer->start_writer(state, _profile));
return Status::OK();
}
diff --git a/be/src/runtime/result_buffer_mgr.cpp
b/be/src/runtime/result_buffer_mgr.cpp
index a2009c5ec3c..04831fcefb2 100644
--- a/be/src/runtime/result_buffer_mgr.cpp
+++ b/be/src/runtime/result_buffer_mgr.cpp
@@ -92,7 +92,7 @@ Status ResultBufferMgr::create_sender(const TUniqueId&
query_id, int buffer_size
// details see issue https://github.com/apache/doris/issues/16203
// add extra 5s for avoid corner case
int64_t max_timeout = time(nullptr) + exec_timout + 5;
- static_cast<void>(cancel_at_time(max_timeout, query_id));
+ cancel_at_time(max_timeout, query_id);
}
*sender = control_block;
return Status::OK();
@@ -173,7 +173,7 @@ Status ResultBufferMgr::cancel(const TUniqueId& query_id) {
return Status::OK();
}
-Status ResultBufferMgr::cancel_at_time(time_t cancel_time, const TUniqueId&
query_id) {
+void ResultBufferMgr::cancel_at_time(time_t cancel_time, const TUniqueId&
query_id) {
std::lock_guard<std::mutex> l(_timeout_lock);
TimeoutMap::iterator iter = _timeout_map.find(cancel_time);
@@ -184,7 +184,6 @@ Status ResultBufferMgr::cancel_at_time(time_t cancel_time,
const TUniqueId& quer
}
iter->second.push_back(query_id);
- return Status::OK();
}
void ResultBufferMgr::cancel_thread() {
diff --git a/be/src/runtime/result_buffer_mgr.h
b/be/src/runtime/result_buffer_mgr.h
index 7995496cbf9..06d13104205 100644
--- a/be/src/runtime/result_buffer_mgr.h
+++ b/be/src/runtime/result_buffer_mgr.h
@@ -74,7 +74,7 @@ public:
Status cancel(const TUniqueId& fragment_id);
// cancel one query at a future time.
- Status cancel_at_time(time_t cancel_time, const TUniqueId& query_id);
+ void cancel_at_time(time_t cancel_time, const TUniqueId& query_id);
private:
using BufferMap = std::unordered_map<TUniqueId,
std::shared_ptr<BufferControlBlock>>;
diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp
b/be/src/vec/exec/vanalytic_eval_node.cpp
index 4491d291e2d..fbd49aa145a 100644
--- a/be/src/vec/exec/vanalytic_eval_node.cpp
+++ b/be/src/vec/exec/vanalytic_eval_node.cpp
@@ -203,7 +203,7 @@ Status VAnalyticEvalNode::prepare(RuntimeState* state) {
}
_fn_place_ptr =
_agg_arena_pool->aligned_alloc(_total_size_of_aggregate_states,
_align_aggregate_states);
- RETURN_IF_CATCH_EXCEPTION(static_cast<void>(_create_agg_status()));
+ RETURN_IF_ERROR(_create_agg_status());
_executor.insert_result =
std::bind<void>(&VAnalyticEvalNode::_insert_result_info, this,
std::placeholders::_1);
_executor.execute =
@@ -211,7 +211,7 @@ Status VAnalyticEvalNode::prepare(RuntimeState* state) {
std::placeholders::_2, std::placeholders::_3,
std::placeholders::_4);
for (const auto& ctx : _agg_expr_ctxs) {
- static_cast<void>(VExpr::prepare(ctx, state, child(0)->row_desc()));
+ RETURN_IF_ERROR(VExpr::prepare(ctx, state, child(0)->row_desc()));
}
if (!_partition_by_eq_expr_ctxs.empty() ||
!_order_by_eq_expr_ctxs.empty()) {
vector<TTupleId> tuple_ids;
@@ -274,9 +274,9 @@ Status VAnalyticEvalNode::pull(doris::RuntimeState*
/*state*/, vectorized::Block
return Status::OK();
}
_next_partition = _init_next_partition(_found_partition_end);
- static_cast<void>(_init_result_columns());
+ RETURN_IF_ERROR(_init_result_columns());
size_t current_block_rows = _input_blocks[_output_block_index].rows();
- static_cast<void>(_executor.get_next(current_block_rows));
+ RETURN_IF_ERROR(_executor.get_next(current_block_rows));
if (_window_end_position == current_block_rows) {
break;
}
@@ -373,7 +373,7 @@ Status VAnalyticEvalNode::_get_next_for_rows(size_t
current_block_rows) {
range_end = _current_row_position +
1; //going on calculate,add up data, no need to reset
state
} else {
- static_cast<void>(_reset_agg_status());
+ _reset_agg_status();
if (!_window.__isset
.window_start) { //[preceding, offset]
--unbound: [preceding, following]
range_start = _partition_by_start.pos;
@@ -610,7 +610,7 @@ bool VAnalyticEvalNode::_init_next_partition(BlockRowPos
found_partition_end) {
_partition_by_start = _partition_by_end;
_partition_by_end = found_partition_end;
_current_row_position = _partition_by_start.pos;
- static_cast<void>(_reset_agg_status());
+ _reset_agg_status();
return true;
}
return false;
@@ -730,11 +730,10 @@ Status VAnalyticEvalNode::_init_result_columns() {
return Status::OK();
}
-Status VAnalyticEvalNode::_reset_agg_status() {
+void VAnalyticEvalNode::_reset_agg_status() {
for (size_t i = 0; i < _agg_functions_size; ++i) {
_agg_functions[i]->reset(_fn_place_ptr +
_offsets_of_aggregate_states[i]);
}
- return Status::OK();
}
Status VAnalyticEvalNode::_create_agg_status() {
diff --git a/be/src/vec/exec/vanalytic_eval_node.h
b/be/src/vec/exec/vanalytic_eval_node.h
index 8c26c1ad79e..45f7ce5b1e8 100644
--- a/be/src/vec/exec/vanalytic_eval_node.h
+++ b/be/src/vec/exec/vanalytic_eval_node.h
@@ -98,7 +98,7 @@ private:
void _execute_for_win_func(int64_t partition_start, int64_t partition_end,
int64_t frame_start,
int64_t frame_end);
- Status _reset_agg_status();
+ void _reset_agg_status();
Status _init_result_columns();
Status _create_agg_status();
Status _destroy_agg_status();
diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp
index 4206d9b6a2f..848256a0aee 100644
--- a/be/src/vec/exec/vsort_node.cpp
+++ b/be/src/vec/exec/vsort_node.cpp
@@ -188,7 +188,7 @@ Status VSortNode::open(RuntimeState* state) {
} while (!eos);
- static_cast<void>(child(0)->close(state));
+ RETURN_IF_ERROR(child(0)->close(state));
mem_tracker()->consume(_sorter->data_size());
COUNTER_UPDATE(_sort_blocks_memory_usage, _sorter->data_size());
diff --git a/be/src/vec/exec/vunion_node.cpp b/be/src/vec/exec/vunion_node.cpp
index cc7eabc3bd2..2405373a062 100644
--- a/be/src/vec/exec/vunion_node.cpp
+++ b/be/src/vec/exec/vunion_node.cpp
@@ -276,7 +276,7 @@ Status VUnionNode::get_next(RuntimeState* state, Block*
block, bool* eos) {
// The previous child needs to be closed if passthrough was enabled
for it. In the non
// passthrough case, the child was already closed in the previous call
to get_next().
DCHECK(is_child_passthrough(_to_close_child_idx));
- static_cast<void>(child(_to_close_child_idx)->close(state));
+ RETURN_IF_ERROR(child(_to_close_child_idx)->close(state));
_to_close_child_idx = -1;
}
diff --git a/be/src/vec/functions/function_rpc.cpp
b/be/src/vec/functions/function_rpc.cpp
index a900436ffc5..ba171ffbbc9 100644
--- a/be/src/vec/functions/function_rpc.cpp
+++ b/be/src/vec/functions/function_rpc.cpp
@@ -26,6 +26,7 @@
#include <memory>
#include <utility>
+#include "common/status.h"
#include "runtime/exec_env.h"
#include "util/brpc_client_cache.h"
#include "vec/columns/column.h"
@@ -46,7 +47,7 @@ Status RPCFnImpl::vec_call(FunctionContext* context, Block&
block, const ColumnN
PFunctionCallRequest request;
PFunctionCallResponse response;
request.set_function_name(_function_name);
- _convert_block_to_proto(block, arguments, input_rows_count, &request);
+ RETURN_IF_ERROR(_convert_block_to_proto(block, arguments,
input_rows_count, &request));
brpc::Controller cntl;
_client->fn_call(&cntl, &request, &response, nullptr);
if (cntl.Failed()) {
@@ -65,16 +66,17 @@ Status RPCFnImpl::vec_call(FunctionContext* context, Block&
block, const ColumnN
return Status::OK();
}
-void RPCFnImpl::_convert_block_to_proto(Block& block, const ColumnNumbers&
arguments,
- size_t input_rows_count,
PFunctionCallRequest* request) {
+Status RPCFnImpl::_convert_block_to_proto(Block& block, const ColumnNumbers&
arguments,
+ size_t input_rows_count,
PFunctionCallRequest* request) {
size_t row_count = std::min(block.rows(), input_rows_count);
for (size_t col_idx : arguments) {
PValues* arg = request->add_args();
ColumnWithTypeAndName& column = block.get_by_position(col_idx);
arg->set_has_null(column.column->has_null(row_count));
auto col = column.column->convert_to_full_column_if_const();
- static_cast<void>(column.type->get_serde()->write_column_to_pb(*col,
*arg, 0, row_count));
+ RETURN_IF_ERROR(column.type->get_serde()->write_column_to_pb(*col,
*arg, 0, row_count));
}
+ return Status::OK();
}
void RPCFnImpl::_convert_to_block(Block& block, const PValues& result, size_t
pos) {
diff --git a/be/src/vec/functions/function_rpc.h
b/be/src/vec/functions/function_rpc.h
index fa23e5b97cb..ae71632f974 100644
--- a/be/src/vec/functions/function_rpc.h
+++ b/be/src/vec/functions/function_rpc.h
@@ -52,9 +52,9 @@ public:
bool available() { return _client != nullptr; }
private:
- void _convert_block_to_proto(vectorized::Block& block,
- const vectorized::ColumnNumbers& arguments,
- size_t input_rows_count,
PFunctionCallRequest* request);
+ Status _convert_block_to_proto(vectorized::Block& block,
+ const vectorized::ColumnNumbers& arguments,
+ size_t input_rows_count,
PFunctionCallRequest* request);
void _convert_to_block(vectorized::Block& block, const PValues& result,
size_t pos);
std::shared_ptr<PFunctionService_Stub> _client;
diff --git a/be/src/vec/olap/vertical_merge_iterator.cpp
b/be/src/vec/olap/vertical_merge_iterator.cpp
index 5fbd1ed2c2e..49916048b5c 100644
--- a/be/src/vec/olap/vertical_merge_iterator.cpp
+++ b/be/src/vec/olap/vertical_merge_iterator.cpp
@@ -363,7 +363,7 @@ Status VerticalMergeIteratorContext::_load_next_block() {
}
for (auto it = _block_list.begin(); it != _block_list.end(); it++) {
if (it->use_count() == 1) {
- static_cast<void>(block_reset(*it));
+ RETURN_IF_ERROR(block_reset(*it));
_block = *it;
_block_list.erase(it);
break;
@@ -371,7 +371,7 @@ Status VerticalMergeIteratorContext::_load_next_block() {
}
if (_block == nullptr) {
_block = std::make_shared<Block>();
- static_cast<void>(block_reset(_block));
+ RETURN_IF_ERROR(block_reset(_block));
}
Status st = _iter->next_batch(_block.get());
if (!st.ok()) {
diff --git a/be/src/vec/olap/vgeneric_iterators.cpp
b/be/src/vec/olap/vgeneric_iterators.cpp
index 0e97f1d1720..2d45c884a78 100644
--- a/be/src/vec/olap/vgeneric_iterators.cpp
+++ b/be/src/vec/olap/vgeneric_iterators.cpp
@@ -292,7 +292,7 @@ Status VMergeIteratorContext::_load_next_block() {
}
for (auto it = _block_list.begin(); it != _block_list.end(); it++) {
if (it->use_count() == 1) {
- static_cast<void>(block_reset(*it));
+ RETURN_IF_ERROR(block_reset(*it));
_block = *it;
_block_list.erase(it);
break;
@@ -300,7 +300,7 @@ Status VMergeIteratorContext::_load_next_block() {
}
if (_block == nullptr) {
_block = std::make_shared<Block>();
- static_cast<void>(block_reset(_block));
+ RETURN_IF_ERROR(block_reset(_block));
}
Status st = _iter->next_batch(_block.get());
if (!st.ok()) {
diff --git a/be/src/vec/sink/async_writer_sink.h
b/be/src/vec/sink/async_writer_sink.h
index 8105ff96573..337fbcf3f99 100644
--- a/be/src/vec/sink/async_writer_sink.h
+++ b/be/src/vec/sink/async_writer_sink.h
@@ -72,7 +72,7 @@ public:
// Prepare the exprs to run.
RETURN_IF_ERROR(VExpr::open(_output_vexpr_ctxs, state));
if (state->enable_pipeline_exec()) {
- _writer->start_writer(state, _profile);
+ RETURN_IF_ERROR(_writer->start_writer(state, _profile));
} else {
RETURN_IF_ERROR(_writer->open(state, _profile));
}
diff --git a/be/src/vec/sink/autoinc_buffer.cpp
b/be/src/vec/sink/autoinc_buffer.cpp
index 844d8ed8524..39613159aca 100644
--- a/be/src/vec/sink/autoinc_buffer.cpp
+++ b/be/src/vec/sink/autoinc_buffer.cpp
@@ -19,6 +19,7 @@
#include <gen_cpp/HeartbeatService_types.h>
+#include "common/status.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "util/runtime_profile.h"
@@ -48,7 +49,7 @@ void AutoIncIDBuffer::_wait_for_prefetching() {
Status AutoIncIDBuffer::sync_request_ids(size_t length,
std::vector<std::pair<int64_t,
size_t>>* result) {
std::unique_lock<std::mutex> lock(_mutex);
- _prefetch_ids(_prefetch_size());
+ RETURN_IF_ERROR(_prefetch_ids(_prefetch_size()));
if (_front_buffer.second > 0) {
auto min_length = std::min(_front_buffer.second, length);
length -= min_length;
@@ -75,13 +76,13 @@ Status AutoIncIDBuffer::sync_request_ids(size_t length,
return Status::OK();
}
-void AutoIncIDBuffer::_prefetch_ids(size_t length) {
+Status AutoIncIDBuffer::_prefetch_ids(size_t length) {
if (_front_buffer.second > _low_water_level_mark() || _is_fetching) {
- return;
+ return Status::OK();
}
TNetworkAddress master_addr =
ExecEnv::GetInstance()->master_info()->network_address;
_is_fetching = true;
- static_cast<void>(_rpc_token->submit_func([=, this]() {
+ RETURN_IF_ERROR(_rpc_token->submit_func([=, this]() {
TAutoIncrementRangeRequest request;
TAutoIncrementRangeResult result;
request.__set_db_id(_db_id);
@@ -113,6 +114,7 @@ void AutoIncIDBuffer::_prefetch_ids(size_t length) {
}
_is_fetching = false;
}));
+ return Status::OK();
}
} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/sink/autoinc_buffer.h b/be/src/vec/sink/autoinc_buffer.h
index 80a469596fd..3ec009b0960 100644
--- a/be/src/vec/sink/autoinc_buffer.h
+++ b/be/src/vec/sink/autoinc_buffer.h
@@ -64,7 +64,7 @@ public:
Status sync_request_ids(size_t length, std::vector<std::pair<int64_t,
size_t>>* result);
private:
- void _prefetch_ids(size_t length);
+ Status _prefetch_ids(size_t length);
[[nodiscard]] size_t _prefetch_size() const {
return _batch_size * config::auto_inc_prefetch_size_ratio;
}
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp
b/be/src/vec/sink/vdata_stream_sender.cpp
index a7f1b4b8584..b6e4d0b962b 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -798,7 +798,7 @@ Status VDataStreamSender::close(RuntimeState* state, Status
exec_status) {
if (_peak_memory_usage_counter) {
_peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
}
- static_cast<void>(DataSink::close(state, exec_status));
+ RETURN_IF_ERROR(DataSink::close(state, exec_status));
return final_st;
}
diff --git a/be/src/vec/sink/vresult_file_sink.cpp
b/be/src/vec/sink/vresult_file_sink.cpp
index 08dd881bf4b..035a76a2f7f 100644
--- a/be/src/vec/sink/vresult_file_sink.cpp
+++ b/be/src/vec/sink/vresult_file_sink.cpp
@@ -132,11 +132,11 @@ Status VResultFileSink::close(RuntimeState* state, Status
exec_status) {
// close sender, this is normal path end
if (_sender) {
_sender->update_return_rows(_writer == nullptr ? 0 :
_writer->get_written_rows());
- static_cast<void>(_sender->close(final_status));
+ RETURN_IF_ERROR(_sender->close(final_status));
}
- static_cast<void>(state->exec_env()->result_mgr()->cancel_at_time(
+ state->exec_env()->result_mgr()->cancel_at_time(
time(nullptr) + config::result_buffer_cancelled_interval_time,
- state->fragment_instance_id()));
+ state->fragment_instance_id());
} else {
if (final_status.ok()) {
auto st = _stream_sender->send(state, _output_block.get(), true);
diff --git a/be/src/vec/sink/vresult_sink.cpp b/be/src/vec/sink/vresult_sink.cpp
index 59bf82483c5..a7da5a6a5ff 100644
--- a/be/src/vec/sink/vresult_sink.cpp
+++ b/be/src/vec/sink/vresult_sink.cpp
@@ -170,11 +170,11 @@ Status VResultSink::close(RuntimeState* state, Status
exec_status) {
if (_writer) {
_sender->update_return_rows(_writer->get_written_rows());
}
- static_cast<void>(_sender->close(final_status));
+ RETURN_IF_ERROR(_sender->close(final_status));
}
- static_cast<void>(state->exec_env()->result_mgr()->cancel_at_time(
+ state->exec_env()->result_mgr()->cancel_at_time(
time(nullptr) + config::result_buffer_cancelled_interval_time,
- state->fragment_instance_id()));
+ state->fragment_instance_id());
return DataSink::close(state, exec_status);
}
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp
b/be/src/vec/sink/writer/async_result_writer.cpp
index 0d88b9dedfa..fc431b6e863 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -17,6 +17,7 @@
#include "async_result_writer.h"
+#include "common/status.h"
#include "pipeline/pipeline_x/dependency.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
@@ -86,7 +87,7 @@ std::unique_ptr<Block>
AsyncResultWriter::_get_block_from_queue() {
return block;
}
-void AsyncResultWriter::start_writer(RuntimeState* state, RuntimeProfile*
profile) {
+Status AsyncResultWriter::start_writer(RuntimeState* state, RuntimeProfile*
profile) {
// Should set to false here, to
_writer_thread_closed = false;
// This is a async thread, should lock the task ctx, to make sure
runtimestate and profile
@@ -94,7 +95,7 @@ void AsyncResultWriter::start_writer(RuntimeState* state,
RuntimeProfile* profil
auto task_ctx = state->get_task_execution_context();
if (state->get_query_ctx() &&
state->get_query_ctx()->get_non_pipe_exec_thread_pool()) {
ThreadPool* pool_ptr =
state->get_query_ctx()->get_non_pipe_exec_thread_pool();
- static_cast<void>(pool_ptr->submit_func([this, state, profile,
task_ctx]() {
+ RETURN_IF_ERROR(pool_ptr->submit_func([this, state, profile,
task_ctx]() {
auto task_lock = task_ctx.lock();
if (task_lock == nullptr) {
_writer_thread_closed = true;
@@ -103,7 +104,7 @@ void AsyncResultWriter::start_writer(RuntimeState* state,
RuntimeProfile* profil
this->process_block(state, profile);
}));
} else {
-
static_cast<void>(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func(
+
RETURN_IF_ERROR(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func(
[this, state, profile, task_ctx]() {
auto task_lock = task_ctx.lock();
if (task_lock == nullptr) {
@@ -113,6 +114,7 @@ void AsyncResultWriter::start_writer(RuntimeState* state,
RuntimeProfile* profil
this->process_block(state, profile);
}));
}
+ return Status::OK();
}
void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile*
profile) {
diff --git a/be/src/vec/sink/writer/async_result_writer.h
b/be/src/vec/sink/writer/async_result_writer.h
index 8ed39aeb795..ac14616d333 100644
--- a/be/src/vec/sink/writer/async_result_writer.h
+++ b/be/src/vec/sink/writer/async_result_writer.h
@@ -76,7 +76,7 @@ public:
Status sink(Block* block, bool eos);
// Add the IO thread task process block() to thread pool to dispose the IO
- void start_writer(RuntimeState* state, RuntimeProfile* profile);
+ Status start_writer(RuntimeState* state, RuntimeProfile* profile);
Status get_writer_status() {
std::lock_guard l(_m);
diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp
b/be/src/vec/sink/writer/vfile_result_writer.cpp
index c30a23ec219..661afa15935 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -199,7 +199,7 @@ Status VFileResultWriter::_get_next_file_name(std::string*
file_name) {
// S3: {file_path}{fragment_instance_id}_
// BROKER: {file_path}{fragment_instance_id}_
-Status VFileResultWriter::_get_file_url(std::string* file_url) {
+void VFileResultWriter::_get_file_url(std::string* file_url) {
std::stringstream ss;
if (_storage_type == TStorageBackendType::LOCAL) {
ss << "file:///" << BackendOptions::get_localhost();
@@ -207,7 +207,6 @@ Status VFileResultWriter::_get_file_url(std::string*
file_url) {
ss << _file_opts->file_path;
ss << print_id(_fragment_instance_id) << "_";
*file_url = ss.str();
- return Status::OK();
}
std::string VFileResultWriter::_file_format_to_name() {
@@ -306,7 +305,7 @@ Status VFileResultWriter::_send_result() {
row_buffer.push_bigint(_written_rows_counter->value()); // total rows
row_buffer.push_bigint(_written_data_bytes->value()); // file size
std::string file_url;
- static_cast<void>(_get_file_url(&file_url));
+ _get_file_url(&file_url);
std::stringstream ss;
ss << file_url << "*";
file_url = ss.str();
diff --git a/be/src/vec/sink/writer/vfile_result_writer.h
b/be/src/vec/sink/writer/vfile_result_writer.h
index 864d0966a77..c52050ba6f1 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.h
+++ b/be/src/vec/sink/writer/vfile_result_writer.h
@@ -83,7 +83,7 @@ private:
// get next export file name
Status _get_next_file_name(std::string* file_name);
Status _get_success_file_name(std::string* file_name);
- Status _get_file_url(std::string* file_url);
+ void _get_file_url(std::string* file_url);
std::string _file_format_to_name();
// close file writer, and if !done, it will create new writer for next
file.
Status _close_file_writer(bool done);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]