This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 2ea0e0f3045 [opt](nereids) use one phase aggregate for group_concat with order #53066 (#53207)
2ea0e0f3045 is described below
commit 2ea0e0f30451e9c9a9e5ca72ba3ae23f72b88a5d
Author: 924060929 <[email protected]>
AuthorDate: Fri Jul 18 15:55:55 2025 +0800
[opt](nereids) use one phase aggregate for group_concat with order #53066 (#53207)
cherry pick from #53066
---
.../glue/translator/PhysicalPlanTranslator.java | 10 ++--------
.../glue/translator/PlanTranslatorContext.java | 13 ------------
.../rules/implementation/AggregateStrategies.java | 18 ++++++++++++++---
.../functions/agg/AggregateFunction.java | 4 ++++
.../expressions/functions/agg/GroupConcat.java | 23 ++++++++++++++++++++++
.../java/org/apache/doris/qe/SessionVariable.java | 13 ++++++++++++
.../suites/nereids_syntax_p0/group_concat.groovy | 16 +++++++++++++++
7 files changed, 73 insertions(+), 24 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 2c7e1988c48..7ffe57168c2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -302,8 +302,7 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
List<List<Expr>> distributeExprLists = getDistributeExprs(child);
// TODO: why need set streaming here? should remove this.
if (inputFragment.getPlanRoot() instanceof AggregationNode
- && child instanceof PhysicalHashAggregate
- && context.getFirstAggregateInFragment(inputFragment) ==
child) {
+ && child instanceof PhysicalHashAggregate) {
PhysicalHashAggregate<?> hashAggregate =
(PhysicalHashAggregate<?>) child;
if (hashAggregate.getAggPhase() == AggPhase.LOCAL
&& hashAggregate.getAggMode() == AggMode.INPUT_TO_BUFFER
@@ -1027,17 +1026,12 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
default:
throw new RuntimeException("Unsupported agg phase: " +
aggregate.getAggPhase());
}
- // TODO: use to set useStreamingAgg, we should remove it by set it in
Nereids
- PhysicalHashAggregate firstAggregateInFragment =
context.getFirstAggregateInFragment(inputPlanFragment);
- if (firstAggregateInFragment == null) {
- context.setFirstAggregateInFragment(inputPlanFragment, aggregate);
- }
// in pipeline engine, we use parallel scan by default, but it broke
the rule of data distribution
// so, if we do final phase or merge without exchange.
// we need turn of parallel scan to ensure to get correct result.
PlanNode leftMostNode = inputPlanFragment.getPlanRoot();
- while (leftMostNode.getChildren().size() != 0 && !(leftMostNode
instanceof ExchangeNode)) {
+ while (!leftMostNode.getChildren().isEmpty() && !(leftMostNode
instanceof ExchangeNode)) {
leftMostNode = leftMostNode.getChild(0);
}
// TODO: nereids forbid all parallel scan under aggregate temporary,
because nereids could generate
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
index 64a015dd5d2..227dca12a5e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
@@ -37,7 +37,6 @@ import
org.apache.doris.nereids.trees.expressions.VirtualSlotReference;
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEConsumer;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
import org.apache.doris.planner.CTEScanNode;
import org.apache.doris.planner.PlanFragment;
@@ -54,7 +53,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -96,9 +94,6 @@ public class PlanTranslatorContext {
private final IdGenerator<PlanNodeId> nodeIdGenerator =
PlanNodeId.createGenerator();
- private final IdentityHashMap<PlanFragment, PhysicalHashAggregate>
firstAggInFragment
- = new IdentityHashMap<>();
-
private final Map<ExprId, SlotRef> bufferedSlotRefForWindow =
Maps.newHashMap();
private TupleDescriptor bufferedTupleForWindow = null;
@@ -257,14 +252,6 @@ public class PlanTranslatorContext {
return scanNodes;
}
- public PhysicalHashAggregate getFirstAggregateInFragment(PlanFragment
planFragment) {
- return firstAggInFragment.get(planFragment);
- }
-
- public void setFirstAggregateInFragment(PlanFragment planFragment,
PhysicalHashAggregate aggregate) {
- firstAggInFragment.put(planFragment, aggregate);
- }
-
public Map<ExprId, SlotRef> getBufferedSlotRefForWindow() {
return bufferedSlotRefForWindow;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
index 0539a08d3d7..ace11cd3eb3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
@@ -919,7 +919,16 @@ public class AggregateStrategies implements
ImplementationRuleFactory {
private List<PhysicalHashAggregate<Plan>> onePhaseAggregateWithoutDistinct(
LogicalAggregate<? extends Plan> logicalAgg, ConnectContext
connectContext) {
RequireProperties requireGather =
RequireProperties.of(PhysicalProperties.GATHER);
- AggregateParam inputToResultParam = AggregateParam.LOCAL_RESULT;
+ boolean canBeBanned = true;
+ for (AggregateFunction aggregateFunction :
logicalAgg.getAggregateFunctions()) {
+ if (aggregateFunction.forceSkipRegulator(AggregatePhase.ONE)) {
+ canBeBanned = false;
+ break;
+ }
+ }
+ AggregateParam inputToResultParam = new AggregateParam(
+ AggregateParam.LOCAL_RESULT.aggPhase,
AggregateParam.LOCAL_RESULT.aggMode, canBeBanned
+ );
List<NamedExpression> newOutput =
ExpressionUtils.rewriteDownShortCircuit(
logicalAgg.getOutputExpressions(), outputChild -> {
if (outputChild instanceof AggregateFunction) {
@@ -935,8 +944,11 @@ public class AggregateStrategies implements
ImplementationRuleFactory {
if (logicalAgg.getGroupByExpressions().isEmpty()) {
// TODO: usually bad, disable it until we could do better cost
computation.
- // return ImmutableList.of(gatherLocalAgg);
- return ImmutableList.of();
+ if (!canBeBanned) {
+ return ImmutableList.of(gatherLocalAgg);
+ } else {
+ return ImmutableList.of();
+ }
} else {
RequireProperties requireHash = RequireProperties.of(
PhysicalProperties.createHash(logicalAgg.getGroupByExpressions(),
ShuffleType.REQUIRE));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/AggregateFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/AggregateFunction.java
index 7aa5da02ff3..ce8ec70e765 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/AggregateFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/AggregateFunction.java
@@ -66,6 +66,10 @@ public abstract class AggregateFunction extends
BoundFunction implements Expects
return withDistinctAndChildren(distinct, children);
}
+ public boolean forceSkipRegulator(AggregatePhase aggregatePhase) {
+ return false;
+ }
+
public abstract AggregateFunction withDistinctAndChildren(boolean
distinct, List<Expression> children);
/** getIntermediateTypes */
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/GroupConcat.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/GroupConcat.java
index 2505329b2fe..2157deeeb4d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/GroupConcat.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/GroupConcat.java
@@ -27,6 +27,7 @@ import org.apache.doris.nereids.types.DataType;
import org.apache.doris.nereids.types.VarcharType;
import org.apache.doris.nereids.types.coercion.AnyDataType;
import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -133,6 +134,28 @@ public class GroupConcat extends NullableAggregateFunction
return SIGNATURES;
}
+ @Override
+ public boolean supportAggregatePhase(AggregatePhase aggregatePhase) {
+ ConnectContext connectContext = ConnectContext.get();
+ if (connectContext != null &&
connectContext.getSessionVariable().useOnePhaseAggForGroupConcatWithOrder
+ &&
children.stream().anyMatch(OrderExpression.class::isInstance)
+ && aggregatePhase != AggregatePhase.ONE) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public boolean forceSkipRegulator(AggregatePhase aggregatePhase) {
+ ConnectContext connectContext = ConnectContext.get();
+ if (connectContext != null &&
connectContext.getSessionVariable().useOnePhaseAggForGroupConcatWithOrder
+ &&
children.stream().anyMatch(OrderExpression.class::isInstance)
+ && aggregatePhase == AggregatePhase.ONE) {
+ return true;
+ }
+ return false;
+ }
+
public MultiDistinctGroupConcat convertToMultiDistinct() {
Preconditions.checkArgument(distinct,
"can't convert to multi_distinct_group_concat because there is
no distinct args");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 3a7ee7c8f2e..6400ae19a7b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -410,6 +410,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String ENABLE_UNICODE_NAME_SUPPORT =
"enable_unicode_name_support";
public static final String GROUP_CONCAT_MAX_LEN = "group_concat_max_len";
+ public static final String USE_ONE_PHASE_AGG_FOR_GROUP_CONCAT_WITH_ORDER
+ = "use_one_phase_agg_for_group_concat_with_order";
public static final String ENABLE_TWO_PHASE_READ_OPT =
"enable_two_phase_read_opt";
public static final String TOPN_OPT_LIMIT_THRESHOLD =
"topn_opt_limit_threshold";
@@ -1630,6 +1632,17 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = GROUP_CONCAT_MAX_LEN)
public long groupConcatMaxLen = 2147483646;
+ @VariableMgr.VarAttr(
+ name = USE_ONE_PHASE_AGG_FOR_GROUP_CONCAT_WITH_ORDER,
+ needForward = true,
+ fuzzy = true,
+ description = {
+ "允许使用一阶段聚合来执行带有order的group_concat函数",
+ "Enable to use one stage aggregation to execute the
group_concat function with order"
+ }
+ )
+ public boolean useOnePhaseAggForGroupConcatWithOrder = false;
+
// Whether enable two phase read optimization
// 1. read related rowids along with necessary column data
// 2. spawn fetch RPC to other nodes to get related data by sorted rowids
diff --git a/regression-test/suites/nereids_syntax_p0/group_concat.groovy
b/regression-test/suites/nereids_syntax_p0/group_concat.groovy
index b46091616ba..ab50cb74e96 100644
--- a/regression-test/suites/nereids_syntax_p0/group_concat.groovy
+++ b/regression-test/suites/nereids_syntax_p0/group_concat.groovy
@@ -90,5 +90,21 @@ suite("group_concat") {
LEFT OUTER JOIN test_group_concat_distinct_tbl3 tbl3 ON
tbl3.tbl3_id2 = tbl2.tbl2_id2
GROUP BY tbl1.tbl1_id1
"""
+
+ sql "set use_one_phase_agg_for_group_concat_with_order=true"
+
+ explain {
+ sql "select group_concat(tbl3_name, ',' order by tbl3_id2) FROM
test_group_concat_distinct_tbl3"
+ check { explainStr ->
+ assertFalse(explainStr.contains("partial_group_concat"))
+ }
+ }
+
+ explain {
+ sql "select group_concat(tbl3_name, ',' order by tbl3_id2) FROM
test_group_concat_distinct_tbl3 group by tbl3_name"
+ check { explainStr ->
+ assertFalse(explainStr.contains("partial_group_concat"))
+ }
+ }
}()
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]