This is an automated email from the ASF dual-hosted git repository.
zclll pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f5df730d6c0 [fix](load) Use atomic operations for _try_close flag and
remove unused _close_wait (#61593)
f5df730d6c0 is described below
commit f5df730d6c0268e5fc81729f0c6d7c9ce1f4eb3f
Author: zclllyybb <[email protected]>
AuthorDate: Mon Mar 23 15:37:47 2026 +0800
[fix](load) Use atomic operations for _try_close flag and remove unused
_close_wait (#61593)
- Change `_try_close` from `bool` to `std::atomic<bool>` with
acquire/release memory ordering
- Use `memory_order_acquire` when reading in `_send_batch_process()`
(bthread)
- Use `memory_order_release` when writing in `_do_try_close()` (pthread)
- Remove unused `_close_wait` field
This fixes a potential race condition where `_try_close` is written by
pthread and read by bthread without proper synchronization.
It's ok on x86 so no test added.
---
be/src/exec/sink/writer/vtablet_writer.cpp | 5 ++---
be/src/exec/sink/writer/vtablet_writer.h | 5 ++---
2 files changed, 4 insertions(+), 6 deletions(-)
diff --git a/be/src/exec/sink/writer/vtablet_writer.cpp
b/be/src/exec/sink/writer/vtablet_writer.cpp
index 52097d90357..39b23500e06 100644
--- a/be/src/exec/sink/writer/vtablet_writer.cpp
+++ b/be/src/exec/sink/writer/vtablet_writer.cpp
@@ -1395,7 +1395,7 @@ void VTabletWriter::_send_batch_process() {
// we must RECHECK opened_nodes below, after got closed signal,
because it may changed. Think of this:
// checked opened_nodes = 0 ---> new block arrived ---> task
finished, close() was called ---> we got _try_close here
// if we don't check again, we may lose the last package.
- if (_try_close) {
+ if (_try_close.load(std::memory_order_acquire)) {
opened_nodes = 0;
std::ranges::for_each(_channels,
[&opened_nodes](const
std::shared_ptr<IndexChannel>& ich) {
@@ -1785,7 +1785,7 @@ void VTabletWriter::_do_try_close(RuntimeState* state,
const Status& exec_status
status = _send_new_partition_batch();
}
- _try_close = true; // will stop periodic thread
+ _try_close.store(true, std::memory_order_release); // will stop periodic
thread
if (status.ok()) {
// BE id -> add_batch method counter
std::unordered_map<int64_t, AddBatchCounter>
node_add_batch_counter_map;
@@ -1866,7 +1866,6 @@ void VTabletWriter::_do_try_close(RuntimeState* state,
const Status& exec_status
if (!status.ok()) {
_cancel_all_channel(status);
_close_status = status;
- _close_wait = true;
}
}
diff --git a/be/src/exec/sink/writer/vtablet_writer.h
b/be/src/exec/sink/writer/vtablet_writer.h
index d9e3869e68e..d3e6e8da0f1 100644
--- a/be/src/exec/sink/writer/vtablet_writer.h
+++ b/be/src/exec/sink/writer/vtablet_writer.h
@@ -752,9 +752,8 @@ private:
// Save the status of try_close() and close() method
Status _close_status;
// if we called try_close(), for auto partition the periodic send thread
should stop if it's still waiting for node channels first-time open.
- bool _try_close = false;
- // for non-pipeline, if close() did something, close_wait() should wait it.
- bool _close_wait = false;
+ // atomic: written by pthread (_do_try_close), read by bthread
(_send_batch_process)
+ std::atomic<bool> _try_close {false};
bool _inited = false;
bool _write_file_cache = false;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]