This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 4794327ac03 branch-3.1: [fix](cloud) Fix cloud balance warm up lack of
latest rs information #54404 (#54505)
4794327ac03 is described below
commit 4794327ac03d37485633f0654250633371125585
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Aug 13 11:47:54 2025 +0800
branch-3.1: [fix](cloud) Fix cloud balance warm up lack of latest rs
information #54404 (#54505)
Cherry-picked from #54404
Co-authored-by: deardeng <[email protected]>
---
be/src/cloud/cloud_internal_service.cpp | 6 +
be/src/io/cache/block_file_cache_downloader.cpp | 3 +-
.../doris/cloud/catalog/CloudTabletRebalancer.java | 6 +-
.../cloud_p0/balance/test_balance_warm_up.groovy | 147 +++++++++++++++++++++
4 files changed, 158 insertions(+), 4 deletions(-)
diff --git a/be/src/cloud/cloud_internal_service.cpp
b/be/src/cloud/cloud_internal_service.cpp
index 0576cbe4deb..bc6689a4635 100644
--- a/be/src/cloud/cloud_internal_service.cpp
+++ b/be/src/cloud/cloud_internal_service.cpp
@@ -84,6 +84,12 @@ void
CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
return;
}
CloudTabletSPtr tablet = std::move(res.value());
+ auto st = tablet->sync_rowsets();
+ if (!st) {
+ // Best effort: if the sync fails, just log a warning and continue.
+ LOG(WARNING) << "failed to sync rowsets: " << tablet_id
+ << " err msg: " << st.to_string();
+ }
auto rowsets = tablet->get_snapshot_rowset();
std::for_each(rowsets.cbegin(), rowsets.cend(), [&](const
RowsetSharedPtr& rowset) {
std::string rowset_id = rowset->rowset_id().to_string();
diff --git a/be/src/io/cache/block_file_cache_downloader.cpp
b/be/src/io/cache/block_file_cache_downloader.cpp
index c7cc5476436..92bb91a7a3f 100644
--- a/be/src/io/cache/block_file_cache_downloader.cpp
+++ b/be/src/io/cache/block_file_cache_downloader.cpp
@@ -174,7 +174,8 @@ void FileCacheBlockDownloader::download_file_cache_block(
std::ranges::for_each(metas, [&](const FileCacheBlockMeta& meta) {
VLOG_DEBUG << "download_file_cache_block: start, tablet_id=" <<
meta.tablet_id()
<< ", rowset_id=" << meta.rowset_id() << ", segment_id=" <<
meta.segment_id()
- << ", offset=" << meta.offset() << ", size=" << meta.size();
+ << ", offset=" << meta.offset() << ", size=" << meta.size()
+ << ", type=" << meta.cache_type();
CloudTabletSPtr tablet;
if (auto res = _engine.tablet_mgr().get_tablet(meta.tablet_id(),
false); !res.has_value()) {
LOG(INFO) << "failed to find tablet " << meta.tablet_id() << " : "
<< res.error();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index e7f080ea329..887490d95f6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -830,15 +830,15 @@ public class CloudTabletRebalancer extends MasterDaemon {
req.setTablets(tabletIds);
TCheckWarmUpCacheAsyncResponse result =
client.checkWarmUpCacheAsync(req);
if (result.getStatus().getStatusCode() != TStatusCode.OK) {
- LOG.warn("check pre cache status {} {}",
result.getStatus().getStatusCode(),
+ LOG.warn("check pre tablets {} cache status {} {}", tabletIds,
result.getStatus().getStatusCode(),
result.getStatus().getErrorMsgs());
} else {
- LOG.info("check pre cache succ status {} {}",
result.getStatus().getStatusCode(),
+ LOG.info("check pre tablets {} cache succ status {} {}",
tabletIds, result.getStatus().getStatusCode(),
result.getStatus().getErrorMsgs());
}
return result.getTaskDone();
} catch (Exception e) {
- LOG.warn("send check pre cache rpc error. backend[{}]",
destBackend.getId(), e);
+ LOG.warn("send check pre cache rpc error. tablets{} backend[{}]",
tabletIds, destBackend.getId(), e);
ok = false;
} finally {
if (ok) {
diff --git
a/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
b/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
new file mode 100644
index 00000000000..f6afcfb2f99
--- /dev/null
+++ b/regression-test/suites/cloud_p0/balance/test_balance_warm_up.groovy
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+
+suite('test_balance_warm_up', 'docker') {
+ if (!isCloudMode()) {
+ return;
+ }
+ def options = new ClusterOptions()
+ options.feConfigs += [
+ 'cloud_cluster_check_interval_second=1',
+ 'cloud_tablet_rebalancer_interval_second=1',
+ 'sys_log_verbose_modules=org',
+ 'heartbeat_interval_second=1',
+ 'rehash_tablet_after_be_dead_seconds=3600',
+ 'enable_cloud_warm_up_for_rebalance=true'
+ ]
+ options.beConfigs += [
+ 'report_tablet_interval_seconds=1',
+ 'schedule_sync_tablets_interval_s=18000',
+ 'disable_auto_compaction=true',
+ 'sys_log_verbose_modules=*'
+ ]
+ options.setFeNum(1)
+ options.setBeNum(1)
+ options.cloudMode = true
+ options.enableDebugPoints()
+
+ def testCase = { table ->
+ def ms = cluster.getAllMetaservices().get(0)
+ def msHttpPort = ms.host + ":" + ms.httpPort
+ sql """CREATE TABLE $table (
+ `k1` int(11) NULL,
+ `v1` VARCHAR(2048)
+ )
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 2
+ PROPERTIES (
+ "replication_num"="1"
+ );
+ """
+ sql """
+ insert into $table values (10, '1'), (20, '2')
+ """
+ sql """
+ insert into $table values (30, '3'), (40, '4')
+ """
+
+ // Snapshot the state before adding the new BE
+ def beforeGetFromFe = getTabletAndBeHostFromFe(table)
+ def beforeGetFromBe =
getTabletAndBeHostFromBe(cluster.getAllBackends())
+ // version 2
+ def beforeCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort,
table, 2)
+ logger.info("cache dir version 2 {}", beforeCacheDirVersion2)
+ // version 3
+ def beforeCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort,
table, 3)
+ logger.info("cache dir version 3 {}", beforeCacheDirVersion3)
+
+ def beforeMergedCacheDir = beforeCacheDirVersion2 +
beforeCacheDirVersion3.collectEntries { host, hashFiles ->
+ [(host): beforeCacheDirVersion2[host] ?
(beforeCacheDirVersion2[host] + hashFiles) : hashFiles]
+ }
+ logger.info("before fe tablets {}, be tablets {}, cache dir {}",
beforeGetFromFe, beforeGetFromBe, beforeMergedCacheDir)
+
+ def beforeWarmUpResult = sql_return_maparray """ADMIN SHOW REPLICA
DISTRIBUTION FROM $table"""
+ logger.info("before warm up result {}", beforeWarmUpResult)
+
+ cluster.addBackend(1, "compute_cluster")
+ def oldBe = sql_return_maparray('show backends').get(0)
+ def newAddBe = sql_return_maparray('show backends').get(1)
+ // balance tablet
+ awaitUntil(500) {
+ def afterWarmUpResult = sql_return_maparray """ADMIN SHOW REPLICA
DISTRIBUTION FROM $table"""
+ logger.info("after warm up result {}", afterWarmUpResult)
+ afterWarmUpResult.any { row ->
+ Integer.valueOf((String) row.ReplicaNum) == 1
+ }
+ }
+
+ // A tablet has been moved from be1 to be2 and should have been warmed up there
+ // Snapshot the state after adding the new BE
+ def afterGetFromFe = getTabletAndBeHostFromFe(table)
+ def afterGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends())
+ // version 2
+ def afterCacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort,
table, 2)
+ logger.info("after cache dir version 2 {}", afterCacheDirVersion2)
+ // version 3
+ def afterCacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort,
table, 3)
+ logger.info("after cache dir version 3 {}", afterCacheDirVersion3)
+
+ def afterMergedCacheDir = afterCacheDirVersion2 +
afterCacheDirVersion3.collectEntries { host, hashFiles ->
+ [(host): afterCacheDirVersion2[host] ?
(afterCacheDirVersion2[host] + hashFiles) : hashFiles]
+ }
+ logger.info("after fe tablets {}, be tablets {}, cache dir {}",
afterGetFromFe, afterGetFromBe, afterMergedCacheDir)
+ def newAddBeCacheDir = afterMergedCacheDir.get(newAddBe.Host)
+ logger.info("new add be cache dir {}", newAddBeCacheDir)
+ assert newAddBeCacheDir.size() != 0
+ assert
beforeMergedCacheDir[oldBe.Host].containsAll(afterMergedCacheDir[newAddBe.Host])
+
+ def be = cluster.getBeByBackendId(newAddBe.BackendId.toLong())
+ def dataPath = new File("${be.path}/storage/file_cache")
+ logger.info("Checking file_cache directory: {}", dataPath.absolutePath)
+ logger.info("Directory exists: {}", dataPath.exists())
+
+ def subDirs = []
+
+ def collectDirs
+ collectDirs = { File dir ->
+ if (dir.exists()) {
+ dir.eachDir { subDir ->
+ logger.info("Found subdir: {}", subDir.name)
+ subDirs << subDir.name
+ collectDirs(subDir)
+ }
+ }
+ }
+
+ collectDirs(dataPath)
+ logger.info("BE {} file_cache subdirs: {}", newAddBe.Host, subDirs)
+
+ newAddBeCacheDir.each { hashFile ->
+ assertTrue(subDirs.any { subDir -> subDir.startsWith(hashFile) },
+ "Expected cache file pattern ${hashFile} not found in BE
${newAddBe.Host}'s file_cache directory. " +
+ "Available subdirs: ${subDirs}")
+ }
+ }
+
+ docker(options) {
+ testCase("test_balance_warm_up_tbl")
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]