This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch tpc_preview6
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/tpc_preview6 by this push:
     new 18d82d7200f Revert "support count pushdown. tpcds 14/ tpch13"
18d82d7200f is described below

commit 18d82d7200f9746c7a805ccdec943b09560e88ab
Author: happenlee <[email protected]>
AuthorDate: Mon Feb 9 22:32:08 2026 +0800

    Revert "support count pushdown. tpcds 14/ tpch13"
    
    This reverts commit ff0972f4c83b043eb6ee949350cace453850769c.
---
 be/src/pipeline/exec/operator.cpp             |  4 --
 be/src/pipeline/pipeline_fragment_context.cpp | 57 +++------------------------
 2 files changed, 6 insertions(+), 55 deletions(-)

diff --git a/be/src/pipeline/exec/operator.cpp 
b/be/src/pipeline/exec/operator.cpp
index 5ef03bc5d07..26b63443dd6 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -62,8 +62,6 @@
 #include "pipeline/exec/partitioned_aggregation_source_operator.h"
 #include "pipeline/exec/partitioned_hash_join_probe_operator.h"
 #include "pipeline/exec/partitioned_hash_join_sink_operator.h"
-#include "pipeline/exec/queue_sink_operator.h"
-#include "pipeline/exec/queue_source_operator.h"
 #include "pipeline/exec/rec_cte_anchor_sink_operator.h"
 #include "pipeline/exec/rec_cte_scan_operator.h"
 #include "pipeline/exec/rec_cte_sink_operator.h"
@@ -811,7 +809,6 @@ DECLARE_OPERATOR(SetSinkLocalState<false>)
 DECLARE_OPERATOR(PartitionedHashJoinSinkLocalState)
 DECLARE_OPERATOR(GroupCommitBlockSinkLocalState)
 DECLARE_OPERATOR(CacheSinkLocalState)
-DECLARE_OPERATOR(QueueSinkLocalState)
 DECLARE_OPERATOR(DictSinkLocalState)
 DECLARE_OPERATOR(RecCTESinkLocalState)
 DECLARE_OPERATOR(RecCTEAnchorSinkLocalState)
@@ -848,7 +845,6 @@ DECLARE_OPERATOR(MetaScanLocalState)
 DECLARE_OPERATOR(LocalExchangeSourceLocalState)
 DECLARE_OPERATOR(PartitionedHashJoinProbeLocalState)
 DECLARE_OPERATOR(CacheSourceLocalState)
-DECLARE_OPERATOR(QueueSourceLocalState)
 DECLARE_OPERATOR(RecCTESourceLocalState)
 DECLARE_OPERATOR(RecCTEScanLocalState)
 
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 75627d85b83..83196022f4e 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -82,8 +82,6 @@
 #include "pipeline/exec/partitioned_aggregation_source_operator.h"
 #include "pipeline/exec/partitioned_hash_join_probe_operator.h"
 #include "pipeline/exec/partitioned_hash_join_sink_operator.h"
-#include "pipeline/exec/queue_sink_operator.h"
-#include "pipeline/exec/queue_source_operator.h"
 #include "pipeline/exec/rec_cte_anchor_sink_operator.h"
 #include "pipeline/exec/rec_cte_scan_operator.h"
 #include "pipeline/exec/rec_cte_sink_operator.h"
@@ -1316,53 +1314,12 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
                 RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
                 cur_pipe = new_pipe;
             } else {
-                // Check if parent is SetProbeSinkOperatorX by checking if the 
sink is a SetProbeSinkOperatorX
-                bool parent_is_set_probe = false;
-                if (cur_pipe->sink() != nullptr) {
-                    auto sink = cur_pipe->sink();
-                    // Try to dynamic_cast to both template instances of 
SetProbeSinkOperatorX
-                    if (dynamic_cast<SetProbeSinkOperatorX<true>*>(sink) ||
-                        dynamic_cast<SetProbeSinkOperatorX<false>*>(sink)) {
-                        parent_is_set_probe = true;
-                    }
-                }
-
-                if (parent_is_set_probe) {
-                    // Create QueueSourceOperatorX
-                    auto queue_source_id = next_operator_id();
-                    OperatorPtr queue_source_op = 
std::make_shared<QueueSourceOperatorX>(
-                            pool, tnode.node_id, queue_source_id);
-                    RETURN_IF_ERROR(cur_pipe->add_operator(queue_source_op, 
_parallel_instances));
-
-                    // Create new pipeline for QueueSinkOperatorX
-                    const auto downstream_pipeline_id = cur_pipe->id();
-                    if (!_dag.contains(downstream_pipeline_id)) {
-                        _dag.insert({downstream_pipeline_id, {}});
-                    }
-                    PipelinePtr queue_side_pipe = add_pipeline(cur_pipe);
-                    
_dag[downstream_pipeline_id].push_back(queue_side_pipe->id());
-
-                    // Create QueueSinkOperatorX
-                    auto queue_sink_id = next_sink_operator_id();
-                    DataSinkOperatorPtr queue_sink_op = 
std::make_shared<QueueSinkOperatorX>(
-                            queue_sink_id, queue_source_id, 
queue_source_op->operator_id());
-                    RETURN_IF_ERROR(queue_side_pipe->set_sink(queue_sink_op));
-
-                    // Create DistinctStreamingAggOperatorX in the new pipeline
-                    // Note: we assign to op so that _create_tree_helper will 
set its child correctly
-                    op = std::make_shared<DistinctStreamingAggOperatorX>(pool, 
next_operator_id(),
-                                                                         
tnode, descs, _require_bucket_distribution);
-                    RETURN_IF_ERROR(queue_side_pipe->add_operator(op, 
_parallel_instances));
-                    RETURN_IF_ERROR(queue_source_op->set_child(op));
-                    cur_pipe = queue_side_pipe;
-                } else {
-                    op = std::make_shared<DistinctStreamingAggOperatorX>(pool, 
next_operator_id(),
-                                                                         
tnode, descs, _require_bucket_distribution);
-                    
op->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
-                    _require_bucket_distribution =
-                            _require_bucket_distribution || 
op->require_data_distribution();
-                    RETURN_IF_ERROR(cur_pipe->add_operator(op, 
_parallel_instances));
-                }
+                op = std::make_shared<DistinctStreamingAggOperatorX>(
+                        pool, next_operator_id(), tnode, descs, 
_require_bucket_distribution);
+                
op->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
+                _require_bucket_distribution =
+                        _require_bucket_distribution || 
op->require_data_distribution();
+                RETURN_IF_ERROR(cur_pipe->add_operator(op, 
_parallel_instances));
             }
         } else if (is_streaming_agg) {
             if (need_create_cache_op) {
@@ -1764,7 +1721,6 @@ Status 
PipelineFragmentContext::_build_operators_for_set_operation_node(
         PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
         _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
 
-        // Create appropriate sink operator based on child_id
         DataSinkOperatorPtr sink;
         if (child_id == 0) {
             sink.reset(new SetSinkOperatorX<is_intersect>(child_id, 
next_sink_operator_id(),
@@ -1773,7 +1729,6 @@ Status 
PipelineFragmentContext::_build_operators_for_set_operation_node(
             sink.reset(new SetProbeSinkOperatorX<is_intersect>(
                     child_id, next_sink_operator_id(), op->operator_id(), 
pool, tnode, descs));
         }
-        // Common code for both cases
         sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
         RETURN_IF_ERROR(probe_side_pipe->set_sink(sink));
         RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, 
_runtime_state.get()));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to