This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 99afe889052 branch-4.0: [fix](enforcer) shuffle if has continuous
project or filter on cte consumer #58964 (#59046)
99afe889052 is described below
commit 99afe8890528678855cde4d18f294c36dd4c45e7
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Dec 17 16:13:34 2025 +0800
branch-4.0: [fix](enforcer) shuffle if has continuous project or filter on
cte consumer #58964 (#59046)
Cherry-picked from #58964
Co-authored-by: morrySnow <[email protected]>
---
.../properties/ChildrenPropertiesRegulator.java | 39 +++++
.../ChildrenPropertiesRegulatorTest.java | 167 +++++++++++++++++++++
.../suites/nereids_syntax_p0/cte.groovy | 5 +
3 files changed, 211 insertions(+)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
index bc0ae7e9f26..32a5202bee4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
@@ -37,6 +37,7 @@ import
org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
@@ -277,6 +278,17 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<List<List<PhysicalP
if (children.get(0).getPlan() instanceof PhysicalDistribute) {
return ImmutableList.of();
}
+ DistributionSpec distributionSpec =
originChildrenProperties.get(0).getDistributionSpec();
+ // process must shuffle
+ if (distributionSpec instanceof DistributionSpecMustShuffle) {
+ Plan child = filter.child();
+ Plan realChild = getChildPhysicalPlan(child);
+ if (realChild instanceof PhysicalProject
+ || realChild instanceof PhysicalFilter
+ || realChild instanceof PhysicalLimit) {
+ visit(filter, context);
+ }
+ }
return ImmutableList.of(originChildrenProperties);
}
@@ -308,6 +320,19 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<List<List<PhysicalP
}
}
+ private Plan getChildPhysicalPlan(Plan plan) {
+ if (!(plan instanceof GroupPlan)) {
+ return null;
+ }
+ GroupPlan groupPlan = (GroupPlan) plan;
+ if (groupPlan == null || groupPlan.getGroup() == null
+ || groupPlan.getGroup().getPhysicalExpressions().isEmpty()) {
+ return null;
+ } else {
+ return
groupPlan.getGroup().getPhysicalExpressions().get(0).getPlan();
+ }
+ }
+
private PhysicalOlapScan findDownGradeBucketShuffleCandidate(GroupPlan
groupPlan) {
if (groupPlan == null || groupPlan.getGroup() == null
|| groupPlan.getGroup().getPhysicalExpressions().isEmpty()) {
@@ -574,6 +599,20 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<List<List<PhysicalP
if (children.get(0).getPlan() instanceof PhysicalDistribute) {
return ImmutableList.of();
}
+ DistributionSpec distributionSpec =
originChildrenProperties.get(0).getDistributionSpec();
+ // process must shuffle
+ if (distributionSpec instanceof DistributionSpecMustShuffle) {
+ Plan child = project.child();
+ Plan realChild = getChildPhysicalPlan(child);
+ if (realChild instanceof PhysicalLimit) {
+ visit(project, context);
+ } else if (realChild instanceof PhysicalProject) {
+ PhysicalProject physicalProject = (PhysicalProject) realChild;
+ if (!project.canMergeChildProjections(physicalProject)) {
+ visit(project, context);
+ }
+ }
+ }
return ImmutableList.of(originChildrenProperties);
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulatorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulatorTest.java
new file mode 100644
index 00000000000..eab66c026a5
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulatorTest.java
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.properties;
+
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.cost.Cost;
+import org.apache.doris.nereids.cost.CostCalculator;
+import org.apache.doris.nereids.jobs.JobContext;
+import org.apache.doris.nereids.memo.Group;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.trees.plans.GroupPlan;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class ChildrenPropertiesRegulatorTest {
+
+ private List<GroupExpression> children;
+ private JobContext mockedJobContext;
+ private List<PhysicalProperties> originOutputChildrenProperties
+ = Lists.newArrayList(PhysicalProperties.MUST_SHUFFLE);
+
+ @BeforeEach
+ public void setUp() {
+ Group childGroup = Mockito.mock(Group.class);
+
Mockito.when(childGroup.getLogicalProperties()).thenReturn(Mockito.mock(LogicalProperties.class));
+ GroupExpression child = Mockito.mock(GroupExpression.class);
+
Mockito.when(child.getOutputProperties(Mockito.any())).thenReturn(PhysicalProperties.MUST_SHUFFLE);
+ Mockito.when(child.getOwnerGroup()).thenReturn(childGroup);
+ Map<PhysicalProperties, Pair<Cost, List<PhysicalProperties>>> lct =
Maps.newHashMap();
+ lct.put(PhysicalProperties.MUST_SHUFFLE, Pair.of(Cost.zero(),
Lists.newArrayList()));
+ Mockito.when(child.getLowestCostTable()).thenReturn(lct);
+ children = Lists.newArrayList(child);
+
+ mockedJobContext = Mockito.mock(JobContext.class);
+
Mockito.when(mockedJobContext.getCascadesContext()).thenReturn(Mockito.mock(CascadesContext.class));
+
+ }
+
+ @Test
+ public void testMustShuffleProjectProjectCanNotMerge() {
+ testMustShuffleProject(PhysicalProject.class,
DistributionSpecExecutionAny.class, false);
+
+ }
+
+ @Test
+ public void testMustShuffleProjectProjectCanMerge() {
+ testMustShuffleProject(PhysicalProject.class,
DistributionSpecMustShuffle.class, true);
+
+ }
+
+ @Test
+ public void testMustShuffleProjectFilter() {
+ testMustShuffleProject(PhysicalFilter.class,
DistributionSpecMustShuffle.class, true);
+
+ }
+
+ @Test
+ public void testMustShuffleProjectLimit() {
+ testMustShuffleProject(PhysicalLimit.class,
DistributionSpecExecutionAny.class, true);
+ }
+
+ public void testMustShuffleProject(Class<? extends Plan> childClazz,
+ Class<? extends DistributionSpec> distributeClazz,
+ boolean canMergeChildProject) {
+ try (MockedStatic<CostCalculator> mockedCostCalculator =
Mockito.mockStatic(CostCalculator.class)) {
+ mockedCostCalculator.when(() ->
CostCalculator.calculateCost(Mockito.any(), Mockito.any(),
+ Mockito.anyList())).thenReturn(Cost.zero());
+ mockedCostCalculator.when(() ->
CostCalculator.addChildCost(Mockito.any(), Mockito.any(), Mockito.any(),
+ Mockito.any(), Mockito.anyInt())).thenReturn(Cost.zero());
+
+ // project, cannot merge
+ Plan mockedChild = Mockito.mock(childClazz);
+
Mockito.when(mockedChild.withGroupExpression(Mockito.any())).thenReturn(mockedChild);
+ Group mockedGroup = Mockito.mock(Group.class);
+ List<GroupExpression> physicalExpressions = Lists.newArrayList(new
GroupExpression(mockedChild));
+
Mockito.when(mockedGroup.getPhysicalExpressions()).thenReturn(physicalExpressions);
+ GroupPlan mockedGroupPlan = Mockito.mock(GroupPlan.class);
+ Mockito.when(mockedGroupPlan.getGroup()).thenReturn(mockedGroup);
+ // let AbstractTreeNode's init happy
+ Mockito.when(mockedGroupPlan.getAllChildrenTypes()).thenReturn(new
BitSet());
+ PhysicalProject parentPlan = new
PhysicalProject<>(Lists.newArrayList(), null, mockedGroupPlan);
+ GroupExpression parent = new GroupExpression(parentPlan);
+ parentPlan = parentPlan.withGroupExpression(Optional.of(parent));
+ parentPlan = Mockito.spy(parentPlan);
+
Mockito.doReturn(canMergeChildProject).when(parentPlan).canMergeChildProjections(Mockito.any());
+ parent = Mockito.spy(parent);
+ Mockito.doReturn(parentPlan).when(parent).getPlan();
+ ChildrenPropertiesRegulator regulator = new
ChildrenPropertiesRegulator(parent, children,
+ new ArrayList<>(originOutputChildrenProperties), null,
mockedJobContext);
+ PhysicalProperties result =
regulator.adjustChildrenProperties().get(0).get(0);
+ Assertions.assertInstanceOf(distributeClazz,
result.getDistributionSpec());
+ }
+ }
+
+ @Test
+ public void testMustShuffleFilterProject() {
+ testMustShuffleFilter(PhysicalProject.class);
+ }
+
+ @Test
+ public void testMustShuffleFilterFilter() {
+ testMustShuffleFilter(PhysicalFilter.class);
+ }
+
+ @Test
+ public void testMustShuffleFilterLimit() {
+ testMustShuffleFilter(PhysicalLimit.class);
+ }
+
+ private void testMustShuffleFilter(Class<? extends Plan> childClazz) {
+ try (MockedStatic<CostCalculator> mockedCostCalculator =
Mockito.mockStatic(CostCalculator.class)) {
+ mockedCostCalculator.when(() ->
CostCalculator.calculateCost(Mockito.any(), Mockito.any(),
+ Mockito.anyList())).thenReturn(Cost.zero());
+ mockedCostCalculator.when(() ->
CostCalculator.addChildCost(Mockito.any(), Mockito.any(), Mockito.any(),
+ Mockito.any(), Mockito.anyInt())).thenReturn(Cost.zero());
+
+ // project, cannot merge
+ Plan mockedChild = Mockito.mock(childClazz);
+
Mockito.when(mockedChild.withGroupExpression(Mockito.any())).thenReturn(mockedChild);
+ Group mockedGroup = Mockito.mock(Group.class);
+ List<GroupExpression> physicalExpressions = Lists.newArrayList(new
GroupExpression(mockedChild));
+
Mockito.when(mockedGroup.getPhysicalExpressions()).thenReturn(physicalExpressions);
+ GroupPlan mockedGroupPlan = Mockito.mock(GroupPlan.class);
+ Mockito.when(mockedGroupPlan.getGroup()).thenReturn(mockedGroup);
+ // let AbstractTreeNode's init happy
+ Mockito.when(mockedGroupPlan.getAllChildrenTypes()).thenReturn(new
BitSet());
+ GroupExpression parent = new GroupExpression(new
PhysicalFilter<>(Sets.newHashSet(), null, mockedGroupPlan));
+ ChildrenPropertiesRegulator regulator = new
ChildrenPropertiesRegulator(parent, children,
+ new ArrayList<>(originOutputChildrenProperties), null,
mockedJobContext);
+ PhysicalProperties result =
regulator.adjustChildrenProperties().get(0).get(0);
+ Assertions.assertInstanceOf(DistributionSpecExecutionAny.class,
result.getDistributionSpec());
+ }
+ }
+}
diff --git a/regression-test/suites/nereids_syntax_p0/cte.groovy
b/regression-test/suites/nereids_syntax_p0/cte.groovy
index d731aecf980..f6d6ca618a8 100644
--- a/regression-test/suites/nereids_syntax_p0/cte.groovy
+++ b/regression-test/suites/nereids_syntax_p0/cte.groovy
@@ -334,5 +334,10 @@ suite("cte") {
sql """
WITH cte_0 AS ( SELECT 1 AS a ), cte_1 AS ( SELECT 1 AS a ) select *
from cte_0, cte_1 union select * from cte_0, cte_1
"""
+
+ // test more than one project on cte consumer
+ sql """
+ with a as (select 1 c1) select *, uuid() from a union all select c2,
c2 from (select c1 + 1, uuid() c2 from a) x ;
+ """
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]