This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new fbfb51e14b3 branch-3.1: [fix](agg) Fix agg's input distribution #56801
(#56830)
fbfb51e14b3 is described below
commit fbfb51e14b3616b0c1def5d0a3407fcc3a945e7c
Author: Gabriel <[email protected]>
AuthorDate: Mon Nov 10 14:15:57 2025 +0800
branch-3.1: [fix](agg) Fix agg's input distribution #56801 (#56830)
picked from #56801
---
be/src/pipeline/exec/aggregation_sink_operator.cpp | 19 ++++++++++++++++---
be/src/pipeline/exec/aggregation_sink_operator.h | 2 +-
.../aggregate_function_distinct.cpp | 2 --
.../aggregate_function_simple_factory.h | 2 ++
4 files changed, 19 insertions(+), 6 deletions(-)
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 63d43e07511..2bada975f8d 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -23,6 +23,7 @@
#include "common/status.h"
#include "pipeline/exec/operator.h"
#include "runtime/primitive_type.h"
+#include "vec/aggregate_functions/aggregate_function_simple_factory.h"
#include "vec/common/hash_table/hash.h"
#include "vec/exprs/vectorized_agg_fn.h"
@@ -678,6 +679,7 @@ Status AggSinkLocalState::_init_hash_method(const
vectorized::VExprContextSPtrs&
return Status::OK();
}
+// TODO: Tricky processing if `multi_distinct_` exists which will be re-planned
by the optimizer.
AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const
TPlanNode& tnode,
const DescriptorTbl& descs, bool
require_bucket_distribution)
: DataSinkOperatorX<AggSinkLocalState>(operator_id, tnode.node_id),
@@ -690,9 +692,20 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int
operator_id, const TPla
_limit(tnode.limit),
_have_conjuncts((tnode.__isset.vconjunct &&
!tnode.vconjunct.nodes.empty()) ||
(tnode.__isset.conjuncts &&
!tnode.conjuncts.empty())),
- _partition_exprs(tnode.__isset.distribute_expr_lists &&
require_bucket_distribution
- ? tnode.distribute_expr_lists[0]
- : tnode.agg_node.grouping_exprs),
+ _partition_exprs(
+ tnode.__isset.distribute_expr_lists &&
+ (require_bucket_distribution ||
+ std::any_of(
+
tnode.agg_node.aggregate_functions.begin(),
+
tnode.agg_node.aggregate_functions.end(),
+ [](const TExpr& texpr) -> bool {
+ return texpr.nodes[0]
+
.fn.name.function_name.starts_with(
+ vectorized::
+
DISTINCT_FUNCTION_PREFIX);
+ }))
+ ? tnode.distribute_expr_lists[0]
+ : tnode.agg_node.grouping_exprs),
_is_colocate(tnode.agg_node.__isset.is_colocate &&
tnode.agg_node.is_colocate),
_require_bucket_distribution(require_bucket_distribution),
_agg_fn_output_row_descriptor(descs, tnode.row_tuples,
tnode.nullable_tuples),
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h
b/be/src/pipeline/exec/aggregation_sink_operator.h
index e5209425f76..18eb70bc17c 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -139,7 +139,7 @@ public:
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos)
override;
DataDistribution required_data_distribution() const override {
- if (_probe_expr_ctxs.empty()) {
+ if (_partition_exprs.empty()) {
return _needs_finalize
? DataDistribution(ExchangeType::NOOP)
:
DataSinkOperatorX<AggSinkLocalState>::required_data_distribution();
diff --git a/be/src/vec/aggregate_functions/aggregate_function_distinct.cpp
b/be/src/vec/aggregate_functions/aggregate_function_distinct.cpp
index fce58b38688..486c5d2e8f6 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_distinct.cpp
+++ b/be/src/vec/aggregate_functions/aggregate_function_distinct.cpp
@@ -79,8 +79,6 @@ public:
}
};
-const std::string DISTINCT_FUNCTION_PREFIX = "multi_distinct_";
-
void
register_aggregate_function_combinator_distinct(AggregateFunctionSimpleFactory&
factory) {
AggregateFunctionCreator creator = [&](const std::string& name, const
DataTypes& types,
const bool result_is_nullable,
diff --git a/be/src/vec/aggregate_functions/aggregate_function_simple_factory.h
b/be/src/vec/aggregate_functions/aggregate_function_simple_factory.h
index f32d60fddde..4657825db3a 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_simple_factory.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_simple_factory.h
@@ -41,6 +41,8 @@ using DataTypes = std::vector<DataTypePtr>;
using AggregateFunctionCreator = std::function<AggregateFunctionPtr(
const std::string&, const DataTypes&, const bool, const
AggregateFunctionAttr&)>;
+const std::string DISTINCT_FUNCTION_PREFIX = "multi_distinct_";
+
inline std::string types_name(const DataTypes& types) {
std::string name;
for (auto&& type : types) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]