This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 37dbda6209c [pipelineX](refactor) Use class template to simplify join (#25369)
37dbda6209c is described below

commit 37dbda6209c82b3e99efd08eeb73d4e072d8c437
Author: Gabriel <[email protected]>
AuthorDate: Fri Oct 13 16:51:55 2023 +0800

    [pipelineX](refactor) Use class template to simplify join (#25369)
---
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |  67 ++--
 be/src/pipeline/exec/hashjoin_build_sink.h         |  29 +-
 be/src/pipeline/exec/hashjoin_probe_operator.cpp   |  67 +++-
 be/src/pipeline/exec/hashjoin_probe_operator.h     |  43 ++-
 .../pipeline/exec/partition_sort_sink_operator.cpp |   2 +-
 .../exec/partition_sort_source_operator.cpp        |  23 +-
 .../pipeline/exec/partition_sort_source_operator.h |   1 +
 be/src/pipeline/pipeline_x/dependency.h            |  21 +-
 be/src/vec/exec/join/process_hash_table_probe.h    |  20 +-
 .../vec/exec/join/process_hash_table_probe_impl.h  | 353 ++++++++++-----------
 be/src/vec/exec/join/vhash_join_node.cpp           | 134 +-------
 be/src/vec/exec/join/vhash_join_node.h             | 216 ++++---------
 be/src/vec/exec/join/vjoin_node_base.h             |   2 +-
 be/src/vec/runtime/shared_hash_table_controller.h  |   1 +
 14 files changed, 444 insertions(+), 535 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index c9fe153af08..939363776b0 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -52,11 +52,15 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* 
state, LocalSinkStateInfo
     auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
     _shared_state->join_op_variants = p._join_op_variants;
     _shared_state->probe_key_sz = p._build_key_sz;
-    _shared_state->build_blocks.reset(new std::vector<vectorized::Block>());
-    // avoid vector expand change block address.
-    // one block can store 4g data, _build_blocks can store 128*4g data.
-    // if probe data bigger than 512g, runtime filter maybe will core dump 
when insert data.
-    
_shared_state->build_blocks->reserve(vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT);
+    if (p._is_broadcast_join && 
state->enable_share_hash_table_for_broadcast_join()) {
+        _shared_state->build_blocks = p._shared_hash_table_context->blocks;
+    } else {
+        _shared_state->build_blocks.reset(new 
std::vector<vectorized::Block>());
+        // avoid vector expand change block address.
+        // one block can store 4g data, _build_blocks can store 128*4g data.
+        // if probe data bigger than 512g, runtime filter maybe will core dump 
when insert data.
+        
_shared_state->build_blocks->reserve(vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT);
+    }
     _shared_state->is_null_safe_eq_join = p._is_null_safe_eq_join;
     _shared_state->store_null_in_hash_table = p._store_null_in_hash_table;
     _build_expr_ctxs.resize(p._build_expr_ctxs.size());
@@ -79,6 +83,11 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* 
state, LocalSinkStateInfo
     if (!_should_build_hash_table) {
         _shared_hash_table_dependency->block_writing();
         p._shared_hashtable_controller->append_dependency(p.id(), 
_shared_hash_table_dependency);
+    } else if (p._is_broadcast_join) {
+        // avoid vector expand change block address.
+        // one block can store 4g data, _build_blocks can store 128*4g data.
+        // if probe data bigger than 512g, runtime filter maybe will core dump 
when insert data.
+        
_shared_state->build_blocks->reserve(vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT);
     }
 
     _memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage");
@@ -135,6 +144,18 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* 
state) {
     return Status::OK();
 }
 
+vectorized::Sizes& HashJoinBuildSinkLocalState::build_key_sz() {
+    return _parent->cast<HashJoinBuildSinkOperatorX>()._build_key_sz;
+}
+
+bool HashJoinBuildSinkLocalState::build_unique() const {
+    return _parent->cast<HashJoinBuildSinkOperatorX>()._build_unique;
+}
+
+std::vector<TRuntimeFilterDesc>& 
HashJoinBuildSinkLocalState::runtime_filter_descs() const {
+    return _parent->cast<HashJoinBuildSinkOperatorX>()._runtime_filter_descs;
+}
+
 void HashJoinBuildSinkLocalState::init_short_circuit_for_probe() {
     auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
     _shared_state->short_circuit_for_probe =
@@ -201,9 +222,9 @@ Status 
HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
                     [&](auto&& arg, auto has_null_value,
                         auto short_circuit_for_null_in_build_side) -> Status {
                         using HashTableCtxType = std::decay_t<decltype(arg)>;
-                        vectorized::HashJoinBuildContext context(this);
-                        vectorized::ProcessHashTableBuild<HashTableCtxType>
-                                hash_table_build_process(rows, block, 
raw_ptrs, &context,
+                        vectorized::ProcessHashTableBuild<HashTableCtxType,
+                                                          
HashJoinBuildSinkLocalState>
+                                hash_table_build_process(rows, block, 
raw_ptrs, this,
                                                          state->batch_size(), 
offset, state);
                         return hash_table_build_process
                                 .template run<has_null_value, 
short_circuit_for_null_in_build_side>(
@@ -246,11 +267,6 @@ void 
HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) {
                                                    JoinOpType::value == 
TJoinOp::RIGHT_OUTER_JOIN ||
                                                    JoinOpType::value == 
TJoinOp::FULL_OUTER_JOIN,
                                            vectorized::RowRefListWithFlag, 
vectorized::RowRefList>>;
-                _shared_state->probe_row_match_iter
-                        
.emplace<vectorized::ForwardIterator<RowRefListType>>();
-                _shared_state->outer_join_pull_visited_iter
-                        
.emplace<vectorized::ForwardIterator<RowRefListType>>();
-
                 if (_build_expr_ctxs.size() == 1 && 
!p._store_null_in_hash_table[0]) {
                     // Single column optimization
                     switch (_build_expr_ctxs[0]->root()->result_type()) {
@@ -519,18 +535,16 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
                     state, 
(*local_state._shared_state->build_blocks)[local_state._build_block_idx],
                     local_state._build_block_idx));
         }
-        auto ret = std::visit(Overload {[&](std::monostate&) -> Status {
-                                            LOG(FATAL) << "FATAL: uninited 
hash table";
-                                            __builtin_unreachable();
-                                        },
-                                        [&](auto&& arg) -> Status {
-                                            using HashTableCtxType = 
std::decay_t<decltype(arg)>;
-                                            vectorized::RuntimeFilterContext 
context(&local_state);
-                                            
vectorized::ProcessRuntimeFilterBuild<HashTableCtxType>
-                                                    
runtime_filter_build_process(&context);
-                                            return 
runtime_filter_build_process(state, arg);
-                                        }},
-                              *local_state._shared_state->hash_table_variants);
+        auto ret = std::visit(
+                Overload {[&](std::monostate&) -> Status {
+                              LOG(FATAL) << "FATAL: uninited hash table";
+                              __builtin_unreachable();
+                          },
+                          [&](auto&& arg) -> Status {
+                              vectorized::ProcessRuntimeFilterBuild 
runtime_filter_build_process;
+                              return runtime_filter_build_process(state, arg, 
&local_state);
+                          }},
+                *local_state._shared_state->hash_table_variants);
         if (!ret.ok()) {
             if (_shared_hashtable_controller) {
                 _shared_hash_table_context->status = ret;
@@ -542,7 +556,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
             _shared_hash_table_context->status = Status::OK();
             // arena will be shared with other instances.
             _shared_hash_table_context->arena = 
local_state._shared_state->arena;
-            _shared_hash_table_context->blocks = 
local_state._shared_state->build_blocks;
             _shared_hash_table_context->hash_table_variants =
                     local_state._shared_state->hash_table_variants;
             _shared_hash_table_context->short_circuit_for_null_in_probe_side =
@@ -578,8 +591,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
                 *std::static_pointer_cast<vectorized::HashTableVariants>(
                         _shared_hash_table_context->hash_table_variants));
 
-        local_state._shared_state->build_blocks = 
_shared_hash_table_context->blocks;
-
         if (!_shared_hash_table_context->runtime_filters.empty()) {
             auto ret = std::visit(
                     Overload {
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h 
b/be/src/pipeline/exec/hashjoin_build_sink.h
index 54848f7c868..459b66718c2 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -70,12 +70,25 @@ public:
     void init_short_circuit_for_probe();
     HashJoinBuildSinkOperatorX* join_build() { return 
(HashJoinBuildSinkOperatorX*)_parent; }
 
+    vectorized::Sizes& build_key_sz();
+    bool build_unique() const;
+    std::vector<TRuntimeFilterDesc>& runtime_filter_descs() const;
+    std::shared_ptr<vectorized::Arena> arena() { return _shared_state->arena; }
+
+    void add_hash_buckets_info(const std::string& info) const {
+        _profile->add_info_string("HashTableBuckets", info);
+    }
+    void add_hash_buckets_filled_info(const std::string& info) const {
+        _profile->add_info_string("HashTableFilledBuckets", info);
+    }
+
 protected:
     void _hash_table_init(RuntimeState* state);
     void _set_build_ignore_flag(vectorized::Block& block, const 
std::vector<int>& res_col_ids);
     friend class HashJoinBuildSinkOperatorX;
-    friend struct vectorized::HashJoinBuildContext;
-    friend struct vectorized::RuntimeFilterContext;
+    template <class HashTableContext, typename Parent>
+    friend struct vectorized::ProcessHashTableBuild;
+    friend struct vectorized::ProcessRuntimeFilterBuild;
 
     // build expr
     vectorized::VExprContextSPtrs _build_expr_ctxs;
@@ -139,16 +152,13 @@ public:
     }
 
     bool should_dry_run(RuntimeState* state) override {
-        auto tmp = _is_broadcast_join && !state->get_sink_local_state(id())
-                                                  
->cast<HashJoinBuildSinkLocalState>()
-                                                  ._should_build_hash_table;
-        return tmp;
+        return _is_broadcast_join && !state->get_sink_local_state(id())
+                                              
->cast<HashJoinBuildSinkLocalState>()
+                                              ._should_build_hash_table;
     }
 
 private:
     friend class HashJoinBuildSinkLocalState;
-    friend struct vectorized::HashJoinBuildContext;
-    friend struct vectorized::RuntimeFilterContext;
 
     // build expr
     vectorized::VExprContextSPtrs _build_expr_ctxs;
@@ -165,9 +175,6 @@ private:
 
     vectorized::SharedHashTableContextPtr _shared_hash_table_context = nullptr;
     std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
-
-    std::atomic_bool _probe_open_finish = false;
-    bool _probe_ignore_null = false;
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp 
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index 1688b17778e..c4b009f7389 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -59,12 +59,68 @@ Status HashJoinProbeLocalState::init(RuntimeState* state, 
LocalStateInfo& info)
     return Status::OK();
 }
 
+Status HashJoinProbeLocalState::open(RuntimeState* state) {
+    SCOPED_TIMER(profile()->total_time_counter());
+    SCOPED_TIMER(_open_timer);
+    RETURN_IF_ERROR(JoinProbeLocalState::open(state));
+
+    _process_hashtable_ctx_variants = std::make_unique<HashTableCtxVariants>();
+    auto& p = _parent->cast<HashJoinProbeOperatorX>();
+    std::visit(
+            [&](auto&& join_op_variants, auto have_other_join_conjunct) {
+                using JoinOpType = std::decay_t<decltype(join_op_variants)>;
+                using RowRefListType = std::conditional_t<
+                        have_other_join_conjunct, 
vectorized::RowRefListWithFlags,
+                        std::conditional_t<JoinOpType::value == 
TJoinOp::RIGHT_ANTI_JOIN ||
+                                                   JoinOpType::value == 
TJoinOp::RIGHT_SEMI_JOIN ||
+                                                   JoinOpType::value == 
TJoinOp::RIGHT_OUTER_JOIN ||
+                                                   JoinOpType::value == 
TJoinOp::FULL_OUTER_JOIN,
+                                           vectorized::RowRefListWithFlag, 
vectorized::RowRefList>>;
+                
_probe_row_match_iter.emplace<vectorized::ForwardIterator<RowRefListType>>();
+                _outer_join_pull_visited_iter
+                        
.emplace<vectorized::ForwardIterator<RowRefListType>>();
+                
_process_hashtable_ctx_variants->emplace<vectorized::ProcessHashTableProbe<
+                        JoinOpType::value, HashJoinProbeLocalState>>(this, 
state->batch_size());
+            },
+            _shared_state->join_op_variants,
+            vectorized::make_bool_variant(p._have_other_join_conjunct));
+    return Status::OK();
+}
+
 void HashJoinProbeLocalState::prepare_for_next() {
     _probe_index = 0;
     _ready_probe = false;
     _prepare_probe_block();
 }
 
+bool HashJoinProbeLocalState::have_other_join_conjunct() const {
+    return _parent->cast<HashJoinProbeOperatorX>()._have_other_join_conjunct;
+}
+
+bool HashJoinProbeLocalState::is_right_semi_anti() const {
+    return _parent->cast<HashJoinProbeOperatorX>()._is_right_semi_anti;
+}
+
+bool HashJoinProbeLocalState::is_outer_join() const {
+    return _parent->cast<HashJoinProbeOperatorX>()._is_outer_join;
+}
+
+std::vector<bool>* HashJoinProbeLocalState::left_output_slot_flags() {
+    return &_parent->cast<HashJoinProbeOperatorX>()._left_output_slot_flags;
+}
+
+std::vector<bool>* HashJoinProbeLocalState::right_output_slot_flags() {
+    return &_parent->cast<HashJoinProbeOperatorX>()._right_output_slot_flags;
+}
+
+vectorized::DataTypes HashJoinProbeLocalState::right_table_data_types() {
+    return _parent->cast<HashJoinProbeOperatorX>()._right_table_data_types;
+}
+
+vectorized::DataTypes HashJoinProbeLocalState::left_table_data_types() {
+    return _parent->cast<HashJoinProbeOperatorX>()._left_table_data_types;
+}
+
 Status HashJoinProbeLocalState::close(RuntimeState* state) {
     SCOPED_TIMER(profile()->total_time_counter());
     SCOPED_TIMER(_close_timer);
@@ -111,16 +167,7 @@ void HashJoinProbeLocalState::init_for_probe(RuntimeState* 
state) {
     if (_probe_inited) {
         return;
     }
-    _process_hashtable_ctx_variants = 
std::make_unique<vectorized::HashTableCtxVariants>();
-    std::visit(
-            [&](auto&& join_op_variants) {
-                using JoinOpType = std::decay_t<decltype(join_op_variants)>;
-                _probe_context.reset(new 
vectorized::HashJoinProbeContext(this));
-                _process_hashtable_ctx_variants
-                        
->emplace<vectorized::ProcessHashTableProbe<JoinOpType::value>>(
-                                _probe_context.get(), state->batch_size());
-            },
-            _shared_state->join_op_variants);
+
     _probe_inited = true;
 }
 
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h 
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 4d96b68b7d4..ed241141e5d 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -45,6 +45,22 @@ public:
     Status open(RuntimeState*) override { return Status::OK(); }
 };
 
+class HashJoinProbeLocalState;
+
+using HashTableCtxVariants = std::variant<
+        std::monostate,
+        vectorized::ProcessHashTableProbe<TJoinOp::INNER_JOIN, 
HashJoinProbeLocalState>,
+        vectorized::ProcessHashTableProbe<TJoinOp::LEFT_SEMI_JOIN, 
HashJoinProbeLocalState>,
+        vectorized::ProcessHashTableProbe<TJoinOp::LEFT_ANTI_JOIN, 
HashJoinProbeLocalState>,
+        vectorized::ProcessHashTableProbe<TJoinOp::LEFT_OUTER_JOIN, 
HashJoinProbeLocalState>,
+        vectorized::ProcessHashTableProbe<TJoinOp::FULL_OUTER_JOIN, 
HashJoinProbeLocalState>,
+        vectorized::ProcessHashTableProbe<TJoinOp::RIGHT_OUTER_JOIN, 
HashJoinProbeLocalState>,
+        vectorized::ProcessHashTableProbe<TJoinOp::CROSS_JOIN, 
HashJoinProbeLocalState>,
+        vectorized::ProcessHashTableProbe<TJoinOp::RIGHT_SEMI_JOIN, 
HashJoinProbeLocalState>,
+        vectorized::ProcessHashTableProbe<TJoinOp::RIGHT_ANTI_JOIN, 
HashJoinProbeLocalState>,
+        vectorized::ProcessHashTableProbe<TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN,
+                                          HashJoinProbeLocalState>>;
+
 class HashJoinProbeOperatorX;
 class HashJoinProbeLocalState final
         : public JoinProbeLocalState<HashJoinDependency, 
HashJoinProbeLocalState> {
@@ -55,6 +71,7 @@ public:
     ~HashJoinProbeLocalState() override = default;
 
     Status init(RuntimeState* state, LocalStateInfo& info) override;
+    Status open(RuntimeState* state) override;
     Status close(RuntimeState* state) override;
 
     void prepare_for_next();
@@ -64,13 +81,25 @@ public:
                                         SourceState& source_state, 
vectorized::Block* temp_block,
                                         bool check_rows_count = true);
 
-    HashJoinProbeOperatorX* join_probe() { return 
(HashJoinProbeOperatorX*)_parent; }
+    bool have_other_join_conjunct() const;
+    bool is_right_semi_anti() const;
+    bool is_outer_join() const;
+    std::vector<bool>* left_output_slot_flags();
+    std::vector<bool>* right_output_slot_flags();
+    vectorized::DataTypes right_table_data_types();
+    vectorized::DataTypes left_table_data_types();
+    bool* has_null_in_build_side() { return 
&_shared_state->_has_null_in_build_side; }
+    std::shared_ptr<std::vector<vectorized::Block>> build_blocks() const {
+        return _shared_state->build_blocks;
+    }
+    vectorized::Sizes probe_key_sz() const { return 
_shared_state->probe_key_sz; }
 
 private:
     void _prepare_probe_block();
     bool _need_probe_null_map(vectorized::Block& block, const 
std::vector<int>& res_col_ids);
     friend class HashJoinProbeOperatorX;
-    friend struct vectorized::HashJoinProbeContext;
+    template <int JoinOpType, typename Parent>
+    friend struct vectorized::ProcessHashTableProbe;
 
     int _probe_index = -1;
     bool _ready_probe = false;
@@ -88,12 +117,15 @@ private:
 
     bool _need_null_map_for_probe = false;
     bool _has_set_need_null_map_for_probe = false;
-    std::unique_ptr<vectorized::HashJoinProbeContext> _probe_context;
     vectorized::ColumnUInt8::MutablePtr _null_map_column;
     // for cases when a probe row matches more than batch size build rows.
     bool _is_any_probe_match_row_output = false;
-    std::unique_ptr<vectorized::HashTableCtxVariants> 
_process_hashtable_ctx_variants =
-            std::make_unique<vectorized::HashTableCtxVariants>();
+    std::unique_ptr<HashTableCtxVariants> _process_hashtable_ctx_variants =
+            std::make_unique<HashTableCtxVariants>();
+
+    // for full/right outer join
+    vectorized::HashTableIteratorVariants _outer_join_pull_visited_iter;
+    vectorized::HashTableIteratorVariants _probe_row_match_iter;
 
     RuntimeProfile::Counter* _probe_expr_call_timer;
     RuntimeProfile::Counter* _probe_next_timer;
@@ -125,7 +157,6 @@ private:
                         RuntimeProfile::Counter& expr_call_timer,
                         std::vector<int>& res_col_ids) const;
     friend class HashJoinProbeLocalState;
-    friend struct vectorized::HashJoinProbeContext;
 
     // other expr
     vectorized::VExprContextSPtrs _other_join_conjuncts;
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp 
b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index 907114829ee..4bd66be9dbf 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -152,7 +152,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
 
         COUNTER_SET(local_state._hash_table_size_counter, 
int64_t(local_state._num_partition));
         //so all data from child have sink completed
-        local_state._dependency->set_ready_for_read();
+        local_state._dependency->set_eos();
     }
 
     return Status::OK();
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp 
b/be/src/pipeline/exec/partition_sort_source_operator.cpp
index a80d3277006..f9003c65268 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp
@@ -52,8 +52,8 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* 
state, vectorized::
             
local_state._shared_state->blocks_buffer.front().swap(*output_block);
             local_state._shared_state->blocks_buffer.pop();
             //if buffer have no data, block reading and wait for signal again
-            RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, 
output_block,
-                                                                   
output_block->columns()));
+            RETURN_IF_ERROR(vectorized::VExprContext::filter_block(
+                    local_state._conjuncts, output_block, 
output_block->columns()));
             if (local_state._shared_state->blocks_buffer.empty()) {
                 local_state._dependency->block_reading();
             }
@@ -67,13 +67,12 @@ Status 
PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized::
     // if we move the _blocks_buffer output at last(behind 286 line),
     // it's maybe eos but not output all data: when _blocks_buffer.empty() and 
_can_read = false (this: _sort_idx && _partition_sorts.size() are 0)
     RETURN_IF_ERROR(get_sorted_block(state, output_block, local_state));
-    RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, 
output_block,
+    
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, 
output_block,
                                                            
output_block->columns()));
     {
         std::lock_guard<std::mutex> 
lock(local_state._shared_state->buffer_mutex);
         if (local_state._shared_state->blocks_buffer.empty() &&
-            local_state._shared_state->sort_idx >=
-                    local_state._shared_state->partition_sorts.size()) {
+            local_state._sort_idx >= 
local_state._shared_state->partition_sorts.size()) {
             source_state = SourceState::FINISHED;
         }
     }
