This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
new ec6ad147ba [fix](hashjoin) join produce blocks with rows larger than
batch size (#16535)
ec6ad147ba is described below
commit ec6ad147bac0a02fc958f19a912e4b84b41b38af
Author: TengJianPing <[email protected]>
AuthorDate: Thu Feb 9 00:07:04 2023 +0800
[fix](hashjoin) join produce blocks with rows larger than batch size
(#16535)
Pick #16402 and #16166 from master
---
be/src/vec/exec/join/process_hash_table_probe.h | 6 +
.../vec/exec/join/process_hash_table_probe_impl.h | 690 +++++++++++++++------
be/src/vec/exec/join/vhash_join_node.cpp | 5 +
be/src/vec/exec/join/vhash_join_node.h | 11 +-
4 files changed, 515 insertions(+), 197 deletions(-)
diff --git a/be/src/vec/exec/join/process_hash_table_probe.h
b/be/src/vec/exec/join/process_hash_table_probe.h
index 5d8c8097a7..2347b3fc63 100644
--- a/be/src/vec/exec/join/process_hash_table_probe.h
+++ b/be/src/vec/exec/join/process_hash_table_probe.h
@@ -66,6 +66,12 @@ struct ProcessHashTableProbe {
MutableBlock& mutable_block,
Block* output_block,
size_t probe_rows);
+ void _process_splited_equal_matched_tuples(int start_row_idx, int
row_count,
+ const ColumnPtr&
other_hit_column,
+ std::vector<bool*>&
visited_map, int right_col_idx,
+ int right_col_len, UInt8*
__restrict null_map_data,
+ UInt8* __restrict filter_map,
Block* output_block);
+
// Process full outer join/ right join / right semi/anti join to output
the join result
// in hash table
template <typename HashTableType>
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h
b/be/src/vec/exec/join/process_hash_table_probe_impl.h
index 31e621d1c1..414d8e3ff4 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h
@@ -19,6 +19,7 @@
#include "common/status.h"
#include "process_hash_table_probe.h"
+#include "util/simd/bits.h"
#include "vhash_join_node.h"
namespace doris::vectorized {
@@ -130,7 +131,6 @@ void
ProcessHashTableProbe<JoinOpType>::probe_side_output_column(
if (output_slot_flags[i]) {
auto& column = probe_block.get_by_position(i).column;
if (all_match_one) {
- DCHECK_EQ(probe_size, column->size() - last_probe_index);
mcol[i]->insert_range_from(*column, last_probe_index,
probe_size);
} else {
DCHECK_GE(_items_counts.size(), last_probe_index + probe_size);
@@ -202,109 +202,161 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
bool all_match_one = true;
int last_probe_index = probe_index;
+ size_t probe_size = 0;
+ auto& probe_row_match_iter =
+
std::get<ForwardIterator<Mapped>>(_join_node->_probe_row_match_iter);
{
SCOPED_TIMER(_search_hashtable_timer);
- while (probe_index < probe_rows) {
- if constexpr (ignore_null && need_null_map_for_probe) {
- if ((*null_map)[probe_index]) {
- if constexpr (probe_all) {
- _items_counts[probe_index++] = (uint32_t)1;
- // only full outer / left outer need insert the data
of right table
- if (LIKELY(current_offset < _build_block_rows.size()))
{
- _build_block_offsets[current_offset] = -1;
- _build_block_rows[current_offset] = -1;
- } else {
- _build_block_offsets.emplace_back(-1);
- _build_block_rows.emplace_back(-1);
- }
- ++current_offset;
+ if constexpr (!is_right_semi_anti_join) {
+ // handle remaining matched rows from last probe row
+ if (probe_row_match_iter.ok()) {
+ for (; probe_row_match_iter.ok() && current_offset <
_batch_size;
+ ++probe_row_match_iter) {
+ if (LIKELY(current_offset < _build_block_rows.size())) {
+ _build_block_offsets[current_offset] =
probe_row_match_iter->block_offset;
+ _build_block_rows[current_offset] =
probe_row_match_iter->row_num;
} else {
- _items_counts[probe_index++] = (uint32_t)0;
+
_build_block_offsets.emplace_back(probe_row_match_iter->block_offset);
+
_build_block_rows.emplace_back(probe_row_match_iter->row_num);
}
- all_match_one = false;
- continue;
- }
- }
- int last_offset = current_offset;
- auto find_result =
- !need_null_map_for_probe
- ? key_getter.find_key(hash_table_ctx.hash_table,
probe_index, *_arena)
- : (*null_map)[probe_index]
- ?
decltype(key_getter.find_key(hash_table_ctx.hash_table, probe_index,
- *_arena)) {nullptr,
false}
- : key_getter.find_key(hash_table_ctx.hash_table,
probe_index, *_arena);
- if (probe_index + PREFETCH_STEP < probe_rows) {
- key_getter.template prefetch<true>(hash_table_ctx.hash_table,
- probe_index +
PREFETCH_STEP, *_arena);
- }
-
- if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
- JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
- if (!find_result.is_found()) {
++current_offset;
}
- } else if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN) {
- if (find_result.is_found()) {
- ++current_offset;
+ _items_counts[probe_index] = current_offset;
+ all_match_one &= (current_offset == 1);
+ if (!probe_row_match_iter.ok()) {
+ ++probe_index;
}
- } else {
- if (find_result.is_found()) {
- auto& mapped = find_result.get_mapped();
- // TODO: Iterators are currently considered to be a heavy
operation and have a certain impact on performance.
- // We should rethink whether to use this iterator mode in
the future. Now just opt the one row case
- if (mapped.get_row_count() == 1) {
- if constexpr (std::is_same_v<Mapped,
RowRefListWithFlag>) {
- mapped.visited = true;
- }
-
- if constexpr (!is_right_semi_anti_join) {
+ probe_size = 1;
+ }
+ }
+ if (current_offset < _batch_size) {
+ while (probe_index < probe_rows) {
+ if constexpr (ignore_null && need_null_map_for_probe) {
+ if ((*null_map)[probe_index]) {
+ if constexpr (probe_all) {
+ _items_counts[probe_index++] = (uint32_t)1;
+ // only full outer / left outer need insert the
data of right table
if (LIKELY(current_offset <
_build_block_rows.size())) {
- _build_block_offsets[current_offset] =
mapped.block_offset;
- _build_block_rows[current_offset] =
mapped.row_num;
+ _build_block_offsets[current_offset] = -1;
+ _build_block_rows[current_offset] = -1;
} else {
-
_build_block_offsets.emplace_back(mapped.block_offset);
- _build_block_rows.emplace_back(mapped.row_num);
+ _build_block_offsets.emplace_back(-1);
+ _build_block_rows.emplace_back(-1);
}
++current_offset;
+ } else {
+ _items_counts[probe_index++] = (uint32_t)0;
}
- } else {
- for (auto it = mapped.begin(); it.ok(); ++it) {
+ all_match_one = false;
+ if constexpr (probe_all) {
+ if (current_offset >= _batch_size) {
+ break;
+ }
+ }
+ continue;
+ }
+ }
+ int last_offset = current_offset;
+ auto find_result = !need_null_map_for_probe
+ ?
key_getter.find_key(hash_table_ctx.hash_table,
+ probe_index,
*_arena)
+ : (*null_map)[probe_index]
+ ?
decltype(key_getter.find_key(hash_table_ctx.hash_table,
+
probe_index,
+
*_arena)) {nullptr, false}
+ :
key_getter.find_key(hash_table_ctx.hash_table,
+ probe_index,
*_arena);
+ if (probe_index + PREFETCH_STEP < probe_rows) {
+ key_getter.template
prefetch<true>(hash_table_ctx.hash_table,
+ probe_index +
PREFETCH_STEP, *_arena);
+ }
+
+ auto current_probe_index = probe_index;
+ if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
+ JoinOpType ==
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
+ if (!find_result.is_found()) {
+ ++current_offset;
+ }
+ ++probe_index;
+ } else if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN) {
+ if (find_result.is_found()) {
+ ++current_offset;
+ }
+ ++probe_index;
+ } else {
+ if (find_result.is_found()) {
+ auto& mapped = find_result.get_mapped();
+ // TODO: Iterators are currently considered to be a
heavy operation and have a certain impact on performance.
+ // We should rethink whether to use this iterator mode
in the future. Now just opt the one row case
+ if (mapped.get_row_count() == 1) {
+ if constexpr (std::is_same_v<Mapped,
RowRefListWithFlag>) {
+ mapped.visited = true;
+ }
+
if constexpr (!is_right_semi_anti_join) {
if (LIKELY(current_offset <
_build_block_rows.size())) {
- _build_block_offsets[current_offset] =
it->block_offset;
- _build_block_rows[current_offset] =
it->row_num;
+ _build_block_offsets[current_offset] =
mapped.block_offset;
+ _build_block_rows[current_offset] =
mapped.row_num;
} else {
-
_build_block_offsets.emplace_back(it->block_offset);
-
_build_block_rows.emplace_back(it->row_num);
+
_build_block_offsets.emplace_back(mapped.block_offset);
+
_build_block_rows.emplace_back(mapped.row_num);
}
++current_offset;
}
- }
- if constexpr (std::is_same_v<Mapped,
RowRefListWithFlag>) {
- mapped.visited = true;
- }
- }
- } else {
- if constexpr (probe_all) {
- // only full outer / left outer need insert the data
of right table
- if (LIKELY(current_offset < _build_block_rows.size()))
{
- _build_block_offsets[current_offset] = -1;
- _build_block_rows[current_offset] = -1;
+ ++probe_index;
} else {
- _build_block_offsets.emplace_back(-1);
- _build_block_rows.emplace_back(-1);
+ if constexpr (!is_right_semi_anti_join) {
+ auto it = mapped.begin();
+ for (; it.ok() && current_offset <
_batch_size; ++it) {
+ if (LIKELY(current_offset <
_build_block_rows.size())) {
+ _build_block_offsets[current_offset] =
it->block_offset;
+ _build_block_rows[current_offset] =
it->row_num;
+ } else {
+
_build_block_offsets.emplace_back(it->block_offset);
+
_build_block_rows.emplace_back(it->row_num);
+ }
+ ++current_offset;
+ }
+ probe_row_match_iter = it;
+ if (!it.ok()) {
+ // If all matched rows for the current
probe row are handled,
+ // advance to next probe row.
+ // If not (which means it exceeds batch
size), probe_index is not increased and
+ // remaining matched rows for the current
probe row will be
+ // handled in the next call of this
function
+ ++probe_index;
+ }
+ } else {
+ ++probe_index;
+ }
+ if constexpr (std::is_same_v<Mapped,
RowRefListWithFlag>) {
+ mapped.visited = true;
+ }
}
- ++current_offset;
+ } else {
+ if constexpr (probe_all) {
+ // only full outer / left outer need insert the
data of right table
+ if (LIKELY(current_offset <
_build_block_rows.size())) {
+ _build_block_offsets[current_offset] = -1;
+ _build_block_rows[current_offset] = -1;
+ } else {
+ _build_block_offsets.emplace_back(-1);
+ _build_block_rows.emplace_back(-1);
+ }
+ ++current_offset;
+ }
+ ++probe_index;
}
}
- }
- uint32_t count = (uint32_t)(current_offset - last_offset);
- _items_counts[probe_index++] = count;
- all_match_one &= (count == 1);
- if (current_offset >= _batch_size && !all_match_one) {
- break;
+ uint32_t count = (uint32_t)(current_offset - last_offset);
+ _items_counts[current_probe_index] = count;
+ all_match_one &= (count == 1);
+ if (current_offset >= _batch_size) {
+ break;
+ }
}
+ probe_size = probe_index - last_probe_index +
(probe_row_match_iter.ok() ? 1 : 0);
}
}
@@ -318,8 +370,7 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
JoinOpType != TJoinOp::RIGHT_ANTI_JOIN) {
SCOPED_TIMER(_probe_side_output_timer);
probe_side_output_column(mcol, _join_node->_left_output_slot_flags,
current_offset,
- last_probe_index, probe_index -
last_probe_index, all_match_one,
- false);
+ last_probe_index, probe_size, all_match_one,
false);
}
output_block->swap(mutable_block.to_block());
@@ -383,100 +434,165 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
bool all_match_one = true;
int last_probe_index = probe_index;
- while (probe_index < probe_rows) {
- // ignore null rows
- if constexpr (ignore_null && need_null_map_for_probe) {
- if ((*null_map)[probe_index]) {
- if constexpr (probe_all) {
- _items_counts[probe_index++] = (uint32_t)1;
- same_to_prev.emplace_back(false);
- visited_map.emplace_back(nullptr);
- // only full outer / left outer need insert the data
of right table
+
+ int row_count_from_last_probe = 0;
+ bool is_the_last_sub_block = false;
+ size_t probe_size = 0;
+ auto& probe_row_match_iter =
+
std::get<ForwardIterator<Mapped>>(_join_node->_probe_row_match_iter);
+ if (probe_row_match_iter.ok()) {
+ auto origin_offset = current_offset;
+ for (; probe_row_match_iter.ok() && current_offset < _batch_size;
+ ++probe_row_match_iter) {
+ if (LIKELY(current_offset < _build_block_rows.size())) {
+ _build_block_offsets[current_offset] =
probe_row_match_iter->block_offset;
+ _build_block_rows[current_offset] =
probe_row_match_iter->row_num;
+ } else {
+
_build_block_offsets.emplace_back(probe_row_match_iter->block_offset);
+
_build_block_rows.emplace_back(probe_row_match_iter->row_num);
+ }
+ ++current_offset;
+ visited_map.emplace_back(&probe_row_match_iter->visited);
+ }
+ same_to_prev.emplace_back(false);
+ for (int i = 0; i < current_offset - origin_offset - 1; ++i) {
+ same_to_prev.emplace_back(true);
+ }
+
+ row_count_from_last_probe = current_offset;
+ all_match_one &= (current_offset == 1);
+ _items_counts[probe_index] = current_offset;
+ if (!probe_row_match_iter.ok()) {
+ ++probe_index;
+ is_the_last_sub_block = true;
+ }
+ probe_size = 1;
+ }
+ int multi_matched_output_row_count = 0;
+ if (current_offset < _batch_size) {
+ while (probe_index < probe_rows) {
+ // ignore null rows
+ if constexpr (ignore_null && need_null_map_for_probe) {
+ if ((*null_map)[probe_index]) {
+ if constexpr (probe_all) {
+ _items_counts[probe_index++] = (uint32_t)1;
+ same_to_prev.emplace_back(false);
+ visited_map.emplace_back(nullptr);
+ // only full outer / left outer need insert the
data of right table
+ if (LIKELY(current_offset <
_build_block_rows.size())) {
+ _build_block_offsets[current_offset] = -1;
+ _build_block_rows[current_offset] = -1;
+ } else {
+ _build_block_offsets.emplace_back(-1);
+ _build_block_rows.emplace_back(-1);
+ }
+ ++current_offset;
+ } else {
+ _items_counts[probe_index++] = (uint32_t)0;
+ }
+ all_match_one = false;
+ if constexpr (probe_all) {
+ if (current_offset >= _batch_size) {
+ break;
+ }
+ }
+ continue;
+ }
+ }
+
+ auto last_offset = current_offset;
+ auto find_result = !need_null_map_for_probe
+ ?
key_getter.find_key(hash_table_ctx.hash_table,
+ probe_index,
*_arena)
+ : (*null_map)[probe_index]
+ ?
decltype(key_getter.find_key(hash_table_ctx.hash_table,
+
probe_index,
+
*_arena)) {nullptr, false}
+ :
key_getter.find_key(hash_table_ctx.hash_table,
+ probe_index,
*_arena);
+ if (probe_index + PREFETCH_STEP < probe_rows) {
+ key_getter.template
prefetch<true>(hash_table_ctx.hash_table,
+ probe_index +
PREFETCH_STEP, *_arena);
+ }
+
+ auto current_probe_index = probe_index;
+ if (find_result.is_found()) {
+ auto& mapped = find_result.get_mapped();
+ auto origin_offset = current_offset;
+ // TODO: Iterators are currently considered to be a heavy
operation and have a certain impact on performance.
+ // We should rethink whether to use this iterator mode in
the future. Now just opt the one row case
+ if (mapped.get_row_count() == 1) {
if (LIKELY(current_offset < _build_block_rows.size()))
{
- _build_block_offsets[current_offset] = -1;
- _build_block_rows[current_offset] = -1;
+ _build_block_offsets[current_offset] =
mapped.block_offset;
+ _build_block_rows[current_offset] = mapped.row_num;
} else {
- _build_block_offsets.emplace_back(-1);
- _build_block_rows.emplace_back(-1);
+
_build_block_offsets.emplace_back(mapped.block_offset);
+ _build_block_rows.emplace_back(mapped.row_num);
}
++current_offset;
+ visited_map.emplace_back(&mapped.visited);
+ ++probe_index;
} else {
- _items_counts[probe_index++] = (uint32_t)0;
+ auto multi_match_last_offset = current_offset;
+ auto it = mapped.begin();
+ // breaks if row count exceeds batch_size
+ for (; it.ok() && current_offset < _batch_size; ++it) {
+ if (LIKELY(current_offset <
_build_block_rows.size())) {
+ _build_block_offsets[current_offset] =
it->block_offset;
+ _build_block_rows[current_offset] =
it->row_num;
+ } else {
+
_build_block_offsets.emplace_back(it->block_offset);
+ _build_block_rows.emplace_back(it->row_num);
+ }
+ ++current_offset;
+ visited_map.emplace_back(&it->visited);
+ }
+ probe_row_match_iter = it;
+ // If all matched rows for the current probe row are
handled,
+ // advance to next probe row.
+ if (!it.ok()) {
+ ++probe_index;
+ } else {
+ // If not (which means it exceeds batch size),
probe_index is not increased and
+ // remaining matched rows for the current probe
row will be
+ // handled in the next call of this function
+ multi_matched_output_row_count =
+ current_offset - multi_match_last_offset;
+ }
}
- all_match_one = false;
- continue;
- }
- }
-
- auto last_offset = current_offset;
- auto find_result =
- !need_null_map_for_probe
- ? key_getter.find_key(hash_table_ctx.hash_table,
probe_index, *_arena)
- : (*null_map)[probe_index]
- ?
decltype(key_getter.find_key(hash_table_ctx.hash_table, probe_index,
- *_arena)) {nullptr,
false}
- : key_getter.find_key(hash_table_ctx.hash_table,
probe_index, *_arena);
- if (probe_index + PREFETCH_STEP < probe_rows) {
- key_getter.template prefetch<true>(hash_table_ctx.hash_table,
- probe_index +
PREFETCH_STEP, *_arena);
- }
- if (find_result.is_found()) {
- auto& mapped = find_result.get_mapped();
- auto origin_offset = current_offset;
- // TODO: Iterators are currently considered to be a heavy
operation and have a certain impact on performance.
- // We should rethink whether to use this iterator mode in the
future. Now just opt the one row case
- if (mapped.get_row_count() == 1) {
+ same_to_prev.emplace_back(false);
+ for (int i = 0; i < current_offset - origin_offset - 1;
++i) {
+ same_to_prev.emplace_back(true);
+ }
+ } else if constexpr (JoinOpType == TJoinOp::LEFT_OUTER_JOIN ||
+ JoinOpType == TJoinOp::FULL_OUTER_JOIN ||
+ JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
+ JoinOpType ==
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
+ same_to_prev.emplace_back(false);
+ visited_map.emplace_back(nullptr);
+ // only full outer / left outer need insert the data of
right table
+ // left anti use -1 use a default value
if (LIKELY(current_offset < _build_block_rows.size())) {
- _build_block_offsets[current_offset] =
mapped.block_offset;
- _build_block_rows[current_offset] = mapped.row_num;
+ _build_block_offsets[current_offset] = -1;
+ _build_block_rows[current_offset] = -1;
} else {
- _build_block_offsets.emplace_back(mapped.block_offset);
- _build_block_rows.emplace_back(mapped.row_num);
+ _build_block_offsets.emplace_back(-1);
+ _build_block_rows.emplace_back(-1);
}
++current_offset;
- visited_map.emplace_back(&mapped.visited);
+ ++probe_index;
} else {
- for (auto it = mapped.begin(); it.ok(); ++it) {
- if (LIKELY(current_offset < _build_block_rows.size()))
{
- _build_block_offsets[current_offset] =
it->block_offset;
- _build_block_rows[current_offset] = it->row_num;
- } else {
-
_build_block_offsets.emplace_back(it->block_offset);
- _build_block_rows.emplace_back(it->row_num);
- }
- ++current_offset;
- visited_map.emplace_back(&it->visited);
- }
- }
- same_to_prev.emplace_back(false);
- for (int i = 0; i < current_offset - origin_offset - 1; ++i) {
- same_to_prev.emplace_back(true);
+ // other join, do nothing
+ ++probe_index;
}
- } else if constexpr (JoinOpType == TJoinOp::LEFT_OUTER_JOIN ||
- JoinOpType == TJoinOp::FULL_OUTER_JOIN ||
- JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
- JoinOpType ==
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
- same_to_prev.emplace_back(false);
- visited_map.emplace_back(nullptr);
- // only full outer / left outer need insert the data of right
table
- // left anti use -1 use a default value
- if (LIKELY(current_offset < _build_block_rows.size())) {
- _build_block_offsets[current_offset] = -1;
- _build_block_rows[current_offset] = -1;
- } else {
- _build_block_offsets.emplace_back(-1);
- _build_block_rows.emplace_back(-1);
+ uint32_t count = (uint32_t)(current_offset - last_offset);
+ _items_counts[current_probe_index] = count;
+ all_match_one &= (count == 1);
+ if (current_offset >= _batch_size) {
+ break;
}
- ++current_offset;
- } else {
- // other join, no nothing
- }
- uint32_t count = (uint32_t)(current_offset - last_offset);
- _items_counts[probe_index++] = count;
- all_match_one &= (count == 1);
- if (current_offset >= _batch_size && !all_match_one) {
- break;
}
+ probe_size = probe_index - last_probe_index +
(probe_row_match_iter.ok() ? 1 : 0);
}
{
@@ -487,13 +603,13 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
{
SCOPED_TIMER(_probe_side_output_timer);
probe_side_output_column(mcol,
_join_node->_left_output_slot_flags, current_offset,
- last_probe_index, probe_index -
last_probe_index,
- all_match_one, true);
+ last_probe_index, probe_size,
all_match_one, true);
}
output_block->swap(mutable_block.to_block());
// dispose the other join conjunct exec
- if (output_block->rows()) {
+ auto row_count = output_block->rows();
+ if (row_count) {
int result_column_id = -1;
int orig_columns = output_block->columns();
RETURN_IF_ERROR((*_join_node->_vother_join_conjunct_ptr)
@@ -502,13 +618,29 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
auto column =
output_block->get_by_position(result_column_id).column;
if constexpr (JoinOpType == TJoinOp::LEFT_OUTER_JOIN ||
JoinOpType == TJoinOp::FULL_OUTER_JOIN) {
- auto new_filter_column = ColumnVector<UInt8>::create();
- auto& filter_map = new_filter_column->get_data();
+ auto new_filter_column =
ColumnVector<UInt8>::create(row_count);
+ auto* __restrict filter_map =
new_filter_column->get_data().data();
- auto null_map_column =
ColumnVector<UInt8>::create(column->size(), 0);
+ auto null_map_column = ColumnVector<UInt8>::create(row_count,
0);
auto* __restrict null_map_data =
null_map_column->get_data().data();
- for (int i = 0; i < column->size(); ++i) {
+ // It contains non-first sub block of splited
equal-conjuncts-matched tuples from last probe row
+ if (row_count_from_last_probe > 0) {
+ _process_splited_equal_matched_tuples(0,
row_count_from_last_probe, column,
+ visited_map,
right_col_idx, right_col_len,
+ null_map_data,
filter_map, output_block);
+ // This is the last sub block of splitted block, and no
equal-conjuncts-matched tuple
+ // is output in all sub blocks, need to output a tuple for
this probe row
+ if (is_the_last_sub_block &&
!_join_node->_is_any_probe_match_row_output) {
+ filter_map[0] = true;
+ null_map_data[0] = true;
+ }
+ }
+
+ int end_idx = row_count - multi_matched_output_row_count;
+ // process equal-conjuncts-matched tuples that are newly
generated
+ // in this run if there are any.
+ for (size_t i = row_count_from_last_probe; i < end_idx; ++i) {
auto join_hit = visited_map[i] != nullptr;
auto other_hit = column->get_bool(i);
@@ -524,21 +656,39 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
}
null_map_data[i] = !join_hit || !other_hit;
+ // For cases where one probe row matches multiple build
rows for equal conjuncts,
+ // all the other-conjuncts-matched tuples should be output.
+ //
+ // Other-conjuncts-NOT-matched tuples fall into two
categories:
+ // 1. The beginning consecutive one(s).
+ // For these tuples, only the last one is marked to
output;
+ // If there are any following
other-conjuncts-matched tuples,
+ // the last tuple is also marked NOT to output.
+ // 2. All the remaining other-conjuncts-NOT-matched
tuples.
+ // All these tuples are marked not to output.
if (join_hit) {
*visited_map[i] |= other_hit;
- filter_map.push_back(other_hit || !same_to_prev[i] ||
- (!column->get_bool(i - 1) &&
filter_map.back()));
+ filter_map[i] = other_hit || !same_to_prev[i] ||
+ (!column->get_bool(i - 1) &&
filter_map[i - 1]);
// Here to keep only hit join conjunct and other join
conjunt is true need to be output.
// if not, only some key must keep one row will output
will null right table column
- if (same_to_prev[i] && filter_map.back() &&
!column->get_bool(i - 1)) {
+ if (same_to_prev[i] && filter_map[i] &&
!column->get_bool(i - 1)) {
filter_map[i - 1] = false;
}
} else {
- filter_map.push_back(true);
+ filter_map[i] = true;
}
}
- for (int i = 0; i < column->size(); ++i) {
+ // It contains the first sub block of splited
equal-conjuncts-matched tuples of the current probe row
+ if (multi_matched_output_row_count > 0) {
+ _join_node->_is_any_probe_match_row_output = false;
+ _process_splited_equal_matched_tuples(
+ row_count - multi_matched_output_row_count,
+ multi_matched_output_row_count, column,
visited_map, right_col_idx,
+ right_col_len, null_map_data, filter_map,
output_block);
+ }
+ for (size_t i = 0; i < row_count; ++i) {
if (filter_map[i]) {
_tuple_is_null_right_flags->emplace_back(null_map_data[i]);
}
@@ -549,10 +699,26 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
auto new_filter_column = ColumnVector<UInt8>::create();
auto& filter_map = new_filter_column->get_data();
- if (!column->empty()) {
+ size_t start_row_idx = 1;
+ // We are handling equal-conjuncts matched tuples that are
splitted into multiple blocks
+ if (row_count_from_last_probe > 0) {
+ if (_join_node->_is_any_probe_match_row_output) {
+ // if any matched tuple for this probe row is output,
+ // ignore all the following tuples for this probe row.
+ for (int row_idx = 0; row_idx <
row_count_from_last_probe; ++row_idx) {
+ filter_map.emplace_back(false);
+ }
+ start_row_idx += row_count_from_last_probe;
+ if (row_count_from_last_probe < row_count) {
+
filter_map.emplace_back(column->get_bool(row_count_from_last_probe));
+ }
+ } else {
+ filter_map.emplace_back(column->get_bool(0));
+ }
+ } else {
filter_map.emplace_back(column->get_bool(0));
}
- for (int i = 1; i < column->size(); ++i) {
+ for (size_t i = start_row_idx; i < row_count; ++i) {
if (column->get_bool(i) || (same_to_prev[i] &&
filter_map[i - 1])) {
// Only last same element is true, output last one
filter_map.push_back(true);
@@ -561,19 +727,62 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
filter_map.push_back(false);
}
}
+ // It contains the first sub block of splited
equal-conjuncts-matched tuples of the current probe row
+ if (multi_matched_output_row_count > 0) {
+ // If a matched row is output, all the equal-matched
tuples in
+ // the following sub blocks should be ignored
+ _join_node->_is_any_probe_match_row_output =
filter_map[row_count - 1];
+ } else if (row_count_from_last_probe > 0 &&
+ !_join_node->_is_any_probe_match_row_output) {
+ // We are handling equal-conjuncts matched tuples that
are splitted into multiple blocks,
+ // and no matched tuple has been output in all previous
run.
+ // If a tuple is output in this run, all the following
matched tuples should be ignored
+ if (filter_map[row_count_from_last_probe - 1]) {
+ _join_node->_is_any_probe_match_row_output = true;
+ }
+ }
output_block->get_by_position(result_column_id).column =
std::move(new_filter_column);
} else if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
JoinOpType ==
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
- auto new_filter_column = ColumnVector<UInt8>::create();
- auto& filter_map = new_filter_column->get_data();
-
- if (!column->empty()) {
+ auto new_filter_column =
ColumnVector<UInt8>::create(row_count);
+ auto* __restrict filter_map =
new_filter_column->get_data().data();
+
+ // for left anti join, the probe side is output only when
+ // there are no matched tuples for the probe row.
+
+ // If multiple equal-conjuncts-matched tuples is splitted into
several
+ // sub blocks, just filter out all the
other-conjuncts-NOT-matched tuples at first,
+ // and when processing the last sub block, check whether there
are any
+ // equal-conjuncts-matched tuple is output in all sub blocks,
+ // if there are none, just pick a tuple and output.
+
+ size_t start_row_idx = 1;
+ // We are handling equal-conjuncts matched tuples that are
splitted into multiple blocks
+ if (row_count_from_last_probe > 0) {
+ if (_join_node->_is_any_probe_match_row_output) {
+ // if any matched tuple for this probe row is output,
+ // ignore all the following tuples for this probe row.
+ for (int row_idx = 0; row_idx <
row_count_from_last_probe; ++row_idx) {
+ filter_map[row_idx] = false;
+ }
+ start_row_idx += row_count_from_last_probe;
+ if (row_count_from_last_probe < row_count) {
+ filter_map[row_count_from_last_probe] =
+
column->get_bool(row_count_from_last_probe) &&
+ visited_map[row_count_from_last_probe];
+ }
+ } else {
+ // Both equal conjuncts and other conjuncts are true
+ filter_map[0] = column->get_bool(0) && visited_map[0];
+ }
+ } else {
// Both equal conjuncts and other conjuncts are true
- filter_map.emplace_back(column->get_bool(0) &&
visited_map[0]);
+ filter_map[0] = column->get_bool(0) && visited_map[0];
}
- for (int i = 1; i < column->size(); ++i) {
+
+ for (size_t i = start_row_idx; i < row_count; ++i) {
if ((visited_map[i] && column->get_bool(i)) ||
(same_to_prev[i] && filter_map[i - 1])) {
// When either of two conditions is meet:
@@ -581,20 +790,59 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
// 2. This row is joined from the same build side row
as the previous row
// Set filter_map[i] to true and filter_map[i - 1] to
false if same_to_prev[i]
// is true.
- filter_map.push_back(true);
+ filter_map[i] = true;
filter_map[i - 1] = !same_to_prev[i] && filter_map[i -
1];
} else {
- filter_map.push_back(false);
+ filter_map[i] = false;
+ }
+ }
+
+ int end_row_idx;
+ if (row_count_from_last_probe > 0) {
+ end_row_idx = row_count - multi_matched_output_row_count;
+ if (!_join_node->_is_any_probe_match_row_output) {
+ // We are handling equal-conjuncts matched tuples
that are splitted into multiple blocks,
+ // and no matched tuple has been output in all
previous run.
+ // If a tuple is output in this run, all the following
matched tuples should be ignored
+ if (filter_map[row_count_from_last_probe - 1]) {
+ _join_node->_is_any_probe_match_row_output = true;
+ filter_map[row_count_from_last_probe - 1] = false;
+ }
+ if (is_the_last_sub_block &&
!_join_node->_is_any_probe_match_row_output) {
+ // This is the last sub block of splitted block,
and no equal-conjuncts-matched tuple
+ // is output in all sub blocks, output a tuple for
this probe row
+ filter_map[0] = true;
+ }
+ }
+ if (multi_matched_output_row_count > 0) {
+ // It contains the first sub block of splited
equal-conjuncts-matched tuples of the current probe row
+ // If a matched row is output, all the equal-matched
tuples in
+ // the following sub blocks should be ignored
+ _join_node->_is_any_probe_match_row_output =
filter_map[row_count - 1];
+ filter_map[row_count - 1] = false;
}
+ } else if (multi_matched_output_row_count > 0) {
+ end_row_idx = row_count - multi_matched_output_row_count;
+ // It contains the first sub block of splited
equal-conjuncts-matched tuples of the current probe row
+ // If a matched row is output, all the equal-matched
tuples in
+ // the following sub blocks should be ignored
+ _join_node->_is_any_probe_match_row_output =
filter_map[row_count - 1];
+ filter_map[row_count - 1] = false;
+ } else {
+ end_row_idx = row_count;
}
// Same to the semi join, but change the last value to
opposite value
- for (int i = 1; i < same_to_prev.size(); ++i) {
+ for (int i = 1 + row_count_from_last_probe; i < end_row_idx;
++i) {
if (!same_to_prev[i]) {
filter_map[i - 1] = !filter_map[i - 1];
}
}
- filter_map[same_to_prev.size() - 1] =
!filter_map[same_to_prev.size() - 1];
+ auto non_sub_blocks_matched_row_count =
+ row_count - row_count_from_last_probe -
multi_matched_output_row_count;
+ if (non_sub_blocks_matched_row_count > 0) {
+ filter_map[end_row_idx - 1] = !filter_map[end_row_idx - 1];
+ }
output_block->get_by_position(result_column_id).column =
std::move(new_filter_column);
@@ -606,7 +854,7 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
}
} else if constexpr (JoinOpType == TJoinOp::RIGHT_OUTER_JOIN) {
auto filter_size = 0;
- for (int i = 0; i < column->size(); ++i) {
+ for (int i = 0; i < row_count; ++i) {
DCHECK(visited_map[i]);
auto result = column->get_bool(i);
*visited_map[i] |= result;
@@ -637,6 +885,42 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
}
}
+// For left or full outer join with other conjuncts.
+// If multiple equal-conjuncts-matched tuples is splitted into several
+// sub blocks, just filter out all the other-conjuncts-NOT-matched tuples at
first,
+// and when processing the last sub block, check whether there are any
+// equal-conjuncts-matched tuple is output in all sub blocks,
+// if not, just pick a tuple and output.
+template <int JoinOpType>
+void ProcessHashTableProbe<JoinOpType>::_process_splited_equal_matched_tuples(
+ int start_row_idx, int row_count, const ColumnPtr& other_hit_column,
+ std::vector<bool*>& visited_map, int right_col_idx, int right_col_len,
+ UInt8* __restrict null_map_data, UInt8* __restrict filter_map, Block*
output_block) {
+ int end_row_idx = start_row_idx + row_count;
+ for (int i = start_row_idx; i < end_row_idx; ++i) {
+ auto join_hit = visited_map[i] != nullptr;
+ auto other_hit = other_hit_column->get_bool(i);
+
+ if (!other_hit) {
+ for (size_t j = 0; j < right_col_len; ++j) {
+ typeid_cast<ColumnNullable*>(
+ std::move(*output_block->get_by_position(j +
right_col_idx).column)
+ .assume_mutable()
+ .get())
+ ->get_null_map_data()[i] = true;
+ }
+ }
+
+ null_map_data[i] = !join_hit || !other_hit;
+ filter_map[i] = other_hit;
+
+ if (join_hit) {
+ *visited_map[i] |= other_hit;
+ }
+ }
+ _join_node->_is_any_probe_match_row_output |=
simd::contain_byte(filter_map, row_count, 1);
+}
+
template <int JoinOpType>
template <typename HashTableType>
Status
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableType&
hash_table_ctx,
@@ -657,7 +941,8 @@ Status
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
auto& iter = hash_table_ctx.iter;
auto block_size = 0;
- auto& visited_iter = _join_node->_outer_join_pull_visited_iter;
+ auto& visited_iter =
+
std::get<ForwardIterator<Mapped>>(_join_node->_outer_join_pull_visited_iter);
auto insert_from_hash_table = [&](uint8_t offset, uint32_t row_num) {
block_size++;
@@ -668,7 +953,6 @@ Status
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
};
if (visited_iter.ok()) {
- DCHECK((std::is_same_v<Mapped, RowRefListWithFlag>));
for (; visited_iter.ok() && block_size < _batch_size;
++visited_iter) {
insert_from_hash_table(visited_iter->block_offset,
visited_iter->row_num);
}
@@ -681,11 +965,17 @@ Status
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
auto& mapped = iter->get_second();
if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) {
if (mapped.visited) {
- for (auto it = mapped.begin(); it.ok(); ++it) {
+ visited_iter = mapped.begin();
+ for (; visited_iter.ok() && block_size < _batch_size;
++visited_iter) {
if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) {
- insert_from_hash_table(it->block_offset,
it->row_num);
+ insert_from_hash_table(visited_iter->block_offset,
+ visited_iter->row_num);
}
}
+ if (visited_iter.ok()) {
+ // block_size >= _batch_size, quit for loop
+ break;
+ }
} else {
if constexpr (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN) {
visited_iter = mapped.begin();
@@ -700,17 +990,24 @@ Status
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
}
}
} else {
- for (auto it = mapped.begin(); it.ok(); ++it) {
+ visited_iter = mapped.begin();
+ for (; visited_iter.ok() && block_size < _batch_size;
++visited_iter) {
if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) {
- if (it->visited) {
- insert_from_hash_table(it->block_offset,
it->row_num);
+ if (visited_iter->visited) {
+ insert_from_hash_table(visited_iter->block_offset,
+ visited_iter->row_num);
}
} else {
- if (!it->visited) {
- insert_from_hash_table(it->block_offset,
it->row_num);
+ if (!visited_iter->visited) {
+ insert_from_hash_table(visited_iter->block_offset,
+ visited_iter->row_num);
}
}
}
+ if (visited_iter.ok()) {
+ // block_size >= _batch_size, quit for loop
+ break;
+ }
}
}
@@ -733,6 +1030,7 @@ Status
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
*eos = iter == hash_table_ctx.hash_table.end();
output_block->swap(
mutable_block.to_block(right_semi_anti_without_other ?
right_col_idx : 0));
+ DCHECK(block_size <= _batch_size);
return Status::OK();
} else {
LOG(FATAL) << "Invalid RowRefList";
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp
b/be/src/vec/exec/join/vhash_join_node.cpp
index 9408622f78..79f6bbf447 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -562,6 +562,8 @@ Status HashJoinNode::get_next(RuntimeState* state, Block*
output_block, bool* eo
if (_is_outer_join) {
_add_tuple_is_null_column(&temp_block);
}
+ auto output_rows = temp_block.rows();
+ DCHECK(output_rows <= state->batch_size());
{
SCOPED_TIMER(_join_filter_timer);
RETURN_IF_ERROR(
@@ -932,6 +934,9 @@ void HashJoinNode::_hash_table_init(RuntimeState* state) {
JoinOpType::value ==
TJoinOp::RIGHT_OUTER_JOIN ||
JoinOpType::value ==
TJoinOp::FULL_OUTER_JOIN,
RowRefListWithFlag, RowRefList>>;
+
_probe_row_match_iter.emplace<ForwardIterator<RowRefListType>>();
+
_outer_join_pull_visited_iter.emplace<ForwardIterator<RowRefListType>>();
+
if (_build_expr_ctxs.size() == 1 &&
!_store_null_in_hash_table[0]) {
// Single column optimization
switch (_build_expr_ctxs[0]->root()->result_type()) {
diff --git a/be/src/vec/exec/join/vhash_join_node.h
b/be/src/vec/exec/join/vhash_join_node.h
index f6d6982128..e7ee1feef0 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -195,6 +195,10 @@ using HashTableCtxVariants =
ProcessHashTableProbe<TJoinOp::RIGHT_ANTI_JOIN>,
ProcessHashTableProbe<TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>>;
+using HashTableIteratorVariants =
+ std::variant<std::monostate, ForwardIterator<RowRefList>,
+ ForwardIterator<RowRefListWithFlag>,
ForwardIterator<RowRefListWithFlags>>;
+
class HashJoinNode final : public VJoinNodeBase {
public:
// TODO: Best prefetch step is decided by machine. We should also provide a
@@ -263,7 +267,9 @@ private:
std::unique_ptr<HashTableCtxVariants> _process_hashtable_ctx_variants;
// for full/right outer join
- ForwardIterator<RowRefListWithFlag> _outer_join_pull_visited_iter;
+ HashTableIteratorVariants _outer_join_pull_visited_iter;
+
+ HashTableIteratorVariants _probe_row_match_iter;
std::shared_ptr<std::vector<Block>> _build_blocks;
Block _probe_block;
@@ -290,6 +296,9 @@ private:
std::vector<bool> _left_output_slot_flags;
std::vector<bool> _right_output_slot_flags;
+ // for cases when a probe row matches more than batch size build rows.
+ bool _is_any_probe_match_row_output = false;
+
SharedHashTableContextPtr _shared_hash_table_context = nullptr;
Status _materialize_build_side(RuntimeState* state) override;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]