This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 6150c54df5e [bugfix](asyncwriter) async writer's lock should not
include finish or close method (#33077)
6150c54df5e is described below
commit 6150c54df5e8410494b60fa9c90469d23a9a1b52
Author: yiguolei <[email protected]>
AuthorDate: Mon Apr 1 10:41:21 2024 +0800
[bugfix](asyncwriter) async writer's lock should not include finish or
close method (#33077)
close or finish method will take a lot of time, and the lock will hold a
lot of time. If there is a bug in close or finish method, it will affect
pipeline execute thread.
writer's close method will need this lock, so that it will hang when close
method is called.
---
be/src/vec/sink/writer/async_result_writer.cpp | 50 +++++++++++++++++---------
1 file changed, 34 insertions(+), 16 deletions(-)
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp
b/be/src/vec/sink/writer/async_result_writer.cpp
index 379261266ea..e9bb563da96 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -153,27 +153,45 @@ void AsyncResultWriter::process_block(RuntimeState*
state, RuntimeProfile* profi
}
}
- // If the last block is sent successfuly, then call finish to clear the
buffer or commit
- // transactions.
- // Using lock to make sure the writer status is not modified
- // There is a unique ptr err_msg in Status, if it is modified, the unique
ptr
- // maybe released. And it will core because use after free.
- std::lock_guard l(_m);
+ bool need_finish = false;
+ {
+ // If the last block is sent successfuly, then call finish to clear
the buffer or commit
+ // transactions.
+ // Using lock to make sure the writer status is not modified
+ // There is a unique ptr err_msg in Status, if it is modified, the
unique ptr
+ // maybe released. And it will core because use after free.
+ std::lock_guard l(_m);
+ if (_writer_status.ok() && _eos) {
+ need_finish = true;
+ }
+ }
// eos only means the last block is input to the queue and there is no
more block to be added,
// it is not sure that the block is written to stream.
- if (_writer_status.ok() && _eos) {
- _writer_status = finish(state);
+ if (need_finish) {
+ // Should not call finish in lock because it may hang, and it will
lock _m too long.
+ // And get_writer_status will also need this lock, it will block
pipeline exec thread.
+ Status st = finish(state);
+ std::lock_guard l(_m);
+ _writer_status = st;
+ }
+ Status st = Status::OK();
+ {
+ std::lock_guard l(_m);
+ st = _writer_status;
}
- // should set _finish_dependency first, as close function maybe blocked by
wait_close of execution_timeout
- _set_ready_to_finish();
- Status close_st = close(_writer_status);
- // If it is already failed before, then not update the write status so
that we could get
- // the real reason.
- if (_writer_status.ok()) {
- _writer_status = close_st;
+ Status close_st = close(st);
+ {
+ // If it is already failed before, then not update the write status so
that we could get
+ // the real reason.
+ std::lock_guard l(_m);
+ if (_writer_status.ok()) {
+ _writer_status = close_st;
+ }
+ _writer_thread_closed = true;
}
- _writer_thread_closed = true;
+ // should set _finish_dependency first, as close function maybe blocked by
wait_close of execution_timeout
+ _set_ready_to_finish();
}
void AsyncResultWriter::_set_ready_to_finish() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]