This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch dev_join
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 2b2afa8d78c5f53c25349b3ee01095a8ecc1a773
Author: BiteTheDDDDt <[email protected]>
AuthorDate: Mon Oct 16 18:43:26 2023 +0800

    merge block to single block on join/set node
---
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |  89 ++++-----------
 be/src/pipeline/exec/hashjoin_build_sink.h         |   5 +-
 be/src/pipeline/exec/hashjoin_probe_operator.h     |   4 +-
 be/src/pipeline/exec/set_sink_operator.cpp         |  13 +--
 be/src/pipeline/exec/set_sink_operator.h           |   2 +-
 be/src/pipeline/exec/set_source_operator.cpp       |   6 +-
 be/src/pipeline/pipeline_x/dependency.h            |   5 +-
 be/src/vec/common/hash_table/hash_map.h            |  75 +++++++++++++
 .../vec/common/hash_table/hash_table_set_build.h   |  17 ++-
 be/src/vec/exec/join/join_op.h                     |  72 +++++-------
 be/src/vec/exec/join/process_hash_table_probe.h    |   9 +-
 .../vec/exec/join/process_hash_table_probe_impl.h  | 125 +++++----------------
 be/src/vec/exec/join/vhash_join_node.cpp           |  60 +++-------
 be/src/vec/exec/join/vhash_join_node.h             |  30 +++--
 be/src/vec/exec/vset_operation_node.cpp            |  66 ++++++-----
 be/src/vec/exec/vset_operation_node.h              |   5 +-
 be/src/vec/runtime/shared_hash_table_controller.h  |  11 +-
 17 files changed, 248 insertions(+), 346 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 153882075b6..3094c72007e 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -39,10 +39,7 @@ Overload(Callables&&... callables) -> Overload<Callables...>;
 
 
HashJoinBuildSinkLocalState::HashJoinBuildSinkLocalState(DataSinkOperatorXBase* 
parent,
                                                          RuntimeState* state)
