This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2f58d4d3d35 [atomicstatus](be) add atomic status to share state
between multi thread (#35002)
2f58d4d3d35 is described below
commit 2f58d4d3d359ce286f99372a65084c101c8fa213
Author: yiguolei <[email protected]>
AuthorDate: Fri May 17 22:47:00 2024 +0800
[atomicstatus](be) add atomic status to share state between multi thread
(#35002)
---
be/src/common/status.h | 44 ++++++++++++++++++++++++++
be/src/vec/sink/writer/async_result_writer.cpp | 16 ++++------
be/src/vec/sink/writer/async_result_writer.h | 8 ++---
3 files changed, 53 insertions(+), 15 deletions(-)
diff --git a/be/src/common/status.h b/be/src/common/status.h
index 09a22311af9..6f587d5a28f 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -545,6 +545,50 @@ private:
}
};
+// Many threads use a Status to indicate the cancel state: one thread may update it
+// while other threads read it. Status is not thread safe; for example, if one thread
+// is updating it while another thread is calling to_string(), the process may crash
+// (core dump), because _err_msg is a unique_ptr and it is destroyed during the copy.
+// We also cannot simply guard every access with a lock, because the status is read
+// very frequently to check whether it has been cancelled.
+// The default value is OK.
+// Thread-safe holder for the first non-OK Status reported by any thread.
+//
+// ok() only reads an atomic error code, so it takes no lock and can be polled
+// frequently. The full Status object is read and written only while holding
+// mutex_. Once an error has been recorded it is never overwritten.
+class AtomicStatus {
+public:
+    AtomicStatus() : error_st_(Status::OK()) {}
+
+    // Lock-free check. The acquire load pairs with the release store in update().
+    bool ok() const { return error_code_.load(std::memory_order_acquire) == 0; }
+
+    // Records new_status if it is an error and no error has been recorded yet.
+    // Returns true if this call stored new_status, false otherwise.
+    bool update(const Status& new_status) {
+        // Fast path: an OK status never replaces anything, and the first error wins.
+        if (new_status.ok() || error_code_.load(std::memory_order_acquire) != 0) {
+            return false;
+        }
+        std::lock_guard l(mutex_);
+        // Re-check under the lock: another thread may have recorded its error first.
+        if (error_code_.load(std::memory_order_relaxed) != 0) {
+            return false;
+        }
+        // Store the full status BEFORE publishing the error code. If the code were
+        // published first, a reader that sees ok() == false could call status(),
+        // win the race for mutex_, and still get the stale OK value.
+        error_st_ = new_status;
+        error_code_.store(static_cast<int16_t>(new_status.code()), std::memory_order_release);
+        return true;
+    }
+
+    // Returns a copy of the stored status. The copy is made under the lock so the
+    // caller never reads error_st_ while update() is assigning to it.
+    Status status() const {
+        std::lock_guard l(mutex_);
+        return error_st_;
+    }
+
+private:
+    // 0 means OK; any other value is the code of the recorded error.
+    std::atomic_int16_t error_code_ = 0;
+    Status error_st_;
+    // mutable so that the read-only status() can lock it.
+    mutable std::mutex mutex_;
+
+    AtomicStatus(const AtomicStatus&) = delete;
+    void operator=(const AtomicStatus&) = delete;
+};
+
inline std::ostream& operator<<(std::ostream& ostr, const Status& status) {
ostr << '[' << status.code_as_string() << ']';
ostr << status.msg();
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp
b/be/src/vec/sink/writer/async_result_writer.cpp
index 54949599456..814d1b754c4 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -55,7 +55,7 @@ Status AsyncResultWriter::sink(Block* block, bool eos) {
// if io task failed, just return error status to
// end the query
if (!_writer_status.ok()) {
- return _writer_status;
+ return _writer_status.status();
}
if (_dependency && _is_finished()) {
@@ -143,7 +143,7 @@ void AsyncResultWriter::process_block(RuntimeState* state,
RuntimeProfile* profi
auto status = write(*block);
if (!status.ok()) [[unlikely]] {
std::unique_lock l(_m);
- _writer_status = status;
+ _writer_status.update(status);
if (_dependency && _is_finished()) {
_dependency->set_ready();
}
@@ -172,14 +172,10 @@ void AsyncResultWriter::process_block(RuntimeState*
state, RuntimeProfile* profi
// Should not call finish in lock because it may hang, and it will
lock _m too long.
// And get_writer_status will also need this lock, it will block
pipeline exec thread.
Status st = finish(state);
- std::lock_guard l(_m);
- _writer_status = st;
+ _writer_status.update(st);
}
Status st = Status::OK();
- {
- std::lock_guard l(_m);
- st = _writer_status;
- }
+ { st = _writer_status.status(); }
Status close_st = close(st);
{
@@ -187,7 +183,7 @@ void AsyncResultWriter::process_block(RuntimeState* state,
RuntimeProfile* profi
// the real reason.
std::lock_guard l(_m);
if (_writer_status.ok()) {
- _writer_status = close_st;
+ _writer_status.update(close_st);
}
_writer_thread_closed = true;
}
@@ -215,7 +211,7 @@ Status
AsyncResultWriter::_projection_block(doris::vectorized::Block& input_bloc
void AsyncResultWriter::force_close(Status s) {
std::lock_guard l(_m);
- _writer_status = s;
+ _writer_status.update(s);
if (_dependency && _is_finished()) {
_dependency->set_ready();
}
diff --git a/be/src/vec/sink/writer/async_result_writer.h
b/be/src/vec/sink/writer/async_result_writer.h
index 793d7e629eb..b1426a48806 100644
--- a/be/src/vec/sink/writer/async_result_writer.h
+++ b/be/src/vec/sink/writer/async_result_writer.h
@@ -78,10 +78,7 @@ public:
// Add the IO thread task process block() to thread pool to dispose the IO
Status start_writer(RuntimeState* state, RuntimeProfile* profile);
- Status get_writer_status() {
- std::lock_guard l(_m);
- return _writer_status;
- }
+ Status get_writer_status() { return _writer_status.status(); } // returns a copy; status() locks AtomicStatus's own mutex, not _m
protected:
Status _projection_block(Block& input_block, Block* output_block);
@@ -103,7 +100,8 @@ private:
std::mutex _m;
std::condition_variable _cv;
std::deque<std::unique_ptr<Block>> _data_queue;
- Status _writer_status = Status::OK();
+ // Default value is ok
+ AtomicStatus _writer_status;
bool _eos = false;
// The writer is not started at the beginning. If prepare failed and open was not called, the writer
// is not started, so we should not pend on its finish.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]