This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch tpc_preview6
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/tpc_preview6 by this push:
new 18d82d7200f Revert "support count pushdown. tpcds 14/ tpch13"
18d82d7200f is described below
commit 18d82d7200f9746c7a805ccdec943b09560e88ab
Author: happenlee <[email protected]>
AuthorDate: Mon Feb 9 22:32:08 2026 +0800
Revert "support count pushdown. tpcds 14/ tpch13"
This reverts commit ff0972f4c83b043eb6ee949350cace453850769c.
---
be/src/pipeline/exec/operator.cpp | 4 --
be/src/pipeline/pipeline_fragment_context.cpp | 57 +++------------------------
2 files changed, 6 insertions(+), 55 deletions(-)
diff --git a/be/src/pipeline/exec/operator.cpp
b/be/src/pipeline/exec/operator.cpp
index 5ef03bc5d07..26b63443dd6 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -62,8 +62,6 @@
#include "pipeline/exec/partitioned_aggregation_source_operator.h"
#include "pipeline/exec/partitioned_hash_join_probe_operator.h"
#include "pipeline/exec/partitioned_hash_join_sink_operator.h"
-#include "pipeline/exec/queue_sink_operator.h"
-#include "pipeline/exec/queue_source_operator.h"
#include "pipeline/exec/rec_cte_anchor_sink_operator.h"
#include "pipeline/exec/rec_cte_scan_operator.h"
#include "pipeline/exec/rec_cte_sink_operator.h"
@@ -811,7 +809,6 @@ DECLARE_OPERATOR(SetSinkLocalState<false>)
DECLARE_OPERATOR(PartitionedHashJoinSinkLocalState)
DECLARE_OPERATOR(GroupCommitBlockSinkLocalState)
DECLARE_OPERATOR(CacheSinkLocalState)
-DECLARE_OPERATOR(QueueSinkLocalState)
DECLARE_OPERATOR(DictSinkLocalState)
DECLARE_OPERATOR(RecCTESinkLocalState)
DECLARE_OPERATOR(RecCTEAnchorSinkLocalState)
@@ -848,7 +845,6 @@ DECLARE_OPERATOR(MetaScanLocalState)
DECLARE_OPERATOR(LocalExchangeSourceLocalState)
DECLARE_OPERATOR(PartitionedHashJoinProbeLocalState)
DECLARE_OPERATOR(CacheSourceLocalState)
-DECLARE_OPERATOR(QueueSourceLocalState)
DECLARE_OPERATOR(RecCTESourceLocalState)
DECLARE_OPERATOR(RecCTEScanLocalState)
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 75627d85b83..83196022f4e 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -82,8 +82,6 @@
#include "pipeline/exec/partitioned_aggregation_source_operator.h"
#include "pipeline/exec/partitioned_hash_join_probe_operator.h"
#include "pipeline/exec/partitioned_hash_join_sink_operator.h"
-#include "pipeline/exec/queue_sink_operator.h"
-#include "pipeline/exec/queue_source_operator.h"
#include "pipeline/exec/rec_cte_anchor_sink_operator.h"
#include "pipeline/exec/rec_cte_scan_operator.h"
#include "pipeline/exec/rec_cte_sink_operator.h"
@@ -1316,53 +1314,12 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
cur_pipe = new_pipe;
} else {
- // Check if parent is SetProbeSinkOperatorX by checking if the
sink is a SetProbeSinkOperatorX
- bool parent_is_set_probe = false;
- if (cur_pipe->sink() != nullptr) {
- auto sink = cur_pipe->sink();
- // Try to dynamic_cast to both template instances of
SetProbeSinkOperatorX
- if (dynamic_cast<SetProbeSinkOperatorX<true>*>(sink) ||
- dynamic_cast<SetProbeSinkOperatorX<false>*>(sink)) {
- parent_is_set_probe = true;
- }
- }
-
- if (parent_is_set_probe) {
- // Create QueueSourceOperatorX
- auto queue_source_id = next_operator_id();
- OperatorPtr queue_source_op =
std::make_shared<QueueSourceOperatorX>(
- pool, tnode.node_id, queue_source_id);
- RETURN_IF_ERROR(cur_pipe->add_operator(queue_source_op,
_parallel_instances));
-
- // Create new pipeline for QueueSinkOperatorX
- const auto downstream_pipeline_id = cur_pipe->id();
- if (!_dag.contains(downstream_pipeline_id)) {
- _dag.insert({downstream_pipeline_id, {}});
- }
- PipelinePtr queue_side_pipe = add_pipeline(cur_pipe);
-
_dag[downstream_pipeline_id].push_back(queue_side_pipe->id());
-
- // Create QueueSinkOperatorX
- auto queue_sink_id = next_sink_operator_id();
- DataSinkOperatorPtr queue_sink_op =
std::make_shared<QueueSinkOperatorX>(
- queue_sink_id, queue_source_id,
queue_source_op->operator_id());
- RETURN_IF_ERROR(queue_side_pipe->set_sink(queue_sink_op));
-
- // Create DistinctStreamingAggOperatorX in the new pipeline
- // Note: we assign to op so that _create_tree_helper will
set its child correctly
- op = std::make_shared<DistinctStreamingAggOperatorX>(pool,
next_operator_id(),
-
tnode, descs, _require_bucket_distribution);
- RETURN_IF_ERROR(queue_side_pipe->add_operator(op,
_parallel_instances));
- RETURN_IF_ERROR(queue_source_op->set_child(op));
- cur_pipe = queue_side_pipe;
- } else {
- op = std::make_shared<DistinctStreamingAggOperatorX>(pool,
next_operator_id(),
-
tnode, descs, _require_bucket_distribution);
-
op->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
- _require_bucket_distribution =
- _require_bucket_distribution ||
op->require_data_distribution();
- RETURN_IF_ERROR(cur_pipe->add_operator(op,
_parallel_instances));
- }
+ op = std::make_shared<DistinctStreamingAggOperatorX>(
+ pool, next_operator_id(), tnode, descs,
_require_bucket_distribution);
+
op->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
+ _require_bucket_distribution =
+ _require_bucket_distribution ||
op->require_data_distribution();
+ RETURN_IF_ERROR(cur_pipe->add_operator(op,
_parallel_instances));
}
} else if (is_streaming_agg) {
if (need_create_cache_op) {
@@ -1764,7 +1721,6 @@ Status
PipelineFragmentContext::_build_operators_for_set_operation_node(
PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
_dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
- // Create appropriate sink operator based on child_id
DataSinkOperatorPtr sink;
if (child_id == 0) {
sink.reset(new SetSinkOperatorX<is_intersect>(child_id,
next_sink_operator_id(),
@@ -1773,7 +1729,6 @@ Status
PipelineFragmentContext::_build_operators_for_set_operation_node(
sink.reset(new SetProbeSinkOperatorX<is_intersect>(
child_id, next_sink_operator_id(), op->operator_id(),
pool, tnode, descs));
}
- // Common code for both cases
sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
RETURN_IF_ERROR(probe_side_pipe->set_sink(sink));
RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode,
_runtime_state.get()));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]