This is an automated email from the ASF dual-hosted git repository.
zhangchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 290dc2a05ed [opt](warm up) support delaying compaction commit until rowset warm-up finishes (#54416)
290dc2a05ed is described below
commit 290dc2a05ed5b28ce886036a481b72838da3a054
Author: zhannngchen <[email protected]>
AuthorDate: Thu Aug 7 20:08:44 2025 +0800
[opt](warm up) support delaying compaction commit until rowset warm-up finishes (#54416)
### What problem does this PR solve?
Problem Summary:
This PR optimizes the event-driven warm-up mechanism by adding delayed
compaction commit. The change ensures that compaction waits for the
rowset warm-up to complete before finalizing the commit, improving the
file-cache hit ratio on the read cluster.
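A rough sketch of the mechanism (an editorial illustration distilled from the
diff below, not extra code in this patch): the warm-up RPC handler counts every
submitted segment/index download on a bthread::CountdownEvent, each download
callback signals it, and the handler blocks until the count drains or the
caller-supplied timeout expires, so a slow warm-up can delay the compaction
commit but never block it indefinitely.

    // Simplified sketch; see cloud_internal_service.cpp in the diff for the real code.
    std::shared_ptr<bthread::CountdownEvent> wait;
    timespec due_time;
    if (request->has_sync_wait_timeout_ms() && request->sync_wait_timeout_ms() > 0) {
        wait = std::make_shared<bthread::CountdownEvent>(0);
        due_time = butil::milliseconds_from_now(request->sync_wait_timeout_ms());
    }
    // for each download submitted:   if (wait) { wait->add_count(); }
    // in each download callback:     if (wait) { wait->signal(); }
    if (wait && wait->timed_wait(due_time)) { // non-zero return means the deadline passed
        // log the timeout and fall through; the commit proceeds anyway
    }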
---
be/src/cloud/cloud_internal_service.cpp | 38 ++-
be/src/cloud/cloud_meta_mgr.cpp | 19 +-
be/src/cloud/cloud_warm_up_manager.cpp | 29 +-
be/src/cloud/cloud_warm_up_manager.h | 10 +-
be/src/cloud/config.cpp | 6 +
be/src/cloud/config.h | 11 +
be/src/io/fs/s3_file_reader.cpp | 9 +
gensrc/proto/internal_service.proto | 1 +
...rm_up_cluster_event_compaction_sync_wait.groovy | 375 ++++++++++++++++++++
...uster_event_compaction_sync_wait_timeout.groovy | 377 +++++++++++++++++++++
10 files changed, 869 insertions(+), 6 deletions(-)
diff --git a/be/src/cloud/cloud_internal_service.cpp b/be/src/cloud/cloud_internal_service.cpp
index 7beae0b825e..e35cfd0e01f 100644
--- a/be/src/cloud/cloud_internal_service.cpp
+++ b/be/src/cloud/cloud_internal_service.cpp
@@ -17,6 +17,8 @@
#include "cloud/cloud_internal_service.h"
+#include <bthread/countdown_event.h>
+
#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet_mgr.h"
#include "cloud/config.h"
@@ -151,6 +153,10 @@ bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_request_to_handle_slow_count(
         "file_cache_warm_up_rowset_request_to_handle_slow_count");
 bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_handle_to_finish_slow_count(
         "file_cache_warm_up_rowset_handle_to_finish_slow_count");
+bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_wait_for_compaction_num(
+        "file_cache_warm_up_rowset_wait_for_compaction_num");
+bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_wait_for_compaction_timeout_num(
+        "file_cache_warm_up_rowset_wait_for_compaction_timeout_num");
 void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* controller
                                               [[maybe_unused]],
@@ -158,6 +164,15 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
                                               PWarmUpRowsetResponse* response,
                                               google::protobuf::Closure* done) {
     brpc::ClosureGuard closure_guard(done);
+    std::shared_ptr<bthread::CountdownEvent> wait = nullptr;
+    timespec due_time;
+    if (request->has_sync_wait_timeout_ms() && request->sync_wait_timeout_ms() > 0) {
+        g_file_cache_warm_up_rowset_wait_for_compaction_num << 1;
+        wait = std::make_shared<bthread::CountdownEvent>(0);
+        VLOG_DEBUG << "sync_wait_timeout: " << request->sync_wait_timeout_ms() << " ms";
+        due_time = butil::milliseconds_from_now(request->sync_wait_timeout_ms());
+    }
+
     for (auto& rs_meta_pb : request->rowset_metas()) {
         RowsetMeta rs_meta;
         rs_meta.init_from_pb(rs_meta_pb);
@@ -196,9 +211,10 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
         }
         for (int64_t segment_id = 0; segment_id < rs_meta.num_segments(); segment_id++) {
-            auto download_done = [=, tablet_id = rs_meta.tablet_id(),
+            auto download_done = [&, tablet_id = rs_meta.tablet_id(),
                                   rowset_id = rs_meta.rowset_id().to_string(),
-                                  segment_size = rs_meta.segment_file_size(segment_id)](Status st) {
+                                  segment_size = rs_meta.segment_file_size(segment_id),
+                                  wait](Status st) {
                 if (st.ok()) {
                     g_file_cache_event_driven_warm_up_finished_segment_num << 1;
                     g_file_cache_event_driven_warm_up_finished_segment_size << segment_size;
@@ -227,6 +243,9 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
                     LOG(WARNING) << "download segment failed, tablet_id: " << tablet_id
                                  << " rowset_id: " << rowset_id << ", error: " << st;
                 }
+                if (wait) {
+                    wait->signal();
+                }
             };
             io::DownloadFileMeta download_meta {
@@ -247,6 +266,9 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
             g_file_cache_event_driven_warm_up_submitted_segment_num << 1;
             g_file_cache_event_driven_warm_up_submitted_segment_size
                     << rs_meta.segment_file_size(segment_id);
+            if (wait) {
+                wait->add_count();
+            }
             _engine.file_cache_block_downloader().submit_download_task(download_meta);
             auto download_inverted_index = [&](std::string index_path, uint64_t idx_size) {
@@ -285,6 +307,9 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
                         LOG(WARNING) << "download inverted index failed, tablet_id: " << tablet_id
                                      << " rowset_id: " << rowset_id << ", error: " << st;
                     }
+                    if (wait) {
+                        wait->signal();
+                    }
                 };
                 io::DownloadFileMeta download_meta {
                         .path = io::Path(index_path),
@@ -301,6 +326,10 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
                 };
                 g_file_cache_event_driven_warm_up_submitted_index_num << 1;
                 g_file_cache_event_driven_warm_up_submitted_index_size << idx_size;
+
+                if (wait) {
+                    wait->add_count();
+                }
                 _engine.file_cache_block_downloader().submit_download_task(download_meta);
             };
@@ -341,6 +370,11 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
             }
         }
     }
+    if (wait && wait->timed_wait(due_time)) {
+        g_file_cache_warm_up_rowset_wait_for_compaction_timeout_num << 1;
+        LOG_WARNING("the time spent warming up {} rowsets exceeded {} ms",
+                    request->rowset_metas().size(), request->sync_wait_timeout_ms());
+    }
 }
bvar::Adder<uint64_t> g_file_cache_recycle_cache_finished_segment_num(
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index 6e33a711d2a..f1e1d1d6fa2 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -1083,8 +1083,25 @@ Status CloudMetaMgr::commit_rowset(RowsetMeta& rs_meta, const std::string& job_i
         RETURN_IF_ERROR(
                 engine.get_schema_cloud_dictionary_cache().refresh_dict(rs_meta_pb.index_id()));
     }
+
+    int64_t timeout_ms = -1;
+    // if the `job_id` is not empty, it means this rowset was produced by a compaction job.
+    if (config::enable_compaction_delay_commit_for_warm_up && !job_id.empty()) {
+        // 1. assume the download speed is 100MB/s
+        // 2. we double the download time as timeout for safety
+        // 3. for small rowsets, the calculated timeout may be quite small, so we need a minimum timeout
+        const double speed_mbps = 100.0; // 100MB/s
+        const double safety_factor = 2.0;
+        timeout_ms = std::min(
+                std::max(static_cast<int64_t>(static_cast<double>(rs_meta.data_disk_size()) /
+                                              (speed_mbps * 1024 * 1024) * safety_factor * 1000),
+                         config::warm_up_rowset_sync_wait_min_timeout_ms),
+                config::warm_up_rowset_sync_wait_max_timeout_ms);
+        LOG(INFO) << "warm up rowset: " << rs_meta.version() << ", job_id: " << job_id
+                  << ", with timeout: " << timeout_ms << " ms";
+    }
     auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
-    manager.warm_up_rowset(rs_meta);
+    manager.warm_up_rowset(rs_meta, timeout_ms);
     return st;
 }
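As a worked example of the timeout formula above (an editorial illustration
using the default bounds from config.cpp further down; the helper name is ours
and is not part of the patch):

    #include <algorithm>
    #include <cstdint>

    // clamp(data_size / 100 MB/s * safety_factor * 1000, min_ms, max_ms)
    int64_t illustrative_warm_up_timeout_ms(int64_t data_disk_size_bytes) {
        const double speed_bytes_per_s = 100.0 * 1024 * 1024; // assumed download speed, 100MB/s
        const double safety_factor = 2.0;                     // double the estimate for safety
        auto estimate_ms = static_cast<int64_t>(data_disk_size_bytes / speed_bytes_per_s *
                                                safety_factor * 1000);
        // defaults: warm_up_rowset_sync_wait_min_timeout_ms = 10000,
        //           warm_up_rowset_sync_wait_max_timeout_ms = 120000
        return std::min<int64_t>(std::max<int64_t>(estimate_ms, 10000), 120000);
    }
    // e.g. a 5 GiB rowset -> ~102400 ms; a 100 MiB rowset -> clamped up to the 10000 ms minimum.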
diff --git a/be/src/cloud/cloud_warm_up_manager.cpp b/be/src/cloud/cloud_warm_up_manager.cpp
index 6d4136b53a1..5e5376a2da8 100644
--- a/be/src/cloud/cloud_warm_up_manager.cpp
+++ b/be/src/cloud/cloud_warm_up_manager.cpp
@@ -76,6 +76,9 @@ bvar::Status<int64_t> g_file_cache_warm_up_rowset_last_call_unix_ts(
         "file_cache_warm_up_rowset_last_call_unix_ts", 0);
 bvar::Adder<uint64_t> file_cache_warm_up_failed_task_num("file_cache_warm_up", "failed_task_num");
+bvar::LatencyRecorder g_file_cache_warm_up_rowset_wait_for_compaction_latency(
+        "file_cache_warm_up_rowset_wait_for_compaction_latency");
+
 CloudWarmUpManager::CloudWarmUpManager(CloudStorageEngine& engine) : _engine(engine) {
     _download_thread = std::thread(&CloudWarmUpManager::handle_jobs, this);
 }
@@ -497,8 +500,9 @@ std::vector<TReplicaInfo> CloudWarmUpManager::get_replica_info(int64_t tablet_id
     return replicas;
 }
-void CloudWarmUpManager::warm_up_rowset(RowsetMeta& rs_meta) {
-    auto replicas = get_replica_info(rs_meta.tablet_id());
+void CloudWarmUpManager::warm_up_rowset(RowsetMeta& rs_meta, int64_t sync_wait_timeout_ms) {
+    auto tablet_id = rs_meta.tablet_id();
+    auto replicas = get_replica_info(tablet_id);
     if (replicas.empty()) {
         LOG(INFO) << "There is no need to warmup tablet=" << rs_meta.tablet_id()
                   << ", skipping rowset=" << rs_meta.rowset_id().to_string();
@@ -512,6 +516,7 @@ void CloudWarmUpManager::warm_up_rowset(RowsetMeta& rs_meta) {
     PWarmUpRowsetRequest request;
     request.add_rowset_metas()->CopyFrom(rs_meta.get_rowset_pb());
     request.set_unix_ts_us(now_ts);
+    request.set_sync_wait_timeout_ms(sync_wait_timeout_ms);
     for (auto& replica : replicas) {
         // send sync request
         std::string host = replica.host;
@@ -576,8 +581,28 @@ void CloudWarmUpManager::warm_up_rowset(RowsetMeta& rs_meta) {
         }
         brpc::Controller cntl;
+        if (sync_wait_timeout_ms > 0) {
+            cntl.set_timeout_ms(sync_wait_timeout_ms + 1000);
+        }
         PWarmUpRowsetResponse response;
+        MonotonicStopWatch watch;
+        watch.start();
         brpc_stub->warm_up_rowset(&cntl, &request, &response, nullptr);
+        if (cntl.Failed()) {
+            LOG_WARNING("warm up rowset {} for tablet {} failed, rpc error: {}",
+                        rs_meta.rowset_id().to_string(), tablet_id, cntl.ErrorText());
+            return;
+        }
+        if (sync_wait_timeout_ms > 0) {
+            auto cost_us = watch.elapsed_time_microseconds();
+            VLOG_DEBUG << "warm up rowset wait for compaction: " << cost_us << " us";
+            if (cost_us / 1000 > sync_wait_timeout_ms) {
+                LOG_WARNING(
+                        "Warm up rowset {} for tablet {} wait for compaction timeout, takes {} ms",
+                        rs_meta.rowset_id().to_string(), tablet_id, cost_us / 1000);
+            }
+            g_file_cache_warm_up_rowset_wait_for_compaction_latency << cost_us;
+        }
     }
 }
diff --git a/be/src/cloud/cloud_warm_up_manager.h b/be/src/cloud/cloud_warm_up_manager.h
index 7418e47011c..465dbcf33db 100644
--- a/be/src/cloud/cloud_warm_up_manager.h
+++ b/be/src/cloud/cloud_warm_up_manager.h
@@ -71,7 +71,15 @@ public:
     Status set_event(int64_t job_id, TWarmUpEventType::type event, bool clear = false);
-    void warm_up_rowset(RowsetMeta& rs_meta);
+    // If `sync_wait_timeout_ms` <= 0, the function will send the warm-up RPC
+    // and return immediately without waiting for the warm-up to complete.
+    // If `sync_wait_timeout_ms` > 0, the function will wait for the warm-up
+    // to finish or until the specified timeout (in milliseconds) is reached.
+    //
+    // @param rs_meta Metadata of the rowset to be warmed up.
+    // @param sync_wait_timeout_ms Timeout in milliseconds to wait for the warm-up
+    //                             to complete. Non-positive value means no waiting.
+    void warm_up_rowset(RowsetMeta& rs_meta, int64_t sync_wait_timeout_ms = -1);
     void recycle_cache(int64_t tablet_id, const std::vector<RowsetId>& rowset_ids,
                        const std::vector<int64_t>& num_segments,
diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp
index 5d8d1aadbeb..ab35268afc4 100644
--- a/be/src/cloud/config.cpp
+++ b/be/src/cloud/config.cpp
@@ -95,5 +95,11 @@ DEFINE_mInt64(warmup_tablet_replica_info_cache_ttl_sec, "600");
DEFINE_mInt64(warm_up_rowset_slow_log_ms, "1000");
+DEFINE_mBool(enable_compaction_delay_commit_for_warm_up, "false");
+
+DEFINE_mInt64(warm_up_rowset_sync_wait_min_timeout_ms, "10000");
+
+DEFINE_mInt64(warm_up_rowset_sync_wait_max_timeout_ms, "120000");
+
#include "common/compile_check_end.h"
} // namespace doris::config
diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h
index d54f9f6948d..535bf3146a1 100644
--- a/be/src/cloud/config.h
+++ b/be/src/cloud/config.h
@@ -131,5 +131,16 @@ DECLARE_mInt64(warmup_tablet_replica_info_cache_ttl_sec);
DECLARE_mInt64(warm_up_rowset_slow_log_ms);
+// When event-driven warm-up is enabled by the user, turning on this option can help
+// avoid file cache misses in the read cluster caused by compaction.
+// If enabled, compaction will wait for the warm-up to complete before committing.
+//
+// ATTN: Enabling this option may slow down compaction due to the added wait.
+DECLARE_mBool(enable_compaction_delay_commit_for_warm_up);
+
+DECLARE_mInt64(warm_up_rowset_sync_wait_min_timeout_ms);
+
+DECLARE_mInt64(warm_up_rowset_sync_wait_max_timeout_ms);
+
#include "common/compile_check_end.h"
} // namespace doris::config
diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp
index 80c58235300..eede868468f 100644
--- a/be/src/io/fs/s3_file_reader.cpp
+++ b/be/src/io/fs/s3_file_reader.cpp
@@ -38,6 +38,7 @@
#include "runtime/thread_context.h"
#include "runtime/workload_management/io_throttle.h"
#include "util/bvar_helper.h"
+#include "util/debug_points.h"
#include "util/doris_metrics.h"
#include "util/runtime_profile.h"
#include "util/s3_util.h"
@@ -130,6 +131,14 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea
     LIMIT_REMOTE_SCAN_IO(bytes_read);
+    DBUG_EXECUTE_IF("S3FileReader::read_at_impl.io_slow", {
+        auto sleep_time = dp->param("sleep", 3);
+        LOG_INFO("S3FileReader::read_at_impl.io_slow inject sleep {} s", sleep_time)
+                .tag("bucket", _bucket)
+                .tag("key", _key);
+        std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
+    });
+
     int total_sleep_time = 0;
     while (retry_count <= max_retries) {
         *bytes_read = 0;
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 8cd989683c2..fa7084e3f0b 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -863,6 +863,7 @@ message PGetFileCacheMetaResponse {
message PWarmUpRowsetRequest {
repeated RowsetMetaPB rowset_metas = 1;
optional int64 unix_ts_us = 2;
+ optional int64 sync_wait_timeout_ms = 3;
}
message PWarmUpRowsetResponse {
diff --git a/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_event_compaction_sync_wait.groovy b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_event_compaction_sync_wait.groovy
new file mode 100644
index 00000000000..4879f83aab8
--- /dev/null
+++ b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_event_compaction_sync_wait.groovy
@@ -0,0 +1,375 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.NodeType
+import groovy.json.JsonSlurper
+
+suite('test_warm_up_cluster_event_compaction_sync_wait', 'docker') {
+ def options = new ClusterOptions()
+ options.feConfigs += [
+ 'cloud_cluster_check_interval_second=1',
+ 'cloud_tablet_rebalancer_interval_second=1',
+ ]
+ options.beConfigs += [
+ 'file_cache_enter_disk_resource_limit_mode_percent=99',
+ 'enable_evict_file_cache_in_advance=false',
+ 'file_cache_background_monitor_interval_ms=1000',
+ 'warm_up_rowset_slow_log_ms=1',
+ 'enable_compaction_delay_commit_for_warm_up=true',
+ 'warm_up_rowset_sync_wait_min_timeout_ms=20000',
+ 'warm_up_rowset_sync_wait_max_timeout_ms=30000',
+ ]
+ options.enableDebugPoints()
+ options.cloudMode = true
+
+ def clearFileCache = {ip, port ->
+ def url = "http://${ip}:${port}/api/file_cache?op=clear&sync=true"
+ def response = new URL(url).text
+ def json = new JsonSlurper().parseText(response)
+
+ // Check the status
+ if (json.status != "OK") {
+ throw new RuntimeException("Clear cache on ${ip}:${port} failed:
${json.status}")
+ }
+ }
+
+ def clearFileCacheOnAllBackends = {
+ def backends = sql """SHOW BACKENDS"""
+
+ for (be in backends) {
+ def ip = be[1]
+ def port = be[4]
+ clearFileCache(ip, port)
+ }
+
+ // clearing the file cache is async, wait until it is done
+ sleep(5000)
+ }
+
+ def getBrpcMetrics = {ip, port, name ->
+ def url = "http://${ip}:${port}/brpc_metrics"
+ def metrics = new URL(url).text
+ def matcher = metrics =~ ~"${name}\\s+(\\d+)"
+ if (matcher.find()) {
+ return matcher[0][1] as long
+ } else {
+ throw new RuntimeException("${name} not found for ${ip}:${port}")
+ }
+ }
+
+ def getBeIpAndPort = { cluster ->
+ def backends = sql """SHOW BACKENDS"""
+ def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
+
+ if (cluster_bes.isEmpty()) {
+ throw new RuntimeException("No BE found for cluster: ${cluster}")
+ }
+
+ def firstBe = cluster_bes[0]
+ return [ip: firstBe[1], http_port:firstBe[4], rpc_port: firstBe[5]]
+ }
+
+ def logFileCacheDownloadMetrics = { cluster ->
+ def backends = sql """SHOW BACKENDS"""
+ def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
+ for (be in cluster_bes) {
+ def ip = be[1]
+ def port = be[5]
+ def submitted = getBrpcMetrics(ip, port, "file_cache_download_submitted_num")
+ def finished = getBrpcMetrics(ip, port, "file_cache_download_finished_num")
+ def failed = getBrpcMetrics(ip, port, "file_cache_download_failed_num")
+ logger.info("${cluster} be ${ip}:${port}, downloader submitted=${submitted}"
+ + ", finished=${finished}, failed=${failed}")
+ }
+ }
+
+ def logWarmUpRowsetMetrics = { cluster ->
+ def backends = sql """SHOW BACKENDS"""
+ def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
+ for (be in cluster_bes) {
+ def ip = be[1]
+ def port = be[5]
+ def submitted_segment = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_submitted_segment_num")
+ def finished_segment = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_finished_segment_num")
+ def failed_segment = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_failed_segment_num")
+ def submitted_index = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_submitted_index_num")
+ def finished_index = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_finished_index_num")
+ def failed_index = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_failed_index_num")
+ def compaction_sync_wait = getBrpcMetrics(ip, port, "file_cache_warm_up_rowset_wait_for_compaction_num")
+ logger.info("${cluster} be ${ip}:${port}, submitted_segment=${submitted_segment}"
+ + ", finished_segment=${finished_segment}, failed_segment=${failed_segment}"
+ + ", submitted_index=${submitted_index}"
+ + ", finished_index=${finished_index}"
+ + ", failed_index=${failed_index}"
+ + ", compaction_sync_wait=${compaction_sync_wait}")
+ }
+ }
+
+ def getTTLCacheSize = { ip, port ->
+ return getBrpcMetrics(ip, port, "ttl_cache_size")
+ }
+
+ def checkTTLCacheSizeSumEqual = { cluster1, cluster2 ->
+ def backends = sql """SHOW BACKENDS"""
+
+ def srcBes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster1}\"""") }
+ def tgtBes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster2}\"""") }
+
+ long srcSum = 0
+ for (src in srcBes) {
+ def ip = src[1]
+ def port = src[5]
+ srcSum += getTTLCacheSize(ip, port)
+ }
+
+ long tgtSum = 0
+ for (tgt in tgtBes) {
+ def ip = tgt[1]
+ def port = tgt[5]
+ tgtSum += getTTLCacheSize(ip, port)
+ }
+
+ logger.info("ttl_cache_size: src=${srcSum} dst=${tgtSum}")
+ assertTrue(srcSum > 0, "ttl_cache_size should > 0")
+ assertEquals(srcSum, tgtSum)
+ }
+
+ def waitForBrpcMetricValue = { ip, port, metricName, targetValue, timeoutMs ->
+ def delta_time = 100
+ def useTime = 0
+
+ for(int t = delta_time; t <= timeoutMs; t += delta_time){
+ try {
+ def currentValue = getBrpcMetrics(ip, port, metricName)
+
+ if (currentValue == targetValue) {
+ logger.info("BE ${ip}:${port} metric ${metricName} reached
target value: ${targetValue}")
+ return true
+ }
+
+ logger.info("BE ${ip}:${port} metric ${metricName} current
value: ${currentValue}, target: ${targetValue}")
+
+ } catch (Exception e) {
+ logger.warn("Failed to get metric ${metricName} from BE
${ip}:${port}: ${e.message}")
+ }
+
+ useTime = t
+ sleep(delta_time)
+ }
+
+ assertTrue(useTime <= timeoutMs, "waitForBrpcMetricValue timeout")
+ }
+
+ def getTabletStatus = { ip, port, tablet_id ->
+ StringBuilder sb = new StringBuilder();
+ sb.append("curl -X GET http://${ip}:${port}")
+ sb.append("/api/compaction/show?tablet_id=")
+ sb.append(tablet_id)
+
+ String command = sb.toString()
+ logger.info(command)
+ def process = command.execute()
+ def code = process.waitFor()
+ def out = process.getText()
+ logger.info("Get tablet status: =" + code + ", out=" + out)
+ assertEquals(code, 0)
+ def tabletStatus = parseJson(out.trim())
+ return tabletStatus
+ }
+
+ def checkFileCacheRecycle = { cluster, rowsets ->
+ def backends = sql """SHOW BACKENDS"""
+ def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
+ assert cluster_bes.size() > 0, "No backend found for cluster ${cluster}"
+ def be = cluster_bes[0]
+ def ip = be[1]
+ def port = be[4]
+
+ for (int i = 0; i < rowsets.size(); i++) {
+ def rowsetStr = rowsets[i]
+ // [12-12] 1 DATA NONOVERLAPPING 02000000000000124843c92c13625daa8296c20957119893 1011.00 B
+ def start_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[0].toInteger()
+ def end_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[1].toInteger()
+ def rowset_id = rowsetStr.split(" ")[4]
+
+ logger.info("rowset ${i}, start: ${start_version}, end: ${end_version}, id: ${rowset_id}")
+ def data = Http.GET("http://${ip}:${port}/api/file_cache?op=list_cache&value=${rowset_id}_0.dat", true)
+ logger.info("file cache data: ${data}")
+ // in this case only [2-11] and [12-12] should have data in cache
+ if ((start_version == 2 && end_version == 11) || (start_version == 12)) {
+ assertTrue(data.size() > 0)
+ } else {
+ assertTrue(data.size() == 0)
+ }
+ }
+ }
+
+ docker(options) {
+ def clusterName1 = "warmup_source"
+ def clusterName2 = "warmup_target"
+
+ // Add two clusters
+ cluster.addBackend(1, clusterName1)
+ cluster.addBackend(1, clusterName2)
+
+ def tag1 = getCloudBeTagByName(clusterName1)
+ def tag2 = getCloudBeTagByName(clusterName2)
+
+ logger.info("Cluster tag1: {}", tag1)
+ logger.info("Cluster tag2: {}", tag2)
+
+ def jsonSlurper = new JsonSlurper()
+
+ def getJobState = { jobId ->
+ def jobStateResult = sql """SHOW WARM UP JOB WHERE ID = ${jobId}"""
+ return jobStateResult[0][3]
+ }
+
+ // Ensure we are in source cluster
+ sql """use @${clusterName1}"""
+
+ // Start warm up job
+ def jobId_ = sql """
+ WARM UP CLUSTER ${clusterName2} WITH CLUSTER ${clusterName1}
+ PROPERTIES (
+ "sync_mode" = "event_driven",
+ "sync_event" = "load"
+ )
+ """
+
+ def jobId = jobId_[0][0]
+ logger.info("Warm-up job ID: ${jobId}")
+
+ sql """
+ create table test (
+ col0 int not null,
+ col1 variant NOT NULL
+ ) UNIQUE KEY(`col0`)
+ DISTRIBUTED BY HASH(col0) BUCKETS 1
+ PROPERTIES ("file_cache_ttl_seconds" = "3600",
"disable_auto_compaction" = "true");
+ """
+
+ clearFileCacheOnAllBackends()
+ sleep(15000)
+
+ sql """use @${clusterName1}"""
+ // load data
+ sql """insert into test values (1, '{"a" : 1.0}')"""
+ sql """insert into test values (2, '{"a" : 111.1111}')"""
+ sql """insert into test values (3, '{"a" : "11111"}')"""
+ sql """insert into test values (4, '{"a" : 1111111111}')"""
+ sql """insert into test values (5, '{"a" : 1111.11111}')"""
+ sql """insert into test values (6, '{"a" : "11111"}')"""
+ sql """insert into test values (7, '{"a" : 11111.11111}')"""
+ sql """insert into test values (7, '{"a" : 11111.11111}')"""
+ sleep(15000)
+
+ def tablets = sql_return_maparray """ show tablets from test; """
+ logger.info("tablets: " + tablets)
+ assertEquals(1, tablets.size())
+ def tablet = tablets[0]
+ String tablet_id = tablet.TabletId
+
+ def be = getBeIpAndPort(clusterName2)
+
+ logFileCacheDownloadMetrics(clusterName2)
+ logWarmUpRowsetMetrics(clusterName2)
+ def num_submitted = getBrpcMetrics(be.ip, be.rpc_port, "file_cache_event_driven_warm_up_submitted_segment_num")
+ def num_finished = getBrpcMetrics(be.ip, be.rpc_port, "file_cache_event_driven_warm_up_finished_segment_num")
+ assertTrue(num_submitted >= 8)
+ assertEquals(num_finished, num_submitted)
+
+ // inject slow io, which should cause the warm-up to take longer than 10s
+ GetDebugPoint().enableDebugPoint(be.ip, be.http_port as int, NodeType.BE, "S3FileReader::read_at_impl.io_slow", [sleep:10])
+
+ // trigger and wait compaction async
+ def future = thread {
+ sql """use @${clusterName1}"""
+ trigger_and_wait_compaction("test", "cumulative")
+ }
+ // wait until the warmup for compaction started
+ waitForBrpcMetricValue(be.ip, be.rpc_port, "file_cache_warm_up_rowset_wait_for_compaction_num", 1, /*timeout*/10000)
+ sleep(1000)
+ logFileCacheDownloadMetrics(clusterName2)
+ logWarmUpRowsetMetrics(clusterName2)
+ assertEquals(num_submitted + 1, getBrpcMetrics(be.ip, be.rpc_port, "file_cache_event_driven_warm_up_submitted_segment_num"))
+ assertEquals(num_finished, getBrpcMetrics(be.ip, be.rpc_port, "file_cache_event_driven_warm_up_finished_segment_num"))
+
+ // at this moment, compaction has completed but not committed; it is waiting for the warm-up
+ // trigger a query on the read cluster, which can't read the compaction data yet
+ sql """use @${clusterName2}"""
+ sql "select * from test"
+ def tablet_status = getTabletStatus(be.ip, be.http_port, tablet_id)
+ def rowsets = tablet_status ["rowsets"]
+ for (int i = 0; i < rowsets.size(); i++) {
+ def rowsetStr = rowsets[i]
+ // [12-12] 1 DATA NONOVERLAPPING 02000000000000124843c92c13625daa8296c20957119893 1011.00 B
+ def start_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[0].toInteger()
+ def end_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[1].toInteger()
+ if (start_version != 0) {
+ assertEquals(start_version, end_version)
+ }
+ }
+
+ // wait for the compaction to complete
+ future.get()
+
+ assertEquals(num_finished + 1, getBrpcMetrics(be.ip, be.rpc_port, "file_cache_event_driven_warm_up_finished_segment_num"))
+ assertEquals(0, getBrpcMetrics(be.ip, be.rpc_port, "file_cache_warm_up_rowset_wait_for_compaction_timeout_num"))
+
+ // a new insert will trigger the sync rowset operation in the following query
+ sql """insert into test values (9, '{"a" : 11111.11111}')"""
+
+ // now the compaction rowsets are accessible
+ sql """use @${clusterName2}"""
+ sql "select * from test"
+ tablet_status = getTabletStatus(be.ip, be.http_port, tablet_id)
+ rowsets = tablet_status ["rowsets"]
+ def found_compaction_rowsets = false
+ for (int i = 0; i < rowsets.size(); i++) {
+ def rowsetStr = rowsets[i]
+ // [12-12] 1 DATA NONOVERLAPPING 02000000000000124843c92c13625daa8296c20957119893 1011.00 B
+ def start_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[0].toInteger()
+ def end_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[1].toInteger()
+ if (start_version != 0) {
+ if (start_version != end_version) {
+ found_compaction_rowsets = true;
+ }
+ }
+ }
+ assertTrue(found_compaction_rowsets)
+
+ logFileCacheDownloadMetrics(clusterName2)
+ logWarmUpRowsetMetrics(clusterName2)
+ // checkTTLCacheSizeSumEqual(clusterName1, clusterName2)
+
+ def jobInfo = sql """SHOW WARM UP JOB WHERE ID = ${jobId}"""
+ assertEquals(jobInfo[0][0], jobId)
+ assertEquals(jobInfo[0][1], clusterName1)
+ assertEquals(jobInfo[0][2], clusterName2)
+ assertEquals(jobInfo[0][4], "CLUSTER")
+ assertTrue(jobInfo[0][3] in ["RUNNING", "PENDING"],
+ "JobState is ${jobInfo[0][3]}, expected RUNNING or PENDING")
+ assertEquals(jobInfo[0][5], "EVENT_DRIVEN (LOAD)")
+
+ // Cancel job and confirm
+ sql """CANCEL WARM UP JOB WHERE ID = ${jobId}"""
+ def cancelInfo = sql """SHOW WARM UP JOB WHERE ID = ${jobId}"""
+ assertEquals(cancelInfo[0][3], "CANCELLED")
+ }
+}
diff --git a/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_event_compaction_sync_wait_timeout.groovy b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_event_compaction_sync_wait_timeout.groovy
new file mode 100644
index 00000000000..64b72dfcec5
--- /dev/null
+++ b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/cluster/test_warm_up_cluster_event_compaction_sync_wait_timeout.groovy
@@ -0,0 +1,377 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.NodeType
+import groovy.json.JsonSlurper
+
+suite('test_warm_up_cluster_event_compaction_sync_wait_timeout', 'docker') {
+ def options = new ClusterOptions()
+ options.feConfigs += [
+ 'cloud_cluster_check_interval_second=1',
+ 'cloud_tablet_rebalancer_interval_second=1',
+ ]
+ options.beConfigs += [
+ 'file_cache_enter_disk_resource_limit_mode_percent=99',
+ 'enable_evict_file_cache_in_advance=false',
+ 'file_cache_background_monitor_interval_ms=1000',
+ 'warm_up_rowset_slow_log_ms=1',
+ 'enable_compaction_delay_commit_for_warm_up=true',
+ 'warm_up_rowset_sync_wait_min_timeout_ms=10000',
+ 'warm_up_rowset_sync_wait_max_timeout_ms=10000',
+ ]
+ options.enableDebugPoints()
+ options.cloudMode = true
+
+ def clearFileCache = {ip, port ->
+ def url = "http://${ip}:${port}/api/file_cache?op=clear&sync=true"
+ def response = new URL(url).text
+ def json = new JsonSlurper().parseText(response)
+
+ // Check the status
+ if (json.status != "OK") {
+ throw new RuntimeException("Clear cache on ${ip}:${port} failed:
${json.status}")
+ }
+ }
+
+ def clearFileCacheOnAllBackends = {
+ def backends = sql """SHOW BACKENDS"""
+
+ for (be in backends) {
+ def ip = be[1]
+ def port = be[4]
+ clearFileCache(ip, port)
+ }
+
+ // clearing the file cache is async, wait until it is done
+ sleep(5000)
+ }
+
+ def getBrpcMetrics = {ip, port, name ->
+ def url = "http://${ip}:${port}/brpc_metrics"
+ def metrics = new URL(url).text
+ def matcher = metrics =~ ~"${name}\\s+(\\d+)"
+ if (matcher.find()) {
+ return matcher[0][1] as long
+ } else {
+ throw new RuntimeException("${name} not found for ${ip}:${port}")
+ }
+ }
+
+ def getBeIpAndPort = { cluster ->
+ def backends = sql """SHOW BACKENDS"""
+ def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
+
+ if (cluster_bes.isEmpty()) {
+ throw new RuntimeException("No BE found for cluster: ${cluster}")
+ }
+
+ def firstBe = cluster_bes[0]
+ return [ip: firstBe[1], http_port:firstBe[4], rpc_port: firstBe[5]]
+ }
+
+ def logFileCacheDownloadMetrics = { cluster ->
+ def backends = sql """SHOW BACKENDS"""
+ def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
+ for (be in cluster_bes) {
+ def ip = be[1]
+ def port = be[5]
+ def submitted = getBrpcMetrics(ip, port, "file_cache_download_submitted_num")
+ def finished = getBrpcMetrics(ip, port, "file_cache_download_finished_num")
+ def failed = getBrpcMetrics(ip, port, "file_cache_download_failed_num")
+ logger.info("${cluster} be ${ip}:${port}, downloader submitted=${submitted}"
+ + ", finished=${finished}, failed=${failed}")
+ }
+ }
+
+ def logWarmUpRowsetMetrics = { cluster ->
+ def backends = sql """SHOW BACKENDS"""
+ def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
+ for (be in cluster_bes) {
+ def ip = be[1]
+ def port = be[5]
+ def submitted_segment = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_submitted_segment_num")
+ def finished_segment = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_finished_segment_num")
+ def failed_segment = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_failed_segment_num")
+ def submitted_index = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_submitted_index_num")
+ def finished_index = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_finished_index_num")
+ def failed_index = getBrpcMetrics(ip, port, "file_cache_event_driven_warm_up_failed_index_num")
+ def compaction_sync_wait = getBrpcMetrics(ip, port, "file_cache_warm_up_rowset_wait_for_compaction_num")
+ logger.info("${cluster} be ${ip}:${port}, submitted_segment=${submitted_segment}"
+ + ", finished_segment=${finished_segment}, failed_segment=${failed_segment}"
+ + ", submitted_index=${submitted_index}"
+ + ", finished_index=${finished_index}"
+ + ", failed_index=${failed_index}"
+ + ", compaction_sync_wait=${compaction_sync_wait}")
+ }
+ }
+
+ def getTTLCacheSize = { ip, port ->
+ return getBrpcMetrics(ip, port, "ttl_cache_size")
+ }
+
+ def checkTTLCacheSizeSumEqual = { cluster1, cluster2 ->
+ def backends = sql """SHOW BACKENDS"""
+
+ def srcBes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster1}\"""") }
+ def tgtBes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster2}\"""") }
+
+ long srcSum = 0
+ for (src in srcBes) {
+ def ip = src[1]
+ def port = src[5]
+ srcSum += getTTLCacheSize(ip, port)
+ }
+
+ long tgtSum = 0
+ for (tgt in tgtBes) {
+ def ip = tgt[1]
+ def port = tgt[5]
+ tgtSum += getTTLCacheSize(ip, port)
+ }
+
+ logger.info("ttl_cache_size: src=${srcSum} dst=${tgtSum}")
+ assertTrue(srcSum > 0, "ttl_cache_size should > 0")
+ assertEquals(srcSum, tgtSum)
+ }
+
+ def waitForBrpcMetricValue = { ip, port, metricName, targetValue, timeoutMs ->
+ def delta_time = 100
+ def useTime = 0
+
+ for(int t = delta_time; t <= timeoutMs; t += delta_time){
+ try {
+ def currentValue = getBrpcMetrics(ip, port, metricName)
+
+ if (currentValue == targetValue) {
+ logger.info("BE ${ip}:${port} metric ${metricName} reached
target value: ${targetValue}")
+ return true
+ }
+
+ logger.info("BE ${ip}:${port} metric ${metricName} current
value: ${currentValue}, target: ${targetValue}")
+
+ } catch (Exception e) {
+ logger.warn("Failed to get metric ${metricName} from BE
${ip}:${port}: ${e.message}")
+ }
+
+ useTime = t
+ sleep(delta_time)
+ }
+
+ assertTrue(useTime <= timeoutMs, "waitForBrpcMetricValue timeout")
+ }
+
+ def getTabletStatus = { ip, port, tablet_id ->
+ StringBuilder sb = new StringBuilder();
+ sb.append("curl -X GET http://${ip}:${port}")
+ sb.append("/api/compaction/show?tablet_id=")
+ sb.append(tablet_id)
+
+ String command = sb.toString()
+ logger.info(command)
+ def process = command.execute()
+ def code = process.waitFor()
+ def out = process.getText()
+ logger.info("Get tablet status: =" + code + ", out=" + out)
+ assertEquals(code, 0)
+ def tabletStatus = parseJson(out.trim())
+ return tabletStatus
+ }
+
+ def checkFileCacheRecycle = { cluster, rowsets ->
+ def backends = sql """SHOW BACKENDS"""
+ def cluster_bes = backends.findAll { it[19].contains("""\"compute_group_name\" : \"${cluster}\"""") }
+ assert cluster_bes.size() > 0, "No backend found for cluster ${cluster}"
+ def be = cluster_bes[0]
+ def ip = be[1]
+ def port = be[4]
+
+ for (int i = 0; i < rowsets.size(); i++) {
+ def rowsetStr = rowsets[i]
+ // [12-12] 1 DATA NONOVERLAPPING 02000000000000124843c92c13625daa8296c20957119893 1011.00 B
+ def start_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[0].toInteger()
+ def end_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[1].toInteger()
+ def rowset_id = rowsetStr.split(" ")[4]
+
+ logger.info("rowset ${i}, start: ${start_version}, end: ${end_version}, id: ${rowset_id}")
+ def data = Http.GET("http://${ip}:${port}/api/file_cache?op=list_cache&value=${rowset_id}_0.dat", true)
+ logger.info("file cache data: ${data}")
+ // in this case only [2-11] and [12-12] should have data in cache
+ if ((start_version == 2 && end_version == 11) || (start_version == 12)) {
+ assertTrue(data.size() > 0)
+ } else {
+ assertTrue(data.size() == 0)
+ }
+ }
+ }
+
+ docker(options) {
+ def clusterName1 = "warmup_source"
+ def clusterName2 = "warmup_target"
+
+ // Add two clusters
+ cluster.addBackend(1, clusterName1)
+ cluster.addBackend(1, clusterName2)
+
+ def tag1 = getCloudBeTagByName(clusterName1)
+ def tag2 = getCloudBeTagByName(clusterName2)
+
+ logger.info("Cluster tag1: {}", tag1)
+ logger.info("Cluster tag2: {}", tag2)
+
+ def jsonSlurper = new JsonSlurper()
+
+ def getJobState = { jobId ->
+ def jobStateResult = sql """SHOW WARM UP JOB WHERE ID = ${jobId}"""
+ return jobStateResult[0][3]
+ }
+
+ // Ensure we are in source cluster
+ sql """use @${clusterName1}"""
+
+ // Start warm up job
+ def jobId_ = sql """
+ WARM UP CLUSTER ${clusterName2} WITH CLUSTER ${clusterName1}
+ PROPERTIES (
+ "sync_mode" = "event_driven",
+ "sync_event" = "load"
+ )
+ """
+
+ def jobId = jobId_[0][0]
+ logger.info("Warm-up job ID: ${jobId}")
+
+ sql """
+ create table test (
+ col0 int not null,
+ col1 variant NOT NULL
+ ) UNIQUE KEY(`col0`)
+ DISTRIBUTED BY HASH(col0) BUCKETS 1
+ PROPERTIES ("file_cache_ttl_seconds" = "3600",
"disable_auto_compaction" = "true");
+ """
+
+ clearFileCacheOnAllBackends()
+ sleep(15000)
+
+ sql """use @${clusterName1}"""
+ // load data
+ sql """insert into test values (1, '{"a" : 1.0}')"""
+ sql """insert into test values (2, '{"a" : 111.1111}')"""
+ sql """insert into test values (3, '{"a" : "11111"}')"""
+ sql """insert into test values (4, '{"a" : 1111111111}')"""
+ sql """insert into test values (5, '{"a" : 1111.11111}')"""
+ sql """insert into test values (6, '{"a" : "11111"}')"""
+ sql """insert into test values (7, '{"a" : 11111.11111}')"""
+ sql """insert into test values (7, '{"a" : 11111.11111}')"""
+ sleep(15000)
+
+ def tablets = sql_return_maparray """ show tablets from test; """
+ logger.info("tablets: " + tablets)
+ assertEquals(1, tablets.size())
+ def tablet = tablets[0]
+ String tablet_id = tablet.TabletId
+
+ def be = getBeIpAndPort(clusterName2)
+
+ logFileCacheDownloadMetrics(clusterName2)
+ logWarmUpRowsetMetrics(clusterName2)
+ def num_submitted = getBrpcMetrics(be.ip, be.rpc_port, "file_cache_event_driven_warm_up_submitted_segment_num")
+ def num_finished = getBrpcMetrics(be.ip, be.rpc_port, "file_cache_event_driven_warm_up_finished_segment_num")
+ assertTrue(num_submitted >= 8)
+ assertEquals(num_finished, num_submitted)
+
+ // inject slow io, which should cause the warm-up to take longer than 10s
+ GetDebugPoint().enableDebugPoint(be.ip, be.http_port as int, NodeType.BE, "S3FileReader::read_at_impl.io_slow", [sleep:20])
+
+ // trigger and wait compaction async
+ def future = thread {
+ sql """use @${clusterName1}"""
+ trigger_and_wait_compaction("test", "cumulative")
+ }
+ // wait until the warmup for compaction started
+ waitForBrpcMetricValue(be.ip, be.rpc_port, "file_cache_warm_up_rowset_wait_for_compaction_num", 1, /*timeout*/10000)
+ sleep(1000)
+ logFileCacheDownloadMetrics(clusterName2)
+ logWarmUpRowsetMetrics(clusterName2)
+ assertEquals(num_submitted + 1, getBrpcMetrics(be.ip, be.rpc_port, "file_cache_event_driven_warm_up_submitted_segment_num"))
+ assertEquals(num_finished, getBrpcMetrics(be.ip, be.rpc_port, "file_cache_event_driven_warm_up_finished_segment_num"))
+
+ // at this moment, compaction has completed but not committed; it is waiting for the warm-up
+ // trigger a query on the read cluster, which can't read the compaction data yet
+ sql """use @${clusterName2}"""
+ sql "select * from test"
+ def tablet_status = getTabletStatus(be.ip, be.http_port, tablet_id)
+ def rowsets = tablet_status ["rowsets"]
+ for (int i = 0; i < rowsets.size(); i++) {
+ def rowsetStr = rowsets[i]
+ // [12-12] 1 DATA NONOVERLAPPING 02000000000000124843c92c13625daa8296c20957119893 1011.00 B
+ def start_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[0].toInteger()
+ def end_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[1].toInteger()
+ if (start_version != 0) {
+ assertEquals(start_version, end_version)
+ }
+ }
+
+ // wait for the compaction to complete
+ // we inject a 20s sleep on the s3 file read, so the warm-up wait will time out
+ future.get()
+
+ // still not finished, so `num_finished` does not change
+ assertEquals(num_finished, getBrpcMetrics(be.ip, be.rpc_port, "file_cache_event_driven_warm_up_finished_segment_num"))
+ assertEquals(1, getBrpcMetrics(be.ip, be.rpc_port, "file_cache_warm_up_rowset_wait_for_compaction_timeout_num"))
+
+ // a new insert will trigger the sync rowset operation in the following query
+ sql """insert into test values (9, '{"a" : 11111.11111}')"""
+
+ // now the compaction rowsets are accessible
+ sql """use @${clusterName2}"""
+ sql "select * from test"
+ tablet_status = getTabletStatus(be.ip, be.http_port, tablet_id)
+ rowsets = tablet_status ["rowsets"]
+ def found_compaction_rowsets = false
+ for (int i = 0; i < rowsets.size(); i++) {
+ def rowsetStr = rowsets[i]
+ // [12-12] 1 DATA NONOVERLAPPING 02000000000000124843c92c13625daa8296c20957119893 1011.00 B
+ def start_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[0].toInteger()
+ def end_version = rowsetStr.split(" ")[0].replace('[', '').replace(']', '').split("-")[1].toInteger()
+ if (start_version != 0) {
+ if (start_version != end_version) {
+ found_compaction_rowsets = true;
+ }
+ }
+ }
+ assertTrue(found_compaction_rowsets)
+
+ logFileCacheDownloadMetrics(clusterName2)
+ logWarmUpRowsetMetrics(clusterName2)
+ // checkTTLCacheSizeSumEqual(clusterName1, clusterName2)
+
+ def jobInfo = sql """SHOW WARM UP JOB WHERE ID = ${jobId}"""
+ assertEquals(jobInfo[0][0], jobId)
+ assertEquals(jobInfo[0][1], clusterName1)
+ assertEquals(jobInfo[0][2], clusterName2)
+ assertEquals(jobInfo[0][4], "CLUSTER")
+ assertTrue(jobInfo[0][3] in ["RUNNING", "PENDING"],
+ "JobState is ${jobInfo[0][3]}, expected RUNNING or PENDING")
+ assertEquals(jobInfo[0][5], "EVENT_DRIVEN (LOAD)")
+
+ // Cancel job and confirm
+ sql """CANCEL WARM UP JOB WHERE ID = ${jobId}"""
+ def cancelInfo = sql """SHOW WARM UP JOB WHERE ID = ${jobId}"""
+ assertEquals(cancelInfo[0][3], "CANCELLED")
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]