This is an automated email from the ASF dual-hosted git repository.

zhangchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 64b57dc3307 [feat](warm up) introduce immediate warmup on read cluster (#54611)
64b57dc3307 is described below

commit 64b57dc330749eb7fef4cd92d55bd2dfead91d25
Author: zhannngchen <[email protected]>
AuthorDate: Mon Aug 18 13:46:35 2025 +0800

    [feat](warm up) introduce immediate warmup on read cluster (#54611)
    
    ### What problem does this PR solve?
    
    Problem Summary:
    
    Introduce a config `enable_warmup_immediately_on_new_rowset`. If the
    user sets it to `true`, warm up will be triggered automatically and
    immediately when new rowsets are synced.
    
    NOTE:
    the method `get_rowset_warmup_state()` is used for #53540
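    
    Usage sketch (assuming the standard BE config mechanisms; host and
    port below are placeholders): the flag is declared with DEFINE_mBool,
    so it is a mutable BE config. It can be set in be.conf on the read
    cluster's BEs, or flipped at runtime through the BE config update
    API, as the regression tests below do via update_be_config():
    
        # be.conf
        enable_warmup_immediately_on_new_rowset = true
    
        # or at runtime:
        curl -X POST "http://<be_host>:<be_http_port>/api/update_config?enable_warmup_immediately_on_new_rowset=true"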
---
 be/src/cloud/cloud_internal_service.cpp            |  12 +
 be/src/cloud/cloud_meta_mgr.cpp                    |   3 +-
 be/src/cloud/cloud_tablet.cpp                      |  72 ++++-
 be/src/cloud/cloud_tablet.h                        |  11 +
 be/src/cloud/cloud_warm_up_manager.cpp             |  19 +-
 be/src/cloud/cloud_warm_up_manager.h               |  11 +-
 be/src/cloud/config.cpp                            |   2 +
 be/src/cloud/config.h                              |   2 +
 be/src/olap/base_tablet.h                          |   2 +-
 be/test/cloud/cloud_tablet_test.cpp                | 301 +++++++++++++++++++++
 .../cluster/test_immediate_warmup_basic.out        | Bin 0 -> 300 bytes
 .../test_schema_change_add_key_column.csv.gz       | Bin 0 -> 72233 bytes
 .../cluster/test_immediate_warmup_basic.groovy     | 154 +++++++++++
 .../test_immediate_warmup_multi_segments.groovy    | 232 ++++++++++++++++
 14 files changed, 813 insertions(+), 8 deletions(-)

diff --git a/be/src/cloud/cloud_internal_service.cpp b/be/src/cloud/cloud_internal_service.cpp
index 5641bc1ac54..68715c69560 100644
--- a/be/src/cloud/cloud_internal_service.cpp
+++ b/be/src/cloud/cloud_internal_service.cpp
@@ -21,6 +21,7 @@
 
 #include "cloud/cloud_storage_engine.h"
 #include "cloud/cloud_tablet_mgr.h"
+#include "cloud/cloud_warm_up_manager.h"
 #include "cloud/config.h"
 #include "io/cache/block_file_cache.h"
 #include "io/cache/block_file_cache_downloader.h"
@@ -219,6 +220,12 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
             expiration_time = 0;
         }
 
+        if (!tablet->add_rowset_warmup_state(rs_meta, WarmUpState::TRIGGERED_BY_JOB)) {
+            LOG(INFO) << "found duplicate warmup task for rowset " << rs_meta.rowset_id()
+                      << ", skip it";
+            continue;
+        }
+
         for (int64_t segment_id = 0; segment_id < rs_meta.num_segments(); segment_id++) {
             auto download_done = [&, tablet_id = rs_meta.tablet_id(),
                                   rowset_id = rs_meta.rowset_id().to_string(),
@@ -252,6 +259,11 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
                     LOG(WARNING) << "download segment failed, tablet_id: " << tablet_id
                                  << " rowset_id: " << rowset_id << ", error: " << st;
                 }
+                if (tablet->complete_rowset_segment_warmup(rs_meta.rowset_id(), st) ==
+                    WarmUpState::DONE) {
+                    VLOG_DEBUG << "warmup rowset " << rs_meta.version() << "(" << rowset_id
+                               << ") completed";
+                }
                 if (wait) {
                     wait->signal();
                 }
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index 9c78a7529f1..86de6080585 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -737,7 +737,8 @@ Status CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
                 bool version_overlap =
                         tablet->max_version_unlocked() >= rowsets.front()->start_version();
                 tablet->add_rowsets(std::move(rowsets), version_overlap, wlock,
-                                    options.warmup_delta_data);
+                                    options.warmup_delta_data ||
+                                            config::enable_warmup_immediately_on_new_rowset);
             }
             tablet->last_base_compaction_success_time_ms = stats.last_base_compaction_time_ms();
             tablet->last_cumu_compaction_success_time_ms = stats.last_cumu_compaction_time_ms();
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index afe77b28955..88a3a66c2ab 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -94,6 +94,17 @@ bvar::Adder<uint64_t> g_file_cache_recycle_cached_data_segment_size(
 bvar::Adder<uint64_t> g_file_cache_recycle_cached_data_index_num(
         "file_cache_recycle_cached_data_index_num");
 
+bvar::Adder<uint64_t> g_file_cache_warm_up_segment_complete_num(
+        "file_cache_warm_up_segment_complete_num");
+bvar::Adder<uint64_t> g_file_cache_warm_up_segment_failed_num(
+        "file_cache_warm_up_segment_failed_num");
+bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_complete_num(
+        "file_cache_warm_up_rowset_complete_num");
+bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_triggered_by_job_num(
+        "file_cache_warm_up_rowset_triggered_by_job_num");
+bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_triggered_by_sync_rowset_num(
+        "file_cache_warm_up_rowset_triggered_by_sync_rowset_num");
+
 CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta)
         : BaseTablet(std::move(tablet_meta)), _engine(engine) {}
 
@@ -243,6 +254,7 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
         for (auto& rs : rowsets) {
             if (version_overlap || warmup_delta_data) {
 #ifndef BE_TEST
+                bool warm_up_state_updated = false;
                 // Warmup rowset data in background
                 for (int seg_id = 0; seg_id < rs->num_segments(); ++seg_id) {
                     const auto& rowset_meta = rs->rowset_meta();
@@ -271,7 +283,19 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
                         g_file_cache_cloud_tablet_submitted_segment_size
                                 << rs->rowset_meta()->segment_file_size(seg_id);
                     }
+                    if (!warm_up_state_updated) {
+                        VLOG_DEBUG << "warm up rowset " << rs->version() << "(" << rs->rowset_id()
+                                   << ") triggered by sync rowset";
+                        if (!add_rowset_warmup_state_unlocked(
+                                    *(rs->rowset_meta()), WarmUpState::TRIGGERED_BY_SYNC_ROWSET)) {
+                            LOG(INFO) << "found duplicate warmup task for rowset "
+                                      << rs->rowset_id() << ", skip it";
+                            break;
+                        }
+                        warm_up_state_updated = true;
+                    }
                     // clang-format off
+                    auto self = std::dynamic_pointer_cast<CloudTablet>(shared_from_this());
                     _engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta {
                             .path = storage_resource.value()->remote_segment_path(*rowset_meta, seg_id),
                             .file_size = rs->rowset_meta()->segment_file_size(seg_id),
@@ -281,7 +305,8 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
                                             .expiration_time = expiration_time,
                                             .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
                                     },
-                            .download_done {[](Status st) {
+                            .download_done {[=](Status st) {
+                                self->complete_rowset_segment_warmup(rowset_meta->rowset_id(), st);
                                 if (!st) {
                                     LOG_WARNING("add rowset warm up error ").error(st);
                                 }
@@ -441,6 +466,7 @@ void CloudTablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete,
     _timestamped_version_tracker.add_stale_path_version(rs_metas);
     for (auto&& rs : to_delete) {
         _rs_version_map.erase(rs->version());
+        _rowset_warm_up_states.erase(rs->rowset_id());
     }
 
     _tablet_meta->modify_rs_metas({}, rs_metas, false);
@@ -1307,5 +1333,49 @@ Status CloudTablet::check_delete_bitmap_cache(int64_t txn_id,
     return Status::OK();
 }
 
+WarmUpState CloudTablet::get_rowset_warmup_state(RowsetId rowset_id) {
+    std::shared_lock rlock(_meta_lock);
+    if (_rowset_warm_up_states.find(rowset_id) == _rowset_warm_up_states.end()) {
+        return WarmUpState::NONE;
+    }
+    return _rowset_warm_up_states[rowset_id].first;
+}
+
+bool CloudTablet::add_rowset_warmup_state(const RowsetMeta& rowset, WarmUpState state) {
+    std::lock_guard wlock(_meta_lock);
+    return add_rowset_warmup_state_unlocked(rowset, state);
+}
+
+bool CloudTablet::add_rowset_warmup_state_unlocked(const RowsetMeta& rowset, WarmUpState state) {
+    if (_rowset_warm_up_states.find(rowset.rowset_id()) != _rowset_warm_up_states.end()) {
+        return false;
+    }
+    if (state == WarmUpState::TRIGGERED_BY_JOB) {
+        g_file_cache_warm_up_rowset_triggered_by_job_num << 1;
+    } else if (state == WarmUpState::TRIGGERED_BY_SYNC_ROWSET) {
+        g_file_cache_warm_up_rowset_triggered_by_sync_rowset_num << 1;
+    }
+    _rowset_warm_up_states[rowset.rowset_id()] = std::make_pair(state, rowset.num_segments());
+    return true;
+}
+
+WarmUpState CloudTablet::complete_rowset_segment_warmup(RowsetId rowset_id, Status status) {
+    std::lock_guard wlock(_meta_lock);
+    if (_rowset_warm_up_states.find(rowset_id) == _rowset_warm_up_states.end()) {
+        return WarmUpState::NONE;
+    }
+    VLOG_DEBUG << "complete rowset segment warmup for rowset " << rowset_id << ", " << status;
+    g_file_cache_warm_up_segment_complete_num << 1;
+    if (!status.ok()) {
+        g_file_cache_warm_up_segment_failed_num << 1;
+    }
+    _rowset_warm_up_states[rowset_id].second--;
+    if (_rowset_warm_up_states[rowset_id].second <= 0) {
+        g_file_cache_warm_up_rowset_complete_num << 1;
+        _rowset_warm_up_states[rowset_id].first = WarmUpState::DONE;
+    }
+    return _rowset_warm_up_states[rowset_id].first;
+}
+
 #include "common/compile_check_end.h"
 } // namespace doris
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index 69a086e9ba9..71b1055c119 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -25,6 +25,7 @@
 namespace doris {
 
 class CloudStorageEngine;
+enum class WarmUpState : int;
 
 struct SyncRowsetStats {
     int64_t get_remote_rowsets_num {0};
@@ -289,12 +290,19 @@ public:
     static std::vector<RecycledRowsets> recycle_cached_data(
             const std::vector<RowsetSharedPtr>& rowsets);
 
+    // Add warmup state management
+    WarmUpState get_rowset_warmup_state(RowsetId rowset_id);
+    bool add_rowset_warmup_state(const RowsetMeta& rowset, WarmUpState state);
+    WarmUpState complete_rowset_segment_warmup(RowsetId rowset_id, Status status);
+
 private:
     // FIXME(plat1ko): No need to record base size if rowsets are ordered by version
     void update_base_size(const Rowset& rs);
 
     Status sync_if_not_running(SyncRowsetStats* stats = nullptr);
 
+    bool add_rowset_warmup_state_unlocked(const RowsetMeta& rowset, WarmUpState state);
+
     CloudStorageEngine& _engine;
 
     // this mutex MUST ONLY be used when sync meta
@@ -350,6 +358,9 @@ private:
     std::mutex _gc_mutex;
     std::unordered_map<RowsetId, RowsetSharedPtr> _unused_rowsets;
     std::vector<std::pair<std::vector<RowsetId>, DeleteBitmapKeyRanges>> _unused_delete_bitmap;
+
+    // for warm up states management
+    std::unordered_map<RowsetId, std::pair<WarmUpState, int32_t>> _rowset_warm_up_states;
 };
 
 using CloudTabletSPtr = std::shared_ptr<CloudTablet>;
diff --git a/be/src/cloud/cloud_warm_up_manager.cpp b/be/src/cloud/cloud_warm_up_manager.cpp
index 2ed92c856b9..17906672cdf 100644
--- a/be/src/cloud/cloud_warm_up_manager.cpp
+++ b/be/src/cloud/cloud_warm_up_manager.cpp
@@ -108,7 +108,7 @@ void CloudWarmUpManager::submit_download_tasks(io::Path path, int64_t file_size,
                                                io::FileSystemSPtr file_system,
                                                int64_t expiration_time,
                                                std::shared_ptr<bthread::CountdownEvent> wait,
-                                               bool is_index) {
+                                               bool is_index, std::function<void(Status)> done_cb) {
     if (file_size < 0) {
         auto st = file_system->file_size(path, &file_size);
         if (!st.ok()) [[unlikely]] {
@@ -145,7 +145,8 @@ void CloudWarmUpManager::submit_download_tasks(io::Path path, int64_t file_size,
                                 .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
                         },
                 .download_done =
-                        [=](Status st) {
+                        [&](Status st) {
+                            if (done_cb) done_cb(st);
                             if (!st) {
                                 LOG_WARNING("Warm up error ").error(st);
                             } else if (is_index) {
@@ -225,12 +226,24 @@ void CloudWarmUpManager::handle_jobs() {
                     if (expiration_time <= UnixSeconds()) {
                         expiration_time = 0;
                     }
+                    if (!tablet->add_rowset_warmup_state(*rs, WarmUpState::TRIGGERED_BY_JOB)) {
+                        LOG(INFO) << "found duplicate warmup task for rowset " << rs->rowset_id()
+                                  << ", skip it";
+                        continue;
+                    }
 
                     // 1st. download segment files
                     submit_download_tasks(
                             storage_resource.value()->remote_segment_path(*rs, seg_id),
                             rs->segment_file_size(seg_id), storage_resource.value()->fs,
-                            expiration_time, wait);
+                            expiration_time, wait, false, [tablet, rs, seg_id](Status st) {
+                                VLOG_DEBUG << "warmup rowset " << rs->version() << " segment "
+                                           << seg_id << " completed";
+                                if (tablet->complete_rowset_segment_warmup(rs->rowset_id(), st) ==
+                                    WarmUpState::DONE) {
+                                    VLOG_DEBUG << "warmup rowset " << rs->version() << " completed";
+                                }
+                            });
 
                     // 2nd. download inverted index files
                     int64_t file_size = -1;
diff --git a/be/src/cloud/cloud_warm_up_manager.h b/be/src/cloud/cloud_warm_up_manager.h
index 0d96dd12ede..f92280052ac 100644
--- a/be/src/cloud/cloud_warm_up_manager.h
+++ b/be/src/cloud/cloud_warm_up_manager.h
@@ -38,6 +38,13 @@ enum class DownloadType {
     S3,
 };
 
+enum class WarmUpState : int {
+    NONE,
+    TRIGGERED_BY_SYNC_ROWSET,
+    TRIGGERED_BY_JOB,
+    DONE,
+};
+
 struct JobMeta {
     JobMeta() = default;
     JobMeta(const TJobMeta& meta);
@@ -91,8 +98,8 @@ private:
 
     void submit_download_tasks(io::Path path, int64_t file_size, io::FileSystemSPtr file_system,
                                int64_t expiration_time,
-                               std::shared_ptr<bthread::CountdownEvent> wait,
-                               bool is_index = false);
+                               std::shared_ptr<bthread::CountdownEvent> wait, bool is_index = false,
+                               std::function<void(Status)> done_cb = nullptr);
     std::mutex _mtx;
     std::condition_variable _cond;
     int64_t _cur_job_id {0};
diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp
index ab35268afc4..96e53d66b40 100644
--- a/be/src/cloud/config.cpp
+++ b/be/src/cloud/config.cpp
@@ -101,5 +101,7 @@ DEFINE_mInt64(warm_up_rowset_sync_wait_min_timeout_ms, "10000");
 
 DEFINE_mInt64(warm_up_rowset_sync_wait_max_timeout_ms, "120000");
 
+DEFINE_mBool(enable_warmup_immediately_on_new_rowset, "false");
+
 #include "common/compile_check_end.h"
 } // namespace doris::config
diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h
index 535bf3146a1..3a949746818 100644
--- a/be/src/cloud/config.h
+++ b/be/src/cloud/config.h
@@ -142,5 +142,7 @@ DECLARE_mInt64(warm_up_rowset_sync_wait_min_timeout_ms);
 
 DECLARE_mInt64(warm_up_rowset_sync_wait_max_timeout_ms);
 
+DECLARE_mBool(enable_warmup_immediately_on_new_rowset);
+
 #include "common/compile_check_end.h"
 } // namespace doris::config
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 4267d182cfc..1d3eb7d5060 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -53,7 +53,7 @@ struct TabletWithVersion {
 enum class CompactionStage { NOT_SCHEDULED, PENDING, EXECUTING };
 
 // Base class for all tablet classes
-class BaseTablet {
+class BaseTablet : public std::enable_shared_from_this<BaseTablet> {
 public:
     explicit BaseTablet(TabletMetaSharedPtr tablet_meta);
     virtual ~BaseTablet();
diff --git a/be/test/cloud/cloud_tablet_test.cpp b/be/test/cloud/cloud_tablet_test.cpp
new file mode 100644
index 00000000000..5ec3df04175
--- /dev/null
+++ b/be/test/cloud/cloud_tablet_test.cpp
@@ -0,0 +1,301 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_tablet.h"
+
+#include <gen_cpp/olap_file.pb.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <gtest/gtest.h>
+
+#include <chrono>
+#include <cstdint>
+#include <ranges>
+
+#include "cloud/cloud_storage_engine.h"
+#include "cloud/cloud_warm_up_manager.h"
+#include "olap/rowset/rowset.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/tablet_meta.h"
+#include "util/uid_util.h"
+
+namespace doris {
+
+using namespace std::chrono;
+
+class CloudTabletWarmUpStateTest : public testing::Test {
+public:
+    CloudTabletWarmUpStateTest() : _engine(CloudStorageEngine(EngineOptions {})) {}
+
+    void SetUp() override {
+        _tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}},
+                                          UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK,
+                                          TCompressionType::LZ4F));
+        _tablet =
+                std::make_shared<CloudTablet>(_engine, std::make_shared<TabletMeta>(*_tablet_meta));
+    }
+    void TearDown() override {}
+
+    RowsetSharedPtr create_rowset(Version version, int num_segments = 1) {
+        auto rs_meta = std::make_shared<RowsetMeta>();
+        rs_meta->set_rowset_type(BETA_ROWSET);
+        rs_meta->set_version(version);
+        rs_meta->set_rowset_id(_engine.next_rowset_id());
+        rs_meta->set_num_segments(num_segments);
+        RowsetSharedPtr rowset;
+        Status st = RowsetFactory::create_rowset(nullptr, "", rs_meta, &rowset);
+        if (!st.ok()) {
+            return nullptr;
+        }
+        return rowset;
+    }
+
+protected:
+    std::string _json_rowset_meta;
+    TabletMetaSharedPtr _tablet_meta;
+    std::shared_ptr<CloudTablet> _tablet;
+    CloudStorageEngine _engine;
+};
+
+// Test get_rowset_warmup_state for non-existent rowset
+TEST_F(CloudTabletWarmUpStateTest, TestGetRowsetWarmupStateNonExistent) {
+    auto rowset = create_rowset(Version(1, 1));
+    ASSERT_NE(rowset, nullptr);
+
+    auto non_existent_id = _engine.next_rowset_id();
+
+    WarmUpState state = _tablet->get_rowset_warmup_state(non_existent_id);
+    EXPECT_EQ(state, WarmUpState::NONE);
+}
+
+// Test add_rowset_warmup_state with TRIGGERED_BY_JOB state
+TEST_F(CloudTabletWarmUpStateTest, TestAddRowsetWarmupStateTriggeredByJob) {
+    auto rowset = create_rowset(Version(1, 1), 5);
+    ASSERT_NE(rowset, nullptr);
+
+    bool result = _tablet->add_rowset_warmup_state(*(rowset->rowset_meta()),
+                                                   WarmUpState::TRIGGERED_BY_JOB);
+    EXPECT_TRUE(result);
+
+    // Verify the state is correctly set
+    WarmUpState state = _tablet->get_rowset_warmup_state(rowset->rowset_id());
+    EXPECT_EQ(state, WarmUpState::TRIGGERED_BY_JOB);
+}
+
+// Test add_rowset_warmup_state with TRIGGERED_BY_SYNC_ROWSET state
+TEST_F(CloudTabletWarmUpStateTest, TestAddRowsetWarmupStateTriggeredBySyncRowset) {
+    auto rowset = create_rowset(Version(2, 2), 3);
+    ASSERT_NE(rowset, nullptr);
+
+    bool result = _tablet->add_rowset_warmup_state(*(rowset->rowset_meta()),
+                                                   WarmUpState::TRIGGERED_BY_SYNC_ROWSET);
+    EXPECT_TRUE(result);
+
+    // Verify the state is correctly set
+    WarmUpState state = _tablet->get_rowset_warmup_state(rowset->rowset_id());
+    EXPECT_EQ(state, WarmUpState::TRIGGERED_BY_SYNC_ROWSET);
+}
+
+// Test adding duplicate rowset warmup state should fail
+TEST_F(CloudTabletWarmUpStateTest, TestAddDuplicateRowsetWarmupState) {
+    auto rowset = create_rowset(Version(3, 3), 2);
+    ASSERT_NE(rowset, nullptr);
+
+    // First addition should succeed
+    bool result1 = _tablet->add_rowset_warmup_state(*(rowset->rowset_meta()),
+                                                    WarmUpState::TRIGGERED_BY_JOB);
+    EXPECT_TRUE(result1);
+
+    // Second addition should fail
+    bool result2 = _tablet->add_rowset_warmup_state(*(rowset->rowset_meta()),
+                                                    WarmUpState::TRIGGERED_BY_SYNC_ROWSET);
+    EXPECT_FALSE(result2);
+
+    // State should remain the original one
+    WarmUpState state = _tablet->get_rowset_warmup_state(rowset->rowset_id());
+    EXPECT_EQ(state, WarmUpState::TRIGGERED_BY_JOB);
+}
+
+// Test complete_rowset_segment_warmup for non-existent rowset
+TEST_F(CloudTabletWarmUpStateTest, TestCompleteRowsetSegmentWarmupNonExistent) {
+    auto non_existent_id = _engine.next_rowset_id();
+
+    WarmUpState result = _tablet->complete_rowset_segment_warmup(non_existent_id, Status::OK());
+    EXPECT_EQ(result, WarmUpState::NONE);
+}
+
+// Test complete_rowset_segment_warmup with partial completion
+TEST_F(CloudTabletWarmUpStateTest, TestCompleteRowsetSegmentWarmupPartial) {
+    auto rowset = create_rowset(Version(4, 4), 3);
+    ASSERT_NE(rowset, nullptr);
+
+    // Add rowset warmup state
+    bool add_result = _tablet->add_rowset_warmup_state(*(rowset->rowset_meta()),
+                                                       WarmUpState::TRIGGERED_BY_JOB);
+    EXPECT_TRUE(add_result);
+
+    // Complete one segment, should still be in TRIGGERED_BY_JOB state
+    WarmUpState result1 =
+            _tablet->complete_rowset_segment_warmup(rowset->rowset_id(), Status::OK());
+    EXPECT_EQ(result1, WarmUpState::TRIGGERED_BY_JOB);
+
+    // Complete second segment, should still be in TRIGGERED_BY_JOB state
+    WarmUpState result2 =
+            _tablet->complete_rowset_segment_warmup(rowset->rowset_id(), Status::OK());
+    EXPECT_EQ(result2, WarmUpState::TRIGGERED_BY_JOB);
+
+    // Verify current state is still TRIGGERED_BY_JOB
+    WarmUpState current_state = _tablet->get_rowset_warmup_state(rowset->rowset_id());
+    EXPECT_EQ(current_state, WarmUpState::TRIGGERED_BY_JOB);
+}
+
+// Test complete_rowset_segment_warmup with full completion
+TEST_F(CloudTabletWarmUpStateTest, TestCompleteRowsetSegmentWarmupFull) {
+    auto rowset = create_rowset(Version(5, 5), 2);
+    ASSERT_NE(rowset, nullptr);
+
+    // Add rowset warmup state
+    bool add_result = _tablet->add_rowset_warmup_state(*(rowset->rowset_meta()),
+                                                       WarmUpState::TRIGGERED_BY_SYNC_ROWSET);
+    EXPECT_TRUE(add_result);
+
+    // Complete first segment
+    WarmUpState result1 =
+            _tablet->complete_rowset_segment_warmup(rowset->rowset_id(), Status::OK());
+    EXPECT_EQ(result1, WarmUpState::TRIGGERED_BY_SYNC_ROWSET);
+
+    // Complete second segment, should transition to DONE state
+    WarmUpState result2 =
+            _tablet->complete_rowset_segment_warmup(rowset->rowset_id(), Status::OK());
+    EXPECT_EQ(result2, WarmUpState::DONE);
+
+    // Verify final state is DONE
+    WarmUpState final_state = _tablet->get_rowset_warmup_state(rowset->rowset_id());
+    EXPECT_EQ(final_state, WarmUpState::DONE);
+}
+
+// Test complete_rowset_segment_warmup with error status
+TEST_F(CloudTabletWarmUpStateTest, TestCompleteRowsetSegmentWarmupWithError) {
+    auto rowset = create_rowset(Version(6, 6), 1);
+    ASSERT_NE(rowset, nullptr);
+
+    // Add rowset warmup state
+    bool add_result = _tablet->add_rowset_warmup_state(*(rowset->rowset_meta()),
+                                                       WarmUpState::TRIGGERED_BY_JOB);
+    EXPECT_TRUE(add_result);
+
+    // Complete with error status, should still transition to DONE when all segments complete
+    Status error_status = Status::InternalError("Test error");
+    WarmUpState result = _tablet->complete_rowset_segment_warmup(rowset->rowset_id(), error_status);
+    EXPECT_EQ(result, WarmUpState::DONE);
+
+    // Verify final state is DONE even with error
+    WarmUpState final_state = _tablet->get_rowset_warmup_state(rowset->rowset_id());
+    EXPECT_EQ(final_state, WarmUpState::DONE);
+}
+
+// Test multiple rowsets warmup state management
+TEST_F(CloudTabletWarmUpStateTest, TestMultipleRowsetsWarmupState) {
+    auto rowset1 = create_rowset(Version(7, 7), 2);
+    auto rowset2 = create_rowset(Version(8, 8), 3);
+    auto rowset3 = create_rowset(Version(9, 9), 1);
+    ASSERT_NE(rowset1, nullptr);
+    ASSERT_NE(rowset2, nullptr);
+    ASSERT_NE(rowset3, nullptr);
+
+    // Add multiple rowsets
+    EXPECT_TRUE(_tablet->add_rowset_warmup_state(*(rowset1->rowset_meta()),
+                                                 WarmUpState::TRIGGERED_BY_JOB));
+    EXPECT_TRUE(_tablet->add_rowset_warmup_state(*(rowset2->rowset_meta()),
+                                                 WarmUpState::TRIGGERED_BY_SYNC_ROWSET));
+    EXPECT_TRUE(_tablet->add_rowset_warmup_state(*(rowset3->rowset_meta()),
+                                                 WarmUpState::TRIGGERED_BY_JOB));
+
+    // Verify all states
+    EXPECT_EQ(_tablet->get_rowset_warmup_state(rowset1->rowset_id()),
+              WarmUpState::TRIGGERED_BY_JOB);
+    EXPECT_EQ(_tablet->get_rowset_warmup_state(rowset2->rowset_id()),
+              WarmUpState::TRIGGERED_BY_SYNC_ROWSET);
+    EXPECT_EQ(_tablet->get_rowset_warmup_state(rowset3->rowset_id()),
+              WarmUpState::TRIGGERED_BY_JOB);
+
+    // Complete rowset1 (2 segments)
+    EXPECT_EQ(_tablet->complete_rowset_segment_warmup(rowset1->rowset_id(), Status::OK()),
+              WarmUpState::TRIGGERED_BY_JOB);
+    EXPECT_EQ(_tablet->complete_rowset_segment_warmup(rowset1->rowset_id(), Status::OK()),
+              WarmUpState::DONE);
+
+    // Complete rowset3 (1 segment)
+    EXPECT_EQ(_tablet->complete_rowset_segment_warmup(rowset3->rowset_id(), Status::OK()),
+              WarmUpState::DONE);
+
+    // Verify states after completion
+    EXPECT_EQ(_tablet->get_rowset_warmup_state(rowset1->rowset_id()), WarmUpState::DONE);
+    EXPECT_EQ(_tablet->get_rowset_warmup_state(rowset2->rowset_id()),
+              WarmUpState::TRIGGERED_BY_SYNC_ROWSET);
+    EXPECT_EQ(_tablet->get_rowset_warmup_state(rowset3->rowset_id()), WarmUpState::DONE);
+}
+
+// Test warmup state with zero segments (edge case)
+TEST_F(CloudTabletWarmUpStateTest, TestWarmupStateWithZeroSegments) {
+    auto rowset = create_rowset(Version(10, 10), 0);
+    ASSERT_NE(rowset, nullptr);
+
+    // Add rowset with zero segments
+    bool add_result = _tablet->add_rowset_warmup_state(*(rowset->rowset_meta()),
+                                                       WarmUpState::TRIGGERED_BY_JOB);
+    EXPECT_TRUE(add_result);
+
+    // State should be immediately ready for completion since there are no segments to warm up
+    WarmUpState state = _tablet->get_rowset_warmup_state(rowset->rowset_id());
+    EXPECT_EQ(state, WarmUpState::TRIGGERED_BY_JOB);
+
+    // Any completion call should handle the edge case gracefully
+    WarmUpState result = _tablet->complete_rowset_segment_warmup(rowset->rowset_id(), Status::OK());
+    // With 0 segments, the counter should already be 0, so this should transition to DONE
+    EXPECT_EQ(result, WarmUpState::DONE);
+}
+
+// Test concurrent access to warmup state (basic thread safety verification)
+TEST_F(CloudTabletWarmUpStateTest, TestConcurrentWarmupStateAccess) {
+    auto rowset1 = create_rowset(Version(11, 11), 4);
+    auto rowset2 = create_rowset(Version(12, 12), 3);
+    ASSERT_NE(rowset1, nullptr);
+    ASSERT_NE(rowset2, nullptr);
+
+    // Add rowsets from different "threads" (simulated by sequential calls)
+    EXPECT_TRUE(_tablet->add_rowset_warmup_state(*(rowset1->rowset_meta()),
+                                                 WarmUpState::TRIGGERED_BY_JOB));
+    EXPECT_TRUE(_tablet->add_rowset_warmup_state(*(rowset2->rowset_meta()),
+                                                 WarmUpState::TRIGGERED_BY_SYNC_ROWSET));
+
+    // Interleaved completion operations
+    EXPECT_EQ(_tablet->complete_rowset_segment_warmup(rowset1->rowset_id(), Status::OK()),
+              WarmUpState::TRIGGERED_BY_JOB);
+    EXPECT_EQ(_tablet->complete_rowset_segment_warmup(rowset2->rowset_id(), Status::OK()),
+              WarmUpState::TRIGGERED_BY_SYNC_ROWSET);
+    EXPECT_EQ(_tablet->complete_rowset_segment_warmup(rowset1->rowset_id(), Status::OK()),
+              WarmUpState::TRIGGERED_BY_JOB);
+
+    // Check states are maintained correctly
+    EXPECT_EQ(_tablet->get_rowset_warmup_state(rowset1->rowset_id()),
+              WarmUpState::TRIGGERED_BY_JOB);
+    EXPECT_EQ(_tablet->get_rowset_warmup_state(rowset2->rowset_id()),
+              WarmUpState::TRIGGERED_BY_SYNC_ROWSET);
+}
+} // namespace doris
diff --git a/regression-test/data/cloud_p0/cache/multi_cluster/warm_up/cluster/test_immediate_warmup_basic.out b/regression-test/data/cloud_p0/cache/multi_cluster/warm_up/cluster/test_immediate_warmup_basic.out
new file mode 100644
index 00000000000..99e1f4ad641
Binary files /dev/null and b/regression-test/data/cloud_p0/cache/multi_cluster/warm_up/cluster/test_immediate_warmup_basic.out differ
diff --git a/regression-test/data/cloud_p0/cache/multi_cluster/warm_up/cluster/test_schema_change_add_key_column.csv.gz b/regression-test/data/cloud_p0/cache/multi_cluster/warm_up/cluster/test_schema_change_add_key_column.csv.gz
new file mode 100644
index 00000000000..bc9d3dd70ea
Binary files /dev/null and b/regression-test/data/cloud_p0/cache/multi_cluster/warm_up/cluster/test_schema_change_add_key_column.csv.gz differ
diff --git a/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_immediate_warmup_basic.groovy b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_immediate_warmup_basic.groovy
new file mode 100644
index 00000000000..7bbe3f01895
--- /dev/null
+++ b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_immediate_warmup_basic.groovy
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.NodeType
+import groovy.json.JsonSlurper
+
+suite('test_immediate_warmup_basic', 'docker') {
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+        'cloud_tablet_rebalancer_interval_second=1',
+    ]
+    options.beConfigs += [
+        'file_cache_enter_disk_resource_limit_mode_percent=99',
+        'enable_evict_file_cache_in_advance=false',
+        'block_file_cache_monitor_interval_sec=1',
+    ]
+    options.enableDebugPoints()
+    options.cloudMode = true
+
+    def clearFileCache = {ip, port ->
+        def url = "http://${ip}:${port}/api/file_cache?op=clear&sync=true";
+        def response = new URL(url).text
+        def json = new JsonSlurper().parseText(response)
+
+        // Check the status
+        if (json.status != "OK") {
+            throw new RuntimeException("Clear cache on ${ip}:${port} failed: 
${json.status}")
+        }
+    }
+
+    def clearFileCacheOnAllBackends = {
+        def backends = sql """SHOW BACKENDS"""
+
+        for (be in backends) {
+            def ip = be[1]
+            def port = be[4]
+            clearFileCache(ip, port)
+        }
+
+        // clear file cache is async, wait it done
+        sleep(5000)
+    }
+
+    def updateBeConf = {cluster, key, value ->
+        def backends = sql """SHOW BACKENDS"""
+        def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
+        for (be in cluster_bes) {
+            def ip = be[1]
+            def port = be[4]
+            def (code, out, err) = update_be_config(ip, port, key, value)
+            logger.info("update config: code=" + code + ", out=" + out + ", 
err=" + err)
+        }
+    }
+
+    def getBrpcMetrics = {ip, port, name ->
+        def url = "http://${ip}:${port}/brpc_metrics";
+        def metrics = new URL(url).text
+        def matcher = metrics =~ ~"${name}\\s+(\\d+)"
+        if (matcher.find()) {
+            return matcher[0][1] as long
+        } else {
+            throw new RuntimeException("${name} not found for ${ip}:${port}")
+        }
+    }
+
+    def getBrpcMetricsByCluster = {cluster, name->
+        def backends = sql """SHOW BACKENDS"""
+        def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
+        assert cluster_bes.size() > 0, "No backend found for cluster ${cluster}"
+        def be = cluster_bes[0]
+        def ip = be[1]
+        def port = be[5]
+        return getBrpcMetrics(ip, port, name)
+    }
+
+    docker(options) {
+        def clusterName1 = "warmup_source"
+        def clusterName2 = "warmup_target"
+
+        // Add two clusters
+        cluster.addBackend(1, clusterName1)
+        cluster.addBackend(1, clusterName2)
+
+        def tag1 = getCloudBeTagByName(clusterName1)
+        def tag2 = getCloudBeTagByName(clusterName2)
+
+        logger.info("Cluster tag1: {}", tag1)
+        logger.info("Cluster tag2: {}", tag2)
+
+        updateBeConf(clusterName2, "enable_warmup_immediately_on_new_rowset", "true")
+
+        // Ensure we are in source cluster
+        sql """use @${clusterName1}"""
+
+        sql """
+            create table test (
+                col0 int not null,
+                col1 variant NOT NULL
+            ) DUPLICATE KEY(`col0`)
+            DISTRIBUTED BY HASH(col0) BUCKETS 1
+            PROPERTIES ("file_cache_ttl_seconds" = "3600", 
"disable_auto_compaction" = "true");
+        """
+
+        clearFileCacheOnAllBackends()
+        sleep(15000)
+
+        sql """insert into test values (1, '{"a" : 1.0}')"""
+        sql """insert into test values (2, '{"a" : 111.1111}')"""
+        sql """insert into test values (3, '{"a" : "11111"}')"""
+        sql """insert into test values (4, '{"a" : 1111111111}')"""
+        sql """insert into test values (5, '{"a" : 1111.11111}')"""
+
+        // switch to read cluster, trigger a sync rowset
+        sql """use @${clusterName2}"""
+        qt_sql """select * from test"""
+        assertEquals(5, getBrpcMetricsByCluster(clusterName2, "file_cache_download_submitted_num"))
+        assertEquals(5, getBrpcMetricsByCluster(clusterName2, "file_cache_warm_up_rowset_triggered_by_sync_rowset_num"))
+        assertEquals(0, getBrpcMetricsByCluster(clusterName2, "file_cache_warm_up_rowset_triggered_by_job_num"))
+
+        // switch to source cluster and trigger compaction
+        sql """use @${clusterName1}"""
+        trigger_and_wait_compaction("test", "cumulative")
+        sql """insert into test values (6, '{"a" : 1111.11111}')"""
+        sleep(2000)
+
+        // switch to read cluster, trigger a sync rowset
+        sql """use @${clusterName2}"""
+        qt_sql """select * from test"""
+        // wait until the injection complete
+        sleep(1000)
+
+        assertEquals(7, getBrpcMetricsByCluster(clusterName2, "file_cache_download_submitted_num"))
+        assertEquals(7, getBrpcMetricsByCluster(clusterName2, "file_cache_warm_up_rowset_triggered_by_sync_rowset_num"))
+        assertEquals(0, getBrpcMetricsByCluster(clusterName2, "file_cache_warm_up_rowset_triggered_by_job_num"))
+        sleep(5000)
+        assertEquals(7, getBrpcMetricsByCluster(clusterName2, "file_cache_warm_up_rowset_complete_num"))
+    }
+}
diff --git a/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_immediate_warmup_multi_segments.groovy b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_immediate_warmup_multi_segments.groovy
new file mode 100644
index 00000000000..fc1416984d2
--- /dev/null
+++ b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_immediate_warmup_multi_segments.groovy
@@ -0,0 +1,232 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.Http
+import org.apache.doris.regression.util.NodeType
+import groovy.json.JsonSlurper
+
+suite('test_immediate_warmup_multi_segments', 'docker') {
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+        'cloud_tablet_rebalancer_interval_second=1',
+    ]
+    options.beConfigs += [
+        'file_cache_enter_disk_resource_limit_mode_percent=99',
+        'enable_evict_file_cache_in_advance=false',
+        'block_file_cache_monitor_interval_sec=1',
+        'tablet_rowset_stale_sweep_time_sec=0',
+        'vacuum_stale_rowsets_interval_s=10',
+        'doris_scanner_row_bytes=1',
+    ]
+    options.enableDebugPoints()
+    options.cloudMode = true
+
+    def testTable = "test"
+
+    def clearFileCache = {ip, port ->
+        def url = "http://${ip}:${port}/api/file_cache?op=clear&sync=true";
+        def response = new URL(url).text
+        def json = new JsonSlurper().parseText(response)
+
+        // Check the status
+        if (json.status != "OK") {
+            throw new RuntimeException("Clear cache on ${ip}:${port} failed: 
${json.status}")
+        }
+    }
+
+    def clearFileCacheOnAllBackends = {
+        def backends = sql """SHOW BACKENDS"""
+
+        for (be in backends) {
+            def ip = be[1]
+            def port = be[4]
+            clearFileCache(ip, port)
+        }
+
+        // clear file cache is async, wait it done
+        sleep(5000)
+    }
+
+    def updateBeConf = {cluster, key, value ->
+        def backends = sql """SHOW BACKENDS"""
+        def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
+        for (be in cluster_bes) {
+            def ip = be[1]
+            def port = be[4]
+            def (code, out, err) = update_be_config(ip, port, key, value)
+            logger.info("update config: code=" + code + ", out=" + out + ", 
err=" + err)
+        }
+    }
+
+    def getBrpcMetrics = {ip, port, name ->
+        def url = "http://${ip}:${port}/brpc_metrics";
+        def metrics = new URL(url).text
+        def matcher = metrics =~ ~"${name}\\s+(\\d+)"
+        if (matcher.find()) {
+            return matcher[0][1] as long
+        } else {
+            throw new RuntimeException("${name} not found for ${ip}:${port}")
+        }
+    }
+
+    def getBrpcMetricsByCluster = {cluster, name->
+        def backends = sql """SHOW BACKENDS"""
+        def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
+        assert cluster_bes.size() > 0, "No backend found for cluster ${cluster}"
+        def be = cluster_bes[0]
+        def ip = be[1]
+        def port = be[5]
+        return getBrpcMetrics(ip, port, name)
+    }
+
+    def injectS3FileReadSlow = {cluster, sleep_s ->
+        def backends = sql """SHOW BACKENDS"""
+        def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
+        def injectName = 'S3FileReader::read_at_impl.io_slow'
+        for (be in cluster_bes) {
+            def ip = be[1]
+            def port = be[4]
+            GetDebugPoint().enableDebugPoint(ip, port as int, NodeType.BE, injectName, [sleep:sleep_s, execute:1])
+        }
+    }
+
+    def getTabletStatus = { cluster, tablet_id, rowsetIndex, lastRowsetSegmentNum, enableAssert = false ->
+        def backends = sql """SHOW BACKENDS"""
+        def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
+        assert cluster_bes.size() > 0, "No backend found for cluster ${cluster}"
+        def be = cluster_bes[0]
+        def ip = be[1]
+        def port = be[4]
+        StringBuilder sb = new StringBuilder();
+        sb.append("curl -X GET http://${ip}:${port}")
+        sb.append("/api/compaction/show?tablet_id=")
+        sb.append(tablet_id)
+
+        String command = sb.toString()
+        logger.info(command)
+        def process = command.execute()
+        def code = process.waitFor()
+        def out = process.getText()
+        logger.info("Get tablet status:  =" + code + ", out=" + out)
+        assertEquals(code, 0)
+
+        def tabletJson = parseJson(out.trim())
+        assert tabletJson.rowsets instanceof List
+        assertTrue(tabletJson.rowsets.size() >= rowsetIndex)
+        def rowset = tabletJson.rowsets.get(rowsetIndex - 1)
+        logger.info("rowset: ${rowset}")
+
+        int start_index = rowset.indexOf("]")
+        int end_index = rowset.indexOf("DATA")
+        def segmentNumStr = rowset.substring(start_index + 1, end_index).trim()
+        logger.info("segmentNumStr: ${segmentNumStr}")
+        if (enableAssert) {
+            assertEquals(lastRowsetSegmentNum, Integer.parseInt(segmentNumStr))
+        } else {
+            return lastRowsetSegmentNum == Integer.parseInt(segmentNumStr);
+        }
+    }
+
+    docker(options) {
+        def clusterName1 = "warmup_source"
+        def clusterName2 = "warmup_target"
+
+        // Add two clusters
+        cluster.addBackend(1, clusterName1)
+        cluster.addBackend(1, clusterName2)
+
+        def tag1 = getCloudBeTagByName(clusterName1)
+        def tag2 = getCloudBeTagByName(clusterName2)
+
+        logger.info("Cluster tag1: {}", tag1)
+        logger.info("Cluster tag2: {}", tag2)
+
+        updateBeConf(clusterName2, "enable_warmup_immediately_on_new_rowset", "true")
+
+        // Ensure we are in source cluster
+        sql """use @${clusterName1}"""
+        sql """ DROP TABLE IF EXISTS ${testTable} """
+        sql """ CREATE TABLE IF NOT EXISTS ${testTable} (
+            `k1` int(11) NULL,
+            `k2` int(11) NULL,
+            `v3` int(11) NULL,
+            `v4` int(11) NULL
+        ) unique KEY(`k1`, `k2`)
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1",
+            "disable_auto_compaction" = "true"
+            );
+        """
+
+        clearFileCacheOnAllBackends()
+        sleep(15000)
+
+        def tablets = sql_return_maparray """ show tablets from test; """
+        logger.info("tablets: " + tablets)
+        assertEquals(1, tablets.size())
+        def tablet = tablets[0]
+        String tablet_id = tablet.TabletId
+
+        GetDebugPoint().enableDebugPointForAllBEs("MemTable.need_flush")
+        try {
+            // load 1
+            streamLoad {
+                table "${testTable}"
+                set 'column_separator', ','
+                set 'compress_type', 'GZ'
+                file 'test_schema_change_add_key_column.csv.gz'
+                time 10000 // limit inflight 10s
+
+                check { res, exception, startTime, endTime ->
+                    if (exception != null) {
+                        throw exception
+                    }
+                    def json = parseJson(res)
+                    assertEquals("success", json.Status.toLowerCase())
+                    assertEquals(8192, json.NumberTotalRows)
+                    assertEquals(0, json.NumberFilteredRows)
+                }
+            }
+            sql "sync"
+            def rowCount1 = sql """ select count() from ${testTable}; """
+            logger.info("rowCount1: ${rowCount1}")
+            // check generate 3 segments
+            getTabletStatus(clusterName1, tablet_id, 2, 3, true)
+
+            // switch to read cluster, trigger a sync rowset
+            injectS3FileReadSlow(clusterName2, 10)
+            // the query will be blocked by the injection, we call it async
+            def future = thread {
+                sql """use @${clusterName2}"""
+                sql """select * from test"""
+            }
+            sleep(1000)
+            assertEquals(1, getBrpcMetricsByCluster(clusterName2, "file_cache_warm_up_rowset_triggered_by_sync_rowset_num"))
+            assertEquals(2, getBrpcMetricsByCluster(clusterName2, "file_cache_warm_up_segment_complete_num"))
+            assertEquals(0, getBrpcMetricsByCluster(clusterName2, "file_cache_warm_up_rowset_complete_num"))
+
+            future.get()
+            assertEquals(3, getBrpcMetricsByCluster(clusterName2, "file_cache_warm_up_segment_complete_num"))
+            assertEquals(1, getBrpcMetricsByCluster(clusterName2, "file_cache_warm_up_rowset_complete_num"))
+        } finally {
+            GetDebugPoint().clearDebugPointsForAllBEs()
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

