This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch refactor_rf
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 930566af3e039eea8d82a230b0dd7d85e9c71e1c
Author: BiteTheDDDDt <[email protected]>
AuthorDate: Tue Mar 4 13:13:59 2025 +0800

    add more unit tests for runtime filter roles (consumer, merger, producer, producer helper)
---
 be/src/runtime_filter/runtime_filter.h             |  1 -
 be/src/runtime_filter/runtime_filter_merger.h      | 11 -----
 be/src/runtime_filter/runtime_filter_producer.h    |  4 +-
 .../runtime_filter_producer_helper.cpp             |  6 +--
 .../runtime_filter_consumer_test.cpp               | 49 ++++++++++++++++++++++
 .../runtime_filter/runtime_filter_merger_test.cpp  | 26 +++++++++---
 .../runtime_filter_producer_helper_test.cpp        | 46 +++++++++++++++++---
 .../runtime_filter_producer_test.cpp               | 36 ++++++++++++++++
 8 files changed, 151 insertions(+), 28 deletions(-)

diff --git a/be/src/runtime_filter/runtime_filter.h 
b/be/src/runtime_filter/runtime_filter.h
index dd63bf9379d..8f3b398e10e 100644
--- a/be/src/runtime_filter/runtime_filter.h
+++ b/be/src/runtime_filter/runtime_filter.h
@@ -38,7 +38,6 @@ class RuntimeFilter {
 public:
     virtual ~RuntimeFilter() = default;
 
-    RuntimeFilterWrapper* impl() const { return _wrapper.get(); }
     RuntimeFilterType type() const { return _runtime_filter_type; }
 
     bool has_local_target() const { return _has_local_target; }
diff --git a/be/src/runtime_filter/runtime_filter_merger.h 
b/be/src/runtime_filter/runtime_filter_merger.h
index 7c4f3ac639c..e85d46037fd 100644
--- a/be/src/runtime_filter/runtime_filter_merger.h
+++ b/be/src/runtime_filter/runtime_filter_merger.h
@@ -110,17 +110,6 @@ private:
         _profile->add_info_string("Info", debug_string());
     }
 
-    static std::string _to_string(const State& state) {
-        switch (state) {
-        case State::READY:
-            return "READY";
-        case State::WAITING_FOR_PRODUCT:
-            return "WAITING_FOR_PRODUCT";
-        default:
-            throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid State {}", 
int(state));
-        }
-    }
-
     std::atomic<State> _rf_state;
 
     int _expected_producer_num = 0;
