This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 628e84c4c28 branch-3.1: [fix](cloud) avoid broadcast remote read in topn query #58044 (#58112)
628e84c4c28 is described below
commit 628e84c4c2833c40dd639674127ba7730d1e0dbf
Author: zhengyu <[email protected]>
AuthorDate: Tue Nov 25 11:22:49 2025 +0800
branch-3.1: [fix](cloud) avoid broadcast remote read in topn query #58044 (#58112)
picked from #58044
---------
Signed-off-by: zhengyu <[email protected]>
---
be/src/cloud/cloud_storage_engine.cpp | 4 +-
be/src/cloud/cloud_storage_engine.h | 2 +-
be/src/cloud/cloud_tablet_mgr.cpp | 9 +-
be/src/olap/storage_engine.cpp | 2 +-
be/src/olap/storage_engine.h | 4 +-
be/src/runtime/exec_env.cpp | 4 +-
be/src/runtime/exec_env.h | 2 +-
.../cloud_p0/cache/test_topn_broadcast.groovy | 148 +++++++++++++++++++++
8 files changed, 162 insertions(+), 13 deletions(-)
diff --git a/be/src/cloud/cloud_storage_engine.cpp
b/be/src/cloud/cloud_storage_engine.cpp
index fe4c1f5f099..dc84d95eac5 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -273,8 +273,8 @@ bool CloudStorageEngine::stopped() {
Result<BaseTabletSPtr> CloudStorageEngine::get_tablet(int64_t tablet_id,
SyncRowsetStats*
sync_stats,
- bool force_use_cache) {
- return _tablet_mgr->get_tablet(tablet_id, false, true, sync_stats,
force_use_cache)
+ bool
force_use_only_cached) {
+ return _tablet_mgr->get_tablet(tablet_id, false, true, sync_stats,
force_use_only_cached)
.transform([](auto&& t) { return
static_pointer_cast<BaseTablet>(std::move(t)); });
}
diff --git a/be/src/cloud/cloud_storage_engine.h
b/be/src/cloud/cloud_storage_engine.h
index cfa13cf89ea..7b7eda60db2 100644
--- a/be/src/cloud/cloud_storage_engine.h
+++ b/be/src/cloud/cloud_storage_engine.h
@@ -63,7 +63,7 @@ public:
bool stopped() override;
Result<BaseTabletSPtr> get_tablet(int64_t tablet_id, SyncRowsetStats*
sync_stats = nullptr,
- bool force_use_cache = false) override;
+ bool force_use_only_cached = false)
override;
Status start_bg_threads() override;
diff --git a/be/src/cloud/cloud_tablet_mgr.cpp
b/be/src/cloud/cloud_tablet_mgr.cpp
index 3512b7d94cd..9fec651a28e 100644
--- a/be/src/cloud/cloud_tablet_mgr.cpp
+++ b/be/src/cloud/cloud_tablet_mgr.cpp
@@ -161,7 +161,7 @@ void set_tablet_access_time_ms(CloudTablet* tablet) {
Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t
tablet_id, bool warmup_data,
bool
sync_delete_bitmap,
SyncRowsetStats* sync_stats,
- bool
local_only) {
+ bool
force_use_only_cached) {
// LRU value type. `Value`'s lifetime MUST NOT be longer than
`CloudTabletMgr`
class Value : public LRUCacheValueBase {
public:
@@ -180,13 +180,14 @@ Result<std::shared_ptr<CloudTablet>>
CloudTabletMgr::get_tablet(int64_t tablet_i
auto* handle = _cache->lookup(key);
if (handle == nullptr) {
- if (local_only) {
+ if (force_use_only_cached) {
LOG(INFO) << "tablet=" << tablet_id
- << "does not exists in local tablet cache, because param
local_only=true, "
+ << " does not exists in local tablet cache, because param
"
+ "force_use_only_cached=true, "
"treat it as an error";
return ResultError(Status::InternalError(
"tablet={} does not exists in local tablet cache, because
param "
- "local_only=true, "
+ "force_use_only_cached=true, "
"treat it as an error",
tablet_id));
}
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index fba45325484..e426ca30570 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -1344,7 +1344,7 @@ Status StorageEngine::create_tablet(const
TCreateTabletReq& request, RuntimeProf
}
Result<BaseTabletSPtr> StorageEngine::get_tablet(int64_t tablet_id,
SyncRowsetStats* sync_stats,
- bool force_use_cache) {
+ bool force_use_only_cached) {
BaseTabletSPtr tablet;
std::string err;
tablet = _tablet_manager->get_tablet(tablet_id, true, &err);
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index ebc6a2ede3d..5c362514c53 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -114,7 +114,7 @@ public:
virtual Result<BaseTabletSPtr> get_tablet(int64_t tablet_id,
SyncRowsetStats* sync_stats =
nullptr,
- bool force_use_cache = false) =
0;
+ bool force_use_only_cached =
false) = 0;
void register_report_listener(ReportWorker* listener);
void deregister_report_listener(ReportWorker* listener);
@@ -233,7 +233,7 @@ public:
Status create_tablet(const TCreateTabletReq& request, RuntimeProfile*
profile);
Result<BaseTabletSPtr> get_tablet(int64_t tablet_id, SyncRowsetStats*
sync_stats = nullptr,
- bool force_use_cache = false) override;
+ bool force_use_only_cached = false)
override;
void clear_transaction_task(const TTransactionId transaction_id);
void clear_transaction_task(const TTransactionId transaction_id,
diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp
index 70835a7284f..5f6a53055d7 100644
--- a/be/src/runtime/exec_env.cpp
+++ b/be/src/runtime/exec_env.cpp
@@ -58,10 +58,10 @@ void ExecEnv::set_write_cooldown_meta_executors() {
#endif // BE_TEST
Result<BaseTabletSPtr> ExecEnv::get_tablet(int64_t tablet_id, SyncRowsetStats*
sync_stats,
- bool force_use_cache) {
+ bool force_use_only_cached) {
auto storage_engine = GetInstance()->_storage_engine.get();
return storage_engine != nullptr
- ? storage_engine->get_tablet(tablet_id, sync_stats)
+ ? storage_engine->get_tablet(tablet_id, sync_stats,
force_use_only_cached)
: ResultError(Status::InternalError("failed to get tablet
{}", tablet_id));
}
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index d740ad3eed0..28ef5156a54 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -167,7 +167,7 @@ public:
// Requires ExenEnv ready
static Result<BaseTabletSPtr> get_tablet(int64_t tablet_id,
SyncRowsetStats* sync_stats =
nullptr,
- bool force_use_cache = false);
+ bool force_use_only_cached =
false);
static bool ready() { return _s_ready.load(std::memory_order_acquire); }
static bool tracking_memory() { return
_s_tracking_memory.load(std::memory_order_acquire); }
diff --git a/regression-test/suites/cloud_p0/cache/test_topn_broadcast.groovy
b/regression-test/suites/cloud_p0/cache/test_topn_broadcast.groovy
new file mode 100644
index 00000000000..2fb8d834f11
--- /dev/null
+++ b/regression-test/suites/cloud_p0/cache/test_topn_broadcast.groovy
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.Http
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("test_topn_broadcast", "docker") {
+
+
+ def options = new ClusterOptions()
+
+ options.feNum = 1
+ options.beNum = 3
+ options.msNum = 1
+ options.cloudMode = true
+ options.feConfigs += ['example_conf_k1=v1', 'example_conf_k2=v2']
+ options.beConfigs += ['enable_file_cache=true',
'enable_java_support=false',
'file_cache_enter_disk_resource_limit_mode_percent=99',
+ 'file_cache_background_lru_dump_interval_ms=2000',
'file_cache_background_lru_log_replay_interval_ms=500',
+ 'disable_auto_compaction=true',
'file_cache_enter_need_evict_cache_in_advance_percent=99',
+
'file_cache_background_lru_dump_update_cnt_threshold=0'
+ ]
+
+ docker(options) {
+ // define a sql table
+ def indexTbName = "test_topn_broadcast"
+
+ sql "set global enable_two_phase_read_opt = true"
+ sql " set global enable_common_expr_pushdown = true "
+ sql " set global topn_opt_limit_threshold = 1024 "
+ sql "DROP TABLE IF EXISTS ${indexTbName}"
+ sql """
+ CREATE TABLE ${indexTbName} (
+ `@timestamp` int(11) NULL COMMENT "",
+ `clientip` varchar(20) NULL COMMENT "",
+ `request` text NULL COMMENT "",
+ `status` int(11) NULL COMMENT "",
+ `size` int(11) NULL COMMENT "",
+ INDEX clientip_idx (`clientip`) USING INVERTED PROPERTIES("parser" =
"english", "support_phrase" = "true") COMMENT '',
+ INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" =
"english", "support_phrase" = "true") COMMENT ''
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`@timestamp`)
+ COMMENT "OLAP"
+ DISTRIBUTED BY RANDOM BUCKETS 3
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true"
+ );
+ """
+
+
+ def load_httplogs_data = {table_name, label, read_flag, format_flag,
ignore_failure=false,
+ expected_succ_rows = -1, load_to_single_tablet =
'true' ->
+ // load the json data
+ streamLoad {
+ table "${table_name}"
+
+ // set http request header params
+ set 'label', label + "_" + UUID.randomUUID().toString()
+ set 'read_json_by_line', read_flag
+ set 'format', format_flag
+ file context.config.dataPath +
"/fault_injection_p0/documents-1000.json"
+ time 10000 // limit inflight 10s
+ if (expected_succ_rows >= 0) {
+ set 'max_filter_ratio', '1'
+ }
+
+ // If a check callback is declared, the default check is skipped,
+ // so every condition (exception, load status) must be checked here.
+ check { result, exception, startTime, endTime ->
+ if (ignore_failure && expected_succ_rows < 0) { return }
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ assertEquals("success", parseJson(result).Status.toString().toLowerCase())
+ }
+ }
+ }
+
+ try {
+ load_httplogs_data.call(indexTbName, 'test_topn_broadcast1', 'true',
'json')
+ load_httplogs_data.call(indexTbName, 'test_topn_broadcast2', 'true',
'json')
+ load_httplogs_data.call(indexTbName, 'test_topn_broadcast3', 'true',
'json')
+ sql "sync"
+
+ def explain_result = sql """ explain select * from ${indexTbName}
order by `@timestamp` limit 512; """
+ println explain_result
+
+ sql """ select * from ${indexTbName} order by `@timestamp` limit 512;
"""
+
+ //
TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,PathHash,MetaUrl,CompactionStatus
+ List<List<Object>> tabletRows = sql """ show tablets from
${indexTbName}; """
+ def tabletIds = tabletRows.collect { row -> row[0].toString()
}.unique()
+ assertTrue(tabletIds.size() > 0, "table ${indexTbName} should contain
at least one tablet")
+ // print tabletIds
+ println "Tablet IDs: ${tabletIds}"
+
+ List<List<Object>> backendRows = sql """ show backends """
+ def bes = backendRows
+ .findAll { row -> row[9].toString().equalsIgnoreCase("true") }
+ .collect { row ->
+ [
+ host : row[1].toString(),
+ httpPort: row[4].toString().toInteger()
+ ]
+ }
+ assertTrue(!bes.isEmpty(), "no alive backend hosts available for
verification")
+ def expectedHostCount = bes.size()
+
+ // Collect which backends report each tablet via /tablets_json; if any
+ // tablet appears in more than one backend's tablets_json, fail the test.
+ def tabletPresence = [:].withDefault { [] as List<String> }
+ bes.each { be ->
+ def response =
Http.GET("http://${be.host}:${be.httpPort}/tablets_json?limit=all", true)
+ assertEquals(0, response.code as Integer)
+ def data = response.data
+ def beTablets = data.tablets.collect { it.tablet_id as String }
+ tabletIds.each { tabletId ->
+ if (beTablets.contains(tabletId)) {
+ tabletPresence[tabletId] << be.host
+ }
+ }
+ }
+
+ tabletIds.each { tabletId ->
+ def hosts = tabletPresence[tabletId].unique()
+ assertFalse(hosts.size() > 1, "tablet ${tabletId} appears on
multiple backends: ${hosts}")
+ }
+
+ } finally {
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]