This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b5dcdcbf065 [Improvement](join) support lazy materialize on mark join
(#50250)
b5dcdcbf065 is described below
commit b5dcdcbf065d4b06b314995f2d22577481cfe77d
Author: Pxl <[email protected]>
AuthorDate: Wed Apr 23 14:29:11 2025 +0800
[Improvement](join) support lazy materialize on mark join (#50250)
### What problem does this PR solve?
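1. Support lazy materialization for mark joins (previously it was enabled only when other join conjuncts were present).
2. Add a ProbeIntermediateRows counter to the join probe profile.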

### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
be/src/pipeline/exec/hashjoin_probe_operator.cpp | 7 +------
be/src/pipeline/exec/hashjoin_probe_operator.h | 4 +++-
be/src/pipeline/exec/join/process_hash_table_probe_impl.h | 8 +++++---
be/src/pipeline/exec/join_probe_operator.cpp | 2 ++
be/src/pipeline/exec/join_probe_operator.h | 1 +
be/src/pipeline/exec/nested_loop_join_probe_operator.cpp | 1 +
6 files changed, 13 insertions(+), 10 deletions(-)
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index 6138369fe23..175a255b134 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -526,17 +526,12 @@ Status HashJoinProbeOperatorX::prepare(RuntimeState*
state) {
// _other_join_conjuncts are evaluated in the context of the rows produced
by this node
for (auto& conjunct : _other_join_conjuncts) {
RETURN_IF_ERROR(conjunct->prepare(state, *_intermediate_row_desc));
- }
-
- for (auto conjunct : _other_join_conjuncts) {
conjunct->root()->collect_slot_column_ids(_should_not_lazy_materialized_column_ids);
}
for (auto& conjunct : _mark_join_conjuncts) {
RETURN_IF_ERROR(conjunct->prepare(state, *_intermediate_row_desc));
- if (_have_other_join_conjunct) {
-
conjunct->root()->collect_slot_column_ids(_should_not_lazy_materialized_column_ids);
- }
+
conjunct->root()->collect_slot_column_ids(_should_not_lazy_materialized_column_ids);
}
RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state,
_child->row_desc()));
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index d1aa2280454..51758d8b8fb 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -154,8 +154,10 @@ public:
bool need_finalize_variant_column() const { return
_need_finalize_variant_column; }
+ bool can_do_lazy_materialized() const { return _have_other_join_conjunct
|| _is_mark_join; }
+
bool is_lazy_materialized_column(int column_id) const {
- return _have_other_join_conjunct &&
+ return can_do_lazy_materialized() &&
!_should_not_lazy_materialized_column_ids.contains(column_id);
}
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
index ae27cc1f956..40ab0c73768 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
@@ -269,6 +269,8 @@ Status
ProcessHashTableProbe<JoinOpType>::process(HashTableType& hash_table_ctx,
}
output_block->swap(mutable_block.to_block());
+ DCHECK_EQ(current_offset, output_block->rows());
+ COUNTER_UPDATE(_parent->_intermediate_rows_counter, current_offset);
if (is_mark_join && JoinOpType != TJoinOp::RIGHT_SEMI_JOIN) {
bool ignore_null_map =
@@ -315,13 +317,13 @@ Status
ProcessHashTableProbe<JoinOpType>::finalize_block_with_filter(
vectorized::ColumnPtr filter_ptr =
output_block->get_by_position(filter_column_id).column;
RETURN_IF_ERROR(
vectorized::Block::filter_block(output_block, filter_column_id,
column_to_keep));
+ if (!_parent_operator->can_do_lazy_materialized()) {
+ return Status::OK();
+ }
auto do_lazy_materialize = [&](const std::vector<bool>& output_slot_flags,
vectorized::ColumnVector<unsigned int>&
row_indexs,
int column_offset, vectorized::Block*
source_block) {
- if (!_have_other_join_conjunct) {
- return;
- }
std::vector<int> column_ids;
for (int i = 0; i < output_slot_flags.size(); ++i) {
if (output_slot_flags[i] &&
diff --git a/be/src/pipeline/exec/join_probe_operator.cpp
b/be/src/pipeline/exec/join_probe_operator.cpp
index 3b0eb9ee99e..b99997c0e55 100644
--- a/be/src/pipeline/exec/join_probe_operator.cpp
+++ b/be/src/pipeline/exec/join_probe_operator.cpp
@@ -34,6 +34,8 @@ Status JoinProbeLocalState<SharedStateArg,
Derived>::init(RuntimeState* state,
_join_filter_timer = ADD_TIMER(Base::profile(), "JoinFilterTimer");
_build_output_block_timer = ADD_TIMER(Base::profile(), "BuildOutputBlock");
_probe_rows_counter = ADD_COUNTER_WITH_LEVEL(Base::profile(), "ProbeRows",
TUnit::UNIT, 1);
+ _intermediate_rows_counter =
+ ADD_COUNTER_WITH_LEVEL(Base::profile(), "ProbeIntermediateRows",
TUnit::UNIT, 1);
_finish_probe_phase_timer = ADD_TIMER(Base::profile(),
"FinishProbePhaseTime");
return Status::OK();
}
diff --git a/be/src/pipeline/exec/join_probe_operator.h
b/be/src/pipeline/exec/join_probe_operator.h
index 6286b321661..705d319efb4 100644
--- a/be/src/pipeline/exec/join_probe_operator.h
+++ b/be/src/pipeline/exec/join_probe_operator.h
@@ -44,6 +44,7 @@ protected:
size_t _mark_column_id = -1;
RuntimeProfile::Counter* _probe_rows_counter = nullptr;
+ RuntimeProfile::Counter* _intermediate_rows_counter = nullptr;
RuntimeProfile::Counter* _join_filter_timer = nullptr;
RuntimeProfile::Counter* _build_output_block_timer = nullptr;
RuntimeProfile::Counter* _finish_probe_phase_timer = nullptr;
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
index c602c6d82d4..05323d0a2ab 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
@@ -177,6 +177,7 @@ Status
NestedLoopJoinProbeLocalState::generate_join_block_data(RuntimeState* sta
_join_block, state->batch_size());
}
}
+ COUNTER_UPDATE(_probe_rows_counter, _join_block.rows());
return Status::OK();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]