This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new a47f436f025 [pick](branch-4.1)pick 62947 63055 63070 to 4.1 (#63297)
a47f436f025 is described below
commit a47f436f0256c4dc8e4eaca106b667477321495a
Author: Mryange <[email protected]>
AuthorDate: Wed May 20 15:37:51 2026 +0800
[pick](branch-4.1)pick 62947 63055 63070 to 4.1 (#63297)
### What problem does this PR solve?
Cherry-picks the following PRs into branch-4.1:
https://github.com/apache/doris/pull/62947
https://github.com/apache/doris/pull/63055
https://github.com/apache/doris/pull/63070
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
be/CMakeLists.txt | 5 +-
be/src/common/thread_safety_annotations.h | 171 ++++++++++++
be/src/exec/operator/analytic_sink_operator.cpp | 4 +-
be/src/exec/operator/analytic_source_operator.cpp | 9 +-
be/src/exec/operator/data_queue.cpp | 233 +++++++++-------
be/src/exec/operator/data_queue.h | 111 ++++----
be/src/exec/operator/exchange_sink_operator.cpp | 3 +-
be/src/exec/operator/exchange_sink_operator.h | 6 +-
be/src/exec/operator/hashjoin_build_sink.cpp | 8 +-
be/src/exec/operator/hashjoin_build_sink.h | 5 +-
be/src/exec/operator/multi_cast_data_streamer.cpp | 19 +-
be/src/exec/operator/multi_cast_data_streamer.h | 19 +-
.../exec/operator/partition_sort_sink_operator.cpp | 10 +-
.../operator/partition_sort_source_operator.cpp | 9 +-
be/src/exec/operator/scan_operator.cpp | 4 +-
be/src/exec/operator/scan_operator.h | 4 +-
be/src/exec/pipeline/dependency.h | 19 +-
be/test/exec/pipeline/data_queue_test.cpp | 309 ++++++++++++++++++++-
.../pipeline/multi_cast_data_streamer_test.cpp | 17 +-
19 files changed, 740 insertions(+), 225 deletions(-)
diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index 1de3541d3eb..d9f9c4bce82 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -335,8 +335,9 @@ if (COMPILER_CLANG)
-Wunused-template
-Wunused-member-function
-Wunused-macros
- -Wconversion)
- add_compile_options( -Wno-gnu-statement-expression
+ -Wconversion
+ -Wthread-safety)
+ add_compile_options(-Wno-gnu-statement-expression
-Wno-implicit-float-conversion
-Wno-sign-conversion
)
diff --git a/be/src/common/thread_safety_annotations.h b/be/src/common/thread_safety_annotations.h
new file mode 100644
index 00000000000..6cd8d4b0cae
--- /dev/null
+++ b/be/src/common/thread_safety_annotations.h
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Thread safety annotation macros and annotated mutex wrappers for
+// Clang's -Wthread-safety static analysis.
+// Reference: https://clang.llvm.org/docs/ThreadSafetyAnalysis.html
+
+#pragma once
+
+#include <mutex>
+
+#ifdef BE_TEST
+namespace doris {
+void mock_random_sleep();
+} // namespace doris
+#endif
+
+// Enable thread safety attributes only with clang.
+// The attributes can be safely erased when compiling with other compilers.
+#if defined(__clang__) && (!defined(SWIG))
+#define THREAD_ANNOTATION_ATTRIBUTE__(x) __attribute__((x))
+#else
+#define THREAD_ANNOTATION_ATTRIBUTE__(x) // no-op
+#endif
+
+#define CAPABILITY(x) THREAD_ANNOTATION_ATTRIBUTE__(capability(x))
+
+#define SCOPED_CAPABILITY THREAD_ANNOTATION_ATTRIBUTE__(scoped_lockable)
+
+#define GUARDED_BY(x) THREAD_ANNOTATION_ATTRIBUTE__(guarded_by(x))
+
+#define PT_GUARDED_BY(x) THREAD_ANNOTATION_ATTRIBUTE__(pt_guarded_by(x))
+
+#define ACQUIRED_BEFORE(...)
THREAD_ANNOTATION_ATTRIBUTE__(acquired_before(__VA_ARGS__))
+
+#define ACQUIRED_AFTER(...)
THREAD_ANNOTATION_ATTRIBUTE__(acquired_after(__VA_ARGS__))
+
+#define REQUIRES(...)
THREAD_ANNOTATION_ATTRIBUTE__(requires_capability(__VA_ARGS__))
+
+#define REQUIRES_SHARED(...)
THREAD_ANNOTATION_ATTRIBUTE__(requires_shared_capability(__VA_ARGS__))
+
+#define ACQUIRE(...)
THREAD_ANNOTATION_ATTRIBUTE__(acquire_capability(__VA_ARGS__))
+
+#define ACQUIRE_SHARED(...)
THREAD_ANNOTATION_ATTRIBUTE__(acquire_shared_capability(__VA_ARGS__))
+
+#define RELEASE(...)
THREAD_ANNOTATION_ATTRIBUTE__(release_capability(__VA_ARGS__))
+
+#define RELEASE_SHARED(...)
THREAD_ANNOTATION_ATTRIBUTE__(release_shared_capability(__VA_ARGS__))
+
+#define TRY_ACQUIRE(...)
THREAD_ANNOTATION_ATTRIBUTE__(try_acquire_capability(__VA_ARGS__))
+
+#define TRY_ACQUIRE_SHARED(...) \
+ THREAD_ANNOTATION_ATTRIBUTE__(try_acquire_shared_capability(__VA_ARGS__))
+
+#define EXCLUDES(...)
THREAD_ANNOTATION_ATTRIBUTE__(locks_excluded(__VA_ARGS__))
+
+#define ASSERT_CAPABILITY(x)
THREAD_ANNOTATION_ATTRIBUTE__(assert_capability(x))
+
+#define ASSERT_SHARED_CAPABILITY(x)
THREAD_ANNOTATION_ATTRIBUTE__(assert_shared_capability(x))
+
+#define RETURN_CAPABILITY(x) THREAD_ANNOTATION_ATTRIBUTE__(lock_returned(x))
+
+#define NO_THREAD_SAFETY_ANALYSIS
THREAD_ANNOTATION_ATTRIBUTE__(no_thread_safety_analysis)
+
+// Annotated mutex wrapper for use with Clang thread safety analysis.
+// Wraps std::mutex and provides the CAPABILITY annotation so that
+// GUARDED_BY / REQUIRES / etc. annotations can reference it.
+class CAPABILITY("mutex") AnnotatedMutex {
+public:
+ void lock() ACQUIRE() { _mutex.lock(); }
+ void unlock() RELEASE() { _mutex.unlock(); }
+ bool try_lock() TRY_ACQUIRE(true) { return _mutex.try_lock(); }
+
+ // Access the underlying std::mutex (e.g., for std::condition_variable).
+ // Use with care — this bypasses thread safety annotations.
+ std::mutex& native_handle() { return _mutex; }
+
+private:
+ std::mutex _mutex;
+};
+
+// RAII scoped lock guard annotated for thread safety analysis.
+// In BE_TEST builds, injects a random sleep before acquiring and after
+// releasing the lock to exercise concurrent code paths.
+template <typename MutexType>
+class SCOPED_CAPABILITY LockGuard {
+public:
+ explicit LockGuard(MutexType& mu) ACQUIRE(mu) : _mu(mu) {
+#ifdef BE_TEST
+ doris::mock_random_sleep();
+#endif
+ _mu.lock();
+ }
+ ~LockGuard() RELEASE() {
+ _mu.unlock();
+#ifdef BE_TEST
+ doris::mock_random_sleep();
+#endif
+ }
+
+ LockGuard(const LockGuard&) = delete;
+ LockGuard& operator=(const LockGuard&) = delete;
+
+private:
+ MutexType& _mu;
+};
+
+// RAII unique lock annotated for thread safety analysis.
+// Supports manual lock/unlock while preserving capability tracking.
+template <typename MutexType>
+class SCOPED_CAPABILITY UniqueLock {
+public:
+ explicit UniqueLock(MutexType& mu) ACQUIRE(mu) : _mu(&mu), _locked(true) {
+#ifdef BE_TEST
+ doris::mock_random_sleep();
+#endif
+ _mu->lock();
+ }
+
+ UniqueLock(MutexType& mu, std::adopt_lock_t) REQUIRES(mu) : _mu(&mu),
_locked(true) {}
+
+ UniqueLock(MutexType& mu, std::defer_lock_t) EXCLUDES(mu) : _mu(&mu),
_locked(false) {}
+
+ ~UniqueLock() RELEASE() {
+ if (_locked) {
+ _mu->unlock();
+#ifdef BE_TEST
+ doris::mock_random_sleep();
+#endif
+ }
+ }
+
+ void lock() ACQUIRE() {
+#ifdef BE_TEST
+ doris::mock_random_sleep();
+#endif
+ _mu->lock();
+ _locked = true;
+ }
+
+ void unlock() RELEASE() {
+ _mu->unlock();
+ _locked = false;
+#ifdef BE_TEST
+ doris::mock_random_sleep();
+#endif
+ }
+
+ bool owns_lock() const { return _locked; }
+
+ UniqueLock(const UniqueLock&) = delete;
+ UniqueLock& operator=(const UniqueLock&) = delete;
+
+private:
+ MutexType* _mu;
+ bool _locked;
+};
diff --git a/be/src/exec/operator/analytic_sink_operator.cpp b/be/src/exec/operator/analytic_sink_operator.cpp
index db5ac2ecd37..7a6b0d659c0 100644
--- a/be/src/exec/operator/analytic_sink_operator.cpp
+++ b/be/src/exec/operator/analytic_sink_operator.cpp
@@ -461,7 +461,7 @@ void AnalyticSinkLocalState::_init_result_columns() {
void AnalyticSinkLocalState::_refresh_buffer_and_dependency_state(Block*
block) {
size_t buffer_size = 0;
{
- std::unique_lock<std::mutex> lc(_shared_state->buffer_mutex);
+ LockGuard lc(_shared_state->buffer_mutex);
_shared_state->blocks_buffer.push(std::move(*block));
buffer_size = _shared_state->blocks_buffer.size();
}
@@ -756,7 +756,7 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, Block* input_bloc
RETURN_IF_ERROR(_add_input_block(state, input_block));
RETURN_IF_ERROR(local_state._execute_impl());
if (local_state._input_eos) {
- std::unique_lock<std::mutex>
lc(local_state._shared_state->sink_eos_lock);
+ LockGuard lc(local_state._shared_state->sink_eos_lock);
local_state._shared_state->sink_eos = true;
local_state._dependency->set_ready_to_read(); // ready for source to
read
}
diff --git a/be/src/exec/operator/analytic_source_operator.cpp b/be/src/exec/operator/analytic_source_operator.cpp
index de385e3dede..efdb3055ff6 100644
--- a/be/src/exec/operator/analytic_source_operator.cpp
+++ b/be/src/exec/operator/analytic_source_operator.cpp
@@ -53,7 +53,7 @@ Status AnalyticSourceOperatorX::get_block(RuntimeState* state, Block* output_blo
output_block->clear_column_data();
size_t output_rows = 0;
{
- std::lock_guard<std::mutex>
lock(local_state._shared_state->buffer_mutex);
+ LockGuard lock(local_state._shared_state->buffer_mutex);
if (!local_state._shared_state->blocks_buffer.empty()) {
local_state._shared_state->blocks_buffer.front().swap(*output_block);
local_state._shared_state->blocks_buffer.pop();
@@ -61,11 +61,10 @@ Status AnalyticSourceOperatorX::get_block(RuntimeState* state, Block* output_blo
//if buffer have no data and sink not eos, block reading and wait
for signal again
RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts,
output_block,
output_block->columns()));
- if (local_state._shared_state->blocks_buffer.empty() &&
- !local_state._shared_state->sink_eos) {
+ if (local_state._shared_state->blocks_buffer.empty()) {
// add this mutex to check, as in some case maybe is doing
block(), and the sink is doing set eos.
// so have to hold mutex to set block(), avoid to sink have
set eos and set ready, but here set block() by mistake
- std::unique_lock<std::mutex>
lc(local_state._shared_state->sink_eos_lock);
+ LockGuard lc(local_state._shared_state->sink_eos_lock);
if (!local_state._shared_state->sink_eos) {
local_state._dependency->block(); // block
self source
local_state._dependency->set_ready_to_write(); // ready
for sink write
@@ -73,7 +72,7 @@ Status AnalyticSourceOperatorX::get_block(RuntimeState* state, Block* output_blo
}
} else {
//iff buffer have no data and sink eos, set eos
- std::unique_lock<std::mutex>
lc(local_state._shared_state->sink_eos_lock);
+ LockGuard lc(local_state._shared_state->sink_eos_lock);
*eos = local_state._shared_state->sink_eos;
}
}
diff --git a/be/src/exec/operator/data_queue.cpp b/be/src/exec/operator/data_queue.cpp
index 84ba08f309f..b41f342ba07 100644
--- a/be/src/exec/operator/data_queue.cpp
+++ b/be/src/exec/operator/data_queue.cpp
@@ -20,43 +20,114 @@
#include <glog/logging.h>
#include <algorithm>
-#include <mutex>
#include <utility>
+#include "common/thread_safety_annotations.h"
#include "core/block/block.h"
#include "exec/pipeline/dependency.h"
namespace doris {
#include "common/compile_check_begin.h"
-DataQueue::DataQueue(int child_count)
- : _queue_blocks_lock(child_count),
- _queue_blocks(child_count),
- _free_blocks_lock(child_count),
- _free_blocks(child_count),
- _child_count(child_count),
- _is_finished(child_count),
- _is_canceled(child_count),
- _cur_bytes_in_queue(child_count),
- _cur_blocks_nums_in_queue(child_count),
- _flag_queue_idx(0) {
- for (int i = 0; i < child_count; ++i) {
- _queue_blocks_lock[i].reset(new std::mutex());
- _free_blocks_lock[i].reset(new std::mutex());
- _is_finished[i] = false;
- _is_canceled[i] = false;
- _cur_bytes_in_queue[i] = 0;
- _cur_blocks_nums_in_queue[i] = 0;
+
+void SubQueue::try_pop(std::unique_ptr<Block>* output_block) {
+ LockGuard l(queue_lock);
+ if (!blocks.empty()) {
+ *output_block = std::move(blocks.front());
+ blocks.pop_front();
+ bytes_in_queue -= (*output_block)->allocated_bytes();
+ blocks_in_queue -= 1;
+ if (blocks.empty()) {
+ sink_dependency->set_ready();
+ }
+ }
+}
+
+bool SubQueue::try_push(std::unique_ptr<Block> block, std::atomic_uint32_t&
total_counter) {
+ LockGuard l(queue_lock);
+ if (is_finished) {
+ return false;
+ }
+ total_counter++;
+ bytes_in_queue += block->allocated_bytes();
+ blocks.emplace_back(std::move(block));
+ blocks_in_queue += 1;
+ if (static_cast<int64_t>(blocks.size()) > max_blocks_in_queue.load()) {
+ sink_dependency->block();
+ }
+ return true;
+}
+
+bool SubQueue::mark_finished(std::atomic_uint32_t& unfinished_counter,
+ std::atomic_bool& all_finished) {
+ LockGuard l(queue_lock);
+ if (is_finished) {
+ return false;
+ }
+ is_finished = true;
+ if (unfinished_counter.fetch_sub(1) == 1) {
+ all_finished = true;
+ }
+ return true;
+}
+
+void SubQueue::clear_blocks() {
+ bool need_set_always_ready = false;
+ {
+ LockGuard l(queue_lock);
+ if (!blocks.empty()) {
+ blocks.clear();
+ bytes_in_queue = 0;
+ blocks_in_queue = 0;
+ need_set_always_ready = true;
+ }
+ }
+ // Notify outside of queue_lock to keep lock ordering simple.
+ if (need_set_always_ready) {
+ sink_dependency->set_always_ready();
+ }
+}
+
+DataQueue::DataQueue(int child_count) : _sub_queues(child_count),
_child_count(child_count) {
+ for (auto& sub : _sub_queues) {
+ sub = std::make_unique<SubQueue>();
}
_un_finished_counter = child_count;
- _sink_dependencies.resize(child_count, nullptr);
+}
+
+bool DataQueue::has_more_data() const {
+ return _cur_blocks_total_nums.load() > 0;
+}
+
+void DataQueue::set_source_dependency(std::shared_ptr<Dependency>
source_dependency)
+ NO_THREAD_SAFETY_ANALYSIS {
+ _source_dependency = std::move(source_dependency);
+}
+
+void DataQueue::set_sink_dependency(Dependency* sink_dependency, int
child_idx) {
+ _sub_queues[child_idx]->sink_dependency = sink_dependency;
+}
+
+void DataQueue::set_max_blocks_in_sub_queue(int64_t max_blocks) {
+ for (auto& sub : _sub_queues) {
+ sub->max_blocks_in_queue = max_blocks;
+ }
+}
+
+void DataQueue::set_low_memory_mode() {
+ _is_low_memory_mode = true;
+ for (auto& sub : _sub_queues) {
+ sub->max_blocks_in_queue = 1;
+ }
+ clear_free_blocks();
}
std::unique_ptr<Block> DataQueue::get_free_block(int child_idx) {
+ auto& sub = *_sub_queues[child_idx];
{
- INJECT_MOCK_SLEEP(std::lock_guard<std::mutex>
l(*_free_blocks_lock[child_idx]));
- if (!_free_blocks[child_idx].empty()) {
- auto block = std::move(_free_blocks[child_idx].front());
- _free_blocks[child_idx].pop_front();
+ LockGuard l(sub.free_lock);
+ if (!sub.free_blocks.empty()) {
+ auto block = std::move(sub.free_blocks.front());
+ sub.free_blocks.pop_front();
return block;
}
}
@@ -68,29 +139,24 @@ void DataQueue::push_free_block(std::unique_ptr<Block> block, int child_idx) {
DCHECK(block->rows() == 0);
if (!_is_low_memory_mode) {
- INJECT_MOCK_SLEEP(std::lock_guard<std::mutex>
l(*_free_blocks_lock[child_idx]));
- _free_blocks[child_idx].emplace_back(std::move(block));
+ auto& sub = *_sub_queues[child_idx];
+ LockGuard l(sub.free_lock);
+ sub.free_blocks.emplace_back(std::move(block));
}
}
void DataQueue::clear_free_blocks() {
- for (size_t child_idx = 0; child_idx < _free_blocks.size(); ++child_idx) {
- std::lock_guard<std::mutex> l(*_free_blocks_lock[child_idx]);
+ for (auto& sub : _sub_queues) {
+ LockGuard l(sub->free_lock);
std::deque<std::unique_ptr<Block>> tmp_queue;
- _free_blocks[child_idx].swap(tmp_queue);
+ sub->free_blocks.swap(tmp_queue);
}
}
void DataQueue::terminate() {
- for (int i = 0; i < _queue_blocks.size(); i++) {
+ for (int i = 0; i < _child_count; ++i) {
set_finish(i);
- INJECT_MOCK_SLEEP(std::lock_guard<std::mutex>
l(*_queue_blocks_lock[i]));
- if (_cur_blocks_nums_in_queue[i] > 0) {
- _queue_blocks[i].clear();
- _cur_bytes_in_queue[i] = 0;
- _cur_blocks_nums_in_queue[i] = 0;
- _sink_dependencies[i]->set_always_ready();
- }
+ _sub_queues[i]->clear_blocks();
}
clear_free_blocks();
}
@@ -105,7 +171,7 @@ bool DataQueue::remaining_has_data() {
if (_flag_queue_idx == _child_count) {
_flag_queue_idx = 0;
}
- if (_cur_blocks_nums_in_queue[_flag_queue_idx] > 0) {
+ if (_sub_queues[_flag_queue_idx]->blocks_in_queue.load() > 0) {
return true;
}
}
@@ -115,28 +181,17 @@ bool DataQueue::remaining_has_data() {
//the _flag_queue_idx indicate which queue has data, and in check can_read
//will be set idx in remaining_has_data function
Status DataQueue::get_block_from_queue(std::unique_ptr<Block>* output_block,
int* child_idx) {
- if (_is_canceled[_flag_queue_idx]) {
- return Status::InternalError("Current queue of idx {} have beed
canceled: ",
- _flag_queue_idx);
- }
+ const int idx = _flag_queue_idx;
+ auto& sub = *_sub_queues[idx];
- {
- INJECT_MOCK_SLEEP(std::lock_guard<std::mutex>
l(*_queue_blocks_lock[_flag_queue_idx]));
- if (_cur_blocks_nums_in_queue[_flag_queue_idx] > 0) {
- *output_block = std::move(_queue_blocks[_flag_queue_idx].front());
- _queue_blocks[_flag_queue_idx].pop_front();
- if (child_idx) {
- *child_idx = _flag_queue_idx;
- }
- _cur_bytes_in_queue[_flag_queue_idx] -=
(*output_block)->allocated_bytes();
- _cur_blocks_nums_in_queue[_flag_queue_idx] -= 1;
- if (_cur_blocks_nums_in_queue[_flag_queue_idx] == 0) {
- _sink_dependencies[_flag_queue_idx]->set_ready();
- }
- auto old_value = _cur_blocks_total_nums.fetch_sub(1);
- if (old_value == 1 && _source_dependency) {
- set_source_block();
- }
+ sub.try_pop(output_block);
+ if (*output_block) {
+ if (child_idx) {
+ *child_idx = idx;
+ }
+ auto old_total = _cur_blocks_total_nums.fetch_sub(1);
+ if (old_total == 1) {
+ set_source_block();
}
}
return Status::OK();
@@ -146,70 +201,44 @@ Status DataQueue::push_block(std::unique_ptr<Block> block, int child_idx) {
if (!block) {
return Status::OK();
}
- {
- INJECT_MOCK_SLEEP(std::lock_guard<std::mutex>
l(*_queue_blocks_lock[child_idx]));
- if (_is_finished[child_idx]) {
- return Status::EndOfFile("Already finish");
- }
- _cur_bytes_in_queue[child_idx] += block->allocated_bytes();
- _queue_blocks[child_idx].emplace_back(std::move(block));
- _cur_blocks_nums_in_queue[child_idx] += 1;
-
- if (_cur_blocks_nums_in_queue[child_idx] > _max_blocks_in_sub_queue) {
- _sink_dependencies[child_idx]->block();
- }
- _cur_blocks_total_nums++;
-
- set_source_ready();
+ auto& sub = *_sub_queues[child_idx];
+ // total_counter is incremented inside try_push under queue_lock, only
when the
+ // block is actually enqueued. This ensures get_block_from_queue() always
observes
+ // _cur_blocks_total_nums >= 1 when it successfully pops a block, with no
risk of
+ // underflow or the need for a rollback on failure.
+ if (!sub.try_push(std::move(block), _cur_blocks_total_nums)) {
+ return Status::EndOfFile("SubQueue already finished");
}
+ set_source_ready();
return Status::OK();
}
void DataQueue::set_finish(int child_idx) {
- INJECT_MOCK_SLEEP(std::lock_guard<std::mutex>
l(*_queue_blocks_lock[child_idx]));
- if (_is_finished[child_idx]) {
+ auto& sub = *_sub_queues[child_idx];
+ if (!sub.mark_finished(_un_finished_counter, _is_all_finished)) {
return;
}
- _is_finished[child_idx] = true;
- if (_un_finished_counter.fetch_sub(1) == 1) {
- _is_all_finished = true;
- }
- set_source_ready();
-}
-
-void DataQueue::set_canceled(int child_idx) {
- INJECT_MOCK_SLEEP(std::lock_guard<std::mutex>
l(*_queue_blocks_lock[child_idx]));
- DCHECK(!_is_finished[child_idx]);
- _is_canceled[child_idx] = true;
- _is_finished[child_idx] = true;
- if (_un_finished_counter.fetch_sub(1) == 1) {
- _is_all_finished = true;
- }
set_source_ready();
}
-bool DataQueue::is_finish(int child_idx) {
- return _is_finished[child_idx];
-}
-
bool DataQueue::is_all_finish() {
return _is_all_finished;
}
void DataQueue::set_source_ready() {
+ LockGuard lc(_source_lock);
if (_source_dependency) {
- std::unique_lock lc(_source_lock);
_source_dependency->set_ready();
}
}
void DataQueue::set_source_block() {
- if (_cur_blocks_total_nums == 0 && !is_all_finish()) {
- std::unique_lock lc(_source_lock);
- // Performing the judgment twice, attempting to avoid blocking the
source as much as possible.
- if (_cur_blocks_total_nums == 0 && !is_all_finish()) {
- _source_dependency->block();
- }
+ // Re-check under _source_lock to avoid blocking the source when a
concurrent push
+ // has already added new blocks (or all children have finished) since we
observed
+ // the counter drop to zero.
+ LockGuard lc(_source_lock);
+ if (_source_dependency && _cur_blocks_total_nums == 0 && !is_all_finish())
{
+ _source_dependency->block();
}
}
diff --git a/be/src/exec/operator/data_queue.h b/be/src/exec/operator/data_queue.h
index a3c34d86a1e..0def08c05c4 100644
--- a/be/src/exec/operator/data_queue.h
+++ b/be/src/exec/operator/data_queue.h
@@ -20,10 +20,10 @@
#include <cstdint>
#include <deque>
#include <memory>
-#include <mutex>
#include <vector>
#include "common/status.h"
+#include "common/thread_safety_annotations.h"
#include "core/block/block.h"
namespace doris {
@@ -31,6 +31,52 @@ namespace doris {
class Dependency;
+// Per child sub-queue. Groups all parallel state so that the lock/field
+// relationship is explicit and can be checked by clang -Wthread-safety.
+struct SubQueue {
+ // Protects the `blocks` deque and serializes high-level state
+ // transitions (push/pop/finish/cancel) on this sub-queue.
+ AnnotatedMutex queue_lock;
+ std::deque<std::unique_ptr<Block>> blocks GUARDED_BY(queue_lock);
+
+ // The following fields are only accessed while holding queue_lock.
+ int64_t bytes_in_queue GUARDED_BY(queue_lock) = 0;
+ bool is_finished GUARDED_BY(queue_lock) = false;
+
+ // Protects the `free_blocks` deque only.
+ AnnotatedMutex free_lock;
+ std::deque<std::unique_ptr<Block>> free_blocks GUARDED_BY(free_lock);
+
+ // blocks_in_queue is readable from lock-free fast paths
(remaining_has_data),
+ // so it remains atomic and is intentionally not GUARDED_BY.
+ std::atomic_uint32_t blocks_in_queue {0};
+
+ // Maximum number of blocks allowed in this sub-queue before the sink is
blocked.
+ // Updated by DataQueue::set_max_blocks_in_sub_queue / set_low_memory_mode.
+ std::atomic_int64_t max_blocks_in_queue {1};
+
+ // Set once during init via set_sink_dependency, then read-only.
+ Dependency* sink_dependency = nullptr;
+
+ // Pop a block under queue_lock.
+ // Notifies sink_dependency->set_ready() (outside the lock) if the queue
becomes empty.
+ // output_block is null if the queue was empty.
+ void try_pop(std::unique_ptr<Block>* output_block);
+
+ // Push a block under queue_lock and atomically increment total_counter.
+ // Returns false (without incrementing) if already finished.
+ // Calls sink_dependency->block() (outside the lock) if the queue exceeds
max_blocks_in_queue.
+ bool try_push(std::unique_ptr<Block> block, std::atomic_uint32_t&
total_counter);
+
+ // Mark this sub-queue finished. Returns false if already finished
(idempotent).
+ // Decrements unfinished_counter and may set all_finished within
queue_lock.
+ bool mark_finished(std::atomic_uint32_t& unfinished_counter,
std::atomic_bool& all_finished);
+
+ // Clear all pending blocks under queue_lock.
+ // Calls sink_dependency->set_always_ready() (outside the lock) if any
blocks were cleared.
+ void clear_blocks();
+};
+
class DataQueue {
public:
//always one is enough, but in union node it's has more children
@@ -38,64 +84,37 @@ public:
~DataQueue() = default;
Status get_block_from_queue(std::unique_ptr<Block>* block, int* child_idx
= nullptr);
-
Status push_block(std::unique_ptr<Block> block, int child_idx = 0);
std::unique_ptr<Block> get_free_block(int child_idx = 0);
-
void push_free_block(std::unique_ptr<Block> output_block, int child_idx =
0);
- void clear_free_blocks();
-
void set_finish(int child_idx = 0);
- void set_canceled(int child_idx = 0); // should set before finish
- bool is_finish(int child_idx = 0);
bool is_all_finish();
// This function is not thread safe, should be called in
Operator::get_block()
bool remaining_has_data();
+ bool has_more_data() const;
- bool has_more_data() const { return _cur_blocks_total_nums.load() > 0; }
-
- int64_t max_bytes_in_queue() const { return _max_bytes_in_queue; }
- int64_t max_size_of_queue() const { return _max_size_of_queue; }
-
- void set_source_dependency(std::shared_ptr<Dependency> source_dependency) {
- _source_dependency = source_dependency;
- }
- void set_sink_dependency(Dependency* sink_dependency, int child_idx) {
- _sink_dependencies[child_idx] = sink_dependency;
- }
-
- void set_source_ready();
- void set_source_block();
-
- void set_max_blocks_in_sub_queue(int64_t max_blocks) {
_max_blocks_in_sub_queue = max_blocks; }
-
- void set_low_memory_mode() {
- _is_low_memory_mode = true;
- _max_blocks_in_sub_queue = 1;
- clear_free_blocks();
- }
+ void set_source_dependency(std::shared_ptr<Dependency> source_dependency)
+ NO_THREAD_SAFETY_ANALYSIS;
+ void set_sink_dependency(Dependency* sink_dependency, int child_idx);
+ void set_max_blocks_in_sub_queue(int64_t max_blocks);
+ void set_low_memory_mode();
void terminate();
private:
- std::vector<std::unique_ptr<std::mutex>> _queue_blocks_lock;
- std::vector<std::deque<std::unique_ptr<Block>>> _queue_blocks;
+ void clear_free_blocks();
+ void set_source_ready();
+ void set_source_block();
- std::vector<std::unique_ptr<std::mutex>> _free_blocks_lock;
- std::vector<std::deque<std::unique_ptr<Block>>> _free_blocks;
+ std::vector<std::unique_ptr<SubQueue>> _sub_queues;
//how many deque will be init, always will be one
int _child_count = 0;
- std::vector<std::atomic_bool> _is_finished;
- std::atomic_uint32_t _un_finished_counter;
+ std::atomic_uint32_t _un_finished_counter = 0;
std::atomic_bool _is_all_finished = false;
- std::vector<std::atomic_bool> _is_canceled;
- // int64_t just for counter of profile
- std::vector<std::atomic_int64_t> _cur_bytes_in_queue;
- std::vector<std::atomic_uint32_t> _cur_blocks_nums_in_queue;
std::atomic_uint32_t _cur_blocks_total_nums = 0;
//this will be indicate which queue has data, it's useful when have many
queues
@@ -103,17 +122,11 @@ private:
// only used by streaming agg source operator
std::atomic_bool _is_low_memory_mode = false;
- std::atomic_int64_t _max_blocks_in_sub_queue = 1;
-
- //this only use to record the queue[0] for profile
- int64_t _max_bytes_in_queue = 0;
- int64_t _max_size_of_queue = 0;
- static constexpr int64_t MAX_BYTE_OF_QUEUE = 1024L * 1024 * 1024 / 10;
- // data queue is multi sink one source
- std::shared_ptr<Dependency> _source_dependency = nullptr;
- std::vector<Dependency*> _sink_dependencies;
- std::mutex _source_lock;
+ // _source_dependency is written once during initialization
(set_source_dependency)
+ // and read/used only while holding _source_lock thereafter.
+ std::shared_ptr<Dependency> _source_dependency GUARDED_BY(_source_lock) =
nullptr;
+ AnnotatedMutex _source_lock;
};
#include "common/compile_check_end.h"
diff --git a/be/src/exec/operator/exchange_sink_operator.cpp b/be/src/exec/operator/exchange_sink_operator.cpp
index ffbe0780d87..14061d610c4 100644
--- a/be/src/exec/operator/exchange_sink_operator.cpp
+++ b/be/src/exec/operator/exchange_sink_operator.cpp
@@ -25,7 +25,6 @@
#include <algorithm>
#include <cstdint>
#include <memory>
-#include <mutex>
#include <random>
#include <string>
@@ -217,7 +216,7 @@ void ExchangeSinkLocalState::_create_channels() {
}
void ExchangeSinkLocalState::on_channel_finished(InstanceLoId channel_id) {
- std::lock_guard<std::mutex> lock(_finished_channels_mutex);
+ LockGuard lock(_finished_channels_mutex);
if (_finished_channels.contains(channel_id)) {
LOG(WARNING) << "Query: " << print_id(_state->query_id())
diff --git a/be/src/exec/operator/exchange_sink_operator.h b/be/src/exec/operator/exchange_sink_operator.h
index 9824e3aaf88..369f03ec6bf 100644
--- a/be/src/exec/operator/exchange_sink_operator.h
+++ b/be/src/exec/operator/exchange_sink_operator.h
@@ -22,9 +22,9 @@
#include <algorithm>
#include <atomic>
#include <memory>
-#include <mutex>
#include "common/status.h"
+#include "common/thread_safety_annotations.h"
#include "exec/exchange/exchange_writer.h"
#include "exec/exchange/vdata_stream_sender.h"
#include "exec/operator/exchange_sink_buffer.h"
@@ -180,8 +180,8 @@ private:
int _last_local_channel_idx = -1;
std::atomic_int _working_channels_count = 0;
- std::set<InstanceLoId> _finished_channels;
- std::mutex _finished_channels_mutex;
+ std::set<InstanceLoId> _finished_channels
GUARDED_BY(_finished_channels_mutex);
+ AnnotatedMutex _finished_channels_mutex;
};
class ExchangeSinkOperatorX MOCK_REMOVE(final) : public
DataSinkOperatorX<ExchangeSinkLocalState> {
diff --git a/be/src/exec/operator/hashjoin_build_sink.cpp b/be/src/exec/operator/hashjoin_build_sink.cpp
index fa473a1ea2a..d2d7dcb42c5 100644
--- a/be/src/exec/operator/hashjoin_build_sink.cpp
+++ b/be/src/exec/operator/hashjoin_build_sink.cpp
@@ -67,7 +67,7 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
_dependency->block();
_finish_dependency->block();
{
- std::lock_guard<std::mutex> guard(p._mutex);
+ LockGuard guard(p._mutex);
p._finish_dependencies.push_back(_finish_dependency);
}
} else {
@@ -243,10 +243,12 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
}
if (p._use_shared_hash_table) {
- std::unique_lock lock(p._mutex);
+ LockGuard lock(p._mutex);
// Only signal non-builder tasks when the builder actually built
the hash table.
// When the builder is terminated early, process_build_block() has
not initialized the
- // shared hash table or runtime filter wrappers, so non-builders
must return EOF.
+ // shared hash table or runtime filter wrappers. In that case the
hash table variant is
+ // still monostate, so signaling non-builders would make them
enter std::visit on
+ // monostate and crash with "Hash table type mismatch".
if (!_terminated) {
p._signaled = true;
}
diff --git a/be/src/exec/operator/hashjoin_build_sink.h b/be/src/exec/operator/hashjoin_build_sink.h
index c6f492e8df7..8bde64603ce 100644
--- a/be/src/exec/operator/hashjoin_build_sink.h
+++ b/be/src/exec/operator/hashjoin_build_sink.h
@@ -17,6 +17,7 @@
#pragma once
+#include "common/thread_safety_annotations.h"
#include "exec/operator/join_build_sink_operator.h"
#include "exec/operator/operator.h"
#include "exec/runtime_filter/runtime_filter_producer_helper.h"
@@ -197,8 +198,8 @@ private:
bool _use_shared_hash_table = false;
std::atomic<bool> _signaled = false;
- std::mutex _mutex;
- std::vector<std::shared_ptr<Dependency>> _finish_dependencies;
+ AnnotatedMutex _mutex;
+ std::vector<std::shared_ptr<Dependency>> _finish_dependencies
GUARDED_BY(_mutex);
std::map<int, std::shared_ptr<RuntimeFilterWrapper>> _runtime_filters;
};
diff --git a/be/src/exec/operator/multi_cast_data_streamer.cpp b/be/src/exec/operator/multi_cast_data_streamer.cpp
index 142f7800338..da3129b305c 100644
--- a/be/src/exec/operator/multi_cast_data_streamer.cpp
+++ b/be/src/exec/operator/multi_cast_data_streamer.cpp
@@ -48,10 +48,11 @@ MultiCastBlock::MultiCastBlock(Block* block, int
un_finish_copy, size_t mem_size
block->clear();
}
-Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, Block*
block, bool* eos) {
+Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, Block*
block,
+ bool* eos) NO_THREAD_SAFETY_ANALYSIS {
MultiCastBlock* multi_cast_block = nullptr;
{
- INJECT_MOCK_SLEEP(std::unique_lock l(_mutex));
+ UniqueLock l(_mutex);
for (auto it = _spill_readers[sender_idx].begin();
it != _spill_readers[sender_idx].end();) {
if ((*it)->all_data_read) {
@@ -93,13 +94,15 @@ Status MultiCastDataStreamer::pull(RuntimeState* state, int
sender_idx, Block* b
auto spill_func = [this, reader_item, sender_idx]() {
Block block;
bool spill_eos = false;
+ bool has_cached_blocks = false;
size_t read_size = 0;
while (!spill_eos) {
RETURN_IF_ERROR(reader_item->reader->read(&block,
&spill_eos));
if (!block.empty()) {
- std::lock_guard l(_mutex);
+ LockGuard l(_mutex);
read_size += block.allocated_bytes();
_cached_blocks[sender_idx].emplace_back(std::move(block));
+ has_cached_blocks = true;
if (_cached_blocks[sender_idx].size() >= 32 ||
read_size > 2 * 1024 * 1024) {
break;
@@ -107,7 +110,7 @@ Status MultiCastDataStreamer::pull(RuntimeState* state, int
sender_idx, Block* b
}
}
- if (spill_eos || !_cached_blocks[sender_idx].empty()) {
+ if (spill_eos || has_cached_blocks) {
reader_item->all_data_read = spill_eos;
_set_ready_for_read(sender_idx);
}
@@ -159,7 +162,7 @@ Status MultiCastDataStreamer::_copy_block(RuntimeState*
state, int32_t sender_id
block->get_by_position(i).column =
block->get_by_position(i).column->clone_resized(rows);
}
- INJECT_MOCK_SLEEP(std::lock_guard l(_mutex));
+ LockGuard l(_mutex);
multi_cast_block._un_finish_copy--;
auto copying_count = _copying_count.fetch_sub(1) - 1;
if (multi_cast_block._un_finish_copy == 0) {
@@ -293,7 +296,7 @@ Status MultiCastDataStreamer::push(RuntimeState* state,
doris::Block* block, boo
const auto block_mem_size = block->allocated_bytes();
{
- INJECT_MOCK_SLEEP(std::lock_guard l(_mutex));
+ LockGuard l(_mutex);
if (_pending_block) {
DCHECK_GT(_pending_block->rows(), 0);
const auto pending_size = _pending_block->allocated_bytes();
@@ -346,7 +349,7 @@ Status MultiCastDataStreamer::push(RuntimeState* state,
doris::Block* block, boo
_eos = eos;
}
- if (_eos) {
+ if (eos) {
for (auto* read_dep : _dependencies) {
read_dep->set_always_ready();
}
@@ -377,7 +380,7 @@ std::string MultiCastDataStreamer::debug_string() {
size_t pos_at_end_count = 0;
size_t blocks_count = 0;
{
- std::unique_lock l(_mutex);
+ LockGuard l(_mutex);
blocks_count = _multi_cast_blocks.size();
for (int32_t i = 0; i != _cast_sender_count; ++i) {
if (!_dependencies[i]->is_blocked_by()) {
diff --git a/be/src/exec/operator/multi_cast_data_streamer.h
b/be/src/exec/operator/multi_cast_data_streamer.h
index 773576fb2b9..8d62ca78084 100644
--- a/be/src/exec/operator/multi_cast_data_streamer.h
+++ b/be/src/exec/operator/multi_cast_data_streamer.h
@@ -22,6 +22,7 @@
#include <string>
#include <vector>
+#include "common/thread_safety_annotations.h"
#include "core/block/block.h"
#include "exec/exchange/vdata_stream_sender.h"
#include "exec/pipeline/dependency.h"
@@ -100,16 +101,16 @@ private:
Status _copy_block(RuntimeState* state, int32_t sender_idx, Block* block,
MultiCastBlock& multi_cast_block);
- Status _start_spill_task(RuntimeState* state, SpillFileSPtr spill_file);
+ Status _start_spill_task(RuntimeState* state, SpillFileSPtr spill_file)
REQUIRES(_mutex);
- Status _trigger_spill_if_need(RuntimeState* state, bool* triggered);
+ Status _trigger_spill_if_need(RuntimeState* state, bool* triggered)
REQUIRES(_mutex);
RuntimeProfile* _profile = nullptr;
- std::list<MultiCastBlock> _multi_cast_blocks;
- std::vector<std::vector<Block>> _cached_blocks;
- std::vector<std::list<MultiCastBlock>::iterator> _sender_pos_to_read;
- std::mutex _mutex;
- bool _eos = false;
+ std::list<MultiCastBlock> _multi_cast_blocks GUARDED_BY(_mutex);
+ std::vector<std::vector<Block>> _cached_blocks GUARDED_BY(_mutex);
+ std::vector<std::list<MultiCastBlock>::iterator> _sender_pos_to_read
GUARDED_BY(_mutex);
+ AnnotatedMutex _mutex;
+ bool _eos GUARDED_BY(_mutex) = false;
int _cast_sender_count = 0;
int _node_id;
std::atomic_int64_t _cumulative_mem_size = 0;
@@ -120,9 +121,9 @@ private:
Dependency* _write_dependency;
std::vector<Dependency*> _dependencies;
- BlockUPtr _pending_block;
+ BlockUPtr _pending_block GUARDED_BY(_mutex);
- std::vector<std::vector<std::shared_ptr<SpillingReader>>> _spill_readers;
+ std::vector<std::vector<std::shared_ptr<SpillingReader>>> _spill_readers
GUARDED_BY(_mutex);
RuntimeProfile* _sink_operator_profile;
// operator_profile of each source operator
diff --git a/be/src/exec/operator/partition_sort_sink_operator.cpp
b/be/src/exec/operator/partition_sort_sink_operator.cpp
index 0f605b70d0d..66ed84d021e 100644
--- a/be/src/exec/operator/partition_sort_sink_operator.cpp
+++ b/be/src/exec/operator/partition_sort_sink_operator.cpp
@@ -128,7 +128,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState*
state, Block* input_block,
if (local_state._is_need_passthrough) {
{
COUNTER_UPDATE(local_state._passthrough_rows_counter,
(int64_t)current_rows);
- std::lock_guard<std::mutex>
lock(local_state._shared_state->buffer_mutex);
+ LockGuard lock(local_state._shared_state->buffer_mutex);
local_state._shared_state->blocks_buffer.push(std::move(*input_block));
// buffer have data, source could read this.
local_state._dependency->set_ready_to_read();
@@ -159,8 +159,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState*
state, Block* input_block,
}
local_state._value_places[i]->_blocks.clear();
RETURN_IF_ERROR(sorter->prepare_for_read(false));
- INJECT_MOCK_SLEEP(std::unique_lock<std::mutex> lc(
- local_state._shared_state->prepared_finish_lock));
+ LockGuard lc(local_state._shared_state->prepared_finish_lock);
sorter->set_prepared_finish();
// iff one sorter have data, then could set source ready to read
local_state._dependency->set_ready_to_read();
@@ -171,7 +170,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState*
state, Block* input_block,
local_state._sorted_partition_input_rows);
//so all data from child have sink completed
{
- std::unique_lock<std::mutex>
lc(local_state._shared_state->sink_eos_lock);
+ LockGuard lc(local_state._shared_state->sink_eos_lock);
local_state._shared_state->sink_eos = true;
// this ready is also need, as source maybe block by self in some
case
local_state._dependency->set_ready_to_read();
@@ -262,8 +261,7 @@ Status PartitionSortSinkOperatorX::_emplace_into_hash_table(
{
COUNTER_UPDATE(local_state._passthrough_rows_counter,
(int64_t)(row + 1));
- std::lock_guard<std::mutex> lock(
-
local_state._shared_state->buffer_mutex);
+ LockGuard
lock(local_state._shared_state->buffer_mutex);
// have emplace (num_rows - row) to hashtable,
and now have row remaining needed in block;
// set_num_rows(x) retains the range [0, x -
1], so row + 1 is needed here.
input_block->set_num_rows(row + 1);
diff --git a/be/src/exec/operator/partition_sort_source_operator.cpp
b/be/src/exec/operator/partition_sort_source_operator.cpp
index eeaf4683c5a..2e3dad1d32a 100644
--- a/be/src/exec/operator/partition_sort_source_operator.cpp
+++ b/be/src/exec/operator/partition_sort_source_operator.cpp
@@ -41,17 +41,16 @@ Status
PartitionSortSourceOperatorX::get_block(RuntimeState* state, Block* outpu
output_block->clear_column_data();
auto get_data_from_blocks_buffer = false;
{
- std::lock_guard<std::mutex>
lock(local_state._shared_state->buffer_mutex);
+ LockGuard lock(local_state._shared_state->buffer_mutex);
get_data_from_blocks_buffer =
!local_state._shared_state->blocks_buffer.empty();
if (get_data_from_blocks_buffer) {
local_state._shared_state->blocks_buffer.front().swap(*output_block);
local_state._shared_state->blocks_buffer.pop();
- if (local_state._shared_state->blocks_buffer.empty() &&
- !local_state._shared_state->sink_eos) {
+ if (local_state._shared_state->blocks_buffer.empty()) {
// add this mutex to check, as in some case maybe is doing
block(), and the sink is doing set eos.
// so have to hold mutex to set block(), avoid to sink have
set eos and set ready, but here set block() by mistake
- std::unique_lock<std::mutex>
lc(local_state._shared_state->sink_eos_lock);
+ LockGuard lc(local_state._shared_state->sink_eos_lock);
//if buffer have no data and sink not eos, block reading and
wait for signal again
if (!local_state._shared_state->sink_eos) {
local_state._dependency->block();
@@ -94,7 +93,7 @@ Status
PartitionSortSourceOperatorX::get_sorted_block(RuntimeState* state, Block
if (current_eos) {
// current sort have eos, so get next idx
local_state._sort_idx++;
- std::unique_lock<std::mutex>
lc(local_state._shared_state->prepared_finish_lock);
+ LockGuard lc(local_state._shared_state->prepared_finish_lock);
if (local_state._sort_idx < sorter_size &&
!sorters[local_state._sort_idx]->prepared_finish()) {
local_state._dependency->block();
diff --git a/be/src/exec/operator/scan_operator.cpp
b/be/src/exec/operator/scan_operator.cpp
index 3d34d32ceb0..87c02aab2b6 100644
--- a/be/src/exec/operator/scan_operator.cpp
+++ b/be/src/exec/operator/scan_operator.cpp
@@ -75,7 +75,7 @@ bool ScanLocalState<Derived>::should_run_serial() const {
Status ScanLocalStateBase::update_late_arrival_runtime_filter(RuntimeState*
state,
int&
arrived_rf_num) {
// Lock needed because _conjuncts can be accessed concurrently by multiple
scanner threads
- std::unique_lock lock(_conjuncts_lock);
+ LockGuard lock(_conjuncts_lock);
RETURN_IF_ERROR(_helper.try_append_late_arrival_runtime_filter(state,
_parent->row_descriptor(),
arrived_rf_num, _conjuncts));
if (state->enable_adjust_conjunct_order_by_cost()) {
@@ -88,7 +88,7 @@ Status
ScanLocalStateBase::update_late_arrival_runtime_filter(RuntimeState* stat
Status ScanLocalStateBase::clone_conjunct_ctxs(VExprContextSPtrs&
scanner_conjuncts) {
// Lock needed because _conjuncts can be accessed concurrently by multiple
scanner threads
- std::unique_lock lock(_conjuncts_lock);
+ LockGuard lock(_conjuncts_lock);
scanner_conjuncts.resize(_conjuncts.size());
for (size_t i = 0; i != _conjuncts.size(); ++i) {
RETURN_IF_ERROR(_conjuncts[i]->clone(_state, scanner_conjuncts[i]));
diff --git a/be/src/exec/operator/scan_operator.h
b/be/src/exec/operator/scan_operator.h
index d6e2407a8d2..cfffa5a50b3 100644
--- a/be/src/exec/operator/scan_operator.h
+++ b/be/src/exec/operator/scan_operator.h
@@ -18,11 +18,11 @@
#pragma once
#include <cstdint>
-#include <mutex>
#include <set>
#include <string>
#include "common/status.h"
+#include "common/thread_safety_annotations.h"
#include "core/field.h"
#include "exec/common/util.hpp"
#include "exec/operator/operator.h"
@@ -126,7 +126,7 @@ protected:
RuntimeProfile::Counter* _scan_rows = nullptr;
RuntimeProfile::Counter* _scan_bytes = nullptr;
- std::mutex _conjuncts_lock;
+ AnnotatedMutex _conjuncts_lock;
RuntimeFilterConsumerHelper _helper;
// magic number as seed to generate hash value for condition cache
uint64_t _condition_cache_digest = 0;
diff --git a/be/src/exec/pipeline/dependency.h
b/be/src/exec/pipeline/dependency.h
index 28c89b5b990..2c6e3b9aef2 100644
--- a/be/src/exec/pipeline/dependency.h
+++ b/be/src/exec/pipeline/dependency.h
@@ -35,6 +35,7 @@
#include "common/config.h"
#include "common/logging.h"
+#include "common/thread_safety_annotations.h"
#include "core/block/block.h"
#include "core/types.h"
#include "exec/common/agg_utils.h"
@@ -507,10 +508,10 @@ struct AnalyticSharedState : public BasicSharedState {
public:
AnalyticSharedState() = default;
- std::queue<Block> blocks_buffer;
- std::mutex buffer_mutex;
- bool sink_eos = false;
- std::mutex sink_eos_lock;
+ std::queue<Block> blocks_buffer GUARDED_BY(buffer_mutex);
+ AnnotatedMutex buffer_mutex;
+ bool sink_eos GUARDED_BY(sink_eos_lock) = false;
+ AnnotatedMutex sink_eos_lock;
Arena agg_arena_pool;
};
@@ -598,12 +599,12 @@ struct NestedLoopJoinSharedState : public JoinSharedState
{
struct PartitionSortNodeSharedState : public BasicSharedState {
ENABLE_FACTORY_CREATOR(PartitionSortNodeSharedState)
public:
- std::queue<Block> blocks_buffer;
- std::mutex buffer_mutex;
+ std::queue<Block> blocks_buffer GUARDED_BY(buffer_mutex);
+ AnnotatedMutex buffer_mutex;
std::vector<std::unique_ptr<PartitionSorter>> partition_sorts;
- bool sink_eos = false;
- std::mutex sink_eos_lock;
- std::mutex prepared_finish_lock;
+ bool sink_eos GUARDED_BY(sink_eos_lock) = false;
+ AnnotatedMutex sink_eos_lock;
+ AnnotatedMutex prepared_finish_lock;
};
struct SetSharedState : public BasicSharedState {
diff --git a/be/test/exec/pipeline/data_queue_test.cpp
b/be/test/exec/pipeline/data_queue_test.cpp
index 07ef7723632..f8ba26bc664 100644
--- a/be/test/exec/pipeline/data_queue_test.cpp
+++ b/be/test/exec/pipeline/data_queue_test.cpp
@@ -20,6 +20,7 @@
#include <gtest/gtest.h>
#include <memory>
+#include <thread>
#include <vector>
#include "core/data_type/data_type_number.h"
@@ -27,6 +28,173 @@
namespace doris {
+// ---------------------------------------------------------------------------
+// Helpers
+// ---------------------------------------------------------------------------
+
+static std::unique_ptr<Block> make_block(size_t rows = 1) {
+ auto block = std::make_unique<Block>();
+ auto col = ColumnUInt8::create(rows);
+ block->insert(ColumnWithTypeAndName {std::move(col),
std::make_shared<DataTypeUInt8>(), ""});
+ return block;
+}
+
+// Create a Dependency that starts ready unless initially_ready is false.
+static std::shared_ptr<Dependency> make_dep(bool initially_ready = true) {
+ return Dependency::create_shared(1, 1, "Test", initially_ready);
+}
+
+// ---------------------------------------------------------------------------
+// SubQueue tests
+// ---------------------------------------------------------------------------
+
+class SubQueueTest : public testing::Test {
+public:
+ void SetUp() override {
+ dep = make_dep(/*initially_ready=*/true);
+ sub = std::make_unique<SubQueue>();
+ sub->sink_dependency = dep.get();
+ sub->max_blocks_in_queue = 2;
+ }
+
+ std::shared_ptr<Dependency> dep;
+ std::unique_ptr<SubQueue> sub;
+ std::atomic_uint32_t counter_ {0};
+};
+
+// Popping an empty queue leaves the output null and the queue count at zero.
+TEST_F(SubQueueTest, TryPopEmpty) {
+ std::unique_ptr<Block> out;
+ sub->try_pop(&out);
+ EXPECT_EQ(out, nullptr);
+ EXPECT_EQ(sub->blocks_in_queue.load(), 0u);
+}
+
+// Basic push then pop returns the block.
+TEST_F(SubQueueTest, TryPushPop_Basic) {
+ EXPECT_TRUE(sub->try_push(make_block(), counter_));
+ EXPECT_EQ(sub->blocks_in_queue.load(), 1u);
+
+ std::unique_ptr<Block> out;
+ sub->try_pop(&out);
+ EXPECT_NE(out, nullptr);
+ EXPECT_EQ(sub->blocks_in_queue.load(), 0u);
+}
+
+// try_push after mark_finished is rejected (returns false).
+TEST_F(SubQueueTest, TryPushAfterFinished) {
+ std::atomic_uint32_t counter {1};
+ std::atomic_bool all_done {false};
+ sub->mark_finished(counter, all_done);
+
+ EXPECT_FALSE(sub->try_push(make_block(), counter_));
+}
+
+// When blocks.size() exceeds max_blocks_in_queue, sink is blocked.
+TEST_F(SubQueueTest, SinkBlockedWhenFull) {
+ sub->max_blocks_in_queue = 2;
+ dep->set_ready(); // start ready
+
+ // Push up to the limit — sink should stay ready.
+ EXPECT_TRUE(sub->try_push(make_block(), counter_));
+ EXPECT_TRUE(sub->try_push(make_block(), counter_));
+ EXPECT_TRUE(dep->ready());
+
+ // Push one over the limit — sink should be blocked.
+ EXPECT_TRUE(sub->try_push(make_block(), counter_));
+ EXPECT_FALSE(dep->ready());
+}
+
+// Sink wakes up only when the queue becomes completely empty.
+TEST_F(SubQueueTest, SinkReadyWhenQueueEmpty) {
+ sub->max_blocks_in_queue = 2;
+
+ // Fill to 3 (one over limit) → sink blocked.
+ EXPECT_TRUE(sub->try_push(make_block(), counter_));
+ EXPECT_TRUE(sub->try_push(make_block(), counter_));
+ EXPECT_TRUE(sub->try_push(make_block(), counter_));
+ EXPECT_FALSE(dep->ready());
+
+ // Pop 1 & 2: queue not empty yet → sink still blocked.
+ std::unique_ptr<Block> out;
+ sub->try_pop(&out);
+ EXPECT_NE(out, nullptr);
+ EXPECT_FALSE(dep->ready());
+
+ sub->try_pop(&out);
+ EXPECT_NE(out, nullptr);
+ EXPECT_FALSE(dep->ready());
+
+ // Pop 3: queue empty → set_ready().
+ sub->try_pop(&out);
+ EXPECT_NE(out, nullptr);
+ EXPECT_TRUE(dep->ready());
+}
+
+// mark_finished is idempotent: second call returns false and counter stays
correct.
+TEST_F(SubQueueTest, MarkFinishedIdempotent) {
+ std::atomic_uint32_t counter {2};
+ std::atomic_bool all_done {false};
+
+ EXPECT_TRUE(sub->mark_finished(counter, all_done));
+ EXPECT_EQ(counter.load(), 1u);
+ EXPECT_FALSE(all_done.load());
+
+ EXPECT_FALSE(sub->mark_finished(counter, all_done));
+ EXPECT_EQ(counter.load(), 1u); // unchanged
+}
+
+// mark_finished sets all_finished when last child finishes.
+TEST_F(SubQueueTest, MarkFinishedSetsAllFinished) {
+ std::atomic_uint32_t counter {1};
+ std::atomic_bool all_done {false};
+ sub->mark_finished(counter, all_done);
+ EXPECT_TRUE(all_done.load());
+}
+
+// clear_blocks empties the queue and calls set_always_ready on sink.
+TEST_F(SubQueueTest, ClearBlocksEmptiesQueue) {
+ EXPECT_TRUE(sub->try_push(make_block(), counter_));
+ EXPECT_TRUE(sub->try_push(make_block(), counter_));
+ EXPECT_EQ(sub->blocks_in_queue.load(), 2u);
+
+ sub->clear_blocks();
+
+ EXPECT_EQ(sub->blocks_in_queue.load(), 0u);
+ // set_always_ready was called → dep is always ready.
+ EXPECT_TRUE(dep->ready());
+}
+
+// clear_blocks on an empty queue is a no-op (set_always_ready not called).
+TEST_F(SubQueueTest, ClearBlocksNoop) {
+ dep->block(); // start blocked
+ sub->clear_blocks();
+ EXPECT_FALSE(dep->ready()); // still blocked — clear_blocks did nothing
+}
+
+// bytes_in_queue tracks push/pop correctly.
+TEST_F(SubQueueTest, BytesInQueueTracking) {
+ auto block = make_block(10);
+ int64_t expected_bytes = block->allocated_bytes();
+
+ EXPECT_TRUE(sub->try_push(std::move(block), counter_));
+ {
+ LockGuard l(sub->queue_lock);
+ EXPECT_EQ(sub->bytes_in_queue, expected_bytes);
+ }
+
+ std::unique_ptr<Block> out;
+ sub->try_pop(&out);
+ {
+ LockGuard l(sub->queue_lock);
+ EXPECT_EQ(sub->bytes_in_queue, 0);
+ }
+}
+
+// ---------------------------------------------------------------------------
+// DataQueue fixtures
+// ---------------------------------------------------------------------------
+
class DataQueueTest : public testing::Test {
public:
DataQueueTest() = default;
@@ -47,6 +215,140 @@ public:
const int child_count = 3;
};
+// ---------------------------------------------------------------------------
+// DataQueue unit tests
+// ---------------------------------------------------------------------------
+
+// Initial state: no data, no finish.
+TEST_F(DataQueueTest, InitialState) {
+ EXPECT_FALSE(data_queue->has_more_data());
+ EXPECT_FALSE(data_queue->is_all_finish());
+ EXPECT_FALSE(data_queue->remaining_has_data());
+}
+
+// Push one block and retrieve it.
+TEST_F(DataQueueTest, SinglePushPop) {
+ EXPECT_TRUE(data_queue->push_block(make_block(), 0).ok());
+ EXPECT_TRUE(data_queue->has_more_data());
+
+ // Find the queue with data.
+ EXPECT_TRUE(data_queue->remaining_has_data());
+
+ std::unique_ptr<Block> out;
+ int child_idx = -1;
+ EXPECT_TRUE(data_queue->get_block_from_queue(&out, &child_idx).ok());
+ EXPECT_NE(out, nullptr);
+ EXPECT_EQ(child_idx, 0);
+ EXPECT_FALSE(data_queue->has_more_data());
+}
+
+// is_all_finish only becomes true after all children call set_finish.
+TEST_F(DataQueueTest, IsAllFinishAfterAllChildren) {
+ data_queue->set_finish(0);
+ EXPECT_FALSE(data_queue->is_all_finish());
+ data_queue->set_finish(1);
+ EXPECT_FALSE(data_queue->is_all_finish());
+ data_queue->set_finish(2);
+ EXPECT_TRUE(data_queue->is_all_finish());
+}
+
+// set_finish is idempotent.
+TEST_F(DataQueueTest, SetFinishIdempotent) {
+ data_queue->set_finish(0);
+ data_queue->set_finish(0); // second call must not double-decrement
+ data_queue->set_finish(1);
+ data_queue->set_finish(2);
+ EXPECT_TRUE(data_queue->is_all_finish());
+}
+
+// child_idx returned by get_block_from_queue reflects the actual queue.
+TEST_F(DataQueueTest, ChildIdxReturned) {
+ // Push to child 1 only.
+ EXPECT_TRUE(data_queue->push_block(make_block(), 1).ok());
+ data_queue->remaining_has_data(); // advance _flag_queue_idx to find child
1
+
+ std::unique_ptr<Block> out;
+ int child_idx = -1;
+ EXPECT_TRUE(data_queue->get_block_from_queue(&out, &child_idx).ok());
+ EXPECT_NE(out, nullptr);
+ EXPECT_EQ(child_idx, 1);
+}
+
+// get_free_block returns a new block when free list is empty, reuses when not.
+TEST_F(DataQueueTest, FreeBlockReuse) {
+ // First call: allocates a new block.
+ auto block = data_queue->get_free_block(0);
+ EXPECT_NE(block, nullptr);
+
+ // Return it to the free list.
+ block->clear(); // ensure rows == 0
+ data_queue->push_free_block(std::move(block), 0);
+
+ // Second call: should hand back the recycled block (only non-null is checked below).
+ auto block2 = data_queue->get_free_block(0);
+ EXPECT_NE(block2, nullptr);
+}
+
+// In low-memory mode push_free_block discards blocks and max drops to 1.
+TEST_F(DataQueueTest, LowMemoryMode) {
+ // Pre-populate the free list.
+ data_queue->push_free_block(Block::create_unique(), 0);
+
+ data_queue->set_low_memory_mode();
+
+ // Free list must be cleared.
+ auto block = data_queue->get_free_block(0);
+ // The free list is empty → a fresh block is returned (not from the list).
+ EXPECT_NE(block, nullptr);
+
+ // push_free_block now discards.
+ block->clear();
+ data_queue->push_free_block(std::move(block), 0);
+ auto block2 = data_queue->get_free_block(0);
+ // Still gets a fresh allocation (free list stays empty).
+ EXPECT_NE(block2, nullptr);
+}
+
+// terminate() finishes all children and clears pending blocks from sub-queues.
+TEST_F(DataQueueTest, Terminate) {
+ EXPECT_TRUE(data_queue->push_block(make_block(), 0).ok());
+ EXPECT_TRUE(data_queue->push_block(make_block(), 1).ok());
+
+ data_queue->terminate();
+
+ EXPECT_TRUE(data_queue->is_all_finish());
+ // remaining_has_data() checks blocks_in_queue per sub-queue,
+ // which clear_blocks() resets to 0.
+ EXPECT_FALSE(data_queue->remaining_has_data());
+}
+
+// set_max_blocks_in_sub_queue propagates to every sub-queue.
+TEST_F(DataQueueTest, SetMaxBlocksInSubQueue) {
+ data_queue->set_max_blocks_in_sub_queue(5);
+ // Push 5 blocks to child 0 — sink must stay ready (not over the limit
yet).
+ for (int i = 0; i < 5; i++) {
+ EXPECT_TRUE(data_queue->push_block(make_block(), 0).ok());
+ }
+ EXPECT_TRUE(sink_deps[0]->ready());
+
+ // 6th push exceeds limit → sink blocked.
+ EXPECT_TRUE(data_queue->push_block(make_block(), 0).ok());
+ EXPECT_FALSE(sink_deps[0]->ready());
+}
+
+// Source dependency is notified ready when a block is pushed.
+TEST_F(DataQueueTest, SourceReadyOnPush) {
+ source_dep->block(); // start blocked
+ EXPECT_FALSE(source_dep->ready());
+
+ EXPECT_TRUE(data_queue->push_block(make_block(), 0).ok());
+ EXPECT_TRUE(source_dep->ready());
+}
+
+// ---------------------------------------------------------------------------
+// Multi-threaded integration test (existing)
+// ---------------------------------------------------------------------------
+
TEST_F(DataQueueTest, MultiTest) {
int output_count = 0;
auto output_func = [&]() {
@@ -107,14 +409,7 @@ TEST_F(DataQueueTest, MultiTest) {
output1.join();
EXPECT_EQ(output_count, 150);
- for (int i = 0; i < 3; i++) {
- EXPECT_TRUE(data_queue->is_finish(i));
- }
EXPECT_TRUE(data_queue->is_all_finish());
- data_queue->clear_free_blocks();
- for (int i = 0; i < 3; i++) {
- EXPECT_TRUE(data_queue->_free_blocks[i].empty());
- }
}
// ./run-be-ut.sh --run --filter=DataQueueTest.*
diff --git a/be/test/exec/pipeline/multi_cast_data_streamer_test.cpp
b/be/test/exec/pipeline/multi_cast_data_streamer_test.cpp
index d50523605d0..61589327909 100644
--- a/be/test/exec/pipeline/multi_cast_data_streamer_test.cpp
+++ b/be/test/exec/pipeline/multi_cast_data_streamer_test.cpp
@@ -316,13 +316,16 @@ TEST_F(MultiCastDataStreamerTest, SpillTest) {
output2.join();
output3.join();
- ASSERT_EQ(multi_cast_data_streamer->_multi_cast_blocks.size(), 0);
- ASSERT_EQ(multi_cast_data_streamer->_cached_blocks[0].size(), 0);
- ASSERT_EQ(multi_cast_data_streamer->_cached_blocks[1].size(), 0);
- ASSERT_EQ(multi_cast_data_streamer->_cached_blocks[2].size(), 0);
- ASSERT_EQ(multi_cast_data_streamer->_spill_readers[0].size(), 0);
- ASSERT_EQ(multi_cast_data_streamer->_spill_readers[1].size(), 0);
- ASSERT_EQ(multi_cast_data_streamer->_spill_readers[2].size(), 0);
+ {
+ LockGuard l(multi_cast_data_streamer->_mutex);
+ ASSERT_EQ(multi_cast_data_streamer->_multi_cast_blocks.size(), 0);
+ ASSERT_EQ(multi_cast_data_streamer->_cached_blocks[0].size(), 0);
+ ASSERT_EQ(multi_cast_data_streamer->_cached_blocks[1].size(), 0);
+ ASSERT_EQ(multi_cast_data_streamer->_cached_blocks[2].size(), 0);
+ ASSERT_EQ(multi_cast_data_streamer->_spill_readers[0].size(), 0);
+ ASSERT_EQ(multi_cast_data_streamer->_spill_readers[1].size(), 0);
+ ASSERT_EQ(multi_cast_data_streamer->_spill_readers[2].size(), 0);
+ }
auto debug_string = multi_cast_data_streamer->debug_string();
EXPECT_TRUE(debug_string.find("MemSize:") != std::string::npos);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]