This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new e962a7309b3 [Chore](runtime-filter) adjust some check and error msg on
runtime filter (#35018) (#35251)
e962a7309b3 is described below
commit e962a7309b38c0c8cd2bcb8f6d3337d2a3ab3847
Author: Pxl <[email protected]>
AuthorDate: Thu May 23 11:20:02 2024 +0800
[Chore](runtime-filter) adjust some check and error msg on runtime filter
(#35018) (#35251)
Adjust some checks and error messages in the runtime filter.
---
be/src/exprs/bloom_filter_func.h | 7 -------
be/src/exprs/runtime_filter.cpp | 24 ++++++++++++++++--------
be/src/exprs/runtime_filter.h | 9 ++++-----
be/src/exprs/runtime_filter_slots.h | 4 ++++
be/src/runtime/runtime_filter_mgr.cpp | 8 +++++++-
be/src/vec/exec/runtime_filter_consumer.cpp | 2 +-
6 files changed, 32 insertions(+), 22 deletions(-)
diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h
index bc56c7b505a..a831395a5ea 100644
--- a/be/src/exprs/bloom_filter_func.h
+++ b/be/src/exprs/bloom_filter_func.h
@@ -133,11 +133,6 @@ public:
}
Status init_with_fixed_length(int64_t bloom_filter_length) {
- if (_inited) {
- return Status::OK();
- }
- // TODO: really need the lock?
- std::lock_guard<std::mutex> l(_lock);
if (_inited) {
return Status::OK();
}
@@ -154,7 +149,6 @@ public:
// If `_inited` is false, there is no memory allocated in bloom filter
and this is the first
// call for `merge` function. So we just reuse this bloom filter, and
we don't need to
// allocate memory again.
- std::lock_guard<std::mutex> l(_lock);
if (!_inited) {
auto* other_func =
static_cast<BloomFilterFuncBase*>(bloomfilter_func);
DCHECK(_bloom_filter == nullptr);
@@ -228,7 +222,6 @@ protected:
int32_t _bloom_filter_alloced;
std::shared_ptr<BloomFilterAdaptor> _bloom_filter;
bool _inited {};
- std::mutex _lock;
int64_t _bloom_filter_length;
bool _build_bf_exactly = false;
bool _bloom_filter_size_calculated_by_ndv = false;
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index e51b3c739f6..3e07943c45e 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -364,9 +364,11 @@ public:
}
bool get_build_bf_cardinality() const {
- DCHECK(_filter_type == RuntimeFilterType::BLOOM_FILTER ||
- _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER);
- return _context->bloom_filter_func->get_build_bf_cardinality();
+ if (_filter_type == RuntimeFilterType::BLOOM_FILTER ||
+ _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
+ return _context->bloom_filter_func->get_build_bf_cardinality();
+ }
+ return false;
}
void insert_to_bloom_filter(BloomFilterFuncBase* bloom_filter) const {
@@ -1522,15 +1524,21 @@ void
IRuntimeFilter::update_runtime_filter_type_to_profile() {
_profile->add_info_string("RealRuntimeFilterType",
to_string(_wrapper->get_real_type()));
}
+std::string IRuntimeFilter::debug_string() const {
+ return fmt::format(
+ "RuntimeFilter: (id = {}, type = {}, need_local_merge: {},
is_broadcast: {}, "
+ "build_bf_cardinality: {}",
+ _filter_id, to_string(_runtime_filter_type), _need_local_merge,
_is_broadcast_join,
+ _wrapper->get_build_bf_cardinality());
+}
+
Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) {
auto status = _wrapper->merge(wrapper);
if (!status) {
- LOG(WARNING) << "runtime filter merge failed: " << _name
- << " ,need_local_merge: " << _need_local_merge
- << " ,is_broadcast: " << _is_broadcast_join;
- DCHECK(false); // rpc response is often ignored, so let it crash
directly here
+ return Status::InternalError("runtime filter merge failed: {},
error_msg: {}",
+ debug_string(), status.msg());
}
- return status;
+ return Status::OK();
}
template <typename T>
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 4733d39e298..ee6897be322 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -209,9 +209,9 @@ public:
_rf_wait_time_ms(_state->runtime_filter_wait_time_ms),
_enable_pipeline_exec(_state->enable_pipeline_exec),
_runtime_filter_type(get_runtime_filter_type(desc)),
- _name(fmt::format("RuntimeFilter: (id = {}, type = {})",
_filter_id,
- to_string(_runtime_filter_type))),
- _profile(new RuntimeProfile(_name)),
+ _profile(
+ new RuntimeProfile(fmt::format("RuntimeFilter: (id = {},
type = {})",
+ _filter_id,
to_string(_runtime_filter_type)))),
_need_local_merge(need_local_merge) {}
~IRuntimeFilter() = default;
@@ -311,7 +311,7 @@ public:
void init_profile(RuntimeProfile* parent_profile);
- std::string& get_name() { return _name; }
+ std::string debug_string() const;
void update_runtime_filter_type_to_profile();
@@ -442,7 +442,6 @@ protected:
std::atomic<bool> _profile_init = false;
// runtime filter type
RuntimeFilterType _runtime_filter_type;
- std::string _name;
// parent profile
// only effect on consumer
std::unique_ptr<RuntimeProfile> _profile;
diff --git a/be/src/exprs/runtime_filter_slots.h
b/be/src/exprs/runtime_filter_slots.h
index fbc9ae6ceb3..b5b04a1ebac 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -101,6 +101,10 @@ public:
}
if (filter->get_real_type() == RuntimeFilterType::BLOOM_FILTER) {
+ if (filter->need_sync_filter_size() !=
filter->isset_synced_size()) {
+ return Status::InternalError("sync filter size meet error,
filter: {}",
+ filter->debug_string());
+ }
RETURN_IF_ERROR(
filter->init_bloom_filter(get_real_size(filter,
local_hash_table_size)));
}
diff --git a/be/src/runtime/runtime_filter_mgr.cpp
b/be/src/runtime/runtime_filter_mgr.cpp
index 9f3d26d6a16..010cb5a60e5 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -416,7 +416,13 @@ Status RuntimeFilterMergeControllerEntity::merge(const
PMergeFilterRequest* requ
RuntimeFilterWrapperHolder holder;
RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(&params, pool,
holder.getHandle()));
-
RETURN_IF_ERROR(cnt_val->filter->merge_from(holder.getHandle()->get()));
+ auto st = cnt_val->filter->merge_from(holder.getHandle()->get());
+ if (!st) {
+ // prevent error ignored
+ DCHECK(false) << st.msg();
+ return st;
+ }
+
cnt_val->arrive_id.insert(UniqueId(request->fragment_instance_id()));
merged_size = cnt_val->arrive_id.size();
// TODO: avoid log when we had acquired a lock
diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp
b/be/src/vec/exec/runtime_filter_consumer.cpp
index 66fd0297c98..30c2cc14917 100644
--- a/be/src/vec/exec/runtime_filter_consumer.cpp
+++ b/be/src/vec/exec/runtime_filter_consumer.cpp
@@ -42,7 +42,7 @@ void RuntimeFilterConsumer::_init_profile(RuntimeProfile*
profile) {
fmt::memory_buffer buffer;
for (auto& rf_ctx : _runtime_filter_ctxs) {
rf_ctx.runtime_filter->init_profile(profile);
- fmt::format_to(buffer, "{}, ", rf_ctx.runtime_filter->get_name());
+ fmt::format_to(buffer, "{}, ", rf_ctx.runtime_filter->debug_string());
}
profile->add_info_string("RuntimeFilters: ", to_string(buffer));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]