This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 628e84c4c28 branch-3.1: [fix](cloud) avoid broadcast remote read in topn query #58044 (#58112)
628e84c4c28 is described below

commit 628e84c4c2833c40dd639674127ba7730d1e0dbf
Author: zhengyu <[email protected]>
AuthorDate: Tue Nov 25 11:22:49 2025 +0800

    branch-3.1: [fix](cloud) avoid broadcast remote read in topn query #58044 (#58112)
    
    picked from #58044
    
    ---------
    
    Signed-off-by: zhengyu <[email protected]>
---
 be/src/cloud/cloud_storage_engine.cpp              |   4 +-
 be/src/cloud/cloud_storage_engine.h                |   2 +-
 be/src/cloud/cloud_tablet_mgr.cpp                  |   9 +-
 be/src/olap/storage_engine.cpp                     |   2 +-
 be/src/olap/storage_engine.h                       |   4 +-
 be/src/runtime/exec_env.cpp                        |   4 +-
 be/src/runtime/exec_env.h                          |   2 +-
 .../cloud_p0/cache/test_topn_broadcast.groovy      | 148 +++++++++++++++++++++
 8 files changed, 162 insertions(+), 13 deletions(-)

diff --git a/be/src/cloud/cloud_storage_engine.cpp 
b/be/src/cloud/cloud_storage_engine.cpp
index fe4c1f5f099..dc84d95eac5 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -273,8 +273,8 @@ bool CloudStorageEngine::stopped() {
 
 Result<BaseTabletSPtr> CloudStorageEngine::get_tablet(int64_t tablet_id,
                                                       SyncRowsetStats* 
sync_stats,
-                                                      bool force_use_cache) {
-    return _tablet_mgr->get_tablet(tablet_id, false, true, sync_stats, 
force_use_cache)
+                                                      bool 
force_use_only_cached) {
+    return _tablet_mgr->get_tablet(tablet_id, false, true, sync_stats, 
force_use_only_cached)
             .transform([](auto&& t) { return 
static_pointer_cast<BaseTablet>(std::move(t)); });
 }
 
diff --git a/be/src/cloud/cloud_storage_engine.h 
b/be/src/cloud/cloud_storage_engine.h
index cfa13cf89ea..7b7eda60db2 100644
--- a/be/src/cloud/cloud_storage_engine.h
+++ b/be/src/cloud/cloud_storage_engine.h
@@ -63,7 +63,7 @@ public:
     bool stopped() override;
 
     Result<BaseTabletSPtr> get_tablet(int64_t tablet_id, SyncRowsetStats* 
sync_stats = nullptr,
-                                      bool force_use_cache = false) override;
+                                      bool force_use_only_cached = false) 
override;
 
     Status start_bg_threads() override;
 
