This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new ec6ad147ba [fix](hashjoin) join produce blocks with rows larger than batch size (#16535)
ec6ad147ba is described below

commit ec6ad147bac0a02fc958f19a912e4b84b41b38af
Author: TengJianPing <[email protected]>
AuthorDate: Thu Feb 9 00:07:04 2023 +0800

    [fix](hashjoin) join produce blocks with rows larger than batch size (#16535)
    
    Pick #16402 and #16166 from master
---
 be/src/vec/exec/join/process_hash_table_probe.h    |   6 +
 .../vec/exec/join/process_hash_table_probe_impl.h  | 690 +++++++++++++++------
 be/src/vec/exec/join/vhash_join_node.cpp           |   5 +
 be/src/vec/exec/join/vhash_join_node.h             |  11 +-
 4 files changed, 515 insertions(+), 197 deletions(-)

diff --git a/be/src/vec/exec/join/process_hash_table_probe.h 
b/be/src/vec/exec/join/process_hash_table_probe.h
index 5d8c8097a7..2347b3fc63 100644
--- a/be/src/vec/exec/join/process_hash_table_probe.h
+++ b/be/src/vec/exec/join/process_hash_table_probe.h
@@ -66,6 +66,12 @@ struct ProcessHashTableProbe {
                                                 MutableBlock& mutable_block, 
Block* output_block,
                                                 size_t probe_rows);
 
+    void _process_splited_equal_matched_tuples(int start_row_idx, int 
row_count,
+                                               const ColumnPtr& 
other_hit_column,
+                                               std::vector<bool*>& 
visited_map, int right_col_idx,
+                                               int right_col_len, UInt8* 
__restrict null_map_data,
+                                               UInt8* __restrict filter_map, 
Block* output_block);
+
     // Process full outer join/ right join / right semi/anti join to output 
the join result
     // in hash table
     template <typename HashTableType>
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h 
b/be/src/vec/exec/join/process_hash_table_probe_impl.h
index 31e621d1c1..414d8e3ff4 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h
@@ -19,6 +19,7 @@
 
 #include "common/status.h"
 #include "process_hash_table_probe.h"
+#include "util/simd/bits.h"
 #include "vhash_join_node.h"
 
 namespace doris::vectorized {
@@ -130,7 +131,6 @@ void 
ProcessHashTableProbe<JoinOpType>::probe_side_output_column(
         if (output_slot_flags[i]) {
             auto& column = probe_block.get_by_position(i).column;
             if (all_match_one) {
-                DCHECK_EQ(probe_size, column->size() - last_probe_index);
                 mcol[i]->insert_range_from(*column, last_probe_index, 
probe_size);
             } else {
                 DCHECK_GE(_items_counts.size(), last_probe_index + probe_size);
@@ -202,109 +202,161 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
 
     bool all_match_one = true;
     int last_probe_index = probe_index;
+    size_t probe_size = 0;
+    auto& probe_row_match_iter =
+            
std::get<ForwardIterator<Mapped>>(_join_node->_probe_row_match_iter);
     {
         SCOPED_TIMER(_search_hashtable_timer);
-        while (probe_index < probe_rows) {
-            if constexpr (ignore_null && need_null_map_for_probe) {
-                if ((*null_map)[probe_index]) {
-                    if constexpr (probe_all) {
-                        _items_counts[probe_index++] = (uint32_t)1;
-                        // only full outer / left outer need insert the data 
of right table
-                        if (LIKELY(current_offset < _build_block_rows.size())) 
{
-                            _build_block_offsets[current_offset] = -1;
-                            _build_block_rows[current_offset] = -1;
-                        } else {
-                            _build_block_offsets.emplace_back(-1);
-                            _build_block_rows.emplace_back(-1);
-                        }
-                        ++current_offset;
+        if constexpr (!is_right_semi_anti_join) {
+            // handle ramaining matched rows from last probe row
+            if (probe_row_match_iter.ok()) {
+                for (; probe_row_match_iter.ok() && current_offset < 
_batch_size;
+                     ++probe_row_match_iter) {
+                    if (LIKELY(current_offset < _build_block_rows.size())) {
+                        _build_block_offsets[current_offset] = 
probe_row_match_iter->block_offset;
+                        _build_block_rows[current_offset] = 
probe_row_match_iter->row_num;
                     } else {
-                        _items_counts[probe_index++] = (uint32_t)0;
+                        
_build_block_offsets.emplace_back(probe_row_match_iter->block_offset);
+                        
_build_block_rows.emplace_back(probe_row_match_iter->row_num);
                     }
-                    all_match_one = false;
-                    continue;
-                }
-            }
-            int last_offset = current_offset;
-            auto find_result =
-                    !need_null_map_for_probe
-                            ? key_getter.find_key(hash_table_ctx.hash_table, 
probe_index, *_arena)
-                    : (*null_map)[probe_index]
-                            ? 
decltype(key_getter.find_key(hash_table_ctx.hash_table, probe_index,
-                                                           *_arena)) {nullptr, 
false}
-                            : key_getter.find_key(hash_table_ctx.hash_table, 
probe_index, *_arena);
-            if (probe_index + PREFETCH_STEP < probe_rows) {
-                key_getter.template prefetch<true>(hash_table_ctx.hash_table,
-                                                   probe_index + 
PREFETCH_STEP, *_arena);
-            }
-
-            if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
-                          JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
-                if (!find_result.is_found()) {
                     ++current_offset;
                 }
-            } else if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN) {
-                if (find_result.is_found()) {
-                    ++current_offset;
+                _items_counts[probe_index] = current_offset;
+                all_match_one &= (current_offset == 1);
+                if (!probe_row_match_iter.ok()) {
+                    ++probe_index;
                 }
-            } else {
-                if (find_result.is_found()) {
-                    auto& mapped = find_result.get_mapped();
-                    // TODO: Iterators are currently considered to be a heavy 
operation and have a certain impact on performance.
-                    // We should rethink whether to use this iterator mode in 
the future. Now just opt the one row case
-                    if (mapped.get_row_count() == 1) {
-                        if constexpr (std::is_same_v<Mapped, 
RowRefListWithFlag>) {
-                            mapped.visited = true;
-                        }
-
-                        if constexpr (!is_right_semi_anti_join) {
+                probe_size = 1;
+            }
+        }
+        if (current_offset < _batch_size) {
+            while (probe_index < probe_rows) {
+                if constexpr (ignore_null && need_null_map_for_probe) {
+                    if ((*null_map)[probe_index]) {
+                        if constexpr (probe_all) {
+                            _items_counts[probe_index++] = (uint32_t)1;
+                            // only full outer / left outer need insert the 
data of right table
                             if (LIKELY(current_offset < 
_build_block_rows.size())) {
-                                _build_block_offsets[current_offset] = 
mapped.block_offset;
-                                _build_block_rows[current_offset] = 
mapped.row_num;
+                                _build_block_offsets[current_offset] = -1;
+                                _build_block_rows[current_offset] = -1;
                             } else {
-                                
_build_block_offsets.emplace_back(mapped.block_offset);
-                                _build_block_rows.emplace_back(mapped.row_num);
+                                _build_block_offsets.emplace_back(-1);
+                                _build_block_rows.emplace_back(-1);
                             }
                             ++current_offset;
+                        } else {
+                            _items_counts[probe_index++] = (uint32_t)0;
                         }
-                    } else {
-                        for (auto it = mapped.begin(); it.ok(); ++it) {
+                        all_match_one = false;
+                        if constexpr (probe_all) {
+                            if (current_offset >= _batch_size) {
+                                break;
+                            }
+                        }
+                        continue;
+                    }
+                }
+                int last_offset = current_offset;
+                auto find_result = !need_null_map_for_probe
+                                           ? 
key_getter.find_key(hash_table_ctx.hash_table,
+                                                                 probe_index, 
*_arena)
+                                   : (*null_map)[probe_index]
+                                           ? 
decltype(key_getter.find_key(hash_table_ctx.hash_table,
+                                                                          
probe_index,
+                                                                          
*_arena)) {nullptr, false}
+                                           : 
key_getter.find_key(hash_table_ctx.hash_table,
+                                                                 probe_index, 
*_arena);
+                if (probe_index + PREFETCH_STEP < probe_rows) {
+                    key_getter.template 
prefetch<true>(hash_table_ctx.hash_table,
+                                                       probe_index + 
PREFETCH_STEP, *_arena);
+                }
+
+                auto current_probe_index = probe_index;
+                if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
+                              JoinOpType == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
+                    if (!find_result.is_found()) {
+                        ++current_offset;
+                    }
+                    ++probe_index;
+                } else if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN) {
+                    if (find_result.is_found()) {
+                        ++current_offset;
+                    }
+                    ++probe_index;
+                } else {
+                    if (find_result.is_found()) {
+                        auto& mapped = find_result.get_mapped();
+                        // TODO: Iterators are currently considered to be a 
heavy operation and have a certain impact on performance.
+                        // We should rethink whether to use this iterator mode 
in the future. Now just opt the one row case
+                        if (mapped.get_row_count() == 1) {
+                            if constexpr (std::is_same_v<Mapped, 
RowRefListWithFlag>) {
+                                mapped.visited = true;
+                            }
+
                             if constexpr (!is_right_semi_anti_join) {
                                 if (LIKELY(current_offset < 
_build_block_rows.size())) {
-                                    _build_block_offsets[current_offset] = 
it->block_offset;
-                                    _build_block_rows[current_offset] = 
it->row_num;
+                                    _build_block_offsets[current_offset] = 
mapped.block_offset;
+                                    _build_block_rows[current_offset] = 
mapped.row_num;
                                 } else {
-                                    
_build_block_offsets.emplace_back(it->block_offset);
-                                    
_build_block_rows.emplace_back(it->row_num);
+                                    
_build_block_offsets.emplace_back(mapped.block_offset);
+                                    
_build_block_rows.emplace_back(mapped.row_num);
                                 }
                                 ++current_offset;
                             }
-                        }
-                        if constexpr (std::is_same_v<Mapped, 
RowRefListWithFlag>) {
-                            mapped.visited = true;
-                        }
-                    }
-                } else {
-                    if constexpr (probe_all) {
-                        // only full outer / left outer need insert the data 
of right table
-                        if (LIKELY(current_offset < _build_block_rows.size())) 
{
-                            _build_block_offsets[current_offset] = -1;
-                            _build_block_rows[current_offset] = -1;
+                            ++probe_index;
                         } else {
-                            _build_block_offsets.emplace_back(-1);
-                            _build_block_rows.emplace_back(-1);
+                            if constexpr (!is_right_semi_anti_join) {
+                                auto it = mapped.begin();
+                                for (; it.ok() && current_offset < 
_batch_size; ++it) {
+                                    if (LIKELY(current_offset < 
_build_block_rows.size())) {
+                                        _build_block_offsets[current_offset] = 
it->block_offset;
+                                        _build_block_rows[current_offset] = 
it->row_num;
+                                    } else {
+                                        
_build_block_offsets.emplace_back(it->block_offset);
+                                        
_build_block_rows.emplace_back(it->row_num);
+                                    }
+                                    ++current_offset;
+                                }
+                                probe_row_match_iter = it;
+                                if (!it.ok()) {
+                                    // If all matched rows for the current 
probe row are handled,
+                                    // advance to next probe row.
+                                    // If not(which means it excceed batch 
size), probe_index is not increased and
+                                    // remaining matched rows for the current 
probe row will be
+                                    // handled in the next call of this 
function
+                                    ++probe_index;
+                                }
+                            } else {
+                                ++probe_index;
+                            }
+                            if constexpr (std::is_same_v<Mapped, 
RowRefListWithFlag>) {
+                                mapped.visited = true;
+                            }
                         }
-                        ++current_offset;
+                    } else {
+                        if constexpr (probe_all) {
+                            // only full outer / left outer need insert the 
data of right table
+                            if (LIKELY(current_offset < 
_build_block_rows.size())) {
+                                _build_block_offsets[current_offset] = -1;
+                                _build_block_rows[current_offset] = -1;
+                            } else {
+                                _build_block_offsets.emplace_back(-1);
+                                _build_block_rows.emplace_back(-1);
+                            }
+                            ++current_offset;
+                        }
+                        ++probe_index;
                     }
                 }
-            }
 
-            uint32_t count = (uint32_t)(current_offset - last_offset);
-            _items_counts[probe_index++] = count;
-            all_match_one &= (count == 1);
-            if (current_offset >= _batch_size && !all_match_one) {
-                break;
+                uint32_t count = (uint32_t)(current_offset - last_offset);
+                _items_counts[current_probe_index] = count;
+                all_match_one &= (count == 1);
+                if (current_offset >= _batch_size) {
+                    break;
+                }
             }
+            probe_size = probe_index - last_probe_index + 
(probe_row_match_iter.ok() ? 1 : 0);
         }
     }
 
@@ -318,8 +370,7 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
                   JoinOpType != TJoinOp::RIGHT_ANTI_JOIN) {
         SCOPED_TIMER(_probe_side_output_timer);
         probe_side_output_column(mcol, _join_node->_left_output_slot_flags, 
current_offset,
-                                 last_probe_index, probe_index - 
last_probe_index, all_match_one,
-                                 false);
+                                 last_probe_index, probe_size, all_match_one, 
false);
     }
 
     output_block->swap(mutable_block.to_block());
@@ -383,100 +434,165 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
 
         bool all_match_one = true;
         int last_probe_index = probe_index;
-        while (probe_index < probe_rows) {
-            // ignore null rows
-            if constexpr (ignore_null && need_null_map_for_probe) {
-                if ((*null_map)[probe_index]) {
-                    if constexpr (probe_all) {
-                        _items_counts[probe_index++] = (uint32_t)1;
-                        same_to_prev.emplace_back(false);
-                        visited_map.emplace_back(nullptr);
-                        // only full outer / left outer need insert the data 
of right table
+
+        int row_count_from_last_probe = 0;
+        bool is_the_last_sub_block = false;
+        size_t probe_size = 0;
+        auto& probe_row_match_iter =
+                
std::get<ForwardIterator<Mapped>>(_join_node->_probe_row_match_iter);
+        if (probe_row_match_iter.ok()) {
+            auto origin_offset = current_offset;
+            for (; probe_row_match_iter.ok() && current_offset < _batch_size;
+                 ++probe_row_match_iter) {
+                if (LIKELY(current_offset < _build_block_rows.size())) {
+                    _build_block_offsets[current_offset] = 
probe_row_match_iter->block_offset;
+                    _build_block_rows[current_offset] = 
probe_row_match_iter->row_num;
+                } else {
+                    
_build_block_offsets.emplace_back(probe_row_match_iter->block_offset);
+                    
_build_block_rows.emplace_back(probe_row_match_iter->row_num);
+                }
+                ++current_offset;
+                visited_map.emplace_back(&probe_row_match_iter->visited);
+            }
+            same_to_prev.emplace_back(false);
+            for (int i = 0; i < current_offset - origin_offset - 1; ++i) {
+                same_to_prev.emplace_back(true);
+            }
+
+            row_count_from_last_probe = current_offset;
+            all_match_one &= (current_offset == 1);
+            _items_counts[probe_index] = current_offset;
+            if (!probe_row_match_iter.ok()) {
+                ++probe_index;
+                is_the_last_sub_block = true;
+            }
+            probe_size = 1;
+        }
+        int multi_matched_output_row_count = 0;
+        if (current_offset < _batch_size) {
+            while (probe_index < probe_rows) {
+                // ignore null rows
+                if constexpr (ignore_null && need_null_map_for_probe) {
+                    if ((*null_map)[probe_index]) {
+                        if constexpr (probe_all) {
+                            _items_counts[probe_index++] = (uint32_t)1;
+                            same_to_prev.emplace_back(false);
+                            visited_map.emplace_back(nullptr);
+                            // only full outer / left outer need insert the 
data of right table
+                            if (LIKELY(current_offset < 
_build_block_rows.size())) {
+                                _build_block_offsets[current_offset] = -1;
+                                _build_block_rows[current_offset] = -1;
+                            } else {
+                                _build_block_offsets.emplace_back(-1);
+                                _build_block_rows.emplace_back(-1);
+                            }
+                            ++current_offset;
+                        } else {
+                            _items_counts[probe_index++] = (uint32_t)0;
+                        }
+                        all_match_one = false;
+                        if constexpr (probe_all) {
+                            if (current_offset >= _batch_size) {
+                                break;
+                            }
+                        }
+                        continue;
+                    }
+                }
+
+                auto last_offset = current_offset;
+                auto find_result = !need_null_map_for_probe
+                                           ? 
key_getter.find_key(hash_table_ctx.hash_table,
+                                                                 probe_index, 
*_arena)
+                                   : (*null_map)[probe_index]
+                                           ? 
decltype(key_getter.find_key(hash_table_ctx.hash_table,
+                                                                          
probe_index,
+                                                                          
*_arena)) {nullptr, false}
+                                           : 
key_getter.find_key(hash_table_ctx.hash_table,
+                                                                 probe_index, 
*_arena);
+                if (probe_index + PREFETCH_STEP < probe_rows) {
+                    key_getter.template 
prefetch<true>(hash_table_ctx.hash_table,
+                                                       probe_index + 
PREFETCH_STEP, *_arena);
+                }
+
+                auto current_probe_index = probe_index;
+                if (find_result.is_found()) {
+                    auto& mapped = find_result.get_mapped();
+                    auto origin_offset = current_offset;
+                    // TODO: Iterators are currently considered to be a heavy 
operation and have a certain impact on performance.
+                    // We should rethink whether to use this iterator mode in 
the future. Now just opt the one row case
+                    if (mapped.get_row_count() == 1) {
                         if (LIKELY(current_offset < _build_block_rows.size())) 
{
-                            _build_block_offsets[current_offset] = -1;
-                            _build_block_rows[current_offset] = -1;
+                            _build_block_offsets[current_offset] = 
mapped.block_offset;
+                            _build_block_rows[current_offset] = mapped.row_num;
                         } else {
-                            _build_block_offsets.emplace_back(-1);
-                            _build_block_rows.emplace_back(-1);
+                            
_build_block_offsets.emplace_back(mapped.block_offset);
+                            _build_block_rows.emplace_back(mapped.row_num);
                         }
                         ++current_offset;
+                        visited_map.emplace_back(&mapped.visited);
+                        ++probe_index;
                     } else {
-                        _items_counts[probe_index++] = (uint32_t)0;
+                        auto multi_match_last_offset = current_offset;
+                        auto it = mapped.begin();
+                        // breaks if row count exceeds batch_size
+                        for (; it.ok() && current_offset < _batch_size; ++it) {
+                            if (LIKELY(current_offset < 
_build_block_rows.size())) {
+                                _build_block_offsets[current_offset] = 
it->block_offset;
+                                _build_block_rows[current_offset] = 
it->row_num;
+                            } else {
+                                
_build_block_offsets.emplace_back(it->block_offset);
+                                _build_block_rows.emplace_back(it->row_num);
+                            }
+                            ++current_offset;
+                            visited_map.emplace_back(&it->visited);
+                        }
+                        probe_row_match_iter = it;
+                        // If all matched rows for the current probe row are 
handled,
+                        // advance to next probe row.
+                        if (!it.ok()) {
+                            ++probe_index;
+                        } else {
+                            // If not(which means it excceed batch size), 
probe_index is not increased and
+                            // remaining matched rows for the current probe 
row will be
+                            // handled in the next call of this function
+                            multi_matched_output_row_count =
+                                    current_offset - multi_match_last_offset;
+                        }
                     }
-                    all_match_one = false;
-                    continue;
-                }
-            }
-
-            auto last_offset = current_offset;
-            auto find_result =
-                    !need_null_map_for_probe
-                            ? key_getter.find_key(hash_table_ctx.hash_table, 
probe_index, *_arena)
-                    : (*null_map)[probe_index]
-                            ? 
decltype(key_getter.find_key(hash_table_ctx.hash_table, probe_index,
-                                                           *_arena)) {nullptr, 
false}
-                            : key_getter.find_key(hash_table_ctx.hash_table, 
probe_index, *_arena);
-            if (probe_index + PREFETCH_STEP < probe_rows) {
-                key_getter.template prefetch<true>(hash_table_ctx.hash_table,
-                                                   probe_index + 
PREFETCH_STEP, *_arena);
-            }
-            if (find_result.is_found()) {
-                auto& mapped = find_result.get_mapped();
-                auto origin_offset = current_offset;
-                // TODO: Iterators are currently considered to be a heavy 
operation and have a certain impact on performance.
-                // We should rethink whether to use this iterator mode in the 
future. Now just opt the one row case
-                if (mapped.get_row_count() == 1) {
+                    same_to_prev.emplace_back(false);
+                    for (int i = 0; i < current_offset - origin_offset - 1; 
++i) {
+                        same_to_prev.emplace_back(true);
+                    }
+                } else if constexpr (JoinOpType == TJoinOp::LEFT_OUTER_JOIN ||
+                                     JoinOpType == TJoinOp::FULL_OUTER_JOIN ||
+                                     JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
+                                     JoinOpType == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
+                    same_to_prev.emplace_back(false);
+                    visited_map.emplace_back(nullptr);
+                    // only full outer / left outer need insert the data of 
right table
+                    // left anti use -1 use a default value
                     if (LIKELY(current_offset < _build_block_rows.size())) {
-                        _build_block_offsets[current_offset] = 
mapped.block_offset;
-                        _build_block_rows[current_offset] = mapped.row_num;
+                        _build_block_offsets[current_offset] = -1;
+                        _build_block_rows[current_offset] = -1;
                     } else {
-                        _build_block_offsets.emplace_back(mapped.block_offset);
-                        _build_block_rows.emplace_back(mapped.row_num);
+                        _build_block_offsets.emplace_back(-1);
+                        _build_block_rows.emplace_back(-1);
                     }
                     ++current_offset;
-                    visited_map.emplace_back(&mapped.visited);
+                    ++probe_index;
                 } else {
-                    for (auto it = mapped.begin(); it.ok(); ++it) {
-                        if (LIKELY(current_offset < _build_block_rows.size())) 
{
-                            _build_block_offsets[current_offset] = 
it->block_offset;
-                            _build_block_rows[current_offset] = it->row_num;
-                        } else {
-                            
_build_block_offsets.emplace_back(it->block_offset);
-                            _build_block_rows.emplace_back(it->row_num);
-                        }
-                        ++current_offset;
-                        visited_map.emplace_back(&it->visited);
-                    }
-                }
-                same_to_prev.emplace_back(false);
-                for (int i = 0; i < current_offset - origin_offset - 1; ++i) {
-                    same_to_prev.emplace_back(true);
+                    // other join, no nothing
+                    ++probe_index;
                 }
-            } else if constexpr (JoinOpType == TJoinOp::LEFT_OUTER_JOIN ||
-                                 JoinOpType == TJoinOp::FULL_OUTER_JOIN ||
-                                 JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
-                                 JoinOpType == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
-                same_to_prev.emplace_back(false);
-                visited_map.emplace_back(nullptr);
-                // only full outer / left outer need insert the data of right 
table
-                // left anti use -1 use a default value
-                if (LIKELY(current_offset < _build_block_rows.size())) {
-                    _build_block_offsets[current_offset] = -1;
-                    _build_block_rows[current_offset] = -1;
-                } else {
-                    _build_block_offsets.emplace_back(-1);
-                    _build_block_rows.emplace_back(-1);
+                uint32_t count = (uint32_t)(current_offset - last_offset);
+                _items_counts[current_probe_index] = count;
+                all_match_one &= (count == 1);
+                if (current_offset >= _batch_size) {
+                    break;
                 }
-                ++current_offset;
-            } else {
-                // other join, no nothing
-            }
-            uint32_t count = (uint32_t)(current_offset - last_offset);
-            _items_counts[probe_index++] = count;
-            all_match_one &= (count == 1);
-            if (current_offset >= _batch_size && !all_match_one) {
-                break;
             }
+            probe_size = probe_index - last_probe_index + 
(probe_row_match_iter.ok() ? 1 : 0);
         }
 
         {
@@ -487,13 +603,13 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
         {
             SCOPED_TIMER(_probe_side_output_timer);
             probe_side_output_column(mcol, 
_join_node->_left_output_slot_flags, current_offset,
-                                     last_probe_index, probe_index - 
last_probe_index,
-                                     all_match_one, true);
+                                     last_probe_index, probe_size, 
all_match_one, true);
         }
         output_block->swap(mutable_block.to_block());
 
         // dispose the other join conjunct exec
-        if (output_block->rows()) {
+        auto row_count = output_block->rows();
+        if (row_count) {
             int result_column_id = -1;
             int orig_columns = output_block->columns();
             RETURN_IF_ERROR((*_join_node->_vother_join_conjunct_ptr)
@@ -502,13 +618,29 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
             auto column = 
output_block->get_by_position(result_column_id).column;
             if constexpr (JoinOpType == TJoinOp::LEFT_OUTER_JOIN ||
                           JoinOpType == TJoinOp::FULL_OUTER_JOIN) {
-                auto new_filter_column = ColumnVector<UInt8>::create();
-                auto& filter_map = new_filter_column->get_data();
+                auto new_filter_column = 
ColumnVector<UInt8>::create(row_count);
+                auto* __restrict filter_map = 
new_filter_column->get_data().data();
 
-                auto null_map_column = 
ColumnVector<UInt8>::create(column->size(), 0);
+                auto null_map_column = ColumnVector<UInt8>::create(row_count, 
0);
                 auto* __restrict null_map_data = 
null_map_column->get_data().data();
 
-                for (int i = 0; i < column->size(); ++i) {
+                // It contains non-first sub block of splited 
equal-conjuncts-matched tuples from last probe row
+                if (row_count_from_last_probe > 0) {
+                    _process_splited_equal_matched_tuples(0, 
row_count_from_last_probe, column,
+                                                          visited_map, 
right_col_idx, right_col_len,
+                                                          null_map_data, 
filter_map, output_block);
+                    // This is the last sub block of splitted block, and no 
equal-conjuncts-matched tuple
+                    // is output in all sub blocks, need to output a tuple for 
this probe row
+                    if (is_the_last_sub_block && 
!_join_node->_is_any_probe_match_row_output) {
+                        filter_map[0] = true;
+                        null_map_data[0] = true;
+                    }
+                }
+
+                int end_idx = row_count - multi_matched_output_row_count;
+                // process equal-conjuncts-matched tuples that are newly 
generated
+                // in this run if there are any.
+                for (size_t i = row_count_from_last_probe; i < end_idx; ++i) {
                     auto join_hit = visited_map[i] != nullptr;
                     auto other_hit = column->get_bool(i);
 
@@ -524,21 +656,39 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
                     }
                     null_map_data[i] = !join_hit || !other_hit;
 
+                    // For cases where one probe row matches multiple build 
rows for equal conjuncts,
+                    // all the other-conjuncts-matched tuples should be output.
+                    //
+                    // Other-conjuncts-NOT-matched tuples fall into two 
categories:
+                    //    1. The beginning consecutive one(s).
+                    //       For these tuples, only the last one is marked to 
output;
+                    //       If there are any following 
other-conjuncts-matched tuples,
+                    //       the last tuple is also marked NOT to output.
+                    //    2. All the remaining other-conjuncts-NOT-matched 
tuples.
+                    //       All these tuples are marked not to output.
                     if (join_hit) {
                         *visited_map[i] |= other_hit;
-                        filter_map.push_back(other_hit || !same_to_prev[i] ||
-                                             (!column->get_bool(i - 1) && 
filter_map.back()));
+                        filter_map[i] = other_hit || !same_to_prev[i] ||
+                                        (!column->get_bool(i - 1) && 
filter_map[i - 1]);
                         // Here to keep only hit join conjunct and other join 
conjunt is true need to be output.
                         // if not, only some key must keep one row will output 
will null right table column
-                        if (same_to_prev[i] && filter_map.back() && 
!column->get_bool(i - 1)) {
+                        if (same_to_prev[i] && filter_map[i] && 
!column->get_bool(i - 1)) {
                             filter_map[i - 1] = false;
                         }
                     } else {
-                        filter_map.push_back(true);
+                        filter_map[i] = true;
                     }
                 }
 
-                for (int i = 0; i < column->size(); ++i) {
+                // It contains the first sub block of splited 
equal-conjuncts-matched tuples of the current probe row
+                if (multi_matched_output_row_count > 0) {
+                    _join_node->_is_any_probe_match_row_output = false;
+                    _process_splited_equal_matched_tuples(
+                            row_count - multi_matched_output_row_count,
+                            multi_matched_output_row_count, column, 
visited_map, right_col_idx,
+                            right_col_len, null_map_data, filter_map, 
output_block);
+                }
+                for (size_t i = 0; i < row_count; ++i) {
                     if (filter_map[i]) {
                         
_tuple_is_null_right_flags->emplace_back(null_map_data[i]);
                     }
@@ -549,10 +699,26 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
                 auto new_filter_column = ColumnVector<UInt8>::create();
                 auto& filter_map = new_filter_column->get_data();
 
-                if (!column->empty()) {
+                size_t start_row_idx = 1;
+                // We are handling euqual-conjuncts matched tuples that are 
splitted into multiple blocks
+                if (row_count_from_last_probe > 0) {
+                    if (_join_node->_is_any_probe_match_row_output) {
+                        // if any matched tuple for this probe row is output,
+                        // ignore all the following tuples for this probe row.
+                        for (int row_idx = 0; row_idx < 
row_count_from_last_probe; ++row_idx) {
+                            filter_map.emplace_back(false);
+                        }
+                        start_row_idx += row_count_from_last_probe;
+                        if (row_count_from_last_probe < row_count) {
+                            
filter_map.emplace_back(column->get_bool(row_count_from_last_probe));
+                        }
+                    } else {
+                        filter_map.emplace_back(column->get_bool(0));
+                    }
+                } else {
                     filter_map.emplace_back(column->get_bool(0));
                 }
-                for (int i = 1; i < column->size(); ++i) {
+                for (size_t i = start_row_idx; i < row_count; ++i) {
                     if (column->get_bool(i) || (same_to_prev[i] && 
filter_map[i - 1])) {
                         // Only last same element is true, output last one
                         filter_map.push_back(true);
@@ -561,19 +727,62 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
                         filter_map.push_back(false);
                     }
                 }
+                // It contains the first sub block of splited 
equal-conjuncts-matched tuples of the current probe row
+                if (multi_matched_output_row_count > 0) {
+                    // If a matched row is output, all the equal-matched 
tuples in
+                    // the following sub blocks should be ignored
+                    _join_node->_is_any_probe_match_row_output = 
filter_map[row_count - 1];
+                } else if (row_count_from_last_probe > 0 &&
+                           !_join_node->_is_any_probe_match_row_output) {
+                    // We are handling euqual-conjuncts matched tuples that 
are splitted into multiple blocks,
+                    // and no matched tuple has been output in all previous 
run.
+                    // If a tuple is output in this run, all the following 
mathced tuples should be ignored
+                    if (filter_map[row_count_from_last_probe - 1]) {
+                        _join_node->_is_any_probe_match_row_output = true;
+                    }
+                }
 
                 output_block->get_by_position(result_column_id).column =
                         std::move(new_filter_column);
             } else if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
                                  JoinOpType == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
-                auto new_filter_column = ColumnVector<UInt8>::create();
-                auto& filter_map = new_filter_column->get_data();
-
-                if (!column->empty()) {
+                auto new_filter_column = 
ColumnVector<UInt8>::create(row_count);
+                auto* __restrict filter_map = 
new_filter_column->get_data().data();
+
+                // for left anti join, the probe side is output only when
+                // there are no matched tuples for the probe row.
+
+                // If multiple equal-conjuncts-matched tuples is splitted into 
several
+                // sub blocks, just filter out all the 
other-conjuncts-NOT-matched tuples at first,
+                // and when processing the last sub block, check whether there 
are any
+                // equal-conjuncts-matched tuple is output in all sub blocks,
+                // if there are none, just pick a tuple and output.
+
+                size_t start_row_idx = 1;
+                // We are handling euqual-conjuncts matched tuples that are 
splitted into multiple blocks
+                if (row_count_from_last_probe > 0) {
+                    if (_join_node->_is_any_probe_match_row_output) {
+                        // if any matched tuple for this probe row is output,
+                        // ignore all the following tuples for this probe row.
+                        for (int row_idx = 0; row_idx < 
row_count_from_last_probe; ++row_idx) {
+                            filter_map[row_idx] = false;
+                        }
+                        start_row_idx += row_count_from_last_probe;
+                        if (row_count_from_last_probe < row_count) {
+                            filter_map[row_count_from_last_probe] =
+                                    
column->get_bool(row_count_from_last_probe) &&
+                                    visited_map[row_count_from_last_probe];
+                        }
+                    } else {
+                        // Both equal conjuncts and other conjuncts are true
+                        filter_map[0] = column->get_bool(0) && visited_map[0];
+                    }
+                } else {
                     // Both equal conjuncts and other conjuncts are true
-                    filter_map.emplace_back(column->get_bool(0) && 
visited_map[0]);
+                    filter_map[0] = column->get_bool(0) && visited_map[0];
                 }
-                for (int i = 1; i < column->size(); ++i) {
+
+                for (size_t i = start_row_idx; i < row_count; ++i) {
                     if ((visited_map[i] && column->get_bool(i)) ||
                         (same_to_prev[i] && filter_map[i - 1])) {
                         // When either of two conditions is meet:
@@ -581,20 +790,59 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
                         // 2. This row is joined from the same build side row 
as the previous row
                         // Set filter_map[i] to true and filter_map[i - 1] to 
false if same_to_prev[i]
                         // is true.
-                        filter_map.push_back(true);
+                        filter_map[i] = true;
                         filter_map[i - 1] = !same_to_prev[i] && filter_map[i - 
1];
                     } else {
-                        filter_map.push_back(false);
+                        filter_map[i] = false;
+                    }
+                }
+
+                int end_row_idx;
+                if (row_count_from_last_probe > 0) {
+                    end_row_idx = row_count - multi_matched_output_row_count;
+                    if (!_join_node->_is_any_probe_match_row_output) {
+                        // We are handling euqual-conjuncts matched tuples 
that are splitted into multiple blocks,
+                        // and no matched tuple has been output in all 
previous run.
+                        // If a tuple is output in this run, all the following 
mathced tuples should be ignored
+                        if (filter_map[row_count_from_last_probe - 1]) {
+                            _join_node->_is_any_probe_match_row_output = true;
+                            filter_map[row_count_from_last_probe - 1] = false;
+                        }
+                        if (is_the_last_sub_block && 
!_join_node->_is_any_probe_match_row_output) {
+                            // This is the last sub block of splitted block, 
and no equal-conjuncts-matched tuple
+                            // is output in all sub blocks, output a tuple for 
this probe row
+                            filter_map[0] = true;
+                        }
+                    }
+                    if (multi_matched_output_row_count > 0) {
+                        // It contains the first sub block of splited 
equal-conjuncts-matched tuples of the current probe row
+                        // If a matched row is output, all the equal-matched 
tuples in
+                        // the following sub blocks should be ignored
+                        _join_node->_is_any_probe_match_row_output = 
filter_map[row_count - 1];
+                        filter_map[row_count - 1] = false;
                     }
+                } else if (multi_matched_output_row_count > 0) {
+                    end_row_idx = row_count - multi_matched_output_row_count;
+                    // It contains the first sub block of splited 
equal-conjuncts-matched tuples of the current probe row
+                    // If a matched row is output, all the equal-matched 
tuples in
+                    // the following sub blocks should be ignored
+                    _join_node->_is_any_probe_match_row_output = 
filter_map[row_count - 1];
+                    filter_map[row_count - 1] = false;
+                } else {
+                    end_row_idx = row_count;
                 }
 
                 // Same to the semi join, but change the last value to 
opposite value
-                for (int i = 1; i < same_to_prev.size(); ++i) {
+                for (int i = 1 + row_count_from_last_probe; i < end_row_idx; 
++i) {
                     if (!same_to_prev[i]) {
                         filter_map[i - 1] = !filter_map[i - 1];
                     }
                 }
-                filter_map[same_to_prev.size() - 1] = 
!filter_map[same_to_prev.size() - 1];
+                auto non_sub_blocks_matched_row_count =
+                        row_count - row_count_from_last_probe - 
multi_matched_output_row_count;
+                if (non_sub_blocks_matched_row_count > 0) {
+                    filter_map[end_row_idx - 1] = !filter_map[end_row_idx - 1];
+                }
 
                 output_block->get_by_position(result_column_id).column =
                         std::move(new_filter_column);
@@ -606,7 +854,7 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
                 }
             } else if constexpr (JoinOpType == TJoinOp::RIGHT_OUTER_JOIN) {
                 auto filter_size = 0;
-                for (int i = 0; i < column->size(); ++i) {
+                for (int i = 0; i < row_count; ++i) {
                     DCHECK(visited_map[i]);
                     auto result = column->get_bool(i);
                     *visited_map[i] |= result;
@@ -637,6 +885,42 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
     }
 }
 
+// For left/full outer join with other conjuncts. Handles rows [start_row_idx,
+// start_row_idx + row_count) of one probe row whose equal-conjuncts-matched tuples are
+// split into several sub blocks: only tuples that pass the other conjuncts are kept;
+// the others get their right-table columns marked NULL. Whether any tuple in THIS range
+// is kept is OR-ed into _is_any_probe_match_row_output, so the last sub block can
+// output one NULL-extended tuple for the probe row if none was kept in any sub block.
+template <int JoinOpType>
+void ProcessHashTableProbe<JoinOpType>::_process_splited_equal_matched_tuples(
+        int start_row_idx, int row_count, const ColumnPtr& other_hit_column,
+        std::vector<bool*>& visited_map, int right_col_idx, int right_col_len,
+        UInt8* __restrict null_map_data, UInt8* __restrict filter_map, Block* output_block) {
+    int end_row_idx = start_row_idx + row_count;
+    for (int i = start_row_idx; i < end_row_idx; ++i) {
+        auto join_hit = visited_map[i] != nullptr;
+        auto other_hit = other_hit_column->get_bool(i);
+
+        if (!other_hit) {
+            for (int j = 0; j < right_col_len; ++j) {
+                typeid_cast<ColumnNullable*>(
+                        std::move(*output_block->get_by_position(j + right_col_idx).column)
+                                .assume_mutable()
+                                .get())
+                        ->get_null_map_data()[i] = true;
+            }
+        }
+
+        null_map_data[i] = !join_hit || !other_hit;
+        filter_map[i] = other_hit;
+        if (join_hit) {
+            *visited_map[i] |= other_hit;
+        }
+    }
+    _join_node->_is_any_probe_match_row_output |=
+            simd::contain_byte(filter_map + start_row_idx, row_count, 1);
+}
+
 template <int JoinOpType>
 template <typename HashTableType>
 Status 
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableType& 
hash_table_ctx,
@@ -657,7 +941,8 @@ Status 
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
 
         auto& iter = hash_table_ctx.iter;
         auto block_size = 0;
-        auto& visited_iter = _join_node->_outer_join_pull_visited_iter;
+        auto& visited_iter =
+                
std::get<ForwardIterator<Mapped>>(_join_node->_outer_join_pull_visited_iter);
 
         auto insert_from_hash_table = [&](uint8_t offset, uint32_t row_num) {
             block_size++;
@@ -668,7 +953,6 @@ Status 
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
         };
 
         if (visited_iter.ok()) {
-            DCHECK((std::is_same_v<Mapped, RowRefListWithFlag>));
             for (; visited_iter.ok() && block_size < _batch_size; 
++visited_iter) {
                 insert_from_hash_table(visited_iter->block_offset, 
visited_iter->row_num);
             }
@@ -681,11 +965,17 @@ Status 
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
             auto& mapped = iter->get_second();
             if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) {
                 if (mapped.visited) {
-                    for (auto it = mapped.begin(); it.ok(); ++it) {
+                    visited_iter = mapped.begin();
+                    for (; visited_iter.ok() && block_size < _batch_size; 
++visited_iter) {
                         if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) {
-                            insert_from_hash_table(it->block_offset, 
it->row_num);
+                            insert_from_hash_table(visited_iter->block_offset,
+                                                   visited_iter->row_num);
                         }
                     }
+                    if (visited_iter.ok()) {
+                        // block_size >= _batch_size, quit for loop
+                        break;
+                    }
                 } else {
                     if constexpr (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN) {
                         visited_iter = mapped.begin();
@@ -700,17 +990,24 @@ Status 
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
                     }
                 }
             } else {
-                for (auto it = mapped.begin(); it.ok(); ++it) {
+                visited_iter = mapped.begin();
+                for (; visited_iter.ok() && block_size < _batch_size; 
++visited_iter) {
                     if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) {
-                        if (it->visited) {
-                            insert_from_hash_table(it->block_offset, 
it->row_num);
+                        if (visited_iter->visited) {
+                            insert_from_hash_table(visited_iter->block_offset,
+                                                   visited_iter->row_num);
                         }
                     } else {
-                        if (!it->visited) {
-                            insert_from_hash_table(it->block_offset, 
it->row_num);
+                        if (!visited_iter->visited) {
+                            insert_from_hash_table(visited_iter->block_offset,
+                                                   visited_iter->row_num);
                         }
                     }
                 }
+                if (visited_iter.ok()) {
+                    // block_size >= _batch_size, quit for loop
+                    break;
+                }
             }
         }
 
@@ -733,6 +1030,7 @@ Status 
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
         *eos = iter == hash_table_ctx.hash_table.end();
         output_block->swap(
                 mutable_block.to_block(right_semi_anti_without_other ? 
right_col_idx : 0));
+        DCHECK(block_size <= _batch_size);
         return Status::OK();
     } else {
         LOG(FATAL) << "Invalid RowRefList";
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp 
b/be/src/vec/exec/join/vhash_join_node.cpp
index 9408622f78..79f6bbf447 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -562,6 +562,8 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* 
output_block, bool* eo
     if (_is_outer_join) {
         _add_tuple_is_null_column(&temp_block);
     }
+    auto output_rows = temp_block.rows();
+    DCHECK(output_rows <= state->batch_size());
     {
         SCOPED_TIMER(_join_filter_timer);
         RETURN_IF_ERROR(
@@ -932,6 +934,9 @@ void HashJoinNode::_hash_table_init(RuntimeState* state) {
                                                    JoinOpType::value == 
TJoinOp::RIGHT_OUTER_JOIN ||
                                                    JoinOpType::value == 
TJoinOp::FULL_OUTER_JOIN,
                                            RowRefListWithFlag, RowRefList>>;
+                
_probe_row_match_iter.emplace<ForwardIterator<RowRefListType>>();
+                
_outer_join_pull_visited_iter.emplace<ForwardIterator<RowRefListType>>();
+
                 if (_build_expr_ctxs.size() == 1 && 
!_store_null_in_hash_table[0]) {
                     // Single column optimization
                     switch (_build_expr_ctxs[0]->root()->result_type()) {
diff --git a/be/src/vec/exec/join/vhash_join_node.h 
b/be/src/vec/exec/join/vhash_join_node.h
index f6d6982128..e7ee1feef0 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -195,6 +195,10 @@ using HashTableCtxVariants =
                      ProcessHashTableProbe<TJoinOp::RIGHT_ANTI_JOIN>,
                      
ProcessHashTableProbe<TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>>;
 
+using HashTableIteratorVariants =
+        std::variant<std::monostate, ForwardIterator<RowRefList>,
+                     ForwardIterator<RowRefListWithFlag>, 
ForwardIterator<RowRefListWithFlags>>;
+
 class HashJoinNode final : public VJoinNodeBase {
 public:
     // TODO: Best prefetch step is decided by machine. We should also provide a
@@ -263,7 +267,9 @@ private:
     std::unique_ptr<HashTableCtxVariants> _process_hashtable_ctx_variants;
 
     // for full/right outer join
-    ForwardIterator<RowRefListWithFlag> _outer_join_pull_visited_iter;
+    HashTableIteratorVariants _outer_join_pull_visited_iter;
+
+    HashTableIteratorVariants _probe_row_match_iter;
 
     std::shared_ptr<std::vector<Block>> _build_blocks;
     Block _probe_block;
@@ -290,6 +296,9 @@ private:
     std::vector<bool> _left_output_slot_flags;
     std::vector<bool> _right_output_slot_flags;
 
+    // for cases when a probe row matches more than batch size build rows.
+    bool _is_any_probe_match_row_output = false;
+
     SharedHashTableContextPtr _shared_hash_table_context = nullptr;
 
     Status _materialize_build_side(RuntimeState* state) override;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to