This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9d7d3a2c4f4 [fix](be) Avoid local runtime filter merge deadlock
(#64866)
9d7d3a2c4f4 is described below
commit 9d7d3a2c4f437e0c2962bde1a49848ed0535063c
Author: Pxl <[email protected]>
AuthorDate: Wed Jul 1 09:30:05 2026 +0800
[fix](be) Avoid local runtime filter merge deadlock (#64866)
### What problem does this PR solve?
Issue Number: None
Related PR: None
Problem Summary: Local runtime filter merge can deadlock when one join
build instance publishes a local-merge runtime filter while another
instance sends its runtime filter size. The old local merge context lock
protected both the merger and the producer list, so one path could hold
a producer runtime filter lock and then wait for the context lock while
another path held the context lock and then waited for a producer lock.
This change gives RuntimeFilterMerger its own internal synchronization
and makes LocalMergeContext expose a snapshot of the merger and
producers. Publish, send-size, and sync-size paths take the context lock
only while copying that snapshot, then merge filters or update producer
sizes outside the context lock. RuntimeFilterMerger returns the ready
transition from merge_from directly, removing the separate unlocked
ready check.
### Release note
None
### Check List (For Author)
- Test: Unit Test
- build-support/clang-format.sh
be/src/exec/runtime_filter/runtime_filter_merger.h
be/src/exec/runtime_filter/runtime_filter_mgr.cpp
be/src/exec/runtime_filter/runtime_filter_mgr.h
be/src/exec/runtime_filter/runtime_filter_producer.cpp
be/test/exec/runtime_filter/runtime_filter_merger_test.cpp
be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp
- git diff --cached --check
- ./run-be-ut.sh --run --filter=RuntimeFilterMgrTest.*
- ./run-be-ut.sh --run --filter=RuntimeFilterMergerTest.*
- Behavior changed: No
- Does this need documentation: No
---
be/src/exec/runtime_filter/runtime_filter_merger.h | 27 +++-
be/src/exec/runtime_filter/runtime_filter_mgr.cpp | 142 ++++++++++++---------
be/src/exec/runtime_filter/runtime_filter_mgr.h | 35 ++---
.../runtime_filter/runtime_filter_producer.cpp | 42 +++---
be/src/runtime/runtime_state.cpp | 2 +-
.../runtime_filter/runtime_filter_merger_test.cpp | 26 ++--
.../runtime_filter/runtime_filter_mgr_test.cpp | 33 ++---
7 files changed, 181 insertions(+), 126 deletions(-)
diff --git a/be/src/exec/runtime_filter/runtime_filter_merger.h
b/be/src/exec/runtime_filter/runtime_filter_merger.h
index 9405d2405a5..a7bd3605f71 100644
--- a/be/src/exec/runtime_filter/runtime_filter_merger.h
+++ b/be/src/exec/runtime_filter/runtime_filter_merger.h
@@ -17,6 +17,8 @@
#pragma once
+#include <algorithm>
+
#include "exec/runtime_filter/runtime_filter.h"
#include "exec/runtime_filter/runtime_filter_definitions.h"
#include "exprs/vexpr.h"
@@ -46,6 +48,7 @@ public:
}
std::string debug_string() override {
+ std::unique_lock<std::recursive_mutex> l(_rmtx);
return fmt::format(
"Merger: ({}, expected_producer_num: {},
received_producer_num: {}, "
"received_rf_size_num: {}, received_sum_size: {})",
@@ -54,12 +57,15 @@ public:
}
// If input is a disabled predicate, the final result is a disabled
predicate.
- Status merge_from(const RuntimeFilter* other) {
+ // Returns true only for the call that makes the merger ready.
+ Status merge_from(const RuntimeFilter* other, bool* ready) {
+ std::unique_lock<std::recursive_mutex> l(_rmtx);
_received_producer_num++;
if (_expected_producer_num < _received_producer_num) {
return Status::InternalError(
"runtime filter merger input product more than expected,
{}", debug_string());
}
+ *ready = _received_producer_num == _expected_producer_num;
if (_received_producer_num == _expected_producer_num) {
_rf_state = State::READY;
}
@@ -71,18 +77,26 @@ public:
return st;
}
- void set_expected_producer_num(int num) {
+ // Only raise the expected producer count. RuntimeFilterMgr may compute the
+ // count under its own lock and apply it after releasing that lock, so
+ // concurrent registrations can update the merger out of order.
+ void increase_expected_producer_num(int num) {
+ std::unique_lock<std::recursive_mutex> l(_rmtx);
if (_received_producer_num > 0 || _received_rf_size_num > 0) {
throw Exception(ErrorCode::INTERNAL_ERROR,
"runtime filter merger set expected producer after
receive data, {}",
debug_string());
}
- _expected_producer_num = num;
+ _expected_producer_num = std::max(_expected_producer_num, num);
}
- int get_expected_producer_num() const { return _expected_producer_num; }
+ int get_expected_producer_num() {
+ std::unique_lock<std::recursive_mutex> l(_rmtx);
+ return _expected_producer_num;
+ }
bool add_rf_size(uint64_t size) {
+ std::unique_lock<std::recursive_mutex> l(_rmtx);
_received_rf_size_num++;
if (_expected_producer_num < _received_rf_size_num) {
throw Exception(ErrorCode::INTERNAL_ERROR,
@@ -93,7 +107,10 @@ public:
return (_received_rf_size_num == _expected_producer_num);
}
- uint64_t get_received_sum_size() const { return _received_sum_size; }
+ uint64_t get_received_sum_size() {
+ std::unique_lock<std::recursive_mutex> l(_rmtx);
+ return _received_sum_size;
+ }
bool ready() const { return _rf_state == State::READY; }
diff --git a/be/src/exec/runtime_filter/runtime_filter_mgr.cpp
b/be/src/exec/runtime_filter/runtime_filter_mgr.cpp
index f8e687f09b7..9de74e5acb3 100644
--- a/be/src/exec/runtime_filter/runtime_filter_mgr.cpp
+++ b/be/src/exec/runtime_filter/runtime_filter_mgr.cpp
@@ -55,7 +55,7 @@ RuntimeFilterMgr::RuntimeFilterMgr(const bool is_global)
std::vector<std::shared_ptr<RuntimeFilterConsumer>>
RuntimeFilterMgr::get_consume_filters(
int filter_id) {
- std::lock_guard<std::mutex> l(_lock);
+ LockGuard l(_lock);
auto iter = _consumer_map.find(filter_id);
if (iter == _consumer_map.end()) {
return {};
@@ -69,13 +69,17 @@ Status RuntimeFilterMgr::register_consumer_filter(
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
int32_t key = desc.filter_id;
- std::lock_guard<std::mutex> l(_lock);
- RETURN_IF_ERROR(RuntimeFilterConsumer::create(state, &desc, node_id,
consumer));
- _consumer_map[key].push_back(*consumer);
+ std::shared_ptr<RuntimeFilterConsumer> new_consumer;
+ RETURN_IF_ERROR(RuntimeFilterConsumer::create(state, &desc, node_id,
&new_consumer));
+ {
+ LockGuard l(_lock);
+ _consumer_map[key].push_back(new_consumer);
+ }
+ *consumer = new_consumer;
return Status::OK();
}
-Status RuntimeFilterMgr::register_local_merger_producer_filter(
+Status RuntimeFilterMgr::register_local_merge_producer_filter(
const QueryContext* query_ctx, const TRuntimeFilterDesc& desc,
std::shared_ptr<RuntimeFilterProducer> producer) {
if (!_is_global) [[unlikely]] {
@@ -90,58 +94,57 @@ Status
RuntimeFilterMgr::register_local_merger_producer_filter(
}
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
int32_t key = desc.filter_id;
+ uint32_t producer_stage = producer->stage();
- LocalMergeContext* context;
+ std::shared_ptr<LocalMergeContext> context;
+ std::shared_ptr<RuntimeFilterMerger> merger;
+ int expected_producer_num = 0;
{
- std::lock_guard<std::mutex> l(_lock);
- context = &_local_merge_map[key]; // may inplace construct default
object
- }
+ LockGuard l(_lock);
+ auto iter = _local_merge_map.find(key);
+ if (iter == _local_merge_map.end() || !iter->second ||
+ producer_stage > iter->second->stage) {
+ auto new_context = std::make_shared<LocalMergeContext>();
+ RETURN_IF_ERROR(RuntimeFilterMerger::create(query_ctx, &desc,
&new_context->merger));
+ new_context->stage = producer_stage;
+ _local_merge_map.insert_or_assign(key, new_context);
+ context = new_context;
+ } else {
+ context = iter->second;
+ }
- RETURN_IF_ERROR(context->register_producer(query_ctx, &desc, producer));
- return Status::OK();
-}
+ context->producers.emplace_back(producer);
+ merger = context->merger;
+ expected_producer_num = cast_set<int>(context->producers.size());
+ }
-Status LocalMergeContext::register_producer(const QueryContext* query_ctx,
- const TRuntimeFilterDesc* desc,
-
std::shared_ptr<RuntimeFilterProducer> producer) {
- std::lock_guard<std::mutex> l(mtx);
- if (producer->stage() > stage) {
- // New recursive CTE round: discard stale merger and producers from
- // the previous round and recreate the merger for the new round.
- merger.reset();
- producers.clear();
- stage = producer->stage();
- }
- if (!merger) {
- RETURN_IF_ERROR(RuntimeFilterMerger::create(query_ctx, desc, &merger));
- }
- producers.emplace_back(producer);
- merger->set_expected_producer_num(cast_set<int>(producers.size()));
+ merger->increase_expected_producer_num(expected_producer_num);
// Sync the local merger's stage from the producer so that outgoing merge
RPCs
// (via _push_to_remote) carry the correct recursive CTE round number.
- merger->set_stage(producer->stage());
+ merger->set_stage(producer_stage);
return Status::OK();
}
-Status RuntimeFilterMgr::get_local_merge_producer_filters(int filter_id,
- LocalMergeContext**
local_merge_filters) {
+Status RuntimeFilterMgr::get_local_merge_context(int filter_id, uint32_t
expected_stage,
+
std::shared_ptr<LocalMergeContext>* context) {
if (!_is_global) [[unlikely]] {
return Status::InternalError(
"A local merge filter can not be registered in Local
RuntimeFilterMgr");
}
- std::lock_guard<std::mutex> l(_lock);
+ context->reset();
+ LockGuard l(_lock);
auto iter = _local_merge_map.find(filter_id);
if (iter == _local_merge_map.end()) {
- // Filter may have been removed during a recursive CTE stage reset.
// Return OK with nullptr to let the caller skip gracefully.
- *local_merge_filters = nullptr;
return Status::OK();
}
- *local_merge_filters = &iter->second;
- if (!iter->second.merger) {
- return Status::InternalError("local merge context merger is nullptr
for filter_id: {}",
- filter_id);
+ if (!iter->second) {
+ return Status::InternalError("local merge context is nullptr for
filter_id: {}", filter_id);
+ }
+ if (expected_stage != iter->second->stage) {
+ return Status::OK();
}
+ *context = iter->second;
return Status::OK();
}
@@ -155,18 +158,28 @@ Status RuntimeFilterMgr::register_producer_filter(
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
int32_t key = desc.filter_id;
- std::lock_guard<std::mutex> l(_lock);
- if (_producer_id_set.contains(key)) {
- return Status::InvalidArgument("filter {} has been registered", key);
+ {
+ LockGuard l(_lock);
+ if (_producer_id_set.contains(key)) {
+ return Status::InvalidArgument("filter {} has been registered",
key);
+ }
+ }
+ std::shared_ptr<RuntimeFilterProducer> new_producer;
+ RETURN_IF_ERROR(RuntimeFilterProducer::create(query_ctx, &desc,
&new_producer));
+ {
+ LockGuard l(_lock);
+ if (_producer_id_set.contains(key)) {
+ return Status::InvalidArgument("filter {} has been registered",
key);
+ }
+ _producer_id_set.insert(key);
}
- RETURN_IF_ERROR(RuntimeFilterProducer::create(query_ctx, &desc, producer));
- _producer_id_set.insert(key);
+ *producer = new_producer;
return Status::OK();
}
bool RuntimeFilterMgr::set_runtime_filter_params(
const TRuntimeFilterParams& runtime_filter_params) {
- std::lock_guard l(_lock);
+ LockGuard l(_lock);
if (!_has_merge_addr) {
_merge_addr = runtime_filter_params.runtime_filter_merge_addr;
_has_merge_addr = true;
@@ -199,7 +212,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
cnt_val->targetv2_info = targetv2_info;
RETURN_IF_ERROR(
RuntimeFilterMerger::create(query_ctx.get(), runtime_filter_desc,
&cnt_val->merger));
- cnt_val->merger->set_expected_producer_num(producer_size);
+ cnt_val->merger->increase_expected_producer_num(producer_size);
return Status::OK();
}
@@ -304,13 +317,13 @@ Status
RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptr<Quer
}
Status RuntimeFilterMgr::sync_filter_size(const PSyncFilterSizeRequest*
request) {
- LocalMergeContext* local_merge_filters = nullptr;
- RETURN_IF_ERROR(get_local_merge_producer_filters(request->filter_id(),
&local_merge_filters));
- if (local_merge_filters == nullptr) {
+ std::shared_ptr<LocalMergeContext> context;
+ RETURN_IF_ERROR(get_local_merge_context(request->filter_id(),
request->stage(), &context));
+ if (!context) {
// Filter was removed during a recursive CTE stage reset; discard
stale request.
return Status::OK();
}
- for (auto producer : local_merge_filters->producers) {
+ for (const auto& producer : context->producers) {
producer->set_synced_size(request->filter_size());
}
return Status::OK();
@@ -318,18 +331,32 @@ Status RuntimeFilterMgr::sync_filter_size(const
PSyncFilterSizeRequest* request)
std::string RuntimeFilterMgr::debug_string() {
std::string result = "Local Merger Info:\n";
- std::lock_guard l(_lock);
- for (const auto& [filter_id, ctx] : _local_merge_map) {
+ struct LocalMergeContextSnapshot {
+ std::shared_ptr<RuntimeFilterMerger> merger;
+ std::vector<std::shared_ptr<RuntimeFilterProducer>> producers;
+ };
+ std::vector<LocalMergeContextSnapshot> local_merge_contexts;
+ std::vector<std::shared_ptr<RuntimeFilterConsumer>> consumers;
+ {
+ LockGuard l(_lock);
+ for (const auto& [filter_id, ctx] : _local_merge_map) {
+ DORIS_CHECK(ctx);
+ DORIS_CHECK(ctx->merger);
+ local_merge_contexts.push_back({ctx->merger, ctx->producers});
+ }
+ for (const auto& [filter_id, filter_consumers] : _consumer_map) {
+ consumers.insert(consumers.end(), filter_consumers.begin(),
filter_consumers.end());
+ }
+ }
+ for (const auto& ctx : local_merge_contexts) {
result += fmt::format("{}\n", ctx.merger->debug_string());
for (const auto& producer : ctx.producers) {
result += fmt::format("{}\n", producer->debug_string());
}
}
result += "Consumer Info:\n";
- for (const auto& [filter_id, consumers] : _consumer_map) {
- for (const auto& consumer : consumers) {
- result += fmt::format("{}\n", consumer->debug_string());
- }
+ for (const auto& consumer : consumers) {
+ result += fmt::format("{}\n", consumer->debug_string());
}
return result;
}
@@ -373,10 +400,9 @@ Status
RuntimeFilterMergeControllerEntity::merge(std::shared_ptr<QueryContext> q
RETURN_IF_ERROR(tmp_filter->assign(*request, attach_data));
- RETURN_IF_ERROR(cnt_val.merger->merge_from(tmp_filter.get()));
+ RETURN_IF_ERROR(cnt_val.merger->merge_from(tmp_filter.get(),
&is_ready));
cnt_val.arrive_id.insert(UniqueId(request->fragment_instance_id()));
- is_ready = cnt_val.merger->ready(); // update is_ready in locked scope
}
if (is_ready) {
@@ -491,7 +517,7 @@ Status GlobalMergeContext::reset(QueryContext* query_ctx) {
DORIS_CHECK(merger);
int producer_size = merger->get_expected_producer_num();
RETURN_IF_ERROR(RuntimeFilterMerger::create(query_ctx,
&runtime_filter_desc, &merger));
- merger->set_expected_producer_num(producer_size);
+ merger->increase_expected_producer_num(producer_size);
arrive_id.clear();
source_addrs.clear();
sync_size_callbacks.clear();
diff --git a/be/src/exec/runtime_filter/runtime_filter_mgr.h
b/be/src/exec/runtime_filter/runtime_filter_mgr.h
index 536eb63e152..e5c6494917c 100644
--- a/be/src/exec/runtime_filter/runtime_filter_mgr.h
+++ b/be/src/exec/runtime_filter/runtime_filter_mgr.h
@@ -56,15 +56,12 @@ class HandleErrorBrpcCallback;
class SyncSizeCallback;
struct LocalMergeContext {
- std::mutex mtx;
std::shared_ptr<RuntimeFilterMerger> merger;
std::vector<std::shared_ptr<RuntimeFilterProducer>> producers;
// Tracks the recursive CTE round. When a producer from a newer round
- // registers, the context is reset (merger recreated, old producers
dropped).
+ // registers, RuntimeFilterMgr replaces the whole context and old in-flight
+ // users keep the previous context alive through shared_ptr.
uint32_t stage = 0;
-
- Status register_producer(const QueryContext* query_ctx, const
TRuntimeFilterDesc* desc,
- std::shared_ptr<RuntimeFilterProducer> producer);
};
struct GlobalMergeContext {
@@ -98,11 +95,12 @@ public:
int node_id,
std::shared_ptr<RuntimeFilterConsumer>*
consumer_filter);
- Status register_local_merger_producer_filter(const QueryContext* query_ctx,
- const TRuntimeFilterDesc&
desc,
-
std::shared_ptr<RuntimeFilterProducer> producer);
+ Status register_local_merge_producer_filter(const QueryContext* query_ctx,
+ const TRuntimeFilterDesc& desc,
+
std::shared_ptr<RuntimeFilterProducer> producer);
- Status get_local_merge_producer_filters(int filter_id, LocalMergeContext**
local_merge_filters);
+ Status get_local_merge_context(int filter_id, uint32_t expected_stage,
+ std::shared_ptr<LocalMergeContext>*
context);
// Create local producer. This producer is hold by
RuntimeFilterProducerHelper.
Status register_producer_filter(const QueryContext* query_ctx, const
TRuntimeFilterDesc& desc,
@@ -116,10 +114,10 @@ public:
std::string debug_string();
void remove_filter(int32_t filter_id) {
- std::lock_guard<std::mutex> l(_lock);
+ LockGuard l(_lock);
_consumer_map.erase(filter_id);
- // NOTE: _local_merge_map is NOT erased here. It is reset lazily in
- // LocalMergeContext::register_producer when a producer from a newer
+ // NOTE: _local_merge_map is NOT erased here. It is replaced lazily in
+ // register_local_merge_producer_filter when a producer from a newer
// recursive CTE round registers. Erasing eagerly here would race with
// multi-fragment REBUILD: a consumer-only fragment's remove_filter
could
// delete the entry that the producer fragment just re-registered.
@@ -142,16 +140,21 @@ private:
// RuntimeFilterMgr is owned by RuntimeState, so we only
// use filter_id as key
// key: "filter-id"
- std::map<int32_t, std::vector<std::shared_ptr<RuntimeFilterConsumer>>>
_consumer_map;
- std::set<int32_t> _producer_id_set;
- std::map<int32_t, LocalMergeContext> _local_merge_map;
+ // Protects fields marked GUARDED_BY(_lock). While holding this lock, only
+ // access RuntimeFilterMgr-owned state or copy shared_ptr snapshots; do not
+ // call methods on existing RuntimeFilter objects, because RF objects have
+ // their own locks and may call back into RuntimeFilterMgr.
+ AnnotatedMutex _lock;
+ std::map<int32_t, std::vector<std::shared_ptr<RuntimeFilterConsumer>>>
_consumer_map
+ GUARDED_BY(_lock);
+ std::set<int32_t> _producer_id_set GUARDED_BY(_lock);
+ std::map<int32_t, std::shared_ptr<LocalMergeContext>> _local_merge_map
GUARDED_BY(_lock);
std::unique_ptr<MemTracker> _tracker;
TNetworkAddress _merge_addr;
bool _has_merge_addr = false;
- std::mutex _lock;
};
// controller -> <query-id, entity>
diff --git a/be/src/exec/runtime_filter/runtime_filter_producer.cpp
b/be/src/exec/runtime_filter/runtime_filter_producer.cpp
index 7da8c687112..5a342911627 100644
--- a/be/src/exec/runtime_filter/runtime_filter_producer.cpp
+++ b/be/src/exec/runtime_filter/runtime_filter_producer.cpp
@@ -53,16 +53,16 @@ Status RuntimeFilterProducer::publish(RuntimeState* state,
bool build_hash_table
// when global consumer not exist, send_to_local_targets will do
nothing, so merge rf is useless
return Status::OK();
}
- LocalMergeContext* context = nullptr;
-
RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_producer_filters(
- _wrapper->filter_id(), &context));
- if (context == nullptr) {
+ std::shared_ptr<LocalMergeContext> context;
+
RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_context(
+ _wrapper->filter_id(), _stage, &context));
+ if (!context) {
// Filter was removed during a recursive CTE stage reset; this
producer is stale.
return Status::OK();
}
- std::lock_guard l(context->mtx);
- RETURN_IF_ERROR(context->merger->merge_from(this));
- if (context->merger->ready()) {
+ bool ready = false;
+ RETURN_IF_ERROR(context->merger->merge_from(this, &ready));
+ if (ready) {
if (_has_remote_target) {
RETURN_IF_ERROR(_send_to_remote_targets(state,
context->merger.get()));
} else {
@@ -123,26 +123,26 @@ Status RuntimeFilterProducer::send_size(RuntimeState*
state, uint64_t local_filt
set_state(State::WAITING_FOR_SYNCED_SIZE);
if (_need_do_merge(state)) {
- LocalMergeContext* merger_context = nullptr;
-
RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_producer_filters(
- _wrapper->filter_id(), &merger_context));
- if (merger_context == nullptr) {
+ std::shared_ptr<LocalMergeContext> context;
+
RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_context(
+ _wrapper->filter_id(), _stage, &context));
+ if (!context) {
// Filter was removed during a recursive CTE stage reset; this
producer is stale.
return Status::OK();
}
- std::lock_guard merger_lock(merger_context->mtx);
- if (merger_context->merger->add_rf_size(local_filter_size)) {
- if (!_has_remote_target) {
- for (auto filter : merger_context->producers) {
-
filter->set_synced_size(merger_context->merger->get_received_sum_size());
- }
- return Status::OK();
- } else {
- local_filter_size =
merger_context->merger->get_received_sum_size();
+ uint64_t received_sum_size = 0;
+ bool ready_to_sync = context->merger->add_rf_size(local_filter_size);
+ if (!ready_to_sync) {
+ return Status::OK();
+ }
+ received_sum_size = context->merger->get_received_sum_size();
+ if (!_has_remote_target) {
+ for (const auto& filter : context->producers) {
+ filter->set_synced_size(received_sum_size);
}
- } else {
return Status::OK();
}
+ local_filter_size = received_sum_size;
} else if (!_has_remote_target) {
set_synced_size(local_filter_size);
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 54989511eb8..5dfd027d42d 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -499,7 +499,7 @@ Status RuntimeState::register_producer_runtime_filter(
DORIS_CHECK(pfc);
(*producer_filter)->set_stage(pfc->rec_cte_stage());
}
-
RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merger_producer_filter(
+
RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merge_producer_filter(
_query_ctx, desc, *producer_filter));
return Status::OK();
}
diff --git a/be/test/exec/runtime_filter/runtime_filter_merger_test.cpp
b/be/test/exec/runtime_filter/runtime_filter_merger_test.cpp
index da9d5b66291..3d0bb701c8b 100644
--- a/be/test/exec/runtime_filter/runtime_filter_merger_test.cpp
+++ b/be/test/exec/runtime_filter/runtime_filter_merger_test.cpp
@@ -35,15 +35,17 @@ public:
auto desc = TRuntimeFilterDescBuilder().build();
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
RuntimeFilterMerger::create(_query_ctx.get(), &desc, &merger));
- merger->set_expected_producer_num(2);
+ merger->increase_expected_producer_num(2);
ASSERT_FALSE(merger->ready());
ASSERT_EQ(merger->_wrapper->_state,
RuntimeFilterWrapper::State::UNINITED);
+ bool ready = false;
std::shared_ptr<RuntimeFilterProducer> producer;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
_runtime_states[0]->register_producer_runtime_filter(desc,
&producer));
producer->set_wrapper_state_and_ready_to_publish(first_product_state);
- FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get()));
+ FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get(),
&ready));
+ ASSERT_FALSE(ready);
ASSERT_FALSE(merger->ready());
ASSERT_EQ(merger->_wrapper->_state, first_expected_state);
@@ -51,7 +53,8 @@ public:
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
_runtime_states[1]->register_producer_runtime_filter(desc,
&producer2));
producer2->set_wrapper_state_and_ready_to_publish(second_product_state);
- FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer2.get()));
+ FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer2.get(),
&ready));
+ ASSERT_TRUE(ready);
ASSERT_TRUE(merger->ready());
ASSERT_EQ(merger->_wrapper->_state, second_expected_state);
}
@@ -63,15 +66,17 @@ public:
std::shared_ptr<RuntimeFilterMerger> merger;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
RuntimeFilterMerger::create(_query_ctx.get(), &desc, &merger));
- merger->set_expected_producer_num(1);
+ merger->increase_expected_producer_num(1);
ASSERT_FALSE(merger->ready());
+ bool ready = false;
std::shared_ptr<RuntimeFilterProducer> producer;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
_runtime_states[0]->register_producer_runtime_filter(desc,
&producer));
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(producer->init(123));
producer->set_wrapper_state_and_ready_to_publish(state);
- FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get()));
+ FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get(),
&ready));
+ ASSERT_TRUE(ready);
ASSERT_TRUE(merger->ready());
PMergeFilterRequest request;
@@ -99,7 +104,7 @@ TEST_F(RuntimeFilterMergerTest, add_rf_size) {
std::shared_ptr<RuntimeFilterMerger> merger;
auto desc = TRuntimeFilterDescBuilder().build();
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterMerger::create(_query_ctx.get(),
&desc, &merger));
- merger->set_expected_producer_num(2);
+ merger->increase_expected_producer_num(2);
ASSERT_FALSE(merger->add_rf_size(123));
ASSERT_TRUE(merger->add_rf_size(1));
@@ -118,22 +123,25 @@ TEST_F(RuntimeFilterMergerTest, invalid_merge) {
std::shared_ptr<RuntimeFilterMerger> merger;
auto desc = TRuntimeFilterDescBuilder().build();
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterMerger::create(_query_ctx.get(),
&desc, &merger));
- merger->set_expected_producer_num(1);
+ merger->increase_expected_producer_num(1);
ASSERT_FALSE(merger->ready());
ASSERT_EQ(merger->_wrapper->_state, RuntimeFilterWrapper::State::UNINITED);
+ bool ready = false;
std::shared_ptr<RuntimeFilterProducer> producer;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
_runtime_states[0]->register_producer_runtime_filter(desc,
&producer));
producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY);
- FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get())); //
ready wrapper
+ FAIL_IF_ERROR_OR_CATCH_EXCEPTION(merger->merge_from(producer.get(),
&ready));
+ ASSERT_TRUE(ready);
+ ASSERT_TRUE(merger->ready());
ASSERT_EQ(merger->_wrapper->_state, RuntimeFilterWrapper::State::READY);
std::shared_ptr<RuntimeFilterProducer> producer2;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
_runtime_states[1]->register_producer_runtime_filter(desc,
&producer2));
producer2->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY);
- auto st = merger->merge_from(producer2.get());
+ auto st = merger->merge_from(producer2.get(), &ready);
ASSERT_EQ(st.code(), ErrorCode::INTERNAL_ERROR);
}
diff --git a/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp
b/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp
index d6ccc080961..5a50e762c25 100644
--- a/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp
+++ b/be/test/exec/runtime_filter/runtime_filter_mgr_test.cpp
@@ -77,12 +77,12 @@ TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMgr) {
// producer_filter should not be nullptr
EXPECT_FALSE(
global_runtime_filter_mgr
- ->register_local_merger_producer_filter(ctx.get(),
desc, producer_filter)
+ ->register_local_merge_producer_filter(ctx.get(),
desc, producer_filter)
.ok());
// local merge filter should not be registered in local mgr
EXPECT_FALSE(
local_runtime_filter_mgr
- ->register_local_merger_producer_filter(ctx.get(),
desc, producer_filter)
+ ->register_local_merge_producer_filter(ctx.get(),
desc, producer_filter)
.ok());
// producer should not registered in global mgr
EXPECT_FALSE(global_runtime_filter_mgr
@@ -103,28 +103,29 @@ TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMgr) {
.ok());
EXPECT_NE(producer_filter, nullptr);
- LocalMergeContext* local_merge_filters = nullptr;
+ std::shared_ptr<LocalMergeContext> context;
// filter_id not yet registered: global mgr returns OK with nullptr
// (graceful skip for recursive CTE stage reset).
EXPECT_TRUE(global_runtime_filter_mgr
- ->get_local_merge_producer_filters(filter_id,
&local_merge_filters)
+ ->get_local_merge_context(filter_id,
producer_filter->stage(), &context)
.ok());
- EXPECT_EQ(local_merge_filters, nullptr);
+ EXPECT_EQ(context, nullptr);
// local mgr always returns error (not supported)
- EXPECT_FALSE(local_runtime_filter_mgr
- ->get_local_merge_producer_filters(filter_id,
&local_merge_filters)
- .ok());
- // Register local merge filter
- EXPECT_TRUE(
- global_runtime_filter_mgr
- ->register_local_merger_producer_filter(ctx.get(),
desc, producer_filter)
+ EXPECT_FALSE(
+ local_runtime_filter_mgr
+ ->get_local_merge_context(filter_id,
producer_filter->stage(), &context)
.ok());
+ // Register local merge filter
+ EXPECT_TRUE(global_runtime_filter_mgr
+ ->register_local_merge_producer_filter(ctx.get(),
desc, producer_filter)
+ .ok());
EXPECT_TRUE(global_runtime_filter_mgr
- ->get_local_merge_producer_filters(filter_id,
&local_merge_filters)
+ ->get_local_merge_context(filter_id,
producer_filter->stage(), &context)
.ok());
- EXPECT_NE(local_merge_filters, nullptr);
- EXPECT_EQ(local_merge_filters->producers.size(), 1);
- local_merge_filters->producers.front()->_rf_state =
+ EXPECT_NE(context, nullptr);
+ EXPECT_NE(context->merger, nullptr);
+ EXPECT_EQ(context->producers.size(), 1);
+ context->producers.front()->_rf_state =
RuntimeFilterProducer::State ::WAITING_FOR_SYNCED_SIZE;
}
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]