This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch new_join
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/new_join by this push:
new 61e3bf7262d update rf
61e3bf7262d is described below
commit 61e3bf7262d4d747add178f5a44e356e23be2d3e
Author: BiteTheDDDDt <[email protected]>
AuthorDate: Mon Oct 30 18:07:35 2023 +0800
update rf
update
---
be/src/exprs/bitmapfilter_predicate.h | 8 +-
be/src/exprs/block_bloom_filter.hpp | 20 +-
be/src/exprs/bloom_filter_func.h | 310 +++++++++++-------------------
be/src/exprs/hybrid_set.h | 119 +++++++-----
be/src/exprs/minmax_predicate.h | 273 ++++++++------------------
be/src/exprs/runtime_filter.cpp | 111 ++++-------
be/src/exprs/runtime_filter.h | 6 +-
be/src/exprs/runtime_filter_slots.h | 23 +--
be/src/exprs/runtime_filter_slots_cross.h | 24 +--
be/src/olap/bloom_filter_predicate.h | 60 +-----
be/src/vec/columns/column_dictionary.h | 29 +--
be/src/vec/common/hash_table/hash_map.h | 4 +
be/src/vec/exprs/vbloom_predicate.cpp | 37 +---
13 files changed, 335 insertions(+), 689 deletions(-)
diff --git a/be/src/exprs/bitmapfilter_predicate.h
b/be/src/exprs/bitmapfilter_predicate.h
index 743a55c4b6e..8df488cf875 100644
--- a/be/src/exprs/bitmapfilter_predicate.h
+++ b/be/src/exprs/bitmapfilter_predicate.h
@@ -31,7 +31,7 @@ namespace doris {
class BitmapFilterFuncBase : public FilterFuncBase {
public:
virtual void insert(const void* data) = 0;
- virtual void insert_many(const std::vector<const BitmapValue*> bitmaps) =
0;
+ virtual void insert_many(const std::vector<const BitmapValue*>& bitmaps) =
0;
virtual bool empty() = 0;
virtual Status assign(BitmapValue* bitmap_value) = 0;
virtual void light_copy(BitmapFilterFuncBase* other) { _not_in =
other->_not_in; }
@@ -60,7 +60,7 @@ public:
void insert(const void* data) override;
- void insert_many(const std::vector<const BitmapValue*> bitmaps) override;
+ void insert_many(const std::vector<const BitmapValue*>& bitmaps) override;
uint16_t find_fixed_len_olap_engine(const char* data, const uint8*
nullmap, uint16_t* offsets,
int number) override;
@@ -75,7 +75,7 @@ public:
return Status::OK();
}
- void light_copy(BitmapFilterFuncBase* bloomfilter_func) override;
+ void light_copy(BitmapFilterFuncBase* bitmapfilter_func) override;
size_t size() const override { return _bitmap_value->cardinality(); }
@@ -108,7 +108,7 @@ void BitmapFilterFunc<type>::insert(const void* data) {
}
template <PrimitiveType type>
-void BitmapFilterFunc<type>::insert_many(const std::vector<const BitmapValue*>
bitmaps) {
+void BitmapFilterFunc<type>::insert_many(const std::vector<const
BitmapValue*>& bitmaps) {
if (bitmaps.empty()) {
return;
}
diff --git a/be/src/exprs/block_bloom_filter.hpp
b/be/src/exprs/block_bloom_filter.hpp
index 654867d6ccc..18c34bbb312 100644
--- a/be/src/exprs/block_bloom_filter.hpp
+++ b/be/src/exprs/block_bloom_filter.hpp
@@ -73,13 +73,6 @@ public:
void insert(uint32_t hash) noexcept;
// Same as above with convenience of hashing the key.
void insert(const Slice& key) noexcept {
- if (key.data) {
- insert(HashUtil::murmur_hash3_32(key.data, key.size, _hash_seed));
- }
- }
-
- // This function is only to be used if the be_exec_version may be less
than 2. If updated, please delete it.
- void insert_crc32_hash(const Slice& key) noexcept {
if (key.data) {
insert(HashUtil::crc_hash(key.data, key.size, _hash_seed));
}
@@ -124,21 +117,12 @@ public:
}
// Same as above with convenience of hashing the key.
bool find(const Slice& key) const noexcept {
- if (key.data) {
- return find(HashUtil::murmur_hash3_32(key.data, key.size,
_hash_seed));
- } else {
- return false;
- }
- }
-
- // This function is only to be used if the be_exec_version may be less
than 2. If updated, please delete it.
- bool find_crc32_hash(const Slice& key) const noexcept {
if (key.data) {
return find(HashUtil::crc_hash(key.data, key.size, _hash_seed));
- } else {
- return false;
}
+ return false;
}
+
// Computes the logical OR of this filter with 'other' and stores the
result in this
// filter.
// Notes:
diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h
index a8330250ec0..a7b0904691f 100644
--- a/be/src/exprs/bloom_filter_func.h
+++ b/be/src/exprs/bloom_filter_func.h
@@ -53,23 +53,8 @@ public:
return _bloom_filter->find(data);
}
- // This function is only to be used if the be_exec_version may be less
than 2. If updated, please delete it.
- template <typename T>
- bool test_new_hash(T data) const {
- if constexpr (std::is_same_v<T, Slice>) {
- return _bloom_filter->find_crc32_hash(data);
- } else {
- return _bloom_filter->find(data);
- }
- }
-
void add_bytes(const char* data, size_t len) {
_bloom_filter->insert(Slice(data, len)); }
- // This function is only to be used if the be_exec_version may be less
than 2. If updated, please delete it.
- void add_bytes_new_hash(const char* data, size_t len) {
- _bloom_filter->insert_crc32_hash(Slice(data, len));
- }
-
// test_element/find_element only used on vectorized engine
template <typename T>
bool test_element(T element) const {
@@ -96,8 +81,6 @@ private:
// Only Used In RuntimeFilter
class BloomFilterFuncBase : public FilterFuncBase {
public:
- BloomFilterFuncBase() : _inited(false) {}
-
virtual ~BloomFilterFuncBase() = default;
Status init(int64_t expect_num, double fpp) {
@@ -112,9 +95,8 @@ public:
Status init_with_fixed_length() {
if (_build_bf_exactly) {
return Status::OK();
- } else {
- return init_with_fixed_length(_bloom_filter_length);
}
+ return init_with_fixed_length(_bloom_filter_length);
}
Status init_with_cardinality(const size_t build_bf_cardinality) {
@@ -128,9 +110,8 @@ public:
// Handle case where ndv == 1 => ceil(log2(m/8)) < 0.
int log_filter_size = std::max(0, (int)(std::ceil(std::log(m / 8)
/ std::log(2))));
return init_with_fixed_length(((int64_t)1) << log_filter_size);
- } else {
- return Status::OK();
}
+ return Status::OK();
}
Status init_with_fixed_length(int64_t bloom_filter_length) {
@@ -157,36 +138,35 @@ public:
// allocate memory again.
if (_inited) {
DCHECK(bloomfilter_func != nullptr);
- auto other_func =
static_cast<BloomFilterFuncBase*>(bloomfilter_func);
+ auto* other_func =
static_cast<BloomFilterFuncBase*>(bloomfilter_func);
if (_bloom_filter_alloced != other_func->_bloom_filter_alloced) {
- LOG(WARNING) << "bloom filter size not the same: already
allocated bytes = "
- << _bloom_filter_alloced << ", expected allocated
bytes = "
- << other_func->_bloom_filter_alloced;
- return Status::InvalidArgument("bloom filter size invalid");
+ return Status::InvalidArgument(
+ "bloom filter size not the same: already allocated
bytes {}, expected "
+ "allocated bytes {}",
+ _bloom_filter_alloced,
other_func->_bloom_filter_alloced);
}
return _bloom_filter->merge(other_func->_bloom_filter.get());
}
{
std::lock_guard<std::mutex> l(_lock);
if (!_inited) {
- auto other_func =
static_cast<BloomFilterFuncBase*>(bloomfilter_func);
+ auto* other_func =
static_cast<BloomFilterFuncBase*>(bloomfilter_func);
DCHECK(_bloom_filter == nullptr);
DCHECK(bloomfilter_func != nullptr);
_bloom_filter = bloomfilter_func->_bloom_filter;
_bloom_filter_alloced = other_func->_bloom_filter_alloced;
_inited = true;
return Status::OK();
- } else {
- DCHECK(bloomfilter_func != nullptr);
- auto other_func =
static_cast<BloomFilterFuncBase*>(bloomfilter_func);
- if (_bloom_filter_alloced !=
other_func->_bloom_filter_alloced) {
- LOG(WARNING) << "bloom filter size not the same: already
allocated bytes = "
- << _bloom_filter_alloced << ", expected
allocated bytes = "
- << other_func->_bloom_filter_alloced;
- return Status::InvalidArgument("bloom filter size
invalid");
- }
- return _bloom_filter->merge(other_func->_bloom_filter.get());
}
+ DCHECK(bloomfilter_func != nullptr);
+ auto* other_func =
static_cast<BloomFilterFuncBase*>(bloomfilter_func);
+ if (_bloom_filter_alloced != other_func->_bloom_filter_alloced) {
+ return Status::InvalidArgument(
+ "bloom filter size not the same: already allocated
bytes {}, expected "
+ "allocated bytes {}",
+ _bloom_filter_alloced,
other_func->_bloom_filter_alloced);
+ }
+ return _bloom_filter->merge(other_func->_bloom_filter.get());
}
}
@@ -208,7 +188,7 @@ public:
size_t get_size() const { return _bloom_filter ? _bloom_filter->size() :
0; }
void light_copy(BloomFilterFuncBase* bloomfilter_func) {
- auto other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func);
+ auto* other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func);
_bloom_filter_alloced = other_func->_bloom_filter_alloced;
_bloom_filter = other_func->_bloom_filter;
_inited = other_func->_inited;
@@ -216,34 +196,21 @@ public:
virtual void insert(const void* data) = 0;
- // This function is only to be used if the be_exec_version may be less
than 2. If updated, please delete it.
- virtual void insert_crc32_hash(const void* data) = 0;
-
virtual bool find(const void* data) const = 0;
- // This function is only to be used if the be_exec_version may be less
than 2. If updated, please delete it.
- virtual bool find_crc32_hash(const void* data) const = 0;
-
virtual bool find_olap_engine(const void* data) const = 0;
virtual bool find_uint32_t(uint32_t data) const = 0;
- virtual void insert_fixed_len(const char* data, const int* offsets, int
number) = 0;
-
- virtual void insert_fixed_len(const char* data) = 0;
-
- virtual uint16_t find_fixed_len_olap_engine(const char* data, const uint8*
nullmap,
- uint16_t* offsets, int number,
- bool is_parse_column) = 0;
+ virtual void insert_fixed_len(const vectorized::ColumnPtr& column, size_t
start) = 0;
- virtual void find_fixed_len(const char* data, const uint8* nullmap, int
number,
- uint8* results) = 0;
+ virtual void find_fixed_len(const vectorized::ColumnPtr& column, uint8_t*
results) = 0;
protected:
// bloom filter size
int32_t _bloom_filter_alloced;
std::shared_ptr<BloomFilterAdaptor> _bloom_filter;
- bool _inited;
+ bool _inited {};
std::mutex _lock;
int64_t _bloom_filter_length;
bool _build_bf_exactly = false;
@@ -251,77 +218,50 @@ protected:
template <class T>
struct CommonFindOp {
- // test_batch/find_batch/find_batch_olap_engine only used on vectorized
engine
- void insert_batch(BloomFilterAdaptor& bloom_filter, const char* data,
const int* offsets,
- int number) const {
- for (int i = 0; i < number; i++) {
- bloom_filter.add_element(*((T*)data + offsets[i]));
- }
- }
-
- void insert_single(BloomFilterAdaptor& bloom_filter, const char* data)
const {
- bloom_filter.add_element(*((T*)data));
- }
-
- uint16_t find_batch_olap_engine(const BloomFilterAdaptor& bloom_filter,
const char* data,
- const uint8* nullmap, uint16_t* offsets,
int number,
- const bool is_parse_column) const {
- uint16_t new_size = 0;
- if (is_parse_column) {
- if (nullmap == nullptr) {
- for (int i = 0; i < number; i++) {
- uint16_t idx = offsets[i];
- if (!bloom_filter.test_element(*((T*)data + idx))) {
- continue;
- }
- offsets[new_size++] = idx;
- }
- } else {
- for (int i = 0; i < number; i++) {
- uint16_t idx = offsets[i];
- if (nullmap[idx]) {
- continue;
- }
- if (!bloom_filter.test_element(*((T*)data + idx))) {
- continue;
- }
- offsets[new_size++] = idx;
+ void insert_batch(BloomFilterAdaptor& bloom_filter, const
vectorized::ColumnPtr& column,
+ size_t start) const {
+ if (column->is_nullable()) {
+ const auto* nullable = assert_cast<const
vectorized::ColumnNullable*>(column.get());
+ const auto& col = nullable->get_nested_column();
+ const auto& nullmap =
+ assert_cast<const
vectorized::ColumnUInt8&>(nullable->get_null_map_column())
+ .get_data();
+
+ const T* data = (T*)col.get_raw_data().data;
+ for (size_t i = start; i < column->size(); i++) {
+ if (!nullmap[i]) {
+ bloom_filter.add_element(*(data + i));
}
}
} else {
- if (nullmap == nullptr) {
- for (int i = 0; i < number; i++) {
- if (!bloom_filter.test_element(*((T*)data + i))) {
- continue;
- }
- offsets[new_size++] = i;
- }
- } else {
- for (int i = 0; i < number; i++) {
- if (nullmap[i]) {
- continue;
- }
- if (!bloom_filter.test_element(*((T*)data + i))) {
- continue;
- }
- offsets[new_size++] = i;
- }
+ const T* data = (T*)column->get_raw_data().data;
+ for (size_t i = start; i < column->size(); i++) {
+ bloom_filter.add_element(*(data + i));
}
}
- return new_size;
}
- void find_batch(const BloomFilterAdaptor& bloom_filter, const char* data,
const uint8* nullmap,
- int number, uint8* results) const {
- for (int i = 0; i < number; i++) {
- results[i] = false;
- if (nullmap != nullptr && nullmap[i]) {
- continue;
+ void find_batch(const BloomFilterAdaptor& bloom_filter, const
vectorized::ColumnPtr& column,
+ uint8_t* results) const {
+ if (column->is_nullable()) {
+ const auto* nullable = assert_cast<const
vectorized::ColumnNullable*>(column.get());
+ const auto& nullmap =
+ assert_cast<const
vectorized::ColumnUInt8&>(nullable->get_null_map_column())
+ .get_data();
+
+ const T* data =
(T*)nullable->get_nested_column().get_raw_data().data;
+ for (size_t i = 0; i < column->size(); i++) {
+ if (!nullmap[i]) {
+ results[i] = bloom_filter.test_element(data[i]);
+ } else {
+ results[i] = false;
+ }
}
- if (!bloom_filter.test_element(*((T*)data + i))) {
- continue;
+ } else {
+ const T* data = (T*)column->get_raw_data().data;
+ for (size_t i = 0; i < column->size(); i++) {
+ results[i] = bloom_filter.test_element(data[i]);
}
- results[i] = true;
}
}
@@ -340,43 +280,62 @@ struct CommonFindOp {
};
struct StringFindOp {
- void insert_batch(BloomFilterAdaptor& bloom_filter, const char* data,
const int* offsets,
- int number) const {
- LOG(FATAL) << "StringFindOp does not support insert_batch";
- }
-
- void insert_single(BloomFilterAdaptor& bloom_filter, const char* data)
const {
- LOG(FATAL) << "StringFindOp does not support insert_single";
- }
-
- uint16_t find_batch_olap_engine(const BloomFilterAdaptor& bloom_filter,
const char* data,
- const uint8* nullmap, uint16_t* offsets,
int number,
- const bool is_parse_column) const {
- LOG(FATAL) << "StringFindOp does not support find_batch_olap_engine";
- return 0;
- }
-
- void find_batch(const BloomFilterAdaptor& bloom_filter, const char* data,
const uint8* nullmap,
- int number, uint8* results) const {
- LOG(FATAL) << "StringFindOp does not support find_batch";
+ static void insert_batch(BloomFilterAdaptor& bloom_filter, const
vectorized::ColumnPtr& column,
+ size_t start) {
+ if (column->is_nullable()) {
+ const auto* nullable = assert_cast<const
vectorized::ColumnNullable*>(column.get());
+ const auto& col =
+ assert_cast<const
vectorized::ColumnString&>(nullable->get_nested_column());
+ const auto& nullmap =
+ assert_cast<const
vectorized::ColumnUInt8&>(nullable->get_null_map_column())
+ .get_data();
+
+ for (size_t i = start; i < column->size(); i++) {
+ if (!nullmap[i]) {
+ bloom_filter.add_element(col.get_data_at(i));
+ }
+ }
+ } else {
+ const auto& col = assert_cast<const
vectorized::ColumnString*>(column.get());
+ for (size_t i = start; i < column->size(); i++) {
+ bloom_filter.add_element(col->get_data_at(i));
+ }
+ }
}
- void insert(BloomFilterAdaptor& bloom_filter, const void* data) const {
- const auto* value = reinterpret_cast<const StringRef*>(data);
- if (value) {
- bloom_filter.add_bytes(value->data, value->size);
+ static void find_batch(const BloomFilterAdaptor& bloom_filter,
+ const vectorized::ColumnPtr& column, uint8_t*
results) {
+ if (column->is_nullable()) {
+ const auto* nullable = assert_cast<const
vectorized::ColumnNullable*>(column.get());
+ const auto& col =
+ assert_cast<const
vectorized::ColumnString&>(nullable->get_nested_column());
+ const auto& nullmap =
+ assert_cast<const
vectorized::ColumnUInt8&>(nullable->get_null_map_column())
+ .get_data();
+
+ for (size_t i = 0; i < column->size(); i++) {
+ if (!nullmap[i]) {
+ results[i] = bloom_filter.test_element(col.get_data_at(i));
+ } else {
+ results[i] = false;
+ }
+ }
+ } else {
+ const auto& col = assert_cast<const
vectorized::ColumnString*>(column.get());
+ for (size_t i = 0; i < column->size(); i++) {
+ results[i] = bloom_filter.test_element(col->get_data_at(i));
+ }
}
}
- // This function is only to be used if the be_exec_version may be less
than 2. If updated, please delete it.
- void insert_crc32_hash(BloomFilterAdaptor& bloom_filter, const void* data)
const {
+ static void insert(BloomFilterAdaptor& bloom_filter, const void* data) {
const auto* value = reinterpret_cast<const StringRef*>(data);
if (value) {
- bloom_filter.add_bytes_new_hash(value->data, value->size);
+ bloom_filter.add_bytes(value->data, value->size);
}
}
- bool find(const BloomFilterAdaptor& bloom_filter, const void* data) const {
+ static bool find(const BloomFilterAdaptor& bloom_filter, const void* data)
{
const auto* value = reinterpret_cast<const StringRef*>(data);
if (value == nullptr) {
return false;
@@ -384,19 +343,11 @@ struct StringFindOp {
return bloom_filter.test(Slice(value->data, value->size));
}
- //This function is only to be used if the be_exec_version may be less than
2. If updated, please delete it.
- bool find_crc32_hash(const BloomFilterAdaptor& bloom_filter, const void*
data) const {
- const auto* value = reinterpret_cast<const StringRef*>(data);
- if (value == nullptr) {
- return false;
- }
- return bloom_filter.test_new_hash(Slice(value->data, value->size));
- }
-
- bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void*
data) const {
+ static bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const
void* data) {
return StringFindOp::find(bloom_filter, data);
}
- bool find(const BloomFilterAdaptor& bloom_filter, uint32_t data) const {
+
+ static bool find(const BloomFilterAdaptor& bloom_filter, uint32_t data) {
return bloom_filter.test(data);
}
};
@@ -404,7 +355,7 @@ struct StringFindOp {
// We do not need to judge whether data is empty, because null will not appear
// when filer used by the storage engine
struct FixedStringFindOp : public StringFindOp {
- bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void*
input_data) const {
+ static bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const
void* input_data) {
const auto* value = reinterpret_cast<const StringRef*>(input_data);
int64_t size = value->size;
const char* data = value->data;
@@ -417,7 +368,7 @@ struct FixedStringFindOp : public StringFindOp {
};
struct DateTimeFindOp : public CommonFindOp<VecDateTimeValue> {
- bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void*
data) const {
+ static bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const
void* data) {
VecDateTimeValue value;
value.from_olap_datetime(*reinterpret_cast<const uint64_t*>(data));
return bloom_filter.test(Slice((char*)&value,
sizeof(VecDateTimeValue)));
@@ -428,7 +379,7 @@ struct DateTimeFindOp : public
CommonFindOp<VecDateTimeValue> {
// https://gcc.gnu.org/bugzilla/show_bug.cgi?id=101684
struct DateFindOp : public CommonFindOp<VecDateTimeValue> {
- bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void*
data) const {
+ static bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const
void* data) {
uint24_t date = *static_cast<const uint24_t*>(data);
uint64_t value = uint32_t(date);
@@ -440,7 +391,7 @@ struct DateFindOp : public CommonFindOp<VecDateTimeValue> {
};
struct DecimalV2FindOp : public CommonFindOp<DecimalV2Value> {
- bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const void*
data) const {
+ static bool find_olap_engine(const BloomFilterAdaptor& bloom_filter, const
void* data) {
auto packed_decimal = *static_cast<const decimal12_t*>(data);
DecimalV2Value value;
int64_t int_value = packed_decimal.integer;
@@ -502,37 +453,13 @@ public:
dummy.insert(*_bloom_filter, data);
}
- // This function is only to be used if the be_exec_version may be less
than 2. If updated, please delete it.
- void insert_crc32_hash(const void* data) override {
- if constexpr (std::is_same_v<typename
BloomFilterTypeTraits<type>::FindOp, StringFindOp> ||
- std::is_same_v<typename
BloomFilterTypeTraits<type>::FindOp,
- FixedStringFindOp>) {
- DCHECK(_bloom_filter != nullptr);
- dummy.insert_crc32_hash(*_bloom_filter, data);
- } else {
- insert(data);
- }
- }
-
- void insert_fixed_len(const char* data, const int* offsets, int number)
override {
+ void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start)
override {
DCHECK(_bloom_filter != nullptr);
- dummy.insert_batch(*_bloom_filter, data, offsets, number);
+ dummy.insert_batch(*_bloom_filter, column, start);
}
- void insert_fixed_len(const char* data) override {
- DCHECK(_bloom_filter != nullptr);
- dummy.insert_single(*_bloom_filter, data);
- }
-
- uint16_t find_fixed_len_olap_engine(const char* data, const uint8*
nullmap, uint16_t* offsets,
- int number, const bool
is_parse_column) override {
- return dummy.find_batch_olap_engine(*_bloom_filter, data, nullmap,
offsets, number,
- is_parse_column);
- }
-
- void find_fixed_len(const char* data, const uint8* nullmap, int number,
- uint8* results) override {
- dummy.find_batch(*_bloom_filter, data, nullmap, number, results);
+ void find_fixed_len(const vectorized::ColumnPtr& column, uint8_t* results)
override {
+ dummy.find_batch(*_bloom_filter, column, results);
}
bool find(const void* data) const override {
@@ -540,17 +467,6 @@ public:
return dummy.find(*_bloom_filter, data);
}
- // This function is only to be used if the be_exec_version may be less
than 2. If updated, please delete it.
- bool find_crc32_hash(const void* data) const override {
- if constexpr (std::is_same_v<typename
BloomFilterTypeTraits<type>::FindOp, StringFindOp> ||
- std::is_same_v<typename
BloomFilterTypeTraits<type>::FindOp,
- FixedStringFindOp>) {
- DCHECK(_bloom_filter != nullptr);
- return dummy.find_crc32_hash(*_bloom_filter, data);
- }
- return find(data);
- }
-
bool find_olap_engine(const void* data) const override {
return dummy.find_olap_engine(*_bloom_filter, data);
}
diff --git a/be/src/exprs/hybrid_set.h b/be/src/exprs/hybrid_set.h
index 6a90bdd47cd..9151dc7d3bd 100644
--- a/be/src/exprs/hybrid_set.h
+++ b/be/src/exprs/hybrid_set.h
@@ -29,7 +29,7 @@
namespace doris {
-#define FIXED_CONTAINER_MAX_SIZE 8
+constexpr int FIXED_CONTAINER_MAX_SIZE = 8;
/**
* Fix Container can use simd to improve performance. 1 <= N <= 8 can be
improved performance by test. FIXED_CONTAINER_MAX_SIZE = 8.
@@ -44,7 +44,7 @@ public:
class Iterator;
- FixedContainer() : _size(0) { static_assert(N >= 0 && N <=
FIXED_CONTAINER_MAX_SIZE); }
+ FixedContainer() { static_assert(N >= 0 && N <= FIXED_CONTAINER_MAX_SIZE);
}
~FixedContainer() = default;
@@ -141,7 +141,7 @@ public:
private:
std::array<T, N> _data;
- size_t _size;
+ size_t _size {};
};
/**
@@ -183,7 +183,7 @@ public:
// use in vectorize execute engine
virtual void insert(void* data, size_t) = 0;
- virtual void insert_fixed_len(const char* data, const int* offsets, int
number) = 0;
+ virtual void insert_fixed_len(const vectorized::ColumnPtr& column, size_t
start) = 0;
virtual void insert(HybridSetBase* set) {
HybridSetBase::IteratorBase* iter = set->begin();
@@ -199,11 +199,6 @@ public:
// use in vectorize execute engine
virtual bool find(const void* data, size_t) const = 0;
- virtual void find_fixed_len(const char* __restrict data, const uint8*
__restrict null_map,
- int number, uint8* __restrict results) {
- LOG(FATAL) << "HybridSetBase not support find_fixed_len";
- }
-
virtual void find_batch(const doris::vectorized::IColumn& column, size_t
rows,
doris::vectorized::ColumnUInt8::Container&
results) {
LOG(FATAL) << "HybridSetBase not support find_batch";
@@ -275,21 +270,29 @@ public:
if (data == nullptr) {
return;
}
-
- if constexpr (sizeof(ElementType) >= 16) {
- // for large int, it will core dump with no memcpy
- ElementType value;
- memcpy(&value, data, sizeof(ElementType));
- _set.insert(value);
+ _set.insert(*reinterpret_cast<const ElementType*>(data));
+ }
+ void insert(void* data, size_t /*unused*/) override { insert(data); }
+
+ void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start)
override {
+ if (column->is_nullable()) {
+ const auto* nullable = assert_cast<const
vectorized::ColumnNullable*>(column.get());
+ const auto& col = nullable->get_nested_column();
+ const auto& nullmap =
+ assert_cast<const
vectorized::ColumnUInt8&>(nullable->get_null_map_column())
+ .get_data();
+
+ const ElementType* data = (ElementType*)col.get_raw_data().data;
+ for (size_t i = start; i < column->size(); i++) {
+ if (!nullmap[i]) {
+ _set.insert(*(data + i));
+ }
+ }
} else {
- _set.insert(*reinterpret_cast<const ElementType*>(data));
- }
- }
- void insert(void* data, size_t) override { insert(data); }
-
- void insert_fixed_len(const char* data, const int* offsets, int number)
override {
- for (int i = 0; i < number; i++) {
- insert((void*)((ElementType*)data + offsets[i]));
+ const ElementType* data =
(ElementType*)column->get_raw_data().data;
+ for (size_t i = start; i < column->size(); i++) {
+ _set.insert(*(data + i));
+ }
}
}
@@ -303,21 +306,7 @@ public:
return _set.find(*reinterpret_cast<const ElementType*>(data));
}
- bool find(const void* data, size_t) const override { return find(data); }
-
- void find_fixed_len(const char* __restrict data, const uint8* __restrict
null_map, int number,
- uint8* __restrict results) override {
- ElementType* value = (ElementType*)data;
- if (null_map == nullptr) {
- for (int i = 0; i < number; i++) {
- results[i] = _set.find(value[i]);
- }
- } else {
- for (int i = 0; i < number; i++) {
- results[i] = _set.find(value[i]) & !null_map[i];
- }
- }
- }
+ bool find(const void* data, size_t /*unused*/) const override { return
find(data); }
void find_batch(const doris::vectorized::IColumn& column, size_t rows,
doris::vectorized::ColumnUInt8::Container& results)
override {
@@ -414,8 +403,26 @@ public:
_set.insert(str_value);
}
- void insert_fixed_len(const char* data, const int* offsets, int number)
override {
- LOG(FATAL) << "string set not support insert_fixed_len";
+ void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start)
override {
+ if (column->is_nullable()) {
+ const auto* nullable = assert_cast<const
vectorized::ColumnNullable*>(column.get());
+ const auto& col =
+ assert_cast<const
vectorized::ColumnString&>(nullable->get_nested_column());
+ const auto& nullmap =
+ assert_cast<const
vectorized::ColumnUInt8&>(nullable->get_null_map_column())
+ .get_data();
+
+ for (size_t i = start; i < column->size(); i++) {
+ if (!nullmap[i]) {
+ _set.insert(col.get_data_at(i).to_string());
+ }
+ }
+ } else {
+ const auto& col = assert_cast<const
vectorized::ColumnString*>(column.get());
+ for (size_t i = start; i < column->size(); i++) {
+ _set.insert(col->get_data_at(i).to_string());
+ }
+ }
}
int size() override { return _set.size(); }
@@ -425,7 +432,7 @@ public:
return false;
}
- auto* value = reinterpret_cast<const StringRef*>(data);
+ const auto* value = reinterpret_cast<const StringRef*>(data);
std::string str_value(const_cast<const char*>(value->data),
value->size);
return _set.find(str_value);
}
@@ -461,7 +468,7 @@ public:
void _find_batch(const doris::vectorized::IColumn& column, size_t rows,
const doris::vectorized::NullMap* null_map,
doris::vectorized::ColumnUInt8::Container& results) {
- auto& col = assert_cast<const
doris::vectorized::ColumnString&>(column);
+ const auto& col = assert_cast<const
doris::vectorized::ColumnString&>(column);
const uint8_t* __restrict null_map_data;
if constexpr (is_nullable) {
null_map_data = null_map->data();
@@ -538,8 +545,26 @@ public:
_set.insert(sv);
}
- void insert_fixed_len(const char* data, const int* offsets, int number)
override {
- LOG(FATAL) << "string set not support insert_fixed_len";
+ void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start)
override {
+ if (column->is_nullable()) {
+ const auto* nullable = assert_cast<const
vectorized::ColumnNullable*>(column.get());
+ const auto& col =
+ assert_cast<const
vectorized::ColumnString&>(nullable->get_nested_column());
+ const auto& nullmap =
+ assert_cast<const
vectorized::ColumnUInt8&>(nullable->get_null_map_column())
+ .get_data();
+
+ for (size_t i = start; i < column->size(); i++) {
+ if (!nullmap[i]) {
+ _set.insert(col.get_data_at(i));
+ }
+ }
+ } else {
+ const auto& col = assert_cast<const
vectorized::ColumnString*>(column.get());
+ for (size_t i = start; i < column->size(); i++) {
+ _set.insert(col->get_data_at(i));
+ }
+ }
}
int size() override { return _set.size(); }
@@ -549,7 +574,7 @@ public:
return false;
}
- auto* value = reinterpret_cast<const StringRef*>(data);
+ const auto* value = reinterpret_cast<const StringRef*>(data);
return _set.find(*value);
}
@@ -588,10 +613,10 @@ public:
void _find_batch(const doris::vectorized::IColumn& column, size_t rows,
const doris::vectorized::NullMap* null_map,
doris::vectorized::ColumnUInt8::Container& results) {
- auto& col = assert_cast<const
doris::vectorized::ColumnString&>(column);
+ const auto& col = assert_cast<const
doris::vectorized::ColumnString&>(column);
const uint32_t* __restrict offset = col.get_offsets().data();
const uint8_t* __restrict data = col.get_chars().data();
- uint8_t* __restrict cursor = const_cast<uint8_t*>(data);
+ auto* __restrict cursor = const_cast<uint8_t*>(data);
const uint8_t* __restrict null_map_data;
if constexpr (is_nullable) {
null_map_data = null_map->data();
diff --git a/be/src/exprs/minmax_predicate.h b/be/src/exprs/minmax_predicate.h
index cdf898292fc..efc4ebf8630 100644
--- a/be/src/exprs/minmax_predicate.h
+++ b/be/src/exprs/minmax_predicate.h
@@ -19,15 +19,16 @@
#include "common/object_pool.h"
#include "runtime/type_limit.h"
+#include "vec/columns/column.h"
+#include "vec/columns/column_nullable.h"
namespace doris {
// only used in Runtime Filter
class MinMaxFuncBase {
public:
virtual void insert(const void* data) = 0;
- virtual void insert_fixed_len(const char* data, const int* offsets, int
number) = 0;
+ virtual void insert_fixed_len(const vectorized::ColumnPtr& column, size_t
start) = 0;
virtual bool find(void* data) = 0;
- virtual bool is_empty() = 0;
virtual void* get_max() = 0;
virtual void* get_min() = 0;
// assign minmax data
@@ -37,7 +38,7 @@ public:
virtual ~MinMaxFuncBase() = default;
};
-template <class T>
+template <class T, bool NeedMax = true, bool NeedMin = true>
class MinMaxNumFunc : public MinMaxFuncBase {
public:
MinMaxNumFunc() = default;
@@ -50,32 +51,52 @@ public:
T val_data = *reinterpret_cast<const T*>(data);
- if (_empty) {
- _min = val_data;
- _max = val_data;
- _empty = false;
- return;
+ if constexpr (NeedMin) {
+ if (val_data < _min) {
+ _min = val_data;
+ }
}
- if (val_data < _min) {
- _min = val_data;
- } else if (val_data > _max) {
- _max = val_data;
+
+ if constexpr (NeedMax) {
+ if (val_data > _max) {
+ _max = val_data;
+ }
}
}
- void insert_fixed_len(const char* data, const int* offsets, int number)
override {
- if (!number) {
+ void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start)
override {
+ if (column->empty()) {
return;
}
- if (_empty) {
- _min = *((T*)data + offsets[0]);
- _max = *((T*)data + offsets[0]);
- }
- for (int i = _empty; i < number; i++) {
- _min = std::min(_min, *((T*)data + offsets[i]));
- _max = std::max(_max, *((T*)data + offsets[i]));
+ if (column->is_nullable()) {
+ const auto* nullable = assert_cast<const
vectorized::ColumnNullable*>(column.get());
+ const auto& col = nullable->get_nested_column();
+ const auto& nullmap =
+ assert_cast<const
vectorized::ColumnUInt8&>(nullable->get_null_map_column())
+ .get_data();
+
+ const T* data = (T*)col.get_raw_data().data;
+ for (size_t i = start; i < column->size(); i++) {
+ if (!nullmap[i]) {
+ if constexpr (NeedMin) {
+ _min = std::min(_min, *(data + i));
+ }
+ if constexpr (NeedMax) {
+ _max = std::max(_max, *(data + i));
+ }
+ }
+ }
+ } else {
+ const T* data = (T*)column->get_raw_data().data;
+ for (size_t i = start; i < column->size(); i++) {
+ if constexpr (NeedMin) {
+ _min = std::min(_min, *(data + i));
+ }
+ if constexpr (NeedMax) {
+ _max = std::max(_max, *(data + i));
+ }
+ }
}
- _empty = false;
}
bool find(void* data) override {
@@ -84,40 +105,55 @@ public:
}
T val_data = *reinterpret_cast<T*>(data);
- return val_data >= _min && val_data <= _max;
+ if constexpr (NeedMin) {
+ if (val_data < _min) {
+ return false;
+ }
+ }
+ if constexpr (NeedMax) {
+ if (val_data > _max) {
+ return false;
+ }
+ }
+ return true;
}
Status merge(MinMaxFuncBase* minmax_func, ObjectPool* pool) override {
if constexpr (std::is_same_v<T, StringRef>) {
- MinMaxNumFunc<T>* other_minmax =
static_cast<MinMaxNumFunc<T>*>(minmax_func);
-
- if (other_minmax->_min < _min) {
- auto& other_min = other_minmax->_min;
- auto str = pool->add(new std::string(other_min.data,
other_min.size));
- _min.data = str->data();
- _min.size = str->length();
+ auto* other_minmax = static_cast<MinMaxNumFunc<T>*>(minmax_func);
+ if constexpr (NeedMin) {
+ if (other_minmax->_min < _min) {
+ auto& other_min = other_minmax->_min;
+ auto* str = pool->add(new std::string(other_min.data,
other_min.size));
+ _min.data = str->data();
+ _min.size = str->length();
+ }
}
- if (other_minmax->_max > _max) {
- auto& other_max = other_minmax->_max;
- auto str = pool->add(new std::string(other_max.data,
other_max.size));
- _max.data = str->data();
- _max.size = str->length();
+ if constexpr (NeedMax) {
+ if (other_minmax->_max > _max) {
+ auto& other_max = other_minmax->_max;
+ auto* str = pool->add(new std::string(other_max.data,
other_max.size));
+ _max.data = str->data();
+ _max.size = str->length();
+ }
}
} else {
- MinMaxNumFunc<T>* other_minmax =
static_cast<MinMaxNumFunc<T>*>(minmax_func);
- if (other_minmax->_min < _min) {
- _min = other_minmax->_min;
+ auto* other_minmax = static_cast<MinMaxNumFunc<T>*>(minmax_func);
+ if constexpr (NeedMin) {
+ if (other_minmax->_min < _min) {
+ _min = other_minmax->_min;
+ }
}
- if (other_minmax->_max > _max) {
- _max = other_minmax->_max;
+ if constexpr (NeedMax) {
+ if (other_minmax->_max > _max) {
+ _max = other_minmax->_max;
+ }
}
}
return Status::OK();
}
- bool is_empty() override { return _empty; }
-
void* get_max() override { return &_max; }
void* get_min() override { return &_min; }
@@ -131,161 +167,12 @@ public:
protected:
T _max = type_limit<T>::min();
T _min = type_limit<T>::max();
- // we use _empty to avoid compare twice
- bool _empty = true;
};
template <class T>
-class MinNumFunc : public MinMaxNumFunc<T> {
-public:
- MinNumFunc() = default;
- ~MinNumFunc() override = default;
-
- void insert(const void* data) override {
- if (data == nullptr) {
- return;
- }
-
- T val_data = *reinterpret_cast<const T*>(data);
-
- if (this->_empty) {
- this->_min = val_data;
- this->_empty = false;
- return;
- }
- if (val_data < this->_min) {
- this->_min = val_data;
- }
- }
-
- void insert_fixed_len(const char* data, const int* offsets, int number)
override {
- if (!number) {
- return;
- }
- if (this->_empty) {
- this->_min = *((T*)data + offsets[0]);
- }
- for (int i = this->_empty; i < number; i++) {
- this->_min = std::min(this->_min, *((T*)data + offsets[i]));
- }
- this->_empty = false;
- }
-
- bool find(void* data) override {
- if (data == nullptr) {
- return false;
- }
-
- T val_data = *reinterpret_cast<T*>(data);
- return val_data >= this->_min;
- }
-
- Status merge(MinMaxFuncBase* minmax_func, ObjectPool* pool) override {
- if constexpr (std::is_same_v<T, StringRef>) {
- MinNumFunc<T>* other_minmax =
assert_cast<MinNumFunc<T>*>(minmax_func);
- if (other_minmax->_min < this->_min) {
- auto& other_min = other_minmax->_min;
- auto str = pool->add(new std::string(other_min.data,
other_min.size));
- this->_min.data = str->data();
- this->_min.size = str->length();
- }
- } else {
- MinNumFunc<T>* other_minmax =
assert_cast<MinNumFunc<T>*>(minmax_func);
- if (other_minmax->_min < this->_min) {
- this->_min = other_minmax->_min;
- }
- }
-
- return Status::OK();
- }
-
- //min filter the max is useless, so return nullptr directly
- void* get_max() override {
- DCHECK(false);
- return nullptr;
- }
-
- Status assign(void* min_data, void* max_data) override {
- this->_min = *(T*)min_data;
- return Status::OK();
- }
-};
+using MinNumFunc = MinMaxNumFunc<T, false, true>;
template <class T>
-class MaxNumFunc : public MinMaxNumFunc<T> {
-public:
- MaxNumFunc() = default;
- ~MaxNumFunc() override = default;
-
- void insert(const void* data) override {
- if (data == nullptr) {
- return;
- }
-
- T val_data = *reinterpret_cast<const T*>(data);
-
- if (this->_empty) {
- this->_max = val_data;
- this->_empty = false;
- return;
- }
- if (val_data > this->_max) {
- this->_max = val_data;
- }
- }
-
- void insert_fixed_len(const char* data, const int* offsets, int number)
override {
- if (!number) {
- return;
- }
- if (this->_empty) {
- this->_max = *((T*)data + offsets[0]);
- }
- for (int i = this->_empty; i < number; i++) {
- this->_max = std::max(this->_max, *((T*)data + offsets[i]));
- }
- this->_empty = false;
- }
-
- bool find(void* data) override {
- if (data == nullptr) {
- return false;
- }
-
- T val_data = *reinterpret_cast<T*>(data);
- return val_data <= this->_max;
- }
-
- Status merge(MinMaxFuncBase* minmax_func, ObjectPool* pool) override {
- if constexpr (std::is_same_v<T, StringRef>) {
- MinMaxNumFunc<T>* other_minmax =
assert_cast<MinMaxNumFunc<T>*>(minmax_func);
-
- if (other_minmax->_max > this->_max) {
- auto& other_max = other_minmax->_max;
- auto str = pool->add(new std::string(other_max.data,
other_max.size));
- this->_max.data = str->data();
- this->_max.size = str->length();
- }
- } else {
- MinMaxNumFunc<T>* other_minmax =
assert_cast<MinMaxNumFunc<T>*>(minmax_func);
- if (other_minmax->_max > this->_max) {
- this->_max = other_minmax->_max;
- }
- }
-
- return Status::OK();
- }
-
- //max filter the min is useless, so return nullptr directly
- void* get_min() override {
- DCHECK(false);
- return nullptr;
- }
-
- Status assign(void* min_data, void* max_data) override {
- this->_max = *(T*)max_data;
- return Status::OK();
- }
-};
+using MaxNumFunc = MinMaxNumFunc<T, true, false>;
} // namespace doris
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index c6e64fd0e55..53c222488e3 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -51,6 +51,7 @@
#include "util/string_parser.hpp"
#include "vec/columns/column.h"
#include "vec/columns/column_complex.h"
+#include "vec/columns/column_nullable.h"
#include "vec/common/assert_cast.h"
#include "vec/core/wide_integer.h"
#include "vec/core/wide_integer_to_string.h"
@@ -286,10 +287,7 @@ public:
_pool(pool),
_column_return_type(params->column_return_type),
_filter_type(params->filter_type),
- _filter_id(params->filter_id),
- _use_batch(
- IRuntimeFilter::enable_use_batch(_be_exec_version > 0,
_column_return_type)),
- _use_new_hash(_be_exec_version >= 2) {}
+ _filter_id(params->filter_id) {}
// for a 'tmp' runtime predicate wrapper
// only could called assign method or as a param for merge
RuntimePredicateWrapper(RuntimeState* state, ObjectPool* pool,
PrimitiveType column_type,
@@ -299,10 +297,7 @@ public:
_pool(pool),
_column_return_type(column_type),
_filter_type(type),
- _filter_id(filter_id),
- _use_batch(
- IRuntimeFilter::enable_use_batch(_be_exec_version > 0,
_column_return_type)),
- _use_new_hash(_be_exec_version >= 2) {}
+ _filter_id(filter_id) {}
RuntimePredicateWrapper(QueryContext* query_ctx, ObjectPool* pool,
const RuntimeFilterParams* params)
@@ -311,10 +306,7 @@ public:
_pool(pool),
_column_return_type(params->column_return_type),
_filter_type(params->filter_type),
- _filter_id(params->filter_id),
- _use_batch(
- IRuntimeFilter::enable_use_batch(_be_exec_version > 0,
_column_return_type)),
- _use_new_hash(_be_exec_version >= 2) {}
+ _filter_id(params->filter_id) {}
// for a 'tmp' runtime predicate wrapper
// only could called assign method or as a param for merge
RuntimePredicateWrapper(QueryContext* query_ctx, ObjectPool* pool,
PrimitiveType column_type,
@@ -324,10 +316,7 @@ public:
_pool(pool),
_column_return_type(column_type),
_filter_type(type),
- _filter_id(filter_id),
- _use_batch(
- IRuntimeFilter::enable_use_batch(_be_exec_version > 0,
_column_return_type)),
- _use_new_hash(_be_exec_version >= 2) {}
+ _filter_id(filter_id) {}
// init runtime filter wrapper
// alloc memory to init runtime filter function
Status init(const RuntimeFilterParams* params) {
@@ -389,23 +378,10 @@ public:
void insert_to_bloom_filter(BloomFilterFuncBase* bloom_filter) const {
if (_context.hybrid_set->size() > 0) {
- auto it = _context.hybrid_set->begin();
-
- if (_use_batch) {
- while (it->has_next()) {
- bloom_filter->insert_fixed_len((char*)it->get_value());
- it->next();
- }
- } else {
- while (it->has_next()) {
- if (_use_new_hash) {
- bloom_filter->insert_crc32_hash(it->get_value());
- } else {
- bloom_filter->insert(it->get_value());
- }
-
- it->next();
- }
+ auto* it = _context.hybrid_set->begin();
+ while (it->has_next()) {
+ bloom_filter->insert(it->get_value());
+ it->next();
}
}
}
@@ -428,20 +404,12 @@ public:
break;
}
case RuntimeFilterType::BLOOM_FILTER: {
- if (_use_new_hash) {
- _context.bloom_filter_func->insert_crc32_hash(data);
- } else {
- _context.bloom_filter_func->insert(data);
- }
+ _context.bloom_filter_func->insert(data);
break;
}
case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
if (_is_bloomfilter) {
- if (_use_new_hash) {
- _context.bloom_filter_func->insert_crc32_hash(data);
- } else {
- _context.bloom_filter_func->insert(data);
- }
+ _context.bloom_filter_func->insert(data);
} else {
_context.hybrid_set->insert(data);
}
@@ -457,30 +425,30 @@ public:
}
}
- void insert_fixed_len(const char* data, const int* offsets, int number) {
+ void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) {
switch (_filter_type) {
case RuntimeFilterType::IN_FILTER: {
if (_is_ignored_in_filter) {
break;
}
- _context.hybrid_set->insert_fixed_len(data, offsets, number);
+ _context.hybrid_set->insert_fixed_len(column, start);
break;
}
case RuntimeFilterType::MIN_FILTER:
case RuntimeFilterType::MAX_FILTER:
case RuntimeFilterType::MINMAX_FILTER: {
- _context.minmax_func->insert_fixed_len(data, offsets, number);
+ _context.minmax_func->insert_fixed_len(column, start);
break;
}
case RuntimeFilterType::BLOOM_FILTER: {
- _context.bloom_filter_func->insert_fixed_len(data, offsets,
number);
+ _context.bloom_filter_func->insert_fixed_len(column, start);
break;
}
case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
if (_is_bloomfilter) {
- _context.bloom_filter_func->insert_fixed_len(data, offsets,
number);
+ _context.bloom_filter_func->insert_fixed_len(column, start);
} else {
- _context.hybrid_set->insert_fixed_len(data, offsets, number);
+ _context.hybrid_set->insert_fixed_len(column, start);
}
break;
}
@@ -508,24 +476,33 @@ public:
}
}
- void insert_batch(const vectorized::ColumnPtr column, const
std::vector<int>& rows) {
+ void insert_batch(const vectorized::ColumnPtr& column, size_t start) {
if (get_real_type() == RuntimeFilterType::BITMAP_FILTER) {
- bitmap_filter_insert_batch(column, rows);
- } else if (IRuntimeFilter::enable_use_batch(_be_exec_version > 0,
_column_return_type)) {
- insert_fixed_len(column->get_raw_data().data, rows.data(),
rows.size());
+ bitmap_filter_insert_batch(column, start);
} else {
- for (int index : rows) {
- insert(column->get_data_at(index));
- }
+ insert_fixed_len(column, start);
}
}
- void bitmap_filter_insert_batch(const vectorized::ColumnPtr column,
- const std::vector<int>& rows) {
+ void bitmap_filter_insert_batch(const vectorized::ColumnPtr column, size_t
start) {
std::vector<const BitmapValue*> bitmaps;
- auto* col = assert_cast<const
vectorized::ColumnComplexType<BitmapValue>*>(column.get());
- for (int index : rows) {
- bitmaps.push_back(&(col->get_data()[index]));
+ if (column->is_nullable()) {
+ const auto* nullable = assert_cast<const
vectorized::ColumnNullable*>(column.get());
+ const auto& col =
+ assert_cast<const
vectorized::ColumnBitmap&>(nullable->get_nested_column());
+ const auto& nullmap =
+ assert_cast<const
vectorized::ColumnUInt8&>(nullable->get_null_map_column())
+ .get_data();
+ for (size_t i = start; i < column->size(); i++) {
+ if (!nullmap[i]) {
+ bitmaps.push_back(&(col.get_data()[i]));
+ }
+ }
+ } else {
+ const auto* col = assert_cast<const
vectorized::ColumnBitmap*>(column.get());
+ for (size_t i = start; i < column->size(); i++) {
+ bitmaps.push_back(&(col->get_data()[i]));
+ }
}
_context.bitmap_filter_func->insert_many(bitmaps);
}
@@ -1039,13 +1016,6 @@ private:
bool _is_ignored_in_filter = false;
std::string* _ignored_in_filter_msg = nullptr;
uint32_t _filter_id;
-
- // When _column_return_type is invalid, _use_batch will be always false.
- bool _use_batch;
-
- // When _use_new_hash is set to true, use the new hash method.
- // This is only to be used if the be_exec_version may be less than 2. If
updated, please delete it.
- const bool _use_new_hash;
};
Status IRuntimeFilter::create(RuntimeState* state, ObjectPool* pool, const
TRuntimeFilterDesc* desc,
@@ -1092,10 +1062,9 @@ void IRuntimeFilter::insert(const StringRef& value) {
_wrapper->insert(value);
}
-void IRuntimeFilter::insert_batch(const vectorized::ColumnPtr column,
- const std::vector<int>& rows) {
+void IRuntimeFilter::insert_batch(const vectorized::ColumnPtr column, size_t
start) {
DCHECK(is_producer());
- _wrapper->insert_batch(column, rows);
+ _wrapper->insert_batch(column, start);
}
Status IRuntimeFilter::publish() {
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index fdd3b02ad63..ed4298dda85 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -262,7 +262,7 @@ public:
// only used for producer
void insert(const void* data);
void insert(const StringRef& data);
- void insert_batch(vectorized::ColumnPtr column, const std::vector<int>&
rows);
+ void insert_batch(vectorized::ColumnPtr column, size_t start);
// publish filter
// push filter to remote node or push down it to scan_node
@@ -350,10 +350,6 @@ public:
void update_runtime_filter_type_to_profile();
- static bool enable_use_batch(bool use_batch, PrimitiveType type) {
- return use_batch && (is_int_or_bool(type) || is_float_or_double(type));
- }
-
int filter_id() const { return _filter_id; }
static std::string to_string(RuntimeFilterType type) {
diff --git a/be/src/exprs/runtime_filter_slots.h
b/be/src/exprs/runtime_filter_slots.h
index 0f841e5a60f..01155493f87 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -172,29 +172,8 @@ public:
int result_column_id =
_build_expr_context[i]->get_last_result_column_id();
for (const auto* it : datas) {
auto column = it->get_by_position(result_column_id).column;
-
- std::vector<int> indexs;
- // indexs start from 1 because the first row is mocked for
join hash map
- if (const auto* nullable =
-
vectorized::check_and_get_column<vectorized::ColumnNullable>(*column)) {
- column = nullable->get_nested_column_ptr();
- const uint8_t* null_map = assert_cast<const
vectorized::ColumnUInt8*>(
-
nullable->get_null_map_column_ptr().get())
- ->get_data()
- .data();
- for (int i = 1; i < column->size(); i++) {
- if (null_map[i]) {
- continue;
- }
- indexs.push_back(i);
- }
- } else {
- for (int i = 1; i < column->size(); i++) {
- indexs.push_back(i);
- }
- }
for (auto* filter : iter->second) {
- filter->insert_batch(column, indexs);
+ filter->insert_batch(column, 1);
}
}
}
diff --git a/be/src/exprs/runtime_filter_slots_cross.h
b/be/src/exprs/runtime_filter_slots_cross.h
index 4868b27a4ea..76b6085bab9 100644
--- a/be/src/exprs/runtime_filter_slots_cross.h
+++ b/be/src/exprs/runtime_filter_slots_cross.h
@@ -61,7 +61,7 @@ public:
Status insert(vectorized::Block* block) {
for (int i = 0; i < _runtime_filters.size(); ++i) {
auto* filter = _runtime_filters[i];
- auto& vexpr_ctx = filter_src_expr_ctxs[i];
+ const auto& vexpr_ctx = filter_src_expr_ctxs[i];
int result_column_id = -1;
RETURN_IF_ERROR(vexpr_ctx->execute(block, &result_column_id));
@@ -70,25 +70,7 @@ public:
block->get_by_position(result_column_id)
.column->convert_to_full_column_if_const();
- auto& column = block->get_by_position(result_column_id).column;
- if (auto* nullable =
-
vectorized::check_and_get_column<vectorized::ColumnNullable>(*column)) {
- auto& column_nested = nullable->get_nested_column_ptr();
- auto& column_nullmap = nullable->get_null_map_column_ptr();
- std::vector<int> indexs;
- for (int row_index = 0; row_index < column->size();
++row_index) {
- if (assert_cast<const
vectorized::ColumnUInt8*>(column_nullmap.get())
- ->get_bool(row_index)) {
- continue;
- }
- indexs.push_back(row_index);
- }
- filter->insert_batch(column_nested, indexs);
- } else {
- std::vector<int> rows(column->size());
- std::iota(rows.begin(), rows.end(), 0);
- filter->insert_batch(column, rows);
- }
+
filter->insert_batch(block->get_by_position(result_column_id).column, 0);
}
return Status::OK();
}
@@ -100,7 +82,7 @@ public:
return Status::OK();
}
- bool empty() { return !_runtime_filters.size(); }
+ bool empty() { return _runtime_filters.empty(); }
private:
const std::vector<TRuntimeFilterDesc>& _runtime_filter_descs;
diff --git a/be/src/olap/bloom_filter_predicate.h
b/be/src/olap/bloom_filter_predicate.h
index d2816be9966..87f5ff266c3 100644
--- a/be/src/olap/bloom_filter_predicate.h
+++ b/be/src/olap/bloom_filter_predicate.h
@@ -76,61 +76,17 @@ private:
uint16_t new_size = 0;
if (column.is_column_dictionary()) {
- auto* dict_col = reinterpret_cast<const
vectorized::ColumnDictI32*>(&column);
- if (_be_exec_version >= 2) {
- for (uint16_t i = 0; i < size; i++) {
- uint16_t idx = sel[i];
- sel[new_size] = idx;
- if constexpr (is_nullable) {
- new_size += !null_map[idx] &&
_specific_filter->find_uint32_t(
-
dict_col->get_crc32_hash_value(idx));
- } else {
- new_size += _specific_filter->find_uint32_t(
- dict_col->get_crc32_hash_value(idx));
- }
- }
- } else {
- for (uint16_t i = 0; i < size; i++) {
- uint16_t idx = sel[i];
- sel[new_size] = idx;
- if constexpr (is_nullable) {
- new_size += !null_map[idx] &&
-
_specific_filter->find_uint32_t(dict_col->get_hash_value(idx));
- } else {
- new_size +=
_specific_filter->find_uint32_t(dict_col->get_hash_value(idx));
- }
- }
- }
- } else if (is_string_type(T) && _be_exec_version >= 2) {
- auto& pred_col =
- reinterpret_cast<
- const
vectorized::PredicateColumnType<PredicateEvaluateType<T>>*>(
- &column)
- ->get_data();
-
- auto pred_col_data = pred_col.data();
- const bool is_dense_column = pred_col.size() == size;
+ const auto* dict_col = reinterpret_cast<const
vectorized::ColumnDictI32*>(&column);
for (uint16_t i = 0; i < size; i++) {
- uint16_t idx = is_dense_column ? i : sel[i];
+ uint16_t idx = sel[i];
+ sel[new_size] = idx;
if constexpr (is_nullable) {
- if (!null_map[idx] &&
-
_specific_filter->find_crc32_hash(get_cell_value(pred_col_data[idx]))) {
- sel[new_size++] = idx;
- }
+ new_size += !null_map[idx] &&
+
_specific_filter->find_uint32_t(dict_col->get_hash_value(idx));
} else {
- if
(_specific_filter->find_crc32_hash(get_cell_value(pred_col_data[idx]))) {
- sel[new_size++] = idx;
- }
+ new_size +=
_specific_filter->find_uint32_t(dict_col->get_hash_value(idx));
}
}
- } else if (IRuntimeFilter::enable_use_batch(_be_exec_version > 0, T)) {
- const auto& data =
- reinterpret_cast<
- const
vectorized::PredicateColumnType<PredicateEvaluateType<T>>*>(
- &column)
- ->get_data();
- new_size =
_specific_filter->find_fixed_len_olap_engine((char*)data.data(), null_map,
- sel, size,
data.size() != size);
} else {
auto& pred_col =
reinterpret_cast<
@@ -177,8 +133,8 @@ uint16_t BloomFilterColumnPredicate<T>::evaluate(const
vectorized::IColumn& colu
return size;
}
if (column.is_nullable()) {
- auto* nullable_col = reinterpret_cast<const
vectorized::ColumnNullable*>(&column);
- auto& null_map_data = nullable_col->get_null_map_column().get_data();
+ const auto* nullable_col = reinterpret_cast<const
vectorized::ColumnNullable*>(&column);
+ const auto& null_map_data =
nullable_col->get_null_map_column().get_data();
new_size =
evaluate<true>(nullable_col->get_nested_column(),
null_map_data.data(), sel, size);
} else {
diff --git a/be/src/vec/columns/column_dictionary.h
b/be/src/vec/columns/column_dictionary.h
index 1f107e629f4..1b7ef90f61b 100644
--- a/be/src/vec/columns/column_dictionary.h
+++ b/be/src/vec/columns/column_dictionary.h
@@ -283,9 +283,7 @@ public:
}
uint32_t get_hash_value(uint32_t idx) const { return
_dict.get_hash_value(_codes[idx], _type); }
- uint32_t get_crc32_hash_value(uint32_t idx) const {
- return _dict.get_crc32_hash_value(_codes[idx], _type);
- }
+
template <typename HybridSetType>
void find_codes(const HybridSetType* values,
std::vector<vectorized::UInt8>& selected) const {
return _dict.find_codes(values, selected);
@@ -384,31 +382,6 @@ public:
}
inline uint32_t get_hash_value(T code, FieldType type) const {
- if (_compute_hash_value_flags[code]) {
- return _hash_values[code];
- } else {
- auto& sv = (*_dict_data)[code];
- // The char data is stored in the disk with the schema length,
- // and zeros are filled if the length is insufficient
-
- // When reading data, use
shrink_char_type_column_suffix_zero(_char_type_idx)
- // Remove the suffix 0
- // When writing data, use the CharField::consume function to
fill in the trailing 0.
-
- // For dictionary data of char type, sv.size is the schema
length,
- // so use strnlen to remove the 0 at the end to get the actual
length.
- int32_t len = sv.size;
- if (type == FieldType::OLAP_FIELD_TYPE_CHAR) {
- len = strnlen(sv.data, sv.size);
- }
- uint32_t hash_val = HashUtil::murmur_hash3_32(sv.data, len, 0);
- _hash_values[code] = hash_val;
- _compute_hash_value_flags[code] = 1;
- return _hash_values[code];
- }
- }
-
- inline uint32_t get_crc32_hash_value(T code, FieldType type) const {
if (_compute_hash_value_flags[code]) {
return _hash_values[code];
} else {
diff --git a/be/src/vec/common/hash_table/hash_map.h
b/be/src/vec/common/hash_table/hash_map.h
index c26234b4e22..07528b857fa 100644
--- a/be/src/vec/common/hash_table/hash_map.h
+++ b/be/src/vec/common/hash_table/hash_map.h
@@ -20,6 +20,8 @@
#pragma once
+#include <gen_cpp/PlanNodes_types.h>
+
#include <span>
#include "vec/common/hash_table/hash.h"
@@ -232,6 +234,8 @@ public:
uint32_t get_bucket_size() const { return bucket_size; }
+ size_t size() const { return next.size(); }
+
void build(const Key* __restrict keys, const uint32_t* __restrict
bucket_nums,
size_t num_elem) {
build_keys = keys;
diff --git a/be/src/vec/exprs/vbloom_predicate.cpp
b/be/src/vec/exprs/vbloom_predicate.cpp
index 06bd21a6eb3..176ecb219ce 100644
--- a/be/src/vec/exprs/vbloom_predicate.cpp
+++ b/be/src/vec/exprs/vbloom_predicate.cpp
@@ -88,41 +88,16 @@ Status VBloomPredicate::execute(VExprContext* context,
Block* block, int* result
block->get_by_position(arguments[0]).column->convert_to_full_column_if_const();
size_t sz = argument_column->size();
res_data_column->resize(sz);
- auto ptr =
((ColumnVector<UInt8>*)res_data_column.get())->get_data().data();
+ auto* ptr =
((ColumnVector<UInt8>*)res_data_column.get())->get_data().data();
auto type =
WhichDataType(remove_nullable(block->get_by_position(arguments[0]).type));
if (type.is_string_or_fixed_string()) {
- // When _be_exec_version is equal to or greater than 2, we use the new
hash method.
- // This is only to be used if the be_exec_version may be less than 2.
If updated, please delete it.
- if (_be_exec_version >= 2) {
- for (size_t i = 0; i < sz; i++) {
- /// TODO: remove virtual function call in get_data_at to
improve performance
- auto ele = argument_column->get_data_at(i);
- const StringRef v(ele.data, ele.size);
- ptr[i] = _filter->find_crc32_hash(reinterpret_cast<const
void*>(&v));
- }
- } else {
- for (size_t i = 0; i < sz; i++) {
- auto ele = argument_column->get_data_at(i);
- const StringRef v(ele.data, ele.size);
- ptr[i] = _filter->find(reinterpret_cast<const void*>(&v));
- }
- }
- } else if (_be_exec_version > 0 && (type.is_int_or_uint() ||
type.is_float())) {
- if (argument_column->is_nullable()) {
- auto column_nested = reinterpret_cast<const
ColumnNullable*>(argument_column.get())
- ->get_nested_column_ptr();
- auto column_nullmap = reinterpret_cast<const
ColumnNullable*>(argument_column.get())
- ->get_null_map_column_ptr();
- _filter->find_fixed_len(column_nested->get_raw_data().data,
-
(uint8*)column_nullmap->get_raw_data().data, sz, ptr);
- } else {
- _filter->find_fixed_len(argument_column->get_raw_data().data,
nullptr, sz, ptr);
- }
- } else {
for (size_t i = 0; i < sz; i++) {
- ptr[i] = _filter->find(
- reinterpret_cast<const
void*>(argument_column->get_data_at(i).data));
+ auto ele = argument_column->get_data_at(i);
+ const StringRef v(ele.data, ele.size);
+ ptr[i] = _filter->find(reinterpret_cast<const void*>(&v));
}
+ } else {
+ _filter->find_fixed_len(argument_column, ptr);
}
if (_data_type->is_nullable()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]