@@ -91,20 +90,18 @@ Status 
PartitionSortSourceOperatorX::get_sorted_block(RuntimeState* state,
     SCOPED_TIMER(local_state._get_sorted_timer);
     //sorter output data one by one
     bool current_eos = false;
-    if (local_state._shared_state->sort_idx < 
local_state._shared_state->partition_sorts.size()) {
-        RETURN_IF_ERROR(
-                
local_state._shared_state->partition_sorts[local_state._shared_state->sort_idx]
-                        ->get_next(state, output_block, &current_eos));
+    if (local_state._sort_idx < 
local_state._shared_state->partition_sorts.size()) {
+        
RETURN_IF_ERROR(local_state._shared_state->partition_sorts[local_state._sort_idx]->get_next(
+                state, output_block, &current_eos));
     }
     if (current_eos) {
         //current sort have eos, so get next idx
         local_state._shared_state->previous_row->reset();
-        auto rows = 
local_state._shared_state->partition_sorts[local_state._shared_state->sort_idx]
+        auto rows = 
local_state._shared_state->partition_sorts[local_state._sort_idx]
                             ->get_output_rows();
         local_state._num_rows_returned += rows;
-        
local_state._shared_state->partition_sorts[local_state._shared_state->sort_idx].reset(
-                nullptr);
-        local_state._shared_state->sort_idx++;
+        
local_state._shared_state->partition_sorts[local_state._sort_idx].reset(nullptr);
+        local_state._sort_idx++;
     }
 
     return Status::OK();
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.h 
b/be/src/pipeline/exec/partition_sort_source_operator.h
index f7d950838c6..23393a870a7 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.h
+++ b/be/src/pipeline/exec/partition_sort_source_operator.h
@@ -68,6 +68,7 @@ private:
     RuntimeProfile::Counter* _get_sorted_timer;
     RuntimeProfile::Counter* _get_next_timer;
     int64_t _num_rows_returned;
+    int _sort_idx = 0;
 };
 
 class PartitionSortSourceOperatorX final : public 
OperatorX<PartitionSortSourceLocalState> {
diff --git a/be/src/pipeline/pipeline_x/dependency.h 
b/be/src/pipeline/pipeline_x/dependency.h
index c5abc5142cc..c1ed4074772 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -565,14 +565,10 @@ struct HashJoinSharedState : public JoinSharedState {
     // maybe share hash table with other fragment instances
     std::shared_ptr<vectorized::HashTableVariants> hash_table_variants =
             std::make_shared<vectorized::HashTableVariants>();
-    // for full/right outer join
-    vectorized::HashTableIteratorVariants outer_join_pull_visited_iter;
-    vectorized::HashTableIteratorVariants probe_row_match_iter;
     vectorized::Sizes probe_key_sz;
     const std::vector<TupleDescriptor*> build_side_child_desc;
     size_t build_exprs_size = 0;
-    std::shared_ptr<std::vector<vectorized::Block>> build_blocks =
-            std::make_shared<std::vector<vectorized::Block>>();
+    std::shared_ptr<std::vector<vectorized::Block>> build_blocks = nullptr;
     bool probe_ignore_null = false;
 };
 
@@ -626,20 +622,31 @@ public:
     std::mutex buffer_mutex;
     std::vector<std::unique_ptr<vectorized::PartitionSorter>> partition_sorts;
     std::unique_ptr<vectorized::SortCursorCmp> previous_row = nullptr;
-    int sort_idx = 0;
 };
 
 class PartitionSortDependency final : public WriteDependency {
 public:
     using SharedState = PartitionSortNodeSharedState;
-    PartitionSortDependency(int id) : WriteDependency(id, 
"PartitionSortDependency") {}
+    PartitionSortDependency(int id) : WriteDependency(id, 
"PartitionSortDependency"), _eos(false) {}
     ~PartitionSortDependency() override = default;
     void* shared_state() override { return (void*)&_partition_sort_state; };
     void set_ready_for_write() override {}
     void block_writing() override {}
 
+    [[nodiscard]] Dependency* read_blocked_by() override {
+        if (config::enable_fuzzy_mode && !(_ready_for_read || _eos) &&
+            _read_dependency_watcher.elapsed_time() > 
SLOW_DEPENDENCY_THRESHOLD) {
+            LOG(WARNING) << "========Dependency may be blocked by some 
reasons: " << name() << " "
+                         << id();
+        }
+        return _ready_for_read || _eos ? nullptr : this;
+    }
+
+    void set_eos() { _eos = true; }
+
 private:
     PartitionSortNodeSharedState _partition_sort_state;
+    std::atomic<bool> _eos;
 };
 
 class AsyncWriterDependency final : public WriteDependency {
diff --git a/be/src/vec/exec/join/process_hash_table_probe.h 
b/be/src/vec/exec/join/process_hash_table_probe.h
index bf6a68f690a..e20e7c83189 100644
--- a/be/src/vec/exec/join/process_hash_table_probe.h
+++ b/be/src/vec/exec/join/process_hash_table_probe.h
@@ -37,9 +37,9 @@ using MutableColumns = std::vector<MutableColumnPtr>;
 using NullMap = ColumnUInt8::Container;
 using ConstNullMapPtr = const NullMap*;
 
-template <int JoinOpType>
+template <int JoinOpType, typename Parent>
 struct ProcessHashTableProbe {
-    ProcessHashTableProbe(HashJoinProbeContext* join_context, int batch_size);
+    ProcessHashTableProbe(Parent* parent, int batch_size);
     ~ProcessHashTableProbe() = default;
 
     // output build side result column
@@ -78,8 +78,9 @@ struct ProcessHashTableProbe {
     void _emplace_element(int8_t block_offset, int32_t block_row, int& 
current_offset);
 
     template <typename HashTableType>
-    HashTableType::State _init_probe_side(HashTableType& hash_table_ctx, 
size_t probe_rows,
-                                          bool with_other_join_conjuncts, 
const uint8_t* null_map);
+    typename HashTableType::State _init_probe_side(HashTableType& 
hash_table_ctx, size_t probe_rows,
+                                                   bool 
with_other_join_conjuncts,
+                                                   const uint8_t* null_map);
 
     template <typename Mapped, bool with_other_join_conjuncts>
     ForwardIterator<Mapped>& _probe_row_match(int& current_offset, int& 
probe_index,
@@ -91,9 +92,9 @@ struct ProcessHashTableProbe {
     Status process_data_in_hashtable(HashTableType& hash_table_ctx, 
MutableBlock& mutable_block,
                                      Block* output_block, bool* eos);
 
-    vectorized::HashJoinProbeContext* _join_context;
+    Parent* _parent;
     const int _batch_size;
-    const std::vector<Block>& _build_blocks;
+    std::shared_ptr<std::vector<Block>> _build_blocks;
     std::unique_ptr<Arena> _arena;
     std::vector<StringRef> _probe_keys;
 
@@ -118,6 +119,13 @@ struct ProcessHashTableProbe {
     int _right_col_len;
     int _row_count_from_last_probe;
 
+    bool _have_other_join_conjunct;
+    bool _is_right_semi_anti;
+    Sizes _probe_key_sz;
+    std::vector<bool>* _left_output_slot_flags;
+    std::vector<bool>* _right_output_slot_flags;
+    bool* _has_null_in_build_side;
+
     RuntimeProfile::Counter* _rows_returned_counter;
     RuntimeProfile::Counter* _search_hashtable_timer;
     RuntimeProfile::Counter* _build_side_output_timer;
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h 
b/be/src/vec/exec/join/process_hash_table_probe_impl.h
index 54ca59b6eae..16ef76d02c5 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include "common/status.h"
+#include "pipeline/exec/hashjoin_probe_operator.h"
 #include "process_hash_table_probe.h"
 #include "runtime/thread_context.h" // IWYU pragma: keep
 #include "util/simd/bits.h"
@@ -27,32 +28,35 @@
 
 namespace doris::vectorized {
 
-template <int JoinOpType>
-ProcessHashTableProbe<JoinOpType>::ProcessHashTableProbe(HashJoinProbeContext* 
join_context,
-                                                         int batch_size)
-        : _join_context(join_context),
+template <int JoinOpType, typename Parent>
+ProcessHashTableProbe<JoinOpType, Parent>::ProcessHashTableProbe(Parent* 
parent, int batch_size)
+        : _parent(parent),
           _batch_size(batch_size),
-          _build_blocks(*join_context->_build_blocks),
-          _tuple_is_null_left_flags(
-                  join_context->_is_outer_join
-                          ? &(reinterpret_cast<ColumnUInt8&>(
-                                      
**join_context->_tuple_is_null_left_flag_column)
-                                      .get_data())
-                          : nullptr),
-          _tuple_is_null_right_flags(
-                  join_context->_is_outer_join
-                          ? &(reinterpret_cast<ColumnUInt8&>(
-                                      
**join_context->_tuple_is_null_right_flag_column)
-                                      .get_data())
-                          : nullptr),
-          _rows_returned_counter(join_context->_rows_returned_counter),
-          _search_hashtable_timer(join_context->_search_hashtable_timer),
-          _build_side_output_timer(join_context->_build_side_output_timer),
-          _probe_side_output_timer(join_context->_probe_side_output_timer),
-          
_probe_process_hashtable_timer(join_context->_probe_process_hashtable_timer) {}
-
-template <int JoinOpType>
-void ProcessHashTableProbe<JoinOpType>::build_side_output_column(
+          _build_blocks(parent->build_blocks()),
+          _tuple_is_null_left_flags(parent->is_outer_join()
+                                            ? &(reinterpret_cast<ColumnUInt8&>(
+                                                        
*parent->_tuple_is_null_left_flag_column)
+                                                        .get_data())
+                                            : nullptr),
+          _tuple_is_null_right_flags(parent->is_outer_join()
+                                             ? 
&(reinterpret_cast<ColumnUInt8&>(
+                                                         
*parent->_tuple_is_null_right_flag_column)
+                                                         .get_data())
+                                             : nullptr),
+          _have_other_join_conjunct(parent->have_other_join_conjunct()),
+          _is_right_semi_anti(parent->is_right_semi_anti()),
+          _probe_key_sz(parent->probe_key_sz()),
+          _left_output_slot_flags(parent->left_output_slot_flags()),
+          _right_output_slot_flags(parent->right_output_slot_flags()),
+          _has_null_in_build_side(parent->has_null_in_build_side()),
+          _rows_returned_counter(parent->_rows_returned_counter),
+          _search_hashtable_timer(parent->_search_hashtable_timer),
+          _build_side_output_timer(parent->_build_side_output_timer),
+          _probe_side_output_timer(parent->_probe_side_output_timer),
+          
_probe_process_hashtable_timer(parent->_probe_process_hashtable_timer) {}
+
+template <int JoinOpType, typename Parent>
+void ProcessHashTableProbe<JoinOpType, Parent>::build_side_output_column(
         MutableColumns& mcol, const std::vector<bool>& output_slot_flags, int 
size,
         bool have_other_join_conjunct) {
     SCOPED_TIMER(_build_side_output_timer);
@@ -66,9 +70,9 @@ void 
ProcessHashTableProbe<JoinOpType>::build_side_output_column(
             JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == 
TJoinOp::FULL_OUTER_JOIN;
 
     if (!is_semi_anti_join || have_other_join_conjunct) {
-        if (_build_blocks.size() == 1) {
+        if (_build_blocks->size() == 1) {
             for (int i = 0; i < _right_col_len; i++) {
-                auto& column = *_build_blocks[0].get_by_position(i).column;
+                auto& column = *(*_build_blocks)[0].get_by_position(i).column;
                 if (output_slot_flags[i]) {
                     mcol[i + _right_col_idx]->insert_indices_from(column, 
_build_block_rows.data(),
                                                                   
_build_block_rows.data() + size);
@@ -86,7 +90,7 @@ void 
ProcessHashTableProbe<JoinOpType>::build_side_output_column(
                                 assert_cast<ColumnNullable*>(mcol[i + 
_right_col_idx].get())
                                         ->insert_default();
                             } else {
-                                auto& column = 
*_build_blocks[_build_block_offsets[j]]
+                                auto& column = 
*(*_build_blocks)[_build_block_offsets[j]]
                                                         .get_by_position(i)
                                                         .column;
                                 mcol[i + _right_col_idx]->insert_from(column, 
_build_block_rows[j]);
@@ -101,7 +105,7 @@ void 
ProcessHashTableProbe<JoinOpType>::build_side_output_column(
                                 // just insert default value
                                 mcol[i + _right_col_idx]->insert_default();
                             } else {
-                                auto& column = 
*_build_blocks[_build_block_offsets[j]]
+                                auto& column = 
*(*_build_blocks)[_build_block_offsets[j]]
                                                         .get_by_position(i)
                                                         .column;
                                 mcol[i + _right_col_idx]->insert_from(column, 
_build_block_rows[j]);
@@ -125,13 +129,13 @@ void 
ProcessHashTableProbe<JoinOpType>::build_side_output_column(
     }
 }
 
-template <int JoinOpType>
-void ProcessHashTableProbe<JoinOpType>::probe_side_output_column(
+template <int JoinOpType, typename Parent>
+void ProcessHashTableProbe<JoinOpType, Parent>::probe_side_output_column(
         MutableColumns& mcol, const std::vector<bool>& output_slot_flags, int 
size,
         int last_probe_index, size_t probe_size, bool all_match_one,
         bool have_other_join_conjunct) {
     SCOPED_TIMER(_probe_side_output_timer);
-    auto& probe_block = *_join_context->_probe_block;
+    auto& probe_block = _parent->_probe_block;
     for (int i = 0; i < output_slot_flags.size(); ++i) {
         if (output_slot_flags[i]) {
             auto& column = probe_block.get_by_position(i).column;
@@ -152,15 +156,15 @@ void 
ProcessHashTableProbe<JoinOpType>::probe_side_output_column(
     }
 }
 
-template <int JoinOpType>
+template <int JoinOpType, typename Parent>
 template <typename HashTableType>
-HashTableType::State ProcessHashTableProbe<JoinOpType>::_init_probe_side(
+typename HashTableType::State ProcessHashTableProbe<JoinOpType, 
Parent>::_init_probe_side(
         HashTableType& hash_table_ctx, size_t probe_rows, bool 
with_other_join_conjuncts,
         const uint8_t* null_map) {
-    _right_col_idx = _join_context->_is_right_semi_anti && 
!with_other_join_conjuncts
+    _right_col_idx = _is_right_semi_anti && !with_other_join_conjuncts
                              ? 0
-                             : _join_context->_left_table_data_types->size();
-    _right_col_len = _join_context->_right_table_data_types->size();
+                             : _parent->left_table_data_types().size();
+    _right_col_len = _parent->right_table_data_types().size();
     _row_count_from_last_probe = 0;
 
     _build_block_rows.clear();
@@ -177,24 +181,20 @@ HashTableType::State 
ProcessHashTableProbe<JoinOpType>::_init_probe_side(
     _build_block_rows.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE);
     _build_block_offsets.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE);
 
-    if (!*_join_context->_ready_probe) {
-        *_join_context->_ready_probe = true;
+    if (!_parent->_ready_probe) {
+        _parent->_ready_probe = true;
         hash_table_ctx.reset();
-        hash_table_ctx.init_serialized_keys(*_join_context->_probe_columns,
-                                            _join_context->_probe_key_sz, 
probe_rows, null_map);
+        hash_table_ctx.init_serialized_keys(_parent->_probe_columns, 
_probe_key_sz, probe_rows,
+                                            null_map);
     }
-    return typename HashTableType::State(*_join_context->_probe_columns,
-                                         _join_context->_probe_key_sz);
+    return typename HashTableType::State(_parent->_probe_columns, 
_probe_key_sz);
 }
 
-template <int JoinOpType>
+template <int JoinOpType, typename Parent>
 template <typename Mapped, bool with_other_join_conjuncts>
-ForwardIterator<Mapped>& 
ProcessHashTableProbe<JoinOpType>::_probe_row_match(int& current_offset,
-                                                                             
int& probe_index,
-                                                                             
size_t& probe_size,
-                                                                             
bool& all_match_one) {
-    auto& probe_row_match_iter =
-            
std::get<ForwardIterator<Mapped>>(*_join_context->_probe_row_match_iter);
+ForwardIterator<Mapped>& ProcessHashTableProbe<JoinOpType, 
Parent>::_probe_row_match(
+        int& current_offset, int& probe_index, size_t& probe_size, bool& 
all_match_one) {
+    auto& probe_row_match_iter = 
std::get<ForwardIterator<Mapped>>(_parent->_probe_row_match_iter);
     if (!probe_row_match_iter.ok()) {
         return probe_row_match_iter;
     }
@@ -219,22 +219,24 @@ ForwardIterator<Mapped>& 
ProcessHashTableProbe<JoinOpType>::_probe_row_match(int
     return probe_row_match_iter;
 }
 
-template <int JoinOpType>
-void ProcessHashTableProbe<JoinOpType>::_emplace_element(int8_t block_offset, 
int32_t block_row,
-                                                         int& current_offset) {
+template <int JoinOpType, typename Parent>
+void ProcessHashTableProbe<JoinOpType, Parent>::_emplace_element(int8_t 
block_offset,
+                                                                 int32_t 
block_row,
+                                                                 int& 
current_offset) {
     _build_block_offsets.emplace_back(block_offset);
     _build_block_rows.emplace_back(block_row);
     current_offset++;
 }
 
-template <int JoinOpType>
+template <int JoinOpType, typename Parent>
 template <bool need_null_map_for_probe, bool ignore_null, typename 
HashTableType,
           bool with_other_conjuncts, bool is_mark_join>
-Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& 
hash_table_ctx,
-                                                     ConstNullMapPtr null_map,
-                                                     MutableBlock& 
mutable_block,
-                                                     Block* output_block, 
size_t probe_rows) {
-    auto& probe_index = *_join_context->_probe_index;
+Status ProcessHashTableProbe<JoinOpType, Parent>::do_process(HashTableType& 
hash_table_ctx,
+                                                             ConstNullMapPtr 
null_map,
+                                                             MutableBlock& 
mutable_block,
+                                                             Block* 
output_block,
+                                                             size_t 
probe_rows) {
+    auto& probe_index = _parent->_probe_index;
 
     using KeyGetter = typename HashTableType::State;
     using Mapped = typename HashTableType::Mapped;
@@ -318,9 +320,8 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
                         (JoinOpType != TJoinOp::LEFT_SEMI_JOIN) ^ 
find_result.is_found();
                 if constexpr (is_mark_join) {
                     ++current_offset;
-                    bool null_result =
-                            (need_null_map_for_probe && 
(*null_map)[probe_index]) ||
-                            (!need_go_ahead && 
_join_context->_has_null_value_in_build_side);
+                    bool null_result = (need_null_map_for_probe && 
(*null_map)[probe_index]) ||
+                                       (!need_go_ahead && 
*_has_null_in_build_side);
                     if (null_result) {
                         mark_column->insert_null();
                     } else {
@@ -401,14 +402,13 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
         probe_size = probe_index - last_probe_index + 
probe_row_match_iter.ok();
     }
 
-    build_side_output_column(mcol, *_join_context->_right_output_slot_flags, 
current_offset,
-                             with_other_conjuncts);
+    build_side_output_column(mcol, *_right_output_slot_flags, current_offset, 
with_other_conjuncts);
 
     if constexpr (with_other_conjuncts || (JoinOpType != 
TJoinOp::RIGHT_SEMI_JOIN &&
                                            JoinOpType != 
TJoinOp::RIGHT_ANTI_JOIN)) {
         RETURN_IF_CATCH_EXCEPTION(probe_side_output_column(
-                mcol, *_join_context->_left_output_slot_flags, current_offset, 
last_probe_index,
-                probe_size, all_match_one, with_other_conjuncts));
+                mcol, *_left_output_slot_flags, current_offset, 
last_probe_index, probe_size,
+                all_match_one, with_other_conjuncts));
     }
 
     output_block->swap(mutable_block.to_block());
@@ -421,8 +421,8 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
     return Status::OK();
 }
 
-template <int JoinOpType>
-Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(
+template <int JoinOpType, typename Parent>
+Status ProcessHashTableProbe<JoinOpType, Parent>::do_other_join_conjuncts(
         Block* output_block, bool is_mark_join, int 
multi_matched_output_row_count,
         bool is_the_last_sub_block) {
     // dispose the other join conjunct exec
@@ -431,14 +431,14 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(
         return Status::OK();
     }
 
-    SCOPED_TIMER(_join_context->_process_other_join_conjunct_timer);
+    SCOPED_TIMER(_parent->_process_other_join_conjunct_timer);
     int orig_columns = output_block->columns();
     IColumn::Filter other_conjunct_filter(row_count, 1);
     {
         bool can_be_filter_all = false;
-        RETURN_IF_ERROR(VExprContext::execute_conjuncts(
-                *_join_context->_other_join_conjuncts, nullptr, output_block,
-                &other_conjunct_filter, &can_be_filter_all));
+        
RETURN_IF_ERROR(VExprContext::execute_conjuncts(_parent->_other_join_conjuncts, 
nullptr,
+                                                        output_block, 
&other_conjunct_filter,
+                                                        &can_be_filter_all));
     }
 
     auto filter_column = ColumnUInt8::create();
@@ -465,7 +465,7 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(
                                                   null_map_data, filter_map, 
output_block);
             // This is the last sub block of splitted block, and no 
equal-conjuncts-matched tuple
             // is output in all sub blocks, need to output a tuple for this 
probe row
-            if (is_the_last_sub_block && 
!*_join_context->_is_any_probe_match_row_output) {
+            if (is_the_last_sub_block && 
!_parent->_is_any_probe_match_row_output) {
                 filter_map[0] = true;
                 null_map_data[0] = true;
             }
@@ -514,7 +514,7 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(
 
         // It contains the first sub block of splited equal-conjuncts-matched 
tuples of the current probe row
         if (multi_matched_output_row_count > 0) {
-            *_join_context->_is_any_probe_match_row_output = false;
+            _parent->_is_any_probe_match_row_output = false;
             _process_splited_equal_matched_tuples(row_count - 
multi_matched_output_row_count,
                                                   
multi_matched_output_row_count, filter_column_ptr,
                                                   null_map_data, filter_map, 
output_block);
@@ -534,7 +534,7 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(
         size_t start_row_idx = 1;
         // We are handling euqual-conjuncts matched tuples that are splitted 
into multiple blocks
         if (_row_count_from_last_probe > 0) {
-            if (*_join_context->_is_any_probe_match_row_output) {
+            if (_parent->_is_any_probe_match_row_output) {
                 // if any matched tuple for this probe row is output,
                 // ignore all the following tuples for this probe row.
                 for (int row_idx = 0; row_idx < _row_count_from_last_probe; 
++row_idx) {
@@ -563,14 +563,13 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(
         if (multi_matched_output_row_count > 0) {
             // If a matched row is output, all the equal-matched tuples in
             // the following sub blocks should be ignored
-            *_join_context->_is_any_probe_match_row_output = 
filter_map[row_count - 1];
-        } else if (_row_count_from_last_probe > 0 &&
-                   !*_join_context->_is_any_probe_match_row_output) {
+            _parent->_is_any_probe_match_row_output = filter_map[row_count - 
1];
+        } else if (_row_count_from_last_probe > 0 && 
!_parent->_is_any_probe_match_row_output) {
             // We are handling euqual-conjuncts matched tuples that are 
splitted into multiple blocks,
             // and no matched tuple has been output in all previous run.
             // If a tuple is output in this run, all the following mathced 
tuples should be ignored
             if (filter_map[_row_count_from_last_probe - 1]) {
-                *_join_context->_is_any_probe_match_row_output = true;
+                _parent->_is_any_probe_match_row_output = true;
             }
         }
 
@@ -609,7 +608,7 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(
 
         size_t start_row_idx = 1;
         // We are handling euqual-conjuncts matched tuples that are splitted 
into multiple blocks
-        if (_row_count_from_last_probe > 0 && 
*_join_context->_is_any_probe_match_row_output) {
+        if (_row_count_from_last_probe > 0 && 
_parent->_is_any_probe_match_row_output) {
             // if any matched tuple for this probe row is output,
             // ignore all the following tuples for this probe row.
             for (int row_idx = 0; row_idx < _row_count_from_last_probe; 
++row_idx) {
@@ -658,15 +657,15 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(
             int end_row_idx = 0;
             if (_row_count_from_last_probe > 0) {
                 end_row_idx = row_count - multi_matched_output_row_count;
-                if (!*_join_context->_is_any_probe_match_row_output) {
+                if (!_parent->_is_any_probe_match_row_output) {
                     // We are handling euqual-conjuncts matched tuples that 
are splitted into multiple blocks,
                     // and no matched tuple has been output in all previous 
run.
                     // If a tuple is output in this run, all the following 
mathced tuples should be ignored
                     if (filter_map[_row_count_from_last_probe - 1]) {
-                        *_join_context->_is_any_probe_match_row_output = true;
+                        _parent->_is_any_probe_match_row_output = true;
                         filter_map[_row_count_from_last_probe - 1] = false;
                     }
-                    if (is_the_last_sub_block && 
!*_join_context->_is_any_probe_match_row_output) {
+                    if (is_the_last_sub_block && 
!_parent->_is_any_probe_match_row_output) {
                         // This is the last sub block of splitted block, and 
no equal-conjuncts-matched tuple
                         // is output in all sub blocks, output a tuple for 
this probe row
                         filter_map[0] = true;
@@ -676,7 +675,7 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(
                     // It contains the first sub block of splited 
equal-conjuncts-matched tuples of the current probe row
                     // If a matched row is output, all the equal-matched 
tuples in
                     // the following sub blocks should be ignored
-                    *_join_context->_is_any_probe_match_row_output = 
filter_map[row_count - 1];
+                    _parent->_is_any_probe_match_row_output = 
filter_map[row_count - 1];
                     filter_map[row_count - 1] = false;
                 }
             } else if (multi_matched_output_row_count > 0) {
@@ -684,7 +683,7 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(
                 // It contains the first sub block of splited 
equal-conjuncts-matched tuples of the current probe row
                 // If a matched row is output, all the equal-matched tuples in
                 // the following sub blocks should be ignored
-                *_join_context->_is_any_probe_match_row_output = 
filter_map[row_count - 1];
+                _parent->_is_any_probe_match_row_output = filter_map[row_count 
- 1];
                 filter_map[row_count - 1] = false;
             } else {
                 end_row_idx = row_count;
@@ -744,8 +743,8 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(
 // and when processing the last sub block, check whether there are any
 // equal-conjuncts-matched tuple is output in all sub blocks,
 // if not, just pick a tuple and output.
-template <int JoinOpType>
-void ProcessHashTableProbe<JoinOpType>::_process_splited_equal_matched_tuples(
+template <int JoinOpType, typename Parent>
+void ProcessHashTableProbe<JoinOpType, 
Parent>::_process_splited_equal_matched_tuples(
         int start_row_idx, int row_count, const UInt8* __restrict 
other_hit_column,
         UInt8* __restrict null_map_data, UInt8* __restrict filter_map, Block* 
output_block) {
     int end_row_idx = start_row_idx + row_count;
@@ -770,16 +769,15 @@ void 
ProcessHashTableProbe<JoinOpType>::_process_splited_equal_matched_tuples(
             *_visited_map[i] |= other_hit;
         }
     }
-    *_join_context->_is_any_probe_match_row_output |=
+    _parent->_is_any_probe_match_row_output |=
             simd::contain_byte(filter_map + start_row_idx, row_count, 1);
 }
 
-template <int JoinOpType>
+template <int JoinOpType, typename Parent>
 template <typename HashTableType>
-Status 
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableType& 
hash_table_ctx,
-                                                                    
MutableBlock& mutable_block,
-                                                                    Block* 
output_block,
-                                                                    bool* eos) 
{
+Status ProcessHashTableProbe<JoinOpType, Parent>::process_data_in_hashtable(
+        HashTableType& hash_table_ctx, MutableBlock& mutable_block, Block* 
output_block,
+        bool* eos) {
     using Mapped = typename HashTableType::Mapped;
     SCOPED_TIMER(_probe_process_hashtable_timer);
     if constexpr (std::is_same_v<Mapped, RowRefListWithFlag> ||
@@ -787,16 +785,15 @@ Status 
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
         hash_table_ctx.init_iterator();
         auto& mcol = mutable_block.mutable_columns();
 
-        bool right_semi_anti_without_other =
-                _join_context->_is_right_semi_anti && 
!_join_context->_have_other_join_conjunct;
+        bool right_semi_anti_without_other = _is_right_semi_anti && 
!_have_other_join_conjunct;
         int right_col_idx =
-                right_semi_anti_without_other ? 0 : 
_join_context->_left_table_data_types->size();
-        int right_col_len = _join_context->_right_table_data_types->size();
+                right_semi_anti_without_other ? 0 : 
_parent->left_table_data_types().size();
+        int right_col_len = _parent->right_table_data_types().size();
 
         auto& iter = hash_table_ctx.iterator;
         auto block_size = 0;
         auto& visited_iter =
-                
std::get<ForwardIterator<Mapped>>(*_join_context->_outer_join_pull_visited_iter);
+                
std::get<ForwardIterator<Mapped>>(_parent->_outer_join_pull_visited_iter);
         _build_blocks_locs.resize(_batch_size);
         auto register_build_loc = [&](int8_t offset, int32_t row_nums) {
             _build_blocks_locs[block_size++] = std::pair<int8_t, int>(offset, 
row_nums);
@@ -874,13 +871,13 @@ Status 
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
 
         auto insert_build_rows = [&](int8_t offset) {
             for (size_t j = 0; j < right_col_len; ++j) {
-                auto& column = 
*_build_blocks[offset].get_by_position(j).column;
+                auto& column = 
*(*_build_blocks)[offset].get_by_position(j).column;
                 mcol[j + right_col_idx]->insert_indices_from(
                         column, _build_block_rows.data(),
                         _build_block_rows.data() + _build_block_rows.size());
             }
         };
-        if (_build_blocks.size() > 1) {
+        if (_build_blocks->size() > 1) {
             std::sort(_build_blocks_locs.begin(), _build_blocks_locs.end(),
                       [](const auto a, const auto b) { return a.first > 
b.first; });
             auto start = 0, end = 0;
@@ -897,7 +894,7 @@ Status 
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
                 start = end;
                 insert_build_rows(offset);
             }
-        } else if (_build_blocks.size() == 1) {
+        } else if (_build_blocks->size() == 1) {
             const auto size = _build_blocks_locs.size();
             _build_block_rows.resize(_build_blocks_locs.size());
             for (int i = 0; i < size; i++) {
@@ -907,7 +904,7 @@ Status 
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
         }
 
         // just resize the left table column in case with other conjunct to 
make block size is not zero
-        if (_join_context->_is_right_semi_anti && 
_join_context->_have_other_join_conjunct) {
+        if (_is_right_semi_anti && _have_other_join_conjunct) {
             auto target_size = mcol[right_col_idx]->size();
             for (int i = 0; i < right_col_idx; ++i) {
                 mcol[i]->resize(target_size);
@@ -933,13 +930,11 @@ Status 
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
     }
 }
 
-template <int JoinOpType>
+template <int JoinOpType, typename Parent>
 template <bool need_null_map_for_probe, bool ignore_null, typename 
HashTableType>
-Status ProcessHashTableProbe<JoinOpType>::process(HashTableType& 
hash_table_ctx,
-                                                  ConstNullMapPtr null_map,
-                                                  MutableBlock& mutable_block, 
Block* output_block,
-                                                  size_t probe_rows, bool 
is_mark_join,
-                                                  bool 
have_other_join_conjunct) {
+Status ProcessHashTableProbe<JoinOpType, Parent>::process(
+        HashTableType& hash_table_ctx, ConstNullMapPtr null_map, MutableBlock& 
mutable_block,
+        Block* output_block, size_t probe_rows, bool is_mark_join, bool 
have_other_join_conjunct) {
     Status res;
     if constexpr (!std::is_same_v<typename HashTableType::Mapped, 
RowRefListWithFlags>) {
         if (have_other_join_conjunct) {
@@ -973,74 +968,78 @@ struct ExtractType<T(U)> {
     using Type = U;
 };
 
-#define INSTANTIATION(JoinOpType, T)                                           
               \
-    template Status                                                            
               \
-    ProcessHashTableProbe<JoinOpType>::process<false, false, 
ExtractType<void(T)>::Type>(     \
-            ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr 
null_map,            \
-            MutableBlock & mutable_block, Block * output_block, size_t 
probe_rows,            \
-            bool is_mark_join, bool have_other_join_conjunct);                 
               \
-    template Status                                                            
               \
-    ProcessHashTableProbe<JoinOpType>::process<false, true, 
ExtractType<void(T)>::Type>(      \
-            ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr 
null_map,            \
-            MutableBlock & mutable_block, Block * output_block, size_t 
probe_rows,            \
-            bool is_mark_join, bool have_other_join_conjunct);                 
               \
-    template Status                                                            
               \
-    ProcessHashTableProbe<JoinOpType>::process<true, false, 
ExtractType<void(T)>::Type>(      \
-            ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr 
null_map,            \
-            MutableBlock & mutable_block, Block * output_block, size_t 
probe_rows,            \
-            bool is_mark_join, bool have_other_join_conjunct);                 
               \
-    template Status                                                            
               \
-    ProcessHashTableProbe<JoinOpType>::process<true, true, 
ExtractType<void(T)>::Type>(       \
-            ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr 
null_map,            \
-            MutableBlock & mutable_block, Block * output_block, size_t 
probe_rows,            \
-            bool is_mark_join, bool have_other_join_conjunct);                 
               \
-                                                                               
               \
-    template Status                                                            
               \
-    
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable<ExtractType<void(T)>::Type>(
 \
-            ExtractType<void(T)>::Type & hash_table_ctx, MutableBlock & 
mutable_block,        \
-            Block * output_block, bool* eos)
-
-#define INSTANTIATION_FOR(JoinOpType)                                          
            \
-    template struct ProcessHashTableProbe<JoinOpType>;                         
            \
-                                                                               
            \
-    INSTANTIATION(JoinOpType, (SerializedHashTableContext<RowRefList>));       
            \
-    INSTANTIATION(JoinOpType, (I8HashTableContext<RowRefList>));               
            \
-    INSTANTIATION(JoinOpType, (I16HashTableContext<RowRefList>));              
            \
-    INSTANTIATION(JoinOpType, (I32HashTableContext<RowRefList>));              
            \
-    INSTANTIATION(JoinOpType, (I64HashTableContext<RowRefList>));              
            \
-    INSTANTIATION(JoinOpType, (I128HashTableContext<RowRefList>));             
            \
-    INSTANTIATION(JoinOpType, (I256HashTableContext<RowRefList>));             
            \
-    INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<true, 
RowRefList>));            \
-    INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<false, 
RowRefList>));           \
-    INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<true, 
RowRefList>));           \
-    INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<false, 
RowRefList>));          \
-    INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<true, 
RowRefList>));           \
-    INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<false, 
RowRefList>));          \
-    INSTANTIATION(JoinOpType, 
(SerializedHashTableContext<RowRefListWithFlag>));           \
-    INSTANTIATION(JoinOpType, (I8HashTableContext<RowRefListWithFlag>));       
            \
-    INSTANTIATION(JoinOpType, (I16HashTableContext<RowRefListWithFlag>));      
            \
-    INSTANTIATION(JoinOpType, (I32HashTableContext<RowRefListWithFlag>));      
            \
-    INSTANTIATION(JoinOpType, (I64HashTableContext<RowRefListWithFlag>));      
            \
-    INSTANTIATION(JoinOpType, (I128HashTableContext<RowRefListWithFlag>));     
            \
-    INSTANTIATION(JoinOpType, (I256HashTableContext<RowRefListWithFlag>));     
            \
-    INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<true, 
RowRefListWithFlag>));    \
-    INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<false, 
RowRefListWithFlag>));   \
-    INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<true, 
RowRefListWithFlag>));   \
-    INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<false, 
RowRefListWithFlag>));  \
-    INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<true, 
RowRefListWithFlag>));   \
-    INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<false, 
RowRefListWithFlag>));  \
-    INSTANTIATION(JoinOpType, 
(SerializedHashTableContext<RowRefListWithFlags>));          \
-    INSTANTIATION(JoinOpType, (I8HashTableContext<RowRefListWithFlags>));      
            \
-    INSTANTIATION(JoinOpType, (I16HashTableContext<RowRefListWithFlags>));     
            \
-    INSTANTIATION(JoinOpType, (I32HashTableContext<RowRefListWithFlags>));     
            \
-    INSTANTIATION(JoinOpType, (I64HashTableContext<RowRefListWithFlags>));     
            \
-    INSTANTIATION(JoinOpType, (I128HashTableContext<RowRefListWithFlags>));    
            \
-    INSTANTIATION(JoinOpType, (I256HashTableContext<RowRefListWithFlags>));    
            \
-    INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<true, 
RowRefListWithFlags>));   \
-    INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<false, 
RowRefListWithFlags>));  \
-    INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<true, 
RowRefListWithFlags>));  \
-    INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<false, 
RowRefListWithFlags>)); \
-    INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<true, 
RowRefListWithFlags>));  \
-    INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<false, 
RowRefListWithFlags>))
+#define INSTANTIATION(JoinOpType, Parent, T)                                   
                   \
+    template Status                                                            
                   \
+    ProcessHashTableProbe<JoinOpType, Parent>::process<false, false, 
ExtractType<void(T)>::Type>( \
+            ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr 
null_map,                \
+            MutableBlock & mutable_block, Block * output_block, size_t 
probe_rows,                \
+            bool is_mark_join, bool have_other_join_conjunct);                 
                   \
+    template Status                                                            
                   \
+    ProcessHashTableProbe<JoinOpType, Parent>::process<false, true, 
ExtractType<void(T)>::Type>(  \
+            ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr 
null_map,                \
+            MutableBlock & mutable_block, Block * output_block, size_t 
probe_rows,                \
+            bool is_mark_join, bool have_other_join_conjunct);                 
                   \
+    template Status                                                            
                   \
+    ProcessHashTableProbe<JoinOpType, Parent>::process<true, false, 
ExtractType<void(T)>::Type>(  \
+            ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr 
null_map,                \
+            MutableBlock & mutable_block, Block * output_block, size_t 
probe_rows,                \
+            bool is_mark_join, bool have_other_join_conjunct);                 
                   \
+    template Status                                                            
                   \
+    ProcessHashTableProbe<JoinOpType, Parent>::process<true, true, 
ExtractType<void(T)>::Type>(   \
+            ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr 
null_map,                \
+            MutableBlock & mutable_block, Block * output_block, size_t 
probe_rows,                \
+            bool is_mark_join, bool have_other_join_conjunct);                 
                   \
+                                                                               
                   \
+    template Status ProcessHashTableProbe<JoinOpType, 
Parent>::process_data_in_hashtable<         \
+            ExtractType<void(T)>::Type>(ExtractType<void(T)>::Type & 
hash_table_ctx,              \
+                                        MutableBlock & mutable_block, Block * 
output_block,       \
+                                        bool* eos)
+
+#define INSTANTIATION_FOR1(JoinOpType, Parent)                                 
                    \
+    template struct ProcessHashTableProbe<JoinOpType, Parent>;                 
                    \
+                                                                               
                    \
+    INSTANTIATION(JoinOpType, Parent, 
(SerializedHashTableContext<RowRefList>));                   \
+    INSTANTIATION(JoinOpType, Parent, (I8HashTableContext<RowRefList>));       
                    \
+    INSTANTIATION(JoinOpType, Parent, (I16HashTableContext<RowRefList>));      
                    \
+    INSTANTIATION(JoinOpType, Parent, (I32HashTableContext<RowRefList>));      
                    \
+    INSTANTIATION(JoinOpType, Parent, (I64HashTableContext<RowRefList>));      
                    \
+    INSTANTIATION(JoinOpType, Parent, (I128HashTableContext<RowRefList>));     
                    \
+    INSTANTIATION(JoinOpType, Parent, (I256HashTableContext<RowRefList>));     
                    \
+    INSTANTIATION(JoinOpType, Parent, (I64FixedKeyHashTableContext<true, 
RowRefList>));            \
+    INSTANTIATION(JoinOpType, Parent, (I64FixedKeyHashTableContext<false, 
RowRefList>));           \
+    INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext<true, 
RowRefList>));           \
+    INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext<false, 
RowRefList>));          \
+    INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext<true, 
RowRefList>));           \
+    INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext<false, 
RowRefList>));          \
+    INSTANTIATION(JoinOpType, Parent, 
(SerializedHashTableContext<RowRefListWithFlag>));           \
+    INSTANTIATION(JoinOpType, Parent, 
(I8HashTableContext<RowRefListWithFlag>));                   \
+    INSTANTIATION(JoinOpType, Parent, 
(I16HashTableContext<RowRefListWithFlag>));                  \
+    INSTANTIATION(JoinOpType, Parent, 
(I32HashTableContext<RowRefListWithFlag>));                  \
+    INSTANTIATION(JoinOpType, Parent, 
(I64HashTableContext<RowRefListWithFlag>));                  \
+    INSTANTIATION(JoinOpType, Parent, 
(I128HashTableContext<RowRefListWithFlag>));                 \
+    INSTANTIATION(JoinOpType, Parent, 
(I256HashTableContext<RowRefListWithFlag>));                 \
+    INSTANTIATION(JoinOpType, Parent, (I64FixedKeyHashTableContext<true, 
RowRefListWithFlag>));    \
+    INSTANTIATION(JoinOpType, Parent, (I64FixedKeyHashTableContext<false, 
RowRefListWithFlag>));   \
+    INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext<true, 
RowRefListWithFlag>));   \
+    INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext<false, 
RowRefListWithFlag>));  \
+    INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext<true, 
RowRefListWithFlag>));   \
+    INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext<false, 
RowRefListWithFlag>));  \
+    INSTANTIATION(JoinOpType, Parent, 
(SerializedHashTableContext<RowRefListWithFlags>));          \
+    INSTANTIATION(JoinOpType, Parent, 
(I8HashTableContext<RowRefListWithFlags>));                  \
+    INSTANTIATION(JoinOpType, Parent, 
(I16HashTableContext<RowRefListWithFlags>));                 \
+    INSTANTIATION(JoinOpType, Parent, 
(I32HashTableContext<RowRefListWithFlags>));                 \
+    INSTANTIATION(JoinOpType, Parent, 
(I64HashTableContext<RowRefListWithFlags>));                 \
+    INSTANTIATION(JoinOpType, Parent, 
(I128HashTableContext<RowRefListWithFlags>));                \
+    INSTANTIATION(JoinOpType, Parent, 
(I256HashTableContext<RowRefListWithFlags>));                \
+    INSTANTIATION(JoinOpType, Parent, (I64FixedKeyHashTableContext<true, 
RowRefListWithFlags>));   \
+    INSTANTIATION(JoinOpType, Parent, (I64FixedKeyHashTableContext<false, 
RowRefListWithFlags>));  \
+    INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext<true, 
RowRefListWithFlags>));  \
+    INSTANTIATION(JoinOpType, Parent, (I128FixedKeyHashTableContext<false, 
RowRefListWithFlags>)); \
+    INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext<true, 
RowRefListWithFlags>));  \
+    INSTANTIATION(JoinOpType, Parent, (I256FixedKeyHashTableContext<false, 
RowRefListWithFlags>))
+
+#define INSTANTIATION_FOR(JoinOpType)             \
+    INSTANTIATION_FOR1(JoinOpType, HashJoinNode); \
+    INSTANTIATION_FOR1(JoinOpType, pipeline::HashJoinProbeLocalState)
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp 
b/be/src/vec/exec/join/vhash_join_node.cpp
index e3647762d46..f48a7e278fe 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -84,120 +84,6 @@ template Status HashJoinNode::_extract_join_column<false>(
         std::vector<IColumn const*, std::allocator<IColumn const*>>&,
         std::vector<int, std::allocator<int>> const&);
 