-        : JoinBuildSinkLocalState(parent, state),
-          _build_block_idx(0),
-          _build_side_mem_used(0),
-          _build_side_last_mem_used(0) {}
+        : JoinBuildSinkLocalState(parent, state) {}
 
 Status HashJoinBuildSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info) {
     RETURN_IF_ERROR(JoinBuildSinkLocalState::init(state, info));
@@ -52,13 +49,7 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* 
state, LocalSinkStateInfo
     auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
     _shared_state->join_op_variants = p._join_op_variants;
     if (p._is_broadcast_join && 
state->enable_share_hash_table_for_broadcast_join()) {
-        _shared_state->build_blocks = p._shared_hash_table_context->blocks;
-    } else {
-        _shared_state->build_blocks.reset(new 
std::vector<vectorized::Block>());
-        // avoid vector expand change block address.
-        // one block can store 4g data, _build_blocks can store 128*4g data.
-        // if probe data bigger than 512g, runtime filter maybe will core dump 
when insert data.
-        
_shared_state->build_blocks->reserve(vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT);
+        _shared_state->build_block = p._shared_hash_table_context->block;
     }
     _shared_state->is_null_safe_eq_join = p._is_null_safe_eq_join;
     _shared_state->store_null_in_hash_table = p._store_null_in_hash_table;
@@ -82,11 +73,6 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* 
state, LocalSinkStateInfo
     if (!_should_build_hash_table) {
         _shared_hash_table_dependency->block_writing();
         p._shared_hashtable_controller->append_dependency(p.id(), 
_shared_hash_table_dependency);
-    } else if (p._is_broadcast_join) {
-        // avoid vector expand change block address.
-        // one block can store 4g data, _build_blocks can store 128*4g data.
-        // if probe data bigger than 512g, runtime filter maybe will core dump 
when insert data.
-        
_shared_state->build_blocks->reserve(vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT);
     }
 
     _memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage");
@@ -156,25 +142,24 @@ void 
HashJoinBuildSinkLocalState::init_short_circuit_for_probe() {
     _shared_state->short_circuit_for_probe =
             (_shared_state->_has_null_in_build_side &&
              p._join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && 
!p._is_mark_join) ||
-            (_shared_state->build_blocks->empty() && p._join_op == 
TJoinOp::INNER_JOIN &&
+            (!_shared_state->build_block && p._join_op == TJoinOp::INNER_JOIN 
&&
              !p._is_mark_join) ||
-            (_shared_state->build_blocks->empty() && p._join_op == 
TJoinOp::LEFT_SEMI_JOIN &&
+            (!_shared_state->build_block && p._join_op == 
TJoinOp::LEFT_SEMI_JOIN &&
              !p._is_mark_join) ||
-            (_shared_state->build_blocks->empty() && p._join_op == 
TJoinOp::RIGHT_OUTER_JOIN) ||
-            (_shared_state->build_blocks->empty() && p._join_op == 
TJoinOp::RIGHT_SEMI_JOIN) ||
-            (_shared_state->build_blocks->empty() && p._join_op == 
TJoinOp::RIGHT_ANTI_JOIN);
+            (!_shared_state->build_block && p._join_op == 
TJoinOp::RIGHT_OUTER_JOIN) ||
+            (!_shared_state->build_block && p._join_op == 
TJoinOp::RIGHT_SEMI_JOIN) ||
+            (!_shared_state->build_block && p._join_op == 
TJoinOp::RIGHT_ANTI_JOIN);
 
     //when build table rows is 0 and not have other_join_conjunct and not 
_is_mark_join and join type is one of 
LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN
     //we could get the result is probe table + null-column(if need output)
     _shared_state->empty_right_table_need_probe_dispose =
-            (_shared_state->build_blocks->empty() && 
!p._have_other_join_conjunct &&
-             !p._is_mark_join) &&
+            (!_shared_state->build_block && !p._have_other_join_conjunct && 
!p._is_mark_join) &&
             (p._join_op == TJoinOp::LEFT_OUTER_JOIN || p._join_op == 
TJoinOp::FULL_OUTER_JOIN ||
              p._join_op == TJoinOp::LEFT_ANTI_JOIN);
 }
 
 Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
-                                                        vectorized::Block& 
block, uint8_t offset) {
+                                                        vectorized::Block& 
block) {
     auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
     SCOPED_TIMER(_build_table_timer);
     size_t rows = block.rows();
@@ -220,7 +205,7 @@ Status 
HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
                         vectorized::ProcessHashTableBuild<HashTableCtxType,
                                                           
HashJoinBuildSinkLocalState>
                                 hash_table_build_process(rows, block, 
raw_ptrs, this,
-                                                         state->batch_size(), 
offset, state);
+                                                         state->batch_size(), 
state);
                         return hash_table_build_process
                                 .template run<has_null_value, 
short_circuit_for_null_in_build_side>(
                                         arg,
@@ -402,9 +387,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
     SCOPED_TIMER(local_state.profile()->total_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
 
-    // make one block for each 4 gigabytes
-    constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
-
     if (local_state._shared_state->_has_null_in_build_side) {
         // TODO: if _has_null_in_build_side is true we should finish current 
pipeline task.
         DCHECK(state->enable_pipeline_exec());
@@ -417,52 +399,29 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
 
         if (in_block->rows() != 0) {
             SCOPED_TIMER(local_state._build_side_merge_block_timer);
+            if (local_state._build_side_mutable_block.empty()) {
+                RETURN_IF_ERROR(local_state._build_side_mutable_block.merge(
+                        *(in_block->create_same_struct_block(1, false))));
+            }
             
RETURN_IF_ERROR(local_state._build_side_mutable_block.merge(*in_block));
-        }
-
-        if (UNLIKELY(local_state._build_side_mem_used - 
local_state._build_side_last_mem_used >
-                     BUILD_BLOCK_MAX_SIZE)) {
-            if (local_state._shared_state->build_blocks->size() ==
-                vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT) {
-                return Status::NotSupported(strings::Substitute(
-                        "data size of right table in hash join > $0",
-                        BUILD_BLOCK_MAX_SIZE * 
vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT));
+            if (local_state._build_side_mutable_block.rows() >
+                std::numeric_limits<uint32_t>::max()) {
+                return Status::NotSupported(
+                        "Hash join do not support build table rows"
+                        " over:" +
+                        std::to_string(std::numeric_limits<uint32_t>::max()));
             }
-            local_state._shared_state->build_blocks->emplace_back(
-                    local_state._build_side_mutable_block.to_block());
-
-            COUNTER_UPDATE(local_state._build_blocks_memory_usage,
-                           
(*local_state._shared_state->build_blocks)[local_state._build_block_idx]
-                                   .bytes());
-
-            // TODO:: Rethink may we should do the process after we receive 
all build blocks ?
-            // which is better.
-            RETURN_IF_ERROR(local_state.process_build_block(
-                    state, 
(*local_state._shared_state->build_blocks)[local_state._build_block_idx],
-                    local_state._build_block_idx));
-
-            local_state._build_side_mutable_block = vectorized::MutableBlock();
-            ++local_state._build_block_idx;
-            local_state._build_side_last_mem_used = 
local_state._build_side_mem_used;
         }
     }
 
     if (local_state._should_build_hash_table && source_state == 
SourceState::FINISHED) {
         if (!local_state._build_side_mutable_block.empty()) {
-            if (local_state._shared_state->build_blocks->size() ==
-                vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT) {
-                return Status::NotSupported(strings::Substitute(
-                        "data size of right table in hash join > $0",
-                        BUILD_BLOCK_MAX_SIZE * 
vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT));
-            }
-            local_state._shared_state->build_blocks->emplace_back(
+            local_state._shared_state->build_block = 
std::make_shared<vectorized::Block>(
                     local_state._build_side_mutable_block.to_block());
             COUNTER_UPDATE(local_state._build_blocks_memory_usage,
-                           
(*local_state._shared_state->build_blocks)[local_state._build_block_idx]
-                                   .bytes());
+                           (*local_state._shared_state->build_block).bytes());
             RETURN_IF_ERROR(local_state.process_build_block(
-                    state, 
(*local_state._shared_state->build_blocks)[local_state._build_block_idx],
-                    local_state._build_block_idx));
+                    state, (*local_state._shared_state->build_block)));
         }
         auto ret = std::visit(
                 Overload {[&](std::monostate&) -> Status {
@@ -552,7 +511,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
     if (source_state == SourceState::FINISHED) {
         // Since the comparison of null values is meaningless, null aware left 
anti join should not output null
         // when the build side is not empty.
-        if (!local_state._shared_state->build_blocks->empty() &&
+        if (local_state._shared_state->build_block &&
             _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
             local_state._shared_state->probe_ignore_null = true;
         }
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h 
b/be/src/pipeline/exec/hashjoin_build_sink.h
index 9b43f95cd3b..9cf559588cc 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -61,11 +61,11 @@ public:
     ENABLE_FACTORY_CREATOR(HashJoinBuildSinkLocalState);
     using Parent = HashJoinBuildSinkOperatorX;
     HashJoinBuildSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* 
state);
-    ~HashJoinBuildSinkLocalState() = default;
+    ~HashJoinBuildSinkLocalState() override = default;
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
     Status open(RuntimeState* state) override;
-    Status process_build_block(RuntimeState* state, vectorized::Block& block, 
uint8_t offset);
+    Status process_build_block(RuntimeState* state, vectorized::Block& block);
 
     void init_short_circuit_for_probe();
     HashJoinBuildSinkOperatorX* join_build() { return 
(HashJoinBuildSinkOperatorX*)_parent; }
@@ -94,7 +94,6 @@ protected:
 
     std::vector<IRuntimeFilter*> _runtime_filters;
     bool _should_build_hash_table = true;
-    uint8_t _build_block_idx = 0;
     int64_t _build_side_mem_used = 0;
     int64_t _build_side_last_mem_used = 0;
     vectorized::MutableBlock _build_side_mutable_block;
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h 
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 082f45199c1..14503a90357 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -89,9 +89,7 @@ public:
     vectorized::DataTypes right_table_data_types();
     vectorized::DataTypes left_table_data_types();
     bool* has_null_in_build_side() { return 
&_shared_state->_has_null_in_build_side; }
-    std::shared_ptr<std::vector<vectorized::Block>> build_blocks() const {
-        return _shared_state->build_blocks;
-    }
+    std::shared_ptr<vectorized::Block> build_block() const { return 
_shared_state->build_block; }
 
 private:
     void _prepare_probe_block();
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp 
b/be/src/pipeline/exec/set_sink_operator.cpp
index 604729a4700..003f77eceff 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -60,8 +60,7 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* 
state, vectorized::Blo
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
 
     auto& mem_used = local_state._shared_state->mem_used;
-    auto& build_blocks = local_state._shared_state->build_blocks;
-    auto& build_block_index = local_state._shared_state->build_block_index;
+    auto& build_block = local_state._shared_state->build_block;
     auto& valid_element_in_hash_tbl = 
local_state._shared_state->valid_element_in_hash_tbl;
 
     if (in_block->rows() != 0) {
@@ -71,11 +70,9 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* 
state, vectorized::Blo
 
     if (source_state == SourceState::FINISHED ||
         local_state._mutable_block.allocated_bytes() >= BUILD_BLOCK_MAX_SIZE) {
-        build_blocks.emplace_back(local_state._mutable_block.to_block());
-        RETURN_IF_ERROR(_process_build_block(local_state, 
build_blocks[build_block_index],
-                                             build_block_index, state));
+        build_block = local_state._mutable_block.to_block();
+        RETURN_IF_ERROR(_process_build_block(local_state, build_block, state));
         local_state._mutable_block.clear();
-        ++build_block_index;
 
         if (source_state == SourceState::FINISHED) {
             if constexpr (is_intersect) {
@@ -101,7 +98,7 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* 
state, vectorized::Blo
 
 template <bool is_intersect>
 Status SetSinkOperatorX<is_intersect>::_process_build_block(
-        SetSinkLocalState<is_intersect>& local_state, vectorized::Block& 
block, uint8_t offset,
+        SetSinkLocalState<is_intersect>& local_state, vectorized::Block& block,
         RuntimeState* state) {
     size_t rows = block.rows();
     if (rows == 0) {
@@ -117,7 +114,7 @@ Status SetSinkOperatorX<is_intersect>::_process_build_block(
                 using HashTableCtxType = std::decay_t<decltype(arg)>;
                 if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
                     vectorized::HashTableBuildX<HashTableCtxType, is_intersect>
-                            hash_table_build_process(rows, raw_ptrs, offset, 
state);
+                            hash_table_build_process(rows, raw_ptrs, state);
                     static_cast<void>(hash_table_build_process(local_state, 
arg));
                 } else {
                     LOG(FATAL) << "FATAL: uninited hash table";
diff --git a/be/src/pipeline/exec/set_sink_operator.h 
b/be/src/pipeline/exec/set_sink_operator.h
index 945ec06891c..fe4ff3ca6b7 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -117,7 +117,7 @@ private:
     friend struct HashTableBuild;
 
     Status _process_build_block(SetSinkLocalState<is_intersect>& local_state,
-                                vectorized::Block& block, uint8_t offset, 
RuntimeState* state);
+                                vectorized::Block& block, RuntimeState* state);
     Status _extract_build_column(SetSinkLocalState<is_intersect>& local_state,
                                  vectorized::Block& block, 
vectorized::ColumnRawPtrs& raw_ptrs);
 
diff --git a/be/src/pipeline/exec/set_source_operator.cpp 
b/be/src/pipeline/exec/set_source_operator.cpp
index 58cf33c90b5..1daa93e91c0 100644
--- a/be/src/pipeline/exec/set_source_operator.cpp
+++ b/be/src/pipeline/exec/set_source_operator.cpp
@@ -163,12 +163,12 @@ void 
SetSourceOperatorX<is_intersect>::_add_result_columns(
         SetSourceLocalState<is_intersect>& local_state, 
vectorized::RowRefListWithFlags& value,
         int& block_size) {
     auto& build_col_idx = local_state._shared_state->build_col_idx;
-    auto& build_blocks = local_state._shared_state->build_blocks;
+    auto& build_block = local_state._shared_state->build_block;
 
     auto it = value.begin();
     for (auto idx = build_col_idx.begin(); idx != build_col_idx.end(); ++idx) {
-        auto& column = 
*build_blocks[it->block_offset].get_by_position(idx->first).column;
-        if (local_state._mutable_cols[idx->second]->is_nullable() xor 
column.is_nullable()) {
+        auto& column = *build_block.get_by_position(idx->first).column;
+        if (local_state._mutable_cols[idx->second]->is_nullable() ^ 
column.is_nullable()) {
             DCHECK(local_state._mutable_cols[idx->second]->is_nullable());
             
((vectorized::ColumnNullable*)(local_state._mutable_cols[idx->second].get()))
                     ->insert_from_not_nullable(column, it->row_num);
diff --git a/be/src/pipeline/pipeline_x/dependency.h 
b/be/src/pipeline/pipeline_x/dependency.h
index 5d643f58fbd..4868675241a 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -567,7 +567,7 @@ struct HashJoinSharedState : public JoinSharedState {
             std::make_shared<vectorized::HashTableVariants>();
     const std::vector<TupleDescriptor*> build_side_child_desc;
     size_t build_exprs_size = 0;
-    std::shared_ptr<std::vector<vectorized::Block>> build_blocks = nullptr;
+    std::shared_ptr<vectorized::Block> build_block;
     bool probe_ignore_null = false;
 };
 
@@ -660,8 +660,7 @@ struct SetSharedState {
     /// default init
     //record memory during running
     int64_t mem_used = 0;
-    std::vector<vectorized::Block> build_blocks; // build to source
-    int build_block_index = 0;                   // build to source
+    vectorized::Block build_block; // build to source
     //record element size in hashtable
     int64_t valid_element_in_hash_tbl = 0;
     //first:column_id, could point to origin column or cast column
diff --git a/be/src/vec/common/hash_table/hash_map.h 
b/be/src/vec/common/hash_table/hash_map.h
index 5b7cd6f4642..1488db0afea 100644
--- a/be/src/vec/common/hash_table/hash_map.h
+++ b/be/src/vec/common/hash_table/hash_map.h
@@ -20,6 +20,8 @@
 
 #pragma once
 
+#include <span>
+
 #include "vec/common/hash_table/hash.h"
 #include "vec/common/hash_table/hash_table.h"
 #include "vec/common/hash_table/hash_table_allocator.h"
@@ -193,10 +195,83 @@ public:
     bool has_null_key_data() const { return false; }
 };
 
+/// Join build table: first[b] heads bucket b's row chain, next[i] continues it; index 0 ends it.
+template <typename Key, typename Cell, typename Hash = DefaultHash<Key>,
+          typename Grower = HashTableGrower<>, typename Allocator = HashTableAllocator>
+class JoinHashMapTable : public HashMapTable<Key, Cell, Hash, Grower, Allocator> {
+public:
+    using Self = JoinHashMapTable;
+    using Base = HashMapTable<Key, Cell, Hash, Grower, Allocator>;
+    using key_type = Key;
+    using value_type = typename Cell::value_type;
+    using mapped_type = typename Cell::Mapped;
+    using LookupResult = typename Base::LookupResult;
+    using HashMapTable<Key, Cell, Hash, Grower, Allocator>::HashMapTable;
+    /// Sizes both tables for num_elem rows plus slot 0; assign() also clears any earlier build.
+    void expanse_for_add_elem(size_t num_elem) {
+        bucket_size = calc_bucket_size(num_elem + 1);
+        first.assign(bucket_size, 0);
+        next.assign(num_elem + 1, 0);
+    }
+
+    /// Bucket count for num_elem entries; the zero guard keeps `num_elem - 1` from wrapping.
+    static uint32_t calc_bucket_size(size_t num_elem) {
+        size_t expect_bucket_size = num_elem + (num_elem == 0 ? 0 : (num_elem - 1) / 7);
+        return phmap::priv::NormalizeCapacity(expect_bucket_size) + 1;
+    }
+
+    /// Links keys[1..] into the bucket chains; keys[0] is skipped because 0 marks a chain end.
+    void build(std::span<Key> keys) {
+        build_keys = keys;
+        for (size_t i = 1; i < keys.size(); i++) {
+            uint32_t bucket = Base::hash(keys[i]) & (bucket_size - 1);
+            next[i] = first[bucket];
+            first[bucket] = i;
+        }
+    }
+
+    uint32_t bucket_num(const Key& key) const { return Base::hash(key) & (bucket_size - 1); }
+    LookupResult ALWAYS_INLINE find(Key key) { return Base::find(key); }
+    LookupResult ALWAYS_INLINE find(Key x, size_t hash_value) { return Base::find(x, hash_value); }
+
+    /// Batch probe starting at probe_idx; returns {next probe row, number of matches written}.
+    /// NOTE(review): the 4096 cap is tested once per probe row only -- confirm output sizing.
+    auto find(Key* __restrict keys, const size_t* __restrict hash_values, int probe_idx, int n,
+              std::vector<uint32_t>& probe_idxs, std::vector<int>& build_idxs) {
+        auto matched_cnt = 0;
+        while (probe_idx < n && matched_cnt < 4096) {
+            uint32_t bucket = hash_values[probe_idx] & (bucket_size - 1);
+            auto build_idx = first[bucket];
+            while (build_idx) {
+                if (keys[probe_idx] == build_keys[build_idx]) {
+                    probe_idxs[matched_cnt] = probe_idx;
+                    build_idxs[matched_cnt] = build_idx;
+                    matched_cnt++;
+                }
+                build_idx = next[build_idx];
+            }
+            probe_idx++;
+        }
+        return std::pair {probe_idx, matched_cnt};
+    }
+
+private:
+    uint32_t bucket_size = 0;
+    std::vector<uint32_t> first;
+    std::vector<uint32_t> next;
+    std::span<Key> build_keys;
+    Cell cell;
+    doris::vectorized::Arena* pool = nullptr;
+};
+
 template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>,
           typename Grower = HashTableGrower<>, typename Allocator = 
HashTableAllocator>
 using HashMap = HashMapTable<Key, HashMapCell<Key, Mapped, Hash>, Hash, 
Grower, Allocator>;
 
+template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>,
+          typename Grower = HashTableGrower<>, typename Allocator = 
HashTableAllocator>
+using JoinHashMap = HashMapTable<Key, HashMapCell<Key, Mapped, Hash>, Hash, 
Grower, Allocator>;
+
 template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>,
           typename Grower = HashTableGrower<>, typename Allocator = 
HashTableAllocator>
 using HashMapWithSavedHash =
diff --git a/be/src/vec/common/hash_table/hash_table_set_build.h 
b/be/src/vec/common/hash_table/hash_table_set_build.h
index ff1fec3ab1c..94c625e4f28 100644
--- a/be/src/vec/common/hash_table/hash_table_set_build.h
+++ b/be/src/vec/common/hash_table/hash_table_set_build.h
@@ -24,10 +24,9 @@ namespace doris::vectorized {
 template <class HashTableContext, bool is_intersect>
 struct HashTableBuild {
     HashTableBuild(int rows, ColumnRawPtrs& build_raw_ptrs,
-                   VSetOperationNode<is_intersect>* operation_node, uint8_t 
offset,
+                   VSetOperationNode<is_intersect>* operation_node, 
                    RuntimeState* state)
             : _rows(rows),
-              _offset(offset),
               _build_raw_ptrs(build_raw_ptrs),
               _operation_node(operation_node),
               _state(state) {}
@@ -48,9 +47,9 @@ struct HashTableBuild {
         size_t k = 0;
         auto creator = [&](const auto& ctor, auto& key, auto& origin) {
             HashTableContext::try_presis_key(key, origin, arena);
-            ctor(key, Mapped {k, _offset});
+            ctor(key, Mapped {k});
         };
-        auto creator_for_null_key = [&](auto& mapped) { mapped = {k, _offset}; 
};
+        auto creator_for_null_key = [&](auto& mapped) { mapped = {k}; };
 
         for (; k < _rows; ++k) {
             if (k % CHECK_FRECUENCY == 0) {
@@ -63,7 +62,6 @@ struct HashTableBuild {
 
 private:
     const int _rows;
-    const uint8_t _offset;
     ColumnRawPtrs& _build_raw_ptrs;
     VSetOperationNode<is_intersect>* _operation_node;
     RuntimeState* _state;
@@ -71,8 +69,8 @@ private:
 
 template <class HashTableContext, bool is_intersect>
 struct HashTableBuildX {
-    HashTableBuildX(int rows, ColumnRawPtrs& build_raw_ptrs, uint8_t offset, 
RuntimeState* state)
-            : _rows(rows), _offset(offset), _build_raw_ptrs(build_raw_ptrs), 
_state(state) {}
+    HashTableBuildX(int rows, ColumnRawPtrs& build_raw_ptrs, RuntimeState* 
state)
+            : _rows(rows), _build_raw_ptrs(build_raw_ptrs), _state(state) {}
 
     Status operator()(pipeline::SetSinkLocalState<is_intersect>& local_state,
                       HashTableContext& hash_table_ctx) {
@@ -91,9 +89,9 @@ struct HashTableBuildX {
         size_t k = 0;
         auto creator = [&](const auto& ctor, auto& key, auto& origin) {
             HashTableContext::try_presis_key(key, origin, local_state._arena);
-            ctor(key, Mapped {k, _offset});
+            ctor(key, Mapped {k});
         };
-        auto creator_for_null_key = [&](auto& mapped) { mapped = {k, _offset}; 
};
+        auto creator_for_null_key = [&](auto& mapped) { mapped = {k}; };
 
         for (; k < _rows; ++k) {
             if (k % CHECK_FRECUENCY == 0) {
@@ -106,7 +104,6 @@ struct HashTableBuildX {
 
 private:
     const int _rows;
-    const uint8_t _offset;
     ColumnRawPtrs& _build_raw_ptrs;
     RuntimeState* _state;
     std::vector<size_t> _build_side_hash_values;
diff --git a/be/src/vec/exec/join/join_op.h b/be/src/vec/exec/join/join_op.h
index 1b8b8f2c695..858f5197b03 100644
--- a/be/src/vec/exec/join/join_op.h
+++ b/be/src/vec/exec/join/join_op.h
@@ -18,7 +18,6 @@
 #pragma once
 #include "vec/common/arena.h"
 #include "vec/common/columns_hashing.h"
-#include "vec/common/hash_table/hash_map.h"
 #include "vec/core/block.h"
 
 namespace doris::vectorized {
@@ -45,19 +44,19 @@ namespace doris::vectorized {
  */
 struct RowRef {
     uint32_t row_num = 0;
-    uint8_t block_offset;
 
     RowRef() = default;
-    RowRef(size_t row_num_count, uint8_t block_offset_)
-            : row_num(row_num_count), block_offset(block_offset_) {}
+    RowRef(size_t row_num_count) : row_num(row_num_count) {}
+    void clear() {};
 };
 
 struct RowRefWithFlag : public RowRef {
     bool visited;
 
     RowRefWithFlag() = default;
-    RowRefWithFlag(size_t row_num_count, uint8_t block_offset_, bool 
is_visited = false)
-            : RowRef(row_num_count, block_offset_), visited(is_visited) {}
+    RowRefWithFlag(size_t row_num_count, bool is_visited = false)
+            : RowRef(row_num_count), visited(is_visited) {}
+    void clear() {};
 };
 
 /// Portion of RowRefs, 16 * (MAX_SIZE + 1) bytes sized.
@@ -93,14 +92,15 @@ public:
     ForwardIterator() : root(nullptr), first(false), batch(nullptr), 
position(0) {}
 
     ForwardIterator(RowRefListType* begin)
-            : root(begin), first(true), batch(root->next), position(0) {}
+            : root(begin), first(true), batch((&root->next)), position(0) {}
 
     RowRefType& operator*() {
         if (first) {
             return *root;
         }
-        return batch->row_refs[position];
+        return batch->operator[](position);
     }
+
     RowRefType* operator->() { return &(**this); }
 
     void operator++() {
@@ -109,21 +109,17 @@ public:
             return;
         }
 
-        if (batch) {
+        if (batch && position < batch->size()) {
             ++position;
-            if (position >= batch->size) {
-                batch = batch->next;
-                position = 0;
-            }
         }
     }
 
-    bool ok() const { return first || batch; }
+    bool ok() const { return first || (batch && position < batch->size()); }
 
 private:
     RowRefListType* root;
     bool first;
-    Batch<RowRefType>* batch;
+    std::vector<RowRefType>* batch;
     size_t position;
 };
 
@@ -131,76 +127,60 @@ struct RowRefList : RowRef {
     using RowRefType = RowRef;
 
     RowRefList() = default;
-    RowRefList(size_t row_num_, uint8_t block_offset_) : RowRef(row_num_, 
block_offset_) {}
+    RowRefList(size_t row_num_) : RowRef(row_num_) {}
 
     ForwardIterator<RowRefList> begin() { return 
ForwardIterator<RowRefList>(this); }
 
     /// insert element after current one
-    void insert(RowRefType&& row_ref, Arena& pool) {
-        if (!next) {
-            next = pool.alloc<Batch<RowRefType>>();
-            *next = Batch<RowRefType>(nullptr);
-        }
-        next = next->insert(std::move(row_ref), pool);
-    }
+    void insert(RowRefType&& row_ref, Arena& pool) { 
next.emplace_back(std::move(row_ref)); }
+
+    void clear() { next.clear(); }
 
 private:
     friend class ForwardIterator<RowRefList>;
-
-    Batch<RowRefType>* next = nullptr;
+    std::vector<RowRefType> next;
 };
 
 struct RowRefListWithFlag : RowRef {
     using RowRefType = RowRef;
 
     RowRefListWithFlag() = default;
-    RowRefListWithFlag(size_t row_num_, uint8_t block_offset_) : 
RowRef(row_num_, block_offset_) {}
+    RowRefListWithFlag(size_t row_num_) : RowRef(row_num_) {}
 
     ForwardIterator<RowRefListWithFlag> const begin() {
         return ForwardIterator<RowRefListWithFlag>(this);
     }
 
     /// insert element after current one
-    void insert(RowRef&& row_ref, Arena& pool) {
-        if (!next) {
-            next = pool.alloc<Batch<RowRefType>>();
-            *next = Batch<RowRefType>(nullptr);
-        }
-        next = next->insert(std::move(row_ref), pool);
-    }
+    void insert(RowRefType&& row_ref, Arena& pool) { 
next.emplace_back(row_ref); }
+
+    void clear() { next.clear(); }
 
     bool visited = false;
 
 private:
     friend class ForwardIterator<RowRefListWithFlag>;
-
-    Batch<RowRefType>* next = nullptr;
+    std::vector<RowRefType> next;
 };
 
 struct RowRefListWithFlags : RowRefWithFlag {
     using RowRefType = RowRefWithFlag;
 
     RowRefListWithFlags() = default;
-    RowRefListWithFlags(size_t row_num_, uint8_t block_offset_)
-            : RowRefWithFlag(row_num_, block_offset_) {}
+    RowRefListWithFlags(size_t row_num_) : RowRefWithFlag(row_num_) {}
 
     ForwardIterator<RowRefListWithFlags> const begin() {
         return ForwardIterator<RowRefListWithFlags>(this);
     }
 
     /// insert element after current one
-    void insert(RowRefWithFlag&& row_ref, Arena& pool) {
-        if (!next) {
-            next = pool.alloc<Batch<RowRefType>>();
-            *next = Batch<RowRefType>(nullptr);
-        }
-        next = next->insert(std::move(row_ref), pool);
-    }
+    void insert(RowRefType&& row_ref, Arena& pool) { 
next.emplace_back(row_ref); }
+
+    void clear() { next.clear(); }
 
 private:
     friend class ForwardIterator<RowRefListWithFlags>;
-
-    Batch<RowRefType>* next = nullptr;
+    std::vector<RowRefType> next;
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/join/process_hash_table_probe.h 
b/be/src/vec/exec/join/process_hash_table_probe.h
index 435cea84186..421e2e72020 100644
--- a/be/src/vec/exec/join/process_hash_table_probe.h
+++ b/be/src/vec/exec/join/process_hash_table_probe.h
@@ -75,7 +75,7 @@ struct ProcessHashTableProbe {
                                                UInt8* __restrict null_map_data,
                                                UInt8* __restrict filter_map, 
Block* output_block);
 
-    void _emplace_element(int8_t block_offset, int32_t block_row, int& 
current_offset);
+    void _emplace_element(int32_t block_row, int& current_offset);
 
     template <typename HashTableType>
     typename HashTableType::State _init_probe_side(HashTableType& 
hash_table_ctx, size_t probe_rows,
@@ -94,14 +94,13 @@ struct ProcessHashTableProbe {
 
     Parent* _parent;
     const int _batch_size;
-    std::shared_ptr<std::vector<Block>> _build_blocks;
+    std::shared_ptr<Block> _build_block;
     std::unique_ptr<Arena> _arena;
     std::vector<StringRef> _probe_keys;
 
     std::vector<uint32_t> _probe_indexs;
-    PaddedPODArray<int8_t> _build_block_offsets;
-    PaddedPODArray<int32_t> _build_block_rows;
-    std::vector<std::pair<int8_t, int>> _build_blocks_locs;
+    PaddedPODArray<int> _build_block_rows;
+    std::vector<int> _build_blocks_locs;
     // only need set the tuple is null in RIGHT_OUTER_JOIN and FULL_OUTER_JOIN
     ColumnUInt8::Container* _tuple_is_null_left_flags;
     // only need set the tuple is null in LEFT_OUTER_JOIN and FULL_OUTER_JOIN
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h 
b/be/src/vec/exec/join/process_hash_table_probe_impl.h
index f4a3010c49f..dd38b653e65 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h
@@ -32,7 +32,7 @@ template <int JoinOpType, typename Parent>
 ProcessHashTableProbe<JoinOpType, Parent>::ProcessHashTableProbe(Parent* 
parent, int batch_size)
         : _parent(parent),
           _batch_size(batch_size),
-          _build_blocks(parent->build_blocks()),
+          _build_block(parent->build_block()),
           _tuple_is_null_left_flags(parent->is_outer_join()
                                             ? &(reinterpret_cast<ColumnUInt8&>(
                                                         
*parent->_tuple_is_null_left_flag_column)
@@ -69,51 +69,13 @@ void ProcessHashTableProbe<JoinOpType, 
Parent>::build_side_output_column(
             JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == 
TJoinOp::FULL_OUTER_JOIN;
 
     if (!is_semi_anti_join || have_other_join_conjunct) {
-        if (_build_blocks->size() == 1) {
-            for (int i = 0; i < _right_col_len; i++) {
-                auto& column = *(*_build_blocks)[0].get_by_position(i).column;
-                if (output_slot_flags[i]) {
-                    mcol[i + _right_col_idx]->insert_indices_from(column, 
_build_block_rows.data(),
-                                                                  
_build_block_rows.data() + size);
-                } else {
-                    mcol[i + _right_col_idx]->insert_many_defaults(size);
-                }
-            }
-        } else {
-            for (int i = 0; i < _right_col_len; i++) {
-                if (output_slot_flags[i]) {
-                    for (int j = 0; j < size; j++) {
-                        if constexpr (probe_all) {
-                            if (_build_block_offsets[j] == -1) {
-                                DCHECK(mcol[i + 
_right_col_idx]->is_nullable());
-                                assert_cast<ColumnNullable*>(mcol[i + 
_right_col_idx].get())
-                                        ->insert_default();
-                            } else {
-                                auto& column = 
*(*_build_blocks)[_build_block_offsets[j]]
-                                                        .get_by_position(i)
-                                                        .column;
-                                mcol[i + _right_col_idx]->insert_from(column, 
_build_block_rows[j]);
-                            }
-                        } else {
-                            if (_build_block_offsets[j] == -1) {
-                                // the only case to reach here:
-                                // 1. left anti join with other conjuncts, and
-                                // 2. equal conjuncts does not match
-                                // since nullptr is emplaced back to 
visited_map,
-                                // the output value of the build side does not 
matter,
-                                // just insert default value
-                                mcol[i + _right_col_idx]->insert_default();
-                            } else {
-                                auto& column = 
*(*_build_blocks)[_build_block_offsets[j]]
-                                                        .get_by_position(i)
-                                                        .column;
-                                mcol[i + _right_col_idx]->insert_from(column, 
_build_block_rows[j]);
-                            }
-                        }
-                    }
-                } else {
-                    mcol[i + _right_col_idx]->insert_many_defaults(size);
-                }
+        for (int i = 0; i < _right_col_len; i++) {
+            const auto& column = *_build_block->get_by_position(i).column;
+            if (output_slot_flags[i]) {
+                mcol[i + _right_col_idx]->insert_indices_from(column, 
_build_block_rows.data(),
+                                                              
_build_block_rows.data() + size);
+            } else {
+                mcol[i + _right_col_idx]->insert_many_defaults(size);
             }
         }
     }
@@ -167,7 +129,6 @@ typename HashTableType::State 
ProcessHashTableProbe<JoinOpType, Parent>::_init_p
     _row_count_from_last_probe = 0;
 
     _build_block_rows.clear();
-    _build_block_offsets.clear();
     _probe_indexs.clear();
     if (with_other_join_conjuncts) {
         // use in right join to change visited state after exec the vother 
join conjunct
@@ -178,7 +139,6 @@ typename HashTableType::State 
ProcessHashTableProbe<JoinOpType, Parent>::_init_p
     }
     _probe_indexs.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE);
     _build_block_rows.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE);
-    _build_block_offsets.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE);
 
     if (!_parent->_ready_probe) {
         _parent->_ready_probe = true;
@@ -199,7 +159,7 @@ ForwardIterator<Mapped>& ProcessHashTableProbe<JoinOpType, 
Parent>::_probe_row_m
 
     SCOPED_TIMER(_search_hashtable_timer);
     for (; probe_row_match_iter.ok() && current_offset < _batch_size; 
++probe_row_match_iter) {
-        _emplace_element(probe_row_match_iter->block_offset, 
probe_row_match_iter->row_num,
+        _emplace_element(probe_row_match_iter->row_num,
                          current_offset);
         _probe_indexs.emplace_back(probe_index);
         if constexpr (with_other_join_conjuncts) {
@@ -218,10 +178,8 @@ ForwardIterator<Mapped>& ProcessHashTableProbe<JoinOpType, 
Parent>::_probe_row_m
 }
 
 template <int JoinOpType, typename Parent>
-void ProcessHashTableProbe<JoinOpType, Parent>::_emplace_element(int8_t 
block_offset,
-                                                                 int32_t 
block_row,
+void ProcessHashTableProbe<JoinOpType, Parent>::_emplace_element(int32_t 
block_row,
                                                                  int& 
current_offset) {
-    _build_block_offsets.emplace_back(block_offset);
     _build_block_rows.emplace_back(block_row);
     current_offset++;
 }
@@ -290,7 +248,7 @@ Status ProcessHashTableProbe<JoinOpType, 
Parent>::do_process(HashTableType& hash
                 if ((*null_map)[probe_index]) {
                     if constexpr (probe_all) {
                         // only full outer / left outer need insert the data 
of right table
-                        _emplace_element(-1, -1, current_offset);
+                        _emplace_element(-1, current_offset);
                         _probe_indexs.emplace_back(probe_index);
 
                         if constexpr (with_other_conjuncts) {
@@ -340,7 +298,7 @@ Status ProcessHashTableProbe<JoinOpType, 
Parent>::do_process(HashTableType& hash
                     // logic for now.
                     if constexpr (is_mark_join && with_other_conjuncts) {
                         for (auto it = mapped.begin(); it.ok(); ++it) {
-                            _emplace_element(it->block_offset, it->row_num, 
current_offset);
+                            _emplace_element(it->row_num, current_offset);
                             _visited_map.emplace_back(&it->visited);
                         }
                         ++probe_index;
@@ -348,7 +306,7 @@ Status ProcessHashTableProbe<JoinOpType, 
Parent>::do_process(HashTableType& hash
                         auto multi_match_last_offset = current_offset;
                         auto it = mapped.begin();
                         for (; it.ok() && current_offset < _batch_size; ++it) {
-                            _emplace_element(it->block_offset, it->row_num, 
current_offset);
+                            _emplace_element(it->row_num, current_offset);
 
                             if constexpr (with_other_conjuncts) {
                                 _visited_map.emplace_back(&it->visited);
@@ -383,7 +341,7 @@ Status ProcessHashTableProbe<JoinOpType, 
Parent>::do_process(HashTableType& hash
                                      JoinOpType == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
                                      (JoinOpType == TJoinOp::LEFT_SEMI_JOIN && 
is_mark_join)) {
                     // only full outer / left outer need insert the data of 
right table
-                    _emplace_element(-1, -1, current_offset);
+                    _emplace_element(-1, current_offset);
 
                     if constexpr (with_other_conjuncts) {
                         _same_to_prev.emplace_back(false);
@@ -793,24 +751,20 @@ Status ProcessHashTableProbe<JoinOpType, 
Parent>::process_data_in_hashtable(
         auto& visited_iter =
                 
std::get<ForwardIterator<Mapped>>(_parent->_outer_join_pull_visited_iter);
         _build_blocks_locs.resize(_batch_size);
-        auto register_build_loc = [&](int8_t offset, int32_t row_nums) {
-            _build_blocks_locs[block_size++] = std::pair<int8_t, int>(offset, 
row_nums);
-        };
-
         if (visited_iter.ok()) {
             if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) {
                 for (; visited_iter.ok() && block_size < _batch_size; 
++visited_iter) {
-                    register_build_loc(visited_iter->block_offset, 
visited_iter->row_num);
+                    _build_blocks_locs[block_size++] = visited_iter->row_num;
                 }
             } else {
                 for (; visited_iter.ok() && block_size < _batch_size; 
++visited_iter) {
                     if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) {
                         if (visited_iter->visited) {
-                            register_build_loc(visited_iter->block_offset, 
visited_iter->row_num);
+                            _build_blocks_locs[block_size++] = 
visited_iter->row_num;
                         }
                     } else {
                         if (!visited_iter->visited) {
-                            register_build_loc(visited_iter->block_offset, 
visited_iter->row_num);
+                            _build_blocks_locs[block_size++] = 
visited_iter->row_num;
                         }
                     }
                 }
@@ -827,7 +781,7 @@ Status ProcessHashTableProbe<JoinOpType, 
Parent>::process_data_in_hashtable(
                     if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) {
                         visited_iter = mapped.begin();
                         for (; visited_iter.ok() && block_size < _batch_size; 
++visited_iter) {
-                            register_build_loc(visited_iter->block_offset, 
visited_iter->row_num);
+                            _build_blocks_locs[block_size++] = 
visited_iter->row_num;
                         }
                         if (visited_iter.ok()) {
                             // block_size >= _batch_size, quit for loop
@@ -838,7 +792,7 @@ Status ProcessHashTableProbe<JoinOpType, 
Parent>::process_data_in_hashtable(
                     if constexpr (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN) {
                         visited_iter = mapped.begin();
                         for (; visited_iter.ok() && block_size < _batch_size; 
++visited_iter) {
-                            register_build_loc(visited_iter->block_offset, 
visited_iter->row_num);
+                            _build_blocks_locs[block_size++] = 
visited_iter->row_num;
                         }
                         if (visited_iter.ok()) {
                             // block_size >= _batch_size, quit for loop
@@ -851,11 +805,11 @@ Status ProcessHashTableProbe<JoinOpType, 
Parent>::process_data_in_hashtable(
                 for (; visited_iter.ok() && block_size < _batch_size; 
++visited_iter) {
                     if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) {
                         if (visited_iter->visited) {
-                            register_build_loc(visited_iter->block_offset, 
visited_iter->row_num);
+                            _build_blocks_locs[block_size++] = 
visited_iter->row_num;
                         }
                     } else {
                         if (!visited_iter->visited) {
-                            register_build_loc(visited_iter->block_offset, 
visited_iter->row_num);
+                            _build_blocks_locs[block_size++] = 
visited_iter->row_num;
                         }
                     }
                 }
@@ -867,38 +821,11 @@ Status ProcessHashTableProbe<JoinOpType, 
Parent>::process_data_in_hashtable(
         }
         _build_blocks_locs.resize(block_size);
 
-        auto insert_build_rows = [&](int8_t offset) {
-            for (size_t j = 0; j < right_col_len; ++j) {
-                auto& column = 
*(*_build_blocks)[offset].get_by_position(j).column;
-                mcol[j + right_col_idx]->insert_indices_from(
-                        column, _build_block_rows.data(),
-                        _build_block_rows.data() + _build_block_rows.size());
-            }
-        };
-        if (_build_blocks->size() > 1) {
-            std::sort(_build_blocks_locs.begin(), _build_blocks_locs.end(),
-                      [](const auto a, const auto b) { return a.first > 
b.first; });
-            auto start = 0, end = 0;
-            while (start < _build_blocks_locs.size()) {
-                while (end < _build_blocks_locs.size() &&
-                       _build_blocks_locs[start].first == 
_build_blocks_locs[end].first) {
-                    end++;
-                }
-                auto offset = _build_blocks_locs[start].first;
-                _build_block_rows.resize(end - start);
-                for (int i = 0; start + i < end; i++) {
-                    _build_block_rows[i] = _build_blocks_locs[start + 
i].second;
-                }
-                start = end;
-                insert_build_rows(offset);
-            }
-        } else if (_build_blocks->size() == 1) {
-            const auto size = _build_blocks_locs.size();
-            _build_block_rows.resize(_build_blocks_locs.size());
-            for (int i = 0; i < size; i++) {
-                _build_block_rows[i] = _build_blocks_locs[i].second;
-            }
-            insert_build_rows(0);
+        for (size_t j = 0; j < right_col_len; ++j) {
+            const auto& column = *_build_block->get_by_position(j).column;
+            mcol[j + right_col_idx]->insert_indices_from(
+                    column, _build_block_rows.data(),
+                    _build_block_rows.data() + _build_block_rows.size());
         }
 
         // just resize the left table column in case with other conjunct to 
make block size is not zero
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp 
b/be/src/vec/exec/join/vhash_join_node.cpp
index af7336a1f5e..2f789709698 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -98,12 +98,7 @@ HashJoinNode::HashJoinNode(ObjectPool* pool, const 
TPlanNode& tnode, const Descr
     _arena = std::make_shared<Arena>();
     _hash_table_variants = std::make_shared<HashTableVariants>();
     _process_hashtable_ctx_variants = std::make_unique<HashTableCtxVariants>();
-    _build_blocks.reset(new std::vector<Block>());
-
-    // avoid vector expand change block address.
-    // one block can store 4g data, _build_blocks can store 128*4g data.
-    // if probe data bigger than 512g, runtime filter maybe will core dump 
when insert data.
-    _build_blocks->reserve(HASH_JOIN_MAX_BUILD_BLOCK_COUNT);
+    _build_block = std::make_shared<Block>();
 }
 
 Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
@@ -726,9 +721,6 @@ Status HashJoinNode::_materialize_build_side(RuntimeState* 
state) {
 Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* 
in_block, bool eos) {
     SCOPED_TIMER(_build_timer);
 
-    // make one block for each 4 gigabytes
-    constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
-
     if (_has_null_in_build_side) {
         // TODO: if _has_null_in_build_side is true we should finish current 
pipeline task.
         DCHECK(state->enable_pipeline_exec());
@@ -741,41 +733,25 @@ Status HashJoinNode::sink(doris::RuntimeState* state, 
vectorized::Block* in_bloc
 
         if (in_block->rows() != 0) {
             SCOPED_TIMER(_build_side_merge_block_timer);
+            if (_build_side_mutable_block.empty()) {
+                RETURN_IF_ERROR(_build_side_mutable_block.merge(
+                        *(in_block->create_same_struct_block(1, false))));
+            }
             RETURN_IF_ERROR(_build_side_mutable_block.merge(*in_block));
-        }
-
-        if (UNLIKELY(_build_side_mem_used - _build_side_last_mem_used > 
BUILD_BLOCK_MAX_SIZE)) {
-            if (_build_blocks->size() == HASH_JOIN_MAX_BUILD_BLOCK_COUNT) {
-                return Status::NotSupported(strings::Substitute(
-                        "data size of right table in hash join > $0",
-                        BUILD_BLOCK_MAX_SIZE * 
HASH_JOIN_MAX_BUILD_BLOCK_COUNT));
+            if (_build_side_mutable_block.rows() > 
std::numeric_limits<uint32_t>::max()) {
+                return Status::NotSupported(
+                        "Hash join do not support build table rows"
+                        " over:" +
+                        std::to_string(std::numeric_limits<uint32_t>::max()));
             }
-            _build_blocks->emplace_back(_build_side_mutable_block.to_block());
-
-            COUNTER_UPDATE(_build_blocks_memory_usage, 
(*_build_blocks)[_build_block_idx].bytes());
-
-            // TODO:: Rethink may we should do the process after we receive 
all build blocks ?
-            // which is better.
-            RETURN_IF_ERROR(_process_build_block(state, 
(*_build_blocks)[_build_block_idx],
-                                                 _build_block_idx));
-
-            _build_side_mutable_block = MutableBlock();
-            ++_build_block_idx;
-            _build_side_last_mem_used = _build_side_mem_used;
         }
     }
 
     if (_should_build_hash_table && eos) {
         if (!_build_side_mutable_block.empty()) {
-            if (_build_blocks->size() == HASH_JOIN_MAX_BUILD_BLOCK_COUNT) {
-                return Status::NotSupported(strings::Substitute(
-                        "data size of right table in hash join > $0",
-                        BUILD_BLOCK_MAX_SIZE * 
HASH_JOIN_MAX_BUILD_BLOCK_COUNT));
-            }
-            _build_blocks->emplace_back(_build_side_mutable_block.to_block());
-            COUNTER_UPDATE(_build_blocks_memory_usage, 
(*_build_blocks)[_build_block_idx].bytes());
-            RETURN_IF_ERROR(_process_build_block(state, 
(*_build_blocks)[_build_block_idx],
-                                                 _build_block_idx));
+            _build_block = 
std::make_shared<Block>(_build_side_mutable_block.to_block());
+            COUNTER_UPDATE(_build_blocks_memory_usage, _build_block->bytes());
+            RETURN_IF_ERROR(_process_build_block(state, *_build_block));
         }
         auto ret = std::visit(Overload {[&](std::monostate&) -> Status {
                                             LOG(FATAL) << "FATAL: uninited 
hash table";
@@ -797,7 +773,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state, 
vectorized::Block* in_bloc
             _shared_hash_table_context->status = Status::OK();
             // arena will be shared with other instances.
             _shared_hash_table_context->arena = _arena;
-            _shared_hash_table_context->blocks = _build_blocks;
+            _shared_hash_table_context->block = _build_block;
             _shared_hash_table_context->hash_table_variants = 
_hash_table_variants;
             _shared_hash_table_context->short_circuit_for_null_in_probe_side =
                     _has_null_in_build_side;
@@ -829,7 +805,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state, 
vectorized::Block* in_bloc
                 *_hash_table_variants,
                 *std::static_pointer_cast<HashTableVariants>(
                         _shared_hash_table_context->hash_table_variants));
-        _build_blocks = _shared_hash_table_context->blocks;
+        _build_block = _shared_hash_table_context->block;
 
         if (!_shared_hash_table_context->runtime_filters.empty()) {
             auto ret = std::visit(
@@ -862,7 +838,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state, 
vectorized::Block* in_bloc
 
     // Since the comparison of null values is meaningless, null aware left 
anti join should not output null
     // when the build side is not empty.
-    if (!_build_blocks->empty() && _join_op == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
+    if (_build_block && _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
         _probe_ignore_null = true;
     }
     _init_short_circuit_for_probe();
@@ -961,7 +937,7 @@ void HashJoinNode::_set_build_ignore_flag(Block& block, 
const std::vector<int>&
     }
 }
 
-Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, 
uint8_t offset) {
+Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block) {
     SCOPED_TIMER(_build_table_timer);
     size_t rows = block.rows();
     if (UNLIKELY(rows == 0)) {
@@ -1004,7 +980,7 @@ Status HashJoinNode::_process_build_block(RuntimeState* 
state, Block& block, uin
                         using HashTableCtxType = std::decay_t<decltype(arg)>;
                         ProcessHashTableBuild<HashTableCtxType, HashJoinNode>
                                 hash_table_build_process(rows, block, 
raw_ptrs, this,
-                                                         state->batch_size(), 
offset, state);
+                                                         state->batch_size(), 
state);
                         return hash_table_build_process
                                 .template run<has_null_value, 
short_circuit_for_null_in_build_side>(
                                         arg,
diff --git a/be/src/vec/exec/join/vhash_join_node.h 
b/be/src/vec/exec/join/vhash_join_node.h
index 50e1567f4e2..b1e21e66f16 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -106,14 +106,13 @@ using ProfileCounter = RuntimeProfile::Counter;
 template <class HashTableContext, typename Parent>
 struct ProcessHashTableBuild {
     ProcessHashTableBuild(int rows, Block& acquired_block, ColumnRawPtrs& 
build_raw_ptrs,
-                          Parent* parent, int batch_size, uint8_t offset, 
RuntimeState* state)
+                          Parent* parent, int batch_size, RuntimeState* state)
             : _rows(rows),
               _skip_rows(0),
               _acquired_block(acquired_block),
               _build_raw_ptrs(build_raw_ptrs),
               _parent(parent),
               _batch_size(batch_size),
-              _offset(offset),
               _state(state),
               
_build_side_compute_hash_timer(parent->_build_side_compute_hash_timer) {}
 
@@ -180,7 +179,7 @@ struct ProcessHashTableBuild {
         auto creator = [&](const auto& ctor, auto& key, auto& origin) {
             HashTableContext::try_presis_key(key, origin, arena);
             inserted = true;
-            ctor(key, Mapped {k, _offset});
+            ctor(key, Mapped {k});
         };
 
         bool build_unique = _parent->build_unique();
@@ -212,13 +211,13 @@ struct ProcessHashTableBuild {
         } else if (has_runtime_filter && !build_unique) {
             EMPLACE_IMPL(
                     if (inserted) { inserted_rows.push_back(k); } else {
-                        mapped.insert({k, _offset}, *_parent->arena());
+                        mapped.insert({k}, *_parent->arena());
                         inserted_rows.push_back(k);
                     });
         } else if (!has_runtime_filter && build_unique) {
             EMPLACE_IMPL(if (!inserted) { _skip_rows++; });
         } else {
-            EMPLACE_IMPL(if (!inserted) { mapped.insert({k, _offset}, 
*_parent->arena()); });
+            EMPLACE_IMPL(if (!inserted) { mapped.insert({k}, 
*_parent->arena()); });
         }
         _parent->_build_rf_cardinality += inserted_rows.size();
 
@@ -239,7 +238,6 @@ private:
     ColumnRawPtrs& _build_raw_ptrs;
     Parent* _parent;
     int _batch_size;
-    uint8_t _offset;
     RuntimeState* _state;
 
     ProfileCounter* _build_side_compute_hash_timer;
@@ -325,8 +323,6 @@ using HashTableIteratorVariants =
         std::variant<std::monostate, ForwardIterator<RowRefList>,
                      ForwardIterator<RowRefListWithFlag>, 
ForwardIterator<RowRefListWithFlags>>;
 
-static constexpr auto HASH_JOIN_MAX_BUILD_BLOCK_COUNT = 128;
-
 class HashJoinNode final : public VJoinNodeBase {
 public:
     HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs);
@@ -369,7 +365,7 @@ public:
     bool have_other_join_conjunct() const { return _have_other_join_conjunct; }
     bool is_right_semi_anti() const { return _is_right_semi_anti; }
     bool is_outer_join() const { return _is_outer_join; }
-    std::shared_ptr<std::vector<Block>> build_blocks() const { return 
_build_blocks; }
+    std::shared_ptr<Block> build_block() const { return _build_block; }
     std::vector<bool>* left_output_slot_flags() { return 
&_left_output_slot_flags; }
     std::vector<bool>* right_output_slot_flags() { return 
&_right_output_slot_flags; }
     bool* has_null_in_build_side() { return &_has_null_in_build_side; }
@@ -390,16 +386,16 @@ private:
         _short_circuit_for_probe =
                 (_has_null_in_build_side && _join_op == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
                  !_is_mark_join) ||
-                (_build_blocks->empty() && _join_op == TJoinOp::INNER_JOIN && 
!_is_mark_join) ||
-                (_build_blocks->empty() && _join_op == TJoinOp::LEFT_SEMI_JOIN 
&& !_is_mark_join) ||
-                (_build_blocks->empty() && _join_op == 
TJoinOp::RIGHT_OUTER_JOIN) ||
-                (_build_blocks->empty() && _join_op == 
TJoinOp::RIGHT_SEMI_JOIN) ||
-                (_build_blocks->empty() && _join_op == 
TJoinOp::RIGHT_ANTI_JOIN);
+                (!_build_block && _join_op == TJoinOp::INNER_JOIN && 
!_is_mark_join) ||
+                (!_build_block && _join_op == TJoinOp::LEFT_SEMI_JOIN && 
!_is_mark_join) ||
+                (!_build_block && _join_op == TJoinOp::RIGHT_OUTER_JOIN) ||
+                (!_build_block && _join_op == TJoinOp::RIGHT_SEMI_JOIN) ||
+                (!_build_block && _join_op == TJoinOp::RIGHT_ANTI_JOIN);
 
         //when build table rows is 0 and not have other_join_conjunct and not 
_is_mark_join and join type is one of 
LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN
         //we could get the result is probe table + null-column(if need output)
         _empty_right_table_need_probe_dispose =
-                (_build_blocks->empty() && !_have_other_join_conjunct && 
!_is_mark_join) &&
+                (!_build_block && !_have_other_join_conjunct && 
!_is_mark_join) &&
                 (_join_op == TJoinOp::LEFT_OUTER_JOIN || _join_op == 
TJoinOp::FULL_OUTER_JOIN ||
                  _join_op == TJoinOp::LEFT_ANTI_JOIN);
     }
@@ -467,7 +463,7 @@ private:
     HashTableIteratorVariants _outer_join_pull_visited_iter;
     HashTableIteratorVariants _probe_row_match_iter;
 
-    std::shared_ptr<std::vector<Block>> _build_blocks;
+    std::shared_ptr<Block> _build_block;
     Block _probe_block;
     ColumnRawPtrs _probe_columns;
     ColumnUInt8::MutablePtr _null_map_column;
@@ -501,7 +497,7 @@ private:
 
     Status _materialize_build_side(RuntimeState* state) override;
 
-    Status _process_build_block(RuntimeState* state, Block& block, uint8_t 
offset);
+    Status _process_build_block(RuntimeState* state, Block& block);
 
     Status _do_evaluate(Block& block, VExprContextSPtrs& exprs,
                         RuntimeProfile::Counter& expr_call_timer, 
std::vector<int>& res_col_ids);
diff --git a/be/src/vec/exec/vset_operation_node.cpp 
b/be/src/vec/exec/vset_operation_node.cpp
index 9b15db67b3c..0da84ecfe2c 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -60,7 +60,6 @@ 
VSetOperationNode<is_intersect>::VSetOperationNode(ObjectPool* pool, const TPlan
         : ExecNode(pool, tnode, descs),
           _valid_element_in_hash_tbl(0),
           _mem_used(0),
-          _build_block_index(0),
           _build_finished(false) {
     _hash_table_variants = std::make_unique<HashTableVariants>();
 }
@@ -223,36 +222,45 @@ void VSetOperationNode<is_intersect>::hash_table_init() {
 
 template <bool is_intersect>
 Status VSetOperationNode<is_intersect>::sink(RuntimeState* state, Block* 
block, bool eos) {
-    constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
-
     if (block->rows() != 0) {
         _mem_used += block->allocated_bytes();
         RETURN_IF_ERROR(_mutable_block.merge(*block));
     }
 
-    if (eos || _mutable_block.allocated_bytes() >= BUILD_BLOCK_MAX_SIZE) {
-        _build_blocks.emplace_back(_mutable_block.to_block());
-        RETURN_IF_ERROR(
-                process_build_block(_build_blocks[_build_block_index], 
_build_block_index, state));
+    if (block->rows() != 0) {
+        if (_build_block.empty()) {
+            
RETURN_IF_ERROR(_mutable_block.merge(*(block->create_same_struct_block(1, 
false))));
+        }
+        RETURN_IF_ERROR(_mutable_block.merge(*block));
+        if (_mutable_block.rows() > std::numeric_limits<uint32_t>::max()) {
+            return Status::NotSupported(
+                    "Hash join do not support build table rows"
+                    " over:" +
+                    std::to_string(std::numeric_limits<uint32_t>::max()));
+        }
+    }
+
+    if (eos) {
+        if (!_mutable_block.empty()) {
+            _build_block = _mutable_block.to_block();
+        }
+        RETURN_IF_ERROR(process_build_block(_build_block, state));
         _mutable_block.clear();
-        ++_build_block_index;
 
-        if (eos) {
-            if constexpr (is_intersect) {
-                _valid_element_in_hash_tbl = 0;
-            } else {
-                std::visit(
-                        [&](auto&& arg) {
-                            using HashTableCtxType = 
std::decay_t<decltype(arg)>;
-                            if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
-                                _valid_element_in_hash_tbl = 
arg.hash_table->size();
-                            }
-                        },
-                        *_hash_table_variants);
-            }
-            _build_finished = true;
-            _can_read = _children.size() == 1;
+        if constexpr (is_intersect) {
+            _valid_element_in_hash_tbl = 0;
+        } else {
+            std::visit(
+                    [&](auto&& arg) {
+                        using HashTableCtxType = std::decay_t<decltype(arg)>;
+                        if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
+                            _valid_element_in_hash_tbl = 
arg.hash_table->size();
+                        }
+                    },
+                    *_hash_table_variants);
         }
+        _build_finished = true;
+        _can_read = _children.size() == 1;
     }
     return Status::OK();
 }
@@ -304,7 +312,7 @@ Status VSetOperationNode<is_intersect>::hash_table_build(RuntimeState* state) {
 }
 
 template <bool is_intersect>
-Status VSetOperationNode<is_intersect>::process_build_block(Block& block, uint8_t offset,
+Status VSetOperationNode<is_intersect>::process_build_block(Block& block,
                                                             RuntimeState* state) {
     size_t rows = block.rows();
     if (rows == 0) {
@@ -320,7 +328,7 @@ Status VSetOperationNode<is_intersect>::process_build_block(Block& block, uint8_
                 using HashTableCtxType = std::decay_t<decltype(arg)>;
                 if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
                     HashTableBuild<HashTableCtxType, is_intersect> hash_table_build_process(
-                            rows, raw_ptrs, this, offset, state);
+                            rows, raw_ptrs, this, state);
                     st = hash_table_build_process(arg, _arena);
                 } else {
                     LOG(FATAL) << "FATAL: uninited hash table";
@@ -336,8 +344,8 @@ void VSetOperationNode<is_intersect>::add_result_columns(RowRefListWithFlags& va
                                                          int& block_size) {
     auto it = value.begin();
     for (auto idx = _build_col_idx.begin(); idx != _build_col_idx.end(); ++idx) {
-        auto& column = *_build_blocks[it->block_offset].get_by_position(idx->first).column;
-        if (_mutable_cols[idx->second]->is_nullable() xor column.is_nullable()) {
+        const auto& column = *_build_block.get_by_position(idx->first).column;
+        if (_mutable_cols[idx->second]->is_nullable() ^ column.is_nullable()) {
             DCHECK(_mutable_cols[idx->second]->is_nullable());
             ((ColumnNullable*)(_mutable_cols[idx->second].get()))
                     ->insert_from_not_nullable(column, it->row_num);
@@ -508,10 +516,6 @@ void VSetOperationNode<is_intersect>::debug_string(int indentation_level,
 template <bool is_intersect>
 void VSetOperationNode<is_intersect>::release_mem() {
     _hash_table_variants = nullptr;
-
-    std::vector<Block> tmp_build_blocks;
-    _build_blocks.swap(tmp_build_blocks);
-
     _probe_block.clear();
 }
 
diff --git a/be/src/vec/exec/vset_operation_node.h b/be/src/vec/exec/vset_operation_node.h
index dfd96430115..51c7cfc79c8 100644
--- a/be/src/vec/exec/vset_operation_node.h
+++ b/be/src/vec/exec/vset_operation_node.h
@@ -79,7 +79,7 @@ private:
     //It's time to abstract out the same methods and provide them directly to others;
     void hash_table_init();
     Status hash_table_build(RuntimeState* state);
-    Status process_build_block(Block& block, uint8_t offset, RuntimeState* state);
+    Status process_build_block(Block& block, RuntimeState* state);
     Status extract_build_column(Block& block, ColumnRawPtrs& raw_ptrs);
     Status extract_probe_column(Block& block, ColumnRawPtrs& raw_ptrs, int child_id);
     void refresh_hash_table();
@@ -112,11 +112,10 @@ private:
     //record insert column id during probe
     std::vector<uint16_t> _probe_column_inserted_id;
 
-    std::vector<Block> _build_blocks;
+    Block _build_block;
     Block _probe_block;
     ColumnRawPtrs _probe_columns;
     std::vector<MutableColumnPtr> _mutable_cols;
-    int _build_block_index;
     bool _build_finished;
     std::vector<bool> _probe_finished_children_index;
     MutableBlock _mutable_block;
diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h
index 6b31cf07ec9..e1c01709042 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.h
+++ b/be/src/vec/runtime/shared_hash_table_controller.h
@@ -53,18 +53,15 @@ struct SharedRuntimeFilterContext {
 
 struct SharedHashTableContext {
     SharedHashTableContext()
-            : hash_table_variants(nullptr),
-              blocks(new std::vector<vectorized::Block>()),
-              signaled(false),
-              short_circuit_for_null_in_probe_side(false) {}
+            : hash_table_variants(nullptr), block(std::make_shared<vectorized::Block>()) {}
 
     Status status;
     std::shared_ptr<Arena> arena;
     std::shared_ptr<void> hash_table_variants;
-    std::shared_ptr<std::vector<Block>> blocks;
+    std::shared_ptr<Block> block;
     std::map<int, SharedRuntimeFilterContext> runtime_filters;
-    bool signaled;
-    bool short_circuit_for_null_in_probe_side;
+    bool signaled {};
+    bool short_circuit_for_null_in_probe_side {};
 };
 
 using SharedHashTableContextPtr = std::shared_ptr<SharedHashTableContext>;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to