This is an automated email from the ASF dual-hosted git repository.
BiteTheDDDDt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new bad5649d6d5 [improvement](be) Optimize nested loop join
materialization (#62956)
bad5649d6d5 is described below
commit bad5649d6d51a747d204f74cd9120be3a4e88129
Author: Pxl <[email protected]>
AuthorDate: Wed May 20 15:12:19 2026 +0800
[improvement](be) Optimize nested loop join materialization (#62956)
This pull request introduces lazy materialization for Nested Loop Join
(NLJ) operators. When the frontend sends a list of materialized slots,
the backend materializes only those columns, fills the remaining columns
with constant default placeholders, and evaluates join conjuncts only
against the columns they reference. This can significantly reduce memory
usage and improve performance, especially for wide tables or queries that
carry many columns. The changes span both the backend (BE) and frontend
(FE), including new APIs, data structures, and Thrift serialization.
---
.../operator/nested_loop_join_probe_operator.cpp | 642 ++++++++++++++++++++-
.../operator/nested_loop_join_probe_operator.h | 64 +-
.../glue/translator/PhysicalPlanTranslator.java | 35 +-
.../apache/doris/planner/NestedLoopJoinNode.java | 51 ++
gensrc/thrift/PlanNodes.thrift | 5 +
.../test_nestedloop_lazy_materialization.groovy | 256 ++++++++
6 files changed, 1040 insertions(+), 13 deletions(-)
diff --git a/be/src/exec/operator/nested_loop_join_probe_operator.cpp
b/be/src/exec/operator/nested_loop_join_probe_operator.cpp
index 7a3be55cbb1..ccc7140c726 100644
--- a/be/src/exec/operator/nested_loop_join_probe_operator.cpp
+++ b/be/src/exec/operator/nested_loop_join_probe_operator.cpp
@@ -23,7 +23,9 @@
#include "common/exception.h"
#include "core/block/block.h"
#include "core/column/column.h"
+#include "core/column/column_const.h"
#include "core/column/column_filter_helper.h"
+#include "core/column/column_nullable.h"
#include "exec/operator/operator.h"
namespace doris {
@@ -31,6 +33,61 @@ class RuntimeState;
} // namespace doris
namespace doris {
+namespace {
+// Tri-state result of a lazily evaluated mark join for one probe row; these values are
+// stored per probe row while the build side is scanned.
+constexpr int8_t MARK_FALSE = 0;
+constexpr int8_t MARK_TRUE = 1;
+constexpr int8_t MARK_NULL = -1;
+
+// Returns a constant column of `rows` rows that repeats row `row` of `source`.
+ColumnPtr make_const_column_from_row(const ColumnWithTypeAndName& source, size_t row, size_t rows) {
+    return ColumnConst::create(source.column->cut(row, 1), rows);
+}
+
+// Wraps `column` as nullable when `target` is declared nullable but `column` is not, so the
+// evaluated column matches the nullability of the slot it stands in for.
+ColumnPtr align_eval_column_nullable(const ColumnWithTypeAndName& target, const ColumnPtr& column) {
+    const bool needs_nullable_wrapper = target.type->is_nullable() && !column->is_nullable();
+    return needs_nullable_wrapper ? make_nullable(column) : column;
+}
+
+// Appends `rows` copies of src_column[row] to dst_column. A non-nullable source feeding a
+// nullable destination is written into the nested column, plus a run of "not null" flags.
+void append_many_from_source(MutableColumnPtr& dst_column, const ColumnWithTypeAndName& src_column,
+                             size_t row, size_t rows) {
+    const bool widen_to_nullable = !src_column.column->is_nullable() && dst_column->is_nullable();
+    if (!widen_to_nullable) {
+        dst_column->insert_many_from(*src_column.column, row, rows);
+        return;
+    }
+    const auto size_before = dst_column->size();
+    auto* nullable_dst = assert_cast<ColumnNullable*>(dst_column.get());
+    nullable_dst->get_nested_column_ptr()->insert_many_from(*src_column.column, row, rows);
+    nullable_dst->get_null_map_column().get_data().resize_fill(size_before + rows, 0);
+}
+
+// Appends the `selected_rows` rows of src_column whose filter byte is set. A non-nullable
+// source feeding a nullable destination is written into the nested column, plus "not null"
+// flags. Does nothing when no row is selected.
+void append_filtered_from_source(MutableColumnPtr& dst_column,
+                                 const ColumnWithTypeAndName& src_column,
+                                 const IColumn::Filter& filter, size_t selected_rows) {
+    if (selected_rows == 0) {
+        return;
+    }
+    auto filtered_column = src_column.column->filter(filter, selected_rows);
+    const bool widen_to_nullable = !src_column.column->is_nullable() && dst_column->is_nullable();
+    if (!widen_to_nullable) {
+        dst_column->insert_range_from(*filtered_column, 0, selected_rows);
+        return;
+    }
+    const auto size_before = dst_column->size();
+    auto* nullable_dst = assert_cast<ColumnNullable*>(dst_column.get());
+    nullable_dst->get_nested_column_ptr()->insert_range_from(*filtered_column, 0, selected_rows);
+    nullable_dst->get_null_map_column().get_data().resize_fill(size_before + selected_rows, 0);
+}
+
+// Appends one mark value to a Nullable(UInt8) column: MARK_NULL sets the null flag,
+// MARK_TRUE stores 1, and any other value stores 0.
+void append_mark_value(MutableColumnPtr& dst_column, int8_t mark_value) {
+    auto* nullable_mark = assert_cast<ColumnNullable*>(dst_column.get());
+    auto& mark_data = assert_cast<ColumnUInt8&>(nullable_mark->get_nested_column()).get_data();
+    auto& null_flags = nullable_mark->get_null_map_column().get_data();
+    mark_data.push_back(mark_value == MARK_TRUE);
+    null_flags.push_back(mark_value == MARK_NULL);
+}
+} // namespace
+
NestedLoopJoinProbeLocalState::NestedLoopJoinProbeLocalState(RuntimeState*
state,
OperatorXBase*
parent)
: JoinProbeLocalState<NestedLoopJoinSharedState,
NestedLoopJoinProbeLocalState>(state,
@@ -59,6 +116,10 @@ Status NestedLoopJoinProbeLocalState::open(RuntimeState*
state) {
for (size_t i = 0; i < _join_conjuncts.size(); i++) {
RETURN_IF_ERROR(p._join_conjuncts[i]->clone(state,
_join_conjuncts[i]));
}
+ _mark_join_conjuncts.resize(p._mark_join_conjuncts.size());
+ for (size_t i = 0; i < _mark_join_conjuncts.size(); i++) {
+ RETURN_IF_ERROR(p._mark_join_conjuncts[i]->clone(state,
_mark_join_conjuncts[i]));
+ }
_construct_mutable_join_block();
return Status::OK();
}
@@ -177,6 +238,508 @@ void process_build_block(int64_t build_block_pos, Block&
block, const Block& bui
block.set_columns(std::move(dst_columns));
}
+void NestedLoopJoinProbeLocalState::_replace_lazy_placeholder_columns(size_t rows) {
+    // Every column of _join_block that is not in the materialize set is swapped for a constant
+    // default column of `rows` rows, so that it reports the same row count as the
+    // materialized columns.
+    const auto& materialized = _parent->cast<NestedLoopJoinProbeOperatorX>()._materialize_column_ids;
+    const size_t column_count = _join_block.columns();
+    for (size_t pos = 0; pos < column_count; ++pos) {
+        if (materialized.count(cast_set<int>(pos)) > 0) {
+            continue;
+        }
+        const auto& placeholder_type = _join_block.get_by_position(pos).type;
+        _join_block.replace_by_position(
+                pos, placeholder_type->create_column_const_with_default_value(rows));
+    }
+}
+
+Status NestedLoopJoinProbeLocalState::_append_lazy_rows(const IColumn::Filter& filter,
+                                                        size_t selected_rows, bool fixed_side_probe,
+                                                        int64_t fixed_side_pos,
+                                                        const Block& probe_block,
+                                                        const Block& build_block) {
+    // Appends `selected_rows` matched rows to the materialized columns of _join_block.
+    // The "fixed" side (probe when fixed_side_probe, else build) contributes the single row
+    // `fixed_side_pos` repeated. The other side contributes the rows selected by `filter`.
+    // Non-materialized columns are then refreshed as placeholders of the new row count.
+    auto& parent = _parent->cast<NestedLoopJoinProbeOperatorX>();
+    const size_t expected_rows = _join_block.rows() + selected_rows;
+    const size_t probe_width = parent._num_probe_side_columns;
+
+    auto columns = _join_block.mutate_columns();
+    for (const int id : parent._materialize_column_ids) {
+        const auto pos = cast_set<size_t>(id);
+        const bool from_probe = pos < probe_width;
+        const auto& source = from_probe ? probe_block.get_by_position(pos)
+                                        : build_block.get_by_position(pos - probe_width);
+        const bool repeat_fixed_row = from_probe == fixed_side_probe;
+        if (repeat_fixed_row) {
+            append_many_from_source(columns[pos], source, fixed_side_pos, selected_rows);
+        } else {
+            append_filtered_from_source(columns[pos], source, filter, selected_rows);
+        }
+    }
+    _join_block.set_columns(std::move(columns));
+    _replace_lazy_placeholder_columns(expected_rows);
+    DCHECK_EQ(_join_block.rows(), expected_rows);
+    return Status::OK();
+}
+
+Status NestedLoopJoinProbeLocalState::_append_lazy_probe_row_with_build_defaults(
+        const Block& probe_block, int64_t probe_row_pos) {
+    // Emits one output row: materialized probe columns copy row `probe_row_pos` of
+    // probe_block, and the remaining materialized columns receive a default value.
+    auto& parent = _parent->cast<NestedLoopJoinProbeOperatorX>();
+    const size_t expected_rows = _join_block.rows() + 1;
+
+    auto columns = _join_block.mutate_columns();
+    for (const int id : parent._materialize_column_ids) {
+        const auto pos = cast_set<size_t>(id);
+        auto& target = columns[pos];
+        if (pos >= parent._num_probe_side_columns) {
+            target->insert_many_defaults(1);
+            continue;
+        }
+        append_many_from_source(target, probe_block.get_by_position(pos), probe_row_pos, 1);
+    }
+    _join_block.set_columns(std::move(columns));
+    _replace_lazy_placeholder_columns(expected_rows);
+    DCHECK_EQ(_join_block.rows(), expected_rows);
+    return Status::OK();
+}
+
+Status NestedLoopJoinProbeLocalState::_append_lazy_mark_probe_row_with_build_defaults(
+        const Block& probe_block, int64_t probe_row_pos, int8_t mark_value) {
+    // Emits one mark-join output row. Materialized probe columns copy row `probe_row_pos`.
+    // The mark column, placed right after all probe and build columns, receives
+    // `mark_value`. Every other materialized column receives a default value.
+    auto& parent = _parent->cast<NestedLoopJoinProbeOperatorX>();
+    const size_t probe_width = parent._num_probe_side_columns;
+    const size_t mark_pos = probe_width + parent._num_build_side_columns;
+    const size_t expected_rows = _join_block.rows() + 1;
+
+    auto columns = _join_block.mutate_columns();
+    for (const int id : parent._materialize_column_ids) {
+        const auto pos = cast_set<size_t>(id);
+        auto& target = columns[pos];
+        if (pos == mark_pos) {
+            append_mark_value(target, mark_value);
+        } else if (pos < probe_width) {
+            append_many_from_source(target, probe_block.get_by_position(pos), probe_row_pos, 1);
+        } else {
+            target->insert_many_defaults(1);
+        }
+    }
+    _join_block.set_columns(std::move(columns));
+    _replace_lazy_placeholder_columns(expected_rows);
+    DCHECK_EQ(_join_block.rows(), expected_rows);
+    return Status::OK();
+}
+
+Status NestedLoopJoinProbeLocalState::_append_lazy_build_rows_with_probe_defaults(
+        const Block& build_block, const IColumn::Filter& filter, size_t selected_rows) {
+    // Emits the `selected_rows` build rows chosen by `filter`. Materialized build columns
+    // copy them, and materialized probe columns receive default values.
+    auto& parent = _parent->cast<NestedLoopJoinProbeOperatorX>();
+    const size_t probe_width = parent._num_probe_side_columns;
+    const size_t expected_rows = _join_block.rows() + selected_rows;
+
+    auto columns = _join_block.mutate_columns();
+    for (const int id : parent._materialize_column_ids) {
+        const auto pos = cast_set<size_t>(id);
+        if (pos >= probe_width) {
+            append_filtered_from_source(columns[pos], build_block.get_by_position(pos - probe_width),
+                                        filter, selected_rows);
+        } else {
+            columns[pos]->insert_many_defaults(selected_rows);
+        }
+    }
+    _join_block.set_columns(std::move(columns));
+    _replace_lazy_placeholder_columns(expected_rows);
+    DCHECK_EQ(_join_block.rows(), expected_rows);
+    return Status::OK();
+}
+
+Status NestedLoopJoinProbeLocalState::_finalize_lazy_probe_row(RuntimeState* state,
+                                                               const Block& probe_block,
+                                                               int64_t probe_row_pos,
+                                                               bool* consumed) {
+    // Emits the finalize-time output, if any, for one probe row after all build blocks were
+    // scanned for it.
+    // - Mark joins always emit the row with its mark (negated for LEFT ANTI, except NULL).
+    // - Probe-finalized joins emit it only when their matched/unmatched rule selects it.
+    // *consumed is set to false only when a row must be emitted but the output block already
+    // holds batch_size rows; the caller then has to retry the same probe row later.
+    *consumed = true;
+    const auto& parent = _parent->cast<NestedLoopJoinProbeOperatorX>();
+    const bool has_lazy_finalize =
+            parent._enable_lazy_probe_finalize || parent._enable_lazy_mark_finalize;
+    const bool in_range =
+            probe_row_pos >= 0 && cast_set<size_t>(probe_row_pos) < probe_block.rows();
+    if (!has_lazy_finalize || !in_range) {
+        return Status::OK();
+    }
+    const bool output_full = _join_block.rows() >= state->batch_size();
+
+    if (parent._enable_lazy_mark_finalize) {
+        if (output_full) {
+            *consumed = false;
+            return Status::OK();
+        }
+        int8_t mark_value = _cur_probe_row_mark_flags[probe_row_pos];
+        if (parent._join_op == TJoinOp::LEFT_ANTI_JOIN && mark_value != MARK_NULL) {
+            mark_value = (mark_value == MARK_TRUE) ? MARK_FALSE : MARK_TRUE;
+        }
+        RETURN_IF_ERROR(_append_lazy_mark_probe_row_with_build_defaults(probe_block,
+                                                                        probe_row_pos, mark_value));
+        return Status::OK();
+    }
+
+    const bool matched = _cur_probe_row_visited_flags[probe_row_pos];
+    bool should_output = false;
+    switch (parent._join_op) {
+    case TJoinOp::LEFT_SEMI_JOIN:
+        should_output = matched;
+        break;
+    case TJoinOp::LEFT_OUTER_JOIN:
+    case TJoinOp::FULL_OUTER_JOIN:
+    case TJoinOp::LEFT_ANTI_JOIN:
+    case TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN:
+        should_output = !matched;
+        break;
+    default:
+        break;
+    }
+    if (!should_output) {
+        return Status::OK();
+    }
+    if (output_full) {
+        *consumed = false;
+        return Status::OK();
+    }
+    RETURN_IF_ERROR(_append_lazy_probe_row_with_build_defaults(probe_block, probe_row_pos));
+    return Status::OK();
+}
+
+// Emits build-side rows once probing is complete, for joins with lazy build finalize
+// (RIGHT OUTER / RIGHT SEMI / RIGHT ANTI / FULL OUTER). RIGHT SEMI emits visited build rows;
+// the others emit unvisited ones. Probe-side columns receive default values. Progress is kept
+// in _output_null_idx_build_side (block) and _output_null_row_idx_build_side (row in block),
+// so a call that fills the output block resumes where it stopped on a later call.
+Status NestedLoopJoinProbeLocalState::_finalize_lazy_build_side(RuntimeState*
state) {
+    auto& p = _parent->cast<NestedLoopJoinProbeOperatorX>();
+    if (!p._enable_lazy_build_finalize) {
+        return Status::OK();
+    }
+
+    while (_join_block.rows() < state->batch_size() &&
+           _output_null_idx_build_side < _shared_state->build_blocks.size()) {
+        const auto& build_block =
_shared_state->build_blocks[_output_null_idx_build_side];
+        const auto* __restrict visited_flags =
+                assert_cast<ColumnUInt8*>(
+
_shared_state->build_side_visited_flags[_output_null_idx_build_side].get())
+                        ->get_data()
+                        .data();
+        const size_t rows = build_block.rows();
+        const size_t output_capacity = state->batch_size() -
_join_block.rows();
+
+        // Rows before the resume cursor keep a 0 filter byte, so they are never emitted twice;
+        // scanning also stops as soon as output_capacity rows have been selected.
+        IColumn::Filter filter(rows, 0);
+        auto* __restrict filter_data = filter.data();
+        size_t selected_rows = 0;
+        size_t row_idx = _output_null_row_idx_build_side;
+        for (; row_idx < rows && selected_rows < output_capacity; ++row_idx) {
+            const bool matched = visited_flags[row_idx] != 0;
+            const bool selected = p._join_op == TJoinOp::RIGHT_SEMI_JOIN ?
matched : !matched;
+            filter_data[row_idx] = selected;
+            selected_rows += selected;
+        }
+
+        if (selected_rows > 0) {
+
RETURN_IF_ERROR(_append_lazy_build_rows_with_probe_defaults(build_block, filter,
+
selected_rows));
+        }
+        // Move to the next build block only after its last row was examined; otherwise
+        // remember the row to resume from.
+        if (row_idx == rows) {
+            ++_output_null_idx_build_side;
+            _output_null_row_idx_build_side = 0;
+        } else {
+            _output_null_row_idx_build_side = row_idx;
+        }
+    }
+    return Status::OK();
+}
+
+// Moves the lazy probe cursor to the next probe row. If the current row has already been
+// joined with every build block, it is finalized first. When that finalization does not fit
+// into the current output block, the cursor is left unchanged so the caller can retry.
+// Once the probe block is exhausted, sets _matched_rows_done at probe EOS, or
+// _need_more_input_data otherwise.
+Status NestedLoopJoinProbeLocalState::_advance_lazy_probe_row(RuntimeState*
state,
+                                                              const Block&
probe_block) {
+    // The current probe row is complete only when every build block has been scanned for it.
+    if (_current_build_pos == _shared_state->build_blocks.size() &&
+        _probe_block_pos < probe_block.rows()) {
+        bool consumed = true;
+        RETURN_IF_ERROR(_finalize_lazy_probe_row(state, probe_block,
_probe_block_pos, &consumed));
+        if (!consumed) {
+            return Status::OK();
+        }
+    }
+
+    if (_probe_block_pos < probe_block.rows()) {
+        _probe_side_process_count++;
+    }
+
+    // NOTE(review): _reset_with_next_probe_row() is defined elsewhere; presumably it advances
+    // _probe_block_pos and rewinds the build cursor — confirm.
+    _reset_with_next_probe_row();
+    if (_probe_block_pos < probe_block.rows()) {
+        return Status::OK();
+    }
+
+    if (_shared_state->probe_side_eos) {
+        _matched_rows_done = true;
+    } else {
+        _need_more_input_data = true;
+    }
+    return Status::OK();
+}
+
+void NestedLoopJoinProbeLocalState::_append_lazy_probe_eval_columns(
+        ColumnsWithTypeAndName& eval_columns, const Block& probe_block, bool fixed_side_probe,
+        int64_t fixed_side_pos, size_t rows) {
+    // Appends one evaluation column per probe-side slot, using the join block's type and name.
+    // Slots not referenced by any conjunct get a constant default column of `rows` rows.
+    // Referenced slots get either probe row `fixed_side_pos` broadcast to `rows` rows (when
+    // the probe side is fixed) or the probe column itself, widened to nullable when needed.
+    const auto& parent = _parent->cast<NestedLoopJoinProbeOperatorX>();
+    for (size_t col = 0; col < parent._num_probe_side_columns; ++col) {
+        const auto& target = _join_block.get_by_position(col);
+        const auto& source = probe_block.get_by_position(col);
+        ColumnPtr eval_column;
+        if (parent._lazy_eval_column_ids.count(cast_set<int>(col)) == 0) {
+            eval_column = target.type->create_column_const_with_default_value(rows);
+        } else if (fixed_side_probe) {
+            eval_column = align_eval_column_nullable(
+                    target, make_const_column_from_row(source, fixed_side_pos, rows));
+        } else {
+            eval_column = align_eval_column_nullable(target, source.column);
+        }
+        eval_columns.emplace_back(std::move(eval_column), target.type, target.name);
+    }
+}
+
+void NestedLoopJoinProbeLocalState::_append_lazy_build_eval_columns(
+        ColumnsWithTypeAndName& eval_columns, const Block& build_block, bool fixed_side_probe,
+        int64_t fixed_side_pos, size_t rows) {
+    // Appends one evaluation column per build-side slot (join-block positions follow all
+    // probe-side slots). Slots not referenced by any conjunct get a constant default column.
+    // Referenced slots get the build column itself when the probe row is fixed, or build row
+    // `fixed_side_pos` broadcast to `rows` rows otherwise. Both are widened to nullable when
+    // needed.
+    const auto& parent = _parent->cast<NestedLoopJoinProbeOperatorX>();
+    const size_t build_offset = parent._num_probe_side_columns;
+    for (size_t build_col = 0; build_col < parent._num_build_side_columns; ++build_col) {
+        const size_t join_col = build_offset + build_col;
+        const auto& target = _join_block.get_by_position(join_col);
+        const auto& source = build_block.get_by_position(build_col);
+        ColumnPtr eval_column;
+        if (parent._lazy_eval_column_ids.count(cast_set<int>(join_col)) == 0) {
+            eval_column = target.type->create_column_const_with_default_value(rows);
+        } else if (fixed_side_probe) {
+            eval_column = align_eval_column_nullable(target, source.column);
+        } else {
+            eval_column = align_eval_column_nullable(
+                    target, make_const_column_from_row(source, fixed_side_pos, rows));
+        }
+        eval_columns.emplace_back(std::move(eval_column), target.type, target.name);
+    }
+}
+
+bool NestedLoopJoinProbeLocalState::_should_delay_lazy_probe_build_block(size_t candidate_rows,
+                                                                         size_t batch_size) const {
+    // Returns true when joining the current probe row with a build block of `candidate_rows`
+    // rows could push the output block past `batch_size`. The caller then emits the current
+    // block first and retries this build block on a later call.
+    //
+    // Only join types that append matched pairs can grow the block by up to `candidate_rows`
+    // here; the other types append nothing while build blocks are being scanned.
+    //
+    // An empty output block is never delayed. A later call cannot start with more room than
+    // an empty block, so delaying would hand back the same empty block forever whenever a
+    // single build block has more rows than batch_size. Letting that block overshoot
+    // batch_size is the only way to make progress.
+    if (!_lazy_should_output_matched_rows() || _join_block.rows() == 0) {
+        return false;
+    }
+    return _join_block.rows() + candidate_rows > batch_size;
+}
+
+bool NestedLoopJoinProbeLocalState::_lazy_should_output_matched_rows() const {
+    // True for the join types whose result contains the matched (probe, build) row pairs
+    // themselves. Semi, anti and mark variants only record matches and emit rows at finalize.
+    switch (_parent->cast<NestedLoopJoinProbeOperatorX>()._join_op) {
+    case TJoinOp::INNER_JOIN:
+    case TJoinOp::CROSS_JOIN:
+    case TJoinOp::LEFT_OUTER_JOIN:
+    case TJoinOp::RIGHT_OUTER_JOIN:
+    case TJoinOp::FULL_OUTER_JOIN:
+        return true;
+    default:
+        return false;
+    }
+}
+
+void NestedLoopJoinProbeLocalState::_mark_lazy_build_rows_visited(size_t build_block_idx,
+                                                                  const IColumn::Filter& filter) {
+    // ORs the conjunct filter of one build block into that block's shared visited flags.
+    // Only join types with lazy build-side finalize need these flags.
+    if (!_parent->cast<NestedLoopJoinProbeOperatorX>()._enable_lazy_build_finalize) {
+        return;
+    }
+    auto* visited_column =
+            assert_cast<ColumnUInt8*>(_shared_state->build_side_visited_flags[build_block_idx].get());
+    auto& visited = visited_column->get_data();
+    DCHECK_EQ(visited.size(), filter.size());
+
+    auto* __restrict visited_data = visited.data();
+    const auto* __restrict selected_data = filter.data();
+    const size_t row_count = filter.size();
+    for (size_t row = 0; row != row_count; ++row) {
+        visited_data[row] |= selected_data[row];
+    }
+}
+
+void NestedLoopJoinProbeLocalState::_update_lazy_mark_join_state(
+        const IColumn::Filter& mark_filter, const ColumnUInt8& mark_null_map,
+        const IColumn::Filter& other_filter) {
+    // Folds the mark-conjunct results of the rows that passed the other join conjuncts into
+    // the current probe row's tri-state mark:
+    // - a TRUE result wins and is final;
+    // - otherwise any NULL result makes the mark NULL;
+    // - otherwise the mark keeps its previous value.
+    auto& mark_state = _cur_probe_row_mark_flags[_probe_block_pos];
+    DCHECK_EQ(mark_filter.size(), mark_null_map.size());
+    DCHECK_EQ(mark_filter.size(), other_filter.size());
+
+    const auto* __restrict is_mark_true = mark_filter.data();
+    const auto* __restrict is_mark_null = mark_null_map.get_data().data();
+    const auto* __restrict passed_other = other_filter.data();
+    const size_t row_count = mark_filter.size();
+    for (size_t row = 0; row < row_count && mark_state != MARK_TRUE; ++row) {
+        if (passed_other[row] == 0) {
+            continue;
+        }
+        if (is_mark_null[row] != 0) {
+            mark_state = MARK_NULL;
+        } else if (is_mark_true[row] != 0) {
+            mark_state = MARK_TRUE;
+        }
+    }
+}
+
+// Joins the current probe row (_probe_block_pos, broadcast as constant columns) with every
+// row of one build block:
+// 1. evaluates the join conjuncts;
+// 2. records the result for finalize (mark state or probe visited flag);
+// 3. updates the build-side visited flags;
+// 4. for join types that output matched pairs, appends the matched rows to _join_block.
+Status NestedLoopJoinProbeLocalState::_process_lazy_probe_build_block(Block*
probe_block,
+                                                                      const
Block& build_block,
+                                                                      size_t
build_block_idx,
+                                                                      bool
ignore_null) {
+    auto& p = _parent->cast<NestedLoopJoinProbeOperatorX>();
+    // A TRUE mark is terminal for lazy MARK LEFT SEMI/ANTI joins.
+    if (p._enable_lazy_mark_finalize &&
_cur_probe_row_mark_flags[_probe_block_pos] == MARK_TRUE) {
+        return Status::OK();
+    }
+
+    const size_t candidate_rows = build_block.rows();
+    if (candidate_rows == 0) {
+        return Status::OK();
+    }
+
+    // Slots no conjunct references are passed as constant defaults, so only referenced
+    // columns are actually read from the probe/build blocks.
+    ColumnsWithTypeAndName eval_columns;
+    eval_columns.reserve(_join_block.columns());
+    _append_lazy_probe_eval_columns(eval_columns, *probe_block, true,
_probe_block_pos,
+                                    candidate_rows);
+    _append_lazy_build_eval_columns(eval_columns, build_block, true, 0,
candidate_rows);
+    Block eval_block(std::move(eval_columns));
+
+    IColumn::Filter filter(candidate_rows, 1);
+    bool can_filter_all = false;
+    {
+        SCOPED_TIMER(_join_conjuncts_evaluation_timer);
+        RETURN_IF_ERROR(VExprContext::execute_conjuncts(_join_conjuncts,
nullptr, ignore_null,
+                                                        &eval_block, &filter,
&can_filter_all));
+    }
+    if (can_filter_all) {
+        return Status::OK();
+    }
+
+    const size_t selected_rows =
+            candidate_rows -
+            simd::count_zero_num(reinterpret_cast<int8_t*>(filter.data()),
candidate_rows);
+    DCHECK_GT(selected_rows, 0);
+
+    // Record the match for finalize. Mark joins fold in the mark-conjunct result, which may
+    // be NULL; the other probe-finalized joins only need a matched bit.
+    if (p._enable_lazy_mark_finalize) {
+        if (_mark_join_conjuncts.empty()) {
+            _cur_probe_row_mark_flags[_probe_block_pos] = MARK_TRUE;
+        } else {
+            IColumn::Filter mark_filter(candidate_rows, 1);
+            auto mark_null_map = ColumnUInt8::create(candidate_rows, 0);
+
RETURN_IF_ERROR(VExprContext::execute_conjuncts(_mark_join_conjuncts,
&eval_block,
+                                                            *mark_null_map,
mark_filter));
+            _update_lazy_mark_join_state(mark_filter, *mark_null_map, filter);
+        }
+    } else if (p._enable_lazy_probe_finalize) {
+        _cur_probe_row_visited_flags[_probe_block_pos] = true;
+    }
+    _mark_lazy_build_rows_visited(build_block_idx, filter);
+    // Only join types whose output contains the matched pairs append them here; the rest
+    // emit rows at finalize time.
+    if (_lazy_should_output_matched_rows()) {
+        RETURN_IF_ERROR(_append_lazy_rows(filter, selected_rows, true,
_probe_block_pos,
+                                          *probe_block, build_block));
+    }
+    return Status::OK();
+}
+
+// Probe-driven lazy join loop. For each probe row, scans the build blocks one per iteration.
+// Once every block has been scanned, the row is finalized and the loop moves to the next row.
+// Stops when _join_block reaches batch_size, when more probe input is needed, or when the
+// probe side is done.
+Status
NestedLoopJoinProbeLocalState::_generate_lazy_block_base_probe(RuntimeState*
state,
+                                                                      Block*
probe_block,
+                                                                      bool
ignore_null) {
+    auto& p = _parent->cast<NestedLoopJoinProbeOperatorX>();
+    while (_join_block.rows() < state->batch_size()) {
+        // Step past probe rows whose build scan is complete, finalizing each. Stop when a
+        // row with build blocks left is found, the output is full, or the probe block is
+        // exhausted.
+        while (_current_build_pos == _shared_state->build_blocks.size() ||
+               _probe_block_pos == probe_block->rows()) {
+            RETURN_IF_ERROR(_advance_lazy_probe_row(state, *probe_block));
+            if (_join_block.rows() >= state->batch_size()) {
+                break;
+            }
+            if (_probe_block_pos >= probe_block->rows()) {
+                break;
+            }
+        }
+
+        if (_join_block.rows() >= state->batch_size() || _matched_rows_done ||
+            _need_more_input_data) {
+            break;
+        }
+
+        const size_t build_block_idx = _current_build_pos++;
+        const auto& build_block = _shared_state->build_blocks[build_block_idx];
+        // If this build block might not fit into the output, put it back and resume from it
+        // on the next call.
+        if (_should_delay_lazy_probe_build_block(build_block.rows(),
state->batch_size())) {
+            --_current_build_pos;
+            break;
+        }
+        RETURN_IF_ERROR(_process_lazy_probe_build_block(probe_block,
build_block, build_block_idx,
+                                                        ignore_null));
+        // A TRUE mark cannot change any more; skip the remaining build blocks for this row.
+        if (p._enable_lazy_mark_finalize &&
+            _cur_probe_row_mark_flags[_probe_block_pos] == MARK_TRUE) {
+            _current_build_pos = _shared_state->build_blocks.size();
+        }
+    }
+    return Status::OK();
+}
+
+// Build-driven lazy loop used by the inner-join path when use_generate_block_base_build()
+// holds (its criteria are defined elsewhere). Only build_blocks[0] is read; presumably the
+// predicate guarantees a single build block — confirm.
+// Each iteration fixes one build row (_current_build_row_pos, broadcast as constant columns)
+// and evaluates the conjuncts against the whole probe block. _probe_block_pos == probe_rows
+// marks that the current build row has been joined with the entire probe block.
+// NOTE(review): if probe_rows > batch_size the loop body never runs and no progress is made;
+// presumably use_generate_block_base_build() rules this out — confirm.
+Status
NestedLoopJoinProbeLocalState::_generate_lazy_block_base_build(RuntimeState*
state,
+                                                                      Block*
probe_block) {
+    DCHECK(use_generate_block_base_build());
+    const auto& build_block = _shared_state->build_blocks[0];
+    const size_t build_rows = build_block.rows();
+    const auto probe_rows = static_cast<int>(probe_block->rows());
+
+    if (probe_rows == 0) {
+        if (_shared_state->probe_side_eos) {
+            _matched_rows_done = true;
+        } else {
+            _need_more_input_data = true;
+        }
+        return Status::OK();
+    }
+
+    size_t processed_rows = 0;
+    while (processed_rows + probe_rows <= state->batch_size()) {
+        // The previous build row is finished: move to the next one. After the last build
+        // row, either finish (probe EOS) or ask for the next probe block and rewind.
+        if (_probe_block_pos == probe_rows) {
+            _current_build_row_pos++;
+            _probe_block_pos = 0;
+
+            if (_current_build_row_pos >= build_rows) {
+                if (_shared_state->probe_side_eos) {
+                    _matched_rows_done = true;
+                } else {
+                    _need_more_input_data = true;
+                    _current_build_row_pos = 0;
+                }
+                break;
+            }
+        }
+
+        if (_matched_rows_done || _need_more_input_data) {
+            break;
+        }
+
+        // Evaluate build row _current_build_row_pos against every probe row.
+        processed_rows += probe_rows;
+        ColumnsWithTypeAndName eval_columns;
+        eval_columns.reserve(_join_block.columns());
+        _append_lazy_probe_eval_columns(eval_columns, *probe_block, false, 0,
probe_rows);
+        _append_lazy_build_eval_columns(eval_columns, build_block, false,
_current_build_row_pos,
+                                        probe_rows);
+        Block eval_block(std::move(eval_columns));
+
+        IColumn::Filter filter(probe_rows, 1);
+        bool can_filter_all = false;
+        {
+            SCOPED_TIMER(_join_conjuncts_evaluation_timer);
+            RETURN_IF_ERROR(VExprContext::execute_conjuncts(_join_conjuncts,
nullptr, false,
+                                                            &eval_block,
&filter, &can_filter_all));
+        }
+        if (!can_filter_all) {
+            const size_t selected_rows =
+                    probe_rows -
+
simd::count_zero_num(reinterpret_cast<int8_t*>(filter.data()), probe_rows);
+            DCHECK_GT(selected_rows, 0);
+            RETURN_IF_ERROR(_append_lazy_rows(filter, selected_rows, false,
_current_build_row_pos,
+                                              *probe_block, build_block));
+        }
+        _probe_block_pos = probe_rows;
+    }
+    return Status::OK();
+}
+
template <bool set_build_side_flag, bool set_probe_side_flag>
void NestedLoopJoinProbeLocalState::_generate_block_base_probe(RuntimeState*
state,
Block*
probe_block) {
@@ -317,6 +880,17 @@ Status
NestedLoopJoinProbeLocalState::generate_inner_join_block_data(RuntimeStat
auto& p = _parent->cast<NestedLoopJoinProbeOperatorX>();
auto* probe_block = _child_block.get();
+ if (p._enable_lazy_materialize) {
+ if (!_matched_rows_done && !_need_more_input_data) {
+ if (use_generate_block_base_build()) {
+ RETURN_IF_ERROR(_generate_lazy_block_base_build(state,
probe_block));
+ } else {
+ RETURN_IF_ERROR(_generate_lazy_block_base_probe(state,
probe_block, false));
+ }
+ }
+ return Status::OK();
+ }
+
if (!_matched_rows_done && !_need_more_input_data) {
if (use_generate_block_base_build()) {
_generate_block_base_build(state, probe_block);
@@ -348,6 +922,16 @@ Status
NestedLoopJoinProbeLocalState::generate_other_join_block_data(RuntimeStat
auto& p = _parent->cast<NestedLoopJoinProbeOperatorX>();
auto* probe_block = _child_block.get();
+ if (p._enable_lazy_materialize) {
+ if (!_matched_rows_done && !_need_more_input_data) {
+ RETURN_IF_ERROR(_generate_lazy_block_base_probe(state,
probe_block, ignore_null));
+ }
+ if (_matched_rows_done) {
+ RETURN_IF_ERROR(_finalize_lazy_build_side(state));
+ }
+ return Status::OK();
+ }
+
if (!_matched_rows_done && !_need_more_input_data) {
// We should try to join rows if there still are some rows from probe
side.
// _probe_offset_stack and _build_offset_stack use u16 for storage
@@ -394,6 +978,7 @@ Status
NestedLoopJoinProbeLocalState::generate_other_join_block_data(RuntimeStat
}
template <bool BuildSide, bool IsSemi>
+//
NOLINTNEXTLINE(readability-function-size,readability-function-cognitive-complexity):
existing finalization handles multiple join variants.
void NestedLoopJoinProbeLocalState::_finalize_current_phase(Block& block,
size_t batch_size) {
auto& p = _parent->cast<NestedLoopJoinProbeOperatorX>();
auto dst_columns = block.mutate_columns();
@@ -545,7 +1130,11 @@
NestedLoopJoinProbeOperatorX::NestedLoopJoinProbeOperatorX(ObjectPool* pool, con
: JoinProbeOperatorX<NestedLoopJoinProbeLocalState>(pool, tnode,
operator_id, descs),
_is_output_probe_side_only(tnode.nested_loop_join_node.__isset.is_output_left_side_only
&&
tnode.nested_loop_join_node.is_output_left_side_only),
- _old_version_flag(!tnode.__isset.nested_loop_join_node) {
+ _has_materialized_slot_ids(tnode.__isset.nested_loop_join_node &&
+
tnode.nested_loop_join_node.__isset.materialized_slot_ids),
+ _materialized_slot_ids(_has_materialized_slot_ids
+ ?
tnode.nested_loop_join_node.materialized_slot_ids
+ : std::vector<SlotId> {}) {
_keep_origin = _is_output_probe_side_only;
}
@@ -562,6 +1151,12 @@ Status NestedLoopJoinProbeOperatorX::init(const
TPlanNode& tnode, RuntimeState*
VExpr::create_expr_tree(tnode.nested_loop_join_node.vjoin_conjunct, context));
_join_conjuncts.emplace_back(context);
}
+ if (tnode.nested_loop_join_node.__isset.mark_join_conjuncts &&
+ !tnode.nested_loop_join_node.mark_join_conjuncts.empty()) {
+
RETURN_IF_ERROR(VExpr::create_expr_trees(tnode.nested_loop_join_node.mark_join_conjuncts,
+ _mark_join_conjuncts));
+ DORIS_CHECK(_is_mark_join);
+ }
return Status::OK();
}
@@ -571,9 +1166,46 @@ Status
NestedLoopJoinProbeOperatorX::prepare(RuntimeState* state) {
for (auto& conjunct : _join_conjuncts) {
RETURN_IF_ERROR(conjunct->prepare(state, *_intermediate_row_desc));
}
+ for (auto& conjunct : _mark_join_conjuncts) {
+ RETURN_IF_ERROR(conjunct->prepare(state, *_intermediate_row_desc));
+ }
_num_probe_side_columns = _child->row_desc().num_materialized_slots();
_num_build_side_columns =
_build_side_child->row_desc().num_materialized_slots();
- return VExpr::open(_join_conjuncts, state);
+ for (const auto& conjunct : _join_conjuncts) {
+ conjunct->root()->collect_slot_column_ids(_lazy_eval_column_ids);
+ }
+ for (const auto& conjunct : _mark_join_conjuncts) {
+ conjunct->root()->collect_slot_column_ids(_lazy_eval_column_ids);
+ }
+ if (_has_materialized_slot_ids) {
+ for (const auto slot_id : _materialized_slot_ids) {
+ const int column_id =
intermediate_row_desc().get_column_id(slot_id);
+ DORIS_CHECK(column_id >= 0);
+ _materialize_column_ids.insert(column_id);
+ }
+ }
+ _enable_lazy_mark_finalize = _is_mark_join && (_join_op ==
TJoinOp::LEFT_SEMI_JOIN ||
+ _join_op ==
TJoinOp::LEFT_ANTI_JOIN);
+ if (_enable_lazy_mark_finalize) {
+ _materialize_column_ids.insert(
+ cast_set<int>(_num_probe_side_columns +
_num_build_side_columns));
+ }
+ _enable_lazy_probe_finalize =
+ _join_op == TJoinOp::LEFT_OUTER_JOIN || _join_op ==
TJoinOp::LEFT_SEMI_JOIN ||
+ _join_op == TJoinOp::LEFT_ANTI_JOIN || _join_op ==
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
+ _join_op == TJoinOp::FULL_OUTER_JOIN;
+ _enable_lazy_build_finalize =
+ _join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op ==
TJoinOp::RIGHT_SEMI_JOIN ||
+ _join_op == TJoinOp::RIGHT_ANTI_JOIN || _join_op ==
TJoinOp::FULL_OUTER_JOIN;
+ bool supported_lazy_join = _join_op == TJoinOp::INNER_JOIN || _join_op ==
TJoinOp::CROSS_JOIN ||
+ _enable_lazy_probe_finalize ||
_enable_lazy_build_finalize ||
+ _enable_lazy_mark_finalize;
+ _enable_lazy_materialize = _has_materialized_slot_ids &&
!_is_output_probe_side_only &&
+ (!_is_mark_join || _enable_lazy_mark_finalize)
&&
+ supported_lazy_join && !projections().empty() &&
+ &projections_row_desc() ==
&intermediate_row_desc();
+ RETURN_IF_ERROR(VExpr::open(_join_conjuncts, state));
+ return VExpr::open(_mark_join_conjuncts, state);
}
bool NestedLoopJoinProbeOperatorX::need_more_input_data(RuntimeState* state)
const {
@@ -592,6 +1224,11 @@ Status
NestedLoopJoinProbeOperatorX::push(doris::RuntimeState* state, Block* blo
local_state._cur_probe_row_visited_flags.resize(block->rows());
std::fill(local_state._cur_probe_row_visited_flags.begin(),
local_state._cur_probe_row_visited_flags.end(), 0);
+ if (_enable_lazy_mark_finalize) {
+ local_state._cur_probe_row_mark_flags.resize(block->rows());
+ std::fill(local_state._cur_probe_row_mark_flags.begin(),
+ local_state._cur_probe_row_mark_flags.end(), MARK_FALSE);
+ }
local_state._probe_block_pos = 0;
local_state._need_more_input_data = false;
local_state._shared_state->probe_side_eos = eos;
@@ -621,6 +1258,7 @@ Status
NestedLoopJoinProbeOperatorX::push(doris::RuntimeState* state, Block* blo
return Status::OK();
}
+// NOLINTNEXTLINE(readability-function-cognitive-complexity): existing pull
dispatch handles all NLJ variants.
Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* state, Block* block,
bool* eos) const {
auto& local_state = get_local_state(state);
if (_is_output_probe_side_only) {
diff --git a/be/src/exec/operator/nested_loop_join_probe_operator.h
b/be/src/exec/operator/nested_loop_join_probe_operator.h
index aea6972f428..9313331fc69 100644
--- a/be/src/exec/operator/nested_loop_join_probe_operator.h
+++ b/be/src/exec/operator/nested_loop_join_probe_operator.h
@@ -17,12 +17,13 @@
#pragma once
-#include <stdint.h>
-
#include <cstdint>
+#include <set>
+#include <vector>
#include "common/cast_set.h"
#include "common/status.h"
+#include "core/column/column.h"
#include "exec/operator/join_probe_operator.h"
#include "exec/operator/operator.h"
#include "util/simd/bits.h"
@@ -77,6 +78,39 @@ public:
private:
// Whether to generate data based on the build side
bool use_generate_block_base_build() const;
+ Status _advance_lazy_probe_row(RuntimeState* state, const Block&
probe_block);
+ Status _generate_lazy_block_base_probe(RuntimeState* state, Block*
probe_block,
+ bool ignore_null);
+ Status _generate_lazy_block_base_build(RuntimeState* state, Block*
probe_block);
+ bool _should_delay_lazy_probe_build_block(size_t candidate_rows, size_t
batch_size) const;
+ bool _lazy_should_output_matched_rows() const;
+ Status _process_lazy_probe_build_block(Block* probe_block, const Block&
build_block,
+ size_t build_block_idx, bool
ignore_null);
+ void _mark_lazy_build_rows_visited(size_t build_block_idx, const
IColumn::Filter& filter);
+ void _update_lazy_mark_join_state(const IColumn::Filter& mark_filter,
+ const ColumnUInt8& mark_null_map,
+ const IColumn::Filter& other_filter);
+ Status _append_lazy_rows(const IColumn::Filter& filter, size_t
selected_rows,
+ bool fixed_side_probe, int64_t fixed_side_pos,
+ const Block& probe_block, const Block&
build_block);
+ Status _append_lazy_probe_row_with_build_defaults(const Block& probe_block,
+ int64_t probe_row_pos);
+ Status _append_lazy_mark_probe_row_with_build_defaults(const Block&
probe_block,
+ int64_t
probe_row_pos,
+ int8_t mark_value);
+ Status _finalize_lazy_probe_row(RuntimeState* state, const Block&
probe_block,
+ int64_t probe_row_pos, bool* consumed);
+ Status _append_lazy_build_rows_with_probe_defaults(const Block&
build_block,
+ const IColumn::Filter&
filter,
+ size_t selected_rows);
+ Status _finalize_lazy_build_side(RuntimeState* state);
+ void _replace_lazy_placeholder_columns(size_t rows);
+ void _append_lazy_probe_eval_columns(ColumnsWithTypeAndName& eval_columns,
+ const Block& probe_block, bool
fixed_side_probe,
+ int64_t fixed_side_pos, size_t rows);
+ void _append_lazy_build_eval_columns(ColumnsWithTypeAndName& eval_columns,
+ const Block& build_block, bool
fixed_side_probe,
+ int64_t fixed_side_pos, size_t rows);
friend class NestedLoopJoinProbeOperatorX;
void _update_additional_flags(Block* block);
@@ -139,7 +173,7 @@ private:
template <bool SetBuildSideFlag, bool SetProbeSideFlag, bool IgnoreNull>
Status _do_filtering_and_update_visited_flags(Block* block, bool
materialize) {
// The number of columns will not exceed the range of u32.
- uint32_t column_to_keep = cast_set<uint32_t>(block->columns());
+ auto column_to_keep = cast_set<uint32_t>(block->columns());
// If we need to set visited flags for build side,
// 1. Execute conjuncts and get a column with bool type to do
filtering.
// 2. Use bool column to update build-side visited flags.
@@ -205,6 +239,7 @@ private:
bool _need_more_input_data = true;
// Visited flags for current row in probe side.
std::vector<int8_t> _cur_probe_row_visited_flags;
+ std::vector<int8_t> _cur_probe_row_mark_flags;
size_t _current_build_pos = 0;
size_t _current_build_row_pos =
0; // current row pos in build block, used by
_generate_block_base_build
@@ -212,7 +247,9 @@ private:
std::stack<uint16_t> _build_offset_stack;
std::stack<uint16_t> _probe_offset_stack;
uint64_t _output_null_idx_build_side = 0;
+ uint64_t _output_null_row_idx_build_side = 0;
VExprContextSPtrs _join_conjuncts;
+ VExprContextSPtrs _mark_join_conjuncts;
RuntimeProfile::Counter* _loop_join_timer = nullptr;
RuntimeProfile::Counter* _output_temp_blocks_timer = nullptr;
@@ -232,7 +269,8 @@ public:
Status push(RuntimeState* state, Block* input_block, bool eos) const
override;
Status pull(doris::RuntimeState* state, Block* output_block, bool* eos)
const override;
const RowDescriptor& intermediate_row_desc() const override {
- return _old_version_flag ? _row_descriptor : *_intermediate_row_desc;
+ // The old-version fallback to _row_descriptor was removed, so the
+ // intermediate descriptor must always be present (checked below).
+ DORIS_CHECK(_intermediate_row_desc != nullptr);
+ return *_intermediate_row_desc;
}
DataDistribution required_data_distribution(RuntimeState* /*state*/) const
override {
@@ -245,9 +283,11 @@ public:
}
const RowDescriptor& row_desc() const override {
- return _old_version_flag
- ? (_output_row_descriptor ? *_output_row_descriptor :
_row_descriptor)
- : (_output_row_descriptor ? *_output_row_descriptor :
*_output_row_desc);
+ // Prefer _output_row_descriptor when it is set; otherwise fall back to
+ // _output_row_desc, which must then be non-null (enforced by DORIS_CHECK).
+ if (_output_row_descriptor) {
+ return *_output_row_descriptor;
+ }
+ DORIS_CHECK(_output_row_desc != nullptr);
+ return *_output_row_desc;
}
bool need_more_input_data(RuntimeState* state) const override;
@@ -256,9 +296,17 @@ private:
friend class NestedLoopJoinProbeLocalState;
bool _is_output_probe_side_only;
VExprContextSPtrs _join_conjuncts;
+ VExprContextSPtrs _mark_join_conjuncts;
size_t _num_probe_side_columns = 0;
size_t _num_build_side_columns = 0;
- const bool _old_version_flag;
+ bool _has_materialized_slot_ids = false;
+ std::vector<SlotId> _materialized_slot_ids;
+ bool _enable_lazy_materialize = false;
+ bool _enable_lazy_probe_finalize = false;
+ bool _enable_lazy_build_finalize = false;
+ bool _enable_lazy_mark_finalize = false;
+ std::set<int> _lazy_eval_column_ids;
+ std::set<int> _materialize_column_ids;
};
} // namespace doris
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index b2ca6cfa2b6..7b01ce6c6b1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -1970,6 +1970,8 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
.flatMap(e -> e.getInputSlots().stream())
.map(SlotReference.class::cast)
.forEach(s -> outputSlotReferenceMap.put(s.getExprId(), s));
+ Map<ExprId, SlotReference> materializedSlotReferenceMap =
Maps.newHashMap(outputSlotReferenceMap);
+ nestedLoopJoinNode.enableMaterializedSlotIds();
List<SlotReference> outputSlotReferences =
Stream.concat(leftTuples.stream(), rightTuples.stream())
.map(TupleDescriptor::getSlots)
.flatMap(Collection::stream)
@@ -1983,17 +1985,32 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
for (SlotDescriptor leftSlotDescriptor : leftSlotDescriptors) {
SlotReference sf =
leftChildOutputMap.get(context.findExprId(leftSlotDescriptor.getId()));
SlotDescriptor sd = context.createSlotDesc(intermediateDescriptor,
sf);
+ if (materializedSlotReferenceMap.get(sf.getExprId()) != null) {
+ nestedLoopJoinNode.addSlotIdToMaterializedSlotIds(sd.getId());
+ }
+ nestedLoopJoinNode.getMaterializedSlotIdMap().put(sf.getExprId(),
sd.getId());
leftIntermediateSlotDescriptor.add(sd);
}
for (SlotDescriptor rightSlotDescriptor : rightSlotDescriptors) {
SlotReference sf =
rightChildOutputMap.get(context.findExprId(rightSlotDescriptor.getId()));
SlotDescriptor sd = context.createSlotDesc(intermediateDescriptor,
sf);
+ if (materializedSlotReferenceMap.get(sf.getExprId()) != null) {
+ nestedLoopJoinNode.addSlotIdToMaterializedSlotIds(sd.getId());
+ }
+ nestedLoopJoinNode.getMaterializedSlotIdMap().put(sf.getExprId(),
sd.getId());
rightIntermediateSlotDescriptor.add(sd);
}
if (nestedLoopJoin.getMarkJoinSlotReference().isPresent()) {
-
outputSlotReferences.add(nestedLoopJoin.getMarkJoinSlotReference().get());
- context.createSlotDesc(intermediateDescriptor,
nestedLoopJoin.getMarkJoinSlotReference().get());
+ SlotReference markJoinSlotReference =
nestedLoopJoin.getMarkJoinSlotReference().get();
+ outputSlotReferences.add(markJoinSlotReference);
+ SlotDescriptor markJoinSlotDescriptor =
context.createSlotDesc(intermediateDescriptor,
+ markJoinSlotReference);
+ if
(materializedSlotReferenceMap.get(markJoinSlotReference.getExprId()) != null) {
+
nestedLoopJoinNode.addSlotIdToMaterializedSlotIds(markJoinSlotDescriptor.getId());
+ }
+
nestedLoopJoinNode.getMaterializedSlotIdMap().put(markJoinSlotReference.getExprId(),
+ markJoinSlotDescriptor.getId());
}
// set slots as nullable for outer join
@@ -2029,7 +2046,7 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
nestedLoopJoinNode.setJoinConjuncts(joinConjuncts);
- if (!nestedLoopJoin.getOtherJoinConjuncts().isEmpty()) {
+ if (!nestedLoopJoin.getMarkJoinConjuncts().isEmpty()) {
List<Expr> markJoinConjuncts =
nestedLoopJoin.getMarkJoinConjuncts().stream()
.map(e -> ExpressionTranslator.translate(e,
context)).collect(Collectors.toList());
nestedLoopJoinNode.setMarkJoinConjuncts(markJoinConjuncts);
@@ -2257,6 +2274,18 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
break;
}
}
+ } else if (joinNode instanceof NestedLoopJoinNode) {
+ NestedLoopJoinNode nestedLoopJoinNode = (NestedLoopJoinNode)
joinNode;
+ nestedLoopJoinNode.getMaterializedSlotIds().clear();
+ nestedLoopJoinNode.enableMaterializedSlotIds();
+ Set<ExprId> requiredExprIds = Sets.newHashSet();
+ requiredSlotIdSet.forEach(e ->
requiredExprIds.add(context.findExprId(e)));
+ for (ExprId exprId : requiredExprIds) {
+ SlotId slotId =
nestedLoopJoinNode.getMaterializedSlotIdMap().get(exprId);
+ if (slotId != null) {
+
nestedLoopJoinNode.addSlotIdToMaterializedSlotIds(slotId);
+ }
+ }
}
return inputFragment;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
index daf65dc74d5..ff788e4c5f1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
@@ -23,12 +23,20 @@ import org.apache.doris.analysis.JoinOperator;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
+import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TNestedLoopJoinNode;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import java.util.ArrayList;
+import java.util.Comparator;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
/**
* Nested loop join between left child and right child.
@@ -52,6 +60,12 @@ public class NestedLoopJoinNode extends JoinNodeBase {
private List<Expr> markJoinConjuncts;
+ private final Set<SlotId> materializedSlotIds = Sets.newHashSet();
+
+ private final Map<ExprId, SlotId> materializedSlotIdMap =
Maps.newHashMap();
+
+ private boolean hasMaterializedSlotIds = false;
+
public static boolean canParallelize(JoinOperator joinOp) {
return joinOp == JoinOperator.CROSS_JOIN || joinOp ==
JoinOperator.INNER_JOIN
|| joinOp == JoinOperator.LEFT_OUTER_JOIN || joinOp ==
JoinOperator.LEFT_SEMI_JOIN
@@ -69,6 +83,29 @@ public class NestedLoopJoinNode extends JoinNodeBase {
this.markJoinConjuncts = markJoinConjuncts;
}
+ /**
+ * Returns the live (mutable) set of intermediate slot ids that must be
+ * materialized after join conjunct evaluation. The plan translator clears
+ * and refills this set directly, so no defensive copy is made.
+ */
+ public Set<SlotId> getMaterializedSlotIds() {
+ return materializedSlotIds;
+ }
+
+ /**
+ * Returns the live (mutable) map from Nereids {@code ExprId} to the slot id
+ * created for it in this join's intermediate tuple. The translator fills it
+ * while creating intermediate slots and later uses it to map required expr
+ * ids back to intermediate slot ids.
+ */
+ public Map<ExprId, SlotId> getMaterializedSlotIdMap() {
+ return materializedSlotIdMap;
+ }
+
+ /**
+ * Marks the materialized slot id list as explicitly provided, so it is sent
+ * to BE as {@code materialized_slot_ids} and printed in explain output even
+ * when the set is empty. Per the thrift field comment, an empty list means no
+ * payload slot needs materialization, while an unset field keeps BE's legacy
+ * behavior.
+ */
+ public void enableMaterializedSlotIds() {
+ hasMaterializedSlotIds = true;
+ }
+
+ /**
+ * Adds {@code slotId} to the materialized set. This also implies
+ * {@link #enableMaterializedSlotIds()}.
+ */
+ public void addSlotIdToMaterializedSlotIds(SlotId slotId) {
+ materializedSlotIds.add(slotId);
+ hasMaterializedSlotIds = true;
+ }
+
+ /**
+ * Returns a copy of the materialized slot ids, sorted ascending by integer
+ * id. The backing set is a HashSet with no defined iteration order, so
+ * sorting keeps thrift serialization and explain output deterministic.
+ */
+ private List<SlotId> getSortedMaterializedSlotIds() {
+ List<SlotId> sortedSlotIds = new ArrayList<>(materializedSlotIds);
+ sortedSlotIds.sort(Comparator.comparingInt(SlotId::asInt));
+ return sortedSlotIds;
+ }
+
public NestedLoopJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner,
List<TupleId> tupleIds,
JoinOperator joinOperator, boolean isMarkJoin) {
super(id, "NESTED LOOP JOIN", joinOperator, isMarkJoin);
@@ -103,6 +140,13 @@ public class NestedLoopJoinNode extends JoinNodeBase {
}
msg.nested_loop_join_node.setIsOutputLeftSideOnly(isOutputLeftSideOnly);
msg.nested_loop_join_node.setUseSpecificProjections(false);
+ if (hasMaterializedSlotIds) {
+ List<Integer> slotIds = new ArrayList<>();
+ for (SlotId slotId : getSortedMaterializedSlotIds()) {
+ slotIds.add(slotId.asInt());
+ }
+ msg.nested_loop_join_node.setMaterializedSlotIds(slotIds);
+ }
msg.node_type = TPlanNodeType.CROSS_JOIN_NODE;
}
@@ -148,6 +192,13 @@ public class NestedLoopJoinNode extends JoinNodeBase {
}
output.append("\n");
}
+ if (hasMaterializedSlotIds) {
+ output.append(detailPrefix).append("materialized slot ids: ");
+ for (SlotId slotId : getSortedMaterializedSlotIds()) {
+ output.append(slotId).append(" ");
+ }
+ output.append("\n");
+ }
if (detailLevel == TExplainLevel.VERBOSE) {
output.append(detailPrefix).append("isMarkJoin:
").append(isMarkJoin()).append("\n");
}
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index dadd1396cdc..757751d3a46 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -1051,6 +1051,11 @@ struct TNestedLoopJoinNode {
9: optional list<Exprs.TExpr> mark_join_conjuncts
// deprecated
10: optional bool use_specific_projections
+
+ // Slots that need to be materialized after join conjunct evaluation.
+ // If this field is not set, BE keeps the legacy behavior.
+ // If this field is set to an empty list, no payload slot needs
+ // materialization.
+ 11: optional list<Types.TSlotId> materialized_slot_ids
}
struct TMergeJoinNode {
diff --git
a/regression-test/suites/query_p0/join/test_nestedloop_lazy_materialization.groovy
b/regression-test/suites/query_p0/join/test_nestedloop_lazy_materialization.groovy
new file mode 100644
index 00000000000..0c1884aea85
--- /dev/null
+++
b/regression-test/suites/query_p0/join/test_nestedloop_lazy_materialization.groovy
@@ -0,0 +1,256 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Regression coverage for nested loop join lazy materialization. Each query
+// below targets one path: eval-only columns, materialize-only payload, overlap,
+// post-join filters, outer/semi/anti/mark joins, and finalization that must be
+// resumable across small output batches.
+suite("test_nestedloop_lazy_materialization") {
+ sql "set enable_nereids_planner=true"
+ sql "set enable_fallback_to_original_planner=false"
+ sql "set disable_join_reorder=true"
+
+ sql "DROP TABLE IF EXISTS test_nestedloop_lazy_materialization_probe"
+ sql """
+ CREATE TABLE test_nestedloop_lazy_materialization_probe (
+ id INT NOT NULL,
+ v INT NOT NULL,
+ flag INT NOT NULL
+ )
+ DUPLICATE KEY(id)
+ DISTRIBUTED BY HASH(id) BUCKETS 1
+ PROPERTIES ("replication_num" = "1");
+ """
+
+ sql "DROP TABLE IF EXISTS test_nestedloop_lazy_materialization_build"
+ sql """
+ CREATE TABLE test_nestedloop_lazy_materialization_build (
+ id INT NOT NULL,
+ v INT NOT NULL,
+ flag INT NOT NULL
+ )
+ DUPLICATE KEY(id)
+ DISTRIBUTED BY HASH(id) BUCKETS 1
+ PROPERTIES ("replication_num" = "1");
+ """
+
+ sql "DROP TABLE IF EXISTS test_nestedloop_lazy_materialization_null_build"
+ sql """
+ CREATE TABLE test_nestedloop_lazy_materialization_null_build (
+ id INT NOT NULL,
+ v INT NULL
+ )
+ DUPLICATE KEY(id)
+ DISTRIBUTED BY HASH(id) BUCKETS 1
+ PROPERTIES ("replication_num" = "1");
+ """
+
+ sql """
+ INSERT INTO test_nestedloop_lazy_materialization_probe VALUES
+ (1, 1, 10),
+ (2, 3, 20),
+ (3, 5, 30),
+ (4, 7, 40),
+ (5, 200, 50);
+ """
+ sql """
+ INSERT INTO test_nestedloop_lazy_materialization_build VALUES
+ (5, 0, 1),
+ (10, 2, 1),
+ (20, 4, 0),
+ (30, 6, 1),
+ (40, 8, 0),
+ (50, 100, 1);
+ """
+ sql """
+ INSERT INTO test_nestedloop_lazy_materialization_null_build VALUES
+ (1, NULL),
+ (2, 2),
+ (3, 4);
+ """
+ sql "sync"
+
+ // Eval-only demand: predicate columns are real in the temporary eval block,
+ // but no payload column is needed after the join for count(*).
+ def countOnly = sql """
+ SELECT COUNT(*)
+ FROM test_nestedloop_lazy_materialization_probe p
+ JOIN test_nestedloop_lazy_materialization_build b
+ ON p.v < b.v AND p.flag + b.flag > 10;
+ """
+ assertEquals("12", countOnly[0][0].toString())
+
+ // Materialize-only payload: p.id is not needed by the ON predicate, but it is
+ // needed after predicate filtering as the final output.
+ def materializeOnly = sql """
+ SELECT p.id
+ FROM test_nestedloop_lazy_materialization_probe p
+ JOIN test_nestedloop_lazy_materialization_build b
+ ON p.v < b.v AND p.flag + b.flag > 10
+ ORDER BY p.id, b.id;
+ """
+ assertEquals([[1], [1], [1], [2], [2], [2], [2], [3], [3], [3], [4], [4]],
materializeOnly)
+
+ // Overlap demand: p.v and b.v are needed both by the ON predicate and by the
+ // output.
+ def evalAndMaterialize = sql """
+ SELECT p.v, b.v
+ FROM test_nestedloop_lazy_materialization_probe p
+ JOIN test_nestedloop_lazy_materialization_build b
+ ON p.v < b.v AND b.flag = 1
+ WHERE p.v < 4
+ ORDER BY p.v, b.v;
+ """
+ assertEquals([[1, 2], [1, 6], [1, 100], [3, 6], [3, 100]],
evalAndMaterialize)
+
+ // Post-join filter demand: b.flag is not required by the ON predicate here,
+ // but it must be materialized because WHERE is evaluated on the joined block.
+ // NOTE(review): b.flag = 1 rejects NULLs from the outer side, so the planner
+ // may turn this LEFT OUTER JOIN into an inner join and push the filter below
+ // the join. Confirm via EXPLAIN that this query still exercises a post-join
+ // filter.
+ def postJoinFilter = sql """
+ SELECT p.id, b.id
+ FROM test_nestedloop_lazy_materialization_probe p
+ LEFT OUTER JOIN test_nestedloop_lazy_materialization_build b
+ ON p.v < b.v
+ WHERE b.flag = 1
+ ORDER BY p.id, b.id;
+ """
+ assertEquals([[1, 10], [1, 30], [1, 50], [2, 30], [2, 50], [3, 30], [3,
50], [4, 50]],
+ postJoinFilter)
+
+ // Nullable eval demand: RIGHT/FULL outer joins make probe-side intermediate
+ // slots nullable, while the source probe block column is still non-nullable.
+ def rightOuterNullableEval = sql """
+ SELECT b.id
+ FROM test_nestedloop_lazy_materialization_probe p
+ RIGHT OUTER JOIN test_nestedloop_lazy_materialization_build b
+ ON p.v < b.v
+ ORDER BY b.id, p.id;
+ """
+ assertEquals([[5], [10], [20], [20], [30], [30], [30], [40], [40], [40],
[40],
+ [50], [50], [50], [50]], rightOuterNullableEval)
+
+ // Build-side and probe-side finalization should be resumable across output
+ // batches when a lazy FULL OUTER JOIN has many unmatched rows.
+ def fullOuterResume = sql """
+ SELECT /*+SET_VAR(batch_size=2)*/ COALESCE(p.id, -1), COALESCE(b.id,
-1)
+ FROM test_nestedloop_lazy_materialization_probe p
+ FULL OUTER JOIN test_nestedloop_lazy_materialization_build b
+ ON p.v + 100 < b.v
+ ORDER BY 1, 2;
+ """
+ assertEquals([[-1, 5], [-1, 10], [-1, 20], [-1, 30], [-1, 40], [-1, 50],
+ [1, -1], [2, -1], [3, -1], [4, -1], [5, -1]], fullOuterResume)
+
+ // Probe-side matched tracking and finalization for lazy LEFT SEMI JOIN.
+ def leftSemi = sql """
+ SELECT p.id
+ FROM test_nestedloop_lazy_materialization_probe p
+ LEFT SEMI JOIN test_nestedloop_lazy_materialization_build b
+ ON p.v < b.v AND b.flag = 1
+ ORDER BY p.id;
+ """
+ assertEquals([[1], [2], [3], [4]], leftSemi)
+
+ // Probe-side unmatched tracking and finalization for lazy LEFT ANTI JOIN.
+ def leftAnti = sql """
+ SELECT p.id
+ FROM test_nestedloop_lazy_materialization_probe p
+ LEFT ANTI JOIN test_nestedloop_lazy_materialization_build b
+ ON p.v < b.v AND b.flag = 1
+ ORDER BY p.id;
+ """
+ assertEquals([[5]], leftAnti)
+
+ // MARK LEFT SEMI lazy finalization: EXISTS is represented as a mark value
+ // because it is combined with an OR predicate.
+ def markLeftSemi = sql """
+ SELECT p.id
+ FROM test_nestedloop_lazy_materialization_probe p
+ WHERE EXISTS (
+ SELECT *
+ FROM test_nestedloop_lazy_materialization_build b
+ WHERE p.v < b.v AND b.flag = 1
+ ) OR p.id = 5
+ ORDER BY p.id;
+ """
+ assertEquals([[1], [2], [3], [4], [5]], markLeftSemi)
+
+ // MARK LEFT ANTI lazy finalization: NOT EXISTS inverts the mark value for
+ // rows without a successful predicate match.
+ def markLeftAnti = sql """
+ SELECT p.id
+ FROM test_nestedloop_lazy_materialization_probe p
+ WHERE NOT EXISTS (
+ SELECT *
+ FROM test_nestedloop_lazy_materialization_build b
+ WHERE p.v < b.v AND b.flag = 1
+ ) OR p.id = 1
+ ORDER BY p.id;
+ """
+ assertEquals([[1], [5]], markLeftAnti)
+
+ // MARK NULL state: IN can produce NULL when the correlated subquery contains
+ // NULL and no equal value, and IS NULL observes that mark state.
+ def markNull = sql """
+ SELECT p.id
+ FROM test_nestedloop_lazy_materialization_probe p
+ WHERE (p.v IN (
+ SELECT b.v
+ FROM test_nestedloop_lazy_materialization_null_build b
+ WHERE p.id > b.id
+ )) IS NULL
+ ORDER BY p.id;
+ """
+ assertEquals([[2], [3], [4], [5]], markNull)
+
+ // NULL-aware anti join lazy finalization: the NULL in the correlated subquery
+ // result should make NOT IN unknown for rows whose subquery sees it.
+ def nullAwareAnti = sql """
+ SELECT p.id
+ FROM test_nestedloop_lazy_materialization_probe p
+ WHERE p.v NOT IN (
+ SELECT b.v
+ FROM test_nestedloop_lazy_materialization_null_build b
+ WHERE p.id > b.id
+ )
+ ORDER BY p.id;
+ """
+ assertEquals([[1]], nullAwareAnti)
+
+ // Probe-side finalization must not consume a probe row when the output
+ // batch is already full and the build side is empty.
+ def leftOuterEmptyBuild = sql """
+ SELECT /*+SET_VAR(batch_size=2)*/ p.id, COALESCE(b.id, -1)
+ FROM test_nestedloop_lazy_materialization_probe p
+ LEFT OUTER JOIN (
+ SELECT *
+ FROM test_nestedloop_lazy_materialization_build
+ WHERE id < 0
+ ) b
+ ON p.v < b.v
+ ORDER BY p.id;
+ """
+ assertEquals([[1, -1], [2, -1], [3, -1], [4, -1], [5, -1]],
leftOuterEmptyBuild)
+
+ // Same empty-build, small-batch scenario for LEFT ANTI JOIN: every probe row
+ // must still be emitted exactly once.
+ def leftAntiEmptyBuild = sql """
+ SELECT /*+SET_VAR(batch_size=2)*/ p.id
+ FROM test_nestedloop_lazy_materialization_probe p
+ LEFT ANTI JOIN (
+ SELECT *
+ FROM test_nestedloop_lazy_materialization_build
+ WHERE id < 0
+ ) b
+ ON p.v < b.v
+ ORDER BY p.id;
+ """
+ assertEquals([[1], [2], [3], [4], [5]], leftAntiEmptyBuild)
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]