This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new d07a3f7787f branch-3.0: [enhancement](cloud) optimize warm up local IO
and performance #50275 (#50527)
d07a3f7787f is described below
commit d07a3f7787f275c94941bccc3a8e3bb92a9f3004
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Apr 30 16:59:42 2025 +0800
branch-3.0: [enhancement](cloud) optimize warm up local IO and performance
#50275 (#50527)
Cherry-picked from #50275
Co-authored-by: zhengyu <[email protected]>
---
be/src/cloud/cloud_tablet.cpp | 26 +-
be/src/cloud/cloud_warm_up_manager.cpp | 38 +--
be/src/common/config.cpp | 1 +
be/src/common/config.h | 1 +
be/src/io/cache/block_file_cache_downloader.cpp | 2 +
be/src/io/cache/cached_remote_file_reader.cpp | 23 +-
be/src/io/io_common.h | 3 +
be/test/io/cache/block_file_cache_test.cpp | 89 ++++++-
.../test_warm_up_same_table_multi_times.groovy | 268 +++++++++++++++++++++
9 files changed, 411 insertions(+), 40 deletions(-)
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 5f9490ecab0..c5b29cabd9e 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -263,18 +263,18 @@ void
CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
? 0
: rowset_meta->newest_write_timestamp() +
_tablet_meta->ttl_seconds();
- _engine.file_cache_block_downloader().submit_download_task(
- io::DownloadFileMeta {
- .path =
storage_resource.value()->remote_segment_path(
- *rowset_meta, seg_id),
- .file_size =
rs->rowset_meta()->segment_file_size(seg_id),
- .file_system =
storage_resource.value()->fs,
- .ctx =
- {
- .expiration_time =
expiration_time,
- },
- .download_done {},
- });
+ // clang-format off
+
_engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta
{
+ .path =
storage_resource.value()->remote_segment_path(*rowset_meta, seg_id),
+ .file_size =
rs->rowset_meta()->segment_file_size(seg_id),
+ .file_system = storage_resource.value()->fs,
+ .ctx =
+ {
+ .expiration_time = expiration_time,
+ .is_dryrun =
config::enable_reader_dryrun_when_download_file_cache,
+ },
+ .download_done {},
+ });
auto download_idx_file = [&](const io::Path& idx_path) {
io::DownloadFileMeta meta {
@@ -284,11 +284,13 @@ void
CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
.ctx =
{
.expiration_time =
expiration_time,
+ .is_dryrun =
config::enable_reader_dryrun_when_download_file_cache,
},
.download_done {},
};
_engine.file_cache_block_downloader().submit_download_task(std::move(meta));
};
+ // clang-format on
auto schema_ptr = rowset_meta->tablet_schema();
auto idx_version =
schema_ptr->get_inverted_index_storage_format();
if (idx_version == InvertedIndexStorageFormatPB::V1) {
diff --git a/be/src/cloud/cloud_warm_up_manager.cpp
b/be/src/cloud/cloud_warm_up_manager.cpp
index 58b5711d997..d8bce097465 100644
--- a/be/src/cloud/cloud_warm_up_manager.cpp
+++ b/be/src/cloud/cloud_warm_up_manager.cpp
@@ -113,24 +113,24 @@ void CloudWarmUpManager::handle_jobs() {
}
wait->add_count();
- _engine.file_cache_block_downloader().submit_download_task(
- io::DownloadFileMeta {
- .path =
storage_resource.value()->remote_segment_path(*rs,
-
seg_id),
- .file_size = rs->segment_file_size(seg_id),
- .file_system =
storage_resource.value()->fs,
- .ctx =
- {
- .expiration_time =
expiration_time,
- },
- .download_done =
- [wait](Status st) {
- if (!st) {
- LOG_WARNING("Warm up error
").error(st);
- }
- wait->signal();
- },
- });
+ // clang-format off
+
_engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta
{
+ .path =
storage_resource.value()->remote_segment_path(*rs, seg_id),
+ .file_size = rs->segment_file_size(seg_id),
+ .file_system = storage_resource.value()->fs,
+ .ctx =
+ {
+ .expiration_time = expiration_time,
+ .is_dryrun =
config::enable_reader_dryrun_when_download_file_cache,
+ },
+ .download_done =
+ [wait](Status st) {
+ if (!st) {
+ LOG_WARNING("Warm up error
").error(st);
+ }
+ wait->signal();
+ },
+ });
auto download_idx_file = [&](const io::Path& idx_path) {
io::DownloadFileMeta meta {
@@ -140,6 +140,7 @@ void CloudWarmUpManager::handle_jobs() {
.ctx =
{
.expiration_time =
expiration_time,
+ .is_dryrun =
config::enable_reader_dryrun_when_download_file_cache,
},
.download_done =
[wait](Status st) {
@@ -149,6 +150,7 @@ void CloudWarmUpManager::handle_jobs() {
wait->signal();
},
};
+ // clang-format on
_engine.file_cache_block_downloader().submit_download_task(std::move(meta));
};
auto schema_ptr = rs->tablet_schema();
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index f5a56c23ebd..1bc9f557b37 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1106,6 +1106,7 @@ DEFINE_mInt64(cache_lock_held_long_tail_threshold_us,
"30000000");
DEFINE_mBool(enable_file_cache_keep_base_compaction_output, "false");
DEFINE_mInt64(file_cache_remove_block_qps_limit, "1000");
DEFINE_mInt64(file_cache_background_gc_interval_ms, "100");
+DEFINE_mBool(enable_reader_dryrun_when_download_file_cache, "true");
DEFINE_mInt64(file_cache_background_monitor_interval_ms, "5000");
DEFINE_mInt64(file_cache_background_ttl_gc_interval_ms, "3000");
DEFINE_mInt64(file_cache_background_ttl_gc_batch, "1000");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index b2799622d5f..941b6a13777 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1148,6 +1148,7 @@ DECLARE_mInt64(cache_lock_held_long_tail_threshold_us);
DECLARE_mBool(enable_file_cache_keep_base_compaction_output);
DECLARE_mInt64(file_cache_remove_block_qps_limit);
DECLARE_mInt64(file_cache_background_gc_interval_ms);
+DECLARE_mBool(enable_reader_dryrun_when_download_file_cache);
DECLARE_mInt64(file_cache_background_monitor_interval_ms);
DECLARE_mInt64(file_cache_background_ttl_gc_interval_ms);
DECLARE_mInt64(file_cache_background_ttl_gc_batch);
diff --git a/be/src/io/cache/block_file_cache_downloader.cpp
b/be/src/io/cache/block_file_cache_downloader.cpp
index 026f7e2a017..b9944e39989 100644
--- a/be/src/io/cache/block_file_cache_downloader.cpp
+++ b/be/src/io/cache/block_file_cache_downloader.cpp
@@ -190,6 +190,7 @@ void FileCacheBlockDownloader::download_file_cache_block(
{
.is_index_data = meta.cache_type() ==
::doris::FileCacheType::INDEX,
.expiration_time = meta.expiration_time(),
+ .is_dryrun =
config::enable_reader_dryrun_when_download_file_cache,
},
.download_done = std::move(download_done),
};
@@ -229,6 +230,7 @@ void FileCacheBlockDownloader::download_segment_file(const
DownloadFileMeta& met
// TODO(plat1ko):
// 1. Directly append buffer data to file cache
// 2. Provide `FileReader::async_read()` interface
+ DCHECK(meta.ctx.is_dryrun ==
config::enable_reader_dryrun_when_download_file_cache);
auto st = file_reader->read_at(offset, {buffer.get(), size},
&bytes_read, &meta.ctx);
if (!st.ok()) {
LOG(WARNING) << "failed to download file: " << st;
diff --git a/be/src/io/cache/cached_remote_file_reader.cpp
b/be/src/io/cache/cached_remote_file_reader.cpp
index d1bd0b8023c..c7476b7ab74 100644
--- a/be/src/io/cache/cached_remote_file_reader.cpp
+++ b/be/src/io/cache/cached_remote_file_reader.cpp
@@ -45,6 +45,8 @@ namespace doris::io {
bvar::Adder<uint64_t> s3_read_counter("cached_remote_reader_s3_read");
bvar::LatencyRecorder g_skip_cache_num("cached_remote_reader_skip_cache_num");
bvar::Adder<uint64_t> g_skip_cache_sum("cached_remote_reader_skip_cache_sum");
+bvar::Adder<uint64_t> g_skip_local_cache_io_sum_bytes(
+ "cached_remote_reader_skip_local_cache_io_sum_bytes");
CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr
remote_file_reader,
const FileReaderOptions& opts)
@@ -110,6 +112,7 @@ std::pair<size_t, size_t>
CachedRemoteFileReader::s_align_size(size_t offset, si
Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result,
size_t* bytes_read,
const IOContext* io_ctx) {
+ const bool is_dryrun = io_ctx->is_dryrun;
DCHECK(!closed());
DCHECK(io_ctx);
if (offset > size()) {
@@ -125,7 +128,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset,
Slice result, size_t*
}
ReadStatistics stats;
auto defer_func = [&](int*) {
- if (io_ctx->file_cache_stats) {
+ if (io_ctx->file_cache_stats && !is_dryrun) {
// update stats in io_ctx, for query profile
_update_stats(stats, io_ctx->file_cache_stats,
io_ctx->is_inverted_index);
// update stats increment in this reading procedure for file cache
metrics
@@ -156,7 +159,9 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset,
Slice result, size_t*
size_t file_offset = cur_offset - iter->second->offset();
size_t reserve_bytes =
std::min(need_read_size, iter->second->range().size()
- file_offset);
- {
+ if (is_dryrun) [[unlikely]] {
+ g_skip_local_cache_io_sum_bytes << reserve_bytes;
+ } else {
SCOPED_RAW_TIMER(&stats.local_read_timer);
if (!iter->second
->read(Slice(result.data + (cur_offset -
offset), reserve_bytes),
@@ -241,7 +246,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset,
Slice result, size_t*
}
// copy from memory directly
size_t right_offset = offset + bytes_req - 1;
- if (empty_start <= right_offset && empty_end >= offset) {
+ if (empty_start <= right_offset && empty_end >= offset && !is_dryrun) {
size_t copy_left_offset = offset < empty_start ? empty_start :
offset;
size_t copy_right_offset = right_offset < empty_end ? right_offset
: empty_end;
char* dst = result.data + (copy_left_offset - offset);
@@ -295,10 +300,14 @@ Status CachedRemoteFileReader::read_at_impl(size_t
offset, Slice result, size_t*
* the thread reads the data from remote too.
*/
if (block_state == FileBlock::State::DOWNLOADED) {
- size_t file_offset = current_offset - left;
- SCOPED_RAW_TIMER(&stats.local_read_timer);
- st = block->read(Slice(result.data + (current_offset -
offset), read_size),
- file_offset);
+ if (is_dryrun) [[unlikely]] {
+ g_skip_local_cache_io_sum_bytes << read_size;
+ } else {
+ size_t file_offset = current_offset - left;
+ SCOPED_RAW_TIMER(&stats.local_read_timer);
+ st = block->read(Slice(result.data + (current_offset -
offset), read_size),
+ file_offset);
+ }
}
if (!st || block_state != FileBlock::State::DOWNLOADED) {
LOG(WARNING) << "Read data failed from file cache downloaded
by others. err="
diff --git a/be/src/io/io_common.h b/be/src/io/io_common.h
index be56d0d63ab..d4a4e26a7c1 100644
--- a/be/src/io/io_common.h
+++ b/be/src/io/io_common.h
@@ -67,6 +67,9 @@ struct IOContext {
const TUniqueId* query_id = nullptr; // Ref
FileCacheStatistics* file_cache_stats = nullptr; // Ref
bool is_inverted_index = false;
+ // if is_dryrun, read IO will download data to cache but return no data to
reader
+ // useful to skip cache data read from local disk to accelarate warm up
+ bool is_dryrun = false;
};
} // namespace io
diff --git a/be/test/io/cache/block_file_cache_test.cpp
b/be/test/io/cache/block_file_cache_test.cpp
index 2c9bfaf2093..f71ec26b526 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -7252,9 +7252,92 @@ TEST_F(BlockFileCacheTest, validate_get_or_set_crash) {
SyncPoint::get_instance()->disable_processing();
SyncPoint::get_instance()->clear_all_call_backs();
- //if (fs::exists(cache_base_path)) {
- // fs::remove_all(cache_base_path);
- //}
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+}
+
+extern bvar::Adder<uint64_t> g_skip_local_cache_io_sum_bytes;
+
+TEST_F(BlockFileCacheTest, reader_dryrun_when_download_file_cache) {
+ bool org = config::enable_reader_dryrun_when_download_file_cache;
+ config::enable_reader_dryrun_when_download_file_cache = true;
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+ fs::create_directories(cache_base_path);
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 1;
+ io::FileCacheSettings settings;
+ settings.query_queue_size = 6291456;
+ settings.query_queue_elements = 6;
+ settings.index_queue_size = 1048576;
+ settings.index_queue_elements = 1;
+ settings.disposable_queue_size = 1048576;
+ settings.disposable_queue_elements = 1;
+ settings.capacity = 8388608;
+ settings.max_file_block_size = 1048576;
+ settings.max_query_cache_size = 0;
+ io::CacheContext context;
+ ReadStatistics rstats;
+ context.stats = &rstats;
+ context.query_id = query_id;
+
ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path,
settings).ok());
+ FileReaderSPtr local_reader;
+ ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader));
+ io::FileReaderOptions opts;
+ opts.cache_type = io::cache_type_from_string("file_block_cache");
+ opts.is_doris_table = true;
+ CachedRemoteFileReader reader(local_reader, opts);
+ auto key = io::BlockFileCache::hash("tmp_file");
+ EXPECT_EQ(reader._cache_hash, key);
+ EXPECT_EQ(local_reader->path().native(), reader.path().native());
+ EXPECT_EQ(local_reader->size(), reader.size());
+ EXPECT_FALSE(reader.closed());
+ EXPECT_EQ(local_reader->path().native(),
reader.get_remote_reader()->path().native());
+ {
+ std::string buffer;
+ buffer.resize(64_kb);
+ IOContext io_ctx;
+ RuntimeProfile profile("file_cache_test");
+ FileCacheProfileReporter reporter(&profile);
+ FileCacheStatistics stats;
+ io_ctx.file_cache_stats = &stats;
+ io_ctx.is_dryrun = true;
+ size_t bytes_read {0};
+ ASSERT_TRUE(reader.read_at(32222, Slice(buffer.data(), buffer.size()),
&bytes_read, &io_ctx)
+ .ok());
+ EXPECT_TRUE(std::all_of(buffer.begin(), buffer.end(), [](char c) {
return c == '\0'; }));
+ reporter.update(&stats);
+ }
+ {
+ std::string buffer;
+ buffer.resize(64_kb);
+ IOContext io_ctx;
+ RuntimeProfile profile("file_cache_test");
+ FileCacheProfileReporter reporter(&profile);
+ FileCacheStatistics stats;
+ io_ctx.file_cache_stats = &stats;
+ io_ctx.is_dryrun = true;
+ size_t bytes_read {0};
+ ASSERT_TRUE(reader.read_at(32222, Slice(buffer.data(), buffer.size()),
&bytes_read, &io_ctx)
+ .ok());
+
+ EXPECT_TRUE(std::all_of(buffer.begin(), buffer.end(), [](char c) {
return c == '\0'; }));
+ reporter.update(&stats);
+ }
+ EXPECT_EQ(g_skip_local_cache_io_sum_bytes.get_value(), 65536);
+
+ EXPECT_TRUE(reader.close().ok());
+ EXPECT_TRUE(reader.closed());
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+ FileCacheFactory::instance()->_caches.clear();
+ FileCacheFactory::instance()->_path_to_cache.clear();
+ FileCacheFactory::instance()->_capacity = 0;
+ config::enable_reader_dryrun_when_download_file_cache = org;
}
} // namespace doris::io
diff --git
a/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/table/test_warm_up_same_table_multi_times.groovy
b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/table/test_warm_up_same_table_multi_times.groovy
new file mode 100644
index 00000000000..fd28dec7ddd
--- /dev/null
+++
b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/table/test_warm_up_same_table_multi_times.groovy
@@ -0,0 +1,268 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_warm_up_same_table_multi_times") {
+ def ttlProperties = """ PROPERTIES("file_cache_ttl_seconds"="12000") """
+ def getJobState = { jobId ->
+ def jobStateResult = sql """ SHOW WARM UP JOB WHERE ID = ${jobId} """
+ return jobStateResult[0][2]
+ }
+
+ String[][] backends = sql """ show backends """
+ String backendId;
+ def backendIdToBackendIP = [:]
+ def backendIdToBackendHttpPort = [:]
+ def backendIdToBackendBrpcPort = [:]
+ for (String[] backend in backends) {
+ if (backend[9].equals("true") &&
backend[19].contains("regression_cluster_name0")) {
+ backendIdToBackendIP.put(backend[0], backend[1])
+ backendIdToBackendHttpPort.put(backend[0], backend[4])
+ backendIdToBackendBrpcPort.put(backend[0], backend[5])
+ }
+ }
+ assertEquals(backendIdToBackendIP.size(), 1)
+
+ backendId = backendIdToBackendIP.keySet()[0]
+ def url = backendIdToBackendIP.get(backendId) + ":" +
backendIdToBackendHttpPort.get(backendId) +
"""/api/file_cache?op=clear&sync=true"""
+ logger.info(url)
+ def clearFileCache = { check_func ->
+ httpTest {
+ endpoint ""
+ uri url
+ op "get"
+ body ""
+ check check_func
+ }
+ }
+
+ def getMetricsMethod = { check_func ->
+ httpTest {
+ endpoint backendIdToBackendIP.get(backendId) + ":" +
backendIdToBackendBrpcPort.get(backendId)
+ uri "/brpc_metrics"
+ op "get"
+ check check_func
+ }
+ }
+
+ def s3BucketName = getS3BucketName()
+ def s3WithProperties = """WITH S3 (
+ |"AWS_ACCESS_KEY" = "${getS3AK()}",
+ |"AWS_SECRET_KEY" = "${getS3SK()}",
+ |"AWS_ENDPOINT" = "${getS3Endpoint()}",
+ |"AWS_REGION" = "${getS3Region()}",
+ |"provider" = "${getS3Provider()}")
+ |PROPERTIES(
+ |"exec_mem_limit" = "8589934592",
+ |"load_parallelism" = "3")""".stripMargin()
+
+
+
+ sql "use @regression_cluster_name0"
+ // sql "use @compute_cluster"
+
+ def table = "customer"
+ sql new File("""${context.file.parent}/../ddl/${table}_delete.sql""").text
+ // create table if not exists
+ sql (new File("""${context.file.parent}/../ddl/${table}.sql""").text +
ttlProperties)
+ sql """ alter table ${table} set ("disable_auto_compaction" = "true") """
// no influence from compaction
+
+ sleep(10000)
+
+ def load_customer_once = {
+ def uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString()
+ def loadLabel = table + "_" + uniqueID
+ // load data from cos
+ def loadSql = new
File("""${context.file.parent}/../ddl/${table}_load.sql""").text.replaceAll("\\\$\\{s3BucketName\\}",
s3BucketName)
+ loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel) +
s3WithProperties
+ sql loadSql
+
+ // check load state
+ while (true) {
+ def stateResult = sql "show load where Label = '${loadLabel}'"
+ def loadState = stateResult[stateResult.size() - 1][2].toString()
+ if ("CANCELLED".equalsIgnoreCase(loadState)) {
+ throw new IllegalStateException("load ${loadLabel} failed.")
+ } else if ("FINISHED".equalsIgnoreCase(loadState)) {
+ break
+ }
+ sleep(5000)
+ }
+ }
+
+ clearFileCache.call();
+ sleep(30000)
+
+ load_customer_once()
+ load_customer_once()
+ load_customer_once()
+ load_customer_once()
+
+ def jobId = sql "warm up cluster regression_cluster_name0 with table
customer;"
+ try {
+ sql "warm up cluster regression_cluster_name0 with table customer;"
+ assertTrue(false)
+ } catch (Exception e) {
+ assertTrue(true)
+ }
+ int retryTime = 120
+ int j = 0
+ for (; j < retryTime; j++) {
+ sleep(1000)
+ def status = getJobState(jobId[0][0])
+ logger.info(status)
+ if (status.equals("CANCELLED")) {
+ assertTrue(false);
+ }
+ if (status.equals("FINISHED")) {
+ break;
+ }
+ }
+ if (j == retryTime) {
+ sql "cancel warm up job where id = ${jobId[0][0]}"
+ assertTrue(false);
+ }
+ sleep(30000)
+ long ttl_cache_size = 0
+ getMetricsMethod.call() {
+ respCode, body ->
+ assertEquals("${respCode}".toString(), "200")
+ String out = "${body}".toString()
+ def strs = out.split('\n')
+ Boolean flag = false;
+ for (String line in strs) {
+ if (line.contains("ttl_cache_size")) {
+ if (line.startsWith("#")) {
+ continue
+ }
+ def i = line.indexOf(' ')
+ ttl_cache_size = line.substring(i).toLong()
+ flag = true
+ break
+ }
+ }
+ assertTrue(flag)
+ }
+
+ long skip_io_bytes_start = 0;
+ getMetricsMethod.call() {
+ respCode, body ->
+ assertEquals("${respCode}".toString(), "200")
+ String out = "${body}".toString()
+ def strs = out.split('\n')
+ Boolean flag = false;
+ for (String line in strs) {
+ if
(line.contains("cached_remote_reader_skip_local_cache_io_sum_bytes")) {
+ if (line.startsWith("#")) {
+ continue
+ }
+ def i = line.indexOf(' ')
+ skip_io_bytes_start = line.substring(i).toLong()
+ flag = true
+ break
+ }
+ }
+ assertTrue(flag)
+ }
+
+ // AGAIN! regression_cluster_name1
+ jobId = sql "warm up cluster regression_cluster_name0 with table customer;"
+
+ retryTime = 120
+ j = 0
+ for (; j < retryTime; j++) {
+ sleep(1000)
+ def status = getJobState(jobId[0][0])
+ logger.info(status)
+ if (status.equals("CANCELLED")) {
+ assertTrue(false);
+ }
+ if (status.equals("FINISHED")) {
+ break;
+ }
+ }
+ if (j == retryTime) {
+ sql "cancel warm up job where id = ${jobId[0][0]}"
+ assertTrue(false);
+ }
+ sleep(30000)
+ ttl_cache_size = 0
+ getMetricsMethod.call() {
+ respCode, body ->
+ assertEquals("${respCode}".toString(), "200")
+ String out = "${body}".toString()
+ def strs = out.split('\n')
+ Boolean flag = false;
+ for (String line in strs) {
+ if (line.contains("ttl_cache_size")) {
+ if (line.startsWith("#")) {
+ continue
+ }
+ def i = line.indexOf(' ')
+ ttl_cache_size = line.substring(i).toLong()
+ flag = true
+ break
+ }
+ }
+ assertTrue(flag)
+ }
+
+ getMetricsMethod.call() {
+ respCode, body ->
+ assertEquals("${respCode}".toString(), "200")
+ String out = "${body}".toString()
+ def strs = out.split('\n')
+ Boolean flag = false;
+ for (String line in strs) {
+ if (line.contains("ttl_cache_size")) {
+ if (line.startsWith("#")) {
+ continue
+ }
+ def i = line.indexOf(' ')
+ assertEquals(ttl_cache_size, line.substring(i).toLong())
+ flag = true
+ break
+ }
+ }
+ assertTrue(flag)
+ }
+
+ long skip_io_bytes_end = 0;
+ getMetricsMethod.call() {
+ respCode, body ->
+ assertEquals("${respCode}".toString(), "200")
+ String out = "${body}".toString()
+ def strs = out.split('\n')
+ Boolean flag = false;
+ for (String line in strs) {
+ if
(line.contains("cached_remote_reader_skip_local_cache_io_sum_bytes")) {
+ if (line.startsWith("#")) {
+ continue
+ }
+ def i = line.indexOf(' ')
+ skip_io_bytes_end = line.substring(i).toLong()
+ flag = true
+ break
+ }
+ }
+ assertTrue(flag)
+ }
+ long diff = skip_io_bytes_end - skip_io_bytes_start;
+ println("skip_io_bytes diff: " + diff);
+ assertTrue(diff > 1000);
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]