This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new e965013214c [fix](Nereids) fix group concat (#35334)
e965013214c is described below
commit e965013214c2b7ac602d42ea8b1e9f531009cce4
Author: 924060929 <[email protected]>
AuthorDate: Fri May 24 14:31:16 2024 +0800
[fix](Nereids) fix group concat (#35334)
cherry pick from #33091
commit e7d6697cbcdd643c83154304c59aa6c16f8fccf9
Fix the failing case in
regression_test/suites/query_p0/group_concat/test_group_concat.groovy
select
group_concat( distinct b1, '?'), group_concat( distinct b3, '?')
from
table_group_concat
group by
b2
exception:
lowestCostPlans with physicalProperties(GATHER) doesn't exist in root group
The root cause is that the literal '?' is pushed down to a slot by NormalizeAggregate;
AggregateStrategies then treats that slot as a distinct parameter and generates an
invalid PhysicalHashAggregate, which is then rejected by ChildOutputPropertyDeriver.
I fix this bug by not pushing literals down to slots in NormalizeAggregate,
and by forbidding generation of a stream aggregate node when the group-by slots are empty.
(cherry picked from commit e7d6697cbcdd643c83154304c59aa6c16f8fccf9)
* [fix](Nereids) fix group concat (#33091)
Fix the failing case in
regression_test/suites/query_p0/group_concat/test_group_concat.groovy
select
group_concat( distinct b1, '?'), group_concat( distinct b3, '?')
from
table_group_concat
group by
b2
exception:
lowestCostPlans with physicalProperties(GATHER) doesn't exist in root group
The root cause is that the literal '?' is pushed down to a slot by NormalizeAggregate;
AggregateStrategies then treats that slot as a distinct parameter and generates an
invalid PhysicalHashAggregate, which is then rejected by ChildOutputPropertyDeriver.
I fix this bug by not pushing literals down to slots in NormalizeAggregate,
and by forbidding generation of a stream aggregate node when the group-by slots are empty.
---
be/src/pipeline/pipeline_fragment_context.cpp | 9 ++++--
be/src/runtime/descriptors.h | 5 ++++
be/src/vec/exec/vaggregation_node.h | 1 +
.../java/org/apache/doris/nereids/memo/Group.java | 22 +++++++++++++--
.../apache/doris/nereids/memo/GroupExpression.java | 5 ++++
.../properties/ChildrenPropertiesRegulator.java | 3 ++
.../nereids/properties/PhysicalProperties.java | 6 ++--
.../nereids/rules/analysis/NormalizeAggregate.java | 31 +++++++++++++--------
.../rules/implementation/AggregateStrategies.java | 32 ++++++++++++++++++++++
9 files changed, 96 insertions(+), 18 deletions(-)
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 9f860610681..0d5d14a18b3 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -513,7 +513,12 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode*
node, PipelinePtr cur
auto* agg_node = dynamic_cast<vectorized::AggregationNode*>(node);
auto new_pipe = add_pipeline();
RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipe));
- if (agg_node->is_aggregate_evaluators_empty()) {
+ if (agg_node->is_probe_expr_ctxs_empty() &&
node->row_desc().num_slots() == 0) {
+ return Status::InternalError("Illegal aggregate node " +
+ std::to_string(agg_node->id()) +
+ ": group by and output is empty");
+ }
+ if (agg_node->is_aggregate_evaluators_empty() &&
!agg_node->is_probe_expr_ctxs_empty()) {
auto data_queue = std::make_shared<DataQueue>(1);
OperatorBuilderPtr pre_agg_sink =
std::make_shared<DistinctStreamingAggSinkOperatorBuilder>(node->id(), agg_node,
@@ -524,7 +529,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode*
node, PipelinePtr cur
std::make_shared<DistinctStreamingAggSourceOperatorBuilder>(
node->id(), agg_node, data_queue);
RETURN_IF_ERROR(cur_pipe->add_operator(pre_agg_source));
- } else if (agg_node->is_streaming_preagg()) {
+ } else if (agg_node->is_streaming_preagg() &&
!agg_node->is_probe_expr_ctxs_empty()) {
auto data_queue = std::make_shared<DataQueue>(1);
OperatorBuilderPtr pre_agg_sink =
std::make_shared<StreamingAggSinkOperatorBuilder>(
node->id(), agg_node, data_queue);
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index c4adbb8d4b3..01127a52ed4 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -502,10 +502,12 @@ public:
_has_varlen_slots(desc._has_varlen_slots) {
_num_materialized_slots = 0;
_num_null_slots = 0;
+ _num_slots = 0;
std::vector<TupleDescriptor*>::const_iterator it =
desc._tuple_desc_map.begin();
for (; it != desc._tuple_desc_map.end(); ++it) {
_num_materialized_slots += (*it)->num_materialized_slots();
_num_null_slots += (*it)->num_null_slots();
+ _num_slots += (*it)->slots().size();
}
_num_null_bytes = (_num_null_slots + 7) / 8;
}
@@ -528,6 +530,8 @@ public:
int num_null_bytes() const { return _num_null_bytes; }
+ int num_slots() const { return _num_slots; }
+
static const int INVALID_IDX;
// Returns INVALID_IDX if id not part of this row.
@@ -582,6 +586,7 @@ private:
int _num_materialized_slots;
int _num_null_slots;
int _num_null_bytes;
+ int _num_slots;
};
} // namespace doris
diff --git a/be/src/vec/exec/vaggregation_node.h
b/be/src/vec/exec/vaggregation_node.h
index 2eafe03fd50..81077e75340 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -849,6 +849,7 @@ public:
Status pull(doris::RuntimeState* state, vectorized::Block* output_block,
bool* eos) override;
Status sink(doris::RuntimeState* state, vectorized::Block* input_block,
bool eos) override;
Status do_pre_agg(vectorized::Block* input_block, vectorized::Block*
output_block);
+ bool is_probe_expr_ctxs_empty() const { return _probe_expr_ctxs.empty(); }
bool is_streaming_preagg() const { return _is_streaming_preagg; }
bool is_aggregate_evaluators_empty() const { return
_aggregate_evaluators.empty(); }
void _make_nullable_output_key(Block* block);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java
index e6b2256b410..eab48eadbc2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java
@@ -34,6 +34,7 @@ import org.apache.doris.statistics.Statistics;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -65,7 +66,7 @@ public class Group {
// Map of cost lower bounds
// Map required plan props to cost lower bound of corresponding plan
- private final Map<PhysicalProperties, Pair<Cost, GroupExpression>>
lowestCostPlans = Maps.newHashMap();
+ private final Map<PhysicalProperties, Pair<Cost, GroupExpression>>
lowestCostPlans = Maps.newLinkedHashMap();
private boolean isExplored = false;
@@ -213,6 +214,12 @@ public class Group {
return Optional.ofNullable(lowestCostPlans.get(physicalProperties));
}
+ public Map<PhysicalProperties, Cost> getLowestCosts() {
+ return lowestCostPlans.entrySet()
+ .stream()
+ .collect(ImmutableMap.toImmutableMap(Entry::getKey, kv ->
kv.getValue().first));
+ }
+
public GroupExpression getBestPlan(PhysicalProperties properties) {
if (lowestCostPlans.containsKey(properties)) {
return lowestCostPlans.get(properties).second;
@@ -451,9 +458,18 @@ public class Group {
public String treeString() {
Function<Object, String> toString = obj -> {
if (obj instanceof Group) {
- return "Group[" + ((Group) obj).groupId + "]";
+ Group group = (Group) obj;
+ Map<PhysicalProperties, Cost> lowestCosts =
group.getLowestCosts();
+ return "Group[" + group.groupId + ", lowestCosts: " +
lowestCosts + "]";
} else if (obj instanceof GroupExpression) {
- return ((GroupExpression) obj).getPlan().toString();
+ GroupExpression groupExpression = (GroupExpression) obj;
+ Map<PhysicalProperties, Pair<Cost, List<PhysicalProperties>>>
lowestCostTable
+ = groupExpression.getLowestCostTable();
+ Map<PhysicalProperties, PhysicalProperties>
requestPropertiesMap
+ = groupExpression.getRequestPropertiesMap();
+ Cost cost = groupExpression.getCost();
+ return groupExpression.getPlan().toString() + " [cost: " +
cost + ", lowestCostTable: "
+ + lowestCostTable + ", requestPropertiesMap: " +
requestPropertiesMap + "]";
} else if (obj instanceof Pair) {
// print logicalExpressions or physicalExpressions
// first is name, second is group expressions
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java
index 78218b552f3..507be9fa8a4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java
@@ -35,6 +35,7 @@ import org.apache.doris.statistics.Statistics;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -319,6 +320,10 @@ public class GroupExpression {
this.estOutputRowCount = estOutputRowCount;
}
+ public Map<PhysicalProperties, PhysicalProperties>
getRequestPropertiesMap() {
+ return ImmutableMap.copyOf(requestPropertiesMap);
+ }
+
@Override
public String toString() {
DecimalFormat format = new DecimalFormat("#,###.##");
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
index 50256552a2a..0fa2d87f92f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
@@ -103,6 +103,9 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<Boolean, Void> {
@Override
public Boolean visitPhysicalHashAggregate(PhysicalHashAggregate<? extends
Plan> agg, Void context) {
+ if (agg.getGroupByExpressions().isEmpty() &&
agg.getOutputExpressions().isEmpty()) {
+ return false;
+ }
if (!agg.getAggregateParam().canBeBanned) {
return true;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
index c9cca233e57..5dad93ad3b4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
@@ -82,11 +82,13 @@ public class PhysicalProperties {
.map(SlotReference.class::cast)
.map(SlotReference::getExprId)
.collect(Collectors.toList());
- return createHash(partitionedSlots, shuffleType);
+ return partitionedSlots.isEmpty() ? PhysicalProperties.GATHER :
createHash(partitionedSlots, shuffleType);
}
public static PhysicalProperties createHash(List<ExprId>
orderedShuffledColumns, ShuffleType shuffleType) {
- return new PhysicalProperties(new
DistributionSpecHash(orderedShuffledColumns, shuffleType));
+ return orderedShuffledColumns.isEmpty()
+ ? PhysicalProperties.GATHER
+ : new PhysicalProperties(new
DistributionSpecHash(orderedShuffledColumns, shuffleType));
}
public static PhysicalProperties createHash(DistributionSpecHash
distributionSpecHash) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/NormalizeAggregate.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/NormalizeAggregate.java
index 3f5f749e2fc..dc071544935 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/NormalizeAggregate.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/NormalizeAggregate.java
@@ -156,28 +156,37 @@ public class NormalizeAggregate implements
RewriteRuleFactory, NormalizeToSlot {
// split non-distinct agg child as two part
// TRUE part 1: need push down itself, if it contains subqury or
window expression
// FALSE part 2: need push down its input slots, if it DOES NOT
contain subqury or window expression
- Map<Boolean, Set<Expression>> categorizedNoDistinctAggsChildren =
aggFuncs.stream()
+ Map<Boolean, ImmutableSet<Expression>>
categorizedNoDistinctAggsChildren = aggFuncs.stream()
.filter(aggFunc -> !aggFunc.isDistinct())
.flatMap(agg -> agg.children().stream())
+ // should not push down literal under aggregate
+ // e.g. group_concat(distinct xxx, ','), the ',' literal show
stay in aggregate
+ .filter(arg -> !(arg instanceof Literal))
.collect(Collectors.groupingBy(
child -> child.containsType(SubqueryExpr.class,
WindowExpression.class),
- Collectors.toSet()));
+ ImmutableSet.toImmutableSet()));
// split distinct agg child as two parts
// TRUE part 1: need push down itself, if it is NOT SlotReference or
Literal
// FALSE part 2: need push down its input slots, if it is
SlotReference or Literal
- Map<Boolean, Set<Expression>> categorizedDistinctAggsChildren =
aggFuncs.stream()
- .filter(aggFunc -> aggFunc.isDistinct()).flatMap(agg ->
agg.children().stream())
- .collect(Collectors.groupingBy(
- child -> !(child instanceof SlotReference || child
instanceof Literal),
- Collectors.toSet()));
+ Map<Boolean, ImmutableSet<Expression>> categorizedDistinctAggsChildren
= aggFuncs.stream()
+ .filter(aggFunc -> aggFunc.isDistinct())
+ .flatMap(agg -> agg.children().stream())
+ // should not push down literal under aggregate
+ // e.g. group_concat(distinct xxx, ','), the ',' literal show
stay in aggregate
+ .filter(arg -> !(arg instanceof Literal))
+ .collect(
+ Collectors.groupingBy(
+ child -> !(child instanceof SlotReference),
+ ImmutableSet.toImmutableSet())
+ );
Set<Expression> needPushSelf = Sets.union(
- categorizedNoDistinctAggsChildren.getOrDefault(true, new
HashSet<>()),
- categorizedDistinctAggsChildren.getOrDefault(true, new
HashSet<>()));
+ categorizedNoDistinctAggsChildren.getOrDefault(true,
ImmutableSet.of()),
+ categorizedDistinctAggsChildren.getOrDefault(true,
ImmutableSet.of()));
Set<Slot> needPushInputSlots =
ExpressionUtils.getInputSlotSet(Sets.union(
- categorizedNoDistinctAggsChildren.getOrDefault(false, new
HashSet<>()),
- categorizedDistinctAggsChildren.getOrDefault(false, new
HashSet<>())));
+ categorizedNoDistinctAggsChildren.getOrDefault(false,
ImmutableSet.of()),
+ categorizedDistinctAggsChildren.getOrDefault(false,
ImmutableSet.of())));
Set<Alias> existsAlias =
ExpressionUtils.mutableCollect(aggregateOutput,
Alias.class::isInstance);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
index 7f31585fd66..ad1ad4f5267 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
@@ -70,6 +70,7 @@ import
org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import
org.apache.doris.nereids.trees.plans.physical.PhysicalStorageLayerAggregate;
import
org.apache.doris.nereids.trees.plans.physical.PhysicalStorageLayerAggregate.PushDownAggOp;
+import org.apache.doris.nereids.types.TinyIntType;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.nereids.util.TypeCoercionUtils;
import org.apache.doris.qe.ConnectContext;
@@ -1292,6 +1293,15 @@ public class AggregateStrategies implements
ImplementationRuleFactory {
.build();
List<Expression> localAggGroupBy =
ImmutableList.copyOf(localAggGroupBySet);
+ boolean isGroupByEmptySelectEmpty = localAggGroupBy.isEmpty() &&
localAggOutput.isEmpty();
+
+ // be not recommend generate an aggregate node with empty group by and
empty output,
+ // so add a null int slot to group by slot and output
+ if (isGroupByEmptySelectEmpty) {
+ localAggGroupBy = ImmutableList.of(new
NullLiteral(TinyIntType.INSTANCE));
+ localAggOutput = ImmutableList.of(new Alias(new
NullLiteral(TinyIntType.INSTANCE)));
+ }
+
boolean maybeUsingStreamAgg = maybeUsingStreamAgg(connectContext,
localAggGroupBy);
List<Expression> partitionExpressions =
getHashAggregatePartitionExpressions(logicalAgg);
RequireProperties requireAny =
RequireProperties.of(PhysicalProperties.ANY);
@@ -1317,6 +1327,12 @@ public class AggregateStrategies implements
ImplementationRuleFactory {
.addAll(nonDistinctAggFunctionToAliasPhase2.values())
.build();
+ // be not recommend generate an aggregate node with empty group by and
empty output,
+ // so add a null int slot to group by slot and output
+ if (isGroupByEmptySelectEmpty) {
+ globalAggOutput = ImmutableList.of(new Alias(new
NullLiteral(TinyIntType.INSTANCE)));
+ }
+
RequireProperties requireGather =
RequireProperties.of(PhysicalProperties.GATHER);
PhysicalHashAggregate<Plan> anyLocalGatherGlobalAgg = new
PhysicalHashAggregate<>(
localAggGroupBy, globalAggOutput,
Optional.of(partitionExpressions),
@@ -1680,6 +1696,16 @@ public class AggregateStrategies implements
ImplementationRuleFactory {
boolean maybeUsingStreamAgg = maybeUsingStreamAgg(connectContext,
localAggGroupBy);
List<Expression> partitionExpressions =
getHashAggregatePartitionExpressions(logicalAgg);
RequireProperties requireAny =
RequireProperties.of(PhysicalProperties.ANY);
+
+ boolean isGroupByEmptySelectEmpty = localAggGroupBy.isEmpty() &&
localAggOutput.isEmpty();
+
+ // be not recommend generate an aggregate node with empty group by and
empty output,
+ // so add a null int slot to group by slot and output
+ if (isGroupByEmptySelectEmpty) {
+ localAggGroupBy = ImmutableList.of(new
NullLiteral(TinyIntType.INSTANCE));
+ localAggOutput = ImmutableList.of(new Alias(new
NullLiteral(TinyIntType.INSTANCE)));
+ }
+
PhysicalHashAggregate<Plan> anyLocalAgg = new
PhysicalHashAggregate<>(localAggGroupBy,
localAggOutput, Optional.of(partitionExpressions),
inputToBufferParam,
maybeUsingStreamAgg, Optional.empty(),
logicalAgg.getLogicalProperties(),
@@ -1702,6 +1728,12 @@ public class AggregateStrategies implements
ImplementationRuleFactory {
.addAll(nonDistinctAggFunctionToAliasPhase2.values())
.build();
+ // be not recommend generate an aggregate node with empty group by and
empty output,
+ // so add a null int slot to group by slot and output
+ if (isGroupByEmptySelectEmpty) {
+ globalAggOutput = ImmutableList.of(new Alias(new
NullLiteral(TinyIntType.INSTANCE)));
+ }
+
RequireProperties requireGather =
RequireProperties.of(PhysicalProperties.GATHER);
RequireProperties requireDistinctHash = RequireProperties.of(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]