This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new fbfb51e14b3 branch-3.1: [fix](agg) Fix agg's input distribution #56801 (#56830)
fbfb51e14b3 is described below

commit fbfb51e14b3616b0c1def5d0a3407fcc3a945e7c
Author: Gabriel <[email protected]>
AuthorDate: Mon Nov 10 14:15:57 2025 +0800

    branch-3.1: [fix](agg) Fix agg's input distribution #56801 (#56830)
    
    picked from #56801
---
 be/src/pipeline/exec/aggregation_sink_operator.cpp    | 19 ++++++++++++++++---
 be/src/pipeline/exec/aggregation_sink_operator.h      |  2 +-
 .../aggregate_function_distinct.cpp                   |  2 --
 .../aggregate_function_simple_factory.h               |  2 ++
 4 files changed, 19 insertions(+), 6 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 63d43e07511..2bada975f8d 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -23,6 +23,7 @@
 #include "common/status.h"
 #include "pipeline/exec/operator.h"
 #include "runtime/primitive_type.h"
+#include "vec/aggregate_functions/aggregate_function_simple_factory.h"
 #include "vec/common/hash_table/hash.h"
 #include "vec/exprs/vectorized_agg_fn.h"
 
@@ -678,6 +679,7 @@ Status AggSinkLocalState::_init_hash_method(const vectorized::VExprContextSPtrs&
     return Status::OK();
 }
 
+// TODO: Tricky processing if `multi_distinct_` exists which will be re-planed by optimizer.
 AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
                                    const DescriptorTbl& descs, bool require_bucket_distribution)
         : DataSinkOperatorX<AggSinkLocalState>(operator_id, tnode.node_id),
@@ -690,9 +692,20 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPla
           _limit(tnode.limit),
           _have_conjuncts((tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()) ||
                           (tnode.__isset.conjuncts && !tnode.conjuncts.empty())),
-          _partition_exprs(tnode.__isset.distribute_expr_lists && require_bucket_distribution
-                                   ? tnode.distribute_expr_lists[0]
-                                   : tnode.agg_node.grouping_exprs),
+          _partition_exprs(
+                  tnode.__isset.distribute_expr_lists &&
+                                  (require_bucket_distribution ||
+                                   std::any_of(
+                                           tnode.agg_node.aggregate_functions.begin(),
+                                           tnode.agg_node.aggregate_functions.end(),
+                                           [](const TExpr& texpr) -> bool {
+                                               return texpr.nodes[0]
+                                                       .fn.name.function_name.starts_with(
+                                                               vectorized::
+                                                                       DISTINCT_FUNCTION_PREFIX);
+                                           }))
+                          ? tnode.distribute_expr_lists[0]
+                          : tnode.agg_node.grouping_exprs),
           _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate),
           _require_bucket_distribution(require_bucket_distribution),
           _agg_fn_output_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h
index e5209425f76..18eb70bc17c 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -139,7 +139,7 @@ public:
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
 
     DataDistribution required_data_distribution() const override {
-        if (_probe_expr_ctxs.empty()) {
+        if (_partition_exprs.empty()) {
             return _needs_finalize
                            ? DataDistribution(ExchangeType::NOOP)
                            : DataSinkOperatorX<AggSinkLocalState>::required_data_distribution();
diff --git a/be/src/vec/aggregate_functions/aggregate_function_distinct.cpp b/be/src/vec/aggregate_functions/aggregate_function_distinct.cpp
index fce58b38688..486c5d2e8f6 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_distinct.cpp
+++ b/be/src/vec/aggregate_functions/aggregate_function_distinct.cpp
@@ -79,8 +79,6 @@ public:
     }
 };
 
-const std::string DISTINCT_FUNCTION_PREFIX = "multi_distinct_";
-
 void register_aggregate_function_combinator_distinct(AggregateFunctionSimpleFactory& factory) {
     AggregateFunctionCreator creator = [&](const std::string& name, const DataTypes& types,
                                            const bool result_is_nullable,
diff --git a/be/src/vec/aggregate_functions/aggregate_function_simple_factory.h b/be/src/vec/aggregate_functions/aggregate_function_simple_factory.h
index f32d60fddde..4657825db3a 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_simple_factory.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_simple_factory.h
@@ -41,6 +41,8 @@ using DataTypes = std::vector<DataTypePtr>;
 using AggregateFunctionCreator = std::function<AggregateFunctionPtr(
         const std::string&, const DataTypes&, const bool, const AggregateFunctionAttr&)>;
 
+const std::string DISTINCT_FUNCTION_PREFIX = "multi_distinct_";
+
 inline std::string types_name(const DataTypes& types) {
     std::string name;
     for (auto&& type : types) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to