This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c9341741076 [fix](cloud) fix packed file warmup cannot read small 
files (#60160)
c9341741076 is described below

commit c9341741076cb0eb37a4fc53e8aba3a3f5f71603
Author: Xin Liao <[email protected]>
AuthorDate: Sat Jan 24 22:01:43 2026 +0800

    [fix](cloud) fix packed file warmup cannot read small files (#60160)
    
    1. Use rs_meta.fs() instead of storage_resource.value()->fs in warm up
    functions to support packed files. PackedFileSystem wrapper in
    rs_meta.fs() handles the index_map lookup and reads from the correct
    packed file path with proper offset.
    
    Without this fix, warm up would try to directly open the segment/index
    path which does not exist on S3 because the data is actually stored in a
    packed file.
    
    Fixed locations:
    - CloudWarmUpManager::handle_jobs(): segment and inverted index download
    - CloudInternalServiceImpl::warm_up_rowset(): segment and inverted index
      download
    
    2. Fix packed_file_manager to use correct TTL cache type when writing
    small files to file cache. Previously it always used
    FileCacheType::NORMAL, causing ttl_cache_size mismatch between source
    and target clusters during warm up.
    
    Now expiration_time is passed through PackedAppendContext and used to
    set the correct cache type (TTL when expiration_time > 0, NORMAL
    otherwise).
---
 be/src/cloud/cloud_internal_service.cpp            |  10 +-
 be/src/cloud/cloud_warm_up_manager.cpp             |  16 +-
 be/src/io/fs/packed_file_manager.cpp               |  28 +--
 be/src/io/fs/packed_file_manager.h                 |   1 +
 be/src/olap/rowset/rowset_writer_context.h         |   1 +
 .../test_packed_file_warm_up_cluster_event.groovy  | 205 +++++++++++++++++++++
 6 files changed, 238 insertions(+), 23 deletions(-)

diff --git a/be/src/cloud/cloud_internal_service.cpp 
b/be/src/cloud/cloud_internal_service.cpp
index ca696409e5f..67595a3f798 100644
--- a/be/src/cloud/cloud_internal_service.cpp
+++ b/be/src/cloud/cloud_internal_service.cpp
@@ -516,12 +516,15 @@ void 
CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
                     }
                 };
 
