This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1e03aac6498 [fix](join) incorrect result of mark join (#30543)
1e03aac6498 is described below
commit 1e03aac649856bad6a797c187072eff33c0278cc
Author: Jerry Hu <[email protected]>
AuthorDate: Wed Jan 31 17:59:27 2024 +0800
[fix](join) incorrect result of mark join (#30543)
incorrect result of mark join
---
be/src/pipeline/exec/hashjoin_build_sink.cpp | 8 -
be/src/pipeline/exec/hashjoin_build_sink.h | 7 -
be/src/pipeline/exec/hashjoin_probe_operator.cpp | 1 -
be/src/pipeline/exec/hashjoin_probe_operator.h | 7 -
be/src/vec/common/hash_table/join_hash_table.h | 101 ++++++------
be/src/vec/exec/join/process_hash_table_probe.h | 19 ++-
.../vec/exec/join/process_hash_table_probe_impl.h | 172 +++++++++++++--------
be/src/vec/exec/join/vhash_join_node.cpp | 8 -
be/src/vec/exec/join/vhash_join_node.h | 16 +-
.../data/nereids_p0/join/test_mark_join.out | 43 ++++++
.../suites/nereids_p0/join/test_mark_join.groovy | 126 +++++++++++++++
11 files changed, 352 insertions(+), 156 deletions(-)
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 5b6689e35a5..5c8995bebe9 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -76,12 +76,6 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState*
state, LocalSinkStateInfo
_shared_hash_table_dependency->block();
p._shared_hashtable_controller->append_dependency(p.node_id(),
_shared_hash_table_dependency);
- } else {
- if ((p._join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
- p._join_op == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) &&
- p._have_other_join_conjunct) {
- _build_indexes_null = std::make_shared<std::vector<uint32_t>>();
- }
}
_build_blocks_memory_usage =
@@ -496,7 +490,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
state, local_state._shared_state->build_block.get(),
&local_state, use_global_rf));
RETURN_IF_ERROR(
local_state.process_build_block(state,
(*local_state._shared_state->build_block)));
- local_state._shared_state->build_indexes_null =
local_state._build_indexes_null;
if (_shared_hashtable_controller) {
_shared_hash_table_context->status = Status::OK();
// arena will be shared with other instances.
@@ -542,7 +535,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
_shared_hash_table_context->hash_table_variants));
local_state._shared_state->build_block =
_shared_hash_table_context->block;
- local_state._build_indexes_null =
_shared_hash_table_context->build_indexes_null;
local_state._shared_state->build_indexes_null =
_shared_hash_table_context->build_indexes_null;
const bool use_global_rf =
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h
b/be/src/pipeline/exec/hashjoin_build_sink.h
index 2acc25151ab..efc0a46f3ba 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -120,13 +120,6 @@ protected:
std::shared_ptr<SharedHashTableDependency> _shared_hash_table_dependency;
std::vector<int> _build_col_ids;
- /*
- * For null aware anti/semi join with other join conjuncts, we do need to
care about the rows in
- * build side with null keys,
- * because the other join conjuncts' result may be changed from null to
false(null & false == false).
- */
- std::shared_ptr<std::vector<uint32_t>> _build_indexes_null;
-
RuntimeProfile::Counter* _build_table_timer = nullptr;
RuntimeProfile::Counter* _build_expr_call_timer = nullptr;
RuntimeProfile::Counter* _build_table_insert_timer = nullptr;
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index f7a06655b19..a2fb0012ffa 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -301,7 +301,6 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState*
state, vectorized::Bloc
Status st;
if (local_state._probe_index < local_state._probe_block.rows()) {
- local_state._build_indexes_null =
local_state._shared_state->build_indexes_null;
DCHECK(local_state._has_set_need_null_map_for_probe);
RETURN_IF_CATCH_EXCEPTION({
std::visit(
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 1bdb9864c40..4b7f9271920 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -125,13 +125,6 @@ private:
// For mark join, last probe index of null mark
int _last_probe_null_mark;
- /*
- * For null aware anti/semi join with other join conjuncts, we do need to
care about the rows in
- * build side with null keys,
- * because the other join conjuncts' result may be changed from null to
false(null & false == false).
- */
- std::shared_ptr<std::vector<uint32_t>> _build_indexes_null;
-
vectorized::Block _probe_block;
vectorized::ColumnRawPtrs _probe_columns;
// other expr
diff --git a/be/src/vec/common/hash_table/join_hash_table.h
b/be/src/vec/common/hash_table/join_hash_table.h
index 08311989b5d..85665e76853 100644
--- a/be/src/vec/common/hash_table/join_hash_table.h
+++ b/be/src/vec/common/hash_table/join_hash_table.h
@@ -68,6 +68,7 @@ public:
std::vector<uint8_t>& get_visited() { return visited; }
+ template <int JoinOpType, bool with_other_conjuncts>
void build(const Key* __restrict keys, const uint32_t* __restrict
bucket_nums,
size_t num_elem) {
build_keys = keys;
@@ -76,19 +77,24 @@ public:
next[i] = first[bucket_num];
first[bucket_num] = i;
}
- first[bucket_size] = 0; // index = bucket_num means null
+ if constexpr ((JoinOpType != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
+ JoinOpType != TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) ||
+ !with_other_conjuncts) {
+ /// Only null aware join with other conjuncts need to access the
null value in hash table
+ first[bucket_size] = 0; // index = bucket_num means null
+ }
}
template <int JoinOpType, bool with_other_conjuncts, bool is_mark_join,
bool need_judge_null>
auto find_batch(const Key* __restrict keys, const uint32_t* __restrict
build_idx_map,
int probe_idx, uint32_t build_idx, int probe_rows,
uint32_t* __restrict probe_idxs, bool& probe_visited,
- uint32_t* __restrict build_idxs,
vectorized::ColumnFilterHelper* mark_column) {
+ uint32_t* __restrict build_idxs) {
if constexpr (JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
if (_empty_build_side) {
return _process_null_aware_left_anti_join_for_empty_build_side<
- JoinOpType, with_other_conjuncts, is_mark_join>(
- probe_idx, probe_rows, probe_idxs, build_idxs,
mark_column);
+ JoinOpType, with_other_conjuncts,
is_mark_join>(probe_idx, probe_rows,
+
probe_idxs, build_idxs);
}
}
@@ -128,51 +134,48 @@ public:
* select 'a' not in ('b', null) => null => 'a' != 'b' and 'a' != null =>
true and null => null
* select 'a' not in ('a', 'b', null) => false
*/
- auto find_null_aware_with_other_conjuncts(
- const Key* __restrict keys, const uint32_t* __restrict
build_idx_map, int probe_idx,
- uint32_t build_idx, int probe_rows, uint32_t* __restrict
probe_idxs,
- uint32_t* __restrict build_idxs, std::set<uint32_t>& null_result,
- const std::vector<uint32_t>& build_indexes_null, const size_t
build_block_count) {
+ auto find_null_aware_with_other_conjuncts(const Key* __restrict keys,
+ const uint32_t* __restrict
build_idx_map,
+ int probe_idx, uint32_t
build_idx, int probe_rows,
+ uint32_t* __restrict probe_idxs,
+ uint32_t* __restrict build_idxs,
+ uint8_t* __restrict null_flags,
+ bool picking_null_keys) {
auto matched_cnt = 0;
const auto batch_size = max_batch_size;
- bool has_matched = false;
auto do_the_probe = [&]() {
+ /// If no any rows match the probe key, here start to handle null
keys in build side.
+ /// The result of "Any = null" is null.
+ if (build_idx == 0 && !picking_null_keys) {
+ build_idx = first[bucket_size];
+ picking_null_keys = true; // now pick null from build side
+ }
+
while (build_idx && matched_cnt < batch_size) {
- if (build_idx == bucket_size) {
- /// All rows in build side should be executed with other
join conjuncts.
- for (size_t i = 1; i != build_block_count; ++i) {
- build_idxs[matched_cnt] = i;
- probe_idxs[matched_cnt] = probe_idx;
- matched_cnt++;
- }
- null_result.emplace(probe_idx);
- build_idx = 0;
- has_matched = true;
- break;
- } else if (keys[probe_idx] == build_keys[build_idx]) {
+ if (picking_null_keys || keys[probe_idx] ==
build_keys[build_idx]) {
build_idxs[matched_cnt] = build_idx;
probe_idxs[matched_cnt] = probe_idx;
+ null_flags[matched_cnt] = picking_null_keys;
matched_cnt++;
- has_matched = true;
}
build_idx = next[build_idx];
+
+ // If `build_idx` is 0, all matched keys are handled,
+ // now need to handle null keys in build side.
+ if (!build_idx && !picking_null_keys) {
+ build_idx = first[bucket_size];
+ picking_null_keys = true; // now pick null keys from build
side
+ }
}
// may over batch_size when emplace 0 into build_idxs
if (!build_idx) {
- if (!has_matched) { // has no any row matched
- for (auto index : build_indexes_null) {
- build_idxs[matched_cnt] = index;
- probe_idxs[matched_cnt] = probe_idx;
- matched_cnt++;
- }
- }
probe_idxs[matched_cnt] = probe_idx;
build_idxs[matched_cnt] = 0;
+ picking_null_keys = false;
matched_cnt++;
- has_matched = false;
}
probe_idx++;
@@ -184,11 +187,20 @@ public:
while (probe_idx < probe_rows && matched_cnt < batch_size) {
build_idx = build_idx_map[probe_idx];
+
+ /// If the probe key is null
+ if (build_idx == bucket_size) {
+ probe_idx++;
+ break;
+ }
do_the_probe();
+ if (picking_null_keys) {
+ break;
+ }
}
probe_idx -= (build_idx != 0);
- return std::tuple {probe_idx, build_idx, matched_cnt};
+ return std::tuple {probe_idx, build_idx, matched_cnt,
picking_null_keys};
}
template <int JoinOpType>
@@ -215,21 +227,23 @@ public:
bool has_null_key() { return _has_null_key; }
- void pre_build_idxs(std::vector<uint32>& bucksets, const uint8_t*
null_map) {
+ void pre_build_idxs(std::vector<uint32>& buckets, const uint8_t* null_map)
{
if (null_map) {
- first[bucket_size] = bucket_size; // distinguish between not
matched and null
- }
-
- for (uint32_t i = 0; i < bucksets.size(); i++) {
- bucksets[i] = first[bucksets[i]];
+ for (unsigned int& bucket : buckets) {
+ bucket = bucket == bucket_size ? bucket_size : first[bucket];
+ }
+ } else {
+ for (unsigned int& bucket : buckets) {
+ bucket = first[bucket];
+ }
}
}
private:
template <int JoinOpType, bool with_other_conjuncts, bool is_mark_join>
- auto _process_null_aware_left_anti_join_for_empty_build_side(
- int probe_idx, int probe_rows, uint32_t* __restrict probe_idxs,
- uint32_t* __restrict build_idxs, vectorized::ColumnFilterHelper*
mark_column) {
+ auto _process_null_aware_left_anti_join_for_empty_build_side(int
probe_idx, int probe_rows,
+ uint32_t*
__restrict probe_idxs,
+ uint32_t*
__restrict build_idxs) {
static_assert(JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN);
auto matched_cnt = 0;
const auto batch_size = max_batch_size;
@@ -240,11 +254,6 @@ private:
++matched_cnt;
}
- if constexpr (is_mark_join && !with_other_conjuncts) {
- // we will flip the mark column later for anti join, so here set 0
into mark column.
- mark_column->resize_fill(matched_cnt, 0);
- }
-
return std::tuple {probe_idx, 0U, matched_cnt};
}
diff --git a/be/src/vec/exec/join/process_hash_table_probe.h
b/be/src/vec/exec/join/process_hash_table_probe.h
index 02bf242e55a..9f4ddbabdcb 100644
--- a/be/src/vec/exec/join/process_hash_table_probe.h
+++ b/be/src/vec/exec/join/process_hash_table_probe.h
@@ -67,12 +67,11 @@ struct ProcessHashTableProbe {
// each matching join column need to be processed by other join conjunct.
so the struct of mutable block
// and output block may be different
// The output result is determined by the other join conjunct result and
same_to_prev struct
- Status do_other_join_conjuncts(Block* output_block, bool is_mark_join,
- std::vector<uint8_t>& visited, bool
has_null_in_build_side);
+ Status do_other_join_conjuncts(Block* output_block, std::vector<uint8_t>&
visited,
+ bool has_null_in_build_side);
template <bool with_other_conjuncts>
- Status do_mark_join_conjuncts(Block* output_block, size_t
hash_table_bucket_size,
- const std::set<uint32_t>& null_result);
+ Status do_mark_join_conjuncts(Block* output_block, size_t
hash_table_bucket_size);
template <typename HashTableType>
typename HashTableType::State _init_probe_side(HashTableType&
hash_table_ctx, size_t probe_rows,
@@ -85,6 +84,10 @@ struct ProcessHashTableProbe {
Status process_data_in_hashtable(HashTableType& hash_table_ctx,
MutableBlock& mutable_block,
Block* output_block, bool* eos);
+ /// For null aware join with other conjuncts, if the probe key of one row
on left side is null,
+ /// we should make this row match with all rows in build side.
+ size_t _process_probe_null_key(uint32_t probe_idx);
+
Parent* _parent = nullptr;
const int _batch_size;
const std::shared_ptr<Block>& _build_block;
@@ -93,7 +96,15 @@ struct ProcessHashTableProbe {
std::vector<uint32_t> _probe_indexs;
bool _probe_visited = false;
+ bool _picking_null_keys = false;
std::vector<uint32_t> _build_indexs;
+ std::vector<uint8_t> _null_flags;
+
+ /// If the probe key of one row on left side is null,
+ /// we will make all rows in build side match with this row,
+ /// `_build_index_for_null_probe_key` is used to record the progress if
the build block is too big.
+ uint32_t _build_index_for_null_probe_key {0};
+
std::vector<int> _build_blocks_locs;
// only need set the tuple is null in RIGHT_OUTER_JOIN and FULL_OUTER_JOIN
ColumnUInt8::Container* _tuple_is_null_left_flags = nullptr;
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h
b/be/src/vec/exec/join/process_hash_table_probe_impl.h
index 1939b702c69..06dfdac9074 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h
@@ -131,6 +131,11 @@ typename HashTableType::State
ProcessHashTableProbe<JoinOpType, Parent>::_init_p
// may over batch size 1 for some outer join case
_probe_indexs.resize(_batch_size + 1);
_build_indexs.resize(_batch_size + 1);
+ if constexpr (JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
+ JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) {
+ _null_flags.resize(_batch_size + 1);
+ memset(_null_flags.data(), 0, _batch_size + 1);
+ }
if (!_parent->_ready_probe) {
_parent->_ready_probe = true;
@@ -175,26 +180,41 @@ Status ProcessHashTableProbe<JoinOpType,
Parent>::do_process(HashTableType& hash
auto& mcol = mutable_block.mutable_columns();
int current_offset = 0;
-
- std::unique_ptr<ColumnFilterHelper> mark_column;
- if (is_mark_join) {
- mark_column = std::make_unique<ColumnFilterHelper>(*mcol[mcol.size() -
1]);
- }
-
- /// `null_result` set which contains the probe indexes of null results.
- std::set<uint32_t> null_result;
if constexpr ((JoinOpType == doris::TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
JoinOpType == doris::TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) &&
with_other_conjuncts) {
SCOPED_TIMER(_search_hashtable_timer);
- auto [new_probe_idx, new_build_idx, new_current_offset] =
-
hash_table_ctx.hash_table->find_null_aware_with_other_conjuncts(
- hash_table_ctx.keys,
hash_table_ctx.bucket_nums.data(), probe_index,
- build_index, probe_rows, _probe_indexs.data(),
_build_indexs.data(),
- null_result, *(_parent->_build_indexes_null),
_build_block->rows());
- probe_index = new_probe_idx;
- build_index = new_build_idx;
- current_offset = new_current_offset;
+
+ /// If `_build_index_for_null_probe_key` is not zero, it means we are
in progress of handling probe null key.
+ if (_build_index_for_null_probe_key) {
+ DCHECK_EQ(build_index,
hash_table_ctx.hash_table->get_bucket_size());
+ current_offset = _process_probe_null_key(probe_index);
+ if (!_build_index_for_null_probe_key) {
+ probe_index++;
+ build_index = 0;
+ }
+ } else {
+ auto [new_probe_idx, new_build_idx, new_current_offset,
picking_null_keys] =
+
hash_table_ctx.hash_table->find_null_aware_with_other_conjuncts(
+ hash_table_ctx.keys,
hash_table_ctx.bucket_nums.data(), probe_index,
+ build_index, probe_rows, _probe_indexs.data(),
_build_indexs.data(),
+ _null_flags.data(), _picking_null_keys);
+ probe_index = new_probe_idx;
+ build_index = new_build_idx;
+ current_offset = new_current_offset;
+ _picking_null_keys = picking_null_keys;
+
+ if (build_index == hash_table_ctx.hash_table->get_bucket_size()) {
+ _build_index_for_null_probe_key = 1;
+ if (current_offset == 0) {
+ current_offset = _process_probe_null_key(probe_index);
+ if (!_build_index_for_null_probe_key) {
+ probe_index++;
+ build_index = 0;
+ }
+ }
+ }
+ }
} else {
SCOPED_TIMER(_search_hashtable_timer);
auto [new_probe_idx, new_build_idx,
@@ -203,7 +223,7 @@ Status ProcessHashTableProbe<JoinOpType,
Parent>::do_process(HashTableType& hash
need_null_map_for_probe &&
ignore_null > (hash_table_ctx.keys,
hash_table_ctx.bucket_nums.data(),
probe_index, build_index, probe_rows,
_probe_indexs.data(),
- _probe_visited, _build_indexs.data(),
mark_column.get());
+ _probe_visited, _build_indexs.data());
probe_index = new_probe_idx;
build_index = new_build_idx;
current_offset = new_current_offset;
@@ -235,20 +255,76 @@ Status ProcessHashTableProbe<JoinOpType,
Parent>::do_process(HashTableType& hash
if constexpr (is_mark_join) {
return do_mark_join_conjuncts<with_other_conjuncts>(
- output_block, hash_table_ctx.hash_table->get_bucket_size(),
null_result);
+ output_block, hash_table_ctx.hash_table->get_bucket_size());
} else if constexpr (with_other_conjuncts) {
- return do_other_join_conjuncts(output_block, is_mark_join,
-
hash_table_ctx.hash_table->get_visited(),
+ return do_other_join_conjuncts(output_block,
hash_table_ctx.hash_table->get_visited(),
hash_table_ctx.hash_table->has_null_key());
}
return Status::OK();
}
+template <int JoinOpType, typename Parent>
+size_t ProcessHashTableProbe<JoinOpType,
Parent>::_process_probe_null_key(uint32_t probe_index) {
+ const auto rows = _build_block->rows();
+
+ DCHECK_LT(_build_index_for_null_probe_key, rows);
+ DCHECK_LT(0, _build_index_for_null_probe_key);
+ size_t matched_cnt = 0;
+ for (; _build_index_for_null_probe_key < rows && matched_cnt <
_batch_size; ++matched_cnt) {
+ _probe_indexs[matched_cnt] = probe_index;
+ _build_indexs[matched_cnt] = _build_index_for_null_probe_key++;
+ _null_flags[matched_cnt] = 1;
+ }
+
+ if (_build_index_for_null_probe_key == rows) {
+ _build_index_for_null_probe_key = 0;
+ _probe_indexs[matched_cnt] = probe_index;
+ _build_indexs[matched_cnt] = 0;
+ _null_flags[matched_cnt] = 0;
+ matched_cnt++;
+ }
+
+ return matched_cnt;
+}
+
+/**
+ * Mark join: there is a column named mark column which stores the result
of mark join conjunct.
+ * For example:
+ * ```sql
+ * select * from t1 where t1.k1 not in (select t2.k1 from t2 where t2.k2
= t1.k2 and t2.k3 > t1.k3) or t1.k4 < 10;
+ * ```
+ * equal join conjuncts: t2.k2 = t1.k2
+ * mark join conjunct: t1.k1 = t2.k1
+ * other join conjuncts: t2.k3 > t1.k3
+ * other predicates: $c$1 or t1.k4 < 10 # `$c$1` means the result of
mark join conjunct(mark column)
+ *
+ * Executing flow:
+ *
+ * Equal join conjuncts (probe hash table)
+ * ↓↓
+ * Mark join conjuncts (result is nullable, stored in mark column)
+ * ↓↓
+ * Other join conjuncts (update the mark column)
+ * ↓↓
+ * Other predicates (filter rows)
+ *
+ * ```sql
+ * select * from t1 where t1.k1 not in (select t2.k1 from t2 where t2.k3
> t1.k3) or t1.k4 < 10;
+ * ```
+ * This sql has no equal join conjuncts:
+ * equal join conjuncts: NAN
+ * mark join conjunct: t1.k1 = t2.k1
+ * other join conjuncts: t2.k3 > t1.k3
+ * other predicates: $c$1 or t1.k4 < 10 # `$c$1` means the result of
mark join conjunct(mark column)
+ *
+ * To avoid using nested loop join, we use the mark join conjunct(`t1.k1 =
t2.k1`) as the equal join conjunct.
+ * So this query will be a "null aware left anti join", which means the
equal conjunct's result should be nullable.
+ */
template <int JoinOpType, typename Parent>
template <bool with_other_conjuncts>
Status ProcessHashTableProbe<JoinOpType, Parent>::do_mark_join_conjuncts(
- Block* output_block, size_t hash_table_bucket_size, const
std::set<uint32_t>& null_result) {
+ Block* output_block, size_t hash_table_bucket_size) {
DCHECK(JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
JoinOpType == TJoinOp::LEFT_SEMI_JOIN ||
@@ -260,6 +336,10 @@ Status ProcessHashTableProbe<JoinOpType,
Parent>::do_mark_join_conjuncts(
JoinOpType ==
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
const auto row_count = output_block->rows();
+ if (!row_count) {
+ return Status::OK();
+ }
+
auto mark_column_mutable =
output_block->get_by_position(_parent->_mark_column_id).column->assume_mutable();
auto& mark_column = assert_cast<ColumnNullable&>(*mark_column_mutable);
@@ -281,8 +361,7 @@ Status ProcessHashTableProbe<JoinOpType,
Parent>::do_mark_join_conjuncts(
filter_data[i] = _build_indexs[i] != 0 && _build_indexs[i] !=
hash_table_bucket_size;
if constexpr (is_null_aware_join) {
if constexpr (with_other_conjuncts) {
- mark_null_map[i] =
- null_result.contains(_probe_indexs[i]) &&
_build_indexs[i] != 0;
+ mark_null_map[i] = _null_flags[i];
} else {
if (filter_data[i]) {
last_probe_matched = _probe_indexs[i];
@@ -361,7 +440,7 @@ Status ProcessHashTableProbe<JoinOpType,
Parent>::do_mark_join_conjuncts(
if constexpr (is_anti_join) {
// flip the mark column
for (size_t i = 0; i != row_count; ++i) {
- mark_filter_data[i] ^= 1;
+ mark_filter_data[i] ^= 1; // not null/ null
}
}
@@ -372,8 +451,7 @@ Status ProcessHashTableProbe<JoinOpType,
Parent>::do_mark_join_conjuncts(
template <int JoinOpType, typename Parent>
Status ProcessHashTableProbe<JoinOpType, Parent>::do_other_join_conjuncts(
- Block* output_block, bool is_mark_join, std::vector<uint8_t>& visited,
- bool has_null_in_build_side) {
+ Block* output_block, std::vector<uint8_t>& visited, bool
has_null_in_build_side) {
// dispose the other join conjunct exec
auto row_count = output_block->rows();
if (!row_count) {
@@ -440,30 +518,18 @@ Status ProcessHashTableProbe<JoinOpType,
Parent>::do_other_join_conjuncts(
for (size_t i = 0; i < row_count; ++i) {
bool not_matched_before = _parent->_last_probe_match !=
_probe_indexs[i];
- // _build_indexs[i] == 0 means the end of this probe index
- // if a probe row not matched with any build row, we need output a
false value into mark column
if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN) {
if (_build_indexs[i] == 0) {
- filter_map[i] = is_mark_join && not_matched_before;
- filter_column_ptr[i] = false;
+ filter_map[i] = false;
+ } else if (filter_column_ptr[i]) {
+ filter_map[i] = not_matched_before;
+ _parent->_last_probe_match = _probe_indexs[i];
} else {
- if (filter_column_ptr[i]) {
- filter_map[i] = not_matched_before;
- _parent->_last_probe_match = _probe_indexs[i];
- } else {
- filter_map[i] = false;
- }
+ filter_map[i] = false;
}
} else {
if (_build_indexs[i] == 0) {
- if (not_matched_before) {
- filter_map[i] = true;
- } else if (is_mark_join) {
- filter_map[i] = true;
- filter_column_ptr[i] = false;
- } else {
- filter_map[i] = false;
- }
+ filter_map[i] = not_matched_before;
} else {
filter_map[i] = false;
if (filter_column_ptr[i]) {
@@ -473,21 +539,6 @@ Status ProcessHashTableProbe<JoinOpType,
Parent>::do_other_join_conjuncts(
}
}
- if (is_mark_join) {
- auto mark_column =
- output_block->get_by_position(orig_columns -
1).column->assume_mutable();
- ColumnFilterHelper helper(*mark_column);
- for (size_t i = 0; i < row_count; ++i) {
- bool mathced = filter_column_ptr[i] &&
- (_build_indexs[i] != 0) == (JoinOpType ==
TJoinOp::LEFT_SEMI_JOIN);
- if (has_null_in_build_side && !mathced) {
- helper.insert_null();
- } else {
- helper.insert_value(mathced);
- }
- }
- }
-
output_block->get_by_position(result_column_id).column =
std::move(new_filter_column);
} else if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN ||
JoinOpType == TJoinOp::RIGHT_ANTI_JOIN) {
@@ -512,8 +563,7 @@ Status ProcessHashTableProbe<JoinOpType,
Parent>::do_other_join_conjuncts(
JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
orig_columns = _right_col_idx;
}
- RETURN_IF_ERROR(Block::filter_block(output_block, result_column_id,
- is_mark_join ?
output_block->columns() : orig_columns));
+ RETURN_IF_ERROR(Block::filter_block(output_block, result_column_id,
orig_columns));
}
return Status::OK();
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp
b/be/src/vec/exec/join/vhash_join_node.cpp
index a813ec565a4..ec630f3fe32 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -180,12 +180,6 @@ Status HashJoinNode::init(const TPlanNode& tnode,
RuntimeState* state) {
}
#endif
- if ((_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
- _join_op == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) &&
- _have_other_join_conjunct) {
- _build_indexes_null = std::make_shared<std::vector<uint32_t>>();
- }
-
_runtime_filters.resize(_runtime_filter_descs.size());
for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter(
@@ -761,7 +755,6 @@ Status HashJoinNode::sink(doris::RuntimeState* state,
vectorized::Block* in_bloc
// arena will be shared with other instances.
_shared_hash_table_context->arena = _arena;
_shared_hash_table_context->block = _build_block;
- _shared_hash_table_context->build_indexes_null =
_build_indexes_null;
_shared_hash_table_context->hash_table_variants =
_hash_table_variants;
_shared_hash_table_context->short_circuit_for_null_in_probe_side =
_has_null_in_build_side;
@@ -794,7 +787,6 @@ Status HashJoinNode::sink(doris::RuntimeState* state,
vectorized::Block* in_bloc
*std::static_pointer_cast<HashTableVariants>(
_shared_hash_table_context->hash_table_variants));
_build_block = _shared_hash_table_context->block;
- _build_indexes_null = _shared_hash_table_context->build_indexes_null;
if (!_shared_hash_table_context->runtime_filters.empty()) {
auto ret = std::visit(
diff --git a/be/src/vec/exec/join/vhash_join_node.h
b/be/src/vec/exec/join/vhash_join_node.h
index 95c59094ba6..c38f8f563ea 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -117,11 +117,6 @@ struct ProcessHashTableBuild {
for (uint32_t i = 1; i < _rows; i++) {
if ((*null_map)[i]) {
*has_null_key = true;
- if constexpr (with_other_conjuncts &&
- (JoinOpType ==
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
- JoinOpType ==
TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN)) {
- _parent->_build_indexes_null->emplace_back(i);
- }
}
}
if (short_circuit_for_null && *has_null_key) {
@@ -136,8 +131,8 @@ struct ProcessHashTableBuild {
hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows,
null_map ? null_map->data() :
nullptr, true, true,
hash_table_ctx.hash_table->get_bucket_size());
- hash_table_ctx.hash_table->build(hash_table_ctx.keys,
hash_table_ctx.bucket_nums.data(),
- _rows);
+ hash_table_ctx.hash_table->template build<JoinOpType,
with_other_conjuncts>(
+ hash_table_ctx.keys, hash_table_ctx.bucket_nums.data(), _rows);
hash_table_ctx.bucket_nums.resize(_batch_size);
hash_table_ctx.bucket_nums.shrink_to_fit();
@@ -303,13 +298,6 @@ private:
std::vector<uint16_t> _probe_column_disguise_null;
std::vector<uint16_t> _probe_column_convert_to_null;
- /*
- * For null aware anti/semi join with other join conjuncts, we do need to
care about the rows in
- * build side with null keys,
- * because the other join conjuncts' result maybe change null to
false(null & false == false).
- */
- std::shared_ptr<std::vector<uint32_t>> _build_indexes_null;
-
DataTypes _right_table_data_types;
DataTypes _left_table_data_types;
std::vector<std::string> _right_table_column_names;
diff --git a/regression-test/data/nereids_p0/join/test_mark_join.out
b/regression-test/data/nereids_p0/join/test_mark_join.out
new file mode 100644
index 00000000000..4098502b75d
--- /dev/null
+++ b/regression-test/data/nereids_p0/join/test_mark_join.out
@@ -0,0 +1,43 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !mark_join1 --
+1 1 true
+2 2 true
+3 \N true
+3 \N true
+4 \N \N
+
+-- !mark_join2 --
+1 1 \N
+2 2 \N
+3 \N \N
+3 \N true
+4 \N true
+
+-- !mark_join3 --
+1 1 false
+2 2 false
+3 \N false
+3 \N false
+4 \N false
+
+-- !mark_join4 --
+1 1 false
+2 2 false
+3 \N \N
+3 \N true
+4 \N true
+
+-- !mark_join5 --
+1 1 false
+2 2 false
+3 \N true
+3 \N true
+4 \N \N
+
+-- !mark_join6 --
+1 1 true
+2 2 true
+3 \N false
+3 \N true
+4 \N false
+
diff --git a/regression-test/suites/nereids_p0/join/test_mark_join.groovy
b/regression-test/suites/nereids_p0/join/test_mark_join.groovy
new file mode 100644
index 00000000000..6008919d831
--- /dev/null
+++ b/regression-test/suites/nereids_p0/join/test_mark_join.groovy
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_mark_join", "nereids_p0") {
+ sql "SET enable_nereids_planner=true"
+ sql "SET enable_fallback_to_original_planner=false"
+
+ sql "drop table if exists `test_mark_join_t1`;"
+ sql "drop table if exists `test_mark_join_t2`;"
+
+ sql """
+ CREATE TABLE IF NOT EXISTS `test_mark_join_t1` (
+ k1 int not null,
+ k2 int,
+ k3 bigint,
+ v1 varchar(255) not null,
+ v2 varchar(255),
+ v3 varchar(255)
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`, `k2`)
+ COMMENT "OLAP"
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "in_memory" = "false",
+ "storage_format" = "V2"
+ );
+ """
+
+ sql """
+ CREATE TABLE IF NOT EXISTS `test_mark_join_t2` (
+ k1 int not null,
+ k2 int,
+ k3 bigint,
+ v1 varchar(255) not null,
+ v2 varchar(255),
+ v3 varchar(255)
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`, `k2`)
+ COMMENT "OLAP"
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "in_memory" = "false",
+ "storage_format" = "V2"
+ );
+ """
+
+ sql """
+ insert into `test_mark_join_t1` values
+ (1, 1, 1, 'abc', 'efg', 'hjk'),
+ (2, 2, 2, 'aabb', 'eeff', 'ccdd'),
+ (3, null, 3, 'iii', null, null),
+ (3, null, null, 'hhhh', null, null),
+ (4, null, 4, 'dddd', 'ooooo', 'kkkkk'
+ );
+ """
+
+ sql """
+ insert into `test_mark_join_t2` values
+ (1, 1, 1, 'abc', 'efg', 'hjk'),
+ (2, 2, 2, 'aabb', 'eeff', 'ccdd'),
+ (3, null, null, 'diid', null, null),
+ (3, null, 3, 'ooekd', null, null),
+ (4, 4, null, 'oepeld', null, 'kkkkk'
+ );
+ """
+
+ qt_mark_join1 """
+ select
+ k1, k2
+ , k1 not in (select test_mark_join_t2.k2 from test_mark_join_t2
where test_mark_join_t2.k3 < test_mark_join_t1.k3) vv
+ from test_mark_join_t1 order by 1, 2, 3;
+ """
+
+ qt_mark_join2 """
+ select
+ k1, k2
+ , k2 not in (select test_mark_join_t2.k3 from test_mark_join_t2
where test_mark_join_t2.k2 > test_mark_join_t1.k3) vv
+ from test_mark_join_t1 order by 1, 2, 3;
+ """
+
+ qt_mark_join3 """
+ select
+ k1, k2
+ , k1 in (select test_mark_join_t2.k1 from test_mark_join_t2 where
test_mark_join_t2.k3 < test_mark_join_t1.k3) vv
+ from test_mark_join_t1 order by 1, 2, 3;
+ """
+
+ qt_mark_join4 """
+ select
+ k1, k2
+ , k1 not in (select test_mark_join_t2.k2 from test_mark_join_t2
where test_mark_join_t2.k3 = test_mark_join_t1.k3) vv
+ from test_mark_join_t1 order by 1, 2, 3;
+ """
+
+ qt_mark_join5 """
+ select
+ k1, k2
+ , k2 not in (select test_mark_join_t2.k3 from test_mark_join_t2
where test_mark_join_t2.k2 = test_mark_join_t1.k3) vv
+ from test_mark_join_t1 order by 1, 2, 3;
+ """
+
+ qt_mark_join6 """
+ select
+ k1, k2
+ , k1 in (select test_mark_join_t2.k1 from test_mark_join_t2 where
test_mark_join_t2.k3 = test_mark_join_t1.k3) vv
+ from test_mark_join_t1 order by 1, 2, 3;
+ """
+
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]