This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch dev_join
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 7a1d112f3ba3dd3f8138779e7807e69bc7f964be
Author: HappenLee <[email protected]>
AuthorDate: Tue Oct 24 14:04:33 2023 +0800

    support right anti/semi/outer join (#25785)
---
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |  47 ++--
 .../aggregate_function_collect.h                   |   1 -
 be/src/vec/columns/column_string.cpp               |   4 +-
 be/src/vec/columns/column_vector.cpp               |   2 +-
 be/src/vec/common/hash_table/hash_map.h            | 116 +++++++---
 be/src/vec/common/hash_table/hash_map_context.h    |  27 ++-
 .../vec/exec/join/process_hash_table_probe_impl.h  | 241 +++++++++++----------
 be/src/vec/exec/join/vhash_join_node.cpp           |  46 ++--
 be/src/vec/exec/join/vhash_join_node.h             |  16 +-
 9 files changed, 287 insertions(+), 213 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 99178064a27..f18cab51c02 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -194,29 +194,30 @@ Status 
HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
     Status st = _dependency->extract_join_column<true>(block, null_map_val, 
raw_ptrs, res_col_ids);
 
     st = std::visit(
-            Overload {
-                    [&](std::monostate& arg, auto has_null_value,
-                        auto short_circuit_for_null_in_build_side) -> Status {
-                        LOG(FATAL) << "FATAL: uninited hash table";
-                        __builtin_unreachable();
-                        return Status::OK();
-                    },
-                    [&](auto&& arg, auto has_null_value,
-                        auto short_circuit_for_null_in_build_side) -> Status {
-                        using HashTableCtxType = std::decay_t<decltype(arg)>;
-                        vectorized::ProcessHashTableBuild<HashTableCtxType,
-                                                          
HashJoinBuildSinkLocalState>
-                                hash_table_build_process(rows, block, 
raw_ptrs, this,
-                                                         state->batch_size(), 
state);
-                        return hash_table_build_process
-                                .template run<has_null_value, 
short_circuit_for_null_in_build_side>(
-                                        arg,
-                                        has_null_value || 
short_circuit_for_null_in_build_side
-                                                ? &null_map_val->get_data()
-                                                : nullptr,
-                                        
&_shared_state->_has_null_in_build_side);
-                    }},
-            *_shared_state->hash_table_variants,
+            Overload {[&](std::monostate& arg, auto join_op, auto 
has_null_value,
+                          auto short_circuit_for_null_in_build_side) -> Status 
{
+                          LOG(FATAL) << "FATAL: uninited hash table";
+                          __builtin_unreachable();
+                          return Status::OK();
+                      },
+                      [&](auto&& arg, auto&& join_op, auto has_null_value,
+                          auto short_circuit_for_null_in_build_side) -> Status 
{
+                          using HashTableCtxType = std::decay_t<decltype(arg)>;
+                          using JoinOpType = std::decay_t<decltype(join_op)>;
+                          vectorized::ProcessHashTableBuild<HashTableCtxType,
+                                                            
HashJoinBuildSinkLocalState>
+                                  hash_table_build_process(rows, block, 
raw_ptrs, this,
+                                                           
state->batch_size(), state);
+                          return hash_table_build_process
+                                  .template run<JoinOpType::value, 
has_null_value,
+                                                
short_circuit_for_null_in_build_side>(
+                                          arg,
+                                          has_null_value || 
short_circuit_for_null_in_build_side
+                                                  ? &null_map_val->get_data()
+                                                  : nullptr,
+                                          
&_shared_state->_has_null_in_build_side);
+                      }},
+            *_shared_state->hash_table_variants, 
_shared_state->join_op_variants,
             vectorized::make_bool_variant(_build_side_ignore_null),
             
vectorized::make_bool_variant(p._short_circuit_for_null_in_build_side));
 
diff --git a/be/src/vec/aggregate_functions/aggregate_function_collect.h 
b/be/src/vec/aggregate_functions/aggregate_function_collect.h
index 3a4b0bad20f..734f846a2f5 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_collect.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_collect.h
@@ -134,7 +134,6 @@ struct AggregateFunctionCollectSetData<StringRef, HasLimit> 
{
         if (max_size == -1) {
             max_size = rhs.max_size;
         }
-        max_size = rhs.max_size;
 
         for (auto& rhs_elem : rhs.data_set) {
             if constexpr (HasLimit::value) {
diff --git a/be/src/vec/columns/column_string.cpp 
b/be/src/vec/columns/column_string.cpp
index 7d7597e7dc3..2f76e65278b 100644
--- a/be/src/vec/columns/column_string.cpp
+++ b/be/src/vec/columns/column_string.cpp
@@ -137,7 +137,7 @@ void ColumnString::insert_indices_from(const IColumn& src, 
const uint32_t* __res
     auto* dst_offsets_data = offsets.data();
 
     for (auto* __restrict x = indices_begin; x != indices_end; ++x) {
-        const auto offset = *x;
+        int64_t offset = *x;
         total_chars_size += src_offset_data[offset] - src_offset_data[offset - 
1];
         dst_offsets_data[dst_offsets_pos++] = total_chars_size;
     }
@@ -150,7 +150,7 @@ void ColumnString::insert_indices_from(const IColumn& src, 
const uint32_t* __res
 
     size_t dst_chars_pos = old_char_size;
     for (auto* __restrict x = indices_begin; x != indices_end; ++x) {
-        const auto offset = *x;
+        const int64_t offset = *x;
         const auto start = src_offset_data[offset - 1];
         const auto end = src_offset_data[offset];
 
diff --git a/be/src/vec/columns/column_vector.cpp 
b/be/src/vec/columns/column_vector.cpp
index 1f053c4ce4f..c4ca97df7a2 100644
--- a/be/src/vec/columns/column_vector.cpp
+++ b/be/src/vec/columns/column_vector.cpp
@@ -379,7 +379,7 @@ void ColumnVector<T>::insert_indices_from(const IColumn& 
src,
         // nullmap : indices_begin[i] == 0 means is null at the here, set true 
here
         for (int i = 0; i < new_size; ++i) {
             data[origin_size + i] =
-                    (indices_begin[i] == 0) + (indices_begin[i] == 0) * 
src_data[indices_begin[i]];
+                    (indices_begin[i] == 0) + (indices_begin[i] != 0) * 
src_data[indices_begin[i]];
         }
     } else {
         // real data : indices_begin[i] == 0 what at is meaningless
diff --git a/be/src/vec/common/hash_table/hash_map.h 
b/be/src/vec/common/hash_table/hash_map.h
index 89dfe7f8aac..ab094d69a67 100644
--- a/be/src/vec/common/hash_table/hash_map.h
+++ b/be/src/vec/common/hash_table/hash_map.h
@@ -216,53 +216,102 @@ public:
         return phmap::priv::NormalizeCapacity(expect_bucket_size) + 1;
     }
 
-    void reserve(int num_elem) {
+    template <int JoinOpType>
+    void prepare_build(size_t num_elem, int batch_size) {
+        max_batch_size = batch_size;
         bucket_size = calc_bucket_size(num_elem + 1);
         first.resize(bucket_size, 0);
         next.resize(num_elem);
-    }
 
-    void build(const Key* __restrict keys, const size_t* __restrict 
hash_values, size_t num_elem,
-               int batch_size) {
-        _batch_size = batch_size;
-        bucket_size = calc_bucket_size(num_elem);
-        first.resize(bucket_size, 0);
-        next.resize(num_elem);
+        if constexpr (JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN ||
+                      JoinOpType == doris::TJoinOp::RIGHT_OUTER_JOIN ||
+                      JoinOpType == doris::TJoinOp::RIGHT_ANTI_JOIN ||
+                      JoinOpType == doris::TJoinOp::RIGHT_SEMI_JOIN) {
+            visited.resize(num_elem, 0);
+        }
+    }
 
+    void build(const Key* __restrict keys, const size_t* __restrict 
bucket_nums,
+               const size_t num_elem) {
         build_keys = keys;
         for (size_t i = 1; i < num_elem; i++) {
-            next[i] = first[hash_values[i]];
-            first[hash_values[i]] = i;
+            next[i] = first[bucket_nums[i]];
+            first[bucket_nums[i]] = i;
         }
     }
 
     template <int JoinOpType>
-    auto find_batch(const Key* __restrict keys, const size_t* __restrict 
hash_values, int probe_idx,
+    auto find_batch(const Key* __restrict keys, const size_t* __restrict 
bucket_nums, int probe_idx,
                     int probe_rows, std::vector<uint32_t>& probe_idxs,
                     std::vector<uint32_t>& build_idxs) {
         if constexpr (JoinOpType == doris::TJoinOp::INNER_JOIN ||
-                      JoinOpType == doris::TJoinOp::LEFT_OUTER_JOIN) {
-            return _find_batch_inner_outer_join<JoinOpType>(keys, hash_values, 
probe_idx,
+                      JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN ||
+                      JoinOpType == doris::TJoinOp::LEFT_OUTER_JOIN ||
+                      JoinOpType == doris::TJoinOp::RIGHT_OUTER_JOIN) {
+            return _find_batch_inner_outer_join<JoinOpType>(keys, bucket_nums, 
probe_idx,
                                                             probe_rows, 
probe_idxs, build_idxs);
         }
         if constexpr (JoinOpType == doris::TJoinOp::LEFT_ANTI_JOIN ||
                       JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN) {
-            return _find_batch_left_semi_anti<JoinOpType>(keys, hash_values, 
probe_idx, probe_rows,
+            return _find_batch_left_semi_anti<JoinOpType>(keys, bucket_nums, 
probe_idx, probe_rows,
                                                           probe_idxs);
         }
+        if constexpr (JoinOpType == doris::TJoinOp::RIGHT_ANTI_JOIN ||
+                      JoinOpType == doris::TJoinOp::RIGHT_SEMI_JOIN) {
+            return _find_batch_right_semi_anti(keys, bucket_nums, probe_idx, 
probe_rows);
+        }
         return std::pair {0, 0};
     }
 
     size_t get_bucket_mask() { return bucket_size - 1; }
 
+    template <int JoinOpType>
+    bool iterate_map(std::vector<uint32_t>& build_idxs) const {
+        const auto batch_size = max_batch_size;
+        const auto elem_num = visited.size();
+        int count = 0;
+        build_idxs.resize(batch_size); // build_idxs[count] below needs size(), not just capacity
+        // Emit up to batch_size build rows starting at iter_idx (row 0 is the empty-chain sentinel).
+        while (count < batch_size && iter_idx < elem_num) {
+            const auto matched = visited[iter_idx];
+            build_idxs[count] = iter_idx;
+            if constexpr (JoinOpType != doris::TJoinOp::RIGHT_SEMI_JOIN) {
+                count += !matched; // right anti / right outer / full outer: unmatched build rows
+            } else {
+                count += matched; // right semi: build rows that matched at least one probe row
+            }
+            iter_idx++;
+        }
+
+        build_idxs.resize(count);
+        return iter_idx == elem_num;
+    }
+
 private:
+    auto _find_batch_right_semi_anti(const Key* __restrict keys,
+                                     const size_t* __restrict bucket_nums, int probe_idx,
+                                     int probe_rows) {
+        for (; LIKELY(probe_idx < probe_rows); ++probe_idx) { // consumes the whole probe batch
+            auto build_idx = first[bucket_nums[probe_idx]];
+            // Mark every build row in this bucket whose key equals the probe key as visited.
+            while (build_idx) {
+                if (keys[probe_idx] == build_keys[build_idx]) {
+                    visited[build_idx] = 1;
+                }
+                build_idx = next[build_idx];
+            }
+        }
+        return std::pair {probe_rows, 0};
+    }
+
     template <int JoinOpType>
     auto _find_batch_left_semi_anti(const Key* __restrict keys,
-                                    const size_t* __restrict hash_values, int 
probe_idx,
+                                    const size_t* __restrict bucket_nums, int 
probe_idx,
                                     int probe_rows, std::vector<uint32_t>& 
probe_idxs) {
+        const auto batch_size = max_batch_size;
         int matched_cnt = 0;
-        while (LIKELY(probe_idx < probe_rows && matched_cnt < _batch_size)) {
-            uint32_t build_idx = first[hash_values[probe_idx]];
+        while (LIKELY(probe_idx < probe_rows && matched_cnt < batch_size)) {
+            uint32_t build_idx = first[bucket_nums[probe_idx]];
             while (build_idx) {
                 if (keys[probe_idx] == build_keys[build_idx]) {
                     break;
@@ -279,31 +328,36 @@ private:
 
     template <int JoinOpType>
     auto _find_batch_inner_outer_join(const Key* __restrict keys,
-                                      const size_t* __restrict hash_values, 
int probe_idx,
+                                      const size_t* __restrict bucket_nums, 
int probe_idx,
                                       int probe_rows, std::vector<uint32_t>& 
probe_idxs,
                                       std::vector<uint32_t>& build_idxs) {
         int matched_cnt = 0;
         uint32_t build_idx = 0;
+        const auto batch_size = max_batch_size;
 
         auto do_the_probe = [&]() {
-            while (build_idx && LIKELY(matched_cnt < _batch_size)) {
+            while (build_idx && LIKELY(matched_cnt < batch_size)) {
                 if (keys[probe_idx] == build_keys[build_idx]) {
                     probe_idxs[matched_cnt] = probe_idx;
                     build_idxs[matched_cnt] = build_idx;
+                    if constexpr (JoinOpType == 
doris::TJoinOp::RIGHT_OUTER_JOIN ||
+                                  JoinOpType == 
doris::TJoinOp::FULL_OUTER_JOIN) {
+                        visited[build_idx] = 1;
+                    }
                     matched_cnt++;
                 }
                 build_idx = next[build_idx];
             }
 
-            if constexpr (JoinOpType != doris::TJoinOp::INNER_JOIN) {
+            if constexpr (JoinOpType == doris::TJoinOp::LEFT_OUTER_JOIN ||
+                          JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN) {
                 // `(!matched_cnt || probe_idxs[matched_cnt - 1] != 
probe_idx)` means not match one build side
                 if (!build_idx && (!matched_cnt || probe_idxs[matched_cnt - 1] 
!= probe_idx)) {
                     probe_idxs[matched_cnt] = probe_idx;
-                    build_idxs[matched_cnt] = build_idx;
+                    build_idxs[matched_cnt] = 0;
                     matched_cnt++;
                 }
             }
-
             probe_idx++;
         };
 
@@ -314,25 +368,31 @@ private:
             current_build_idx = 0;
             do_the_probe();
         }
-        while (LIKELY(probe_idx < probe_rows && matched_cnt < _batch_size)) {
-            build_idx = first[hash_values[probe_idx]];
+        while (LIKELY(probe_idx < probe_rows && matched_cnt < batch_size)) {
+            build_idx = first[bucket_nums[probe_idx]];
             do_the_probe();
         }
 
-        if (matched_cnt == _batch_size && build_idx) {
-            current_probe_idx = probe_idx - 1;
+        if (matched_cnt == batch_size && build_idx) {
+            probe_idx--;
+            current_probe_idx = probe_idx;
             current_build_idx = build_idx;
         }
         return std::pair {probe_idx, matched_cnt};
     }
 
     const Key* __restrict build_keys;
-    uint32_t bucket_size = 0;
-    int _batch_size = 0;
+    std::vector<uint8_t> visited;
 
+    int max_batch_size = 0;
+
+    // use to save the last probe idx if the matched == max_batch_size
     int current_probe_idx = -1;
     uint32_t current_build_idx = 0;
 
+    mutable uint32_t iter_idx = 1;
+
+    uint32_t bucket_size = 0;
     std::vector<uint32_t> first;
     std::vector<uint32_t> next;
     Cell cell;
diff --git a/be/src/vec/common/hash_table/hash_map_context.h 
b/be/src/vec/common/hash_table/hash_map_context.h
index 6e8f9e0746c..549301ae477 100644
--- a/be/src/vec/common/hash_table/hash_map_context.h
+++ b/be/src/vec/common/hash_table/hash_map_context.h
@@ -69,7 +69,7 @@ struct MethodBase {
         }
     }
     virtual void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t 
num_rows,
-                                      const uint8_t* null_map = nullptr) = 0;
+                                      const uint8_t* null_map = nullptr, bool 
is_build = false) = 0;
 
     void init_hash_values(size_t num_rows, const uint8_t* null_map) {
         if (null_map == nullptr) {
@@ -81,7 +81,6 @@ struct MethodBase {
             if (null_map[k]) {
                 continue;
             }
-
             hash_values[k] = hash_table->hash(keys[k]);
         }
     }
@@ -95,7 +94,7 @@ struct MethodBase {
 
     void calculate_bucket(size_t num_rows) {
         size_t mask = hash_table->get_bucket_mask();
-        for (int i = 0; i < num_rows; i++) {
+        for (size_t i = 0; i < num_rows; i++) {
             hash_values[i] &= mask;
         }
     }
@@ -169,7 +168,7 @@ struct MethodSerialized : public MethodBase<TData> {
     }
 
     void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t 
num_rows,
-                              const uint8_t* null_map = nullptr) override {
+                              const uint8_t* null_map = nullptr, bool is_build 
= false) override {
         Base::arena.clear();
         stored_keys.resize(num_rows);
 
@@ -227,7 +226,7 @@ struct MethodStringNoCache : public MethodBase<TData> {
     std::vector<StringRef> stored_keys;
 
     void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t 
num_rows,
-                              const uint8_t* null_map = nullptr) override {
+                              const uint8_t* null_map = nullptr, bool is_build 
= false) override {
         const IColumn& column = *key_columns[0];
         const auto& column_string = assert_cast<const ColumnString&>(
                 column.is_nullable()
@@ -263,7 +262,7 @@ struct MethodOneNumber : public MethodBase<TData> {
                                                       FieldType>;
 
     void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t 
num_rows,
-                              const uint8_t* null_map = nullptr) override {
+                              const uint8_t* null_map = nullptr, bool is_build 
= false) override {
         Base::keys = (FieldType*)(key_columns[0]->is_nullable()
                                           ? assert_cast<const 
ColumnNullable*>(key_columns[0])
                                                     ->get_nested_column_ptr()
@@ -296,7 +295,9 @@ struct MethodKeysFixed : public MethodBase<TData> {
 
     using State = ColumnsHashing::HashMethodKeysFixed<typename Base::Value, 
Key, Mapped,
                                                       has_nullable_keys>;
-
+    // need keep until the hash probe end.
+    std::vector<Key> build_stored_keys;
+    // refresh each time probe
     std::vector<Key> stored_keys;
     Sizes key_sizes;
 
@@ -365,7 +366,7 @@ struct MethodKeysFixed : public MethodBase<TData> {
     }
 
     void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t 
num_rows,
-                              const uint8_t* null_map = nullptr) override {
+                              const uint8_t* null_map = nullptr, bool is_build 
= false) override {
         ColumnRawPtrs actual_columns;
         ColumnRawPtrs null_maps;
         if (has_nullable_keys) {
@@ -383,8 +384,14 @@ struct MethodKeysFixed : public MethodBase<TData> {
         } else {
             actual_columns = key_columns;
         }
-        stored_keys = pack_fixeds<Key>(num_rows, actual_columns, null_maps);
-        Base::keys = stored_keys.data();
+
+        if (is_build) {
+            build_stored_keys = pack_fixeds<Key>(num_rows, actual_columns, 
null_maps);
+            Base::keys = build_stored_keys.data();
+        } else {
+            stored_keys = pack_fixeds<Key>(num_rows, actual_columns, 
null_maps);
+            Base::keys = stored_keys.data();
+        }
         Base::init_hash_values(num_rows, null_map);
     }
 
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h 
b/be/src/vec/exec/join/process_hash_table_probe_impl.h
index f4bef996911..fc408711768 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h
@@ -85,7 +85,7 @@ void ProcessHashTableProbe<JoinOpType, 
Parent>::build_side_output_column(
         _tuple_is_null_right_flags->resize(size);
         auto* __restrict null_data = _tuple_is_null_right_flags->data();
         for (int i = 0; i < size; ++i) {
-            null_data[i] = _build_block_rows[i] == -1;
+            null_data[i] = _build_block_rows[i] == 0;
         }
     }
 }
@@ -570,131 +570,134 @@ template <typename HashTableType>
 Status ProcessHashTableProbe<JoinOpType, Parent>::process_data_in_hashtable(
         HashTableType& hash_table_ctx, MutableBlock& mutable_block, Block* 
output_block,
         bool* eos) {
-    using Mapped = typename HashTableType::Mapped;
+    //    using Mapped = typename HashTableType::Mapped;
     SCOPED_TIMER(_probe_process_hashtable_timer);
-    if constexpr (std::is_same_v<Mapped, RowRefListWithFlag> ||
-                  std::is_same_v<Mapped, RowRefListWithFlags>) {
-        hash_table_ctx.init_iterator();
-        auto& mcol = mutable_block.mutable_columns();
-
-        bool right_semi_anti_without_other = _is_right_semi_anti && 
!_have_other_join_conjunct;
-        int right_col_idx =
-                right_semi_anti_without_other ? 0 : 
_parent->left_table_data_types().size();
-        int right_col_len = _parent->right_table_data_types().size();
-
-        auto& iter = hash_table_ctx.iterator;
-        auto block_size = 0;
-        auto& visited_iter =
-                
std::get<ForwardIterator<Mapped>>(_parent->_outer_join_pull_visited_iter);
-        _build_blocks_locs.resize(_batch_size);
-        if (visited_iter.ok()) {
-            if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) {
-                for (; visited_iter.ok() && block_size < _batch_size; 
++visited_iter) {
-                    _build_blocks_locs[block_size++] = visited_iter->row_num;
-                }
-            } else {
-                for (; visited_iter.ok() && block_size < _batch_size; 
++visited_iter) {
-                    if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) {
-                        if (visited_iter->visited) {
-                            _build_blocks_locs[block_size++] = 
visited_iter->row_num;
-                        }
-                    } else {
-                        if (!visited_iter->visited) {
-                            _build_blocks_locs[block_size++] = 
visited_iter->row_num;
-                        }
-                    }
-                }
-            }
-            if (!visited_iter.ok()) {
-                ++iter;
-            }
-        }
+    //    if constexpr (std::is_same_v<Mapped, RowRefListWithFlag> ||
+    //                  std::is_same_v<Mapped, RowRefListWithFlags>) {
+    //        hash_table_ctx.init_iterator();
+    //        auto& mcol = mutable_block.mutable_columns();
+    //
+    //        bool right_semi_anti_without_other = _is_right_semi_anti && 
!_have_other_join_conjunct;
+    //        int right_col_idx =
+    //                right_semi_anti_without_other ? 0 : 
_parent->left_table_data_types().size();
+    //        int right_col_len = _parent->right_table_data_types().size();
+    //
+    //        auto& iter = hash_table_ctx.iterator;
+    //        auto block_size = 0;
+    //        auto& visited_iter =
+    //                
std::get<ForwardIterator<Mapped>>(_parent->_outer_join_pull_visited_iter);
+    //        _build_blocks_locs.resize(_batch_size);
+    //        if (visited_iter.ok()) {
+    //            if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) {
+    //                for (; visited_iter.ok() && block_size < _batch_size; 
++visited_iter) {
+    //                    _build_blocks_locs[block_size++] = 
visited_iter->row_num;
+    //                }
+    //            } else {
+    //                for (; visited_iter.ok() && block_size < _batch_size; 
++visited_iter) {
+    //                    if constexpr (JoinOpType == 
TJoinOp::RIGHT_SEMI_JOIN) {
+    //                        if (visited_iter->visited) {
+    //                            _build_blocks_locs[block_size++] = 
visited_iter->row_num;
+    //                        }
+    //                    } else {
+    //                        if (!visited_iter->visited) {
+    //                            _build_blocks_locs[block_size++] = 
visited_iter->row_num;
+    //                        }
+    //                    }
+    //                }
+    //            }
+    //            if (!visited_iter.ok()) {
+    //                ++iter;
+    //            }
+    //        }
+    //
+    //        for (; iter != hash_table_ctx.hash_table->end() && block_size < 
_batch_size; ++iter) {
+    //            auto& mapped = iter->get_second();
+    //            if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) {
+    //                if (mapped.visited) {
+    //                    if constexpr (JoinOpType == 
TJoinOp::RIGHT_SEMI_JOIN) {
+    //                        visited_iter = mapped.begin();
+    //                        for (; visited_iter.ok() && block_size < 
_batch_size; ++visited_iter) {
+    //                            _build_blocks_locs[block_size++] = 
visited_iter->row_num;
+    //                        }
+    //                        if (visited_iter.ok()) {
+    //                            // block_size >= _batch_size, quit for loop
+    //                            break;
+    //                        }
+    //                    }
+    //                } else {
+    //                    if constexpr (JoinOpType != 
TJoinOp::RIGHT_SEMI_JOIN) {
+    //                        visited_iter = mapped.begin();
+    //                        for (; visited_iter.ok() && block_size < 
_batch_size; ++visited_iter) {
+    //                            _build_blocks_locs[block_size++] = 
visited_iter->row_num;
+    //                        }
+    //                        if (visited_iter.ok()) {
+    //                            // block_size >= _batch_size, quit for loop
+    //                            break;
+    //                        }
+    //                    }
+    //                }
+    //            } else {
+    //                visited_iter = mapped.begin();
+    //                for (; visited_iter.ok() && block_size < _batch_size; 
++visited_iter) {
+    //                    if constexpr (JoinOpType == 
TJoinOp::RIGHT_SEMI_JOIN) {
+    //                        if (visited_iter->visited) {
+    //                            _build_blocks_locs[block_size++] = 
visited_iter->row_num;
+    //                        }
+    //                    } else {
+    //                        if (!visited_iter->visited) {
+    //                            _build_blocks_locs[block_size++] = 
visited_iter->row_num;
+    //                        }
+    //                    }
+    //                }
+    //                if (visited_iter.ok()) {
+    //                    // block_size >= _batch_size, quit for loop
+    //
+    //                    const auto size = _build_blocks_locs.size();
+    //                    _build_block_rows.resize(_build_blocks_locs.size());
+    //                    for (int i = 0; i < size; i++) {
+    //                        _build_block_rows[i] = _build_blocks_locs[i];
+    //                    }                    break;
+    //                }
+    //            }
+    //        }
+    //        _build_blocks_locs.resize(block_size);
 
-        for (; iter != hash_table_ctx.hash_table->end() && block_size < 
_batch_size; ++iter) {
-            auto& mapped = iter->get_second();
-            if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) {
-                if (mapped.visited) {
-                    if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) {
-                        visited_iter = mapped.begin();
-                        for (; visited_iter.ok() && block_size < _batch_size; 
++visited_iter) {
-                            _build_blocks_locs[block_size++] = 
visited_iter->row_num;
-                        }
-                        if (visited_iter.ok()) {
-                            // block_size >= _batch_size, quit for loop
-                            break;
-                        }
-                    }
-                } else {
-                    if constexpr (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN) {
-                        visited_iter = mapped.begin();
-                        for (; visited_iter.ok() && block_size < _batch_size; 
++visited_iter) {
-                            _build_blocks_locs[block_size++] = 
visited_iter->row_num;
-                        }
-                        if (visited_iter.ok()) {
-                            // block_size >= _batch_size, quit for loop
-                            break;
-                        }
-                    }
-                }
-            } else {
-                visited_iter = mapped.begin();
-                for (; visited_iter.ok() && block_size < _batch_size; 
++visited_iter) {
-                    if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) {
-                        if (visited_iter->visited) {
-                            _build_blocks_locs[block_size++] = 
visited_iter->row_num;
-                        }
-                    } else {
-                        if (!visited_iter->visited) {
-                            _build_blocks_locs[block_size++] = 
visited_iter->row_num;
-                        }
-                    }
-                }
-                if (visited_iter.ok()) {
-                    // block_size >= _batch_size, quit for loop
-                    break;
-                }
-            }
-        }
-        _build_blocks_locs.resize(block_size);
-
-        const auto size = _build_blocks_locs.size();
-        _build_block_rows.resize(_build_blocks_locs.size());
-        for (int i = 0; i < size; i++) {
-            _build_block_rows[i] = _build_blocks_locs[i];
-        }
-
-        for (size_t j = 0; j < right_col_len; ++j) {
-            const auto& column = *_build_block->get_by_position(j).column;
-            mcol[j + right_col_idx]->insert_indices_from(
-                    column, _build_block_rows.data(),
-                    _build_block_rows.data() + _build_block_rows.size());
-        }
+    auto& mcol = mutable_block.mutable_columns();
+    *eos = hash_table_ctx.hash_table->template 
iterate_map<JoinOpType>(_build_block_rows);
+    auto block_size = _build_block_rows.size();
+    int right_col_idx = _parent->left_table_data_types().size();
+    int right_col_len = _parent->right_table_data_types().size();
+
+    for (size_t j = 0; j < right_col_len; ++j) {
+        const auto& column = *_build_block->get_by_position(j).column;
+        mcol[j + right_col_idx]->insert_indices_from(
+                column, _build_block_rows.data(),
+                _build_block_rows.data() + _build_block_rows.size());
+    }
 
-        // just resize the left table column in case with other conjunct to 
make block size is not zero
-        if (_is_right_semi_anti && _have_other_join_conjunct) {
-            auto target_size = mcol[right_col_idx]->size();
-            for (int i = 0; i < right_col_idx; ++i) {
-                mcol[i]->resize(target_size);
-            }
+    // just resize the left table column in case with other conjunct to make 
block size is not zero
+    if (_is_right_semi_anti && _have_other_join_conjunct) {
+        auto target_size = mcol[right_col_idx]->size();
+        for (int i = 0; i < right_col_idx; ++i) {
+            mcol[i]->resize(target_size);
         }
+    }
 
-        // right outer join / full join need insert data of left table
-        if constexpr (JoinOpType == TJoinOp::RIGHT_OUTER_JOIN ||
-                      JoinOpType == TJoinOp::FULL_OUTER_JOIN) {
-            for (int i = 0; i < right_col_idx; ++i) {
-                
assert_cast<ColumnNullable*>(mcol[i].get())->insert_many_defaults(block_size);
-            }
-            _tuple_is_null_left_flags->resize_fill(block_size, 1);
+    // right outer join / full join need insert data of left table
+    if constexpr (JoinOpType == TJoinOp::RIGHT_OUTER_JOIN ||
+                  JoinOpType == TJoinOp::FULL_OUTER_JOIN) {
+        for (int i = 0; i < right_col_idx; ++i) {
+            
assert_cast<ColumnNullable*>(mcol[i].get())->insert_many_defaults(block_size);
         }
-        *eos = iter == hash_table_ctx.hash_table->end();
-        output_block->swap(
-                mutable_block.to_block(right_semi_anti_without_other ? 
right_col_idx : 0));
-        DCHECK(block_size <= _batch_size);
-        return Status::OK();
-    } else {
-        LOG(FATAL) << "Invalid RowRefList";
-        return Status::InvalidArgument("Invalid RowRefList");
+        _tuple_is_null_left_flags->resize_fill(block_size, 1);
     }
+    output_block->swap(mutable_block.to_block(0));
+    DCHECK(block_size <= _batch_size);
+    return Status::OK();
+    //    else {
+    //        LOG(FATAL) << "Invalid RowRefList";
+    //        return Status::InvalidArgument("Invalid RowRefList");
+    //    }
 }
 
 template <int JoinOpType, typename Parent>
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp 
b/be/src/vec/exec/join/vhash_join_node.cpp
index e765323f39b..cd2d4eb6445 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -972,28 +972,30 @@ Status HashJoinNode::_process_build_block(RuntimeState* 
state, Block& block) {
     Status st = _extract_join_column<true>(block, null_map_val, raw_ptrs, 
res_col_ids);
 
     st = std::visit(
-            Overload {
-                    [&](std::monostate& arg, auto has_null_value,
-                        auto short_circuit_for_null_in_build_side) -> Status {
-                        LOG(FATAL) << "FATAL: uninited hash table";
-                        __builtin_unreachable();
-                        return Status::OK();
-                    },
-                    [&](auto&& arg, auto has_null_value,
-                        auto short_circuit_for_null_in_build_side) -> Status {
-                        using HashTableCtxType = std::decay_t<decltype(arg)>;
-                        ProcessHashTableBuild<HashTableCtxType, HashJoinNode>
-                                hash_table_build_process(rows, block, 
raw_ptrs, this,
-                                                         state->batch_size(), 
state);
-                        return hash_table_build_process
-                                .template run<has_null_value, 
short_circuit_for_null_in_build_side>(
-                                        arg,
-                                        has_null_value || 
short_circuit_for_null_in_build_side
-                                                ? &null_map_val->get_data()
-                                                : nullptr,
-                                        &_has_null_in_build_side);
-                    }},
-            *_hash_table_variants, make_bool_variant(_build_side_ignore_null),
+            Overload {[&](std::monostate& arg, auto join_op, auto 
has_null_value,
+                          auto short_circuit_for_null_in_build_side) -> Status 
{
+                          LOG(FATAL) << "FATAL: uninited hash table";
+                          __builtin_unreachable();
+                          return Status::OK();
+                      },
+                      [&](auto&& arg, auto&& join_op, auto has_null_value,
+                          auto short_circuit_for_null_in_build_side) -> Status 
{
+                          using HashTableCtxType = std::decay_t<decltype(arg)>;
+                          using JoinOpType = std::decay_t<decltype(join_op)>;
+
+                          ProcessHashTableBuild<HashTableCtxType, HashJoinNode>
+                                  hash_table_build_process(rows, block, 
raw_ptrs, this,
+                                                           
state->batch_size(), state);
+                          return hash_table_build_process
+                                  .template run<JoinOpType::value, 
has_null_value,
+                                                
short_circuit_for_null_in_build_side>(
+                                          arg,
+                                          has_null_value || 
short_circuit_for_null_in_build_side
+                                                  ? &null_map_val->get_data()
+                                                  : nullptr,
+                                          &_has_null_in_build_side);
+                      }},
+            *_hash_table_variants, _join_op_variants, 
make_bool_variant(_build_side_ignore_null),
             make_bool_variant(_short_circuit_for_null_in_build_side));
 
     return st;
diff --git a/be/src/vec/exec/join/vhash_join_node.h 
b/be/src/vec/exec/join/vhash_join_node.h
index c15e3674642..5fc1c3c6cae 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -105,7 +105,7 @@ using ProfileCounter = RuntimeProfile::Counter;
 
 template <class HashTableContext, typename Parent>
 struct ProcessHashTableBuild {
-    ProcessHashTableBuild(int rows, Block& acquired_block, ColumnRawPtrs& 
build_raw_ptrs,
+    ProcessHashTableBuild(uint32_t rows, Block& acquired_block, ColumnRawPtrs& 
build_raw_ptrs,
                           Parent* parent, int batch_size, RuntimeState* state)
             : _rows(rows),
               _acquired_block(acquired_block),
@@ -115,7 +115,7 @@ struct ProcessHashTableBuild {
               _state(state),
               
_build_side_compute_hash_timer(parent->_build_side_compute_hash_timer) {}
 
-    template <bool ignore_null, bool short_circuit_for_null>
+    template <int JoinOpType, bool ignore_null, bool short_circuit_for_null>
     Status run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map, 
bool* has_null_key) {
         if (short_circuit_for_null || ignore_null) {
             for (int i = 0; i < _rows; i++) {
@@ -131,18 +131,20 @@ struct ProcessHashTableBuild {
         if (!_parent->runtime_filter_descs().empty()) {
             _parent->_inserted_blocks.insert(&_acquired_block);
         }
-        hash_table_ctx.hash_table->reserve(_rows);
+
+        SCOPED_TIMER(_parent->_build_table_insert_timer);
+        hash_table_ctx.hash_table->template prepare_build<JoinOpType>(_rows, 
_state->batch_size());
+
         hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows,
-                                            null_map ? null_map->data() : 
nullptr);
+                                            null_map ? null_map->data() : 
nullptr, true);
         hash_table_ctx.calculate_bucket(_rows);
-        SCOPED_TIMER(_parent->_build_table_insert_timer);
         hash_table_ctx.hash_table->build(hash_table_ctx.keys, 
hash_table_ctx.hash_values.data(),
-                                         _rows, _state->batch_size());
+                                         _rows);
         return Status::OK();
     }
 
 private:
-    const int _rows;
+    const uint32_t _rows;
     Block& _acquired_block;
     ColumnRawPtrs& _build_raw_ptrs;
     Parent* _parent;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to