This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch dev_join
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev_join by this push:
new c3089031635 update rf
c3089031635 is described below
commit c3089031635ced43b76f87282fceeade5d4c32fa
Author: BiteTheDDDDt <[email protected]>
AuthorDate: Wed Oct 18 14:22:02 2023 +0800
update rf
---
be/src/exprs/runtime_filter_slots.h | 35 ++++++++---------
be/src/pipeline/exec/hashjoin_build_sink.h | 2 +-
be/src/vec/exec/join/vhash_join_node.h | 61 ++++++++----------------------
3 files changed, 35 insertions(+), 63 deletions(-)
diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h
index e0ff2cb0067..307253f430c 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -161,7 +161,7 @@ public:
return Status::OK();
}
- void insert(std::unordered_map<const vectorized::Block*, std::vector<int>>& datas) {
+ void insert(const std::unordered_set<const vectorized::Block*>& datas) {
for (int i = 0; i < _build_expr_context.size(); ++i) {
auto iter = _runtime_filters.find(i);
if (iter == _runtime_filters.end()) {
@@ -169,30 +169,31 @@ public:
}
int result_column_id =
_build_expr_context[i]->get_last_result_column_id();
- for (auto it : datas) {
- auto& column =
it.first->get_by_position(result_column_id).column;
+ for (const auto* it : datas) {
+ auto column = it->get_by_position(result_column_id).column;
- if (auto* nullable =
+ std::vector<int> indexs;
+ if (const auto* nullable =
vectorized::check_and_get_column<vectorized::ColumnNullable>(*column)) {
- auto& column_nested = nullable->get_nested_column_ptr();
- auto& column_nullmap = nullable->get_null_map_column_ptr();
- std::vector<int> indexs;
- for (int row_num : it.second) {
- if (assert_cast<const
vectorized::ColumnUInt8*>(column_nullmap.get())
- ->get_bool(row_num)) {
+ column = nullable->get_nested_column_ptr();
+ const uint8_t* null_map = assert_cast<const
vectorized::ColumnUInt8*>(
+
nullable->get_null_map_column_ptr().get())
+ ->get_data()
+ .data();
+ for (int i = 0; i < column->size(); i++) {
+ if (null_map[i]) {
continue;
}
- indexs.push_back(row_num);
+ indexs.push_back(i);
}
- for (auto filter : iter->second) {
- filter->insert_batch(column_nested, indexs);
- }
-
} else {
- for (auto filter : iter->second) {
- filter->insert_batch(column, it.second);
+ for (int i = 0; i < column->size(); i++) {
+ indexs.push_back(i);
}
}
+ for (auto* filter : iter->second) {
+ filter->insert_batch(column, indexs);
+ }
}
}
}
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index 9cf559588cc..49c1a459b70 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -101,7 +101,7 @@ protected:
bool _has_set_need_null_map_for_build = false;
bool _build_side_ignore_null = false;
size_t _build_rf_cardinality = 0;
- std::unordered_map<const vectorized::Block*, std::vector<int>> _inserted_rows;
+ std::unordered_set<const vectorized::Block*> _inserted_blocks;
std::shared_ptr<SharedHashTableDependency> _shared_hash_table_dependency;
RuntimeProfile::Counter* _build_table_timer;
diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h
index ef5a61eae17..c0d964fd66c 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -86,10 +86,10 @@ struct ProcessRuntimeFilterBuild {
RETURN_IF_ERROR(parent->_runtime_filter_slots->init(
state, hash_table_ctx.hash_table->size(),
parent->_build_rf_cardinality));
- if (!parent->_runtime_filter_slots->empty() && !parent->_inserted_rows.empty()) {
+ if (!parent->_runtime_filter_slots->empty() && !parent->_inserted_blocks.empty()) {
{
SCOPED_TIMER(parent->_push_compute_timer);
- parent->_runtime_filter_slots->insert(parent->_inserted_rows);
+ parent->_runtime_filter_slots->insert(parent->_inserted_blocks);
}
}
{
@@ -117,54 +117,25 @@ struct ProcessHashTableBuild {
template <bool ignore_null, bool short_circuit_for_null>
Status run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map,
bool* has_null_key) {
- using KeyGetter = typename HashTableContext::State;
-
- Defer defer {[&]() {
- int64_t bucket_size =
hash_table_ctx.hash_table->get_buffer_size_in_cells();
- int64_t filled_bucket_size = hash_table_ctx.hash_table->size();
- int64_t bucket_bytes =
hash_table_ctx.hash_table->get_buffer_size_in_bytes();
- COUNTER_SET(_parent->_hash_table_memory_usage, bucket_bytes);
- COUNTER_SET(_parent->_build_buckets_counter, bucket_size);
- COUNTER_SET(_parent->_build_collisions_counter,
- hash_table_ctx.hash_table->get_collisions());
- COUNTER_SET(_parent->_build_buckets_fill_counter,
filled_bucket_size);
-
- std::string hash_table_buckets_info;
-
- hash_table_buckets_info +=
-
std::to_string(hash_table_ctx.hash_table->get_buffer_size_in_cells()) + ", ";
- _parent->add_hash_buckets_info(hash_table_buckets_info);
-
- hash_table_buckets_info.clear();
- hash_table_buckets_info +=
std::to_string(hash_table_ctx.hash_table->size()) + ", ";
- _parent->add_hash_buckets_filled_info(hash_table_buckets_info);
- }};
-
- KeyGetter key_getter(_build_raw_ptrs);
-
- SCOPED_TIMER(_parent->_build_table_insert_timer);
- hash_table_ctx.hash_table->reset_resize_timer();
-
- vector<int>& inserted_rows = _parent->_inserted_rows[&_acquired_block];
- bool has_runtime_filter = !_parent->runtime_filter_descs().empty();
- if (has_runtime_filter) {
- inserted_rows.reserve(_batch_size);
+ if (short_circuit_for_null || ignore_null) {
+ for (int i = 0; i < _rows; i++) {
+ if ((*null_map)[i]) {
+ *has_null_key = true;
+ }
+ }
+ if (short_circuit_for_null && *has_null_key) {
+ return Status::OK();
+ }
}
+ if (!_parent->runtime_filter_descs().empty()) {
+ _parent->_inserted_blocks.insert(&_acquired_block);
+ }
hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows,
null_map ? null_map->data() :
nullptr);
-
- auto& arena = *_parent->arena();
- auto old_build_arena_memory = arena.size();
+ SCOPED_TIMER(_parent->_build_table_insert_timer);
hash_table_ctx.hash_table->build(hash_table_ctx.keys,
hash_table_ctx.hash_values.data(),
_rows);
- _parent->_build_rf_cardinality += inserted_rows.size();
-
- _parent->_build_arena_memory_usage->add(arena.size() -
old_build_arena_memory);
-
- COUNTER_UPDATE(_parent->_build_table_expanse_timer,
- hash_table_ctx.hash_table->get_resize_timer_value());
-
return Status::OK();
}
@@ -471,7 +442,7 @@ private:
friend struct ProcessRuntimeFilterBuild;
std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
- std::unordered_map<const Block*, std::vector<int>> _inserted_rows;
+ std::unordered_set<const Block*> _inserted_blocks;
std::vector<IRuntimeFilter*> _runtime_filters;
size_t _build_rf_cardinality = 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]