diff --git a/be/src/runtime_filter/runtime_filter_producer.h 
b/be/src/runtime_filter/runtime_filter_producer.h
index 17107a61d67..94969605536 100644
--- a/be/src/runtime_filter/runtime_filter_producer.h
+++ b/be/src/runtime_filter/runtime_filter_producer.h
@@ -60,8 +60,8 @@ public:
 
     // insert data to build filter
     Status insert(vectorized::ColumnPtr column, size_t start) {
-        if (_rf_state == State::READY_TO_PUBLISH || _rf_state == 
State::PUBLISHED) {
-            DCHECK(!_wrapper->is_valid());
+        if (!_wrapper->is_valid() || _rf_state == State::READY_TO_PUBLISH ||
+            _rf_state == State::PUBLISHED) {
             return Status::OK();
         }
         _check_state({State::WAITING_FOR_DATA});
diff --git a/be/src/runtime_filter/runtime_filter_producer_helper.cpp 
b/be/src/runtime_filter/runtime_filter_producer_helper.cpp
index 22c8768b285..7f0b20189db 100644
--- a/be/src/runtime_filter/runtime_filter_producer_helper.cpp
+++ b/be/src/runtime_filter/runtime_filter_producer_helper.cpp
@@ -68,10 +68,6 @@ Status RuntimeFilterProducerHelper::_insert(const 
vectorized::Block* block, size
     SCOPED_TIMER(_runtime_filter_compute_timer);
     for (int i = 0; i < _producers.size(); i++) {
         auto filter = _producers[i];
-        if (!filter->impl()->is_valid()) {
-            // Skip building if ignored or disabled.
-            continue;
-        }
         int result_column_id = 
_filter_expr_contexts[i]->get_last_result_column_id();
         DCHECK_NE(result_column_id, -1);
         const auto& column = block->get_by_position(result_column_id).column;
@@ -111,6 +107,7 @@ Status RuntimeFilterProducerHelper::process(
 
     for (const auto& filter : _producers) {
         if (shared_hash_table_ctx && !wake_up_early) {
+            DCHECK(_is_broadcast_join);
             if (_should_build_hash_table) {
                 filter->copy_to_shared_context(shared_hash_table_ctx);
             } else {
@@ -134,6 +131,7 @@ Status 
RuntimeFilterProducerHelper::skip_process(RuntimeState* state) {
 
     RETURN_IF_ERROR(_publish(state));
     _skip_runtime_filters_process = true;
+    _profile->add_info_string("SkipProcess", "True");
     return Status::OK();
 }
 
diff --git a/be/test/runtime_filter/runtime_filter_consumer_test.cpp 
b/be/test/runtime_filter/runtime_filter_consumer_test.cpp
index 3e9a456c2a3..04ff0f139f8 100644
--- a/be/test/runtime_filter/runtime_filter_consumer_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_consumer_test.cpp
@@ -136,4 +136,53 @@ TEST_F(RuntimeFilterConsumerTest, timeout_aquire) {
     ASSERT_TRUE(consumer->is_applied());
 }
 
+TEST_F(RuntimeFilterConsumerTest, wait_infinity) {
+    std::shared_ptr<RuntimeFilterConsumer> consumer;
+    auto desc = 
TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build();
+    const_cast<TQueryOptions&>(_query_ctx->_query_options)
+            .__set_runtime_filter_wait_infinitely(true);
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create(
+            RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, 
&consumer, &_profile));
+
+    std::shared_ptr<RuntimeFilterConsumer> registed_consumer;
+    
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter(
+            desc, true, 0, &registed_consumer, &_profile));
+}
+
+TEST_F(RuntimeFilterConsumerTest, aquire_disabled) {
+    std::shared_ptr<RuntimeFilterConsumer> consumer;
+    auto desc = 
TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build();
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create(
+            RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, 
&consumer, &_profile));
+
+    std::shared_ptr<RuntimeFilterProducer> producer;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create(
+            RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 
&producer, &_profile));
+    
producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::DISABLED);
+
+    std::vector<vectorized::VRuntimeFilterPtr> push_exprs;
+    consumer->signal(producer.get());
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(consumer->acquire_expr(push_exprs));
+    ASSERT_EQ(push_exprs.size(), 0);
+    ASSERT_TRUE(consumer->is_applied());
+}
+
+TEST_F(RuntimeFilterConsumerTest, aquire_ignored) {
+    std::shared_ptr<RuntimeFilterConsumer> consumer;
+    auto desc = 
TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build();
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create(
+            RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, 
&consumer, &_profile));
+
+    std::shared_ptr<RuntimeFilterProducer> producer;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create(
+            RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 
&producer, &_profile));
+    
producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::IGNORED);
+
+    std::vector<vectorized::VRuntimeFilterPtr> push_exprs;
+    consumer->signal(producer.get());
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(consumer->acquire_expr(push_exprs));
+    ASSERT_EQ(push_exprs.size(), 0);
+    ASSERT_TRUE(consumer->is_applied());
+}
+
 } // namespace doris
diff --git a/be/test/runtime_filter/runtime_filter_merger_test.cpp 
b/be/test/runtime_filter/runtime_filter_merger_test.cpp
index e05e73fad89..aeaf1b4f806 100644
--- a/be/test/runtime_filter/runtime_filter_merger_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_merger_test.cpp
@@ -57,9 +57,10 @@ public:
     }
 
     void test_serialize(RuntimeFilterWrapper::State state,
-                        TRuntimeFilterType::type type = 
TRuntimeFilterType::IN_OR_BLOOM) {
+                        TRuntimeFilterDesc desc = TRuntimeFilterDescBuilder()
+                                                          
.set_type(TRuntimeFilterType::IN_OR_BLOOM)
+                                                          .build()) {
         std::shared_ptr<RuntimeFilterMerger> merger;
-        auto desc = TRuntimeFilterDescBuilder().set_type(type).build();
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterMerger::create(
                 RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 
&merger, &_profile));
         merger->set_expected_producer_num(1);
