This is an automated email from the ASF dual-hosted git repository.
zhangchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 64b57dc3307 [feat](warm up) introduce immediate warmup on read cluster (#54611)
64b57dc3307 is described below
commit 64b57dc330749eb7fef4cd92d55bd2dfead91d25
Author: zhannngchen <[email protected]>
AuthorDate: Mon Aug 18 13:46:35 2025 +0800
[feat](warm up) introduce immediate warmup on read cluster (#54611)
### What problem does this PR solve?
Problem Summary:
Introduce a config `enable_warmup_immediately_on_new_rowset`. If the user
sets it to `true`, warm up will be triggered automatically and immediately
when new rowsets are synced.
NOTE:
the method `get_rowset_warmup_state()` is used for #53540
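
For reference, a minimal standalone sketch of the per-rowset bookkeeping this change adds to `CloudTablet` (Doris types are stubbed out; `WarmUpTracker` and the plain-string rowset ids are hypothetical names for illustration only): a rowset is registered once as `TRIGGERED_BY_JOB` or `TRIGGERED_BY_SYNC_ROWSET` together with its pending segment count, and flips to `DONE` after the last segment download callback fires.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <utility>

enum class WarmUpState : int { NONE, TRIGGERED_BY_SYNC_ROWSET, TRIGGERED_BY_JOB, DONE };

// Hypothetical stand-in for the map CloudTablet keeps in _rowset_warm_up_states.
class WarmUpTracker {
public:
    // Register a rowset once; duplicates are rejected, mirroring
    // CloudTablet::add_rowset_warmup_state.
    bool add(const std::string& rowset_id, WarmUpState state, int32_t num_segments) {
        return _states.emplace(rowset_id, std::make_pair(state, num_segments)).second;
    }

    // Called once per finished segment download, mirroring
    // CloudTablet::complete_rowset_segment_warmup; failed downloads also
    // count toward completion (they are only reflected in metrics).
    WarmUpState complete_segment(const std::string& rowset_id) {
        auto it = _states.find(rowset_id);
        if (it == _states.end()) {
            return WarmUpState::NONE;
        }
        if (--it->second.second <= 0) {
            it->second.first = WarmUpState::DONE;
        }
        return it->second.first;
    }

private:
    // rowset id -> {state, segments still pending}
    std::unordered_map<std::string, std::pair<WarmUpState, int32_t>> _states;
};

int main() {
    WarmUpTracker tracker;
    assert(tracker.add("rs_1", WarmUpState::TRIGGERED_BY_SYNC_ROWSET, 2));
    assert(!tracker.add("rs_1", WarmUpState::TRIGGERED_BY_JOB, 2)); // duplicate is skipped
    assert(tracker.complete_segment("rs_1") == WarmUpState::TRIGGERED_BY_SYNC_ROWSET);
    assert(tracker.complete_segment("rs_1") == WarmUpState::DONE);
    return 0;
}
```

In the real code the map lives behind `_meta_lock` and the transitions are additionally surfaced through the new `file_cache_warm_up_*` bvar counters.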
---
be/src/cloud/cloud_internal_service.cpp | 12 +
be/src/cloud/cloud_meta_mgr.cpp | 3 +-
be/src/cloud/cloud_tablet.cpp | 72 ++++-
be/src/cloud/cloud_tablet.h | 11 +
be/src/cloud/cloud_warm_up_manager.cpp | 19 +-
be/src/cloud/cloud_warm_up_manager.h | 11 +-
be/src/cloud/config.cpp | 2 +
be/src/cloud/config.h | 2 +
be/src/olap/base_tablet.h | 2 +-
be/test/cloud/cloud_tablet_test.cpp | 301 +++++++++++++++++++++
.../cluster/test_immediate_warmup_basic.out | Bin 0 -> 300 bytes
.../test_schema_change_add_key_column.csv.gz | Bin 0 -> 72233 bytes
.../cluster/test_immediate_warmup_basic.groovy | 154 +++++++++++
.../test_immediate_warmup_multi_segments.groovy | 232 ++++++++++++++++
14 files changed, 813 insertions(+), 8 deletions(-)
diff --git a/be/src/cloud/cloud_internal_service.cpp b/be/src/cloud/cloud_internal_service.cpp
index 5641bc1ac54..68715c69560 100644
--- a/be/src/cloud/cloud_internal_service.cpp
+++ b/be/src/cloud/cloud_internal_service.cpp
@@ -21,6 +21,7 @@
#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet_mgr.h"
+#include "cloud/cloud_warm_up_manager.h"
#include "cloud/config.h"
#include "io/cache/block_file_cache.h"
#include "io/cache/block_file_cache_downloader.h"
@@ -219,6 +220,12 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
            expiration_time = 0;
        }
+       if (!tablet->add_rowset_warmup_state(rs_meta, WarmUpState::TRIGGERED_BY_JOB)) {
+           LOG(INFO) << "found duplicate warmup task for rowset " << rs_meta.rowset_id()
+                     << ", skip it";
+           continue;
+       }
+
        for (int64_t segment_id = 0; segment_id < rs_meta.num_segments(); segment_id++) {
            auto download_done = [&, tablet_id = rs_meta.tablet_id(),
                                  rowset_id = rs_meta.rowset_id().to_string(),
@@ -252,6 +259,11 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
                LOG(WARNING) << "download segment failed, tablet_id: " << tablet_id
                             << " rowset_id: " << rowset_id << ", error: " << st;
            }
+           if (tablet->complete_rowset_segment_warmup(rs_meta.rowset_id(), st) ==
+               WarmUpState::DONE) {
+               VLOG_DEBUG << "warmup rowset " << rs_meta.version() << "(" << rowset_id
+                          << ") completed";
+           }
if (wait) {
wait->signal();
}
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index 9c78a7529f1..86de6080585 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -737,7 +737,8 @@ Status CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
            bool version_overlap =
                    tablet->max_version_unlocked() >= rowsets.front()->start_version();
            tablet->add_rowsets(std::move(rowsets), version_overlap, wlock,
-                               options.warmup_delta_data);
+                               options.warmup_delta_data ||
+                                       config::enable_warmup_immediately_on_new_rowset);
        }
        tablet->last_base_compaction_success_time_ms = stats.last_base_compaction_time_ms();
        tablet->last_cumu_compaction_success_time_ms = stats.last_cumu_compaction_time_ms();
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index afe77b28955..88a3a66c2ab 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -94,6 +94,17 @@ bvar::Adder<uint64_t> g_file_cache_recycle_cached_data_segment_size(
bvar::Adder<uint64_t> g_file_cache_recycle_cached_data_index_num(
        "file_cache_recycle_cached_data_index_num");
+bvar::Adder<uint64_t> g_file_cache_warm_up_segment_complete_num(
+ "file_cache_warm_up_segment_complete_num");
+bvar::Adder<uint64_t> g_file_cache_warm_up_segment_failed_num(
+ "file_cache_warm_up_segment_failed_num");
+bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_complete_num(
+ "file_cache_warm_up_rowset_complete_num");
+bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_triggered_by_job_num(
+ "file_cache_warm_up_rowset_triggered_by_job_num");
+bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_triggered_by_sync_rowset_num(
+ "file_cache_warm_up_rowset_triggered_by_sync_rowset_num");
+
CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta)
: BaseTablet(std::move(tablet_meta)), _engine(engine) {}
@@ -243,6 +254,7 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
for (auto& rs : rowsets) {
if (version_overlap || warmup_delta_data) {
#ifndef BE_TEST
+ bool warm_up_state_updated = false;
// Warmup rowset data in background
for (int seg_id = 0; seg_id < rs->num_segments(); ++seg_id) {
const auto& rowset_meta = rs->rowset_meta();
@@ -271,7 +283,19 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
                        g_file_cache_cloud_tablet_submitted_segment_size
                                << rs->rowset_meta()->segment_file_size(seg_id);
                    }
+                   if (!warm_up_state_updated) {
+                       VLOG_DEBUG << "warm up rowset " << rs->version() << "(" << rs->rowset_id()
+                                  << ") triggered by sync rowset";
+                       if (!add_rowset_warmup_state_unlocked(
+                                   *(rs->rowset_meta()), WarmUpState::TRIGGERED_BY_SYNC_ROWSET)) {
+                           LOG(INFO) << "found duplicate warmup task for rowset "
+                                     << rs->rowset_id() << ", skip it";
+                           break;
+                       }
+                       warm_up_state_updated = true;
+                   }
                    // clang-format off
+                   auto self = std::dynamic_pointer_cast<CloudTablet>(shared_from_this());
                    _engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta {
                            .path = storage_resource.value()->remote_segment_path(*rowset_meta, seg_id),
                            .file_size = rs->rowset_meta()->segment_file_size(seg_id),
@@ -281,7 +305,8 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
                            .expiration_time = expiration_time,
                            .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
                    },
-                   .download_done {[](Status st) {
+                   .download_done {[=](Status st) {
+                       self->complete_rowset_segment_warmup(rowset_meta->rowset_id(), st);
                        if (!st) {
                            LOG_WARNING("add rowset warm up error ").error(st);
                        }
@@ -441,6 +466,7 @@ void CloudTablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete,
_timestamped_version_tracker.add_stale_path_version(rs_metas);
for (auto&& rs : to_delete) {
_rs_version_map.erase(rs->version());
+ _rowset_warm_up_states.erase(rs->rowset_id());
}
_tablet_meta->modify_rs_metas({}, rs_metas, false);
@@ -1307,5 +1333,49 @@ Status CloudTablet::check_delete_bitmap_cache(int64_t txn_id,
return Status::OK();
}
+WarmUpState CloudTablet::get_rowset_warmup_state(RowsetId rowset_id) {
+    std::shared_lock rlock(_meta_lock);
+    if (_rowset_warm_up_states.find(rowset_id) == _rowset_warm_up_states.end()) {
+        return WarmUpState::NONE;
+    }
+    return _rowset_warm_up_states[rowset_id].first;
+}
+
+bool CloudTablet::add_rowset_warmup_state(const RowsetMeta& rowset, WarmUpState state) {
+    std::lock_guard wlock(_meta_lock);
+    return add_rowset_warmup_state_unlocked(rowset, state);
+}
+
+bool CloudTablet::add_rowset_warmup_state_unlocked(const RowsetMeta& rowset, WarmUpState state) {
+    if (_rowset_warm_up_states.find(rowset.rowset_id()) != _rowset_warm_up_states.end()) {
+        return false;
+    }
+    if (state == WarmUpState::TRIGGERED_BY_JOB) {
+        g_file_cache_warm_up_rowset_triggered_by_job_num << 1;
+    } else if (state == WarmUpState::TRIGGERED_BY_SYNC_ROWSET) {
+        g_file_cache_warm_up_rowset_triggered_by_sync_rowset_num << 1;
+    }
+    _rowset_warm_up_states[rowset.rowset_id()] = std::make_pair(state, rowset.num_segments());
+    return true;
+}
+
+WarmUpState CloudTablet::complete_rowset_segment_warmup(RowsetId rowset_id, Status status) {
+    std::lock_guard wlock(_meta_lock);
+    if (_rowset_warm_up_states.find(rowset_id) == _rowset_warm_up_states.end()) {
+        return WarmUpState::NONE;
+    }
+    VLOG_DEBUG << "complete rowset segment warmup for rowset " << rowset_id << ", " << status;
+    g_file_cache_warm_up_segment_complete_num << 1;
+    if (!status.ok()) {
+        g_file_cache_warm_up_segment_failed_num << 1;
+    }
+    _rowset_warm_up_states[rowset_id].second--;
+    if (_rowset_warm_up_states[rowset_id].second <= 0) {
+        g_file_cache_warm_up_rowset_complete_num << 1;
+        _rowset_warm_up_states[rowset_id].first = WarmUpState::DONE;
+    }
+    return _rowset_warm_up_states[rowset_id].first;
+}
+
#include "common/compile_check_end.h"
} // namespace doris
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index 69a086e9ba9..71b1055c119 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -25,6 +25,7 @@
namespace doris {
class CloudStorageEngine;
+enum class WarmUpState : int;
struct SyncRowsetStats {
int64_t get_remote_rowsets_num {0};
@@ -289,12 +290,19 @@ public:
static std::vector<RecycledRowsets> recycle_cached_data(
const std::vector<RowsetSharedPtr>& rowsets);
+ // Add warmup state management
+ WarmUpState get_rowset_warmup_state(RowsetId rowset_id);
+ bool add_rowset_warmup_state(const RowsetMeta& rowset, WarmUpState state);
+    WarmUpState complete_rowset_segment_warmup(RowsetId rowset_id, Status status);
+
private:
    // FIXME(plat1ko): No need to record base size if rowsets are ordered by version
void update_base_size(const Rowset& rs);
Status sync_if_not_running(SyncRowsetStats* stats = nullptr);
+    bool add_rowset_warmup_state_unlocked(const RowsetMeta& rowset, WarmUpState state);
+
CloudStorageEngine& _engine;
// this mutex MUST ONLY be used when sync meta
@@ -350,6 +358,9 @@ private:
std::mutex _gc_mutex;
std::unordered_map<RowsetId, RowsetSharedPtr> _unused_rowsets;
    std::vector<std::pair<std::vector<RowsetId>, DeleteBitmapKeyRanges>> _unused_delete_bitmap;
+
+ // for warm up states management
+    std::unordered_map<RowsetId, std::pair<WarmUpState, int32_t>> _rowset_warm_up_states;
};
using CloudTabletSPtr = std::shared_ptr<CloudTablet>;
diff --git a/be/src/cloud/cloud_warm_up_manager.cpp b/be/src/cloud/cloud_warm_up_manager.cpp
index 2ed92c856b9..17906672cdf 100644
--- a/be/src/cloud/cloud_warm_up_manager.cpp
+++ b/be/src/cloud/cloud_warm_up_manager.cpp
@@ -108,7 +108,7 @@ void CloudWarmUpManager::submit_download_tasks(io::Path path, int64_t file_size,
                                               io::FileSystemSPtr file_system,
                                               int64_t expiration_time,
                                               std::shared_ptr<bthread::CountdownEvent> wait,
-                                              bool is_index) {
+                                              bool is_index, std::function<void(Status)> done_cb) {
if (file_size < 0) {
auto st = file_system->file_size(path, &file_size);
if (!st.ok()) [[unlikely]] {
@@ -145,7 +145,8 @@ void CloudWarmUpManager::submit_download_tasks(io::Path path, int64_t file_size,
                    .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
},
.download_done =
- [=](Status st) {
+ [&](Status st) {
+ if (done_cb) done_cb(st);
if (!st) {
LOG_WARNING("Warm up error ").error(st);
} else if (is_index) {
@@ -225,12 +226,24 @@ void CloudWarmUpManager::handle_jobs() {
if (expiration_time <= UnixSeconds()) {
expiration_time = 0;
}
+           if (!tablet->add_rowset_warmup_state(*rs, WarmUpState::TRIGGERED_BY_JOB)) {
+               LOG(INFO) << "found duplicate warmup task for rowset " << rs->rowset_id()
+                         << ", skip it";
+               continue;
+           }
// 1st. download segment files
            submit_download_tasks(
                    storage_resource.value()->remote_segment_path(*rs, seg_id),
                    rs->segment_file_size(seg_id), storage_resource.value()->fs,
-                   expiration_time, wait);
+                   expiration_time, wait, false, [tablet, rs, seg_id](Status st) {
+                       VLOG_DEBUG << "warmup rowset " << rs->version() << " segment "
+                                  << seg_id << " completed";
+                       if (tablet->complete_rowset_segment_warmup(rs->rowset_id(), st) ==
+                           WarmUpState::DONE) {
+                           VLOG_DEBUG << "warmup rowset " << rs->version() << " completed";
+                       }
+                   });
// 2nd. download inverted index files
int64_t file_size = -1;
diff --git a/be/src/cloud/cloud_warm_up_manager.h b/be/src/cloud/cloud_warm_up_manager.h
index 0d96dd12ede..f92280052ac 100644
--- a/be/src/cloud/cloud_warm_up_manager.h
+++ b/be/src/cloud/cloud_warm_up_manager.h
@@ -38,6 +38,13 @@ enum class DownloadType {
S3,
};
+enum class WarmUpState : int {
+ NONE,
+ TRIGGERED_BY_SYNC_ROWSET,
+ TRIGGERED_BY_JOB,
+ DONE,
+};
+
struct JobMeta {
JobMeta() = default;
JobMeta(const TJobMeta& meta);
@@ -91,8 +98,8 @@ private:
void submit_download_tasks(io::Path path, int64_t file_size,
io::FileSystemSPtr file_system,
int64_t expiration_time,
- std::shared_ptr<bthread::CountdownEvent> wait,
- bool is_index = false);
+                              std::shared_ptr<bthread::CountdownEvent> wait, bool is_index = false,
+ std::function<void(Status)> done_cb = nullptr);
std::mutex _mtx;
std::condition_variable _cond;
int64_t _cur_job_id {0};
diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp
index ab35268afc4..96e53d66b40 100644
--- a/be/src/cloud/config.cpp
+++ b/be/src/cloud/config.cpp
@@ -101,5 +101,7 @@ DEFINE_mInt64(warm_up_rowset_sync_wait_min_timeout_ms, "10000");
DEFINE_mInt64(warm_up_rowset_sync_wait_max_timeout_ms, "120000");
+DEFINE_mBool(enable_warmup_immediately_on_new_rowset, "false");
+
#include "common/compile_check_end.h"
} // namespace doris::config
diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h
index 535bf3146a1..3a949746818 100644
--- a/be/src/cloud/config.h
+++ b/be/src/cloud/config.h
@@ -142,5 +142,7 @@ DECLARE_mInt64(warm_up_rowset_sync_wait_min_timeout_ms);
DECLARE_mInt64(warm_up_rowset_sync_wait_max_timeout_ms);
+DECLARE_mBool(enable_warmup_immediately_on_new_rowset);
+
#include "common/compile_check_end.h"
} // namespace doris::config
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 4267d182cfc..1d3eb7d5060 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -53,7 +53,7 @@ struct TabletWithVersion {
enum class CompactionStage { NOT_SCHEDULED, PENDING, EXECUTING };
// Base class for all tablet classes
-class BaseTablet {
+class BaseTablet : public std::enable_shared_from_this<BaseTablet> {
public:
explicit BaseTablet(TabletMetaSharedPtr tablet_meta);
virtual ~BaseTablet();
diff --git a/be/test/cloud/cloud_tablet_test.cpp b/be/test/cloud/cloud_tablet_test.cpp
new file mode 100644
index 00000000000..5ec3df04175
--- /dev/null
+++ b/be/test/cloud/cloud_tablet_test.cpp
@@ -0,0 +1,301 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_tablet.h"
+
+#include <gen_cpp/olap_file.pb.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <gtest/gtest.h>
+
+#include <chrono>
+#include <cstdint>
+#include <ranges>
+
+#include "cloud/cloud_storage_engine.h"
+#include "cloud/cloud_warm_up_manager.h"
+#include "olap/rowset/rowset.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/tablet_meta.h"
+#include "util/uid_util.h"
+
+namespace doris {
+
+using namespace std::chrono;
+
+class CloudTabletWarmUpStateTest : public testing::Test {
+public:
+    CloudTabletWarmUpStateTest() : _engine(CloudStorageEngine(EngineOptions {})) {}
+
+    void SetUp() override {
+        _tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}},
+                                          UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK,
+                                          TCompressionType::LZ4F));
+        _tablet =
+                std::make_shared<CloudTablet>(_engine, std::make_shared<TabletMeta>(*_tablet_meta));
+    }
+    void TearDown() override {}
+
+    RowsetSharedPtr create_rowset(Version version, int num_segments = 1) {
+        auto rs_meta = std::make_shared<RowsetMeta>();
+        rs_meta->set_rowset_type(BETA_ROWSET);
+        rs_meta->set_version(version);
+        rs_meta->set_rowset_id(_engine.next_rowset_id());
+        rs_meta->set_num_segments(num_segments);
+        RowsetSharedPtr rowset;
+        Status st = RowsetFactory::create_rowset(nullptr, "", rs_meta, &rowset);
+        if (!st.ok()) {
+            return nullptr;
+        }
+        return rowset;
+    }
+
+protected:
+    std::string _json_rowset_meta;
+    TabletMetaSharedPtr _tablet_meta;
+    std::shared_ptr<CloudTablet> _tablet;
+    CloudStorageEngine _engine;
+};
+
+// Test get_rowset_warmup_state for non-existent rowset
+TEST_F(CloudTabletWarmUpStateTest, TestGetRowsetWarmupStateNonExistent) {
+ auto rowset = create_rowset(Version(1, 1));
+ ASSERT_NE(rowset, nullptr);
+
+ auto non_existent_id = _engine.next_rowset_id();
+
+ WarmUpState state = _tablet->get_rowset_warmup_state(non_existent_id);
+ EXPECT_EQ(state, WarmUpState::NONE);
+}
+
+// Test add_rowset_warmup_state with TRIGGERED_BY_JOB state
+TEST_F(CloudTabletWarmUpStateTest, TestAddRowsetWarmupStateTriggeredByJob) {
+    auto rowset = create_rowset(Version(1, 1), 5);
+    ASSERT_NE(rowset, nullptr);
+
+    bool result = _tablet->add_rowset_warmup_state(*(rowset->rowset_meta()),
+                                                   WarmUpState::TRIGGERED_BY_JOB);
+    EXPECT_TRUE(result);
+
+    // Verify the state is correctly set
+    WarmUpState state = _tablet->get_rowset_warmup_state(rowset->rowset_id());
+    EXPECT_EQ(state, WarmUpState::TRIGGERED_BY_JOB);
+}
+
+// Test add_rowset_warmup_state with TRIGGERED_BY_SYNC_ROWSET state
+TEST_F(CloudTabletWarmUpStateTest, TestAddRowsetWarmupStateTriggeredBySyncRowset) {
+    auto rowset = create_rowset(Version(2, 2), 3);
+    ASSERT_NE(rowset, nullptr);
+
+    bool result = _tablet->add_rowset_warmup_state(*(rowset->rowset_meta()),
+                                                   WarmUpState::TRIGGERED_BY_SYNC_ROWSET);
+    EXPECT_TRUE(result);
+
+    // Verify the state is correctly set
+    WarmUpState state = _tablet->get_rowset_warmup_state(rowset->rowset_id());
+    EXPECT_EQ(state, WarmUpState::TRIGGERED_BY_SYNC_ROWSET);
+}
+
+// Test adding duplicate rowset warmup state should fail
+TEST_F(CloudTabletWarmUpStateTest, TestAddDuplicateRowsetWarmupState) {
+    auto rowset = create_rowset(Version(3, 3), 2);
+    ASSERT_NE(rowset, nullptr);
+
+    // First addition should succeed
+    bool result1 = _tablet->add_rowset_warmup_state(*(rowset->rowset_meta()),
+                                                    WarmUpState::TRIGGERED_BY_JOB);
+    EXPECT_TRUE(result1);
+
+    // Second addition should fail
+    bool result2 = _tablet->add_rowset_warmup_state(*(rowset->rowset_meta()),
+                                                    WarmUpState::TRIGGERED_BY_SYNC_ROWSET);
+    EXPECT_FALSE(result2);
+
+    // State should remain the original one
+    WarmUpState state = _tablet->get_rowset_warmup_state(rowset->rowset_id());
+    EXPECT_EQ(state, WarmUpState::TRIGGERED_BY_JOB);
+}
+
+// Test complete_rowset_segment_warmup for non-existent rowset
+TEST_F(CloudTabletWarmUpStateTest, TestCompleteRowsetSegmentWarmupNonExistent) {
+    auto non_existent_id = _engine.next_rowset_id();
+
+    WarmUpState result = _tablet->complete_rowset_segment_warmup(non_existent_id, Status::OK());
+    EXPECT_EQ(result, WarmUpState::NONE);
+}
+
+// Test complete_rowset_segment_warmup with partial completion
+TEST_F(CloudTabletWarmUpStateTest, TestCompleteRowsetSegmentWarmupPartial) {
+    auto rowset = create_rowset(Version(4, 4), 3);
+    ASSERT_NE(rowset, nullptr);
+
+    // Add rowset warmup state
+    bool add_result = _tablet->add_rowset_warmup_state(*(rowset->rowset_meta()),
+                                                       WarmUpState::TRIGGERED_BY_JOB);
+    EXPECT_TRUE(add_result);
+
+    // Complete one segment, should still be in TRIGGERED_BY_JOB state
+    WarmUpState result1 =
+            _tablet->complete_rowset_segment_warmup(rowset->rowset_id(), Status::OK());
+    EXPECT_EQ(result1, WarmUpState::TRIGGERED_BY_JOB);
+
+    // Complete second segment, should still be in TRIGGERED_BY_JOB state
+    WarmUpState result2 =
+            _tablet->complete_rowset_segment_warmup(rowset->rowset_id(), Status::OK());
+    EXPECT_EQ(result2, WarmUpState::TRIGGERED_BY_JOB);
+
+    // Verify current state is still TRIGGERED_BY_JOB
+    WarmUpState current_state = _tablet->get_rowset_warmup_state(rowset->rowset_id());
+    EXPECT_EQ(current_state, WarmUpState::TRIGGERED_BY_JOB);
+}
+
+// Test complete_rowset_segment_warmup with full completion
+TEST_F(CloudTabletWarmUpStateTest, TestCompleteRowsetSegmentWarmupFull) {
+    auto rowset = create_rowset(Version(5, 5), 2);
+    ASSERT_NE(rowset, nullptr);
+
+    // Add rowset warmup state
+    bool add_result = _tablet->add_rowset_warmup_state(*(rowset->rowset_meta()),
+                                                       WarmUpState::TRIGGERED_BY_SYNC_ROWSET);
+    EXPECT_TRUE(add_result);
+
+    // Complete first segment
+    WarmUpState result1 =
+            _tablet->complete_rowset_segment_warmup(rowset->rowset_id(), Status::OK());
+    EXPECT_EQ(result1, WarmUpState::TRIGGERED_BY_SYNC_ROWSET);
+
+    // Complete second segment, should transition to DONE state
+    WarmUpState result2 =
+            _tablet->complete_rowset_segment_warmup(rowset->rowset_id(), Status::OK());
+    EXPECT_EQ(result2, WarmUpState::DONE);
+
+    // Verify final state is DONE
+    WarmUpState final_state = _tablet->get_rowset_warmup_state(rowset->rowset_id());
+    EXPECT_EQ(final_state, WarmUpState::DONE);
+}
+
+// Test complete_rowset_segment_warmup with error status
+TEST_F(CloudTabletWarmUpStateTest, TestCompleteRowsetSegmentWarmupWithError) {
+    auto rowset = create_rowset(Version(6, 6), 1);
+    ASSERT_NE(rowset, nullptr);
+
+    // Add rowset warmup state
+    bool add_result = _tablet->add_rowset_warmup_state(*(rowset->rowset_meta()),
+                                                       WarmUpState::TRIGGERED_BY_JOB);
+    EXPECT_TRUE(add_result);
+
+    // Complete with error status, should still transition to DONE when all segments complete
+    Status error_status = Status::InternalError("Test error");
+    WarmUpState result = _tablet->complete_rowset_segment_warmup(rowset->rowset_id(), error_status);
+    EXPECT_EQ(result, WarmUpState::DONE);
+
+    // Verify final state is DONE even with error
+    WarmUpState final_state = _tablet->get_rowset_warmup_state(rowset->rowset_id());
+    EXPECT_EQ(final_state, WarmUpState::DONE);
+}
+
+// Test multiple rowsets warmup state management
+TEST_F(CloudTabletWarmUpStateTest, TestMultipleRowsetsWarmupState) {
+    auto rowset1 = create_rowset(Version(7, 7), 2);
+    auto rowset2 = create_rowset(Version(8, 8), 3);
+    auto rowset3 = create_rowset(Version(9, 9), 1);
+    ASSERT_NE(rowset1, nullptr);
+    ASSERT_NE(rowset2, nullptr);
+    ASSERT_NE(rowset3, nullptr);
+
+    // Add multiple rowsets
+    EXPECT_TRUE(_tablet->add_rowset_warmup_state(*(rowset1->rowset_meta()),
+                                                 WarmUpState::TRIGGERED_BY_JOB));
+    EXPECT_TRUE(_tablet->add_rowset_warmup_state(*(rowset2->rowset_meta()),
+                                                 WarmUpState::TRIGGERED_BY_SYNC_ROWSET));
+    EXPECT_TRUE(_tablet->add_rowset_warmup_state(*(rowset3->rowset_meta()),
+                                                 WarmUpState::TRIGGERED_BY_JOB));
+
+    // Verify all states
+    EXPECT_EQ(_tablet->get_rowset_warmup_state(rowset1->rowset_id()),
+              WarmUpState::TRIGGERED_BY_JOB);
+    EXPECT_EQ(_tablet->get_rowset_warmup_state(rowset2->rowset_id()),
+              WarmUpState::TRIGGERED_BY_SYNC_ROWSET);
+    EXPECT_EQ(_tablet->get_rowset_warmup_state(rowset3->rowset_id()),
+              WarmUpState::TRIGGERED_BY_JOB);
+
+    // Complete rowset1 (2 segments)
+    EXPECT_EQ(_tablet->complete_rowset_segment_warmup(rowset1->rowset_id(), Status::OK()),
+              WarmUpState::TRIGGERED_BY_JOB);
+    EXPECT_EQ(_tablet->complete_rowset_segment_warmup(rowset1->rowset_id(), Status::OK()),
+              WarmUpState::DONE);
+
+    // Complete rowset3 (1 segment)
+    EXPECT_EQ(_tablet->complete_rowset_segment_warmup(rowset3->rowset_id(), Status::OK()),
+              WarmUpState::DONE);
+
+    // Verify states after completion
+    EXPECT_EQ(_tablet->get_rowset_warmup_state(rowset1->rowset_id()), WarmUpState::DONE);
+    EXPECT_EQ(_tablet->get_rowset_warmup_state(rowset2->rowset_id()),
+              WarmUpState::TRIGGERED_BY_SYNC_ROWSET);
+    EXPECT_EQ(_tablet->get_rowset_warmup_state(rowset3->rowset_id()), WarmUpState::DONE);
+}
+
+// Test warmup state with zero segments (edge case)
+TEST_F(CloudTabletWarmUpStateTest, TestWarmupStateWithZeroSegments) {
+    auto rowset = create_rowset(Version(10, 10), 0);
+    ASSERT_NE(rowset, nullptr);
+
+    // Add rowset with zero segments
+    bool add_result = _tablet->add_rowset_warmup_state(*(rowset->rowset_meta()),
+                                                       WarmUpState::TRIGGERED_BY_JOB);
+    EXPECT_TRUE(add_result);
+
+    // State should be immediately ready for completion since there are no segments to warm up
+    WarmUpState state = _tablet->get_rowset_warmup_state(rowset->rowset_id());
+    EXPECT_EQ(state, WarmUpState::TRIGGERED_BY_JOB);
+
+    // Any completion call should handle the edge case gracefully
+    WarmUpState result = _tablet->complete_rowset_segment_warmup(rowset->rowset_id(), Status::OK());
+    // With 0 segments, the counter should already be 0, so this should transition to DONE
+    EXPECT_EQ(result, WarmUpState::DONE);
+}
+
+// Test concurrent access to warmup state (basic thread safety verification)
+TEST_F(CloudTabletWarmUpStateTest, TestConcurrentWarmupStateAccess) {
+    auto rowset1 = create_rowset(Version(11, 11), 4);
+    auto rowset2 = create_rowset(Version(12, 12), 3);
+    ASSERT_NE(rowset1, nullptr);
+    ASSERT_NE(rowset2, nullptr);
+
+    // Add rowsets from different "threads" (simulated by sequential calls)
+    EXPECT_TRUE(_tablet->add_rowset_warmup_state(*(rowset1->rowset_meta()),
+                                                 WarmUpState::TRIGGERED_BY_JOB));
+    EXPECT_TRUE(_tablet->add_rowset_warmup_state(*(rowset2->rowset_meta()),
+                                                 WarmUpState::TRIGGERED_BY_SYNC_ROWSET));
+
+    // Interleaved completion operations
+    EXPECT_EQ(_tablet->complete_rowset_segment_warmup(rowset1->rowset_id(), Status::OK()),
+              WarmUpState::TRIGGERED_BY_JOB);
+    EXPECT_EQ(_tablet->complete_rowset_segment_warmup(rowset2->rowset_id(), Status::OK()),
+              WarmUpState::TRIGGERED_BY_SYNC_ROWSET);
+    EXPECT_EQ(_tablet->complete_rowset_segment_warmup(rowset1->rowset_id(), Status::OK()),
+              WarmUpState::TRIGGERED_BY_JOB);
+
+    // Check states are maintained correctly
+    EXPECT_EQ(_tablet->get_rowset_warmup_state(rowset1->rowset_id()),
+              WarmUpState::TRIGGERED_BY_JOB);
+    EXPECT_EQ(_tablet->get_rowset_warmup_state(rowset2->rowset_id()),
+              WarmUpState::TRIGGERED_BY_SYNC_ROWSET);
+}
+} // namespace doris
diff --git a/regression-test/data/cloud_p0/cache/multi_cluster/warm_up/cluster/test_immediate_warmup_basic.out b/regression-test/data/cloud_p0/cache/multi_cluster/warm_up/cluster/test_immediate_warmup_basic.out
new file mode 100644
index 00000000000..99e1f4ad641
Binary files /dev/null and b/regression-test/data/cloud_p0/cache/multi_cluster/warm_up/cluster/test_immediate_warmup_basic.out differ
diff --git a/regression-test/data/cloud_p0/cache/multi_cluster/warm_up/cluster/test_schema_change_add_key_column.csv.gz b/regression-test/data/cloud_p0/cache/multi_cluster/warm_up/cluster/test_schema_change_add_key_column.csv.gz
new file mode 100644
index 00000000000..bc9d3dd70ea
Binary files /dev/null and b/regression-test/data/cloud_p0/cache/multi_cluster/warm_up/cluster/test_schema_change_add_key_column.csv.gz differ
diff --git a/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_immediate_warmup_basic.groovy b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_immediate_warmup_basic.groovy
new file mode 100644
index 00000000000..7bbe3f01895
--- /dev/null
+++ b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_immediate_warmup_basic.groovy
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.NodeType
+import groovy.json.JsonSlurper
+
+suite('test_immediate_warmup_basic', 'docker') {
+ def options = new ClusterOptions()
+ options.feConfigs += [
+ 'cloud_cluster_check_interval_second=1',
+ 'cloud_tablet_rebalancer_interval_second=1',
+ ]
+ options.beConfigs += [
+ 'file_cache_enter_disk_resource_limit_mode_percent=99',
+ 'enable_evict_file_cache_in_advance=false',
+ 'block_file_cache_monitor_interval_sec=1',
+ ]
+ options.enableDebugPoints()
+ options.cloudMode = true
+
+ def clearFileCache = {ip, port ->
+ def url = "http://${ip}:${port}/api/file_cache?op=clear&sync=true"
+ def response = new URL(url).text
+ def json = new JsonSlurper().parseText(response)
+
+ // Check the status
+ if (json.status != "OK") {
+            throw new RuntimeException("Clear cache on ${ip}:${port} failed: ${json.status}")
+ }
+ }
+
+ def clearFileCacheOnAllBackends = {
+ def backends = sql """SHOW BACKENDS"""
+
+ for (be in backends) {
+ def ip = be[1]
+ def port = be[4]
+ clearFileCache(ip, port)
+ }
+
+ // clear file cache is async, wait it done
+ sleep(5000)
+ }
+
+ def updateBeConf = {cluster, key, value ->
+ def backends = sql """SHOW BACKENDS"""
+        def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
+        for (be in cluster_bes) {
+            def ip = be[1]
+            def port = be[4]
+            def (code, out, err) = update_be_config(ip, port, key, value)
+            logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
+ }
+ }
+
+ def getBrpcMetrics = {ip, port, name ->
+ def url = "http://${ip}:${port}/brpc_metrics"
+ def metrics = new URL(url).text
+ def matcher = metrics =~ ~"${name}\\s+(\\d+)"
+ if (matcher.find()) {
+ return matcher[0][1] as long
+ } else {
+ throw new RuntimeException("${name} not found for ${ip}:${port}")
+ }
+ }
+
+ def getBrpcMetricsByCluster = {cluster, name->
+ def backends = sql """SHOW BACKENDS"""
+        def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
+        assert cluster_bes.size() > 0, "No backend found for cluster ${cluster}"
+ def be = cluster_bes[0]
+ def ip = be[1]
+ def port = be[5]
+ return getBrpcMetrics(ip, port, name)
+ }
+
+ docker(options) {
+ def clusterName1 = "warmup_source"
+ def clusterName2 = "warmup_target"
+
+ // Add two clusters
+ cluster.addBackend(1, clusterName1)
+ cluster.addBackend(1, clusterName2)
+
+ def tag1 = getCloudBeTagByName(clusterName1)
+ def tag2 = getCloudBeTagByName(clusterName2)
+
+ logger.info("Cluster tag1: {}", tag1)
+ logger.info("Cluster tag2: {}", tag2)
+
+        updateBeConf(clusterName2, "enable_warmup_immediately_on_new_rowset", "true")
+
+ // Ensure we are in source cluster
+ sql """use @${clusterName1}"""
+
+ sql """
+ create table test (
+ col0 int not null,
+ col1 variant NOT NULL
+ ) DUPLICATE KEY(`col0`)
+ DISTRIBUTED BY HASH(col0) BUCKETS 1
+            PROPERTIES ("file_cache_ttl_seconds" = "3600", "disable_auto_compaction" = "true");
+ """
+
+ clearFileCacheOnAllBackends()
+ sleep(15000)
+
+ sql """insert into test values (1, '{"a" : 1.0}')"""
+ sql """insert into test values (2, '{"a" : 111.1111}')"""
+ sql """insert into test values (3, '{"a" : "11111"}')"""
+ sql """insert into test values (4, '{"a" : 1111111111}')"""
+ sql """insert into test values (5, '{"a" : 1111.11111}')"""
+
+ // switch to read cluster, trigger a sync rowset
+ sql """use @${clusterName2}"""
+ qt_sql """select * from test"""
+        assertEquals(5, getBrpcMetricsByCluster(clusterName2, "file_cache_download_submitted_num"))
+        assertEquals(5, getBrpcMetricsByCluster(clusterName2, "file_cache_warm_up_rowset_triggered_by_sync_rowset_num"))
+        assertEquals(0, getBrpcMetricsByCluster(clusterName2, "file_cache_warm_up_rowset_triggered_by_job_num"))
+
+ // switch to source cluster and trigger compaction
+ sql """use @${clusterName1}"""
+ trigger_and_wait_compaction("test", "cumulative")
+ sql """insert into test values (6, '{"a" : 1111.11111}')"""
+ sleep(2000)
+
+ // switch to read cluster, trigger a sync rowset
+ sql """use @${clusterName2}"""
+ qt_sql """select * from test"""
+ // wait until the injection complete
+ sleep(1000)
+
+        assertEquals(7, getBrpcMetricsByCluster(clusterName2, "file_cache_download_submitted_num"))
+        assertEquals(7, getBrpcMetricsByCluster(clusterName2, "file_cache_warm_up_rowset_triggered_by_sync_rowset_num"))
+        assertEquals(0, getBrpcMetricsByCluster(clusterName2, "file_cache_warm_up_rowset_triggered_by_job_num"))
+        sleep(5000)
+        assertEquals(7, getBrpcMetricsByCluster(clusterName2, "file_cache_warm_up_rowset_complete_num"))
+ }
+}
diff --git a/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_immediate_warmup_multi_segments.groovy b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_immediate_warmup_multi_segments.groovy
new file mode 100644
index 00000000000..fc1416984d2
--- /dev/null
+++ b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_immediate_warmup_multi_segments.groovy
@@ -0,0 +1,232 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.Http
+import org.apache.doris.regression.util.NodeType
+import groovy.json.JsonSlurper
+
+suite('test_immediate_warmup_multi_segments', 'docker') {
+ def options = new ClusterOptions()
+ options.feConfigs += [
+ 'cloud_cluster_check_interval_second=1',
+ 'cloud_tablet_rebalancer_interval_second=1',
+ ]
+ options.beConfigs += [
+ 'file_cache_enter_disk_resource_limit_mode_percent=99',
+ 'enable_evict_file_cache_in_advance=false',
+ 'block_file_cache_monitor_interval_sec=1',
+ 'tablet_rowset_stale_sweep_time_sec=0',
+ 'vacuum_stale_rowsets_interval_s=10',
+ 'doris_scanner_row_bytes=1',
+ ]
+ options.enableDebugPoints()
+ options.cloudMode = true
+
+ def testTable = "test"
+
+ def clearFileCache = {ip, port ->
+ def url = "http://${ip}:${port}/api/file_cache?op=clear&sync=true"
+ def response = new URL(url).text
+ def json = new JsonSlurper().parseText(response)
+
+ // Check the status
+ if (json.status != "OK") {
+            throw new RuntimeException("Clear cache on ${ip}:${port} failed: ${json.status}")
+ }
+ }
+
+ def clearFileCacheOnAllBackends = {
+ def backends = sql """SHOW BACKENDS"""
+
+ for (be in backends) {
+ def ip = be[1]
+ def port = be[4]
+ clearFileCache(ip, port)
+ }
+
+ // clear file cache is async, wait it done
+ sleep(5000)
+ }
+
+ def updateBeConf = {cluster, key, value ->
+ def backends = sql """SHOW BACKENDS"""
+        def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
+        for (be in cluster_bes) {
+            def ip = be[1]
+            def port = be[4]
+            def (code, out, err) = update_be_config(ip, port, key, value)
+            logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
+ }
+ }
+
+ def getBrpcMetrics = {ip, port, name ->
+ def url = "http://${ip}:${port}/brpc_metrics"
+ def metrics = new URL(url).text
+ def matcher = metrics =~ ~"${name}\\s+(\\d+)"
+ if (matcher.find()) {
+ return matcher[0][1] as long
+ } else {
+ throw new RuntimeException("${name} not found for ${ip}:${port}")
+ }
+ }
+
+ def getBrpcMetricsByCluster = {cluster, name->
+ def backends = sql """SHOW BACKENDS"""
+        def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
+        assert cluster_bes.size() > 0, "No backend found for cluster ${cluster}"
+ def be = cluster_bes[0]
+ def ip = be[1]
+ def port = be[5]
+ return getBrpcMetrics(ip, port, name)
+ }
+
+ def injectS3FileReadSlow = {cluster, sleep_s ->
+ def backends = sql """SHOW BACKENDS"""
+        def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
+        def injectName = 'S3FileReader::read_at_impl.io_slow'
+        for (be in cluster_bes) {
+            def ip = be[1]
+            def port = be[4]
+            GetDebugPoint().enableDebugPoint(ip, port as int, NodeType.BE, injectName, [sleep:sleep_s, execute:1])
+ }
+ }
+
+    def getTabletStatus = { cluster, tablet_id, rowsetIndex, lastRowsetSegmentNum, enableAssert = false ->
+        def backends = sql """SHOW BACKENDS"""
+        def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
+        assert cluster_bes.size() > 0, "No backend found for cluster ${cluster}"
+ def be = cluster_bes[0]
+ def ip = be[1]
+ def port = be[4]
+ StringBuilder sb = new StringBuilder();
+ sb.append("curl -X GET http://${ip}:${port}")
+ sb.append("/api/compaction/show?tablet_id=")
+ sb.append(tablet_id)
+
+ String command = sb.toString()
+ logger.info(command)
+ def process = command.execute()
+ def code = process.waitFor()
+ def out = process.getText()
+ logger.info("Get tablet status: =" + code + ", out=" + out)
+ assertEquals(code, 0)
+
+ def tabletJson = parseJson(out.trim())
+ assert tabletJson.rowsets instanceof List
+ assertTrue(tabletJson.rowsets.size() >= rowsetIndex)
+ def rowset = tabletJson.rowsets.get(rowsetIndex - 1)
+ logger.info("rowset: ${rowset}")
+
+ int start_index = rowset.indexOf("]")
+ int end_index = rowset.indexOf("DATA")
+ def segmentNumStr = rowset.substring(start_index + 1, end_index).trim()
+ logger.info("segmentNumStr: ${segmentNumStr}")
+ if (enableAssert) {
+ assertEquals(lastRowsetSegmentNum, Integer.parseInt(segmentNumStr))
+ } else {
+ return lastRowsetSegmentNum == Integer.parseInt(segmentNumStr);
+ }
+ }
+
+ docker(options) {
+ def clusterName1 = "warmup_source"
+ def clusterName2 = "warmup_target"
+
+ // Add two clusters
+ cluster.addBackend(1, clusterName1)
+ cluster.addBackend(1, clusterName2)
+
+ def tag1 = getCloudBeTagByName(clusterName1)
+ def tag2 = getCloudBeTagByName(clusterName2)
+
+ logger.info("Cluster tag1: {}", tag1)
+ logger.info("Cluster tag2: {}", tag2)
+
+        updateBeConf(clusterName2, "enable_warmup_immediately_on_new_rowset", "true")
+
+ // Ensure we are in source cluster
+ sql """use @${clusterName1}"""
+ sql """ DROP TABLE IF EXISTS ${testTable} """
+ sql """ CREATE TABLE IF NOT EXISTS ${testTable} (
+ `k1` int(11) NULL,
+ `k2` int(11) NULL,
+ `v3` int(11) NULL,
+ `v4` int(11) NULL
+ ) unique KEY(`k1`, `k2`)
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "disable_auto_compaction" = "true"
+ );
+ """
+
+ clearFileCacheOnAllBackends()
+ sleep(15000)
+
+ def tablets = sql_return_maparray """ show tablets from test; """
+ logger.info("tablets: " + tablets)
+ assertEquals(1, tablets.size())
+ def tablet = tablets[0]
+ String tablet_id = tablet.TabletId
+
+ GetDebugPoint().enableDebugPointForAllBEs("MemTable.need_flush")
+ try {
+ // load 1
+ streamLoad {
+ table "${testTable}"
+ set 'column_separator', ','
+ set 'compress_type', 'GZ'
+ file 'test_schema_change_add_key_column.csv.gz'
+ time 10000 // limit inflight 10s
+
+ check { res, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ def json = parseJson(res)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(8192, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ }
+ }
+ sql "sync"
+ def rowCount1 = sql """ select count() from ${testTable}; """
+ logger.info("rowCount1: ${rowCount1}")
+ // check generate 3 segments
+ getTabletStatus(clusterName1, tablet_id, 2, 3, true)
+
+ // switch to read cluster, trigger a sync rowset
+ injectS3FileReadSlow(clusterName2, 10)
+ // the query will be blocked by the injection, we call it async
+ def future = thread {
+ sql """use @${clusterName2}"""
+ sql """select * from test"""
+ }
+ sleep(1000)
+            assertEquals(1, getBrpcMetricsByCluster(clusterName2, "file_cache_warm_up_rowset_triggered_by_sync_rowset_num"))
+            assertEquals(2, getBrpcMetricsByCluster(clusterName2, "file_cache_warm_up_segment_complete_num"))
+            assertEquals(0, getBrpcMetricsByCluster(clusterName2, "file_cache_warm_up_rowset_complete_num"))
+
+            future.get()
+            assertEquals(3, getBrpcMetricsByCluster(clusterName2, "file_cache_warm_up_segment_complete_num"))
+            assertEquals(1, getBrpcMetricsByCluster(clusterName2, "file_cache_warm_up_rowset_complete_num"))
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]