This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c9341741076 [fix](cloud) fix packed file warmup cannot read small files (#60160)
c9341741076 is described below
commit c9341741076cb0eb37a4fc53e8aba3a3f5f71603
Author: Xin Liao <[email protected]>
AuthorDate: Sat Jan 24 22:01:43 2026 +0800
[fix](cloud) fix packed file warmup cannot read small files (#60160)
1. Use rs_meta.fs() instead of storage_resource.value()->fs in the warm-up
functions to support packed files. The PackedFileSystem wrapper returned
by rs_meta.fs() handles the index_map lookup and reads from the correct
packed file path at the proper offset.

Without this fix, warm-up would try to open the segment/index path
directly, which does not exist on S3 because the data is actually stored
in a packed file.

Fixed locations:
- CloudWarmUpManager::handle_jobs(): segment and inverted index download
- CloudInternalServiceImpl::warm_up_rowset(): segment and inverted index
  download

2. Fix packed_file_manager to use the correct TTL cache type when writing
small files to the file cache. Previously it always used
FileCacheType::NORMAL, causing a ttl_cache_size mismatch between the
source and target clusters during warm-up.

Now expiration_time is passed through PackedAppendContext and used to
set the correct cache type (TTL when expiration_time > 0, NORMAL
otherwise); a minimal sketch of this selection follows below.
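
A minimal standalone C++ sketch of the cache-type selection described in
item 2. FileCacheType, CacheContext, and make_cache_context below are
simplified stand-ins for illustration, not the actual Doris BE types; the
real logic lives in do_write_to_file_cache() in
be/src/io/fs/packed_file_manager.cpp.

    #include <cassert>
    #include <cstdint>

    // Simplified stand-ins for the real Doris types, used only to illustrate
    // the TTL-vs-NORMAL decision.
    enum class FileCacheType { NORMAL, TTL };

    struct CacheContext {
        FileCacheType cache_type = FileCacheType::NORMAL;
        uint64_t expiration_time = 0; // seconds since epoch, 0 means no TTL
    };

    // expiration_time > 0 selects the TTL cache type, so blocks written on the
    // target cluster during warm up are accounted under ttl_cache_size just as
    // they are on the source cluster.
    CacheContext make_cache_context(uint64_t expiration_time) {
        CacheContext ctx;
        ctx.cache_type = expiration_time > 0 ? FileCacheType::TTL : FileCacheType::NORMAL;
        ctx.expiration_time = expiration_time;
        return ctx;
    }

    int main() {
        assert(make_cache_context(0).cache_type == FileCacheType::NORMAL);
        assert(make_cache_context(1767225600).cache_type == FileCacheType::TTL);
        return 0;
    }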
---
be/src/cloud/cloud_internal_service.cpp | 10 +-
be/src/cloud/cloud_warm_up_manager.cpp | 16 +-
be/src/io/fs/packed_file_manager.cpp | 28 +--
be/src/io/fs/packed_file_manager.h | 1 +
be/src/olap/rowset/rowset_writer_context.h | 1 +
.../test_packed_file_warm_up_cluster_event.groovy | 205 +++++++++++++++++++++
6 files changed, 238 insertions(+), 23 deletions(-)
diff --git a/be/src/cloud/cloud_internal_service.cpp b/be/src/cloud/cloud_internal_service.cpp
index ca696409e5f..67595a3f798 100644
--- a/be/src/cloud/cloud_internal_service.cpp
+++ b/be/src/cloud/cloud_internal_service.cpp
@@ -516,12 +516,15 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
}
};
+ // Use rs_meta.fs() instead of storage_resource.value()->fs to support packed files.
+ // PackedFileSystem wrapper in rs_meta.fs() handles the index_map lookup and
+ // reads from the correct packed file.
io::DownloadFileMeta download_meta {
.path = storage_resource.value()->remote_segment_path(rs_meta, segment_id),
.file_size = segment_size,
.offset = 0,
.download_size = segment_size,
- .file_system = storage_resource.value()->fs,
+ .file_system = rs_meta.fs(),
.ctx = {.is_index_data = false,
.expiration_time = expiration_time,
.is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
@@ -537,8 +540,9 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
_engine.file_cache_block_downloader().submit_download_task(download_meta);
}
+
+ // Use rs_meta.fs() to support packed files for inverted index download.
auto download_inverted_index = [&, tablet](std::string index_path, uint64_t idx_size) {
- auto storage_resource = rs_meta.remote_storage_resource();
auto download_done = [=, version = rs_meta.version()](Status st) {
DBUG_EXECUTE_IF(
"CloudInternalServiceImpl::warm_up_rowset.download_inverted_idx", {
@@ -594,7 +598,7 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
io::DownloadFileMeta download_meta {
.path = io::Path(index_path),
.file_size = static_cast<int64_t>(idx_size),
- .file_system = storage_resource.value()->fs,
+ .file_system = rs_meta.fs(),
.ctx = {.is_index_data = false, // DORIS-20877
.expiration_time = expiration_time,
.is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
diff --git a/be/src/cloud/cloud_warm_up_manager.cpp b/be/src/cloud/cloud_warm_up_manager.cpp
index dfdef1b3091..40b4734c8ba 100644
--- a/be/src/cloud/cloud_warm_up_manager.cpp
+++ b/be/src/cloud/cloud_warm_up_manager.cpp
@@ -249,12 +249,14 @@ void CloudWarmUpManager::handle_jobs() {
}
for (int64_t seg_id = 0; seg_id < rs->num_segments(); seg_id++) {
// 1st. download segment files
+ // Use rs->fs() instead of storage_resource.value()->fs to support packed
+ // files. PackedFileSystem wrapper in RowsetMeta::fs() handles the index_map
+ // lookup and reads from the correct packed file.
if (!config::file_cache_enable_only_warm_up_idx) {
submit_download_tasks(
storage_resource.value()->remote_segment_path(*rs, seg_id),
- rs->segment_file_size(cast_set<int>(seg_id)),
- storage_resource.value()->fs, expiration_time, wait, false,
- [tablet, rs, seg_id](Status st) {
+ rs->segment_file_size(cast_set<int>(seg_id)), rs->fs(),
+ expiration_time, wait, false, [tablet, rs, seg_id](Status st) {
VLOG_DEBUG << "warmup rowset " <<
rs->version() << " segment "
<< seg_id << " completed";
if (tablet->complete_rowset_segment_warmup(
@@ -299,8 +301,8 @@ void CloudWarmUpManager::handle_jobs() {
tablet->update_rowset_warmup_state_inverted_idx_num(
WarmUpTriggerSource::JOB, rs->rowset_id(), 1);
submit_download_tasks(
- idx_path, file_size, storage_resource.value()->fs,
- expiration_time, wait, true, [=](Status st) {
+ idx_path, file_size, rs->fs(), expiration_time, wait, true,
+ [=](Status st) {
VLOG_DEBUG << "warmup rowset " <<
rs->version()
<< " segment " << seg_id
<< "inverted idx:" <<
idx_path << " completed";
@@ -322,8 +324,8 @@ void CloudWarmUpManager::handle_jobs() {
tablet->update_rowset_warmup_state_inverted_idx_num(
WarmUpTriggerSource::JOB, rs->rowset_id(), 1);
submit_download_tasks(
- idx_path, file_size, storage_resource.value()->fs,
- expiration_time, wait, true, [=](Status st) {
+ idx_path, file_size, rs->fs(), expiration_time, wait, true,
+ [=](Status st) {
VLOG_DEBUG << "warmup rowset " <<
rs->version()
<< " segment " << seg_id
<< "inverted idx:" <<
idx_path << " completed";
diff --git a/be/src/io/fs/packed_file_manager.cpp b/be/src/io/fs/packed_file_manager.cpp
index 4651fc08305..eaf8afe96c0 100644
--- a/be/src/io/fs/packed_file_manager.cpp
+++ b/be/src/io/fs/packed_file_manager.cpp
@@ -121,7 +121,7 @@ Status append_packed_info_trailer(FileWriter* writer, const std::string& packed_
// write small file data to file cache
void do_write_to_file_cache(const std::string& small_file_path, const std::string& data,
- int64_t tablet_id) {
+ int64_t tablet_id, uint64_t expiration_time) {
if (data.empty()) {
return;
}
@@ -132,7 +132,8 @@ void do_write_to_file_cache(const std::string& small_file_path, const std::strin
VLOG_DEBUG << "packed_file_cache_write: path=" << small_file_path
<< " filename=" << path.filename().native() << " hash=" <<
cache_hash.to_string()
- << " size=" << data.size() << " tablet_id=" << tablet_id;
+ << " size=" << data.size() << " tablet_id=" << tablet_id
+ << " expiration_time=" << expiration_time;
BlockFileCache* file_cache = FileCacheFactory::instance()->get_by_path(cache_hash);
if (file_cache == nullptr) {
@@ -141,7 +142,8 @@ void do_write_to_file_cache(const std::string& small_file_path, const std::strin
// Allocate cache blocks
CacheContext ctx;
- ctx.cache_type = FileCacheType::NORMAL;
+ ctx.cache_type = expiration_time > 0 ? FileCacheType::TTL : FileCacheType::NORMAL;
+ ctx.expiration_time = expiration_time;
ctx.tablet_id = tablet_id;
ReadStatistics stats;
ctx.stats = &stats;
@@ -179,7 +181,7 @@ void do_write_to_file_cache(const std::string& small_file_path, const std::strin
// the async task execution. The original Slice may reference a buffer that gets
// reused or freed before the async task runs.
void write_small_file_to_cache_async(const std::string& small_file_path, const Slice& data,
- int64_t tablet_id) {
+ int64_t tablet_id, uint64_t expiration_time) {
if (!config::enable_file_cache || data.size == 0) {
return;
}
@@ -192,7 +194,7 @@ void write_small_file_to_cache_async(const std::string& small_file_path, const S
auto* thread_pool = ExecEnv::GetInstance()->s3_file_upload_thread_pool();
if (thread_pool == nullptr) {
// Fallback to sync write if thread pool not available
- do_write_to_file_cache(small_file_path, data_copy, tablet_id);
+ do_write_to_file_cache(small_file_path, data_copy, tablet_id, expiration_time);
return;
}
@@ -200,13 +202,13 @@ void write_small_file_to_cache_async(const std::string& small_file_path, const S
g_packed_file_cache_async_write_count << 1;
g_packed_file_cache_async_write_bytes << static_cast<int64_t>(data_size);
- Status st = thread_pool->submit_func(
- [path = small_file_path, data = std::move(data_copy), tablet_id, data_size]() {
- do_write_to_file_cache(path, data, tablet_id);
- // Decrement async write count after completion
- g_packed_file_cache_async_write_count << -1;
- g_packed_file_cache_async_write_bytes << -static_cast<int64_t>(data_size);
- });
+ Status st = thread_pool->submit_func([path = small_file_path, data = std::move(data_copy),
+ tablet_id, data_size, expiration_time]() {
+ do_write_to_file_cache(path, data, tablet_id, expiration_time);
+ // Decrement async write count after completion
+ g_packed_file_cache_async_write_count << -1;
+ g_packed_file_cache_async_write_bytes << -static_cast<int64_t>(data_size);
+ });
if (!st.ok()) {
// Revert metrics since task was not submitted
@@ -369,7 +371,7 @@ Status PackedFileManager::append_small_file(const std::string& path, const Slice
// Async write data to file cache using small file path as cache key.
// This ensures cache key matches the cleanup key in Rowset::clear_cache(),
// allowing proper cache cleanup when stale rowsets are removed.
- write_small_file_to_cache_async(path, data, info.tablet_id);
+ write_small_file_to_cache_async(path, data, info.tablet_id, info.expiration_time);
// Update index
PackedSliceLocation location;
diff --git a/be/src/io/fs/packed_file_manager.h b/be/src/io/fs/packed_file_manager.h
index bfa83a240e0..8a9758bf314 100644
--- a/be/src/io/fs/packed_file_manager.h
+++ b/be/src/io/fs/packed_file_manager.h
@@ -58,6 +58,7 @@ struct PackedAppendContext {
int64_t tablet_id = 0;
std::string rowset_id;
int64_t txn_id = 0;
+ uint64_t expiration_time = 0; // TTL expiration time in seconds since epoch, 0 means no TTL
};
// Global object that manages packing small files into larger files for S3 optimization
diff --git a/be/src/olap/rowset/rowset_writer_context.h b/be/src/olap/rowset/rowset_writer_context.h
index 5dac109204f..8ca69b37dd4 100644
--- a/be/src/olap/rowset/rowset_writer_context.h
+++ b/be/src/olap/rowset/rowset_writer_context.h
@@ -221,6 +221,7 @@ struct RowsetWriterContext {
append_info.tablet_id = tablet_id;
append_info.rowset_id = rowset_id.to_string();
append_info.txn_id = txn_id;
+ append_info.expiration_time = file_cache_ttl_sec;
fs = std::make_shared<io::PackedFileSystem>(fs, append_info);
}
diff --git a/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_packed_file_warm_up_cluster_event.groovy b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_packed_file_warm_up_cluster_event.groovy
new file mode 100644
index 00000000000..f293741aa5f
--- /dev/null
+++ b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_packed_file_warm_up_cluster_event.groovy
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import groovy.json.JsonSlurper
+
+suite('test_packed_file_warm_up_cluster_event', 'docker') {
+ def options = new ClusterOptions()
+ options.feConfigs += [
+ 'cloud_cluster_check_interval_second=1',
+ ]
+ options.beConfigs += [
+ 'file_cache_enter_disk_resource_limit_mode_percent=99',
+ 'enable_evict_file_cache_in_advance=false',
+ 'file_cache_background_monitor_interval_ms=1000',
+ 'enable_packed_file=true',
+ 'disable_auto_compaction=true'
+ ]
+ options.cloudMode = true
+
+ def clearFileCache = {ip, port ->
+ def url = "http://${ip}:${port}/api/file_cache?op=clear&sync=true"
+ def response = new URL(url).text
+ def json = new JsonSlurper().parseText(response)
+
+ // Check the status
+ if (json.status != "OK") {
+ throw new RuntimeException("Clear cache on ${ip}:${port} failed:
${json.status}")
+ }
+ }
+
+ def clearFileCacheOnAllBackends = {
+ def backends = sql """SHOW BACKENDS"""
+
+ for (be in backends) {
+ def ip = be[1]
+ def port = be[4]
+ clearFileCache(ip, port)
+ }
+
+ // clear file cache is async, wait it done
+ sleep(10000)
+ }
+
+ def getBrpcMetrics = {ip, port, name ->
+ def url = "http://${ip}:${port}/brpc_metrics"
+ if ((context.config.otherConfigs.get("enableTLS")?.toString()?.equalsIgnoreCase("true")) ?: false) {
+ url = url.replace("http://", "https://") + " --cert " +
context.config.otherConfigs.get("trustCert") + " --cacert " +
context.config.otherConfigs.get("trustCACert") + " --key " +
context.config.otherConfigs.get("trustCAKey")
+ }
+ def metrics = new URL(url).text
+ def matcher = metrics =~ ~"${name}\\s+(\\d+)"
+ if (matcher.find()) {
+ return matcher[0][1] as long
+ } else {
+ throw new RuntimeException("${name} not found for ${ip}:${port}")
+ }
+ }
+
+ def logFileCacheDownloadMetrics = { cluster ->
+ def backends = sql """SHOW BACKENDS"""
+ def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
+ for (be in cluster_bes) {
+ def ip = be[1]
+ def port = be[5]
+ def submitted = getBrpcMetrics(ip, port, "file_cache_download_submitted_num")
+ def finished = getBrpcMetrics(ip, port, "file_cache_download_finished_num")
+ def failed = getBrpcMetrics(ip, port, "file_cache_download_failed_num")
+ logger.info("${cluster} be ${ip}:${port}, downloader submitted=${submitted}"
+ + ", finished=${finished}, failed=${failed}")
+ }
+ }
+
+ def logWarmUpRowsetMetrics = { cluster ->
+ def backends = sql """SHOW BACKENDS"""
+ def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
+ for (be in cluster_bes) {
+ def ip = be[1]
+ def port = be[5]
+ def submitted_segment = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_submitted_segment_num")
+ def finished_segment = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_finished_segment_num")
+ def failed_segment = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_failed_segment_num")
+ def submitted_index = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_submitted_index_num")
+ def finished_index = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_finished_index_num")
+ def failed_index = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_failed_index_num")
+ logger.info("${cluster} be ${ip}:${port},
submitted_segment=${submitted_segment}"
+ + ", finished_segment=${finished_segment},
failed_segment=${failed_segment}"
+ + ", submitted_index=${submitted_index}"
+ + ", finished_index=${finished_index}"
+ + ", failed_index=${failed_index}")
+ }
+ }
+
+ def getTTLCacheSize = { ip, port ->
+ return getBrpcMetrics(ip, port, "ttl_cache_size")
+ }
+
+ def checkTTLCacheSizeSumEqual = { cluster1, cluster2 ->
+ def backends = sql """SHOW BACKENDS"""
+
+ def srcBes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster1}\"""") }
+ def tgtBes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster2}\"""") }
+
+ long srcSum = 0
+ for (src in srcBes) {
+ def ip = src[1]
+ def port = src[5]
+ srcSum += getTTLCacheSize(ip, port)
+ }
+
+ long tgtSum = 0
+ for (tgt in tgtBes) {
+ def ip = tgt[1]
+ def port = tgt[5]
+ tgtSum += getTTLCacheSize(ip, port)
+ }
+
+ logger.info("ttl_cache_size: src=${srcSum} dst=${tgtSum}")
+ assertTrue(srcSum > 0, "ttl_cache_size should > 0")
+ assertEquals(srcSum, tgtSum)
+ }
+
+ docker(options) {
+ def clusterName1 = "warmup_source"
+ def clusterName2 = "warmup_target"
+
+ // Add two clusters
+ cluster.addBackend(1, clusterName1)
+ cluster.addBackend(1, clusterName2)
+
+ def tag1 = getCloudBeTagByName(clusterName1)
+ def tag2 = getCloudBeTagByName(clusterName2)
+
+ logger.info("Cluster tag1: {}", tag1)
+ logger.info("Cluster tag2: {}", tag2)
+
+ def jsonSlurper = new JsonSlurper()
+ def clusterId1 = jsonSlurper.parseText(tag1).compute_group_id
+ def clusterId2 = jsonSlurper.parseText(tag2).compute_group_id
+
+ def getJobState = { jobId ->
+ def jobStateResult = sql """SHOW WARM UP JOB WHERE ID = ${jobId}"""
+ return jobStateResult[0][3]
+ }
+
+ // Ensure we are in source cluster
+ sql """use @${clusterName1}"""
+
+ // Simple setup to simulate data load and access
+ sql """CREATE TABLE IF NOT EXISTS customer (id INT, name STRING)
DUPLICATE KEY(id) DISTRIBUTED BY HASH(id) BUCKETS 3 PROPERTIES
("file_cache_ttl_seconds" = "3600")"""
+
+ // Start warm up job
+ def jobId_ = sql """
+ WARM UP CLUSTER ${clusterName2} WITH CLUSTER ${clusterName1}
+ PROPERTIES (
+ "sync_mode" = "event_driven",
+ "sync_event" = "load"
+ )
+ """
+
+ def jobId = jobId_[0][0]
+ logger.info("Warm-up job ID: ${jobId}")
+ clearFileCacheOnAllBackends()
+
+ sleep(15000)
+
+ for (int i = 0; i < 100; i++) {
+ sql """INSERT INTO customer VALUES (1, 'A'), (2, 'B'), (3, 'C')"""
+ }
+ sleep(15000)
+ logWarmUpRowsetMetrics(clusterName2)
+ logFileCacheDownloadMetrics(clusterName2)
+ checkTTLCacheSizeSumEqual(clusterName1, clusterName2)
+
+ def jobInfo = sql """SHOW WARM UP JOB WHERE ID = ${jobId}"""
+ assertEquals(jobInfo[0][0], jobId)
+ assertEquals(jobInfo[0][1], clusterName1)
+ assertEquals(jobInfo[0][2], clusterName2)
+ assertEquals(jobInfo[0][4], "CLUSTER")
+ assertTrue(jobInfo[0][3] in ["RUNNING", "PENDING"],
+ "JobState is ${jobInfo[0][3]}, expected RUNNING or PENDING")
+ assertEquals(jobInfo[0][5], "EVENT_DRIVEN (LOAD)")
+
+ // Cancel job and confirm
+ sql """CANCEL WARM UP JOB WHERE ID = ${jobId}"""
+ def cancelInfo = sql """SHOW WARM UP JOB WHERE ID = ${jobId}"""
+ assertEquals(cancelInfo[0][3], "CANCELLED")
+
+ // Clean up
+ sql """DROP TABLE IF EXISTS customer"""
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]