This is an automated email from the ASF dual-hosted git repository.

zhangchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 290dc2a05ed [opt](warm up) support delaying compaction commit until rowset warm-up finishes (#54416)
290dc2a05ed is described below

commit 290dc2a05ed5b28ce886036a481b72838da3a054
Author: zhannngchen <[email protected]>
AuthorDate: Thu Aug 7 20:08:44 2025 +0800

    [opt](warm up) support delaying compaction commit until rowset warm-up finishes (#54416)
    
    ### What problem does this PR solve?
    
    Problem Summary:
    
    This PR introduces an optimization to the event-driven warm-up mechanism
    by implementing a delayed compaction commit. The change ensures that a
    compaction operation waits for the warm-up of its output rowset to
    complete before finalizing the commit, improving the file-cache hit
    ratio on the read cluster.
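    
    How the wait works, in rough outline: the BE that handles the warm-up RPC
    counts every submitted download task on a bthread::CountdownEvent, each
    download callback signals it, and the handler blocks on timed_wait() until
    the requested deadline before returning, so the compaction side only
    commits once the warm-up has finished or the wait has timed out. A minimal,
    self-contained sketch of that pattern (assuming brpc's bthread/butil
    headers; the names here are illustrative, not the code in this diff):
    
    ```cpp
    #include <bthread/countdown_event.h>
    #include <butil/time.h>
    
    #include <chrono>
    #include <cstdio>
    #include <memory>
    #include <thread>
    #include <vector>
    
    int main() {
        const int64_t sync_wait_timeout_ms = 2000;
        auto wait = std::make_shared<bthread::CountdownEvent>(0);
        const timespec due_time = butil::milliseconds_from_now(sync_wait_timeout_ms);
    
        std::vector<std::thread> downloads;
        for (int i = 0; i < 4; ++i) {
            wait->add_count();  // one pending "download task"
            downloads.emplace_back([wait, i] {
                // stand-in for an asynchronous segment/index download
                std::this_thread::sleep_for(std::chrono::milliseconds(100 * (i + 1)));
                wait->signal();  // the completion callback signals on success or failure
            });
        }
    
        // timed_wait() returns non-zero on timeout; the real handler only logs
        // the timeout and returns, it does not cancel in-flight downloads.
        if (wait->timed_wait(due_time) != 0) {
            std::printf("warm-up wait timed out after %lld ms\n",
                        static_cast<long long>(sync_wait_timeout_ms));
        } else {
            std::printf("all downloads finished before the deadline\n");
        }
        for (auto& t : downloads) t.join();
        return 0;
    }
    ```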
---
 be/src/cloud/cloud_internal_service.cpp            |  38 ++-
 be/src/cloud/cloud_meta_mgr.cpp                    |  19 +-
 be/src/cloud/cloud_warm_up_manager.cpp             |  29 +-
 be/src/cloud/cloud_warm_up_manager.h               |  10 +-
 be/src/cloud/config.cpp                            |   6 +
 be/src/cloud/config.h                              |  11 +
 be/src/io/fs/s3_file_reader.cpp                    |   9 +
 gensrc/proto/internal_service.proto                |   1 +
 ...rm_up_cluster_event_compaction_sync_wait.groovy | 375 ++++++++++++++++++++
 ...uster_event_compaction_sync_wait_timeout.groovy | 377 +++++++++++++++++++++
 10 files changed, 869 insertions(+), 6 deletions(-)

diff --git a/be/src/cloud/cloud_internal_service.cpp b/be/src/cloud/cloud_internal_service.cpp
index 7beae0b825e..e35cfd0e01f 100644
--- a/be/src/cloud/cloud_internal_service.cpp
+++ b/be/src/cloud/cloud_internal_service.cpp
@@ -17,6 +17,8 @@
 
 #include "cloud/cloud_internal_service.h"
 
+#include <bthread/countdown_event.h>
+
 #include "cloud/cloud_storage_engine.h"
 #include "cloud/cloud_tablet_mgr.h"
 #include "cloud/config.h"
@@ -151,6 +153,10 @@ bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_request_to_handle_slow_count(
         "file_cache_warm_up_rowset_request_to_handle_slow_count");
 bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_handle_to_finish_slow_count(
         "file_cache_warm_up_rowset_handle_to_finish_slow_count");
+bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_wait_for_compaction_num(
+        "file_cache_warm_up_rowset_wait_for_compaction_num");
+bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_wait_for_compaction_timeout_num(
+        "file_cache_warm_up_rowset_wait_for_compaction_timeout_num");
 
 void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* controller
                                               [[maybe_unused]],
@@ -158,6 +164,15 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
                                               PWarmUpRowsetResponse* response,
                                               google::protobuf::Closure* done) {
     brpc::ClosureGuard closure_guard(done);
+    std::shared_ptr<bthread::CountdownEvent> wait = nullptr;
+    timespec due_time;
+    if (request->has_sync_wait_timeout_ms() && request->sync_wait_timeout_ms() > 0) {
+        g_file_cache_warm_up_rowset_wait_for_compaction_num << 1;
+        wait = std::make_shared<bthread::CountdownEvent>(0);
+        VLOG_DEBUG << "sync_wait_timeout: " << request->sync_wait_timeout_ms() << " ms";
+        due_time = butil::milliseconds_from_now(request->sync_wait_timeout_ms());
+    }
+
     for (auto& rs_meta_pb : request->rowset_metas()) {
         RowsetMeta rs_meta;
         rs_meta.init_from_pb(rs_meta_pb);
@@ -196,9 +211,10 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
         }
 
         for (int64_t segment_id = 0; segment_id < rs_meta.num_segments(); segment_id++) {
-            auto download_done = [=, tablet_id = rs_meta.tablet_id(),
+            auto download_done = [&, tablet_id = rs_meta.tablet_id(),
                                   rowset_id = rs_meta.rowset_id().to_string(),
-                                  segment_size = rs_meta.segment_file_size(segment_id)](Status st) {
+                                  segment_size = rs_meta.segment_file_size(segment_id),
+                                  wait](Status st) {
                 if (st.ok()) {
                     g_file_cache_event_driven_warm_up_finished_segment_num << 1;
                     g_file_cache_event_driven_warm_up_finished_segment_size << segment_size;
@@ -227,6 +243,9 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
                     LOG(WARNING) << "download segment failed, tablet_id: " << tablet_id
                                  << " rowset_id: " << rowset_id << ", error: " << st;
                 }
+                if (wait) {
+                    wait->signal();
+                }
             };
 
             io::DownloadFileMeta download_meta {
@@ -247,6 +266,9 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
             g_file_cache_event_driven_warm_up_submitted_segment_num << 1;
             g_file_cache_event_driven_warm_up_submitted_segment_size
                     << rs_meta.segment_file_size(segment_id);
+            if (wait) {
+                wait->add_count();
+            }
             _engine.file_cache_block_downloader().submit_download_task(download_meta);
 
             auto download_inverted_index = [&](std::string index_path, uint64_t idx_size) {
@@ -285,6 +307,9 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
                         LOG(WARNING) << "download inverted index failed, tablet_id: " << tablet_id
                                      << " rowset_id: " << rowset_id << ", error: " << st;
                     }
+                    if (wait) {
+                        wait->signal();
+                    }
                 };
                 io::DownloadFileMeta download_meta {
                         .path = io::Path(index_path),
@@ -301,6 +326,10 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
                 };
                 g_file_cache_event_driven_warm_up_submitted_index_num << 1;
                 g_file_cache_event_driven_warm_up_submitted_index_size << idx_size;
+
+                if (wait) {
+                    wait->add_count();
+                }
                 _engine.file_cache_block_downloader().submit_download_task(download_meta);
             };
 
@@ -341,6 +370,11 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
             }
         }
     }
+    if (wait && wait->timed_wait(due_time)) {
+        g_file_cache_warm_up_rowset_wait_for_compaction_timeout_num << 1;
+        LOG_WARNING("the time spent warming up {} rowsets exceeded {} ms",
+                    request->rowset_metas().size(), request->sync_wait_timeout_ms());
+    }
 }
 
 bvar::Adder<uint64_t> g_file_cache_recycle_cache_finished_segment_num(
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index 6e33a711d2a..f1e1d1d6fa2 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -1083,8 +1083,25 @@ Status CloudMetaMgr::commit_rowset(RowsetMeta& rs_meta, const std::string& job_i
         RETURN_IF_ERROR(
                 engine.get_schema_cloud_dictionary_cache().refresh_dict(rs_meta_pb.index_id()));
     }
+
+    int64_t timeout_ms = -1;
+    // If `job_id` is not empty, this rowset was produced by a compaction job.
+    if (config::enable_compaction_delay_commit_for_warm_up && !job_id.empty()) {
+        // 1. assume the download speed is 100 MB/s
+        // 2. we double the download time as the timeout for safety
+        // 3. for small rowsets the calculated timeout may be quite small, so we need a minimum timeout
+        const double speed_mbps = 100.0; // 100 MB/s
+        const double safety_factor = 2.0;
+        timeout_ms = std::min(
+                std::max(static_cast<int64_t>(static_cast<double>(rs_meta.data_disk_size()) /
+                                              (speed_mbps * 1024 * 1024) * safety_factor * 1000),
+                         config::warm_up_rowset_sync_wait_min_timeout_ms),
+                config::warm_up_rowset_sync_wait_max_timeout_ms);
+        LOG(INFO) << "warm up rowset: " << rs_meta.version() << ", job_id: " << job_id
+                  << ", with timeout: " << timeout_ms << " ms";
+    }
     auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
-    manager.warm_up_rowset(rs_meta);
+    manager.warm_up_rowset(rs_meta, timeout_ms);
     return st;
 }
 
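For a concrete sense of the numbers, the following standalone sketch reproduces the timeout formula above using the defaults added in config.cpp later in this commit (10000 ms floor, 120000 ms cap); the helper name and the hard-coded defaults are illustrative only:

```cpp
#include <algorithm>
#include <cstdint>
#include <cstdio>

// Same arithmetic as the commit_rowset() change above: size / 100 MB/s,
// doubled for safety, converted to ms, then clamped to [min, max].
int64_t warm_up_timeout_ms(int64_t data_disk_size_bytes) {
    const double speed_mbps = 100.0;      // assumed download speed, 100 MB/s
    const double safety_factor = 2.0;     // double the estimate for safety
    const int64_t min_timeout_ms = 10000; // warm_up_rowset_sync_wait_min_timeout_ms default
    const int64_t max_timeout_ms = 120000; // warm_up_rowset_sync_wait_max_timeout_ms default
    int64_t estimate_ms = static_cast<int64_t>(
            static_cast<double>(data_disk_size_bytes) / (speed_mbps * 1024 * 1024) *
            safety_factor * 1000);
    return std::min(std::max(estimate_ms, min_timeout_ms), max_timeout_ms);
}

int main() {
    std::printf("64 KiB rowset -> %lld ms\n", (long long)warm_up_timeout_ms(64LL << 10)); // 10000 (floor)
    std::printf("1 GiB rowset  -> %lld ms\n", (long long)warm_up_timeout_ms(1LL << 30));  // 20480
    std::printf("8 GiB rowset  -> %lld ms\n", (long long)warm_up_timeout_ms(8LL << 30));  // 120000 (cap)
    return 0;
}
```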
diff --git a/be/src/cloud/cloud_warm_up_manager.cpp b/be/src/cloud/cloud_warm_up_manager.cpp
index 6d4136b53a1..5e5376a2da8 100644
--- a/be/src/cloud/cloud_warm_up_manager.cpp
+++ b/be/src/cloud/cloud_warm_up_manager.cpp
@@ -76,6 +76,9 @@ bvar::Status<int64_t> g_file_cache_warm_up_rowset_last_call_unix_ts(
         "file_cache_warm_up_rowset_last_call_unix_ts", 0);
 bvar::Adder<uint64_t> file_cache_warm_up_failed_task_num("file_cache_warm_up", "failed_task_num");
 
+bvar::LatencyRecorder g_file_cache_warm_up_rowset_wait_for_compaction_latency(
+        "file_cache_warm_up_rowset_wait_for_compaction_latency");
+
 CloudWarmUpManager::CloudWarmUpManager(CloudStorageEngine& engine) : _engine(engine) {
     _download_thread = std::thread(&CloudWarmUpManager::handle_jobs, this);
 }
@@ -497,8 +500,9 @@ std::vector<TReplicaInfo> CloudWarmUpManager::get_replica_info(int64_t tablet_id
     return replicas;
 }
 
-void CloudWarmUpManager::warm_up_rowset(RowsetMeta& rs_meta) {
-    auto replicas = get_replica_info(rs_meta.tablet_id());
+void CloudWarmUpManager::warm_up_rowset(RowsetMeta& rs_meta, int64_t sync_wait_timeout_ms) {
+    auto tablet_id = rs_meta.tablet_id();
+    auto replicas = get_replica_info(tablet_id);
     if (replicas.empty()) {
         LOG(INFO) << "There is no need to warmup tablet=" << rs_meta.tablet_id()
                   << ", skipping rowset=" << rs_meta.rowset_id().to_string();
@@ -512,6 +516,7 @@ void CloudWarmUpManager::warm_up_rowset(RowsetMeta& rs_meta) {
     PWarmUpRowsetRequest request;
     request.add_rowset_metas()->CopyFrom(rs_meta.get_rowset_pb());
     request.set_unix_ts_us(now_ts);
+    request.set_sync_wait_timeout_ms(sync_wait_timeout_ms);
     for (auto& replica : replicas) {
         // send sync request
         std::string host = replica.host;
@@ -576,8 +581,28 @@ void CloudWarmUpManager::warm_up_rowset(RowsetMeta& rs_meta) {
         }
 
         brpc::Controller cntl;
+        if (sync_wait_timeout_ms > 0) {
+            cntl.set_timeout_ms(sync_wait_timeout_ms + 1000);
+        }
         PWarmUpRowsetResponse response;
+        MonotonicStopWatch watch;
+        watch.start();
         brpc_stub->warm_up_rowset(&cntl, &request, &response, nullptr);
+        if (cntl.Failed()) {
+            LOG_WARNING("warm up rowset {} for tablet {} failed, rpc error: {}",
+                        rs_meta.rowset_id().to_string(), tablet_id, cntl.ErrorText());
+            return;
+        }
+        if (sync_wait_timeout_ms > 0) {
+            auto cost_us = watch.elapsed_time_microseconds();
+            VLOG_DEBUG << "warm up rowset wait for compaction: " << cost_us << " us";
+            if (cost_us / 1000 > sync_wait_timeout_ms) {
+                LOG_WARNING(
+                        "Warm up rowset {} for tablet {} wait for compaction timed out, took {} ms",
+                        rs_meta.rowset_id().to_string(), tablet_id, cost_us / 1000);
+            }
+            g_file_cache_warm_up_rowset_wait_for_compaction_latency << cost_us;
+        }
     }
 }
 
diff --git a/be/src/cloud/cloud_warm_up_manager.h b/be/src/cloud/cloud_warm_up_manager.h
index 7418e47011c..465dbcf33db 100644
--- a/be/src/cloud/cloud_warm_up_manager.h
+++ b/be/src/cloud/cloud_warm_up_manager.h
@@ -71,7 +71,15 @@ public:
 
     Status set_event(int64_t job_id, TWarmUpEventType::type event, bool clear = false);
 
-    void warm_up_rowset(RowsetMeta& rs_meta);
+    // If `sync_wait_timeout_ms` <= 0, the function will send the warm-up RPC
+    // and return immediately without waiting for the warm-up to complete.
+    // If `sync_wait_timeout_ms` > 0, the function will wait for the warm-up
+    // to finish or until the specified timeout (in milliseconds) is reached.
+    //
+    // @param rs_meta Metadata of the rowset to be warmed up.
+    // @param sync_wait_timeout_ms Timeout in milliseconds to wait for the warm-up
+    //                              to complete. Non-positive value means no waiting.
+    void warm_up_rowset(RowsetMeta& rs_meta, int64_t sync_wait_timeout_ms = -1);
 
     void recycle_cache(int64_t tablet_id, const std::vector<RowsetId>& rowset_ids,
                        const std::vector<int64_t>& num_segments,
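For reference, the two calling modes this declaration allows: `manager.warm_up_rowset(rs_meta)` keeps the old fire-and-forget behavior (the default of -1 means no waiting), while `manager.warm_up_rowset(rs_meta, timeout_ms)` with a positive timeout blocks the caller, here the compaction commit path in cloud_meta_mgr.cpp above, until the warm-up finishes or the timeout expires.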
diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp
index 5d8d1aadbeb..ab35268afc4 100644
--- a/be/src/cloud/config.cpp
+++ b/be/src/cloud/config.cpp
@@ -95,5 +95,11 @@ DEFINE_mInt64(warmup_tablet_replica_info_cache_ttl_sec, "600");
 
 DEFINE_mInt64(warm_up_rowset_slow_log_ms, "1000");
 
+DEFINE_mBool(enable_compaction_delay_commit_for_warm_up, "false");
+
+DEFINE_mInt64(warm_up_rowset_sync_wait_min_timeout_ms, "10000");
+
+DEFINE_mInt64(warm_up_rowset_sync_wait_max_timeout_ms, "120000");
+
 #include "common/compile_check_end.h"
 } // namespace doris::config
diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h
index d54f9f6948d..535bf3146a1 100644
--- a/be/src/cloud/config.h
+++ b/be/src/cloud/config.h
@@ -131,5 +131,16 @@ DECLARE_mInt64(warmup_tablet_replica_info_cache_ttl_sec);
 
 DECLARE_mInt64(warm_up_rowset_slow_log_ms);
 
+// When event-driven warm-up is enabled by the user, turning on this option can help
+// avoid file cache misses in the read cluster caused by compaction.
+// If enabled, compaction will wait for the warm-up to complete before committing.
+//
+// ATTN: Enabling this option may slow down compaction due to the added wait.
+DECLARE_mBool(enable_compaction_delay_commit_for_warm_up);
+
+DECLARE_mInt64(warm_up_rowset_sync_wait_min_timeout_ms);
+
+DECLARE_mInt64(warm_up_rowset_sync_wait_max_timeout_ms);
+
 #include "common/compile_check_end.h"
 } // namespace doris::config
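To exercise this path (as the regression tests below do), set enable_compaction_delay_commit_for_warm_up = true in be.conf; the wait window is then bounded by warm_up_rowset_sync_wait_min_timeout_ms and warm_up_rowset_sync_wait_max_timeout_ms, which default to 10000 ms and 120000 ms in config.cpp above. All three are declared as mutable configs (DEFINE_m*/DECLARE_m*), so they should also be adjustable at runtime without a BE restart.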
diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp
index 80c58235300..eede868468f 100644
--- a/be/src/io/fs/s3_file_reader.cpp
+++ b/be/src/io/fs/s3_file_reader.cpp
@@ -38,6 +38,7 @@
 #include "runtime/thread_context.h"
 #include "runtime/workload_management/io_throttle.h"
 #include "util/bvar_helper.h"
+#include "util/debug_points.h"
 #include "util/doris_metrics.h"
 #include "util/runtime_profile.h"
 #include "util/s3_util.h"
@@ -130,6 +131,14 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea
 
     LIMIT_REMOTE_SCAN_IO(bytes_read);
 
+    DBUG_EXECUTE_IF("S3FileReader::read_at_impl.io_slow", {
+        auto sleep_time = dp->param("sleep", 3);
+        LOG_INFO("S3FileReader::read_at_impl.io_slow inject sleep {} s", sleep_time)
+                .tag("bucket", _bucket)
+                .tag("key", _key);
+        std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
+    });
+
     int total_sleep_time = 0;
     while (retry_count <= max_retries) {
         *bytes_read = 0;
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 8cd989683c2..fa7084e3f0b 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -863,6 +863,7 @@ message PGetFileCacheMetaResponse {
 message PWarmUpRowsetRequest {
     repeated RowsetMetaPB rowset_metas = 1;
     optional int64 unix_ts_us = 2;
+    optional int64 sync_wait_timeout_ms = 3;
 }
 
 message PWarmUpRowsetResponse {
diff --git a/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_event_compaction_sync_wait.groovy b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_event_compaction_sync_wait.groovy
new file mode 100644
index 00000000000..4879f83aab8
--- /dev/null
+++ b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_event_compaction_sync_wait.groovy
@@ -0,0 +1,375 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.NodeType
+import groovy.json.JsonSlurper
+
+suite('test_warm_up_cluster_event_compaction_sync_wait', 'docker') {
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+        'cloud_tablet_rebalancer_interval_second=1',
+    ]
+    options.beConfigs += [
+        'file_cache_enter_disk_resource_limit_mode_percent=99',
+        'enable_evict_file_cache_in_advance=false',
+        'file_cache_background_monitor_interval_ms=1000',
+        'warm_up_rowset_slow_log_ms=1',
+        'enable_compaction_delay_commit_for_warm_up=true',
+        'warm_up_rowset_sync_wait_min_timeout_ms=20000',
+        'warm_up_rowset_sync_wait_max_timeout_ms=30000',
+    ]
+    options.enableDebugPoints()
+    options.cloudMode = true
+
+    def clearFileCache = {ip, port ->
+        def url = "http://${ip}:${port}/api/file_cache?op=clear&sync=true";
+        def response = new URL(url).text
+        def json = new JsonSlurper().parseText(response)
+
+        // Check the status
+        if (json.status != "OK") {
+            throw new RuntimeException("Clear cache on ${ip}:${port} failed: 
${json.status}")
+        }
+    }
+
+    def clearFileCacheOnAllBackends = {
+        def backends = sql """SHOW BACKENDS"""
+
+        for (be in backends) {
+            def ip = be[1]
+            def port = be[4]
+            clearFileCache(ip, port)
+        }
+
+        // clearing the file cache is async, wait for it to finish
+        sleep(5000)
+    }
+
+    def getBrpcMetrics = {ip, port, name ->
+        def url = "http://${ip}:${port}/brpc_metrics";
+        def metrics = new URL(url).text
+        def matcher = metrics =~ ~"${name}\\s+(\\d+)"
+        if (matcher.find()) {
+            return matcher[0][1] as long
+        } else {
+            throw new RuntimeException("${name} not found for ${ip}:${port}")
+        }
+    }
+
+    def getBeIpAndPort = { cluster ->
+        def backends = sql """SHOW BACKENDS"""
+        def cluster_bes = backends.findAll { 
it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
+
+        if (cluster_bes.isEmpty()) {
+            throw new RuntimeException("No BE found for cluster: ${cluster}")
+        }
+
+        def firstBe = cluster_bes[0]
+        return [ip: firstBe[1], http_port:firstBe[4], rpc_port: firstBe[5]]
+    }
+
+    def logFileCacheDownloadMetrics = { cluster ->
+        def backends = sql """SHOW BACKENDS"""
+        def cluster_bes = backends.findAll { 
it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
+        for (be in cluster_bes) {
+            def ip = be[1]
+            def port = be[5]
+            def submitted = getBrpcMetrics(ip, port, 
"file_cache_download_submitted_num")
+            def finished = getBrpcMetrics(ip, port, 
"file_cache_download_finished_num")
+            def failed = getBrpcMetrics(ip, port, 
"file_cache_download_failed_num")
+            logger.info("${cluster} be ${ip}:${port}, downloader 
submitted=${submitted}"
+                    + ", finished=${finished}, failed=${failed}")
+        }
+    }
+
+    def logWarmUpRowsetMetrics = { cluster ->
+        def backends = sql """SHOW BACKENDS"""
+        def cluster_bes = backends.findAll { 
it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
+        for (be in cluster_bes) {
+            def ip = be[1]
+            def port = be[5]
+            def submitted_segment = getBrpcMetrics(ip, port, 
"file_cache_event_driven_warm_up_submitted_segment_num")
+            def finished_segment = getBrpcMetrics(ip, port, 
"file_cache_event_driven_warm_up_finished_segment_num")
+            def failed_segment = getBrpcMetrics(ip, port, 
"file_cache_event_driven_warm_up_failed_segment_num")
+            def submitted_index = getBrpcMetrics(ip, port, 
"file_cache_event_driven_warm_up_submitted_index_num")
+            def finished_index = getBrpcMetrics(ip, port, 
"file_cache_event_driven_warm_up_finished_index_num")
+            def failed_index = getBrpcMetrics(ip, port, 
"file_cache_event_driven_warm_up_failed_index_num")
+            def compaction_sync_wait = getBrpcMetrics(ip, port, 
"file_cache_warm_up_rowset_wait_for_compaction_num")
+            logger.info("${cluster} be ${ip}:${port}, 
submitted_segment=${submitted_segment}"
+                    + ", finished_segment=${finished_segment}, 
failed_segment=${failed_segment}"
+                    + ", submitted_index=${submitted_index}"
+                    + ", finished_index=${finished_index}"
+                    + ", failed_index=${failed_index}"
+                    + ", compaction_sync_wait=${compaction_sync_wait}")
+        }
+    }
+
+    def getTTLCacheSize = { ip, port ->
+        return getBrpcMetrics(ip, port, "ttl_cache_size")
+    }
+
+    def checkTTLCacheSizeSumEqual = { cluster1, cluster2 ->
+        def backends = sql """SHOW BACKENDS"""
+
+        def srcBes = backends.findAll { 
it[19].contains("""\"compute_group_name\" : \"${cluster1}\"""") }
+        def tgtBes = backends.findAll { 
it[19].contains("""\"compute_group_name\" : \"${cluster2}\"""") }
+
+        long srcSum = 0
+        for (src in srcBes) {
+            def ip = src[1]
+            def port = src[5]
+            srcSum += getTTLCacheSize(ip, port)
+        }
+
+        long tgtSum = 0
+        for (tgt in tgtBes) {
+            def ip = tgt[1]
+            def port = tgt[5]
+            tgtSum += getTTLCacheSize(ip, port)
+        }
+
+        logger.info("ttl_cache_size: src=${srcSum} dst=${tgtSum}")
+        assertTrue(srcSum > 0, "ttl_cache_size should > 0")
+        assertEquals(srcSum, tgtSum)
+    }
+
+    def waitForBrpcMetricValue = { ip, port, metricName, targetValue, 
timeoutMs ->
+        def delta_time = 100
+        def useTime = 0
+
+        for(int t = delta_time; t <= timeoutMs; t += delta_time){
+            try {
+                def currentValue = getBrpcMetrics(ip, port, metricName)
+
+                if (currentValue == targetValue) {
+                    logger.info("BE ${ip}:${port} metric ${metricName} reached 
target value: ${targetValue}")
+                    return true
+                }
+
+                logger.info("BE ${ip}:${port} metric ${metricName} current 
value: ${currentValue}, target: ${targetValue}")
+
+            } catch (Exception e) {
+                logger.warn("Failed to get metric ${metricName} from BE 
${ip}:${port}: ${e.message}")
+            }
+
+            useTime = t
+            sleep(delta_time)
+        }
+
+        assertTrue(useTime <= timeoutMs, "waitForBrpcMetricValue timeout")
+    }
+
+    def getTabletStatus = { ip, port, tablet_id ->
+        StringBuilder sb = new StringBuilder();
+        sb.append("curl -X GET http://${ip}:${port}")
+        sb.append("/api/compaction/show?tablet_id=")
+        sb.append(tablet_id)
+
+        String command = sb.toString()
+        logger.info(command)
+        def process = command.execute()
+        def code = process.waitFor()
+        def out = process.getText()
+        logger.info("Get tablet status:  =" + code + ", out=" + out)
+        assertEquals(code, 0)
+        def tabletStatus = parseJson(out.trim())
+        return tabletStatus
+    }
+
+    def checkFileCacheRecycle = { cluster, rowsets ->
+        def backends = sql """SHOW BACKENDS"""
+        def cluster_bes = backends.findAll { 
it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
+        assert cluster_bes.size() > 0, "No backend found for cluster 
${cluster}"
+        def be = cluster_bes[0]
+        def ip = be[1]
+        def port = be[4]
+
+        for (int i = 0; i < rowsets.size(); i++) {
+            def rowsetStr = rowsets[i]
+            // [12-12] 1 DATA NONOVERLAPPING 
02000000000000124843c92c13625daa8296c20957119893 1011.00 B
+            def start_version = rowsetStr.split(" ")[0].replace('[', 
'').replace(']', '').split("-")[0].toInteger()
+            def end_version = rowsetStr.split(" ")[0].replace('[', 
'').replace(']', '').split("-")[1].toInteger()
+            def rowset_id = rowsetStr.split(" ")[4]
+
+            logger.info("rowset ${i}, start: ${start_version}, end: 
${end_version}, id: ${rowset_id}")
+            def data = Http.GET("http://${ip}:${port}/api/file_cache?op=list_cache&value=${rowset_id}_0.dat", true)
+            logger.info("file cache data: ${data}")
+            // in this case only [2-11] and [12-12] should have data in cache
+            if ((start_version == 2 && end_version == 11) || (start_version == 
12)) {
+                assertTrue(data.size() > 0)
+            } else {
+                assertTrue(data.size() == 0)
+            }
+        }
+    }
+
+    docker(options) {
+        def clusterName1 = "warmup_source"
+        def clusterName2 = "warmup_target"
+
+        // Add two clusters
+        cluster.addBackend(1, clusterName1)
+        cluster.addBackend(1, clusterName2)
+
+        def tag1 = getCloudBeTagByName(clusterName1)
+        def tag2 = getCloudBeTagByName(clusterName2)
+
+        logger.info("Cluster tag1: {}", tag1)
+        logger.info("Cluster tag2: {}", tag2)
+
+        def jsonSlurper = new JsonSlurper()
+
+        def getJobState = { jobId ->
+            def jobStateResult = sql """SHOW WARM UP JOB WHERE ID = ${jobId}"""
+            return jobStateResult[0][3]
+        }
+
+        // Ensure we are in source cluster
+        sql """use @${clusterName1}"""
+
+        // Start warm up job
+        def jobId_ = sql """
+            WARM UP CLUSTER ${clusterName2} WITH CLUSTER ${clusterName1}
+            PROPERTIES (
+                "sync_mode" = "event_driven",
+                "sync_event" = "load"
+            )
+        """
+
+        def jobId = jobId_[0][0]
+        logger.info("Warm-up job ID: ${jobId}")
+
+        sql """
+            create table test (
+                col0 int not null,
+                col1 variant NOT NULL
+            ) UNIQUE KEY(`col0`)
+            DISTRIBUTED BY HASH(col0) BUCKETS 1
+            PROPERTIES ("file_cache_ttl_seconds" = "3600", "disable_auto_compaction" = "true");
+        """
+
+        clearFileCacheOnAllBackends()
+        sleep(15000)
+
+        sql """use @${clusterName1}"""
+        // load data
+        sql """insert into test values (1, '{"a" : 1.0}')"""
+        sql """insert into test values (2, '{"a" : 111.1111}')"""
+        sql """insert into test values (3, '{"a" : "11111"}')"""
+        sql """insert into test values (4, '{"a" : 1111111111}')"""
+        sql """insert into test values (5, '{"a" : 1111.11111}')"""
+        sql """insert into test values (6, '{"a" : "11111"}')"""
+        sql """insert into test values (7, '{"a" : 11111.11111}')"""
+        sql """insert into test values (7, '{"a" : 11111.11111}')"""
+        sleep(15000)
+
+        def tablets = sql_return_maparray """ show tablets from test; """
+        logger.info("tablets: " + tablets)
+        assertEquals(1, tablets.size())
+        def tablet = tablets[0]
+        String tablet_id = tablet.TabletId
+
+        def be = getBeIpAndPort(clusterName2)
+
+        logFileCacheDownloadMetrics(clusterName2)
+        logWarmUpRowsetMetrics(clusterName2)
+        def num_submitted = getBrpcMetrics(be.ip, be.rpc_port, 
"file_cache_event_driven_warm_up_submitted_segment_num")
+        def num_finished = getBrpcMetrics(be.ip, be.rpc_port, 
"file_cache_event_driven_warm_up_finished_segment_num")
+        assertTrue(num_submitted >= 8)
+        assertEquals(num_finished, num_submitted)
+
+        // inject slow IO, which should cause the warm-up to take longer than 10s
+        GetDebugPoint().enableDebugPoint(be.ip, be.http_port as int, NodeType.BE, "S3FileReader::read_at_impl.io_slow", [sleep:10])
+
+        // trigger and wait compaction async
+        def future = thread {
+            sql """use @${clusterName1}"""
+            trigger_and_wait_compaction("test", "cumulative")
+        }
+        // wait until the warmup for compaction started
+        waitForBrpcMetricValue(be.ip, be.rpc_port, "file_cache_warm_up_rowset_wait_for_compaction_num", 1, /*timeout*/10000)
+        sleep(1000)
+        logFileCacheDownloadMetrics(clusterName2)
+        logWarmUpRowsetMetrics(clusterName2)
+        assertEquals(num_submitted + 1, getBrpcMetrics(be.ip, be.rpc_port, "file_cache_event_driven_warm_up_submitted_segment_num"))
+        assertEquals(num_finished, getBrpcMetrics(be.ip, be.rpc_port, "file_cache_event_driven_warm_up_finished_segment_num"))
+
+        // at this moment, compaction has completed but has not committed; it is waiting for warm-up
+        // trigger a query on the read cluster; it can't read the compaction data yet
+        sql """use @${clusterName2}"""
+        sql "select * from test"
+        def tablet_status = getTabletStatus(be.ip, be.http_port, tablet_id)
+        def rowsets = tablet_status ["rowsets"]
+        for (int i = 0; i < rowsets.size(); i++) {
+            def rowsetStr = rowsets[i]
+            // [12-12] 1 DATA NONOVERLAPPING 
02000000000000124843c92c13625daa8296c20957119893 1011.00 B
+            def start_version = rowsetStr.split(" ")[0].replace('[', 
'').replace(']', '').split("-")[0].toInteger()
+            def end_version = rowsetStr.split(" ")[0].replace('[', 
'').replace(']', '').split("-")[1].toInteger()
+            if (start_version != 0) {
+                assertEquals(start_version, end_version)
+            }
+        }
+
+        // wait for the compaction to complete
+        future.get()
+
+        assertEquals(num_finished + 1, getBrpcMetrics(be.ip, be.rpc_port, "file_cache_event_driven_warm_up_finished_segment_num"))
+        assertEquals(0, getBrpcMetrics(be.ip, be.rpc_port, "file_cache_warm_up_rowset_wait_for_compaction_timeout_num"))
+
+        // a new insert will trigger the sync rowset operation in the following query
+        sql """insert into test values (9, '{"a" : 11111.11111}')"""
+
+        // now the compaction rowsets are accessible
+        sql """use @${clusterName2}"""
+        sql "select * from test"
+        tablet_status = getTabletStatus(be.ip, be.http_port, tablet_id)
+        rowsets = tablet_status ["rowsets"]
+        def found_compaction_rowsets = false
+        for (int i = 0; i < rowsets.size(); i++) {
+            def rowsetStr = rowsets[i]
+            // [12-12] 1 DATA NONOVERLAPPING 
02000000000000124843c92c13625daa8296c20957119893 1011.00 B
+            def start_version = rowsetStr.split(" ")[0].replace('[', 
'').replace(']', '').split("-")[0].toInteger()
+            def end_version = rowsetStr.split(" ")[0].replace('[', 
'').replace(']', '').split("-")[1].toInteger()
+            if (start_version != 0) {
+                if (start_version != end_version) {
+                    found_compaction_rowsets = true;
+                }
+            }
+        }
+        assertTrue(found_compaction_rowsets)
+
+        logFileCacheDownloadMetrics(clusterName2)
+        logWarmUpRowsetMetrics(clusterName2)
+        // checkTTLCacheSizeSumEqual(clusterName1, clusterName2)
+
+        def jobInfo = sql """SHOW WARM UP JOB WHERE ID = ${jobId}"""
+        assertEquals(jobInfo[0][0], jobId)
+        assertEquals(jobInfo[0][1], clusterName1)
+        assertEquals(jobInfo[0][2], clusterName2)
+        assertEquals(jobInfo[0][4], "CLUSTER")
+        assertTrue(jobInfo[0][3] in ["RUNNING", "PENDING"],
+            "JobState is ${jobInfo[0][3]}, expected RUNNING or PENDING")
+        assertEquals(jobInfo[0][5], "EVENT_DRIVEN (LOAD)")
+
+        // Cancel job and confirm
+        sql """CANCEL WARM UP JOB WHERE ID = ${jobId}"""
+        def cancelInfo = sql """SHOW WARM UP JOB WHERE ID = ${jobId}"""
+        assertEquals(cancelInfo[0][3], "CANCELLED")
+    }
+}
diff --git a/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_event_compaction_sync_wait_timeout.groovy b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_event_compaction_sync_wait_timeout.groovy
new file mode 100644
index 00000000000..64b72dfcec5
--- /dev/null
+++ b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_event_compaction_sync_wait_timeout.groovy
@@ -0,0 +1,377 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.NodeType
+import groovy.json.JsonSlurper
+
+suite('test_warm_up_cluster_event_compaction_sync_wait_timeout', 'docker') {
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+        'cloud_tablet_rebalancer_interval_second=1',
+    ]
+    options.beConfigs += [
+        'file_cache_enter_disk_resource_limit_mode_percent=99',
+        'enable_evict_file_cache_in_advance=false',
+        'file_cache_background_monitor_interval_ms=1000',
+        'warm_up_rowset_slow_log_ms=1',
+        'enable_compaction_delay_commit_for_warm_up=true',
+        'warm_up_rowset_sync_wait_min_timeout_ms=10000',
+        'warm_up_rowset_sync_wait_max_timeout_ms=10000',
+    ]
+    options.enableDebugPoints()
+    options.cloudMode = true
+
+    def clearFileCache = {ip, port ->
+        def url = "http://${ip}:${port}/api/file_cache?op=clear&sync=true";
+        def response = new URL(url).text
+        def json = new JsonSlurper().parseText(response)
+
+        // Check the status
+        if (json.status != "OK") {
+            throw new RuntimeException("Clear cache on ${ip}:${port} failed: 
${json.status}")
+        }
+    }
+
+    def clearFileCacheOnAllBackends = {
+        def backends = sql """SHOW BACKENDS"""
+
+        for (be in backends) {
+            def ip = be[1]
+            def port = be[4]
+            clearFileCache(ip, port)
+        }
+
+        // clearing the file cache is async, wait for it to finish
+        sleep(5000)
+    }
+
+    def getBrpcMetrics = {ip, port, name ->
+        def url = "http://${ip}:${port}/brpc_metrics";
+        def metrics = new URL(url).text
+        def matcher = metrics =~ ~"${name}\\s+(\\d+)"
+        if (matcher.find()) {
+            return matcher[0][1] as long
+        } else {
+            throw new RuntimeException("${name} not found for ${ip}:${port}")
+        }
+    }
+
+    def getBeIpAndPort = { cluster ->
+        def backends = sql """SHOW BACKENDS"""
+        def cluster_bes = backends.findAll { 
it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
+
+        if (cluster_bes.isEmpty()) {
+            throw new RuntimeException("No BE found for cluster: ${cluster}")
+        }
+
+        def firstBe = cluster_bes[0]
+        return [ip: firstBe[1], http_port:firstBe[4], rpc_port: firstBe[5]]
+    }
+
+    def logFileCacheDownloadMetrics = { cluster ->
+        def backends = sql """SHOW BACKENDS"""
+        def cluster_bes = backends.findAll { 
it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
+        for (be in cluster_bes) {
+            def ip = be[1]
+            def port = be[5]
+            def submitted = getBrpcMetrics(ip, port, 
"file_cache_download_submitted_num")
+            def finished = getBrpcMetrics(ip, port, 
"file_cache_download_finished_num")
+            def failed = getBrpcMetrics(ip, port, 
"file_cache_download_failed_num")
+            logger.info("${cluster} be ${ip}:${port}, downloader 
submitted=${submitted}"
+                    + ", finished=${finished}, failed=${failed}")
+        }
+    }
+
+    def logWarmUpRowsetMetrics = { cluster ->
+        def backends = sql """SHOW BACKENDS"""
+        def cluster_bes = backends.findAll { 
it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
+        for (be in cluster_bes) {
+            def ip = be[1]
+            def port = be[5]
+            def submitted_segment = getBrpcMetrics(ip, port, 
"file_cache_event_driven_warm_up_submitted_segment_num")
+            def finished_segment = getBrpcMetrics(ip, port, 
"file_cache_event_driven_warm_up_finished_segment_num")
+            def failed_segment = getBrpcMetrics(ip, port, 
"file_cache_event_driven_warm_up_failed_segment_num")
+            def submitted_index = getBrpcMetrics(ip, port, 
"file_cache_event_driven_warm_up_submitted_index_num")
+            def finished_index = getBrpcMetrics(ip, port, 
"file_cache_event_driven_warm_up_finished_index_num")
+            def failed_index = getBrpcMetrics(ip, port, 
"file_cache_event_driven_warm_up_failed_index_num")
+            def compaction_sync_wait = getBrpcMetrics(ip, port, 
"file_cache_warm_up_rowset_wait_for_compaction_num")
+            logger.info("${cluster} be ${ip}:${port}, 
submitted_segment=${submitted_segment}"
+                    + ", finished_segment=${finished_segment}, 
failed_segment=${failed_segment}"
+                    + ", submitted_index=${submitted_index}"
+                    + ", finished_index=${finished_index}"
+                    + ", failed_index=${failed_index}"
+                    + ", compaction_sync_wait=${compaction_sync_wait}")
+        }
+    }
+
+    def getTTLCacheSize = { ip, port ->
+        return getBrpcMetrics(ip, port, "ttl_cache_size")
+    }
+
+    def checkTTLCacheSizeSumEqual = { cluster1, cluster2 ->
+        def backends = sql """SHOW BACKENDS"""
+
+        def srcBes = backends.findAll { 
it[19].contains("""\"compute_group_name\" : \"${cluster1}\"""") }
+        def tgtBes = backends.findAll { 
it[19].contains("""\"compute_group_name\" : \"${cluster2}\"""") }
+
+        long srcSum = 0
+        for (src in srcBes) {
+            def ip = src[1]
+            def port = src[5]
+            srcSum += getTTLCacheSize(ip, port)
+        }
+
+        long tgtSum = 0
+        for (tgt in tgtBes) {
+            def ip = tgt[1]
+            def port = tgt[5]
+            tgtSum += getTTLCacheSize(ip, port)
+        }
+
+        logger.info("ttl_cache_size: src=${srcSum} dst=${tgtSum}")
+        assertTrue(srcSum > 0, "ttl_cache_size should > 0")
+        assertEquals(srcSum, tgtSum)
+    }
+
+    def waitForBrpcMetricValue = { ip, port, metricName, targetValue, 
timeoutMs ->
+        def delta_time = 100
+        def useTime = 0
+
+        for(int t = delta_time; t <= timeoutMs; t += delta_time){
+            try {
+                def currentValue = getBrpcMetrics(ip, port, metricName)
+
+                if (currentValue == targetValue) {
+                    logger.info("BE ${ip}:${port} metric ${metricName} reached 
target value: ${targetValue}")
+                    return true
+                }
+
+                logger.info("BE ${ip}:${port} metric ${metricName} current 
value: ${currentValue}, target: ${targetValue}")
+
+            } catch (Exception e) {
+                logger.warn("Failed to get metric ${metricName} from BE 
${ip}:${port}: ${e.message}")
+            }
+
+            useTime = t
+            sleep(delta_time)
+        }
+
+        assertTrue(useTime <= timeoutMs, "waitForBrpcMetricValue timeout")
+    }
+
+    def getTabletStatus = { ip, port, tablet_id ->
+        StringBuilder sb = new StringBuilder();
+        sb.append("curl -X GET http://${ip}:${port}")
+        sb.append("/api/compaction/show?tablet_id=")
+        sb.append(tablet_id)
+
+        String command = sb.toString()
+        logger.info(command)
+        def process = command.execute()
+        def code = process.waitFor()
+        def out = process.getText()
+        logger.info("Get tablet status:  =" + code + ", out=" + out)
+        assertEquals(code, 0)
+        def tabletStatus = parseJson(out.trim())
+        return tabletStatus
+    }
+
+    def checkFileCacheRecycle = { cluster, rowsets ->
+        def backends = sql """SHOW BACKENDS"""
+        def cluster_bes = backends.findAll { 
it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
+        assert cluster_bes.size() > 0, "No backend found for cluster 
${cluster}"
+        def be = cluster_bes[0]
+        def ip = be[1]
+        def port = be[4]
+
+        for (int i = 0; i < rowsets.size(); i++) {
+            def rowsetStr = rowsets[i]
+            // [12-12] 1 DATA NONOVERLAPPING 
02000000000000124843c92c13625daa8296c20957119893 1011.00 B
+            def start_version = rowsetStr.split(" ")[0].replace('[', 
'').replace(']', '').split("-")[0].toInteger()
+            def end_version = rowsetStr.split(" ")[0].replace('[', 
'').replace(']', '').split("-")[1].toInteger()
+            def rowset_id = rowsetStr.split(" ")[4]
+
+            logger.info("rowset ${i}, start: ${start_version}, end: 
${end_version}, id: ${rowset_id}")
+            def data = Http.GET("http://${ip}:${port}/api/file_cache?op=list_cache&value=${rowset_id}_0.dat", true)
+            logger.info("file cache data: ${data}")
+            // in this case only [2-11] and [12-12] should have data in cache
+            if ((start_version == 2 && end_version == 11) || (start_version == 
12)) {
+                assertTrue(data.size() > 0)
+            } else {
+                assertTrue(data.size() == 0)
+            }
+        }
+    }
+
+    docker(options) {
+        def clusterName1 = "warmup_source"
+        def clusterName2 = "warmup_target"
+
+        // Add two clusters
+        cluster.addBackend(1, clusterName1)
+        cluster.addBackend(1, clusterName2)
+
+        def tag1 = getCloudBeTagByName(clusterName1)
+        def tag2 = getCloudBeTagByName(clusterName2)
+
+        logger.info("Cluster tag1: {}", tag1)
+        logger.info("Cluster tag2: {}", tag2)
+
+        def jsonSlurper = new JsonSlurper()
+
+        def getJobState = { jobId ->
+            def jobStateResult = sql """SHOW WARM UP JOB WHERE ID = ${jobId}"""
+            return jobStateResult[0][3]
+        }
+
+        // Ensure we are in source cluster
+        sql """use @${clusterName1}"""
+
+        // Start warm up job
+        def jobId_ = sql """
+            WARM UP CLUSTER ${clusterName2} WITH CLUSTER ${clusterName1}
+            PROPERTIES (
+                "sync_mode" = "event_driven",
+                "sync_event" = "load"
+            )
+        """
+
+        def jobId = jobId_[0][0]
+        logger.info("Warm-up job ID: ${jobId}")
+
+        sql """
+            create table test (
+                col0 int not null,
+                col1 variant NOT NULL
+            ) UNIQUE KEY(`col0`)
+            DISTRIBUTED BY HASH(col0) BUCKETS 1
+            PROPERTIES ("file_cache_ttl_seconds" = "3600", "disable_auto_compaction" = "true");
+        """
+
+        clearFileCacheOnAllBackends()
+        sleep(15000)
+
+        sql """use @${clusterName1}"""
+        // load data
+        sql """insert into test values (1, '{"a" : 1.0}')"""
+        sql """insert into test values (2, '{"a" : 111.1111}')"""
+        sql """insert into test values (3, '{"a" : "11111"}')"""
+        sql """insert into test values (4, '{"a" : 1111111111}')"""
+        sql """insert into test values (5, '{"a" : 1111.11111}')"""
+        sql """insert into test values (6, '{"a" : "11111"}')"""
+        sql """insert into test values (7, '{"a" : 11111.11111}')"""
+        sql """insert into test values (7, '{"a" : 11111.11111}')"""
+        sleep(15000)
+
+        def tablets = sql_return_maparray """ show tablets from test; """
+        logger.info("tablets: " + tablets)
+        assertEquals(1, tablets.size())
+        def tablet = tablets[0]
+        String tablet_id = tablet.TabletId
+
+        def be = getBeIpAndPort(clusterName2)
+
+        logFileCacheDownloadMetrics(clusterName2)
+        logWarmUpRowsetMetrics(clusterName2)
+        def num_submitted = getBrpcMetrics(be.ip, be.rpc_port, 
"file_cache_event_driven_warm_up_submitted_segment_num")
+        def num_finished = getBrpcMetrics(be.ip, be.rpc_port, 
"file_cache_event_driven_warm_up_finished_segment_num")
+        assertTrue(num_submitted >= 8)
+        assertEquals(num_finished, num_submitted)
+
+        // inject slow IO, which should cause the warm-up to take longer than 10s
+        GetDebugPoint().enableDebugPoint(be.ip, be.http_port as int, NodeType.BE, "S3FileReader::read_at_impl.io_slow", [sleep:20])
+
+        // trigger and wait compaction async
+        def future = thread {
+            sql """use @${clusterName1}"""
+            trigger_and_wait_compaction("test", "cumulative")
+        }
+        // wait until the warmup for compaction started
+        waitForBrpcMetricValue(be.ip, be.rpc_port, "file_cache_warm_up_rowset_wait_for_compaction_num", 1, /*timeout*/10000)
+        sleep(1000)
+        logFileCacheDownloadMetrics(clusterName2)
+        logWarmUpRowsetMetrics(clusterName2)
+        assertEquals(num_submitted + 1, getBrpcMetrics(be.ip, be.rpc_port, "file_cache_event_driven_warm_up_submitted_segment_num"))
+        assertEquals(num_finished, getBrpcMetrics(be.ip, be.rpc_port, "file_cache_event_driven_warm_up_finished_segment_num"))
+
+        // at this moment, compaction has completed but has not committed; it is waiting for warm-up
+        // trigger a query on the read cluster; it can't read the compaction data yet
+        sql """use @${clusterName2}"""
+        sql "select * from test"
+        def tablet_status = getTabletStatus(be.ip, be.http_port, tablet_id)
+        def rowsets = tablet_status ["rowsets"]
+        for (int i = 0; i < rowsets.size(); i++) {
+            def rowsetStr = rowsets[i]
+            // [12-12] 1 DATA NONOVERLAPPING 
02000000000000124843c92c13625daa8296c20957119893 1011.00 B
+            def start_version = rowsetStr.split(" ")[0].replace('[', 
'').replace(']', '').split("-")[0].toInteger()
+            def end_version = rowsetStr.split(" ")[0].replace('[', 
'').replace(']', '').split("-")[1].toInteger()
+            if (start_version != 0) {
+                assertEquals(start_version, end_version)
+            }
+        }
+
+        // wait for the compaction to complete
+        // we inject a 20s sleep on s3 file reads, so the wait for warm-up will time out
+        future.get()
+
+        // the warm-up is still not finished, so `num_finished` does not change
+        assertEquals(num_finished, getBrpcMetrics(be.ip, be.rpc_port, "file_cache_event_driven_warm_up_finished_segment_num"))
+        assertEquals(1, getBrpcMetrics(be.ip, be.rpc_port, "file_cache_warm_up_rowset_wait_for_compaction_timeout_num"))
+
+        // a new insert will trigger the sync rowset operation in the following query
+        sql """insert into test values (9, '{"a" : 11111.11111}')"""
+
+        // now the compaction rowsets are accessible
+        sql """use @${clusterName2}"""
+        sql "select * from test"
+        tablet_status = getTabletStatus(be.ip, be.http_port, tablet_id)
+        rowsets = tablet_status ["rowsets"]
+        def found_compaction_rowsets = false
+        for (int i = 0; i < rowsets.size(); i++) {
+            def rowsetStr = rowsets[i]
+            // [12-12] 1 DATA NONOVERLAPPING 
02000000000000124843c92c13625daa8296c20957119893 1011.00 B
+            def start_version = rowsetStr.split(" ")[0].replace('[', 
'').replace(']', '').split("-")[0].toInteger()
+            def end_version = rowsetStr.split(" ")[0].replace('[', 
'').replace(']', '').split("-")[1].toInteger()
+            if (start_version != 0) {
+                if (start_version != end_version) {
+                    found_compaction_rowsets = true;
+                }
+            }
+        }
+        assertTrue(found_compaction_rowsets)
+
+        logFileCacheDownloadMetrics(clusterName2)
+        logWarmUpRowsetMetrics(clusterName2)
+        // checkTTLCacheSizeSumEqual(clusterName1, clusterName2)
+
+        def jobInfo = sql """SHOW WARM UP JOB WHERE ID = ${jobId}"""
+        assertEquals(jobInfo[0][0], jobId)
+        assertEquals(jobInfo[0][1], clusterName1)
+        assertEquals(jobInfo[0][2], clusterName2)
+        assertEquals(jobInfo[0][4], "CLUSTER")
+        assertTrue(jobInfo[0][3] in ["RUNNING", "PENDING"],
+            "JobState is ${jobInfo[0][3]}, expected RUNNING or PENDING")
+        assertEquals(jobInfo[0][5], "EVENT_DRIVEN (LOAD)")
+
+        // Cancel job and confirm
+        sql """CANCEL WARM UP JOB WHERE ID = ${jobId}"""
+        def cancelInfo = sql """SHOW WARM UP JOB WHERE ID = ${jobId}"""
+        assertEquals(cancelInfo[0][3], "CANCELLED")
+    }
+}

