This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch new_join
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 41b2073d648818b7c60bd9ad9dceafd9f6571937
Author: HappenLee <[email protected]>
AuthorDate: Tue Oct 31 11:03:35 2023 +0800

    Join rewrite (#26140)
    
    Co-authored-by: BiteTheDDDDt <[email protected]>
---
 be/src/exprs/runtime_filter_slots.h                |  36 ++-
 be/src/pipeline/exec/hashjoin_build_sink.cpp       | 146 +++------
 be/src/pipeline/exec/hashjoin_build_sink.h         |   7 +-
 be/src/pipeline/exec/hashjoin_probe_operator.h     |   4 +-
 be/src/pipeline/exec/set_sink_operator.cpp         |  13 +-
 be/src/pipeline/exec/set_sink_operator.h           |   2 +-
 be/src/pipeline/exec/set_source_operator.cpp       |   6 +-
 be/src/pipeline/pipeline_x/dependency.h            |   7 +-
 be/src/vec/columns/column.h                        |   9 +-
 be/src/vec/columns/column_array.cpp                |  11 +
 be/src/vec/columns/column_array.h                  |   3 +
 be/src/vec/columns/column_complex.h                |  15 +
 be/src/vec/columns/column_const.h                  |   5 +
 be/src/vec/columns/column_decimal.h                |  12 +
 be/src/vec/columns/column_dictionary.h             |   5 +
 be/src/vec/columns/column_fixed_length_object.h    |  22 ++
 be/src/vec/columns/column_map.cpp                  |  11 +
 be/src/vec/columns/column_map.h                    |   3 +
 be/src/vec/columns/column_nullable.cpp             |  10 +
 be/src/vec/columns/column_nullable.h               |   3 +
 be/src/vec/columns/column_object.cpp               |  11 +
 be/src/vec/columns/column_object.h                 |   3 +
 be/src/vec/columns/column_string.cpp               |  37 +++
 be/src/vec/columns/column_string.h                 |   3 +
 be/src/vec/columns/column_struct.cpp               |   9 +
 be/src/vec/columns/column_struct.h                 |   3 +
 be/src/vec/columns/column_vector.cpp               |  23 ++
 be/src/vec/columns/column_vector.h                 |   2 +
 be/src/vec/columns/predicate_column.h              |   5 +
 be/src/vec/common/hash_table/hash_map.h            | 214 ++++++++++++
 be/src/vec/common/hash_table/hash_map_context.h    | 124 +++++--
 be/src/vec/common/hash_table/hash_table.h          |   1 -
 .../vec/common/hash_table/hash_table_set_build.h   |   9 +-
 be/src/vec/exec/join/join_op.h                     |  72 ++---
 be/src/vec/exec/join/process_hash_table_probe.h    |   9 +-
 .../vec/exec/join/process_hash_table_probe_impl.h  | 357 +++------------------
 be/src/vec/exec/join/vhash_join_node.cpp           | 115 +++----
 be/src/vec/exec/join/vhash_join_node.h             | 156 ++-------
 be/src/vec/exec/vset_operation_node.cpp            |  68 ++--
 be/src/vec/exec/vset_operation_node.h              |   5 +-
 be/src/vec/runtime/shared_hash_table_controller.h  |  11 +-
 41 files changed, 786 insertions(+), 781 deletions(-)

diff --git a/be/src/exprs/runtime_filter_slots.h 
b/be/src/exprs/runtime_filter_slots.h
index 6c96b160551..0f841e5a60f 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -162,7 +162,7 @@ public:
         return Status::OK();
     }
 
