This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new dae8f607e17 [Improvement](set) change set_operator's hash map to phmap
(#43273)
dae8f607e17 is described below
commit dae8f607e176480306471754d0391df3daeee8a7
Author: Pxl <[email protected]>
AuthorDate: Mon Nov 11 14:18:02 2024 +0800
[Improvement](set) change set_operator's hash map to phmap (#43273)
### What problem does this PR solve?
1. Change set_operator's hash map to phmap.
2. Optimize the hash table refresh logic.
```sql
+-------+--------------+------+-------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+--------------+------+-------+---------+-------+
| k1 | int | Yes | true | NULL | |
| k2 | int | No | false | NULL | NONE |
| k3 | bigint | Yes | false | NULL | NONE |
| k4 | varchar(100) | Yes | false | NULL | NONE |
+-------+--------------+------+-------+---------+-------+
```
```sql
select count(*) from (select * from a_table intersect select * from
b_table)t;
before: Time(ms)=37768|CpuTimeMS=61754|PeakMemoryBytes=129921088
after: Time(ms)=25218|CpuTimeMS=44571|PeakMemoryBytes=129902624
```
```sql
select count(*) from (select k1 from a_table intersect select k1 from
b_table)t;
before: Time(ms)=7964|CpuTimeMS=13431|PeakMemoryBytes=130062400
after: Time(ms)=7459|CpuTimeMS=11976|PeakMemoryBytes=130091048
```
---
be/src/pipeline/common/set_utils.h | 74 +++++++++---------------
be/src/pipeline/exec/join/join_op.h | 18 ++----
be/src/pipeline/exec/set_probe_sink_operator.cpp | 74 +++++++++++++-----------
be/src/pipeline/exec/set_source_operator.cpp | 17 +++++-
be/src/vec/common/hash_table/hash_map.h | 5 --
be/src/vec/common/hash_table/hash_map_context.h | 7 +--
be/src/vec/common/hash_table/ph_hash_map.h | 11 ++--
be/src/vec/common/hash_table/ph_hash_set.h | 3 +-
be/src/vec/common/hash_table/string_hash_map.h | 1 +
9 files changed, 96 insertions(+), 114 deletions(-)
diff --git a/be/src/pipeline/common/set_utils.h
b/be/src/pipeline/common/set_utils.h
index ed64035fb42..2caf5b7d0b8 100644
--- a/be/src/pipeline/common/set_utils.h
+++ b/be/src/pipeline/common/set_utils.h
@@ -25,21 +25,32 @@
namespace doris {
-template <class Key>
-using SetFixedKeyHashTableContext =
- vectorized::MethodKeysFixed<HashMap<Key,
pipeline::RowRefListWithFlags, HashCRC32<Key>>>;
+template <typename T>
+using SetData = PHHashMap<T, RowRefListWithFlags, HashCRC32<T>>;
-template <class T>
-using SetPrimaryTypeHashTableContext =
- vectorized::MethodOneNumber<T, HashMap<T,
pipeline::RowRefListWithFlags, HashCRC32<T>>>;
+template <typename T>
+using SetFixedKeyHashTableContext = vectorized::MethodKeysFixed<SetData<T>>;
+
+template <typename T>
+using SetPrimaryTypeHashTableContext = vectorized::MethodOneNumber<T,
SetData<T>>;
+
+template <typename T>
+using SetPrimaryTypeHashTableContextNullable =
vectorized::MethodSingleNullableColumn<
+ vectorized::MethodOneNumber<T,
vectorized::DataWithNullKey<SetData<T>>>>;
using SetSerializedHashTableContext =
- vectorized::MethodSerialized<HashMap<StringRef,
pipeline::RowRefListWithFlags>>;
+ vectorized::MethodSerialized<PHHashMap<StringRef,
RowRefListWithFlags>>;
using SetMethodOneString =
- vectorized::MethodStringNoCache<HashMap<StringRef,
pipeline::RowRefListWithFlags>>;
+ vectorized::MethodStringNoCache<PHHashMap<StringRef,
RowRefListWithFlags>>;
using SetHashTableVariants =
std::variant<std::monostate, SetSerializedHashTableContext,
SetMethodOneString,
+ SetPrimaryTypeHashTableContextNullable<vectorized::UInt8>,
+
SetPrimaryTypeHashTableContextNullable<vectorized::UInt16>,
+
SetPrimaryTypeHashTableContextNullable<vectorized::UInt32>,
+
SetPrimaryTypeHashTableContextNullable<vectorized::UInt64>,
+
SetPrimaryTypeHashTableContextNullable<vectorized::UInt128>,
+
SetPrimaryTypeHashTableContextNullable<vectorized::UInt256>,
SetPrimaryTypeHashTableContext<vectorized::UInt8>,
SetPrimaryTypeHashTableContext<vectorized::UInt16>,
SetPrimaryTypeHashTableContext<vectorized::UInt32>,
@@ -51,9 +62,9 @@ using SetHashTableVariants =
SetFixedKeyHashTableContext<vectorized::UInt256>,
SetFixedKeyHashTableContext<vectorized::UInt136>>;
-struct SetDataVariants {
- SetHashTableVariants method_variant;
-
+struct SetDataVariants
+ : public DataVariants<SetHashTableVariants,
vectorized::MethodSingleNullableColumn,
+ vectorized::MethodOneNumber,
vectorized::DataWithNullKey> {
void init(const std::vector<vectorized::DataTypePtr>& data_types,
HashKeyType type) {
bool nullable = data_types.size() == 1 && data_types[0]->is_nullable();
switch (type) {
@@ -61,51 +72,22 @@ struct SetDataVariants {
method_variant.emplace<SetSerializedHashTableContext>();
break;
case HashKeyType::int8_key:
- if (nullable) {
-
method_variant.emplace<SetFixedKeyHashTableContext<vectorized::UInt64>>(
- get_key_sizes(data_types));
- } else {
-
method_variant.emplace<SetPrimaryTypeHashTableContext<vectorized::UInt8>>();
- }
+ emplace_single<vectorized::UInt8,
SetData<vectorized::UInt8>>(nullable);
break;
case HashKeyType::int16_key:
- if (nullable) {
-
method_variant.emplace<SetFixedKeyHashTableContext<vectorized::UInt64>>(
- get_key_sizes(data_types));
- } else {
-
method_variant.emplace<SetPrimaryTypeHashTableContext<vectorized::UInt16>>();
- }
+ emplace_single<vectorized::UInt16,
SetData<vectorized::UInt16>>(nullable);
break;
case HashKeyType::int32_key:
- if (nullable) {
-
method_variant.emplace<SetFixedKeyHashTableContext<vectorized::UInt64>>(
- get_key_sizes(data_types));
- } else {
-
method_variant.emplace<SetPrimaryTypeHashTableContext<vectorized::UInt32>>();
- }
+ emplace_single<vectorized::UInt32,
SetData<vectorized::UInt32>>(nullable);
break;
case HashKeyType::int64_key:
- if (nullable) {
-
method_variant.emplace<SetFixedKeyHashTableContext<vectorized::UInt128>>(
- get_key_sizes(data_types));
- } else {
-
method_variant.emplace<SetPrimaryTypeHashTableContext<vectorized::UInt64>>();
- }
+ emplace_single<vectorized::UInt64,
SetData<vectorized::UInt64>>(nullable);
break;
case HashKeyType::int128_key:
- if (nullable) {
-
method_variant.emplace<SetFixedKeyHashTableContext<vectorized::UInt136>>(
- get_key_sizes(data_types));
- } else {
-
method_variant.emplace<SetPrimaryTypeHashTableContext<vectorized::UInt128>>();
- }
+ emplace_single<vectorized::UInt128,
SetData<vectorized::UInt128>>(nullable);
break;
case HashKeyType::int256_key:
- if (nullable) {
- method_variant.emplace<SetSerializedHashTableContext>();
- } else {
-
method_variant.emplace<SetPrimaryTypeHashTableContext<vectorized::UInt256>>();
- }
+ emplace_single<vectorized::UInt256,
SetData<vectorized::UInt256>>(nullable);
break;
case HashKeyType::string_key:
method_variant.emplace<SetMethodOneString>();
diff --git a/be/src/pipeline/exec/join/join_op.h
b/be/src/pipeline/exec/join/join_op.h
index 616753b72de..f3bd47a911e 100644
--- a/be/src/pipeline/exec/join/join_op.h
+++ b/be/src/pipeline/exec/join/join_op.h
@@ -20,7 +20,7 @@
#include "vec/common/columns_hashing.h"
#include "vec/core/block.h"
-namespace doris::pipeline {
+namespace doris {
/**
* Now we have different kinds of RowRef for join operation. Overall, RowRef
is the base class and
* the class inheritance is below:
@@ -129,12 +129,10 @@ struct RowRefList : RowRef {
RowRefList() = default;
RowRefList(size_t row_num_) : RowRef(row_num_) {}
- ForwardIterator<RowRefList> begin() { return
ForwardIterator<RowRefList>(this); }
+ ForwardIterator<RowRefList> begin() { return {this}; }
/// insert element after current one
- void insert(RowRefType&& row_ref, vectorized::Arena& pool) {
- next.emplace_back(std::move(row_ref));
- }
+ void insert(RowRefType&& row_ref, vectorized::Arena& pool) {
next.emplace_back(row_ref); }
void clear() { next.clear(); }
@@ -149,9 +147,7 @@ struct RowRefListWithFlag : RowRef {
RowRefListWithFlag() = default;
RowRefListWithFlag(size_t row_num_) : RowRef(row_num_) {}
- ForwardIterator<RowRefListWithFlag> const begin() {
- return ForwardIterator<RowRefListWithFlag>(this);
- }
+ ForwardIterator<RowRefListWithFlag> begin() { return {this}; }
/// insert element after current one
void insert(RowRefType&& row_ref, vectorized::Arena& pool) {
next.emplace_back(row_ref); }
@@ -171,9 +167,7 @@ struct RowRefListWithFlags : RowRefWithFlag {
RowRefListWithFlags() = default;
RowRefListWithFlags(size_t row_num_) : RowRefWithFlag(row_num_) {}
- ForwardIterator<RowRefListWithFlags> const begin() {
- return ForwardIterator<RowRefListWithFlags>(this);
- }
+ ForwardIterator<RowRefListWithFlags> begin() { return {this}; }
/// insert element after current one
void insert(RowRefType&& row_ref, vectorized::Arena& pool) {
next.emplace_back(row_ref); }
@@ -185,4 +179,4 @@ private:
std::vector<RowRefType> next;
};
-} // namespace doris::pipeline
+} // namespace doris
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp
b/be/src/pipeline/exec/set_probe_sink_operator.cpp
index 813dad3ad79..4c250d5603b 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp
@@ -210,46 +210,52 @@ void
SetProbeSinkOperatorX<is_intersect>::_refresh_hash_table(
[&](auto&& arg) {
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType,
std::monostate>) {
- auto tmp_hash_table =
- std::make_shared<typename
HashTableCtxType::HashMapType>();
- bool is_need_shrink =
-
arg.hash_table->should_be_shrink(valid_element_in_hash_tbl);
- if (is_intersect || is_need_shrink) {
- tmp_hash_table->init_buf_size(size_t(
- valid_element_in_hash_tbl /
arg.hash_table->get_factor() + 1));
- }
-
arg.init_iterator();
auto& iter = arg.iterator;
auto iter_end = arg.hash_table->end();
- std::visit(
- [&](auto is_need_shrink_const) {
- while (iter != iter_end) {
- auto& mapped = iter->get_second();
- auto it = mapped.begin();
-
- if constexpr (is_intersect) { //intersected
- if (it->visited) {
- it->visited = false;
-
tmp_hash_table->insert(iter->get_value());
- }
- ++iter;
- } else { //except
- if constexpr (is_need_shrink_const) {
- if (!it->visited) {
-
tmp_hash_table->insert(iter->get_value());
- }
- }
- ++iter;
- }
- }
- },
- vectorized::make_bool_variant(is_need_shrink));
- arg.reset();
- if (is_intersect || is_need_shrink) {
+ constexpr double need_shrink_ratio = 0.25;
+ bool is_need_shrink =
+ is_intersect
+ ? (valid_element_in_hash_tbl <
+ arg.hash_table
+ ->size()) // When intersect,
shrink as long as the element decreases
+ : (valid_element_in_hash_tbl <
+ arg.hash_table->size() *
+ need_shrink_ratio); // When
except, element decreases need to within the 'need_shrink_ratio' before
shrinking
+
+ if (is_need_shrink) {
+ auto tmp_hash_table =
+ std::make_shared<typename
HashTableCtxType::HashMapType>();
+ tmp_hash_table->reserve(
+
local_state._shared_state->valid_element_in_hash_tbl);
+ while (iter != iter_end) {
+ auto& mapped = iter->get_second();
+ auto it = mapped.begin();
+
+ if constexpr (is_intersect) {
+ if (it->visited) {
+ it->visited = false;
+ tmp_hash_table->insert(iter->get_first(),
iter->get_second());
+ }
+ } else {
+ if (!it->visited) {
+ tmp_hash_table->insert(iter->get_first(),
iter->get_second());
+ }
+ }
+ ++iter;
+ }
arg.hash_table = std::move(tmp_hash_table);
+ } else if (is_intersect) {
+ while (iter != iter_end) {
+ auto& mapped = iter->get_second();
+ auto it = mapped.begin();
+ it->visited = false;
+ ++iter;
+ }
}
+
+ arg.reset();
} else {
LOG(FATAL) << "FATAL: uninited hash table";
__builtin_unreachable();
diff --git a/be/src/pipeline/exec/set_source_operator.cpp
b/be/src/pipeline/exec/set_source_operator.cpp
index ebcd13ddf14..91c98288d8b 100644
--- a/be/src/pipeline/exec/set_source_operator.cpp
+++ b/be/src/pipeline/exec/set_source_operator.cpp
@@ -18,6 +18,7 @@
#include "set_source_operator.h"
#include <memory>
+#include <type_traits>
#include "common/status.h"
#include "pipeline/exec/operator.h"
@@ -124,11 +125,9 @@ Status
SetSourceOperatorX<is_intersect>::_get_data_in_hashtable(
vectorized::Block* output_block, const int batch_size, bool* eos) {
size_t left_col_len = local_state._left_table_data_types.size();
hash_table_ctx.init_iterator();
- auto& iter = hash_table_ctx.iterator;
auto block_size = 0;
- for (; iter != hash_table_ctx.hash_table->end() && block_size <
batch_size; ++iter) {
- auto& value = iter->get_second();
+ auto add_result = [&local_state, &block_size, this](auto value) {
auto it = value.begin();
if constexpr (is_intersect) {
if (it->visited) { //intersected: have done probe, so visited
values it's the result
@@ -139,9 +138,21 @@ Status
SetSourceOperatorX<is_intersect>::_get_data_in_hashtable(
_add_result_columns(local_state, value, block_size);
}
}
+ };
+
+ auto& iter = hash_table_ctx.iterator;
+ for (; iter != hash_table_ctx.hash_table->end() && block_size <
batch_size; ++iter) {
+ add_result(iter->get_second());
}
*eos = iter == hash_table_ctx.hash_table->end();
+ if (*eos && hash_table_ctx.hash_table->has_null_key_data()) {
+ auto value = hash_table_ctx.hash_table->template
get_null_key_data<RowRefListWithFlags>();
+ if constexpr (std::is_same_v<RowRefListWithFlags,
std::decay_t<decltype(value)>>) {
+ add_result(value);
+ }
+ }
+
if (!output_block->mem_reuse()) {
for (int i = 0; i < left_col_len; ++i) {
output_block->insert(
diff --git a/be/src/vec/common/hash_table/hash_map.h
b/be/src/vec/common/hash_table/hash_map.h
index 448ddd5b7c5..8cb02d6a80d 100644
--- a/be/src/vec/common/hash_table/hash_map.h
+++ b/be/src/vec/common/hash_table/hash_map.h
@@ -201,11 +201,6 @@ using HashMap = HashMapTable<Key, HashMapCell<Key, Mapped,
Hash>, Hash, Grower,
template <typename Key, typename Hash = DefaultHash<Key>>
using JoinHashMap = JoinHashTable<Key, Hash>;
-template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>,
- typename Grower = HashTableGrower<>, typename Allocator =
HashTableAllocator>
-using HashMapWithSavedHash =
- HashMapTable<Key, HashMapCellWithSavedHash<Key, Mapped, Hash>, Hash,
Grower, Allocator>;
-
template <typename Key, typename Mapped, typename Hash, size_t
initial_size_degree>
using HashMapWithStackMemory = HashMapTable<
Key, HashMapCellWithSavedHash<Key, Mapped, Hash>, Hash,
diff --git a/be/src/vec/common/hash_table/hash_map_context.h
b/be/src/vec/common/hash_table/hash_map_context.h
index 16a793d7500..875c035b425 100644
--- a/be/src/vec/common/hash_table/hash_map_context.h
+++ b/be/src/vec/common/hash_table/hash_map_context.h
@@ -33,10 +33,6 @@
#include "vec/core/types.h"
#include "vec/utils/util.hpp"
-namespace doris::pipeline {
-struct RowRefListWithFlags;
-}
-
namespace doris::vectorized {
constexpr auto BITSIZE = 8;
@@ -587,8 +583,7 @@ struct DataWithNullKey : public Base {
private:
bool has_null_key = false;
- // null_key_data store AggregateDataPtr on agg node, store PartitionBlocks
on partition sort node.
- void* null_key_data = nullptr;
+ Base::Value null_key_data;
};
/// Single low cardinality column.
diff --git a/be/src/vec/common/hash_table/ph_hash_map.h
b/be/src/vec/common/hash_table/ph_hash_map.h
index de320425223..414624c6e1a 100644
--- a/be/src/vec/common/hash_table/ph_hash_map.h
+++ b/be/src/vec/common/hash_table/ph_hash_map.h
@@ -40,6 +40,7 @@ public:
using key_type = Key;
using mapped_type = Mapped;
+ using Value = Mapped;
using value_type = std::pair<const Key, Mapped>;
using LookupResult = std::pair<const Key, Mapped>*;
@@ -154,10 +155,8 @@ public:
[&](const auto& ctor) {
f(ctor, key, key); });
}
- void ALWAYS_INLINE insert(const Key& key, size_t hash_value, const Mapped&
value) {
- auto it = &*_hash_map.lazy_emplace_with_hash(key, hash_value,
- [&](const auto& ctor) {
ctor(key, value); });
- it->second = value;
+ void ALWAYS_INLINE insert(const Key& key, const Mapped& value) {
+ _hash_map.lazy_emplace(key, [&](const auto& ctor) { ctor(key, value);
});
}
template <typename KeyHolder>
@@ -190,8 +189,6 @@ public:
return capacity * sizeof(typename HashMapImpl::slot_type);
}
- size_t get_buffer_size_in_cells() const { return _hash_map.capacity(); }
-
bool add_elem_size_overflow(size_t row) const {
const auto capacity = _hash_map.capacity();
// phmap use 7/8th as maximum load factor.
@@ -209,7 +206,7 @@ public:
void clear_and_shrink() { _hash_map.clear(); }
- void expanse_for_add_elem(size_t num_elem) { _hash_map.reserve(num_elem); }
+ void reserve(size_t num_elem) { _hash_map.reserve(num_elem); }
private:
HashMapImpl _hash_map;
diff --git a/be/src/vec/common/hash_table/ph_hash_set.h
b/be/src/vec/common/hash_table/ph_hash_set.h
index 6ace649b7bb..79faca9d670 100644
--- a/be/src/vec/common/hash_table/ph_hash_set.h
+++ b/be/src/vec/common/hash_table/ph_hash_set.h
@@ -36,6 +36,7 @@ public:
using key_type = Key;
using mapped_type = void;
using value_type = void;
+ using Value = void*;
using LookupResult = void*;
@@ -104,7 +105,7 @@ public:
void clear_and_shrink() { _hash_set.clear(); }
- void expanse_for_add_elem(size_t num_elem) { _hash_set.reserve(num_elem); }
+ void reserve(size_t num_elem) { _hash_set.reserve(num_elem); }
private:
HashSetImpl _hash_set;
diff --git a/be/src/vec/common/hash_table/string_hash_map.h
b/be/src/vec/common/hash_table/string_hash_map.h
index 6c7a9e74dca..cffe32e82ce 100644
--- a/be/src/vec/common/hash_table/string_hash_map.h
+++ b/be/src/vec/common/hash_table/string_hash_map.h
@@ -114,6 +114,7 @@ public:
using Base = StringHashTable<StringHashMapSubMaps<TMapped, Allocator>>;
using Self = StringHashMap;
using LookupResult = typename Base::LookupResult;
+ using Value = TMapped;
using Base::Base;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]