This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 247063485944e727975b75c32b9bb519581b4c43 Author: HappenLee <[email protected]> AuthorDate: Mon Mar 11 10:16:56 2024 +0800 [RuntimeFilter] fix <=> runtime filter failed bug (#32003) --- be/src/exprs/bloom_filter_func.h | 68 +++++++++++++++++---------------- be/src/exprs/runtime_filter.cpp | 29 +++++++------- be/src/exprs/runtime_filter.h | 5 ++- be/src/vec/exprs/vdirect_in_predicate.h | 7 ++++ gensrc/proto/internal_service.proto | 3 ++ 5 files changed, 64 insertions(+), 48 deletions(-) diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h index cc9d8a71390..f5e822e2572 100644 --- a/be/src/exprs/bloom_filter_func.h +++ b/be/src/exprs/bloom_filter_func.h @@ -98,8 +98,6 @@ public: void set_build_bf_exactly(bool build_bf_exactly) { _build_bf_exactly = build_bf_exactly; } - void set_null_aware(bool null_aware) { _null_aware = null_aware; } - Status init_with_fixed_length() { return init_with_fixed_length(_bloom_filter_length); } Status init_with_cardinality(const size_t build_bf_cardinality) { @@ -139,44 +137,39 @@ public: // If `_inited` is false, there is no memory allocated in bloom filter and this is the first // call for `merge` function. So we just reuse this bloom filter, and we don't need to // allocate memory again. 
- if (_inited) { - DCHECK(bloomfilter_func != nullptr); + std::lock_guard<std::mutex> l(_lock); + if (!_inited) { auto* other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func); - if (_bloom_filter_alloced != other_func->_bloom_filter_alloced) { - return Status::InvalidArgument( - "bloom filter size not the same: already allocated bytes {}, expected " - "allocated bytes {}", - _bloom_filter_alloced, other_func->_bloom_filter_alloced); - } - return _bloom_filter->merge(other_func->_bloom_filter.get()); - } - { - std::lock_guard<std::mutex> l(_lock); - if (!_inited) { - auto* other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func); - DCHECK(_bloom_filter == nullptr); - DCHECK(bloomfilter_func != nullptr); - _bloom_filter = bloomfilter_func->_bloom_filter; - _bloom_filter_alloced = other_func->_bloom_filter_alloced; - _inited = true; - return Status::OK(); - } + DCHECK(_bloom_filter == nullptr); DCHECK(bloomfilter_func != nullptr); - auto* other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func); - if (_bloom_filter_alloced != other_func->_bloom_filter_alloced) { - return Status::InvalidArgument( - "bloom filter size not the same: already allocated bytes {}, expected " - "allocated bytes {}", - _bloom_filter_alloced, other_func->_bloom_filter_alloced); - } - return _bloom_filter->merge(other_func->_bloom_filter.get()); + _bloom_filter = bloomfilter_func->_bloom_filter; + _bloom_filter_alloced = other_func->_bloom_filter_alloced; + _inited = true; + return Status::OK(); } + DCHECK(bloomfilter_func != nullptr); + auto* other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func); + if (_bloom_filter_alloced != other_func->_bloom_filter_alloced) { + return Status::InvalidArgument( + "bloom filter size not the same: already allocated bytes {}, expected " + "allocated bytes {}", + _bloom_filter_alloced, other_func->_bloom_filter_alloced); + } + if (other_func->_bloom_filter->contain_null()) { + _bloom_filter->set_contain_null(); + } + return 
_bloom_filter->merge(other_func->_bloom_filter.get()); } - Status assign(butil::IOBufAsZeroCopyInputStream* data, const size_t data_size) { + Status assign(butil::IOBufAsZeroCopyInputStream* data, const size_t data_size, + bool contain_null) { if (_bloom_filter == nullptr) { + _null_aware = contain_null; _bloom_filter.reset(BloomFilterAdaptor::create(_null_aware)); } + if (contain_null) { + _bloom_filter->set_contain_null(); + } _bloom_filter_alloced = data_size; return _bloom_filter->init(data, data_size); @@ -187,6 +180,16 @@ public: *len = _bloom_filter->size(); } + bool contain_null() const { + DCHECK(_bloom_filter); + return _bloom_filter->contain_null(); + } + + void set_contain_null() { + DCHECK(_bloom_filter); + _bloom_filter->set_contain_null(); + } + size_t get_size() const { return _bloom_filter ? _bloom_filter->size() : 0; } void light_copy(BloomFilterFuncBase* bloomfilter_func) { @@ -214,7 +217,6 @@ protected: std::mutex _lock; int64_t _bloom_filter_length; bool _build_bf_exactly = false; - bool _null_aware = false; }; template <typename T, bool need_trim = false> diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 5cd195de515..a949969ca65 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -31,7 +31,6 @@ #include <memory> #include <mutex> #include <ostream> -#include <utility> #include "common/logging.h" #include "common/object_pool.h" @@ -40,7 +39,6 @@ #include "exprs/bloom_filter_func.h" #include "exprs/create_predicate_function.h" #include "exprs/hybrid_set.h" -#include "exprs/minmax_predicate.h" #include "gutil/strings/substitute.h" #include "pipeline/pipeline_x/dependency.h" #include "runtime/define_primitive_type.h" @@ -720,14 +718,18 @@ public: // used by shuffle runtime filter // assign this filter by protobuf - Status assign(const PBloomFilter* bloom_filter, butil::IOBufAsZeroCopyInputStream* data) { + Status assign(const PBloomFilter* bloom_filter, 
butil::IOBufAsZeroCopyInputStream* data, + bool contain_null) { _is_bloomfilter = true; // we won't use this class to insert or find any data // so any type is ok _context.bloom_filter_func.reset(create_bloom_filter(_column_return_type == INVALID_TYPE ? PrimitiveType::TYPE_INT : _column_return_type)); - return _context.bloom_filter_func->assign(data, bloom_filter->filter_length()); + RETURN_IF_ERROR(_context.bloom_filter_func->assign(data, bloom_filter->filter_length(), + contain_null)); + + return Status::OK(); } // used by shuffle runtime filter @@ -877,6 +879,10 @@ public: bool is_bloomfilter() const { return _is_bloomfilter; } + bool contain_null() const { + return _is_bloomfilter && _context.bloom_filter_func->contain_null(); + } + bool is_ignored() const { return _ignored; } const std::string& ignored_msg() const { return _ignored_msg; } @@ -1302,7 +1308,8 @@ Status IRuntimeFilter::create_wrapper(const UpdateRuntimeFilterParamsV2* param, } case PFilterType::BLOOM_FILTER: { DCHECK(param->request->has_bloom_filter()); - return (*wrapper)->assign(&param->request->bloom_filter(), param->data); + return (*wrapper)->assign(&param->request->bloom_filter(), param->data, + param->request->contain_null()); } case PFilterType::MIN_FILTER: case PFilterType::MAX_FILTER: @@ -1344,7 +1351,8 @@ Status IRuntimeFilter::_create_wrapper(const T* param, ObjectPool* pool, } case PFilterType::BLOOM_FILTER: { DCHECK(param->request->has_bloom_filter()); - return (*wrapper)->assign(&param->request->bloom_filter(), param->data); + return (*wrapper)->assign(&param->request->bloom_filter(), param->data, + param->request->contain_null()); } case PFilterType::MIN_FILTER: case PFilterType::MAX_FILTER: @@ -1400,6 +1408,7 @@ Status IRuntimeFilter::serialize_impl(T* request, void** data, int* len) { } request->set_filter_type(get_type(real_runtime_filter_type)); + request->set_contain_null(_wrapper->contain_null()); if (real_runtime_filter_type == RuntimeFilterType::IN_FILTER) { auto in_filter = 
request->mutable_in_filter(); @@ -1732,7 +1741,6 @@ Status RuntimePredicateWrapper::get_push_exprs(std::list<vectorized::VExprContex node.in_predicate.__set_is_not_in(false); node.__set_opcode(TExprOpcode::FILTER_IN); node.__set_is_nullable(false); - auto in_pred = vectorized::VDirectInPredicate::create_shared(node); in_pred->set_filter(_context.hybrid_set); in_pred->add_child(probe_ctx->root()); @@ -1753,9 +1761,6 @@ Status RuntimePredicateWrapper::get_push_exprs(std::list<vectorized::VExprContex min_pred->add_child(min_literal); container.push_back( vectorized::VRuntimeFilterWrapper::create_shared(min_pred_node, min_pred)); - vectorized::VExprContextSPtr new_probe_ctx; - RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(probe_expr, new_probe_ctx)); - probe_ctxs.push_back(new_probe_ctx); break; } case RuntimeFilterType::MAX_FILTER: { @@ -1771,10 +1776,6 @@ Status RuntimePredicateWrapper::get_push_exprs(std::list<vectorized::VExprContex max_pred->add_child(max_literal); container.push_back( vectorized::VRuntimeFilterWrapper::create_shared(max_pred_node, max_pred)); - - vectorized::VExprContextSPtr new_probe_ctx; - RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(probe_expr, new_probe_ctx)); - probe_ctxs.push_back(new_probe_ctx); break; } case RuntimeFilterType::MINMAX_FILTER: { diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index be334ee939a..5cfc88f4ed8 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -146,8 +146,11 @@ public: bool is_runtime_filter() const { return _filter_id != -1; } -private: + void set_null_aware(bool null_aware) { _null_aware = null_aware; } + +protected: int _filter_id = -1; + bool _null_aware = false; }; struct UpdateRuntimeFilterParams { diff --git a/be/src/vec/exprs/vdirect_in_predicate.h b/be/src/vec/exprs/vdirect_in_predicate.h index a68a6c3121a..fbba76de61d 100644 --- a/be/src/vec/exprs/vdirect_in_predicate.h +++ b/be/src/vec/exprs/vdirect_in_predicate.h @@ -67,6 +67,12 
@@ public: const auto& null_map = static_cast<const ColumnNullable*>(argument_column.get())->get_null_map_data(); _filter->find_batch_nullable(*column_nested, sz, null_map, res_data_column->get_data()); + if (_null_aware) { + auto* __restrict res_data = res_data_column->get_data().data(); + for (size_t i = 0; i < sz; ++i) { + res_data[i] |= null_map[i]; + } + } } else { _filter->find_batch(*argument_column, sz, res_data_column->get_data()); } @@ -87,6 +93,7 @@ public: private: std::shared_ptr<HybridSetBase> _filter; + bool _null_aware = false; std::string _expr_name; }; } // namespace doris::vectorized \ No newline at end of file diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 87e793aa750..8b9cce2acb9 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -514,6 +514,7 @@ message PMergeFilterRequest { optional bool is_pipeline = 8; optional bool opt_remote_rf = 9; optional PColumnType column_type = 10; + optional bool contain_null = 11; }; message PMergeFilterResponse { @@ -532,6 +533,7 @@ message PPublishFilterRequest { optional bool is_pipeline = 8; optional int64 merge_time = 9; optional PColumnType column_type = 10; + optional bool contain_null = 11; }; message PPublishFilterRequestV2 { @@ -544,6 +546,7 @@ message PPublishFilterRequestV2 { optional PInFilter in_filter = 7; optional bool is_pipeline = 8; optional int64 merge_time = 9; + optional bool contain_null = 10; }; message PPublishFilterResponse { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
