This is an automated email from the ASF dual-hosted git repository.
zhangstar333 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9701457c364 [refine](SetOperator) refine some SetOperator code. (#49772)
9701457c364 is described below
commit 9701457c364c3f2bec6e2d2adc2da4676110350f
Author: Mryange <[email protected]>
AuthorDate: Thu Apr 17 11:26:18 2025 +0800
[refine](SetOperator) refine some SetOperator code. (#49772)
### What problem does this PR solve?
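This PR refines several parts of the SetOperator code:
1. Child expressions are evaluated from the local state (`local_state._child_exprs`) instead of the operator's own `_child_exprs`.
2. The hash-table size lookup is factored out into a shared helper, `SetSharedState::get_hash_table_size()`.
3. Some unreachable code paths are removed.
4. The source operator now builds its output with `append_data_by_selector` (one call per column over the collected row indexes) instead of inserting rows one at a time, which makes output faster.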
---
be/src/pipeline/dependency.cpp | 13 ++++++++
be/src/pipeline/dependency.h | 1 +
be/src/pipeline/exec/set_probe_sink_operator.cpp | 42 ++++++++++--------------
be/src/pipeline/exec/set_sink_operator.cpp | 30 +++++------------
be/src/pipeline/exec/set_source_operator.cpp | 35 +++++++++++---------
be/src/pipeline/exec/set_source_operator.h | 5 ++-
6 files changed, 62 insertions(+), 64 deletions(-)
diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index c8a9f5ed528..5624959f630 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -432,6 +432,19 @@ Status SetSharedState::update_build_not_ignore_null(const
vectorized::VExprConte
return Status::OK();
}
+size_t SetSharedState::get_hash_table_size() const {
+ size_t hash_table_size = 0;
+ std::visit(
+ [&](auto&& arg) {
+ using HashTableCtxType = std::decay_t<decltype(arg)>;
+ if constexpr (!std::is_same_v<HashTableCtxType,
std::monostate>) {
+ hash_table_size = arg.hash_table->size();
+ }
+ },
+ hash_table_variants->method_variant);
+ return hash_table_size;
+}
+
Status SetSharedState::hash_table_init() {
std::vector<vectorized::DataTypePtr> data_types;
for (size_t i = 0; i != child_exprs_lists[0].size(); ++i) {
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index cbb2e043a2a..ef8ed63eb16 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -676,6 +676,7 @@ public:
// If a calculation involves both nullable and non-nullable columns, the
final output should be a nullable column
Status update_build_not_ignore_null(const vectorized::VExprContextSPtrs&
ctxs);
+ size_t get_hash_table_size() const;
/// init in both upstream side.
//The i-th result expr list refers to the i-th child.
std::vector<vectorized::VExprContextSPtrs> child_exprs_lists;
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp
b/be/src/pipeline/exec/set_probe_sink_operator.cpp
index 5abf8b36ab0..c2c9e6741e8 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp
@@ -144,22 +144,25 @@ Status
SetProbeSinkOperatorX<is_intersect>::_extract_probe_column(
vectorized::ColumnRawPtrs& raw_ptrs, int child_id) {
auto& build_not_ignore_null =
local_state._shared_state->build_not_ignore_null;
- for (size_t i = 0; i < _child_exprs.size(); ++i) {
+ auto& child_exprs = local_state._child_exprs;
+ for (size_t i = 0; i < child_exprs.size(); ++i) {
int result_col_id = -1;
- RETURN_IF_ERROR(_child_exprs[i]->execute(&block, &result_col_id));
+ RETURN_IF_ERROR(child_exprs[i]->execute(&block, &result_col_id));
block.get_by_position(result_col_id).column =
block.get_by_position(result_col_id).column->convert_to_full_column_if_const();
- auto column = block.get_by_position(result_col_id).column.get();
-
- if (auto* nullable =
check_and_get_column<vectorized::ColumnNullable>(*column)) {
- auto& col_nested = nullable->get_nested_column();
- if (build_not_ignore_null[i]) { //same as build column
- raw_ptrs[i] = nullable;
- } else {
- raw_ptrs[i] = &col_nested;
+ const auto* column = block.get_by_position(result_col_id).column.get();
+
+ if (const auto* nullable =
check_and_get_column<vectorized::ColumnNullable>(*column)) {
+ if (!build_not_ignore_null[i]) {
+ return Status::InternalError(
+ "SET operator expects a nullable : {} column in column
{}, but the "
+ "computed "
+ "output is a nullable : {} column",
+ build_not_ignore_null[i], i,
+ nullable->get_nested_column_ptr()->is_nullable());
}
-
+ raw_ptrs[i] = nullable;
} else {
if (build_not_ignore_null[i]) {
auto column_ptr =
make_nullable(block.get_by_position(result_col_id).column, false);
@@ -179,22 +182,10 @@ template <bool is_intersect>
void SetProbeSinkOperatorX<is_intersect>::_finalize_probe(
SetProbeSinkLocalState<is_intersect>& local_state) {
auto& valid_element_in_hash_tbl =
local_state._shared_state->valid_element_in_hash_tbl;
- auto& hash_table_variants = local_state._shared_state->hash_table_variants;
-
if (_cur_child_id != (local_state._shared_state->child_quantity - 1)) {
_refresh_hash_table(local_state);
- if constexpr (is_intersect) {
- valid_element_in_hash_tbl = 0;
- } else {
- std::visit(
- [&](auto&& arg) {
- using HashTableCtxType = std::decay_t<decltype(arg)>;
- if constexpr (!std::is_same_v<HashTableCtxType,
std::monostate>) {
- valid_element_in_hash_tbl = arg.hash_table->size();
- }
- },
- hash_table_variants->method_variant);
- }
+ uint64_t hash_table_size =
local_state._shared_state->get_hash_table_size();
+ valid_element_in_hash_tbl = is_intersect ? 0 : hash_table_size;
local_state._probe_columns.resize(
local_state._shared_state->child_exprs_lists[_cur_child_id +
1].size());
local_state._shared_state->probe_finished_children_dependency[_cur_child_id + 1]
@@ -256,6 +247,7 @@ void
SetProbeSinkOperatorX<is_intersect>::_refresh_hash_table(
}
arg.hash_table = std::move(tmp_hash_table);
} else if (is_intersect) {
+ DCHECK_EQ(valid_element_in_hash_tbl,
arg.hash_table->size());
while (iter != iter_end) {
auto& mapped = iter->get_second();
auto* it = &mapped;
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp
b/be/src/pipeline/exec/set_sink_operator.cpp
index 6c5b4483915..41fd67aabf8 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -26,19 +26,6 @@
namespace doris::pipeline {
#include "common/compile_check_begin.h"
-uint64_t get_hash_table_size(const auto& hash_table_variant) {
- uint64_t hash_table_size = 0;
- std::visit(
- [&](auto&& arg) {
- using HashTableCtxType = std::decay_t<decltype(arg)>;
- if constexpr (!std::is_same_v<HashTableCtxType,
std::monostate>) {
- hash_table_size = arg.hash_table->size();
- }
- },
- hash_table_variant);
- return hash_table_size;
-}
-
template <bool is_intersect>
Status SetSinkLocalState<is_intersect>::terminate(RuntimeState* state) {
SCOPED_TIMER(exec_time_counter());
@@ -58,8 +45,7 @@ Status SetSinkLocalState<is_intersect>::close(RuntimeState*
state, Status exec_s
if (!_terminated && _runtime_filter_producer_helper &&
!state->is_cancelled()) {
try {
RETURN_IF_ERROR(_runtime_filter_producer_helper->process(
- state, &_shared_state->build_block,
-
get_hash_table_size(_shared_state->hash_table_variants->method_variant)));
+ state, &_shared_state->build_block,
_shared_state->get_hash_table_size()));
} catch (Exception& e) {
return Status::InternalError(
"rf process meet error: {}, _terminated: {},
_finish_dependency: {}",
@@ -105,12 +91,12 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState*
state, vectorized::Blo
local_state._mutable_block.clear();
if (eos) {
- uint64_t hash_table_size = get_hash_table_size(
-
local_state._shared_state->hash_table_variants->method_variant);
+ uint64_t hash_table_size =
local_state._shared_state->get_hash_table_size();
valid_element_in_hash_tbl = is_intersect ? 0 : hash_table_size;
local_state._shared_state->probe_finished_children_dependency[_cur_child_id + 1]
->set_ready();
+ DCHECK_GT(_child_quantity, 1);
RETURN_IF_ERROR(local_state._runtime_filter_producer_helper->send_filter_size(
state, hash_table_size, local_state._finish_dependency));
}
@@ -152,16 +138,18 @@ template <bool is_intersect>
Status SetSinkOperatorX<is_intersect>::_extract_build_column(
SetSinkLocalState<is_intersect>& local_state, vectorized::Block& block,
vectorized::ColumnRawPtrs& raw_ptrs, size_t& rows) {
- std::vector<int> result_locs(local_state._child_exprs.size(), -1);
+ // use local state child exprs
+ auto& child_expr = local_state._child_exprs;
+ std::vector<int> result_locs(child_expr.size(), -1);
bool is_all_const = true;
- for (size_t i = 0; i < local_state._child_exprs.size(); ++i) {
- RETURN_IF_ERROR(local_state._child_exprs[i]->execute(&block,
&result_locs[i]));
+ for (size_t i = 0; i < child_expr.size(); ++i) {
+ RETURN_IF_ERROR(child_expr[i]->execute(&block, &result_locs[i]));
is_all_const &=
is_column_const(*block.get_by_position(result_locs[i]).column);
}
rows = is_all_const ? 1 : rows;
- for (size_t i = 0; i < local_state._child_exprs.size(); ++i) {
+ for (size_t i = 0; i < child_expr.size(); ++i) {
size_t result_col_id = result_locs[i];
if (is_all_const) {
diff --git a/be/src/pipeline/exec/set_source_operator.cpp
b/be/src/pipeline/exec/set_source_operator.cpp
index bc2dc32d577..6d464513b1f 100644
--- a/be/src/pipeline/exec/set_source_operator.cpp
+++ b/be/src/pipeline/exec/set_source_operator.cpp
@@ -127,34 +127,42 @@ Status
SetSourceOperatorX<is_intersect>::_get_data_in_hashtable(
vectorized::Block* output_block, const int batch_size, bool* eos) {
size_t left_col_len = local_state._left_table_data_types.size();
hash_table_ctx.init_iterator();
- auto block_size = 0;
+ local_state._result_indexs.clear();
+ local_state._result_indexs.reserve(batch_size);
- auto add_result = [&local_state, &block_size, this](auto value) {
+ auto add_result = [&local_state](auto value) {
auto* it = &value;
if constexpr (is_intersect) {
if (it->visited) { //intersected: have done probe, so visited
values it's the result
- _add_result_columns(local_state, value, block_size);
+ local_state._result_indexs.push_back(value.row_num);
}
} else {
if (!it->visited) { //except: haven't visited values it's the
needed result
- _add_result_columns(local_state, value, block_size);
+ local_state._result_indexs.push_back(value.row_num);
}
}
};
auto& iter = hash_table_ctx.iterator;
- for (; iter != hash_table_ctx.hash_table->end() && block_size <
batch_size; ++iter) {
+ while (iter != hash_table_ctx.hash_table->end() &&
+ local_state._result_indexs.size() < batch_size) {
add_result(iter->get_second());
+ ++iter;
}
*eos = iter == hash_table_ctx.hash_table->end();
if (*eos && hash_table_ctx.hash_table->has_null_key_data()) {
auto value = hash_table_ctx.hash_table->template
get_null_key_data<RowRefWithFlag>();
+ // If the hashmap can store nulldata, the return value is
RowRefWithFlag, otherwise it is char*
+ static_assert(std::is_same_v<RowRefWithFlag,
std::decay_t<decltype(value)>> ||
+ std::is_same_v<char*, std::decay_t<decltype(value)>>);
if constexpr (std::is_same_v<RowRefWithFlag,
std::decay_t<decltype(value)>>) {
add_result(value);
}
}
+ local_state._add_result_columns();
+
if (!output_block->mem_reuse()) {
for (int i = 0; i < left_col_len; ++i) {
output_block->insert(
@@ -169,18 +177,15 @@ Status
SetSourceOperatorX<is_intersect>::_get_data_in_hashtable(
}
template <bool is_intersect>
-void SetSourceOperatorX<is_intersect>::_add_result_columns(
- SetSourceLocalState<is_intersect>& local_state, RowRefWithFlag& value,
int& block_size) {
- auto& build_col_idx = local_state._shared_state->build_col_idx;
- auto& build_block = local_state._shared_state->build_block;
-
- for (auto idx = build_col_idx.begin(); idx != build_col_idx.end(); ++idx) {
- auto& column = *build_block.get_by_position(idx->second).column;
- local_state._mutable_cols[idx->first]->insert_from(column,
value.row_num);
+void SetSourceLocalState<is_intersect>::_add_result_columns() {
+ auto& build_col_idx = _shared_state->build_col_idx;
+ auto& build_block = _shared_state->build_block;
+
+ for (auto& idx : build_col_idx) {
+ const auto& column = *build_block.get_by_position(idx.second).column;
+ column.append_data_by_selector(_mutable_cols[idx.first],
_result_indexs);
}
- block_size++;
}
-
template class SetSourceLocalState<true>;
template class SetSourceLocalState<false>;
template class SetSourceOperatorX<true>;
diff --git a/be/src/pipeline/exec/set_source_operator.h
b/be/src/pipeline/exec/set_source_operator.h
index 20cfd885e04..a023888de58 100644
--- a/be/src/pipeline/exec/set_source_operator.h
+++ b/be/src/pipeline/exec/set_source_operator.h
@@ -41,6 +41,7 @@ public:
Status open(RuntimeState* state) override;
private:
+ void _add_result_columns();
friend class SetSourceOperatorX<is_intersect>;
friend class OperatorX<SetSourceLocalState<is_intersect>>;
std::vector<vectorized::MutableColumnPtr> _mutable_cols;
@@ -49,6 +50,7 @@ private:
RuntimeProfile::Counter* _get_data_timer = nullptr;
RuntimeProfile::Counter* _filter_timer = nullptr;
+ vectorized::IColumn::Selector _result_indexs;
};
template <bool is_intersect>
@@ -90,9 +92,6 @@ private:
Status _get_data_in_hashtable(SetSourceLocalState<is_intersect>&
local_state,
HashTableContext& hash_table_ctx,
vectorized::Block* output_block,
const int batch_size, bool* eos);
-
- void _add_result_columns(SetSourceLocalState<is_intersect>& local_state,
RowRefWithFlag& value,
- int& block_size);
const size_t _child_quantity;
};
#include "common/compile_check_end.h"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]