This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0325fa436e [fix](agg)Add field of 'is_first_phase' in TAggregationNode (#11321)
0325fa436e is described below
commit 0325fa436e414b089388dc92327b6e0db0dc97c7
Author: Jerry Hu <[email protected]>
AuthorDate: Mon Aug 1 11:49:50 2022 +0800
[fix](agg)Add field of 'is_first_phase' in TAggregationNode (#11321)
---
be/src/vec/exec/vaggregation_node.cpp | 72 +++++++++++-----------
be/src/vec/exec/vaggregation_node.h | 1 +
.../org/apache/doris/analysis/AggregateInfo.java | 4 ++
.../org/apache/doris/planner/AggregationNode.java | 1 +
gensrc/thrift/PlanNodes.thrift | 1 +
5 files changed, 44 insertions(+), 35 deletions(-)
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index 54f6b8d15a..6de36c6d1e 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -102,6 +102,8 @@ AggregationNode::AggregationNode(ObjectPool* pool, const TPlanNode& tnode,
} else {
_is_streaming_preagg = false;
}
+
+ _is_first_phase = tnode.agg_node.__isset.is_first_phase &&
tnode.agg_node.is_first_phase;
}
AggregationNode::~AggregationNode() = default;
@@ -142,26 +144,26 @@ void AggregationNode::_init_hash_method(std::vector<VExprContext*>& probe_exprs)
case TYPE_INT:
case TYPE_FLOAT:
case TYPE_DATEV2:
- if (_is_merge)
- _agg_data.init(AggregatedDataVariants::Type::int32_key_phase2,
is_nullable);
- else
+ if (_is_first_phase)
_agg_data.init(AggregatedDataVariants::Type::int32_key,
is_nullable);
+ else
+ _agg_data.init(AggregatedDataVariants::Type::int32_key_phase2,
is_nullable);
return;
case TYPE_BIGINT:
case TYPE_DOUBLE:
case TYPE_DATE:
case TYPE_DATETIME:
case TYPE_DATETIMEV2:
- if (_is_merge)
- _agg_data.init(AggregatedDataVariants::Type::int64_key_phase2,
is_nullable);
- else
+ if (_is_first_phase)
_agg_data.init(AggregatedDataVariants::Type::int64_key,
is_nullable);
+ else
+ _agg_data.init(AggregatedDataVariants::Type::int64_key_phase2,
is_nullable);
return;
case TYPE_LARGEINT: {
- if (_is_merge)
-
_agg_data.init(AggregatedDataVariants::Type::int128_key_phase2, is_nullable);
- else
+ if (_is_first_phase)
_agg_data.init(AggregatedDataVariants::Type::int128_key,
is_nullable);
+ else
+
_agg_data.init(AggregatedDataVariants::Type::int128_key_phase2, is_nullable);
return;
}
case TYPE_DECIMALV2:
@@ -175,20 +177,20 @@ void AggregationNode::_init_hash_method(std::vector<VExprContext*>& probe_exprs)
: type_ptr->get_type_id();
WhichDataType which(idx);
if (which.is_decimal32()) {
- if (_is_merge)
-
_agg_data.init(AggregatedDataVariants::Type::int32_key_phase2, is_nullable);
- else
+ if (_is_first_phase)
_agg_data.init(AggregatedDataVariants::Type::int32_key,
is_nullable);
- } else if (which.is_decimal64()) {
- if (_is_merge)
-
_agg_data.init(AggregatedDataVariants::Type::int64_key_phase2, is_nullable);
else
+
_agg_data.init(AggregatedDataVariants::Type::int32_key_phase2, is_nullable);
+ } else if (which.is_decimal64()) {
+ if (_is_first_phase)
_agg_data.init(AggregatedDataVariants::Type::int64_key,
is_nullable);
- } else {
- if (_is_merge)
-
_agg_data.init(AggregatedDataVariants::Type::int128_key_phase2, is_nullable);
else
+
_agg_data.init(AggregatedDataVariants::Type::int64_key_phase2, is_nullable);
+ } else {
+ if (_is_first_phase)
_agg_data.init(AggregatedDataVariants::Type::int128_key,
is_nullable);
+ else
+
_agg_data.init(AggregatedDataVariants::Type::int128_key_phase2, is_nullable);
}
return;
}
@@ -229,38 +231,38 @@ void AggregationNode::_init_hash_method(std::vector<VExprContext*>& probe_exprs)
if (use_fixed_key) {
if (has_null) {
if (std::tuple_size<KeysNullMap<UInt64>>::value +
key_byte_size <= sizeof(UInt64)) {
- if (_is_merge)
-
_agg_data.init(AggregatedDataVariants::Type::int64_keys_phase2, has_null);
- else
+ if (_is_first_phase)
_agg_data.init(AggregatedDataVariants::Type::int64_keys, has_null);
+ else
+
_agg_data.init(AggregatedDataVariants::Type::int64_keys_phase2, has_null);
} else if (std::tuple_size<KeysNullMap<UInt128>>::value +
key_byte_size <=
sizeof(UInt128)) {
- if (_is_merge)
-
_agg_data.init(AggregatedDataVariants::Type::int128_keys_phase2, has_null);
- else
+ if (_is_first_phase)
_agg_data.init(AggregatedDataVariants::Type::int128_keys, has_null);
- } else {
- if (_is_merge)
-
_agg_data.init(AggregatedDataVariants::Type::int256_keys_phase2, has_null);
else
+
_agg_data.init(AggregatedDataVariants::Type::int128_keys_phase2, has_null);
+ } else {
+ if (_is_first_phase)
_agg_data.init(AggregatedDataVariants::Type::int256_keys, has_null);
+ else
+
_agg_data.init(AggregatedDataVariants::Type::int256_keys_phase2, has_null);
}
} else {
if (key_byte_size <= sizeof(UInt64)) {
- if (_is_merge)
-
_agg_data.init(AggregatedDataVariants::Type::int64_keys_phase2, has_null);
- else
+ if (_is_first_phase)
_agg_data.init(AggregatedDataVariants::Type::int64_keys, has_null);
- } else if (key_byte_size <= sizeof(UInt128)) {
- if (_is_merge)
-
_agg_data.init(AggregatedDataVariants::Type::int128_keys_phase2, has_null);
else
+
_agg_data.init(AggregatedDataVariants::Type::int64_keys_phase2, has_null);
+ } else if (key_byte_size <= sizeof(UInt128)) {
+ if (_is_first_phase)
_agg_data.init(AggregatedDataVariants::Type::int128_keys, has_null);
+ else
+
_agg_data.init(AggregatedDataVariants::Type::int128_keys_phase2, has_null);
} else {
if (_is_merge)
-
_agg_data.init(AggregatedDataVariants::Type::int256_keys_phase2, has_null);
- else
_agg_data.init(AggregatedDataVariants::Type::int256_keys, has_null);
+ else
+
_agg_data.init(AggregatedDataVariants::Type::int256_keys_phase2, has_null);
}
}
} else {
diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h
index 53eecb025f..478cd563ca 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -647,6 +647,7 @@ private:
bool _needs_finalize;
bool _is_merge;
+ bool _is_first_phase;
std::unique_ptr<MemPool> _mem_pool;
std::unique_ptr<MemTracker> _data_mem_tracker;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java
index 93a2cfcaa3..cd38b4ee52 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java
@@ -373,6 +373,10 @@ public final class AggregateInfo extends AggregateInfoBase {
return aggPhase.isMerge();
}
+ public boolean isFirstPhase() {
+ return aggPhase == AggPhase.FIRST;
+ }
+
public boolean isDistinctAgg() {
return secondPhaseDistinctAggInfo != null;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
index 4cdd1aceee..c8c89a6fa2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
@@ -272,6 +272,7 @@ public class AggregationNode extends PlanNode {
aggInfo.getOutputTupleId().asInt(), needsFinalize);
msg.agg_node.setAggSortInfos(aggSortInfos);
msg.agg_node.setUseStreamingPreaggregation(useStreamingPreagg);
+ msg.agg_node.setIsFirstPhase(aggInfo.isFirstPhase());
List<Expr> groupingExprs = aggInfo.getGroupingExprs();
if (groupingExprs != null) {
msg.agg_node.setGroupingExprs(Expr.treesToThrift(groupingExprs));
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index e3cc0679b8..72967bf07c 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -545,6 +545,7 @@ struct TAggregationNode {
5: required bool need_finalize
6: optional bool use_streaming_preaggregation
7: optional list<TSortInfo> agg_sort_infos
+ 8: optional bool is_first_phase;
}
struct TRepeatNode {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]