This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new de96162ab37 [Chore](hash) catch error when hash method variant meet
valueless_by_exception #34956
de96162ab37 is described below
commit de96162ab3729569acf7e01cdc5d7f260eede39d
Author: Pxl <[email protected]>
AuthorDate: Fri May 17 11:33:41 2024 +0800
[Chore](hash) catch error when hash method variant meet
valueless_by_exception #34956
---
be/src/pipeline/exec/aggregation_sink_operator.cpp | 12 ++-
be/src/pipeline/exec/aggregation_sink_operator.h | 2 +-
.../distinct_streaming_aggregation_operator.cpp | 13 +--
.../exec/distinct_streaming_aggregation_operator.h | 2 +-
.../pipeline/exec/partition_sort_sink_operator.cpp | 9 ++-
.../pipeline/exec/partition_sort_sink_operator.h | 2 +-
.../exec/streaming_aggregation_operator.cpp | 12 ++-
.../pipeline/exec/streaming_aggregation_operator.h | 2 +-
.../common/hash_table/hash_map_context_creator.h | 8 +-
be/src/vec/exec/vaggregation_node.cpp | 9 ++-
be/src/vec/exec/vaggregation_node.h | 2 +-
be/src/vec/exec/vpartition_sort_node.cpp | 92 ++++++++++++----------
be/src/vec/exec/vpartition_sort_node.h | 4 +-
13 files changed, 100 insertions(+), 69 deletions(-)
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index c36427b9519..dcdff106b90 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -20,6 +20,7 @@
#include <memory>
#include <string>
+#include "common/status.h"
#include "pipeline/exec/operator.h"
#include "runtime/primitive_type.h"
#include "vec/common/hash_table/hash.h"
@@ -100,7 +101,7 @@ Status AggSinkLocalState::open(RuntimeState* state) {
_executor = std::make_unique<Executor<true, false>>();
}
} else {
- _init_hash_method(Base::_shared_state->probe_expr_ctxs);
+
RETURN_IF_ERROR(_init_hash_method(Base::_shared_state->probe_expr_ctxs));
std::visit(vectorized::Overload {
[&](std::monostate& arg) {
@@ -565,9 +566,12 @@ void
AggSinkLocalState::_find_in_hash_table(vectorized::AggregateDataPtr* places
_agg_data->method_variant);
}
-void AggSinkLocalState::_init_hash_method(const vectorized::VExprContextSPtrs&
probe_exprs) {
- init_agg_hash_method(_agg_data, probe_exprs,
- Base::_parent->template
cast<AggSinkOperatorX>()._is_first_phase);
+Status AggSinkLocalState::_init_hash_method(const
vectorized::VExprContextSPtrs& probe_exprs) {
+ if (!init_agg_hash_method(_agg_data, probe_exprs,
+ Base::_parent->template
cast<AggSinkOperatorX>()._is_first_phase)) {
+ return Status::InternalError("init hash method failed");
+ }
+ return Status::OK();
}
AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const
TPlanNode& tnode,
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h
b/be/src/pipeline/exec/aggregation_sink_operator.h
index d32e5f616c2..036cbbf3162 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -75,7 +75,7 @@ protected:
Status _execute_without_key(vectorized::Block* block);
Status _merge_without_key(vectorized::Block* block);
void _update_memusage_without_key();
- void _init_hash_method(const vectorized::VExprContextSPtrs& probe_exprs);
+ Status _init_hash_method(const vectorized::VExprContextSPtrs& probe_exprs);
Status _execute_with_serialized_key(vectorized::Block* block);
Status _merge_with_serialized_key(vectorized::Block* block);
void _update_memusage_with_serialized_key();
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 82b29b2afdb..c4c1ba370b0 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -98,7 +98,7 @@ Status DistinctStreamingAggLocalState::open(RuntimeState*
state) {
_agg_data->without_key =
reinterpret_cast<vectorized::AggregateDataPtr>(
_agg_profile_arena->alloc(p._total_size_of_aggregate_states));
} else {
- _init_hash_method(_probe_expr_ctxs);
+ RETURN_IF_ERROR(_init_hash_method(_probe_expr_ctxs));
}
return Status::OK();
}
@@ -168,11 +168,14 @@ bool
DistinctStreamingAggLocalState::_should_expand_preagg_hash_tables() {
_agg_data->method_variant);
}
-void DistinctStreamingAggLocalState::_init_hash_method(
+Status DistinctStreamingAggLocalState::_init_hash_method(
const vectorized::VExprContextSPtrs& probe_exprs) {
- init_agg_hash_method(
- _agg_data.get(), probe_exprs,
- Base::_parent->template
cast<DistinctStreamingAggOperatorX>()._is_first_phase);
+ if (!init_agg_hash_method(
+ _agg_data.get(), probe_exprs,
+ Base::_parent->template
cast<DistinctStreamingAggOperatorX>()._is_first_phase)) {
+ return Status::InternalError("init agg hash method failed");
+ }
+ return Status::OK();
}
Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index db951b44142..0a3af64ed46 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -52,7 +52,7 @@ private:
friend class StatefulOperatorX;
Status _distinct_pre_agg_with_serialized_key(vectorized::Block* in_block,
vectorized::Block* out_block);
- void _init_hash_method(const vectorized::VExprContextSPtrs& probe_exprs);
+ Status _init_hash_method(const vectorized::VExprContextSPtrs& probe_exprs);
void _emplace_into_hash_table_to_distinct(vectorized::IColumn::Selector&
distinct_row,
vectorized::ColumnRawPtrs&
key_columns,
const size_t num_rows);
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index 3ec8b9e66cf..9d6cbd29b2f 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -46,7 +46,7 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo
&_vsort_exec_exprs, p._limit, 0, p._pool, p._is_asc_order,
p._nulls_first,
p._child_x->row_desc(), state, _profile, p._has_global_limit,
p._partition_inner_limit,
p._top_n_algorithm, p._topn_phase);
- _init_hash_method();
+ RETURN_IF_ERROR(_init_hash_method());
return Status::OK();
}
@@ -223,8 +223,11 @@ Status
PartitionSortSinkOperatorX::_emplace_into_hash_table(
local_state._partitioned_data->method_variant);
}
-void PartitionSortSinkLocalState::_init_hash_method() {
- init_partition_hash_method(_partitioned_data.get(), _partition_expr_ctxs,
true);
+Status PartitionSortSinkLocalState::_init_hash_method() {
+ if (!init_partition_hash_method(_partitioned_data.get(),
_partition_expr_ctxs, true)) {
+ return Status::InternalError("init hash method failed");
+ }
+ return Status::OK();
}
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h
b/be/src/pipeline/exec/partition_sort_sink_operator.h
index 43829f95a8f..f4d88204c0d 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.h
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.h
@@ -56,7 +56,7 @@ private:
RuntimeProfile::Counter* _selector_block_timer = nullptr;
RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
RuntimeProfile::Counter* _passthrough_rows_counter = nullptr;
- void _init_hash_method();
+ Status _init_hash_method();
};
class PartitionSortSinkOperatorX final : public
DataSinkOperatorX<PartitionSortSinkLocalState> {
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index 10bd37d8c44..ba66d01a0f8 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -152,7 +152,7 @@ Status StreamingAggLocalState::open(RuntimeState* state) {
}
}
} else {
- _init_hash_method(_probe_expr_ctxs);
+ RETURN_IF_ERROR(_init_hash_method(_probe_expr_ctxs));
std::visit(vectorized::Overload {
[&](std::monostate& arg) -> void {
@@ -503,9 +503,13 @@ Status
StreamingAggLocalState::_merge_with_serialized_key(vectorized::Block* blo
}
}
-void StreamingAggLocalState::_init_hash_method(const
vectorized::VExprContextSPtrs& probe_exprs) {
- init_agg_hash_method(_agg_data.get(), probe_exprs,
- Base::_parent->template
cast<StreamingAggOperatorX>()._is_first_phase);
+Status StreamingAggLocalState::_init_hash_method(const
vectorized::VExprContextSPtrs& probe_exprs) {
+ if (!init_agg_hash_method(
+ _agg_data.get(), probe_exprs,
+ Base::_parent->template
cast<StreamingAggOperatorX>()._is_first_phase)) {
+ return Status::InternalError("init hash method failed");
+ }
+ return Status::OK();
}
Status StreamingAggLocalState::do_pre_agg(vectorized::Block* input_block,
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h
b/be/src/pipeline/exec/streaming_aggregation_operator.h
index 17040ca59ff..227536170ea 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.h
@@ -63,7 +63,7 @@ private:
Status _execute_with_serialized_key(vectorized::Block* block);
Status _merge_with_serialized_key(vectorized::Block* block);
void _update_memusage_with_serialized_key();
- void _init_hash_method(const vectorized::VExprContextSPtrs& probe_exprs);
+ Status _init_hash_method(const vectorized::VExprContextSPtrs& probe_exprs);
Status _get_without_key_result(RuntimeState* state, vectorized::Block*
block, bool* eos);
Status _serialize_without_key(RuntimeState* state, vectorized::Block*
block, bool* eos);
Status _get_with_serialized_key_result(RuntimeState* state,
vectorized::Block* block,
diff --git a/be/src/vec/common/hash_table/hash_map_context_creator.h
b/be/src/vec/common/hash_table/hash_map_context_creator.h
index 22760ddfcb4..426892d3442 100644
--- a/be/src/vec/common/hash_table/hash_map_context_creator.h
+++ b/be/src/vec/common/hash_table/hash_map_context_creator.h
@@ -104,7 +104,7 @@ bool try_get_hash_map_context_fixed(Variant& variant, const
std::vector<DataType
}
template <typename DataVariants, typename Data>
-void init_hash_method(DataVariants* agg_data, const
vectorized::VExprContextSPtrs& probe_exprs,
+bool init_hash_method(DataVariants* agg_data, const
vectorized::VExprContextSPtrs& probe_exprs,
bool is_first_phase) {
using Type = DataVariants::Type;
Type t(Type::serialized);
@@ -164,5 +164,11 @@ void init_hash_method(DataVariants* agg_data, const
vectorized::VExprContextSPtr
agg_data->init(Type::serialized);
}
}
+
+ if (agg_data->method_variant.valueless_by_exception()) {
+ agg_data->method_variant.template emplace<std::monostate>();
+ return false;
+ }
+ return true;
}
} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/exec/vaggregation_node.cpp
b/be/src/vec/exec/vaggregation_node.cpp
index 77a883558de..3a66773cf03 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -161,8 +161,11 @@ Status AggregationNode::init(const TPlanNode& tnode,
RuntimeState* state) {
return Status::OK();
}
-void AggregationNode::_init_hash_method(const VExprContextSPtrs& probe_exprs) {
- init_agg_hash_method(_agg_data.get(), probe_exprs, _is_first_phase);
+Status AggregationNode::_init_hash_method(const VExprContextSPtrs&
probe_exprs) {
+ if (!init_agg_hash_method(_agg_data.get(), probe_exprs, _is_first_phase)) {
+ return Status::InternalError("init hash method failed");
+ }
+ return Status::OK();
}
Status AggregationNode::prepare_profile(RuntimeState* state) {
@@ -269,7 +272,7 @@ Status AggregationNode::prepare_profile(RuntimeState*
state) {
std::bind<void>(&AggregationNode::_update_memusage_without_key, this);
_executor.close =
std::bind<void>(&AggregationNode::_close_without_key, this);
} else {
- _init_hash_method(_probe_expr_ctxs);
+ RETURN_IF_ERROR(_init_hash_method(_probe_expr_ctxs));
std::visit(Overload {[&](std::monostate& arg) {
throw
doris::Exception(ErrorCode::INTERNAL_ERROR,
diff --git a/be/src/vec/exec/vaggregation_node.h
b/be/src/vec/exec/vaggregation_node.h
index 5815f02e5bb..de94cd6d59b 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -533,7 +533,7 @@ private:
Status _merge_with_serialized_key(Block* block);
void _update_memusage_with_serialized_key();
void _close_with_serialized_key();
- void _init_hash_method(const VExprContextSPtrs& probe_exprs);
+ Status _init_hash_method(const VExprContextSPtrs& probe_exprs);
template <bool limit>
Status _execute_with_serialized_key_helper(Block* block) {
diff --git a/be/src/vec/exec/vpartition_sort_node.cpp
b/be/src/vec/exec/vpartition_sort_node.cpp
index d7bae4bd35d..7788d58955e 100644
--- a/be/src/vec/exec/vpartition_sort_node.cpp
+++ b/be/src/vec/exec/vpartition_sort_node.cpp
@@ -158,7 +158,7 @@ Status VPartitionSortNode::prepare(RuntimeState* state) {
SCOPED_TIMER(_exec_timer);
RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, child(0)->row_desc(),
_row_descriptor));
RETURN_IF_ERROR(VExpr::prepare(_partition_expr_ctxs, state,
child(0)->row_desc()));
- _init_hash_method();
+ RETURN_IF_ERROR(_init_hash_method());
_partition_sort_info = std::make_shared<PartitionSortInfo>(
&_vsort_exec_exprs, _limit, 0, _pool, _is_asc_order, _nulls_first,
child(0)->row_desc(),
@@ -182,45 +182,50 @@ Status VPartitionSortNode::_emplace_into_hash_table(const
ColumnRawPtrs& key_col
const vectorized::Block*
input_block,
bool eos) {
return std::visit(
- [&](auto&& agg_method) -> Status {
- SCOPED_TIMER(_build_timer);
- using HashMethodType = std::decay_t<decltype(agg_method)>;
- using AggState = typename HashMethodType::State;
-
- AggState state(key_columns);
- size_t num_rows = input_block->rows();
- agg_method.init_serialized_keys(key_columns, num_rows);
-
- auto creator = [&](const auto& ctor, auto& key, auto& origin) {
- HashMethodType::try_presis_key(key, origin,
*_agg_arena_pool);
- auto* aggregate_data = _pool->add(
- new PartitionBlocks(_partition_sort_info,
_value_places.empty()));
- _value_places.push_back(aggregate_data);
- ctor(key, aggregate_data);
- _num_partition++;
- };
- auto creator_for_null_key = [&](auto& mapped) {
- mapped = _pool->add(
- new PartitionBlocks(_partition_sort_info,
_value_places.empty()));
- _value_places.push_back(mapped);
- _num_partition++;
- };
- {
- SCOPED_TIMER(_emplace_key_timer);
- for (size_t row = 0; row < num_rows; ++row) {
- auto& mapped =
- agg_method.lazy_emplace(state, row, creator,
creator_for_null_key);
- mapped->add_row_idx(row);
- }
- }
- {
- SCOPED_TIMER(_selector_block_timer);
- for (auto* place : _value_places) {
-
RETURN_IF_ERROR(place->append_block_by_selector(input_block, eos));
- }
- }
- return Status::OK();
- },
+ vectorized::Overload {
+ [&](std::monostate& arg) -> Status {
+ throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"uninited hash table");
+ return Status::InternalError("Unit hash table");
+ },
+ [&](auto&& agg_method) -> Status {
+ SCOPED_TIMER(_build_timer);
+ using HashMethodType =
std::decay_t<decltype(agg_method)>;
+ using AggState = typename HashMethodType::State;
+
+ AggState state(key_columns);
+ size_t num_rows = input_block->rows();
+ agg_method.init_serialized_keys(key_columns, num_rows);
+
+ auto creator = [&](const auto& ctor, auto& key, auto&
origin) {
+ HashMethodType::try_presis_key(key, origin,
*_agg_arena_pool);
+ auto* aggregate_data = _pool->add(new
PartitionBlocks(
+ _partition_sort_info,
_value_places.empty()));
+ _value_places.push_back(aggregate_data);
+ ctor(key, aggregate_data);
+ _num_partition++;
+ };
+ auto creator_for_null_key = [&](auto& mapped) {
+ mapped = _pool->add(new
PartitionBlocks(_partition_sort_info,
+
_value_places.empty()));
+ _value_places.push_back(mapped);
+ _num_partition++;
+ };
+ {
+ SCOPED_TIMER(_emplace_key_timer);
+ for (size_t row = 0; row < num_rows; ++row) {
+ auto& mapped = agg_method.lazy_emplace(state,
row, creator,
+
creator_for_null_key);
+ mapped->add_row_idx(row);
+ }
+ }
+ {
+ SCOPED_TIMER(_selector_block_timer);
+ for (auto* place : _value_places) {
+
RETURN_IF_ERROR(place->append_block_by_selector(input_block, eos));
+ }
+ }
+ return Status::OK();
+ }},
_partitioned_data->method_variant);
}
@@ -397,8 +402,11 @@ void VPartitionSortNode::release_resource(RuntimeState*
state) {
ExecNode::release_resource(state);
}
-void VPartitionSortNode::_init_hash_method() {
- init_partition_hash_method(_partitioned_data.get(), _partition_expr_ctxs,
true);
+Status VPartitionSortNode::_init_hash_method() {
+ if (!init_partition_hash_method(_partitioned_data.get(),
_partition_expr_ctxs, true)) {
+ return Status::InternalError("init hash method failed");
+ }
+ return Status::OK();
}
void VPartitionSortNode::debug_profile() {
diff --git a/be/src/vec/exec/vpartition_sort_node.h
b/be/src/vec/exec/vpartition_sort_node.h
index 10135bd5107..481a99719fb 100644
--- a/be/src/vec/exec/vpartition_sort_node.h
+++ b/be/src/vec/exec/vpartition_sort_node.h
@@ -133,7 +133,7 @@ using PartitionDataWithUInt256Key = PHHashMap<UInt256,
PartitionDataPtr, HashCRC
using PartitionDataWithUInt136Key = PHHashMap<UInt136, PartitionDataPtr,
HashCRC32<UInt136>>;
using PartitionedMethodVariants = std::variant<
- MethodSerialized<PartitionDataWithStringKey>,
+ std::monostate, MethodSerialized<PartitionDataWithStringKey>,
MethodOneNumber<UInt8, PartitionDataWithUInt8Key>,
MethodOneNumber<UInt16, PartitionDataWithUInt16Key>,
MethodOneNumber<UInt32, PartitionDataWithUInt32Key>,
@@ -236,7 +236,7 @@ public:
bool can_read();
private:
- void _init_hash_method();
+ Status _init_hash_method();
Status _split_block_by_partition(vectorized::Block* input_block, bool eos);
Status _emplace_into_hash_table(const ColumnRawPtrs& key_columns,
const vectorized::Block* input_block, bool
eos);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]