This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 6e38b0a3562 [Pick](query-cache) some query-cache bugfix
(#47883)(#47961)(#51202) (#51204)
6e38b0a3562 is described below
commit 6e38b0a3562b1d924ec212ee199ef6595ae75157
Author: 924060929 <[email protected]>
AuthorDate: Tue May 27 21:25:02 2025 +0800
[Pick](query-cache) some query-cache bugfix (#47883)(#47961)(#51202)
(#51204)
cherry pick from #47883, #47961, #51202
---------
Co-authored-by: HappenLee <[email protected]>
---
be/src/pipeline/exec/cache_source_operator.cpp | 9 +-
be/src/pipeline/pipeline_fragment_context.cpp | 11 +-
.../main/java/org/apache/doris/qe/Coordinator.java | 3 +
.../data/query_p0/cache/query_cache.out | Bin 0 -> 635 bytes
.../suites/query_p0/cache/query_cache.groovy | 182 +++++++++++++++++++++
5 files changed, 192 insertions(+), 13 deletions(-)
diff --git a/be/src/pipeline/exec/cache_source_operator.cpp
b/be/src/pipeline/exec/cache_source_operator.cpp
index 1387de351d1..e2bee373f9d 100644
--- a/be/src/pipeline/exec/cache_source_operator.cpp
+++ b/be/src/pipeline/exec/cache_source_operator.cpp
@@ -70,14 +70,7 @@ Status CacheSourceLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
_hit_cache_results = _query_cache_handle.get_cache_result();
auto hit_cache_slot_orders =
_query_cache_handle.get_cache_slot_orders();
- bool need_reorder = _slot_orders.size() !=
hit_cache_slot_orders->size();
- if (!need_reorder) {
- for (int i = 0; i < _slot_orders.size(); ++i) {
- need_reorder = _slot_orders[i] != (*hit_cache_slot_orders)[i];
- }
- }
-
- if (need_reorder) {
+ if (_slot_orders != *hit_cache_slot_orders) {
for (auto slot_id : _slot_orders) {
auto find_res = std::find(hit_cache_slot_orders->begin(),
hit_cache_slot_orders->end(),
slot_id);
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 220474c91e4..81a84451e89 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1235,7 +1235,8 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
return Status::InternalError("Illegal aggregate node " +
std::to_string(tnode.node_id) +
": group by and output is empty");
}
-
+ bool need_create_cache_op =
+ enable_query_cache && tnode.node_id ==
request.fragment.query_cache_param.node_id;
auto create_query_cache_operator = [&](PipelinePtr& new_pipe) {
auto cache_node_id =
request.local_params[0].per_node_scan_ranges.begin()->first;
auto cache_source_id = next_operator_id();
@@ -1269,7 +1270,7 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
request.query_options.__isset.enable_distinct_streaming_aggregation &&
request.query_options.enable_distinct_streaming_aggregation &&
!tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt) {
- if (enable_query_cache) {
+ if (need_create_cache_op) {
PipelinePtr new_pipe;
RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
@@ -1293,7 +1294,7 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
} else if (tnode.agg_node.__isset.use_streaming_preaggregation &&
tnode.agg_node.use_streaming_preaggregation &&
!tnode.agg_node.grouping_exprs.empty()) {
- if (enable_query_cache) {
+ if (need_create_cache_op) {
PipelinePtr new_pipe;
RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
@@ -1310,7 +1311,7 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
} else {
// create new pipeline to add query cache operator
PipelinePtr new_pipe;
- if (enable_query_cache) {
+ if (need_create_cache_op) {
RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
}
@@ -1319,7 +1320,7 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
} else {
op.reset(new AggSourceOperatorX(pool, tnode,
next_operator_id(), descs));
}
- if (enable_query_cache) {
+ if (need_create_cache_op) {
RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
RETURN_IF_ERROR(new_pipe->add_operator(
op, request.__isset.parallel_instances ?
request.parallel_instances : 0));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 05594f9c235..d05fb4e3694 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -2811,6 +2811,9 @@ public class Coordinator implements CoordInterface {
//the scan instance num should not be larger than the tablets
num
expectedInstanceNum = Math.min(scanRange.size(),
parallelExecInstanceNum);
}
+ if (params.fragment != null && params.fragment.queryCacheParam
!= null) {
+ expectedInstanceNum = scanRange.size();
+ }
// 2. split how many scanRange one instance should scan
List<List<Pair<Integer, Map<Integer,
List<TScanRangeParams>>>>> perInstanceScanRanges
= ListUtil.splitBySize(scanRange, expectedInstanceNum);
diff --git a/regression-test/data/query_p0/cache/query_cache.out
b/regression-test/data/query_p0/cache/query_cache.out
new file mode 100644
index 00000000000..7786c7e04b0
Binary files /dev/null and
b/regression-test/data/query_p0/cache/query_cache.out differ
diff --git a/regression-test/suites/query_p0/cache/query_cache.groovy
b/regression-test/suites/query_p0/cache/query_cache.groovy
new file mode 100644
index 00000000000..50ca483b6a3
--- /dev/null
+++ b/regression-test/suites/query_p0/cache/query_cache.groovy
@@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.stream.Collectors
+
+suite("query_cache") {
+ def tableName =
"table_3_undef_partitions2_keys3_properties4_distributed_by53"
+
+ def test = {
+ sql "set enable_query_cache=false"
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE ${tableName} (
+ `pk` int NULL,
+ `col_varchar_10__undef_signed` varchar(10) NULL,
+ `col_int_undef_signed` int NULL,
+ `col_varchar_1024__undef_signed` varchar(1024) NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`pk`, `col_varchar_10__undef_signed`)
+ DISTRIBUTED BY HASH(`pk`) BUCKETS 10
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ )
+ """
+
+ sql """
+ INSERT INTO ${tableName}(pk, col_varchar_10__undef_signed,
col_int_undef_signed, col_varchar_1024__undef_signed)
+ VALUES
+ (0, "mean", null, "p"),
+ (1, "is", 6, "what"),
+ (2, "one", null, "e")
+ """
+
+ // First complex query - Run without cache
+ order_qt_query_cache1 """
+ SELECT
+ MIN(`pk`) AS field1,
+ MAX(`pk`) AS field2,
+ `pk` AS field3
+ FROM ${tableName} AS alias1
+ WHERE (
+ alias1.col_varchar_1024__undef_signed LIKE CONCAT('aEIovabVCD',
'%')
+ AND (
+ (alias1.`pk` = 154 OR (
+ alias1.col_varchar_1024__undef_signed LIKE
CONCAT('lWpWJPFqXM', '%')
+ AND alias1.`pk` = 111
+ ))
+ AND (
+ alias1.col_varchar_10__undef_signed != 'IfGTFZuqZr'
+ AND alias1.col_varchar_1024__undef_signed > 'with'
+ )
+ AND alias1.`pk` IS NULL
+ )
+ AND alias1.col_int_undef_signed < 7
+ )
+ GROUP BY field3
+ """
+
+ // Simple query - Run without cache
+ order_qt_query_cache2 """
+ SELECT
+ MIN(`pk`) AS field1,
+ MAX(`pk`) AS field2,
+ `pk` AS field3
+ FROM ${tableName}
+ GROUP BY field3
+ """
+
+ // Enable query cache
+ sql "set enable_query_cache=true"
+
+ // Run the same complex query with cache enabled
+ order_qt_query_cache3 """
+ SELECT
+ MIN(`pk`) AS field1,
+ MAX(`pk`) AS field2,
+ `pk` AS field3
+ FROM ${tableName} AS alias1
+ WHERE (
+ alias1.col_varchar_1024__undef_signed LIKE CONCAT('aEIovabVCD',
'%')
+ AND (
+ (alias1.`pk` = 154 OR (
+ alias1.col_varchar_1024__undef_signed LIKE
CONCAT('lWpWJPFqXM', '%')
+ AND alias1.`pk` = 111
+ ))
+ AND (
+ alias1.col_varchar_10__undef_signed != 'IfGTFZuqZr'
+ AND alias1.col_varchar_1024__undef_signed > 'with'
+ )
+ AND alias1.`pk` IS NULL
+ )
+ AND alias1.col_int_undef_signed < 7
+ )
+ GROUP BY field3
+ """
+
+ // Run the same simple query with cache enabled
+ order_qt_query_cache4 """
+ SELECT
+ MIN(`pk`) AS field1,
+ MAX(`pk`) AS field2,
+ `pk` AS field3
+ FROM ${tableName}
+ GROUP BY field3
+ """
+
+ // Run both queries again to test cache hit
+ order_qt_query_cache5 """
+ SELECT
+ MIN(`pk`) AS field1,
+ MAX(`pk`) AS field2,
+ `pk` AS field3
+ FROM ${tableName} AS alias1
+ WHERE (
+ alias1.col_varchar_1024__undef_signed LIKE CONCAT('aEIovabVCD',
'%')
+ AND (
+ (alias1.`pk` = 154 OR (
+ alias1.col_varchar_1024__undef_signed LIKE
CONCAT('lWpWJPFqXM', '%')
+ AND alias1.`pk` = 111
+ ))
+ AND (
+ alias1.col_varchar_10__undef_signed != 'IfGTFZuqZr'
+ AND alias1.col_varchar_1024__undef_signed > 'with'
+ )
+ AND alias1.`pk` IS NULL
+ )
+ AND alias1.col_int_undef_signed < 7
+ )
+ GROUP BY field3
+ """
+
+ order_qt_query_cache6 """
+ SELECT
+ MIN(`pk`) AS field1,
+ MAX(`pk`) AS field2,
+ `pk` AS field3
+ FROM ${tableName}
+ GROUP BY field3
+ """
+
+ order_qt_query_cache7 """
+ SELECT
+ col_int_undef_signed,
+ MIN(`col_int_undef_signed`) AS field1,
+ MAX(`col_int_undef_signed`) AS field2,
+ COUNT(`col_int_undef_signed`) AS field3,
+ SUM(`col_int_undef_signed`) AS field4
+ FROM ${tableName}
+ GROUP BY col_int_undef_signed
+ """
+
+ // reorder the order_qt_query_cache7 select list to test the cache hit
+ order_qt_query_cache8 """
+ SELECT
+ COUNT(`col_int_undef_signed`) AS field3, -- Count of col_int_undef_signed
(Original field3)
+ col_int_undef_signed, -- The original unsigned
integer column (Original col_int_undef_signed)
+ SUM(`col_int_undef_signed`) AS field4, -- Sum of col_int_undef_signed
(Original field4)
+ MIN(`col_int_undef_signed`) AS field1, -- Minimum value of
col_int_undef_signed (Original field1)
+ MAX(`col_int_undef_signed`) AS field2 -- Maximum value of
col_int_undef_signed (Original field2). Note: Trailing comma removed to avoid
syntax error.
+FROM ${tableName}
+GROUP BY col_int_undef_signed;
+ """
+ }
+
+ sql "set enable_nereids_distribute_planner=false"
+ test()
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]