This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch new_join
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/new_join by this push:
new f82f9f10748 some fix (#26761)
f82f9f10748 is described below
commit f82f9f107489d93bf47a45cca5f141e64c34e439
Author: Pxl <[email protected]>
AuthorDate: Sat Nov 18 23:08:53 2023 +0800
some fix (#26761)
---
be/src/exprs/bloom_filter_func.h | 4 +-
be/src/exprs/minmax_predicate.h | 56 ++++--
be/src/pipeline/exec/hashjoin_probe_operator.cpp | 2 +
be/src/pipeline/exec/hashjoin_probe_operator.h | 3 +-
be/src/vec/common/hash_table/hash_map.h | 197 +++++++++++++++------
be/src/vec/common/hash_table/hash_map_context.h | 15 +-
be/src/vec/core/block.cpp | 15 +-
be/src/vec/exec/join/process_hash_table_probe.h | 7 +-
.../vec/exec/join/process_hash_table_probe_impl.h | 85 ++++-----
be/src/vec/exec/join/vhash_join_node.cpp | 33 +++-
be/src/vec/exec/join/vhash_join_node.h | 20 +--
11 files changed, 283 insertions(+), 154 deletions(-)
diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h
index 3f1e25efdc8..1ac2367d1c9 100644
--- a/be/src/exprs/bloom_filter_func.h
+++ b/be/src/exprs/bloom_filter_func.h
@@ -59,7 +59,7 @@ public:
// test_element/find_element only used on vectorized engine
template <typename T>
bool test_element(T element) const {
- if constexpr (std::is_same_v<T, Slice>) {
+ if constexpr (std::is_same_v<T, StringRef>) {
return _bloom_filter->find(element);
} else {
return _bloom_filter->find(HashUtil::fixed_len_to_uint32(element));
@@ -68,7 +68,7 @@ public:
template <typename T>
void add_element(T element) {
- if constexpr (std::is_same_v<T, Slice>) {
+ if constexpr (std::is_same_v<T, StringRef>) {
_bloom_filter->insert(element);
} else {
_bloom_filter->insert(HashUtil::fixed_len_to_uint32(element));
diff --git a/be/src/exprs/minmax_predicate.h b/be/src/exprs/minmax_predicate.h
index efc4ebf8630..fcf2ef44a19 100644
--- a/be/src/exprs/minmax_predicate.h
+++ b/be/src/exprs/minmax_predicate.h
@@ -17,10 +17,14 @@
#pragma once
+#include <type_traits>
+
#include "common/object_pool.h"
#include "runtime/type_limit.h"
#include "vec/columns/column.h"
#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_string.h"
+#include "vec/common/assert_cast.h"
namespace doris {
// only used in Runtime Filter
@@ -75,25 +79,51 @@ public:
assert_cast<const
vectorized::ColumnUInt8&>(nullable->get_null_map_column())
.get_data();
- const T* data = (T*)col.get_raw_data().data;
- for (size_t i = start; i < column->size(); i++) {
- if (!nullmap[i]) {
- if constexpr (NeedMin) {
- _min = std::min(_min, *(data + i));
+ if constexpr (std::is_same_v<T, StringRef>) {
+ const auto& column_string = assert_cast<const
vectorized::ColumnString&>(col);
+ for (size_t i = start; i < column->size(); i++) {
+ if (!nullmap[i]) {
+ if constexpr (NeedMin) {
+ _min = std::min(_min,
column_string.get_data_at(i));
+ }
+ if constexpr (NeedMax) {
+ _max = std::max(_max,
column_string.get_data_at(i));
+ }
}
- if constexpr (NeedMax) {
- _max = std::max(_max, *(data + i));
+ }
+ } else {
+ const T* data = (T*)col.get_raw_data().data;
+ for (size_t i = start; i < column->size(); i++) {
+ if (!nullmap[i]) {
+ if constexpr (NeedMin) {
+ _min = std::min(_min, *(data + i));
+ }
+ if constexpr (NeedMax) {
+ _max = std::max(_max, *(data + i));
+ }
}
}
}
} else {
- const T* data = (T*)column->get_raw_data().data;
- for (size_t i = start; i < column->size(); i++) {
- if constexpr (NeedMin) {
- _min = std::min(_min, *(data + i));
+ if constexpr (std::is_same_v<T, StringRef>) {
+ const auto& column_string = assert_cast<const
vectorized::ColumnString&>(*column);
+ for (size_t i = start; i < column->size(); i++) {
+ if constexpr (NeedMin) {
+ _min = std::min(_min, column_string.get_data_at(i));
+ }
+ if constexpr (NeedMax) {
+ _max = std::max(_max, column_string.get_data_at(i));
+ }
}
- if constexpr (NeedMax) {
- _max = std::max(_max, *(data + i));
+ } else {
+ const T* data = (T*)column->get_raw_data().data;
+ for (size_t i = start; i < column->size(); i++) {
+ if constexpr (NeedMin) {
+ _min = std::min(_min, *(data + i));
+ }
+ if constexpr (NeedMax) {
+ _max = std::max(_max, *(data + i));
+ }
}
}
}
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index b459d8eddd7..e9873129815 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -89,7 +89,9 @@ Status HashJoinProbeLocalState::open(RuntimeState* state) {
void HashJoinProbeLocalState::prepare_for_next() {
_probe_index = 0;
+ _build_index = 0;
_ready_probe = false;
+ _last_probe_match = -1;
_prepare_probe_block();
}
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 31a3bd38b93..c045980881f 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -103,6 +103,7 @@ private:
bool _ready_probe = false;
bool _probe_eos = false;
std::atomic<bool> _probe_inited = false;
+ int _last_probe_match = -1; // last probe row kept by other-conjunct filtering, -1 if none
vectorized::Block _probe_block;
vectorized::ColumnRawPtrs _probe_columns;
@@ -116,8 +117,6 @@ private:
bool _need_null_map_for_probe = false;
bool _has_set_need_null_map_for_probe = false;
vectorized::ColumnUInt8::MutablePtr _null_map_column;
- // for cases when a probe row matches more than batch size build rows.
- bool _is_any_probe_match_row_output = false;
std::unique_ptr<HashTableCtxVariants> _process_hashtable_ctx_variants =
std::make_unique<HashTableCtxVariants>();
diff --git a/be/src/vec/common/hash_table/hash_map.h
b/be/src/vec/common/hash_table/hash_map.h
index 1ccf34c921c..2f81fc27978 100644
--- a/be/src/vec/common/hash_table/hash_map.h
+++ b/be/src/vec/common/hash_table/hash_map.h
@@ -25,9 +25,11 @@
#include <span>
#include "common/compiler_util.h"
+#include "vec/columns/column_filter_helper.h"
#include "vec/common/hash_table/hash.h"
#include "vec/common/hash_table/hash_table.h"
#include "vec/common/hash_table/hash_table_allocator.h"
+
/** NOTE HashMap could only be used for memmoveable (position independent)
types.
* Example: std::string is not position independent in libstdc++ with C++11
ABI or in libc++.
* Also, key in hash table must be of type, that zero bytes is compared
equals to zero key.
@@ -227,20 +229,20 @@ public:
void prepare_build(size_t num_elem, int batch_size) {
max_batch_size = batch_size;
bucket_size = calc_bucket_size(num_elem + 1);
- first.resize(bucket_size, 0);
+ first.resize(bucket_size + 1);
next.resize(num_elem);
if constexpr (JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN ||
JoinOpType == doris::TJoinOp::RIGHT_OUTER_JOIN ||
JoinOpType == doris::TJoinOp::RIGHT_ANTI_JOIN ||
JoinOpType == doris::TJoinOp::RIGHT_SEMI_JOIN) {
- visited.resize(num_elem, 0);
+ visited.resize(num_elem);
}
}
uint32_t get_bucket_size() const { return bucket_size; }
- size_t size() const { return next.size(); }
+ size_t size() const { return Base::size() == 0 ? next.size() :
Base::size(); }
std::vector<uint8_t>& get_visited() { return visited; }
@@ -252,32 +254,40 @@ public:
next[i] = first[bucket_num];
first[bucket_num] = i;
}
+ first[bucket_size] = 0; // index = bucket_num means null
}
- template <int JoinOpType, bool with_other_conjuncts>
+ template <int JoinOpType, bool with_other_conjuncts, bool is_mark_join,
bool need_judge_null>
auto find_batch(const Key* __restrict keys, const uint32_t* __restrict
bucket_nums,
int probe_idx, uint32_t build_idx, int probe_rows,
- uint32_t* __restrict probe_idxs, uint32_t* __restrict
build_idxs) {
+ uint32_t* __restrict probe_idxs, uint32_t* __restrict
build_idxs,
+ doris::vectorized::ColumnFilterHelper* mark_column) {
+ if constexpr (is_mark_join) {
+ return _find_batch_mark<JoinOpType>(keys, bucket_nums, probe_idx,
probe_rows,
+ probe_idxs, build_idxs,
mark_column);
+ }
+
+ if constexpr (with_other_conjuncts) {
+ return _find_batch_conjunct<JoinOpType>(keys, bucket_nums,
probe_idx, build_idx,
+ probe_rows, probe_idxs,
build_idxs);
+ }
+
if constexpr (JoinOpType == doris::TJoinOp::INNER_JOIN ||
JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN ||
JoinOpType == doris::TJoinOp::LEFT_OUTER_JOIN ||
JoinOpType == doris::TJoinOp::RIGHT_OUTER_JOIN) {
- return _find_batch_inner_outer_join<JoinOpType,
with_other_conjuncts>(
- keys, bucket_nums, probe_idx, build_idx, probe_rows,
probe_idxs, build_idxs);
+ return _find_batch_inner_outer_join<JoinOpType>(keys, bucket_nums,
probe_idx, build_idx,
+ probe_rows,
probe_idxs, build_idxs);
}
if constexpr (JoinOpType == doris::TJoinOp::LEFT_ANTI_JOIN ||
- JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN) {
- return _find_batch_left_semi_anti<JoinOpType>(keys, bucket_nums,
probe_idx, probe_rows,
- probe_idxs);
+ JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN ||
+ JoinOpType == doris::TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN)
{
+ return _find_batch_left_semi_anti<JoinOpType, need_judge_null>(
+ keys, bucket_nums, probe_idx, probe_rows, probe_idxs);
}
if constexpr (JoinOpType == doris::TJoinOp::RIGHT_ANTI_JOIN ||
JoinOpType == doris::TJoinOp::RIGHT_SEMI_JOIN) {
- if constexpr (!with_other_conjuncts) {
- return _find_batch_right_semi_anti(keys, bucket_nums,
probe_idx, probe_rows);
- } else {
- return _find_batch_right_semi_anti_conjunct(keys, bucket_nums,
probe_idx, build_idx,
- probe_rows,
probe_idxs, build_idxs);
- }
+ return _find_batch_right_semi_anti(keys, bucket_nums, probe_idx,
probe_rows);
}
return std::tuple {0, 0U, 0};
}
@@ -305,10 +315,41 @@ public:
}
private:
+ // only LEFT_ANTI_JOIN/LEFT_SEMI_JOIN/NULL_AWARE_LEFT_ANTI_JOIN/CROSS_JOIN
support mark join
+ template <int JoinOpType>
+ auto _find_batch_mark(const Key* __restrict keys, const uint32_t*
__restrict bucket_nums,
+ int probe_idx, int probe_rows, uint32_t* __restrict
probe_idxs,
+ uint32_t* __restrict build_idxs,
+ doris::vectorized::ColumnFilterHelper* mark_column) {
+ auto matched_cnt = 0;
+ const auto batch_size = max_batch_size;
+
+ while (probe_idx < probe_rows && matched_cnt < batch_size) {
+ auto build_idx = first[bucket_nums[probe_idx]];
+
+ while (build_idx && keys[probe_idx] != build_keys[build_idx]) {
+ build_idx = next[build_idx];
+ }
+
+ if (bucket_nums[probe_idx] == bucket_size) {
+ // mark result as null when probe row is null
+ mark_column->insert_null();
+ } else {
+ bool matched = JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN ?
build_idx != 0
+ :
build_idx == 0;
+ mark_column->insert_value(matched);
+ }
+
+ probe_idxs[matched_cnt] = probe_idx++;
+ build_idxs[matched_cnt] = build_idx;
+ matched_cnt++;
+ }
+ return std::tuple {probe_idx, 0U, matched_cnt};
+ }
+
auto _find_batch_right_semi_anti(const Key* __restrict keys,
const uint32_t* __restrict bucket_nums,
int probe_idx,
int probe_rows) {
- auto matched_cnt = 0;
while (probe_idx < probe_rows) {
auto build_idx = first[bucket_nums[probe_idx]];
@@ -320,28 +361,95 @@ private:
}
probe_idx++;
}
+ return std::tuple {probe_idx, 0U, 0};
+ }
+
+ template <int JoinOpType, bool need_judge_null>
+ auto _find_batch_left_semi_anti(const Key* __restrict keys,
+ const uint32_t* __restrict bucket_nums,
int probe_idx,
+ int probe_rows, uint32_t* __restrict
probe_idxs) {
+ auto matched_cnt = 0;
+ const auto batch_size = max_batch_size;
+
+ while (probe_idx < probe_rows && matched_cnt < batch_size) {
+ if constexpr (need_judge_null) {
+ if (bucket_nums[probe_idx] == bucket_size) {
+ probe_idx++;
+ continue;
+ }
+ }
+
+ auto build_idx = first[bucket_nums[probe_idx]];
+
+ while (build_idx && keys[probe_idx] != build_keys[build_idx]) {
+ build_idx = next[build_idx];
+ }
+ bool matched =
+ JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN ? build_idx
!= 0 : build_idx == 0;
+ probe_idxs[matched_cnt] = probe_idx++;
+ matched_cnt += matched;
+ }
return std::tuple {probe_idx, 0U, matched_cnt};
}
- auto _find_batch_right_semi_anti_conjunct(const Key* __restrict keys,
- const uint32_t* __restrict
bucket_nums, int probe_idx,
- uint32_t build_idx, int
probe_rows,
- uint32_t* __restrict probe_idxs,
- uint32_t* __restrict build_idxs)
{
+ auto _find_batch_left_semi_anti_conjunct(const Key* __restrict keys,
+ const uint32_t* __restrict
bucket_nums, int probe_idx,
+ int probe_rows, uint32_t*
__restrict probe_idxs,
+ uint32_t* __restrict build_idxs) {
+ auto matched_cnt = 0;
+ const auto batch_size = max_batch_size;
+
+ while (probe_idx < probe_rows && matched_cnt < batch_size) {
+ auto build_idx = first[bucket_nums[probe_idx]];
+
+ while (build_idx) {
+ if (keys[probe_idx] == build_keys[build_idx]) {
+ probe_idxs[matched_cnt] = probe_idx;
+ build_idxs[matched_cnt] = build_idx;
+ matched_cnt++;
+ }
+ build_idx = next[build_idx];
+ }
+ probe_idx++;
+ }
+ return std::tuple {probe_idx, 0U, matched_cnt};
+ }
+
+ template <int JoinOpType>
+ auto _find_batch_conjunct(const Key* __restrict keys, const uint32_t*
__restrict bucket_nums,
+ int probe_idx, uint32_t build_idx, int
probe_rows,
+ uint32_t* __restrict probe_idxs, uint32_t*
__restrict build_idxs) {
auto matched_cnt = 0;
const auto batch_size = max_batch_size;
auto do_the_probe = [&]() {
auto matched_cnt_old = matched_cnt;
while (build_idx && matched_cnt < batch_size) {
- if (!visited[build_idx] && keys[probe_idx] ==
build_keys[build_idx]) {
+ if constexpr (JoinOpType == doris::TJoinOp::RIGHT_ANTI_JOIN ||
+ JoinOpType == doris::TJoinOp::RIGHT_SEMI_JOIN) {
+ if (!visited[build_idx] && keys[probe_idx] ==
build_keys[build_idx]) {
+ build_idxs[matched_cnt++] = build_idx;
+ }
+ } else if (keys[probe_idx] == build_keys[build_idx]) {
build_idxs[matched_cnt++] = build_idx;
+ // emit only build rows whose key equals the probe key
}
build_idx = next[build_idx];
}
+
for (auto i = matched_cnt_old; i < matched_cnt; i++) {
probe_idxs[i] = probe_idx;
}
+
+ if constexpr (JoinOpType == doris::TJoinOp::LEFT_OUTER_JOIN ||
+ JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN) {
+ if (!build_idx) {
+ probe_idxs[matched_cnt] = probe_idx;
+ build_idxs[matched_cnt] = 0;
+ matched_cnt++;
+ }
+ }
+
probe_idx++;
};
@@ -354,32 +462,13 @@ private:
do_the_probe();
}
- probe_idx -= (matched_cnt == batch_size && build_idx);
+ probe_idx -=
+ (matched_cnt >= batch_size &&
+ build_idx); // LEFT/FULL_OUTER_JOIN may exceed batch_size by one
when appending the build_idx 0 placeholder
return std::tuple {probe_idx, build_idx, matched_cnt};
}
template <int JoinOpType>
- auto _find_batch_left_semi_anti(const Key* __restrict keys,
- const uint32_t* __restrict bucket_nums,
int probe_idx,
- int probe_rows, uint32_t* __restrict
probe_idxs) {
- auto matched_cnt = 0;
- const auto batch_size = max_batch_size;
-
- while (probe_idx < probe_rows && matched_cnt < batch_size) {
- auto build_idx = first[bucket_nums[probe_idx]];
-
- while (build_idx && keys[probe_idx] != build_keys[build_idx]) {
- build_idx = next[build_idx];
- }
- bool matched =
- JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN ? build_idx
!= 0 : build_idx == 0;
- probe_idxs[matched_cnt] = probe_idx++;
- matched_cnt += matched;
- }
- return std::tuple {probe_idx, 0U, matched_cnt};
- }
-
- template <int JoinOpType, bool with_other_conjuncts>
auto _find_batch_inner_outer_join(const Key* __restrict keys,
const uint32_t* __restrict bucket_nums,
int probe_idx,
uint32_t build_idx, int probe_rows,
@@ -393,14 +482,13 @@ private:
if (keys[probe_idx] == build_keys[build_idx]) {
probe_idxs[matched_cnt] = probe_idx;
build_idxs[matched_cnt] = build_idx;
- if constexpr (!with_other_conjuncts &&
- (JoinOpType ==
doris::TJoinOp::RIGHT_OUTER_JOIN ||
- JoinOpType ==
doris::TJoinOp::FULL_OUTER_JOIN)) {
+ matched_cnt++;
+ if constexpr (JoinOpType ==
doris::TJoinOp::RIGHT_OUTER_JOIN ||
+ JoinOpType ==
doris::TJoinOp::FULL_OUTER_JOIN) {
if (!visited[build_idx]) {
visited[build_idx] = 1;
}
}
- matched_cnt++;
}
build_idx = next[build_idx];
}
@@ -422,8 +510,7 @@ private:
}
while (probe_idx < probe_rows && matched_cnt < batch_size) {
- uint32_t bucket_num = bucket_nums[probe_idx];
- build_idx = first[bucket_num];
+ build_idx = first[bucket_nums[probe_idx]];
do_the_probe();
}
@@ -434,11 +521,11 @@ private:
const Key* __restrict build_keys;
std::vector<uint8_t> visited;
- uint32_t bucket_size = 0;
- int max_batch_size = 0;
+ uint32_t bucket_size = 1;
+ int max_batch_size = 4064;
- std::vector<uint32_t> first;
- std::vector<uint32_t> next;
+ std::vector<uint32_t> first = {0};
+ std::vector<uint32_t> next = {0};
// use in iter hash map
mutable uint32_t iter_idx = 1;
diff --git a/be/src/vec/common/hash_table/hash_map_context.h
b/be/src/vec/common/hash_table/hash_map_context.h
index 64216986365..ad25e1cd83a 100644
--- a/be/src/vec/common/hash_table/hash_map_context.h
+++ b/be/src/vec/common/hash_table/hash_map_context.h
@@ -85,11 +85,8 @@ struct MethodBase {
return;
}
for (uint32_t k = 0; k < num_rows; ++k) {
- if (null_map[k]) {
- continue;
- }
-
- bucket_nums[k] = hash_table->hash(keys[k]) & (bucket_size - 1);
+ bucket_nums[k] =
+ null_map[k] ? bucket_size : hash_table->hash(keys[k]) &
(bucket_size - 1);
}
}
@@ -306,9 +303,9 @@ struct MethodOneNumber : public MethodBase<TData> {
Base::keys = (FieldType*)(key_columns[0]->is_nullable()
? assert_cast<const
ColumnNullable*>(key_columns[0])
->get_nested_column_ptr()
- : key_columns[0])
- ->get_raw_data()
- .data;
+ ->get_raw_data()
+ .data
+ :
key_columns[0]->get_raw_data().data);
std::string name = key_columns[0]->get_name();
if (is_join) {
Base::init_join_bucket_num(num_rows, bucket_size, null_map);
@@ -352,6 +349,8 @@ struct MethodKeysFixed : public MethodBase<TData> {
void pack_fixeds(size_t row_numbers, const ColumnRawPtrs& key_columns,
const ColumnRawPtrs& nullmap_columns, std::vector<T>&
result) {
size_t bitmap_size = get_bitmap_size(nullmap_columns.size());
+ // set size to 0 at first, then use resize to call default constructor
on index included from [0, row_numbers) to reset all memory
+ result.clear();
result.resize(row_numbers);
size_t offset = 0;
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 8492044215f..8fcdde220a3 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -330,11 +330,14 @@ size_t Block::get_position_by_name(const std::string&
name) const {
void Block::check_number_of_rows(bool allow_null_columns) const {
ssize_t rows = -1;
for (const auto& elem : data) {
- if (!elem.column && allow_null_columns) continue;
+ if (!elem.column && allow_null_columns) {
+ continue;
+ }
if (!elem.column) {
- LOG(FATAL) << fmt::format(
- "Column {} in block is nullptr, in method
check_number_of_rows.", elem.name);
+ throw Exception(ErrorCode::INTERNAL_ERROR,
+ "Column {} in block is nullptr, in method
check_number_of_rows.",
+ elem.name);
}
ssize_t size = elem.column->size();
@@ -342,8 +345,8 @@ void Block::check_number_of_rows(bool allow_null_columns)
const {
if (rows == -1) {
rows = size;
} else if (rows != size) {
- LOG(FATAL) << fmt::format("Sizes of columns doesn't match:
{}:{},{}:{}, col size: {}",
- data.front().name, rows, elem.name,
size, each_col_size());
+ throw Exception(ErrorCode::INTERNAL_ERROR, "Sizes of columns
doesn't match, block={}",
+ dump_structure());
}
}
}
@@ -1065,7 +1068,7 @@ std::unique_ptr<Block>
Block::create_same_struct_block(size_t size, bool is_rese
if (is_reserve) {
column->reserve(size);
} else {
- column->resize(size);
+ column->insert_many_defaults(size);
}
temp_block->insert({std::move(column), d.type, d.name});
}
diff --git a/be/src/vec/exec/join/process_hash_table_probe.h
b/be/src/vec/exec/join/process_hash_table_probe.h
index 34b5dc3ee8d..ed7d0c6443b 100644
--- a/be/src/vec/exec/join/process_hash_table_probe.h
+++ b/be/src/vec/exec/join/process_hash_table_probe.h
@@ -100,9 +100,6 @@ struct ProcessHashTableProbe {
std::unique_ptr<Arena> _serialize_key_arena;
std::vector<char> _probe_side_find_result;
- int _right_col_idx;
- int _right_col_len;
-
bool _have_other_join_conjunct;
bool _is_right_semi_anti;
std::vector<bool>* _left_output_slot_flags;
@@ -114,7 +111,9 @@ struct ProcessHashTableProbe {
RuntimeProfile::Counter* _build_side_output_timer;
RuntimeProfile::Counter* _probe_side_output_timer;
RuntimeProfile::Counter* _probe_process_hashtable_timer;
- static constexpr int PROBE_SIDE_EXPLODE_RATE = 1;
+
+ int _right_col_idx;
+ int _right_col_len;
};
} // namespace vectorized
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h
b/be/src/vec/exec/join/process_hash_table_probe_impl.h
index e3fadf2056f..8cb5bd8cb8f 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h
@@ -52,7 +52,11 @@ ProcessHashTableProbe<JoinOpType,
Parent>::ProcessHashTableProbe(Parent* parent,
_search_hashtable_timer(parent->_search_hashtable_timer),
_build_side_output_timer(parent->_build_side_output_timer),
_probe_side_output_timer(parent->_probe_side_output_timer),
-
_probe_process_hashtable_timer(parent->_probe_process_hashtable_timer) {}
+
_probe_process_hashtable_timer(parent->_probe_process_hashtable_timer),
+ _right_col_idx((_is_right_semi_anti && !_have_other_join_conjunct)
+ ? 0
+ : _parent->left_table_data_types().size()),
+ _right_col_len(_parent->right_table_data_types().size()) {}
template <int JoinOpType, typename Parent>
void ProcessHashTableProbe<JoinOpType, Parent>::build_side_output_column(
@@ -122,13 +126,9 @@ template <typename HashTableType>
typename HashTableType::State ProcessHashTableProbe<JoinOpType,
Parent>::_init_probe_side(
HashTableType& hash_table_ctx, size_t probe_rows, bool
with_other_join_conjuncts,
const uint8_t* null_map) {
- _right_col_idx = _is_right_semi_anti && !with_other_join_conjuncts
- ? 0
- : _parent->left_table_data_types().size();
- _right_col_len = _parent->right_table_data_types().size();
-
- _probe_indexs.resize(_batch_size);
- _build_indexs.resize(_batch_size);
+ // may over batch size 1 for some outer join case
+ _probe_indexs.resize(_batch_size + 1);
+ _build_indexs.resize(_batch_size + 1);
if (!_parent->_ready_probe) {
_parent->_ready_probe = true;
@@ -147,6 +147,10 @@ Status ProcessHashTableProbe<JoinOpType,
Parent>::do_process(HashTableType& hash
MutableBlock&
mutable_block,
Block*
output_block,
size_t
probe_rows) {
+ if (_right_col_len && !_build_block) {
+ return Status::InternalError("build block is nullptr");
+ }
+
auto& probe_index = _parent->_probe_index;
auto& build_index = _parent->_build_index;
auto last_probe_index = probe_index;
@@ -170,10 +174,13 @@ Status ProcessHashTableProbe<JoinOpType,
Parent>::do_process(HashTableType& hash
{
SCOPED_TIMER(_search_hashtable_timer);
- auto [new_probe_idx, new_build_idx, new_current_offset] =
- hash_table_ctx.hash_table->template find_batch<JoinOpType,
with_other_conjuncts>(
- hash_table_ctx.keys,
hash_table_ctx.bucket_nums.data(), probe_index,
- build_index, probe_rows, _probe_indexs.data(),
_build_indexs.data());
+ auto [new_probe_idx, new_build_idx,
+ new_current_offset] = hash_table_ctx.hash_table->template
find_batch < JoinOpType,
+ with_other_conjuncts, is_mark_join,
+ need_null_map_for_probe &&
+ ignore_null > (hash_table_ctx.keys,
hash_table_ctx.bucket_nums.data(),
+ probe_index, build_index, probe_rows,
_probe_indexs.data(),
+ _build_indexs.data(), mark_column.get());
probe_index = new_probe_idx;
build_index = new_build_idx;
current_offset = new_current_offset;
@@ -236,36 +243,28 @@ Status ProcessHashTableProbe<JoinOpType,
Parent>::do_other_join_conjuncts(
auto null_map_column = ColumnVector<UInt8>::create(row_count, 0);
auto* __restrict null_map_data = null_map_column->get_data().data();
-
// process equal-conjuncts-matched tuples that are newly generated
// in this run if there are any.
for (int i = 0; i < row_count; ++i) {
- auto join_hit = _build_indexs[i];
- auto other_hit = filter_column_ptr[i];
-
- if (!other_hit) {
- for (size_t j = 0; j < _right_col_len; ++j) {
- typeid_cast<ColumnNullable*>(
- std::move(*output_block->get_by_position(j +
_right_col_idx).column)
- .assume_mutable()
- .get())
- ->get_null_map_data()[i] = true;
- }
- }
+ bool join_hit = _build_indexs[i];
+ bool other_hit = filter_column_ptr[i];
+
null_map_data[i] = !join_hit || !other_hit;
- if (join_hit) {
- filter_map[i] = other_hit;
+ if (!join_hit) {
+ filter_map[i] = _parent->_last_probe_match != _probe_indexs[i];
} else {
- filter_map[i] = true;
+ filter_map[i] = other_hit;
+ }
+ if (filter_map[i]) {
+ _parent->_last_probe_match = _probe_indexs[i];
}
}
for (size_t i = 0; i < row_count; ++i) {
if (filter_map[i]) {
_tuple_is_null_right_flags->emplace_back(null_map_data[i]);
- if constexpr (JoinOpType == TJoinOp::FULL_OUTER_JOIN ||
- JoinOpType == TJoinOp::RIGHT_OUTER_JOIN) {
+ if constexpr (JoinOpType == TJoinOp::FULL_OUTER_JOIN) {
visited[_build_indexs[i]] = 1;
}
}
@@ -275,9 +274,7 @@ Status ProcessHashTableProbe<JoinOpType,
Parent>::do_other_join_conjuncts(
auto new_filter_column = ColumnVector<UInt8>::create(row_count);
auto& filter_map = new_filter_column->get_data();
- size_t start_row_idx = 1;
- filter_map.emplace_back(filter_column_ptr[0]);
- for (size_t i = start_row_idx; i < row_count; ++i) {
+ for (size_t i = 0; i < row_count; ++i) {
filter_map[i] = filter_column_ptr[i];
}
@@ -309,16 +306,8 @@ Status ProcessHashTableProbe<JoinOpType,
Parent>::do_other_join_conjuncts(
// equal-conjuncts-matched tuple is output in all sub blocks,
// if there are none, just pick a tuple and output.
- size_t start_row_idx = 1;
- // Both equal conjuncts and other conjuncts are true
- filter_map[0] = filter_column_ptr[0] && _build_indexs[0];
-
- for (size_t i = start_row_idx; i < row_count; ++i) {
- if (_build_indexs[i] && filter_column_ptr[i]) {
- filter_map[i] = _build_indexs[i] && filter_column_ptr[i];
- } else {
- filter_map[i] = false;
- }
+ for (size_t i = 0; i < row_count; ++i) {
+ filter_map[i] = _build_indexs[i] && filter_column_ptr[i];
}
if (is_mark_join) {
@@ -355,9 +344,8 @@ Status ProcessHashTableProbe<JoinOpType,
Parent>::do_other_join_conjuncts(
JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
orig_columns = _right_col_idx;
}
- static_cast<void>(
- Block::filter_block(output_block, result_column_id,
- is_mark_join ? output_block->columns() :
orig_columns));
+ RETURN_IF_ERROR(Block::filter_block(output_block, result_column_id,
+ is_mark_join ?
output_block->columns() : orig_columns));
}
return Status::OK();
@@ -374,6 +362,11 @@ Status ProcessHashTableProbe<JoinOpType,
Parent>::process_data_in_hashtable(
auto block_size = _build_indexs.size();
if (block_size) {
+ if (mcol.size() < _right_col_len + _right_col_idx) {
+ return Status::InternalError(
+ "output block invalid, mcol.size()={}, _right_col_len={},
_right_col_idx={}",
+ mcol.size(), _right_col_len, _right_col_idx);
+ }
for (size_t j = 0; j < _right_col_len; ++j) {
const auto& column = *_build_block->safe_get_by_position(j).column;
mcol[j + _right_col_idx]->insert_indices_from_join(column,
_build_indexs.data(),
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp
b/be/src/vec/exec/join/vhash_join_node.cpp
index 2b7447ee56b..6ba5e6e08d4 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -26,6 +26,7 @@
#include <algorithm>
#include <array>
#include <boost/iterator/iterator_facade.hpp>
+#include <climits>
#include <functional>
#include <map>
#include <memory>
@@ -59,6 +60,7 @@
#include "vec/common/assert_cast.h"
#include "vec/common/hash_table/hash_map.h"
#include "vec/common/uint128.h"
+#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_nullable.h"
@@ -74,6 +76,8 @@
namespace doris::vectorized {
+constexpr uint32_t JOIN_BUILD_SIZE_LIMIT =
std::numeric_limits<uint32_t>::max();
+
template Status HashJoinNode::_extract_join_column<true>(
Block&, COW<IColumn>::mutable_ptr<ColumnVector<unsigned char>>&,
std::vector<IColumn const*, std::allocator<IColumn const*>>&,
@@ -91,7 +95,6 @@ HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode&
tnode, const Descr
_hash_output_slot_ids(tnode.hash_join_node.__isset.hash_output_slot_ids
?
tnode.hash_join_node.hash_output_slot_ids
: std::vector<SlotId> {}),
- _build_block_idx(0),
_build_side_mem_used(0),
_build_side_last_mem_used(0) {
_runtime_filter_descs = tnode.runtime_filters;
@@ -297,7 +300,9 @@ bool HashJoinNode::need_more_input_data() const {
void HashJoinNode::prepare_for_next() {
_probe_index = 0;
+ _build_index = 0;
_ready_probe = false;
+ _last_probe_match = -1;
_prepare_probe_block();
}
@@ -453,7 +458,7 @@ Status HashJoinNode::pull(doris::RuntimeState* state,
vectorized::Block* output_
if (!st) {
return st;
}
- RETURN_IF_ERROR(_filter_data_and_build_output(state, output_block, eos,
&temp_block));
+ RETURN_IF_ERROR(_filter_data_and_build_output(state, output_block, eos,
&temp_block, false));
// Here make _join_block release the columns' ptr
_join_block.set_columns(_join_block.clone_empty_columns());
mutable_join_block.clear();
@@ -726,18 +731,22 @@ Status HashJoinNode::sink(doris::RuntimeState* state,
vectorized::Block* in_bloc
// data from probe side.
_build_side_mem_used += in_block->allocated_bytes();
+ if (_build_side_mutable_block.empty()) {
+ auto tmp_build_block =
+
VectorizedUtils::create_empty_columnswithtypename(child(1)->row_desc());
+ _build_side_mutable_block =
MutableBlock::build_mutable_block(&tmp_build_block);
+ RETURN_IF_ERROR(_build_side_mutable_block.merge(
+ *(tmp_build_block.create_same_struct_block(1, false))));
+ }
+
if (in_block->rows() != 0) {
SCOPED_TIMER(_build_side_merge_block_timer);
- if (_build_side_mutable_block.empty()) {
- RETURN_IF_ERROR(_build_side_mutable_block.merge(
- *(in_block->create_same_struct_block(1, false))));
- }
RETURN_IF_ERROR(_build_side_mutable_block.merge(*in_block));
- if (_build_side_mutable_block.rows() >
std::numeric_limits<uint32_t>::max()) {
+ if (_build_side_mutable_block.rows() > JOIN_BUILD_SIZE_LIMIT) {
return Status::NotSupported(
"Hash join do not support build table rows"
" over:" +
- std::to_string(std::numeric_limits<uint32_t>::max()));
+ std::to_string(JOIN_BUILD_SIZE_LIMIT));
}
}
}
@@ -947,6 +956,14 @@ Status HashJoinNode::_process_build_block(RuntimeState*
state, Block& block) {
RETURN_IF_ERROR(_do_evaluate(block, _build_expr_ctxs,
*_build_expr_call_timer, res_col_ids));
if (_join_op == TJoinOp::LEFT_OUTER_JOIN || _join_op ==
TJoinOp::FULL_OUTER_JOIN) {
_convert_block_to_null(block);
+ // row 0 is the mocked placeholder row: force it null in every column
+ for (size_t i = 0; i < block.columns(); i++) {
+ assert_cast<ColumnNullable*>(
+
(*std::move(block.safe_get_by_position(i).column)).mutate().get())
+ ->get_null_map_column()
+ .get_data()
+ .data()[0] = 1;
+ }
}
// TODO: Now we are not sure whether a column is nullable only by
ExecNode's `row_desc`
// so we have to initialize this flag by the first build block.
diff --git a/be/src/vec/exec/join/vhash_join_node.h
b/be/src/vec/exec/join/vhash_join_node.h
index a58241234d8..922c41f38fd 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -117,7 +117,8 @@ struct ProcessHashTableBuild {
template <int JoinOpType, bool ignore_null, bool short_circuit_for_null>
Status run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map,
bool* has_null_key) {
if (short_circuit_for_null || ignore_null) {
- for (uint32_t i = 0; i < _rows; i++) {
+ // first row is mocked and is null
+ for (uint32_t i = 1; i < _rows; i++) {
if ((*null_map)[i]) {
*has_null_key = true;
}
@@ -296,19 +297,20 @@ private:
friend struct ProcessHashTableProbe;
void _init_short_circuit_for_probe() {
+ bool empty_block = !_build_block;
_short_circuit_for_probe =
(_has_null_in_build_side && _join_op ==
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
!_is_mark_join) ||
- (!_build_block && _join_op == TJoinOp::INNER_JOIN &&
!_is_mark_join) ||
- (!_build_block && _join_op == TJoinOp::LEFT_SEMI_JOIN &&
!_is_mark_join) ||
- (!_build_block && _join_op == TJoinOp::RIGHT_OUTER_JOIN) ||
- (!_build_block && _join_op == TJoinOp::RIGHT_SEMI_JOIN) ||
- (!_build_block && _join_op == TJoinOp::RIGHT_ANTI_JOIN);
+ (empty_block && _join_op == TJoinOp::INNER_JOIN &&
!_is_mark_join) ||
+ (empty_block && _join_op == TJoinOp::LEFT_SEMI_JOIN &&
!_is_mark_join) ||
+ (empty_block && _join_op == TJoinOp::RIGHT_OUTER_JOIN) ||
+ (empty_block && _join_op == TJoinOp::RIGHT_SEMI_JOIN) ||
+ (empty_block && _join_op == TJoinOp::RIGHT_ANTI_JOIN);
//when build table rows is 0 and not have other_join_conjunct and not
_is_mark_join and join type is one of
LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN
//we could get the result is probe table + null-column(if need output)
_empty_right_table_need_probe_dispose =
- (!_build_block && !_have_other_join_conjunct &&
!_is_mark_join) &&
+ (empty_block && !_have_other_join_conjunct && !_is_mark_join)
&&
(_join_op == TJoinOp::LEFT_OUTER_JOIN || _join_op ==
TJoinOp::FULL_OUTER_JOIN ||
_join_op == TJoinOp::LEFT_ANTI_JOIN);
}
@@ -381,6 +383,7 @@ private:
uint32_t _build_index = 0;
bool _ready_probe = false;
bool _probe_eos = false;
+ int _last_probe_match = -1; // last probe row kept by other-conjunct filtering, -1 if none
bool _build_side_ignore_null = false;
@@ -393,9 +396,6 @@ private:
std::vector<bool> _left_output_slot_flags;
std::vector<bool> _right_output_slot_flags;
- // for cases when a probe row matches more than batch size build rows.
- bool _is_any_probe_match_row_output = false;
- uint8_t _build_block_idx = 0;
int64_t _build_side_mem_used = 0;
int64_t _build_side_last_mem_used = 0;
MutableBlock _build_side_mutable_block;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]