This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6afb961e8ea [minor](fuzzy) make enableQueryCache fuzzy (#61583)
6afb961e8ea is described below

commit 6afb961e8ea62a3d8343464747a89da3030a4a41
Author: Gabriel <[email protected]>
AuthorDate: Tue Mar 24 15:53:16 2026 +0800

    [minor](fuzzy) make enableQueryCache fuzzy (#61583)
    
    Fix the Query Cache bug to enable fuzzy testing
---
 be/src/exec/pipeline/pipeline_fragment_context.cpp | 15 ++++++++++-----
 be/src/exec/pipeline/pipeline_fragment_context.h   |  2 +-
 2 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp 
b/be/src/exec/pipeline/pipeline_fragment_context.cpp
index b667a4b7274..44fd7bdc37d 100644
--- a/be/src/exec/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp
@@ -676,18 +676,20 @@ Status PipelineFragmentContext::_create_tree_helper(
     int num_children = tnodes[*node_idx].num_children;
     bool current_followed_by_shuffled_operator = followed_by_shuffled_operator;
     bool current_require_bucket_distribution = require_bucket_distribution;
+    // TODO: Create CacheOperator is confused now
     OperatorPtr op = nullptr;
+    OperatorPtr cache_op = nullptr;
     RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], descs, op, 
cur_pipe,
                                      parent == nullptr ? -1 : 
parent->node_id(), child_idx,
                                      followed_by_shuffled_operator,
-                                     current_require_bucket_distribution));
+                                     current_require_bucket_distribution, 
cache_op));
     // Initialization must be done here. For example, group by expressions in 
agg will be used to
     // decide if a local shuffle should be planed, so it must be initialized 
here.
     RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
     // assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
     if (parent != nullptr) {
         // add to parent's child(s)
-        RETURN_IF_ERROR(parent->set_child(op));
+        RETURN_IF_ERROR(parent->set_child(cache_op ? cache_op : op));
     } else {
         *root = op;
     }
@@ -1235,7 +1237,8 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
                                                  PipelinePtr& cur_pipe, int 
parent_idx,
                                                  int child_idx,
                                                  const bool 
followed_by_shuffled_operator,
-                                                 const bool 
require_bucket_distribution) {
+                                                 const bool 
require_bucket_distribution,
+                                                 OperatorPtr& cache_op) {
     std::vector<DataSinkOperatorPtr> sink_ops;
     Defer defer = Defer([&]() {
         if (op) {
@@ -1332,7 +1335,7 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
             _dag[downstream_pipeline_id].push_back(new_pipe->id());
 
             DataSinkOperatorPtr cache_sink(new CacheSinkOperatorX(
-                    next_sink_operator_id(), cache_source_id, 
op->operator_id()));
+                    next_sink_operator_id(), op->node_id(), 
op->operator_id()));
             RETURN_IF_ERROR(new_pipe->set_sink(cache_sink));
             return Status::OK();
         };
@@ -1358,6 +1361,7 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
                 PipelinePtr new_pipe;
                 RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
 
+                cache_op = op;
                 op = std::make_shared<DistinctStreamingAggOperatorX>(pool, 
next_operator_id(),
                                                                      tnode, 
descs);
                 RETURN_IF_ERROR(new_pipe->add_operator(op, 
_parallel_instances));
@@ -1372,7 +1376,7 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
             if (need_create_cache_op) {
                 PipelinePtr new_pipe;
                 RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
-
+                cache_op = op;
                 op = std::make_shared<StreamingAggOperatorX>(pool, 
next_operator_id(), tnode,
                                                              descs);
                 RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
@@ -1388,6 +1392,7 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
             PipelinePtr new_pipe;
             if (need_create_cache_op) {
                 RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
+                cache_op = op;
             }
 
             if (enable_spill) {
diff --git a/be/src/exec/pipeline/pipeline_fragment_context.h 
b/be/src/exec/pipeline/pipeline_fragment_context.h
index 64ce1b665ac..e523e711898 100644
--- a/be/src/exec/pipeline/pipeline_fragment_context.h
+++ b/be/src/exec/pipeline/pipeline_fragment_context.h
@@ -148,7 +148,7 @@ private:
     Status _create_operator(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs,
                             OperatorPtr& op, PipelinePtr& cur_pipe, int 
parent_idx, int child_idx,
                             const bool followed_by_shuffled_join,
-                            const bool require_bucket_distribution);
+                            const bool require_bucket_distribution, 
OperatorPtr& cache_op);
     template <bool is_intersect>
     Status _build_operators_for_set_operation_node(ObjectPool* pool, const 
TPlanNode& tnode,
                                                    const DescriptorTbl& descs, 
OperatorPtr& op,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to