This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b5dcdcbf065 [Improvement](join) support lazy materialize on mark join (#50250)
b5dcdcbf065 is described below

commit b5dcdcbf065d4b06b314995f2d22577481cfe77d
Author: Pxl <[email protected]>
AuthorDate: Wed Apr 23 14:29:11 2025 +0800

    [Improvement](join) support lazy materialize on mark join (#50250)
    
    ### What problem does this PR solve?
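    1. Support lazy materialization on mark joins (previously it was enabled only when the join had other join conjuncts).
    2. Add a ProbeIntermediateRows counter to the join probe profile.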
    
    
    
![image](https://github.com/user-attachments/assets/d641debb-6bfc-4ed4-a9ae-d015952db650)
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
        - [ ] Yes. <!-- Add document PR link here, e.g. https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 be/src/pipeline/exec/hashjoin_probe_operator.cpp          | 7 +------
 be/src/pipeline/exec/hashjoin_probe_operator.h            | 4 +++-
 be/src/pipeline/exec/join/process_hash_table_probe_impl.h | 8 +++++---
 be/src/pipeline/exec/join_probe_operator.cpp              | 2 ++
 be/src/pipeline/exec/join_probe_operator.h                | 1 +
 be/src/pipeline/exec/nested_loop_join_probe_operator.cpp  | 1 +
 6 files changed, 13 insertions(+), 10 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp 
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index 6138369fe23..175a255b134 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -526,17 +526,12 @@ Status HashJoinProbeOperatorX::prepare(RuntimeState* 
state) {
     // _other_join_conjuncts are evaluated in the context of the rows produced 
by this node
     for (auto& conjunct : _other_join_conjuncts) {
         RETURN_IF_ERROR(conjunct->prepare(state, *_intermediate_row_desc));
-    }
-
-    for (auto conjunct : _other_join_conjuncts) {
         
conjunct->root()->collect_slot_column_ids(_should_not_lazy_materialized_column_ids);
     }
 
     for (auto& conjunct : _mark_join_conjuncts) {
         RETURN_IF_ERROR(conjunct->prepare(state, *_intermediate_row_desc));
-        if (_have_other_join_conjunct) {
-            
conjunct->root()->collect_slot_column_ids(_should_not_lazy_materialized_column_ids);
-        }
+        
conjunct->root()->collect_slot_column_ids(_should_not_lazy_materialized_column_ids);
     }
 
     RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, 
_child->row_desc()));
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h 
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index d1aa2280454..51758d8b8fb 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -154,8 +154,10 @@ public:
 
     bool need_finalize_variant_column() const { return 
_need_finalize_variant_column; }
 
+    bool can_do_lazy_materialized() const { return _have_other_join_conjunct 
|| _is_mark_join; }
+
     bool is_lazy_materialized_column(int column_id) const {
-        return _have_other_join_conjunct &&
+        return can_do_lazy_materialized() &&
                !_should_not_lazy_materialized_column_ids.contains(column_id);
     }
 
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h 
b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
index ae27cc1f956..40ab0c73768 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
@@ -269,6 +269,8 @@ Status 
ProcessHashTableProbe<JoinOpType>::process(HashTableType& hash_table_ctx,
     }
 
     output_block->swap(mutable_block.to_block());
+    DCHECK_EQ(current_offset, output_block->rows());
+    COUNTER_UPDATE(_parent->_intermediate_rows_counter, current_offset);
 
     if (is_mark_join && JoinOpType != TJoinOp::RIGHT_SEMI_JOIN) {
         bool ignore_null_map =
@@ -315,13 +317,13 @@ Status 
ProcessHashTableProbe<JoinOpType>::finalize_block_with_filter(
     vectorized::ColumnPtr filter_ptr = 
output_block->get_by_position(filter_column_id).column;
     RETURN_IF_ERROR(
             vectorized::Block::filter_block(output_block, filter_column_id, 
column_to_keep));
+    if (!_parent_operator->can_do_lazy_materialized()) {
+        return Status::OK();
+    }
 
     auto do_lazy_materialize = [&](const std::vector<bool>& output_slot_flags,
                                    vectorized::ColumnVector<unsigned int>& 
row_indexs,
                                    int column_offset, vectorized::Block* 
source_block) {
-        if (!_have_other_join_conjunct) {
-            return;
-        }
         std::vector<int> column_ids;
         for (int i = 0; i < output_slot_flags.size(); ++i) {
             if (output_slot_flags[i] &&
diff --git a/be/src/pipeline/exec/join_probe_operator.cpp 
b/be/src/pipeline/exec/join_probe_operator.cpp
index 3b0eb9ee99e..b99997c0e55 100644
--- a/be/src/pipeline/exec/join_probe_operator.cpp
+++ b/be/src/pipeline/exec/join_probe_operator.cpp
@@ -34,6 +34,8 @@ Status JoinProbeLocalState<SharedStateArg, 
Derived>::init(RuntimeState* state,
     _join_filter_timer = ADD_TIMER(Base::profile(), "JoinFilterTimer");
     _build_output_block_timer = ADD_TIMER(Base::profile(), "BuildOutputBlock");
     _probe_rows_counter = ADD_COUNTER_WITH_LEVEL(Base::profile(), "ProbeRows", 
TUnit::UNIT, 1);
+    _intermediate_rows_counter =
+            ADD_COUNTER_WITH_LEVEL(Base::profile(), "ProbeIntermediateRows", 
TUnit::UNIT, 1);
     _finish_probe_phase_timer = ADD_TIMER(Base::profile(), 
"FinishProbePhaseTime");
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/join_probe_operator.h 
b/be/src/pipeline/exec/join_probe_operator.h
index 6286b321661..705d319efb4 100644
--- a/be/src/pipeline/exec/join_probe_operator.h
+++ b/be/src/pipeline/exec/join_probe_operator.h
@@ -44,6 +44,7 @@ protected:
     size_t _mark_column_id = -1;
 
     RuntimeProfile::Counter* _probe_rows_counter = nullptr;
+    RuntimeProfile::Counter* _intermediate_rows_counter = nullptr;
     RuntimeProfile::Counter* _join_filter_timer = nullptr;
     RuntimeProfile::Counter* _build_output_block_timer = nullptr;
     RuntimeProfile::Counter* _finish_probe_phase_timer = nullptr;
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp 
b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
index c602c6d82d4..05323d0a2ab 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
@@ -177,6 +177,7 @@ Status 
NestedLoopJoinProbeLocalState::generate_join_block_data(RuntimeState* sta
                     _join_block, state->batch_size());
         }
     }
+    COUNTER_UPDATE(_probe_rows_counter, _join_block.rows());
     return Status::OK();
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to