This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new e965013214c [fix](Nereids) fix group concat (#35334)
e965013214c is described below

commit e965013214c2b7ac602d42ea8b1e9f531009cce4
Author: 924060929 <[email protected]>
AuthorDate: Fri May 24 14:31:16 2024 +0800

    [fix](Nereids) fix group concat (#35334)
    
    cherry pick from #33091
    commit e7d6697cbcdd643c83154304c59aa6c16f8fccf9
    
    Fix a failure in 
regression_test/suites/query_p0/group_concat/test_group_concat.groovy
    
    select
    group_concat( distinct b1, '?'), group_concat( distinct b3, '?')
    from
    table_group_concat
    group by
    b2
    
    exception:
    
    lowestCostPlans with physicalProperties(GATHER) doesn't exist in root group
    
    The root cause is that NormalizeAggregate pushes the '?' literal down into a slot; 
AggregateStrategies then treats that slot as a distinct parameter and generates an 
invalid PhysicalHashAggregate, which is then rejected by ChildOutputPropertyDeriver.
    
    This fixes the bug by no longer pushing literals down into slots in NormalizeAggregate, 
and by forbidding generation of a streaming aggregate node when the group-by slots are empty.
    
    (cherry picked from commit e7d6697cbcdd643c83154304c59aa6c16f8fccf9)
    
    * [fix](Nereids) fix group concat (#33091)
    
    Fix a failure in 
regression_test/suites/query_p0/group_concat/test_group_concat.groovy
    
    select
    group_concat( distinct b1, '?'), group_concat( distinct b3, '?')
    from
    table_group_concat
    group by
    b2
    
    exception:
    
    lowestCostPlans with physicalProperties(GATHER) doesn't exist in root group
    
    The root cause is that NormalizeAggregate pushes the '?' literal down into a slot; 
AggregateStrategies then treats that slot as a distinct parameter and generates an 
invalid PhysicalHashAggregate, which is then rejected by ChildOutputPropertyDeriver.
    
    This fixes the bug by no longer pushing literals down into slots in NormalizeAggregate, 
and by forbidding generation of a streaming aggregate node when the group-by slots are empty.
---
 be/src/pipeline/pipeline_fragment_context.cpp      |  9 ++++--
 be/src/runtime/descriptors.h                       |  5 ++++
 be/src/vec/exec/vaggregation_node.h                |  1 +
 .../java/org/apache/doris/nereids/memo/Group.java  | 22 +++++++++++++--
 .../apache/doris/nereids/memo/GroupExpression.java |  5 ++++
 .../properties/ChildrenPropertiesRegulator.java    |  3 ++
 .../nereids/properties/PhysicalProperties.java     |  6 ++--
 .../nereids/rules/analysis/NormalizeAggregate.java | 31 +++++++++++++--------
 .../rules/implementation/AggregateStrategies.java  | 32 ++++++++++++++++++++++
 9 files changed, 96 insertions(+), 18 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 9f860610681..0d5d14a18b3 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -513,7 +513,12 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* 
node, PipelinePtr cur
         auto* agg_node = dynamic_cast<vectorized::AggregationNode*>(node);
         auto new_pipe = add_pipeline();
         RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipe));
-        if (agg_node->is_aggregate_evaluators_empty()) {
+        if (agg_node->is_probe_expr_ctxs_empty() && 
node->row_desc().num_slots() == 0) {
+            return Status::InternalError("Illegal aggregate node " +
+                                         std::to_string(agg_node->id()) +
+                                         ": group by and output is empty");
+        }
+        if (agg_node->is_aggregate_evaluators_empty() && 
!agg_node->is_probe_expr_ctxs_empty()) {
             auto data_queue = std::make_shared<DataQueue>(1);
             OperatorBuilderPtr pre_agg_sink =
                     
std::make_shared<DistinctStreamingAggSinkOperatorBuilder>(node->id(), agg_node,
@@ -524,7 +529,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* 
node, PipelinePtr cur
                     
std::make_shared<DistinctStreamingAggSourceOperatorBuilder>(
                             node->id(), agg_node, data_queue);
             RETURN_IF_ERROR(cur_pipe->add_operator(pre_agg_source));
-        } else if (agg_node->is_streaming_preagg()) {
+        } else if (agg_node->is_streaming_preagg() && 
!agg_node->is_probe_expr_ctxs_empty()) {
             auto data_queue = std::make_shared<DataQueue>(1);
             OperatorBuilderPtr pre_agg_sink = 
std::make_shared<StreamingAggSinkOperatorBuilder>(
                     node->id(), agg_node, data_queue);
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index c4adbb8d4b3..01127a52ed4 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -502,10 +502,12 @@ public:
               _has_varlen_slots(desc._has_varlen_slots) {
         _num_materialized_slots = 0;
         _num_null_slots = 0;
+        _num_slots = 0;
         std::vector<TupleDescriptor*>::const_iterator it = 
desc._tuple_desc_map.begin();
         for (; it != desc._tuple_desc_map.end(); ++it) {
             _num_materialized_slots += (*it)->num_materialized_slots();
             _num_null_slots += (*it)->num_null_slots();
+            _num_slots += (*it)->slots().size();
         }
         _num_null_bytes = (_num_null_slots + 7) / 8;
     }
@@ -528,6 +530,8 @@ public:
 
     int num_null_bytes() const { return _num_null_bytes; }
 
+    int num_slots() const { return _num_slots; }
+
     static const int INVALID_IDX;
 
     // Returns INVALID_IDX if id not part of this row.
@@ -582,6 +586,7 @@ private:
     int _num_materialized_slots;
     int _num_null_slots;
     int _num_null_bytes;
+    int _num_slots;
 };
 
 } // namespace doris
diff --git a/be/src/vec/exec/vaggregation_node.h 
b/be/src/vec/exec/vaggregation_node.h
index 2eafe03fd50..81077e75340 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -849,6 +849,7 @@ public:
     Status pull(doris::RuntimeState* state, vectorized::Block* output_block, 
bool* eos) override;
     Status sink(doris::RuntimeState* state, vectorized::Block* input_block, 
bool eos) override;
     Status do_pre_agg(vectorized::Block* input_block, vectorized::Block* 
output_block);
+    bool is_probe_expr_ctxs_empty() const { return _probe_expr_ctxs.empty(); }
     bool is_streaming_preagg() const { return _is_streaming_preagg; }
     bool is_aggregate_evaluators_empty() const { return 
_aggregate_evaluators.empty(); }
     void _make_nullable_output_key(Block* block);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java
index e6b2256b410..eab48eadbc2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java
@@ -34,6 +34,7 @@ import org.apache.doris.statistics.Statistics;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -65,7 +66,7 @@ public class Group {
 
     // Map of cost lower bounds
     // Map required plan props to cost lower bound of corresponding plan
-    private final Map<PhysicalProperties, Pair<Cost, GroupExpression>> 
lowestCostPlans = Maps.newHashMap();
+    private final Map<PhysicalProperties, Pair<Cost, GroupExpression>> 
lowestCostPlans = Maps.newLinkedHashMap();
 
     private boolean isExplored = false;
 
@@ -213,6 +214,12 @@ public class Group {
         return Optional.ofNullable(lowestCostPlans.get(physicalProperties));
     }
 
+    public Map<PhysicalProperties, Cost> getLowestCosts() {
+        return lowestCostPlans.entrySet()
+                .stream()
+                .collect(ImmutableMap.toImmutableMap(Entry::getKey, kv -> 
kv.getValue().first));
+    }
+
     public GroupExpression getBestPlan(PhysicalProperties properties) {
         if (lowestCostPlans.containsKey(properties)) {
             return lowestCostPlans.get(properties).second;
@@ -451,9 +458,18 @@ public class Group {
     public String treeString() {
         Function<Object, String> toString = obj -> {
             if (obj instanceof Group) {
-                return "Group[" + ((Group) obj).groupId + "]";
+                Group group = (Group) obj;
+                Map<PhysicalProperties, Cost> lowestCosts = 
group.getLowestCosts();
+                return "Group[" + group.groupId + ", lowestCosts: " + 
lowestCosts + "]";
             } else if (obj instanceof GroupExpression) {
-                return ((GroupExpression) obj).getPlan().toString();
+                GroupExpression groupExpression = (GroupExpression) obj;
+                Map<PhysicalProperties, Pair<Cost, List<PhysicalProperties>>> 
lowestCostTable
+                        = groupExpression.getLowestCostTable();
+                Map<PhysicalProperties, PhysicalProperties> 
requestPropertiesMap
+                        = groupExpression.getRequestPropertiesMap();
+                Cost cost = groupExpression.getCost();
+                return groupExpression.getPlan().toString() + " [cost: " + 
cost + ", lowestCostTable: "
+                        + lowestCostTable + ", requestPropertiesMap: " + 
requestPropertiesMap + "]";
             } else if (obj instanceof Pair) {
                 // print logicalExpressions or physicalExpressions
                 // first is name, second is group expressions
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java
index 78218b552f3..507be9fa8a4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java
@@ -35,6 +35,7 @@ import org.apache.doris.statistics.Statistics;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -319,6 +320,10 @@ public class GroupExpression {
         this.estOutputRowCount = estOutputRowCount;
     }
 
+    public Map<PhysicalProperties, PhysicalProperties> 
getRequestPropertiesMap() {
+        return ImmutableMap.copyOf(requestPropertiesMap);
+    }
+
     @Override
     public String toString() {
         DecimalFormat format = new DecimalFormat("#,###.##");
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
index 50256552a2a..0fa2d87f92f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
@@ -103,6 +103,9 @@ public class ChildrenPropertiesRegulator extends 
PlanVisitor<Boolean, Void> {
 
     @Override
     public Boolean visitPhysicalHashAggregate(PhysicalHashAggregate<? extends 
Plan> agg, Void context) {
+        if (agg.getGroupByExpressions().isEmpty() && 
agg.getOutputExpressions().isEmpty()) {
+            return false;
+        }
         if (!agg.getAggregateParam().canBeBanned) {
             return true;
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
index c9cca233e57..5dad93ad3b4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
@@ -82,11 +82,13 @@ public class PhysicalProperties {
                 .map(SlotReference.class::cast)
                 .map(SlotReference::getExprId)
                 .collect(Collectors.toList());
-        return createHash(partitionedSlots, shuffleType);
+        return partitionedSlots.isEmpty() ? PhysicalProperties.GATHER : 
createHash(partitionedSlots, shuffleType);
     }
 
     public static PhysicalProperties createHash(List<ExprId> 
orderedShuffledColumns, ShuffleType shuffleType) {
-        return new PhysicalProperties(new 
DistributionSpecHash(orderedShuffledColumns, shuffleType));
+        return orderedShuffledColumns.isEmpty()
+                ? PhysicalProperties.GATHER
+                : new PhysicalProperties(new 
DistributionSpecHash(orderedShuffledColumns, shuffleType));
     }
 
     public static PhysicalProperties createHash(DistributionSpecHash 
distributionSpecHash) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/NormalizeAggregate.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/NormalizeAggregate.java
index 3f5f749e2fc..dc071544935 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/NormalizeAggregate.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/NormalizeAggregate.java
@@ -156,28 +156,37 @@ public class NormalizeAggregate implements 
RewriteRuleFactory, NormalizeToSlot {
         // split non-distinct agg child as two part
         // TRUE part 1: need push down itself, if it contains subqury or 
window expression
         // FALSE part 2: need push down its input slots, if it DOES NOT 
contain subqury or window expression
-        Map<Boolean, Set<Expression>> categorizedNoDistinctAggsChildren = 
aggFuncs.stream()
+        Map<Boolean, ImmutableSet<Expression>> 
categorizedNoDistinctAggsChildren = aggFuncs.stream()
                 .filter(aggFunc -> !aggFunc.isDistinct())
                 .flatMap(agg -> agg.children().stream())
+                // should not push down literal under aggregate
+                // e.g. group_concat(distinct xxx, ','), the ',' literal show 
stay in aggregate
+                .filter(arg -> !(arg instanceof Literal))
                 .collect(Collectors.groupingBy(
                         child -> child.containsType(SubqueryExpr.class, 
WindowExpression.class),
-                        Collectors.toSet()));
+                        ImmutableSet.toImmutableSet()));
 
         // split distinct agg child as two parts
         // TRUE part 1: need push down itself, if it is NOT SlotReference or 
Literal
         // FALSE part 2: need push down its input slots, if it is 
SlotReference or Literal
-        Map<Boolean, Set<Expression>> categorizedDistinctAggsChildren = 
aggFuncs.stream()
-                .filter(aggFunc -> aggFunc.isDistinct()).flatMap(agg -> 
agg.children().stream())
-                .collect(Collectors.groupingBy(
-                        child -> !(child instanceof SlotReference || child 
instanceof Literal),
-                        Collectors.toSet()));
+        Map<Boolean, ImmutableSet<Expression>> categorizedDistinctAggsChildren 
= aggFuncs.stream()
+                .filter(aggFunc -> aggFunc.isDistinct())
+                .flatMap(agg -> agg.children().stream())
+                // should not push down literal under aggregate
+                // e.g. group_concat(distinct xxx, ','), the ',' literal show 
stay in aggregate
+                .filter(arg -> !(arg instanceof Literal))
+                .collect(
+                        Collectors.groupingBy(
+                                child -> !(child instanceof SlotReference),
+                                ImmutableSet.toImmutableSet())
+                );
 
         Set<Expression> needPushSelf = Sets.union(
-                categorizedNoDistinctAggsChildren.getOrDefault(true, new 
HashSet<>()),
-                categorizedDistinctAggsChildren.getOrDefault(true, new 
HashSet<>()));
+                categorizedNoDistinctAggsChildren.getOrDefault(true, 
ImmutableSet.of()),
+                categorizedDistinctAggsChildren.getOrDefault(true, 
ImmutableSet.of()));
         Set<Slot> needPushInputSlots = 
ExpressionUtils.getInputSlotSet(Sets.union(
-                categorizedNoDistinctAggsChildren.getOrDefault(false, new 
HashSet<>()),
-                categorizedDistinctAggsChildren.getOrDefault(false, new 
HashSet<>())));
+                categorizedNoDistinctAggsChildren.getOrDefault(false, 
ImmutableSet.of()),
+                categorizedDistinctAggsChildren.getOrDefault(false, 
ImmutableSet.of())));
 
         Set<Alias> existsAlias =
                 ExpressionUtils.mutableCollect(aggregateOutput, 
Alias.class::isInstance);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
index 7f31585fd66..ad1ad4f5267 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java
@@ -70,6 +70,7 @@ import 
org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
 import 
org.apache.doris.nereids.trees.plans.physical.PhysicalStorageLayerAggregate;
 import 
org.apache.doris.nereids.trees.plans.physical.PhysicalStorageLayerAggregate.PushDownAggOp;
+import org.apache.doris.nereids.types.TinyIntType;
 import org.apache.doris.nereids.util.ExpressionUtils;
 import org.apache.doris.nereids.util.TypeCoercionUtils;
 import org.apache.doris.qe.ConnectContext;
@@ -1292,6 +1293,15 @@ public class AggregateStrategies implements 
ImplementationRuleFactory {
                 .build();
 
         List<Expression> localAggGroupBy = 
ImmutableList.copyOf(localAggGroupBySet);
+        boolean isGroupByEmptySelectEmpty = localAggGroupBy.isEmpty() && 
localAggOutput.isEmpty();
+
+        // be not recommend generate an aggregate node with empty group by and 
empty output,
+        // so add a null int slot to group by slot and output
+        if (isGroupByEmptySelectEmpty) {
+            localAggGroupBy = ImmutableList.of(new 
NullLiteral(TinyIntType.INSTANCE));
+            localAggOutput = ImmutableList.of(new Alias(new 
NullLiteral(TinyIntType.INSTANCE)));
+        }
+
         boolean maybeUsingStreamAgg = maybeUsingStreamAgg(connectContext, 
localAggGroupBy);
         List<Expression> partitionExpressions = 
getHashAggregatePartitionExpressions(logicalAgg);
         RequireProperties requireAny = 
RequireProperties.of(PhysicalProperties.ANY);
@@ -1317,6 +1327,12 @@ public class AggregateStrategies implements 
ImplementationRuleFactory {
                 .addAll(nonDistinctAggFunctionToAliasPhase2.values())
                 .build();
 
+        // be not recommend generate an aggregate node with empty group by and 
empty output,
+        // so add a null int slot to group by slot and output
+        if (isGroupByEmptySelectEmpty) {
+            globalAggOutput = ImmutableList.of(new Alias(new 
NullLiteral(TinyIntType.INSTANCE)));
+        }
+
         RequireProperties requireGather = 
RequireProperties.of(PhysicalProperties.GATHER);
         PhysicalHashAggregate<Plan> anyLocalGatherGlobalAgg = new 
PhysicalHashAggregate<>(
                 localAggGroupBy, globalAggOutput, 
Optional.of(partitionExpressions),
@@ -1680,6 +1696,16 @@ public class AggregateStrategies implements 
ImplementationRuleFactory {
         boolean maybeUsingStreamAgg = maybeUsingStreamAgg(connectContext, 
localAggGroupBy);
         List<Expression> partitionExpressions = 
getHashAggregatePartitionExpressions(logicalAgg);
         RequireProperties requireAny = 
RequireProperties.of(PhysicalProperties.ANY);
+
+        boolean isGroupByEmptySelectEmpty = localAggGroupBy.isEmpty() && 
localAggOutput.isEmpty();
+
+        // be not recommend generate an aggregate node with empty group by and 
empty output,
+        // so add a null int slot to group by slot and output
+        if (isGroupByEmptySelectEmpty) {
+            localAggGroupBy = ImmutableList.of(new 
NullLiteral(TinyIntType.INSTANCE));
+            localAggOutput = ImmutableList.of(new Alias(new 
NullLiteral(TinyIntType.INSTANCE)));
+        }
+
         PhysicalHashAggregate<Plan> anyLocalAgg = new 
PhysicalHashAggregate<>(localAggGroupBy,
                 localAggOutput, Optional.of(partitionExpressions), 
inputToBufferParam,
                 maybeUsingStreamAgg, Optional.empty(), 
logicalAgg.getLogicalProperties(),
@@ -1702,6 +1728,12 @@ public class AggregateStrategies implements 
ImplementationRuleFactory {
                 .addAll(nonDistinctAggFunctionToAliasPhase2.values())
                 .build();
 
+        // be not recommend generate an aggregate node with empty group by and 
empty output,
+        // so add a null int slot to group by slot and output
+        if (isGroupByEmptySelectEmpty) {
+            globalAggOutput = ImmutableList.of(new Alias(new 
NullLiteral(TinyIntType.INSTANCE)));
+        }
+
         RequireProperties requireGather = 
RequireProperties.of(PhysicalProperties.GATHER);
 
         RequireProperties requireDistinctHash = RequireProperties.of(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to