This is an automated email from the ASF dual-hosted git repository.
huajianlan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0dcf27125b0 [feature](agg) add variable agg_shuffle_use_parent_key to
control the shuffle key choosing (#59876)
0dcf27125b0 is described below
commit 0dcf27125b0a64807999045a6b185fad8b34c565
Author: feiniaofeiafei <[email protected]>
AuthorDate: Fri Jan 30 19:14:43 2026 +0800
[feature](agg) add variable agg_shuffle_use_parent_key to control the
shuffle key choosing (#59876)
Problem Summary:
Add the session variable agg_shuffle_use_parent_key (default: true). When
it is set to false, an aggregate shuffles by all of its GROUP BY keys
instead of by the parent's shuffle key.
---
.../java/org/apache/doris/nereids/PlanContext.java | 4 +
.../nereids/properties/RequestPropertyDeriver.java | 8 +-
.../java/org/apache/doris/qe/SessionVariable.java | 8 ++
.../properties/RequestPropertyDeriverTest.java | 92 ++++++++++++++++++++++
4 files changed, 110 insertions(+), 2 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/PlanContext.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/PlanContext.java
index 09285a638de..e20d9bf4044 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/PlanContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/PlanContext.java
@@ -83,4 +83,8 @@ public class PlanContext {
public StatementContext getStatementContext() {
return connectContext.getStatementContext();
}
+
+ public ConnectContext getConnectContext() {
+ return connectContext;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
index bee83f0cc80..5a7134fffe0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
@@ -468,7 +468,7 @@ public class RequestPropertyDeriver extends
PlanVisitor<Void, PlanContext> {
Set<ExprId> intersectId = Sets.intersection(new
HashSet<>(parentHashExprIds),
new HashSet<>(groupByExprIds));
if (!intersectId.isEmpty() && intersectId.size() <
groupByExprIds.size()) {
- if (shouldUseParent(parentHashExprIds, agg)) {
+ if (shouldUseParent(parentHashExprIds, agg, context)) {
addRequestPropertyToChildren(PhysicalProperties.createHash(
Utils.fastToImmutableList(intersectId),
ShuffleType.REQUIRE));
}
@@ -482,7 +482,11 @@ public class RequestPropertyDeriver extends
PlanVisitor<Void, PlanContext> {
return null;
}
- private boolean shouldUseParent(List<ExprId> parentHashExprIds,
PhysicalHashAggregate<? extends Plan> agg) {
+ private boolean shouldUseParent(List<ExprId> parentHashExprIds,
PhysicalHashAggregate<? extends Plan> agg,
+ PlanContext context) {
+ if
(!context.getConnectContext().getSessionVariable().aggShuffleUseParentKey) {
+ return false;
+ }
Optional<GroupExpression> groupExpression = agg.getGroupExpression();
if (!groupExpression.isPresent()) {
return true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 8920ddcc332..025720b0670 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -838,6 +838,7 @@ public class SessionVariable implements Serializable,
Writable {
public static final String SKEW_REWRITE_JOIN_SALT_EXPLODE_FACTOR =
"skew_rewrite_join_salt_explode_factor";
public static final String SKEW_REWRITE_AGG_BUCKET_NUM =
"skew_rewrite_agg_bucket_num";
+ public static final String AGG_SHUFFLE_USE_PARENT_KEY =
"agg_shuffle_use_parent_key";
public static final String HOT_VALUE_COLLECT_COUNT =
"hot_value_collect_count";
@VariableMgr.VarAttr(name = HOT_VALUE_COLLECT_COUNT, needForward = true,
@@ -846,6 +847,7 @@ public class SessionVariable implements Serializable,
Writable {
+ "proportion as hot values, up to
HOT_VALUE_COLLECT_COUNT."})
public int hotValueCollectCount = 10; // Select the values that account
for at least 10% of the column
+
public void setHotValueCollectCount(int count) {
this.hotValueCollectCount = count;
}
@@ -2750,6 +2752,12 @@ public class SessionVariable implements Serializable,
Writable {
}, checker = "checkSkewRewriteAggBucketNum")
public int skewRewriteAggBucketNum = 1024;
+ @VariableMgr.VarAttr(name = AGG_SHUFFLE_USE_PARENT_KEY, description = {
+ "在聚合算子进行 shuffle 时,是否使用父节点的分组键进行 shuffle",
+ "Whether to use the parent node's grouping key for shuffling
during the aggregation operator"
+ }, needForward = false)
+ public boolean aggShuffleUseParentKey = true;
+
@VariableMgr.VarAttr(name = ENABLE_PREFER_CACHED_ROWSET, needForward =
false,
description = {"是否启用 prefer cached rowset 功能",
"Whether to enable prefer cached rowset feature"})
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/RequestPropertyDeriverTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/RequestPropertyDeriverTest.java
index 4524fc10e5e..32a6adcf212 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/RequestPropertyDeriverTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/RequestPropertyDeriverTest.java
@@ -368,4 +368,96 @@ class RequestPropertyDeriverTest {
expected.add(Lists.newArrayList(PhysicalProperties.GATHER));
Assertions.assertEquals(expected, actual);
}
+
+ @Test
+ void testAggregateWithAggShuffleUseParentKeyDisabled() {
+ // Create ConnectContext with aggShuffleUseParentKey = false
+ ConnectContext testConnectContext = new ConnectContext();
+ testConnectContext.getSessionVariable().aggShuffleUseParentKey = false;
+
+ SlotReference key1 = new SlotReference(new ExprId(0), "col1",
IntegerType.INSTANCE, true, ImmutableList.of());
+ SlotReference key2 = new SlotReference(new ExprId(1), "col2",
IntegerType.INSTANCE, true, ImmutableList.of());
+ PhysicalHashAggregate<GroupPlan> aggregate = new
PhysicalHashAggregate<>(
+ Lists.newArrayList(key1, key2),
+ Lists.newArrayList(key1, key2),
+ new AggregateParam(AggPhase.GLOBAL, AggMode.BUFFER_TO_RESULT),
+ true,
+ logicalProperties,
+ groupPlan
+ );
+ GroupExpression groupExpression = new GroupExpression(aggregate);
+ new Group(null, groupExpression, null);
+
+ // Create a parent hash distribution with key1 only
+ PhysicalProperties parentProperties = PhysicalProperties.createHash(
+ Lists.newArrayList(key1.getExprId()), ShuffleType.REQUIRE);
+
+ new Expectations() {
+ {
+ jobContext.getRequiredProperties();
+ result = parentProperties;
+ }
+ };
+
+ RequestPropertyDeriver requestPropertyDeriver = new
RequestPropertyDeriver(testConnectContext, jobContext);
+ List<List<PhysicalProperties>> actual
+ =
requestPropertyDeriver.getRequestChildrenPropertyList(groupExpression);
+
+ // When aggShuffleUseParentKey is false, should only use all
groupByExpressions (key1, key2)
+ // and not use parent key (key1) separately
+ List<List<PhysicalProperties>> expected = Lists.newArrayList();
+ expected.add(Lists.newArrayList(PhysicalProperties.createHash(
+ Lists.newArrayList(key1.getExprId(), key2.getExprId()),
ShuffleType.REQUIRE)));
+ Assertions.assertEquals(1, actual.size());
+ Assertions.assertEquals(expected, actual);
+ }
+
+ @Test
+ void testAggregateWithAggShuffleUseParentKeyEnabled() {
+ // Create ConnectContext with aggShuffleUseParentKey = true (default
value)
+ ConnectContext testConnectContext = new ConnectContext();
+ testConnectContext.getSessionVariable().aggShuffleUseParentKey = true;
+
+ SlotReference key1 = new SlotReference(new ExprId(0), "col1",
IntegerType.INSTANCE, true, ImmutableList.of());
+ SlotReference key2 = new SlotReference(new ExprId(1), "col2",
IntegerType.INSTANCE, true, ImmutableList.of());
+ PhysicalHashAggregate<GroupPlan> aggregate = new
PhysicalHashAggregate<>(
+ Lists.newArrayList(key1, key2),
+ Lists.newArrayList(key1, key2),
+ new AggregateParam(AggPhase.GLOBAL, AggMode.BUFFER_TO_RESULT),
+ true,
+ logicalProperties,
+ groupPlan
+ );
+ GroupExpression groupExpression = new GroupExpression(aggregate);
+ new Group(null, groupExpression, null);
+
+ // Create a parent hash distribution with key1 only
+ PhysicalProperties parentProperties = PhysicalProperties.createHash(
+ Lists.newArrayList(key1.getExprId()), ShuffleType.REQUIRE);
+
+ new Expectations() {
+ {
+ jobContext.getRequiredProperties();
+ result = parentProperties;
+ }
+ };
+ new MockUp<org.apache.doris.nereids.memo.GroupExpression>() {
+ @mockit.Mock
+ org.apache.doris.statistics.Statistics childStatistics(int idx) {
+ return null;
+ }
+ };
+ RequestPropertyDeriver requestPropertyDeriver = new
RequestPropertyDeriver(testConnectContext, jobContext);
+ List<List<PhysicalProperties>> actual
+ =
requestPropertyDeriver.getRequestChildrenPropertyList(groupExpression);
+
+ // When aggShuffleUseParentKey is true, shouldUseParent may return true
+ // If shouldUseParent returns true, it will add parent key (key1)
first, then all groupByExpressions (key1, key2)
+ Assertions.assertEquals(2, actual.size(), "Should have at least one
property request");
+ PhysicalProperties parentProp = PhysicalProperties.createHash(
+ Lists.newArrayList(key1.getExprId()), ShuffleType.REQUIRE);
+ PhysicalProperties aggProp = PhysicalProperties.createHash(
+ Lists.newArrayList(key1.getExprId(), key2.getExprId()),
ShuffleType.REQUIRE);
+ Assertions.assertTrue(actual.contains(ImmutableList.of(aggProp)) &&
actual.contains(ImmutableList.of(parentProp)));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]