This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new aabcab9dbe [Improvement](runtime filter) Improve merge phase (#18828)
aabcab9dbe is described below
commit aabcab9dbe8a461ba121aee742d78449e4b20b9f
Author: Gabriel <[email protected]>
AuthorDate: Wed Apr 26 21:01:20 2023 +0800
[Improvement](runtime filter) Improve merge phase (#18828)
---
be/src/exprs/bloom_filter_func.h | 2 +
be/src/exprs/runtime_filter.cpp | 235 +++++++++++-----
be/src/exprs/runtime_filter.h | 83 ++++--
be/src/exprs/runtime_filter_rpc.cpp | 4 +-
be/src/olap/bloom_filter_predicate.h | 3 +-
be/src/runtime/fragment_mgr.cpp | 61 ++++-
be/src/runtime/fragment_mgr.h | 3 +
be/src/runtime/query_context.h | 34 ++-
be/src/runtime/runtime_filter_mgr.cpp | 299 ++++++++++++++++-----
be/src/runtime/runtime_filter_mgr.h | 19 +-
be/src/service/internal_service.cpp | 24 ++
be/src/service/internal_service.h | 4 +
be/src/vec/exec/join/vhash_join_node.cpp | 3 +-
be/src/vec/exec/join/vhash_join_node.h | 1 +
be/src/vec/exec/scan/vscan_node.cpp | 31 ++-
.../org/apache/doris/planner/RuntimeFilter.java | 5 +
.../main/java/org/apache/doris/qe/Coordinator.java | 43 ++-
gensrc/proto/internal_service.proto | 14 +
gensrc/thrift/PaloInternalService.thrift | 8 +
gensrc/thrift/PlanNodes.thrift | 2 +
20 files changed, 688 insertions(+), 190 deletions(-)
diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h
index 11831ccdc0..6182bfe781 100644
--- a/be/src/exprs/bloom_filter_func.h
+++ b/be/src/exprs/bloom_filter_func.h
@@ -204,6 +204,8 @@ public:
return Status::OK();
}
+ size_t get_size() const { return _bloom_filter ? _bloom_filter->size() :
0; }
+
void light_copy(BloomFilterFuncBase* bloomfilter_func) {
auto other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func);
_bloom_filter_alloced = other_func->_bloom_filter_alloced;
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index e2ec2d57a2..2a9d7cc24f 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -348,28 +348,52 @@ public:
RuntimePredicateWrapper(RuntimeState* state, ObjectPool* pool,
const RuntimeFilterParams* params)
: _state(state),
+ _be_exec_version(_state->be_exec_version()),
_pool(pool),
_column_return_type(params->column_return_type),
_filter_type(params->filter_type),
- _fragment_instance_id(params->fragment_instance_id),
_filter_id(params->filter_id),
-
_use_batch(IRuntimeFilter::enable_use_batch(_state->be_exec_version(),
-
_column_return_type)),
- _use_new_hash(_state->be_exec_version() >= 2) {}
+ _use_batch(
+ IRuntimeFilter::enable_use_batch(_be_exec_version > 0,
_column_return_type)),
+ _use_new_hash(_be_exec_version >= 2) {}
// for a 'tmp' runtime predicate wrapper
// only could called assign method or as a param for merge
RuntimePredicateWrapper(RuntimeState* state, ObjectPool* pool,
PrimitiveType column_type,
- RuntimeFilterType type, UniqueId
fragment_instance_id,
- uint32_t filter_id)
+ RuntimeFilterType type, uint32_t filter_id)
: _state(state),
+ _be_exec_version(_state->be_exec_version()),
_pool(pool),
_column_return_type(column_type),
_filter_type(type),
- _fragment_instance_id(fragment_instance_id),
_filter_id(filter_id),
-
_use_batch(IRuntimeFilter::enable_use_batch(_state->be_exec_version(),
-
_column_return_type)),
- _use_new_hash(_state->be_exec_version() >= 2) {}
+ _use_batch(
+ IRuntimeFilter::enable_use_batch(_be_exec_version > 0,
_column_return_type)),
+ _use_new_hash(_be_exec_version >= 2) {}
+
+ RuntimePredicateWrapper(QueryContext* query_ctx, ObjectPool* pool,
+ const RuntimeFilterParams* params)
+ : _query_ctx(query_ctx),
+ _be_exec_version(_query_ctx->be_exec_version()),
+ _pool(pool),
+ _column_return_type(params->column_return_type),
+ _filter_type(params->filter_type),
+ _filter_id(params->filter_id),
+ _use_batch(
+ IRuntimeFilter::enable_use_batch(_be_exec_version > 0,
_column_return_type)),
+ _use_new_hash(_be_exec_version >= 2) {}
+ // for a 'tmp' runtime predicate wrapper
+ // only could called assign method or as a param for merge
+ RuntimePredicateWrapper(QueryContext* query_ctx, ObjectPool* pool,
PrimitiveType column_type,
+ RuntimeFilterType type, uint32_t filter_id)
+ : _query_ctx(query_ctx),
+ _be_exec_version(_query_ctx->be_exec_version()),
+ _pool(pool),
+ _column_return_type(column_type),
+ _filter_type(type),
+ _filter_id(filter_id),
+ _use_batch(
+ IRuntimeFilter::enable_use_batch(_be_exec_version > 0,
_column_return_type)),
+ _use_new_hash(_be_exec_version >= 2) {}
// init runtime filter wrapper
// alloc memory to init runtime filter function
Status init(const RuntimeFilterParams* params) {
@@ -542,8 +566,7 @@ public:
void insert_batch(const vectorized::ColumnPtr column, const
std::vector<int>& rows) {
if (get_real_type() == RuntimeFilterType::BITMAP_FILTER) {
bitmap_filter_insert_batch(column, rows);
- } else if (IRuntimeFilter::enable_use_batch(_state->be_exec_version(),
- _column_return_type)) {
+ } else if (IRuntimeFilter::enable_use_batch(_be_exec_version > 0,
_column_return_type)) {
insert_fixed_len(column->get_raw_data().data, rows.data(),
rows.size());
} else {
for (int index : rows) {
@@ -571,7 +594,14 @@ public:
return real_filter_type;
}
- Status get_push_vexprs(std::vector<vectorized::VExpr*>* container,
RuntimeState* state,
+ size_t get_bloom_filter_size() {
+ if (_is_bloomfilter) {
+ return _context.bloom_filter_func->get_size();
+ }
+ return 0;
+ }
+
+ Status get_push_vexprs(std::vector<vectorized::VExpr*>* container,
vectorized::VExprContext* prob_expr);
Status merge(const RuntimePredicateWrapper* wrapper) {
@@ -583,7 +613,6 @@ public:
_filter_type != wrapper->_filter_type;
CHECK(!can_not_merge_in_or_bloom && !can_not_merge_other)
- << "fragment instance " << _fragment_instance_id.to_string()
<< " can not merge runtime filter(id=" << _filter_id
<< "), current is filter type is " << to_string(_filter_type)
<< ", other filter type is " <<
to_string(wrapper->_filter_type);
@@ -593,8 +622,7 @@ public:
if (_is_ignored_in_filter) {
break;
} else if (wrapper->_is_ignored_in_filter) {
- VLOG_DEBUG << "fragment instance " <<
_fragment_instance_id.to_string()
- << " ignore merge runtime filter(in filter id " <<
_filter_id
+ VLOG_DEBUG << " ignore merge runtime filter(in filter id " <<
_filter_id
<< ") because: " <<
*(wrapper->get_ignored_in_filter_msg());
_is_ignored_in_filter = true;
@@ -608,8 +636,7 @@ public:
if (_max_in_num >= 0 && _context.hybrid_set->size() >=
_max_in_num) {
#ifdef VLOG_DEBUG_IS_ON
std::stringstream msg;
- msg << "fragment instance " <<
_fragment_instance_id.to_string()
- << " ignore merge runtime filter(in filter id " <<
_filter_id
+ msg << " ignore merge runtime filter(in filter id " <<
_filter_id
<< ") because: in_num(" << _context.hybrid_set->size() <<
") >= max_in_num("
<< _max_in_num << ")";
_ignored_in_filter_msg = _pool->add(new
std::string(msg.str()));
@@ -637,22 +664,19 @@ public:
if (real_filter_type == RuntimeFilterType::IN_FILTER) {
if (wrapper->_filter_type == RuntimeFilterType::IN_FILTER) {
// in merge in
CHECK(!wrapper->_is_ignored_in_filter)
- << "fragment instance " <<
_fragment_instance_id.to_string()
<< " can not ignore merge runtime filter(in filter
id "
<< wrapper->_filter_id << ") when used
IN_OR_BLOOM_FILTER, ignore msg: "
<< *(wrapper->get_ignored_in_filter_msg());
_context.hybrid_set->insert(wrapper->_context.hybrid_set.get());
if (_max_in_num >= 0 && _context.hybrid_set->size() >=
_max_in_num) {
- VLOG_DEBUG << "fragment instance " <<
_fragment_instance_id.to_string()
- << " change runtime filter to bloom
filter(id=" << _filter_id
+ VLOG_DEBUG << " change runtime filter to bloom
filter(id=" << _filter_id
<< ") because: in_num(" <<
_context.hybrid_set->size()
<< ") >= max_in_num(" << _max_in_num << ")";
change_to_bloom_filter();
}
// in merge bloom filter
} else {
- VLOG_DEBUG << "fragment instance " <<
_fragment_instance_id.to_string()
- << " change runtime filter to bloom filter(id="
<< _filter_id
+ VLOG_DEBUG << " change runtime filter to bloom filter(id="
<< _filter_id
<< ") because: already exist a bloom filter";
change_to_bloom_filter();
_context.bloom_filter_func->merge(wrapper->_context.bloom_filter_func.get());
@@ -661,7 +685,6 @@ public:
if (wrapper->_filter_type ==
RuntimeFilterType::IN_FILTER) { // bloom filter merge in
CHECK(!wrapper->_is_ignored_in_filter)
- << "fragment instance " <<
_fragment_instance_id.to_string()
<< " can not ignore merge runtime filter(in filter
id "
<< wrapper->_filter_id << ") when used
IN_OR_BLOOM_FILTER, ignore msg: "
<< *(wrapper->get_ignored_in_filter_msg());
@@ -1036,6 +1059,8 @@ public:
private:
RuntimeState* _state;
+ QueryContext* _query_ctx;
+ int _be_exec_version;
ObjectPool* _pool;
// When a runtime filter received from remote and it is a bloom filter,
_column_return_type will be invalid.
@@ -1047,7 +1072,6 @@ private:
bool _is_bloomfilter = false;
bool _is_ignored_in_filter = false;
std::string* _ignored_in_filter_msg = nullptr;
- UniqueId _fragment_instance_id;
uint32_t _filter_id;
// When _column_return_type is invalid, _use_batch will be always false.
@@ -1063,9 +1087,16 @@ Status IRuntimeFilter::create(RuntimeState* state,
ObjectPool* pool, const TRunt
int node_id, IRuntimeFilter** res, bool
build_bf_exactly) {
*res = pool->add(new IRuntimeFilter(state, pool));
(*res)->set_role(role);
- UniqueId fragment_instance_id(state->fragment_instance_id());
- return (*res)->init_with_desc(desc, query_options, fragment_instance_id,
node_id,
- build_bf_exactly);
+ return (*res)->init_with_desc(desc, query_options, node_id,
build_bf_exactly);
+}
+
+Status IRuntimeFilter::create(QueryContext* query_ctx, ObjectPool* pool,
+ const TRuntimeFilterDesc* desc, const
TQueryOptions* query_options,
+ const RuntimeFilterRole role, int node_id,
IRuntimeFilter** res,
+ bool build_bf_exactly) {
+ *res = pool->add(new IRuntimeFilter(query_ctx, pool));
+ (*res)->set_role(role);
+ return (*res)->init_with_desc(desc, query_options, node_id,
build_bf_exactly);
}
void
IRuntimeFilter::copy_to_shared_context(vectorized::SharedRuntimeFilterContext&
context) {
@@ -1100,6 +1131,7 @@ Status IRuntimeFilter::publish() {
if (_has_local_target) {
IRuntimeFilter* consumer_filter = nullptr;
// TODO: log if err
+ DCHECK(_state != nullptr);
RETURN_IF_ERROR(
_state->runtime_filter_mgr()->get_consume_filter(_filter_id,
&consumer_filter));
// push down
@@ -1109,8 +1141,9 @@ Status IRuntimeFilter::publish() {
return Status::OK();
} else {
TNetworkAddress addr;
+ DCHECK(_state != nullptr);
RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_merge_addr(&addr));
- return push_to_remote(_state, &addr);
+ return push_to_remote(_state, &addr, _opt_remote_rf);
}
}
@@ -1124,7 +1157,7 @@ Status
IRuntimeFilter::get_push_expr_ctxs(std::vector<vectorized::VExpr*>* push_
if (!_is_ignored) {
_set_push_down();
_profile->add_info_string("Info", _format_status());
- return _wrapper->get_push_vexprs(push_vexprs, _state, _vprobe_ctx);
+ return _wrapper->get_push_vexprs(push_vexprs, _vprobe_ctx);
} else {
_profile->add_info_string("Info", _format_status());
return Status::OK();
@@ -1132,32 +1165,35 @@ Status
IRuntimeFilter::get_push_expr_ctxs(std::vector<vectorized::VExpr*>* push_
}
Status IRuntimeFilter::get_prepared_vexprs(std::vector<vectorized::VExpr*>*
vexprs,
- const RowDescriptor& desc) {
+ const RowDescriptor& desc,
RuntimeState* state) {
_profile->add_info_string("Info", _format_status());
if (_is_ignored) {
return Status::OK();
}
- DCHECK((!_state->enable_pipeline_exec() && _rf_state ==
RuntimeFilterState::READY) ||
- (_state->enable_pipeline_exec() &&
+ DCHECK((!_enable_pipeline_exec && _rf_state == RuntimeFilterState::READY)
||
+ (_enable_pipeline_exec &&
_rf_state_atomic.load(std::memory_order_acquire) ==
RuntimeFilterState::READY));
DCHECK(is_consumer());
std::lock_guard guard(_inner_mutex);
if (_push_down_vexprs.empty()) {
- RETURN_IF_ERROR(_wrapper->get_push_vexprs(&_push_down_vexprs, _state,
_vprobe_ctx));
+ RETURN_IF_ERROR(_wrapper->get_push_vexprs(&_push_down_vexprs,
_vprobe_ctx));
}
- // push expr
vexprs->insert(vexprs->end(), _push_down_vexprs.begin(),
_push_down_vexprs.end());
return Status::OK();
}
bool IRuntimeFilter::await() {
DCHECK(is_consumer());
+ auto execution_timeout = _state == nullptr ?
_query_ctx->execution_timeout() * 1000
+ : _state->execution_timeout() *
1000;
+ auto runtime_filter_wait_time_ms = _state == nullptr ?
_query_ctx->runtime_filter_wait_time_ms()
+ :
_state->runtime_filter_wait_time_ms();
// bitmap filter is precise filter and only filter once, so it must be
applied.
int64_t wait_times_ms = _wrapper->get_real_type() ==
RuntimeFilterType::BITMAP_FILTER
- ? _state->execution_timeout() * 1000
- : _state->runtime_filter_wait_time_ms();
- if (_state->enable_pipeline_exec()) {
+ ? execution_timeout
+ : runtime_filter_wait_time_ms;
+ if (_enable_pipeline_exec) {
auto expected = _rf_state_atomic.load(std::memory_order_acquire);
if (expected == RuntimeFilterState::NOT_READY) {
if (!_rf_state_atomic.compare_exchange_strong(
@@ -1203,12 +1239,16 @@ bool IRuntimeFilter::await() {
bool IRuntimeFilter::is_ready_or_timeout() {
DCHECK(is_consumer());
auto cur_state = _rf_state_atomic.load(std::memory_order_acquire);
+ auto execution_timeout = _state == nullptr ?
_query_ctx->execution_timeout() * 1000
+ : _state->execution_timeout() *
1000;
+ auto runtime_filter_wait_time_ms = _state == nullptr ?
_query_ctx->runtime_filter_wait_time_ms()
+ :
_state->runtime_filter_wait_time_ms();
// bitmap filter is precise filter and only filter once, so it must be
applied.
int64_t wait_times_ms = _wrapper->get_real_type() ==
RuntimeFilterType::BITMAP_FILTER
- ? _state->execution_timeout() * 1000
- : _state->runtime_filter_wait_time_ms();
+ ? execution_timeout
+ : runtime_filter_wait_time_ms;
int64_t ms_since_registration = MonotonicMillis() - registration_time_;
- if (!_state->enable_pipeline_exec()) {
+ if (!_enable_pipeline_exec) {
_rf_state = RuntimeFilterState::TIME_OUT;
return true;
} else if (is_ready()) {
@@ -1245,7 +1285,7 @@ bool IRuntimeFilter::is_ready_or_timeout() {
void IRuntimeFilter::signal() {
DCHECK(is_consumer());
- if (_state->enable_pipeline_exec()) {
+ if (_enable_pipeline_exec) {
_rf_state_atomic.store(RuntimeFilterState::READY);
} else {
std::unique_lock lock(_inner_mutex);
@@ -1261,6 +1301,10 @@ void IRuntimeFilter::signal() {
_profile->add_info_string("BitmapSize",
std::to_string(bitmap_filter->size()));
_profile->add_info_string("IsNotIn", bitmap_filter->is_not_in() ?
"true" : "false");
}
+ if (_wrapper->get_real_type() == RuntimeFilterType::BLOOM_FILTER) {
+ _profile->add_info_string("BloomFilterSize",
+
std::to_string(_wrapper->get_bloom_filter_size()));
+ }
}
BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter() const {
@@ -1268,8 +1312,7 @@ BloomFilterFuncBase* IRuntimeFilter::get_bloomfilter()
const {
}
Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const
TQueryOptions* options,
- UniqueId fragment_instance_id, int
node_id,
- bool build_bf_exactly) {
+ int node_id, bool build_bf_exactly) {
// if node_id == -1 , it shouldn't be a consumer
DCHECK(node_id >= 0 || (node_id == -1 && !is_consumer()));
@@ -1292,15 +1335,19 @@ Status IRuntimeFilter::init_with_desc(const
TRuntimeFilterDesc* desc, const TQue
_has_remote_target = desc->has_remote_targets;
_expr_order = desc->expr_order;
_filter_id = desc->filter_id;
+ _opt_remote_rf = desc->__isset.opt_remote_rf && desc->opt_remote_rf;
vectorized::VExprContext* build_ctx = nullptr;
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(_pool, desc->src_expr,
&build_ctx));
RuntimeFilterParams params;
- params.fragment_instance_id = fragment_instance_id;
params.filter_id = _filter_id;
params.filter_type = _runtime_filter_type;
params.column_return_type = build_ctx->root()->type().type;
params.max_in_num = options->runtime_filter_max_in_num;
+ // We build runtime filter by exact distinct count iff three conditions
are met:
+ // 1. Only 1 join key
+ // 2. Do not have remote target (e.g. do not need to merge)
+ // 3. Bloom filter
params.build_bf_exactly = build_bf_exactly && !_has_remote_target &&
_runtime_filter_type ==
RuntimeFilterType::BLOOM_FILTER;
if (desc->__isset.bloom_filter_size_bytes) {
@@ -1334,7 +1381,11 @@ Status IRuntimeFilter::init_with_desc(const
TRuntimeFilterDesc* desc, const TQue
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(_pool,
iter->second, &_vprobe_ctx));
}
- _wrapper = _pool->add(new RuntimePredicateWrapper(_state, _pool, ¶ms));
+ if (_state) {
+ _wrapper = _pool->add(new RuntimePredicateWrapper(_state, _pool,
¶ms));
+ } else {
+ _wrapper = _pool->add(new RuntimePredicateWrapper(_query_ctx, _pool,
¶ms));
+ }
return _wrapper->init(¶ms);
}
@@ -1346,6 +1397,10 @@ Status IRuntimeFilter::serialize(PPublishFilterRequest*
request, void** data, in
return serialize_impl(request, data, len);
}
+Status IRuntimeFilter::serialize(PPublishFilterRequestV2* request, void**
data, int* len) {
+ return serialize_impl(request, data, len);
+}
+
Status IRuntimeFilter::create_wrapper(RuntimeState* state, const
MergeRuntimeFilterParams* param,
ObjectPool* pool,
std::unique_ptr<RuntimePredicateWrapper>* wrapper) {
@@ -1358,6 +1413,35 @@ Status IRuntimeFilter::create_wrapper(RuntimeState*
state, const UpdateRuntimeFi
return _create_wrapper(state, param, pool, wrapper);
}
+Status IRuntimeFilter::create_wrapper(QueryContext* query_ctx,
+ const UpdateRuntimeFilterParamsV2*
param, ObjectPool* pool,
+
std::unique_ptr<RuntimePredicateWrapper>* wrapper) {
+ int filter_type = param->request->filter_type();
+ PrimitiveType column_type = PrimitiveType::INVALID_TYPE;
+ if (param->request->has_in_filter()) {
+ column_type =
to_primitive_type(param->request->in_filter().column_type());
+ }
+ wrapper->reset(new RuntimePredicateWrapper(query_ctx, pool, column_type,
get_type(filter_type),
+ param->request->filter_id()));
+
+ switch (filter_type) {
+ case PFilterType::IN_FILTER: {
+ DCHECK(param->request->has_in_filter());
+ return (*wrapper)->assign(¶m->request->in_filter());
+ }
+ case PFilterType::BLOOM_FILTER: {
+ DCHECK(param->request->has_bloom_filter());
+ return (*wrapper)->assign(¶m->request->bloom_filter(),
param->data);
+ }
+ case PFilterType::MINMAX_FILTER: {
+ DCHECK(param->request->has_minmax_filter());
+ return (*wrapper)->assign(¶m->request->minmax_filter());
+ }
+ default:
+ return Status::InvalidArgument("unknown filter type");
+ }
+}
+
void IRuntimeFilter::change_to_bloom_filter() {
auto origin_type = _wrapper->get_real_type();
_wrapper->change_to_bloom_filter();
@@ -1379,7 +1463,6 @@ Status IRuntimeFilter::_create_wrapper(RuntimeState*
state, const T* param, Obje
column_type =
to_primitive_type(param->request->in_filter().column_type());
}
wrapper->reset(new RuntimePredicateWrapper(state, pool, column_type,
get_type(filter_type),
-
UniqueId(param->request->fragment_id()),
param->request->filter_id()));
switch (filter_type) {
@@ -1401,11 +1484,22 @@ Status IRuntimeFilter::_create_wrapper(RuntimeState*
state, const T* param, Obje
}
void IRuntimeFilter::init_profile(RuntimeProfile* parent_profile) {
- DCHECK(parent_profile != nullptr);
- _profile.reset(new RuntimeProfile(fmt::format("RuntimeFilter: (id = {},
type = {})", _filter_id,
-
::doris::to_string(_runtime_filter_type))));
+ if (_profile_init) {
+ return;
+ }
+ {
+ std::lock_guard guard(_inner_mutex);
+ if (_profile_init) {
+ return;
+ }
+ DCHECK(parent_profile != nullptr);
+ _profile.reset(
+ new RuntimeProfile(fmt::format("RuntimeFilter: (id = {}, type
= {})", _filter_id,
+
::doris::to_string(_runtime_filter_type))));
+ _profile_init = true;
+ }
parent_profile->add_child(_profile.get(), true, nullptr);
- if (!_state->enable_pipeline_exec()) {
+ if (!_enable_pipeline_exec) {
_await_time_cost = ADD_TIMER(_profile, "AWaitTimeCost");
}
_profile->add_info_string("Info", _format_status());
@@ -1421,10 +1515,6 @@ void
IRuntimeFilter::update_runtime_filter_type_to_profile() {
}
}
-void IRuntimeFilter::set_push_down_profile() {
- _profile->add_info_string("HasPushDownToEngine", "true");
-}
-
void IRuntimeFilter::ready_for_publish() {
_wrapper->ready_for_publish();
}
@@ -1446,10 +1536,6 @@ Status IRuntimeFilter::merge_from(const
RuntimePredicateWrapper* wrapper) {
return status;
}
-const RuntimePredicateWrapper* IRuntimeFilter::get_wrapper() {
- return _wrapper;
-}
-
template <typename T>
void batch_copy(PInFilter* filter, HybridSetBase::IteratorBase* it,
void (*set_func)(PColumnValue*, const T*)) {
@@ -1752,21 +1838,46 @@ Status IRuntimeFilter::update_filter(const
UpdateRuntimeFilterParams* param) {
return Status::OK();
}
+Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParamsV2* param)
{
+ int64_t start_update = MonotonicMillis();
+ if (param->request->has_in_filter() &&
param->request->in_filter().has_ignored_msg()) {
+ set_ignored();
+ const PInFilter in_filter = param->request->in_filter();
+ auto msg = param->pool->add(new std::string(in_filter.ignored_msg()));
+ set_ignored_msg(*msg);
+ }
+
+ std::unique_ptr<RuntimePredicateWrapper> tmp_wrapper;
+ RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(_query_ctx, param, _pool,
&tmp_wrapper));
+ auto origin_type = _wrapper->get_real_type();
+ RETURN_IF_ERROR(_wrapper->merge(tmp_wrapper.get()));
+ if (origin_type != _wrapper->get_real_type()) {
+ update_runtime_filter_type_to_profile();
+ }
+ this->signal();
+
+ _profile->add_info_string("MergeTime",
std::to_string(param->request->merge_time()) + " ms");
+ _profile->add_info_string("UpdateTime",
+ std::to_string(MonotonicMillis() - start_update)
+ " ms");
+ return Status::OK();
+}
+
Status IRuntimeFilter::consumer_close() {
DCHECK(is_consumer());
return Status::OK();
}
Status
RuntimePredicateWrapper::get_push_vexprs(std::vector<vectorized::VExpr*>*
container,
- RuntimeState* state,
vectorized::VExprContext*
vprob_expr) {
- DCHECK(state != nullptr);
DCHECK(container != nullptr);
DCHECK(_pool != nullptr);
DCHECK(vprob_expr->root()->type().type == _column_return_type ||
(is_string_type(vprob_expr->root()->type().type) &&
is_string_type(_column_return_type)) ||
- _filter_type == RuntimeFilterType::BITMAP_FILTER);
+ _filter_type == RuntimeFilterType::BITMAP_FILTER)
+ << " vprob_expr->root()->type().type: " <<
vprob_expr->root()->type().type
+ << " _column_return_type: " << _column_return_type
+ << " _filter_type: " << ::doris::to_string(_filter_type);
auto real_filter_type = get_real_type();
switch (real_filter_type) {
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index a2f6995d59..52191b5182 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -32,6 +32,7 @@
#include "runtime/define_primitive_type.h"
#include "runtime/large_int_value.h"
#include "runtime/primitive_type.h"
+#include "runtime/query_context.h"
#include "runtime/runtime_state.h"
#include "runtime/types.h"
#include "util/lock.h"
@@ -51,6 +52,7 @@ namespace doris {
class ObjectPool;
class RuntimePredicateWrapper;
class PPublishFilterRequest;
+class PPublishFilterRequestV2;
class PMergeFilterRequest;
class TRuntimeFilterDesc;
class RowDescriptor;
@@ -106,7 +108,6 @@ struct RuntimeFilterParams {
bloom_filter_size(-1),
max_in_num(0),
filter_id(0),
- fragment_instance_id(0, 0),
bitmap_filter_not_in(false) {}
RuntimeFilterType filter_type;
@@ -115,7 +116,6 @@ struct RuntimeFilterParams {
int64_t bloom_filter_size;
int32_t max_in_num;
int32_t filter_id;
- UniqueId fragment_instance_id;
bool bitmap_filter_not_in;
bool build_bf_exactly;
};
@@ -129,6 +129,16 @@ struct UpdateRuntimeFilterParams {
ObjectPool* pool;
};
+struct UpdateRuntimeFilterParamsV2 {
+ UpdateRuntimeFilterParamsV2(const PPublishFilterRequestV2* req,
+ butil::IOBufAsZeroCopyInputStream* data_stream,
+ ObjectPool* obj_pool)
+ : request(req), data(data_stream), pool(obj_pool) {}
+ const PPublishFilterRequestV2* request;
+ butil::IOBufAsZeroCopyInputStream* data;
+ ObjectPool* pool;
+};
+
struct MergeRuntimeFilterParams {
MergeRuntimeFilterParams(const PMergeFilterRequest* req,
butil::IOBufAsZeroCopyInputStream* data_stream)
@@ -164,7 +174,25 @@ public:
_expr_order(-1),
_always_true(false),
_is_ignored(false),
- registration_time_(MonotonicMillis()) {}
+ registration_time_(MonotonicMillis()),
+ _enable_pipeline_exec(_state->enable_pipeline_exec()) {}
+
+ IRuntimeFilter(QueryContext* query_ctx, ObjectPool* pool)
+ : _query_ctx(query_ctx),
+ _pool(pool),
+ _runtime_filter_type(RuntimeFilterType::UNKNOWN_FILTER),
+ _filter_id(-1),
+ _is_broadcast_join(true),
+ _has_remote_target(false),
+ _has_local_target(false),
+ _rf_state(RuntimeFilterState::NOT_READY),
+ _rf_state_atomic(RuntimeFilterState::NOT_READY),
+ _role(RuntimeFilterRole::PRODUCER),
+ _expr_order(-1),
+ _always_true(false),
+ _is_ignored(false),
+ registration_time_(MonotonicMillis()),
+ _enable_pipeline_exec(query_ctx->enable_pipeline_exec()) {}
~IRuntimeFilter() = default;
@@ -172,6 +200,10 @@ public:
const TQueryOptions* query_options, const
RuntimeFilterRole role,
int node_id, IRuntimeFilter** res, bool
build_bf_exactly = false);
+ static Status create(QueryContext* query_ctx, ObjectPool* pool, const
TRuntimeFilterDesc* desc,
+ const TQueryOptions* query_options, const
RuntimeFilterRole role,
+ int node_id, IRuntimeFilter** res, bool
build_bf_exactly = false);
+
void copy_to_shared_context(vectorized::SharedRuntimeFilterContext&
context);
Status copy_from_shared_context(vectorized::SharedRuntimeFilterContext&
context);
@@ -192,20 +224,19 @@ public:
Status get_push_expr_ctxs(std::vector<vectorized::VExpr*>* push_vexprs);
Status get_prepared_vexprs(std::vector<doris::vectorized::VExpr*>*
push_vexprs,
- const RowDescriptor& desc);
+ const RowDescriptor& desc, RuntimeState* state);
bool is_broadcast_join() const { return _is_broadcast_join; }
bool has_remote_target() const { return _has_remote_target; }
bool is_ready() const {
- return (!_state->enable_pipeline_exec() && _rf_state ==
RuntimeFilterState::READY) ||
- (_state->enable_pipeline_exec() &&
+ return (!_enable_pipeline_exec && _rf_state ==
RuntimeFilterState::READY) ||
+ (_enable_pipeline_exec &&
_rf_state_atomic.load(std::memory_order_acquire) ==
RuntimeFilterState::READY);
}
RuntimeFilterState current_state() const {
- return _state->enable_pipeline_exec() ?
_rf_state_atomic.load(std::memory_order_acquire)
- : _rf_state;
+ return _enable_pipeline_exec ?
_rf_state_atomic.load(std::memory_order_acquire) : _rf_state;
}
bool is_ready_or_timeout();
@@ -226,33 +257,34 @@ public:
// init filter with desc
Status init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions*
options,
- UniqueId fragment_id, int node_id = -1, bool
build_bf_exactly = false);
+ int node_id = -1, bool build_bf_exactly = false);
BloomFilterFuncBase* get_bloomfilter() const;
// serialize _wrapper to protobuf
Status serialize(PMergeFilterRequest* request, void** data, int* len);
Status serialize(PPublishFilterRequest* request, void** data = nullptr,
int* len = nullptr);
+ Status serialize(PPublishFilterRequestV2* request, void** data = nullptr,
int* len = nullptr);
Status merge_from(const RuntimePredicateWrapper* wrapper);
// for ut
- const RuntimePredicateWrapper* get_wrapper();
static Status create_wrapper(RuntimeState* state, const
MergeRuntimeFilterParams* param,
ObjectPool* pool,
std::unique_ptr<RuntimePredicateWrapper>*
wrapper);
static Status create_wrapper(RuntimeState* state, const
UpdateRuntimeFilterParams* param,
ObjectPool* pool,
std::unique_ptr<RuntimePredicateWrapper>*
wrapper);
+ static Status create_wrapper(QueryContext* query_ctx, const
UpdateRuntimeFilterParamsV2* param,
+ ObjectPool* pool,
+ std::unique_ptr<RuntimePredicateWrapper>*
wrapper);
void change_to_bloom_filter();
Status init_bloom_filter(const size_t build_bf_cardinality);
Status update_filter(const UpdateRuntimeFilterParams* param);
+ Status update_filter(const UpdateRuntimeFilterParamsV2* param);
void set_ignored() { _is_ignored = true; }
- // for ut
- bool is_ignored() const { return _is_ignored; }
-
void set_ignored_msg(std::string& msg) { _ignored_msg = msg; }
// for ut
@@ -262,21 +294,17 @@ public:
Status consumer_close();
// async push runtimefilter to remote node
- Status push_to_remote(RuntimeState* state, const TNetworkAddress* addr);
+ Status push_to_remote(RuntimeState* state, const TNetworkAddress* addr,
bool opt_remote_rf);
Status join_rpc();
void init_profile(RuntimeProfile* parent_profile);
void update_runtime_filter_type_to_profile();
- void set_push_down_profile();
-
void ready_for_publish();
- std::shared_ptr<BitmapFilterFuncBase> get_bitmap_filter() const;
-
- static bool enable_use_batch(int be_exec_version, PrimitiveType type) {
- return be_exec_version > 0 && (is_int_or_bool(type) ||
is_float_or_double(type));
+ static bool enable_use_batch(bool use_batch, PrimitiveType type) {
+ return use_batch && (is_int_or_bool(type) || is_float_or_double(type));
}
int filter_id() const { return _filter_id; }
@@ -304,7 +332,7 @@ protected:
}
std::string _get_explain_state_string() {
- if (_state->enable_pipeline_exec()) {
+ if (_enable_pipeline_exec) {
return _rf_state_atomic.load(std::memory_order_acquire) ==
RuntimeFilterState::READY
? "READY"
: _rf_state_atomic.load(std::memory_order_acquire) ==
@@ -318,11 +346,12 @@ protected:
}
}
- RuntimeState* _state;
+ RuntimeState* _state = nullptr;
+ QueryContext* _query_ctx = nullptr;
ObjectPool* _pool;
// _wrapper is a runtime filter function wrapper
// _wrapper should alloc from _pool
- RuntimePredicateWrapper* _wrapper = nullptr;
+ RuntimePredicateWrapper* _wrapper;
// runtime filter type
RuntimeFilterType _runtime_filter_type;
// runtime filter id
@@ -350,7 +379,7 @@ protected:
// this filter won't filter any data
bool _always_true;
- doris::vectorized::VExprContext* _vprobe_ctx;
+ doris::vectorized::VExprContext* _vprobe_ctx = nullptr;
// Indicate whether runtime filter expr has been ignored
bool _is_ignored;
@@ -370,6 +399,12 @@ protected:
/// Time in ms (from MonotonicMillis()), that the filter was registered.
const int64_t registration_time_;
+
+ const bool _enable_pipeline_exec;
+
+ bool _profile_init = false;
+
+ bool _opt_remote_rf;
};
// avoid expose RuntimePredicateWrapper
diff --git a/be/src/exprs/runtime_filter_rpc.cpp
b/be/src/exprs/runtime_filter_rpc.cpp
index 8a5c4acb9b..829224f3f2 100644
--- a/be/src/exprs/runtime_filter_rpc.cpp
+++ b/be/src/exprs/runtime_filter_rpc.cpp
@@ -45,7 +45,8 @@ struct IRuntimeFilter::rpc_context {
brpc::CallId cid;
};
-Status IRuntimeFilter::push_to_remote(RuntimeState* state, const
TNetworkAddress* addr) {
+Status IRuntimeFilter::push_to_remote(RuntimeState* state, const
TNetworkAddress* addr,
+ bool opt_remote_rf) {
DCHECK(is_producer());
DCHECK(_rpc_context == nullptr);
std::shared_ptr<PBackendService_Stub> stub(
@@ -69,6 +70,7 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state,
const TNetworkAddress
pfragment_instance_id->set_lo(state->fragment_instance_id().lo);
_rpc_context->request.set_filter_id(_filter_id);
+ _rpc_context->request.set_opt_remote_rf(opt_remote_rf);
_rpc_context->request.set_is_pipeline(state->enable_pipeline_exec());
_rpc_context->cntl.set_timeout_ms(1000);
_rpc_context->cid = _rpc_context->cntl.call_id();
diff --git a/be/src/olap/bloom_filter_predicate.h
b/be/src/olap/bloom_filter_predicate.h
index 9090ea86a3..73c2e9fba1 100644
--- a/be/src/olap/bloom_filter_predicate.h
+++ b/be/src/olap/bloom_filter_predicate.h
@@ -89,8 +89,7 @@ private:
}
}
}
-
- } else if (IRuntimeFilter::enable_use_batch(_be_exec_version, T)) {
+ } else if (IRuntimeFilter::enable_use_batch(_be_exec_version > 0, T)) {
const auto& data =
reinterpret_cast<
const
vectorized::PredicateColumnType<PredicateEvaluateType<T>>*>(
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 7a7ec74835..e2a8802fd9 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -18,6 +18,7 @@
#include "runtime/fragment_mgr.h"
#include <bvar/latency_recorder.h>
+#include <exprs/runtime_filter.h>
#include <fmt/format.h>
#include <gen_cpp/DorisExternalService_types.h>
#include <gen_cpp/FrontendService.h>
@@ -640,7 +641,8 @@ Status FragmentMgr::_get_query_ctx(const Params& params,
TUniqueId query_id, boo
} else {
// This may be a first fragment request of the query.
// Create the query fragments context.
- query_ctx = QueryContext::create_shared(params.fragment_num_on_host,
_exec_env);
+ query_ctx = QueryContext::create_shared(params.fragment_num_on_host,
_exec_env,
+ params.query_options);
query_ctx->query_id = query_id;
RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool),
params.desc_tbl,
&(query_ctx->desc_tbl)));
@@ -1155,10 +1157,65 @@ Status FragmentMgr::apply_filter(const
PPublishFilterRequest* request,
return runtime_filter_mgr->update_filter(request, attach_data);
}
+Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
+ butil::IOBufAsZeroCopyInputStream*
attach_data) {
+ bool is_pipeline = request->has_is_pipeline() && request->is_pipeline();
+
+ const auto& fragment_instance_ids = request->fragment_instance_ids();
+ if (fragment_instance_ids.size() > 0) {
+ UniqueId fragment_instance_id = fragment_instance_ids[0];
+ TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
+
+ std::shared_ptr<FragmentExecState> fragment_state;
+ std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
+
+ RuntimeFilterMgr* runtime_filter_mgr = nullptr;
+ ObjectPool* pool;
+ if (is_pipeline) {
+ std::unique_lock<std::mutex> lock(_lock);
+ auto iter = _pipeline_map.find(tfragment_instance_id);
+ if (iter == _pipeline_map.end()) {
+ VLOG_CRITICAL << "unknown.... fragment-id:" <<
fragment_instance_id;
+ return Status::InvalidArgument("fragment-id: {}",
fragment_instance_id.to_string());
+ }
+ pip_context = iter->second;
+
+ DCHECK(pip_context != nullptr);
+ runtime_filter_mgr =
+
pip_context->get_runtime_state()->get_query_ctx()->runtime_filter_mgr();
+ pool = &pip_context->get_query_context()->obj_pool;
+ } else {
+ std::unique_lock<std::mutex> lock(_lock);
+ auto iter = _fragment_map.find(tfragment_instance_id);
+ if (iter == _fragment_map.end()) {
+ VLOG_CRITICAL << "unknown.... fragment-id:" <<
fragment_instance_id;
+ return Status::InvalidArgument("fragment-id: {}",
fragment_instance_id.to_string());
+ }
+ fragment_state = iter->second;
+
+ DCHECK(fragment_state != nullptr);
+ runtime_filter_mgr = fragment_state->executor()
+ ->runtime_state()
+ ->get_query_ctx()
+ ->runtime_filter_mgr();
+ pool = &fragment_state->get_query_ctx()->obj_pool;
+ }
+
+ UpdateRuntimeFilterParamsV2 params(request, attach_data, pool);
+ int filter_id = request->filter_id();
+ IRuntimeFilter* real_filter = nullptr;
+ RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filter(filter_id,
&real_filter));
+ RETURN_IF_ERROR(real_filter->update_filter(&params));
+ }
+
+ return Status::OK();
+}
+
Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
butil::IOBufAsZeroCopyInputStream*
attach_data) {
UniqueId queryid = request->query_id();
bool is_pipeline = request->has_is_pipeline() && request->is_pipeline();
+ bool opt_remote_rf = request->has_opt_remote_rf() &&
request->opt_remote_rf();
std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid,
&filter_controller));
@@ -1189,7 +1246,7 @@ Status FragmentMgr::merge_filter(const
PMergeFilterRequest* request,
// when filter_controller->merge is still in progress
fragment_state = iter->second;
}
- RETURN_IF_ERROR(filter_controller->merge(request, attach_data));
+ RETURN_IF_ERROR(filter_controller->merge(request, attach_data,
opt_remote_rf));
return Status::OK();
}
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index ad7f830a7f..8ca58ccffa 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -129,6 +129,9 @@ public:
Status apply_filter(const PPublishFilterRequest* request,
butil::IOBufAsZeroCopyInputStream* attach_data);
+ Status apply_filterv2(const PPublishFilterRequestV2* request,
+ butil::IOBufAsZeroCopyInputStream* attach_data);
+
Status merge_filter(const PMergeFilterRequest* request,
butil::IOBufAsZeroCopyInputStream* attach_data);
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index b941f0fe86..0909702c70 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -30,6 +30,7 @@
#include "runtime/datetime_value.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker_limiter.h"
+#include "runtime/runtime_filter_mgr.h"
#include "runtime/runtime_predicate.h"
#include "task_group/task_group.h"
#include "util/pretty_printer.h"
@@ -48,8 +49,12 @@ class QueryContext {
ENABLE_FACTORY_CREATOR(QueryContext);
public:
- QueryContext(int total_fragment_num, ExecEnv* exec_env)
- : fragment_num(total_fragment_num), timeout_second(-1),
_exec_env(exec_env) {
+ QueryContext(int total_fragment_num, ExecEnv* exec_env, const
TQueryOptions& query_options)
+ : fragment_num(total_fragment_num),
+ timeout_second(-1),
+ _exec_env(exec_env),
+ _runtime_filter_mgr(new RuntimeFilterMgr(TUniqueId(), this)),
+ _query_options(query_options) {
_start_time = vectorized::VecDateTimeValue::local_time();
_shared_hash_table_controller.reset(new
vectorized::SharedHashTableController());
_shared_scanner_controller.reset(new
vectorized::SharedScannerController());
@@ -139,6 +144,29 @@ public:
taskgroup::TaskGroup* get_task_group() const { return _task_group.get(); }
+ int execution_timeout() const {
+ return _query_options.__isset.execution_timeout ?
_query_options.execution_timeout
+ :
_query_options.query_timeout;
+ }
+
+ int32_t runtime_filter_wait_time_ms() const {
+ return _query_options.runtime_filter_wait_time_ms;
+ }
+
+ bool enable_pipeline_exec() const {
+ return _query_options.__isset.enable_pipeline_engine &&
+ _query_options.enable_pipeline_engine;
+ }
+
+ int be_exec_version() const {
+ if (!_query_options.__isset.be_exec_version) {
+ return 0;
+ }
+ return _query_options.be_exec_version;
+ }
+
+ RuntimeFilterMgr* runtime_filter_mgr() { return _runtime_filter_mgr.get();
}
+
public:
TUniqueId query_id;
DescriptorTbl* desc_tbl;
@@ -186,6 +214,8 @@ private:
vectorized::RuntimePredicate _runtime_predicate;
taskgroup::TaskGroupPtr _task_group;
+ std::unique_ptr<RuntimeFilterMgr> _runtime_filter_mgr;
+ const TQueryOptions _query_options;
};
} // namespace doris
diff --git a/be/src/runtime/runtime_filter_mgr.cpp
b/be/src/runtime/runtime_filter_mgr.cpp
index e956122130..b6dfa0a625 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -35,6 +35,7 @@
#include "exprs/runtime_filter.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
+#include "runtime/query_context.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "util/brpc_client_cache.h"
@@ -51,10 +52,12 @@ struct async_rpc_context {
RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, RuntimeState*
state) : _state(state) {}
+RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, QueryContext*
query_ctx)
+ : _query_ctx(query_ctx) {}
+
RuntimeFilterMgr::~RuntimeFilterMgr() {}
Status RuntimeFilterMgr::init() {
- DCHECK(_state->query_mem_tracker() != nullptr);
_tracker = std::make_unique<MemTracker>("RuntimeFilterMgr",
ExecEnv::GetInstance()->experimental_mem_tracker());
return Status::OK();
@@ -107,20 +110,41 @@ Status RuntimeFilterMgr::register_filter(const
RuntimeFilterRole role,
VLOG_NOTICE << "regist filter...:" << key << ",role:" << (int)role;
auto iter = filter_map->find(key);
- if (iter != filter_map->end()) {
- return Status::InvalidArgument("filter has registed");
- }
RuntimeFilterMgrVal filter_mgr_val;
filter_mgr_val.role = role;
- RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options,
role, node_id,
- &filter_mgr_val.filter,
build_bf_exactly));
-
- filter_map->emplace(key, filter_mgr_val);
+ if (desc.__isset.opt_remote_rf && desc.opt_remote_rf && role ==
RuntimeFilterRole::CONSUMER &&
+ desc.has_remote_targets && desc.type == TRuntimeFilterType::BLOOM) {
+ // if this runtime filter has remote target (e.g. need merge), we
reuse the runtime filter between all instances
+ DCHECK(_query_ctx != nullptr);
+ if (iter != filter_map->end()) {
+ return Status::OK();
+ }
+ {
+ std::lock_guard<std::mutex> l(_lock);
+ iter = filter_map->find(key);
+ if (iter != filter_map->end()) {
+ return Status::OK();
+ }
+ RETURN_IF_ERROR(IRuntimeFilter::create(_query_ctx,
&_query_ctx->obj_pool, &desc,
+ &options, role, node_id,
&filter_mgr_val.filter,
+ build_bf_exactly));
+ filter_map->emplace(key, filter_mgr_val);
+ }
+ } else {
+ DCHECK(_state != nullptr);
+ if (iter != filter_map->end()) {
+ return Status::InvalidArgument("filter has registed");
+ }
+ RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc,
&options, role, node_id,
+ &filter_mgr_val.filter,
build_bf_exactly));
+ filter_map->emplace(key, filter_mgr_val);
+ }
return Status::OK();
}
+
Status RuntimeFilterMgr::update_filter(const PPublishFilterRequest* request,
butil::IOBufAsZeroCopyInputStream*
data) {
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
@@ -158,12 +182,34 @@ Status
RuntimeFilterMergeControllerEntity::_init_with_desc(
cntVal->runtime_filter_desc = *runtime_filter_desc;
cntVal->target_info = *target_info;
cntVal->pool.reset(new ObjectPool());
- cntVal->filter = cntVal->pool->add(new IRuntimeFilter(_state,
cntVal->pool.get()));
+ cntVal->filter =
+ cntVal->pool->add(new IRuntimeFilter(_state,
&_state->get_query_ctx()->obj_pool));
std::string filter_id = std::to_string(runtime_filter_desc->filter_id);
// LOG(INFO) << "entity filter id:" << filter_id;
- cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc, query_options,
- _fragment_instance_id, -1, false);
+ cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc,
query_options, -1, false);
+ _filter_map.emplace(filter_id, cntVal);
+ return Status::OK();
+}
+
+Status RuntimeFilterMergeControllerEntity::_init_with_desc(
+ const TRuntimeFilterDesc* runtime_filter_desc, const TQueryOptions*
query_options,
+ const std::vector<doris::TRuntimeFilterTargetParamsV2>* targetv2_info,
+ const int producer_size) {
+ std::lock_guard<std::mutex> guard(_filter_map_mutex);
+ std::shared_ptr<RuntimeFilterCntlVal> cntVal =
std::make_shared<RuntimeFilterCntlVal>();
+ // runtime_filter_desc and target will be released,
+ // so we need to copy to cntVal
+ cntVal->producer_size = producer_size;
+ cntVal->runtime_filter_desc = *runtime_filter_desc;
+ cntVal->targetv2_info = *targetv2_info;
+ cntVal->pool.reset(new ObjectPool());
+ cntVal->filter =
+ cntVal->pool->add(new IRuntimeFilter(_state,
&_state->get_query_ctx()->obj_pool));
+
+ std::string filter_id = std::to_string(runtime_filter_desc->filter_id);
+ // LOG(INFO) << "entity filter id:" << filter_id;
+ cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc,
query_options);
_filter_map.emplace(filter_id, cntVal);
return Status::OK();
}
@@ -179,26 +225,60 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId
query_id, UniqueId frag
for (auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter)
{
int filter_id = filterid_to_desc.first;
const auto& target_iter =
runtime_filter_params.rid_to_target_param.find(filter_id);
- if (target_iter == runtime_filter_params.rid_to_target_param.end()) {
+ if (target_iter == runtime_filter_params.rid_to_target_param.end() &&
+ !runtime_filter_params.__isset.rid_to_target_paramv2) {
return Status::InternalError("runtime filter params meet error");
+ } else if (target_iter ==
runtime_filter_params.rid_to_target_param.end()) {
+ const auto& targetv2_iter =
runtime_filter_params.rid_to_target_paramv2.find(filter_id);
+ if (targetv2_iter ==
runtime_filter_params.rid_to_target_paramv2.end()) {
+ return Status::InternalError("runtime filter params meet
error");
+ }
+ const auto& build_iter =
+
runtime_filter_params.runtime_filter_builder_num.find(filter_id);
+ if (build_iter ==
runtime_filter_params.runtime_filter_builder_num.end()) {
+ return Status::InternalError("runtime filter params meet
error");
+ }
+ _init_with_desc(&filterid_to_desc.second, &query_options,
&targetv2_iter->second,
+ build_iter->second);
+ } else {
+ const auto& build_iter =
+
runtime_filter_params.runtime_filter_builder_num.find(filter_id);
+ if (build_iter ==
runtime_filter_params.runtime_filter_builder_num.end()) {
+ return Status::InternalError("runtime filter params meet
error");
+ }
+ _init_with_desc(&filterid_to_desc.second, &query_options,
&target_iter->second,
+ build_iter->second);
}
- const auto& build_iter =
runtime_filter_params.runtime_filter_builder_num.find(filter_id);
- if (build_iter ==
runtime_filter_params.runtime_filter_builder_num.end()) {
- return Status::InternalError("runtime filter params meet error");
+ }
+ if (runtime_filter_params.__isset.rid_to_runtime_filter) {
+ for (auto& filterid_to_desc :
runtime_filter_params.rid_to_runtime_filter) {
+ int filter_id = filterid_to_desc.first;
+ const auto& target_iter =
runtime_filter_params.rid_to_target_param.find(filter_id);
+ if (target_iter ==
runtime_filter_params.rid_to_target_param.end()) {
+ return Status::InternalError("runtime filter params meet
error");
+ }
+ const auto& build_iter =
+
runtime_filter_params.runtime_filter_builder_num.find(filter_id);
+ if (build_iter ==
runtime_filter_params.runtime_filter_builder_num.end()) {
+ return Status::InternalError("runtime filter params meet
error");
+ }
+ _init_with_desc(&filterid_to_desc.second, &query_options,
&target_iter->second,
+ build_iter->second);
}
- _init_with_desc(&filterid_to_desc.second, &query_options,
&target_iter->second,
- build_iter->second);
}
return Status::OK();
}
// merge data
Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest*
request,
-
butil::IOBufAsZeroCopyInputStream* attach_data) {
+
butil::IOBufAsZeroCopyInputStream* attach_data,
+ bool opt_remote_rf) {
+ _opt_remote_rf = _opt_remote_rf && opt_remote_rf;
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
std::shared_ptr<RuntimeFilterCntlVal> cntVal;
int merged_size = 0;
{
+ int64_t start_merge = MonotonicMillis();
std::lock_guard<std::mutex> guard(_filter_map_mutex);
auto iter = _filter_map.find(std::to_string(request->filter_id()));
VLOG_ROW << "recv filter id:" << request->filter_id() << " " <<
request->ShortDebugString();
@@ -220,70 +300,143 @@ Status RuntimeFilterMergeControllerEntity::merge(const
PMergeFilterRequest* requ
// TODO: avoid log when we had acquired a lock
VLOG_ROW << "merge size:" << merged_size << ":" <<
cntVal->producer_size;
DCHECK_LE(merged_size, cntVal->producer_size);
+ _merge_timer += (MonotonicMillis() - start_merge);
if (merged_size < cntVal->producer_size) {
return Status::OK();
}
}
if (merged_size == cntVal->producer_size) {
- // prepare rpc context
- using PPublishFilterRpcContext =
- async_rpc_context<PPublishFilterRequest,
PPublishFilterResponse>;
- std::vector<std::unique_ptr<PPublishFilterRpcContext>> rpc_contexts;
- rpc_contexts.reserve(cntVal->target_info.size());
-
- butil::IOBuf request_attachment;
-
- PPublishFilterRequest apply_request;
- // serialize filter
- void* data = nullptr;
- int len = 0;
- bool has_attachment = false;
- RETURN_IF_ERROR(cntVal->filter->serialize(&apply_request, &data,
&len));
- if (data != nullptr && len > 0) {
- request_attachment.append(data, len);
- has_attachment = true;
- }
+ if (opt_remote_rf) {
+ DCHECK_GT(cntVal->targetv2_info.size(), 0);
+ DCHECK(cntVal->filter->is_bloomfilter());
+ // Optimize merging phase iff:
+ // 1. All BE has been upgraded (e.g. _opt_remote_rf)
+ // 2. FE has been upgraded (e.g. cntVal->targetv2_info.size() > 0)
+ // 3. This filter is bloom filter (only bloom filter should be
used for merging)
+ using PPublishFilterRpcContext =
+ async_rpc_context<PPublishFilterRequestV2,
PPublishFilterResponse>;
+ std::vector<std::unique_ptr<PPublishFilterRpcContext>>
rpc_contexts;
+ rpc_contexts.reserve(cntVal->targetv2_info.size());
+
+ butil::IOBuf request_attachment;
+
+ PPublishFilterRequestV2 apply_request;
+ // serialize filter
+ void* data = nullptr;
+ int len = 0;
+ bool has_attachment = false;
+ RETURN_IF_ERROR(cntVal->filter->serialize(&apply_request, &data,
&len));
+ if (data != nullptr && len > 0) {
+ request_attachment.append(data, len);
+ has_attachment = true;
+ }
- std::vector<TRuntimeFilterTargetParams>& targets = cntVal->target_info;
- for (size_t i = 0; i < targets.size(); i++) {
- rpc_contexts.emplace_back(new PPublishFilterRpcContext);
- size_t cur = rpc_contexts.size() - 1;
- rpc_contexts[cur]->request = apply_request;
- rpc_contexts[cur]->request.set_filter_id(request->filter_id());
-
rpc_contexts[cur]->request.set_is_pipeline(request->has_is_pipeline() &&
- request->is_pipeline());
- *rpc_contexts[cur]->request.mutable_query_id() =
request->query_id();
- if (has_attachment) {
-
rpc_contexts[cur]->cntl.request_attachment().append(request_attachment);
+ std::vector<TRuntimeFilterTargetParamsV2>& targets =
cntVal->targetv2_info;
+ for (size_t i = 0; i < targets.size(); i++) {
+ rpc_contexts.emplace_back(new PPublishFilterRpcContext);
+ size_t cur = rpc_contexts.size() - 1;
+ rpc_contexts[cur]->request = apply_request;
+ rpc_contexts[cur]->request.set_filter_id(request->filter_id());
+
rpc_contexts[cur]->request.set_is_pipeline(request->has_is_pipeline() &&
+
request->is_pipeline());
+ rpc_contexts[cur]->request.set_merge_time(_merge_timer);
+ *rpc_contexts[cur]->request.mutable_query_id() =
request->query_id();
+ if (has_attachment) {
+
rpc_contexts[cur]->cntl.request_attachment().append(request_attachment);
+ }
+ rpc_contexts[cur]->cid = rpc_contexts[cur]->cntl.call_id();
+
+ // set fragment-id
+ for (size_t fid = 0; fid <
targets[cur].target_fragment_instance_ids.size();
+ fid++) {
+ PUniqueId* cur_id =
rpc_contexts[cur]->request.add_fragment_instance_ids();
+
cur_id->set_hi(targets[cur].target_fragment_instance_ids[fid].hi);
+
cur_id->set_lo(targets[cur].target_fragment_instance_ids[fid].lo);
+ }
+
+ std::shared_ptr<PBackendService_Stub> stub(
+
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
+ targets[i].target_fragment_instance_addr));
+ VLOG_NOTICE << "send filter " <<
rpc_contexts[cur]->request.filter_id()
+ << " to:" <<
targets[i].target_fragment_instance_addr.hostname << ":"
+ << targets[i].target_fragment_instance_addr.port
+ << rpc_contexts[cur]->request.ShortDebugString();
+ if (stub == nullptr) {
+ rpc_contexts.pop_back();
+ continue;
+ }
+ stub->apply_filterv2(&rpc_contexts[cur]->cntl,
&rpc_contexts[cur]->request,
+ &rpc_contexts[cur]->response,
brpc::DoNothing());
}
- rpc_contexts[cur]->cid = rpc_contexts[cur]->cntl.call_id();
-
- // set fragment-id
- auto request_fragment_id =
rpc_contexts[cur]->request.mutable_fragment_id();
-
request_fragment_id->set_hi(targets[cur].target_fragment_instance_id.hi);
-
request_fragment_id->set_lo(targets[cur].target_fragment_instance_id.lo);
-
- std::shared_ptr<PBackendService_Stub> stub(
-
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
- targets[i].target_fragment_instance_addr));
- VLOG_NOTICE << "send filter " <<
rpc_contexts[cur]->request.filter_id()
- << " to:" <<
targets[i].target_fragment_instance_addr.hostname << ":"
- << targets[i].target_fragment_instance_addr.port
- << rpc_contexts[cur]->request.ShortDebugString();
- if (stub == nullptr) {
- rpc_contexts.pop_back();
- continue;
+ for (auto& rpc_context : rpc_contexts) {
+ brpc::Join(rpc_context->cid);
+ if (rpc_context->cntl.Failed()) {
+ LOG(WARNING) << "runtimefilter rpc err:" <<
rpc_context->cntl.ErrorText();
+
ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
+ rpc_context->cntl.remote_side());
+ }
}
- stub->apply_filter(&rpc_contexts[cur]->cntl,
&rpc_contexts[cur]->request,
- &rpc_contexts[cur]->response,
brpc::DoNothing());
- }
- for (auto& rpc_context : rpc_contexts) {
- brpc::Join(rpc_context->cid);
- if (rpc_context->cntl.Failed()) {
- LOG(WARNING) << "runtimefilter rpc err:" <<
rpc_context->cntl.ErrorText();
- ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
- rpc_context->cntl.remote_side());
+ } else {
+ // prepare rpc context
+ using PPublishFilterRpcContext =
+ async_rpc_context<PPublishFilterRequest,
PPublishFilterResponse>;
+ std::vector<std::unique_ptr<PPublishFilterRpcContext>>
rpc_contexts;
+ rpc_contexts.reserve(cntVal->target_info.size());
+
+ butil::IOBuf request_attachment;
+
+ PPublishFilterRequest apply_request;
+ // serialize filter
+ void* data = nullptr;
+ int len = 0;
+ bool has_attachment = false;
+ RETURN_IF_ERROR(cntVal->filter->serialize(&apply_request, &data,
&len));
+ if (data != nullptr && len > 0) {
+ request_attachment.append(data, len);
+ has_attachment = true;
+ }
+
+ std::vector<TRuntimeFilterTargetParams>& targets =
cntVal->target_info;
+ for (size_t i = 0; i < targets.size(); i++) {
+ rpc_contexts.emplace_back(new PPublishFilterRpcContext);
+ size_t cur = rpc_contexts.size() - 1;
+ rpc_contexts[cur]->request = apply_request;
+ rpc_contexts[cur]->request.set_filter_id(request->filter_id());
+
rpc_contexts[cur]->request.set_is_pipeline(request->has_is_pipeline() &&
+
request->is_pipeline());
+ *rpc_contexts[cur]->request.mutable_query_id() =
request->query_id();
+ if (has_attachment) {
+
rpc_contexts[cur]->cntl.request_attachment().append(request_attachment);
+ }
+ rpc_contexts[cur]->cid = rpc_contexts[cur]->cntl.call_id();
+
+ // set fragment-id
+ auto request_fragment_id =
rpc_contexts[cur]->request.mutable_fragment_id();
+
request_fragment_id->set_hi(targets[cur].target_fragment_instance_id.hi);
+
request_fragment_id->set_lo(targets[cur].target_fragment_instance_id.lo);
+
+ std::shared_ptr<PBackendService_Stub> stub(
+
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
+ targets[i].target_fragment_instance_addr));
+ VLOG_NOTICE << "send filter " <<
rpc_contexts[cur]->request.filter_id()
+ << " to:" <<
targets[i].target_fragment_instance_addr.hostname << ":"
+ << targets[i].target_fragment_instance_addr.port
+ << rpc_contexts[cur]->request.ShortDebugString();
+ if (stub == nullptr) {
+ rpc_contexts.pop_back();
+ continue;
+ }
+ stub->apply_filter(&rpc_contexts[cur]->cntl,
&rpc_contexts[cur]->request,
+ &rpc_contexts[cur]->response,
brpc::DoNothing());
+ }
+ for (auto& rpc_context : rpc_contexts) {
+ brpc::Join(rpc_context->cid);
+ if (rpc_context->cntl.Failed()) {
+ LOG(WARNING) << "runtimefilter rpc err:" <<
rpc_context->cntl.ErrorText();
+
ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
+ rpc_context->cntl.remote_side());
+ }
}
}
}
diff --git a/be/src/runtime/runtime_filter_mgr.h
b/be/src/runtime/runtime_filter_mgr.h
index 09a760eff8..0a0a70ec7a 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -41,11 +41,14 @@ class IOBufAsZeroCopyInputStream;
namespace doris {
class PPublishFilterRequest;
+class PPublishFilterRequestV2;
class PMergeFilterRequest;
class IRuntimeFilter;
class MemTracker;
class RuntimeState;
enum class RuntimeFilterRole;
+class RuntimePredicateWrapper;
+class QueryContext;
/// producer:
/// Filter filter;
@@ -63,6 +66,8 @@ class RuntimeFilterMgr {
public:
RuntimeFilterMgr(const UniqueId& query_id, RuntimeState* state);
+ RuntimeFilterMgr(const UniqueId& query_id, QueryContext* query_ctx);
+
~RuntimeFilterMgr();
Status init();
@@ -100,12 +105,14 @@ private:
std::map<int32_t, RuntimeFilterMgrVal> _producer_map;
RuntimeState* _state;
+ QueryContext* _query_ctx;
std::unique_ptr<MemTracker> _tracker;
ObjectPool _pool;
TNetworkAddress _merge_addr;
bool _has_merge_addr;
+ std::mutex _lock;
};
// controller -> <query-id, entity>
@@ -123,8 +130,8 @@ public:
const TQueryOptions& query_options);
// handle merge rpc
- Status merge(const PMergeFilterRequest* request,
- butil::IOBufAsZeroCopyInputStream* attach_data);
+ Status merge(const PMergeFilterRequest* request,
butil::IOBufAsZeroCopyInputStream* attach_data,
+ bool opt_remote_rf);
UniqueId query_id() const { return _query_id; }
@@ -135,6 +142,7 @@ public:
int producer_size;
TRuntimeFilterDesc runtime_filter_desc;
std::vector<doris::TRuntimeFilterTargetParams> target_info;
+ std::vector<doris::TRuntimeFilterTargetParamsV2> targetv2_info;
IRuntimeFilter* filter;
std::unordered_set<std::string> arrive_id; // fragment_instance_id ?
std::shared_ptr<ObjectPool> pool;
@@ -149,6 +157,11 @@ private:
const
std::vector<doris::TRuntimeFilterTargetParams>* target_info,
const int producer_size);
+ Status _init_with_desc(const TRuntimeFilterDesc* runtime_filter_desc,
+ const TQueryOptions* query_options,
+ const
std::vector<doris::TRuntimeFilterTargetParamsV2>* target_info,
+ const int producer_size);
+
UniqueId _query_id;
UniqueId _fragment_instance_id;
// protect _filter_map
@@ -158,6 +171,8 @@ private:
// filter-id -> val
std::map<std::string, std::shared_ptr<RuntimeFilterCntlVal>> _filter_map;
RuntimeState* _state;
+ bool _opt_remote_rf = true;
+ int64_t _merge_timer = 0;
};
// RuntimeFilterMergeController has a map query-id -> entity
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index 512438c776..0dbc26d6ac 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -829,6 +829,30 @@ void
PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* contr
}
}
+void PInternalServiceImpl::apply_filterv2(::google::protobuf::RpcController*
controller,
+ const
::doris::PPublishFilterRequestV2* request,
+ ::doris::PPublishFilterResponse*
response,
+ ::google::protobuf::Closure* done) {
+ bool ret = _light_work_pool.try_offer([this, controller, request,
response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ auto attachment =
static_cast<brpc::Controller*>(controller)->request_attachment();
+ butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
+ UniqueId unique_id(request->query_id());
+ VLOG_NOTICE << "rpc apply_filterv2 recv";
+ Status st = _exec_env->fragment_mgr()->apply_filterv2(request,
&zero_copy_input_stream);
+ if (!st.ok()) {
+ LOG(WARNING) << "apply filter meet error: " << st.to_string();
+ }
+ st.to_protobuf(response->mutable_status());
+ });
+ if (!ret) {
+ LOG(WARNING) << "fail to offer request to the work pool";
+ brpc::ClosureGuard closure_guard(done);
+ response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+ response->mutable_status()->add_error_msgs("fail to offer request to
the work pool");
+ }
+}
+
void PInternalServiceImpl::send_data(google::protobuf::RpcController*
controller,
const PSendDataRequest* request,
PSendDataResult* response,
google::protobuf::Closure* done) {
diff --git a/be/src/service/internal_service.h
b/be/src/service/internal_service.h
index 6cfbae4fa7..22042196e2 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -123,6 +123,10 @@ public:
const ::doris::PPublishFilterRequest* request,
::doris::PPublishFilterResponse* response,
::google::protobuf::Closure* done) override;
+ void apply_filterv2(::google::protobuf::RpcController* controller,
+ const ::doris::PPublishFilterRequestV2* request,
+ ::doris::PPublishFilterResponse* response,
+ ::google::protobuf::Closure* done) override;
void transmit_block(::google::protobuf::RpcController* controller,
const ::doris::PTransmitDataParams* request,
::doris::PTransmitDataResult* response,
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp
b/be/src/vec/exec/join/vhash_join_node.cpp
index f811775d69..67ad6d29c1 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -428,6 +428,7 @@ Status HashJoinNode::prepare(RuntimeState* state) {
ADD_TIMER(_build_phase_profile,
"BuildTableConvertToPartitionedTime");
_build_rows_counter = ADD_COUNTER(_build_phase_profile, "BuildRows",
TUnit::UNIT);
_build_side_compute_hash_timer = ADD_TIMER(_build_phase_profile,
"BuildSideHashComputingTime");
+ _build_runtime_filter_timer = ADD_TIMER(_build_phase_profile,
"BuildRuntimeFilterTime");
// Probe phase
auto probe_phase_profile = runtime_profile()->create_child("ProbePhase",
true, true);
@@ -441,7 +442,7 @@ Status HashJoinNode::prepare(RuntimeState* state) {
_join_filter_timer = ADD_TIMER(runtime_profile(), "JoinFilterTimer");
- _push_down_timer = ADD_TIMER(runtime_profile(), "PushDownTime");
+ _push_down_timer = ADD_TIMER(runtime_profile(),
"PublishRuntimeFilterTime");
_push_compute_timer = ADD_TIMER(runtime_profile(), "PushDownComputeTime");
_build_buckets_counter = ADD_COUNTER(runtime_profile(), "BuildBuckets",
TUnit::UNIT);
_build_buckets_fill_counter = ADD_COUNTER(runtime_profile(),
"FilledBuckets", TUnit::UNIT);
diff --git a/be/src/vec/exec/join/vhash_join_node.h
b/be/src/vec/exec/join/vhash_join_node.h
index eb5bf7d240..eef8b30b90 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -288,6 +288,7 @@ private:
RuntimeProfile::Counter* _probe_side_output_timer;
RuntimeProfile::Counter* _build_side_compute_hash_timer;
RuntimeProfile::Counter* _build_side_merge_block_timer;
+ RuntimeProfile::Counter* _build_runtime_filter_timer;
RuntimeProfile::Counter* _build_blocks_memory_usage;
RuntimeProfile::Counter* _hash_table_memory_usage;
diff --git a/be/src/vec/exec/scan/vscan_node.cpp
b/be/src/vec/exec/scan/vscan_node.cpp
index 6ff46427ed..d13d4fc9c8 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -316,10 +316,23 @@ Status VScanNode::_register_runtime_filter() {
for (int i = 0; i < filter_size; ++i) {
IRuntimeFilter* runtime_filter = nullptr;
const auto& filter_desc = _runtime_filter_descs[i];
- RETURN_IF_ERROR(_state->runtime_filter_mgr()->register_filter(
- RuntimeFilterRole::CONSUMER, filter_desc,
_state->query_options(), id()));
-
RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id,
-
&runtime_filter));
+ if (filter_desc.__isset.opt_remote_rf && filter_desc.opt_remote_rf) {
+ DCHECK(filter_desc.type == TRuntimeFilterType::BLOOM &&
filter_desc.has_remote_targets);
+ // Optimize merging phase iff:
+ // 1. All BE and FE has been upgraded (e.g. opt_remote_rf)
+ // 2. This filter is bloom filter (only bloom filter should be
used for merging)
+
RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->register_filter(
+ RuntimeFilterRole::CONSUMER, filter_desc,
_state->query_options(), id(),
+ false));
+
RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->get_consume_filter(
+ filter_desc.filter_id, &runtime_filter));
+ } else {
+ RETURN_IF_ERROR(_state->runtime_filter_mgr()->register_filter(
+ RuntimeFilterRole::CONSUMER, filter_desc,
_state->query_options(), id(),
+ false));
+
RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id,
+
&runtime_filter));
+ }
_runtime_filter_ctxs.emplace_back(runtime_filter);
_runtime_filter_ready_flag.emplace_back(false);
}
@@ -345,13 +358,6 @@ Status VScanNode::_acquire_runtime_filter(bool wait) {
std::vector<VExpr*> vexprs;
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
IRuntimeFilter* runtime_filter =
_runtime_filter_ctxs[i].runtime_filter;
- // If all targets are local, scan node will use hash node's runtime
filter, and we don't
- // need to allocate memory again
- if (runtime_filter->has_remote_target()) {
- if (auto bf = runtime_filter->get_bloomfilter()) {
- RETURN_IF_ERROR(bf->init_with_fixed_length());
- }
- }
bool ready = runtime_filter->is_ready();
if (!ready && wait) {
ready = runtime_filter->await();
@@ -1329,7 +1335,8 @@ Status
VScanNode::try_append_late_arrival_runtime_filter(int* arrived_rf_num) {
++current_arrived_rf_num;
continue;
} else if (_runtime_filter_ctxs[i].runtime_filter->is_ready()) {
-
_runtime_filter_ctxs[i].runtime_filter->get_prepared_vexprs(&vexprs,
_row_descriptor);
+
_runtime_filter_ctxs[i].runtime_filter->get_prepared_vexprs(&vexprs,
_row_descriptor,
+
_state);
++current_arrived_rf_num;
_runtime_filter_ctxs[i].apply_mark = true;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
index 58d0086ed6..fe6180eff1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
@@ -190,8 +190,12 @@ public final class RuntimeFilter {
tFilter.setIsBroadcastJoin(isBroadcastJoin);
tFilter.setHasLocalTargets(hasLocalTargets);
tFilter.setHasRemoteTargets(hasRemoteTargets);
+ boolean optRemoteRf = true;
for (RuntimeFilterTarget target : targets) {
tFilter.putToPlanIdToTargetExpr(target.node.getId().asInt(),
target.expr.treeToThrift());
+ // TODO: now only support SlotRef
+ optRemoteRf = optRemoteRf && hasRemoteTargets && runtimeFilterType
== TRuntimeFilterType.BLOOM
+ && target.expr instanceof SlotRef;
}
tFilter.setType(runtimeFilterType);
tFilter.setBloomFilterSizeBytes(filterSizeBytes);
@@ -199,6 +203,7 @@ public final class RuntimeFilter {
tFilter.setBitmapTargetExpr(targets.get(0).expr.treeToThrift());
tFilter.setBitmapFilterNotIn(bitmapFilterNotIn);
}
+ tFilter.setOptRemoteRf(optRemoteRf);
return tFilter;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index e60e4122b2..8068c1a998 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -96,7 +96,7 @@ import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TReportExecStatusParams;
import org.apache.doris.thrift.TResourceLimit;
import org.apache.doris.thrift.TRuntimeFilterParams;
-import org.apache.doris.thrift.TRuntimeFilterTargetParams;
+import org.apache.doris.thrift.TRuntimeFilterTargetParamsV2;
import org.apache.doris.thrift.TScanRangeLocation;
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TScanRangeParams;
@@ -3122,12 +3122,24 @@ public class Coordinator {
if
(instanceExecParam.instanceId.equals(runtimeFilterMergeInstanceId)) {
for (Map.Entry<RuntimeFilterId,
List<FRuntimeFilterTargetParam>> entry
: ridToTargetParam.entrySet()) {
- List<TRuntimeFilterTargetParams> targetParams =
Lists.newArrayList();
+ Map<TNetworkAddress, TRuntimeFilterTargetParamsV2>
targetParams = new HashMap<>();
for (FRuntimeFilterTargetParam targetParam :
entry.getValue()) {
- targetParams.add(new
TRuntimeFilterTargetParams(targetParam.targetFragmentInstanceId,
- targetParam.targetFragmentInstanceAddr));
+ if
(targetParams.containsKey(targetParam.targetFragmentInstanceAddr)) {
+
targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids
+
.add(targetParam.targetFragmentInstanceId);
+ } else {
+
targetParams.put(targetParam.targetFragmentInstanceAddr,
+ new TRuntimeFilterTargetParamsV2());
+
targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_addr
+ =
targetParam.targetFragmentInstanceAddr;
+
targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids
+ = new ArrayList<>();
+
targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids
+
.add(targetParam.targetFragmentInstanceId);
+ }
}
-
params.params.runtime_filter_params.putToRidToTargetParam(entry.getKey().asInt(),
targetParams);
+
params.params.runtime_filter_params.putToRidToTargetParamv2(entry.getKey().asInt(),
+ new
ArrayList<TRuntimeFilterTargetParamsV2>(targetParams.values()));
}
for (Map.Entry<RuntimeFilterId, Integer> entry :
ridToBuilderNum.entrySet()) {
params.params.runtime_filter_params.putToRuntimeFilterBuilderNum(
@@ -3197,12 +3209,25 @@ public class Coordinator {
if
(instanceExecParam.instanceId.equals(runtimeFilterMergeInstanceId)) {
for (Map.Entry<RuntimeFilterId,
List<FRuntimeFilterTargetParam>> entry
: ridToTargetParam.entrySet()) {
- List<TRuntimeFilterTargetParams> targetParams =
Lists.newArrayList();
+ Map<TNetworkAddress, TRuntimeFilterTargetParamsV2>
targetParams = new HashMap<>();
for (FRuntimeFilterTargetParam targetParam :
entry.getValue()) {
- targetParams.add(new
TRuntimeFilterTargetParams(targetParam.targetFragmentInstanceId,
- targetParam.targetFragmentInstanceAddr));
+ if
(targetParams.containsKey(targetParam.targetFragmentInstanceAddr)) {
+
targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids
+
.add(targetParam.targetFragmentInstanceId);
+ } else {
+
targetParams.put(targetParam.targetFragmentInstanceAddr,
+ new TRuntimeFilterTargetParamsV2());
+
targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_addr
+ =
targetParam.targetFragmentInstanceAddr;
+
targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids
+ = new ArrayList<>();
+
targetParams.get(targetParam.targetFragmentInstanceAddr).target_fragment_instance_ids
+
.add(targetParam.targetFragmentInstanceId);
+ }
}
-
localParams.runtime_filter_params.putToRidToTargetParam(entry.getKey().asInt(),
targetParams);
+
+
localParams.runtime_filter_params.putToRidToTargetParamv2(entry.getKey().asInt(),
+ new
ArrayList<TRuntimeFilterTargetParamsV2>(targetParams.values()));
}
for (Map.Entry<RuntimeFilterId, Integer> entry :
ridToBuilderNum.entrySet()) {
localParams.runtime_filter_params.putToRuntimeFilterBuilderNum(
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index d41a670052..f66090a77b 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -481,6 +481,7 @@ message PMergeFilterRequest {
optional PBloomFilter bloom_filter = 6;
optional PInFilter in_filter = 7;
optional bool is_pipeline = 8;
+ optional bool opt_remote_rf = 9;
};
message PMergeFilterResponse {
@@ -498,6 +499,18 @@ message PPublishFilterRequest {
optional bool is_pipeline = 8;
};
+message PPublishFilterRequestV2 {
+ required int32 filter_id = 1;
+ required PUniqueId query_id = 2;
+ repeated PUniqueId fragment_instance_ids = 3;
+ required PFilterType filter_type = 4;
+ optional PMinMaxFilter minmax_filter = 5;
+ optional PBloomFilter bloom_filter = 6;
+ optional PInFilter in_filter = 7;
+ optional bool is_pipeline = 8;
+ optional int64 merge_time = 9;
+};
+
message PPublishFilterResponse {
required PStatus status = 1;
};
@@ -639,6 +652,7 @@ service PBackendService {
rpc rollback(PRollbackRequest) returns (PRollbackResult);
rpc merge_filter(PMergeFilterRequest) returns (PMergeFilterResponse);
rpc apply_filter(PPublishFilterRequest) returns (PPublishFilterResponse);
+ rpc apply_filterv2(PPublishFilterRequestV2) returns
(PPublishFilterResponse);
rpc fold_constant_expr(PConstantExprRequest) returns (PConstantExprResult);
rpc transmit_block(PTransmitDataParams) returns (PTransmitDataResult);
rpc transmit_block_by_http(PEmptyRequest) returns (PTransmitDataResult);
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index 111718eb1b..aa7ffc24ae 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -237,6 +237,12 @@ struct TRuntimeFilterTargetParams {
2: required Types.TNetworkAddress target_fragment_instance_addr
}
+struct TRuntimeFilterTargetParamsV2 {
+ 1: required list<Types.TUniqueId> target_fragment_instance_ids
+ // The address of the instance where the fragment is expected to run
+ 2: required Types.TNetworkAddress target_fragment_instance_addr
+}
+
struct TRuntimeFilterParams {
// Runtime filter merge instance address
1: optional Types.TNetworkAddress runtime_filter_merge_addr
@@ -250,6 +256,8 @@ struct TRuntimeFilterParams {
// Number of Runtime filter producers
4: optional map<i32, i32> runtime_filter_builder_num
+
+ 5: optional map<i32, list<TRuntimeFilterTargetParamsV2>>
rid_to_target_paramv2
}
// Parameters for a single execution instance of a particular TPlanFragment
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 0eaf1d61a0..3b332744a9 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -1001,6 +1001,8 @@ struct TRuntimeFilterDesc {
// for bitmap filter
11: optional bool bitmap_filter_not_in
+
+ 12: optional bool opt_remote_rf;
}
struct TDataGenScanNode {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]