@@ -182,15 +183,30 @@ TEST_F(RuntimeFilterMergerTest, serialize_ignored) {
 }
 
 TEST_F(RuntimeFilterMergerTest, serialize_bloom) {
-    test_serialize(RuntimeFilterWrapper::State::READY, 
TRuntimeFilterType::type::BLOOM);
+    test_serialize(RuntimeFilterWrapper::State::READY,
+                   
TRuntimeFilterDescBuilder().set_type(TRuntimeFilterType::BLOOM).build());
 }
 
 TEST_F(RuntimeFilterMergerTest, serialize_min_max) {
-    test_serialize(RuntimeFilterWrapper::State::READY, 
TRuntimeFilterType::type::MIN_MAX);
+    test_serialize(RuntimeFilterWrapper::State::READY,
+                   
TRuntimeFilterDescBuilder().set_type(TRuntimeFilterType::MIN_MAX).build());
 }
 
 TEST_F(RuntimeFilterMergerTest, serialize_in) {
-    test_serialize(RuntimeFilterWrapper::State::READY, 
TRuntimeFilterType::type::IN);
+    test_serialize(RuntimeFilterWrapper::State::READY,
+                   
TRuntimeFilterDescBuilder().set_type(TRuntimeFilterType::IN).build());
+}
+
+TEST_F(RuntimeFilterMergerTest, serialize_min_only) {
+    auto desc = 
TRuntimeFilterDescBuilder().set_type(TRuntimeFilterType::MIN_MAX).build();
+    desc.__set_min_max_type(TMinMaxRuntimeFilterType::MIN);
+    test_serialize(RuntimeFilterWrapper::State::READY, desc);
+}
+
+TEST_F(RuntimeFilterMergerTest, serialize_max_only) {
+    auto desc = 
TRuntimeFilterDescBuilder().set_type(TRuntimeFilterType::MIN_MAX).build();
+    desc.__set_min_max_type(TMinMaxRuntimeFilterType::MAX);
+    test_serialize(RuntimeFilterWrapper::State::READY, desc);
 }
 
 } // namespace doris
diff --git a/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp 
b/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp
index 433a612d0c7..af48faffcf4 100644
--- a/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp
@@ -43,15 +43,17 @@ class RuntimeFilterProducerHelperTest : public 
RuntimeFilterTest {
                 TPlanNodeBuilder(0, TPlanNodeType::HASH_JOIN_NODE).build(), 
_tbl));
         FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_pipeline->set_sink(_sink));
 
-        _task.reset(new pipeline::PipelineTask(_pipeline, 0, 
_runtime_states[0].get(), nullptr,
-                                               &_profile, {}, 0));
-        _runtime_states[0]->set_task(_task.get());
+        for (int i = 0; i < INSTANCE_NUM; i++) {
+            _tasks.emplace_back(new pipeline::PipelineTask(_pipeline, 0, 
_runtime_states[i].get(),
+                                                           nullptr, &_profile, 
{}, 0));
+            _runtime_states[i]->set_task(_tasks.back().get());
+        }
     }
 
     pipeline::OperatorPtr _op;
     pipeline::DataSinkOperatorPtr _sink;
     pipeline::PipelinePtr _pipeline;
-    std::shared_ptr<pipeline::PipelineTask> _task;
+    std::vector<std::shared_ptr<pipeline::PipelineTask>> _tasks;
     ObjectPool _pool;
 };
 
@@ -100,7 +102,7 @@ TEST_F(RuntimeFilterProducerHelperTest, wake_up_eraly) {
     block.insert({std::move(column), 
std::make_shared<vectorized::DataTypeInt32>(), "col1"});
 
     vectorized::SharedHashTableContextPtr shared_hash_table_ctx;
-    _task->set_wake_up_early();
+    _tasks[0]->set_wake_up_early();
     FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
             helper.process(_runtime_states[0].get(), &block, 
shared_hash_table_ctx));
 }
@@ -121,6 +123,9 @@ TEST_F(RuntimeFilterProducerHelperTest, skip_process) {
 
     
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(helper.skip_process(_runtime_states[0].get()));
 
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            helper.send_filter_size(_runtime_states[0].get(), 123, nullptr));
+
     vectorized::Block block;
     auto column = vectorized::ColumnInt32::create();
     column->insert(1);
