This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9a7f05caf86 [refactor](status) refactor querycontext and runtime state
status (#35035)
9a7f05caf86 is described below
commit 9a7f05caf865bfac301ef2c62ba01aa4a3b03801
Author: yiguolei <[email protected]>
AuthorDate: Mon May 20 13:12:53 2024 +0800
[refactor](status) refactor querycontext and runtime state status (#35035)
---------
Co-authored-by: yiguolei <[email protected]>
---
be/src/common/status.h | 6 ++--
be/src/olap/delta_writer_v2.cpp | 2 +-
.../local_exchange_sink_operator.cpp | 2 +-
be/src/pipeline/pipeline_fragment_context.cpp | 10 ++----
be/src/pipeline/pipeline_fragment_context.h | 10 ------
be/src/runtime/fragment_mgr.cpp | 8 ++---
be/src/runtime/group_commit_mgr.cpp | 4 +--
be/src/runtime/plan_fragment_executor.cpp | 5 ++-
be/src/runtime/query_context.cpp | 19 ++++-------
be/src/runtime/query_context.h | 29 ++++-------------
be/src/runtime/runtime_state.cpp | 19 ++---------
be/src/runtime/runtime_state.h | 37 ++++------------------
be/src/udf/udf.cpp | 2 +-
be/src/vec/sink/writer/vtablet_writer.cpp | 2 +-
14 files changed, 40 insertions(+), 115 deletions(-)
diff --git a/be/src/common/status.h b/be/src/common/status.h
index 6f587d5a28f..f0a02157c22 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -575,7 +575,7 @@ public:
}
// will copy a new status object to avoid concurrency
- Status status() {
+ Status status() const {
std::lock_guard l(mutex_);
return error_st_;
}
@@ -583,7 +583,9 @@ public:
private:
std::atomic_int16_t error_code_ = 0;
Status error_st_;
- std::mutex mutex_;
+ // std::mutex::lock() is a non-const member function, but const methods such
+ // as status() must take this lock, so the mutex has to be declared mutable.
+ mutable std::mutex mutex_;
AtomicStatus(const AtomicStatus&) = delete;
void operator=(const AtomicStatus&) = delete;
diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index 34c03fee95e..80978280b92 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -156,7 +156,7 @@ Status DeltaWriterV2::write(const vectorized::Block* block,
const std::vector<ui
{ memtable_flush_running_count_limit = 0; });
while (_memtable_writer->flush_running_count() >=
memtable_flush_running_count_limit) {
if (_state->is_cancelled()) {
- return Status::Cancelled(_state->cancel_reason());
+ return _state->cancel_reason();
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
index 9f6cb670b7d..043b66b2e08 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -54,7 +54,7 @@ Status LocalExchangeSinkLocalState::close(RuntimeState*
state, Status exec_statu
if (exec_status.ok()) {
DCHECK(_release_count) << "Do not finish correctly! " <<
debug_string(0)
<< " state: { cancel = " <<
state->is_cancelled() << ", "
- << state->query_status().to_string() << "}
query ctx: { cancel = "
+ << state->cancel_reason().to_string() << "}
query ctx: { cancel = "
<< state->get_query_ctx()->is_cancelled() << ",
"
<<
state->get_query_ctx()->exec_status().to_string() << "}";
}
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index dfc8ab434a7..8f90e2389ab 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1441,8 +1441,8 @@ Status PipelineFragmentContext::submit() {
for (auto& t : task) {
st = scheduler->schedule_task(t.get());
if (!st) {
- std::lock_guard<std::mutex> l(_status_lock);
cancel(Status::InternalError("submit context to executor
fail"));
+ std::lock_guard<std::mutex> l(_task_mutex);
_total_tasks = submit_tasks;
break;
}
@@ -1539,12 +1539,7 @@ void PipelineFragmentContext::close_a_pipeline() {
}
Status PipelineFragmentContext::send_report(bool done) {
- Status exec_status = Status::OK();
- {
- std::lock_guard<std::mutex> l(_status_lock);
- exec_status = _query_ctx->exec_status();
- }
-
+ Status exec_status = _query_ctx->exec_status();
// If plan is done successfully, but _is_report_success is false,
// no need to send report.
if (!_is_report_success && done && exec_status.ok()) {
@@ -1577,7 +1572,6 @@ Status PipelineFragmentContext::send_report(bool done) {
TUniqueId(),
-1,
_runtime_state.get(),
- [this](Status st) { return update_status(st); },
[this](const Status& reason) { cancel(reason); }};
return _report_status_cb(
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index 383706ee9e5..f4e324b6f53 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -107,14 +107,6 @@ public:
Status send_report(bool);
- Status update_status(Status status) {
- std::lock_guard<std::mutex> l(_status_lock);
- if (!status.ok() && _query_ctx->exec_status().ok()) {
- _query_ctx->set_exec_status(status);
- }
- return _query_ctx->exec_status();
- }
-
void trigger_report_if_necessary();
void refresh_next_report_time();
@@ -207,8 +199,6 @@ private:
std::atomic_bool _prepared = false;
bool _submitted = false;
- std::mutex _status_lock;
-
Pipelines _pipelines;
PipelineId _next_pipeline_id = 0;
std::mutex _task_mutex;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index faefadfc49a..d910dfe97d5 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -211,14 +211,14 @@ Status FragmentMgr::trigger_pipeline_context_report(
// including the final status when execution finishes.
void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
DCHECK(req.status.ok() || req.done); // if !status.ok() => done
- Status exec_status = req.update_fn(req.status);
+ Status exec_status = req.status;
Status coord_status;
FrontendServiceConnection coord(_exec_env->frontend_client_cache(),
req.coord_addr,
&coord_status);
if (!coord_status.ok()) {
std::stringstream ss;
UniqueId uid(req.query_id.hi, req.query_id.lo);
- static_cast<void>(req.update_fn(Status::InternalError(
+ static_cast<void>(req.cancel_fn(Status::InternalError(
"query_id: {}, couldn't get a client for {}, reason is {}",
uid.to_string(),
PrintThriftNetworkAddress(req.coord_addr),
coord_status.to_string())));
return;
@@ -438,7 +438,6 @@ void FragmentMgr::coordinator_callback(const
ReportStatusRequest& req) {
if (!rpc_status.ok()) {
// we need to cancel the execution of this fragment
- static_cast<void>(req.update_fn(rpc_status));
req.cancel_fn(rpc_status);
return;
}
@@ -455,7 +454,6 @@ void FragmentMgr::coordinator_callback(const
ReportStatusRequest& req) {
LOG_INFO("Going to cancel instance {} since report exec status got rpc
failed: {}",
print_id(req.fragment_instance_id), rpc_status.to_string());
// we need to cancel the execution of this fragment
- static_cast<void>(req.update_fn(rpc_status));
req.cancel_fn(rpc_status);
}
}
@@ -589,7 +587,7 @@ Status FragmentMgr::start_query_execution(const
PExecPlanFragmentStartRequest* r
"timeout or be cancelled. host: {}",
BackendOptions::get_localhost());
}
- search->second->set_ready_to_execute(false);
+ search->second->set_ready_to_execute(Status::OK());
return Status::OK();
}
diff --git a/be/src/runtime/group_commit_mgr.cpp
b/be/src/runtime/group_commit_mgr.cpp
index 62fbbd37979..7c182814e12 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -57,7 +57,7 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
}
}
if (UNLIKELY(runtime_state->is_cancelled())) {
- return Status::Cancelled<false>(runtime_state->cancel_reason());
+ return runtime_state->cancel_reason();
}
RETURN_IF_ERROR(status);
if (block->rows() > 0) {
@@ -134,7 +134,7 @@ Status LoadBlockQueue::get_block(RuntimeState*
runtime_state, vectorized::Block*
_get_cond.wait_for(l, std::chrono::milliseconds(left_milliseconds));
}
if (runtime_state->is_cancelled()) {
- auto st = Status::Cancelled<false>(runtime_state->cancel_reason());
+ auto st = runtime_state->cancel_reason();
_cancel_without_lock(st);
return st;
}
diff --git a/be/src/runtime/plan_fragment_executor.cpp
b/be/src/runtime/plan_fragment_executor.cpp
index 64bc0cde977..304a633773a 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -502,7 +502,6 @@ void PlanFragmentExecutor::send_report(bool done) {
_fragment_instance_id,
_backend_num,
_runtime_state.get(),
- std::bind(&PlanFragmentExecutor::update_status, this,
std::placeholders::_1),
std::bind(&PlanFragmentExecutor::cancel, this,
std::placeholders::_1)};
// This will send a report even if we are cancelled. If the query
completed correctly
// but fragments still need to be cancelled (e.g. limit reached), the
coordinator will
@@ -550,9 +549,9 @@ void PlanFragmentExecutor::cancel(const Status& reason) {
if (reason.is<ErrorCode::LIMIT_REACH>()) {
_is_report_on_cancel = false;
}
- _runtime_state->set_is_cancelled(reason.to_string());
+ _runtime_state->cancel(reason);
// To notify wait_for_start()
- _query_ctx->set_ready_to_execute(true);
+ _query_ctx->set_ready_to_execute(reason);
// must close stream_mgr to avoid dead lock in Exchange Node
_exec_env->vstream_mgr()->cancel(_fragment_instance_id, reason);
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index c141fc3b223..5360fbe4e4b 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -182,17 +182,15 @@ QueryContext::~QueryContext() {
LOG_INFO("Query {} deconstructed, {}", print_id(this->_query_id),
mem_tracker_msg);
}
-void QueryContext::set_ready_to_execute(bool is_cancelled) {
+void QueryContext::set_ready_to_execute(Status reason) {
set_execution_dependency_ready();
{
std::lock_guard<std::mutex> l(_start_lock);
- if (!_is_cancelled) {
- _is_cancelled = is_cancelled;
- }
+ _exec_status.update(reason);
_ready_to_execute = true;
}
- if (query_mem_tracker && is_cancelled) {
- query_mem_tracker->set_is_query_cancelled(is_cancelled);
+ if (query_mem_tracker && !reason.ok()) {
+ query_mem_tracker->set_is_query_cancelled(!reason.ok());
}
_start_cond.notify_all();
}
@@ -211,16 +209,11 @@ void QueryContext::set_execution_dependency_ready() {
}
void QueryContext::cancel(Status new_status, int fragment_id) {
- // we must get this wrong status once query ctx's `_is_cancelled` = true.
- set_exec_status(new_status);
- // Just for CAS need a left value
- bool false_cancel = false;
- if (!_is_cancelled.compare_exchange_strong(false_cancel, true)) {
+ if (!_exec_status.update(new_status)) {
return;
}
- DCHECK(!false_cancel && _is_cancelled);
- set_ready_to_execute(true);
+ set_ready_to_execute(new_status);
std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>>
ctx_to_cancel;
{
std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index bc2d8fbee7c..37f17b21c87 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -61,7 +61,6 @@ struct ReportStatusRequest {
TUniqueId fragment_instance_id;
int backend_num;
RuntimeState* runtime_state;
- std::function<Status(Status)> update_fn;
std::function<void(const Status&)> cancel_fn;
};
@@ -103,9 +102,9 @@ public:
ThreadPoolToken* get_token() { return _thread_token.get(); }
- void set_ready_to_execute(bool is_cancelled);
+ void set_ready_to_execute(Status reason);
- [[nodiscard]] bool is_cancelled() const { return _is_cancelled.load(); }
+ [[nodiscard]] bool is_cancelled() const { return !_exec_status.ok(); }
void cancel_all_pipeline_context(const Status& reason);
Status cancel_pipeline_context(const int fragment_id, const Status&
reason);
@@ -113,21 +112,9 @@ public:
std::shared_ptr<pipeline::PipelineFragmentContext> pip_ctx);
void cancel(Status new_status, int fragment_id = -1);
- void set_exec_status(Status new_status) {
- if (new_status.ok()) {
- return;
- }
- std::lock_guard<std::mutex> l(_exec_status_lock);
- if (!_exec_status.ok()) {
- return;
- }
- _exec_status = new_status;
- }
+ void set_exec_status(Status new_status) { _exec_status.update(new_status);
}
- [[nodiscard]] Status exec_status() {
- std::lock_guard<std::mutex> l(_exec_status_lock);
- return _exec_status;
- }
+ [[nodiscard]] Status exec_status() { return _exec_status.status(); }
void set_execution_dependency_ready();
@@ -141,10 +128,10 @@ public:
bool wait_for_start() {
int wait_time = config::max_fragment_start_wait_time_seconds;
std::unique_lock<std::mutex> l(_start_lock);
- while (!_ready_to_execute.load() && !_is_cancelled.load() &&
--wait_time > 0) {
+ while (!_ready_to_execute.load() && _exec_status.ok() && --wait_time >
0) {
_start_cond.wait_for(l, std::chrono::seconds(1));
}
- return _ready_to_execute.load() && !_is_cancelled.load();
+ return _ready_to_execute.load() && _exec_status.ok();
}
std::shared_ptr<vectorized::SharedHashTableController>
get_shared_hash_table_controller() {
@@ -313,7 +300,6 @@ private:
// Only valid when _need_wait_execution_trigger is set to true in
PlanFragmentExecutor.
// And all fragments of this query will start execution when this is set
to true.
std::atomic<bool> _ready_to_execute {false};
- std::atomic<bool> _is_cancelled {false};
void _init_query_mem_tracker();
@@ -325,10 +311,9 @@ private:
std::unique_ptr<RuntimeFilterMgr> _runtime_filter_mgr;
const TQueryOptions _query_options;
- std::mutex _exec_status_lock;
// All pipeline tasks use the same query context to report status. So we
need a `_exec_status`
// to report the real message if failed.
- Status _exec_status = Status::OK();
+ AtomicStatus _exec_status;
doris::pipeline::TaskScheduler* _task_scheduler = nullptr;
vectorized::SimplifiedScanScheduler* _scan_task_scheduler = nullptr;
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index b5974ebd1da..ac560c2c7e1 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -58,7 +58,6 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams&
fragment_exec_params,
_data_stream_recvrs_pool(new ObjectPool()),
_unreported_error_idx(0),
_query_id(fragment_exec_params.query_id),
- _is_cancelled(false),
_per_fragment_instance_idx(0),
_num_rows_load_total(0),
_num_rows_load_filtered(0),
@@ -114,7 +113,6 @@ RuntimeState::RuntimeState(const TUniqueId& instance_id,
const TUniqueId& query_
_unreported_error_idx(0),
_query_id(query_id),
_fragment_id(fragment_id),
- _is_cancelled(false),
_per_fragment_instance_idx(0),
_num_rows_load_total(0),
_num_rows_load_filtered(0),
@@ -152,7 +150,6 @@
RuntimeState::RuntimeState(pipeline::PipelineFragmentContext*, const TUniqueId&
_unreported_error_idx(0),
_query_id(query_id),
_fragment_id(fragment_id),
- _is_cancelled(false),
_per_fragment_instance_idx(0),
_num_rows_load_total(0),
_num_rows_load_filtered(0),
@@ -186,7 +183,6 @@ RuntimeState::RuntimeState(const TUniqueId& query_id,
int32_t fragment_id,
_unreported_error_idx(0),
_query_id(query_id),
_fragment_id(fragment_id),
- _is_cancelled(false),
_per_fragment_instance_idx(0),
_num_rows_load_total(0),
_num_rows_load_filtered(0),
@@ -219,7 +215,6 @@ RuntimeState::RuntimeState(const TQueryGlobals&
query_globals)
_obj_pool(new ObjectPool()),
_data_stream_recvrs_pool(new ObjectPool()),
_unreported_error_idx(0),
- _is_cancelled(false),
_per_fragment_instance_idx(0) {
_query_options.batch_size = DEFAULT_BATCH_SIZE;
if (query_globals.__isset.time_zone && query_globals.__isset.nano_seconds)
{
@@ -254,7 +249,6 @@ RuntimeState::RuntimeState()
_obj_pool(new ObjectPool()),
_data_stream_recvrs_pool(new ObjectPool()),
_unreported_error_idx(0),
- _is_cancelled(false),
_per_fragment_instance_idx(0) {
_query_options.batch_size = DEFAULT_BATCH_SIZE;
_timezone = TimezoneUtils::default_time_zone;
@@ -358,20 +352,13 @@ void
RuntimeState::get_unreported_errors(std::vector<std::string>* new_errors) {
}
}
-Status RuntimeState::query_status() {
- auto st = _query_ctx->exec_status();
- RETURN_IF_ERROR(st);
- std::lock_guard<std::mutex> l(_process_status_lock);
- return _process_status;
-}
-
bool RuntimeState::is_cancelled() const {
// Maybe we should just return _is_cancelled.load()
- return _is_cancelled.load() || (_query_ctx && _query_ctx->is_cancelled());
+ return !_exec_status.ok() || (_query_ctx && _query_ctx->is_cancelled());
}
-std::string RuntimeState::cancel_reason() const {
- return _cancel_reason;
+Status RuntimeState::cancel_reason() const {
+ return _exec_status.status();
}
const int64_t MAX_ERROR_NUM = 50;
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index f2e2c887571..d5ebac0f3fc 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -190,7 +190,6 @@ public:
return _query_options.__isset.mysql_row_binary_format &&
_query_options.mysql_row_binary_format;
}
- Status query_status();
// Appends error to the _error_log if there is space
bool log_error(const std::string& error);
@@ -206,21 +205,19 @@ public:
void get_unreported_errors(std::vector<std::string>* new_errors);
[[nodiscard]] bool is_cancelled() const;
- std::string cancel_reason() const;
- int codegen_level() const { return _query_options.codegen_level; }
- void set_is_cancelled(std::string msg) {
- if (!_is_cancelled.exchange(true)) {
- _cancel_reason = msg;
+ Status cancel_reason() const;
+ void cancel(const Status& reason) {
+ if (_exec_status.update(reason)) {
// Create a error status, so that we could print error stack, and
// we could know which path call cancel.
LOG(WARNING) << "Task is cancelled, instance: "
<< PrintInstanceStandardInfo(_query_id,
_fragment_instance_id)
- << ", st = " <<
Status::Error<ErrorCode::CANCELLED>(msg);
+ << ", st = " << reason;
} else {
LOG(WARNING) << "Task is already cancelled, instance: "
<< PrintInstanceStandardInfo(_query_id,
_fragment_instance_id)
- << ", original cancel msg: " << _cancel_reason
- << ", new cancel msg: " <<
Status::Error<ErrorCode::CANCELLED>(msg);
+ << ", original cancel msg: " << _exec_status.status()
+ << ", new cancel msg: " << reason;
}
}
@@ -230,18 +227,6 @@ public:
void set_be_number(int be_number) { _be_number = be_number; }
int be_number(void) const { return _be_number; }
- // Sets _process_status with err_msg if no error has been set yet.
- void set_process_status(const Status& status) {
- if (status.ok()) {
- return;
- }
- std::lock_guard<std::mutex> l(_process_status_lock);
- if (!_process_status.ok()) {
- return;
- }
- _process_status = status;
- }
-
std::vector<std::string>& output_files() { return _output_files; }
void set_import_label(const std::string& import_label) { _import_label =
import_label; }
@@ -693,9 +678,7 @@ private:
TQueryOptions _query_options;
ExecEnv* _exec_env = nullptr;
- // if true, execution should stop with a CANCELLED status
- std::atomic<bool> _is_cancelled;
- std::string _cancel_reason;
+ AtomicStatus _exec_status;
int _per_fragment_instance_idx;
int _num_per_fragment_instances = 0;
@@ -709,12 +692,6 @@ private:
// used as send id
int _be_number;
- // Non-OK if an error has occurred and query execution should abort. Used
only for
- // asynchronously reporting such errors (e.g., when a UDF reports an
error), so this
- // will not necessarily be set in all error cases.
- std::mutex _process_status_lock;
- Status _process_status;
-
// put here to collect files??
std::vector<std::string> _output_files;
std::atomic<int64_t> _num_rows_load_total; // total rows read from
source
diff --git a/be/src/udf/udf.cpp b/be/src/udf/udf.cpp
index 6dd1b5112d0..3e04208e59a 100644
--- a/be/src/udf/udf.cpp
+++ b/be/src/udf/udf.cpp
@@ -84,7 +84,7 @@ void FunctionContext::set_error(const char* error_msg) {
ss << "UDF ERROR: " << error_msg;
if (_state != nullptr) {
- _state->set_process_status(Status::InternalError(ss.str()));
+ _state->cancel(Status::InternalError(ss.str()));
}
}
}
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp
b/be/src/vec/sink/writer/vtablet_writer.cpp
index e4065a66618..621a9ad1131 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -905,7 +905,7 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
_close_time_ms = UnixMillis() - _close_time_ms;
if (_cancelled || state->is_cancelled()) {
- cancel(state->cancel_reason());
+ cancel(state->cancel_reason().to_string());
}
if (_add_batches_finished) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]