This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 2ea0e0f3045 [opt](nereids) use one phase aggregate for group_concat with order #53066 (#53207)
2ea0e0f3045 is described below

commit 2ea0e0f30451e9c9a9e5ca72ba3ae23f72b88a5d
Author: 924060929 <[email protected]>
AuthorDate: Fri Jul 18 15:55:55 2025 +0800

    [opt](nereids) use one phase aggregate for group_concat with order #53066 (#53207)
    
    cherry pick from #53066
---
 .../glue/translator/PhysicalPlanTranslator.java    | 10 ++--------
 .../glue/translator/PlanTranslatorContext.java     | 13 ------------
 .../rules/implementation/AggregateStrategies.java  | 18 ++++++++++++++---
 .../functions/agg/AggregateFunction.java           |  4 ++++
 .../expressions/functions/agg/GroupConcat.java     | 23 ++++++++++++++++++++++
 .../java/org/apache/doris/qe/SessionVariable.java  | 13 ++++++++++++
 .../suites/nereids_syntax_p0/group_concat.groovy   | 16 +++++++++++++++
 7 files changed, 73 insertions(+), 24 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 2c7e1988c48..7ffe57168c2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -302,8 +302,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
         List<List<Expr>> distributeExprLists = getDistributeExprs(child);
         // TODO: why need set streaming here? should remove this.
         if (inputFragment.getPlanRoot() instanceof AggregationNode
-                && child instanceof PhysicalHashAggregate
-                && context.getFirstAggregateInFragment(inputFragment) == 
child) {
+                && child instanceof PhysicalHashAggregate) {
             PhysicalHashAggregate<?> hashAggregate = 
(PhysicalHashAggregate<?>) child;
             if (hashAggregate.getAggPhase() == AggPhase.LOCAL
                     && hashAggregate.getAggMode() == AggMode.INPUT_TO_BUFFER
@@ -1027,17 +1026,12 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
             default:
                 throw new RuntimeException("Unsupported agg phase: " + 
aggregate.getAggPhase());
         }
-        // TODO: use to set useStreamingAgg, we should remove it by set it in 
Nereids
-        PhysicalHashAggregate firstAggregateInFragment = 
context.getFirstAggregateInFragment(inputPlanFragment);
-        if (firstAggregateInFragment == null) {
-            context.setFirstAggregateInFragment(inputPlanFragment, aggregate);
-        }
 
         // in pipeline engine, we use parallel scan by default, but it broke 
the rule of data distribution
         // so, if we do final phase or merge without exchange.
         // we need turn of parallel scan to ensure to get correct result.
         PlanNode leftMostNode = inputPlanFragment.getPlanRoot();
-        while (leftMostNode.getChildren().size() != 0 && !(leftMostNode 
instanceof ExchangeNode)) {
+        while (!leftMostNode.getChildren().isEmpty() && !(leftMostNode 
instanceof ExchangeNode)) {
             leftMostNode = leftMostNode.getChild(0);
         }
         // TODO: nereids forbid all parallel scan under aggregate temporary, 
because nereids could generate
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
index 64a015dd5d2..227dca12a5e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
@@ -37,7 +37,6 @@ import org.apache.doris.nereids.trees.expressions.VirtualSlotReference;
 import org.apache.doris.nereids.trees.plans.RelationId;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEConsumer;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
 import org.apache.doris.planner.CTEScanNode;
 import org.apache.doris.planner.PlanFragment;
@@ -54,7 +53,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -96,9 +94,6 @@ public class PlanTranslatorContext {
 
     private final IdGenerator<PlanNodeId> nodeIdGenerator = 
PlanNodeId.createGenerator();
 
-    private final IdentityHashMap<PlanFragment, PhysicalHashAggregate> 
firstAggInFragment
-            = new IdentityHashMap<>();
-
     private final Map<ExprId, SlotRef> bufferedSlotRefForWindow = 
Maps.newHashMap();
     private TupleDescriptor bufferedTupleForWindow = null;
 
@@ -257,14 +252,6 @@ public class PlanTranslatorContext {
         return scanNodes;
     }
 
-    public PhysicalHashAggregate getFirstAggregateInFragment(PlanFragment 
planFragment) {
-        return firstAggInFragment.get(planFragment);
-    }
-
-    public void setFirstAggregateInFragment(PlanFragment planFragment, 
PhysicalHashAggregate aggregate) {
-        firstAggInFragment.put(planFragment, aggregate);
-    }
-
     public Map<ExprId, SlotRef> getBufferedSlotRefForWindow() {
         return bufferedSlotRefForWindow;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
index 0539a08d3d7..ace11cd3eb3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
@@ -919,7 +919,16 @@ public class AggregateStrategies implements ImplementationRuleFactory {
     private List<PhysicalHashAggregate<Plan>> onePhaseAggregateWithoutDistinct(
             LogicalAggregate<? extends Plan> logicalAgg, ConnectContext 
connectContext) {
         RequireProperties requireGather = 
RequireProperties.of(PhysicalProperties.GATHER);
-        AggregateParam inputToResultParam = AggregateParam.LOCAL_RESULT;
+        boolean canBeBanned = true;
+        for (AggregateFunction aggregateFunction : 
logicalAgg.getAggregateFunctions()) {
+            if (aggregateFunction.forceSkipRegulator(AggregatePhase.ONE)) {
+                canBeBanned = false;
+                break;
+            }
+        }
+        AggregateParam inputToResultParam = new AggregateParam(
+                AggregateParam.LOCAL_RESULT.aggPhase, 
AggregateParam.LOCAL_RESULT.aggMode, canBeBanned
+        );
         List<NamedExpression> newOutput = 
ExpressionUtils.rewriteDownShortCircuit(
                 logicalAgg.getOutputExpressions(), outputChild -> {
                     if (outputChild instanceof AggregateFunction) {
@@ -935,8 +944,11 @@ public class AggregateStrategies implements ImplementationRuleFactory {
 
         if (logicalAgg.getGroupByExpressions().isEmpty()) {
             // TODO: usually bad, disable it until we could do better cost 
computation.
-            // return ImmutableList.of(gatherLocalAgg);
-            return ImmutableList.of();
+            if (!canBeBanned) {
+                return ImmutableList.of(gatherLocalAgg);
+            } else {
+                return ImmutableList.of();
+            }
         } else {
             RequireProperties requireHash = RequireProperties.of(
                     
PhysicalProperties.createHash(logicalAgg.getGroupByExpressions(), 
ShuffleType.REQUIRE));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/AggregateFunction.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/AggregateFunction.java
index 7aa5da02ff3..ce8ec70e765 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/AggregateFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/AggregateFunction.java
@@ -66,6 +66,10 @@ public abstract class AggregateFunction extends BoundFunction implements Expects
         return withDistinctAndChildren(distinct, children);
     }
 
+    public boolean forceSkipRegulator(AggregatePhase aggregatePhase) {
+        return false;
+    }
+
     public abstract AggregateFunction withDistinctAndChildren(boolean 
distinct, List<Expression> children);
 
     /** getIntermediateTypes */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/GroupConcat.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/GroupConcat.java
index 2505329b2fe..2157deeeb4d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/GroupConcat.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/GroupConcat.java
@@ -27,6 +27,7 @@ import org.apache.doris.nereids.types.DataType;
 import org.apache.doris.nereids.types.VarcharType;
 import org.apache.doris.nereids.types.coercion.AnyDataType;
 import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.qe.ConnectContext;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -133,6 +134,28 @@ public class GroupConcat extends NullableAggregateFunction
         return SIGNATURES;
     }
 
+    @Override
+    public boolean supportAggregatePhase(AggregatePhase aggregatePhase) {
+        ConnectContext connectContext = ConnectContext.get();
+        if (connectContext != null && 
connectContext.getSessionVariable().useOnePhaseAggForGroupConcatWithOrder
+                && 
children.stream().anyMatch(OrderExpression.class::isInstance)
+                && aggregatePhase != AggregatePhase.ONE) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public boolean forceSkipRegulator(AggregatePhase aggregatePhase) {
+        ConnectContext connectContext = ConnectContext.get();
+        if (connectContext != null && 
connectContext.getSessionVariable().useOnePhaseAggForGroupConcatWithOrder
+                && 
children.stream().anyMatch(OrderExpression.class::isInstance)
+                && aggregatePhase == AggregatePhase.ONE) {
+            return true;
+        }
+        return false;
+    }
+
     public MultiDistinctGroupConcat convertToMultiDistinct() {
         Preconditions.checkArgument(distinct,
                 "can't convert to multi_distinct_group_concat because there is 
no distinct args");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 3a7ee7c8f2e..6400ae19a7b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -410,6 +410,8 @@ public class SessionVariable implements Serializable, Writable {
     public static final String ENABLE_UNICODE_NAME_SUPPORT = 
"enable_unicode_name_support";
 
     public static final String GROUP_CONCAT_MAX_LEN = "group_concat_max_len";
+    public static final String USE_ONE_PHASE_AGG_FOR_GROUP_CONCAT_WITH_ORDER
+            = "use_one_phase_agg_for_group_concat_with_order";
 
     public static final String ENABLE_TWO_PHASE_READ_OPT = 
"enable_two_phase_read_opt";
     public static final String TOPN_OPT_LIMIT_THRESHOLD = 
"topn_opt_limit_threshold";
@@ -1630,6 +1632,17 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = GROUP_CONCAT_MAX_LEN)
     public long groupConcatMaxLen = 2147483646;
 
+    @VariableMgr.VarAttr(
+            name = USE_ONE_PHASE_AGG_FOR_GROUP_CONCAT_WITH_ORDER,
+            needForward = true,
+            fuzzy = true,
+            description = {
+                    "允许使用一阶段聚合来执行带有order的group_concat函数",
+                    "Enable to use one stage aggregation to execute the 
group_concat function with order"
+            }
+    )
+    public boolean useOnePhaseAggForGroupConcatWithOrder = false;
+
     // Whether enable two phase read optimization
     // 1. read related rowids along with necessary column data
     // 2. spawn fetch RPC to other nodes to get related data by sorted rowids
diff --git a/regression-test/suites/nereids_syntax_p0/group_concat.groovy b/regression-test/suites/nereids_syntax_p0/group_concat.groovy
index b46091616ba..ab50cb74e96 100644
--- a/regression-test/suites/nereids_syntax_p0/group_concat.groovy
+++ b/regression-test/suites/nereids_syntax_p0/group_concat.groovy
@@ -90,5 +90,21 @@ suite("group_concat") {
              LEFT OUTER JOIN test_group_concat_distinct_tbl3 tbl3 ON 
tbl3.tbl3_id2 = tbl2.tbl2_id2
              GROUP BY tbl1.tbl1_id1
            """
+
+        sql "set use_one_phase_agg_for_group_concat_with_order=true"
+
+        explain {
+            sql "select group_concat(tbl3_name, ',' order by tbl3_id2) FROM 
test_group_concat_distinct_tbl3"
+            check { explainStr ->
+                assertFalse(explainStr.contains("partial_group_concat"))
+            }
+        }
+
+        explain {
+            sql "select group_concat(tbl3_name, ',' order by tbl3_id2) FROM 
test_group_concat_distinct_tbl3 group by tbl3_name"
+            check { explainStr ->
+                assertFalse(explainStr.contains("partial_group_concat"))
+            }
+        }
     }()
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to