This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 6e38b0a3562 [Pick](query-cache) some query-cache bugfix (#47883)(#47961)(#51202) (#51204)
6e38b0a3562 is described below

commit 6e38b0a3562b1d924ec212ee199ef6595ae75157
Author: 924060929 <[email protected]>
AuthorDate: Tue May 27 21:25:02 2025 +0800

    [Pick](query-cache) some query-cache bugfix (#47883)(#47961)(#51202) (#51204)
    
    cherry pick from #47883, #47961, #51202
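    
    What the picked changes do, as read from the diff below:
    - cache_source_operator.cpp: compare the expected and cached slot
      orders with vector operator== instead of a loop that overwrote
      need_reorder on every iteration, so a mismatch anywhere in the
      list (not only at the last index) now triggers a reorder.
    - pipeline_fragment_context.cpp: create the query cache operator
      only when the node id matches query_cache_param.node_id, rather
      than for every aggregate node whenever the cache is enabled.
    - Coordinator.java: when a fragment carries queryCacheParam, pin
      expectedInstanceNum to the number of scan ranges.
    - New query_cache regression suite covering cache miss, cache hit,
      and a reordered select list.
    
    A minimal sketch of the reorder-check fix (hypothetical slot ids,
    not taken from the commit):
    
        #include <vector>
    
        int main() {
            std::vector<int> cached = {1, 2, 3}; // order stored in the cache
            std::vector<int> wanted = {2, 1, 3}; // order this plan expects
            // Old check: need_reorder was reassigned at every index, so only
            // the last comparison survived; here index 2 matches, and the
            // earlier mismatches at indexes 0 and 1 were silently dropped.
            // New check: operator== compares sizes and all elements at once.
            bool need_reorder = wanted != cached; // true, reorder happens
            return need_reorder ? 0 : 1;
        }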
    
    ---------
    
    Co-authored-by: HappenLee <[email protected]>
---
 be/src/pipeline/exec/cache_source_operator.cpp     |   9 +-
 be/src/pipeline/pipeline_fragment_context.cpp      |  11 +-
 .../main/java/org/apache/doris/qe/Coordinator.java |   3 +
 .../data/query_p0/cache/query_cache.out            | Bin 0 -> 635 bytes
 .../suites/query_p0/cache/query_cache.groovy       | 182 +++++++++++++++++++++
 5 files changed, 192 insertions(+), 13 deletions(-)

diff --git a/be/src/pipeline/exec/cache_source_operator.cpp b/be/src/pipeline/exec/cache_source_operator.cpp
index 1387de351d1..e2bee373f9d 100644
--- a/be/src/pipeline/exec/cache_source_operator.cpp
+++ b/be/src/pipeline/exec/cache_source_operator.cpp
@@ -70,14 +70,7 @@ Status CacheSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
         _hit_cache_results = _query_cache_handle.get_cache_result();
         auto hit_cache_slot_orders = _query_cache_handle.get_cache_slot_orders();
 
-        bool need_reorder = _slot_orders.size() != hit_cache_slot_orders->size();
-        if (!need_reorder) {
-            for (int i = 0; i < _slot_orders.size(); ++i) {
-                need_reorder = _slot_orders[i] != (*hit_cache_slot_orders)[i];
-            }
-        }
-
-        if (need_reorder) {
+        if (_slot_orders != *hit_cache_slot_orders) {
             for (auto slot_id : _slot_orders) {
                 auto find_res = std::find(hit_cache_slot_orders->begin(),
                                          hit_cache_slot_orders->end(), slot_id);
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 220474c91e4..81a84451e89 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1235,7 +1235,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
             return Status::InternalError("Illegal aggregate node " + std::to_string(tnode.node_id) +
                                          ": group by and output is empty");
         }
-
+        bool need_create_cache_op =
+                enable_query_cache && tnode.node_id == request.fragment.query_cache_param.node_id;
         auto create_query_cache_operator = [&](PipelinePtr& new_pipe) {
             auto cache_node_id = request.local_params[0].per_node_scan_ranges.begin()->first;
             auto cache_source_id = next_operator_id();
@@ -1269,7 +1270,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
             request.query_options.__isset.enable_distinct_streaming_aggregation &&
             request.query_options.enable_distinct_streaming_aggregation &&
             !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt) {
-            if (enable_query_cache) {
+            if (need_create_cache_op) {
                 PipelinePtr new_pipe;
                 RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
 
@@ -1293,7 +1294,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         } else if (tnode.agg_node.__isset.use_streaming_preaggregation &&
                    tnode.agg_node.use_streaming_preaggregation &&
                    !tnode.agg_node.grouping_exprs.empty()) {
-            if (enable_query_cache) {
+            if (need_create_cache_op) {
                 PipelinePtr new_pipe;
                 RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
 
@@ -1310,7 +1311,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         } else {
             // create new pipeline to add query cache operator
             PipelinePtr new_pipe;
-            if (enable_query_cache) {
+            if (need_create_cache_op) {
                 RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
             }
 
@@ -1319,7 +1320,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
             } else {
                 op.reset(new AggSourceOperatorX(pool, tnode, next_operator_id(), descs));
             }
-            if (enable_query_cache) {
+            if (need_create_cache_op) {
                 RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
                 RETURN_IF_ERROR(new_pipe->add_operator(
                         op, request.__isset.parallel_instances ? request.parallel_instances : 0));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 05594f9c235..d05fb4e3694 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -2811,6 +2811,9 @@ public class Coordinator implements CoordInterface {
                     //the scan instance num should not larger than the tablets num
                    expectedInstanceNum = Math.min(scanRange.size(), parallelExecInstanceNum);
                 }
+                if (params.fragment != null && params.fragment.queryCacheParam != null) {
+                    expectedInstanceNum = scanRange.size();
+                }
                 // 2. split how many scanRange one instance should scan
                List<List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> perInstanceScanRanges
                         = ListUtil.splitBySize(scanRange, expectedInstanceNum);
diff --git a/regression-test/data/query_p0/cache/query_cache.out b/regression-test/data/query_p0/cache/query_cache.out
new file mode 100644
index 00000000000..7786c7e04b0
Binary files /dev/null and b/regression-test/data/query_p0/cache/query_cache.out differ
diff --git a/regression-test/suites/query_p0/cache/query_cache.groovy b/regression-test/suites/query_p0/cache/query_cache.groovy
new file mode 100644
index 00000000000..50ca483b6a3
--- /dev/null
+++ b/regression-test/suites/query_p0/cache/query_cache.groovy
@@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.stream.Collectors
+
+suite("query_cache") {
+    def tableName = "table_3_undef_partitions2_keys3_properties4_distributed_by53"
+
+    def test = {
+        sql "set enable_query_cache=false"
+
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+        CREATE TABLE ${tableName} (
+            `pk` int NULL,
+            `col_varchar_10__undef_signed` varchar(10) NULL,
+            `col_int_undef_signed` int NULL,
+            `col_varchar_1024__undef_signed` varchar(1024) NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`pk`, `col_varchar_10__undef_signed`)
+        DISTRIBUTED BY HASH(`pk`) BUCKETS 10
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1"
+        )
+    """
+
+        sql """
+        INSERT INTO ${tableName}(pk, col_varchar_10__undef_signed, col_int_undef_signed, col_varchar_1024__undef_signed)
+        VALUES
+            (0, "mean", null, "p"),
+            (1, "is", 6, "what"),
+            (2, "one", null, "e")
+    """
+
+        // First complex query - Run without cache
+        order_qt_query_cache1 """
+        SELECT 
+            MIN(`pk`) AS field1,
+            MAX(`pk`) AS field2,
+            `pk` AS field3
+        FROM ${tableName} AS alias1
+        WHERE (
+            alias1.col_varchar_1024__undef_signed LIKE CONCAT('aEIovabVCD', '%')
+            AND (
+                (alias1.`pk` = 154 OR (
+                    alias1.col_varchar_1024__undef_signed LIKE CONCAT('lWpWJPFqXM', '%')
+                    AND alias1.`pk` = 111
+                ))
+                AND (
+                    alias1.col_varchar_10__undef_signed != 'IfGTFZuqZr'
+                    AND alias1.col_varchar_1024__undef_signed > 'with'
+                )
+                AND alias1.`pk` IS NULL
+            )
+            AND alias1.col_int_undef_signed < 7
+        )
+        GROUP BY field3
+    """
+
+        // Simple query - Run without cache
+        order_qt_query_cache2 """
+        SELECT
+            MIN(`pk`) AS field1,
+            MAX(`pk`) AS field2,
+            `pk` AS field3
+        FROM ${tableName}
+        GROUP BY field3
+    """
+
+        // Enable query cache
+        sql "set enable_query_cache=true"
+
+        // Run the same complex query with cache enabled
+        order_qt_query_cache3 """
+        SELECT 
+            MIN(`pk`) AS field1,
+            MAX(`pk`) AS field2,
+            `pk` AS field3
+        FROM ${tableName} AS alias1
+        WHERE (
+            alias1.col_varchar_1024__undef_signed LIKE CONCAT('aEIovabVCD', '%')
+            AND (
+                (alias1.`pk` = 154 OR (
+                    alias1.col_varchar_1024__undef_signed LIKE CONCAT('lWpWJPFqXM', '%')
+                    AND alias1.`pk` = 111
+                ))
+                AND (
+                    alias1.col_varchar_10__undef_signed != 'IfGTFZuqZr'
+                    AND alias1.col_varchar_1024__undef_signed > 'with'
+                )
+                AND alias1.`pk` IS NULL
+            )
+            AND alias1.col_int_undef_signed < 7
+        )
+        GROUP BY field3
+    """
+
+        // Run the same simple query with cache enabled
+        order_qt_query_cache4 """
+        SELECT
+            MIN(`pk`) AS field1,
+            MAX(`pk`) AS field2,
+            `pk` AS field3
+        FROM ${tableName}
+        GROUP BY field3
+    """
+
+        // Run both queries again to test cache hit
+        order_qt_query_cache5 """
+        SELECT 
+            MIN(`pk`) AS field1,
+            MAX(`pk`) AS field2,
+            `pk` AS field3
+        FROM ${tableName} AS alias1
+        WHERE (
+            alias1.col_varchar_1024__undef_signed LIKE CONCAT('aEIovabVCD', '%')
+            AND (
+                (alias1.`pk` = 154 OR (
+                    alias1.col_varchar_1024__undef_signed LIKE CONCAT('lWpWJPFqXM', '%')
+                    AND alias1.`pk` = 111
+                ))
+                AND (
+                    alias1.col_varchar_10__undef_signed != 'IfGTFZuqZr'
+                    AND alias1.col_varchar_1024__undef_signed > 'with'
+                )
+                AND alias1.`pk` IS NULL
+            )
+            AND alias1.col_int_undef_signed < 7
+        )
+        GROUP BY field3
+    """
+
+        order_qt_query_cache6 """
+        SELECT
+            MIN(`pk`) AS field1,
+            MAX(`pk`) AS field2,
+            `pk` AS field3
+        FROM ${tableName}
+        GROUP BY field3
+    """
+
+        order_qt_query_cache7 """
+        SELECT
+        col_int_undef_signed,
+            MIN(`col_int_undef_signed`) AS field1,
+            MAX(`col_int_undef_signed`) AS field2,
+            COUNT(`col_int_undef_signed`) AS field3,
+            SUM(`col_int_undef_signed`) AS field4
+        FROM ${tableName}
+        GROUP BY col_int_undef_signed
+    """
+
+        // reorder the order_qt_query_cache7 select list to test the cache hit
+        order_qt_query_cache8 """
+ SELECT
+    COUNT(`col_int_undef_signed`) AS field3,  -- Count of col_int_undef_signed (Original field3)
+    col_int_undef_signed,                      -- The original unsigned integer column (Original col_int_undef_signed)
+    SUM(`col_int_undef_signed`) AS field4,     -- Sum of col_int_undef_signed (Original field4)
+    MIN(`col_int_undef_signed`) AS field1,     -- Minimum value of col_int_undef_signed (Original field1)
+    MAX(`col_int_undef_signed`) AS field2      -- Maximum value of col_int_undef_signed (Original field2). Note: Trailing comma removed to avoid syntax error.
+FROM ${tableName}
+GROUP BY col_int_undef_signed;
+    """
+    }
+
+    sql "set enable_nereids_distribute_planner=false"
+    test()
+} 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