+                // Use rs_meta.fs() instead of storage_resource.value()->fs to 
support packed files.
+                // PackedFileSystem wrapper in rs_meta.fs() handles the 
index_map lookup and
+                // reads from the correct packed file.
                 io::DownloadFileMeta download_meta {
                         .path = 
storage_resource.value()->remote_segment_path(rs_meta, segment_id),
                         .file_size = segment_size,
                         .offset = 0,
                         .download_size = segment_size,
-                        .file_system = storage_resource.value()->fs,
+                        .file_system = rs_meta.fs(),
                         .ctx = {.is_index_data = false,
                                 .expiration_time = expiration_time,
                                 .is_dryrun = 
config::enable_reader_dryrun_when_download_file_cache,
@@ -537,8 +540,9 @@ void 
CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
 
                 
_engine.file_cache_block_downloader().submit_download_task(download_meta);
             }
+
+            // Use rs_meta.fs() to support packed files for inverted index 
download.
             auto download_inverted_index = [&, tablet](std::string index_path, 
uint64_t idx_size) {
-                auto storage_resource = rs_meta.remote_storage_resource();
                 auto download_done = [=, version = rs_meta.version()](Status 
st) {
                     DBUG_EXECUTE_IF(
                             
"CloudInternalServiceImpl::warm_up_rowset.download_inverted_idx", {
@@ -594,7 +598,7 @@ void 
CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
                 io::DownloadFileMeta download_meta {
                         .path = io::Path(index_path),
                         .file_size = static_cast<int64_t>(idx_size),
-                        .file_system = storage_resource.value()->fs,
+                        .file_system = rs_meta.fs(),
                         .ctx = {.is_index_data = false, // DORIS-20877
                                 .expiration_time = expiration_time,
                                 .is_dryrun = 
config::enable_reader_dryrun_when_download_file_cache,
diff --git a/be/src/cloud/cloud_warm_up_manager.cpp 
b/be/src/cloud/cloud_warm_up_manager.cpp
index dfdef1b3091..40b4734c8ba 100644
--- a/be/src/cloud/cloud_warm_up_manager.cpp
+++ b/be/src/cloud/cloud_warm_up_manager.cpp
@@ -249,12 +249,14 @@ void CloudWarmUpManager::handle_jobs() {
                 }
                 for (int64_t seg_id = 0; seg_id < rs->num_segments(); 
seg_id++) {
                     // 1st. download segment files
+                    // Use rs->fs() instead of storage_resource.value()->fs to 
support packed
+                    // files. PackedFileSystem wrapper in RowsetMeta::fs() 
handles the index_map
+                    // lookup and reads from the correct packed file.
                     if (!config::file_cache_enable_only_warm_up_idx) {
                         submit_download_tasks(
                                 
storage_resource.value()->remote_segment_path(*rs, seg_id),
-                                rs->segment_file_size(cast_set<int>(seg_id)),
-                                storage_resource.value()->fs, expiration_time, 
wait, false,
-                                [tablet, rs, seg_id](Status st) {
+                                rs->segment_file_size(cast_set<int>(seg_id)), 
rs->fs(),
+                                expiration_time, wait, false, [tablet, rs, 
seg_id](Status st) {
                                     VLOG_DEBUG << "warmup rowset " << 
rs->version() << " segment "
                                                << seg_id << " completed";
                                     if (tablet->complete_rowset_segment_warmup(
@@ -299,8 +301,8 @@ void CloudWarmUpManager::handle_jobs() {
                             
tablet->update_rowset_warmup_state_inverted_idx_num(
                                     WarmUpTriggerSource::JOB, rs->rowset_id(), 
1);
                             submit_download_tasks(
-                                    idx_path, file_size, 
storage_resource.value()->fs,
-                                    expiration_time, wait, true, [=](Status 
st) {
+                                    idx_path, file_size, rs->fs(), 
expiration_time, wait, true,
+                                    [=](Status st) {
                                         VLOG_DEBUG << "warmup rowset " << 
rs->version()
                                                    << " segment " << seg_id
                                                    << "inverted idx:" << 
idx_path << " completed";
@@ -322,8 +324,8 @@ void CloudWarmUpManager::handle_jobs() {
                             
tablet->update_rowset_warmup_state_inverted_idx_num(
                                     WarmUpTriggerSource::JOB, rs->rowset_id(), 
1);
                             submit_download_tasks(
-                                    idx_path, file_size, 
storage_resource.value()->fs,
-                                    expiration_time, wait, true, [=](Status 
st) {
+                                    idx_path, file_size, rs->fs(), 
expiration_time, wait, true,
+                                    [=](Status st) {
                                         VLOG_DEBUG << "warmup rowset " << 
rs->version()
                                                    << " segment " << seg_id
                                                    << "inverted idx:" << 
idx_path << " completed";
diff --git a/be/src/io/fs/packed_file_manager.cpp 
b/be/src/io/fs/packed_file_manager.cpp
index 4651fc08305..eaf8afe96c0 100644
--- a/be/src/io/fs/packed_file_manager.cpp
+++ b/be/src/io/fs/packed_file_manager.cpp
@@ -121,7 +121,7 @@ Status append_packed_info_trailer(FileWriter* writer, const 
std::string& packed_
 
 // write small file data to file cache
 void do_write_to_file_cache(const std::string& small_file_path, const 
std::string& data,
-                            int64_t tablet_id) {
+                            int64_t tablet_id, uint64_t expiration_time) {
     if (data.empty()) {
         return;
     }
@@ -132,7 +132,8 @@ void do_write_to_file_cache(const std::string& 
small_file_path, const std::strin
 
     VLOG_DEBUG << "packed_file_cache_write: path=" << small_file_path
                << " filename=" << path.filename().native() << " hash=" << 
cache_hash.to_string()
-               << " size=" << data.size() << " tablet_id=" << tablet_id;
+               << " size=" << data.size() << " tablet_id=" << tablet_id
+               << " expiration_time=" << expiration_time;
 
     BlockFileCache* file_cache = 
FileCacheFactory::instance()->get_by_path(cache_hash);
     if (file_cache == nullptr) {
@@ -141,7 +142,8 @@ void do_write_to_file_cache(const std::string& 
small_file_path, const std::strin
 
     // Allocate cache blocks
     CacheContext ctx;
-    ctx.cache_type = FileCacheType::NORMAL;
+    ctx.cache_type = expiration_time > 0 ? FileCacheType::TTL : 
FileCacheType::NORMAL;
+    ctx.expiration_time = expiration_time;
     ctx.tablet_id = tablet_id;
     ReadStatistics stats;
     ctx.stats = &stats;
@@ -179,7 +181,7 @@ void do_write_to_file_cache(const std::string& 
small_file_path, const std::strin
 // the async task execution. The original Slice may reference a buffer that 
gets
 // reused or freed before the async task runs.
 void write_small_file_to_cache_async(const std::string& small_file_path, const 
Slice& data,
-                                     int64_t tablet_id) {
+                                     int64_t tablet_id, uint64_t 
expiration_time) {
     if (!config::enable_file_cache || data.size == 0) {
         return;
     }
@@ -192,7 +194,7 @@ void write_small_file_to_cache_async(const std::string& 
small_file_path, const S
     auto* thread_pool = ExecEnv::GetInstance()->s3_file_upload_thread_pool();
     if (thread_pool == nullptr) {
         // Fallback to sync write if thread pool not available
-        do_write_to_file_cache(small_file_path, data_copy, tablet_id);
+        do_write_to_file_cache(small_file_path, data_copy, tablet_id, 
expiration_time);
         return;
     }
 
@@ -200,13 +202,13 @@ void write_small_file_to_cache_async(const std::string& 
small_file_path, const S
     g_packed_file_cache_async_write_count << 1;
     g_packed_file_cache_async_write_bytes << static_cast<int64_t>(data_size);
 
-    Status st = thread_pool->submit_func(
-            [path = small_file_path, data = std::move(data_copy), tablet_id, 
data_size]() {
-                do_write_to_file_cache(path, data, tablet_id);
-                // Decrement async write count after completion
-                g_packed_file_cache_async_write_count << -1;
-                g_packed_file_cache_async_write_bytes << 
-static_cast<int64_t>(data_size);
-            });
+    Status st = thread_pool->submit_func([path = small_file_path, data = 
std::move(data_copy),
+                                          tablet_id, data_size, 
expiration_time]() {
+        do_write_to_file_cache(path, data, tablet_id, expiration_time);
+        // Decrement async write count after completion
+        g_packed_file_cache_async_write_count << -1;
+        g_packed_file_cache_async_write_bytes << 
-static_cast<int64_t>(data_size);
+    });
 
     if (!st.ok()) {
         // Revert metrics since task was not submitted
@@ -369,7 +371,7 @@ Status PackedFileManager::append_small_file(const 
std::string& path, const Slice
     // Async write data to file cache using small file path as cache key.
     // This ensures cache key matches the cleanup key in Rowset::clear_cache(),
     // allowing proper cache cleanup when stale rowsets are removed.
-    write_small_file_to_cache_async(path, data, info.tablet_id);
+    write_small_file_to_cache_async(path, data, info.tablet_id, 
info.expiration_time);
 
     // Update index
     PackedSliceLocation location;
diff --git a/be/src/io/fs/packed_file_manager.h 
b/be/src/io/fs/packed_file_manager.h
index bfa83a240e0..8a9758bf314 100644
--- a/be/src/io/fs/packed_file_manager.h
+++ b/be/src/io/fs/packed_file_manager.h
@@ -58,6 +58,7 @@ struct PackedAppendContext {
     int64_t tablet_id = 0;
     std::string rowset_id;
     int64_t txn_id = 0;
+    uint64_t expiration_time = 0; // TTL expiration time in seconds since 
epoch, 0 means no TTL
 };
 
 // Global object that manages packing small files into larger files for S3 
optimization
diff --git a/be/src/olap/rowset/rowset_writer_context.h 
b/be/src/olap/rowset/rowset_writer_context.h
index 5dac109204f..8ca69b37dd4 100644
--- a/be/src/olap/rowset/rowset_writer_context.h
+++ b/be/src/olap/rowset/rowset_writer_context.h
@@ -221,6 +221,7 @@ struct RowsetWriterContext {
             append_info.tablet_id = tablet_id;
             append_info.rowset_id = rowset_id.to_string();
             append_info.txn_id = txn_id;
+            append_info.expiration_time = file_cache_ttl_sec;
             fs = std::make_shared<io::PackedFileSystem>(fs, append_info);
         }
 
diff --git 
a/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_packed_file_warm_up_cluster_event.groovy
 
b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_packed_file_warm_up_cluster_event.groovy
new file mode 100644
index 00000000000..f293741aa5f
--- /dev/null
+++ 
b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_packed_file_warm_up_cluster_event.groovy
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import groovy.json.JsonSlurper
+
+suite('test_packed_file_warm_up_cluster_event', 'docker') {
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+    ]
+    options.beConfigs += [
+        'file_cache_enter_disk_resource_limit_mode_percent=99',
+        'enable_evict_file_cache_in_advance=false',
+        'file_cache_background_monitor_interval_ms=1000',
+        'enable_packed_file=true',
+        'disable_auto_compaction=true'
+    ]
+    options.cloudMode = true
+
+    def clearFileCache = {ip, port ->
+        def url = "http://${ip}:${port}/api/file_cache?op=clear&sync=true";
+        def response = new URL(url).text
+        def json = new JsonSlurper().parseText(response)
+
+        // Check the status
+        if (json.status != "OK") {
+            throw new RuntimeException("Clear cache on ${ip}:${port} failed: 
${json.status}")
+        }
+    }
+
+    def clearFileCacheOnAllBackends = {
+        def backends = sql """SHOW BACKENDS"""
+
+        for (be in backends) {
+            def ip = be[1]
+            def port = be[4]
+            clearFileCache(ip, port)
+        }
+
+        // clear file cache is async, wait it done
+        sleep(10000)
+    }
+
+    def getBrpcMetrics = {ip, port, name ->
+        def url = "http://${ip}:${port}/brpc_metrics";
+        if 
((context.config.otherConfigs.get("enableTLS")?.toString()?.equalsIgnoreCase("true"))
 ?: false) {
+            url = url.replace("http://", "https://") + " --cert " + 
context.config.otherConfigs.get("trustCert") + " --cacert " + 
context.config.otherConfigs.get("trustCACert") + " --key " + 
context.config.otherConfigs.get("trustCAKey")
+        }
+        def metrics = new URL(url).text
+        def matcher = metrics =~ ~"${name}\\s+(\\d+)"
+        if (matcher.find()) {
+            return matcher[0][1] as long
+        } else {
+            throw new RuntimeException("${name} not found for ${ip}:${port}")
+        }
+    }
+
+    def logFileCacheDownloadMetrics = { cluster ->
+        def backends = sql """SHOW BACKENDS"""
+        def cluster_bes = backends.findAll { 
it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
+        for (be in cluster_bes) {
+            def ip = be[1]
+            def port = be[5]
+            def submitted = getBrpcMetrics(ip, port, 
"file_cache_download_submitted_num")
+            def finished = getBrpcMetrics(ip, port, 
"file_cache_download_finished_num")
+            def failed = getBrpcMetrics(ip, port, 
"file_cache_download_failed_num")
+            logger.info("${cluster} be ${ip}:${port}, downloader 
submitted=${submitted}"
+                    + ", finished=${finished}, failed=${failed}")
+        }
+    }
+
+    def logWarmUpRowsetMetrics = { cluster ->
+        def backends = sql """SHOW BACKENDS"""
+        def cluster_bes = backends.findAll { 
it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
+        for (be in cluster_bes) {
+            def ip = be[1]
+            def port = be[5]
+            def submitted_segment = getBrpcMetrics(ip, port, 
"file_cache_event_driven_warm_up_submitted_segment_num")
+            def finished_segment = getBrpcMetrics(ip, port, 
"file_cache_event_driven_warm_up_finished_segment_num")
+            def failed_segment = getBrpcMetrics(ip, port, 
"file_cache_event_driven_warm_up_failed_segment_num")
+            def submitted_index = getBrpcMetrics(ip, port, 
"file_cache_event_driven_warm_up_submitted_index_num")
+            def finished_index = getBrpcMetrics(ip, port, 
"file_cache_event_driven_warm_up_finished_index_num")
+            def failed_index = getBrpcMetrics(ip, port, 
"file_cache_event_driven_warm_up_failed_index_num")
+            logger.info("${cluster} be ${ip}:${port}, 
submitted_segment=${submitted_segment}"
+                    + ", finished_segment=${finished_segment}, 
failed_segment=${failed_segment}"
+                    + ", submitted_index=${submitted_index}"
+                    + ", finished_index=${finished_index}"
+                    + ", failed_index=${failed_index}")
+        }
+    }
+
+    def getTTLCacheSize = { ip, port ->
+        return getBrpcMetrics(ip, port, "ttl_cache_size")
+    }
+
+    def checkTTLCacheSizeSumEqual = { cluster1, cluster2 ->
+        def backends = sql """SHOW BACKENDS"""
+
+        def srcBes = backends.findAll { 
it[19].contains("""\"compute_group_name\" : \"${cluster1}\"""") }
+        def tgtBes = backends.findAll { 
it[19].contains("""\"compute_group_name\" : \"${cluster2}\"""") }
+
+        long srcSum = 0
+        for (src in srcBes) {
+            def ip = src[1]
+            def port = src[5]
+            srcSum += getTTLCacheSize(ip, port)
+        }
+
+        long tgtSum = 0
+        for (tgt in tgtBes) {
+            def ip = tgt[1]
+            def port = tgt[5]
+            tgtSum += getTTLCacheSize(ip, port)
+        }
+
+        logger.info("ttl_cache_size: src=${srcSum} dst=${tgtSum}")
+        assertTrue(srcSum > 0, "ttl_cache_size should > 0")
+        assertEquals(srcSum, tgtSum)
+    }
+
+    docker(options) {
+        def clusterName1 = "warmup_source"
+        def clusterName2 = "warmup_target"
+
+        // Add two clusters
+        cluster.addBackend(1, clusterName1)
+        cluster.addBackend(1, clusterName2)
+
+        def tag1 = getCloudBeTagByName(clusterName1)
+        def tag2 = getCloudBeTagByName(clusterName2)
+
+        logger.info("Cluster tag1: {}", tag1)
+        logger.info("Cluster tag2: {}", tag2)
+
+        def jsonSlurper = new JsonSlurper()
+        def clusterId1 = jsonSlurper.parseText(tag1).compute_group_id
+        def clusterId2 = jsonSlurper.parseText(tag2).compute_group_id
+
+        def getJobState = { jobId ->
+            def jobStateResult = sql """SHOW WARM UP JOB WHERE ID = ${jobId}"""
+            return jobStateResult[0][3]
+        }
+
+        // Ensure we are in source cluster
+        sql """use @${clusterName1}"""
+
+        // Simple setup to simulate data load and access
+        sql """CREATE TABLE IF NOT EXISTS customer (id INT, name STRING) 
DUPLICATE KEY(id) DISTRIBUTED BY HASH(id) BUCKETS 3 PROPERTIES 
("file_cache_ttl_seconds" = "3600")"""
+
+        // Start warm up job
+        def jobId_ = sql """
+            WARM UP CLUSTER ${clusterName2} WITH CLUSTER ${clusterName1}
+            PROPERTIES (
+                "sync_mode" = "event_driven",
+                "sync_event" = "load"
+            )
+        """
+
+        def jobId = jobId_[0][0]
+        logger.info("Warm-up job ID: ${jobId}")
+        clearFileCacheOnAllBackends()
+
+        sleep(15000)
+
+        for (int i = 0; i < 100; i++) {
+            sql """INSERT INTO customer VALUES (1, 'A'), (2, 'B'), (3, 'C')"""
+        }
+        sleep(15000)
+        logWarmUpRowsetMetrics(clusterName2)
+        logFileCacheDownloadMetrics(clusterName2)
+        checkTTLCacheSizeSumEqual(clusterName1, clusterName2)
+
+        def jobInfo = sql """SHOW WARM UP JOB WHERE ID = ${jobId}"""
+        assertEquals(jobInfo[0][0], jobId)
+        assertEquals(jobInfo[0][1], clusterName1)
+        assertEquals(jobInfo[0][2], clusterName2)
+        assertEquals(jobInfo[0][4], "CLUSTER")
+        assertTrue(jobInfo[0][3] in ["RUNNING", "PENDING"],
+            "JobState is ${jobInfo[0][3]}, expected RUNNING or PENDING")
+        assertEquals(jobInfo[0][5], "EVENT_DRIVEN (LOAD)")
+
+        // Cancel job and confirm
+        sql """CANCEL WARM UP JOB WHERE ID = ${jobId}"""
+        def cancelInfo = sql """SHOW WARM UP JOB WHERE ID = ${jobId}"""
+        assertEquals(cancelInfo[0][3], "CANCELLED")
+
+        // Clean up
+        sql """DROP TABLE IF EXISTS customer"""
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to