This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0325fa436e [fix](agg)Add field of 'is_first_phase' in TAggregationNode (#11321)
0325fa436e is described below

commit 0325fa436e414b089388dc92327b6e0db0dc97c7
Author: Jerry Hu <[email protected]>
AuthorDate: Mon Aug 1 11:49:50 2022 +0800

    [fix](agg)Add field of 'is_first_phase' in TAggregationNode (#11321)
---
 be/src/vec/exec/vaggregation_node.cpp              | 72 +++++++++++-----------
 be/src/vec/exec/vaggregation_node.h                |  1 +
 .../org/apache/doris/analysis/AggregateInfo.java   |  4 ++
 .../org/apache/doris/planner/AggregationNode.java  |  1 +
 gensrc/thrift/PlanNodes.thrift                     |  1 +
 5 files changed, 44 insertions(+), 35 deletions(-)

diff --git a/be/src/vec/exec/vaggregation_node.cpp 
b/be/src/vec/exec/vaggregation_node.cpp
index 54f6b8d15a..6de36c6d1e 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -102,6 +102,8 @@ AggregationNode::AggregationNode(ObjectPool* pool, const 
TPlanNode& tnode,
     } else {
         _is_streaming_preagg = false;
     }
+
+    _is_first_phase = tnode.agg_node.__isset.is_first_phase && 
tnode.agg_node.is_first_phase;
 }
 
 AggregationNode::~AggregationNode() = default;
@@ -142,26 +144,26 @@ void 
AggregationNode::_init_hash_method(std::vector<VExprContext*>& probe_exprs)
         case TYPE_INT:
         case TYPE_FLOAT:
         case TYPE_DATEV2:
-            if (_is_merge)
-                _agg_data.init(AggregatedDataVariants::Type::int32_key_phase2, 
is_nullable);
-            else
+            if (_is_first_phase)
                 _agg_data.init(AggregatedDataVariants::Type::int32_key, 
is_nullable);
+            else
+                _agg_data.init(AggregatedDataVariants::Type::int32_key_phase2, 
is_nullable);
             return;
         case TYPE_BIGINT:
         case TYPE_DOUBLE:
         case TYPE_DATE:
         case TYPE_DATETIME:
         case TYPE_DATETIMEV2:
-            if (_is_merge)
-                _agg_data.init(AggregatedDataVariants::Type::int64_key_phase2, 
is_nullable);
-            else
+            if (_is_first_phase)
                 _agg_data.init(AggregatedDataVariants::Type::int64_key, 
is_nullable);
+            else
+                _agg_data.init(AggregatedDataVariants::Type::int64_key_phase2, 
is_nullable);
             return;
         case TYPE_LARGEINT: {
-            if (_is_merge)
-                
_agg_data.init(AggregatedDataVariants::Type::int128_key_phase2, is_nullable);
-            else
+            if (_is_first_phase)
                 _agg_data.init(AggregatedDataVariants::Type::int128_key, 
is_nullable);
+            else
+                
_agg_data.init(AggregatedDataVariants::Type::int128_key_phase2, is_nullable);
             return;
         }
         case TYPE_DECIMALV2:
@@ -175,20 +177,20 @@ void 
AggregationNode::_init_hash_method(std::vector<VExprContext*>& probe_exprs)
                                         : type_ptr->get_type_id();
             WhichDataType which(idx);
             if (which.is_decimal32()) {
-                if (_is_merge)
-                    
_agg_data.init(AggregatedDataVariants::Type::int32_key_phase2, is_nullable);
-                else
+                if (_is_first_phase)
                     _agg_data.init(AggregatedDataVariants::Type::int32_key, 
is_nullable);
-            } else if (which.is_decimal64()) {
-                if (_is_merge)
-                    
_agg_data.init(AggregatedDataVariants::Type::int64_key_phase2, is_nullable);
                 else
+                    
_agg_data.init(AggregatedDataVariants::Type::int32_key_phase2, is_nullable);
+            } else if (which.is_decimal64()) {
+                if (_is_first_phase)
                     _agg_data.init(AggregatedDataVariants::Type::int64_key, 
is_nullable);
-            } else {
-                if (_is_merge)
-                    
_agg_data.init(AggregatedDataVariants::Type::int128_key_phase2, is_nullable);
                 else
+                    
_agg_data.init(AggregatedDataVariants::Type::int64_key_phase2, is_nullable);
+            } else {
+                if (_is_first_phase)
                     _agg_data.init(AggregatedDataVariants::Type::int128_key, 
is_nullable);
+                else
+                    
_agg_data.init(AggregatedDataVariants::Type::int128_key_phase2, is_nullable);
             }
             return;
         }
