This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8c8c43274f3 [fix](cloud) Fix cloud balance warm up lack of latest rs
information (#54404)
8c8c43274f3 is described below
commit 8c8c43274f3f74ae7704f0ec6f3bc3b66156a3ce
Author: deardeng <[email protected]>
AuthorDate: Fri Aug 8 21:51:28 2025 +0800
[fix](cloud) Fix cloud balance warm up lack of latest rs information
(#54404)
### What problem does this PR solve?
On the cloud, the rowset layout is not immediately visible after a load; a
query or compaction is required to trigger a rowset sync. During the balance
warm-up, stale rowset information from long ago may therefore be synchronized
to the destination BE, causing cache misses.
---
be/src/cloud/cloud_internal_service.cpp | 6 +
be/src/io/cache/block_file_cache_downloader.cpp | 3 +-
.../doris/cloud/catalog/CloudTabletRebalancer.java | 6 +-
.../cloud_p0/balance/test_balance_warm_up.groovy | 147 +++++++++++++++++++++
4 files changed, 158 insertions(+), 4 deletions(-)
diff --git a/be/src/cloud/cloud_internal_service.cpp
b/be/src/cloud/cloud_internal_service.cpp
index e35cfd0e01f..fd0c98bb028 100644
--- a/be/src/cloud/cloud_internal_service.cpp
+++ b/be/src/cloud/cloud_internal_service.cpp
@@ -85,6 +85,12 @@ void
CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
return;
}
CloudTabletSPtr tablet = std::move(res.value());
+ auto st = tablet->sync_rowsets();
+ if (!st) {
+ // just log failed, try it best
+ LOG(WARNING) << "failed to sync rowsets: " << tablet_id
+ << " err msg: " << st.to_string();
+ }
auto rowsets = tablet->get_snapshot_rowset();
std::for_each(rowsets.cbegin(), rowsets.cend(), [&](const
RowsetSharedPtr& rowset) {
std::string rowset_id = rowset->rowset_id().to_string();
diff --git a/be/src/io/cache/block_file_cache_downloader.cpp
b/be/src/io/cache/block_file_cache_downloader.cpp
index 000308a7e2d..d79fe269971 100644
--- a/be/src/io/cache/block_file_cache_downloader.cpp
+++ b/be/src/io/cache/block_file_cache_downloader.cpp
@@ -174,7 +174,8 @@ void FileCacheBlockDownloader::download_file_cache_block(
std::ranges::for_each(metas, [&](const FileCacheBlockMeta& meta) {
VLOG_DEBUG << "download_file_cache_block: start, tablet_id=" <<
meta.tablet_id()
<< ", rowset_id=" << meta.rowset_id() << ", segment_id=" <<
meta.segment_id()
- << ", offset=" << meta.offset() << ", size=" << meta.size();
+ << ", offset=" << meta.offset() << ", size=" << meta.size()
+ << ", type=" << meta.cache_type();
CloudTabletSPtr tablet;
if (auto res = _engine.tablet_mgr().get_tablet(meta.tablet_id(),
false); !res.has_value()) {
LOG(INFO) << "failed to find tablet " << meta.tablet_id() << " : "
<< res.error();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index e7f080ea329..887490d95f6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -830,15 +830,15 @@ public class CloudTabletRebalancer extends MasterDaemon {
req.setTablets(tabletIds);
TCheckWarmUpCacheAsyncResponse result =
client.checkWarmUpCacheAsync(req);
if (result.getStatus().getStatusCode() != TStatusCode.OK) {
- LOG.warn("check pre cache status {} {}",
result.getStatus().getStatusCode(),
+ LOG.warn("check pre tablets {} cache status {} {}", tabletIds,
result.getStatus().getStatusCode(),
result.getStatus().getErrorMsgs());
} else {
- LOG.info("check pre cache succ status {} {}",
result.getStatus().getStatusCode(),
+ LOG.info("check pre tablets {} cache succ status {} {}",
tabletIds, result.getStatus().getStatusCode(),
result.getStatus().getErrorMsgs());
}
return result.getTaskDone();
} catch (Exception e) {
- LOG.warn("send check pre cache rpc error. backend[{}]",
destBackend.getId(), e);
+ LOG.warn("send check pre cache rpc error. tablets{} backend[{}]",
tabletIds, destBackend.getId(), e);
ok = false;
} finally {
if (ok) {
diff --git
a/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
b/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
new file mode 100644
index 00000000000..f6afcfb2f99
--- /dev/null
+++ b/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+
+suite('test_balance_warm_up', 'docker') {
+ if (!isCloudMode()) {
+ return;
+ }
+ def options = new ClusterOptions()
+ options.feConfigs += [
+ 'cloud_cluster_check_interval_second=1',
+ 'cloud_tablet_rebalancer_interval_second=1',
+ 'sys_log_verbose_modules=org',
+ 'heartbeat_interval_second=1',
+ 'rehash_tablet_after_be_dead_seconds=3600',
+ 'enable_cloud_warm_up_for_rebalance=true'
+ ]
+ options.beConfigs += [
+ 'report_tablet_interval_seconds=1',
+ 'schedule_sync_tablets_interval_s=18000',
+ 'disable_auto_compaction=true',
+ 'sys_log_verbose_modules=*'
+ ]
+ options.setFeNum(1)
+ options.setBeNum(1)
+ options.cloudMode = true
+ options.enableDebugPoints()
+
+ def testCase = { table ->
+ def ms = cluster.getAllMetaservices().get(0)
+ def msHttpPort = ms.host + ":" + ms.httpPort
+ sql """CREATE TABLE $table (
+ `k1` int(11) NULL,
+ `v1` VARCHAR(2048)
+ )
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 2
+ PROPERTIES (
+ "replication_num"="1"
+ );
+ """
+ sql """
+ insert into $table values (10, '1'), (20, '2')
+ """
+ sql """
+ insert into $table values (30, '3'), (40, '4')
+ """
+
+ // before add be
+ def beforeGetFromFe = getTabletAndBeHostFromFe(table)
+ def beforeGetFromBe =
getTabletAndBeHostFromBe(cluster.getAllBackends())
+ // version 2
+ def beforeCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort,
table, 2)
+ logger.info("cache dir version 2 {}", beforeCacheDirVersion2)
+ // version 3
+ def beforeCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort,
table, 3)
+ logger.info("cache dir version 3 {}", beforeCacheDirVersion3)
+
+ def beforeMergedCacheDir = beforeCacheDirVersion2 +
beforeCacheDirVersion3.collectEntries { host, hashFiles ->
+ [(host): beforeCacheDirVersion2[host] ?
(beforeCacheDirVersion2[host] + hashFiles) : hashFiles]
+ }
+ logger.info("before fe tablets {}, be tablets {}, cache dir {}",
beforeGetFromFe, beforeGetFromBe, beforeMergedCacheDir)
+
+ def beforeWarmUpResult = sql_return_maparray """ADMIN SHOW REPLICA
DISTRIBUTION FROM $table"""
+ logger.info("before warm up result {}", beforeWarmUpResult)
+
+ cluster.addBackend(1, "compute_cluster")
+ def oldBe = sql_return_maparray('show backends').get(0)
+ def newAddBe = sql_return_maparray('show backends').get(1)
+ // balance tablet
+ awaitUntil(500) {
+ def afterWarmUpResult = sql_return_maparray """ADMIN SHOW REPLICA
DISTRIBUTION FROM $table"""
+ logger.info("after warm up result {}", afterWarmUpResult)
+ afterWarmUpResult.any { row ->
+ Integer.valueOf((String) row.ReplicaNum) == 1
+ }
+ }
+
+ // from be1 -> be2, warm up this tablet
+ // after add be
+ def afterGetFromFe = getTabletAndBeHostFromFe(table)
+ def afterGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends())
+ // version 2
+ def afterCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort,
table, 2)
+ logger.info("after cache dir version 2 {}", afterCacheDirVersion2)
+ // version 3
+ def afterCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort,
table, 3)
+ logger.info("after cache dir version 3 {}", afterCacheDirVersion3)
+
+ def afterMergedCacheDir = afterCacheDirVersion2 +
afterCacheDirVersion3.collectEntries { host, hashFiles ->
+ [(host): afterCacheDirVersion2[host] ?
(afterCacheDirVersion2[host] + hashFiles) : hashFiles]
+ }
+ logger.info("after fe tablets {}, be tablets {}, cache dir {}",
afterGetFromFe, afterGetFromBe, afterMergedCacheDir)
+ def newAddBeCacheDir = afterMergedCacheDir.get(newAddBe.Host)
+ logger.info("new add be cache dir {}", newAddBeCacheDir)
+ assert newAddBeCacheDir.size() != 0
+ assert
beforeMergedCacheDir[oldBe.Host].containsAll(afterMergedCacheDir[newAddBe.Host])
+
+ def be = cluster.getBeByBackendId(newAddBe.BackendId.toLong())
+ def dataPath = new File("${be.path}/storage/file_cache")
+ logger.info("Checking file_cache directory: {}", dataPath.absolutePath)
+ logger.info("Directory exists: {}", dataPath.exists())
+
+ def subDirs = []
+
+ def collectDirs
+ collectDirs = { File dir ->
+ if (dir.exists()) {
+ dir.eachDir { subDir ->
+ logger.info("Found subdir: {}", subDir.name)
+ subDirs << subDir.name
+ collectDirs(subDir)
+ }
+ }
+ }
+
+ collectDirs(dataPath)
+ logger.info("BE {} file_cache subdirs: {}", newAddBe.Host, subDirs)
+
+ newAddBeCacheDir.each { hashFile ->
+ assertTrue(subDirs.any { subDir -> subDir.startsWith(hashFile) },
+ "Expected cache file pattern ${hashFile} not found in BE
${newAddBe.Host}'s file_cache directory. " +
+ "Available subdirs: ${subDirs}")
+ }
+ }
+
+ docker(options) {
+ testCase("test_balance_warm_up_tbl")
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]