This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 90f9ca817aa [Refactor](opt) Opt rf and remove useless code (#30900)
90f9ca817aa is described below
commit 90f9ca817aab540df277c55432b46ad96808c034
Author: HappenLee <[email protected]>
AuthorDate: Fri Feb 16 20:32:37 2024 +0800
[Refactor](opt) Opt rf and remove useless code (#30900)
Optimize runtime filter (rf) handling and remove useless code.
---
be/src/exprs/runtime_filter.cpp | 62 +++++++++++-----------
be/src/exprs/runtime_filter.h | 18 +++----
be/src/exprs/runtime_filter_slots.h | 60 ++++++++++-----------
be/src/exprs/runtime_filter_slots_cross.h | 13 ++---
be/src/olap/push_handler.cpp | 2 +-
be/src/pipeline/exec/hashjoin_build_sink.cpp | 23 +++-----
be/src/pipeline/exec/hashjoin_build_sink.h | 3 --
be/src/pipeline/exec/join_build_sink_operator.cpp | 3 +-
be/src/pipeline/exec/join_build_sink_operator.h | 5 ++
.../exec/nested_loop_join_build_operator.cpp | 8 +--
.../exec/nested_loop_join_build_operator.h | 2 -
be/src/pipeline/pipeline_fragment_context.cpp | 3 +-
be/src/pipeline/pipeline_fragment_context.h | 4 +-
.../pipeline_x/pipeline_x_fragment_context.cpp | 6 +--
be/src/runtime/fold_constant_executor.cpp | 2 +-
be/src/runtime/fragment_mgr.cpp | 35 +++++++-----
be/src/runtime/plan_fragment_executor.cpp | 5 +-
be/src/runtime/runtime_filter_mgr.cpp | 29 +++-------
be/src/runtime/runtime_filter_mgr.h | 8 +--
be/src/runtime/runtime_state.cpp | 20 ++++---
be/src/runtime/runtime_state.h | 10 ++--
be/src/vec/exec/join/vhash_join_node.cpp | 11 ++--
be/src/vec/exec/join/vhash_join_node.h | 10 +---
be/src/vec/exec/join/vjoin_node_base.cpp | 4 +-
be/src/vec/exec/join/vjoin_node_base.h | 7 ++-
be/src/vec/exec/join/vnested_loop_join_node.cpp | 12 ++---
be/src/vec/exec/join/vnested_loop_join_node.h | 2 -
be/src/vec/exec/runtime_filter_consumer.cpp | 5 +-
be/test/vec/exec/vwal_scanner_test.cpp | 2 -
.../org/apache/doris/planner/RuntimeFilter.java | 19 +------
.../main/java/org/apache/doris/qe/Coordinator.java | 6 +--
31 files changed, 167 insertions(+), 232 deletions(-)
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 47157cf74d2..ffb92520be7 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -735,7 +735,9 @@ public:
_is_bloomfilter = true;
// we won't use this class to insert or find any data
// so any type is ok
-
_context.bloom_filter_func.reset(create_bloom_filter(PrimitiveType::TYPE_INT));
+
_context.bloom_filter_func.reset(create_bloom_filter(_column_return_type ==
INVALID_TYPE
+ ?
PrimitiveType::TYPE_INT
+ :
_column_return_type));
return _context.bloom_filter_func->assign(data,
bloom_filter->filter_length());
}
@@ -950,12 +952,6 @@ vectorized::SharedRuntimeFilterContext&
IRuntimeFilter::get_shared_context_ref()
return _wrapper->_context;
}
-void IRuntimeFilter::copy_from_other(IRuntimeFilter* other) {
- _wrapper->_filter_type = other->_wrapper->_filter_type;
- _wrapper->_is_bloomfilter = other->is_bloomfilter();
- _wrapper->_context = other->_wrapper->_context;
-}
-
void IRuntimeFilter::insert_batch(const vectorized::ColumnPtr column, size_t
start) {
DCHECK(is_producer());
_wrapper->insert_batch(column, start);
@@ -988,7 +984,6 @@ Status IRuntimeFilter::publish(bool publish_local) {
filter->signal();
}
}
- return Status::OK();
} else if (_has_local_target) {
std::vector<IRuntimeFilter*> filters;
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_consume_filters(_filter_id,
filters));
@@ -1112,6 +1107,10 @@ bool IRuntimeFilter::is_ready_or_timeout() {
}
}
+PrimitiveType IRuntimeFilter::column_type() const {
+ return _wrapper->column_type();
+}
+
void IRuntimeFilter::signal() {
DCHECK(is_consumer());
if (_enable_pipeline_exec) {
@@ -1249,16 +1248,12 @@ Status
IRuntimeFilter::create_wrapper(RuntimeFilterParamsContext* state,
}
Status IRuntimeFilter::create_wrapper(RuntimeFilterParamsContext* state,
- const UpdateRuntimeFilterParamsV2*
param, ObjectPool* pool,
-
std::unique_ptr<RuntimePredicateWrapper>* wrapper) {
- int filter_type = param->request->filter_type();
- PrimitiveType column_type = PrimitiveType::INVALID_TYPE;
- if (param->request->has_in_filter()) {
- column_type =
to_primitive_type(param->request->in_filter().column_type());
- }
- wrapper->reset(new RuntimePredicateWrapper(state, pool, column_type,
get_type(filter_type),
- param->request->filter_id()));
-
+ const UpdateRuntimeFilterParamsV2* param,
+ RuntimePredicateWrapper** wrapper) {
+ auto filter_type = param->request->filter_type();
+ PrimitiveType column_type = param->column_type;
+ *wrapper = param->pool->add(new RuntimePredicateWrapper(
+ state, param->pool, column_type, get_type(filter_type),
param->request->filter_id()));
switch (filter_type) {
case PFilterType::IN_FILTER: {
DCHECK(param->request->has_in_filter());
@@ -1650,8 +1645,9 @@ bool IRuntimeFilter::is_bloomfilter() {
return _wrapper->is_bloomfilter();
}
-template <typename T>
-Status IRuntimeFilter::_update_filter(const T* param) {
+Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) {
+ _profile->add_info_string("MergeTime",
std::to_string(param->request->merge_time()) + " ms");
+
if (param->request->has_in_filter() &&
param->request->in_filter().has_ignored_msg()) {
const PInFilter in_filter = param->request->in_filter();
set_ignored(in_filter.ignored_msg());
@@ -1665,19 +1661,25 @@ Status IRuntimeFilter::_update_filter(const T* param) {
}
this->signal();
- _profile->add_info_string("MergeTime",
std::to_string(param->request->merge_time()) + " ms");
return Status::OK();
}
-Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParams* param) {
- return _update_filter(param);
-}
-
-Status IRuntimeFilter::update_filter(const UpdateRuntimeFilterParamsV2* param,
- int64_t start_apply) {
+void IRuntimeFilter::update_filter(RuntimePredicateWrapper* wrapper, int64_t
merge_time,
+ int64_t start_apply) {
_profile->add_info_string("UpdateTime",
std::to_string(MonotonicMillis() - start_apply)
+ " ms");
- return _update_filter(param);
+ _profile->add_info_string("MergeTime", std::to_string(merge_time) + " ms");
+ // Guard against the applied filter not having the right column_return_type.
+ // Remove this code in the future.
+ if (_wrapper->column_type() != wrapper->column_type()) {
+ wrapper->_column_return_type = _wrapper->_column_return_type;
+ }
+ auto origin_type = _wrapper->get_real_type();
+ _wrapper = wrapper;
+ if (origin_type != _wrapper->get_real_type()) {
+ update_runtime_filter_type_to_profile();
+ }
+ this->signal();
}
Status
RuntimePredicateWrapper::get_push_exprs(std::list<vectorized::VExprContextSPtr>&
probe_ctxs,
@@ -1691,8 +1693,8 @@ Status
RuntimePredicateWrapper::get_push_exprs(std::list<vectorized::VExprContex
(is_string_type(probe_ctx->root()->type().type) &&
is_string_type(_column_return_type)) ||
_filter_type == RuntimeFilterType::BITMAP_FILTER)
- << " prob_expr->root()->type().type: " <<
probe_ctx->root()->type().type
- << " _column_return_type: " << _column_return_type
+ << " prob_expr->root()->type().type: " <<
int(probe_ctx->root()->type().type)
+ << " _column_return_type: " << int(_column_return_type)
<< " _filter_type: " << IRuntimeFilter::to_string(_filter_type);
auto real_filter_type = get_real_type();
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 6d69302c2ea..8fcc4cee17f 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -157,13 +157,10 @@ struct UpdateRuntimeFilterParams {
};
struct UpdateRuntimeFilterParamsV2 {
- UpdateRuntimeFilterParamsV2(const PPublishFilterRequestV2* req,
- butil::IOBufAsZeroCopyInputStream* data_stream,
- ObjectPool* obj_pool)
- : request(req), data(data_stream), pool(obj_pool) {}
const PPublishFilterRequestV2* request;
butil::IOBufAsZeroCopyInputStream* data;
ObjectPool* pool = nullptr;
+ PrimitiveType column_type = INVALID_TYPE;
};
struct MergeRuntimeFilterParams {
@@ -220,8 +217,6 @@ public:
vectorized::SharedRuntimeFilterContext& get_shared_context_ref();
- void copy_from_other(IRuntimeFilter* other);
-
// insert data to build filter
void insert_batch(vectorized::ColumnPtr column, size_t start);
@@ -231,6 +226,8 @@ public:
RuntimeFilterType type() const { return _runtime_filter_type; }
+ PrimitiveType column_type() const;
+
Status get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr>&
probe_ctxs,
std::vector<vectorized::VExprSPtr>& push_exprs,
bool is_late_arrival);
@@ -276,20 +273,21 @@ public:
Status merge_from(const RuntimePredicateWrapper* wrapper);
- // for ut
static Status create_wrapper(RuntimeFilterParamsContext* state,
const MergeRuntimeFilterParams* param,
ObjectPool* pool,
std::unique_ptr<RuntimePredicateWrapper>*
wrapper);
static Status create_wrapper(RuntimeFilterParamsContext* state,
const UpdateRuntimeFilterParams* param,
ObjectPool* pool,
std::unique_ptr<RuntimePredicateWrapper>*
wrapper);
+
static Status create_wrapper(RuntimeFilterParamsContext* state,
- const UpdateRuntimeFilterParamsV2* param,
ObjectPool* pool,
- std::unique_ptr<RuntimePredicateWrapper>*
wrapper);
+ const UpdateRuntimeFilterParamsV2* param,
+ RuntimePredicateWrapper** wrapper);
void change_to_bloom_filter();
Status init_bloom_filter(const size_t build_bf_cardinality);
Status update_filter(const UpdateRuntimeFilterParams* param);
- Status update_filter(const UpdateRuntimeFilterParamsV2* param, int64_t
start_apply);
+ void update_filter(RuntimePredicateWrapper* filter_wrapper, int64_t
merge_time,
+ int64_t start_apply);
void set_ignored(const std::string& msg);
diff --git a/be/src/exprs/runtime_filter_slots.h
b/be/src/exprs/runtime_filter_slots.h
index 3dcf84ace08..7a738b8c06d 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -34,9 +34,9 @@ class VRuntimeFilterSlots {
public:
VRuntimeFilterSlots(
const std::vector<std::shared_ptr<vectorized::VExprContext>>&
build_expr_ctxs,
- const std::vector<TRuntimeFilterDesc>& runtime_filter_descs, bool
is_global = false)
+ const std::vector<IRuntimeFilter*>& runtime_filters, bool
is_global = false)
: _build_expr_context(build_expr_ctxs),
- _runtime_filter_descs(runtime_filter_descs),
+ _runtime_filters(runtime_filters),
_is_global(is_global) {}
Status init(RuntimeState* state, int64_t hash_table_size) {
@@ -75,33 +75,28 @@ public:
// ordered vector: IN, IN_OR_BLOOM, others.
// so we can ignore other filter if IN Predicate exists.
- std::vector<TRuntimeFilterDesc>
sorted_runtime_filter_descs(_runtime_filter_descs);
- auto compare_desc = [](TRuntimeFilterDesc& d1, TRuntimeFilterDesc& d2)
{
- if (d1.type == d2.type) {
+ auto compare_desc = [](IRuntimeFilter* d1, IRuntimeFilter* d2) {
+ if (d1->type() == d2->type()) {
return false;
- } else if (d1.type == TRuntimeFilterType::IN) {
+ } else if (d1->type() == RuntimeFilterType::IN_FILTER) {
return true;
- } else if (d2.type == TRuntimeFilterType::IN) {
+ } else if (d2->type() == RuntimeFilterType::IN_FILTER) {
return false;
- } else if (d1.type == TRuntimeFilterType::IN_OR_BLOOM) {
+ } else if (d1->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
return true;
- } else if (d2.type == TRuntimeFilterType::IN_OR_BLOOM) {
+ } else if (d2->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
return false;
} else {
- return d1.type < d2.type;
+ return d1->type() < d2->type();
}
};
- std::sort(sorted_runtime_filter_descs.begin(),
sorted_runtime_filter_descs.end(),
- compare_desc);
+ std::sort(_runtime_filters.begin(), _runtime_filters.end(),
compare_desc);
// do not create 'in filter' when hash_table size over limit
const auto max_in_num = state->runtime_filter_max_in_num();
const bool over_max_in_num = (hash_table_size >= max_in_num);
- for (auto& filter_desc : sorted_runtime_filter_descs) {
- IRuntimeFilter* runtime_filter = nullptr;
-
RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(filter_desc.filter_id,
-
&runtime_filter));
+ for (auto* runtime_filter : _runtime_filters) {
if (runtime_filter->expr_order() < 0 ||
runtime_filter->expr_order() >= _build_expr_context.size()) {
return Status::InternalError(
@@ -133,10 +128,10 @@ public:
bool exists_in_filter =
has_in_filter[runtime_filter->expr_order()];
if (is_in_filter && over_max_in_num) {
VLOG_DEBUG << "fragment instance " <<
print_id(state->fragment_instance_id())
- << " ignore runtime filter(in filter id " <<
filter_desc.filter_id
- << ") because: in_num(" << hash_table_size <<
") >= max_in_num("
- << max_in_num << ")";
-
RETURN_IF_ERROR(ignore_local_filter(filter_desc.filter_id));
+ << " ignore runtime filter(in filter id "
+ << runtime_filter->filter_id() << ") because:
in_num("
+ << hash_table_size << ") >= max_in_num(" <<
max_in_num << ")";
+
RETURN_IF_ERROR(ignore_local_filter(runtime_filter->filter_id()));
continue;
} else if (!is_in_filter && exists_in_filter) {
// do not create 'bloom filter' and 'minmax filter' when
'in filter' has created
@@ -144,15 +139,16 @@ public:
VLOG_DEBUG << "fragment instance " <<
print_id(state->fragment_instance_id())
<< " ignore runtime filter("
<<
IRuntimeFilter::to_string(runtime_filter->type()) << " id "
- << filter_desc.filter_id << ") because: already
exists in filter";
-
RETURN_IF_ERROR(ignore_local_filter(filter_desc.filter_id));
+ << runtime_filter->filter_id()
+ << ") because: already exists in filter";
+
RETURN_IF_ERROR(ignore_local_filter(runtime_filter->filter_id()));
continue;
}
} else if (is_in_filter && over_max_in_num) {
std::string msg = fmt::format(
"fragment instance {} ignore runtime filter(in filter
id {}) because: "
"in_num({}) >= max_in_num({})",
- print_id(state->fragment_instance_id()),
filter_desc.filter_id,
+ print_id(state->fragment_instance_id()),
runtime_filter->filter_id(),
hash_table_size, max_in_num);
RETURN_IF_ERROR(ignore_remote_filter(runtime_filter, msg));
continue;
@@ -163,7 +159,7 @@ public:
!over_max_in_num)) {
has_in_filter[runtime_filter->expr_order()] = true;
}
-
_runtime_filters[runtime_filter->expr_order()].push_back(runtime_filter);
+
_runtime_filters_map[runtime_filter->expr_order()].push_back(runtime_filter);
}
return Status::OK();
@@ -171,8 +167,8 @@ public:
void insert(const vectorized::Block* block) {
for (int i = 0; i < _build_expr_context.size(); ++i) {
- auto iter = _runtime_filters.find(i);
- if (iter == _runtime_filters.end()) {
+ auto iter = _runtime_filters_map.find(i);
+ if (iter == _runtime_filters_map.end()) {
continue;
}
@@ -186,7 +182,7 @@ public:
// publish runtime filter
Status publish(bool publish_local = false) {
- for (auto& pair : _runtime_filters) {
+ for (auto& pair : _runtime_filters_map) {
for (auto& filter : pair.second) {
RETURN_IF_ERROR(filter->publish(publish_local));
}
@@ -195,7 +191,7 @@ public:
}
void copy_to_shared_context(vectorized::SharedHashTableContextPtr&
context) {
- for (auto& it : _runtime_filters) {
+ for (auto& it : _runtime_filters_map) {
for (auto& filter : it.second) {
context->runtime_filters[filter->filter_id()] =
filter->get_shared_context_ref();
}
@@ -203,7 +199,7 @@ public:
}
Status copy_from_shared_context(vectorized::SharedHashTableContextPtr&
context) {
- for (auto& it : _runtime_filters) {
+ for (auto& it : _runtime_filters_map) {
for (auto& filter : it.second) {
auto filter_id = filter->filter_id();
auto ret = context->runtime_filters.find(filter_id);
@@ -216,14 +212,14 @@ public:
return Status::OK();
}
- bool empty() { return _runtime_filters.empty(); }
+ bool empty() { return _runtime_filters_map.empty(); }
private:
const std::vector<std::shared_ptr<vectorized::VExprContext>>&
_build_expr_context;
- const std::vector<TRuntimeFilterDesc>& _runtime_filter_descs;
+ std::vector<IRuntimeFilter*> _runtime_filters;
const bool _is_global = false;
// prob_contition index -> [IRuntimeFilter]
- std::map<int, std::list<IRuntimeFilter*>> _runtime_filters;
+ std::map<int, std::list<IRuntimeFilter*>> _runtime_filters_map;
};
} // namespace doris
diff --git a/be/src/exprs/runtime_filter_slots_cross.h
b/be/src/exprs/runtime_filter_slots_cross.h
index 7b1a2063d15..1d496ddf557 100644
--- a/be/src/exprs/runtime_filter_slots_cross.h
+++ b/be/src/exprs/runtime_filter_slots_cross.h
@@ -34,17 +34,14 @@ namespace doris {
// this class used in cross join node
class VRuntimeFilterSlotsCross {
public:
- VRuntimeFilterSlotsCross(const std::vector<TRuntimeFilterDesc>&
runtime_filter_descs,
+ VRuntimeFilterSlotsCross(const std::vector<IRuntimeFilter*>&
runtime_filters,
const vectorized::VExprContextSPtrs&
src_expr_ctxs)
- : _runtime_filter_descs(runtime_filter_descs),
filter_src_expr_ctxs(src_expr_ctxs) {}
+ : _runtime_filters(runtime_filters),
filter_src_expr_ctxs(src_expr_ctxs) {}
~VRuntimeFilterSlotsCross() = default;
Status init(RuntimeState* state) {
- for (auto& filter_desc : _runtime_filter_descs) {
- IRuntimeFilter* runtime_filter = nullptr;
-
RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(filter_desc.filter_id,
-
&runtime_filter));
+ for (auto* runtime_filter : _runtime_filters) {
if (runtime_filter == nullptr) {
return Status::InternalError("runtime filter is nullptr");
}
@@ -53,7 +50,6 @@ public:
runtime_filter->has_remote_target()) {
return Status::InternalError("cross join runtime filter has
remote target");
}
- _runtime_filters.push_back(runtime_filter);
}
return Status::OK();
}
@@ -85,9 +81,8 @@ public:
bool empty() const { return _runtime_filters.empty(); }
private:
- const std::vector<TRuntimeFilterDesc>& _runtime_filter_descs;
+ const std::vector<IRuntimeFilter*>& _runtime_filters;
const vectorized::VExprContextSPtrs filter_src_expr_ctxs;
- std::vector<IRuntimeFilter*> _runtime_filters;
};
} // namespace doris
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 6c1e50be26f..3f489fc457c 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -371,7 +371,7 @@ Status PushBrokerReader::init() {
TQueryOptions query_options;
TQueryGlobals query_globals;
_runtime_state = RuntimeState::create_unique(params, query_options,
query_globals,
- ExecEnv::GetInstance());
+ ExecEnv::GetInstance(),
nullptr);
DescriptorTbl* desc_tbl = nullptr;
Status status = DescriptorTbl::create(_runtime_state->obj_pool(),
_t_desc_tbl, &desc_tbl);
if (UNLIKELY(!status.ok())) {
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 4597bf9876e..19de8fb7bdc 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -97,14 +97,11 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState*
state, LocalSinkStateInfo
// Hash Table Init
_hash_table_init(state);
-
_runtime_filters.resize(p._runtime_filter_descs.size());
for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) {
RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter(
- p._runtime_filter_descs[i], state->query_options(),
_build_expr_ctxs.size() == 1,
- p._use_global_rf, p._child_x->parallel_tasks()));
- RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(
- p._runtime_filter_descs[i].filter_id, &_runtime_filters[i]));
+ p._runtime_filter_descs[i], state->query_options(),
&_runtime_filters[i],
+ _build_expr_ctxs.size() == 1, p._use_global_rf,
p._child_x->parallel_tasks()));
}
return Status::OK();
@@ -121,10 +118,6 @@ bool HashJoinBuildSinkLocalState::build_unique() const {
return _parent->cast<HashJoinBuildSinkOperatorX>()._build_unique;
}
-std::vector<TRuntimeFilterDesc>&
HashJoinBuildSinkLocalState::runtime_filter_descs() const {
- return _parent->cast<HashJoinBuildSinkOperatorX>()._runtime_filter_descs;
-}
-
void HashJoinBuildSinkLocalState::init_short_circuit_for_probe() {
auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
_shared_state->short_circuit_for_probe =
@@ -386,9 +379,7 @@
HashJoinBuildSinkOperatorX::HashJoinBuildSinkOperatorX(ObjectPool* pool, int ope
_partition_exprs(tnode.__isset.distribute_expr_lists &&
!_is_broadcast_join
? tnode.distribute_expr_lists[1]
: std::vector<TExpr> {}),
- _use_global_rf(use_global_rf) {
- _runtime_filter_descs = tnode.runtime_filters;
-}
+ _use_global_rf(use_global_rf) {}
Status HashJoinBuildSinkOperatorX::prepare(RuntimeState* state) {
if (_is_broadcast_join) {
@@ -548,13 +539,13 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
__builtin_unreachable();
},
[&](auto&& arg) -> Status {
- if (_runtime_filter_descs.empty()) {
+ if (local_state._runtime_filters.empty()) {
return Status::OK();
}
local_state._runtime_filter_slots =
-
std::make_shared<VRuntimeFilterSlots>(_build_expr_ctxs,
-
_runtime_filter_descs,
-
use_global_rf);
+ std::make_shared<VRuntimeFilterSlots>(
+ _build_expr_ctxs,
local_state._runtime_filters,
+ use_global_rf);
RETURN_IF_ERROR(local_state._runtime_filter_slots->init(
state, arg.hash_table->size()));
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h
b/be/src/pipeline/exec/hashjoin_build_sink.h
index c3d6038b3eb..56a651e4210 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -70,7 +70,6 @@ public:
void init_short_circuit_for_probe();
bool build_unique() const;
- std::vector<TRuntimeFilterDesc>& runtime_filter_descs() const;
std::shared_ptr<vectorized::Arena> arena() { return _shared_state->arena; }
void add_hash_buckets_info(const std::string& info) const {
@@ -101,7 +100,6 @@ protected:
// build expr
vectorized::VExprContextSPtrs _build_expr_ctxs;
- std::vector<IRuntimeFilter*> _runtime_filters;
bool _should_build_hash_table = true;
int64_t _build_side_mem_used = 0;
int64_t _build_side_last_mem_used = 0;
@@ -190,7 +188,6 @@ private:
std::shared_ptr<vectorized::SharedHashTableController>
_shared_hashtable_controller;
vectorized::SharedHashTableContextPtr _shared_hash_table_context = nullptr;
- std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
const std::vector<TExpr> _partition_exprs;
const bool _use_global_rf;
diff --git a/be/src/pipeline/exec/join_build_sink_operator.cpp
b/be/src/pipeline/exec/join_build_sink_operator.cpp
index 6b930ff8a5e..1141acc650d 100644
--- a/be/src/pipeline/exec/join_build_sink_operator.cpp
+++ b/be/src/pipeline/exec/join_build_sink_operator.cpp
@@ -78,7 +78,8 @@
JoinBuildSinkOperatorX<LocalStateType>::JoinBuildSinkOperatorX(ObjectPool* pool,
: tnode.hash_join_node.__isset.is_mark ?
tnode.hash_join_node.is_mark
: false),
_short_circuit_for_null_in_build_side(_join_op ==
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
- !_is_mark_join) {
+ !_is_mark_join),
+ _runtime_filter_descs(tnode.runtime_filters) {
_init_join_op();
if (_is_mark_join) {
DCHECK(_join_op == TJoinOp::LEFT_ANTI_JOIN || _join_op ==
TJoinOp::LEFT_SEMI_JOIN ||
diff --git a/be/src/pipeline/exec/join_build_sink_operator.h
b/be/src/pipeline/exec/join_build_sink_operator.h
index f7c1415d371..9cf5be80f80 100644
--- a/be/src/pipeline/exec/join_build_sink_operator.h
+++ b/be/src/pipeline/exec/join_build_sink_operator.h
@@ -33,6 +33,8 @@ class JoinBuildSinkLocalState : public
PipelineXSinkLocalState<DependencyType> {
public:
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+ const std::vector<IRuntimeFilter*>& runtime_filters() const { return
_runtime_filters; }
+
protected:
JoinBuildSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
: PipelineXSinkLocalState<DependencyType>(parent, state) {}
@@ -43,6 +45,7 @@ protected:
RuntimeProfile::Counter* _build_rows_counter = nullptr;
RuntimeProfile::Counter* _publish_runtime_filter_timer = nullptr;
RuntimeProfile::Counter* _runtime_filter_compute_timer = nullptr;
+ std::vector<IRuntimeFilter*> _runtime_filters;
};
template <typename LocalStateType>
@@ -75,6 +78,8 @@ protected:
// 2. In build phase, we stop materialize build side when we meet the
first null value and set _has_null_in_build_side to true.
// 3. In probe phase, if _has_null_in_build_side is true, join node
returns empty block directly. Otherwise, probing will continue as the same as
generic left anti join.
const bool _short_circuit_for_null_in_build_side;
+
+ const std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
};
} // namespace pipeline
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
index aec93c66b62..c30bb5ad67c 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
@@ -40,24 +40,20 @@ Status
NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkSta
for (size_t i = 0; i < _filter_src_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(p._filter_src_expr_ctxs[i]->clone(state,
_filter_src_expr_ctxs[i]));
}
+ _runtime_filters.resize(p._runtime_filter_descs.size());
for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) {
RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter(
- p._runtime_filter_descs[i], state->query_options()));
+ p._runtime_filter_descs[i], state->query_options(),
&_runtime_filters[i]));
}
return Status::OK();
}
-const std::vector<TRuntimeFilterDesc>&
NestedLoopJoinBuildSinkLocalState::runtime_filter_descs() {
- return
_parent->cast<NestedLoopJoinBuildSinkOperatorX>()._runtime_filter_descs;
-}
-
NestedLoopJoinBuildSinkOperatorX::NestedLoopJoinBuildSinkOperatorX(ObjectPool*
pool,
int
operator_id,
const
TPlanNode& tnode,
const
DescriptorTbl& descs)
: JoinBuildSinkOperatorX<NestedLoopJoinBuildSinkLocalState>(pool,
operator_id, tnode,
descs),
- _runtime_filter_descs(tnode.runtime_filters),
_is_output_left_side_only(tnode.nested_loop_join_node.__isset.is_output_left_side_only
&&
tnode.nested_loop_join_node.is_output_left_side_only),
_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {}
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h
b/be/src/pipeline/exec/nested_loop_join_build_operator.h
index 27f847cc00f..e89d6a9d0be 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h
@@ -65,7 +65,6 @@ public:
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
- const std::vector<TRuntimeFilterDesc>& runtime_filter_descs();
vectorized::VExprContextSPtrs& filter_src_expr_ctxs() { return
_filter_src_expr_ctxs; }
RuntimeProfile::Counter* runtime_filter_compute_timer() {
return _runtime_filter_compute_timer;
@@ -115,7 +114,6 @@ private:
vectorized::VExprContextSPtrs _filter_src_expr_ctxs;
- const std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
const bool _is_output_left_side_only;
RowDescriptor _row_descriptor;
};
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 1fb50c9ea06..6c183a96f64 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -229,13 +229,12 @@ Status PipelineFragmentContext::prepare(const
doris::TPipelineFragmentParams& re
// 1. init _runtime_state
_runtime_state = RuntimeState::create_unique(
local_params.fragment_instance_id, request.query_id,
request.fragment_id,
- request.query_options, _query_ctx->query_globals, _exec_env);
+ request.query_options, _query_ctx->query_globals, _exec_env,
_query_ctx.get());
if (local_params.__isset.runtime_filter_params) {
_runtime_state->set_runtime_filter_params(local_params.runtime_filter_params);
}
_runtime_state->set_task_execution_context(shared_from_this());
- _runtime_state->set_query_ctx(_query_ctx.get());
_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
// TODO should be combine with plan_fragment_executor.prepare funciton
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index b8a3bf7e922..b247d0ce83f 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -75,9 +75,7 @@ public:
TUniqueId get_fragment_instance_id() const { return _fragment_instance_id;
}
- RuntimeState* get_runtime_state(UniqueId /*fragment_instance_id*/) {
- return _runtime_state.get();
- }
+ RuntimeState* get_runtime_state() { return _runtime_state.get(); }
virtual RuntimeFilterMgr* get_runtime_filter_mgr(UniqueId
/*fragment_instance_id*/) {
return _runtime_state->runtime_filter_mgr();
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 13336ea7ea7..67c5da97063 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -182,8 +182,7 @@ Status PipelineXFragmentContext::prepare(const
doris::TPipelineFragmentParams& r
// 1. Set up the global runtime state.
_runtime_state = RuntimeState::create_unique(request.query_id,
request.fragment_id,
request.query_options,
_query_ctx->query_globals,
- _exec_env);
- _runtime_state->set_query_ctx(_query_ctx.get());
+ _exec_env, _query_ctx.get());
_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
SCOPED_ATTACH_TASK(_runtime_state.get());
@@ -479,7 +478,6 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
auto init_runtime_state = [&](std::unique_ptr<RuntimeState>&
runtime_state) {
runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
- runtime_state->set_query_ctx(_query_ctx.get());
runtime_state->set_task_execution_context(shared_from_this());
runtime_state->set_be_number(local_params.backend_num);
@@ -566,7 +564,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
_task_runtime_states.push_back(RuntimeState::create_unique(
this, local_params.fragment_instance_id,
request.query_id,
request.fragment_id, request.query_options,
_query_ctx->query_globals,
- _exec_env));
+ _exec_env, _query_ctx.get()));
auto& task_runtime_state = _task_runtime_states.back();
init_runtime_state(task_runtime_state);
auto cur_task_id = _total_tasks++;
diff --git a/be/src/runtime/fold_constant_executor.cpp
b/be/src/runtime/fold_constant_executor.cpp
index 33fb034b60a..de1fd046882 100644
--- a/be/src/runtime/fold_constant_executor.cpp
+++ b/be/src/runtime/fold_constant_executor.cpp
@@ -136,7 +136,7 @@ Status FoldConstantExecutor::_init(const TQueryGlobals&
query_globals,
fragment_params.params = params;
fragment_params.protocol_version = PaloInternalServiceVersion::V1;
_runtime_state = RuntimeState::create_unique(fragment_params.params,
query_options,
- query_globals,
ExecEnv::GetInstance());
+ query_globals,
ExecEnv::GetInstance(), nullptr);
DescriptorTbl* desc_tbl = nullptr;
Status status =
DescriptorTbl::create(_runtime_state->obj_pool(),
TDescriptorTable(), &desc_tbl);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index c3d2d20e1c6..bac465c1fe8 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -807,7 +807,7 @@ Status FragmentMgr::exec_plan_fragment(const
TPipelineFragmentParams& params,
std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
static_cast<void>(_runtimefilter_controller.add_entity(
params.local_params[i], params.query_id,
params.query_options, &handler,
-
RuntimeFilterParamsContext::create(context->get_runtime_state(UniqueId()))));
+
RuntimeFilterParamsContext::create(context->get_runtime_state())));
context->set_merge_controller_handler(handler);
const TUniqueId& fragment_instance_id =
params.local_params[i].fragment_instance_id;
{
@@ -887,7 +887,7 @@ Status FragmentMgr::exec_plan_fragment(const
TPipelineFragmentParams& params,
std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
static_cast<void>(_runtimefilter_controller.add_entity(
local_params, params.query_id, params.query_options,
&handler,
-
RuntimeFilterParamsContext::create(context->get_runtime_state(UniqueId()))));
+
RuntimeFilterParamsContext::create(context->get_runtime_state())));
context->set_merge_controller_handler(handler);
{
@@ -1366,20 +1366,29 @@ Status FragmentMgr::apply_filterv2(const
PPublishFilterRequestV2* request,
pool = &fragment_executor->get_query_ctx()->obj_pool;
}
- UpdateRuntimeFilterParamsV2 params(request, attach_data, pool);
- int filter_id = request->filter_id();
+ // 1. get the target filters
std::vector<IRuntimeFilter*> filters;
- RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(filter_id,
filters));
+
RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(request->filter_id(),
filters));
- IRuntimeFilter* first_filter = nullptr;
- for (auto filter : filters) {
- if (!first_filter) {
- RETURN_IF_ERROR(filter->update_filter(&params, start_apply));
- first_filter = filter;
- } else {
- filter->copy_from_other(first_filter);
+ // 2. create the filter wrapper to replace or ignore the target filters
+ if (request->has_in_filter() &&
request->in_filter().has_ignored_msg()) {
+ const auto& in_filter = request->in_filter();
+
+ std::ranges::for_each(filters, [&in_filter](auto& filter) {
+ filter->set_ignored(in_filter.ignored_msg());
filter->signal();
- }
+ });
+ } else if (!filters.empty()) {
+ UpdateRuntimeFilterParamsV2 params {request, attach_data, pool,
+ filters[0]->column_type()};
+ RuntimePredicateWrapper* filter_wrapper = nullptr;
+ RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(
+ runtime_filter_mgr->get_runtime_filter_context_state(),
&params,
+ &filter_wrapper));
+
+ std::ranges::for_each(filters, [&](auto& filter) {
+ filter->update_filter(filter_wrapper, request->merge_time(),
start_apply);
+ });
}
}
diff --git a/be/src/runtime/plan_fragment_executor.cpp
b/be/src/runtime/plan_fragment_executor.cpp
index cbeb38204b4..9040fa12040 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -126,9 +126,8 @@ Status PlanFragmentExecutor::prepare(const
TExecPlanFragmentParams& request) {
// VLOG_CRITICAL << "request:\n" <<
apache::thrift::ThriftDebugString(request);
const TQueryGlobals& query_globals = _query_ctx->query_globals;
- _runtime_state =
- RuntimeState::create_unique(params, request.query_options,
query_globals, _exec_env);
- _runtime_state->set_query_ctx(_query_ctx.get());
+ _runtime_state = RuntimeState::create_unique(params,
request.query_options, query_globals,
+ _exec_env, _query_ctx.get());
_runtime_state->set_task_execution_context(shared_from_this());
_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
diff --git a/be/src/runtime/runtime_filter_mgr.cpp
b/be/src/runtime/runtime_filter_mgr.cpp
index 8281ae2ea6d..155bb430ecb 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -58,17 +58,6 @@ RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id,
RuntimeFilterParams
ExecEnv::GetInstance()->experimental_mem_tracker());
}
-Status RuntimeFilterMgr::get_producer_filter(const int filter_id,
IRuntimeFilter** target) {
- std::lock_guard<std::mutex> l(_lock);
- auto iter = _producer_map.find(filter_id);
- if (iter == _producer_map.end()) {
- return Status::InvalidArgument("unknown filter: {}, role: PRODUCER",
filter_id);
- }
-
- *target = iter->second;
- return Status::OK();
-}
-
Status RuntimeFilterMgr::get_consume_filters(const int filter_id,
std::vector<IRuntimeFilter*>&
consumer_filters) {
std::lock_guard<std::mutex> l(_lock);
@@ -101,10 +90,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const
TRuntimeFilterDesc& desc
}
// TODO: union the remote opt and global two case as one case to one judge
- bool remote_opt_or_global =
- (desc.__isset.opt_remote_rf && desc.opt_remote_rf &&
desc.has_remote_targets &&
- desc.type == TRuntimeFilterType::BLOOM) ||
- is_global;
+ bool remote_opt_or_global = (desc.__isset.opt_remote_rf &&
desc.opt_remote_rf) || is_global;
if (!has_exist) {
IRuntimeFilter* filter;
@@ -122,6 +108,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const
TRuntimeFilterDesc& desc
Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc&
desc,
const TQueryOptions& options,
+ IRuntimeFilter**
producer_filter,
bool build_bf_exactly, bool
is_global,
int parallel_tasks) {
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
@@ -133,11 +120,10 @@ Status RuntimeFilterMgr::register_producer_filter(const
TRuntimeFilterDesc& desc
if (iter != _producer_map.end()) {
return Status::InvalidArgument("filter has registed");
}
- IRuntimeFilter* filter;
RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options,
- RuntimeFilterRole::PRODUCER, -1,
&filter,
+ RuntimeFilterRole::PRODUCER, -1,
producer_filter,
build_bf_exactly, is_global,
parallel_tasks));
- _producer_map.emplace(key, filter);
+ _producer_map.emplace(key, *producer_filter);
return Status::OK();
}
@@ -196,7 +182,6 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
const TRuntimeFilterDesc* runtime_filter_desc, const TQueryOptions*
query_options,
const std::vector<doris::TRuntimeFilterTargetParamsV2>* targetv2_info,
const int producer_size) {
- std::unique_lock<std::shared_mutex> guard(_filter_map_mutex);
std::shared_ptr<RuntimeFilterCntlVal> cnt_val =
std::make_shared<RuntimeFilterCntlVal>();
// runtime_filter_desc and target will be released,
// so we need to copy to cnt_val
@@ -206,9 +191,10 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
cnt_val->pool.reset(new ObjectPool());
cnt_val->filter = cnt_val->pool->add(
new IRuntimeFilter(_state, &_state->get_query_ctx()->obj_pool,
runtime_filter_desc));
-
auto filter_id = runtime_filter_desc->filter_id;
RETURN_IF_ERROR(cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc,
query_options));
+
+ std::unique_lock<std::shared_mutex> guard(_filter_map_mutex);
_filter_map.emplace(filter_id, CntlValwithLock {cnt_val,
std::make_unique<std::mutex>()});
return Status::OK();
}
@@ -303,7 +289,6 @@ Status RuntimeFilterMergeControllerEntity::merge(const
PMergeFilterRequest* requ
if (merged_size == cnt_val->producer_size) {
if (opt_remote_rf) {
DCHECK_GT(cnt_val->targetv2_info.size(), 0);
- DCHECK(cnt_val->filter->is_bloomfilter());
// Optimize merging phase iff:
// 1. All BE has been upgraded (e.g. _opt_remote_rf)
// 2. FE has been upgraded (e.g. cnt_val->targetv2_info.size() > 0)
@@ -492,8 +477,6 @@ RuntimeFilterParamsContext*
RuntimeFilterParamsContext::create(QueryContext* que
params->query_id.set_hi(query_ctx->query_id().hi);
params->query_id.set_lo(query_ctx->query_id().lo);
- // params->fragment_instance_id.set_hi(state->fragment_instance_id().hi);
- // params->fragment_instance_id.set_lo(state->fragment_instance_id().lo);
params->be_exec_version = query_ctx->be_exec_version();
params->query_ctx = query_ctx;
params->_obj_pool = &query_ctx->obj_pool;
diff --git a/be/src/runtime/runtime_filter_mgr.h
b/be/src/runtime/runtime_filter_mgr.h
index de55e34fc1e..31552ba6230 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -73,15 +73,13 @@ public:
Status get_consume_filters(const int filter_id,
std::vector<IRuntimeFilter*>& consumer_filters);
- Status get_producer_filter(const int filter_id, IRuntimeFilter**
producer_filter);
-
// register filter
Status register_consumer_filter(const TRuntimeFilterDesc& desc, const
TQueryOptions& options,
int node_id, IRuntimeFilter**
consumer_filter,
bool build_bf_exactly = false, bool
is_global = false);
Status register_producer_filter(const TRuntimeFilterDesc& desc, const
TQueryOptions& options,
- bool build_bf_exactly = false, bool
is_global = false,
- int parallel_tasks = 0);
+ IRuntimeFilter** producer_filter, bool
build_bf_exactly = false,
+ bool is_global = false, int parallel_tasks
= 0);
// update filter by remote
Status update_filter(const PPublishFilterRequest* request,
@@ -91,6 +89,8 @@ public:
Status get_merge_addr(TNetworkAddress* addr);
+ RuntimeFilterParamsContext* get_runtime_filter_context_state() const {
return _state; }
+
private:
struct ConsumerFilterHolder {
int node_id;
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index d84b86f2290..d25d914147b 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -76,7 +76,7 @@ RuntimeState::RuntimeState(const TUniqueId&
fragment_instance_id,
RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params,
const TQueryOptions& query_options, const
TQueryGlobals& query_globals,
- ExecEnv* exec_env)
+ ExecEnv* exec_env, QueryContext* ctx)
: _profile("Fragment " +
print_id(fragment_exec_params.fragment_instance_id)),
_load_channel_profile("<unnamed>"),
_obj_pool(new ObjectPool()),
@@ -93,7 +93,8 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams&
fragment_exec_params,
_num_finished_scan_range(0),
_normal_row_number(0),
_error_row_number(0),
- _error_log_file(nullptr) {
+ _error_log_file(nullptr),
+ _query_ctx(ctx) {
Status status =
init(fragment_exec_params.fragment_instance_id, query_options,
query_globals, exec_env);
DCHECK(status.ok());
@@ -106,7 +107,7 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams&
fragment_exec_params,
RuntimeState::RuntimeState(const TUniqueId& instance_id, const TUniqueId&
query_id,
int32_t fragment_id, const TQueryOptions&
query_options,
- const TQueryGlobals& query_globals, ExecEnv*
exec_env)
+ const TQueryGlobals& query_globals, ExecEnv*
exec_env, QueryContext* ctx)
: _profile("Fragment " + print_id(instance_id)),
_load_channel_profile("<unnamed>"),
_obj_pool(new ObjectPool()),
@@ -125,7 +126,8 @@ RuntimeState::RuntimeState(const TUniqueId& instance_id,
const TUniqueId& query_
_num_finished_scan_range(0),
_normal_row_number(0),
_error_row_number(0),
- _error_log_file(nullptr) {
+ _error_log_file(nullptr),
+ _query_ctx(ctx) {
[[maybe_unused]] auto status = init(instance_id, query_options,
query_globals, exec_env);
DCHECK(status.ok());
_runtime_filter_mgr.reset(
@@ -135,7 +137,7 @@ RuntimeState::RuntimeState(const TUniqueId& instance_id,
const TUniqueId& query_
RuntimeState::RuntimeState(pipeline::PipelineXFragmentContext*, const
TUniqueId& instance_id,
const TUniqueId& query_id, int32_t fragment_id,
const TQueryOptions& query_options, const
TQueryGlobals& query_globals,
- ExecEnv* exec_env)
+ ExecEnv* exec_env, QueryContext* ctx)
: _profile("Fragment " + print_id(instance_id)),
_load_channel_profile("<unnamed>"),
_obj_pool(new ObjectPool()),
@@ -155,14 +157,15 @@
RuntimeState::RuntimeState(pipeline::PipelineXFragmentContext*, const TUniqueId&
_num_finished_scan_range(0),
_normal_row_number(0),
_error_row_number(0),
- _error_log_file(nullptr) {
+ _error_log_file(nullptr),
+ _query_ctx(ctx) {
[[maybe_unused]] auto status = init(instance_id, query_options,
query_globals, exec_env);
DCHECK(status.ok());
}
RuntimeState::RuntimeState(const TUniqueId& query_id, int32_t fragment_id,
const TQueryOptions& query_options, const
TQueryGlobals& query_globals,
- ExecEnv* exec_env)
+ ExecEnv* exec_env, QueryContext* ctx)
: _profile("PipelineX " + std::to_string(fragment_id)),
_load_channel_profile("<unnamed>"),
_obj_pool(new ObjectPool()),
@@ -181,7 +184,8 @@ RuntimeState::RuntimeState(const TUniqueId& query_id,
int32_t fragment_id,
_num_finished_scan_range(0),
_normal_row_number(0),
_error_row_number(0),
- _error_log_file(nullptr) {
+ _error_log_file(nullptr),
+ _query_ctx(ctx) {
// TODO: do we really need instance id?
Status status = init(TUniqueId(), query_options, query_globals, exec_env);
DCHECK(status.ok());
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 38053d3cb68..cdc7e83f042 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -70,20 +70,20 @@ public:
RuntimeState(const TPlanFragmentExecParams& fragment_exec_params,
const TQueryOptions& query_options, const TQueryGlobals&
query_globals,
- ExecEnv* exec_env);
+ ExecEnv* exec_env, QueryContext* ctx);
RuntimeState(const TUniqueId& instance_id, const TUniqueId& query_id,
int32 fragment_id,
const TQueryOptions& query_options, const TQueryGlobals&
query_globals,
- ExecEnv* exec_env);
+ ExecEnv* exec_env, QueryContext* ctx);
// for only use in pipelineX
RuntimeState(pipeline::PipelineXFragmentContext*, const TUniqueId&
instance_id,
const TUniqueId& query_id, int32 fragment_id, const
TQueryOptions& query_options,
- const TQueryGlobals& query_globals, ExecEnv* exec_env);
+ const TQueryGlobals& query_globals, ExecEnv* exec_env,
QueryContext* ctx);
// Used by pipelineX. This runtime state is only used for setup.
RuntimeState(const TUniqueId& query_id, int32 fragment_id, const
TQueryOptions& query_options,
- const TQueryGlobals& query_globals, ExecEnv* exec_env);
+ const TQueryGlobals& query_globals, ExecEnv* exec_env,
QueryContext* ctx);
// RuntimeState for executing expr in fe-support.
RuntimeState(const TQueryGlobals& query_globals);
@@ -466,8 +466,6 @@ public:
_pipeline_x_runtime_filter_mgr = pipeline_x_runtime_filter_mgr;
}
- void set_query_ctx(QueryContext* ctx) { _query_ctx = ctx; }
-
QueryContext* get_query_ctx() { return _query_ctx; }
void set_query_mem_tracker(const std::shared_ptr<MemTrackerLimiter>&
tracker) {
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp
b/be/src/vec/exec/join/vhash_join_node.cpp
index 3ca828a7f35..67dffa4b203 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -92,7 +92,6 @@ HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode&
tnode, const Descr
_hash_output_slot_ids(tnode.hash_join_node.__isset.hash_output_slot_ids
?
tnode.hash_join_node.hash_output_slot_ids
: std::vector<SlotId> {}) {
- _runtime_filter_descs = tnode.runtime_filters;
_arena = std::make_shared<Arena>();
_hash_table_variants = std::make_shared<HashTableVariants>();
_process_hashtable_ctx_variants = std::make_unique<HashTableCtxVariants>();
@@ -180,12 +179,10 @@ Status HashJoinNode::init(const TPlanNode& tnode,
RuntimeState* state) {
}
#endif
- _runtime_filters.resize(_runtime_filter_descs.size());
for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter(
- _runtime_filter_descs[i], state->query_options(),
_probe_expr_ctxs.size() == 1));
- RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(
- _runtime_filter_descs[i].filter_id, &_runtime_filters[i]));
+ _runtime_filter_descs[i], state->query_options(),
&_runtime_filters[i],
+ _probe_expr_ctxs.size() == 1));
}
// init left/right output slots flags, only column of slot_id in
_hash_output_slot_ids need
@@ -795,11 +792,11 @@ Status HashJoinNode::sink(doris::RuntimeState* state,
vectorized::Block* in_bloc
__builtin_unreachable();
},
[&](auto&& arg) -> Status {
- if (_runtime_filter_descs.empty()) {
+ if (_runtime_filters.empty()) {
return Status::OK();
}
_runtime_filter_slots =
std::make_shared<VRuntimeFilterSlots>(
- _build_expr_ctxs,
_runtime_filter_descs);
+ _build_expr_ctxs, _runtime_filters);
RETURN_IF_ERROR(_runtime_filter_slots->init(
state, arg.hash_table->size()));
diff --git a/be/src/vec/exec/join/vhash_join_node.h
b/be/src/vec/exec/join/vhash_join_node.h
index a017633e5ce..58451c360e6 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -53,9 +53,7 @@ template <typename T>
struct HashCRC32;
namespace doris {
-
class ObjectPool;
-class IRuntimeFilter;
class DescriptorTbl;
class RuntimeState;
@@ -77,11 +75,11 @@ class HashJoinNode;
template <typename Parent>
Status process_runtime_filter_build(RuntimeState* state, Block* block, Parent*
parent,
bool is_global = false) {
- if (parent->runtime_filter_descs().empty()) {
+ if (parent->runtime_filters().empty()) {
return Status::OK();
}
parent->_runtime_filter_slots = std::make_shared<VRuntimeFilterSlots>(
- parent->_build_expr_ctxs, parent->runtime_filter_descs(),
is_global);
+ parent->_build_expr_ctxs, parent->runtime_filters(), is_global);
RETURN_IF_ERROR(parent->_runtime_filter_slots->init(state, block->rows()));
@@ -236,7 +234,6 @@ public:
DataTypes right_table_data_types() { return _right_table_data_types; }
DataTypes left_table_data_types() { return _left_table_data_types; }
bool build_unique() const { return _build_unique; }
- std::vector<TRuntimeFilterDesc>& runtime_filter_descs() { return
_runtime_filter_descs; }
std::shared_ptr<vectorized::Arena> arena() { return _arena; }
protected:
@@ -400,9 +397,6 @@ private:
friend Status process_runtime_filter_build(RuntimeState* state,
vectorized::Block* block,
Parent* parent, bool is_global);
- std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
-
- std::vector<IRuntimeFilter*> _runtime_filters;
std::atomic_bool _probe_open_finish = false;
std::vector<int> _build_col_ids;
};
diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp
b/be/src/vec/exec/join/vjoin_node_base.cpp
index 3436209c4cd..6fab6b8b91f 100644
--- a/be/src/vec/exec/join/vjoin_node_base.cpp
+++ b/be/src/vec/exec/join/vjoin_node_base.cpp
@@ -79,7 +79,9 @@ VJoinNodeBase::VJoinNodeBase(ObjectPool* pool, const
TPlanNode& tnode, const Des
: tnode.hash_join_node.__isset.is_mark &&
tnode.hash_join_node.is_mark),
_short_circuit_for_null_in_build_side(_join_op ==
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
- !_is_mark_join) {
+ !_is_mark_join),
+ _runtime_filter_descs(tnode.runtime_filters) {
+ _runtime_filters.resize(_runtime_filter_descs.size());
_init_join_op();
if (_is_mark_join) {
DCHECK(_join_op == TJoinOp::LEFT_ANTI_JOIN || _join_op ==
TJoinOp::LEFT_SEMI_JOIN ||
diff --git a/be/src/vec/exec/join/vjoin_node_base.h
b/be/src/vec/exec/join/vjoin_node_base.h
index 95d10a0a07d..0e6ac3c9837 100644
--- a/be/src/vec/exec/join/vjoin_node_base.h
+++ b/be/src/vec/exec/join/vjoin_node_base.h
@@ -35,7 +35,7 @@
namespace doris {
class ObjectPool;
class RuntimeState;
-
+class IRuntimeFilter;
} // namespace doris
namespace doris::vectorized {
@@ -74,6 +74,8 @@ public:
[[nodiscard]] bool can_terminate_early() override { return
_short_circuit_for_probe; }
+ const std::vector<IRuntimeFilter*>& runtime_filters() const { return
_runtime_filters; }
+
protected:
// Construct the intermediate blocks to store the results from join
operation.
void _construct_mutable_join_block();
@@ -147,6 +149,9 @@ protected:
RuntimeProfile::Counter* _runtime_filter_compute_timer = nullptr;
RuntimeProfile::Counter* _join_filter_timer = nullptr;
RuntimeProfile::Counter* _build_output_block_timer = nullptr;
+
+ std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
+ std::vector<IRuntimeFilter*> _runtime_filters;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp
b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index 08d4c7d4487..150068096ba 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -34,7 +34,6 @@
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/status.h"
#include "exec/exec_node.h"
-#include "exprs/runtime_filter.h"
#include "exprs/runtime_filter_slots_cross.h"
#include "gutil/integral_types.h"
#include "pipeline/exec/nested_loop_join_build_operator.h"
@@ -43,14 +42,12 @@
#include "runtime/runtime_state.h"
#include "util/runtime_profile.h"
#include "util/simd/bits.h"
-#include "vec/columns/column_const.h"
#include "vec/columns/column_filter_helper.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_vector.h"
#include "vec/columns/columns_number.h"
#include "vec/common/assert_cast.h"
#include "vec/core/column_with_type_and_name.h"
-#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_number.h"
#include "vec/exprs/vexpr.h"
@@ -65,10 +62,10 @@ namespace doris::vectorized {
template <typename Parent>
Status RuntimeFilterBuild<Parent>::operator()(RuntimeState* state) {
- if (_parent->runtime_filter_descs().empty()) {
+ if (_parent->runtime_filters().empty()) {
return Status::OK();
}
- VRuntimeFilterSlotsCross
runtime_filter_slots(_parent->runtime_filter_descs(),
+ VRuntimeFilterSlotsCross runtime_filter_slots(_parent->runtime_filters(),
_parent->filter_src_expr_ctxs());
RETURN_IF_ERROR(runtime_filter_slots.init(state));
@@ -96,8 +93,7 @@ VNestedLoopJoinNode::VNestedLoopJoinNode(ObjectPool* pool,
const TPlanNode& tnod
_matched_rows_done(false),
_left_block_pos(0),
_left_side_eos(false),
- _old_version_flag(!tnode.__isset.nested_loop_join_node),
- _runtime_filter_descs(tnode.runtime_filters) {
+ _old_version_flag(!tnode.__isset.nested_loop_join_node) {
_left_block = Block::create_shared();
}
@@ -123,7 +119,7 @@ Status VNestedLoopJoinNode::init(const TPlanNode& tnode,
RuntimeState* state) {
for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
filter_src_exprs.push_back(_runtime_filter_descs[i].src_expr);
RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter(
- _runtime_filter_descs[i], state->query_options()));
+ _runtime_filter_descs[i], state->query_options(),
&_runtime_filters[i]));
}
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(filter_src_exprs,
_filter_src_expr_ctxs));
return Status::OK();
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.h
b/be/src/vec/exec/join/vnested_loop_join_node.h
index 73264859274..a8021bb4251 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.h
+++ b/be/src/vec/exec/join/vnested_loop_join_node.h
@@ -97,7 +97,6 @@ public:
std::shared_ptr<Block> get_left_block() { return _left_block; }
- std::vector<TRuntimeFilterDesc>& runtime_filter_descs() { return
_runtime_filter_descs; }
VExprContextSPtrs& filter_src_expr_ctxs() { return _filter_src_expr_ctxs; }
RuntimeProfile::Counter* runtime_filter_compute_timer() {
return _runtime_filter_compute_timer;
@@ -267,7 +266,6 @@ private:
MutableColumns _dst_columns;
- std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
VExprContextSPtrs _filter_src_expr_ctxs;
bool _is_output_left_side_only = false;
bool _need_more_input_data = true;
diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp
b/be/src/vec/exec/runtime_filter_consumer.cpp
index 5e2d90bf62d..eba11dac45d 100644
--- a/be/src/vec/exec/runtime_filter_consumer.cpp
+++ b/be/src/vec/exec/runtime_filter_consumer.cpp
@@ -53,10 +53,7 @@ Status RuntimeFilterConsumer::_register_runtime_filter(bool
is_global) {
IRuntimeFilter* runtime_filter = nullptr;
const auto& filter_desc = _runtime_filter_descs[i];
if (filter_desc.__isset.opt_remote_rf && filter_desc.opt_remote_rf) {
- DCHECK(filter_desc.type == TRuntimeFilterType::BLOOM &&
filter_desc.has_remote_targets);
- // Optimize merging phase iff:
- // 1. All BE and FE has been upgraded (e.g. opt_remote_rf)
- // 2. This filter is bloom filter (only bloom filter should be
used for merging)
+ DCHECK(filter_desc.has_remote_targets);
RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter(
filter_desc, _state->query_options(), _filter_id,
&runtime_filter, false,
is_global));
diff --git a/be/test/vec/exec/vwal_scanner_test.cpp
b/be/test/vec/exec/vwal_scanner_test.cpp
index 6f983ca5bf4..b47d7563454 100644
--- a/be/test/vec/exec/vwal_scanner_test.cpp
+++ b/be/test/vec/exec/vwal_scanner_test.cpp
@@ -41,7 +41,6 @@ public:
_profile = _runtime_state.runtime_profile();
_runtime_state.init_mem_trackers();
static_cast<void>(_runtime_state.init(unique_id, query_options,
query_globals, _env));
- _runtime_state.set_query_ctx(query_ctx);
}
void init();
@@ -75,7 +74,6 @@ private:
TUniqueId unique_id;
TQueryOptions query_options;
TQueryGlobals query_globals;
- QueryContext* query_ctx = nullptr;
};
void VWalScannerTest::init_desc_table() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
index 58d4d3eabe3..ade087e2de0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java
@@ -108,8 +108,6 @@ public final class RuntimeFilter {
private boolean bitmapFilterNotIn = false;
- private boolean useRemoteRfOpt = true;
-
private TMinMaxRuntimeFilterType tMinMaxRuntimeFilterType;
private boolean bloomFilterSizeCalculatedByNdv = false;
@@ -207,17 +205,6 @@ public final class RuntimeFilter {
this.bitmapFilterNotIn = bitmapFilterNotIn;
}
- public void computeUseRemoteRfOpt() {
- for (RuntimeFilterTarget target : targets) {
- useRemoteRfOpt = useRemoteRfOpt && hasRemoteTargets &&
runtimeFilterType == TRuntimeFilterType.BLOOM
- && target.expr instanceof SlotRef;
- }
- }
-
- public boolean getUseRemoteRfOpt() {
- return useRemoteRfOpt;
- }
-
/**
* Serializes a runtime filter to Thrift.
*/
@@ -229,12 +216,8 @@ public final class RuntimeFilter {
tFilter.setIsBroadcastJoin(isBroadcastJoin);
tFilter.setHasLocalTargets(hasLocalTargets);
tFilter.setHasRemoteTargets(hasRemoteTargets);
- boolean optRemoteRf = true;
for (RuntimeFilterTarget target : targets) {
tFilter.putToPlanIdToTargetExpr(target.node.getId().asInt(),
target.expr.treeToThrift());
- // TODO: now only support SlotRef
- optRemoteRf = optRemoteRf && hasRemoteTargets && runtimeFilterType
== TRuntimeFilterType.BLOOM
- && target.expr instanceof SlotRef;
}
tFilter.setType(runtimeFilterType);
tFilter.setBloomFilterSizeBytes(filterSizeBytes);
@@ -245,7 +228,7 @@ public final class RuntimeFilter {
if (runtimeFilterType.equals(TRuntimeFilterType.MIN_MAX)) {
tFilter.setMinMaxType(tMinMaxRuntimeFilterType);
}
- tFilter.setOptRemoteRf(optRemoteRf);
+ tFilter.setOptRemoteRf(hasRemoteTargets);
tFilter.setBloomFilterSizeCalculatedByNdv(bloomFilterSizeCalculatedByNdv);
return tFilter;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 5f4154cb2b2..784f63266c9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -3736,8 +3736,7 @@ public class Coordinator implements CoordInterface {
continue;
}
List<FRuntimeFilterTargetParam> fParams =
ridToTargetParam.get(rf.getFilterId());
- rf.computeUseRemoteRfOpt();
- if (rf.getUseRemoteRfOpt()) {
+ if (rf.hasRemoteTargets()) {
Map<TNetworkAddress, TRuntimeFilterTargetParamsV2>
targetParamsV2 = new HashMap<>();
for (FRuntimeFilterTargetParam targetParam :
fParams) {
if
(targetParamsV2.containsKey(targetParam.targetFragmentInstanceAddr)) {
@@ -3870,8 +3869,7 @@ public class Coordinator implements CoordInterface {
continue;
}
List<FRuntimeFilterTargetParam> fParams =
ridToTargetParam.get(rf.getFilterId());
- rf.computeUseRemoteRfOpt();
- if (rf.getUseRemoteRfOpt()) {
+ if (rf.hasRemoteTargets()) {
Map<TNetworkAddress, TRuntimeFilterTargetParamsV2>
targetParamsV2 = new HashMap<>();
for (FRuntimeFilterTargetParam targetParam :
fParams) {
if
(targetParamsV2.containsKey(targetParam.targetFragmentInstanceAddr)) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]