This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new bf16228851 [fix](hashjoin) join produce blocks with rows larger than
batch size (#16166)
bf16228851 is described below
commit bf16228851c569259f2a8d10cd91aa4018199848
Author: TengJianPing <[email protected]>
AuthorDate: Wed Feb 1 16:02:31 2023 +0800
[fix](hashjoin) join produce blocks with rows larger than batch size
(#16166)
* [fix](hashjoin) join produce blocks with rows larger than batch size
* fix
---
.../vec/exec/join/process_hash_table_probe_impl.h | 241 +++++++++++++--------
be/src/vec/exec/join/vhash_join_node.cpp | 2 +
be/src/vec/exec/join/vhash_join_node.h | 6 +
3 files changed, 154 insertions(+), 95 deletions(-)
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h
b/be/src/vec/exec/join/process_hash_table_probe_impl.h
index 096a54ed43..238a3de0b4 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h
@@ -130,7 +130,6 @@ void
ProcessHashTableProbe<JoinOpType>::probe_side_output_column(
if (output_slot_flags[i]) {
auto& column = probe_block.get_by_position(i).column;
if (all_match_one) {
- DCHECK_EQ(probe_size, column->size() - last_probe_index);
mcol[i]->insert_range_from(*column, last_probe_index,
probe_size);
} else {
DCHECK_GE(_items_counts.size(), last_probe_index + probe_size);
@@ -209,124 +208,177 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
bool all_match_one = true;
int last_probe_index = probe_index;
+ size_t probe_size = 0;
+ auto& probe_row_match_iter =
+
std::get<ForwardIterator<Mapped>>(_join_node->_probe_row_match_iter);
{
SCOPED_TIMER(_search_hashtable_timer);
- while (probe_index < probe_rows) {
- if constexpr (ignore_null && need_null_map_for_probe) {
- if ((*null_map)[probe_index]) {
- if constexpr (probe_all) {
- _items_counts[probe_index++] = (uint32_t)1;
- // only full outer / left outer need insert the data
of right table
- if (LIKELY(current_offset < _build_block_rows.size()))
{
- _build_block_offsets[current_offset] = -1;
- _build_block_rows[current_offset] = -1;
- } else {
- _build_block_offsets.emplace_back(-1);
- _build_block_rows.emplace_back(-1);
- }
- ++current_offset;
+ if constexpr (!is_right_semi_anti_join) {
+            // handle remaining matched rows from last probe row
+ if (probe_row_match_iter.ok()) {
+ for (; probe_row_match_iter.ok() && current_offset <
_batch_size;
+ ++probe_row_match_iter) {
+ if (LIKELY(current_offset < _build_block_rows.size())) {
+ _build_block_offsets[current_offset] =
probe_row_match_iter->block_offset;
+ _build_block_rows[current_offset] =
probe_row_match_iter->row_num;
} else {
- _items_counts[probe_index++] = (uint32_t)0;
+
_build_block_offsets.emplace_back(probe_row_match_iter->block_offset);
+
_build_block_rows.emplace_back(probe_row_match_iter->row_num);
}
- all_match_one = false;
- continue;
- }
- }
- int last_offset = current_offset;
- auto find_result =
- !need_null_map_for_probe
- ? key_getter.find_key(hash_table_ctx.hash_table,
probe_index, *_arena)
- : (*null_map)[probe_index]
- ?
decltype(key_getter.find_key(hash_table_ctx.hash_table, probe_index,
- *_arena)) {nullptr,
false}
- : key_getter.find_key(hash_table_ctx.hash_table,
probe_index, *_arena);
- if (probe_index + PREFETCH_STEP < probe_rows) {
- key_getter.template prefetch<true>(hash_table_ctx.hash_table,
- probe_index +
PREFETCH_STEP, *_arena);
- }
-
- if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
- JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
- if (is_mark_join) {
++current_offset;
-
assert_cast<doris::vectorized::ColumnVector<UInt8>&>(*mcol[mcol.size() - 1])
- .get_data()
- .template push_back(!find_result.is_found());
- } else {
- if (!find_result.is_found()) {
- ++current_offset;
- }
}
- } else if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN) {
- if (is_mark_join) {
- ++current_offset;
-
assert_cast<doris::vectorized::ColumnVector<UInt8>&>(*mcol[mcol.size() - 1])
- .get_data()
- .template push_back(find_result.is_found());
- } else {
- if (find_result.is_found()) {
- ++current_offset;
- }
+ _items_counts[probe_index] = current_offset;
+ all_match_one &= (current_offset == 1);
+ if (!probe_row_match_iter.ok()) {
+ ++probe_index;
}
- } else {
- DCHECK(!is_mark_join);
- if (find_result.is_found()) {
- auto& mapped = find_result.get_mapped();
- // TODO: Iterators are currently considered to be a heavy
operation and have a certain impact on performance.
- // We should rethink whether to use this iterator mode in
the future. Now just opt the one row case
- if (mapped.get_row_count() == 1) {
- if constexpr (std::is_same_v<Mapped,
RowRefListWithFlag>) {
- mapped.visited = true;
- }
+ probe_size = 1;
+ }
+ }
- if constexpr (!is_right_semi_anti_join) {
+ if (current_offset < _batch_size) {
+ while (probe_index < probe_rows) {
+ if constexpr (ignore_null && need_null_map_for_probe) {
+ if ((*null_map)[probe_index]) {
+ if constexpr (probe_all) {
+ _items_counts[probe_index++] = (uint32_t)1;
+ // only full outer / left outer need insert the
data of right table
if (LIKELY(current_offset <
_build_block_rows.size())) {
- _build_block_offsets[current_offset] =
mapped.block_offset;
- _build_block_rows[current_offset] =
mapped.row_num;
+ _build_block_offsets[current_offset] = -1;
+ _build_block_rows[current_offset] = -1;
} else {
-
_build_block_offsets.emplace_back(mapped.block_offset);
- _build_block_rows.emplace_back(mapped.row_num);
+ _build_block_offsets.emplace_back(-1);
+ _build_block_rows.emplace_back(-1);
+ }
+ ++current_offset;
+ } else {
+ _items_counts[probe_index++] = (uint32_t)0;
+ }
+ all_match_one = false;
+ if constexpr (probe_all) {
+ if (current_offset >= _batch_size) {
+ break;
}
+ }
+ continue;
+ }
+ }
+ int last_offset = current_offset;
+ auto find_result = !need_null_map_for_probe
+ ?
key_getter.find_key(hash_table_ctx.hash_table,
+ probe_index,
*_arena)
+ : (*null_map)[probe_index]
+ ?
decltype(key_getter.find_key(hash_table_ctx.hash_table,
+
probe_index,
+
*_arena)) {nullptr, false}
+ :
key_getter.find_key(hash_table_ctx.hash_table,
+ probe_index,
*_arena);
+ if (probe_index + PREFETCH_STEP < probe_rows) {
+ key_getter.template
prefetch<true>(hash_table_ctx.hash_table,
+ probe_index +
PREFETCH_STEP, *_arena);
+ }
+
+ auto current_probe_index = probe_index;
+ if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
+ JoinOpType ==
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
+ if (is_mark_join) {
+ ++current_offset;
+
assert_cast<doris::vectorized::ColumnVector<UInt8>&>(*mcol[mcol.size() - 1])
+ .get_data()
+ .template push_back(!find_result.is_found());
+ } else {
+ if (!find_result.is_found()) {
++current_offset;
}
+ }
+ ++probe_index;
+ } else if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN) {
+ if (is_mark_join) {
+ ++current_offset;
+
assert_cast<doris::vectorized::ColumnVector<UInt8>&>(*mcol[mcol.size() - 1])
+ .get_data()
+ .template push_back(find_result.is_found());
} else {
- for (auto it = mapped.begin(); it.ok(); ++it) {
+ if (find_result.is_found()) {
+ ++current_offset;
+ }
+ }
+ ++probe_index;
+ } else {
+ DCHECK(!is_mark_join);
+ if (find_result.is_found()) {
+ auto& mapped = find_result.get_mapped();
+ // TODO: Iterators are currently considered to be a
heavy operation and have a certain impact on performance.
+ // We should rethink whether to use this iterator mode
in the future. Now just opt the one row case
+ if (mapped.get_row_count() == 1) {
+ if constexpr (std::is_same_v<Mapped,
RowRefListWithFlag>) {
+ mapped.visited = true;
+ }
+
if constexpr (!is_right_semi_anti_join) {
if (LIKELY(current_offset <
_build_block_rows.size())) {
- _build_block_offsets[current_offset] =
it->block_offset;
- _build_block_rows[current_offset] =
it->row_num;
+ _build_block_offsets[current_offset] =
mapped.block_offset;
+ _build_block_rows[current_offset] =
mapped.row_num;
} else {
-
_build_block_offsets.emplace_back(it->block_offset);
-
_build_block_rows.emplace_back(it->row_num);
+
_build_block_offsets.emplace_back(mapped.block_offset);
+
_build_block_rows.emplace_back(mapped.row_num);
}
++current_offset;
}
- }
- if constexpr (std::is_same_v<Mapped,
RowRefListWithFlag>) {
- mapped.visited = true;
- }
- }
- } else {
- if constexpr (probe_all) {
- // only full outer / left outer need insert the data
of right table
- if (LIKELY(current_offset < _build_block_rows.size()))
{
- _build_block_offsets[current_offset] = -1;
- _build_block_rows[current_offset] = -1;
+ ++probe_index;
} else {
- _build_block_offsets.emplace_back(-1);
- _build_block_rows.emplace_back(-1);
+ if constexpr (!is_right_semi_anti_join) {
+ auto it = mapped.begin();
+ for (; it.ok() && current_offset <
_batch_size; ++it) {
+ if (LIKELY(current_offset <
_build_block_rows.size())) {
+ _build_block_offsets[current_offset] =
it->block_offset;
+ _build_block_rows[current_offset] =
it->row_num;
+ } else {
+
_build_block_offsets.emplace_back(it->block_offset);
+
_build_block_rows.emplace_back(it->row_num);
+ }
+ ++current_offset;
+ }
+ probe_row_match_iter = it;
+ if (!it.ok()) {
+ // If all matched rows for the current
probe row are handled,
+ // advance to next probe row.
+                                    // If not (which means it exceeds batch
size), probe_index is not increased and
+ // remaining matched rows for the current
probe row will be
+ // handled in the next call of this
function
+ ++probe_index;
+ }
+ } else {
+ ++probe_index;
+ }
+ if constexpr (std::is_same_v<Mapped,
RowRefListWithFlag>) {
+ mapped.visited = true;
+ }
}
- ++current_offset;
+ } else {
+ if constexpr (probe_all) {
+ // only full outer / left outer need insert the
data of right table
+ if (LIKELY(current_offset <
_build_block_rows.size())) {
+ _build_block_offsets[current_offset] = -1;
+ _build_block_rows[current_offset] = -1;
+ } else {
+ _build_block_offsets.emplace_back(-1);
+ _build_block_rows.emplace_back(-1);
+ }
+ ++current_offset;
+ }
+ ++probe_index;
}
}
- }
- uint32_t count = (uint32_t)(current_offset - last_offset);
- _items_counts[probe_index++] = count;
- all_match_one &= (count == 1);
- if (current_offset >= _batch_size && !all_match_one) {
- break;
+ uint32_t count = (uint32_t)(current_offset - last_offset);
+ _items_counts[current_probe_index] = count;
+ all_match_one &= (count == 1);
+ if (current_offset >= _batch_size) {
+ break;
+ }
}
+ probe_size = probe_index - last_probe_index +
(probe_row_match_iter.ok() ? 1 : 0);
}
}
@@ -340,8 +392,7 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
JoinOpType != TJoinOp::RIGHT_ANTI_JOIN) {
SCOPED_TIMER(_probe_side_output_timer);
probe_side_output_column(mcol, _join_node->_left_output_slot_flags,
current_offset,
- last_probe_index, probe_index -
last_probe_index, all_match_one,
- false);
+ last_probe_index, probe_size, all_match_one,
false);
}
output_block->swap(mutable_block.to_block());
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp
b/be/src/vec/exec/join/vhash_join_node.cpp
index 7cdd7248e2..658be0df1f 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -1027,6 +1027,8 @@ void HashJoinNode::_hash_table_init(RuntimeState* state) {
JoinOpType::value ==
TJoinOp::RIGHT_OUTER_JOIN ||
JoinOpType::value ==
TJoinOp::FULL_OUTER_JOIN,
RowRefListWithFlag, RowRefList>>;
+
_probe_row_match_iter.emplace<ForwardIterator<RowRefListType>>();
+
if (_build_expr_ctxs.size() == 1 &&
!_store_null_in_hash_table[0]) {
// Single column optimization
switch (_build_expr_ctxs[0]->root()->result_type()) {
diff --git a/be/src/vec/exec/join/vhash_join_node.h
b/be/src/vec/exec/join/vhash_join_node.h
index ea72ac7883..9287cd6cfb 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -187,6 +187,10 @@ using HashTableCtxVariants =
ProcessHashTableProbe<TJoinOp::RIGHT_ANTI_JOIN>,
ProcessHashTableProbe<TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>>;
+using HashTableIteratorVariants =
+ std::variant<std::monostate, ForwardIterator<RowRefList>,
+ ForwardIterator<RowRefListWithFlag>,
ForwardIterator<RowRefListWithFlags>>;
+
class HashJoinNode final : public VJoinNodeBase {
public:
// TODO: Best prefetch step is decided by machine. We should also provide a
@@ -278,6 +282,8 @@ private:
// for full/right outer join
ForwardIterator<RowRefListWithFlag> _outer_join_pull_visited_iter;
+ HashTableIteratorVariants _probe_row_match_iter;
+
std::shared_ptr<std::vector<Block>> _build_blocks;
Block _probe_block;
ColumnRawPtrs _probe_columns;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]