-RuntimeFilterContext::RuntimeFilterContext(HashJoinNode* join_node)
-        : _runtime_filter_descs(join_node->_runtime_filter_descs),
-          _runtime_filter_slots(join_node->_runtime_filter_slots),
-          _build_expr_ctxs(join_node->_build_expr_ctxs),
-          _build_rf_cardinality(join_node->_build_rf_cardinality),
-          _inserted_rows(join_node->_inserted_rows),
-          _push_down_timer(join_node->_push_down_timer),
-          _push_compute_timer(join_node->_push_compute_timer) {}
-
-RuntimeFilterContext::RuntimeFilterContext(pipeline::HashJoinBuildSinkLocalState*
 local_state)
-        : 
_runtime_filter_descs(local_state->join_build()->_runtime_filter_descs),
-          _runtime_filter_slots(local_state->_runtime_filter_slots),
-          _build_expr_ctxs(local_state->_build_expr_ctxs),
-          _build_rf_cardinality(local_state->_build_rf_cardinality),
-          _inserted_rows(local_state->_inserted_rows),
-          _push_down_timer(local_state->_push_down_timer),
-          _push_compute_timer(local_state->_push_compute_timer) {}
-
-HashJoinProbeContext::HashJoinProbeContext(HashJoinNode* join_node)
-        : _have_other_join_conjunct(join_node->_have_other_join_conjunct),
-          _is_right_semi_anti(join_node->_is_right_semi_anti),
-          _is_outer_join(join_node->_is_outer_join),
-          
_tuple_is_null_left_flag_column(&join_node->_tuple_is_null_left_flag_column),
-          
_tuple_is_null_right_flag_column(&join_node->_tuple_is_null_right_flag_column),
-          _other_join_conjuncts(&join_node->_other_join_conjuncts),
-          _right_table_data_types(&join_node->_right_table_data_types),
-          _left_table_data_types(&join_node->_left_table_data_types),
-          _search_hashtable_timer(join_node->_search_hashtable_timer),
-          _build_side_output_timer(join_node->_build_side_output_timer),
-          _probe_side_output_timer(join_node->_probe_side_output_timer),
-          
_probe_process_hashtable_timer(join_node->_probe_process_hashtable_timer),
-          
_process_other_join_conjunct_timer(join_node->_process_other_join_conjunct_timer),
-          _rows_returned_counter(join_node->_rows_returned_counter),
-          _probe_arena_memory_usage(join_node->_probe_arena_memory_usage),
-          _arena(join_node->_arena),
-          
_outer_join_pull_visited_iter(&join_node->_outer_join_pull_visited_iter),
-          _probe_row_match_iter(&join_node->_probe_row_match_iter),
-          _build_blocks(join_node->_build_blocks),
-          _probe_block(&join_node->_probe_block),
-          _probe_columns(&join_node->_probe_columns),
-          _probe_index(&join_node->_probe_index),
-          _ready_probe(&join_node->_ready_probe),
-          _probe_key_sz(join_node->_probe_key_sz),
-          _left_output_slot_flags(&join_node->_left_output_slot_flags),
-          _right_output_slot_flags(&join_node->_right_output_slot_flags),
-          
_is_any_probe_match_row_output(&join_node->_is_any_probe_match_row_output),
-          _has_null_value_in_build_side(join_node->_has_null_in_build_side) {}
-
-HashJoinProbeContext::HashJoinProbeContext(pipeline::HashJoinProbeLocalState* 
local_state)
-        : 
_have_other_join_conjunct(local_state->join_probe()->_have_other_join_conjunct),
-          _is_right_semi_anti(local_state->join_probe()->_is_right_semi_anti),
-          _is_outer_join(local_state->join_probe()->_is_outer_join),
-          
_tuple_is_null_left_flag_column(&local_state->_tuple_is_null_left_flag_column),
-          
_tuple_is_null_right_flag_column(&local_state->_tuple_is_null_right_flag_column),
-          _other_join_conjuncts(&local_state->_other_join_conjuncts),
-          
_right_table_data_types(&local_state->join_probe()->_right_table_data_types),
-          
_left_table_data_types(&local_state->join_probe()->_left_table_data_types),
-          _search_hashtable_timer(local_state->_search_hashtable_timer),
-          _build_side_output_timer(local_state->_build_side_output_timer),
-          _probe_side_output_timer(local_state->_probe_side_output_timer),
-          
_probe_process_hashtable_timer(local_state->_probe_process_hashtable_timer),
-          
_process_other_join_conjunct_timer(local_state->_process_other_join_conjunct_timer),
-          _rows_returned_counter(local_state->_rows_returned_counter),
-          _probe_arena_memory_usage(local_state->_probe_arena_memory_usage),
-          _arena(local_state->_shared_state->arena),
-          
_outer_join_pull_visited_iter(&local_state->_shared_state->outer_join_pull_visited_iter),
-          
_probe_row_match_iter(&local_state->_shared_state->probe_row_match_iter),
-          _build_blocks(local_state->_shared_state->build_blocks),
-          _probe_block(&local_state->_probe_block),
-          _probe_columns(&local_state->_probe_columns),
-          _probe_index(&local_state->_probe_index),
-          _ready_probe(&local_state->_ready_probe),
-          _probe_key_sz(local_state->_shared_state->probe_key_sz),
-          
_left_output_slot_flags(&local_state->join_probe()->_left_output_slot_flags),
-          
_right_output_slot_flags(&local_state->join_probe()->_right_output_slot_flags),
-          
_is_any_probe_match_row_output(&local_state->_is_any_probe_match_row_output),
-          
_has_null_value_in_build_side(local_state->_shared_state->_has_null_in_build_side)
 {}
