This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 9083bf7e14d revert "[Improvementation](join) empty_block shall be set
true when build blo… (#33977)"
9083bf7e14d is described below
commit 9083bf7e14d066b17239808c23b2860130bcdcb9
Author: yiguolei <[email protected]>
AuthorDate: Thu Apr 25 23:33:11 2024 +0800
revert "[Improvementation](join) empty_block shall be set true when build
blo… (#33977)"
This reverts commit e3ed861e4b6a602ea874b6501998578952291f38.
---
be/src/pipeline/exec/hashjoin_build_sink.cpp | 17 +++--
be/src/pipeline/exec/hashjoin_probe_operator.cpp | 80 ++++++++++++----------
be/src/pipeline/exec/hashjoin_probe_operator.h | 9 ++-
be/src/vec/core/column_with_type_and_name.cpp | 12 ++--
.../join/test_half_join_nullable_build_side.out | 6 --
.../join/test_half_join_nullable_build_side.groovy | 4 --
6 files changed, 61 insertions(+), 67 deletions(-)
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 2b2bdad86f7..a0d111c63a7 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -156,22 +156,21 @@ bool HashJoinBuildSinkLocalState::build_unique() const {
void HashJoinBuildSinkLocalState::init_short_circuit_for_probe() {
auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
- bool empty_block =
- !_shared_state->build_block ||
- !(_shared_state->build_block->rows() > 1); // build size always
mock a row into block
_shared_state->short_circuit_for_probe =
(_shared_state->_has_null_in_build_side &&
p._join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
!p._is_mark_join) ||
- (empty_block && p._join_op == TJoinOp::INNER_JOIN &&
!p._is_mark_join) ||
- (empty_block && p._join_op == TJoinOp::LEFT_SEMI_JOIN &&
!p._is_mark_join) ||
- (empty_block && p._join_op == TJoinOp::RIGHT_OUTER_JOIN) ||
- (empty_block && p._join_op == TJoinOp::RIGHT_SEMI_JOIN) ||
- (empty_block && p._join_op == TJoinOp::RIGHT_ANTI_JOIN);
+ (!_shared_state->build_block && p._join_op == TJoinOp::INNER_JOIN
&&
+ !p._is_mark_join) ||
+ (!_shared_state->build_block && p._join_op ==
TJoinOp::LEFT_SEMI_JOIN &&
+ !p._is_mark_join) ||
+ (!_shared_state->build_block && p._join_op ==
TJoinOp::RIGHT_OUTER_JOIN) ||
+ (!_shared_state->build_block && p._join_op ==
TJoinOp::RIGHT_SEMI_JOIN) ||
+ (!_shared_state->build_block && p._join_op ==
TJoinOp::RIGHT_ANTI_JOIN);
//when build table rows is 0 and not have other_join_conjunct and not
_is_mark_join and join type is one of
LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN
//we could get the result is probe table + null-column(if need output)
_shared_state->empty_right_table_need_probe_dispose =
- (empty_block && !p._have_other_join_conjunct && !p._is_mark_join)
&&
+ (!_shared_state->build_block && !p._have_other_join_conjunct &&
!p._is_mark_join) &&
(p._join_op == TJoinOp::LEFT_OUTER_JOIN || p._join_op ==
TJoinOp::FULL_OUTER_JOIN ||
p._join_op == TJoinOp::LEFT_ANTI_JOIN);
}
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index fc6f81f4190..a58ad62211c 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -247,7 +247,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState*
state, vectorized::Bloc
}
//TODO: this short circuit maybe could refactor, no need to check at here.
- if (local_state.empty_right_table_shortcut()) {
+ if (local_state._shared_state->empty_right_table_need_probe_dispose) {
// when build table rows is 0 and not have other_join_conjunct and
join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN
// we could get the result is probe table + null-column(if need output)
// If we use a short-circuit strategy, should return block directly by
add additional null data.
@@ -257,6 +257,12 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState*
state, vectorized::Bloc
return Status::OK();
}
+ vectorized::Block temp_block;
+ //get probe side output column
+ for (int i = 0; i < _left_output_slot_flags.size(); ++i) {
+ temp_block.insert(local_state._probe_block.get_by_position(i));
+ }
+
//create build side null column, if need output
for (int i = 0;
(_join_op != TJoinOp::LEFT_ANTI_JOIN) && i <
_right_output_slot_flags.size(); ++i) {
@@ -267,8 +273,8 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState*
state, vectorized::Bloc
vectorized::ColumnVector<vectorized::UInt8>::create(block_rows, 1);
auto nullable_column =
vectorized::ColumnNullable::create(std::move(column),
std::move(null_map_column));
- local_state._probe_block.insert({std::move(nullable_column),
make_nullable(type),
- _right_table_column_names[i]});
+ temp_block.insert({std::move(nullable_column), make_nullable(type),
+ _right_table_column_names[i]});
}
if (_is_outer_join) {
reinterpret_cast<vectorized::ColumnUInt8*>(
@@ -284,7 +290,8 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState*
state, vectorized::Bloc
/// No need to check the block size in `_filter_data_and_build_output`
because here dose not
/// increase the output rows count(just same as `_probe_block`'s rows
count).
RETURN_IF_ERROR(local_state.filter_data_and_build_output(state,
output_block, eos,
-
&local_state._probe_block, false));
+ &temp_block,
false));
+ temp_block.clear();
local_state._probe_block.clear_column_data(_child_x->row_desc().num_materialized_slots());
return Status::OK();
}
@@ -367,52 +374,36 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState*
state, vectorized::Bloc
}
Status HashJoinProbeLocalState::_extract_join_column(vectorized::Block& block,
+
vectorized::ColumnUInt8::MutablePtr& null_map,
+
vectorized::ColumnRawPtrs& raw_ptrs,
const std::vector<int>&
res_col_ids) {
- if (empty_right_table_shortcut()) {
- return Status::OK();
- }
-
- _probe_columns.resize(_probe_expr_ctxs.size());
-
- if (!_has_set_need_null_map_for_probe) {
- _has_set_need_null_map_for_probe = true;
- _need_null_map_for_probe = _need_probe_null_map(block, res_col_ids);
- }
- if (_need_null_map_for_probe) {
- if (_null_map_column == nullptr) {
- _null_map_column = vectorized::ColumnUInt8::create();
- }
- _null_map_column->get_data().assign(block.rows(), (uint8_t)0);
- }
-
auto& shared_state = *_shared_state;
auto& p = _parent->cast<HashJoinProbeOperatorX>();
for (size_t i = 0; i < shared_state.build_exprs_size; ++i) {
if (p._should_convert_to_nullable[i]) {
_key_columns_holder.emplace_back(
vectorized::make_nullable(block.get_by_position(res_col_ids[i]).column));
- _probe_columns[i] = _key_columns_holder.back().get();
+ raw_ptrs[i] = _key_columns_holder.back().get();
continue;
}
if (shared_state.is_null_safe_eq_join[i]) {
- _probe_columns[i] =
block.get_by_position(res_col_ids[i]).column.get();
+ raw_ptrs[i] = block.get_by_position(res_col_ids[i]).column.get();
} else {
- const auto* column =
block.get_by_position(res_col_ids[i]).column.get();
- if (const auto* nullable =
check_and_get_column<vectorized::ColumnNullable>(*column)) {
- const auto& col_nested = nullable->get_nested_column();
- const auto& col_nullmap = nullable->get_null_map_data();
-
- DCHECK(_null_map_column != nullptr);
-
vectorized::VectorizedUtils::update_null_map(_null_map_column->get_data(),
- col_nullmap);
+ auto column = block.get_by_position(res_col_ids[i]).column.get();
+ if (auto* nullable =
check_and_get_column<vectorized::ColumnNullable>(*column)) {
+ auto& col_nested = nullable->get_nested_column();
+ auto& col_nullmap = nullable->get_null_map_data();
+
+ DCHECK(null_map != nullptr);
+
vectorized::VectorizedUtils::update_null_map(null_map->get_data(), col_nullmap);
if (shared_state.store_null_in_hash_table[i]) {
- _probe_columns[i] = nullable;
+ raw_ptrs[i] = nullable;
} else {
- _probe_columns[i] = &col_nested;
+ raw_ptrs[i] = &col_nested;
}
} else {
- _probe_columns[i] = column;
+ raw_ptrs[i] = column;
}
}
}
@@ -491,7 +482,10 @@ Status HashJoinProbeOperatorX::push(RuntimeState* state,
vectorized::Block* inpu
local_state._probe_eos = eos;
if (input_block->rows() > 0) {
COUNTER_UPDATE(local_state._probe_rows_counter, input_block->rows());
- std::vector<int> res_col_ids(local_state._probe_expr_ctxs.size());
+ int probe_expr_ctxs_sz = local_state._probe_expr_ctxs.size();
+ local_state._probe_columns.resize(probe_expr_ctxs_sz);
+
+ std::vector<int> res_col_ids(probe_expr_ctxs_sz);
RETURN_IF_ERROR(_do_evaluate(*input_block,
local_state._probe_expr_ctxs,
*local_state._probe_expr_call_timer,
res_col_ids));
if (_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op ==
TJoinOp::FULL_OUTER_JOIN) {
@@ -499,8 +493,22 @@ Status HashJoinProbeOperatorX::push(RuntimeState* state,
vectorized::Block* inpu
local_state._convert_block_to_null(*input_block);
}
- RETURN_IF_ERROR(local_state._extract_join_column(*input_block,
res_col_ids));
+ // TODO: Now we are not sure whether a column is nullable only by
ExecNode's `row_desc`
+ // so we have to initialize this flag by the first probe block.
+ if (!local_state._has_set_need_null_map_for_probe) {
+ local_state._has_set_need_null_map_for_probe = true;
+ local_state._need_null_map_for_probe =
+ local_state._need_probe_null_map(*input_block,
res_col_ids);
+ }
+ if (local_state._need_null_map_for_probe) {
+ if (local_state._null_map_column == nullptr) {
+ local_state._null_map_column =
vectorized::ColumnUInt8::create();
+ }
+
local_state._null_map_column->get_data().assign(input_block->rows(),
(uint8_t)0);
+ }
+ RETURN_IF_ERROR(local_state._extract_join_column(*input_block,
local_state._null_map_column,
+
local_state._probe_columns, res_col_ids));
if (&local_state._probe_block != input_block) {
input_block->swap(local_state._probe_block);
}
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 1b45a2a258e..b4930307bcc 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -94,16 +94,15 @@ public:
const std::shared_ptr<vectorized::Block>& build_block() const {
return _shared_state->build_block;
}
- bool empty_right_table_shortcut() const {
- // !Base::_projections.empty() means nereids planner
- return _shared_state->empty_right_table_need_probe_dispose &&
!Base::_projections.empty();
- }
private:
void _prepare_probe_block();
bool _need_probe_null_map(vectorized::Block& block, const
std::vector<int>& res_col_ids);
std::vector<uint16_t> _convert_block_to_null(vectorized::Block& block);
- Status _extract_join_column(vectorized::Block& block, const
std::vector<int>& res_col_ids);
+ Status _extract_join_column(vectorized::Block& block,
+ vectorized::ColumnUInt8::MutablePtr& null_map,
+ vectorized::ColumnRawPtrs& raw_ptrs,
+ const std::vector<int>& res_col_ids);
friend class HashJoinProbeOperatorX;
template <int JoinOpType, typename Parent>
friend struct vectorized::ProcessHashTableProbe;
diff --git a/be/src/vec/core/column_with_type_and_name.cpp
b/be/src/vec/core/column_with_type_and_name.cpp
index e93946804ff..cd0f7194004 100644
--- a/be/src/vec/core/column_with_type_and_name.cpp
+++ b/be/src/vec/core/column_with_type_and_name.cpp
@@ -62,17 +62,15 @@ void ColumnWithTypeAndName::dump_structure(std::ostream&
out) const {
out << name;
}
- if (type) {
+ if (type)
out << " " << type->get_name();
- } else {
+ else
out << " nullptr";
- }
- if (column) {
- out << ' ' << column->dump_structure() << "(use_count=" <<
column->use_count() << ')';
- } else {
+ if (column)
+ out << ' ' << column->dump_structure();
+ else
out << " nullptr";
- }
}
String ColumnWithTypeAndName::dump_structure() const {
diff --git
a/regression-test/data/query_p0/join/test_half_join_nullable_build_side.out
b/regression-test/data/query_p0/join/test_half_join_nullable_build_side.out
index 56c5f6e2229..8404bee641f 100644
--- a/regression-test/data/query_p0/join/test_half_join_nullable_build_side.out
+++ b/regression-test/data/query_p0/join/test_half_join_nullable_build_side.out
@@ -134,9 +134,3 @@
4 \N \N \N \N \N
5 1111 1111 3 1111 1111
--- !shortcut --
-1 11 11
-2 111 111
-3 1111 1111
-4 111 111
-
diff --git
a/regression-test/suites/query_p0/join/test_half_join_nullable_build_side.groovy
b/regression-test/suites/query_p0/join/test_half_join_nullable_build_side.groovy
index 230332fdf3b..2bb24309960 100644
---
a/regression-test/suites/query_p0/join/test_half_join_nullable_build_side.groovy
+++
b/regression-test/suites/query_p0/join/test_half_join_nullable_build_side.groovy
@@ -286,8 +286,4 @@ suite("test_half_join_nullable_build_side", "query,p0") {
left join test_half_join_nullable_build_side_l r on l.v2 <=> r.v2
order by 1, 2, 3;
"""
-
- qt_shortcut """
- select * from test_half_join_nullable_build_side_l l
left anti join test_half_join_nullable_build_side_r r on l.v2 <=> r.v2 and
r.k1=5 order by 1, 2, 3;
- """
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]