This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6afb961e8ea [minor](fuzzy) make enableQueryCache fuzzy (#61583)
6afb961e8ea is described below
commit 6afb961e8ea62a3d8343464747a89da3030a4a41
Author: Gabriel <[email protected]>
AuthorDate: Tue Mar 24 15:53:16 2026 +0800
[minor](fuzzy) make enableQueryCache fuzzy (#61583)
Fix the Query Cache bug to enable the fuzzy test
---
be/src/exec/pipeline/pipeline_fragment_context.cpp | 15 ++++++++++-----
be/src/exec/pipeline/pipeline_fragment_context.h | 2 +-
2 files changed, 11 insertions(+), 6 deletions(-)
diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp
b/be/src/exec/pipeline/pipeline_fragment_context.cpp
index b667a4b7274..44fd7bdc37d 100644
--- a/be/src/exec/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp
@@ -676,18 +676,20 @@ Status PipelineFragmentContext::_create_tree_helper(
int num_children = tnodes[*node_idx].num_children;
bool current_followed_by_shuffled_operator = followed_by_shuffled_operator;
bool current_require_bucket_distribution = require_bucket_distribution;
+ // TODO: The way CacheOperator is created is confusing now
OperatorPtr op = nullptr;
+ OperatorPtr cache_op = nullptr;
RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], descs, op,
cur_pipe,
parent == nullptr ? -1 :
parent->node_id(), child_idx,
followed_by_shuffled_operator,
- current_require_bucket_distribution));
+ current_require_bucket_distribution,
cache_op));
// Initialization must be done here. For example, group by expressions in
agg will be used to
// decide if a local shuffle should be planed, so it must be initialized
here.
RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
// assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
if (parent != nullptr) {
// add to parent's child(s)
- RETURN_IF_ERROR(parent->set_child(op));
+ RETURN_IF_ERROR(parent->set_child(cache_op ? cache_op : op));
} else {
*root = op;
}
@@ -1235,7 +1237,8 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
PipelinePtr& cur_pipe, int
parent_idx,
int child_idx,
const bool
followed_by_shuffled_operator,
- const bool
require_bucket_distribution) {
+ const bool
require_bucket_distribution,
+ OperatorPtr& cache_op) {
std::vector<DataSinkOperatorPtr> sink_ops;
Defer defer = Defer([&]() {
if (op) {
@@ -1332,7 +1335,7 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
_dag[downstream_pipeline_id].push_back(new_pipe->id());
DataSinkOperatorPtr cache_sink(new CacheSinkOperatorX(
- next_sink_operator_id(), cache_source_id,
op->operator_id()));
+ next_sink_operator_id(), op->node_id(),
op->operator_id()));
RETURN_IF_ERROR(new_pipe->set_sink(cache_sink));
return Status::OK();
};
@@ -1358,6 +1361,7 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
PipelinePtr new_pipe;
RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
+ cache_op = op;
op = std::make_shared<DistinctStreamingAggOperatorX>(pool,
next_operator_id(),
tnode,
descs);
RETURN_IF_ERROR(new_pipe->add_operator(op,
_parallel_instances));
@@ -1372,7 +1376,7 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
if (need_create_cache_op) {
PipelinePtr new_pipe;
RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
-
+ cache_op = op;
op = std::make_shared<StreamingAggOperatorX>(pool,
next_operator_id(), tnode,
descs);
RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
@@ -1388,6 +1392,7 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
PipelinePtr new_pipe;
if (need_create_cache_op) {
RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
+ cache_op = op;
}
if (enable_spill) {
diff --git a/be/src/exec/pipeline/pipeline_fragment_context.h
b/be/src/exec/pipeline/pipeline_fragment_context.h
index 64ce1b665ac..e523e711898 100644
--- a/be/src/exec/pipeline/pipeline_fragment_context.h
+++ b/be/src/exec/pipeline/pipeline_fragment_context.h
@@ -148,7 +148,7 @@ private:
Status _create_operator(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs,
OperatorPtr& op, PipelinePtr& cur_pipe, int
parent_idx, int child_idx,
const bool followed_by_shuffled_join,
- const bool require_bucket_distribution);
+ const bool require_bucket_distribution,
OperatorPtr& cache_op);
template <bool is_intersect>
Status _build_operators_for_set_operation_node(ObjectPool* pool, const
TPlanNode& tnode,
const DescriptorTbl& descs,
OperatorPtr& op,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]