This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch refactor_rf
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 5ee537f3007cbd01b772c1ad46c342fb60e8aec2
Author: Gabriel <[email protected]>
AuthorDate: Wed Mar 5 12:42:04 2025 +0800

    test RuntimeFilterMgr (#48672)
---
 be/src/runtime/runtime_state.cpp                   |   9 +-
 be/src/runtime_filter/runtime_filter_mgr.cpp       |  58 +++++----
 be/src/runtime_filter/runtime_filter_mgr.h         |  17 ++-
 be/test/runtime_filter/runtime_filter_mgr_test.cpp | 143 +++++++++++++++++++++
 4 files changed, 193 insertions(+), 34 deletions(-)

diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index e71a7c50b60..42294438127 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -499,10 +499,10 @@ Status RuntimeState::register_producer_runtime_filter(
         RuntimeProfile* parent_profile) {
     // Producers are created by local runtime filter mgr and shared by global 
runtime filter manager.
     // When RF is published, consumers in both global and local RF mgr will be 
found.
-    RETURN_IF_ERROR(local_runtime_filter_mgr()->register_producer_filter(
-            desc, query_options(), producer_filter, parent_profile));
+    RETURN_IF_ERROR(local_runtime_filter_mgr()->register_producer_filter(desc, 
producer_filter,
+                                                                         
parent_profile));
     
RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merger_producer_filter(
-            desc, query_options(), *producer_filter, &_profile));
+            desc, *producer_filter, &_profile));
     return Status::OK();
 }
 
@@ -511,8 +511,7 @@ Status RuntimeState::register_consumer_runtime_filter(
         std::shared_ptr<RuntimeFilterConsumer>* consumer_filter, 
RuntimeProfile* parent_profile) {
     bool need_merge = desc.has_remote_targets || need_local_merge;
     RuntimeFilterMgr* mgr = need_merge ? global_runtime_filter_mgr() : 
local_runtime_filter_mgr();
-    return mgr->register_consumer_filter(desc, query_options(), node_id, 
consumer_filter,
-                                         need_merge, parent_profile);
+    return mgr->register_consumer_filter(desc, node_id, consumer_filter, 
parent_profile);
 }
 
 bool RuntimeState::is_nereids() const {
diff --git a/be/src/runtime_filter/runtime_filter_mgr.cpp 
b/be/src/runtime_filter/runtime_filter_mgr.cpp
index a9603285f3f..1eb7db6804d 100644
--- a/be/src/runtime_filter/runtime_filter_mgr.cpp
+++ b/be/src/runtime_filter/runtime_filter_mgr.cpp
@@ -70,28 +70,31 @@ std::vector<std::shared_ptr<RuntimeFilterConsumer>> 
RuntimeFilterMgr::get_consum
     return iter->second;
 }
 
-Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& 
desc,
-                                                  const TQueryOptions& 
options, int node_id,
+Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& 
desc, int node_id,
                                                   
std::shared_ptr<RuntimeFilterConsumer>* consumer,
-                                                  bool need_local_merge,
                                                   RuntimeProfile* 
