This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f3ff86ce0d9 [Refactor](join) some refactor of join process (#44346)
f3ff86ce0d9 is described below
commit f3ff86ce0d904a5ece8624da26621c608a89d5b6
Author: Pxl <[email protected]>
AuthorDate: Thu Jan 2 17:54:53 2025 +0800
[Refactor](join) some refactor of join process (#44346)
### What problem does this PR solve?
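Remove the `need_judge_null` template parameter (and the `probe_ignore_null` flag it was derived from) from the hash join probe path; the probe-side `null_map` is now passed down and checked directly.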
---
be/src/pipeline/dependency.h | 1 -
be/src/pipeline/exec/hashjoin_build_sink.cpp | 7 -
be/src/pipeline/exec/hashjoin_build_sink.h | 4 +-
be/src/pipeline/exec/hashjoin_probe_operator.cpp | 27 +--
be/src/pipeline/exec/hashjoin_probe_operator.h | 2 -
.../pipeline/exec/join/process_hash_table_probe.h | 12 +-
.../exec/join/process_hash_table_probe_impl.h | 167 ++++++-------
be/src/vec/common/hash_table/join_hash_table.h | 265 +++++++++++----------
8 files changed, 227 insertions(+), 258 deletions(-)
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index ecbd49a5647..fd179cdfd0a 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -612,7 +612,6 @@ struct HashJoinSharedState : public JoinSharedState {
size_t build_exprs_size = 0;
std::shared_ptr<vectorized::Block> build_block;
std::shared_ptr<std::vector<uint32_t>> build_indexes_null;
- bool probe_ignore_null = false;
};
struct PartitionedHashJoinSharedState
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 6aca4897367..016ea494062 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -613,13 +613,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
if (eos) {
local_state._eos = true;
local_state.init_short_circuit_for_probe();
- // Since the comparison of null values is meaningless, null aware left
anti/semi join should not output null
- // when the build side is not empty.
- if (local_state._shared_state->build_block &&
- (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
- _join_op == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN)) {
- local_state._shared_state->probe_ignore_null = true;
- }
local_state._dependency->set_ready_to_read();
}
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h
b/be/src/pipeline/exec/hashjoin_build_sink.h
index 906fc5f9bd4..91465380d70 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -201,10 +201,12 @@ struct ProcessHashTableBuild {
SCOPED_TIMER(_parent->_build_table_insert_timer);
hash_table_ctx.hash_table->template prepare_build<JoinOpType>(_rows,
_batch_size,
*has_null_key);
+
// In order to make the null keys equal when using single null eq, all
null keys need to be set to default value.
if (_build_raw_ptrs.size() == 1 && null_map) {
_build_raw_ptrs[0]->assume_mutable()->replace_column_null_data(null_map->data());
}
+
hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows,
null_map ? null_map->data() :
nullptr, true, true,
hash_table_ctx.hash_table->get_bucket_size());
@@ -213,7 +215,7 @@ struct ProcessHashTableBuild {
if ((JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) &&
with_other_conjuncts) {
- //null aware join with other conjuncts
+ // null aware join with other conjuncts
keep_null_key = true;
} else if (_parent->_shared_state->is_null_safe_eq_join.size() == 1 &&
_parent->_shared_state->is_null_safe_eq_join[0]) {
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index 37ccd6206f3..1f30a6183a2 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -39,7 +39,6 @@ Status HashJoinProbeLocalState::init(RuntimeState* state,
LocalStateInfo& info)
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
auto& p = _parent->cast<HashJoinProbeOperatorX>();
- _shared_state->probe_ignore_null = p._probe_ignore_null;
_probe_expr_ctxs.resize(p._probe_expr_ctxs.size());
for (size_t i = 0; i < _probe_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(p._probe_expr_ctxs[i]->clone(state,
_probe_expr_ctxs[i]));
@@ -287,12 +286,12 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState*
state, vectorized::Bloc
if (local_state._probe_index < local_state._probe_block.rows()) {
DCHECK(local_state._has_set_need_null_map_for_probe);
std::visit(
- [&](auto&& arg, auto&& process_hashtable_ctx, auto
need_judge_null) {
+ [&](auto&& arg, auto&& process_hashtable_ctx) {
using HashTableProbeType =
std::decay_t<decltype(process_hashtable_ctx)>;
if constexpr (!std::is_same_v<HashTableProbeType,
std::monostate>) {
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType,
std::monostate>) {
- st = process_hashtable_ctx.template
process<need_judge_null>(
+ st = process_hashtable_ctx.template process(
arg,
local_state._null_map_column
?
&local_state._null_map_column->get_data()
@@ -308,9 +307,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState*
state, vectorized::Bloc
}
},
local_state._shared_state->hash_table_variants->method_variant,
- *local_state._process_hashtable_ctx_variants,
-
vectorized::make_bool_variant(local_state._need_null_map_for_probe &&
-
local_state._shared_state->probe_ignore_null));
+ *local_state._process_hashtable_ctx_variants);
} else if (local_state._probe_eos) {
if (_is_right_semi_anti || (_is_outer_join && _join_op !=
TJoinOp::LEFT_OUTER_JOIN)) {
std::visit(
@@ -493,29 +490,13 @@ Status HashJoinProbeOperatorX::push(RuntimeState* state,
vectorized::Block* inpu
Status HashJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeState*
state) {
RETURN_IF_ERROR(JoinProbeOperatorX<HashJoinProbeLocalState>::init(tnode,
state));
DCHECK(tnode.__isset.hash_join_node);
- const bool probe_dispose_null =
- _match_all_probe || _build_unique || _join_op ==
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
- _join_op == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN || _join_op ==
TJoinOp::LEFT_ANTI_JOIN ||
- _join_op == TJoinOp::LEFT_SEMI_JOIN;
const std::vector<TEqJoinCondition>& eq_join_conjuncts =
tnode.hash_join_node.eq_join_conjuncts;
- std::vector<bool> probe_not_ignore_null(eq_join_conjuncts.size());
- size_t conjuncts_index = 0;
for (const auto& eq_join_conjunct : eq_join_conjuncts) {
vectorized::VExprContextSPtr ctx;
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(eq_join_conjunct.left,
ctx));
_probe_expr_ctxs.push_back(ctx);
- bool null_aware = eq_join_conjunct.__isset.opcode &&
- eq_join_conjunct.opcode == TExprOpcode::EQ_FOR_NULL
&&
- (eq_join_conjunct.right.nodes[0].is_nullable ||
- eq_join_conjunct.left.nodes[0].is_nullable);
- probe_not_ignore_null[conjuncts_index] =
- null_aware ||
- (_probe_expr_ctxs.back()->root()->is_nullable() &&
probe_dispose_null);
- conjuncts_index++;
- }
- for (size_t i = 0; i < _probe_expr_ctxs.size(); ++i) {
- _probe_ignore_null |= !probe_not_ignore_null[i];
}
+
if (tnode.hash_join_node.__isset.other_join_conjuncts &&
!tnode.hash_join_node.other_join_conjuncts.empty()) {
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 1bdb9d13347..55a8835f55b 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -92,7 +92,6 @@ private:
bool _ready_probe = false;
bool _probe_eos = false;
int _last_probe_match;
-
// For mark join, last probe index of null mark
int _last_probe_null_mark;
@@ -174,7 +173,6 @@ private:
// probe expr
vectorized::VExprContextSPtrs _probe_expr_ctxs;
- bool _probe_ignore_null = false;
vectorized::DataTypes _right_table_data_types;
vectorized::DataTypes _left_table_data_types;
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe.h
b/be/src/pipeline/exec/join/process_hash_table_probe.h
index 91fd82f0644..052ed5875ae 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe.h
@@ -22,6 +22,7 @@
#include "vec/columns/column.h"
#include "vec/columns/columns_number.h"
#include "vec/common/arena.h"
+#include "vec/common/hash_table/join_hash_table.h"
namespace doris {
namespace vectorized {
@@ -54,7 +55,7 @@ struct ProcessHashTableProbe {
int last_probe_index, bool all_match_one,
bool have_other_join_conjunct);
- template <bool need_judge_null, typename HashTableType>
+ template <typename HashTableType>
Status process(HashTableType& hash_table_ctx, ConstNullMapPtr null_map,
vectorized::MutableBlock& mutable_block, vectorized::Block*
output_block,
uint32_t probe_rows, bool is_mark_join, bool
have_other_join_conjunct);
@@ -63,9 +64,8 @@ struct ProcessHashTableProbe {
// the output block struct is same with mutable block. we can do more opt
on it and simplify
// the logic of probe
// TODO: opt the visited here to reduce the size of hash table
- template <bool need_judge_null, typename HashTableType, bool
with_other_conjuncts,
- bool is_mark_join>
- Status do_process(HashTableType& hash_table_ctx, ConstNullMapPtr null_map,
+ template <typename HashTableType, bool with_other_conjuncts, bool
is_mark_join>
+ Status do_process(HashTableType& hash_table_ctx, const uint8_t* null_map,
vectorized::MutableBlock& mutable_block,
vectorized::Block* output_block,
uint32_t probe_rows);
// In the presence of other join conjunct, the process of join become more
complicated.
@@ -76,12 +76,12 @@ struct ProcessHashTableProbe {
bool has_null_in_build_side);
template <bool with_other_conjuncts>
- Status do_mark_join_conjuncts(vectorized::Block* output_block, size_t
hash_table_bucket_size);
+ Status do_mark_join_conjuncts(vectorized::Block* output_block, const
uint8_t* null_map);
template <typename HashTableType>
typename HashTableType::State _init_probe_side(HashTableType&
hash_table_ctx, size_t probe_rows,
bool
with_other_join_conjuncts,
- const uint8_t* null_map,
bool need_judge_null);
+ const uint8_t* null_map);
// Process full outer join/ right join / right semi/anti join to output
the join result
// in hash table
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
index 24a9a7f6743..f97a4513816 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
@@ -27,6 +27,7 @@
#include "util/simd/bits.h"
#include "vec/columns/column_filter_helper.h"
#include "vec/columns/column_nullable.h"
+#include "vec/common/hash_table/join_hash_table.h"
#include "vec/exprs/vexpr_context.h"
namespace doris::pipeline {
@@ -160,14 +161,14 @@ template <int JoinOpType>
template <typename HashTableType>
typename HashTableType::State
ProcessHashTableProbe<JoinOpType>::_init_probe_side(
HashTableType& hash_table_ctx, size_t probe_rows, bool
with_other_join_conjuncts,
- const uint8_t* null_map, bool need_judge_null) {
+ const uint8_t* null_map) {
// may over batch size 1 for some outer join case
_probe_indexs.resize(_batch_size + 1);
_build_indexs.resize(_batch_size + 1);
- if constexpr (JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
- JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) {
+ if ((JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
+ JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) &&
+ with_other_join_conjuncts) {
_null_flags.resize(_batch_size + 1);
- memset(_null_flags.data(), 0, _batch_size + 1);
}
if (!_parent->_ready_probe) {
@@ -177,10 +178,10 @@ typename HashTableType::State
ProcessHashTableProbe<JoinOpType>::_init_probe_sid
if (_parent->_probe_columns.size() == 1 && null_map) {
_parent->_probe_columns[0]->assume_mutable()->replace_column_null_data(null_map);
}
+
hash_table_ctx.init_serialized_keys(_parent->_probe_columns,
probe_rows, null_map, true,
false,
hash_table_ctx.hash_table->get_bucket_size());
- hash_table_ctx.hash_table->pre_build_idxs(hash_table_ctx.bucket_nums,
- need_judge_null ? null_map :
nullptr);
+ hash_table_ctx.hash_table->pre_build_idxs(hash_table_ctx.bucket_nums);
int64_t arena_memory_usage =
hash_table_ctx.serialized_keys_size(false);
COUNTER_SET(_parent->_probe_arena_memory_usage, arena_memory_usage);
COUNTER_UPDATE(_parent->_memory_used_counter, arena_memory_usage);
@@ -190,10 +191,9 @@ typename HashTableType::State
ProcessHashTableProbe<JoinOpType>::_init_probe_sid
}
template <int JoinOpType>
-template <bool need_judge_null, typename HashTableType, bool
with_other_conjuncts,
- bool is_mark_join>
+template <typename HashTableType, bool with_other_conjuncts, bool is_mark_join>
Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType&
hash_table_ctx,
-
vectorized::ConstNullMapPtr null_map,
+ const uint8_t* null_map,
vectorized::MutableBlock&
mutable_block,
vectorized::Block*
output_block,
uint32_t probe_rows) {
@@ -204,31 +204,22 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
auto& probe_index = _parent->_probe_index;
auto& build_index = _parent->_build_index;
auto last_probe_index = probe_index;
-
{
SCOPED_TIMER(_init_probe_side_timer);
- _init_probe_side<HashTableType>(
- hash_table_ctx, probe_rows, with_other_conjuncts,
- null_map ? null_map->data() : nullptr,
- need_judge_null &&
- (JoinOpType == doris::TJoinOp::LEFT_ANTI_JOIN ||
- JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN ||
- JoinOpType ==
doris::TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
- (is_mark_join && JoinOpType !=
doris::TJoinOp::RIGHT_SEMI_JOIN)));
+ _init_probe_side<HashTableType>(hash_table_ctx, probe_rows,
with_other_conjuncts, null_map);
}
auto& mcol = mutable_block.mutable_columns();
- const bool has_mark_join_conjunct = !_parent->_mark_join_conjuncts.empty();
uint32_t current_offset = 0;
- if constexpr ((JoinOpType == doris::TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
- JoinOpType == doris::TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) &&
- with_other_conjuncts) {
+ if ((JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
+ JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) &&
+ with_other_conjuncts) {
SCOPED_TIMER(_search_hashtable_timer);
/// If `_build_index_for_null_probe_key` is not zero, it means we are
in progress of handling probe null key.
if (_build_index_for_null_probe_key) {
- DCHECK_EQ(build_index,
hash_table_ctx.hash_table->get_bucket_size());
+ DCHECK(null_map && null_map[probe_index]);
current_offset = _process_probe_null_key(probe_index);
if (!_build_index_for_null_probe_key) {
probe_index++;
@@ -239,13 +230,13 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
hash_table_ctx.hash_table->find_null_aware_with_other_conjuncts(
hash_table_ctx.keys,
hash_table_ctx.bucket_nums.data(), probe_index,
build_index, probe_rows, _probe_indexs.data(),
_build_indexs.data(),
- _null_flags.data(), _picking_null_keys);
+ _null_flags.data(), _picking_null_keys, null_map);
probe_index = new_probe_idx;
build_index = new_build_idx;
current_offset = new_current_offset;
_picking_null_keys = picking_null_keys;
- if (build_index == hash_table_ctx.hash_table->get_bucket_size()) {
+ if (null_map && null_map[probe_index]) {
_build_index_for_null_probe_key = 1;
if (current_offset == 0) {
current_offset = _process_probe_null_key(probe_index);
@@ -259,11 +250,11 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
} else {
SCOPED_TIMER(_search_hashtable_timer);
auto [new_probe_idx, new_build_idx, new_current_offset] =
- hash_table_ctx.hash_table->template find_batch<JoinOpType,
with_other_conjuncts,
- is_mark_join,
need_judge_null>(
+ hash_table_ctx.hash_table->template find_batch<JoinOpType>(
hash_table_ctx.keys,
hash_table_ctx.bucket_nums.data(), probe_index,
build_index, cast_set<int32_t>(probe_rows),
_probe_indexs.data(),
- _probe_visited, _build_indexs.data(),
has_mark_join_conjunct);
+ _probe_visited, _build_indexs.data(), null_map,
with_other_conjuncts,
+ is_mark_join, !_parent->_mark_join_conjuncts.empty());
probe_index = new_probe_idx;
build_index = new_build_idx;
current_offset = new_current_offset;
@@ -296,8 +287,13 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
output_block->swap(mutable_block.to_block());
if constexpr (is_mark_join && JoinOpType != TJoinOp::RIGHT_SEMI_JOIN) {
- return do_mark_join_conjuncts<with_other_conjuncts>(
- output_block, hash_table_ctx.hash_table->get_bucket_size());
+ bool ignore_null_map =
+ (JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
+ JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) &&
+ hash_table_ctx.hash_table
+ ->empty_build_side(); // empty build side will return
false to instead null
+ return do_mark_join_conjuncts<with_other_conjuncts>(output_block,
+ ignore_null_map ?
nullptr : null_map);
} else if constexpr (with_other_conjuncts) {
return do_other_join_conjuncts(output_block,
hash_table_ctx.hash_table->get_visited(),
hash_table_ctx.hash_table->has_null_key());
@@ -366,7 +362,7 @@ uint32_t
ProcessHashTableProbe<JoinOpType>::_process_probe_null_key(uint32_t pro
template <int JoinOpType>
template <bool with_other_conjuncts>
Status
ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Block*
output_block,
- size_t
hash_table_bucket_size) {
+ const
uint8_t* null_map) {
DCHECK(JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
JoinOpType == TJoinOp::LEFT_SEMI_JOIN ||
@@ -376,7 +372,6 @@ Status
ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Blo
JoinOpType ==
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
constexpr bool is_null_aware_join = JoinOpType ==
TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN ||
JoinOpType ==
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
-
const auto row_count = output_block->rows();
if (!row_count) {
return Status::OK();
@@ -387,49 +382,45 @@ Status
ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Blo
auto& mark_column =
assert_cast<vectorized::ColumnNullable&>(*mark_column_mutable);
vectorized::IColumn::Filter& filter =
assert_cast<vectorized::ColumnUInt8&>(mark_column.get_nested_column()).get_data();
+ RETURN_IF_ERROR(
+
vectorized::VExprContext::execute_conjuncts(_parent->_mark_join_conjuncts,
output_block,
+
mark_column.get_null_map_column(), filter));
+ uint8_t* mark_filter_data = filter.data();
+ uint8_t* mark_null_map = mark_column.get_null_map_data().data();
- if (_parent->_mark_join_conjuncts.empty()) {
+ if (is_null_aware_join) {
// For null aware anti/semi join, if the equal conjuncts was not
matched and the build side has null value,
// the result should be null. Like:
// select 4 not in (2, 3, null) => null, select 4 not in (2, 3) => true
// select 4 in (2, 3, null) => null, select 4 in (2, 3) => false
- const bool should_be_null_if_build_side_has_null =
*_has_null_in_build_side;
-
- mark_column.resize(row_count);
- auto* filter_data =
assert_cast<vectorized::ColumnUInt8&>(mark_column.get_nested_column())
- .get_data()
- .data();
- auto* mark_null_map = mark_column.get_null_map_data().data();
- int last_probe_matched = -1;
for (size_t i = 0; i != row_count; ++i) {
- filter_data[i] = _build_indexs[i] != 0 && _build_indexs[i] !=
hash_table_bucket_size;
- if constexpr (is_null_aware_join) {
- if constexpr (with_other_conjuncts) {
- mark_null_map[i] = _null_flags[i];
- } else {
- if (filter_data[i]) {
- last_probe_matched = _probe_indexs[i];
- mark_null_map[i] = false;
- } else if (_build_indexs[i] == 0) {
- mark_null_map[i] =
should_be_null_if_build_side_has_null &&
- last_probe_matched !=
_probe_indexs[i];
- } else if (_build_indexs[i] == hash_table_bucket_size) {
- mark_null_map[i] = true;
- }
+ mark_filter_data[i] = _build_indexs[i] != 0;
+ }
+
+ if constexpr (with_other_conjuncts) {
+ // _null_flags is true means build or probe side of the row is null
+ memcpy(mark_null_map, _null_flags.data(), row_count);
+ } else {
+ if (null_map) {
+ // probe side of the row is null, so the mark sign should also
be null.
+ for (size_t i = 0; i != row_count; ++i) {
+ mark_null_map[i] |= null_map[_probe_indexs[i]];
+ }
+ }
+ if (!with_other_conjuncts && *_has_null_in_build_side) {
+ // _has_null_in_build_side will change false to null when row
not matched
+ for (size_t i = 0; i != row_count; ++i) {
+ mark_null_map[i] |= _build_indexs[i] == 0;
}
}
- }
- if constexpr (!is_null_aware_join) {
- memset(mark_null_map, 0, row_count);
}
} else {
- RETURN_IF_ERROR(vectorized::VExprContext::execute_conjuncts(
- _parent->_mark_join_conjuncts, output_block,
mark_column.get_null_map_column(),
- filter));
+ // for non null aware join, build_indexs is 0 which means there is no
match
+ // sometimes null will be returned in conjunct, but it should not
actually be null.
+ for (size_t i = 0; i != row_count; ++i) {
+ mark_null_map[i] &= _build_indexs[i] != 0;
+ }
}
- auto* mark_null_map = mark_column.get_null_map_data().data();
-
- auto* mark_filter_data = filter.data();
if constexpr (with_other_conjuncts) {
vectorized::IColumn::Filter other_conjunct_filter(row_count, 1);
@@ -453,32 +444,20 @@ Status
ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Blo
auto filter_column = vectorized::ColumnUInt8::create(row_count, 0);
auto* __restrict filter_map = filter_column->get_data().data();
-
- /**
- * Here need `!with_other_conjuncts` be true,
- * because null aware join with other join conjuncts will process the
`mark_null_map` after the
- * other join conjuncts are executed.
- */
- const bool should_be_null_if_build_side_has_null =
- *_has_null_in_build_side && is_null_aware_join &&
!with_other_conjuncts;
for (size_t i = 0; i != row_count; ++i) {
- bool not_matched_before = _parent->_last_probe_match !=
_probe_indexs[i];
+ if (_parent->_last_probe_match == _probe_indexs[i]) {
+ continue;
+ }
if (_build_indexs[i] == 0) {
bool has_null_mark_value = _parent->_last_probe_null_mark ==
_probe_indexs[i];
- if (not_matched_before) {
- filter_map[i] = true;
- mark_null_map[i] = has_null_mark_value ||
should_be_null_if_build_side_has_null;
- mark_filter_data[i] = false;
- }
- } else {
- if (mark_null_map[i]) { // is null
- _parent->_last_probe_null_mark = _probe_indexs[i];
- } else {
- if (mark_filter_data[i] && not_matched_before) {
- _parent->_last_probe_match = _probe_indexs[i];
- filter_map[i] = true;
- }
- }
+ filter_map[i] = true;
+ mark_filter_data[i] = false;
+ mark_null_map[i] |= has_null_mark_value;
+ } else if (mark_null_map[i]) {
+ _parent->_last_probe_null_mark = _probe_indexs[i];
+ } else if (mark_filter_data[i]) {
+ filter_map[i] = true;
+ _parent->_last_probe_match = _probe_indexs[i];
}
}
@@ -677,7 +656,7 @@ Status
ProcessHashTableProbe<JoinOpType>::finish_probing(HashTableType& hash_tab
}
template <int JoinOpType>
-template <bool need_judge_null, typename HashTableType>
+template <typename HashTableType>
Status ProcessHashTableProbe<JoinOpType>::process(HashTableType&
hash_table_ctx,
vectorized::ConstNullMapPtr
null_map,
vectorized::MutableBlock&
mutable_block,
@@ -687,9 +666,9 @@ Status
ProcessHashTableProbe<JoinOpType>::process(HashTableType& hash_table_ctx,
Status res;
std::visit(
[&](auto is_mark_join, auto have_other_join_conjunct) {
- res = do_process<need_judge_null, HashTableType,
have_other_join_conjunct,
- is_mark_join>(hash_table_ctx, null_map,
mutable_block,
- output_block, probe_rows);
+ res = do_process<HashTableType, have_other_join_conjunct,
is_mark_join>(
+ hash_table_ctx, null_map ? null_map->data() : nullptr,
mutable_block,
+ output_block, probe_rows);
},
vectorized::make_bool_variant(is_mark_join),
vectorized::make_bool_variant(have_other_join_conjunct));
@@ -705,11 +684,7 @@ struct ExtractType<T(U)> {
};
#define INSTANTIATION(JoinOpType, T)
\
- template Status ProcessHashTableProbe<JoinOpType>::process<false,
ExtractType<void(T)>::Type>( \
- ExtractType<void(T)>::Type & hash_table_ctx,
vectorized::ConstNullMapPtr null_map, \
- vectorized::MutableBlock & mutable_block, vectorized::Block *
output_block, \
- uint32_t probe_rows, bool is_mark_join, bool
have_other_join_conjunct); \
- template Status ProcessHashTableProbe<JoinOpType>::process<true,
ExtractType<void(T)>::Type>( \
+ template Status
ProcessHashTableProbe<JoinOpType>::process<ExtractType<void(T)>::Type>( \
ExtractType<void(T)>::Type & hash_table_ctx,
vectorized::ConstNullMapPtr null_map, \
vectorized::MutableBlock & mutable_block, vectorized::Block *
output_block, \
uint32_t probe_rows, bool is_mark_join, bool
have_other_join_conjunct); \
diff --git a/be/src/vec/common/hash_table/join_hash_table.h
b/be/src/vec/common/hash_table/join_hash_table.h
index 59463f0e3f4..ab501d67698 100644
--- a/be/src/vec/common/hash_table/join_hash_table.h
+++ b/be/src/vec/common/hash_table/join_hash_table.h
@@ -21,6 +21,8 @@
#include <limits>
+#include "common/exception.h"
+#include "common/status.h"
#include "vec/columns/column_filter_helper.h"
#include "vec/common/hash_table/hash.h"
#include "vec/common/hash_table/hash_table_allocator.h"
@@ -70,6 +72,8 @@ public:
std::vector<uint8_t>& get_visited() { return visited; }
+ bool empty_build_side() const { return _empty_build_side; }
+
void build(const Key* __restrict keys, const uint32_t* __restrict
bucket_nums, size_t num_elem,
bool keep_null_key) {
build_keys = keys;
@@ -81,63 +85,67 @@ public:
if (!keep_null_key) {
first[bucket_size] = 0; // index = bucket_size means null
}
+ _keep_null_key = keep_null_key;
}
- template <int JoinOpType, bool with_other_conjuncts, bool is_mark_join,
bool need_judge_null>
+ template <int JoinOpType>
auto find_batch(const Key* __restrict keys, const uint32_t* __restrict
build_idx_map,
int probe_idx, uint32_t build_idx, int probe_rows,
uint32_t* __restrict probe_idxs, bool& probe_visited,
- uint32_t* __restrict build_idxs, bool
has_mark_join_conjunct = false) {
- if constexpr (JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
- JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) {
- if (_empty_build_side) {
- return
_process_null_aware_left_half_join_for_empty_build_side<JoinOpType>(
- probe_idx, probe_rows, probe_idxs, build_idxs);
- }
+ uint32_t* __restrict build_idxs, const uint8_t* null_map,
+ bool with_other_conjuncts, bool is_mark_join, bool
has_mark_join_conjunct) {
+ if ((JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
+ JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) &&
+ _empty_build_side) {
+ return
_process_null_aware_left_half_join_for_empty_build_side<JoinOpType>(
+ probe_idx, probe_rows, probe_idxs, build_idxs);
}
- if constexpr (with_other_conjuncts ||
- (is_mark_join && JoinOpType !=
TJoinOp::RIGHT_SEMI_JOIN)) {
- if constexpr (!with_other_conjuncts) {
- constexpr bool is_null_aware_join =
- JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
- JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN;
- constexpr bool is_left_half_join = JoinOpType ==
TJoinOp::LEFT_SEMI_JOIN ||
- JoinOpType ==
TJoinOp::LEFT_ANTI_JOIN;
-
- /// For null aware join or left half(semi/anti) join without
other conjuncts and without
- /// mark join conjunct.
- /// If one row on probe side has one match in build side, we
should stop searching the
- /// hash table for this row.
- if (is_null_aware_join || (is_left_half_join &&
!has_mark_join_conjunct)) {
- return _find_batch_conjunct<JoinOpType, need_judge_null,
true>(
- keys, build_idx_map, probe_idx, build_idx,
probe_rows, probe_idxs,
- build_idxs);
- }
+ if (with_other_conjuncts) {
+ return _find_batch_conjunct<JoinOpType, false>(
+ keys, build_idx_map, probe_idx, build_idx, probe_rows,
probe_idxs, build_idxs);
+ }
+
+ if (is_mark_join && JoinOpType != TJoinOp::RIGHT_SEMI_JOIN) {
+ bool is_null_aware_join = JoinOpType ==
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
+ JoinOpType ==
TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN;
+ bool is_left_half_join =
+ JoinOpType == TJoinOp::LEFT_SEMI_JOIN || JoinOpType ==
TJoinOp::LEFT_ANTI_JOIN;
+
+ /// For null aware join or left half(semi/anti) join without other
conjuncts and without
+ /// mark join conjunct.
+ /// If one row on probe side has one match in build side, we
should stop searching the
+ /// hash table for this row.
+ if (is_null_aware_join || (is_left_half_join &&
!has_mark_join_conjunct)) {
+ return _find_batch_conjunct<JoinOpType, true>(keys,
build_idx_map, probe_idx,
+ build_idx,
probe_rows, probe_idxs,
+ build_idxs);
}
- return _find_batch_conjunct<JoinOpType, need_judge_null, false>(
+ return _find_batch_conjunct<JoinOpType, false>(
keys, build_idx_map, probe_idx, build_idx, probe_rows,
probe_idxs, build_idxs);
}
- if constexpr (JoinOpType == TJoinOp::INNER_JOIN || JoinOpType ==
TJoinOp::FULL_OUTER_JOIN ||
- JoinOpType == TJoinOp::LEFT_OUTER_JOIN ||
- JoinOpType == TJoinOp::RIGHT_OUTER_JOIN) {
+ if (JoinOpType == TJoinOp::INNER_JOIN || JoinOpType ==
TJoinOp::FULL_OUTER_JOIN ||
+ JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType ==
TJoinOp::RIGHT_OUTER_JOIN) {
return _find_batch_inner_outer_join<JoinOpType>(keys,
build_idx_map, probe_idx,
build_idx,
probe_rows, probe_idxs,
probe_visited,
build_idxs);
}
- if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
- JoinOpType == TJoinOp::LEFT_SEMI_JOIN ||
- JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
- return _find_batch_left_semi_anti<JoinOpType, need_judge_null>(
- keys, build_idx_map, probe_idx, probe_rows, probe_idxs);
+ if (JoinOpType == TJoinOp::LEFT_ANTI_JOIN || JoinOpType ==
TJoinOp::LEFT_SEMI_JOIN ||
+ JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
+ if (null_map) {
+ return _find_batch_left_semi_anti<JoinOpType, true>(
+ keys, build_idx_map, probe_idx, probe_rows,
probe_idxs, null_map);
+ } else {
+ return _find_batch_left_semi_anti<JoinOpType, false>(
+ keys, build_idx_map, probe_idx, probe_rows,
probe_idxs, nullptr);
+ }
}
- if constexpr (JoinOpType == TJoinOp::RIGHT_ANTI_JOIN ||
- JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) {
+ if (JoinOpType == TJoinOp::RIGHT_ANTI_JOIN || JoinOpType ==
TJoinOp::RIGHT_SEMI_JOIN) {
return _find_batch_right_semi_anti(keys, build_idx_map, probe_idx,
probe_rows);
}
- return std::tuple {0, 0U, 0U};
+ throw Exception(ErrorCode::INTERNAL_ERROR, "meet invalid hash join
input");
}
/**
@@ -157,67 +165,16 @@ public:
uint32_t* __restrict probe_idxs,
uint32_t* __restrict build_idxs,
uint8_t* __restrict null_flags,
- bool picking_null_keys) {
- uint32_t matched_cnt = 0;
- const auto batch_size = max_batch_size;
-
- auto do_the_probe = [&]() {
- /// If no any rows match the probe key, here start to handle null
keys in build side.
- /// The result of "Any = null" is null.
- if (build_idx == 0 && !picking_null_keys) {
- build_idx = first[bucket_size];
- picking_null_keys = true; // now pick null from build side
- }
-
- while (build_idx && matched_cnt < batch_size) {
- if (picking_null_keys || keys[probe_idx] ==
build_keys[build_idx]) {
- build_idxs[matched_cnt] = build_idx;
- probe_idxs[matched_cnt] = probe_idx;
- null_flags[matched_cnt] = picking_null_keys;
- matched_cnt++;
- }
-
- build_idx = next[build_idx];
-
- // If `build_idx` is 0, all matched keys are handled,
- // now need to handle null keys in build side.
- if (!build_idx && !picking_null_keys) {
- build_idx = first[bucket_size];
- picking_null_keys = true; // now pick null keys from build
side
- }
- }
-
- // may over batch_size when emplace 0 into build_idxs
- if (!build_idx) {
- probe_idxs[matched_cnt] = probe_idx;
- build_idxs[matched_cnt] = 0;
- picking_null_keys = false;
- matched_cnt++;
- }
-
- probe_idx++;
- };
-
- if (build_idx) {
- do_the_probe();
- }
-
- while (probe_idx < probe_rows && matched_cnt < batch_size) {
- build_idx = build_idx_map[probe_idx];
-
- /// If the probe key is null
- if (build_idx == bucket_size) {
- probe_idx++;
- break;
- }
- do_the_probe();
- if (picking_null_keys) {
- break;
- }
+ bool picking_null_keys, const
uint8_t* null_map) {
+ if (null_map) {
+ return _find_null_aware_with_other_conjuncts_impl<true>(
+ keys, build_idx_map, probe_idx, build_idx, probe_rows,
probe_idxs, build_idxs,
+ null_flags, picking_null_keys, null_map);
+ } else {
+ return _find_null_aware_with_other_conjuncts_impl<false>(
+ keys, build_idx_map, probe_idx, build_idx, probe_rows,
probe_idxs, build_idxs,
+ null_flags, picking_null_keys, nullptr);
}
-
- probe_idx -= (build_idx != 0);
- return std::tuple {probe_idx, build_idx, matched_cnt,
picking_null_keys};
}
template <int JoinOpType, bool is_mark_join>
@@ -250,15 +207,11 @@ public:
bool has_null_key() { return _has_null_key; }
- void pre_build_idxs(std::vector<uint32>& buckets, const uint8_t* null_map)
const {
- if (null_map) {
- for (unsigned int& bucket : buckets) {
- bucket = bucket == bucket_size ? bucket_size : first[bucket];
- }
- } else {
- for (unsigned int& bucket : buckets) {
- bucket = first[bucket];
- }
+ bool keep_null_key() { return _keep_null_key; }
+
+ void pre_build_idxs(std::vector<uint32>& buckets) const {
+ for (unsigned int& bucket : buckets) {
+ bucket = first[bucket];
}
}
@@ -267,8 +220,12 @@ private:
auto _process_null_aware_left_half_join_for_empty_build_side(int
probe_idx, int probe_rows,
uint32_t*
__restrict probe_idxs,
uint32_t*
__restrict build_idxs) {
- static_assert(JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
- JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN);
+ if (JoinOpType != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
+ JoinOpType != TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) {
+ throw Exception(ErrorCode::INTERNAL_ERROR,
+
"process_null_aware_left_half_join_for_empty_build_side meet invalid "
+ "hash join input");
+ }
uint32_t matched_cnt = 0;
const auto batch_size = max_batch_size;
@@ -298,16 +255,17 @@ private:
return std::tuple {probe_idx, 0U, 0U};
}
- template <int JoinOpType, bool need_judge_null>
+ template <int JoinOpType, bool has_null_map>
auto _find_batch_left_semi_anti(const Key* __restrict keys,
const uint32_t* __restrict build_idx_map,
int probe_idx,
- int probe_rows, uint32_t* __restrict
probe_idxs) {
+ int probe_rows, uint32_t* __restrict
probe_idxs,
+ const uint8_t* null_map) {
uint32_t matched_cnt = 0;
const auto batch_size = max_batch_size;
while (probe_idx < probe_rows && matched_cnt < batch_size) {
- if constexpr (need_judge_null) {
- if (build_idx_map[probe_idx] == bucket_size) {
+ if constexpr (JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
has_null_map) {
+ if (null_map[probe_idx]) {
probe_idx++;
continue;
}
@@ -325,7 +283,7 @@ private:
return std::tuple {probe_idx, 0U, matched_cnt};
}
- template <int JoinOpType, bool need_judge_null, bool
only_need_to_match_one>
+ template <int JoinOpType, bool only_need_to_match_one>
auto _find_batch_conjunct(const Key* __restrict keys, const uint32_t*
__restrict build_idx_map,
int probe_idx, uint32_t build_idx, int
probe_rows,
uint32_t* __restrict probe_idxs, uint32_t*
__restrict build_idxs) {
@@ -341,14 +299,6 @@ private:
build_idxs[matched_cnt] = build_idx;
matched_cnt++;
}
- } else if constexpr (need_judge_null) {
- if (build_idx == bucket_size) {
- build_idxs[matched_cnt] = build_idx;
- probe_idxs[matched_cnt] = probe_idx;
- build_idx = 0;
- matched_cnt++;
- break;
- }
}
if (keys[probe_idx] == build_keys[build_idx]) {
@@ -448,6 +398,76 @@ private:
return std::tuple {probe_idx, build_idx, matched_cnt};
}
+ template <bool has_null_map>
+ auto _find_null_aware_with_other_conjuncts_impl(
+ const Key* __restrict keys, const uint32_t* __restrict
build_idx_map, int probe_idx,
+ uint32_t build_idx, int probe_rows, uint32_t* __restrict
probe_idxs,
+ uint32_t* __restrict build_idxs, uint8_t* __restrict null_flags,
bool picking_null_keys,
+ const uint8_t* null_map) {
+ uint32_t matched_cnt = 0;
+ const auto batch_size = max_batch_size;
+
+ auto do_the_probe = [&]() {
+ /// If no rows match the probe key, start handling the null keys in the build side.
+ /// The result of "Any = null" is null.
+ if (build_idx == 0 && !picking_null_keys) {
+ build_idx = first[bucket_size];
+ picking_null_keys = true; // now pick null from build side
+ }
+
+ while (build_idx && matched_cnt < batch_size) {
+ if (picking_null_keys || keys[probe_idx] ==
build_keys[build_idx]) {
+ build_idxs[matched_cnt] = build_idx;
+ probe_idxs[matched_cnt] = probe_idx;
+ null_flags[matched_cnt] = picking_null_keys;
+ matched_cnt++;
+ }
+
+ build_idx = next[build_idx];
+
+ // If `build_idx` is 0, all matched keys are handled,
+ // now need to handle null keys in build side.
+ if (!build_idx && !picking_null_keys) {
+ build_idx = first[bucket_size];
+ picking_null_keys = true; // now pick null keys from build
side
+ }
+ }
+
+ // matched_cnt may exceed batch_size when 0 is emplaced into build_idxs
+ if (!build_idx) {
+ probe_idxs[matched_cnt] = probe_idx;
+ build_idxs[matched_cnt] = 0;
+ picking_null_keys = false;
+ matched_cnt++;
+ }
+
+ probe_idx++;
+ };
+
+ if (build_idx) {
+ do_the_probe();
+ }
+
+ while (probe_idx < probe_rows && matched_cnt < batch_size) {
+ build_idx = build_idx_map[probe_idx];
+
+ /// If the probe key is null
+ if constexpr (has_null_map) {
+ if (null_map[probe_idx]) {
+ probe_idx++;
+ break;
+ }
+ }
+ do_the_probe();
+ if (picking_null_keys) {
+ break;
+ }
+ }
+
+ probe_idx -= (build_idx != 0);
+ return std::tuple {probe_idx, build_idx, matched_cnt,
picking_null_keys};
+ }
+
const Key* __restrict build_keys;
std::vector<uint8_t> visited;
@@ -461,6 +481,7 @@ private:
mutable uint32_t iter_idx = 1;
vectorized::Arena* pool;
bool _has_null_key = false;
+ bool _keep_null_key = false;
bool _empty_build_side = true;
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]