This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c8a793ad6ae [Exec](RF) Support merge remote rf local first (#31067)
c8a793ad6ae is described below
commit c8a793ad6ae68c6ba0584f3ed216f9f6716f7ee7
Author: HappenLee <[email protected]>
AuthorDate: Wed Feb 21 22:38:33 2024 +0800
[Exec](RF) Support merge remote rf local first (#31067)
---
be/src/agent/be_exec_version_manager.h | 1 +
be/src/exprs/runtime_filter.cpp | 77 +++++++++---------
be/src/exprs/runtime_filter.h | 24 ++----
be/src/exprs/runtime_filter_slots.h | 11 ++-
be/src/pipeline/exec/datagen_operator.cpp | 9 +--
be/src/pipeline/exec/hashjoin_build_sink.cpp | 23 +++---
be/src/pipeline/exec/hashjoin_build_sink.h | 2 +-
.../exec/nested_loop_join_build_operator.cpp | 4 +-
be/src/pipeline/pipeline_fragment_context.cpp | 5 +-
be/src/pipeline/pipeline_fragment_context.h | 2 +-
.../pipeline_x/pipeline_x_fragment_context.cpp | 14 ++--
.../pipeline_x/pipeline_x_fragment_context.h | 2 +-
be/src/runtime/fragment_mgr.cpp | 5 +-
be/src/runtime/query_context.h | 3 +
be/src/runtime/runtime_filter_mgr.cpp | 91 ++++++++++++++++++----
be/src/runtime/runtime_filter_mgr.h | 32 ++++----
be/src/runtime/runtime_state.cpp | 36 +++++++--
be/src/runtime/runtime_state.h | 22 ++++--
be/src/vec/exec/join/vhash_join_node.cpp | 6 +-
be/src/vec/exec/join/vnested_loop_join_node.cpp | 4 +-
be/src/vec/exec/runtime_filter_consumer.cpp | 23 ++----
be/src/vec/exec/runtime_filter_consumer.h | 4 +-
be/src/vec/exec/vdata_gen_scan_node.cpp | 9 +--
.../main/java/org/apache/doris/qe/Coordinator.java | 3 +-
24 files changed, 243 insertions(+), 169 deletions(-)
diff --git a/be/src/agent/be_exec_version_manager.h
b/be/src/agent/be_exec_version_manager.h
index f5213c54089..afe738684aa 100644
--- a/be/src/agent/be_exec_version_manager.h
+++ b/be/src/agent/be_exec_version_manager.h
@@ -65,6 +65,7 @@ private:
* d. unix_timestamp function support timestamp with float for datetimev2,
and change nullable mode.
* e. change shuffle serialize/deserialize way
* f. shrink some function's nullable mode.
+ * g. do local merge of remote runtime filter
*/
constexpr inline int BeExecVersionManager::max_be_exec_version = 3;
constexpr inline int BeExecVersionManager::min_be_exec_version = 0;
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 2ac9ad40c5a..1ec66bf2a87 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -938,11 +938,11 @@ private:
Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, ObjectPool*
pool,
const TRuntimeFilterDesc* desc, const
TQueryOptions* query_options,
const RuntimeFilterRole role, int node_id,
IRuntimeFilter** res,
- bool build_bf_exactly, bool is_global, int
parallel_tasks) {
- *res = pool->add(new IRuntimeFilter(state, pool, desc, is_global,
parallel_tasks));
+ bool build_bf_exactly, bool need_local_merge) {
+ *res = pool->add(new IRuntimeFilter(state, pool, desc, need_local_merge));
(*res)->set_role(role);
return (*res)->init_with_desc(desc, query_options, node_id,
- is_global ? false : build_bf_exactly);
+ need_local_merge ? false : build_bf_exactly);
}
vectorized::SharedRuntimeFilterContext&
IRuntimeFilter::get_shared_context_ref() {
@@ -954,47 +954,53 @@ void IRuntimeFilter::insert_batch(const
vectorized::ColumnPtr column, size_t sta
_wrapper->insert_batch(column, start);
}
-Status IRuntimeFilter::merge_local_filter(RuntimePredicateWrapper* wrapper,
int* merged_num) {
- SCOPED_TIMER(_merge_local_rf_timer);
- std::unique_lock lock(_local_merge_mutex);
- if (_merged_rf_num == 0) {
- _wrapper = wrapper;
- } else {
- RETURN_IF_ERROR(merge_from(wrapper));
- }
- *merged_num = ++_merged_rf_num;
- return Status::OK();
-}
-
Status IRuntimeFilter::publish(bool publish_local) {
DCHECK(is_producer());
- if (_is_global && _has_local_target) {
- std::vector<IRuntimeFilter*> filters;
-
RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->get_consume_filters(
- _filter_id, filters));
- // push down
- for (auto filter : filters) {
- int merged_num = 0;
- RETURN_IF_ERROR(filter->merge_local_filter(_wrapper, &merged_num));
- if (merged_num == _parallel_build_tasks) {
- filter->update_runtime_filter_type_to_profile();
- filter->signal();
- }
- }
- } else if (_has_local_target) {
+ auto send_to_remote = [&](IRuntimeFilter* filter) {
+ TNetworkAddress addr;
+ DCHECK(_state != nullptr);
+ RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr));
+ return filter->push_to_remote(&addr, _opt_remote_rf);
+ };
+ auto send_to_local = [&](RuntimePredicateWrapper* wrapper) {
std::vector<IRuntimeFilter*> filters;
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_consume_filters(_filter_id,
filters));
+ DCHECK(!filters.empty());
// push down
for (auto filter : filters) {
- filter->_wrapper = _wrapper;
+ filter->_wrapper = wrapper;
filter->update_runtime_filter_type_to_profile();
filter->signal();
}
+ return Status::OK();
+ };
+ auto do_local_merge = [&]() {
+ LocalMergeFilters* local_merge_filters = nullptr;
+
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_local_merge_producer_filters(
+ _filter_id, &local_merge_filters));
+ std::lock_guard l(*local_merge_filters->lock);
+ RETURN_IF_ERROR(local_merge_filters->filters[0]->merge_from(_wrapper));
+ local_merge_filters->merge_time--;
+ if (local_merge_filters->merge_time == 0) {
+ if (_has_local_target) {
+
RETURN_IF_ERROR(send_to_local(local_merge_filters->filters[0]->_wrapper));
+ } else {
+
RETURN_IF_ERROR(send_to_remote(local_merge_filters->filters[0]));
+ }
+ }
+ return Status::OK();
+ };
+
+ if (_need_local_merge && _has_local_target) {
+ RETURN_IF_ERROR(do_local_merge());
+ } else if (_has_local_target) {
+ RETURN_IF_ERROR(send_to_local(_wrapper));
} else if (!publish_local) {
- TNetworkAddress addr;
- DCHECK(_state != nullptr);
- RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr));
- return push_to_remote(&addr, _opt_remote_rf);
+ if (_is_broadcast_join || _state->be_exec_version < 3) {
+ RETURN_IF_ERROR(send_to_remote(this));
+ } else {
+ RETURN_IF_ERROR(do_local_merge());
+ }
} else {
// remote broadcast join only push onetime in build shared hash table
// publish_local only set true on copy shared hash table
@@ -1366,9 +1372,6 @@ void IRuntimeFilter::init_profile(RuntimeProfile*
parent_profile) {
_profile_init = true;
parent_profile->add_child(_profile.get(), true, nullptr);
_profile->add_info_string("Info", _format_status());
- if (_is_global) {
- _merge_local_rf_timer = ADD_TIMER(_profile.get(),
"MergeLocalRuntimeFilterTime");
- }
if (_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
update_runtime_filter_type_to_profile();
}
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index ae8063e5610..d853493889c 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -185,7 +185,7 @@ enum RuntimeFilterState {
class IRuntimeFilter {
public:
IRuntimeFilter(RuntimeFilterParamsContext* state, ObjectPool* pool,
- const TRuntimeFilterDesc* desc, bool is_global = false, int
parallel_tasks = -1)
+ const TRuntimeFilterDesc* desc, bool need_local_merge =
false)
: _state(state),
_pool(pool),
_filter_id(desc->filter_id),
@@ -204,16 +204,14 @@ public:
_name(fmt::format("RuntimeFilter: (id = {}, type = {})",
_filter_id,
to_string(_runtime_filter_type))),
_profile(new RuntimeProfile(_name)),
- _is_global(is_global),
- _parallel_build_tasks(parallel_tasks) {}
+ _need_local_merge(need_local_merge) {}
~IRuntimeFilter() = default;
static Status create(RuntimeFilterParamsContext* state, ObjectPool* pool,
const TRuntimeFilterDesc* desc, const TQueryOptions*
query_options,
const RuntimeFilterRole role, int node_id,
IRuntimeFilter** res,
- bool build_bf_exactly = false, bool is_global = false,
- int parallel_tasks = 0);
+ bool build_bf_exactly = false, bool need_local_merge
= false);
vectorized::SharedRuntimeFilterContext& get_shared_context_ref();
@@ -349,8 +347,6 @@ public:
void set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer>);
- Status merge_local_filter(RuntimePredicateWrapper* wrapper, int*
merged_num);
-
protected:
// serialize _wrapper to protobuf
void to_protobuf(PInFilter* filter);
@@ -426,18 +422,10 @@ protected:
// parent profile
// only effect on consumer
std::unique_ptr<RuntimeProfile> _profile;
- RuntimeProfile::Counter* _merge_local_rf_timer = nullptr;
bool _opt_remote_rf;
- // `_is_global` indicates whether this runtime filter is global on this BE.
- // All runtime filters should be merged on each BE if it is global.
- // This is improvement for pipelineX.
- const bool _is_global = false;
- std::mutex _local_merge_mutex;
- // There are `_parallel_build_tasks` pipeline tasks to build runtime
filter.
- // We should call `signal` once all runtime filters are done and merged to
one
- // (e.g. `_merged_rf_num` is equal to `_parallel_build_tasks`).
- int _merged_rf_num = 0;
- const int _parallel_build_tasks = -1;
+ // `_need_local_merge` indicates whether this runtime filter is global on
this BE.
+ // All runtime filters should be merged on each BE before push_to_remote
or publish.
+ const bool _need_local_merge = false;
std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> _filter_timer;
};
diff --git a/be/src/exprs/runtime_filter_slots.h
b/be/src/exprs/runtime_filter_slots.h
index 7a738b8c06d..7f34bf7f2c9 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -34,25 +34,24 @@ class VRuntimeFilterSlots {
public:
VRuntimeFilterSlots(
const std::vector<std::shared_ptr<vectorized::VExprContext>>&
build_expr_ctxs,
- const std::vector<IRuntimeFilter*>& runtime_filters, bool
is_global = false)
+ const std::vector<IRuntimeFilter*>& runtime_filters, bool
need_local_merge = false)
: _build_expr_context(build_expr_ctxs),
_runtime_filters(runtime_filters),
- _is_global(is_global) {}
+ _need_local_merge(need_local_merge) {}
Status init(RuntimeState* state, int64_t hash_table_size) {
// runtime filter effect strategy
// 1. we will ignore IN filter when hash_table_size is too big
// 2. we will ignore BLOOM filter and MinMax filter when
hash_table_size
// is too small and IN filter has effect
-
std::map<int, bool> has_in_filter;
auto ignore_local_filter = [&](int filter_id) {
// Now pipeline x have bug in ignore, after fix the problem enable
ignore logic in pipeline x
- if (_is_global) {
+ if (_need_local_merge) {
return Status::OK();
}
- auto runtime_filter_mgr = state->runtime_filter_mgr();
+ auto runtime_filter_mgr = state->local_runtime_filter_mgr();
std::vector<IRuntimeFilter*> filters;
RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(filter_id,
filters));
@@ -217,7 +216,7 @@ public:
private:
const std::vector<std::shared_ptr<vectorized::VExprContext>>&
_build_expr_context;
std::vector<IRuntimeFilter*> _runtime_filters;
- const bool _is_global = false;
+ const bool _need_local_merge = false;
// prob_contition index -> [IRuntimeFilter]
std::map<int, std::list<IRuntimeFilter*>> _runtime_filters_map;
};
diff --git a/be/src/pipeline/exec/datagen_operator.cpp
b/be/src/pipeline/exec/datagen_operator.cpp
index 916ce62aa26..418068ef9d8 100644
--- a/be/src/pipeline/exec/datagen_operator.cpp
+++ b/be/src/pipeline/exec/datagen_operator.cpp
@@ -100,13 +100,8 @@ Status DataGenLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
// TODO: use runtime filter to filte result block, maybe this node need
derive from vscan_node.
for (const auto& filter_desc : p._runtime_filter_descs) {
IRuntimeFilter* runtime_filter = nullptr;
- if (filter_desc.__isset.opt_remote_rf && filter_desc.opt_remote_rf) {
-
RETURN_IF_ERROR(state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter(
- filter_desc, state->query_options(), p.node_id(),
&runtime_filter, false));
- } else {
-
RETURN_IF_ERROR(state->runtime_filter_mgr()->register_consumer_filter(
- filter_desc, state->query_options(), p.node_id(),
&runtime_filter, false));
- }
+ RETURN_IF_ERROR(state->register_consumer_runtime_filter(filter_desc,
false, p.node_id(),
+
&runtime_filter));
runtime_filter->init_profile(_runtime_profile.get());
}
return Status::OK();
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 19de8fb7bdc..9c6eff4cda8 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -99,9 +99,9 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo
_hash_table_init(state);
_runtime_filters.resize(p._runtime_filter_descs.size());
for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) {
- RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter(
- p._runtime_filter_descs[i], state->query_options(),
&_runtime_filters[i],
- _build_expr_ctxs.size() == 1, p._use_global_rf,
p._child_x->parallel_tasks()));
+ RETURN_IF_ERROR(state->register_producer_runtime_filter(
+ p._runtime_filter_descs[i], p._need_local_merge,
&_runtime_filters[i],
+ _build_expr_ctxs.size() == 1));
}
return Status::OK();
@@ -370,7 +370,7 @@ void
HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) {
HashJoinBuildSinkOperatorX::HashJoinBuildSinkOperatorX(ObjectPool* pool, int
operator_id,
const TPlanNode& tnode,
const DescriptorTbl&
descs,
- bool use_global_rf)
+ bool need_local_merge)
: JoinBuildSinkOperatorX(pool, operator_id, tnode, descs),
_join_distribution(tnode.hash_join_node.__isset.dist_type ?
tnode.hash_join_node.dist_type
:
TJoinDistributionType::NONE),
@@ -379,7 +379,7 @@
HashJoinBuildSinkOperatorX::HashJoinBuildSinkOperatorX(ObjectPool* pool, int ope
_partition_exprs(tnode.__isset.distribute_expr_lists &&
!_is_broadcast_join
? tnode.distribute_expr_lists[1]
: std::vector<TExpr> {}),
- _use_global_rf(use_global_rf) {}
+ _need_local_merge(need_local_merge) {}
Status HashJoinBuildSinkOperatorX::prepare(RuntimeState* state) {
if (_is_broadcast_join) {
@@ -475,10 +475,11 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
local_state._shared_state->build_block =
std::make_shared<vectorized::Block>(
local_state._build_side_mutable_block.to_block());
- const bool use_global_rf =
-
local_state._parent->cast<HashJoinBuildSinkOperatorX>()._use_global_rf;
+ const bool need_local_merge =
+
local_state._parent->cast<HashJoinBuildSinkOperatorX>()._need_local_merge;
RETURN_IF_ERROR(vectorized::process_runtime_filter_build(
- state, local_state._shared_state->build_block.get(),
&local_state, use_global_rf));
+ state, local_state._shared_state->build_block.get(),
&local_state,
+ need_local_merge));
RETURN_IF_ERROR(
local_state.process_build_block(state,
(*local_state._shared_state->build_block)));
if (_shared_hashtable_controller) {
@@ -528,8 +529,8 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
local_state._shared_state->build_block =
_shared_hash_table_context->block;
local_state._shared_state->build_indexes_null =
_shared_hash_table_context->build_indexes_null;
- const bool use_global_rf =
-
local_state._parent->cast<HashJoinBuildSinkOperatorX>()._use_global_rf;
+ const bool need_local_merge =
+
local_state._parent->cast<HashJoinBuildSinkOperatorX>()._need_local_merge;
if (!_shared_hash_table_context->runtime_filters.empty()) {
auto ret = std::visit(
@@ -545,7 +546,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
local_state._runtime_filter_slots =
std::make_shared<VRuntimeFilterSlots>(
_build_expr_ctxs,
local_state._runtime_filters,
- use_global_rf);
+ need_local_merge);
RETURN_IF_ERROR(local_state._runtime_filter_slots->init(
state, arg.hash_table->size()));
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h
b/be/src/pipeline/exec/hashjoin_build_sink.h
index 56a651e4210..134aba69a48 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -190,7 +190,7 @@ private:
vectorized::SharedHashTableContextPtr _shared_hash_table_context = nullptr;
const std::vector<TExpr> _partition_exprs;
- const bool _use_global_rf;
+ const bool _need_local_merge;
};
} // namespace pipeline
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
index c30bb5ad67c..8485e2c0b24 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
@@ -42,8 +42,8 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState*
state, LocalSinkSta
}
_runtime_filters.resize(p._runtime_filter_descs.size());
for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) {
- RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter(
- p._runtime_filter_descs[i], state->query_options(),
&_runtime_filters[i]));
+
RETURN_IF_ERROR(state->register_producer_runtime_filter(p._runtime_filter_descs[i],
false,
+
&_runtime_filters[i], false));
}
return Status::OK();
}
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index e251b203794..358086e94eb 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -230,8 +230,9 @@ Status PipelineFragmentContext::prepare(const
doris::TPipelineFragmentParams& re
_runtime_state = RuntimeState::create_unique(
local_params.fragment_instance_id, request.query_id,
request.fragment_id,
request.query_options, _query_ctx->query_globals, _exec_env,
_query_ctx.get());
- if (local_params.__isset.runtime_filter_params) {
-
_runtime_state->set_runtime_filter_params(local_params.runtime_filter_params);
+ if (idx == 0 && local_params.__isset.runtime_filter_params) {
+ _query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
+ local_params.runtime_filter_params);
}
_runtime_state->set_task_execution_context(shared_from_this());
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index 9ffcb40038c..38db8cbe8ff 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -78,7 +78,7 @@ public:
RuntimeState* get_runtime_state() { return _runtime_state.get(); }
virtual RuntimeFilterMgr* get_runtime_filter_mgr(UniqueId
/*fragment_instance_id*/) {
- return _runtime_state->runtime_filter_mgr();
+ return _runtime_state->local_runtime_filter_mgr();
}
QueryContext* get_query_ctx() { return _query_ctx.get(); }
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 9087f94db5e..693cb4d5c30 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -210,8 +210,9 @@ Status PipelineXFragmentContext::prepare(const
doris::TPipelineFragmentParams& r
_runtime_state->set_total_load_streams(request.total_load_streams);
_runtime_state->set_num_local_sink(request.num_local_sink);
- _use_global_rf = request.__isset.parallel_instances &&
(request.__isset.per_node_shared_scans &&
-
!request.per_node_shared_scans.empty());
+ _need_local_merge =
+ request.__isset.parallel_instances &&
+ (request.__isset.per_node_shared_scans &&
!request.per_node_shared_scans.empty());
// 2. Build pipelines with operators in this fragment.
auto root_pipeline = add_pipeline();
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_build_pipelines(
@@ -523,11 +524,12 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
filterparams->query_ctx = _query_ctx.get();
}
- // build runtime_filter_mgr for each instance
+ // build local_runtime_filter_mgr for each instance
runtime_filter_mgr =
std::make_unique<RuntimeFilterMgr>(request.query_id,
filterparams.get());
- if (local_params.__isset.runtime_filter_params) {
-
runtime_filter_mgr->set_runtime_filter_params(local_params.runtime_filter_params);
+ if (i == 0 && local_params.__isset.runtime_filter_params) {
+ _query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
+ local_params.runtime_filter_params);
}
filterparams->runtime_filter_mgr = runtime_filter_mgr.get();
@@ -986,7 +988,7 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
DataSinkOperatorXPtr sink;
sink.reset(new HashJoinBuildSinkOperatorX(pool,
next_sink_operator_id(), tnode, descs,
- _use_global_rf));
+ _need_local_merge));
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode,
_runtime_state.get()));
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 439b0072d72..54714dd4665 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -177,7 +177,7 @@ private:
// this is a [n * m] matrix. n is parallelism of pipeline engine and m is
the number of pipelines.
std::vector<std::vector<std::unique_ptr<PipelineXTask>>> _tasks;
- bool _use_global_rf = false;
+ bool _need_local_merge = false;
// It is used to manage the lifecycle of RuntimeFilterMergeController
std::vector<std::shared_ptr<RuntimeFilterMergeControllerEntity>>
_merge_controller_handlers;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index af7370a4c58..64f5d6416a8 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1304,7 +1304,7 @@ Status FragmentMgr::apply_filter(const
PPublishFilterRequest* request,
pip_context = iter->second;
DCHECK(pip_context != nullptr);
- runtime_filter_mgr =
pip_context->get_runtime_filter_mgr(fragment_instance_id);
+ runtime_filter_mgr =
pip_context->get_query_ctx()->runtime_filter_mgr();
} else {
std::unique_lock<std::mutex> lock(_lock);
auto iter = _fragment_instance_map.find(tfragment_instance_id);
@@ -1315,7 +1315,8 @@ Status FragmentMgr::apply_filter(const
PPublishFilterRequest* request,
fragment_executor = iter->second;
DCHECK(fragment_executor != nullptr);
- runtime_filter_mgr =
fragment_executor->runtime_state()->runtime_filter_mgr();
+ runtime_filter_mgr =
+
fragment_executor->runtime_state()->get_query_ctx()->runtime_filter_mgr();
}
return runtime_filter_mgr->update_filter(request, attach_data);
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index cf4321beab3..3db91ba2824 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -187,6 +187,9 @@ public:
return _query_options.__isset.fe_process_uuid ?
_query_options.fe_process_uuid : 0;
}
+ // Global runtime filter mgr. Runtime filters that have remote targets or
+ // need a local merge should be registered here. Before publish() or
push_to_remote(),
+ // the runtime filter should finish its local merge work.
RuntimeFilterMgr* runtime_filter_mgr() { return _runtime_filter_mgr.get();
}
TUniqueId query_id() const { return _query_id; }
diff --git a/be/src/runtime/runtime_filter_mgr.cpp
b/be/src/runtime/runtime_filter_mgr.cpp
index 799348ef1d3..95f65c5fc32 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -74,7 +74,7 @@ Status RuntimeFilterMgr::get_consume_filters(const int
filter_id,
Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc&
desc,
const TQueryOptions&
options, int node_id,
IRuntimeFilter**
consumer_filter,
- bool build_bf_exactly, bool
is_global) {
+ bool build_bf_exactly, bool
need_local_merge) {
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
int32_t key = desc.filter_id;
bool has_exist = false;
@@ -89,28 +89,72 @@ Status RuntimeFilterMgr::register_consumer_filter(const
TRuntimeFilterDesc& desc
}
}
- // TODO: union the remote opt and global two case as one case to one judge
- bool remote_opt_or_global = (desc.__isset.opt_remote_rf &&
desc.opt_remote_rf) || is_global;
-
if (!has_exist) {
IRuntimeFilter* filter;
- RETURN_IF_ERROR(IRuntimeFilter::create(
- _state, remote_opt_or_global ? _state->obj_pool() : &_pool,
&desc, &options,
- RuntimeFilterRole::CONSUMER, node_id, &filter,
build_bf_exactly, is_global));
+ RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options,
+ RuntimeFilterRole::CONSUMER,
node_id, &filter,
+ build_bf_exactly,
need_local_merge));
_consumer_map[key].emplace_back(node_id, filter);
*consumer_filter = filter;
- } else if (!remote_opt_or_global) {
+ } else if (!need_local_merge) {
return Status::InvalidArgument("filter has registered");
}
return Status::OK();
}
+Status RuntimeFilterMgr::register_local_merge_producer_filter(
+ const doris::TRuntimeFilterDesc& desc, const doris::TQueryOptions&
options,
+ doris::IRuntimeFilter** producer_filter, bool build_bf_exactly) {
+ SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
+ int32_t key = desc.filter_id;
+
+ decltype(_local_merge_producer_map.end()) iter;
+ {
+ std::lock_guard<std::mutex> l(_lock);
+ iter = _local_merge_producer_map.find(key);
+ if (iter == _local_merge_producer_map.end()) {
+ auto [new_iter, _] = _local_merge_producer_map.emplace(key,
LocalMergeFilters {});
+ iter = new_iter;
+ }
+ }
+
+ DCHECK(_state != nullptr);
+ RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options,
+ RuntimeFilterRole::PRODUCER, -1,
producer_filter,
+ build_bf_exactly, true));
+ {
+ std::lock_guard<std::mutex> l(*iter->second.lock);
+ if (iter->second.filters.empty()) {
+ IRuntimeFilter* merge_filter = nullptr;
+ RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc,
&options,
+
RuntimeFilterRole::PRODUCER, -1, &merge_filter,
+ build_bf_exactly, true));
+ iter->second.filters.emplace_back(merge_filter);
+ }
+ iter->second.merge_time++;
+ iter->second.filters.emplace_back(*producer_filter);
+ }
+ return Status::OK();
+}
+
+Status RuntimeFilterMgr::get_local_merge_producer_filters(
+ int filter_id, doris::LocalMergeFilters** local_merge_filters) {
+ std::lock_guard<std::mutex> l(_lock);
+ auto iter = _local_merge_producer_map.find(filter_id);
+ if (iter == _local_merge_producer_map.end()) {
+ return Status::InvalidArgument("unknown filter: {}, role: CONSUMER.",
filter_id);
+ }
+ *local_merge_filters = &iter->second;
+ DCHECK(!iter->second.filters.empty());
+ DCHECK_GT(iter->second.merge_time, 0);
+ return Status::OK();
+}
+
Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc&
desc,
const TQueryOptions& options,
IRuntimeFilter**
producer_filter,
- bool build_bf_exactly, bool
is_global,
- int parallel_tasks) {
+ bool build_bf_exactly) {
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
int32_t key = desc.filter_id;
std::lock_guard<std::mutex> l(_lock);
@@ -122,7 +166,7 @@ Status RuntimeFilterMgr::register_producer_filter(const
TRuntimeFilterDesc& desc
}
RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options,
RuntimeFilterRole::PRODUCER, -1,
producer_filter,
- build_bf_exactly, is_global,
parallel_tasks));
+ build_bf_exactly));
_producer_map.emplace(key, *producer_filter);
return Status::OK();
}
@@ -133,7 +177,19 @@ Status RuntimeFilterMgr::update_filter(const
PPublishFilterRequest* request,
UpdateRuntimeFilterParams params(request, data, &_pool);
int filter_id = request->filter_id();
std::vector<IRuntimeFilter*> filters;
- RETURN_IF_ERROR(get_consume_filters(filter_id, filters));
+ // This code is organized for upgrade compatibility, to prevent infinite
waiting
+ // with the old way of updating filters. It should be deleted after the upgrade is
complete.
+ {
+ std::lock_guard<std::mutex> l(_lock);
+ auto iter = _consumer_map.find(filter_id);
+ if (iter == _consumer_map.end()) {
+ return Status::InvalidArgument("unknown filter: {}, role:
CONSUMER.", filter_id);
+ }
+ for (auto& holder : iter->second) {
+ filters.emplace_back(holder.filter);
+ }
+ iter->second.clear();
+ }
for (auto filter : filters) {
RETURN_IF_ERROR(filter->update_filter(&params));
}
@@ -143,8 +199,11 @@ Status RuntimeFilterMgr::update_filter(const
PPublishFilterRequest* request,
void RuntimeFilterMgr::set_runtime_filter_params(
const TRuntimeFilterParams& runtime_filter_params) {
- this->_merge_addr = runtime_filter_params.runtime_filter_merge_addr;
- this->_has_merge_addr = true;
+ std::lock_guard l(_lock);
+ if (!_has_merge_addr) {
+ _merge_addr = runtime_filter_params.runtime_filter_merge_addr;
+ _has_merge_addr = true;
+ }
}
Status RuntimeFilterMgr::get_merge_addr(TNetworkAddress* addr) {
@@ -454,7 +513,7 @@ RuntimeFilterParamsContext*
RuntimeFilterParamsContext::create(RuntimeState* sta
params->runtime_filter_wait_time_ms = state->runtime_filter_wait_time_ms();
params->enable_pipeline_exec = state->enable_pipeline_exec();
params->execution_timeout = state->execution_timeout();
- params->runtime_filter_mgr = state->runtime_filter_mgr();
+ params->runtime_filter_mgr = state->local_runtime_filter_mgr();
params->exec_env = state->exec_env();
params->query_id.set_hi(state->query_id().hi);
params->query_id.set_lo(state->query_id().lo);
@@ -479,8 +538,6 @@ RuntimeFilterParamsContext*
RuntimeFilterParamsContext::create(QueryContext* que
params->be_exec_version = query_ctx->be_exec_version();
params->query_ctx = query_ctx;
- params->_obj_pool = &query_ctx->obj_pool;
- params->_is_global = true;
return params;
}
diff --git a/be/src/runtime/runtime_filter_mgr.h
b/be/src/runtime/runtime_filter_mgr.h
index 7caea8011d2..c9b455bc107 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -53,6 +53,12 @@ class QueryContext;
struct RuntimeFilterParamsContext;
class ExecEnv;
+struct LocalMergeFilters {
+ std::unique_ptr<std::mutex> lock = std::make_unique<std::mutex>();
+ int merge_time = 0;
+ std::vector<IRuntimeFilter*> filters;
+};
+
/// producer:
/// Filter filter;
/// get_filter(filter_id, &filter);
@@ -76,10 +82,18 @@ public:
// register filter
Status register_consumer_filter(const TRuntimeFilterDesc& desc, const
TQueryOptions& options,
int node_id, IRuntimeFilter**
consumer_filter,
- bool build_bf_exactly = false, bool
is_global = false);
+ bool build_bf_exactly = false, bool
need_local_merge = false);
+
+ Status register_local_merge_producer_filter(const TRuntimeFilterDesc& desc,
+ const TQueryOptions& options,
+ IRuntimeFilter**
producer_filter,
+ bool build_bf_exactly = false);
+
+ Status get_local_merge_producer_filters(int filter_id, LocalMergeFilters**
local_merge_filters);
+
Status register_producer_filter(const TRuntimeFilterDesc& desc, const
TQueryOptions& options,
- IRuntimeFilter** producer_filter, bool
build_bf_exactly = false,
- bool is_global = false, int parallel_tasks
= 0);
+ IRuntimeFilter** producer_filter,
+ bool build_bf_exactly = false);
// update filter by remote
Status update_filter(const PPublishFilterRequest* request,
@@ -100,6 +114,7 @@ private:
/// TODO: should it need protected by a mutex?
std::map<int32_t, std::vector<ConsumerFilterHolder>> _consumer_map;
std::map<int32_t, IRuntimeFilter*> _producer_map;
+ std::map<int32_t, LocalMergeFilters> _local_merge_producer_map;
RuntimeFilterParamsContext* _state = nullptr;
std::unique_ptr<MemTracker> _tracker;
@@ -257,15 +272,6 @@ struct RuntimeFilterParamsContext {
int be_exec_version;
QueryContext* query_ctx;
QueryContext* get_query_ctx() const { return query_ctx; }
- ObjectPool* _obj_pool;
- bool _is_global = false;
- PUniqueId fragment_instance_id() const {
- DCHECK(!_is_global);
- return _fragment_instance_id;
- }
- ObjectPool* obj_pool() const {
- DCHECK(_is_global);
- return _obj_pool;
- }
+ PUniqueId fragment_instance_id() const { return _fragment_instance_id; }
};
} // namespace doris
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index d25d914147b..8762366fe04 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -101,7 +101,8 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams&
fragment_exec_params,
_runtime_filter_mgr.reset(new
RuntimeFilterMgr(fragment_exec_params.query_id,
RuntimeFilterParamsContext::create(this)));
if (fragment_exec_params.__isset.runtime_filter_params) {
-
_runtime_filter_mgr->set_runtime_filter_params(fragment_exec_params.runtime_filter_params);
+ _query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
+ fragment_exec_params.runtime_filter_params);
}
}
@@ -305,11 +306,6 @@ Status RuntimeState::init(const TUniqueId&
fragment_instance_id, const TQueryOpt
return Status::OK();
}
-void RuntimeState::set_runtime_filter_params(
- const TRuntimeFilterParams& runtime_filter_params) const {
- _runtime_filter_mgr->set_runtime_filter_params(runtime_filter_params);
-}
-
void RuntimeState::init_mem_trackers(const TUniqueId& id, const std::string&
name) {
_query_mem_tracker = std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::EXPERIMENTAL, fmt::format("{}#Id={}",
name, print_id(id)));
@@ -506,4 +502,32 @@ bool RuntimeState::enable_page_cache() const {
(_query_options.__isset.enable_page_cache &&
_query_options.enable_page_cache);
}
+RuntimeFilterMgr* RuntimeState::global_runtime_filter_mgr() {
+ return _query_ctx->runtime_filter_mgr();
+}
+
+Status RuntimeState::register_producer_runtime_filter(const
doris::TRuntimeFilterDesc& desc,
+ bool need_local_merge,
+ doris::IRuntimeFilter**
producer_filter,
+ bool build_bf_exactly) {
+ if (desc.has_remote_targets || need_local_merge) {
+ return
global_runtime_filter_mgr()->register_local_merge_producer_filter(
+ desc, query_options(), producer_filter, build_bf_exactly);
+ } else {
+ return local_runtime_filter_mgr()->register_producer_filter(
+ desc, query_options(), producer_filter, build_bf_exactly);
+ }
+}
+
+Status RuntimeState::register_consumer_runtime_filter(const
doris::TRuntimeFilterDesc& desc,
+ bool need_local_merge,
int node_id,
+ doris::IRuntimeFilter**
consumer_filter) {
+ if (desc.has_remote_targets || need_local_merge) {
+ return global_runtime_filter_mgr()->register_consumer_filter(desc,
query_options(), node_id,
+
consumer_filter, false, true);
+ } else {
+ return local_runtime_filter_mgr()->register_consumer_filter(desc,
query_options(), node_id,
+
consumer_filter, false, false);
+ }
+}
} // end namespace doris
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index cdc7e83f042..03b518e5c34 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -44,6 +44,7 @@
#include "util/runtime_profile.h"
namespace doris {
+class IRuntimeFilter;
namespace pipeline {
class PipelineXLocalStateBase;
@@ -98,8 +99,6 @@ public:
Status init(const TUniqueId& fragment_instance_id, const TQueryOptions&
query_options,
const TQueryGlobals& query_globals, ExecEnv* exec_env);
- void set_runtime_filter_params(const TRuntimeFilterParams&
runtime_filter_params) const;
-
// for ut and non-query.
void set_exec_env(ExecEnv* exec_env) { _exec_env = exec_env; }
void init_mem_trackers(const TUniqueId& id = TUniqueId(), const
std::string& name = "unknown");
@@ -454,7 +453,10 @@ public:
// if load mem limit is not set, or is zero, using query mem limit instead.
int64_t get_load_mem_limit();
- RuntimeFilterMgr* runtime_filter_mgr() {
+ // Local runtime filter mgr: runtime filters that have no remote target and
+ // do not need a local merge should be registered here. When the instance
+ // finishes executing, this mgr can release the memory of its local runtime filters.
+ RuntimeFilterMgr* local_runtime_filter_mgr() {
if (_pipeline_x_runtime_filter_mgr) {
return _pipeline_x_runtime_filter_mgr;
} else {
@@ -462,6 +464,8 @@ public:
}
}
+ RuntimeFilterMgr* global_runtime_filter_mgr();
+
void set_pipeline_x_runtime_filter_mgr(RuntimeFilterMgr*
pipeline_x_runtime_filter_mgr) {
_pipeline_x_runtime_filter_mgr = pipeline_x_runtime_filter_mgr;
}
@@ -567,6 +571,15 @@ public:
return _task_execution_context;
}
+ Status register_producer_runtime_filter(const doris::TRuntimeFilterDesc&
desc,
+ bool need_local_merge,
+ doris::IRuntimeFilter**
producer_filter,
+ bool build_bf_exactly);
+
+ Status register_consumer_runtime_filter(const doris::TRuntimeFilterDesc&
desc,
+ bool need_local_merge, int node_id,
+ doris::IRuntimeFilter**
producer_filter);
+
private:
Status create_error_log_file();
@@ -595,9 +608,6 @@ private:
// owned by PipelineXFragmentContext
RuntimeFilterMgr* _pipeline_x_runtime_filter_mgr = nullptr;
- // Protects _data_stream_recvrs_pool
- std::mutex _data_stream_recvrs_lock;
-
// Data stream receivers created by a plan fragment are gathered here to
make sure
// they are destroyed before _obj_pool (class members are destroyed in
reverse order).
// Receivers depend on the descriptor table and we need to guarantee that
their control
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp
b/be/src/vec/exec/join/vhash_join_node.cpp
index 67dffa4b203..de3b63371ee 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -180,9 +180,9 @@ Status HashJoinNode::init(const TPlanNode& tnode,
RuntimeState* state) {
#endif
for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
- RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter(
- _runtime_filter_descs[i], state->query_options(),
&_runtime_filters[i],
- _probe_expr_ctxs.size() == 1));
+
RETURN_IF_ERROR(state->register_producer_runtime_filter(_runtime_filter_descs[i],
false,
+
&_runtime_filters[i],
+
_probe_expr_ctxs.size() == 1));
}
// init left/right output slots flags, only column of slot_id in
_hash_output_slot_ids need
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp
b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index 150068096ba..ad168ba9c86 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -118,8 +118,8 @@ Status VNestedLoopJoinNode::init(const TPlanNode& tnode,
RuntimeState* state) {
std::vector<TExpr> filter_src_exprs;
for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
filter_src_exprs.push_back(_runtime_filter_descs[i].src_expr);
- RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter(
- _runtime_filter_descs[i], state->query_options(),
&_runtime_filters[i]));
+
RETURN_IF_ERROR(state->register_producer_runtime_filter(_runtime_filter_descs[i],
false,
+
&_runtime_filters[i], false));
}
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(filter_src_exprs,
_filter_src_expr_ctxs));
return Status::OK();
diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp
b/be/src/vec/exec/runtime_filter_consumer.cpp
index eba11dac45d..e683c4f2be0 100644
--- a/be/src/vec/exec/runtime_filter_consumer.cpp
+++ b/be/src/vec/exec/runtime_filter_consumer.cpp
@@ -30,9 +30,9 @@ RuntimeFilterConsumer::RuntimeFilterConsumer(const int32_t
filter_id,
_blocked_by_rf = std::make_shared<std::atomic_bool>(false);
}
-Status RuntimeFilterConsumer::init(RuntimeState* state, bool is_global) {
+Status RuntimeFilterConsumer::init(RuntimeState* state, bool need_local_merge)
{
_state = state;
- RETURN_IF_ERROR(_register_runtime_filter(is_global));
+ RETURN_IF_ERROR(_register_runtime_filter(need_local_merge));
return Status::OK();
}
@@ -45,28 +45,15 @@ void RuntimeFilterConsumer::_init_profile(RuntimeProfile*
profile) {
profile->add_info_string("RuntimeFilters: ", ss.str());
}
-Status RuntimeFilterConsumer::_register_runtime_filter(bool is_global) {
+Status RuntimeFilterConsumer::_register_runtime_filter(bool need_local_merge) {
int filter_size = _runtime_filter_descs.size();
_runtime_filter_ctxs.reserve(filter_size);
_runtime_filter_ready_flag.reserve(filter_size);
for (int i = 0; i < filter_size; ++i) {
IRuntimeFilter* runtime_filter = nullptr;
const auto& filter_desc = _runtime_filter_descs[i];
- if (filter_desc.__isset.opt_remote_rf && filter_desc.opt_remote_rf) {
- DCHECK(filter_desc.has_remote_targets);
-
RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter(
- filter_desc, _state->query_options(), _filter_id,
&runtime_filter, false,
- is_global));
- } else if (is_global) {
- // For pipelineX engine, runtime filter is global iff data
distribution is ignored.
-
RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter(
- filter_desc, _state->query_options(), _filter_id,
&runtime_filter, false,
- is_global));
- } else {
-
RETURN_IF_ERROR(_state->runtime_filter_mgr()->register_consumer_filter(
- filter_desc, _state->query_options(), _filter_id,
&runtime_filter, false,
- is_global));
- }
+ RETURN_IF_ERROR(_state->register_consumer_runtime_filter(filter_desc,
need_local_merge,
+ _filter_id,
&runtime_filter));
_runtime_filter_ctxs.emplace_back(runtime_filter);
_runtime_filter_ready_flag.emplace_back(false);
}
diff --git a/be/src/vec/exec/runtime_filter_consumer.h
b/be/src/vec/exec/runtime_filter_consumer.h
index 15b9455ac56..b8513e666bc 100644
--- a/be/src/vec/exec/runtime_filter_consumer.h
+++ b/be/src/vec/exec/runtime_filter_consumer.h
@@ -30,7 +30,7 @@ public:
const RowDescriptor& row_descriptor,
VExprContextSPtrs& conjuncts);
~RuntimeFilterConsumer() = default;
- Status init(RuntimeState* state, bool is_global = false);
+ Status init(RuntimeState* state, bool need_local_merge = false);
// Try to append late arrived runtime filters.
// Return num of filters which are applied already.
@@ -42,7 +42,7 @@ public:
protected:
// Register and get all runtime filters at Init phase.
- Status _register_runtime_filter(bool is_global);
+ Status _register_runtime_filter(bool need_local_merge);
// Get all arrived runtime filters at Open phase.
Status _acquire_runtime_filter();
// Append late-arrival runtime filters to the vconjunct_ctx.
diff --git a/be/src/vec/exec/vdata_gen_scan_node.cpp
b/be/src/vec/exec/vdata_gen_scan_node.cpp
index 42f6250a030..13d19921b03 100644
--- a/be/src/vec/exec/vdata_gen_scan_node.cpp
+++ b/be/src/vec/exec/vdata_gen_scan_node.cpp
@@ -81,13 +81,8 @@ Status VDataGenFunctionScanNode::prepare(RuntimeState*
state) {
// TODO: use runtime filter to filte result block, maybe this node need
derive from vscan_node.
for (const auto& filter_desc : _runtime_filter_descs) {
IRuntimeFilter* runtime_filter = nullptr;
- if (filter_desc.__isset.opt_remote_rf && filter_desc.opt_remote_rf) {
-
RETURN_IF_ERROR(state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter(
- filter_desc, state->query_options(), id(),
&runtime_filter, false));
- } else {
-
RETURN_IF_ERROR(state->runtime_filter_mgr()->register_consumer_filter(
- filter_desc, state->query_options(), id(),
&runtime_filter, false));
- }
+ RETURN_IF_ERROR(
+ state->register_consumer_runtime_filter(filter_desc, false,
id(), &runtime_filter));
runtime_filter->init_profile(_runtime_profile.get());
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index ebd6b240028..1093600a485 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -2153,7 +2153,8 @@ public class Coordinator implements CoordInterface {
}
for (RuntimeFilterId rid : fragment.getBuilderRuntimeFilterIds()) {
- ridToBuilderNum.merge(rid, params.instanceExecParams.size(),
Integer::sum);
+ ridToBuilderNum.merge(rid,
+ (int) params.instanceExecParams.stream().map(ins ->
ins.host).distinct().count(), Integer::sum);
}
}
// Use the uppermost fragment as a merged node, the uppermost fragment
has one and only one instance
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]