@@ -229,38 +231,38 @@ void 
AggregationNode::_init_hash_method(std::vector<VExprContext*>& probe_exprs)
         if (use_fixed_key) {
             if (has_null) {
                 if (std::tuple_size<KeysNullMap<UInt64>>::value + 
key_byte_size <= sizeof(UInt64)) {
-                    if (_is_merge)
-                        
_agg_data.init(AggregatedDataVariants::Type::int64_keys_phase2, has_null);
-                    else
+                    if (_is_first_phase)
                         
_agg_data.init(AggregatedDataVariants::Type::int64_keys, has_null);
+                    else
+                        
_agg_data.init(AggregatedDataVariants::Type::int64_keys_phase2, has_null);
                 } else if (std::tuple_size<KeysNullMap<UInt128>>::value + 
key_byte_size <=
                            sizeof(UInt128)) {
-                    if (_is_merge)
-                        
_agg_data.init(AggregatedDataVariants::Type::int128_keys_phase2, has_null);
-                    else
+                    if (_is_first_phase)
                         
_agg_data.init(AggregatedDataVariants::Type::int128_keys, has_null);
-                } else {
-                    if (_is_merge)
-                        
_agg_data.init(AggregatedDataVariants::Type::int256_keys_phase2, has_null);
                     else
+                        
_agg_data.init(AggregatedDataVariants::Type::int128_keys_phase2, has_null);
+                } else {
+                    if (_is_first_phase)
                         
_agg_data.init(AggregatedDataVariants::Type::int256_keys, has_null);
+                    else
+                        
_agg_data.init(AggregatedDataVariants::Type::int256_keys_phase2, has_null);
                 }
             } else {
                 if (key_byte_size <= sizeof(UInt64)) {
-                    if (_is_merge)
-                        
_agg_data.init(AggregatedDataVariants::Type::int64_keys_phase2, has_null);
-                    else
+                    if (_is_first_phase)
                         
_agg_data.init(AggregatedDataVariants::Type::int64_keys, has_null);
-                } else if (key_byte_size <= sizeof(UInt128)) {
-                    if (_is_merge)
-                        
_agg_data.init(AggregatedDataVariants::Type::int128_keys_phase2, has_null);
                     else
+                        
_agg_data.init(AggregatedDataVariants::Type::int64_keys_phase2, has_null);
+                } else if (key_byte_size <= sizeof(UInt128)) {
+                    if (_is_first_phase)
                         
_agg_data.init(AggregatedDataVariants::Type::int128_keys, has_null);
+                    else
+                        
_agg_data.init(AggregatedDataVariants::Type::int128_keys_phase2, has_null);
                 } else {
                     if (_is_merge)
-                        
_agg_data.init(AggregatedDataVariants::Type::int256_keys_phase2, has_null);
-                    else
                         
_agg_data.init(AggregatedDataVariants::Type::int256_keys, has_null);
+                    else
+                        
_agg_data.init(AggregatedDataVariants::Type::int256_keys_phase2, has_null);
                 }
             }
         } else {
diff --git a/be/src/vec/exec/vaggregation_node.h 
b/be/src/vec/exec/vaggregation_node.h
index 53eecb025f..478cd563ca 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -647,6 +647,7 @@ private:
 
     bool _needs_finalize;
     bool _is_merge;
+    bool _is_first_phase;
     std::unique_ptr<MemPool> _mem_pool;
 
     std::unique_ptr<MemTracker> _data_mem_tracker;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java
index 93a2cfcaa3..cd38b4ee52 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java
@@ -373,6 +373,10 @@ public final class AggregateInfo extends AggregateInfoBase 
{
         return aggPhase.isMerge();
     }
 
+    public boolean isFirstPhase() {
+        return aggPhase == AggPhase.FIRST;
+    }
+
     public boolean isDistinctAgg() {
         return secondPhaseDistinctAggInfo != null;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
index 4cdd1aceee..c8c89a6fa2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
@@ -272,6 +272,7 @@ public class AggregationNode extends PlanNode {
                 aggInfo.getOutputTupleId().asInt(), needsFinalize);
         msg.agg_node.setAggSortInfos(aggSortInfos);
         msg.agg_node.setUseStreamingPreaggregation(useStreamingPreagg);
+        msg.agg_node.setIsFirstPhase(aggInfo.isFirstPhase());
         List<Expr> groupingExprs = aggInfo.getGroupingExprs();
         if (groupingExprs != null) {
             msg.agg_node.setGroupingExprs(Expr.treesToThrift(groupingExprs));
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index e3cc0679b8..72967bf07c 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -545,6 +545,7 @@ struct TAggregationNode {
   5: required bool need_finalize
   6: optional bool use_streaming_preaggregation
   7: optional list<TSortInfo> agg_sort_infos
+  8: optional bool is_first_phase;
 }
 
 struct TRepeatNode {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to