This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6512893257 [refactor](vectorized) Remove useless control variables to
simplify aggregation node code (#22026)
6512893257 is described below
commit 6512893257a9c595a576d61af5fc6a82a7e352fd
Author: ZenoYang <[email protected]>
AuthorDate: Fri Jul 21 12:45:23 2023 +0800
[refactor](vectorized) Remove useless control variables to simplify
aggregation node code (#22026)
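* [refactor](vectorized) Remove the useless `use_fixed_length_serialization_opt`
control variable to simplify the aggregation node code (the FE always set it to
true, so the column-based serialization path is now used unconditionally)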
* fix
---
be/src/vec/exec/vaggregation_node.cpp | 178 +++++----------------
be/src/vec/exec/vaggregation_node.h | 15 +-
.../org/apache/doris/planner/AggregationNode.java | 1 -
gensrc/thrift/PlanNodes.thrift | 2 +-
4 files changed, 41 insertions(+), 155 deletions(-)
diff --git a/be/src/vec/exec/vaggregation_node.cpp
b/be/src/vec/exec/vaggregation_node.cpp
index 5f021e9c95..d7ebfe0a51 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -135,9 +135,6 @@ AggregationNode::AggregationNode(ObjectPool* pool, const
TPlanNode& tnode,
}
_is_first_phase = tnode.agg_node.__isset.is_first_phase &&
tnode.agg_node.is_first_phase;
- _use_fixed_length_serialization_opt =
- tnode.agg_node.__isset.use_fixed_length_serialization_opt &&
- tnode.agg_node.use_fixed_length_serialization_opt;
_agg_data = std::make_unique<AggregatedDataVariants>();
_agg_arena_pool = std::make_unique<Arena>();
}
@@ -709,34 +706,16 @@ Status
AggregationNode::_serialize_without_key(RuntimeState* state, Block* block
MutableColumns value_columns(agg_size);
std::vector<DataTypePtr> data_types(agg_size);
// will serialize data to string column
- if (_use_fixed_length_serialization_opt) {
- auto serialize_string_type = std::make_shared<DataTypeString>();
- for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
- data_types[i] =
_aggregate_evaluators[i]->function()->get_serialized_type();
- value_columns[i] =
_aggregate_evaluators[i]->function()->create_serialize_column();
- }
-
- for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
-
_aggregate_evaluators[i]->function()->serialize_without_key_to_column(
- _agg_data->without_key + _offsets_of_aggregate_states[i],
*value_columns[i]);
- }
- } else {
- std::vector<VectorBufferWriter> value_buffer_writers;
- auto serialize_string_type = std::make_shared<DataTypeString>();
- for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
- data_types[i] = serialize_string_type;
- value_columns[i] = serialize_string_type->create_column();
- value_buffer_writers.emplace_back(
- *reinterpret_cast<ColumnString*>(value_columns[i].get()));
- }
+ for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+ data_types[i] =
_aggregate_evaluators[i]->function()->get_serialized_type();
+ value_columns[i] =
_aggregate_evaluators[i]->function()->create_serialize_column();
+ }
- for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
- _aggregate_evaluators[i]->function()->serialize(
- _agg_data->without_key + _offsets_of_aggregate_states[i],
- value_buffer_writers[i]);
- value_buffer_writers[i].commit();
- }
+ for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+ _aggregate_evaluators[i]->function()->serialize_without_key_to_column(
+ _agg_data->without_key + _offsets_of_aggregate_states[i],
*value_columns[i]);
}
+
{
ColumnsWithTypeAndName data_with_schema;
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
@@ -774,21 +753,9 @@ Status AggregationNode::_merge_without_key(Block* block) {
}
SCOPED_TIMER(_deserialize_data_timer);
- if (_use_fixed_length_serialization_opt) {
-
_aggregate_evaluators[i]->function()->deserialize_and_merge_from_column(
- _agg_data->without_key +
_offsets_of_aggregate_states[i], *column,
- _agg_arena_pool.get());
- } else {
- const int rows = block->rows();
- for (int j = 0; j < rows; ++j) {
- VectorBufferReader buffer_reader(
- ((ColumnString*)(column.get()))->get_data_at(j));
-
-
_aggregate_evaluators[i]->function()->deserialize_and_merge(
- _agg_data->without_key +
_offsets_of_aggregate_states[i], buffer_reader,
- _agg_arena_pool.get());
- }
- }
+
_aggregate_evaluators[i]->function()->deserialize_and_merge_from_column(
+ _agg_data->without_key + _offsets_of_aggregate_states[i],
*column,
+ _agg_arena_pool.get());
} else {
RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_single_add(
block, _agg_data->without_key +
_offsets_of_aggregate_states[i],
@@ -1127,56 +1094,28 @@ Status
AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
std::vector<DataTypePtr> data_types;
MutableColumns value_columns;
- if (_use_fixed_length_serialization_opt) {
- for (int i = 0; i < _aggregate_evaluators.size();
++i) {
- auto data_type =
-
_aggregate_evaluators[i]->function()->get_serialized_type();
- if (mem_reuse) {
- value_columns.emplace_back(
-
std::move(*out_block->get_by_position(i + key_size)
- .column)
- .mutate());
- } else {
- // slot type of value it should always be
string type
-
value_columns.emplace_back(_aggregate_evaluators[i]
-
->function()
-
->create_serialize_column());
- }
- data_types.emplace_back(data_type);
- }
-
- for (int i = 0; i != _aggregate_evaluators.size();
++i) {
- SCOPED_TIMER(_serialize_data_timer);
- RETURN_IF_ERROR(
-
_aggregate_evaluators[i]->streaming_agg_serialize_to_column(
- in_block, value_columns[i],
rows,
- _agg_arena_pool.get()));
- }
- } else {
- std::vector<VectorBufferWriter>
value_buffer_writers;
- auto serialize_string_type =
std::make_shared<DataTypeString>();
- for (int i = 0; i < _aggregate_evaluators.size();
++i) {
- if (mem_reuse) {
- value_columns.emplace_back(
-
std::move(*out_block->get_by_position(i + key_size)
- .column)
- .mutate());
- } else {
- // slot type of value it should always be
string type
- value_columns.emplace_back(
-
serialize_string_type->create_column());
- }
- data_types.emplace_back(serialize_string_type);
- value_buffer_writers.emplace_back(
-
*reinterpret_cast<ColumnString*>(value_columns[i].get()));
+ for (int i = 0; i < _aggregate_evaluators.size(); ++i)
{
+ auto data_type =
+
_aggregate_evaluators[i]->function()->get_serialized_type();
+ if (mem_reuse) {
+ value_columns.emplace_back(
+
std::move(*out_block->get_by_position(i + key_size).column)
+ .mutate());
+ } else {
+ // slot type of value it should always be
string type
+
value_columns.emplace_back(_aggregate_evaluators[i]
+ ->function()
+
->create_serialize_column());
}
+ data_types.emplace_back(data_type);
+ }
- for (int i = 0; i != _aggregate_evaluators.size();
++i) {
- SCOPED_TIMER(_serialize_data_timer);
-
RETURN_IF_ERROR(_aggregate_evaluators[i]->streaming_agg_serialize(
- in_block, value_buffer_writers[i],
rows,
- _agg_arena_pool.get()));
- }
+ for (int i = 0; i != _aggregate_evaluators.size();
++i) {
+ SCOPED_TIMER(_serialize_data_timer);
+ RETURN_IF_ERROR(
+
_aggregate_evaluators[i]->streaming_agg_serialize_to_column(
+ in_block, value_columns[i], rows,
+ _agg_arena_pool.get()));
}
if (!mem_reuse) {
@@ -1233,17 +1172,9 @@ Status
AggregationNode::_serialize_hash_table_to_block(HashTableCtxType& context
key_columns.emplace_back(_probe_expr_ctxs[i]->root()->data_type()->create_column());
}
- if (_use_fixed_length_serialization_opt) {
- for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
- value_data_types[i] =
_aggregate_evaluators[i]->function()->get_serialized_type();
- value_columns[i] =
_aggregate_evaluators[i]->function()->create_serialize_column();
- }
- } else {
- auto serialize_string_type = std::make_shared<DataTypeString>();
- for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
- value_data_types[i] = serialize_string_type;
- value_columns[i] = serialize_string_type->create_column();
- }
+ for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
+ value_data_types[i] =
_aggregate_evaluators[i]->function()->get_serialized_type();
+ value_columns[i] =
_aggregate_evaluators[i]->function()->create_serialize_column();
}
context.init_once();
@@ -1280,21 +1211,9 @@ Status
AggregationNode::_serialize_hash_table_to_block(HashTableCtxType& context
++num_rows;
}
- if (_use_fixed_length_serialization_opt) {
- for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
- _aggregate_evaluators[i]->function()->serialize_to_column(
- _values, _offsets_of_aggregate_states[i],
value_columns[i], num_rows);
- }
- } else {
- std::vector<VectorBufferWriter> value_buffer_writers;
- for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
- value_buffer_writers.emplace_back(
- *reinterpret_cast<ColumnString*>(value_columns[i].get()));
- }
- for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
- _aggregate_evaluators[i]->function()->serialize_vec(
- _values, _offsets_of_aggregate_states[i],
value_buffer_writers[i], num_rows);
- }
+ for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
+ _aggregate_evaluators[i]->function()->serialize_to_column(
+ _values, _offsets_of_aggregate_states[i], value_columns[i],
num_rows);
}
ColumnsWithTypeAndName columns_with_schema;
@@ -1677,7 +1596,7 @@ Status
AggregationNode::_serialize_with_serialized_key_result_non_spill(RuntimeS
}
}
- if (_use_fixed_length_serialization_opt) {
+ {
SCOPED_TIMER(_serialize_data_timer);
for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
value_data_types[i] =
@@ -1694,27 +1613,6 @@ Status
AggregationNode::_serialize_with_serialized_key_result_non_spill(RuntimeS
_values, _offsets_of_aggregate_states[i],
value_columns[i],
num_rows);
}
- } else {
- SCOPED_TIMER(_serialize_data_timer);
- std::vector<VectorBufferWriter> value_buffer_writers;
- auto serialize_string_type =
std::make_shared<DataTypeString>();
- for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
- value_data_types[i] = serialize_string_type;
- if (mem_reuse) {
- value_columns[i] =
- std::move(*block->get_by_position(i +
key_size).column)
- .mutate();
- } else {
- value_columns[i] =
serialize_string_type->create_column();
- }
- value_buffer_writers.emplace_back(
-
*reinterpret_cast<ColumnString*>(value_columns[i].get()));
- }
- for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
- _aggregate_evaluators[i]->function()->serialize_vec(
- _values, _offsets_of_aggregate_states[i],
value_buffer_writers[i],
- num_rows);
- }
}
},
_agg_data->_aggregated_method_variant);
diff --git a/be/src/vec/exec/vaggregation_node.h
b/be/src/vec/exec/vaggregation_node.h
index b30d1cfa0c..9d6f4c4979 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -909,7 +909,6 @@ private:
bool _needs_finalize;
bool _is_merge;
bool _is_first_phase;
- bool _use_fixed_length_serialization_opt;
std::unique_ptr<Arena> _agg_profile_arena;
size_t _align_aggregate_states = 1;
@@ -1122,15 +1121,10 @@ private:
_deserialize_buffer.resize(buffer_size);
}
- if (_use_fixed_length_serialization_opt) {
+ {
SCOPED_TIMER(_deserialize_data_timer);
_aggregate_evaluators[i]->function()->deserialize_from_column(
_deserialize_buffer.data(), *column,
_agg_arena_pool.get(), rows);
- } else {
- SCOPED_TIMER(_deserialize_data_timer);
- _aggregate_evaluators[i]->function()->deserialize_vec(
- _deserialize_buffer.data(),
(ColumnString*)(column.get()),
- _agg_arena_pool.get(), rows);
}
DEFER({
@@ -1169,15 +1163,10 @@ private:
_deserialize_buffer.resize(buffer_size);
}
- if (_use_fixed_length_serialization_opt) {
+ {
SCOPED_TIMER(_deserialize_data_timer);
_aggregate_evaluators[i]->function()->deserialize_from_column(
_deserialize_buffer.data(), *column,
_agg_arena_pool.get(), rows);
- } else {
- SCOPED_TIMER(_deserialize_data_timer);
- _aggregate_evaluators[i]->function()->deserialize_vec(
- _deserialize_buffer.data(),
(ColumnString*)(column.get()),
- _agg_arena_pool.get(), rows);
}
DEFER({
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
index b564bba161..580700303f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
@@ -274,7 +274,6 @@ public class AggregationNode extends PlanNode {
msg.agg_node.setAggSortInfos(aggSortInfos);
msg.agg_node.setUseStreamingPreaggregation(useStreamingPreagg);
msg.agg_node.setIsFirstPhase(aggInfo.isFirstPhase());
- msg.agg_node.setUseFixedLengthSerializationOpt(true);
List<Expr> groupingExprs = aggInfo.getGroupingExprs();
if (groupingExprs != null) {
msg.agg_node.setGroupingExprs(Expr.treesToThrift(groupingExprs));
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 9dacaac889..da19eb4975 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -792,7 +792,7 @@ struct TAggregationNode {
6: optional bool use_streaming_preaggregation
7: optional list<TSortInfo> agg_sort_infos
8: optional bool is_first_phase
- 9: optional bool use_fixed_length_serialization_opt
+ // 9: optional bool use_fixed_length_serialization_opt
}
struct TRepeatNode {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]