This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch refactor_rf in repository https://gitbox.apache.org/repos/asf/doris.git
commit 5ee537f3007cbd01b772c1ad46c342fb60e8aec2 Author: Gabriel <[email protected]> AuthorDate: Wed Mar 5 12:42:04 2025 +0800 test RuntimeFilterMgr (#48672) --- be/src/runtime/runtime_state.cpp | 9 +- be/src/runtime_filter/runtime_filter_mgr.cpp | 58 +++++---- be/src/runtime_filter/runtime_filter_mgr.h | 17 ++- be/test/runtime_filter/runtime_filter_mgr_test.cpp | 143 +++++++++++++++++++++ 4 files changed, 193 insertions(+), 34 deletions(-) diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index e71a7c50b60..42294438127 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -499,10 +499,10 @@ Status RuntimeState::register_producer_runtime_filter( RuntimeProfile* parent_profile) { // Producers are created by local runtime filter mgr and shared by global runtime filter manager. // When RF is published, consumers in both global and local RF mgr will be found. - RETURN_IF_ERROR(local_runtime_filter_mgr()->register_producer_filter( - desc, query_options(), producer_filter, parent_profile)); + RETURN_IF_ERROR(local_runtime_filter_mgr()->register_producer_filter(desc, producer_filter, + parent_profile)); RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merger_producer_filter( - desc, query_options(), *producer_filter, &_profile)); + desc, *producer_filter, &_profile)); return Status::OK(); } @@ -511,8 +511,7 @@ Status RuntimeState::register_consumer_runtime_filter( std::shared_ptr<RuntimeFilterConsumer>* consumer_filter, RuntimeProfile* parent_profile) { bool need_merge = desc.has_remote_targets || need_local_merge; RuntimeFilterMgr* mgr = need_merge ? global_runtime_filter_mgr() : local_runtime_filter_mgr(); - return mgr->register_consumer_filter(desc, query_options(), node_id, consumer_filter, - need_merge, parent_profile); + return mgr->register_consumer_filter(desc, node_id, consumer_filter, parent_profile); } bool RuntimeState::is_nereids() const { diff --git a/be/src/runtime_filter/runtime_filter_mgr.cpp b/be/src/runtime_filter/runtime_filter_mgr.cpp index a9603285f3f..1eb7db6804d 100644 --- a/be/src/runtime_filter/runtime_filter_mgr.cpp +++ b/be/src/runtime_filter/runtime_filter_mgr.cpp @@ -70,28 +70,31 @@ std::vector<std::shared_ptr<RuntimeFilterConsumer>> RuntimeFilterMgr::get_consum return iter->second; } -Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc, - const TQueryOptions& options, int node_id, +Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc, int node_id, std::shared_ptr<RuntimeFilterConsumer>* consumer, - bool need_local_merge, RuntimeProfile* parent_profile) { SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); int32_t key = desc.filter_id; std::lock_guard<std::mutex> l(_lock); - DCHECK(!(_is_global xor need_local_merge)) - << " _is_global: " << _is_global << " need_local_merge: " << need_local_merge; RETURN_IF_ERROR( RuntimeFilterConsumer::create(_state, &desc, node_id, consumer, parent_profile)); _consumer_map[key].push_back(*consumer); - return Status::OK(); } Status RuntimeFilterMgr::register_local_merger_producer_filter( - const TRuntimeFilterDesc& desc, const TQueryOptions& options, - std::shared_ptr<RuntimeFilterProducer> producer, RuntimeProfile* parent_profile) { - DCHECK(_is_global); + const TRuntimeFilterDesc& desc, std::shared_ptr<RuntimeFilterProducer> producer, + RuntimeProfile* parent_profile) { + if (!_is_global) [[unlikely]] { + return Status::InternalError( + "A local merge filter can not be registered in Local RuntimeFilterMgr"); + } + if (producer == nullptr) [[unlikely]] { + return Status::InternalError( + "Producer should be created in local RuntimeFilterMgr before registered in Global " + "RuntimeFilterMgr"); + } SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); int32_t key = desc.filter_id; @@ -102,21 +105,29 @@ Status RuntimeFilterMgr::register_local_merger_producer_filter( } DCHECK(_state != nullptr); - { - std::lock_guard<std::mutex> l(context->mtx); - if (!context->merger) { - RETURN_IF_ERROR( - RuntimeFilterMerger::create(_state, &desc, &context->merger, parent_profile)); - } - context->producers.emplace_back(producer); - context->merger->set_expected_producer_num(context->producers.size()); + RETURN_IF_ERROR(context->register_producer(_state, &desc, parent_profile, producer)); + return Status::OK(); +} + +Status LocalMergeContext::register_producer(RuntimeFilterParamsContext* state, + const TRuntimeFilterDesc* desc, + RuntimeProfile* parent_profile, + std::shared_ptr<RuntimeFilterProducer> producer) { + std::lock_guard<std::mutex> l(mtx); + if (!merger) { + RETURN_IF_ERROR(RuntimeFilterMerger::create(state, desc, &merger, parent_profile)); } + producers.emplace_back(producer); + merger->set_expected_producer_num(producers.size()); return Status::OK(); } Status RuntimeFilterMgr::get_local_merge_producer_filters(int filter_id, LocalMergeContext** local_merge_filters) { - DCHECK(_is_global); + if (!_is_global) [[unlikely]] { + return Status::InternalError( + "A local merge filter can not be registered in Local RuntimeFilterMgr"); + } std::lock_guard<std::mutex> l(_lock); auto iter = _local_merge_map.find(filter_id); if (iter == _local_merge_map.end()) { @@ -131,10 +142,12 @@ Status RuntimeFilterMgr::get_local_merge_producer_filters(int filter_id, } Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc& desc, - const TQueryOptions& options, std::shared_ptr<RuntimeFilterProducer>* producer, RuntimeProfile* parent_profile) { - DCHECK(!_is_global); + if (_is_global) [[unlikely]] { + return Status::InternalError( + "A local producer filter should not be registered in Global RuntimeFilterMgr"); + } SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); int32_t key = desc.filter_id; DCHECK(_state != nullptr); @@ -148,17 +161,18 @@ Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc& desc return Status::OK(); } -void RuntimeFilterMgr::set_runtime_filter_params( +bool RuntimeFilterMgr::set_runtime_filter_params( const TRuntimeFilterParams& runtime_filter_params) { std::lock_guard l(_lock); if (!_has_merge_addr) { _merge_addr = runtime_filter_params.runtime_filter_merge_addr; _has_merge_addr = true; + return true; } + return false; } Status RuntimeFilterMgr::get_merge_addr(TNetworkAddress* addr) { - DCHECK(_has_merge_addr); if (_has_merge_addr) { *addr = this->_merge_addr; return Status::OK(); diff --git a/be/src/runtime_filter/runtime_filter_mgr.h b/be/src/runtime_filter/runtime_filter_mgr.h index 896a987fad5..7709fda7969 100644 --- a/be/src/runtime_filter/runtime_filter_mgr.h +++ b/be/src/runtime_filter/runtime_filter_mgr.h @@ -60,6 +60,10 @@ struct LocalMergeContext { std::mutex mtx; std::shared_ptr<RuntimeFilterMerger> merger; std::vector<std::shared_ptr<RuntimeFilterProducer>> producers; + + Status register_producer(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc, + RuntimeProfile* parent_profile, + std::shared_ptr<RuntimeFilterProducer> producer); }; struct GlobalMergeContext { @@ -83,24 +87,23 @@ public: // get/set consumer std::vector<std::shared_ptr<RuntimeFilterConsumer>> get_consume_filters(int filter_id); - Status register_consumer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, - int node_id, + Status register_consumer_filter(const TRuntimeFilterDesc& desc, int node_id, std::shared_ptr<RuntimeFilterConsumer>* consumer_filter, - bool need_local_merge, RuntimeProfile* parent_profile); + RuntimeProfile* parent_profile); Status register_local_merger_producer_filter( - const TRuntimeFilterDesc& desc, const TQueryOptions& options, - std::shared_ptr<RuntimeFilterProducer> producer_filter, RuntimeProfile* parent_profile); + const TRuntimeFilterDesc& desc, std::shared_ptr<RuntimeFilterProducer> producer_filter, + RuntimeProfile* parent_profile); Status get_local_merge_producer_filters(int filter_id, LocalMergeContext** local_merge_filters); // Create local producer. This producer is hold by RuntimeFilterProducerHelper. - Status register_producer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, + Status register_producer_filter(const TRuntimeFilterDesc& desc, std::shared_ptr<RuntimeFilterProducer>* producer_filter, RuntimeProfile* parent_profile); // update filter by remote - void set_runtime_filter_params(const TRuntimeFilterParams& runtime_filter_params); + bool set_runtime_filter_params(const TRuntimeFilterParams& runtime_filter_params); Status get_merge_addr(TNetworkAddress* addr); Status sync_filter_size(const PSyncFilterSizeRequest* request); diff --git a/be/test/runtime_filter/runtime_filter_mgr_test.cpp b/be/test/runtime_filter/runtime_filter_mgr_test.cpp new file mode 100644 index 00000000000..2ba6ce9948c --- /dev/null +++ b/be/test/runtime_filter/runtime_filter_mgr_test.cpp @@ -0,0 +1,143 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime_filter/runtime_filter_mgr.h" + +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include "pipeline/thrift_builder.h" +#include "runtime/query_context.h" +#include "runtime_filter/runtime_filter_producer.h" +#include "testutil/mock/mock_runtime_state.h" + +namespace doris { + +class RuntimeFilterMgrTest : public testing::Test { +public: + RuntimeFilterMgrTest() = default; + ~RuntimeFilterMgrTest() override = default; + void SetUp() override {} + void TearDown() override {} +}; + +TEST_F(RuntimeFilterMgrTest, TestGlobalMgr) { + auto filter_id = 0; + std::shared_ptr<RuntimeFilterMgr> global_runtime_filter_mgr; + std::shared_ptr<RuntimeFilterMgr> local_runtime_filter_mgr; + std::shared_ptr<QueryContext> ctx; + RuntimeState state; + auto profile = std::make_shared<RuntimeProfile>("Test"); + auto desc = TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build(); + { + // Create + auto query_options = TQueryOptionsBuilder().build(); + auto fe_address = TNetworkAddress(); + fe_address.hostname = BackendOptions::get_localhost(); + fe_address.port = config::brpc_port; + ctx = QueryContext::create(TUniqueId(), ExecEnv::GetInstance(), query_options, fe_address, + true, fe_address, QuerySource::INTERNAL_FRONTEND); + state._query_ctx = ctx.get(); + + global_runtime_filter_mgr = std::make_shared<RuntimeFilterMgr>( + TUniqueId(), RuntimeFilterParamsContext::create(ctx.get()), + ctx->query_mem_tracker(), true); + local_runtime_filter_mgr = std::make_shared<RuntimeFilterMgr>( + TUniqueId(), RuntimeFilterParamsContext::create(&state), ctx->query_mem_tracker(), + false); + } + + { + // Get / Register consumer + EXPECT_TRUE(global_runtime_filter_mgr->get_consume_filters(filter_id).empty()); + std::shared_ptr<RuntimeFilterConsumer> consumer_filter; + EXPECT_TRUE(global_runtime_filter_mgr + ->register_consumer_filter(desc, 0, &consumer_filter, profile.get()) + .ok()); + EXPECT_FALSE(global_runtime_filter_mgr->get_consume_filters(filter_id).empty()); + } + + { + // Get / Register producer + + std::shared_ptr<RuntimeFilterProducer> producer_filter; + // producer_filter should not be nullptr + EXPECT_FALSE(global_runtime_filter_mgr + ->register_local_merger_producer_filter(desc, producer_filter, + profile.get()) + .ok()); + // local merge filter should not be registered in local mgr + EXPECT_FALSE(local_runtime_filter_mgr + ->register_local_merger_producer_filter(desc, producer_filter, + profile.get()) + .ok()); + // producer should not registered in global mgr + EXPECT_FALSE(global_runtime_filter_mgr + ->register_producer_filter(desc, &producer_filter, profile.get()) + .ok()); + EXPECT_EQ(producer_filter, nullptr); + // Register in local mgr + EXPECT_TRUE(local_runtime_filter_mgr + ->register_producer_filter(desc, &producer_filter, profile.get()) + .ok()); + EXPECT_NE(producer_filter, nullptr); + // Register in local mgr twice + EXPECT_FALSE(local_runtime_filter_mgr + ->register_producer_filter(desc, &producer_filter, profile.get()) + .ok()); + EXPECT_NE(producer_filter, nullptr); + + LocalMergeContext* local_merge_filters = nullptr; + EXPECT_FALSE(global_runtime_filter_mgr + ->get_local_merge_producer_filters(filter_id, &local_merge_filters) + .ok()); + EXPECT_FALSE(local_runtime_filter_mgr + ->get_local_merge_producer_filters(filter_id, &local_merge_filters) + .ok()); + // Register local merge filter + EXPECT_TRUE(global_runtime_filter_mgr + ->register_local_merger_producer_filter(desc, producer_filter, + profile.get()) + .ok()); + EXPECT_TRUE(global_runtime_filter_mgr + ->get_local_merge_producer_filters(filter_id, &local_merge_filters) + .ok()); + EXPECT_NE(local_merge_filters, nullptr); + EXPECT_EQ(local_merge_filters->producers.size(), 1); + local_merge_filters->producers.front()->_rf_state = + RuntimeFilterProducer::State ::WAITING_FOR_SYNCED_SIZE; + } + { + TNetworkAddress addr; + EXPECT_FALSE(global_runtime_filter_mgr->get_merge_addr(&addr).ok()); + + TRuntimeFilterParams param; + TNetworkAddress new_addr; + param.__set_runtime_filter_merge_addr(new_addr); + EXPECT_TRUE(global_runtime_filter_mgr->set_runtime_filter_params(param)); + EXPECT_FALSE(global_runtime_filter_mgr->set_runtime_filter_params(param)); + EXPECT_TRUE(global_runtime_filter_mgr->get_merge_addr(&addr).ok()); + } + { + PSyncFilterSizeRequest request; + request.set_filter_id(filter_id); + request.set_filter_size(16); + EXPECT_TRUE(global_runtime_filter_mgr->sync_filter_size(&request).ok()); + } +} + +} // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