-
-HashJoinBuildContext::HashJoinBuildContext(HashJoinNode* join_node)
-        : _hash_table_memory_usage(join_node->_hash_table_memory_usage),
-          _build_buckets_counter(join_node->_build_buckets_counter),
-          _build_collisions_counter(join_node->_build_collisions_counter),
-          _build_buckets_fill_counter(join_node->_build_buckets_fill_counter),
-          _build_table_insert_timer(join_node->_build_table_insert_timer),
-          _build_table_expanse_timer(join_node->_build_table_expanse_timer),
-          _build_table_convert_timer(join_node->_build_table_convert_timer),
-          
_build_side_compute_hash_timer(join_node->_build_side_compute_hash_timer),
-          _build_arena_memory_usage(join_node->_build_arena_memory_usage),
-          _profile(join_node->runtime_profile()),
-          _build_key_sz(join_node->_build_key_sz),
-          _build_unique(join_node->_build_unique),
-          _runtime_filter_descs(join_node->_runtime_filter_descs),
-          _inserted_rows(join_node->_inserted_rows),
-          _arena(join_node->_arena),
-          _build_rf_cardinality(join_node->_build_rf_cardinality) {}
-
-HashJoinBuildContext::HashJoinBuildContext(pipeline::HashJoinBuildSinkLocalState*
 local_state)
