This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new d07a3f7787f branch-3.0: [enhancement](cloud) optimize warm up local IO and performance #50275 (#50527)
d07a3f7787f is described below

commit d07a3f7787f275c94941bccc3a8e3bb92a9f3004
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Apr 30 16:59:42 2025 +0800

    branch-3.0: [enhancement](cloud) optimize warm up local IO and performance #50275 (#50527)
    
    Cherry-picked from #50275
    
    Co-authored-by: zhengyu <[email protected]>
---
 be/src/cloud/cloud_tablet.cpp                      |  26 +-
 be/src/cloud/cloud_warm_up_manager.cpp             |  38 +--
 be/src/common/config.cpp                           |   1 +
 be/src/common/config.h                             |   1 +
 be/src/io/cache/block_file_cache_downloader.cpp    |   2 +
 be/src/io/cache/cached_remote_file_reader.cpp      |  23 +-
 be/src/io/io_common.h                              |   3 +
 be/test/io/cache/block_file_cache_test.cpp         |  89 ++++++-
 .../test_warm_up_same_table_multi_times.groovy     | 268 +++++++++++++++++++++
 9 files changed, 411 insertions(+), 40 deletions(-)

diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 5f9490ecab0..c5b29cabd9e 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -263,18 +263,18 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
                                     ? 0
                                     : rowset_meta->newest_write_timestamp() +
                                               _tablet_meta->ttl_seconds();
-                    _engine.file_cache_block_downloader().submit_download_task(
-                            io::DownloadFileMeta {
-                                    .path = storage_resource.value()->remote_segment_path(
-                                            *rowset_meta, seg_id),
-                                    .file_size = rs->rowset_meta()->segment_file_size(seg_id),
-                                    .file_system = storage_resource.value()->fs,
-                                    .ctx =
-                                            {
-                                                    .expiration_time = expiration_time,
-                                            },
-                                    .download_done {},
-                            });
+                    // clang-format off
+                    _engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta {
+                            .path = storage_resource.value()->remote_segment_path(*rowset_meta, seg_id),
+                            .file_size = rs->rowset_meta()->segment_file_size(seg_id),
+                            .file_system = storage_resource.value()->fs,
+                            .ctx =
+                                    {
+                                            .expiration_time = expiration_time,
+                                            .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
+                                    },
+                            .download_done {},
+                    });
 
                     auto download_idx_file = [&](const io::Path& idx_path) {
                         io::DownloadFileMeta meta {
@@ -284,11 +284,13 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
                                 .ctx =
                                         {
                                                 .expiration_time = expiration_time,
+                                                .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
                                         },
                                 .download_done {},
                         };
                         _engine.file_cache_block_downloader().submit_download_task(std::move(meta));
                     };
+                    // clang-format on
                     auto schema_ptr = rowset_meta->tablet_schema();
                     auto idx_version = schema_ptr->get_inverted_index_storage_format();
                     if (idx_version == InvertedIndexStorageFormatPB::V1) {
diff --git a/be/src/cloud/cloud_warm_up_manager.cpp b/be/src/cloud/cloud_warm_up_manager.cpp
index 58b5711d997..d8bce097465 100644
--- a/be/src/cloud/cloud_warm_up_manager.cpp
+++ b/be/src/cloud/cloud_warm_up_manager.cpp
@@ -113,24 +113,24 @@ void CloudWarmUpManager::handle_jobs() {
                     }
 
                     wait->add_count();
-                    _engine.file_cache_block_downloader().submit_download_task(
-                            io::DownloadFileMeta {
-                                    .path = storage_resource.value()->remote_segment_path(*rs,
-                                                                                          seg_id),
-                                    .file_size = rs->segment_file_size(seg_id),
-                                    .file_system = storage_resource.value()->fs,
-                                    .ctx =
-                                            {
-                                                    .expiration_time = expiration_time,
-                                            },
-                                    .download_done =
-                                            [wait](Status st) {
-                                                if (!st) {
-                                                    LOG_WARNING("Warm up error ").error(st);
-                                                }
-                                                wait->signal();
-                                            },
-                            });
+                    // clang-format off
+                    _engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta {
+                            .path = storage_resource.value()->remote_segment_path(*rs, seg_id),
+                            .file_size = rs->segment_file_size(seg_id),
+                            .file_system = storage_resource.value()->fs,
+                            .ctx =
+                                    {
+                                            .expiration_time = expiration_time,
+                                            .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
+                                    },
+                            .download_done =
+                                    [wait](Status st) {
+                                        if (!st) {
+                                            LOG_WARNING("Warm up error ").error(st);
+                                        }
+                                        wait->signal();
+                                    },
+                    });
 
                     auto download_idx_file = [&](const io::Path& idx_path) {
                         io::DownloadFileMeta meta {
@@ -140,6 +140,7 @@ void CloudWarmUpManager::handle_jobs() {
                                 .ctx =
                                         {
                                                .expiration_time = expiration_time,
+                                                .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
                                         },
                                 .download_done =
                                         [wait](Status st) {
@@ -149,6 +150,7 @@ void CloudWarmUpManager::handle_jobs() {
                                             wait->signal();
                                         },
                         };
+                        // clang-format on
                         _engine.file_cache_block_downloader().submit_download_task(std::move(meta));
                     };
                     auto schema_ptr = rs->tablet_schema();
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index f5a56c23ebd..1bc9f557b37 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1106,6 +1106,7 @@ DEFINE_mInt64(cache_lock_held_long_tail_threshold_us, "30000000");
 DEFINE_mBool(enable_file_cache_keep_base_compaction_output, "false");
 DEFINE_mInt64(file_cache_remove_block_qps_limit, "1000");
 DEFINE_mInt64(file_cache_background_gc_interval_ms, "100");
+DEFINE_mBool(enable_reader_dryrun_when_download_file_cache, "true");
 DEFINE_mInt64(file_cache_background_monitor_interval_ms, "5000");
 DEFINE_mInt64(file_cache_background_ttl_gc_interval_ms, "3000");
 DEFINE_mInt64(file_cache_background_ttl_gc_batch, "1000");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index b2799622d5f..941b6a13777 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1148,6 +1148,7 @@ DECLARE_mInt64(cache_lock_held_long_tail_threshold_us);
 DECLARE_mBool(enable_file_cache_keep_base_compaction_output);
 DECLARE_mInt64(file_cache_remove_block_qps_limit);
 DECLARE_mInt64(file_cache_background_gc_interval_ms);
+DECLARE_mBool(enable_reader_dryrun_when_download_file_cache);
 DECLARE_mInt64(file_cache_background_monitor_interval_ms);
 DECLARE_mInt64(file_cache_background_ttl_gc_interval_ms);
 DECLARE_mInt64(file_cache_background_ttl_gc_batch);
diff --git a/be/src/io/cache/block_file_cache_downloader.cpp b/be/src/io/cache/block_file_cache_downloader.cpp
index 026f7e2a017..b9944e39989 100644
--- a/be/src/io/cache/block_file_cache_downloader.cpp
+++ b/be/src/io/cache/block_file_cache_downloader.cpp
@@ -190,6 +190,7 @@ void FileCacheBlockDownloader::download_file_cache_block(
                         {
                                .is_index_data = meta.cache_type() == ::doris::FileCacheType::INDEX,
                                .expiration_time = meta.expiration_time(),
+                                .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
                         },
                 .download_done = std::move(download_done),
         };
@@ -229,6 +230,7 @@ void FileCacheBlockDownloader::download_segment_file(const DownloadFileMeta& met
         // TODO(plat1ko):
         //  1. Directly append buffer data to file cache
         //  2. Provide `FileReader::async_read()` interface
+        DCHECK(meta.ctx.is_dryrun == config::enable_reader_dryrun_when_download_file_cache);
         auto st = file_reader->read_at(offset, {buffer.get(), size}, &bytes_read, &meta.ctx);
         if (!st.ok()) {
             LOG(WARNING) << "failed to download file: " << st;
diff --git a/be/src/io/cache/cached_remote_file_reader.cpp b/be/src/io/cache/cached_remote_file_reader.cpp
index d1bd0b8023c..c7476b7ab74 100644
--- a/be/src/io/cache/cached_remote_file_reader.cpp
+++ b/be/src/io/cache/cached_remote_file_reader.cpp
@@ -45,6 +45,8 @@ namespace doris::io {
 bvar::Adder<uint64_t> s3_read_counter("cached_remote_reader_s3_read");
 bvar::LatencyRecorder g_skip_cache_num("cached_remote_reader_skip_cache_num");
 bvar::Adder<uint64_t> g_skip_cache_sum("cached_remote_reader_skip_cache_sum");
+bvar::Adder<uint64_t> g_skip_local_cache_io_sum_bytes(
+        "cached_remote_reader_skip_local_cache_io_sum_bytes");
 
 CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader,
                                                const FileReaderOptions& opts)
@@ -110,6 +112,7 @@ std::pair<size_t, size_t> CachedRemoteFileReader::s_align_size(size_t offset, si
 
 Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                                             const IOContext* io_ctx) {
+    const bool is_dryrun = io_ctx->is_dryrun;
     DCHECK(!closed());
     DCHECK(io_ctx);
     if (offset > size()) {
@@ -125,7 +128,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
     }
     ReadStatistics stats;
     auto defer_func = [&](int*) {
-        if (io_ctx->file_cache_stats) {
+        if (io_ctx->file_cache_stats && !is_dryrun) {
             // update stats in io_ctx, for query profile
            _update_stats(stats, io_ctx->file_cache_stats, io_ctx->is_inverted_index);
            // update stats increment in this reading procedure for file cache metrics
@@ -156,7 +159,9 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
                 size_t file_offset = cur_offset - iter->second->offset();
                 size_t reserve_bytes =
                        std::min(need_read_size, iter->second->range().size() - file_offset);
-                {
+                if (is_dryrun) [[unlikely]] {
+                    g_skip_local_cache_io_sum_bytes << reserve_bytes;
+                } else {
                     SCOPED_RAW_TIMER(&stats.local_read_timer);
                     if (!iter->second
                                 ->read(Slice(result.data + (cur_offset - offset), reserve_bytes),
@@ -241,7 +246,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
         }
         // copy from memory directly
         size_t right_offset = offset + bytes_req - 1;
-        if (empty_start <= right_offset && empty_end >= offset) {
+        if (empty_start <= right_offset && empty_end >= offset && !is_dryrun) {
            size_t copy_left_offset = offset < empty_start ? empty_start : offset;
            size_t copy_right_offset = right_offset < empty_end ? right_offset : empty_end;
             char* dst = result.data + (copy_left_offset - offset);
@@ -295,10 +300,14 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
              * the thread reads the data from remote too.
              */
             if (block_state == FileBlock::State::DOWNLOADED) {
-                size_t file_offset = current_offset - left;
-                SCOPED_RAW_TIMER(&stats.local_read_timer);
-                st = block->read(Slice(result.data + (current_offset - offset), read_size),
-                                 file_offset);
+                if (is_dryrun) [[unlikely]] {
+                    g_skip_local_cache_io_sum_bytes << read_size;
+                } else {
+                    size_t file_offset = current_offset - left;
+                    SCOPED_RAW_TIMER(&stats.local_read_timer);
+                    st = block->read(Slice(result.data + (current_offset - offset), read_size),
+                                     file_offset);
+                }
             }
             if (!st || block_state != FileBlock::State::DOWNLOADED) {
                 LOG(WARNING) << "Read data failed from file cache downloaded 
by others. err="
diff --git a/be/src/io/io_common.h b/be/src/io/io_common.h
index be56d0d63ab..d4a4e26a7c1 100644
--- a/be/src/io/io_common.h
+++ b/be/src/io/io_common.h
@@ -67,6 +67,9 @@ struct IOContext {
     const TUniqueId* query_id = nullptr;             // Ref
     FileCacheStatistics* file_cache_stats = nullptr; // Ref
     bool is_inverted_index = false;
+    // if is_dryrun, read IO will download data to cache but return no data to the reader;
+    // useful for skipping local-disk reads of cached data to accelerate warm up
+    bool is_dryrun = false;
 };
 
 } // namespace io
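
To make the contract of the new flag concrete, here is a hedged, self-contained sketch of the gating that the reader changes above implement. The types below are stand-ins for illustration only; the real IOContext and block read path live in be/src/io:

    #include <cstddef>
    #include <cstdint>
    #include <cstring>
    #include <iostream>

    // Stand-in for io::IOContext; the real struct is defined in this diff.
    struct IOContext {
        bool is_dryrun = false; // warm cache only; skip copying data to the caller
    };

    uint64_t g_skip_local_cache_io_sum_bytes = 0; // stand-in for the bvar counter

    // Models the branch added to CachedRemoteFileReader::read_at_impl(): when a
    // block is already cached, a dryrun read accounts the bytes it would have
    // served instead of performing the local disk read.
    void read_cached_block(const IOContext& ctx, char* dst, const char* cached,
                           size_t reserve_bytes) {
        if (ctx.is_dryrun) {
            g_skip_local_cache_io_sum_bytes += reserve_bytes; // account, no IO
        } else {
            std::memcpy(dst, cached, reserve_bytes); // the normal local read
        }
    }

    int main() {
        const char cached[] = "segment bytes already in the file cache";
        char dst[sizeof(cached)] = {};
        read_cached_block(IOContext {.is_dryrun = true}, dst, cached, sizeof(cached));
        std::cout << "skipped local IO bytes: " << g_skip_local_cache_io_sum_bytes << '\n';
        return 0;
    }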
diff --git a/be/test/io/cache/block_file_cache_test.cpp b/be/test/io/cache/block_file_cache_test.cpp
index 2c9bfaf2093..f71ec26b526 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -7252,9 +7252,92 @@ TEST_F(BlockFileCacheTest, validate_get_or_set_crash) {
     SyncPoint::get_instance()->disable_processing();
     SyncPoint::get_instance()->clear_all_call_backs();
 
-    //if (fs::exists(cache_base_path)) {
-    //    fs::remove_all(cache_base_path);
-    //}
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+}
+
+extern bvar::Adder<uint64_t> g_skip_local_cache_io_sum_bytes;
+
+TEST_F(BlockFileCacheTest, reader_dryrun_when_download_file_cache) {
+    bool org = config::enable_reader_dryrun_when_download_file_cache;
+    config::enable_reader_dryrun_when_download_file_cache = true;
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+    fs::create_directories(cache_base_path);
+    TUniqueId query_id;
+    query_id.hi = 1;
+    query_id.lo = 1;
+    io::FileCacheSettings settings;
+    settings.query_queue_size = 6291456;
+    settings.query_queue_elements = 6;
+    settings.index_queue_size = 1048576;
+    settings.index_queue_elements = 1;
+    settings.disposable_queue_size = 1048576;
+    settings.disposable_queue_elements = 1;
+    settings.capacity = 8388608;
+    settings.max_file_block_size = 1048576;
+    settings.max_query_cache_size = 0;
+    io::CacheContext context;
+    ReadStatistics rstats;
+    context.stats = &rstats;
+    context.query_id = query_id;
+    ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path, settings).ok());
+    FileReaderSPtr local_reader;
+    ASSERT_TRUE(global_local_filesystem()->open_file(tmp_file, &local_reader));
+    io::FileReaderOptions opts;
+    opts.cache_type = io::cache_type_from_string("file_block_cache");
+    opts.is_doris_table = true;
+    CachedRemoteFileReader reader(local_reader, opts);
+    auto key = io::BlockFileCache::hash("tmp_file");
+    EXPECT_EQ(reader._cache_hash, key);
+    EXPECT_EQ(local_reader->path().native(), reader.path().native());
+    EXPECT_EQ(local_reader->size(), reader.size());
+    EXPECT_FALSE(reader.closed());
+    EXPECT_EQ(local_reader->path().native(), reader.get_remote_reader()->path().native());
+    {
+        std::string buffer;
+        buffer.resize(64_kb);
+        IOContext io_ctx;
+        RuntimeProfile profile("file_cache_test");
+        FileCacheProfileReporter reporter(&profile);
+        FileCacheStatistics stats;
+        io_ctx.file_cache_stats = &stats;
+        io_ctx.is_dryrun = true;
+        size_t bytes_read {0};
+        ASSERT_TRUE(reader.read_at(32222, Slice(buffer.data(), buffer.size()), &bytes_read, &io_ctx)
+                            .ok());
+        EXPECT_TRUE(std::all_of(buffer.begin(), buffer.end(), [](char c) { return c == '\0'; }));
+        reporter.update(&stats);
+    }
+    {
+        std::string buffer;
+        buffer.resize(64_kb);
+        IOContext io_ctx;
+        RuntimeProfile profile("file_cache_test");
+        FileCacheProfileReporter reporter(&profile);
+        FileCacheStatistics stats;
+        io_ctx.file_cache_stats = &stats;
+        io_ctx.is_dryrun = true;
+        size_t bytes_read {0};
+        ASSERT_TRUE(reader.read_at(32222, Slice(buffer.data(), buffer.size()), &bytes_read, &io_ctx)
+                            .ok());
+
+        EXPECT_TRUE(std::all_of(buffer.begin(), buffer.end(), [](char c) { return c == '\0'; }));
+        reporter.update(&stats);
+    }
+    EXPECT_EQ(g_skip_local_cache_io_sum_bytes.get_value(), 65536);
+
+    EXPECT_TRUE(reader.close().ok());
+    EXPECT_TRUE(reader.closed());
+    if (fs::exists(cache_base_path)) {
+        fs::remove_all(cache_base_path);
+    }
+    FileCacheFactory::instance()->_caches.clear();
+    FileCacheFactory::instance()->_path_to_cache.clear();
+    FileCacheFactory::instance()->_capacity = 0;
+    config::enable_reader_dryrun_when_download_file_cache = org;
 }
 
 } // namespace doris::io
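
For context, one way to read the test's EXPECT_EQ of 65536 (my interpretation, not part of the commit): the first 64 KiB read misses the cache, so the data is fetched from remote and nothing feeds the counter; the second read finds the blocks already DOWNLOADED, so the dryrun branch skips and accounts exactly one request's worth of local IO:

    #include <cassert>
    #include <cstddef>

    int main() {
        constexpr std::size_t kRequest = 64 * 1024; // 64_kb, as in the test
        std::size_t skipped = 0;
        skipped += 0;        // 1st read: cache miss, served from remote
        skipped += kRequest; // 2nd read: cache hit, local read skipped and counted
        assert(skipped == 65536);
        return 0;
    }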
diff --git a/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/table/test_warm_up_same_table_multi_times.groovy b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/table/test_warm_up_same_table_multi_times.groovy
new file mode 100644
index 00000000000..fd28dec7ddd
--- /dev/null
+++ b/regression-test/suites/cloud_p0/cache/multi_cluster/warm_up/table/test_warm_up_same_table_multi_times.groovy
@@ -0,0 +1,268 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_warm_up_same_table_multi_times") {
+    def ttlProperties = """ PROPERTIES("file_cache_ttl_seconds"="12000") """
+    def getJobState = { jobId ->
+         def jobStateResult = sql """  SHOW WARM UP JOB WHERE ID = ${jobId} """
+         return jobStateResult[0][2]
+    }
+
+    String[][] backends = sql """ show backends """
+    String backendId;
+    def backendIdToBackendIP = [:]
+    def backendIdToBackendHttpPort = [:]
+    def backendIdToBackendBrpcPort = [:]
+    for (String[] backend in backends) {
+        if (backend[9].equals("true") && 
backend[19].contains("regression_cluster_name0")) {
+            backendIdToBackendIP.put(backend[0], backend[1])
+            backendIdToBackendHttpPort.put(backend[0], backend[4])
+            backendIdToBackendBrpcPort.put(backend[0], backend[5])
+        }
+    }
+    assertEquals(backendIdToBackendIP.size(), 1)
+
+    backendId = backendIdToBackendIP.keySet()[0]
+    def url = backendIdToBackendIP.get(backendId) + ":" + backendIdToBackendHttpPort.get(backendId) + """/api/file_cache?op=clear&sync=true"""
+    logger.info(url)
+    def clearFileCache = { check_func ->
+        httpTest {
+            endpoint ""
+            uri url
+            op "get"
+            body ""
+            check check_func
+        }
+    }
+
+    def getMetricsMethod = { check_func ->
+        httpTest {
+            endpoint backendIdToBackendIP.get(backendId) + ":" + backendIdToBackendBrpcPort.get(backendId)
+            uri "/brpc_metrics"
+            op "get"
+            check check_func
+        }
+    }
+
+    def s3BucketName = getS3BucketName()
+    def s3WithProperties = """WITH S3 (
+        |"AWS_ACCESS_KEY" = "${getS3AK()}",
+        |"AWS_SECRET_KEY" = "${getS3SK()}",
+        |"AWS_ENDPOINT" = "${getS3Endpoint()}",
+        |"AWS_REGION" = "${getS3Region()}",
+        |"provider" = "${getS3Provider()}")
+        |PROPERTIES(
+        |"exec_mem_limit" = "8589934592",
+        |"load_parallelism" = "3")""".stripMargin()
+
+
+
+    sql "use @regression_cluster_name0"
+    // sql "use @compute_cluster"
+
+    def table = "customer"
+    sql new File("""${context.file.parent}/../ddl/${table}_delete.sql""").text
+    // create table if not exists
+    sql (new File("""${context.file.parent}/../ddl/${table}.sql""").text + ttlProperties)
+    sql """ alter table ${table} set ("disable_auto_compaction" = "true") """ // no influence from compaction
+
+    sleep(10000)
+
+    def load_customer_once =  {
+        def uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString()
+        def loadLabel = table + "_" + uniqueID
+        // load data from cos
+        def loadSql = new File("""${context.file.parent}/../ddl/${table}_load.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName)
+        loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel) + s3WithProperties
+        sql loadSql
+
+        // check load state
+        while (true) {
+            def stateResult = sql "show load where Label = '${loadLabel}'"
+            def loadState = stateResult[stateResult.size() - 1][2].toString()
+            if ("CANCELLED".equalsIgnoreCase(loadState)) {
+                throw new IllegalStateException("load ${loadLabel} failed.")
+            } else if ("FINISHED".equalsIgnoreCase(loadState)) {
+                break
+            }
+            sleep(5000)
+        }
+    }
+
+    clearFileCache.call();
+    sleep(30000)
+
+    load_customer_once()
+    load_customer_once()
+    load_customer_once()
+    load_customer_once()
+
+    def jobId = sql "warm up cluster regression_cluster_name0 with table 
customer;"
+    try {
+        sql "warm up cluster regression_cluster_name0 with table customer;"
+        assertTrue(false)
+    } catch (Exception e) {
+        assertTrue(true)
+    }
+    int retryTime = 120
+    int j = 0
+    for (; j < retryTime; j++) {
+        sleep(1000)
+        def status = getJobState(jobId[0][0])
+        logger.info(status)
+        if (status.equals("CANCELLED")) {
+            assertTrue(false);
+        }
+        if (status.equals("FINISHED")) {
+            break;
+        }
+    }
+    if (j == retryTime) {
+        sql "cancel warm up job where id = ${jobId[0][0]}"
+        assertTrue(false);
+    }
+    sleep(30000)
+    long ttl_cache_size = 0
+    getMetricsMethod.call() {
+        respCode, body ->
+            assertEquals("${respCode}".toString(), "200")
+            String out = "${body}".toString()
+            def strs = out.split('\n')
+            Boolean flag = false;
+            for (String line in strs) {
+                if (line.contains("ttl_cache_size")) {
+                    if (line.startsWith("#")) {
+                        continue
+                    }
+                    def i = line.indexOf(' ')
+                    ttl_cache_size = line.substring(i).toLong()
+                    flag = true
+                    break
+                }
+            }
+            assertTrue(flag)
+    }
+
+    long skip_io_bytes_start = 0;
+    getMetricsMethod.call() {
+        respCode, body ->
+            assertEquals("${respCode}".toString(), "200")
+            String out = "${body}".toString()
+            def strs = out.split('\n')
+            Boolean flag = false;
+            for (String line in strs) {
+                if (line.contains("cached_remote_reader_skip_local_cache_io_sum_bytes")) {
+                    if (line.startsWith("#")) {
+                        continue
+                    }
+                    def i = line.indexOf(' ')
+                    skip_io_bytes_start = line.substring(i).toLong()
+                    flag = true
+                    break
+                }
+            }
+            assertTrue(flag)
+    }
+
+    // AGAIN! warm up the same table on regression_cluster_name0
+    jobId = sql "warm up cluster regression_cluster_name0 with table customer;"
+
+    retryTime = 120
+    j = 0
+    for (; j < retryTime; j++) {
+        sleep(1000)
+        def status = getJobState(jobId[0][0])
+        logger.info(status)
+        if (status.equals("CANCELLED")) {
+            assertTrue(false);
+        }
+        if (status.equals("FINISHED")) {
+            break;
+        }
+    }
+    if (j == retryTime) {
+        sql "cancel warm up job where id = ${jobId[0][0]}"
+        assertTrue(false);
+    }
+    sleep(30000)
+    ttl_cache_size = 0
+    getMetricsMethod.call() {
+        respCode, body ->
+            assertEquals("${respCode}".toString(), "200")
+            String out = "${body}".toString()
+            def strs = out.split('\n')
+            Boolean flag = false;
+            for (String line in strs) {
+                if (line.contains("ttl_cache_size")) {
+                    if (line.startsWith("#")) {
+                        continue
+                    }
+                    def i = line.indexOf(' ')
+                    ttl_cache_size = line.substring(i).toLong()
+                    flag = true
+                    break
+                }
+            }
+            assertTrue(flag)
+    }
+
+    getMetricsMethod.call() {
+        respCode, body ->
+            assertEquals("${respCode}".toString(), "200")
+            String out = "${body}".toString()
+            def strs = out.split('\n')
+            Boolean flag = false;
+            for (String line in strs) {
+                if (line.contains("ttl_cache_size")) {
+                    if (line.startsWith("#")) {
+                        continue
+                    }
+                    def i = line.indexOf(' ')
+                    assertEquals(ttl_cache_size, line.substring(i).toLong())
+                    flag = true
+                    break
+                }
+            }
+            assertTrue(flag)
+    }
+
+    long skip_io_bytes_end = 0;
+    getMetricsMethod.call() {
+        respCode, body ->
+            assertEquals("${respCode}".toString(), "200")
+            String out = "${body}".toString()
+            def strs = out.split('\n')
+            Boolean flag = false;
+            for (String line in strs) {
+                if (line.contains("cached_remote_reader_skip_local_cache_io_sum_bytes")) {
+                    if (line.startsWith("#")) {
+                        continue
+                    }
+                    def i = line.indexOf(' ')
+                    skip_io_bytes_end = line.substring(i).toLong()
+                    flag = true
+                    break
+                }
+            }
+            assertTrue(flag)
+    }
+    long diff = skip_io_bytes_end - skip_io_bytes_start;
+    println("skip_io_bytes diff: " + diff);
+    assertTrue(diff > 1000);
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
