This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d7d9ac45846 [Chore](hash) catch exception on init_hash_method (#35005)
d7d9ac45846 is described below
commit d7d9ac458469cc0f2248d0eb327fa8fc286abc3b
Author: Pxl <[email protected]>
AuthorDate: Fri May 17 18:03:03 2024 +0800
[Chore](hash) catch exception on init_hash_method (#35005)
catch exception on init_hash_method
---
be/src/pipeline/exec/aggregation_sink_operator.cpp | 7 +-
.../distinct_streaming_aggregation_operator.cpp | 8 +-
.../pipeline/exec/partition_sort_sink_operator.cpp | 5 +-
.../exec/streaming_aggregation_operator.cpp | 8 +-
.../common/hash_table/hash_map_context_creator.h | 123 +++++++++++----------
be/src/vec/exec/vaggregation_node.cpp | 4 +-
be/src/vec/exec/vpartition_sort_node.cpp | 5 +-
7 files changed, 80 insertions(+), 80 deletions(-)
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index dcdff106b90..7d1840f95a4 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -567,10 +567,9 @@ void
AggSinkLocalState::_find_in_hash_table(vectorized::AggregateDataPtr* places
}
Status AggSinkLocalState::_init_hash_method(const
vectorized::VExprContextSPtrs& probe_exprs) {
- if (!init_agg_hash_method(_agg_data, probe_exprs,
- Base::_parent->template
cast<AggSinkOperatorX>()._is_first_phase)) {
- return Status::InternalError("init hash method failed");
- }
+ RETURN_IF_ERROR(
+ init_agg_hash_method(_agg_data, probe_exprs,
+ Base::_parent->template
cast<AggSinkOperatorX>()._is_first_phase));
return Status::OK();
}
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index c4c1ba370b0..0ea9a06ac71 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -170,11 +170,9 @@ bool
DistinctStreamingAggLocalState::_should_expand_preagg_hash_tables() {
Status DistinctStreamingAggLocalState::_init_hash_method(
const vectorized::VExprContextSPtrs& probe_exprs) {
- if (!init_agg_hash_method(
- _agg_data.get(), probe_exprs,
- Base::_parent->template
cast<DistinctStreamingAggOperatorX>()._is_first_phase)) {
- return Status::InternalError("init agg hash method failed");
- }
+ RETURN_IF_ERROR(init_agg_hash_method(
+ _agg_data.get(), probe_exprs,
+ Base::_parent->template
cast<DistinctStreamingAggOperatorX>()._is_first_phase));
return Status::OK();
}
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index 9d6cbd29b2f..3deb3ca556c 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -224,9 +224,8 @@ Status PartitionSortSinkOperatorX::_emplace_into_hash_table(
}
Status PartitionSortSinkLocalState::_init_hash_method() {
- if (!init_partition_hash_method(_partitioned_data.get(),
_partition_expr_ctxs, true)) {
- return Status::InternalError("init hash method failed");
- }
+ RETURN_IF_ERROR(
+ init_partition_hash_method(_partitioned_data.get(),
_partition_expr_ctxs, true));
return Status::OK();
}
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index ba66d01a0f8..bf6bd7bb26d 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -504,11 +504,9 @@ Status
StreamingAggLocalState::_merge_with_serialized_key(vectorized::Block* blo
}
Status StreamingAggLocalState::_init_hash_method(const
vectorized::VExprContextSPtrs& probe_exprs) {
- if (!init_agg_hash_method(
- _agg_data.get(), probe_exprs,
- Base::_parent->template
cast<StreamingAggOperatorX>()._is_first_phase)) {
- return Status::InternalError("init hash method failed");
- }
+ RETURN_IF_ERROR(init_agg_hash_method(
+ _agg_data.get(), probe_exprs,
+ Base::_parent->template
cast<StreamingAggOperatorX>()._is_first_phase));
return Status::OK();
}
diff --git a/be/src/vec/common/hash_table/hash_map_context_creator.h
b/be/src/vec/common/hash_table/hash_map_context_creator.h
index 426892d3442..31bdbf02573 100644
--- a/be/src/vec/common/hash_table/hash_map_context_creator.h
+++ b/be/src/vec/common/hash_table/hash_map_context_creator.h
@@ -17,6 +17,8 @@
#pragma once
+#include <exception>
+
#include "vec/common/hash_table/hash_map_context.h"
#include "vec/common/hash_table/ph_hash_map.h"
@@ -104,71 +106,78 @@ bool try_get_hash_map_context_fixed(Variant& variant,
const std::vector<DataType
}
template <typename DataVariants, typename Data>
-bool init_hash_method(DataVariants* agg_data, const
vectorized::VExprContextSPtrs& probe_exprs,
- bool is_first_phase) {
+Status init_hash_method(DataVariants* agg_data, const
vectorized::VExprContextSPtrs& probe_exprs,
+ bool is_first_phase) {
using Type = DataVariants::Type;
Type t(Type::serialized);
- if (probe_exprs.size() == 1) {
- auto is_nullable = probe_exprs[0]->root()->is_nullable();
- PrimitiveType type = probe_exprs[0]->root()->result_type();
- switch (type) {
- case TYPE_TINYINT:
- case TYPE_BOOLEAN:
- case TYPE_SMALLINT:
- case TYPE_INT:
- case TYPE_FLOAT:
- case TYPE_DATEV2:
- case TYPE_BIGINT:
- case TYPE_DOUBLE:
- case TYPE_DATE:
- case TYPE_DATETIME:
- case TYPE_DATETIMEV2:
- case TYPE_LARGEINT:
- case TYPE_DECIMALV2:
- case TYPE_DECIMAL32:
- case TYPE_DECIMAL64:
- case TYPE_DECIMAL128I: {
- size_t size = get_primitive_type_size(type);
- if (size == 1) {
- t = Type::int8_key;
- } else if (size == 2) {
- t = Type::int16_key;
- } else if (size == 4) {
- t = Type::int32_key;
- } else if (size == 8) {
- t = Type::int64_key;
- } else if (size == 16) {
- t = Type::int128_key;
- } else {
- throw Exception(ErrorCode::INTERNAL_ERROR,
- "meet invalid type size, size={}, type={}",
size,
- type_to_string(type));
+ try {
+ if (probe_exprs.size() == 1) {
+ auto is_nullable = probe_exprs[0]->root()->is_nullable();
+ PrimitiveType type = probe_exprs[0]->root()->result_type();
+ switch (type) {
+ case TYPE_TINYINT:
+ case TYPE_BOOLEAN:
+ case TYPE_SMALLINT:
+ case TYPE_INT:
+ case TYPE_FLOAT:
+ case TYPE_DATEV2:
+ case TYPE_BIGINT:
+ case TYPE_DOUBLE:
+ case TYPE_DATE:
+ case TYPE_DATETIME:
+ case TYPE_DATETIMEV2:
+ case TYPE_LARGEINT:
+ case TYPE_DECIMALV2:
+ case TYPE_DECIMAL32:
+ case TYPE_DECIMAL64:
+ case TYPE_DECIMAL128I: {
+ size_t size = get_primitive_type_size(type);
+ if (size == 1) {
+ t = Type::int8_key;
+ } else if (size == 2) {
+ t = Type::int16_key;
+ } else if (size == 4) {
+ t = Type::int32_key;
+ } else if (size == 8) {
+ t = Type::int64_key;
+ } else if (size == 16) {
+ t = Type::int128_key;
+ } else {
+ throw Exception(ErrorCode::INTERNAL_ERROR,
+ "meet invalid type size, size={},
type={}", size,
+ type_to_string(type));
+ }
+ break;
+ }
+ case TYPE_CHAR:
+ case TYPE_VARCHAR:
+ case TYPE_STRING: {
+ t = Type::string_key;
+ break;
+ }
+ default:
+ t = Type::serialized;
}
- break;
- }
- case TYPE_CHAR:
- case TYPE_VARCHAR:
- case TYPE_STRING: {
- t = Type::string_key;
- break;
- }
- default:
- t = Type::serialized;
- }
- agg_data->init(get_hash_key_type_with_phase(t, !is_first_phase),
is_nullable);
- } else {
- if (!try_get_hash_map_context_fixed<PHNormalHashMap, HashCRC32, Data>(
- agg_data->method_variant, probe_exprs)) {
- agg_data->init(Type::serialized);
+ agg_data->init(get_hash_key_type_with_phase(t, !is_first_phase),
is_nullable);
+ } else {
+ if (!try_get_hash_map_context_fixed<PHNormalHashMap, HashCRC32,
Data>(
+ agg_data->method_variant, probe_exprs)) {
+ agg_data->init(Type::serialized);
+ }
}
+ } catch (const Exception& e) {
+ // method_variant may meet valueless_by_exception, so we set it to monostate
+ agg_data->method_variant.template emplace<std::monostate>();
+ return e.to_status();
}
- if (agg_data->method_variant.valueless_by_exception()) {
- agg_data->method_variant.template emplace<std::monostate>();
- return false;
+ CHECK(!agg_data->method_variant.valueless_by_exception());
+
+ if (agg_data->method_variant.index() == 0) { // index is 0 means variant
is monostate
+ return Status::InternalError("agg_data->method_variant init failed");
}
- return true;
+ return Status::OK();
}
} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/exec/vaggregation_node.cpp
b/be/src/vec/exec/vaggregation_node.cpp
index 3a66773cf03..05954a864de 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -162,9 +162,7 @@ Status AggregationNode::init(const TPlanNode& tnode,
RuntimeState* state) {
}
Status AggregationNode::_init_hash_method(const VExprContextSPtrs&
probe_exprs) {
- if (!init_agg_hash_method(_agg_data.get(), probe_exprs, _is_first_phase)) {
- return Status::InternalError("init hash method failed");
- }
+ RETURN_IF_ERROR(init_agg_hash_method(_agg_data.get(), probe_exprs,
_is_first_phase));
return Status::OK();
}
diff --git a/be/src/vec/exec/vpartition_sort_node.cpp
b/be/src/vec/exec/vpartition_sort_node.cpp
index 7788d58955e..15d8124c653 100644
--- a/be/src/vec/exec/vpartition_sort_node.cpp
+++ b/be/src/vec/exec/vpartition_sort_node.cpp
@@ -403,9 +403,8 @@ void VPartitionSortNode::release_resource(RuntimeState*
state) {
}
Status VPartitionSortNode::_init_hash_method() {
- if (!init_partition_hash_method(_partitioned_data.get(),
_partition_expr_ctxs, true)) {
- return Status::InternalError("init hash method failed");
- }
+ RETURN_IF_ERROR(
+ init_partition_hash_method(_partitioned_data.get(),
_partition_expr_ctxs, true));
return Status::OK();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]