-        : _hash_table_memory_usage(local_state->_hash_table_memory_usage),
-          _build_buckets_counter(local_state->_build_buckets_counter),
-          _build_collisions_counter(local_state->_build_collisions_counter),
-          
_build_buckets_fill_counter(local_state->_build_buckets_fill_counter),
-          _build_table_insert_timer(local_state->_build_table_insert_timer),
-          _build_table_expanse_timer(local_state->_build_table_expanse_timer),
-          _build_table_convert_timer(local_state->_build_table_convert_timer),
-          
_build_side_compute_hash_timer(local_state->_build_side_compute_hash_timer),
-          _build_arena_memory_usage(local_state->_build_arena_memory_usage),
-          _profile(local_state->profile()),
-          _build_key_sz(local_state->join_build()->_build_key_sz),
-          _build_unique(local_state->join_build()->_build_unique),
-          
_runtime_filter_descs(local_state->join_build()->_runtime_filter_descs),
-          _inserted_rows(local_state->_inserted_rows),
-          _arena(local_state->_shared_state->arena),
-          _build_rf_cardinality(local_state->_build_rf_cardinality) {}
-
 HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs)
         : VJoinNodeBase(pool, tnode, descs),
           _is_broadcast_join(tnode.hash_join_node.__isset.is_broadcast_join &&
@@ -896,11 +782,8 @@ Status HashJoinNode::sink(doris::RuntimeState* state, 
vectorized::Block* in_bloc
                                             __builtin_unreachable();
                                         },
                                         [&](auto&& arg) -> Status {
-                                            using HashTableCtxType = 
std::decay_t<decltype(arg)>;
-                                            RuntimeFilterContext context(this);
-                                            
ProcessRuntimeFilterBuild<HashTableCtxType>
-                                                    
runtime_filter_build_process(&context);
-                                            return 
runtime_filter_build_process(state, arg);
+                                            ProcessRuntimeFilterBuild 
runtime_filter_build_process;
+                                            return 
runtime_filter_build_process(state, arg, this);
                                         }},
                               *_hash_table_variants);
         if (!ret.ok()) {
@@ -1119,10 +1002,9 @@ Status HashJoinNode::_process_build_block(RuntimeState* 
state, Block& block, uin
                     [&](auto&& arg, auto has_null_value,
                         auto short_circuit_for_null_in_build_side) -> Status {
                         using HashTableCtxType = std::decay_t<decltype(arg)>;
-                        HashJoinBuildContext context(this);
-                        ProcessHashTableBuild<HashTableCtxType> 
hash_table_build_process(
-                                rows, block, raw_ptrs, &context, 
state->batch_size(), offset,
-                                state);
+                        ProcessHashTableBuild<HashTableCtxType, HashJoinNode>
+                                hash_table_build_process(rows, block, 
raw_ptrs, this,
+                                                         state->batch_size(), 
offset, state);
                         return hash_table_build_process
                                 .template run<has_null_value, 
short_circuit_for_null_in_build_side>(
                                         arg,
@@ -1277,9 +1159,9 @@ void 
HashJoinNode::_process_hashtable_ctx_variants_init(RuntimeState* state) {
     std::visit(
             [&](auto&& join_op_variants) {
                 using JoinOpType = std::decay_t<decltype(join_op_variants)>;
-                _probe_context.reset(new HashJoinProbeContext(this));
-                
_process_hashtable_ctx_variants->emplace<ProcessHashTableProbe<JoinOpType::value>>(
-                        _probe_context.get(), state->batch_size());
+                _process_hashtable_ctx_variants
+                        ->emplace<ProcessHashTableProbe<JoinOpType::value, 
HashJoinNode>>(
+                                this, state->batch_size());
             },
             _join_op_variants);
 }
diff --git a/be/src/vec/exec/join/vhash_join_node.h 
b/be/src/vec/exec/join/vhash_join_node.h
index 19ddfc4626a..b9264bc1457 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -69,99 +69,52 @@ constexpr size_t CHECK_FRECUENCY = 65536;
 
 struct UInt128;
 struct UInt256;
-template <int JoinOpType>
+template <int JoinOpType, typename Parent>
 struct ProcessHashTableProbe;
 class HashJoinNode;
 
-struct RuntimeFilterContext {
-    RuntimeFilterContext(HashJoinNode* join_node);
-    RuntimeFilterContext(pipeline::HashJoinBuildSinkLocalState* local_state);
-    std::vector<TRuntimeFilterDesc>& _runtime_filter_descs;
-    std::shared_ptr<VRuntimeFilterSlots>& _runtime_filter_slots;
-    VExprContextSPtrs& _build_expr_ctxs;
-    size_t& _build_rf_cardinality;
-    std::unordered_map<const Block*, std::vector<int>>& _inserted_rows;
-    RuntimeProfile::Counter* _push_down_timer;
-    RuntimeProfile::Counter* _push_compute_timer;
-};
-
-template <class HashTableContext>
 struct ProcessRuntimeFilterBuild {
-    ProcessRuntimeFilterBuild(RuntimeFilterContext* context) : 
_context(context) {}
-
-    Status operator()(RuntimeState* state, HashTableContext& hash_table_ctx) {
-        if (_context->_runtime_filter_descs.empty()) {
+    template <class HashTableContext, typename Parent>
+    Status operator()(RuntimeState* state, HashTableContext& hash_table_ctx, 
Parent* parent) {
+        if (parent->runtime_filter_descs().empty()) {
             return Status::OK();
         }
-        _context->_runtime_filter_slots = 
std::make_shared<VRuntimeFilterSlots>(
-                _context->_build_expr_ctxs, _context->_runtime_filter_descs);
+        parent->_runtime_filter_slots = std::make_shared<VRuntimeFilterSlots>(
+                parent->_build_expr_ctxs, parent->runtime_filter_descs());
 
-        RETURN_IF_ERROR(_context->_runtime_filter_slots->init(
-                state, hash_table_ctx.hash_table->size(), 
_context->_build_rf_cardinality));
+        RETURN_IF_ERROR(parent->_runtime_filter_slots->init(
+                state, hash_table_ctx.hash_table->size(), 
parent->_build_rf_cardinality));
 
-        if (!_context->_runtime_filter_slots->empty() && 
!_context->_inserted_rows.empty()) {
+        if (!parent->_runtime_filter_slots->empty() && 
!parent->_inserted_rows.empty()) {
             {
-                SCOPED_TIMER(_context->_push_compute_timer);
-                
_context->_runtime_filter_slots->insert(_context->_inserted_rows);
+                SCOPED_TIMER(parent->_push_compute_timer);
+                parent->_runtime_filter_slots->insert(parent->_inserted_rows);
             }
         }
         {
-            SCOPED_TIMER(_context->_push_down_timer);
-            RETURN_IF_ERROR(_context->_runtime_filter_slots->publish());
+            SCOPED_TIMER(parent->_push_down_timer);
+            RETURN_IF_ERROR(parent->_runtime_filter_slots->publish());
         }
 
         return Status::OK();
     }
-
-private:
-    RuntimeFilterContext* _context;
-};
-
-struct HashJoinBuildContext {
-    HashJoinBuildContext(HashJoinNode* join_node);
-    HashJoinBuildContext(pipeline::HashJoinBuildSinkLocalState* local_state);
-
-    void add_hash_buckets_info(const std::string& info) const {
-        _profile->add_info_string("HashTableBuckets", info);
-    }
-    void add_hash_buckets_filled_info(const std::string& info) const {
-        _profile->add_info_string("HashTableFilledBuckets", info);
-    }
-    RuntimeProfile::Counter* _hash_table_memory_usage;
-    RuntimeProfile::Counter* _build_buckets_counter;
-    RuntimeProfile::Counter* _build_collisions_counter;
-    RuntimeProfile::Counter* _build_buckets_fill_counter;
-    RuntimeProfile::Counter* _build_table_insert_timer;
-    RuntimeProfile::Counter* _build_table_expanse_timer;
-    RuntimeProfile::Counter* _build_table_convert_timer;
-    RuntimeProfile::Counter* _build_side_compute_hash_timer;
-    RuntimeProfile::HighWaterMarkCounter* _build_arena_memory_usage;
-    RuntimeProfile* _profile;
-
-    Sizes& _build_key_sz;
-    bool& _build_unique;
-    std::vector<TRuntimeFilterDesc>& _runtime_filter_descs;
-    std::unordered_map<const Block*, std::vector<int>>& _inserted_rows;
-    std::shared_ptr<Arena>& _arena;
-    size_t& _build_rf_cardinality;
 };
 
 using ProfileCounter = RuntimeProfile::Counter;
 
-template <class HashTableContext>
+template <class HashTableContext, typename Parent>
 struct ProcessHashTableBuild {
     ProcessHashTableBuild(int rows, Block& acquired_block, ColumnRawPtrs& 
build_raw_ptrs,
-                          HashJoinBuildContext* join_context, int batch_size, 
uint8_t offset,
-                          RuntimeState* state)
+                          Parent* parent, int batch_size, uint8_t offset, 
RuntimeState* state)
             : _rows(rows),
               _skip_rows(0),
               _acquired_block(acquired_block),
               _build_raw_ptrs(build_raw_ptrs),
-              _join_context(join_context),
+              _parent(parent),
               _batch_size(batch_size),
               _offset(offset),
               _state(state),
-              
_build_side_compute_hash_timer(join_context->_build_side_compute_hash_timer) {}
+              
_build_side_compute_hash_timer(parent->_build_side_compute_hash_timer) {}
 
     template <bool ignore_null, bool short_circuit_for_null>
     Status run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map, 
bool* has_null_key) {
@@ -172,30 +125,30 @@ struct ProcessHashTableBuild {
             int64_t bucket_size = 
hash_table_ctx.hash_table->get_buffer_size_in_cells();
             int64_t filled_bucket_size = hash_table_ctx.hash_table->size();
             int64_t bucket_bytes = 
hash_table_ctx.hash_table->get_buffer_size_in_bytes();
-            COUNTER_SET(_join_context->_hash_table_memory_usage, bucket_bytes);
-            COUNTER_SET(_join_context->_build_buckets_counter, bucket_size);
-            COUNTER_SET(_join_context->_build_collisions_counter,
+            COUNTER_SET(_parent->_hash_table_memory_usage, bucket_bytes);
+            COUNTER_SET(_parent->_build_buckets_counter, bucket_size);
+            COUNTER_SET(_parent->_build_collisions_counter,
                         hash_table_ctx.hash_table->get_collisions());
-            COUNTER_SET(_join_context->_build_buckets_fill_counter, 
filled_bucket_size);
+            COUNTER_SET(_parent->_build_buckets_fill_counter, 
filled_bucket_size);
 
             auto hash_table_buckets = 
hash_table_ctx.hash_table->get_buffer_sizes_in_cells();
             std::string hash_table_buckets_info;
             for (auto bucket_count : hash_table_buckets) {
                 hash_table_buckets_info += std::to_string(bucket_count) + ", ";
             }
-            _join_context->add_hash_buckets_info(hash_table_buckets_info);
+            _parent->add_hash_buckets_info(hash_table_buckets_info);
 
             auto hash_table_sizes = hash_table_ctx.hash_table->sizes();
             hash_table_buckets_info.clear();
             for (auto table_size : hash_table_sizes) {
                 hash_table_buckets_info += std::to_string(table_size) + ", ";
             }
-            
_join_context->add_hash_buckets_filled_info(hash_table_buckets_info);
+            _parent->add_hash_buckets_filled_info(hash_table_buckets_info);
         }};
 
-        KeyGetter key_getter(_build_raw_ptrs, _join_context->_build_key_sz);
+        KeyGetter key_getter(_build_raw_ptrs, _parent->build_key_sz());
 
-        SCOPED_TIMER(_join_context->_build_table_insert_timer);
+        SCOPED_TIMER(_parent->_build_table_insert_timer);
         hash_table_ctx.hash_table->reset_resize_timer();
 
         // only not build_unique, we need expanse hash table before insert data
@@ -204,21 +157,21 @@ struct ProcessHashTableBuild {
         // 2. There are many duplicate keys, and the hash table filled bucket 
is far less than
         // the hash table build bucket, which may waste a lot of memory.
         // TODO, use the NDV expansion of the key column in the optimizer 
statistics
-        if (!_join_context->_build_unique) {
+        if (!_parent->build_unique()) {
             
RETURN_IF_CATCH_EXCEPTION(hash_table_ctx.hash_table->expanse_for_add_elem(
                     std::min<int>(_rows, 
config::hash_table_pre_expanse_max_rows)));
         }
 
-        vector<int>& inserted_rows = 
_join_context->_inserted_rows[&_acquired_block];
-        bool has_runtime_filter = 
!_join_context->_runtime_filter_descs.empty();
+        vector<int>& inserted_rows = _parent->_inserted_rows[&_acquired_block];
+        bool has_runtime_filter = !_parent->runtime_filter_descs().empty();
         if (has_runtime_filter) {
             inserted_rows.reserve(_batch_size);
         }
 
-        hash_table_ctx.init_serialized_keys(_build_raw_ptrs, 
_join_context->_build_key_sz, _rows,
+        hash_table_ctx.init_serialized_keys(_build_raw_ptrs, 
_parent->build_key_sz(), _rows,
                                             null_map ? null_map->data() : 
nullptr);
 
-        auto& arena = *(_join_context->_arena);
+        auto& arena = *_parent->arena();
         auto old_build_arena_memory = arena.size();
 
         size_t k = 0;
@@ -229,7 +182,7 @@ struct ProcessHashTableBuild {
             ctor(key, Mapped {k, _offset});
         };
 
-        bool build_unique = _join_context->_build_unique;
+        bool build_unique = _parent->build_unique();
 #define EMPLACE_IMPL(stmt)                                                    \
     for (; k < _rows; ++k) {                                                  \
         if (k % CHECK_FRECUENCY == 0) {                                       \
@@ -258,21 +211,21 @@ struct ProcessHashTableBuild {
         } else if (has_runtime_filter && !build_unique) {
             EMPLACE_IMPL(
                     if (inserted) { inserted_rows.push_back(k); } else {
-                        mapped.insert({k, _offset}, *(_join_context->_arena));
+                        mapped.insert({k, _offset}, *_parent->arena());
                         inserted_rows.push_back(k);
                     });
         } else if (!has_runtime_filter && build_unique) {
             EMPLACE_IMPL(if (!inserted) { _skip_rows++; });
         } else {
-            EMPLACE_IMPL(if (!inserted) { mapped.insert({k, _offset}, 
*(_join_context->_arena)); });
+            EMPLACE_IMPL(if (!inserted) { mapped.insert({k, _offset}, 
*_parent->arena()); });
         }
-        _join_context->_build_rf_cardinality += inserted_rows.size();
+        _parent->_build_rf_cardinality += inserted_rows.size();
 
-        _join_context->_build_arena_memory_usage->add(arena.size() - 
old_build_arena_memory);
+        _parent->_build_arena_memory_usage->add(arena.size() - 
old_build_arena_memory);
 
-        COUNTER_UPDATE(_join_context->_build_table_expanse_timer,
+        COUNTER_UPDATE(_parent->_build_table_expanse_timer,
                        hash_table_ctx.hash_table->get_resize_timer_value());
-        COUNTER_UPDATE(_join_context->_build_table_convert_timer,
+        COUNTER_UPDATE(_parent->_build_table_convert_timer,
                        hash_table_ctx.hash_table->get_convert_timer_value());
 
         return Status::OK();
@@ -283,7 +236,7 @@ private:
     int _skip_rows;
     Block& _acquired_block;
     ColumnRawPtrs& _build_raw_ptrs;
-    HashJoinBuildContext* _join_context;
+    Parent* _parent;
     int _batch_size;
     uint8_t _offset;
     RuntimeState* _state;
@@ -348,16 +301,16 @@ using HashTableVariants = std::variant<
 class VExprContext;
 
 using HashTableCtxVariants =
-        std::variant<std::monostate, 
ProcessHashTableProbe<TJoinOp::INNER_JOIN>,
-                     ProcessHashTableProbe<TJoinOp::LEFT_SEMI_JOIN>,
-                     ProcessHashTableProbe<TJoinOp::LEFT_ANTI_JOIN>,
-                     ProcessHashTableProbe<TJoinOp::LEFT_OUTER_JOIN>,
-                     ProcessHashTableProbe<TJoinOp::FULL_OUTER_JOIN>,
-                     ProcessHashTableProbe<TJoinOp::RIGHT_OUTER_JOIN>,
-                     ProcessHashTableProbe<TJoinOp::CROSS_JOIN>,
-                     ProcessHashTableProbe<TJoinOp::RIGHT_SEMI_JOIN>,
-                     ProcessHashTableProbe<TJoinOp::RIGHT_ANTI_JOIN>,
-                     
ProcessHashTableProbe<TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>>;
+        std::variant<std::monostate, 
ProcessHashTableProbe<TJoinOp::INNER_JOIN, HashJoinNode>,
+                     ProcessHashTableProbe<TJoinOp::LEFT_SEMI_JOIN, 
HashJoinNode>,
+                     ProcessHashTableProbe<TJoinOp::LEFT_ANTI_JOIN, 
HashJoinNode>,
+                     ProcessHashTableProbe<TJoinOp::LEFT_OUTER_JOIN, 
HashJoinNode>,
+                     ProcessHashTableProbe<TJoinOp::FULL_OUTER_JOIN, 
HashJoinNode>,
+                     ProcessHashTableProbe<TJoinOp::RIGHT_OUTER_JOIN, 
HashJoinNode>,
+                     ProcessHashTableProbe<TJoinOp::CROSS_JOIN, HashJoinNode>,
+                     ProcessHashTableProbe<TJoinOp::RIGHT_SEMI_JOIN, 
HashJoinNode>,
+                     ProcessHashTableProbe<TJoinOp::RIGHT_ANTI_JOIN, 
HashJoinNode>,
+                     ProcessHashTableProbe<TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN, 
HashJoinNode>>;
 
 using HashTableIteratorVariants =
         std::variant<std::monostate, ForwardIterator<RowRefList>,
@@ -365,52 +318,6 @@ using HashTableIteratorVariants =
 
 static constexpr auto HASH_JOIN_MAX_BUILD_BLOCK_COUNT = 128;
 
-struct HashJoinProbeContext {
-    HashJoinProbeContext(HashJoinNode* join_node);
-    HashJoinProbeContext(pipeline::HashJoinProbeLocalState* local_state);
-    bool _have_other_join_conjunct;
-    const bool _is_right_semi_anti;
-    const bool _is_outer_join;
-
-    MutableColumnPtr* _tuple_is_null_left_flag_column;
-    MutableColumnPtr* _tuple_is_null_right_flag_column;
-
-    // other expr
-    VExprContextSPtrs* _other_join_conjuncts;
-
-    DataTypes* _right_table_data_types;
-    DataTypes* _left_table_data_types;
-
-    RuntimeProfile::Counter* _search_hashtable_timer;
-    RuntimeProfile::Counter* _build_side_output_timer;
-    RuntimeProfile::Counter* _probe_side_output_timer;
-    RuntimeProfile::Counter* _probe_process_hashtable_timer;
-    RuntimeProfile::Counter* _process_other_join_conjunct_timer;
-    RuntimeProfile::Counter* _rows_returned_counter;
-    RuntimeProfile::HighWaterMarkCounter* _probe_arena_memory_usage;
-
-    std::shared_ptr<Arena> _arena;
-
-    // for full/right outer join
-    HashTableIteratorVariants* _outer_join_pull_visited_iter;
-    HashTableIteratorVariants* _probe_row_match_iter;
-
-    std::shared_ptr<std::vector<Block>> _build_blocks;
-    Block* _probe_block;
-    ColumnRawPtrs* _probe_columns;
-    int* _probe_index;
-    bool* _ready_probe;
-
-    Sizes _probe_key_sz;
-
-    std::vector<bool>* _left_output_slot_flags;
-    std::vector<bool>* _right_output_slot_flags;
-
-    // for cases when a probe row matches more than batch size build rows.
-    bool* _is_any_probe_match_row_output;
-    bool _has_null_value_in_build_side {};
-};
-
 class HashJoinNode final : public VJoinNodeBase {
 public:
     HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs);
@@ -450,13 +357,27 @@ public:
         return _runtime_filter_slots->ready_finish_publish();
     }
 
+    bool have_other_join_conjunct() const { return _have_other_join_conjunct; }
+    bool is_right_semi_anti() const { return _is_right_semi_anti; }
+    bool is_outer_join() const { return _is_outer_join; }
+    std::shared_ptr<std::vector<Block>> build_blocks() const { return 
_build_blocks; }
+    Sizes probe_key_sz() const { return _probe_key_sz; }
+    std::vector<bool>* left_output_slot_flags() { return 
&_left_output_slot_flags; }
+    std::vector<bool>* right_output_slot_flags() { return 
&_right_output_slot_flags; }
+    bool* has_null_in_build_side() { return &_has_null_in_build_side; }
+    DataTypes right_table_data_types() { return _right_table_data_types; }
+    DataTypes left_table_data_types() { return _left_table_data_types; }
+    vectorized::Sizes& build_key_sz() { return _build_key_sz; }
+    bool build_unique() const { return _build_unique; }
+    std::vector<TRuntimeFilterDesc>& runtime_filter_descs() { return 
_runtime_filter_descs; }
+    std::shared_ptr<vectorized::Arena> arena() { return _arena; }
+
 protected:
     void _probe_side_open_thread(RuntimeState* state, std::promise<Status>* 
status) override;
 
 private:
-    friend struct HashJoinProbeContext;
-    friend struct HashJoinBuildContext;
-    friend struct RuntimeFilterContext;
+    template <int JoinOpType, typename Parent>
+    friend struct ProcessHashTableProbe;
 
     void _init_short_circuit_for_probe() override {
         _short_circuit_for_probe =
@@ -605,13 +526,12 @@ private:
                                          bool* eos, Block* temp_block,
                                          bool check_rows_count = true);
 
-    template <class HashTableContext>
+    template <class HashTableContext, typename Parent>
     friend struct ProcessHashTableBuild;
 
-    template <int JoinOpType>
+    template <int JoinOpType, typename Parent>
     friend struct ProcessHashTableProbe;
 
-    template <class HashTableContext>
     friend struct ProcessRuntimeFilterBuild;
 
     std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
@@ -620,8 +540,6 @@ private:
     std::vector<IRuntimeFilter*> _runtime_filters;
     size_t _build_rf_cardinality = 0;
     std::atomic_bool _probe_open_finish = false;
-
-    std::unique_ptr<HashJoinProbeContext> _probe_context;
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/exec/join/vjoin_node_base.h b/be/src/vec/exec/join/vjoin_node_base.h
index 9bb946bc0ef..b3758407f88 100644
--- a/be/src/vec/exec/join/vjoin_node_base.h
+++ b/be/src/vec/exec/join/vjoin_node_base.h
@@ -105,7 +105,7 @@ protected:
     TJoinOp::type _join_op;
     JoinOpVariants _join_op_variants;
 
-    bool _have_other_join_conjunct;
+    const bool _have_other_join_conjunct;
     const bool _match_all_probe; // output all rows coming from the probe 
input. Full/Left Join
     const bool _match_all_build; // output all rows coming from the build 
input. Full/Right Join
     bool _build_unique;          // build a hash table without duplicated 
rows. Left semi/anti Join
diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h
index fae87c3731e..05c0bc609eb 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.h
+++ b/be/src/vec/runtime/shared_hash_table_controller.h
@@ -54,6 +54,7 @@ struct SharedRuntimeFilterContext {
 struct SharedHashTableContext {
     SharedHashTableContext()
             : hash_table_variants(nullptr),
+              blocks(new std::vector<vectorized::Block>()),
               signaled(false),
               short_circuit_for_null_in_probe_side(false) {}
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to