diff --git a/be/src/cloud/cloud_tablet_mgr.cpp 
b/be/src/cloud/cloud_tablet_mgr.cpp
index 3512b7d94cd..9fec651a28e 100644
--- a/be/src/cloud/cloud_tablet_mgr.cpp
+++ b/be/src/cloud/cloud_tablet_mgr.cpp
@@ -161,7 +161,7 @@ void set_tablet_access_time_ms(CloudTablet* tablet) {
 Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t 
tablet_id, bool warmup_data,
                                                                 bool 
sync_delete_bitmap,
                                                                 
SyncRowsetStats* sync_stats,
-                                                                bool 
local_only) {
+                                                                bool 
force_use_only_cached) {
     // LRU value type. `Value`'s lifetime MUST NOT be longer than 
`CloudTabletMgr`
     class Value : public LRUCacheValueBase {
     public:
@@ -180,13 +180,14 @@ Result<std::shared_ptr<CloudTablet>> 
CloudTabletMgr::get_tablet(int64_t tablet_i
     auto* handle = _cache->lookup(key);
 
     if (handle == nullptr) {
-        if (local_only) {
+        if (force_use_only_cached) {
             LOG(INFO) << "tablet=" << tablet_id
-                      << "does not exists in local tablet cache, because param 
local_only=true, "
+                      << "does not exists in local tablet cache, because param 
"
+                         "force_use_only_cached=true, "
                          "treat it as an error";
             return ResultError(Status::InternalError(
                     "tablet={} does not exists in local tablet cache, because 
param "
-                    "local_only=true, "
+                    "force_use_only_cached=true, "
                     "treat it as an error",
                     tablet_id));
         }
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index fba45325484..e426ca30570 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -1344,7 +1344,7 @@ Status StorageEngine::create_tablet(const 
TCreateTabletReq& request, RuntimeProf
 }
 
 Result<BaseTabletSPtr> StorageEngine::get_tablet(int64_t tablet_id, 
SyncRowsetStats* sync_stats,
-                                                 bool force_use_cache) {
+                                                 bool force_use_only_cached) {
     BaseTabletSPtr tablet;
     std::string err;
     tablet = _tablet_manager->get_tablet(tablet_id, true, &err);
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index ebc6a2ede3d..5c362514c53 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -114,7 +114,7 @@ public:
 
     virtual Result<BaseTabletSPtr> get_tablet(int64_t tablet_id,
                                               SyncRowsetStats* sync_stats = 
nullptr,
-                                              bool force_use_cache = false) = 
0;
+                                              bool force_use_only_cached = 
false) = 0;
 
     void register_report_listener(ReportWorker* listener);
     void deregister_report_listener(ReportWorker* listener);
@@ -233,7 +233,7 @@ public:
     Status create_tablet(const TCreateTabletReq& request, RuntimeProfile* 
profile);
 
     Result<BaseTabletSPtr> get_tablet(int64_t tablet_id, SyncRowsetStats* 
sync_stats = nullptr,
-                                      bool force_use_cache = false) override;
+                                      bool force_use_only_cached = false) 
override;
 
     void clear_transaction_task(const TTransactionId transaction_id);
     void clear_transaction_task(const TTransactionId transaction_id,
diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp
index 70835a7284f..5f6a53055d7 100644
--- a/be/src/runtime/exec_env.cpp
+++ b/be/src/runtime/exec_env.cpp
@@ -58,10 +58,10 @@ void ExecEnv::set_write_cooldown_meta_executors() {
 #endif // BE_TEST
 
 Result<BaseTabletSPtr> ExecEnv::get_tablet(int64_t tablet_id, SyncRowsetStats* 
sync_stats,
-                                           bool force_use_cache) {
+                                           bool force_use_only_cached) {
     auto storage_engine = GetInstance()->_storage_engine.get();
     return storage_engine != nullptr
-                   ? storage_engine->get_tablet(tablet_id, sync_stats)
+                   ? storage_engine->get_tablet(tablet_id, sync_stats, 
force_use_only_cached)
                    : ResultError(Status::InternalError("failed to get tablet 
{}", tablet_id));
 }
 
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index d740ad3eed0..28ef5156a54 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -167,7 +167,7 @@ public:
     // Requires ExenEnv ready
     static Result<BaseTabletSPtr> get_tablet(int64_t tablet_id,
                                              SyncRowsetStats* sync_stats = 
nullptr,
-                                             bool force_use_cache = false);
+                                             bool force_use_only_cached = 
false);
 
     static bool ready() { return _s_ready.load(std::memory_order_acquire); }
     static bool tracking_memory() { return 
_s_tracking_memory.load(std::memory_order_acquire); }
diff --git a/regression-test/suites/cloud_p0/cache/test_topn_broadcast.groovy 
b/regression-test/suites/cloud_p0/cache/test_topn_broadcast.groovy
new file mode 100644
index 00000000000..2fb8d834f11
--- /dev/null
+++ b/regression-test/suites/cloud_p0/cache/test_topn_broadcast.groovy
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.Http
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("test_topn_broadcast", "docker") {
+
+
+    def options = new ClusterOptions()
+
+    options.feNum = 1
+    options.beNum = 3
+    options.msNum = 1
+    options.cloudMode = true
+    options.feConfigs += ['example_conf_k1=v1', 'example_conf_k2=v2']
+    options.beConfigs += ['enable_file_cache=true', 
'enable_java_support=false', 
'file_cache_enter_disk_resource_limit_mode_percent=99',
+                          'file_cache_background_lru_dump_interval_ms=2000', 
'file_cache_background_lru_log_replay_interval_ms=500',
+                          'disable_auto_compation=true', 
'file_cache_enter_need_evict_cache_in_advance_percent=99',
+                          
'file_cache_background_lru_dump_update_cnt_threshold=0'
+                        ]
+
+    docker(options) {
+    // define a sql table
+    def indexTbName = "test_topn_broadcast"
+
+    sql "set global enable_two_phase_read_opt = true"
+    sql " set global enable_common_expr_pushdown = true "
+    sql " set global topn_opt_limit_threshold = 1024 "
+    sql "DROP TABLE IF EXISTS ${indexTbName}"
+    sql """
+      CREATE TABLE ${indexTbName} (
+        `@timestamp` int(11) NULL COMMENT "",
+        `clientip` varchar(20) NULL COMMENT "",
+        `request` text NULL COMMENT "",
+        `status` int(11) NULL COMMENT "",
+        `size` int(11) NULL COMMENT "",
+        INDEX clientip_idx (`clientip`) USING INVERTED PROPERTIES("parser" = 
"english", "support_phrase" = "true") COMMENT '',
+        INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" = 
"english", "support_phrase" = "true") COMMENT ''
+      ) ENGINE=OLAP
+      DUPLICATE KEY(`@timestamp`)
+      COMMENT "OLAP"
+      DISTRIBUTED BY RANDOM BUCKETS 3
+      PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1",
+        "disable_auto_compaction" = "true"
+      );
+    """
+
+
+    def load_httplogs_data = {table_name, label, read_flag, format_flag, 
ignore_failure=false,
+                        expected_succ_rows = -1, load_to_single_tablet = 
'true' ->
+        // load the json data
+        streamLoad {
+            table "${table_name}"
+
+            // set http request header params
+            set 'label', label + "_" + UUID.randomUUID().toString()
+            set 'read_json_by_line', read_flag
+            set 'format', format_flag
+            file context.config.dataPath + 
"/fault_injection_p0/documents-1000.json"
+            time 10000 // limit inflight 10s
+            if (expected_succ_rows >= 0) {
+                set 'max_filter_ratio', '1'
+            }
+
+            // if declared a check callback, the default check condition will 
ignore.
+            // So you must check all condition
+            check { result, exception, startTime, endTime ->
+                       if (ignore_failure && expected_succ_rows < 0) { return }
+                    if (exception != null) {
+                        throw exception
+                    }
+                    log.info("Stream load result: ${result}".toString())
+                    def json = parseJson(result)
+            }
+        }
+    }
+
+    try {
+        load_httplogs_data.call(indexTbName, 'test_topn_broadcast1', 'true', 
'json')
+        load_httplogs_data.call(indexTbName, 'test_topn_broadcast2', 'true', 
'json')
+        load_httplogs_data.call(indexTbName, 'test_topn_broadcast3', 'true', 
'json')
+        sql "sync"
+
+        def explain_result = sql """ explain select * from ${indexTbName} 
order by `@timestamp` limit 512; """
+        println explain_result
+
+        sql """ select * from ${indexTbName} order by `@timestamp` limit 512; 
"""
+
+        // 
TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,PathHash,MetaUrl,CompactionStatus
+        List<List<Object>> tabletRows = sql """ show tablets from 
${indexTbName}; """
+        def tabletIds = tabletRows.collect { row -> row[0].toString() 
}.unique()
+        assertTrue(tabletIds.size() > 0, "table ${indexTbName} should contain 
at least one tablet")
+        // print tabletIds
+        println "Tablet IDs: ${tabletIds}"
+
+        List<List<Object>> backendRows = sql """ show backends """
+        def bes = backendRows
+            .findAll { row -> row[9].toString().equalsIgnoreCase("true") }
+            .collect { row ->
+                [
+                    host    : row[1].toString(),
+                    httpPort: row[4].toString().toInteger()
+                ]
+            }
+        assertTrue(!bes.isEmpty(), "no alive backend hosts available for 
verification")
+        def expectedHostCount = bes.size()
+
+        // Collect which backends report each tablet. New requirement:
+        // If any tablet appears in more than one backend's tablets_json, fail 
the test.
+        def tabletPresence = [:].withDefault { [] as List<String> }
+        bes.each { be ->
+            def response = 
Http.GET("http://${be.host}:${be.httpPort}/tablets_json?limit=all", true)
+            assertEquals(0, response.code as Integer)
+            def data = response.data
+            def beTablets = data.tablets.collect { it.tablet_id as String }
+            tabletIds.each { tabletId ->
+                if (beTablets.contains(tabletId)) {
+                    tabletPresence[tabletId] << be.host
+                }
+            }
+        }
+
+        tabletIds.each { tabletId ->
+            def hosts = tabletPresence[tabletId].unique()
+            assertFalse(hosts.size() > 1, "tablet ${tabletId} appears on 
multiple backends: ${hosts}")
+        }
+
+    } finally {
+    }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to