This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new b78fd76993c branch-3.1 [fix](nereids) adjust distribute expr lists
after project common sub expression for aggregation #57258 (#57714)
b78fd76993c is described below
commit b78fd76993ce79f8860f130cc2deb250b41a6258
Author: minghong <[email protected]>
AuthorDate: Fri Nov 7 15:34:42 2025 +0800
branch-3.1 [fix](nereids) adjust distribute expr lists after project common
sub expression for aggregation #57258 (#57714)
picked from #57258
---
.../post/ProjectAggregateExpressionsForCse.java | 13 ++-
.../properties/ChildOutputPropertyDeriver.java | 43 +++++---
.../properties/ChildOutputPropertyDeriverTest.java | 26 +++++
.../dist_expr_list/dist_expr_list.groovy | 108 +++++++++++++++++++++
4 files changed, 175 insertions(+), 15 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/ProjectAggregateExpressionsForCse.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/ProjectAggregateExpressionsForCse.java
index d7a13148c10..ee7cede4717 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/ProjectAggregateExpressionsForCse.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/ProjectAggregateExpressionsForCse.java
@@ -18,6 +18,7 @@
package org.apache.doris.nereids.processor.post;
import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.properties.ChildOutputPropertyDeriver;
import org.apache.doris.nereids.properties.DataTrait;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
@@ -33,6 +34,7 @@ import org.apache.doris.nereids.trees.plans.algebra.Aggregate;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.util.ExpressionUtils;
@@ -136,7 +138,11 @@ public class ProjectAggregateExpressionsForCse extends
PlanPostProcessor {
}
}
newProjections.addAll(cseCandidates.values());
- project = project.withProjectionsAndChild(newProjections, (Plan)
project.child());
+
+ project = project.withProjectionsAndChild(newProjections,
project.child());
+ PhysicalProperties projectPhysicalProperties =
ChildOutputPropertyDeriver.computeProjectOutputProperties(
+ project.getProjects(), ((PhysicalPlan)
project.child()).getPhysicalProperties());
+ project =
project.withPhysicalPropertiesAndStats(projectPhysicalProperties,
project.getStats());
aggregate = (PhysicalHashAggregate<? extends Plan>) aggregate
.withAggOutput(aggOutputReplaced)
.withChildren(project);
@@ -153,9 +159,8 @@ public class ProjectAggregateExpressionsForCse extends
PlanPostProcessor {
() -> DataTrait.EMPTY_TRAIT
);
AbstractPhysicalPlan child = ((AbstractPhysicalPlan)
aggregate.child());
- PhysicalProperties projectPhysicalProperties = new
PhysicalProperties(
- child.getPhysicalProperties().getDistributionSpec(),
- child.getPhysicalProperties().getOrderSpec());
+ PhysicalProperties projectPhysicalProperties =
ChildOutputPropertyDeriver.computeProjectOutputProperties(
+ projections, child.getPhysicalProperties());
PhysicalProject<? extends Plan> project = new
PhysicalProject<>(projections, Optional.empty(),
projectLogicalProperties,
projectPhysicalProperties,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
index 312ecb140c4..3f312b34157 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
@@ -319,17 +319,18 @@ public class ChildOutputPropertyDeriver extends
PlanVisitor<PhysicalProperties,
return new
PhysicalProperties(leftOutputProperty.getDistributionSpec());
}
- @Override
- public PhysicalProperties visitPhysicalProject(PhysicalProject<? extends
Plan> project, PlanContext context) {
- // TODO: order spec do not process since we do not use it.
- Preconditions.checkState(childrenOutputProperties.size() == 1);
- PhysicalProperties childProperties = childrenOutputProperties.get(0);
+ /**
+ * Derive output properties for physical project.
+ */
+ public static PhysicalProperties computeProjectOutputProperties(
+ List<NamedExpression> projects,
+ PhysicalProperties childProperties) {
DistributionSpec childDistributionSpec =
childProperties.getDistributionSpec();
OrderSpec childOrderSpec = childProperties.getOrderSpec();
if (childDistributionSpec instanceof DistributionSpecHash) {
Map<ExprId, ExprId> projections = Maps.newHashMap();
Set<ExprId> obstructions = Sets.newHashSet();
- for (NamedExpression namedExpression : project.getProjects()) {
+ for (NamedExpression namedExpression : projects) {
if (namedExpression instanceof Alias) {
Alias alias = (Alias) namedExpression;
Expression child = alias.child();
@@ -345,22 +346,42 @@ public class ChildOutputPropertyDeriver extends
PlanVisitor<PhysicalProperties,
.map(NamedExpression::getExprId)
.collect(Collectors.toSet()));
}
+ } else {
+ // namedExpression is slot
+ projections.put(namedExpression.getExprId(),
namedExpression.getExprId());
}
}
- if (projections.entrySet().stream().allMatch(kv ->
kv.getKey().equals(kv.getValue()))) {
- return childrenOutputProperties.get(0);
- }
+
DistributionSpecHash childDistributionSpecHash =
(DistributionSpecHash) childDistributionSpec;
+ boolean canUseChildProperties = true;
+ for (ExprId exprId :
childDistributionSpecHash.getOrderedShuffledColumns()) {
+ if (!projections.containsKey(exprId) ||
!projections.get(exprId).equals(exprId)) {
+ canUseChildProperties = false;
+ break;
+ }
+ }
+
+ if (canUseChildProperties) {
+ return childProperties;
+ }
DistributionSpec defaultAnySpec =
childDistributionSpecHash.getShuffleType() == ShuffleType.NATURAL
? DistributionSpecStorageAny.INSTANCE :
DistributionSpecAny.INSTANCE;
DistributionSpec outputDistributionSpec =
childDistributionSpecHash.project(
projections, obstructions, defaultAnySpec);
return new PhysicalProperties(outputDistributionSpec,
childOrderSpec);
} else {
- return childrenOutputProperties.get(0);
+ return childProperties;
}
}
+ @Override
+ public PhysicalProperties visitPhysicalProject(PhysicalProject<? extends
Plan> project, PlanContext context) {
+ // TODO: the order spec is not processed here because it is not used yet.
+ Preconditions.checkState(childrenOutputProperties.size() == 1);
+ PhysicalProperties childProperties = childrenOutputProperties.get(0);
+ return computeProjectOutputProperties(project.getProjects(),
childProperties);
+ }
+
@Override
public PhysicalProperties visitPhysicalRepeat(PhysicalRepeat<? extends
Plan> repeat, PlanContext context) {
Preconditions.checkState(childrenOutputProperties.size() == 1);
@@ -625,7 +646,7 @@ public class ChildOutputPropertyDeriver extends
PlanVisitor<PhysicalProperties,
return new DistributionSpecHash(anotherSideOrderedExprIds,
oneSideSpec.getShuffleType());
}
- private boolean isSameHashValue(DataType originType, DataType castType) {
+ private static boolean isSameHashValue(DataType originType, DataType
castType) {
if (originType.isStringLikeType() && (castType.isVarcharType() ||
castType.isStringType())
&& (castType.width() >= originType.width() || castType.width()
< 0)) {
return true;
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriverTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriverTest.java
index a0ad68bbc89..a1b80d2e468 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriverTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriverTest.java
@@ -27,12 +27,15 @@ import org.apache.doris.nereids.memo.Group;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.memo.GroupId;
import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType;
+import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.AssertNumRowsElement;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam;
+import org.apache.doris.nereids.trees.expressions.functions.scalar.Abs;
import org.apache.doris.nereids.trees.plans.AggMode;
import org.apache.doris.nereids.trees.plans.AggPhase;
import org.apache.doris.nereids.trees.plans.DistributeType;
@@ -911,4 +914,27 @@ class ChildOutputPropertyDeriverTest {
PhysicalProperties result = deriver.getOutputProperties(null,
groupExpression);
Assertions.assertEquals(child, result);
}
+
+ @Test
+ void testComputeProjectOutputProperties() {
+ SlotReference c1 = new SlotReference(
+ new ExprId(1), "c1", TinyIntType.INSTANCE, true,
ImmutableList.of());
+ PhysicalProperties hashC1 = PhysicalProperties.createHash(
+ ImmutableList.of(new ExprId(1)),
ShuffleType.EXECUTION_BUCKETED);
+ List<NamedExpression> projects1 = new ArrayList<>();
+ projects1.add(c1);
+ PhysicalProperties phyProp =
ChildOutputPropertyDeriver.computeProjectOutputProperties(projects1, hashC1);
+ Assertions.assertEquals(hashC1, phyProp);
+
+ List<NamedExpression> projects2 = new ArrayList<>();
+ projects2.add(new Alias(new Abs(c1)));
+ PhysicalProperties phyProp2 =
ChildOutputPropertyDeriver.computeProjectOutputProperties(projects2, hashC1);
+ Assertions.assertEquals(DistributionSpecAny.INSTANCE,
phyProp2.getDistributionSpec());
+
+ List<NamedExpression> projects3 = new ArrayList<>();
+ projects3.add(new Alias(new Abs(c1)));
+ projects3.add(c1);
+ PhysicalProperties phyProp3 =
ChildOutputPropertyDeriver.computeProjectOutputProperties(projects3, hashC1);
+ Assertions.assertEquals(hashC1, phyProp3);
+ }
}
diff --git
a/regression-test/suites/query_p0/common_sub_expression/dist_expr_list/dist_expr_list.groovy
b/regression-test/suites/query_p0/common_sub_expression/dist_expr_list/dist_expr_list.groovy
new file mode 100644
index 00000000000..ae57e4ce6a5
--- /dev/null
+++
b/regression-test/suites/query_p0/common_sub_expression/dist_expr_list/dist_expr_list.groovy
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// The cases are copied from https://github.com/trinodb/trino/tree/master
+//
/testing/trino-product-tests/src/main/resources/sql-tests/testcases/aggregate
+// and modified by Doris.
+
+suite("dist_expr_list") {
+ sql """
+ drop table if exists agg_cse_shuffle;
+ create table agg_cse_shuffle(
+ a int, b int, c int
+ )distributed by hash(a) buckets 3
+ properties("replication_num" = "1")
+ ;
+ insert into agg_cse_shuffle values (1, 2, 3), (3, 4, 5);
+ """
+
+ explain {
+ sql """
+ select max(
+ case
+ when (abs(b) > 10) then a+1
+ when (abs(b) < 10) then a+2
+ else NULL
+ end
+ ) x
+ from agg_cse_shuffle
+ where c > 0;
+ """
+ notContains "distribute expr lists: a"
+ }
+ /*
+ After ProjectAggregateExpressionsForCse, a#6 is no longer an output slot of
the scan node, and hence it must not appear in the distribute expr lists.
+
+ expect explain string
+ | 1:VAGGREGATE (update serialize)(114)
|
+ | | output: partial_max(CASE WHEN (abs(b) > 10) THEN (cast(a as BIGINT)
+ 1) WHEN (abs(b) < 10) THEN (cast(a as BIGINT) + 2) ELSE NULL END[#8])[#9]
|
+ | | group by:
|
+ | | sortByGroupKey:false
|
+ | | cardinality=1
|
+ | | distribute expr lists:
|
+ | |
|
+ | 0:VOlapScanNode(95)
+
+
+ the bad case: distribute expr lists: a[#6]
+ explain string
+ | 1:VAGGREGATE (update serialize)(114)
|
+ | | output: partial_max(CASE WHEN (abs(b) > 10) THEN (cast(a as BIGINT)
+ 1) WHEN (abs(b) < 10) THEN (cast(a as BIGINT) + 2) ELSE NULL END[#8])[#9]
|
+ | | group by:
|
+ | | sortByGroupKey:false
|
+ | | cardinality=1
|
+ | | distribute expr lists: a[#6] <==
|
+ | |
|
+ | 0:VOlapScanNode(95)
+ | TABLE: rqg.agg_cse_shuffle(agg_cse_shuffle), PREAGGREGATION: ON
|
+ | partitions=1/1 (agg_cse_shuffle)
|
+ | tablets=3/3, tabletList=1761200234884,1761200234886,1761200234888
|
+ | cardinality=2, avgRowSize=1182.5, numNodes=2
|
+ | pushAggOp=NONE
|
+ | final projections: a[#2], b[#3], CASE WHEN (abs(b)[#4] > 10) THEN
(cast(a as BIGINT)[#5] + 1) WHEN (abs(b)[#4] < 10) THEN (cast(a as BIGINT)[#5]
+ 2) ELSE NULL END |
+ | final project output tuple id: 2
|
+ | intermediate projections: a[#0], b[#1], abs(b[#1]), CAST(a[#0] AS
bigint)
|
+ | intermediate tuple id: 1
+ */
+
+ explain {
+ sql """
+ select max(
+ case
+ when (abs(b) > 10) then a+1
+ when (abs(b) < 10) then a+2
+ else NULL
+ end
+ ),
+ max(a)
+ from agg_cse_shuffle;
+ """
+ contains "distribute expr lists: a"
+ /*
+ expect explain string
+ | 1:VAGGREGATE (update serialize)(100)
|
+ | | output: partial_max(CASE WHEN (abs(b) > 10) THEN (cast(a as
BIGINT) + 1) WHEN (abs(b) < 10) THEN (cast(a as BIGINT) + 2) ELSE NULL
END[#8])[#9], partial_max(a[#6])[#10] |
+ | | group by:
|
+ | | sortByGroupKey:false
|
+ | | cardinality=1
|
+ | | distribute expr lists: a[#6]
|
+ | | tuple ids: 3
|
+ | |
|
+ | 0:VOlapScanNode(81)
+ */
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]