This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch refactor_rf in repository https://gitbox.apache.org/repos/asf/doris.git
commit 930566af3e039eea8d82a230b0dd7d85e9c71e1c Author: BiteTheDDDDt <[email protected]> AuthorDate: Tue Mar 4 13:13:59 2025 +0800 add more ut of roles --- be/src/runtime_filter/runtime_filter.h | 1 - be/src/runtime_filter/runtime_filter_merger.h | 11 ----- be/src/runtime_filter/runtime_filter_producer.h | 4 +- .../runtime_filter_producer_helper.cpp | 6 +-- .../runtime_filter_consumer_test.cpp | 49 ++++++++++++++++++++++ .../runtime_filter/runtime_filter_merger_test.cpp | 26 +++++++++--- .../runtime_filter_producer_helper_test.cpp | 46 +++++++++++++++++--- .../runtime_filter_producer_test.cpp | 36 ++++++++++++++++ 8 files changed, 151 insertions(+), 28 deletions(-) diff --git a/be/src/runtime_filter/runtime_filter.h b/be/src/runtime_filter/runtime_filter.h index dd63bf9379d..8f3b398e10e 100644 --- a/be/src/runtime_filter/runtime_filter.h +++ b/be/src/runtime_filter/runtime_filter.h @@ -38,7 +38,6 @@ class RuntimeFilter { public: virtual ~RuntimeFilter() = default; - RuntimeFilterWrapper* impl() const { return _wrapper.get(); } RuntimeFilterType type() const { return _runtime_filter_type; } bool has_local_target() const { return _has_local_target; } diff --git a/be/src/runtime_filter/runtime_filter_merger.h b/be/src/runtime_filter/runtime_filter_merger.h index 7c4f3ac639c..e85d46037fd 100644 --- a/be/src/runtime_filter/runtime_filter_merger.h +++ b/be/src/runtime_filter/runtime_filter_merger.h @@ -110,17 +110,6 @@ private: _profile->add_info_string("Info", debug_string()); } - static std::string _to_string(const State& state) { - switch (state) { - case State::READY: - return "READY"; - case State::WAITING_FOR_PRODUCT: - return "WAITING_FOR_PRODUCT"; - default: - throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid State {}", int(state)); - } - } - std::atomic<State> _rf_state; int _expected_producer_num = 0; diff --git a/be/src/runtime_filter/runtime_filter_producer.h b/be/src/runtime_filter/runtime_filter_producer.h index 17107a61d67..94969605536 100644 --- 
a/be/src/runtime_filter/runtime_filter_producer.h +++ b/be/src/runtime_filter/runtime_filter_producer.h @@ -60,8 +60,8 @@ public: // insert data to build filter Status insert(vectorized::ColumnPtr column, size_t start) { - if (_rf_state == State::READY_TO_PUBLISH || _rf_state == State::PUBLISHED) { - DCHECK(!_wrapper->is_valid()); + if (!_wrapper->is_valid() || _rf_state == State::READY_TO_PUBLISH || + _rf_state == State::PUBLISHED) { return Status::OK(); } _check_state({State::WAITING_FOR_DATA}); diff --git a/be/src/runtime_filter/runtime_filter_producer_helper.cpp b/be/src/runtime_filter/runtime_filter_producer_helper.cpp index 22c8768b285..7f0b20189db 100644 --- a/be/src/runtime_filter/runtime_filter_producer_helper.cpp +++ b/be/src/runtime_filter/runtime_filter_producer_helper.cpp @@ -68,10 +68,6 @@ Status RuntimeFilterProducerHelper::_insert(const vectorized::Block* block, size SCOPED_TIMER(_runtime_filter_compute_timer); for (int i = 0; i < _producers.size(); i++) { auto filter = _producers[i]; - if (!filter->impl()->is_valid()) { - // Skip building if ignored or disabled. 
- continue; - } int result_column_id = _filter_expr_contexts[i]->get_last_result_column_id(); DCHECK_NE(result_column_id, -1); const auto& column = block->get_by_position(result_column_id).column; @@ -111,6 +107,7 @@ Status RuntimeFilterProducerHelper::process( for (const auto& filter : _producers) { if (shared_hash_table_ctx && !wake_up_early) { + DCHECK(_is_broadcast_join); if (_should_build_hash_table) { filter->copy_to_shared_context(shared_hash_table_ctx); } else { @@ -134,6 +131,7 @@ Status RuntimeFilterProducerHelper::skip_process(RuntimeState* state) { RETURN_IF_ERROR(_publish(state)); _skip_runtime_filters_process = true; + _profile->add_info_string("SkipProcess", "True"); return Status::OK(); } diff --git a/be/test/runtime_filter/runtime_filter_consumer_test.cpp b/be/test/runtime_filter/runtime_filter_consumer_test.cpp index 3e9a456c2a3..04ff0f139f8 100644 --- a/be/test/runtime_filter/runtime_filter_consumer_test.cpp +++ b/be/test/runtime_filter/runtime_filter_consumer_test.cpp @@ -136,4 +136,53 @@ TEST_F(RuntimeFilterConsumerTest, timeout_aquire) { ASSERT_TRUE(consumer->is_applied()); } +TEST_F(RuntimeFilterConsumerTest, wait_infinity) { + std::shared_ptr<RuntimeFilterConsumer> consumer; + auto desc = TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build(); + const_cast<TQueryOptions&>(_query_ctx->_query_options) + .__set_runtime_filter_wait_infinitely(true); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create( + RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, &consumer, &_profile)); + + std::shared_ptr<RuntimeFilterConsumer> registed_consumer; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter( + desc, true, 0, &registed_consumer, &_profile)); +} + +TEST_F(RuntimeFilterConsumerTest, aquire_disabled) { + std::shared_ptr<RuntimeFilterConsumer> consumer; + auto desc = TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build(); + 
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create( + RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, &consumer, &_profile)); + + std::shared_ptr<RuntimeFilterProducer> producer; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create( + RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, &producer, &_profile)); + producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::DISABLED); + + std::vector<vectorized::VRuntimeFilterPtr> push_exprs; + consumer->signal(producer.get()); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(consumer->acquire_expr(push_exprs)); + ASSERT_EQ(push_exprs.size(), 0); + ASSERT_TRUE(consumer->is_applied()); +} + +TEST_F(RuntimeFilterConsumerTest, aquire_ignored) { + std::shared_ptr<RuntimeFilterConsumer> consumer; + auto desc = TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build(); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create( + RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0, &consumer, &_profile)); + + std::shared_ptr<RuntimeFilterProducer> producer; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create( + RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, &producer, &_profile)); + producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::IGNORED); + + std::vector<vectorized::VRuntimeFilterPtr> push_exprs; + consumer->signal(producer.get()); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(consumer->acquire_expr(push_exprs)); + ASSERT_EQ(push_exprs.size(), 0); + ASSERT_TRUE(consumer->is_applied()); +} + } // namespace doris diff --git a/be/test/runtime_filter/runtime_filter_merger_test.cpp b/be/test/runtime_filter/runtime_filter_merger_test.cpp index e05e73fad89..aeaf1b4f806 100644 --- a/be/test/runtime_filter/runtime_filter_merger_test.cpp +++ b/be/test/runtime_filter/runtime_filter_merger_test.cpp @@ -57,9 +57,10 @@ public: } void test_serialize(RuntimeFilterWrapper::State state, - TRuntimeFilterType::type type = 
TRuntimeFilterType::IN_OR_BLOOM) { + TRuntimeFilterDesc desc = TRuntimeFilterDescBuilder() + .set_type(TRuntimeFilterType::IN_OR_BLOOM) + .build()) { std::shared_ptr<RuntimeFilterMerger> merger; - auto desc = TRuntimeFilterDescBuilder().set_type(type).build(); FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterMerger::create( RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, &merger, &_profile)); merger->set_expected_producer_num(1); @@ -182,15 +183,30 @@ TEST_F(RuntimeFilterMergerTest, serialize_ignored) { } TEST_F(RuntimeFilterMergerTest, serialize_bloom) { - test_serialize(RuntimeFilterWrapper::State::READY, TRuntimeFilterType::type::BLOOM); + test_serialize(RuntimeFilterWrapper::State::READY, + TRuntimeFilterDescBuilder().set_type(TRuntimeFilterType::BLOOM).build()); } TEST_F(RuntimeFilterMergerTest, serialize_min_max) { - test_serialize(RuntimeFilterWrapper::State::READY, TRuntimeFilterType::type::MIN_MAX); + test_serialize(RuntimeFilterWrapper::State::READY, + TRuntimeFilterDescBuilder().set_type(TRuntimeFilterType::MIN_MAX).build()); } TEST_F(RuntimeFilterMergerTest, serialize_in) { - test_serialize(RuntimeFilterWrapper::State::READY, TRuntimeFilterType::type::IN); + test_serialize(RuntimeFilterWrapper::State::READY, + TRuntimeFilterDescBuilder().set_type(TRuntimeFilterType::IN).build()); +} + +TEST_F(RuntimeFilterMergerTest, serialize_min_only) { + auto desc = TRuntimeFilterDescBuilder().set_type(TRuntimeFilterType::MIN_MAX).build(); + desc.__set_min_max_type(TMinMaxRuntimeFilterType::MIN); + test_serialize(RuntimeFilterWrapper::State::READY, desc); +} + +TEST_F(RuntimeFilterMergerTest, serialize_max_only) { + auto desc = TRuntimeFilterDescBuilder().set_type(TRuntimeFilterType::MIN_MAX).build(); + desc.__set_min_max_type(TMinMaxRuntimeFilterType::MAX); + test_serialize(RuntimeFilterWrapper::State::READY, desc); } } // namespace doris diff --git a/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp 
b/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp index 433a612d0c7..af48faffcf4 100644 --- a/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp +++ b/be/test/runtime_filter/runtime_filter_producer_helper_test.cpp @@ -43,15 +43,17 @@ class RuntimeFilterProducerHelperTest : public RuntimeFilterTest { TPlanNodeBuilder(0, TPlanNodeType::HASH_JOIN_NODE).build(), _tbl)); FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_pipeline->set_sink(_sink)); - _task.reset(new pipeline::PipelineTask(_pipeline, 0, _runtime_states[0].get(), nullptr, - &_profile, {}, 0)); - _runtime_states[0]->set_task(_task.get()); + for (int i = 0; i < INSTANCE_NUM; i++) { + _tasks.emplace_back(new pipeline::PipelineTask(_pipeline, 0, _runtime_states[i].get(), + nullptr, &_profile, {}, 0)); + _runtime_states[i]->set_task(_tasks.back().get()); + } } pipeline::OperatorPtr _op; pipeline::DataSinkOperatorPtr _sink; pipeline::PipelinePtr _pipeline; - std::shared_ptr<pipeline::PipelineTask> _task; + std::vector<std::shared_ptr<pipeline::PipelineTask>> _tasks; ObjectPool _pool; }; @@ -100,7 +102,7 @@ TEST_F(RuntimeFilterProducerHelperTest, wake_up_eraly) { block.insert({std::move(column), std::make_shared<vectorized::DataTypeInt32>(), "col1"}); vectorized::SharedHashTableContextPtr shared_hash_table_ctx; - _task->set_wake_up_early(); + _tasks[0]->set_wake_up_early(); FAIL_IF_ERROR_OR_CATCH_EXCEPTION( helper.process(_runtime_states[0].get(), &block, shared_hash_table_ctx)); } @@ -121,6 +123,9 @@ TEST_F(RuntimeFilterProducerHelperTest, skip_process) { FAIL_IF_ERROR_OR_CATCH_EXCEPTION(helper.skip_process(_runtime_states[0].get())); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION( + helper.send_filter_size(_runtime_states[0].get(), 123, nullptr)); + vectorized::Block block; auto column = vectorized::ColumnInt32::create(); column->insert(1); @@ -132,4 +137,35 @@ TEST_F(RuntimeFilterProducerHelperTest, skip_process) { helper.process(_runtime_states[0].get(), &block, shared_hash_table_ctx)); } 
+TEST_F(RuntimeFilterProducerHelperTest, broadcast) { + auto helper = RuntimeFilterProducerHelper(&_profile, true, true); + + vectorized::VExprContextSPtr ctx; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(vectorized::VExpr::create_expr_tree( + TRuntimeFilterDescBuilder::get_default_expr(), ctx)); + ctx->_last_result_column_id = 0; + + vectorized::VExprContextSPtrs build_expr_ctxs = {ctx}; + std::vector<TRuntimeFilterDesc> runtime_filter_descs = {TRuntimeFilterDescBuilder().build()}; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION( + helper.init(_runtime_states[0].get(), build_expr_ctxs, runtime_filter_descs)); + + vectorized::Block block; + auto column = vectorized::ColumnInt32::create(); + column->insert(1); + column->insert(2); + block.insert({std::move(column), std::make_shared<vectorized::DataTypeInt32>(), "col1"}); + + vectorized::SharedHashTableContextPtr shared_hash_table_ctx = + std::make_shared<vectorized::SharedHashTableContext>(); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION( + helper.process(_runtime_states[0].get(), &block, shared_hash_table_ctx)); + + auto helper2 = RuntimeFilterProducerHelper(&_profile, false, true); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION( + helper2.init(_runtime_states[1].get(), build_expr_ctxs, runtime_filter_descs)); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION( + helper2.process(_runtime_states[1].get(), &block, shared_hash_table_ctx)); +} + } // namespace doris diff --git a/be/test/runtime_filter/runtime_filter_producer_test.cpp b/be/test/runtime_filter/runtime_filter_producer_test.cpp index 9822e7a0f41..8122004ed58 100644 --- a/be/test/runtime_filter/runtime_filter_producer_test.cpp +++ b/be/test/runtime_filter/runtime_filter_producer_test.cpp @@ -168,4 +168,40 @@ TEST_F(RuntimeFilterProducerTest, set_ignore_or_disable) { ASSERT_EQ(consumer->_wrapper->_state, RuntimeFilterWrapper::State::DISABLED); } +TEST_F(RuntimeFilterProducerTest, sync_filter_size_local_merge_with_ignored) { + auto desc = TRuntimeFilterDescBuilder() + .set_build_bf_by_runtime_size(true) + 
.set_is_broadcast_join(false) + .add_planId_to_target_expr(0) + .build(); + + std::shared_ptr<RuntimeFilterProducer> producer; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION( + _runtime_states[0]->register_producer_runtime_filter(desc, &producer, &_profile)); + std::shared_ptr<RuntimeFilterProducer> producer2; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION( + _runtime_states[1]->register_producer_runtime_filter(desc, &producer2, &_profile)); + + std::shared_ptr<RuntimeFilterConsumer> consumer; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter( + desc, true, 0, &consumer, &_profile)); + + ASSERT_EQ(producer->_need_sync_filter_size, true); + ASSERT_EQ(producer->_rf_state, RuntimeFilterProducer::State::WAITING_FOR_SEND_SIZE); + + auto dependency = std::make_shared<pipeline::CountedFinishDependency>(0, 0, ""); + + FAIL_IF_ERROR_OR_CATCH_EXCEPTION( + producer->send_size(_runtime_states[0].get(), 123, dependency)); + // global mode, need waitting synced size + ASSERT_EQ(producer->_rf_state, RuntimeFilterProducer::State::WAITING_FOR_SYNCED_SIZE); + ASSERT_FALSE(dependency->ready()); + producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::DISABLED); + + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(producer2->send_size(_runtime_states[1].get(), 1, dependency)); + ASSERT_EQ(producer2->_rf_state, RuntimeFilterProducer::State::WAITING_FOR_DATA); + ASSERT_EQ(producer->_rf_state, RuntimeFilterProducer::State::READY_TO_PUBLISH); + ASSERT_EQ(producer2->_synced_size, 124); + ASSERT_TRUE(dependency->ready()); +} } // namespace doris \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