@@ -132,4 +137,35 @@ TEST_F(RuntimeFilterProducerHelperTest, skip_process) {
             helper.process(_runtime_states[0].get(), &block, 
shared_hash_table_ctx));
 }
 
+TEST_F(RuntimeFilterProducerHelperTest, broadcast) {
+    auto helper = RuntimeFilterProducerHelper(&_profile, true, true);
+
+    vectorized::VExprContextSPtr ctx;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(vectorized::VExpr::create_expr_tree(
+            TRuntimeFilterDescBuilder::get_default_expr(), ctx));
+    ctx->_last_result_column_id = 0;
+
+    vectorized::VExprContextSPtrs build_expr_ctxs = {ctx};
+    std::vector<TRuntimeFilterDesc> runtime_filter_descs = 
{TRuntimeFilterDescBuilder().build()};
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            helper.init(_runtime_states[0].get(), build_expr_ctxs, 
runtime_filter_descs));
+
+    vectorized::Block block;
+    auto column = vectorized::ColumnInt32::create();
+    column->insert(1);
+    column->insert(2);
+    block.insert({std::move(column), 
std::make_shared<vectorized::DataTypeInt32>(), "col1"});
+
+    vectorized::SharedHashTableContextPtr shared_hash_table_ctx =
+            std::make_shared<vectorized::SharedHashTableContext>();
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            helper.process(_runtime_states[0].get(), &block, 
shared_hash_table_ctx));
+
+    auto helper2 = RuntimeFilterProducerHelper(&_profile, false, true);
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            helper2.init(_runtime_states[1].get(), build_expr_ctxs, 
runtime_filter_descs));
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            helper2.process(_runtime_states[1].get(), &block, 
shared_hash_table_ctx));
+}
+
 } // namespace doris
diff --git a/be/test/runtime_filter/runtime_filter_producer_test.cpp 
b/be/test/runtime_filter/runtime_filter_producer_test.cpp
index 9822e7a0f41..8122004ed58 100644
--- a/be/test/runtime_filter/runtime_filter_producer_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_producer_test.cpp
@@ -168,4 +168,40 @@ TEST_F(RuntimeFilterProducerTest, set_ignore_or_disable) {
     ASSERT_EQ(consumer->_wrapper->_state, 
RuntimeFilterWrapper::State::DISABLED);
 }
 
+TEST_F(RuntimeFilterProducerTest, sync_filter_size_local_merge_with_ignored) {
+    auto desc = TRuntimeFilterDescBuilder()
+                        .set_build_bf_by_runtime_size(true)
+                        .set_is_broadcast_join(false)
+                        .add_planId_to_target_expr(0)
+                        .build();
+
+    std::shared_ptr<RuntimeFilterProducer> producer;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            _runtime_states[0]->register_producer_runtime_filter(desc, 
&producer, &_profile));
+    std::shared_ptr<RuntimeFilterProducer> producer2;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            _runtime_states[1]->register_producer_runtime_filter(desc, 
&producer2, &_profile));
+
+    std::shared_ptr<RuntimeFilterConsumer> consumer;
+    
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter(
+            desc, true, 0, &consumer, &_profile));
+
+    ASSERT_EQ(producer->_need_sync_filter_size, true);
+    ASSERT_EQ(producer->_rf_state, 
RuntimeFilterProducer::State::WAITING_FOR_SEND_SIZE);
+
+    auto dependency = std::make_shared<pipeline::CountedFinishDependency>(0, 
0, "");
+
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            producer->send_size(_runtime_states[0].get(), 123, dependency));
+    // global mode, need waitting synced size
+    ASSERT_EQ(producer->_rf_state, 
RuntimeFilterProducer::State::WAITING_FOR_SYNCED_SIZE);
+    ASSERT_FALSE(dependency->ready());
+    
producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::DISABLED);
+
+    
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(producer2->send_size(_runtime_states[1].get(), 
1, dependency));
+    ASSERT_EQ(producer2->_rf_state, 
RuntimeFilterProducer::State::WAITING_FOR_DATA);
+    ASSERT_EQ(producer->_rf_state, 
RuntimeFilterProducer::State::READY_TO_PUBLISH);
+    ASSERT_EQ(producer2->_synced_size, 124);
+    ASSERT_TRUE(dependency->ready());
+}
 } // namespace doris
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to