Copilot commented on code in PR #60572:
URL: https://github.com/apache/doris/pull/60572#discussion_r2781753157
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java:
##########
@@ -486,53 +486,163 @@ public int hashCode() {
@Override
public String toString() {
StringBuilder str = new StringBuilder("Group[" + groupId + "]\n");
- str.append(" logical expressions:\n");
- for (GroupExpression logicalExpression : logicalExpressions) {
- str.append(" ").append(logicalExpression).append("\n");
+ // Logical expressions with numbering
+ str.append(" Logical Expressions:\n");
+ if (logicalExpressions.isEmpty()) {
+ str.append(" (none)\n");
+ } else {
+ int index = 1;
+ for (GroupExpression logicalExpression : logicalExpressions) {
+ str.append(" [").append(index++).append("] ").append(logicalExpression).append("\n");
+ }
}
- str.append(" physical expressions:\n");
- for (GroupExpression physicalExpression : physicalExpressions) {
- str.append(" ").append(physicalExpression).append("\n");
+ // Physical expressions with numbering
+ str.append(" Physical Expressions:\n");
+ if (physicalExpressions.isEmpty()) {
+ str.append(" (none)\n");
+ } else {
+ int index = 1;
+ for (GroupExpression physicalExpression : physicalExpressions) {
+ str.append(" [").append(index++).append("] ").append(physicalExpression).append("\n");
+ }
}
- str.append(" enforcers:\n");
+ // Enforcers with numbering
+ str.append(" Enforcers:\n");
List<GroupExpression> enforcerList = enforcers.keySet().stream()
.sorted(java.util.Comparator.comparing(e1 ->
e1.getId().asInt()))
.collect(Collectors.toList());
- for (GroupExpression enforcer : enforcerList) {
- str.append(" ").append(enforcer).append("\n");
+ if (enforcerList.isEmpty()) {
+ str.append(" (none)\n");
+ } else {
+ int index = 1;
+ for (GroupExpression enforcer : enforcerList) {
+ str.append(" [").append(index++).append("] ").append(enforcer).append("\n");
+ }
}
if (!chosenEnforcerIdList.isEmpty()) {
- str.append(" chosen enforcer(id, requiredProperties):\n");
+ str.append(" ⭐ Chosen Enforcer(ID, RequiredProperties):\n");
for (int i = 0; i < chosenEnforcerIdList.size(); i++) {
str.append(" (").append(i).append(")").append(chosenEnforcerIdList.get(i)).append(", ")
- .append(chosenEnforcerPropertiesList.get(i)).append("\n");
+ .append(formatProperty(chosenEnforcerPropertiesList.get(i))).append("\n");
}
}
if (chosenGroupExpressionId != -1) {
- str.append(" chosen expression id: ").append(chosenGroupExpressionId).append("\n");
- str.append(" chosen properties: ").append(chosenProperties).append("\n");
+ str.append(" ⭐ Chosen Expression ID: ").append(chosenGroupExpressionId).append("\n");
+ str.append(" ⭐ Chosen Properties: ").append(formatProperty(chosenProperties)).append("\n");
}
Review Comment:
This `toString()` output introduces emoji/Unicode symbols (e.g., "⭐") in
diagnostic text. Similar to `Memo.toString()`, this can lead to
encoding/log-parsing issues in some environments. Prefer plain ASCII markers or
gate the decorated output behind a debug option.
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java:
##########
@@ -70,18 +70,20 @@ public class PhysicalHashAggregate<CHILD_TYPE extends Plan>
extends PhysicalUnar
private final boolean maybeUsingStream;
+ private final boolean hasSourceRepeat;
+
Review Comment:
`hasSourceRepeat` is a new stateful attribute but it is not included in
`equals()`/`hashCode()`. Since `GroupExpression.equals()` depends on
`plan.equals()`, aggregates that differ only by `hasSourceRepeat` can be
treated as identical, which can incorrectly deduplicate memo expressions and
affect optimization/correctness. Include `hasSourceRepeat` in both `equals()`
and `hashCode()` (and consider exposing it in `toString()` for debugging).
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ShuffleKeyPruneUtils.java:
##########
@@ -0,0 +1,319 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.properties;
+
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.PlanContext;
+import org.apache.doris.nereids.memo.Group;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.types.DataType;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.statistics.ColumnStatistic;
+import org.apache.doris.statistics.Statistics;
+import org.apache.doris.statistics.util.StatisticsUtil;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**AggShuffleKeyOptimize*/
+public class ShuffleKeyPruneUtils {
+ private static GroupExpression getGroupExpression(Group group) {
+ List<GroupExpression> physicalGroupExpressions = group.getPhysicalExpressions();
+ if (!physicalGroupExpressions.isEmpty()) {
+ return physicalGroupExpressions.get(0);
+ } else {
+ return group.getLogicalExpressions().get(0);
+ }
+ }
+
+ /*
+ * @param agg is a global aggregate
+ * @return the Statistics of the children of the local aggregate
corresponding to the global aggregate.
+ */
+ private static Optional<Statistics> getGlobalAggChildStats(PhysicalHashAggregate<? extends Plan> agg) {
+ Optional<GroupExpression> groupExpression = agg.getGroupExpression();
+ if (!groupExpression.isPresent()) {
+ return Optional.empty();
+ }
+ Statistics aggChildStats = groupExpression.get().childStatistics(0);
+ Group childGroup = groupExpression.get().child(0);
+ Plan childExpression = getGroupExpression(childGroup).getPlan();
+ if (childExpression instanceof PhysicalHashAggregate
+ && ((PhysicalHashAggregate) childExpression).getAggPhase().isLocal()) {
+ childGroup = childGroup.getPhysicalExpressions().get(0).child(0);
+ aggChildStats = childGroup.getStatistics();
+ }
+ return Optional.ofNullable(aggChildStats);
+ }
+
+ private static boolean canAggShuffleKeyOpt(PhysicalHashAggregate<? extends Plan> agg,
+ ConnectContext connectContext) {
+ if (!connectContext.getSessionVariable().chooseOneAggShuffleKey) {
+ return false;
+ }
+ if (agg.getGroupByExpressions().size() <= connectContext.getSessionVariable().shuffleKeyPruneThreshold) {
+ return false;
+ }
+ if (agg.hasSourceRepeat()) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * When parent sends shuffle request, choose one optimal key from
intersection of parent hash
+ * columns and agg group-by columns, or use full intersection. Returns
list of ExprIds as
+ * shuffle keys.
+ */
+ public static List<ExprId> selectOptimalShuffleKeyForAggWithParentHashRequest(
+ PhysicalHashAggregate<? extends Plan> agg, Set<ExprId> intersectIdSet, PlanContext context) {
+ List<ExprId> orderedIds = Utils.fastToImmutableList(intersectIdSet);
+ if (!context.getConnectContext().getSessionVariable().chooseOneAggShuffleKey
+ || intersectIdSet.size() <= context.getConnectContext().getSessionVariable().shuffleKeyPruneThreshold) {
+ return orderedIds;
Review Comment:
`orderedIds` is built from `intersectIdSet` (a `Set`), so the resulting
shuffle key list order is non-deterministic. This can lead to unstable plan
shapes and potentially different distribution specs across runs. Preserve a
deterministic order (e.g., iterate `parentHashExprIds` and keep those present
in the intersection, or preserve group-by order) instead of materializing
directly from a `Set`.
##########
regression-test/suites/nereids_p0/hash_shuffle_key_prune/hash_shuffle_key_prune.groovy:
##########
@@ -0,0 +1,183 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("hash_shuffle_key_prune") {
+ multi_sql """
+ drop table if exists t1;
+ create table t1(a int, b int, c int, d int, e int, f int, g int, h
int, i int, j int) properties("replication_num"="1");
+ alter table t1 modify column a set stats ('row_count'='204000',
'ndv'='8', 'min_value'='0', 'max_value'='1000', 'avg_size'='880961',
'max_size'='880961' );
+ alter table t1 modify column b set stats ('row_count'='204000',
'ndv'='30', 'min_value'='0', 'max_value'='1000', 'avg_size'='880961',
'max_size'='880961' );
+ alter table t1 modify column c set stats ('row_count'='204000',
'ndv'='100', 'min_value'='0', 'max_value'='1000', 'avg_size'='880961',
'max_size'='880961' );
+ alter table t1 modify column d set stats ('row_count'='204000',
'ndv'='200', 'min_value'='0', 'max_value'='1000', 'avg_size'='880961',
'max_size'='880961' );
+ alter table t1 modify column e set stats ('row_count'='204000',
'ndv'='5000', 'min_value'='0', 'max_value'='1000', 'avg_size'='880961',
'max_size'='880961' );
+ alter table t1 modify column f set stats ('row_count'='204000',
'ndv'='8', 'min_value'='0', 'max_value'='1000', 'avg_size'='880961',
'max_size'='880961' );
+ alter table t1 modify column g set stats ('row_count'='204000',
'ndv'='30', 'min_value'='0', 'max_value'='1000', 'avg_size'='880961',
'max_size'='880961' );
+ alter table t1 modify column h set stats ('row_count'='204000',
'ndv'='100', 'min_value'='0', 'max_value'='1000', 'avg_size'='880961',
'max_size'='880961' );
+ alter table t1 modify column i set stats ('row_count'='204000',
'ndv'='200', 'min_value'='0', 'max_value'='1000', 'avg_size'='880961',
'max_size'='880961' );
+ alter table t1 modify column j set stats ('row_count'='204000',
'ndv'='5000', 'min_value'='0', 'max_value'='1000', 'avg_size'='880961',
'max_size'='880961' );
+
+ drop table if exists t2;
+ create table t2(a int, b int, c int, d int, e int, f int, g int, h
int, i int, j int) distributed by hash(a) properties("replication_num"="1");
+ alter table t2 modify column a set stats ('row_count'='204000',
'ndv'='8', 'min_value'='0', 'max_value'='1000', 'avg_size'='880961',
'max_size'='880961' );
+ alter table t2 modify column b set stats ('row_count'='204000',
'ndv'='30', 'min_value'='0', 'max_value'='1000', 'avg_size'='880961',
'max_size'='880961' );
+ alter table t2 modify column c set stats ('row_count'='204000',
'ndv'='100', 'min_value'='0', 'max_value'='1000', 'avg_size'='880961',
'max_size'='880961' );
+ alter table t2 modify column d set stats ('row_count'='204000',
'ndv'='200', 'min_value'='0', 'max_value'='1000', 'avg_size'='880961',
'max_size'='880961' );
+ alter table t2 modify column e set stats ('row_count'='204000',
'ndv'='5000', 'min_value'='0', 'max_value'='1000', 'avg_size'='880961',
'max_size'='880961' );
+ alter table t2 modify column f set stats ('row_count'='204000',
'ndv'='8', 'min_value'='0', 'max_value'='1000', 'avg_size'='880961',
'max_size'='880961' );
+ alter table t2 modify column g set stats ('row_count'='204000',
'ndv'='30', 'min_value'='0', 'max_value'='1000', 'avg_size'='880961',
'max_size'='880961' );
+ alter table t2 modify column h set stats ('row_count'='204000',
'ndv'='100', 'min_value'='0', 'max_value'='1000', 'avg_size'='880961',
'max_size'='880961' );
+ alter table t2 modify column i set stats ('row_count'='204000',
'ndv'='200', 'min_value'='0', 'max_value'='1000', 'avg_size'='880961',
'max_size'='880961' );
+ alter table t2 modify column j set stats ('row_count'='204000',
'ndv'='5000', 'min_value'='0', 'max_value'='1000', 'avg_size'='880961',
'max_size'='880961' );
+
+ drop table if exists t3;
+ create table t3(a int, b int, c int, d int, e int, f int, g int, h
int, i int, j int) distributed by hash(e) properties("replication_num"="1");
+ alter table t3 modify column a set stats ('row_count'='204000',
'ndv'='2000', 'min_value'='0', 'max_value'='1000', 'avg_size'='880961',
'max_size'='880961' );
+ alter table t3 modify column b set stats ('row_count'='204000',
'ndv'='30', 'min_value'='0', 'max_value'='1000', 'avg_size'='880961',
'max_size'='880961' );
+ alter table t3 modify column c set stats ('row_count'='204000',
'ndv'='100', 'min_value'='0', 'max_value'='1000', 'avg_size'='880961',
'max_size'='880961' );
+ alter table t3 modify column d set stats ('row_count'='204000',
'ndv'='200', 'min_value'='0', 'max_value'='1000', 'avg_size'='880961',
'max_size'='880961' );
+ alter table t3 modify column e set stats ('row_count'='204000',
'ndv'='5000', 'min_value'='0', 'max_value'='1000', 'avg_size'='880961',
'max_size'='880961' );
+ alter table t3 modify column f set stats ('row_count'='204000',
'ndv'='8', 'min_value'='0', 'max_value'='1000', 'avg_size'='880961',
'max_size'='880961' );
+ alter table t3 modify column g set stats ('row_count'='204000',
'ndv'='30', 'min_value'='0', 'max_value'='1000', 'avg_size'='880961',
'max_size'='880961' );
+ alter table t3 modify column h set stats ('row_count'='204000',
'ndv'='100', 'min_value'='0', 'max_value'='1000', 'avg_size'='880961',
'max_size'='880961' );
+ alter table t3 modify column i set stats ('row_count'='204000',
'ndv'='200', 'min_value'='0', 'max_value'='1000', 'avg_size'='880961',
'max_size'='880961' );
+ alter table t3 modify column j set stats ('row_count'='204000',
'ndv'='5000', 'min_value'='0', 'max_value'='1000', 'avg_size'='880961',
'max_size'='880961' );
+
+ set disable_nereids_rules='prune_empty_partition';
+ set choose_one_agg_shuffle_key=true;
+ set shuffle_key_prune_threshold=5;
+ set detail_shape_nodes='PhysicalDistribute';
+ """
+ def checkShuffleKey = { String sqlString, String... expectedHashCols ->
+ def result = sql """ explain shape plan ${sqlString}"""
+ for (def expected : expectedHashCols) {
assertTrue(result.toString().contains("Hash Columns:[${expected}]"), "\n${result.collect { it[0] }.join('\n')} not contains expected hash cols: ${expected}")
+ }
+ }
Review Comment:
`checkShuffleKey` is defined but never used in this suite. Keeping unused
helpers makes the regression test harder to maintain; either remove it or
convert some of the `qt_...` checks to use it so the intent (verifying chosen
hash columns) is explicit.
```suggestion
```
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Memo.java:
##########
@@ -839,9 +839,18 @@ private void recycleExpression(GroupExpression
groupExpression) {
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
- builder.append("root:").append(getRoot()).append("\n");
+ boolean first = true;
+ Group rootGroup = getRoot();
for (Group group : groups.values()) {
- builder.append("\n\n").append(group).append("\n");
+ if (!first) {
+ builder.append("\n").append("═══════════════════════════════════════════════════════════").append("\n");
+ }
+ if (group.equals(rootGroup)) {
+ builder.append("\n").append("🌲 ROOT ").append(group);
+ } else {
+ builder.append("\n").append(group);
+ }
Review Comment:
`toString()` output now contains emoji/Unicode decorations (e.g., "🌲 ROOT")
and heavy box-drawing separators. These strings often end up in logs or
diagnostic dumps, and non-ASCII output can cause encoding issues or break log
parsing tools. Consider using plain ASCII markers (e.g., "ROOT" / "*" and
"-----") or gating the fancy formatting behind a debug flag.
##########
fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java:
##########
@@ -3362,6 +3366,13 @@ public boolean isEnableESParallelScroll() {
+ "to exclude the impact of dangling delete
files."})
public boolean ignoreIcebergDanglingDelete = false;
+ public static final String SHUFFLE_KEY_PRUNE_THRESHOLD =
"shuffle_key_prune_threshold";
+ @VariableMgr.VarAttr(name = SHUFFLE_KEY_PRUNE_THRESHOLD,
+ description = {"控制在聚合或连接操作中进行 shuffle key 裁剪的阈值。"
+ + "当 shuffle key 的基数小于该阈值时,启用裁剪以哈希分发的开销。",
+ "Controls the threshold for performing shuffle key pruning
in aggregation or join operations. "})
Review Comment:
The `shuffle_key_prune_threshold` session variable description is currently
unclear/grammatically incorrect (CN ends with a comma and reads like it's
missing a verb such as “减少/降低”). Please refine the CN/EN descriptions to
precisely state when pruning is applied (e.g., group-by/join key count >
threshold) and what is being optimized.
```suggestion
description = {"控制在聚合或连接算子中进行 shuffle key 裁剪的阈值。"
+ "当 group by 或 join 的键数量大于该阈值时,规划器会尝试裁剪部分 shuffle key,"
+ "以减少哈希分发的数据量和计算开销。",
"Controls the threshold for pruning shuffle keys in
aggregation or join operators. "
+ "When the number of group-by or join keys is
greater than this threshold, "
+ "the planner will try to prune some shuffle
keys to reduce hash-distribution "
+ "data volume and computation overhead."})
```
##########
fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java:
##########
@@ -1322,4 +1322,51 @@ public static LinkedHashMap<Literal, Float>
getHotValues(String stringValues, Ty
}
return null;
}
+
+ /**
+ * Get max hot value count including null for balance/skew computation.
+ * When hotValues not exist or empty, treat nulls as the only hot value.
+ * Otherwise use the max of (top hot value count, null count).
+ */
+ public static double getMaxHotValueCntIncludeNull(ColumnStatistic columnStatistic, double rowCount) {
+ Map<Literal, Float> hotValues = columnStatistic.getHotValues();
+ if (columnStatistic.getHotValues() == null || hotValues.isEmpty()) {
+ return columnStatistic.numNulls;
+ }
+ double maxRate = hotValues.values().stream().mapToDouble(Float::doubleValue).max().orElse(0);
+ double maxHotRows = maxRate * rowCount;
+ return maxHotRows > columnStatistic.numNulls ? maxHotRows : columnStatistic.numNulls;
Review Comment:
`getHotValues()` returns percentages (see `getHotValues` Javadoc: "row count
percentage"), but `getMaxHotValueCntIncludeNull` treats the max value as a
ratio and multiplies by `rowCount` directly. This will massively overestimate
hot-row counts (e.g., 30% becomes 30×rowCount) and break `isBalanced` / skew
scoring. Convert percentage to a fraction (e.g., divide by
`ColumnStatistic.ONE_HUNDRED`) before multiplying, and reconsider treating
`hotValues.isEmpty()` as "no hot values" (in `ColumnStatistic` an empty map
means hot values exist but are unknown).
```suggestion
* When hotValues do not exist (null), treat nulls as the only hot value.
* Otherwise use the max of (top hot value count, null count).
*/
public static double getMaxHotValueCntIncludeNull(ColumnStatistic
columnStatistic, double rowCount) {
Map<Literal, Float> hotValues = columnStatistic.getHotValues();
if (hotValues == null) {
return columnStatistic.numNulls;
}
// getHotValues() returns percentages (0-100). Convert to fraction
before applying to rowCount.
double maxRatePercent = hotValues.values().stream()
.mapToDouble(Float::doubleValue)
.max()
.orElse(0);
double maxHotRows = (maxRatePercent / ColumnStatistic.ONE_HUNDRED) *
rowCount;
return Math.max(maxHotRows, columnStatistic.numNulls);
```
##########
fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java:
##########
@@ -2765,6 +2766,9 @@ public boolean isEnableHboNonStrictMatchingMode() {
}, needForward = false)
public boolean aggShuffleUseParentKey = true;
+ @VariableMgr.VarAttr(name = CHOOSE_ONE_AGG_SHUFFLE_KEY, needForward =
false)
Review Comment:
The new session variable `choose_one_agg_shuffle_key` is missing the
required bilingual `description` in `@VarAttr`. Without it, `SHOW
VARIABLES`/docs will expose the default placeholder ("待补充"/"TODO"). Please add
a clear CN/EN description of what enabling this does and any caveats.
```suggestion
@VariableMgr.VarAttr(
name = CHOOSE_ONE_AGG_SHUFFLE_KEY,
needForward = false,
description = {
"在聚合算子存在多个可选分组键时,只选择一个键作为 shuffle 键,以减少 shuffle 数据量;"
+ "可能增加数据倾斜风险,当前为实验特性,建议在确认无明显倾斜时开启。",
"When aggregation operators have multiple candidate
grouping keys, choose only one key as "
+ "the shuffle key to reduce the amount of
shuffled data. This may increase the risk "
+ "of data skew; it is an experimental feature
and should be enabled only after "
+ "verifying that skew is acceptable."
})
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]