This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch tpc_preview6 in repository https://gitbox.apache.org/repos/asf/doris.git
commit ff0972f4c83b043eb6ee949350cace453850769c Author: englefly <[email protected]> AuthorDate: Thu Feb 5 20:39:17 2026 +0800 Support count pushdown (TPC-DS 14 / TPC-H 13). ds14: added agg push; execution time 4.7 -> 4.8. h13: added agg push; this should restore p6 to p4's performance, from 10 sec back to 7 sec. --- be/src/pipeline/exec/operator.cpp | 4 ++ be/src/pipeline/pipeline_fragment_context.cpp | 57 ++++++++++++++++++++++++--- 2 files changed, 55 insertions(+), 6 deletions(-) diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp index 26b63443dd6..5ef03bc5d07 100644 --- a/be/src/pipeline/exec/operator.cpp +++ b/be/src/pipeline/exec/operator.cpp @@ -62,6 +62,8 @@ #include "pipeline/exec/partitioned_aggregation_source_operator.h" #include "pipeline/exec/partitioned_hash_join_probe_operator.h" #include "pipeline/exec/partitioned_hash_join_sink_operator.h" +#include "pipeline/exec/queue_sink_operator.h" +#include "pipeline/exec/queue_source_operator.h" #include "pipeline/exec/rec_cte_anchor_sink_operator.h" #include "pipeline/exec/rec_cte_scan_operator.h" #include "pipeline/exec/rec_cte_sink_operator.h" @@ -809,6 +811,7 @@ DECLARE_OPERATOR(SetSinkLocalState<false>) DECLARE_OPERATOR(PartitionedHashJoinSinkLocalState) DECLARE_OPERATOR(GroupCommitBlockSinkLocalState) DECLARE_OPERATOR(CacheSinkLocalState) +DECLARE_OPERATOR(QueueSinkLocalState) DECLARE_OPERATOR(DictSinkLocalState) DECLARE_OPERATOR(RecCTESinkLocalState) DECLARE_OPERATOR(RecCTEAnchorSinkLocalState) @@ -845,6 +848,7 @@ DECLARE_OPERATOR(MetaScanLocalState) DECLARE_OPERATOR(LocalExchangeSourceLocalState) DECLARE_OPERATOR(PartitionedHashJoinProbeLocalState) DECLARE_OPERATOR(CacheSourceLocalState) +DECLARE_OPERATOR(QueueSourceLocalState) DECLARE_OPERATOR(RecCTESourceLocalState) DECLARE_OPERATOR(RecCTEScanLocalState) diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 83196022f4e..75627d85b83 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ 
b/be/src/pipeline/pipeline_fragment_context.cpp @@ -82,6 +82,8 @@ #include "pipeline/exec/partitioned_aggregation_source_operator.h" #include "pipeline/exec/partitioned_hash_join_probe_operator.h" #include "pipeline/exec/partitioned_hash_join_sink_operator.h" +#include "pipeline/exec/queue_sink_operator.h" +#include "pipeline/exec/queue_source_operator.h" #include "pipeline/exec/rec_cte_anchor_sink_operator.h" #include "pipeline/exec/rec_cte_scan_operator.h" #include "pipeline/exec/rec_cte_sink_operator.h" @@ -1314,12 +1316,53 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op)); cur_pipe = new_pipe; } else { - op = std::make_shared<DistinctStreamingAggOperatorX>( - pool, next_operator_id(), tnode, descs, _require_bucket_distribution); - op->set_followed_by_shuffled_operator(followed_by_shuffled_operator); - _require_bucket_distribution = - _require_bucket_distribution || op->require_data_distribution(); - RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); + // Check if parent is SetProbeSinkOperatorX by checking if the sink is a SetProbeSinkOperatorX + bool parent_is_set_probe = false; + if (cur_pipe->sink() != nullptr) { + auto sink = cur_pipe->sink(); + // Try to dynamic_cast to both template instances of SetProbeSinkOperatorX + if (dynamic_cast<SetProbeSinkOperatorX<true>*>(sink) || + dynamic_cast<SetProbeSinkOperatorX<false>*>(sink)) { + parent_is_set_probe = true; + } + } + + if (parent_is_set_probe) { + // Create QueueSourceOperatorX + auto queue_source_id = next_operator_id(); + OperatorPtr queue_source_op = std::make_shared<QueueSourceOperatorX>( + pool, tnode.node_id, queue_source_id); + RETURN_IF_ERROR(cur_pipe->add_operator(queue_source_op, _parallel_instances)); + + // Create new pipeline for QueueSinkOperatorX + const auto downstream_pipeline_id = cur_pipe->id(); + if (!_dag.contains(downstream_pipeline_id)) { + 
_dag.insert({downstream_pipeline_id, {}}); + } + PipelinePtr queue_side_pipe = add_pipeline(cur_pipe); + _dag[downstream_pipeline_id].push_back(queue_side_pipe->id()); + + // Create QueueSinkOperatorX + auto queue_sink_id = next_sink_operator_id(); + DataSinkOperatorPtr queue_sink_op = std::make_shared<QueueSinkOperatorX>( + queue_sink_id, queue_source_id, queue_source_op->operator_id()); + RETURN_IF_ERROR(queue_side_pipe->set_sink(queue_sink_op)); + + // Create DistinctStreamingAggOperatorX in the new pipeline + // Note: we assign to op so that _create_tree_helper will set its child correctly + op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(), + tnode, descs, _require_bucket_distribution); + RETURN_IF_ERROR(queue_side_pipe->add_operator(op, _parallel_instances)); + RETURN_IF_ERROR(queue_source_op->set_child(op)); + cur_pipe = queue_side_pipe; + } else { + op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(), + tnode, descs, _require_bucket_distribution); + op->set_followed_by_shuffled_operator(followed_by_shuffled_operator); + _require_bucket_distribution = + _require_bucket_distribution || op->require_data_distribution(); + RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); + } } } else if (is_streaming_agg) { if (need_create_cache_op) { @@ -1721,6 +1764,7 @@ Status PipelineFragmentContext::_build_operators_for_set_operation_node( PipelinePtr probe_side_pipe = add_pipeline(cur_pipe); _dag[downstream_pipeline_id].push_back(probe_side_pipe->id()); + // Create appropriate sink operator based on child_id DataSinkOperatorPtr sink; if (child_id == 0) { sink.reset(new SetSinkOperatorX<is_intersect>(child_id, next_sink_operator_id(), @@ -1729,6 +1773,7 @@ Status PipelineFragmentContext::_build_operators_for_set_operation_node( sink.reset(new SetProbeSinkOperatorX<is_intersect>( child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs)); } + // Common code for both cases 
sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator); RETURN_IF_ERROR(probe_side_pipe->set_sink(sink)); RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get())); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