parent_profile) {
     SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
     int32_t key = desc.filter_id;
 
     std::lock_guard<std::mutex> l(_lock);
-    DCHECK(!(_is_global xor need_local_merge))
-            << " _is_global: " << _is_global << " need_local_merge: " << 
need_local_merge;
     RETURN_IF_ERROR(
             RuntimeFilterConsumer::create(_state, &desc, node_id, consumer, 
parent_profile));
     _consumer_map[key].push_back(*consumer);
-
     return Status::OK();
 }
 
 Status RuntimeFilterMgr::register_local_merger_producer_filter(
-        const TRuntimeFilterDesc& desc, const TQueryOptions& options,
-        std::shared_ptr<RuntimeFilterProducer> producer, RuntimeProfile* 
parent_profile) {
-    DCHECK(_is_global);
+        const TRuntimeFilterDesc& desc, std::shared_ptr<RuntimeFilterProducer> 
producer,
+        RuntimeProfile* parent_profile) {
+    if (!_is_global) [[unlikely]] {
+        return Status::InternalError(
+                "A local merge filter can not be registered in Local 
RuntimeFilterMgr");
+    }
+    if (producer == nullptr) [[unlikely]] {
+        return Status::InternalError(
+                "Producer should be created in local RuntimeFilterMgr before 
registered in Global "
+                "RuntimeFilterMgr");
+    }
     SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
     int32_t key = desc.filter_id;
 
@@ -102,21 +105,29 @@ Status 
RuntimeFilterMgr::register_local_merger_producer_filter(
     }
 
     DCHECK(_state != nullptr);
-    {
-        std::lock_guard<std::mutex> l(context->mtx);
-        if (!context->merger) {
-            RETURN_IF_ERROR(
-                    RuntimeFilterMerger::create(_state, &desc, 
&context->merger, parent_profile));
-        }
-        context->producers.emplace_back(producer);
-        context->merger->set_expected_producer_num(context->producers.size());
+    RETURN_IF_ERROR(context->register_producer(_state, &desc, parent_profile, 
producer));
+    return Status::OK();
+}
+
+Status LocalMergeContext::register_producer(RuntimeFilterParamsContext* state,
+                                            const TRuntimeFilterDesc* desc,
+                                            RuntimeProfile* parent_profile,
+                                            
std::shared_ptr<RuntimeFilterProducer> producer) {
+    std::lock_guard<std::mutex> l(mtx);
+    if (!merger) {
+        RETURN_IF_ERROR(RuntimeFilterMerger::create(state, desc, &merger, 
parent_profile));
     }
+    producers.emplace_back(producer);
+    merger->set_expected_producer_num(producers.size());
     return Status::OK();
 }
 
 Status RuntimeFilterMgr::get_local_merge_producer_filters(int filter_id,
                                                           LocalMergeContext** 
local_merge_filters) {
-    DCHECK(_is_global);
+    if (!_is_global) [[unlikely]] {
+        return Status::InternalError(
+                "A local merge filter can not be registered in Local 
RuntimeFilterMgr");
+    }
     std::lock_guard<std::mutex> l(_lock);
     auto iter = _local_merge_map.find(filter_id);
     if (iter == _local_merge_map.end()) {
@@ -131,10 +142,12 @@ Status 
RuntimeFilterMgr::get_local_merge_producer_filters(int filter_id,
 }
 
 Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc& 
desc,
-                                                  const TQueryOptions& options,
                                                   
std::shared_ptr<RuntimeFilterProducer>* producer,
                                                   RuntimeProfile* 
parent_profile) {
-    DCHECK(!_is_global);
+    if (_is_global) [[unlikely]] {
+        return Status::InternalError(
+                "A local producer filter should not be registered in Global 
RuntimeFilterMgr");
+    }
     SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
     int32_t key = desc.filter_id;
     DCHECK(_state != nullptr);
@@ -148,17 +161,18 @@ Status RuntimeFilterMgr::register_producer_filter(const 
TRuntimeFilterDesc& desc
     return Status::OK();
 }
 
-void RuntimeFilterMgr::set_runtime_filter_params(
+bool RuntimeFilterMgr::set_runtime_filter_params(
         const TRuntimeFilterParams& runtime_filter_params) {
     std::lock_guard l(_lock);
     if (!_has_merge_addr) {
         _merge_addr = runtime_filter_params.runtime_filter_merge_addr;
         _has_merge_addr = true;
+        return true;
     }
+    return false;
 }
 
 Status RuntimeFilterMgr::get_merge_addr(TNetworkAddress* addr) {
-    DCHECK(_has_merge_addr);
     if (_has_merge_addr) {
         *addr = this->_merge_addr;
         return Status::OK();
diff --git a/be/src/runtime_filter/runtime_filter_mgr.h 
b/be/src/runtime_filter/runtime_filter_mgr.h
index 896a987fad5..7709fda7969 100644
--- a/be/src/runtime_filter/runtime_filter_mgr.h
+++ b/be/src/runtime_filter/runtime_filter_mgr.h
@@ -60,6 +60,10 @@ struct LocalMergeContext {
     std::mutex mtx;
     std::shared_ptr<RuntimeFilterMerger> merger;
     std::vector<std::shared_ptr<RuntimeFilterProducer>> producers;
+
+    Status register_producer(RuntimeFilterParamsContext* state, const 
TRuntimeFilterDesc* desc,
+                             RuntimeProfile* parent_profile,
+                             std::shared_ptr<RuntimeFilterProducer> producer);
 };
 
 struct GlobalMergeContext {
@@ -83,24 +87,23 @@ public:
 
     // get/set consumer
     std::vector<std::shared_ptr<RuntimeFilterConsumer>> 
get_consume_filters(int filter_id);
-    Status register_consumer_filter(const TRuntimeFilterDesc& desc, const 
TQueryOptions& options,
-                                    int node_id,
+    Status register_consumer_filter(const TRuntimeFilterDesc& desc, int 
node_id,
                                     std::shared_ptr<RuntimeFilterConsumer>* 
consumer_filter,
-                                    bool need_local_merge, RuntimeProfile* 
parent_profile);
+                                    RuntimeProfile* parent_profile);
 
     Status register_local_merger_producer_filter(
-            const TRuntimeFilterDesc& desc, const TQueryOptions& options,
-            std::shared_ptr<RuntimeFilterProducer> producer_filter, 
RuntimeProfile* parent_profile);
+            const TRuntimeFilterDesc& desc, 
std::shared_ptr<RuntimeFilterProducer> producer_filter,
+            RuntimeProfile* parent_profile);
 
     Status get_local_merge_producer_filters(int filter_id, LocalMergeContext** 
local_merge_filters);
 
     // Create local producer. This producer is held by 
RuntimeFilterProducerHelper.
-    Status register_producer_filter(const TRuntimeFilterDesc& desc, const 
TQueryOptions& options,
+    Status register_producer_filter(const TRuntimeFilterDesc& desc,
                                     std::shared_ptr<RuntimeFilterProducer>* 
producer_filter,
                                     RuntimeProfile* parent_profile);
 
     // update filter by remote
-    void set_runtime_filter_params(const TRuntimeFilterParams& 
runtime_filter_params);
+    bool set_runtime_filter_params(const TRuntimeFilterParams& 
runtime_filter_params);
     Status get_merge_addr(TNetworkAddress* addr);
     Status sync_filter_size(const PSyncFilterSizeRequest* request);
 
diff --git a/be/test/runtime_filter/runtime_filter_mgr_test.cpp 
b/be/test/runtime_filter/runtime_filter_mgr_test.cpp
new file mode 100644
index 00000000000..2ba6ce9948c
--- /dev/null
+++ b/be/test/runtime_filter/runtime_filter_mgr_test.cpp
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime_filter/runtime_filter_mgr.h"
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "pipeline/thrift_builder.h"
+#include "runtime/query_context.h"
+#include "runtime_filter/runtime_filter_producer.h"
+#include "testutil/mock/mock_runtime_state.h"
+
+namespace doris {
+
+class RuntimeFilterMgrTest : public testing::Test {
+public:
+    RuntimeFilterMgrTest() = default;
+    ~RuntimeFilterMgrTest() override = default;
+    void SetUp() override {}
+    void TearDown() override {}
+};
+
+TEST_F(RuntimeFilterMgrTest, TestGlobalMgr) {
+    auto filter_id = 0;
+    std::shared_ptr<RuntimeFilterMgr> global_runtime_filter_mgr;
+    std::shared_ptr<RuntimeFilterMgr> local_runtime_filter_mgr;
+    std::shared_ptr<QueryContext> ctx;
+    RuntimeState state;
+    auto profile = std::make_shared<RuntimeProfile>("Test");
+    auto desc = 
TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build();
+    {
+        // Create
+        auto query_options = TQueryOptionsBuilder().build();
+        auto fe_address = TNetworkAddress();
+        fe_address.hostname = BackendOptions::get_localhost();
+        fe_address.port = config::brpc_port;
+        ctx = QueryContext::create(TUniqueId(), ExecEnv::GetInstance(), 
query_options, fe_address,
+                                   true, fe_address, 
QuerySource::INTERNAL_FRONTEND);
+        state._query_ctx = ctx.get();
+
+        global_runtime_filter_mgr = std::make_shared<RuntimeFilterMgr>(
+                TUniqueId(), RuntimeFilterParamsContext::create(ctx.get()),
+                ctx->query_mem_tracker(), true);
+        local_runtime_filter_mgr = std::make_shared<RuntimeFilterMgr>(
+                TUniqueId(), RuntimeFilterParamsContext::create(&state), 
ctx->query_mem_tracker(),
+                false);
+    }
+
+    {
+        // Get / Register consumer
+        
EXPECT_TRUE(global_runtime_filter_mgr->get_consume_filters(filter_id).empty());
+        std::shared_ptr<RuntimeFilterConsumer> consumer_filter;
+        EXPECT_TRUE(global_runtime_filter_mgr
+                            ->register_consumer_filter(desc, 0, 
&consumer_filter, profile.get())
+                            .ok());
+        
EXPECT_FALSE(global_runtime_filter_mgr->get_consume_filters(filter_id).empty());
+    }
+
+    {
+        // Get / Register producer
+
+        std::shared_ptr<RuntimeFilterProducer> producer_filter;
+        // producer_filter should not be nullptr
+        EXPECT_FALSE(global_runtime_filter_mgr
+                             ->register_local_merger_producer_filter(desc, 
producer_filter,
+                                                                     
profile.get())
+                             .ok());
+        // local merge filter should not be registered in local mgr
+        EXPECT_FALSE(local_runtime_filter_mgr
+                             ->register_local_merger_producer_filter(desc, 
producer_filter,
+                                                                     
profile.get())
+                             .ok());
+        // producer should not be registered in global mgr
+        EXPECT_FALSE(global_runtime_filter_mgr
+                             ->register_producer_filter(desc, 
&producer_filter, profile.get())
+                             .ok());
+        EXPECT_EQ(producer_filter, nullptr);
+        // Register in local mgr
+        EXPECT_TRUE(local_runtime_filter_mgr
+                            ->register_producer_filter(desc, &producer_filter, 
profile.get())
+                            .ok());
+        EXPECT_NE(producer_filter, nullptr);
+        // Register in local mgr twice
+        EXPECT_FALSE(local_runtime_filter_mgr
+                             ->register_producer_filter(desc, 
&producer_filter, profile.get())
+                             .ok());
+        EXPECT_NE(producer_filter, nullptr);
+
+        LocalMergeContext* local_merge_filters = nullptr;
+        EXPECT_FALSE(global_runtime_filter_mgr
+                             ->get_local_merge_producer_filters(filter_id, 
&local_merge_filters)
+                             .ok());
+        EXPECT_FALSE(local_runtime_filter_mgr
+                             ->get_local_merge_producer_filters(filter_id, 
&local_merge_filters)
+                             .ok());
+        // Register local merge filter
+        EXPECT_TRUE(global_runtime_filter_mgr
+                            ->register_local_merger_producer_filter(desc, 
producer_filter,
+                                                                    
profile.get())
+                            .ok());
+        EXPECT_TRUE(global_runtime_filter_mgr
+                            ->get_local_merge_producer_filters(filter_id, 
&local_merge_filters)
+                            .ok());
+        EXPECT_NE(local_merge_filters, nullptr);
+        EXPECT_EQ(local_merge_filters->producers.size(), 1);
+        local_merge_filters->producers.front()->_rf_state =
+                RuntimeFilterProducer::State ::WAITING_FOR_SYNCED_SIZE;
+    }
+    {
+        TNetworkAddress addr;
+        EXPECT_FALSE(global_runtime_filter_mgr->get_merge_addr(&addr).ok());
+
+        TRuntimeFilterParams param;
+        TNetworkAddress new_addr;
+        param.__set_runtime_filter_merge_addr(new_addr);
+        
EXPECT_TRUE(global_runtime_filter_mgr->set_runtime_filter_params(param));
+        
EXPECT_FALSE(global_runtime_filter_mgr->set_runtime_filter_params(param));
+        EXPECT_TRUE(global_runtime_filter_mgr->get_merge_addr(&addr).ok());
+    }
+    {
+        PSyncFilterSizeRequest request;
+        request.set_filter_id(filter_id);
+        request.set_filter_size(16);
+        
EXPECT_TRUE(global_runtime_filter_mgr->sync_filter_size(&request).ok());
+    }
+}
+
+} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to