morrySnow commented on code in PR #10659:
URL: https://github.com/apache/doris/pull/10659#discussion_r915560690
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java:
##########
@@ -114,60 +134,96 @@ public PlanFragment visit(Plan plan,
PlanTranslatorContext context) {
* Translate Agg.
*/
@Override
- public PlanFragment visitPhysicalAggregation(
- PhysicalUnaryPlan<PhysicalAggregation, Plan> agg,
PlanTranslatorContext context) {
-
+ public PlanFragment visitPhysicalAggregate(
+ PhysicalUnaryPlan<PhysicalAggregate, Plan> agg,
PlanTranslatorContext context) {
PlanFragment inputPlanFragment = visit(agg.child(0), context);
-
- AggregationNode aggregationNode;
- List<Slot> slotList = new ArrayList<>();
- PhysicalAggregation physicalAggregation = agg.getOperator();
- AggregateInfo.AggPhase phase =
physicalAggregation.getAggPhase().toExec();
-
- List<Expression> groupByExpressionList =
physicalAggregation.getGroupByExprList();
+ PhysicalAggregate physicalAggregate = agg.getOperator();
+
+ // TODO: the stale planner generates the aggregate tuple in a special way.
The tuple includes 2 parts:
+ // 1. group by expressions: removing duplicate expressions add to
tuple
+ // 2. agg functions: only removing duplicate agg functions in
output expression should appear in tuple.
+ // e.g. select sum(v1) + 1, sum(v1) + 2 from t1 should only
generate one sum(v1) in tuple
+ // We need:
+ // 1. add a project after agg, if output expressions include an agg
function as an expression tree leaf.
+ // 2. introduce canonicalized, semanticEquals and deterministic in
Expression
+ // for removing duplicate.
+ List<Expression> groupByExpressionList =
physicalAggregate.getGroupByExprList();
+ List<NamedExpression> outputExpressionList =
physicalAggregate.getOutputExpressionList();
+
+ // 1. generate slot reference for each group expression
+ List<SlotReference> groupSlotList = Lists.newArrayList();
+ for (Expression e : groupByExpressionList) {
+ if (e instanceof SlotReference &&
outputExpressionList.stream().anyMatch(o -> o.contains(e::equals))) {
+ groupSlotList.add((SlotReference) e);
+ } else {
+ groupSlotList.add(new SlotReference(e.sql(), e.getDataType(),
e.nullable(), Collections.emptyList()));
+ }
+ }
ArrayList<Expr> execGroupingExpressions =
groupByExpressionList.stream()
- // Since output of plan doesn't contain the slots of groupBy,
which is actually needed by
- // the BE execution, so we have to collect them and add to the
slotList to generate corresponding
- // TupleDesc.
- .peek(x ->
slotList.addAll(x.collect(SlotReference.class::isInstance)))
.map(e -> ExpressionTranslator.translate(e,
context)).collect(Collectors.toCollection(ArrayList::new));
- slotList.addAll(agg.getOutput());
- TupleDescriptor outputTupleDesc = generateTupleDesc(slotList, context,
null);
-
- List<NamedExpression> outputExpressionList =
physicalAggregation.getOutputExpressionList();
- ArrayList<FunctionCallExpr> execAggExpressions =
outputExpressionList.stream()
- .map(e ->
e.<List<AggregateFunction>>collect(AggregateFunction.class::isInstance))
+ // 2. collect agg functions and generate agg function to slot
reference map
+ List<Slot> aggFunctionOutput = Lists.newArrayList();
+ List<AggregateFunction> aggregateFunctionList =
outputExpressionList.stream()
+ .filter(o -> o.contains(AggregateFunction.class::isInstance))
+ .peek(o -> aggFunctionOutput.add(o.toSlot()))
+ .map(o -> (List<AggregateFunction>)
o.collect(AggregateFunction.class::isInstance))
.flatMap(List::stream)
+ .collect(Collectors.toList());
+ ArrayList<FunctionCallExpr> execAggExpressions =
aggregateFunctionList.stream()
.map(x -> (FunctionCallExpr) ExpressionTranslator.translate(x,
context))
.collect(Collectors.toCollection(ArrayList::new));
- List<Expression> partitionExpressionList =
physicalAggregation.getPartitionExprList();
+ // 3. generate output tuple
+ // TODO: currently, we only support sum(a), if we want to support
sum(a) + 1, we need to
+ // split merge agg to project(agg) and generate tuple like what first
phase agg do.
+ List<Slot> slotList = Lists.newArrayList();
+ TupleDescriptor outputTupleDesc;
+ if (agg.getOperator().getAggPhase() == AggPhase.FIRST_MERGE) {
+ slotList.addAll(groupSlotList);
+ slotList.addAll(aggFunctionOutput);
+ outputTupleDesc = generateTupleDesc(slotList, null, context);
+ } else {
+ outputTupleDesc = generateTupleDesc(agg.getOutput(), null,
context);
+ }
+
+ // process partition list
+ List<Expression> partitionExpressionList =
physicalAggregate.getPartitionExprList();
List<Expr> execPartitionExpressions = partitionExpressionList.stream()
- .map(e -> (FunctionCallExpr) ExpressionTranslator.translate(e,
context)).collect(Collectors.toList());
+ .map(e -> ExpressionTranslator.translate(e,
context)).collect(Collectors.toList());
+ DataPartition mergePartition = DataPartition.UNPARTITIONED;
+ if (CollectionUtils.isNotEmpty(execPartitionExpressions)) {
Review Comment:
Yes, you are right.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]