This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch new_join
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/new_join by this push:
     new f82f9f10748 some fix (#26761)
f82f9f10748 is described below

commit f82f9f107489d93bf47a45cca5f141e64c34e439
Author: Pxl <[email protected]>
AuthorDate: Sat Nov 18 23:08:53 2023 +0800

    some fix (#26761)
---
 be/src/exprs/bloom_filter_func.h                   |   4 +-
 be/src/exprs/minmax_predicate.h                    |  56 ++++--
 be/src/pipeline/exec/hashjoin_probe_operator.cpp   |   2 +
 be/src/pipeline/exec/hashjoin_probe_operator.h     |   3 +-
 be/src/vec/common/hash_table/hash_map.h            | 197 +++++++++++++++------
 be/src/vec/common/hash_table/hash_map_context.h    |  15 +-
 be/src/vec/core/block.cpp                          |  15 +-
 be/src/vec/exec/join/process_hash_table_probe.h    |   7 +-
 .../vec/exec/join/process_hash_table_probe_impl.h  |  85 ++++-----
 be/src/vec/exec/join/vhash_join_node.cpp           |  33 +++-
 be/src/vec/exec/join/vhash_join_node.h             |  20 +--
 11 files changed, 283 insertions(+), 154 deletions(-)

diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h
index 3f1e25efdc8..1ac2367d1c9 100644
--- a/be/src/exprs/bloom_filter_func.h
+++ b/be/src/exprs/bloom_filter_func.h
@@ -59,7 +59,7 @@ public:
     // test_element/find_element only used on vectorized engine
     template <typename T>
     bool test_element(T element) const {
-        if constexpr (std::is_same_v<T, Slice>) {
+        if constexpr (std::is_same_v<T, StringRef>) {
             return _bloom_filter->find(element);
         } else {
             return _bloom_filter->find(HashUtil::fixed_len_to_uint32(element));
@@ -68,7 +68,7 @@ public:
 
     template <typename T>
     void add_element(T element) {
-        if constexpr (std::is_same_v<T, Slice>) {
+        if constexpr (std::is_same_v<T, StringRef>) {
             _bloom_filter->insert(element);
         } else {
             _bloom_filter->insert(HashUtil::fixed_len_to_uint32(element));
diff --git a/be/src/exprs/minmax_predicate.h b/be/src/exprs/minmax_predicate.h
index efc4ebf8630..fcf2ef44a19 100644
--- a/be/src/exprs/minmax_predicate.h
+++ b/be/src/exprs/minmax_predicate.h
@@ -17,10 +17,14 @@
 
 #pragma once
 
+#include <type_traits>
+
 #include "common/object_pool.h"
 #include "runtime/type_limit.h"
 #include "vec/columns/column.h"
 #include "vec/columns/column_nullable.h"
+#include "vec/columns/column_string.h"
+#include "vec/common/assert_cast.h"
 
 namespace doris {
 // only used in Runtime Filter
@@ -75,25 +79,51 @@ public:
                     assert_cast<const 
vectorized::ColumnUInt8&>(nullable->get_null_map_column())
                             .get_data();
 
-            const T* data = (T*)col.get_raw_data().data;
-            for (size_t i = start; i < column->size(); i++) {
-                if (!nullmap[i]) {
-                    if constexpr (NeedMin) {
-                        _min = std::min(_min, *(data + i));
+            if constexpr (std::is_same_v<T, StringRef>) {
+                const auto& column_string = assert_cast<const 
vectorized::ColumnString&>(col);
+                for (size_t i = start; i < column->size(); i++) {
+                    if (!nullmap[i]) {
+                        if constexpr (NeedMin) {
+                            _min = std::min(_min, 
column_string.get_data_at(i));
+                        }
+                        if constexpr (NeedMax) {
+                            _max = std::max(_max, 
column_string.get_data_at(i));
+                        }
                     }
-                    if constexpr (NeedMax) {
-                        _max = std::max(_max, *(data + i));
+                }
+            } else {
+                const T* data = (T*)col.get_raw_data().data;
+                for (size_t i = start; i < column->size(); i++) {
+                    if (!nullmap[i]) {
+                        if constexpr (NeedMin) {
+                            _min = std::min(_min, *(data + i));
+                        }
+                        if constexpr (NeedMax) {
+                            _max = std::max(_max, *(data + i));
+                        }
                     }
                 }
             }
         } else {
-            const T* data = (T*)column->get_raw_data().data;
-            for (size_t i = start; i < column->size(); i++) {
-                if constexpr (NeedMin) {
-                    _min = std::min(_min, *(data + i));
+            if constexpr (std::is_same_v<T, StringRef>) {
+                const auto& column_string = assert_cast<const 
vectorized::ColumnString&>(*column);
+                for (size_t i = start; i < column->size(); i++) {
+                    if constexpr (NeedMin) {
+                        _min = std::min(_min, column_string.get_data_at(i));
+                    }
+                    if constexpr (NeedMax) {
+                        _max = std::max(_max, column_string.get_data_at(i));
+                    }
                 }
-                if constexpr (NeedMax) {
-                    _max = std::max(_max, *(data + i));
+            } else {
+                const T* data = (T*)column->get_raw_data().data;
+                for (size_t i = start; i < column->size(); i++) {
+                    if constexpr (NeedMin) {
+                        _min = std::min(_min, *(data + i));
+                    }
+                    if constexpr (NeedMax) {
+                        _max = std::max(_max, *(data + i));
+                    }
                 }
             }
         }
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index b459d8eddd7..e9873129815 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -89,7 +89,9 @@ Status HashJoinProbeLocalState::open(RuntimeState* state) {
 
 void HashJoinProbeLocalState::prepare_for_next() {
     _probe_index = 0;
+    _build_index = 0;
     _ready_probe = false;
+    _last_probe_match = -1;
     _prepare_probe_block();
 }
 
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 31a3bd38b93..c045980881f 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -103,6 +103,7 @@ private:
     bool _ready_probe = false;
     bool _probe_eos = false;
     std::atomic<bool> _probe_inited = false;
+    int _last_probe_match;
 
     vectorized::Block _probe_block;
     vectorized::ColumnRawPtrs _probe_columns;
@@ -116,8 +117,6 @@ private:
     bool _need_null_map_for_probe = false;
     bool _has_set_need_null_map_for_probe = false;
     vectorized::ColumnUInt8::MutablePtr _null_map_column;
-    // for cases when a probe row matches more than batch size build rows.
-    bool _is_any_probe_match_row_output = false;
     std::unique_ptr<HashTableCtxVariants> _process_hashtable_ctx_variants =
             std::make_unique<HashTableCtxVariants>();
 
diff --git a/be/src/vec/common/hash_table/hash_map.h b/be/src/vec/common/hash_table/hash_map.h
index 1ccf34c921c..2f81fc27978 100644
--- a/be/src/vec/common/hash_table/hash_map.h
+++ b/be/src/vec/common/hash_table/hash_map.h
@@ -25,9 +25,11 @@
 #include <span>
 
 #include "common/compiler_util.h"
+#include "vec/columns/column_filter_helper.h"
 #include "vec/common/hash_table/hash.h"
 #include "vec/common/hash_table/hash_table.h"
 #include "vec/common/hash_table/hash_table_allocator.h"
+
 /** NOTE HashMap could only be used for memmoveable (position independent) types.
   * Example: std::string is not position independent in libstdc++ with C++11 ABI or in libc++.
   * Also, key in hash table must be of type, that zero bytes is compared equals to zero key.
@@ -227,20 +229,20 @@ public:
     void prepare_build(size_t num_elem, int batch_size) {
         max_batch_size = batch_size;
         bucket_size = calc_bucket_size(num_elem + 1);
-        first.resize(bucket_size, 0);
+        first.resize(bucket_size + 1);
         next.resize(num_elem);
 
         if constexpr (JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN ||
                       JoinOpType == doris::TJoinOp::RIGHT_OUTER_JOIN ||
                       JoinOpType == doris::TJoinOp::RIGHT_ANTI_JOIN ||
                       JoinOpType == doris::TJoinOp::RIGHT_SEMI_JOIN) {
-            visited.resize(num_elem, 0);
+            visited.resize(num_elem);
         }
     }
 
     uint32_t get_bucket_size() const { return bucket_size; }
 
-    size_t size() const { return next.size(); }
+    size_t size() const { return Base::size() == 0 ? next.size() : 
Base::size(); }
 
     std::vector<uint8_t>& get_visited() { return visited; }
 
@@ -252,32 +254,40 @@ public:
             next[i] = first[bucket_num];
             first[bucket_num] = i;
         }
+        first[bucket_size] = 0; // index = bucket_num means null
     }
 
-    template <int JoinOpType, bool with_other_conjuncts>
+    template <int JoinOpType, bool with_other_conjuncts, bool is_mark_join, 
bool need_judge_null>
     auto find_batch(const Key* __restrict keys, const uint32_t* __restrict 
bucket_nums,
                     int probe_idx, uint32_t build_idx, int probe_rows,
-                    uint32_t* __restrict probe_idxs, uint32_t* __restrict 
build_idxs) {
+                    uint32_t* __restrict probe_idxs, uint32_t* __restrict 
build_idxs,
+                    doris::vectorized::ColumnFilterHelper* mark_column) {
+        if constexpr (is_mark_join) {
+            return _find_batch_mark<JoinOpType>(keys, bucket_nums, probe_idx, 
probe_rows,
+                                                probe_idxs, build_idxs, 
mark_column);
+        }
+
+        if constexpr (with_other_conjuncts) {
+            return _find_batch_conjunct<JoinOpType>(keys, bucket_nums, 
probe_idx, build_idx,
+                                                    probe_rows, probe_idxs, 
build_idxs);
+        }
+
         if constexpr (JoinOpType == doris::TJoinOp::INNER_JOIN ||
                       JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN ||
                       JoinOpType == doris::TJoinOp::LEFT_OUTER_JOIN ||
                       JoinOpType == doris::TJoinOp::RIGHT_OUTER_JOIN) {
-            return _find_batch_inner_outer_join<JoinOpType, 
with_other_conjuncts>(
-                    keys, bucket_nums, probe_idx, build_idx, probe_rows, 
probe_idxs, build_idxs);
+            return _find_batch_inner_outer_join<JoinOpType>(keys, bucket_nums, 
probe_idx, build_idx,
+                                                            probe_rows, 
probe_idxs, build_idxs);
         }
         if constexpr (JoinOpType == doris::TJoinOp::LEFT_ANTI_JOIN ||
-                      JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN) {
-            return _find_batch_left_semi_anti<JoinOpType>(keys, bucket_nums, 
probe_idx, probe_rows,
-                                                          probe_idxs);
+                      JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN ||
+                      JoinOpType == doris::TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) 
{
+            return _find_batch_left_semi_anti<JoinOpType, need_judge_null>(
+                    keys, bucket_nums, probe_idx, probe_rows, probe_idxs);
         }
         if constexpr (JoinOpType == doris::TJoinOp::RIGHT_ANTI_JOIN ||
                       JoinOpType == doris::TJoinOp::RIGHT_SEMI_JOIN) {
-            if constexpr (!with_other_conjuncts) {
-                return _find_batch_right_semi_anti(keys, bucket_nums, 
probe_idx, probe_rows);
-            } else {
-                return _find_batch_right_semi_anti_conjunct(keys, bucket_nums, 
probe_idx, build_idx,
-                                                            probe_rows, 
probe_idxs, build_idxs);
-            }
+            return _find_batch_right_semi_anti(keys, bucket_nums, probe_idx, 
probe_rows);
         }
         return std::tuple {0, 0U, 0};
     }
@@ -305,10 +315,41 @@ public:
     }
 
 private:
+    // only LEFT_ANTI_JOIN/LEFT_SEMI_JOIN/NULL_AWARE_LEFT_ANTI_JOIN/CROSS_JOIN support mark join
+    template <int JoinOpType>
+    auto _find_batch_mark(const Key* __restrict keys, const uint32_t* 
__restrict bucket_nums,
+                          int probe_idx, int probe_rows, uint32_t* __restrict 
probe_idxs,
+                          uint32_t* __restrict build_idxs,
+                          doris::vectorized::ColumnFilterHelper* mark_column) {
+        auto matched_cnt = 0;
+        const auto batch_size = max_batch_size;
+
+        while (probe_idx < probe_rows && matched_cnt < batch_size) {
+            auto build_idx = first[bucket_nums[probe_idx]];
+
+            while (build_idx && keys[probe_idx] != build_keys[build_idx]) {
+                build_idx = next[build_idx];
+            }
+
+            if (bucket_nums[probe_idx] == bucket_size) {
+                // mark result as null when probe row is null
+                mark_column->insert_null();
+            } else {
+                bool matched = JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN ? 
build_idx != 0
+                                                                            : 
build_idx == 0;
+                mark_column->insert_value(matched);
+            }
+
+            probe_idxs[matched_cnt] = probe_idx++;
+            build_idxs[matched_cnt] = build_idx;
+            matched_cnt++;
+        }
+        return std::tuple {probe_idx, 0U, matched_cnt};
+    }
+
     auto _find_batch_right_semi_anti(const Key* __restrict keys,
                                      const uint32_t* __restrict bucket_nums, 
int probe_idx,
                                      int probe_rows) {
-        auto matched_cnt = 0;
         while (probe_idx < probe_rows) {
             auto build_idx = first[bucket_nums[probe_idx]];
 
@@ -320,28 +361,95 @@ private:
             }
             probe_idx++;
         }
+        return std::tuple {probe_idx, 0U, 0};
+    }
+
+    template <int JoinOpType, bool need_judge_null>
+    auto _find_batch_left_semi_anti(const Key* __restrict keys,
+                                    const uint32_t* __restrict bucket_nums, 
int probe_idx,
+                                    int probe_rows, uint32_t* __restrict 
probe_idxs) {
+        auto matched_cnt = 0;
+        const auto batch_size = max_batch_size;
+
+        while (probe_idx < probe_rows && matched_cnt < batch_size) {
+            if constexpr (need_judge_null) {
+                if (bucket_nums[probe_idx] == bucket_size) {
+                    probe_idx++;
+                    continue;
+                }
+            }
+
+            auto build_idx = first[bucket_nums[probe_idx]];
+
+            while (build_idx && keys[probe_idx] != build_keys[build_idx]) {
+                build_idx = next[build_idx];
+            }
+            bool matched =
+                    JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN ? build_idx 
!= 0 : build_idx == 0;
+            probe_idxs[matched_cnt] = probe_idx++;
+            matched_cnt += matched;
+        }
         return std::tuple {probe_idx, 0U, matched_cnt};
     }
 
-    auto _find_batch_right_semi_anti_conjunct(const Key* __restrict keys,
-                                              const uint32_t* __restrict 
bucket_nums, int probe_idx,
-                                              uint32_t build_idx, int 
probe_rows,
-                                              uint32_t* __restrict probe_idxs,
-                                              uint32_t* __restrict build_idxs) 
{
+    auto _find_batch_left_semi_anti_conjunct(const Key* __restrict keys,
+                                             const uint32_t* __restrict 
bucket_nums, int probe_idx,
+                                             int probe_rows, uint32_t* 
__restrict probe_idxs,
+                                             uint32_t* __restrict build_idxs) {
+        auto matched_cnt = 0;
+        const auto batch_size = max_batch_size;
+
+        while (probe_idx < probe_rows && matched_cnt < batch_size) {
+            auto build_idx = first[bucket_nums[probe_idx]];
+
+            while (build_idx) {
+                if (keys[probe_idx] == build_keys[build_idx]) {
+                    probe_idxs[matched_cnt] = probe_idx;
+                    build_idxs[matched_cnt] = build_idx;
+                    matched_cnt++;
+                }
+                build_idx = next[build_idx];
+            }
+            probe_idx++;
+        }
+        return std::tuple {probe_idx, 0U, matched_cnt};
+    }
+
+    template <int JoinOpType>
+    auto _find_batch_conjunct(const Key* __restrict keys, const uint32_t* 
__restrict bucket_nums,
+                              int probe_idx, uint32_t build_idx, int 
probe_rows,
+                              uint32_t* __restrict probe_idxs, uint32_t* 
__restrict build_idxs) {
         auto matched_cnt = 0;
         const auto batch_size = max_batch_size;
 
         auto do_the_probe = [&]() {
             auto matched_cnt_old = matched_cnt;
             while (build_idx && matched_cnt < batch_size) {
-                if (!visited[build_idx] && keys[probe_idx] == 
build_keys[build_idx]) {
+                if constexpr (JoinOpType == doris::TJoinOp::RIGHT_ANTI_JOIN ||
+                              JoinOpType == doris::TJoinOp::RIGHT_SEMI_JOIN) {
+                    if (!visited[build_idx] && keys[probe_idx] == 
build_keys[build_idx]) {
+                        build_idxs[matched_cnt++] = build_idx;
+                    }
+                } else {
                     build_idxs[matched_cnt++] = build_idx;
+                    matched_cnt += keys[probe_idx] == build_keys[build_idx];
                 }
                 build_idx = next[build_idx];
             }
+
             for (auto i = matched_cnt_old; i < matched_cnt; i++) {
                 probe_idxs[i] = probe_idx;
             }
+
+            if constexpr (JoinOpType == doris::TJoinOp::LEFT_OUTER_JOIN ||
+                          JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN) {
+                if (!build_idx) {
+                    probe_idxs[matched_cnt] = probe_idx;
+                    build_idxs[matched_cnt] = 0;
+                    matched_cnt++;
+                }
+            }
+
             probe_idx++;
         };
 
@@ -354,32 +462,13 @@ private:
             do_the_probe();
         }
 
-        probe_idx -= (matched_cnt == batch_size && build_idx);
+        probe_idx -=
+                (matched_cnt >= batch_size &&
+                 build_idx); // FULL_OUTER_JOIN may over batch_size when 
emplace 0 into build_idxs
         return std::tuple {probe_idx, build_idx, matched_cnt};
     }
 
     template <int JoinOpType>
-    auto _find_batch_left_semi_anti(const Key* __restrict keys,
-                                    const uint32_t* __restrict bucket_nums, 
int probe_idx,
-                                    int probe_rows, uint32_t* __restrict 
probe_idxs) {
-        auto matched_cnt = 0;
-        const auto batch_size = max_batch_size;
-
-        while (probe_idx < probe_rows && matched_cnt < batch_size) {
-            auto build_idx = first[bucket_nums[probe_idx]];
-
-            while (build_idx && keys[probe_idx] != build_keys[build_idx]) {
-                build_idx = next[build_idx];
-            }
-            bool matched =
-                    JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN ? build_idx 
!= 0 : build_idx == 0;
-            probe_idxs[matched_cnt] = probe_idx++;
-            matched_cnt += matched;
-        }
-        return std::tuple {probe_idx, 0U, matched_cnt};
-    }
-
-    template <int JoinOpType, bool with_other_conjuncts>
     auto _find_batch_inner_outer_join(const Key* __restrict keys,
                                       const uint32_t* __restrict bucket_nums, 
int probe_idx,
                                       uint32_t build_idx, int probe_rows,
@@ -393,14 +482,13 @@ private:
                 if (keys[probe_idx] == build_keys[build_idx]) {
                     probe_idxs[matched_cnt] = probe_idx;
                     build_idxs[matched_cnt] = build_idx;
-                    if constexpr (!with_other_conjuncts &&
-                                  (JoinOpType == 
doris::TJoinOp::RIGHT_OUTER_JOIN ||
-                                   JoinOpType == 
doris::TJoinOp::FULL_OUTER_JOIN)) {
+                    matched_cnt++;
+                    if constexpr (JoinOpType == 
doris::TJoinOp::RIGHT_OUTER_JOIN ||
+                                  JoinOpType == 
doris::TJoinOp::FULL_OUTER_JOIN) {
                         if (!visited[build_idx]) {
                             visited[build_idx] = 1;
                         }
                     }
-                    matched_cnt++;
                 }
                 build_idx = next[build_idx];
             }
@@ -422,8 +510,7 @@ private:
         }
 
         while (probe_idx < probe_rows && matched_cnt < batch_size) {
-            uint32_t bucket_num = bucket_nums[probe_idx];
-            build_idx = first[bucket_num];
+            build_idx = first[bucket_nums[probe_idx]];
             do_the_probe();
         }
 
@@ -434,11 +521,11 @@ private:
     const Key* __restrict build_keys;
     std::vector<uint8_t> visited;
 
-    uint32_t bucket_size = 0;
-    int max_batch_size = 0;
+    uint32_t bucket_size = 1;
+    int max_batch_size = 4064;
 
-    std::vector<uint32_t> first;
-    std::vector<uint32_t> next;
+    std::vector<uint32_t> first = {0};
+    std::vector<uint32_t> next = {0};
 
     // use in iter hash map
     mutable uint32_t iter_idx = 1;
diff --git a/be/src/vec/common/hash_table/hash_map_context.h b/be/src/vec/common/hash_table/hash_map_context.h
index 64216986365..ad25e1cd83a 100644
--- a/be/src/vec/common/hash_table/hash_map_context.h
+++ b/be/src/vec/common/hash_table/hash_map_context.h
@@ -85,11 +85,8 @@ struct MethodBase {
             return;
         }
         for (uint32_t k = 0; k < num_rows; ++k) {
-            if (null_map[k]) {
-                continue;
-            }
-
-            bucket_nums[k] = hash_table->hash(keys[k]) & (bucket_size - 1);
+            bucket_nums[k] =
+                    null_map[k] ? bucket_size : hash_table->hash(keys[k]) & 
(bucket_size - 1);
         }
     }
 
@@ -306,9 +303,9 @@ struct MethodOneNumber : public MethodBase<TData> {
         Base::keys = (FieldType*)(key_columns[0]->is_nullable()
                                           ? assert_cast<const 
ColumnNullable*>(key_columns[0])
                                                     ->get_nested_column_ptr()
-                                          : key_columns[0])
-                             ->get_raw_data()
-                             .data;
+                                                    ->get_raw_data()
+                                                    .data
+                                          : 
key_columns[0]->get_raw_data().data);
         std::string name = key_columns[0]->get_name();
         if (is_join) {
             Base::init_join_bucket_num(num_rows, bucket_size, null_map);
@@ -352,6 +349,8 @@ struct MethodKeysFixed : public MethodBase<TData> {
     void pack_fixeds(size_t row_numbers, const ColumnRawPtrs& key_columns,
                      const ColumnRawPtrs& nullmap_columns, std::vector<T>& 
result) {
         size_t bitmap_size = get_bitmap_size(nullmap_columns.size());
+        // set size to 0 at first, then use resize to call default constructor on index included from [0, row_numbers) to reset all memory
+        result.clear();
         result.resize(row_numbers);
 
         size_t offset = 0;
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 8492044215f..8fcdde220a3 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -330,11 +330,14 @@ size_t Block::get_position_by_name(const std::string& name) const {
 void Block::check_number_of_rows(bool allow_null_columns) const {
     ssize_t rows = -1;
     for (const auto& elem : data) {
-        if (!elem.column && allow_null_columns) continue;
+        if (!elem.column && allow_null_columns) {
+            continue;
+        }
 
         if (!elem.column) {
-            LOG(FATAL) << fmt::format(
-                    "Column {} in block is nullptr, in method 
check_number_of_rows.", elem.name);
+            throw Exception(ErrorCode::INTERNAL_ERROR,
+                            "Column {} in block is nullptr, in method 
check_number_of_rows.",
+                            elem.name);
         }
 
         ssize_t size = elem.column->size();
@@ -342,8 +345,8 @@ void Block::check_number_of_rows(bool allow_null_columns) const {
         if (rows == -1) {
             rows = size;
         } else if (rows != size) {
-            LOG(FATAL) << fmt::format("Sizes of columns doesn't match: 
{}:{},{}:{}, col size: {}",
-                                      data.front().name, rows, elem.name, 
size, each_col_size());
+            throw Exception(ErrorCode::INTERNAL_ERROR, "Sizes of columns 
doesn't match, block={}",
+                            dump_structure());
         }
     }
 }
@@ -1065,7 +1068,7 @@ std::unique_ptr<Block> Block::create_same_struct_block(size_t size, bool is_rese
         if (is_reserve) {
             column->reserve(size);
         } else {
-            column->resize(size);
+            column->insert_many_defaults(size);
         }
         temp_block->insert({std::move(column), d.type, d.name});
     }
diff --git a/be/src/vec/exec/join/process_hash_table_probe.h b/be/src/vec/exec/join/process_hash_table_probe.h
index 34b5dc3ee8d..ed7d0c6443b 100644
--- a/be/src/vec/exec/join/process_hash_table_probe.h
+++ b/be/src/vec/exec/join/process_hash_table_probe.h
@@ -100,9 +100,6 @@ struct ProcessHashTableProbe {
     std::unique_ptr<Arena> _serialize_key_arena;
     std::vector<char> _probe_side_find_result;
 
-    int _right_col_idx;
-    int _right_col_len;
-
     bool _have_other_join_conjunct;
     bool _is_right_semi_anti;
     std::vector<bool>* _left_output_slot_flags;
@@ -114,7 +111,9 @@ struct ProcessHashTableProbe {
     RuntimeProfile::Counter* _build_side_output_timer;
     RuntimeProfile::Counter* _probe_side_output_timer;
     RuntimeProfile::Counter* _probe_process_hashtable_timer;
-    static constexpr int PROBE_SIDE_EXPLODE_RATE = 1;
+
+    int _right_col_idx;
+    int _right_col_len;
 };
 
 } // namespace vectorized
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/vec/exec/join/process_hash_table_probe_impl.h
index e3fadf2056f..8cb5bd8cb8f 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h
@@ -52,7 +52,11 @@ ProcessHashTableProbe<JoinOpType, Parent>::ProcessHashTableProbe(Parent* parent,
           _search_hashtable_timer(parent->_search_hashtable_timer),
           _build_side_output_timer(parent->_build_side_output_timer),
           _probe_side_output_timer(parent->_probe_side_output_timer),
-          
_probe_process_hashtable_timer(parent->_probe_process_hashtable_timer) {}
+          
_probe_process_hashtable_timer(parent->_probe_process_hashtable_timer),
+          _right_col_idx((_is_right_semi_anti && !_have_other_join_conjunct)
+                                 ? 0
+                                 : _parent->left_table_data_types().size()),
+          _right_col_len(_parent->right_table_data_types().size()) {}
 
 template <int JoinOpType, typename Parent>
 void ProcessHashTableProbe<JoinOpType, Parent>::build_side_output_column(
@@ -122,13 +126,9 @@ template <typename HashTableType>
 typename HashTableType::State ProcessHashTableProbe<JoinOpType, 
Parent>::_init_probe_side(
         HashTableType& hash_table_ctx, size_t probe_rows, bool 
with_other_join_conjuncts,
         const uint8_t* null_map) {
-    _right_col_idx = _is_right_semi_anti && !with_other_join_conjuncts
-                             ? 0
-                             : _parent->left_table_data_types().size();
-    _right_col_len = _parent->right_table_data_types().size();
-
-    _probe_indexs.resize(_batch_size);
-    _build_indexs.resize(_batch_size);
+    // may over batch size 1 for some outer join case
+    _probe_indexs.resize(_batch_size + 1);
+    _build_indexs.resize(_batch_size + 1);
 
     if (!_parent->_ready_probe) {
         _parent->_ready_probe = true;
@@ -147,6 +147,10 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_process(HashTableType& hash
                                                              MutableBlock& 
mutable_block,
                                                              Block* 
output_block,
                                                              size_t 
probe_rows) {
+    if (_right_col_len && !_build_block) {
+        return Status::InternalError("build block is nullptr");
+    }
+
     auto& probe_index = _parent->_probe_index;
     auto& build_index = _parent->_build_index;
     auto last_probe_index = probe_index;
@@ -170,10 +174,13 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_process(HashTableType& hash
 
     {
         SCOPED_TIMER(_search_hashtable_timer);
-        auto [new_probe_idx, new_build_idx, new_current_offset] =
-                hash_table_ctx.hash_table->template find_batch<JoinOpType, 
with_other_conjuncts>(
-                        hash_table_ctx.keys, 
hash_table_ctx.bucket_nums.data(), probe_index,
-                        build_index, probe_rows, _probe_indexs.data(), 
_build_indexs.data());
+        auto [new_probe_idx, new_build_idx,
+              new_current_offset] = hash_table_ctx.hash_table->template 
find_batch < JoinOpType,
+              with_other_conjuncts, is_mark_join,
+              need_null_map_for_probe &&
+                      ignore_null > (hash_table_ctx.keys, 
hash_table_ctx.bucket_nums.data(),
+                                     probe_index, build_index, probe_rows, 
_probe_indexs.data(),
+                                     _build_indexs.data(), mark_column.get());
         probe_index = new_probe_idx;
         build_index = new_build_idx;
         current_offset = new_current_offset;
@@ -236,36 +243,28 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_other_join_conjuncts(
 
         auto null_map_column = ColumnVector<UInt8>::create(row_count, 0);
         auto* __restrict null_map_data = null_map_column->get_data().data();
-
         // process equal-conjuncts-matched tuples that are newly generated
         // in this run if there are any.
         for (int i = 0; i < row_count; ++i) {
-            auto join_hit = _build_indexs[i];
-            auto other_hit = filter_column_ptr[i];
-
-            if (!other_hit) {
-                for (size_t j = 0; j < _right_col_len; ++j) {
-                    typeid_cast<ColumnNullable*>(
-                            std::move(*output_block->get_by_position(j + 
_right_col_idx).column)
-                                    .assume_mutable()
-                                    .get())
-                            ->get_null_map_data()[i] = true;
-                }
-            }
+            bool join_hit = _build_indexs[i];
+            bool other_hit = filter_column_ptr[i];
+
             null_map_data[i] = !join_hit || !other_hit;
 
-            if (join_hit) {
-                filter_map[i] = other_hit;
+            if (!join_hit) {
+                filter_map[i] = _parent->_last_probe_match != _probe_indexs[i];
             } else {
-                filter_map[i] = true;
+                filter_map[i] = other_hit;
+            }
+            if (filter_map[i]) {
+                _parent->_last_probe_match = _probe_indexs[i];
             }
         }
 
         for (size_t i = 0; i < row_count; ++i) {
             if (filter_map[i]) {
                 _tuple_is_null_right_flags->emplace_back(null_map_data[i]);
-                if constexpr (JoinOpType == TJoinOp::FULL_OUTER_JOIN ||
-                              JoinOpType == TJoinOp::RIGHT_OUTER_JOIN) {
+                if constexpr (JoinOpType == TJoinOp::FULL_OUTER_JOIN) {
                     visited[_build_indexs[i]] = 1;
                 }
             }
@@ -275,9 +274,7 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_other_join_conjuncts(
         auto new_filter_column = ColumnVector<UInt8>::create(row_count);
         auto& filter_map = new_filter_column->get_data();
 
-        size_t start_row_idx = 1;
-        filter_map.emplace_back(filter_column_ptr[0]);
-        for (size_t i = start_row_idx; i < row_count; ++i) {
+        for (size_t i = 0; i < row_count; ++i) {
             filter_map[i] = filter_column_ptr[i];
         }
 
@@ -309,16 +306,8 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_other_join_conjuncts(
         // equal-conjuncts-matched tuple is output in all sub blocks,
         // if there are none, just pick a tuple and output.
 
-        size_t start_row_idx = 1;
-        // Both equal conjuncts and other conjuncts are true
-        filter_map[0] = filter_column_ptr[0] && _build_indexs[0];
-
-        for (size_t i = start_row_idx; i < row_count; ++i) {
-            if (_build_indexs[i] && filter_column_ptr[i]) {
-                filter_map[i] = _build_indexs[i] && filter_column_ptr[i];
-            } else {
-                filter_map[i] = false;
-            }
+        for (size_t i = 0; i < row_count; ++i) {
+            filter_map[i] = _build_indexs[i] && filter_column_ptr[i];
         }
 
         if (is_mark_join) {
@@ -355,9 +344,8 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_other_join_conjuncts(
                       JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
             orig_columns = _right_col_idx;
         }
-        static_cast<void>(
-                Block::filter_block(output_block, result_column_id,
-                                    is_mark_join ? output_block->columns() : orig_columns));
+        RETURN_IF_ERROR(Block::filter_block(output_block, result_column_id,
+                                            is_mark_join ? output_block->columns() : orig_columns));
     }
 
     return Status::OK();
@@ -374,6 +362,11 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::process_data_in_hashtable(
     auto block_size = _build_indexs.size();
 
     if (block_size) {
+        if (mcol.size() < _right_col_len + _right_col_idx) {
+            return Status::InternalError(
+                    "output block invalid, mcol.size()={}, _right_col_len={}, _right_col_idx={}",
+                    mcol.size(), _right_col_len, _right_col_idx);
+        }
         for (size_t j = 0; j < _right_col_len; ++j) {
             const auto& column = *_build_block->safe_get_by_position(j).column;
             mcol[j + _right_col_idx]->insert_indices_from_join(column, _build_indexs.data(),
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index 2b7447ee56b..6ba5e6e08d4 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -26,6 +26,7 @@
 #include <algorithm>
 #include <array>
 #include <boost/iterator/iterator_facade.hpp>
+#include <climits>
 #include <functional>
 #include <map>
 #include <memory>
@@ -59,6 +60,7 @@
 #include "vec/common/assert_cast.h"
 #include "vec/common/hash_table/hash_map.h"
 #include "vec/common/uint128.h"
+#include "vec/core/block.h"
 #include "vec/core/column_with_type_and_name.h"
 #include "vec/data_types/data_type.h"
 #include "vec/data_types/data_type_nullable.h"
@@ -74,6 +76,8 @@
 
 namespace doris::vectorized {
 
+constexpr uint32_t JOIN_BUILD_SIZE_LIMIT = std::numeric_limits<uint32_t>::max();
+
 template Status HashJoinNode::_extract_join_column<true>(
         Block&, COW<IColumn>::mutable_ptr<ColumnVector<unsigned char>>&,
         std::vector<IColumn const*, std::allocator<IColumn const*>>&,
@@ -91,7 +95,6 @@ HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const Descr
           _hash_output_slot_ids(tnode.hash_join_node.__isset.hash_output_slot_ids
                                         ? tnode.hash_join_node.hash_output_slot_ids
                                         : std::vector<SlotId> {}),
-          _build_block_idx(0),
           _build_side_mem_used(0),
           _build_side_last_mem_used(0) {
     _runtime_filter_descs = tnode.runtime_filters;
@@ -297,7 +300,9 @@ bool HashJoinNode::need_more_input_data() const {
 
 void HashJoinNode::prepare_for_next() {
     _probe_index = 0;
+    _build_index = 0;
     _ready_probe = false;
+    _last_probe_match = -1;
     _prepare_probe_block();
 }
 
@@ -453,7 +458,7 @@ Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_
     if (!st) {
         return st;
     }
-    RETURN_IF_ERROR(_filter_data_and_build_output(state, output_block, eos, &temp_block));
+    RETURN_IF_ERROR(_filter_data_and_build_output(state, output_block, eos, &temp_block, false));
     // Here make _join_block release the columns' ptr
     _join_block.set_columns(_join_block.clone_empty_columns());
     mutable_join_block.clear();
@@ -726,18 +731,22 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
         // data from probe side.
         _build_side_mem_used += in_block->allocated_bytes();
 
+        if (_build_side_mutable_block.empty()) {
+            auto tmp_build_block =
+                    VectorizedUtils::create_empty_columnswithtypename(child(1)->row_desc());
+            _build_side_mutable_block = MutableBlock::build_mutable_block(&tmp_build_block);
+            RETURN_IF_ERROR(_build_side_mutable_block.merge(
+                    *(tmp_build_block.create_same_struct_block(1, false))));
+        }
+
         if (in_block->rows() != 0) {
             SCOPED_TIMER(_build_side_merge_block_timer);
-            if (_build_side_mutable_block.empty()) {
-                RETURN_IF_ERROR(_build_side_mutable_block.merge(
-                        *(in_block->create_same_struct_block(1, false))));
-            }
             RETURN_IF_ERROR(_build_side_mutable_block.merge(*in_block));
-            if (_build_side_mutable_block.rows() > std::numeric_limits<uint32_t>::max()) {
+            if (_build_side_mutable_block.rows() > JOIN_BUILD_SIZE_LIMIT) {
                 return Status::NotSupported(
                         "Hash join do not support build table rows"
                         " over:" +
-                        std::to_string(std::numeric_limits<uint32_t>::max()));
+                        std::to_string(JOIN_BUILD_SIZE_LIMIT));
             }
         }
     }
@@ -947,6 +956,14 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block) {
     RETURN_IF_ERROR(_do_evaluate(block, _build_expr_ctxs, *_build_expr_call_timer, res_col_ids));
     if (_join_op == TJoinOp::LEFT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN) {
         _convert_block_to_null(block);
+        // first row is mocked
+        for (int i = 0; i < block.columns(); i++) {
+            assert_cast<ColumnNullable*>(
+                    (*std::move(block.safe_get_by_position(i).column)).mutate().get())
+                    ->get_null_map_column()
+                    .get_data()
+                    .data()[0] = 1;
+        }
     }
     // TODO: Now we are not sure whether a column is nullable only by ExecNode's `row_desc`
     //  so we have to initialize this flag by the first build block.
diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h
index a58241234d8..922c41f38fd 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -117,7 +117,8 @@ struct ProcessHashTableBuild {
     template <int JoinOpType, bool ignore_null, bool short_circuit_for_null>
     Status run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map, bool* has_null_key) {
         if (short_circuit_for_null || ignore_null) {
-            for (uint32_t i = 0; i < _rows; i++) {
+            // first row is mocked and is null
+            for (uint32_t i = 1; i < _rows; i++) {
                 if ((*null_map)[i]) {
                     *has_null_key = true;
                 }
@@ -296,19 +297,20 @@ private:
     friend struct ProcessHashTableProbe;
 
     void _init_short_circuit_for_probe() {
+        bool empty_block = !_build_block;
         _short_circuit_for_probe =
                 (_has_null_in_build_side && _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
                  !_is_mark_join) ||
-                (!_build_block && _join_op == TJoinOp::INNER_JOIN && !_is_mark_join) ||
-                (!_build_block && _join_op == TJoinOp::LEFT_SEMI_JOIN && !_is_mark_join) ||
-                (!_build_block && _join_op == TJoinOp::RIGHT_OUTER_JOIN) ||
-                (!_build_block && _join_op == TJoinOp::RIGHT_SEMI_JOIN) ||
-                (!_build_block && _join_op == TJoinOp::RIGHT_ANTI_JOIN);
+                (empty_block && _join_op == TJoinOp::INNER_JOIN && !_is_mark_join) ||
+                (empty_block && _join_op == TJoinOp::LEFT_SEMI_JOIN && !_is_mark_join) ||
+                (empty_block && _join_op == TJoinOp::RIGHT_OUTER_JOIN) ||
+                (empty_block && _join_op == TJoinOp::RIGHT_SEMI_JOIN) ||
+                (empty_block && _join_op == TJoinOp::RIGHT_ANTI_JOIN);
 
         //when build table rows is 0 and not have other_join_conjunct and not _is_mark_join and join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN
         //we could get the result is probe table + null-column(if need output)
         _empty_right_table_need_probe_dispose =
-                (!_build_block && !_have_other_join_conjunct && !_is_mark_join) &&
+                (empty_block && !_have_other_join_conjunct && !_is_mark_join) &&
                 (_join_op == TJoinOp::LEFT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN ||
                  _join_op == TJoinOp::LEFT_ANTI_JOIN);
     }
@@ -381,6 +383,7 @@ private:
     uint32_t _build_index = 0;
     bool _ready_probe = false;
     bool _probe_eos = false;
+    int _last_probe_match;
 
     bool _build_side_ignore_null = false;
 
@@ -393,9 +396,6 @@ private:
     std::vector<bool> _left_output_slot_flags;
     std::vector<bool> _right_output_slot_flags;
 
-    // for cases when a probe row matches more than batch size build rows.
-    bool _is_any_probe_match_row_output = false;
-    uint8_t _build_block_idx = 0;
     int64_t _build_side_mem_used = 0;
     int64_t _build_side_last_mem_used = 0;
     MutableBlock _build_side_mutable_block;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to