-    void insert(std::unordered_map<const vectorized::Block*, 
std::vector<int>>& datas) {
+    void insert(const std::unordered_set<const vectorized::Block*>& datas) {
         for (int i = 0; i < _build_expr_context.size(); ++i) {
             auto iter = _runtime_filters.find(i);
             if (iter == _runtime_filters.end()) {
@@ -170,30 +170,32 @@ public:
             }
 
             int result_column_id = 
_build_expr_context[i]->get_last_result_column_id();
-            for (auto it : datas) {
-                auto& column = 
it.first->get_by_position(result_column_id).column;
+            for (const auto* it : datas) {
+                auto column = it->get_by_position(result_column_id).column;
 
-                if (auto* nullable =
+                std::vector<int> indexs;
+                // indexs start from 1 because the first row is mocked for 
join hash map
+                if (const auto* nullable =
                             
vectorized::check_and_get_column<vectorized::ColumnNullable>(*column)) {
-                    auto& column_nested = nullable->get_nested_column_ptr();
-                    auto& column_nullmap = nullable->get_null_map_column_ptr();
-                    std::vector<int> indexs;
-                    for (int row_num : it.second) {
-                        if (assert_cast<const 
vectorized::ColumnUInt8*>(column_nullmap.get())
-                                    ->get_bool(row_num)) {
+                    column = nullable->get_nested_column_ptr();
+                    const uint8_t* null_map = assert_cast<const 
vectorized::ColumnUInt8*>(
+                                                      
nullable->get_null_map_column_ptr().get())
+                                                      ->get_data()
+                                                      .data();
+                    for (int i = 1; i < column->size(); i++) {
+                        if (null_map[i]) {
                             continue;
                         }
-                        indexs.push_back(row_num);
+                        indexs.push_back(i);
                     }
-                    for (auto filter : iter->second) {
-                        filter->insert_batch(column_nested, indexs);
-                    }
-
                 } else {
-                    for (auto filter : iter->second) {
-                        filter->insert_batch(column, it.second);
+                    for (int i = 1; i < column->size(); i++) {
+                        indexs.push_back(i);
                     }
                 }
+                for (auto* filter : iter->second) {
+                    filter->insert_batch(column, indexs);
+                }
             }
         }
     }
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index cb63f64ab42..0cdb34605e0 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -39,10 +39,7 @@ Overload(Callables&&... callables) -> Overload<Callables...>;
 
 
HashJoinBuildSinkLocalState::HashJoinBuildSinkLocalState(DataSinkOperatorXBase* 
parent,
                                                          RuntimeState* state)
-        : JoinBuildSinkLocalState(parent, state),
-          _build_block_idx(0),
-          _build_side_mem_used(0),
-          _build_side_last_mem_used(0) {}
+        : JoinBuildSinkLocalState(parent, state) {}
 
 Status HashJoinBuildSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info) {
     RETURN_IF_ERROR(JoinBuildSinkLocalState::init(state, info));
@@ -53,13 +50,7 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* 
state, LocalSinkStateInfo
     auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
     _shared_state->join_op_variants = p._join_op_variants;
     if (p._is_broadcast_join && 
state->enable_share_hash_table_for_broadcast_join()) {
-        _shared_state->build_blocks = p._shared_hash_table_context->blocks;
-    } else {
-        _shared_state->build_blocks.reset(new 
std::vector<vectorized::Block>());
-        // avoid vector expand change block address.
-        // one block can store 4g data, _build_blocks can store 128*4g data.
-        // if probe data bigger than 512g, runtime filter maybe will core dump 
when insert data.
-        
_shared_state->build_blocks->reserve(vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT);
+        _shared_state->build_block = p._shared_hash_table_context->block;
     }
     _shared_state->is_null_safe_eq_join = p._is_null_safe_eq_join;
     _shared_state->store_null_in_hash_table = p._store_null_in_hash_table;
@@ -84,11 +75,6 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* 
state, LocalSinkStateInfo
         _shared_hash_table_dependency->block_writing();
         p._shared_hashtable_controller->append_dependency(p.node_id(),
                                                           
_shared_hash_table_dependency);
-    } else if (p._is_broadcast_join) {
-        // avoid vector expand change block address.
-        // one block can store 4g data, _build_blocks can store 128*4g data.
-        // if probe data bigger than 512g, runtime filter maybe will core dump 
when insert data.
-        
_shared_state->build_blocks->reserve(vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT);
     }
 
     _memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage");
@@ -158,25 +144,24 @@ void 
HashJoinBuildSinkLocalState::init_short_circuit_for_probe() {
     _shared_state->short_circuit_for_probe =
             (_shared_state->_has_null_in_build_side &&
              p._join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && 
!p._is_mark_join) ||
-            (_shared_state->build_blocks->empty() && p._join_op == 
TJoinOp::INNER_JOIN &&
+            (!_shared_state->build_block && p._join_op == TJoinOp::INNER_JOIN 
&&
              !p._is_mark_join) ||
-            (_shared_state->build_blocks->empty() && p._join_op == 
TJoinOp::LEFT_SEMI_JOIN &&
+            (!_shared_state->build_block && p._join_op == 
TJoinOp::LEFT_SEMI_JOIN &&
              !p._is_mark_join) ||
-            (_shared_state->build_blocks->empty() && p._join_op == 
TJoinOp::RIGHT_OUTER_JOIN) ||
-            (_shared_state->build_blocks->empty() && p._join_op == 
TJoinOp::RIGHT_SEMI_JOIN) ||
-            (_shared_state->build_blocks->empty() && p._join_op == 
TJoinOp::RIGHT_ANTI_JOIN);
+            (!_shared_state->build_block && p._join_op == 
TJoinOp::RIGHT_OUTER_JOIN) ||
+            (!_shared_state->build_block && p._join_op == 
TJoinOp::RIGHT_SEMI_JOIN) ||
+            (!_shared_state->build_block && p._join_op == 
TJoinOp::RIGHT_ANTI_JOIN);
 
     //when build table rows is 0 and not have other_join_conjunct and not 
_is_mark_join and join type is one of 
LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN
     //we could get the result is probe table + null-column(if need output)
     _shared_state->empty_right_table_need_probe_dispose =
-            (_shared_state->build_blocks->empty() && 
!p._have_other_join_conjunct &&
-             !p._is_mark_join) &&
+            (!_shared_state->build_block && !p._have_other_join_conjunct && 
!p._is_mark_join) &&
             (p._join_op == TJoinOp::LEFT_OUTER_JOIN || p._join_op == 
TJoinOp::FULL_OUTER_JOIN ||
              p._join_op == TJoinOp::LEFT_ANTI_JOIN);
 }
 
 Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
-                                                        vectorized::Block& 
block, uint8_t offset) {
+                                                        vectorized::Block& 
block) {
     auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
     SCOPED_TIMER(_build_table_timer);
     size_t rows = block.rows();
@@ -209,29 +194,30 @@ Status 
HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
     Status st = _dependency->extract_join_column<true>(block, null_map_val, 
raw_ptrs, res_col_ids);
 
     st = std::visit(
-            Overload {
-                    [&](std::monostate& arg, auto has_null_value,
-                        auto short_circuit_for_null_in_build_side) -> Status {
-                        LOG(FATAL) << "FATAL: uninited hash table";
-                        __builtin_unreachable();
-                        return Status::OK();
-                    },
-                    [&](auto&& arg, auto has_null_value,
-                        auto short_circuit_for_null_in_build_side) -> Status {
-                        using HashTableCtxType = std::decay_t<decltype(arg)>;
-                        vectorized::ProcessHashTableBuild<HashTableCtxType,
-                                                          
HashJoinBuildSinkLocalState>
-                                hash_table_build_process(rows, block, 
raw_ptrs, this,
-                                                         state->batch_size(), 
offset, state);
-                        return hash_table_build_process
-                                .template run<has_null_value, 
short_circuit_for_null_in_build_side>(
-                                        arg,
-                                        has_null_value || 
short_circuit_for_null_in_build_side
-                                                ? &null_map_val->get_data()
-                                                : nullptr,
-                                        
&_shared_state->_has_null_in_build_side);
-                    }},
-            *_shared_state->hash_table_variants,
+            Overload {[&](std::monostate& arg, auto join_op, auto 
has_null_value,
+                          auto short_circuit_for_null_in_build_side) -> Status 
{
+                          LOG(FATAL) << "FATAL: uninited hash table";
+                          __builtin_unreachable();
+                          return Status::OK();
+                      },
+                      [&](auto&& arg, auto&& join_op, auto has_null_value,
+                          auto short_circuit_for_null_in_build_side) -> Status 
{
+                          using HashTableCtxType = std::decay_t<decltype(arg)>;
+                          using JoinOpType = std::decay_t<decltype(join_op)>;
+                          vectorized::ProcessHashTableBuild<HashTableCtxType,
+                                                            
HashJoinBuildSinkLocalState>
+                                  hash_table_build_process(rows, block, 
raw_ptrs, this,
+                                                           
state->batch_size(), state);
+                          return hash_table_build_process
+                                  .template run<JoinOpType::value, 
has_null_value,
+                                                
short_circuit_for_null_in_build_side>(
+                                          arg,
+                                          has_null_value || 
short_circuit_for_null_in_build_side
+                                                  ? &null_map_val->get_data()
+                                                  : nullptr,
+                                          
&_shared_state->_has_null_in_build_side);
+                      }},
+            *_shared_state->hash_table_variants, 
_shared_state->join_op_variants,
             vectorized::make_bool_variant(_build_side_ignore_null),
             
vectorized::make_bool_variant(p._short_circuit_for_null_in_build_side));
 
@@ -323,7 +309,7 @@ void 
HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) {
                     }
                     return;
                 }
-                if (!try_get_hash_map_context_fixed<PartitionedHashMap, 
HashCRC32, RowRefListType>(
+                if (!try_get_hash_map_context_fixed<JoinFixedHashMap, 
HashCRC32, RowRefListType>(
                             *_shared_state->hash_table_variants, 
_build_expr_ctxs)) {
                     _shared_state->hash_table_variants
                             
->emplace<vectorized::SerializedHashTableContext<RowRefListType>>();
@@ -333,16 +319,6 @@ void 
HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) {
             vectorized::make_bool_variant(p._have_other_join_conjunct));
 
     
DCHECK(!std::holds_alternative<std::monostate>(*_shared_state->hash_table_variants));
-
-    std::visit(vectorized::Overload {[&](std::monostate& arg) {
-                                         LOG(FATAL) << "FATAL: uninited hash 
table";
-                                         __builtin_unreachable();
-                                     },
-                                     [&](auto&& arg) {
-                                         
arg.hash_table->set_partitioned_threshold(
-                                                 
state->partitioned_hash_join_rows_threshold());
-                                     }},
-               *_shared_state->hash_table_variants);
 }
 
 HashJoinBuildSinkOperatorX::HashJoinBuildSinkOperatorX(ObjectPool* pool, int 
operator_id,
@@ -405,9 +381,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
 
-    // make one block for each 4 gigabytes
-    constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
-
     if (local_state._shared_state->_has_null_in_build_side) {
         // TODO: if _has_null_in_build_side is true we should finish current 
pipeline task.
         DCHECK(state->enable_pipeline_exec());
@@ -420,52 +393,29 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
 
         if (in_block->rows() != 0) {
             SCOPED_TIMER(local_state._build_side_merge_block_timer);
+            if (local_state._build_side_mutable_block.empty()) {
+                RETURN_IF_ERROR(local_state._build_side_mutable_block.merge(
+                        *(in_block->create_same_struct_block(1, false))));
+            }
             
RETURN_IF_ERROR(local_state._build_side_mutable_block.merge(*in_block));
-        }
-
-        if (UNLIKELY(local_state._build_side_mem_used - 
local_state._build_side_last_mem_used >
-                     BUILD_BLOCK_MAX_SIZE)) {
-            if (local_state._shared_state->build_blocks->size() ==
-                vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT) {
-                return Status::NotSupported(strings::Substitute(
-                        "data size of right table in hash join > $0",
-                        BUILD_BLOCK_MAX_SIZE * 
vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT));
+            if (local_state._build_side_mutable_block.rows() >
+                std::numeric_limits<uint32_t>::max()) {
+                return Status::NotSupported(
+                        "Hash join do not support build table rows"
+                        " over:" +
+                        std::to_string(std::numeric_limits<uint32_t>::max()));
             }
-            local_state._shared_state->build_blocks->emplace_back(
-                    local_state._build_side_mutable_block.to_block());
-
-            COUNTER_UPDATE(local_state._build_blocks_memory_usage,
-                           
(*local_state._shared_state->build_blocks)[local_state._build_block_idx]
-                                   .bytes());
-
-            // TODO:: Rethink may we should do the process after we receive 
all build blocks ?
-            // which is better.
-            RETURN_IF_ERROR(local_state.process_build_block(
-                    state, 
(*local_state._shared_state->build_blocks)[local_state._build_block_idx],
-                    local_state._build_block_idx));
-
-            local_state._build_side_mutable_block = vectorized::MutableBlock();
-            ++local_state._build_block_idx;
-            local_state._build_side_last_mem_used = 
local_state._build_side_mem_used;
         }
     }
 
     if (local_state._should_build_hash_table && source_state == 
SourceState::FINISHED) {
         if (!local_state._build_side_mutable_block.empty()) {
-            if (local_state._shared_state->build_blocks->size() ==
-                vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT) {
-                return Status::NotSupported(strings::Substitute(
-                        "data size of right table in hash join > $0",
-                        BUILD_BLOCK_MAX_SIZE * 
vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT));
-            }
-            local_state._shared_state->build_blocks->emplace_back(
+            local_state._shared_state->build_block = 
std::make_shared<vectorized::Block>(
                     local_state._build_side_mutable_block.to_block());
             COUNTER_UPDATE(local_state._build_blocks_memory_usage,
-                           
(*local_state._shared_state->build_blocks)[local_state._build_block_idx]
-                                   .bytes());
+                           (*local_state._shared_state->build_block).bytes());
             RETURN_IF_ERROR(local_state.process_build_block(
-                    state, 
(*local_state._shared_state->build_blocks)[local_state._build_block_idx],
-                    local_state._build_block_idx));
+                    state, (*local_state._shared_state->build_block)));
         }
         auto ret = std::visit(
                 Overload {[&](std::monostate&) -> Status {
@@ -556,7 +506,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
     if (source_state == SourceState::FINISHED) {
         // Since the comparison of null values is meaningless, null aware left 
anti join should not output null
         // when the build side is not empty.
-        if (!local_state._shared_state->build_blocks->empty() &&
+        if (!local_state._shared_state->build_block &&
             _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
             local_state._shared_state->probe_ignore_null = true;
         }
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h 
b/be/src/pipeline/exec/hashjoin_build_sink.h
index 10056a30e72..16b58adf5fc 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -61,11 +61,11 @@ public:
     ENABLE_FACTORY_CREATOR(HashJoinBuildSinkLocalState);
     using Parent = HashJoinBuildSinkOperatorX;
     HashJoinBuildSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* 
state);
-    ~HashJoinBuildSinkLocalState() = default;
+    ~HashJoinBuildSinkLocalState() override = default;
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
     Status open(RuntimeState* state) override;
-    Status process_build_block(RuntimeState* state, vectorized::Block& block, 
uint8_t offset);
+    Status process_build_block(RuntimeState* state, vectorized::Block& block);
 
     void init_short_circuit_for_probe();
 
@@ -94,7 +94,6 @@ protected:
 
     std::vector<IRuntimeFilter*> _runtime_filters;
     bool _should_build_hash_table = true;
-    uint8_t _build_block_idx = 0;
     int64_t _build_side_mem_used = 0;
     int64_t _build_side_last_mem_used = 0;
     vectorized::MutableBlock _build_side_mutable_block;
@@ -102,7 +101,7 @@ protected:
     bool _has_set_need_null_map_for_build = false;
     bool _build_side_ignore_null = false;
     size_t _build_rf_cardinality = 0;
-    std::unordered_map<const vectorized::Block*, std::vector<int>> 
_inserted_rows;
+    std::unordered_set<const vectorized::Block*> _inserted_blocks;
     std::shared_ptr<SharedHashTableDependency> _shared_hash_table_dependency;
 
     RuntimeProfile::Counter* _build_table_timer;
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h 
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 923f7dd7b94..181934e7b50 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -89,9 +89,7 @@ public:
     vectorized::DataTypes right_table_data_types();
     vectorized::DataTypes left_table_data_types();
     bool* has_null_in_build_side() { return 
&_shared_state->_has_null_in_build_side; }
-    std::shared_ptr<std::vector<vectorized::Block>> build_blocks() const {
-        return _shared_state->build_blocks;
-    }
+    std::shared_ptr<vectorized::Block> build_block() const { return 
_shared_state->build_block; }
 
 private:
     void _prepare_probe_block();
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp 
b/be/src/pipeline/exec/set_sink_operator.cpp
index 90cc792d471..dd6224ef2a3 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -60,8 +60,7 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* 
state, vectorized::Blo
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
 
     auto& mem_used = local_state._shared_state->mem_used;
-    auto& build_blocks = local_state._shared_state->build_blocks;
-    auto& build_block_index = local_state._shared_state->build_block_index;
+    auto& build_block = local_state._shared_state->build_block;
     auto& valid_element_in_hash_tbl = 
local_state._shared_state->valid_element_in_hash_tbl;
 
     if (in_block->rows() != 0) {
@@ -71,11 +70,9 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* 
state, vectorized::Blo
 
     if (source_state == SourceState::FINISHED ||
         local_state._mutable_block.allocated_bytes() >= BUILD_BLOCK_MAX_SIZE) {
-        build_blocks.emplace_back(local_state._mutable_block.to_block());
-        RETURN_IF_ERROR(_process_build_block(local_state, 
build_blocks[build_block_index],
-                                             build_block_index, state));
+        build_block = local_state._mutable_block.to_block();
+        RETURN_IF_ERROR(_process_build_block(local_state, build_block, state));
         local_state._mutable_block.clear();
-        ++build_block_index;
 
         if (source_state == SourceState::FINISHED) {
             if constexpr (is_intersect) {
@@ -101,7 +98,7 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* 
state, vectorized::Blo
 
 template <bool is_intersect>
 Status SetSinkOperatorX<is_intersect>::_process_build_block(
-        SetSinkLocalState<is_intersect>& local_state, vectorized::Block& 
block, uint8_t offset,
+        SetSinkLocalState<is_intersect>& local_state, vectorized::Block& block,
         RuntimeState* state) {
     size_t rows = block.rows();
     if (rows == 0) {
@@ -117,7 +114,7 @@ Status SetSinkOperatorX<is_intersect>::_process_build_block(
                 using HashTableCtxType = std::decay_t<decltype(arg)>;
                 if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
                     vectorized::HashTableBuild<HashTableCtxType, is_intersect>
-                            hash_table_build_process(&local_state, rows, 
raw_ptrs, offset, state);
+                            hash_table_build_process(&local_state, rows, 
raw_ptrs, state);
                     static_cast<void>(hash_table_build_process(arg, 
local_state._arena));
                 } else {
                     LOG(FATAL) << "FATAL: uninited hash table";
diff --git a/be/src/pipeline/exec/set_sink_operator.h 
b/be/src/pipeline/exec/set_sink_operator.h
index a1b9d8b7079..c3bf21c3cb9 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -120,7 +120,7 @@ private:
     friend struct HashTableBuild;
 
     Status _process_build_block(SetSinkLocalState<is_intersect>& local_state,
-                                vectorized::Block& block, uint8_t offset, 
RuntimeState* state);
+                                vectorized::Block& block, RuntimeState* state);
     Status _extract_build_column(SetSinkLocalState<is_intersect>& local_state,
                                  vectorized::Block& block, 
vectorized::ColumnRawPtrs& raw_ptrs);
 
diff --git a/be/src/pipeline/exec/set_source_operator.cpp 
b/be/src/pipeline/exec/set_source_operator.cpp
index d3840285c38..28e83739162 100644
--- a/be/src/pipeline/exec/set_source_operator.cpp
+++ b/be/src/pipeline/exec/set_source_operator.cpp
@@ -180,12 +180,12 @@ void 
SetSourceOperatorX<is_intersect>::_add_result_columns(
         SetSourceLocalState<is_intersect>& local_state, 
vectorized::RowRefListWithFlags& value,
         int& block_size) {
     auto& build_col_idx = local_state._shared_state->build_col_idx;
-    auto& build_blocks = local_state._shared_state->build_blocks;
+    auto& build_block = local_state._shared_state->build_block;
 
     auto it = value.begin();
     for (auto idx = build_col_idx.begin(); idx != build_col_idx.end(); ++idx) {
-        auto& column = 
*build_blocks[it->block_offset].get_by_position(idx->first).column;
-        if (local_state._mutable_cols[idx->second]->is_nullable() xor 
column.is_nullable()) {
+        auto& column = *build_block.get_by_position(idx->first).column;
+        if (local_state._mutable_cols[idx->second]->is_nullable() ^ 
column.is_nullable()) {
             DCHECK(local_state._mutable_cols[idx->second]->is_nullable());
             
((vectorized::ColumnNullable*)(local_state._mutable_cols[idx->second].get()))
                     ->insert_from_not_nullable(column, it->row_num);
diff --git a/be/src/pipeline/pipeline_x/dependency.h 
b/be/src/pipeline/pipeline_x/dependency.h
index d5349307e5b..3d137667052 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -659,7 +659,7 @@ struct HashJoinSharedState : public JoinSharedState {
             std::make_shared<vectorized::HashTableVariants>();
     const std::vector<TupleDescriptor*> build_side_child_desc;
     size_t build_exprs_size = 0;
-    std::shared_ptr<std::vector<vectorized::Block>> build_blocks = nullptr;
+    std::shared_ptr<vectorized::Block> build_block;
     bool probe_ignore_null = false;
 };
 
@@ -752,8 +752,7 @@ struct SetSharedState {
     /// default init
     //record memory during running
     int64_t mem_used = 0;
-    std::vector<vectorized::Block> build_blocks; // build to source
-    int build_block_index = 0;                   // build to source
+    vectorized::Block build_block; // build to source
     //record element size in hashtable
     int64_t valid_element_in_hash_tbl = 0;
     //first:column_id, could point to origin column or cast column
@@ -824,7 +823,7 @@ public:
             return;
         }
 
-        if (!try_get_hash_map_context_fixed<PartitionedHashMap, HashCRC32,
+        if (!try_get_hash_map_context_fixed<JoinFixedHashMap, HashCRC32,
                                             vectorized::RowRefListWithFlags>(
                     *hash_table_variants, child_exprs_lists[0])) {
             hash_table_variants->emplace<
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index cfb9163820f..ef9a28be1a8 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -238,10 +238,17 @@ public:
     /// Appends a batch elements from other column with the same type
     /// indices_begin + indices_end represent the row indices of column src
     /// Warning:
-    ///       if *indices == -1 means the row is null, only use in outer join, 
do not use in any other place
+    ///       if *indices == -1 means the row is null
     virtual void insert_indices_from(const IColumn& src, const int* 
indices_begin,
                                      const int* indices_end) = 0;
 
+    /// Appends a batch elements from other column with the same type
+    /// indices_begin + indices_end represent the row indices of column src
+    /// Warning:
+    ///       if *indices == 0 means the row is null, only use in outer join, 
do not use in any other place
+    virtual void insert_indices_from_join(const IColumn& src, const uint32_t* 
indices_begin,
+                                          const uint32_t* indices_end) = 0;
+
     /// Appends data located in specified memory chunk if it is possible 
(throws an exception if it cannot be implemented).
     /// Is used to optimize some computations (in aggregation, for example).
     /// Parameter length could be ignored if column values have fixed size.
diff --git a/be/src/vec/columns/column_array.cpp 
b/be/src/vec/columns/column_array.cpp
index ca3b6c12da5..2916ac83108 100644
--- a/be/src/vec/columns/column_array.cpp
+++ b/be/src/vec/columns/column_array.cpp
@@ -804,6 +804,17 @@ void ColumnArray::insert_indices_from(const IColumn& src, 
const int* indices_beg
     }
 }
 
+void ColumnArray::insert_indices_from_join(const IColumn& src, const uint32_t* 
indices_begin,
+                                           const uint32_t* indices_end) {
+    for (auto x = indices_begin; x != indices_end; ++x) {
+        if (*x == 0) {
+            ColumnArray::insert_default();
+        } else {
+            ColumnArray::insert_from(src, *x);
+        }
+    }
+}
+
 ColumnPtr ColumnArray::replicate(const IColumn::Offsets& replicate_offsets) 
const {
     if (replicate_offsets.empty()) return clone_empty();
 
diff --git a/be/src/vec/columns/column_array.h 
b/be/src/vec/columns/column_array.h
index c37fb48ba52..46541c3bbb0 100644
--- a/be/src/vec/columns/column_array.h
+++ b/be/src/vec/columns/column_array.h
@@ -224,6 +224,9 @@ public:
     void insert_indices_from(const IColumn& src, const int* indices_begin,
                              const int* indices_end) override;
 
+    void insert_indices_from_join(const IColumn& src, const uint32_t* 
indices_begin,
+                                  const uint32_t* indices_end) override;
+
     void replace_column_data(const IColumn& rhs, size_t row, size_t self_row = 
0) override {
         DCHECK(size() > self_row);
         const auto& r = assert_cast<const ColumnArray&>(rhs);
diff --git a/be/src/vec/columns/column_complex.h 
b/be/src/vec/columns/column_complex.h
index b25ae9f1577..b343a8f9cdb 100644
--- a/be/src/vec/columns/column_complex.h
+++ b/be/src/vec/columns/column_complex.h
@@ -199,6 +199,21 @@ public:
         }
     }
 
+    void insert_indices_from_join(const IColumn& src, const uint32_t* 
indices_begin,
+                                  const uint32_t* indices_end) override {
+        const Self& src_vec = assert_cast<const Self&>(src);
+        auto new_size = indices_end - indices_begin;
+
+        for (uint32_t i = 0; i < new_size; ++i) {
+            auto offset = *(indices_begin + i);
+            if (offset == 0) {
+                data.emplace_back(T {});
+            } else {
+                data.emplace_back(src_vec.get_element(offset));
+            }
+        }
+    }
+
     void pop_back(size_t n) override { data.erase(data.end() - n, data.end()); 
}
     // it's impossible to use ComplexType as key , so we don't have to 
implement them
     [[noreturn]] StringRef serialize_value_into_arena(size_t n, Arena& arena,
diff --git a/be/src/vec/columns/column_const.h 
b/be/src/vec/columns/column_const.h
index 307066a7ae9..280d2de8344 100644
--- a/be/src/vec/columns/column_const.h
+++ b/be/src/vec/columns/column_const.h
@@ -116,6 +116,11 @@ public:
         s += (indices_end - indices_begin);
     }
 
+    void insert_indices_from_join(const IColumn& src, const uint32_t* 
indices_begin,
+                                  const uint32_t* indices_end) override {
+        s += (indices_end - indices_begin);
+    }
+
     void insert(const Field&) override { ++s; }
 
     void insert_data(const char*, size_t) override { ++s; }
diff --git a/be/src/vec/columns/column_decimal.h 
b/be/src/vec/columns/column_decimal.h
index 6c1e8893a3d..e72e23bdcc7 100644
--- a/be/src/vec/columns/column_decimal.h
+++ b/be/src/vec/columns/column_decimal.h
@@ -132,6 +132,18 @@ public:
         }
     }
 
+    void insert_indices_from_join(const IColumn& src, const uint32_t* 
indices_begin,
+                                  const uint32_t* indices_end) override {
+        auto origin_size = size();
+        auto new_size = indices_end - indices_begin;
+        data.resize(origin_size + new_size);
+        const T* __restrict src_data = reinterpret_cast<const 
T*>(src.get_raw_data().data);
+
+        for (uint32_t i = 0; i < new_size; ++i) {
+            data[origin_size + i] = src_data[indices_begin[i]];
+        }
+    }
+
     void insert_many_fix_len_data(const char* data_ptr, size_t num) override;
 
     void insert_many_raw_data(const char* pos, size_t num) override {
diff --git a/be/src/vec/columns/column_dictionary.h 
b/be/src/vec/columns/column_dictionary.h
index e00e5b425f8..1f107e629f4 100644
--- a/be/src/vec/columns/column_dictionary.h
+++ b/be/src/vec/columns/column_dictionary.h
@@ -82,6 +82,11 @@ public:
         LOG(FATAL) << "insert_indices_from not supported in ColumnDictionary";
     }
 
+    void insert_indices_from_join(const IColumn& src, const uint32_t* 
indices_begin,
+                                  const uint32_t* indices_end) override {
+        LOG(FATAL) << "insert_indices_from_join not supported in 
ColumnDictionary";
+    }
+
     void pop_back(size_t n) override { LOG(FATAL) << "pop_back not supported 
in ColumnDictionary"; }
 
     void update_hash_with_value(size_t n, SipHash& hash) const override {
diff --git a/be/src/vec/columns/column_fixed_length_object.h 
b/be/src/vec/columns/column_fixed_length_object.h
index dce6666f132..8a9075c3a94 100644
--- a/be/src/vec/columns/column_fixed_length_object.h
+++ b/be/src/vec/columns/column_fixed_length_object.h
@@ -105,6 +105,28 @@ public:
         }
     }
 
+    void insert_indices_from_join(const IColumn& src, const uint32_t* 
indices_begin,
+                                  const uint32_t* indices_end) override {
+        const Self& src_vec = assert_cast<const Self&>(src);
+        auto origin_size = size();
+        auto new_size = indices_end - indices_begin;
+        if (_item_size == 0) {
+            _item_size = src_vec._item_size;
+        }
+        DCHECK(_item_size == src_vec._item_size) << "dst and src should have 
the same _item_size";
+        resize(origin_size + new_size);
+
+        for (uint32_t i = 0; i < new_size; ++i) {
+            auto offset = indices_begin[i];
+            if (offset) {
+                memcpy(&_data[(origin_size + i) * _item_size], 
&src_vec._data[offset * _item_size],
+                       _item_size);
+            } else {
+                memset(&_data[(origin_size + i) * _item_size], 0, _item_size);
+            }
+        }
+    }
+
     void clear() override {
         _data.clear();
         _item_count = 0;
diff --git a/be/src/vec/columns/column_map.cpp 
b/be/src/vec/columns/column_map.cpp
index 47a41c3dcfe..60ce83edc10 100644
--- a/be/src/vec/columns/column_map.cpp
+++ b/be/src/vec/columns/column_map.cpp
@@ -196,6 +196,17 @@ void ColumnMap::insert_indices_from(const IColumn& src, 
const int* indices_begin
     }
 }
 
+void ColumnMap::insert_indices_from_join(const IColumn& src, const uint32_t* 
indices_begin,
+                                         const uint32_t* indices_end) {
+    for (auto x = indices_begin; x != indices_end; ++x) {
+        if (*x == 0) {
+            ColumnMap::insert_default();
+        } else {
+            ColumnMap::insert_from(src, *x);
+        }
+    }
+}
+
 StringRef ColumnMap::serialize_value_into_arena(size_t n, Arena& arena, char 
const*& begin) const {
     size_t array_size = size_at(n);
     size_t offset = offset_at(n);
diff --git a/be/src/vec/columns/column_map.h b/be/src/vec/columns/column_map.h
index 551b6f9bfd6..b99cb90ae9c 100644
--- a/be/src/vec/columns/column_map.h
+++ b/be/src/vec/columns/column_map.h
@@ -133,6 +133,9 @@ public:
     void insert_indices_from(const IColumn& src, const int* indices_begin,
                              const int* indices_end) override;
 
+    void insert_indices_from_join(const IColumn& src, const uint32_t* 
indices_begin,
+                                  const uint32_t* indices_end) override;
+
     void append_data_by_selector(MutableColumnPtr& res,
                                  const IColumn::Selector& selector) const 
override {
         return append_data_by_selector_impl<ColumnMap>(res, selector);
diff --git a/be/src/vec/columns/column_nullable.cpp 
b/be/src/vec/columns/column_nullable.cpp
index 42b88ac7ae9..c3558b36d93 100644
--- a/be/src/vec/columns/column_nullable.cpp
+++ b/be/src/vec/columns/column_nullable.cpp
@@ -308,6 +308,16 @@ void ColumnNullable::insert_indices_from(const IColumn& 
src, const int* indices_
     _need_update_has_null = true;
 }
 
+void ColumnNullable::insert_indices_from_join(const IColumn& src, const 
uint32_t* indices_begin,
+                                              const uint32_t* indices_end) {
+    const auto& src_concrete = assert_cast<const ColumnNullable&>(src);
+    
get_nested_column().insert_indices_from_join(src_concrete.get_nested_column(), 
indices_begin,
+                                                 indices_end);
+    
_get_null_map_column().insert_indices_from_join(src_concrete.get_null_map_column(),
+                                                    indices_begin, 
indices_end);
+    _need_update_has_null = true;
+}
+
 void ColumnNullable::insert(const Field& x) {
     if (x.is_null()) {
         get_nested_column().insert_default();
diff --git a/be/src/vec/columns/column_nullable.h 
b/be/src/vec/columns/column_nullable.h
index 3b97e708d11..5fba17c3635 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -123,6 +123,9 @@ public:
     void insert_range_from(const IColumn& src, size_t start, size_t length) 
override;
     void insert_indices_from(const IColumn& src, const int* indices_begin,
                              const int* indices_end) override;
+    void insert_indices_from_join(const IColumn& src, const uint32_t* 
indices_begin,
+                                  const uint32_t* indices_end) override;
+
     void insert(const Field& x) override;
     void insert_from(const IColumn& src, size_t n) override;
 
diff --git a/be/src/vec/columns/column_object.cpp 
b/be/src/vec/columns/column_object.cpp
index f3571c8ba29..d4ccab7f2e4 100644
--- a/be/src/vec/columns/column_object.cpp
+++ b/be/src/vec/columns/column_object.cpp
@@ -1002,4 +1002,15 @@ void ColumnObject::insert_indices_from(const IColumn& 
src, const int* indices_be
                                    });
 }
 
+void ColumnObject::insert_indices_from_join(const IColumn& src, const 
uint32_t* indices_begin,
+                                            const uint32_t* indices_end) {
+    // insert_indices_from_join with alignment
+    const ColumnObject& src_column = *check_and_get_column<ColumnObject>(src);
+    align_variant_by_name_and_type(*this, src_column, indices_end - 
indices_begin,
+                                   [indices_begin, indices_end](const IColumn& 
src, IColumn* dst) {
+                                       dst->insert_indices_from_join(src, 
indices_begin,
+                                                                     
indices_end);
+                                   });
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/columns/column_object.h 
b/be/src/vec/columns/column_object.h
index 6bff69b1e67..179a70c4ccb 100644
--- a/be/src/vec/columns/column_object.h
+++ b/be/src/vec/columns/column_object.h
@@ -287,6 +287,9 @@ public:
     void insert_indices_from(const IColumn& src, const int* indices_begin,
                              const int* indices_end) override;
 
+    void insert_indices_from_join(const IColumn& src, const uint32_t* 
indices_begin,
+                                  const uint32_t* indices_end) override;
+
     // May throw execption
     void try_insert(const Field& field);
 
diff --git a/be/src/vec/columns/column_string.cpp 
b/be/src/vec/columns/column_string.cpp
index 5d5abd64349..d205c7cd12e 100644
--- a/be/src/vec/columns/column_string.cpp
+++ b/be/src/vec/columns/column_string.cpp
@@ -161,6 +161,43 @@ void ColumnString::insert_indices_from(const IColumn& src, 
const int* indices_be
     }
 }
 
+void ColumnString::insert_indices_from_join(const IColumn& src, const 
uint32_t* indices_begin,
+                                            const uint32_t* indices_end) {
+    const ColumnString& src_str = assert_cast<const ColumnString&>(src);
+    auto src_offset_data = src_str.offsets.data();
+
+    auto old_char_size = chars.size();
+    size_t total_chars_size = old_char_size;
+
+    auto dst_offsets_pos = offsets.size();
+    offsets.resize(offsets.size() + indices_end - indices_begin);
+    auto* dst_offsets_data = offsets.data();
+
+    for (auto x = indices_begin; x != indices_end; ++x) {
+        if (*x != 0) {
+            total_chars_size += src_offset_data[*x] - src_offset_data[*x - 1];
+        }
+        dst_offsets_data[dst_offsets_pos++] = total_chars_size;
+    }
+    check_chars_length(total_chars_size, offsets.size());
+
+    chars.resize(total_chars_size);
+
+    auto* src_data_ptr = src_str.chars.data();
+    auto* dst_data_ptr = chars.data();
+
+    size_t dst_chars_pos = old_char_size;
+    for (auto x = indices_begin; x != indices_end; ++x) {
+        if (*x != 0) {
+            const size_t size_to_append = src_offset_data[*x] - 
src_offset_data[*x - 1];
+            const size_t offset = src_offset_data[*x - 1];
+            memcpy_small_allow_read_write_overflow15(dst_data_ptr + 
dst_chars_pos,
+                                                     src_data_ptr + offset, 
size_to_append);
+            dst_chars_pos += size_to_append;
+        }
+    }
+}
+
 void ColumnString::update_crcs_with_value(uint32_t* __restrict hashes, 
doris::PrimitiveType type,
                                           uint32_t rows, uint32_t offset,
                                           const uint8_t* __restrict null_data) 
const {
diff --git a/be/src/vec/columns/column_string.h 
b/be/src/vec/columns/column_string.h
index ae2bb9d25f9..b829eeaa377 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -475,6 +475,9 @@ public:
     void insert_indices_from(const IColumn& src, const int* indices_begin,
                              const int* indices_end) override;
 
+    void insert_indices_from_join(const IColumn& src, const uint32_t* 
indices_begin,
+                                  const uint32_t* indices_end) override;
+
     ColumnPtr filter(const Filter& filt, ssize_t result_size_hint) const 
override;
     size_t filter(const Filter& filter) override;
 
diff --git a/be/src/vec/columns/column_struct.cpp 
b/be/src/vec/columns/column_struct.cpp
index 93c62139498..3502fdf581a 100644
--- a/be/src/vec/columns/column_struct.cpp
+++ b/be/src/vec/columns/column_struct.cpp
@@ -233,6 +233,15 @@ void ColumnStruct::insert_indices_from(const IColumn& src, 
const int* indices_be
     }
 }
 
+void ColumnStruct::insert_indices_from_join(const IColumn& src, const 
uint32_t* indices_begin,
+                                            const uint32_t* indices_end) {
+    const ColumnStruct& src_concrete = assert_cast<const ColumnStruct&>(src);
+    for (size_t i = 0; i < columns.size(); ++i) {
+        columns[i]->insert_indices_from_join(src_concrete.get_column(i), 
indices_begin,
+                                             indices_end);
+    }
+}
+
 void ColumnStruct::insert_range_from(const IColumn& src, size_t start, size_t 
length) {
     const size_t tuple_size = columns.size();
     for (size_t i = 0; i < tuple_size; ++i) {
diff --git a/be/src/vec/columns/column_struct.h 
b/be/src/vec/columns/column_struct.h
index 36854824198..b1dd0ba795f 100644
--- a/be/src/vec/columns/column_struct.h
+++ b/be/src/vec/columns/column_struct.h
@@ -125,6 +125,9 @@ public:
     void insert_indices_from(const IColumn& src, const int* indices_begin,
                              const int* indices_end) override;
 
+    void insert_indices_from_join(const IColumn& src, const uint32_t* 
indices_begin,
+                                  const uint32_t* indices_end) override;
+
     void get_permutation(bool reverse, size_t limit, int nan_direction_hint,
                          Permutation& res) const override {
         LOG(FATAL) << "get_permutation not implemented";
diff --git a/be/src/vec/columns/column_vector.cpp 
b/be/src/vec/columns/column_vector.cpp
index 744e74b4843..4575d089781 100644
--- a/be/src/vec/columns/column_vector.cpp
+++ b/be/src/vec/columns/column_vector.cpp
@@ -388,6 +388,29 @@ void ColumnVector<T>::insert_indices_from(const IColumn& 
src, const int* indices
     }
 }
 
+template <typename T>
+void ColumnVector<T>::insert_indices_from_join(const IColumn& src, const 
uint32_t* indices_begin,
+                                               const uint32_t* indices_end) {
+    auto origin_size = size();
+    auto new_size = indices_end - indices_begin;
+    data.resize(origin_size + new_size);
+
+    const T* __restrict src_data = reinterpret_cast<const 
T*>(src.get_raw_data().data);
+
+    if constexpr (std::is_same_v<T, UInt8>) {
+        // null map: indices_begin[i] == 0 marks a null row, so store 1 (true)
+        for (uint32_t i = 0; i < new_size; ++i) {
+            data[origin_size + i] =
+                    (indices_begin[i] == 0) + (indices_begin[i] != 0) * 
src_data[indices_begin[i]];
+        }
+    } else {
+        // real data: the value copied for indices_begin[i] == 0 is meaningless
+        for (uint32_t i = 0; i < new_size; ++i) {
+            data[origin_size + i] = src_data[indices_begin[i]];
+        }
+    }
+}
+
 template <typename T>
 ColumnPtr ColumnVector<T>::filter(const IColumn::Filter& filt, ssize_t 
result_size_hint) const {
     size_t size = data.size();
diff --git a/be/src/vec/columns/column_vector.h 
b/be/src/vec/columns/column_vector.h
index 85d64740d19..f795a1057ab 100644
--- a/be/src/vec/columns/column_vector.h
+++ b/be/src/vec/columns/column_vector.h
@@ -382,6 +382,8 @@ public:
     void insert_indices_from(const IColumn& src, const int* indices_begin,
                              const int* indices_end) override;
 
+    void insert_indices_from_join(const IColumn& src, const uint32_t* 
indices_begin,
+                                  const uint32_t* indices_end) override;
     void fill(const value_type& element, size_t num) {
         auto old_size = data.size();
         auto new_size = old_size + num;
diff --git a/be/src/vec/columns/predicate_column.h 
b/be/src/vec/columns/predicate_column.h
index 3aec3b1540f..ce5101c8030 100644
--- a/be/src/vec/columns/predicate_column.h
+++ b/be/src/vec/columns/predicate_column.h
@@ -166,6 +166,11 @@ public:
         LOG(FATAL) << "insert_indices_from not supported in 
PredicateColumnType";
     }
 
+    void insert_indices_from_join(const IColumn& src, const uint32_t* 
indices_begin,
+                                  const uint32_t* indices_end) override {
+        LOG(FATAL) << "insert_indices_from_join not supported in 
PredicateColumnType";
+    }
+
     void pop_back(size_t n) override {
         LOG(FATAL) << "pop_back not supported in PredicateColumnType";
     }
diff --git a/be/src/vec/common/hash_table/hash_map.h 
b/be/src/vec/common/hash_table/hash_map.h
index 5b7cd6f4642..c26234b4e22 100644
--- a/be/src/vec/common/hash_table/hash_map.h
+++ b/be/src/vec/common/hash_table/hash_map.h
@@ -20,6 +20,8 @@
 
 #pragma once
 
+#include <span>
+
 #include "vec/common/hash_table/hash.h"
 #include "vec/common/hash_table/hash_table.h"
 #include "vec/common/hash_table/hash_table_allocator.h"
@@ -193,10 +195,222 @@ public:
     bool has_null_key_data() const { return false; }
 };
 
+template <typename Key, typename Cell, typename Hash = DefaultHash<Key>,
+          typename Grower = HashTableGrower<>, typename Allocator = 
HashTableAllocator>
+class JoinHashMapTable : public HashMapTable<Key, Cell, Hash, Grower, 
Allocator> {
+public:
+    using Self = JoinHashMapTable;
+    using Base = HashMapTable<Key, Cell, Hash, Grower, Allocator>;
+
+    using key_type = Key;
+    using value_type = typename Cell::value_type;
+    using mapped_type = typename Cell::Mapped;
+
+    using LookupResult = typename Base::LookupResult;
+
+    using HashMapTable<Key, Cell, Hash, Grower, Allocator>::HashMapTable;
+
+    static uint32_t calc_bucket_size(size_t num_elem) {
+        size_t expect_bucket_size = static_cast<size_t>(num_elem) + (num_elem 
- 1) / 7;
+        return phmap::priv::NormalizeCapacity(expect_bucket_size) + 1;
+    }
+
+    template <int JoinOpType>
+    void prepare_build(size_t num_elem, int batch_size) {
+        max_batch_size = batch_size;
+        bucket_size = calc_bucket_size(num_elem + 1);
+        first.resize(bucket_size, 0);
+        next.resize(num_elem);
+
+        if constexpr (JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN ||
+                      JoinOpType == doris::TJoinOp::RIGHT_OUTER_JOIN ||
+                      JoinOpType == doris::TJoinOp::RIGHT_ANTI_JOIN ||
+                      JoinOpType == doris::TJoinOp::RIGHT_SEMI_JOIN) {
+            visited.resize(num_elem, 0);
+        }
+    }
+
+    uint32_t get_bucket_size() const { return bucket_size; }
+
+    void build(const Key* __restrict keys, const uint32_t* __restrict 
bucket_nums,
+               size_t num_elem) {
+        build_keys = keys;
+        for (size_t i = 1; i < num_elem; i++) {
+            uint32_t bucket_num = bucket_nums[i];
+            next[i] = first[bucket_num];
+            first[bucket_num] = i;
+        }
+    }
+
+    template <int JoinOpType>
+    auto find_batch(const Key* __restrict keys, const uint32_t* __restrict 
bucket_nums,
+                    int probe_idx, int probe_rows, uint32_t* __restrict 
probe_idxs,
+                    uint32_t* __restrict build_idxs) {
+        if constexpr (JoinOpType == doris::TJoinOp::INNER_JOIN ||
+                      JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN ||
+                      JoinOpType == doris::TJoinOp::LEFT_OUTER_JOIN ||
+                      JoinOpType == doris::TJoinOp::RIGHT_OUTER_JOIN) {
+            return _find_batch_inner_outer_join<JoinOpType>(keys, bucket_nums, 
probe_idx,
+                                                            probe_rows, 
probe_idxs, build_idxs);
+        }
+        if constexpr (JoinOpType == doris::TJoinOp::LEFT_ANTI_JOIN ||
+                      JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN) {
+            return _find_batch_left_semi_anti<JoinOpType>(keys, bucket_nums, 
probe_idx, probe_rows,
+                                                          probe_idxs);
+        }
+        if constexpr (JoinOpType == doris::TJoinOp::RIGHT_ANTI_JOIN ||
+                      JoinOpType == doris::TJoinOp::RIGHT_SEMI_JOIN) {
+            return _find_batch_right_semi_anti(keys, bucket_nums, probe_idx, 
probe_rows);
+        }
+        return std::pair {0, 0};
+    }
+
+    template <int JoinOpType>
+    bool iterate_map(std::vector<uint32_t>& build_idxs) const {
+        const auto batch_size = max_batch_size;
+        const auto elem_num = visited.size();
+        int count = 0;
+        build_idxs.resize(batch_size);
+
+        while (count < batch_size && iter_idx < elem_num) {
+            const auto matched = visited[iter_idx];
+            build_idxs[count] = iter_idx;
+            if constexpr (JoinOpType == doris::TJoinOp::RIGHT_ANTI_JOIN) {
+                count += !matched;
+            } else {
+                count += matched;
+            }
+            iter_idx++;
+        }
+
+        build_idxs.resize(count);
+        return iter_idx >= elem_num;
+    }
+
+private:
+    auto _find_batch_right_semi_anti(const Key* __restrict keys,
+                                     const uint32_t* __restrict bucket_nums, 
int probe_idx,
+                                     int probe_rows) {
+        while (LIKELY(probe_idx < probe_rows)) {
+            auto build_idx = first[bucket_nums[probe_idx]];
+
+            while (build_idx) {
+                if (keys[probe_idx] == build_keys[build_idx]) {
+                    visited[build_idx] = 1;
+                }
+                build_idx = next[build_idx];
+            }
+            probe_idx++;
+        }
+        return std::pair {probe_idx, 0};
+    }
+
+    template <int JoinOpType>
+    auto _find_batch_left_semi_anti(const Key* __restrict keys,
+                                    const uint32_t* __restrict bucket_nums, 
int probe_idx,
+                                    int probe_rows, uint32_t* __restrict 
probe_idxs) {
+        auto matched_cnt = 0;
+        const auto batch_size = max_batch_size;
+
+        while (probe_idx < probe_rows && matched_cnt < batch_size) {
+            uint32_t bucket_num = bucket_nums[probe_idx];
+            auto build_idx = first[bucket_num];
+
+            while (build_idx) {
+                if (keys[probe_idx] == build_keys[build_idx]) {
+                    break;
+                }
+                build_idx = next[build_idx];
+            }
+            bool matched =
+                    JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN ? build_idx 
!= 0 : build_idx == 0;
+            matched_cnt += matched;
+            probe_idxs[matched_cnt - matched] = probe_idx++;
+        }
+        return std::pair {probe_idx, matched_cnt};
+    }
+
+    template <int JoinOpType>
+    auto _find_batch_inner_outer_join(const Key* __restrict keys,
+                                      const uint32_t* __restrict bucket_nums, 
int probe_idx,
+                                      int probe_rows, uint32_t* __restrict 
probe_idxs,
+                                      uint32_t* __restrict build_idxs) {
+        auto matched_cnt = 0;
+        const auto batch_size = max_batch_size;
+        size_t build_idx = 0;
+
+        auto do_the_probe = [&]() {
+            while (build_idx && matched_cnt < batch_size) {
+                if (keys[probe_idx] == build_keys[build_idx]) {
+                    probe_idxs[matched_cnt] = probe_idx;
+                    build_idxs[matched_cnt] = build_idx;
+                    if constexpr (JoinOpType == 
doris::TJoinOp::RIGHT_OUTER_JOIN ||
+                                  JoinOpType == 
doris::TJoinOp::FULL_OUTER_JOIN) {
+                        visited[build_idx] = 1;
+                    }
+                    matched_cnt++;
+                }
+                build_idx = next[build_idx];
+            }
+
+            if constexpr (JoinOpType == doris::TJoinOp::LEFT_OUTER_JOIN ||
+                          JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN) {
+                // `(!matched_cnt || probe_idxs[matched_cnt - 1] != 
probe_idx)` means this batch holds no match for the current probe row
+                if (!matched_cnt || probe_idxs[matched_cnt - 1] != probe_idx) {
+                    probe_idxs[matched_cnt] = probe_idx;
+                    build_idxs[matched_cnt] = 0;
+                    matched_cnt++;
+                }
+            }
+            probe_idx++;
+        };
+
+        if (probe_idx == current_probe_idx) {
+            current_probe_idx = -1;
+            build_idx = current_build_idx;
+            current_build_idx = 0;
+            do_the_probe();
+        }
+
+        while (probe_idx < probe_rows && matched_cnt < batch_size) {
+            uint32_t bucket_num = bucket_nums[probe_idx];
+            build_idx = first[bucket_num];
+            do_the_probe();
+        }
+
+        if (matched_cnt == batch_size && build_idx) {
+            probe_idx--;
+            current_probe_idx = probe_idx;
+            current_build_idx = build_idx;
+        }
+        return std::pair {probe_idx, matched_cnt};
+    }
+
+    const Key* __restrict build_keys;
+    std::vector<uint8_t> visited;
+
+    uint32_t bucket_size = 0;
+    int max_batch_size = 0;
+
+    int current_probe_idx = -1;
+    uint32_t current_build_idx = 0;
+
+    std::vector<uint32_t> first;
+    std::vector<uint32_t> next;
+
+    // used when iterating over the hash map
+    mutable uint32_t iter_idx = 1;
+    Cell cell;
+    doris::vectorized::Arena* pool;
+};
+
 template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>,
           typename Grower = HashTableGrower<>, typename Allocator = 
HashTableAllocator>
 using HashMap = HashMapTable<Key, HashMapCell<Key, Mapped, Hash>, Hash, 
Grower, Allocator>;
 
+template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>>
+using JoinFixedHashMap = JoinHashMapTable<Key, HashMapCell<Key, Mapped, Hash>, 
Hash>;
+
 template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>,
           typename Grower = HashTableGrower<>, typename Allocator = 
HashTableAllocator>
 using HashMapWithSavedHash =
diff --git a/be/src/vec/common/hash_table/hash_map_context.h 
b/be/src/vec/common/hash_table/hash_map_context.h
index 35df772b16d..64216986365 100644
--- a/be/src/vec/common/hash_table/hash_map_context.h
+++ b/be/src/vec/common/hash_table/hash_map_context.h
@@ -55,6 +55,9 @@ struct MethodBase {
     Arena arena;
     std::vector<size_t> hash_values;
 
+    // used in the join case
+    std::vector<uint32_t> bucket_nums;
+
     MethodBase() { hash_table.reset(new HashMap()); }
     virtual ~MethodBase() = default;
 
@@ -69,8 +72,32 @@ struct MethodBase {
             iterator = hash_table->begin();
         }
     }
+
     virtual void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t 
num_rows,
-                                      const uint8_t* null_map = nullptr) = 0;
+                                      const uint8_t* null_map = nullptr, bool 
is_join = false,
+                                      bool is_build = false, uint32_t 
bucket_size = 0) = 0;
+
+    void init_join_bucket_num(uint32_t num_rows, uint32_t bucket_size, const 
uint8_t* null_map) {
+        bucket_nums.resize(num_rows);
+
+        if (null_map == nullptr) {
+            init_join_bucket_num(num_rows, bucket_size);
+            return;
+        }
+        for (uint32_t k = 0; k < num_rows; ++k) {
+            if (null_map[k]) {
+                continue;
+            }
+
+            bucket_nums[k] = hash_table->hash(keys[k]) & (bucket_size - 1);
+        }
+    }
+
+    void init_join_bucket_num(uint32_t num_rows, uint32_t bucket_size) {
+        for (uint32_t k = 0; k < num_rows; ++k) {
+            bucket_nums[k] = hash_table->hash(keys[k]) & (bucket_size - 1);
+        }
+    }
 
     void init_hash_values(size_t num_rows, const uint8_t* null_map) {
         if (null_map == nullptr) {
@@ -148,7 +175,10 @@ struct MethodSerialized : public MethodBase<TData> {
     using Base::init_iterator;
     using State = ColumnsHashing::HashMethodSerialized<typename Base::Value, 
typename Base::Mapped>;
     using Base::try_presis_key;
-
+    // must be kept until the hash probe ends.
+    std::vector<StringRef> build_stored_keys;
+    Arena build_arena;
+    // refreshed on each probe
     std::vector<StringRef> stored_keys;
 
     StringRef serialize_keys_to_pool_contiguous(size_t i, size_t keys_size,
@@ -163,40 +193,48 @@ struct MethodSerialized : public MethodBase<TData> {
         return {begin, sum_size};
     }
 
-    void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t 
num_rows,
-                              const uint8_t* null_map = nullptr) override {
-        Base::arena.clear();
-        stored_keys.resize(num_rows);
+    void init_serialized_keys_impl(const ColumnRawPtrs& key_columns, size_t 
num_rows,
+                                   std::vector<StringRef>& keys, Arena& arena) 
{
+        arena.clear();
+        keys.resize(num_rows);
 
         size_t max_one_row_byte_size = 0;
         for (const auto& column : key_columns) {
             max_one_row_byte_size += column->get_max_row_byte_size();
         }
         size_t total_bytes = max_one_row_byte_size * num_rows;
-
         if (total_bytes > config::pre_serialize_keys_limit_bytes) {
             // reach mem limit, don't serialize in batch
             size_t keys_size = key_columns.size();
             for (size_t i = 0; i < num_rows; ++i) {
-                stored_keys[i] =
-                        serialize_keys_to_pool_contiguous(i, keys_size, 
key_columns, Base::arena);
+                keys[i] = serialize_keys_to_pool_contiguous(i, keys_size, 
key_columns, arena);
             }
         } else {
-            auto* serialized_key_buffer =
-                    reinterpret_cast<uint8_t*>(Base::arena.alloc(total_bytes));
+            auto* serialized_key_buffer = 
reinterpret_cast<uint8_t*>(arena.alloc(total_bytes));
 
             for (size_t i = 0; i < num_rows; ++i) {
-                stored_keys[i].data =
+                keys[i].data =
                         reinterpret_cast<char*>(serialized_key_buffer + i * 
max_one_row_byte_size);
-                stored_keys[i].size = 0;
+                keys[i].size = 0;
             }
 
             for (const auto& column : key_columns) {
-                column->serialize_vec(stored_keys, num_rows, 
max_one_row_byte_size);
+                column->serialize_vec(keys, num_rows, max_one_row_byte_size);
             }
         }
-        Base::keys = stored_keys.data();
-        Base::init_hash_values(num_rows, null_map);
+        Base::keys = keys.data();
+    }
+
+    void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t 
num_rows,
+                              const uint8_t* null_map = nullptr, bool is_join 
= false,
+                              bool is_build = false, uint32_t bucket_size = 0) 
override {
+        init_serialized_keys_impl(key_columns, num_rows, is_build ? 
build_stored_keys : stored_keys,
+                                  is_build ? build_arena : Base::arena);
+        if (is_join) {
+            Base::init_join_bucket_num(num_rows, bucket_size, null_map);
+        } else {
+            Base::init_hash_values(num_rows, null_map);
+        }
     }
 
     void insert_keys_into_columns(std::vector<StringRef>& keys, 
MutableColumns& key_columns,
@@ -222,7 +260,8 @@ struct MethodStringNoCache : public MethodBase<TData> {
     std::vector<StringRef> stored_keys;
 
     void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t 
num_rows,
-                              const uint8_t* null_map = nullptr) override {
+                              const uint8_t* null_map = nullptr, bool is_join 
= false,
+                              bool is_build = false, uint32_t bucket_size = 0) 
override {
         const IColumn& column = *key_columns[0];
         const auto& column_string = assert_cast<const ColumnString&>(
                 column.is_nullable()
@@ -237,7 +276,11 @@ struct MethodStringNoCache : public MethodBase<TData> {
         }
 
         Base::keys = stored_keys.data();
-        Base::init_hash_values(num_rows, null_map);
+        if (is_join) {
+            Base::init_join_bucket_num(num_rows, bucket_size, null_map);
+        } else {
+            Base::init_hash_values(num_rows, null_map);
+        }
     }
 
     void insert_keys_into_columns(std::vector<StringRef>& keys, 
MutableColumns& key_columns,
@@ -258,7 +301,8 @@ struct MethodOneNumber : public MethodBase<TData> {
                                                       FieldType>;
 
     void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t 
num_rows,
-                              const uint8_t* null_map = nullptr) override {
+                              const uint8_t* null_map = nullptr, bool is_join 
= false,
+                              bool is_build = false, uint32_t bucket_size = 0) 
override {
         Base::keys = (FieldType*)(key_columns[0]->is_nullable()
                                           ? assert_cast<const 
ColumnNullable*>(key_columns[0])
                                                     ->get_nested_column_ptr()
@@ -266,7 +310,11 @@ struct MethodOneNumber : public MethodBase<TData> {
                              ->get_raw_data()
                              .data;
         std::string name = key_columns[0]->get_name();
-        Base::init_hash_values(num_rows, null_map);
+        if (is_join) {
+            Base::init_join_bucket_num(num_rows, bucket_size, null_map);
+        } else {
+            Base::init_hash_values(num_rows, null_map);
+        }
     }
 
     void insert_keys_into_columns(std::vector<typename Base::Key>& keys,
@@ -292,17 +340,20 @@ struct MethodKeysFixed : public MethodBase<TData> {
     using State = ColumnsHashing::HashMethodKeysFixed<typename Base::Value, 
Key, Mapped,
                                                       has_nullable_keys>;
 
+    // need keep until the hash probe end. use only in join
+    std::vector<Key> build_stored_keys;
+    // refresh each time probe hash table
     std::vector<Key> stored_keys;
     Sizes key_sizes;
 
     MethodKeysFixed(Sizes key_sizes_) : key_sizes(std::move(key_sizes_)) {}
 
     template <typename T>
-    std::vector<T> pack_fixeds(size_t row_numbers, const ColumnRawPtrs& 
key_columns,
-                               const ColumnRawPtrs& nullmap_columns) {
+    void pack_fixeds(size_t row_numbers, const ColumnRawPtrs& key_columns,
+                     const ColumnRawPtrs& nullmap_columns, std::vector<T>& 
result) {
         size_t bitmap_size = get_bitmap_size(nullmap_columns.size());
+        result.resize(row_numbers);
 
-        std::vector<T> result(row_numbers);
         size_t offset = 0;
         if (bitmap_size > 0) {
             for (size_t j = 0; j < nullmap_columns.size(); j++) {
@@ -356,11 +407,11 @@ struct MethodKeysFixed : public MethodBase<TData> {
             }
             offset += key_sizes[j];
         }
-        return result;
     }
 
     void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t 
num_rows,
-                              const uint8_t* null_map = nullptr) override {
+                              const uint8_t* null_map = nullptr, bool is_join 
= false,
+                              bool is_build = false, uint32_t bucket_size = 0) 
override {
         ColumnRawPtrs actual_columns;
         ColumnRawPtrs null_maps;
         if (has_nullable_keys) {
@@ -378,9 +429,20 @@ struct MethodKeysFixed : public MethodBase<TData> {
         } else {
             actual_columns = key_columns;
         }
-        stored_keys = pack_fixeds<Key>(num_rows, actual_columns, null_maps);
-        Base::keys = stored_keys.data();
-        Base::init_hash_values(num_rows, null_map);
+
+        if (is_build) {
+            pack_fixeds<Key>(num_rows, actual_columns, null_maps, 
build_stored_keys);
+            Base::keys = build_stored_keys.data();
+        } else {
+            pack_fixeds<Key>(num_rows, actual_columns, null_maps, stored_keys);
+            Base::keys = stored_keys.data();
+        }
+
+        if (is_join) {
+            Base::init_join_bucket_num(num_rows, bucket_size, null_map);
+        } else {
+            Base::init_hash_values(num_rows, null_map);
+        }
     }
 
     void insert_keys_into_columns(std::vector<typename Base::Key>& keys,
@@ -488,14 +550,14 @@ struct MethodSingleNullableColumn : public 
SingleColumnMethod {
 #endif
 
 template <typename RowRefListType>
-using SerializedHashTableContext = 
MethodSerialized<PartitionedHashMap<StringRef, RowRefListType>>;
+using SerializedHashTableContext = 
MethodSerialized<JoinFixedHashMap<StringRef, RowRefListType>>;
 
 template <class T, typename RowRefListType>
 using PrimaryTypeHashTableContext =
-        MethodOneNumber<T, PartitionedHashMap<T, RowRefListType, 
HashCRC32<T>>>;
+        MethodOneNumber<T, JoinFixedHashMap<T, RowRefListType, HashCRC32<T>>>;
 
 template <class Key, bool has_null, typename Value>
 using FixedKeyHashTableContext =
-        MethodKeysFixed<PartitionedHashMap<Key, Value, HashCRC32<Key>>, 
has_null>;
+        MethodKeysFixed<JoinFixedHashMap<Key, Value, HashCRC32<Key>>, 
has_null>;
 
 } // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/common/hash_table/hash_table.h 
b/be/src/vec/common/hash_table/hash_table.h
index 39ee5ec9e0a..7287f2e0276 100644
--- a/be/src/vec/common/hash_table/hash_table.h
+++ b/be/src/vec/common/hash_table/hash_table.h
@@ -441,7 +441,6 @@ protected:
     Cell* buf {nullptr}; /// A piece of memory for all elements except the 
element with zero key.
     Grower grower;
     int64_t _resize_timer_ns;
-
     // the bucket count threshold above which it's converted to partioned hash 
table
     // > 0: enable convert dynamically
     // 0: convert is disabled
diff --git a/be/src/vec/common/hash_table/hash_table_set_build.h 
b/be/src/vec/common/hash_table/hash_table_set_build.h
index e3c1ed27b1f..48c9e2d1673 100644
--- a/be/src/vec/common/hash_table/hash_table_set_build.h
+++ b/be/src/vec/common/hash_table/hash_table_set_build.h
@@ -24,11 +24,9 @@ namespace doris::vectorized {
 template <class HashTableContext, bool is_intersect>
 struct HashTableBuild {
     template <typename Parent>
-    HashTableBuild(Parent* parent, int rows, ColumnRawPtrs& build_raw_ptrs, 
uint8_t offset,
-                   RuntimeState* state)
+    HashTableBuild(Parent* parent, int rows, ColumnRawPtrs& build_raw_ptrs, 
RuntimeState* state)
             : _mem_used(parent->mem_used()),
               _rows(rows),
-              _offset(offset),
               _build_raw_ptrs(build_raw_ptrs),
               _state(state) {}
 
@@ -48,9 +46,9 @@ struct HashTableBuild {
         size_t k = 0;
         auto creator = [&](const auto& ctor, auto& key, auto& origin) {
             HashTableContext::try_presis_key(key, origin, arena);
-            ctor(key, Mapped {k, _offset});
+            ctor(key, Mapped {k});
         };
-        auto creator_for_null_key = [&](auto& mapped) { mapped = {k, _offset}; 
};
+        auto creator_for_null_key = [&](auto& mapped) { mapped = {k}; };
 
         for (; k < _rows; ++k) {
             if (k % CHECK_FRECUENCY == 0) {
@@ -64,7 +62,6 @@ struct HashTableBuild {
 private:
     int64_t* _mem_used;
     const int _rows;
-    const uint8_t _offset;
     ColumnRawPtrs& _build_raw_ptrs;
     RuntimeState* _state;
 };
diff --git a/be/src/vec/exec/join/join_op.h b/be/src/vec/exec/join/join_op.h
index 1b8b8f2c695..858f5197b03 100644
--- a/be/src/vec/exec/join/join_op.h
+++ b/be/src/vec/exec/join/join_op.h
@@ -18,7 +18,6 @@
 #pragma once
 #include "vec/common/arena.h"
 #include "vec/common/columns_hashing.h"
-#include "vec/common/hash_table/hash_map.h"
 #include "vec/core/block.h"
 
 namespace doris::vectorized {
@@ -45,19 +44,19 @@ namespace doris::vectorized {
  */
 struct RowRef {
     uint32_t row_num = 0;
-    uint8_t block_offset;
 
     RowRef() = default;
-    RowRef(size_t row_num_count, uint8_t block_offset_)
-            : row_num(row_num_count), block_offset(block_offset_) {}
+    RowRef(size_t row_num_count) : row_num(row_num_count) {}
+    void clear() {};
 };
 
 struct RowRefWithFlag : public RowRef {
     bool visited;
 
     RowRefWithFlag() = default;
-    RowRefWithFlag(size_t row_num_count, uint8_t block_offset_, bool 
is_visited = false)
-            : RowRef(row_num_count, block_offset_), visited(is_visited) {}
+    RowRefWithFlag(size_t row_num_count, bool is_visited = false)
+            : RowRef(row_num_count), visited(is_visited) {}
+    void clear() {};
 };
 
 /// Portion of RowRefs, 16 * (MAX_SIZE + 1) bytes sized.
@@ -93,14 +92,15 @@ public:
     ForwardIterator() : root(nullptr), first(false), batch(nullptr), 
position(0) {}
 
     ForwardIterator(RowRefListType* begin)
-            : root(begin), first(true), batch(root->next), position(0) {}
+            : root(begin), first(true), batch((&root->next)), position(0) {}
 
     RowRefType& operator*() {
         if (first) {
             return *root;
         }
-        return batch->row_refs[position];
+        return batch->operator[](position);
     }
+
     RowRefType* operator->() { return &(**this); }
 
     void operator++() {
@@ -109,21 +109,17 @@ public:
             return;
         }
 
-        if (batch) {
+        if (batch && position < batch->size()) {
             ++position;
-            if (position >= batch->size) {
-                batch = batch->next;
-                position = 0;
-            }
         }
     }
 
-    bool ok() const { return first || batch; }
+    bool ok() const { return first || (batch && position < batch->size()); }
 
 private:
     RowRefListType* root;
     bool first;
-    Batch<RowRefType>* batch;
+    std::vector<RowRefType>* batch;
     size_t position;
 };
 
@@ -131,76 +127,60 @@ struct RowRefList : RowRef {
     using RowRefType = RowRef;
 
     RowRefList() = default;
-    RowRefList(size_t row_num_, uint8_t block_offset_) : RowRef(row_num_, 
block_offset_) {}
+    RowRefList(size_t row_num_) : RowRef(row_num_) {}
 
     ForwardIterator<RowRefList> begin() { return 
ForwardIterator<RowRefList>(this); }
 
     /// insert element after current one
-    void insert(RowRefType&& row_ref, Arena& pool) {
-        if (!next) {
-            next = pool.alloc<Batch<RowRefType>>();
-            *next = Batch<RowRefType>(nullptr);
-        }
-        next = next->insert(std::move(row_ref), pool);
-    }
+    void insert(RowRefType&& row_ref, Arena& pool) { 
next.emplace_back(std::move(row_ref)); }
+
+    void clear() { next.clear(); }
 
 private:
     friend class ForwardIterator<RowRefList>;
-
-    Batch<RowRefType>* next = nullptr;
+    std::vector<RowRefType> next;
 };
 
 struct RowRefListWithFlag : RowRef {
     using RowRefType = RowRef;
 
     RowRefListWithFlag() = default;
-    RowRefListWithFlag(size_t row_num_, uint8_t block_offset_) : 
RowRef(row_num_, block_offset_) {}
+    RowRefListWithFlag(size_t row_num_) : RowRef(row_num_) {}
 
     ForwardIterator<RowRefListWithFlag> const begin() {
         return ForwardIterator<RowRefListWithFlag>(this);
     }
 
     /// insert element after current one
-    void insert(RowRef&& row_ref, Arena& pool) {
-        if (!next) {
-            next = pool.alloc<Batch<RowRefType>>();
-            *next = Batch<RowRefType>(nullptr);
-        }
-        next = next->insert(std::move(row_ref), pool);
-    }
+    void insert(RowRefType&& row_ref, Arena& pool) { 
next.emplace_back(row_ref); }
+
+    void clear() { next.clear(); }
 
     bool visited = false;
 
 private:
     friend class ForwardIterator<RowRefListWithFlag>;
-
-    Batch<RowRefType>* next = nullptr;
+    std::vector<RowRefType> next;
 };
 
 struct RowRefListWithFlags : RowRefWithFlag {
     using RowRefType = RowRefWithFlag;
 
     RowRefListWithFlags() = default;
-    RowRefListWithFlags(size_t row_num_, uint8_t block_offset_)
-            : RowRefWithFlag(row_num_, block_offset_) {}
+    RowRefListWithFlags(size_t row_num_) : RowRefWithFlag(row_num_) {}
 
     ForwardIterator<RowRefListWithFlags> const begin() {
         return ForwardIterator<RowRefListWithFlags>(this);
     }
 
     /// insert element after current one
-    void insert(RowRefWithFlag&& row_ref, Arena& pool) {
-        if (!next) {
-            next = pool.alloc<Batch<RowRefType>>();
-            *next = Batch<RowRefType>(nullptr);
-        }
-        next = next->insert(std::move(row_ref), pool);
-    }
+    void insert(RowRefType&& row_ref, Arena& pool) { 
next.emplace_back(row_ref); }
+
+    void clear() { next.clear(); }
 
 private:
     friend class ForwardIterator<RowRefListWithFlags>;
-
-    Batch<RowRefType>* next = nullptr;
+    std::vector<RowRefType> next;
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/join/process_hash_table_probe.h 
b/be/src/vec/exec/join/process_hash_table_probe.h
index 435cea84186..9c2fd6094b5 100644
--- a/be/src/vec/exec/join/process_hash_table_probe.h
+++ b/be/src/vec/exec/join/process_hash_table_probe.h
@@ -75,7 +75,7 @@ struct ProcessHashTableProbe {
                                                UInt8* __restrict null_map_data,
                                                UInt8* __restrict filter_map, 
Block* output_block);
 
-    void _emplace_element(int8_t block_offset, int32_t block_row, int& 
current_offset);
+    void _emplace_element(int32_t block_row, int& current_offset);
 
     template <typename HashTableType>
     typename HashTableType::State _init_probe_side(HashTableType& 
hash_table_ctx, size_t probe_rows,
@@ -94,14 +94,13 @@ struct ProcessHashTableProbe {
 
     Parent* _parent;
     const int _batch_size;
-    std::shared_ptr<std::vector<Block>> _build_blocks;
+    std::shared_ptr<Block> _build_block;
     std::unique_ptr<Arena> _arena;
     std::vector<StringRef> _probe_keys;
 
     std::vector<uint32_t> _probe_indexs;
-    PaddedPODArray<int8_t> _build_block_offsets;
-    PaddedPODArray<int32_t> _build_block_rows;
-    std::vector<std::pair<int8_t, int>> _build_blocks_locs;
+    std::vector<uint32_t> _build_indexs;
+    std::vector<int> _build_blocks_locs;
     // only need set the tuple is null in RIGHT_OUTER_JOIN and FULL_OUTER_JOIN
     ColumnUInt8::Container* _tuple_is_null_left_flags;
     // only need set the tuple is null in LEFT_OUTER_JOIN and FULL_OUTER_JOIN
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h 
b/be/src/vec/exec/join/process_hash_table_probe_impl.h
index f4a3010c49f..248e8f42328 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h
@@ -32,7 +32,7 @@ template <int JoinOpType, typename Parent>
 ProcessHashTableProbe<JoinOpType, Parent>::ProcessHashTableProbe(Parent* 
parent, int batch_size)
         : _parent(parent),
           _batch_size(batch_size),
-          _build_blocks(parent->build_blocks()),
+          _build_block(parent->build_block()),
           _tuple_is_null_left_flags(parent->is_outer_join()
                                             ? &(reinterpret_cast<ColumnUInt8&>(
                                                         
*parent->_tuple_is_null_left_flag_column)
@@ -69,51 +69,13 @@ void ProcessHashTableProbe<JoinOpType, 
Parent>::build_side_output_column(
             JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == 
TJoinOp::FULL_OUTER_JOIN;
 
     if (!is_semi_anti_join || have_other_join_conjunct) {
-        if (_build_blocks->size() == 1) {
-            for (int i = 0; i < _right_col_len; i++) {
-                auto& column = *(*_build_blocks)[0].get_by_position(i).column;
-                if (output_slot_flags[i]) {
-                    mcol[i + _right_col_idx]->insert_indices_from(column, 
_build_block_rows.data(),
-                                                                  
_build_block_rows.data() + size);
-                } else {
-                    mcol[i + _right_col_idx]->insert_many_defaults(size);
-                }
-            }
-        } else {
-            for (int i = 0; i < _right_col_len; i++) {
-                if (output_slot_flags[i]) {
-                    for (int j = 0; j < size; j++) {
-                        if constexpr (probe_all) {
-                            if (_build_block_offsets[j] == -1) {
-                                DCHECK(mcol[i + 
_right_col_idx]->is_nullable());
-                                assert_cast<ColumnNullable*>(mcol[i + 
_right_col_idx].get())
-                                        ->insert_default();
-                            } else {
-                                auto& column = 
*(*_build_blocks)[_build_block_offsets[j]]
-                                                        .get_by_position(i)
-                                                        .column;
-                                mcol[i + _right_col_idx]->insert_from(column, 
_build_block_rows[j]);
-                            }
-                        } else {
-                            if (_build_block_offsets[j] == -1) {
-                                // the only case to reach here:
-                                // 1. left anti join with other conjuncts, and
-                                // 2. equal conjuncts does not match
-                                // since nullptr is emplaced back to 
visited_map,
-                                // the output value of the build side does not 
matter,
-                                // just insert default value
-                                mcol[i + _right_col_idx]->insert_default();
-                            } else {
-                                auto& column = 
*(*_build_blocks)[_build_block_offsets[j]]
-                                                        .get_by_position(i)
-                                                        .column;
-                                mcol[i + _right_col_idx]->insert_from(column, 
_build_block_rows[j]);
-                            }
-                        }
-                    }
-                } else {
-                    mcol[i + _right_col_idx]->insert_many_defaults(size);
-                }
+        for (int i = 0; i < _right_col_len; i++) {
+            const auto& column = *_build_block->get_by_position(i).column;
+            if (output_slot_flags[i]) {
+                mcol[i + _right_col_idx]->insert_indices_from_join(column, 
_build_indexs.data(),
+                                                                   
_build_indexs.data() + size);
+            } else {
+                mcol[i + _right_col_idx]->insert_many_defaults(size);
             }
         }
     }
@@ -123,7 +85,7 @@ void ProcessHashTableProbe<JoinOpType, 
Parent>::build_side_output_column(
         _tuple_is_null_right_flags->resize(size);
         auto* __restrict null_data = _tuple_is_null_right_flags->data();
         for (int i = 0; i < size; ++i) {
-            null_data[i] = _build_block_rows[i] == -1;
+            null_data[i] = _build_indexs[i] == 0;
         }
     }
 }
@@ -166,8 +128,7 @@ typename HashTableType::State 
ProcessHashTableProbe<JoinOpType, Parent>::_init_p
     _right_col_len = _parent->right_table_data_types().size();
     _row_count_from_last_probe = 0;
 
-    _build_block_rows.clear();
-    _build_block_offsets.clear();
+    _build_indexs.clear();
     _probe_indexs.clear();
     if (with_other_join_conjuncts) {
         // use in right join to change visited state after exec the vother 
join conjunct
@@ -176,14 +137,14 @@ typename HashTableType::State 
ProcessHashTableProbe<JoinOpType, Parent>::_init_p
         _visited_map.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE);
         _same_to_prev.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE);
     }
-    _probe_indexs.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE);
-    _build_block_rows.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE);
-    _build_block_offsets.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE);
+    _probe_indexs.resize(_batch_size);
+    _build_indexs.resize(_batch_size);
 
     if (!_parent->_ready_probe) {
         _parent->_ready_probe = true;
         hash_table_ctx.reset();
-        hash_table_ctx.init_serialized_keys(_parent->_probe_columns, 
probe_rows, null_map);
+        hash_table_ctx.init_serialized_keys(_parent->_probe_columns, 
probe_rows, null_map, true,
+                                            false, 
hash_table_ctx.hash_table->get_bucket_size());
     }
     return typename HashTableType::State(_parent->_probe_columns);
 }
@@ -199,8 +160,7 @@ ForwardIterator<Mapped>& ProcessHashTableProbe<JoinOpType, 
Parent>::_probe_row_m
 
     SCOPED_TIMER(_search_hashtable_timer);
     for (; probe_row_match_iter.ok() && current_offset < _batch_size; 
++probe_row_match_iter) {
-        _emplace_element(probe_row_match_iter->block_offset, 
probe_row_match_iter->row_num,
-                         current_offset);
+        _emplace_element(probe_row_match_iter->row_num, current_offset);
         _probe_indexs.emplace_back(probe_index);
         if constexpr (with_other_join_conjuncts) {
             _visited_map.emplace_back(&probe_row_match_iter->visited);
@@ -218,11 +178,9 @@ ForwardIterator<Mapped>& ProcessHashTableProbe<JoinOpType, 
Parent>::_probe_row_m
 }
 
 template <int JoinOpType, typename Parent>
-void ProcessHashTableProbe<JoinOpType, Parent>::_emplace_element(int8_t 
block_offset,
-                                                                 int32_t 
block_row,
+void ProcessHashTableProbe<JoinOpType, Parent>::_emplace_element(int32_t 
block_row,
                                                                  int& 
current_offset) {
-    _build_block_offsets.emplace_back(block_offset);
-    _build_block_rows.emplace_back(block_row);
+    _build_indexs.emplace_back(block_row);
     current_offset++;
 }
 
@@ -236,25 +194,17 @@ Status ProcessHashTableProbe<JoinOpType, 
Parent>::do_process(HashTableType& hash
                                                              size_t 
probe_rows) {
     auto& probe_index = _parent->_probe_index;
 
-    using KeyGetter = typename HashTableType::State;
     using Mapped = typename HashTableType::Mapped;
 
-    KeyGetter key_getter =
-            _init_probe_side<HashTableType>(hash_table_ctx, probe_rows, 
with_other_conjuncts,
-                                            need_null_map_for_probe ? 
null_map->data() : nullptr);
+    _init_probe_side<HashTableType>(hash_table_ctx, probe_rows, 
with_other_conjuncts,
+                                    need_null_map_for_probe ? null_map->data() 
: nullptr);
 
     auto& mcol = mutable_block.mutable_columns();
 
-    constexpr auto is_right_semi_anti_join =
-            JoinOpType == TJoinOp::RIGHT_ANTI_JOIN || JoinOpType == 
TJoinOp::RIGHT_SEMI_JOIN;
-
-    constexpr auto probe_all =
-            JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == 
TJoinOp::FULL_OUTER_JOIN;
-
     int last_probe_index = probe_index;
 
     int current_offset = 0;
-    bool all_match_one = true;
+    bool all_match_one = false;
     size_t probe_size = 0;
 
     auto& probe_row_match_iter = _probe_row_match<Mapped, 
with_other_conjuncts>(
@@ -283,121 +233,13 @@ Status ProcessHashTableProbe<JoinOpType, 
Parent>::do_process(HashTableType& hash
 
     {
         SCOPED_TIMER(_search_hashtable_timer);
-        using FindResult = KeyGetter::FindResult;
-        FindResult empty = {nullptr, false};
-        while (current_offset < _batch_size && probe_index < probe_rows) {
-            if constexpr (ignore_null && need_null_map_for_probe) {
-                if ((*null_map)[probe_index]) {
-                    if constexpr (probe_all) {
-                        // only full outer / left outer need insert the data 
of right table
-                        _emplace_element(-1, -1, current_offset);
-                        _probe_indexs.emplace_back(probe_index);
-
-                        if constexpr (with_other_conjuncts) {
-                            _same_to_prev.emplace_back(false);
-                            _visited_map.emplace_back(nullptr);
-                        }
-                    } else {
-                        all_match_one = false;
-                    }
-                    probe_index++;
-                    continue;
-                }
-            }
-
-            const auto& find_result = need_null_map_for_probe && 
(*null_map)[probe_index]
-                                              ? empty
-                                              : 
hash_table_ctx.find(key_getter, probe_index);
-
-            auto current_probe_index = probe_index;
-            if constexpr (!with_other_conjuncts &&
-                          (JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
-                           JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
-                           JoinOpType == TJoinOp::LEFT_SEMI_JOIN)) {
-                bool need_go_ahead =
-                        (JoinOpType != TJoinOp::LEFT_SEMI_JOIN) ^ 
find_result.is_found();
-                if constexpr (is_mark_join) {
-                    ++current_offset;
-                    bool null_result = (need_null_map_for_probe && 
(*null_map)[probe_index]) ||
-                                       (!need_go_ahead && 
*_has_null_in_build_side);
-                    if (null_result) {
-                        mark_column->insert_null();
-                    } else {
-                        mark_column->insert_value(need_go_ahead);
-                    }
-                } else {
-                    current_offset += need_go_ahead;
-                }
-                ++probe_index;
-            } else {
-                if (find_result.is_found()) {
-                    auto& mapped = find_result.get_mapped();
-                    auto origin_offset = current_offset;
-
-                    // For mark join, if euqual-matched tuple count for one 
probe row
-                    // excceeds batch size, it's difficult to implement the 
logic to
-                    // split them into multiple sub blocks and handle them, 
keep the original
-                    // logic for now.
-                    if constexpr (is_mark_join && with_other_conjuncts) {
-                        for (auto it = mapped.begin(); it.ok(); ++it) {
-                            _emplace_element(it->block_offset, it->row_num, 
current_offset);
-                            _visited_map.emplace_back(&it->visited);
-                        }
-                        ++probe_index;
-                    } else if constexpr (with_other_conjuncts || 
!is_right_semi_anti_join) {
-                        auto multi_match_last_offset = current_offset;
-                        auto it = mapped.begin();
-                        for (; it.ok() && current_offset < _batch_size; ++it) {
-                            _emplace_element(it->block_offset, it->row_num, 
current_offset);
-
-                            if constexpr (with_other_conjuncts) {
-                                _visited_map.emplace_back(&it->visited);
-                            }
-                        }
-                        probe_row_match_iter = it;
-                        if (!it.ok()) {
-                            // If all matched rows for the current probe row 
are handled,
-                            // advance to next probe row.
-                            // If not(which means it excceed batch size), 
probe_index is not increased and
-                            // remaining matched rows for the current probe 
row will be
-                            // handled in the next call of this function
-                            ++probe_index;
-                        } else if constexpr (with_other_conjuncts) {
-                            multi_matched_output_row_count =
-                                    current_offset - multi_match_last_offset;
-                        }
-                    } else {
-                        ++probe_index;
-                    }
-                    if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) {
-                        mapped.visited = true;
-                    }
-
-                    if constexpr (with_other_conjuncts) {
-                        _same_to_prev.emplace_back(false);
-                        for (int i = 0; i < current_offset - origin_offset - 
1; ++i) {
-                            _same_to_prev.emplace_back(true);
-                        }
-                    }
-                } else if constexpr (probe_all || JoinOpType == 
TJoinOp::LEFT_ANTI_JOIN ||
-                                     JoinOpType == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
-                                     (JoinOpType == TJoinOp::LEFT_SEMI_JOIN && 
is_mark_join)) {
-                    // only full outer / left outer need insert the data of 
right table
-                    _emplace_element(-1, -1, current_offset);
-
-                    if constexpr (with_other_conjuncts) {
-                        _same_to_prev.emplace_back(false);
-                        _visited_map.emplace_back(nullptr);
-                    }
-                    ++probe_index;
-                } else {
-                    ++probe_index;
-                }
-            }
-            all_match_one &= (current_offset == _probe_indexs.size() + 1);
-            _probe_indexs.resize(current_offset, current_probe_index);
-        }
-        probe_size = probe_index - last_probe_index + 
probe_row_match_iter.ok();
+        auto [new_probe_idx, new_current_offset] =
+                hash_table_ctx.hash_table->template find_batch<JoinOpType>(
+                        hash_table_ctx.keys, 
hash_table_ctx.bucket_nums.data(), probe_index,
+                        probe_rows, _probe_indexs.data(), 
_build_indexs.data());
+        probe_index = new_probe_idx;
+        current_offset = new_current_offset;
+        probe_size = probe_index - last_probe_index;
     }
 
     build_side_output_column(mcol, *_right_output_slot_flags, current_offset, 
with_other_conjuncts);
@@ -776,129 +618,21 @@ template <typename HashTableType>
 Status ProcessHashTableProbe<JoinOpType, Parent>::process_data_in_hashtable(
         HashTableType& hash_table_ctx, MutableBlock& mutable_block, Block* 
output_block,
         bool* eos) {
-    using Mapped = typename HashTableType::Mapped;
     SCOPED_TIMER(_probe_process_hashtable_timer);
-    if constexpr (std::is_same_v<Mapped, RowRefListWithFlag> ||
-                  std::is_same_v<Mapped, RowRefListWithFlags>) {
-        hash_table_ctx.init_iterator();
-        auto& mcol = mutable_block.mutable_columns();
-
-        bool right_semi_anti_without_other = _is_right_semi_anti && 
!_have_other_join_conjunct;
-        int right_col_idx =
-                right_semi_anti_without_other ? 0 : 
_parent->left_table_data_types().size();
-        int right_col_len = _parent->right_table_data_types().size();
-
-        auto& iter = hash_table_ctx.iterator;
-        auto block_size = 0;
-        auto& visited_iter =
-                
std::get<ForwardIterator<Mapped>>(_parent->_outer_join_pull_visited_iter);
-        _build_blocks_locs.resize(_batch_size);
-        auto register_build_loc = [&](int8_t offset, int32_t row_nums) {
-            _build_blocks_locs[block_size++] = std::pair<int8_t, int>(offset, 
row_nums);
-        };
-
-        if (visited_iter.ok()) {
-            if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) {
-                for (; visited_iter.ok() && block_size < _batch_size; 
++visited_iter) {
-                    register_build_loc(visited_iter->block_offset, 
visited_iter->row_num);
-                }
-            } else {
-                for (; visited_iter.ok() && block_size < _batch_size; 
++visited_iter) {
-                    if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) {
-                        if (visited_iter->visited) {
-                            register_build_loc(visited_iter->block_offset, 
visited_iter->row_num);
-                        }
-                    } else {
-                        if (!visited_iter->visited) {
-                            register_build_loc(visited_iter->block_offset, 
visited_iter->row_num);
-                        }
-                    }
-                }
-            }
-            if (!visited_iter.ok()) {
-                ++iter;
-            }
-        }
-
-        for (; iter != hash_table_ctx.hash_table->end() && block_size < 
_batch_size; ++iter) {
-            auto& mapped = iter->get_second();
-            if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) {
-                if (mapped.visited) {
-                    if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) {
-                        visited_iter = mapped.begin();
-                        for (; visited_iter.ok() && block_size < _batch_size; 
++visited_iter) {
-                            register_build_loc(visited_iter->block_offset, 
visited_iter->row_num);
-                        }
-                        if (visited_iter.ok()) {
-                            // block_size >= _batch_size, quit for loop
-                            break;
-                        }
-                    }
-                } else {
-                    if constexpr (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN) {
-                        visited_iter = mapped.begin();
-                        for (; visited_iter.ok() && block_size < _batch_size; 
++visited_iter) {
-                            register_build_loc(visited_iter->block_offset, 
visited_iter->row_num);
-                        }
-                        if (visited_iter.ok()) {
-                            // block_size >= _batch_size, quit for loop
-                            break;
-                        }
-                    }
-                }
-            } else {
-                visited_iter = mapped.begin();
-                for (; visited_iter.ok() && block_size < _batch_size; 
++visited_iter) {
-                    if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) {
-                        if (visited_iter->visited) {
-                            register_build_loc(visited_iter->block_offset, 
visited_iter->row_num);
-                        }
-                    } else {
-                        if (!visited_iter->visited) {
-                            register_build_loc(visited_iter->block_offset, 
visited_iter->row_num);
-                        }
-                    }
-                }
-                if (visited_iter.ok()) {
-                    // block_size >= _batch_size, quit for loop
-                    break;
-                }
-            }
-        }
-        _build_blocks_locs.resize(block_size);
-
-        auto insert_build_rows = [&](int8_t offset) {
-            for (size_t j = 0; j < right_col_len; ++j) {
-                auto& column = 
*(*_build_blocks)[offset].get_by_position(j).column;
-                mcol[j + right_col_idx]->insert_indices_from(
-                        column, _build_block_rows.data(),
-                        _build_block_rows.data() + _build_block_rows.size());
-            }
-        };
-        if (_build_blocks->size() > 1) {
-            std::sort(_build_blocks_locs.begin(), _build_blocks_locs.end(),
-                      [](const auto a, const auto b) { return a.first > 
b.first; });
-            auto start = 0, end = 0;
-            while (start < _build_blocks_locs.size()) {
-                while (end < _build_blocks_locs.size() &&
-                       _build_blocks_locs[start].first == 
_build_blocks_locs[end].first) {
-                    end++;
-                }
-                auto offset = _build_blocks_locs[start].first;
-                _build_block_rows.resize(end - start);
-                for (int i = 0; start + i < end; i++) {
-                    _build_block_rows[i] = _build_blocks_locs[start + 
i].second;
-                }
-                start = end;
-                insert_build_rows(offset);
-            }
-        } else if (_build_blocks->size() == 1) {
-            const auto size = _build_blocks_locs.size();
-            _build_block_rows.resize(_build_blocks_locs.size());
-            for (int i = 0; i < size; i++) {
-                _build_block_rows[i] = _build_blocks_locs[i].second;
-            }
-            insert_build_rows(0);
+    auto& mcol = mutable_block.mutable_columns();
+    *eos = hash_table_ctx.hash_table->template iterate_map<JoinOpType>(_build_indexs);
+    auto block_size = _build_indexs.size();
+    int right_col_idx =
+            JoinOpType == TJoinOp::RIGHT_OUTER_JOIN || JoinOpType == TJoinOp::FULL_OUTER_JOIN
+                    ? _parent->left_table_data_types().size()
+                    : 0;
+    int right_col_len = _parent->right_table_data_types().size();
+
+    if (block_size) {
+        for (size_t j = 0; j < right_col_len; ++j) {
+            const auto& column = *_build_block->get_by_position(j).column;
+            mcol[j + right_col_idx]->insert_indices_from_join(
+                    column, _build_indexs.data(), _build_indexs.data() + _build_indexs.size());
         }
 
         // just resize the left table column in case with other conjunct to 
make block size is not zero
@@ -917,15 +651,10 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::process_data_in_hashtable(
             }
             _tuple_is_null_left_flags->resize_fill(block_size, 1);
         }
-        *eos = iter == hash_table_ctx.hash_table->end();
-        output_block->swap(
-                mutable_block.to_block(right_semi_anti_without_other ? 
right_col_idx : 0));
+        output_block->swap(mutable_block.to_block(0));
         DCHECK(block_size <= _batch_size);
-        return Status::OK();
-    } else {
-        LOG(FATAL) << "Invalid RowRefList";
-        return Status::InvalidArgument("Invalid RowRefList");
     }
+    return Status::OK();
 }
 
 template <int JoinOpType, typename Parent>
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index 70eb7dd8a94..9e8c99c8d56 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -98,12 +98,6 @@ HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const Descr
     _arena = std::make_shared<Arena>();
     _hash_table_variants = std::make_shared<HashTableVariants>();
     _process_hashtable_ctx_variants = std::make_unique<HashTableCtxVariants>();
-    _build_blocks.reset(new std::vector<Block>());
-
-    // avoid vector expand change block address.
-    // one block can store 4g data, _build_blocks can store 128*4g data.
-    // if probe data bigger than 512g, runtime filter maybe will core dump 
when insert data.
-    _build_blocks->reserve(HASH_JOIN_MAX_BUILD_BLOCK_COUNT);
 }
 
 Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
@@ -730,9 +724,6 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
     SCOPED_TIMER(_exec_timer);
     SCOPED_TIMER(_build_timer);
 
-    // make one block for each 4 gigabytes
-    constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
-
     if (_has_null_in_build_side) {
         // TODO: if _has_null_in_build_side is true we should finish current 
pipeline task.
         DCHECK(state->enable_pipeline_exec());
@@ -745,41 +736,25 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
 
         if (in_block->rows() != 0) {
             SCOPED_TIMER(_build_side_merge_block_timer);
+            if (_build_side_mutable_block.empty()) {
+                RETURN_IF_ERROR(_build_side_mutable_block.merge(
+                        *(in_block->create_same_struct_block(1, false))));
+            }
             RETURN_IF_ERROR(_build_side_mutable_block.merge(*in_block));
-        }
-
-        if (UNLIKELY(_build_side_mem_used - _build_side_last_mem_used > 
BUILD_BLOCK_MAX_SIZE)) {
-            if (_build_blocks->size() == HASH_JOIN_MAX_BUILD_BLOCK_COUNT) {
-                return Status::NotSupported(strings::Substitute(
-                        "data size of right table in hash join > $0",
-                        BUILD_BLOCK_MAX_SIZE * 
HASH_JOIN_MAX_BUILD_BLOCK_COUNT));
+            if (_build_side_mutable_block.rows() > std::numeric_limits<uint32_t>::max()) {
+                return Status::NotSupported(
+                        "Hash join do not support build table rows"
+                        " over:" +
+                        std::to_string(std::numeric_limits<uint32_t>::max()));
             }
-            _build_blocks->emplace_back(_build_side_mutable_block.to_block());
-
-            COUNTER_UPDATE(_build_blocks_memory_usage, 
(*_build_blocks)[_build_block_idx].bytes());
-
-            // TODO:: Rethink may we should do the process after we receive 
all build blocks ?
-            // which is better.
-            RETURN_IF_ERROR(_process_build_block(state, 
(*_build_blocks)[_build_block_idx],
-                                                 _build_block_idx));
-
-            _build_side_mutable_block = MutableBlock();
-            ++_build_block_idx;
-            _build_side_last_mem_used = _build_side_mem_used;
         }
     }
 
     if (_should_build_hash_table && eos) {
         if (!_build_side_mutable_block.empty()) {
-            if (_build_blocks->size() == HASH_JOIN_MAX_BUILD_BLOCK_COUNT) {
-                return Status::NotSupported(strings::Substitute(
-                        "data size of right table in hash join > $0",
-                        BUILD_BLOCK_MAX_SIZE * 
HASH_JOIN_MAX_BUILD_BLOCK_COUNT));
-            }
-            _build_blocks->emplace_back(_build_side_mutable_block.to_block());
-            COUNTER_UPDATE(_build_blocks_memory_usage, 
(*_build_blocks)[_build_block_idx].bytes());
-            RETURN_IF_ERROR(_process_build_block(state, 
(*_build_blocks)[_build_block_idx],
-                                                 _build_block_idx));
+            _build_block = std::make_shared<Block>(_build_side_mutable_block.to_block());
+            COUNTER_UPDATE(_build_blocks_memory_usage, _build_block->bytes());
+            RETURN_IF_ERROR(_process_build_block(state, *_build_block));
         }
         auto ret = std::visit(Overload {[&](std::monostate&) -> Status {
                                             LOG(FATAL) << "FATAL: uninited 
hash table";
@@ -801,7 +776,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
             _shared_hash_table_context->status = Status::OK();
             // arena will be shared with other instances.
             _shared_hash_table_context->arena = _arena;
-            _shared_hash_table_context->blocks = _build_blocks;
+            _shared_hash_table_context->block = _build_block;
             _shared_hash_table_context->hash_table_variants = 
_hash_table_variants;
             _shared_hash_table_context->short_circuit_for_null_in_probe_side =
                     _has_null_in_build_side;
@@ -833,7 +808,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
                 *_hash_table_variants,
                 *std::static_pointer_cast<HashTableVariants>(
                         _shared_hash_table_context->hash_table_variants));
-        _build_blocks = _shared_hash_table_context->blocks;
+        _build_block = _shared_hash_table_context->block;
 
         if (!_shared_hash_table_context->runtime_filters.empty()) {
             auto ret = std::visit(
@@ -866,7 +841,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
 
     // Since the comparison of null values is meaningless, null aware left 
anti join should not output null
     // when the build side is not empty.
-    if (!_build_blocks->empty() && _join_op == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
+    if (_build_block && _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
         _probe_ignore_null = true;
     }
     _init_short_circuit_for_probe();
@@ -965,7 +940,7 @@ void HashJoinNode::_set_build_ignore_flag(Block& block, const std::vector<int>&
     }
 }
 
-Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, 
uint8_t offset) {
+Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block) {
     SCOPED_TIMER(_build_table_timer);
     size_t rows = block.rows();
     if (UNLIKELY(rows == 0)) {
@@ -996,28 +971,30 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, uin
     Status st = _extract_join_column<true>(block, null_map_val, raw_ptrs, 
res_col_ids);
 
     st = std::visit(
-            Overload {
-                    [&](std::monostate& arg, auto has_null_value,
-                        auto short_circuit_for_null_in_build_side) -> Status {
-                        LOG(FATAL) << "FATAL: uninited hash table";
-                        __builtin_unreachable();
-                        return Status::OK();
-                    },
-                    [&](auto&& arg, auto has_null_value,
-                        auto short_circuit_for_null_in_build_side) -> Status {
-                        using HashTableCtxType = std::decay_t<decltype(arg)>;
-                        ProcessHashTableBuild<HashTableCtxType, HashJoinNode>
-                                hash_table_build_process(rows, block, 
raw_ptrs, this,
-                                                         state->batch_size(), 
offset, state);
-                        return hash_table_build_process
-                                .template run<has_null_value, 
short_circuit_for_null_in_build_side>(
-                                        arg,
-                                        has_null_value || 
short_circuit_for_null_in_build_side
-                                                ? &null_map_val->get_data()
-                                                : nullptr,
-                                        &_has_null_in_build_side);
-                    }},
-            *_hash_table_variants, make_bool_variant(_build_side_ignore_null),
+            Overload {[&](std::monostate& arg, auto join_op, auto has_null_value,
+                          auto short_circuit_for_null_in_build_side) -> Status {
+                          LOG(FATAL) << "FATAL: uninited hash table";
+                          __builtin_unreachable();
+                          return Status::OK();
+                      },
+                      [&](auto&& arg, auto&& join_op, auto has_null_value,
+                          auto short_circuit_for_null_in_build_side) -> Status {
+                          using HashTableCtxType = std::decay_t<decltype(arg)>;
+                          using JoinOpType = std::decay_t<decltype(join_op)>;
+
+                          ProcessHashTableBuild<HashTableCtxType, HashJoinNode>
+                                  hash_table_build_process(rows, block, raw_ptrs, this,
+                                                           state->batch_size(), state);
+                          return hash_table_build_process
+                                  .template run<JoinOpType::value, has_null_value,
+                                                short_circuit_for_null_in_build_side>(
+                                          arg,
+                                          has_null_value || short_circuit_for_null_in_build_side
+                                                  ? &null_map_val->get_data()
+                                                  : nullptr,
+                                          &_has_null_in_build_side);
+                      }},
+            *_hash_table_variants, _join_op_variants, make_bool_variant(_build_side_ignore_null),
             make_bool_variant(_short_circuit_for_null_in_build_side));
 
     return st;
@@ -1086,7 +1063,7 @@ void HashJoinNode::_hash_table_init(RuntimeState* state) {
                     return;
                 }
 
-                if (!try_get_hash_map_context_fixed<PartitionedHashMap, 
HashCRC32, RowRefListType>(
+                if (!try_get_hash_map_context_fixed<JoinFixedHashMap, HashCRC32, RowRefListType>(
                             *_hash_table_variants, _build_expr_ctxs)) {
                     
_hash_table_variants->emplace<SerializedHashTableContext<RowRefListType>>();
                 }
@@ -1094,16 +1071,6 @@ void HashJoinNode::_hash_table_init(RuntimeState* state) {
             _join_op_variants, make_bool_variant(_have_other_join_conjunct));
 
     DCHECK(!std::holds_alternative<std::monostate>(*_hash_table_variants));
-
-    std::visit(Overload {[&](std::monostate& arg) {
-                             LOG(FATAL) << "FATAL: uninited hash table";
-                             __builtin_unreachable();
-                         },
-                         [&](auto&& arg) {
-                             arg.hash_table->set_partitioned_threshold(
-                                     
state->partitioned_hash_join_rows_threshold());
-                         }},
-               *_hash_table_variants);
 }
 
 void HashJoinNode::_process_hashtable_ctx_variants_init(RuntimeState* state) {
diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h
index 06c28de46c6..5633d606d01 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -86,10 +86,10 @@ struct ProcessRuntimeFilterBuild {
         RETURN_IF_ERROR(parent->_runtime_filter_slots->init(
                 state, hash_table_ctx.hash_table->size(), 
parent->_build_rf_cardinality));
 
-        if (!parent->_runtime_filter_slots->empty() && 
!parent->_inserted_rows.empty()) {
+        if (!parent->_runtime_filter_slots->empty() && !parent->_inserted_blocks.empty()) {
             {
                 SCOPED_TIMER(parent->_push_compute_timer);
-                parent->_runtime_filter_slots->insert(parent->_inserted_rows);
+                parent->_runtime_filter_slots->insert(parent->_inserted_blocks);
             }
         }
         {
@@ -106,140 +106,52 @@ using ProfileCounter = RuntimeProfile::Counter;
 template <class HashTableContext, typename Parent>
 struct ProcessHashTableBuild {
     ProcessHashTableBuild(int rows, Block& acquired_block, ColumnRawPtrs& 
build_raw_ptrs,
-                          Parent* parent, int batch_size, uint8_t offset, 
RuntimeState* state)
+                          Parent* parent, int batch_size, RuntimeState* state)
             : _rows(rows),
-              _skip_rows(0),
               _acquired_block(acquired_block),
               _build_raw_ptrs(build_raw_ptrs),
               _parent(parent),
               _batch_size(batch_size),
-              _offset(offset),
               _state(state),
               
_build_side_compute_hash_timer(parent->_build_side_compute_hash_timer) {}
 
-    template <bool ignore_null, bool short_circuit_for_null>
+    template <int JoinOpType, bool ignore_null, bool short_circuit_for_null>
     Status run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map, 
bool* has_null_key) {
-        using KeyGetter = typename HashTableContext::State;
-        using Mapped = typename HashTableContext::Mapped;
-
-        Defer defer {[&]() {
-            int64_t bucket_size = 
hash_table_ctx.hash_table->get_buffer_size_in_cells();
-            int64_t filled_bucket_size = hash_table_ctx.hash_table->size();
-            int64_t bucket_bytes = 
hash_table_ctx.hash_table->get_buffer_size_in_bytes();
-            COUNTER_SET(_parent->_hash_table_memory_usage, bucket_bytes);
-            COUNTER_SET(_parent->_build_buckets_counter, bucket_size);
-            COUNTER_SET(_parent->_build_collisions_counter,
-                        hash_table_ctx.hash_table->get_collisions());
-            COUNTER_SET(_parent->_build_buckets_fill_counter, 
filled_bucket_size);
-
-            auto hash_table_buckets = 
hash_table_ctx.hash_table->get_buffer_sizes_in_cells();
-            std::string hash_table_buckets_info;
-            for (auto bucket_count : hash_table_buckets) {
-                hash_table_buckets_info += std::to_string(bucket_count) + ", ";
+        if (short_circuit_for_null || ignore_null) {
+            for (uint32_t i = 0; i < _rows; i++) {
+                if ((*null_map)[i]) {
+                    *has_null_key = true;
+                }
             }
-            _parent->add_hash_buckets_info(hash_table_buckets_info);
-
-            auto hash_table_sizes = hash_table_ctx.hash_table->sizes();
-            hash_table_buckets_info.clear();
-            for (auto table_size : hash_table_sizes) {
-                hash_table_buckets_info += std::to_string(table_size) + ", ";
+            if (short_circuit_for_null && *has_null_key) {
+                return Status::OK();
             }
-            _parent->add_hash_buckets_filled_info(hash_table_buckets_info);
-        }};
-
-        KeyGetter key_getter(_build_raw_ptrs);
-
-        SCOPED_TIMER(_parent->_build_table_insert_timer);
-        hash_table_ctx.hash_table->reset_resize_timer();
-
-        // only not build_unique, we need expanse hash table before insert data
-        // 1. There are fewer duplicate keys, reducing the number of resize 
hash tables
-        // can improve performance to a certain extent, about 2%-5%
-        // 2. There are many duplicate keys, and the hash table filled bucket 
is far less than
-        // the hash table build bucket, which may waste a lot of memory.
-        // TODO, use the NDV expansion of the key column in the optimizer 
statistics
-        if (!_parent->build_unique()) {
-            
RETURN_IF_CATCH_EXCEPTION(hash_table_ctx.hash_table->expanse_for_add_elem(
-                    std::min<int>(_rows, 
config::hash_table_pre_expanse_max_rows)));
-        }
-
-        vector<int>& inserted_rows = _parent->_inserted_rows[&_acquired_block];
-        bool has_runtime_filter = !_parent->runtime_filter_descs().empty();
-        if (has_runtime_filter) {
-            inserted_rows.reserve(_batch_size);
         }
 
-        hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows,
-                                            null_map ? null_map->data() : 
nullptr);
-
-        auto& arena = *_parent->arena();
-        auto old_build_arena_memory = arena.size();
-
-        size_t k = 0;
-        bool inserted = false;
-        auto creator = [&](const auto& ctor, auto& key, auto& origin) {
-            HashTableContext::try_presis_key(key, origin, arena);
-            inserted = true;
-            ctor(key, Mapped {k, _offset});
-        };
-
-        bool build_unique = _parent->build_unique();
-#define EMPLACE_IMPL(stmt)                                                    \
-    for (; k < _rows; ++k) {                                                  \
-        if (k % CHECK_FRECUENCY == 0) {                                       \
-            RETURN_IF_CANCELLED(_state);                                      \
-        }                                                                     \
-        if constexpr (short_circuit_for_null) {                               \
-            if ((*null_map)[k]) {                                             \
-                *has_null_key = true;                                         \
-                return Status::OK();                                          \
-            }                                                                 \
-        } else if constexpr (ignore_null) {                                   \
-            if ((*null_map)[k]) {                                             \
-                *has_null_key = true;                                         \
-                continue;                                                     \
-            }                                                                 \
-        }                                                                     \
-        inserted = false;                                                     \
-        [[maybe_unused]] auto& mapped =                                       \
-                hash_table_ctx.lazy_emplace(key_getter, k, creator, nullptr); \
-        stmt;                                                                 \
-    }
-
-        if (has_runtime_filter && build_unique) {
-            EMPLACE_IMPL(
-                    if (inserted) { inserted_rows.push_back(k); } else { 
_skip_rows++; });
-        } else if (has_runtime_filter && !build_unique) {
-            EMPLACE_IMPL(
-                    if (inserted) { inserted_rows.push_back(k); } else {
-                        mapped.insert({k, _offset}, *_parent->arena());
-                        inserted_rows.push_back(k);
-                    });
-        } else if (!has_runtime_filter && build_unique) {
-            EMPLACE_IMPL(if (!inserted) { _skip_rows++; });
-        } else {
-            EMPLACE_IMPL(if (!inserted) { mapped.insert({k, _offset}, 
*_parent->arena()); });
+        if (!_parent->runtime_filter_descs().empty()) {
+            _parent->_inserted_blocks.insert(&_acquired_block);
         }
-        _parent->_build_rf_cardinality += inserted_rows.size();
 
-        _parent->_build_arena_memory_usage->add(arena.size() - 
old_build_arena_memory);
+        SCOPED_TIMER(_parent->_build_table_insert_timer);
+        hash_table_ctx.hash_table->template prepare_build<JoinOpType>(_rows, _state->batch_size());
 
-        COUNTER_UPDATE(_parent->_build_table_expanse_timer,
-                       hash_table_ctx.hash_table->get_resize_timer_value());
-        COUNTER_UPDATE(_parent->_build_table_convert_timer,
-                       hash_table_ctx.hash_table->get_convert_timer_value());
+        hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows,
+                                            null_map ? null_map->data() : nullptr, true, true,
+                                            hash_table_ctx.hash_table->get_bucket_size());
+        hash_table_ctx.hash_table->build(hash_table_ctx.keys, hash_table_ctx.bucket_nums.data(),
+                                         _rows);
+        hash_table_ctx.bucket_nums.resize(_state->batch_size());
+        hash_table_ctx.bucket_nums.shrink_to_fit();
 
         return Status::OK();
     }
 
 private:
-    const int _rows;
-    int _skip_rows;
+    const uint32_t _rows;
     Block& _acquired_block;
     ColumnRawPtrs& _build_raw_ptrs;
     Parent* _parent;
     int _batch_size;
-    uint8_t _offset;
     RuntimeState* _state;
 
     ProfileCounter* _build_side_compute_hash_timer;
@@ -325,8 +237,6 @@ using HashTableIteratorVariants =
         std::variant<std::monostate, ForwardIterator<RowRefList>,
                      ForwardIterator<RowRefListWithFlag>, 
ForwardIterator<RowRefListWithFlags>>;
 
-static constexpr auto HASH_JOIN_MAX_BUILD_BLOCK_COUNT = 128;
-
 class HashJoinNode final : public VJoinNodeBase {
 public:
     HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs);
@@ -369,7 +279,7 @@ public:
     bool have_other_join_conjunct() const { return _have_other_join_conjunct; }
     bool is_right_semi_anti() const { return _is_right_semi_anti; }
     bool is_outer_join() const { return _is_outer_join; }
-    std::shared_ptr<std::vector<Block>> build_blocks() const { return 
_build_blocks; }
+    std::shared_ptr<Block> build_block() const { return _build_block; }
     std::vector<bool>* left_output_slot_flags() { return 
&_left_output_slot_flags; }
     std::vector<bool>* right_output_slot_flags() { return 
&_right_output_slot_flags; }
     bool* has_null_in_build_side() { return &_has_null_in_build_side; }
@@ -390,16 +300,16 @@ private:
         _short_circuit_for_probe =
                 (_has_null_in_build_side && _join_op == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
                  !_is_mark_join) ||
-                (_build_blocks->empty() && _join_op == TJoinOp::INNER_JOIN && 
!_is_mark_join) ||
-                (_build_blocks->empty() && _join_op == TJoinOp::LEFT_SEMI_JOIN 
&& !_is_mark_join) ||
-                (_build_blocks->empty() && _join_op == 
TJoinOp::RIGHT_OUTER_JOIN) ||
-                (_build_blocks->empty() && _join_op == 
TJoinOp::RIGHT_SEMI_JOIN) ||
-                (_build_blocks->empty() && _join_op == 
TJoinOp::RIGHT_ANTI_JOIN);
+                (!_build_block && _join_op == TJoinOp::INNER_JOIN && !_is_mark_join) ||
+                (!_build_block && _join_op == TJoinOp::LEFT_SEMI_JOIN && !_is_mark_join) ||
+                (!_build_block && _join_op == TJoinOp::RIGHT_OUTER_JOIN) ||
+                (!_build_block && _join_op == TJoinOp::RIGHT_SEMI_JOIN) ||
+                (!_build_block && _join_op == TJoinOp::RIGHT_ANTI_JOIN);
 
         //when build table rows is 0 and not have other_join_conjunct and not 
_is_mark_join and join type is one of 
LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN
         //we could get the result is probe table + null-column(if need output)
         _empty_right_table_need_probe_dispose =
-                (_build_blocks->empty() && !_have_other_join_conjunct && 
!_is_mark_join) &&
+                (!_build_block && !_have_other_join_conjunct && !_is_mark_join) &&
                 (_join_op == TJoinOp::LEFT_OUTER_JOIN || _join_op == 
TJoinOp::FULL_OUTER_JOIN ||
                  _join_op == TJoinOp::LEFT_ANTI_JOIN);
     }
@@ -467,7 +377,7 @@ private:
     HashTableIteratorVariants _outer_join_pull_visited_iter;
     HashTableIteratorVariants _probe_row_match_iter;
 
-    std::shared_ptr<std::vector<Block>> _build_blocks;
+    std::shared_ptr<Block> _build_block;
     Block _probe_block;
     ColumnRawPtrs _probe_columns;
     ColumnUInt8::MutablePtr _null_map_column;
@@ -501,7 +411,7 @@ private:
 
     Status _materialize_build_side(RuntimeState* state) override;
 
-    Status _process_build_block(RuntimeState* state, Block& block, uint8_t 
offset);
+    Status _process_build_block(RuntimeState* state, Block& block);
 
     Status _do_evaluate(Block& block, VExprContextSPtrs& exprs,
                         RuntimeProfile::Counter& expr_call_timer, 
std::vector<int>& res_col_ids);
@@ -539,7 +449,7 @@ private:
     friend struct ProcessRuntimeFilterBuild;
 
     std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
-    std::unordered_map<const Block*, std::vector<int>> _inserted_rows;
+    std::unordered_set<const Block*> _inserted_blocks;
 
     std::vector<IRuntimeFilter*> _runtime_filters;
     size_t _build_rf_cardinality = 0;
diff --git a/be/src/vec/exec/vset_operation_node.cpp 
b/be/src/vec/exec/vset_operation_node.cpp
index 791e92679d0..80990830e53 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -60,7 +60,6 @@ 
VSetOperationNode<is_intersect>::VSetOperationNode(ObjectPool* pool, const TPlan
         : ExecNode(pool, tnode, descs),
           _valid_element_in_hash_tbl(0),
           _mem_used(0),
-          _build_block_index(0),
           _build_finished(false) {
     _hash_table_variants = std::make_unique<HashTableVariants>();
 }
@@ -221,7 +220,7 @@ void VSetOperationNode<is_intersect>::hash_table_init() {
         }
         return;
     }
-    if (!try_get_hash_map_context_fixed<PartitionedHashMap, HashCRC32, 
RowRefListWithFlags>(
+    if (!try_get_hash_map_context_fixed<JoinFixedHashMap, HashCRC32, 
RowRefListWithFlags>(
                 *_hash_table_variants, _child_expr_lists[0])) {
         
_hash_table_variants->emplace<SerializedHashTableContext<RowRefListWithFlags>>();
     }
@@ -230,36 +229,46 @@ void VSetOperationNode<is_intersect>::hash_table_init() {
 template <bool is_intersect>
 Status VSetOperationNode<is_intersect>::sink(RuntimeState* state, Block* 
block, bool eos) {
     SCOPED_TIMER(_exec_timer);
-    constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
 
     if (block->rows() != 0) {
         _mem_used += block->allocated_bytes();
         RETURN_IF_ERROR(_mutable_block.merge(*block));
     }
 
-    if (eos || _mutable_block.allocated_bytes() >= BUILD_BLOCK_MAX_SIZE) {
-        _build_blocks.emplace_back(_mutable_block.to_block());
-        RETURN_IF_ERROR(
-                process_build_block(_build_blocks[_build_block_index], 
_build_block_index, state));
+    if (block->rows() != 0) {
+        if (_build_block.empty()) {
+            
RETURN_IF_ERROR(_mutable_block.merge(*(block->create_same_struct_block(0, 
false))));
+        }
+        RETURN_IF_ERROR(_mutable_block.merge(*block));
+        if (_mutable_block.rows() > std::numeric_limits<uint32_t>::max()) {
+            return Status::NotSupported(
+                    "Hash join do not support build table rows"
+                    " over:" +
+                    std::to_string(std::numeric_limits<uint32_t>::max()));
+        }
+    }
+
+    if (eos) {
+        if (!_mutable_block.empty()) {
+            _build_block = _mutable_block.to_block();
+        }
+        RETURN_IF_ERROR(process_build_block(_build_block, state));
         _mutable_block.clear();
-        ++_build_block_index;
 
-        if (eos) {
-            if constexpr (is_intersect) {
-                _valid_element_in_hash_tbl = 0;
-            } else {
-                std::visit(
-                        [&](auto&& arg) {
-                            using HashTableCtxType = 
std::decay_t<decltype(arg)>;
-                            if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
-                                _valid_element_in_hash_tbl = 
arg.hash_table->size();
-                            }
-                        },
-                        *_hash_table_variants);
-            }
-            _build_finished = true;
-            _can_read = _children.size() == 1;
+        if constexpr (is_intersect) {
+            _valid_element_in_hash_tbl = 0;
+        } else {
+            std::visit(
+                    [&](auto&& arg) {
+                        using HashTableCtxType = std::decay_t<decltype(arg)>;
+                        if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
+                            _valid_element_in_hash_tbl = 
arg.hash_table->size();
+                        }
+                    },
+                    *_hash_table_variants);
         }
+        _build_finished = true;
+        _can_read = _children.size() == 1;
     }
     return Status::OK();
 }
@@ -312,8 +321,7 @@ Status 
VSetOperationNode<is_intersect>::hash_table_build(RuntimeState* state) {
 }
 
 template <bool is_intersect>
-Status VSetOperationNode<is_intersect>::process_build_block(Block& block, 
uint8_t offset,
-                                                            RuntimeState* 
state) {
+Status VSetOperationNode<is_intersect>::process_build_block(Block& block, 
RuntimeState* state) {
     size_t rows = block.rows();
     if (rows == 0) {
         return Status::OK();
@@ -328,7 +336,7 @@ Status 
VSetOperationNode<is_intersect>::process_build_block(Block& block, uint8_
                 using HashTableCtxType = std::decay_t<decltype(arg)>;
                 if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
                     HashTableBuild<HashTableCtxType, is_intersect> 
hash_table_build_process(
-                            this, rows, raw_ptrs, offset, state);
+                            this, rows, raw_ptrs, state);
                     st = hash_table_build_process(arg, _arena);
                 } else {
                     LOG(FATAL) << "FATAL: uninited hash table";
@@ -344,8 +352,8 @@ void 
VSetOperationNode<is_intersect>::add_result_columns(RowRefListWithFlags& va
                                                          int& block_size) {
     auto it = value.begin();
     for (auto idx = _build_col_idx.begin(); idx != _build_col_idx.end(); 
++idx) {
-        auto& column = 
*_build_blocks[it->block_offset].get_by_position(idx->first).column;
-        if (_mutable_cols[idx->second]->is_nullable() xor 
column.is_nullable()) {
+        const auto& column = *_build_block.get_by_position(idx->first).column;
+        if (_mutable_cols[idx->second]->is_nullable() ^ column.is_nullable()) {
             DCHECK(_mutable_cols[idx->second]->is_nullable());
             ((ColumnNullable*)(_mutable_cols[idx->second].get()))
                     ->insert_from_not_nullable(column, it->row_num);
@@ -514,10 +522,6 @@ void VSetOperationNode<is_intersect>::debug_string(int 
indentation_level,
 template <bool is_intersect>
 void VSetOperationNode<is_intersect>::release_mem() {
     _hash_table_variants = nullptr;
-
-    std::vector<Block> tmp_build_blocks;
-    _build_blocks.swap(tmp_build_blocks);
-
     _probe_block.clear();
 }
 
diff --git a/be/src/vec/exec/vset_operation_node.h 
b/be/src/vec/exec/vset_operation_node.h
index ff016469f49..8ca04f2f71f 100644
--- a/be/src/vec/exec/vset_operation_node.h
+++ b/be/src/vec/exec/vset_operation_node.h
@@ -82,7 +82,7 @@ private:
     //It's time to abstract out the same methods and provide them directly to 
others;
     void hash_table_init();
     Status hash_table_build(RuntimeState* state);
-    Status process_build_block(Block& block, uint8_t offset, RuntimeState* 
state);
+    Status process_build_block(Block& block, RuntimeState* state);
     Status extract_build_column(Block& block, ColumnRawPtrs& raw_ptrs);
     Status extract_probe_column(Block& block, ColumnRawPtrs& raw_ptrs, int 
child_id);
     void refresh_hash_table();
@@ -115,11 +115,10 @@ private:
     //record insert column id during probe
     std::vector<uint16_t> _probe_column_inserted_id;
 
-    std::vector<Block> _build_blocks;
+    Block _build_block;
     Block _probe_block;
     ColumnRawPtrs _probe_columns;
     std::vector<MutableColumnPtr> _mutable_cols;
-    int _build_block_index;
     bool _build_finished;
     std::vector<bool> _probe_finished_children_index;
     MutableBlock _mutable_block;
diff --git a/be/src/vec/runtime/shared_hash_table_controller.h 
b/be/src/vec/runtime/shared_hash_table_controller.h
index 6b31cf07ec9..e1c01709042 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.h
+++ b/be/src/vec/runtime/shared_hash_table_controller.h
@@ -53,18 +53,15 @@ struct SharedRuntimeFilterContext {
 
 struct SharedHashTableContext {
     SharedHashTableContext()
-            : hash_table_variants(nullptr),
-              blocks(new std::vector<vectorized::Block>()),
-              signaled(false),
-              short_circuit_for_null_in_probe_side(false) {}
+            : hash_table_variants(nullptr), 
block(std::make_shared<vectorized::Block>()) {}
 
     Status status;
     std::shared_ptr<Arena> arena;
     std::shared_ptr<void> hash_table_variants;
-    std::shared_ptr<std::vector<Block>> blocks;
+    std::shared_ptr<Block> block;
     std::map<int, SharedRuntimeFilterContext> runtime_filters;
-    bool signaled;
-    bool short_circuit_for_null_in_probe_side;
+    bool signaled {};
+    bool short_circuit_for_null_in_probe_side {};
 };
 
 using SharedHashTableContextPtr = std::shared_ptr<SharedHashTableContext>;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to