This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 4442e420a0f branch-4.1: [test](microbench) Add some api of get_or_set #64691 (#65018)
4442e420a0f is described below

commit 4442e420a0f0b450b1a54a698ac143224dfd5637
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jul 1 17:23:29 2026 +0800

    branch-4.1: [test](microbench) Add some api of get_or_set #64691 (#65018)
    
    Cherry-picked from #64691
    
    Co-authored-by: chunping <[email protected]>
---
 be/src/io/tools/file_cache_microbench.cpp | 349 +++++++++++++++++++++++++++++-
 1 file changed, 344 insertions(+), 5 deletions(-)

diff --git a/be/src/io/tools/file_cache_microbench.cpp b/be/src/io/tools/file_cache_microbench.cpp
index 390782a4c8d..824c4276337 100644
--- a/be/src/io/tools/file_cache_microbench.cpp
+++ b/be/src/io/tools/file_cache_microbench.cpp
@@ -94,6 +94,7 @@ using namespace doris;
 
 bvar::LatencyRecorder microbench_write_latency("file_cache_microbench_append");
 bvar::LatencyRecorder microbench_read_latency("file_cache_microbench_read_at");
+// Latency of each BlockFileCache::get_or_set call issued by file_cache_get_or_set jobs
+// (fed MonotonicStopWatch::elapsed_time() / 1000 per call).
+bvar::LatencyRecorder microbench_get_or_set_latency {"file_cache_microbench_get_or_set"};
 
 const std::string HIDDEN_PREFIX = "test_file_cache_microbench/";
 const char PAD_CHAR = 'x';
@@ -515,6 +516,20 @@ std::string get_usage(const std::string& progname) {
           "read_length": [<left>, <right>]     // Range for reading length 
(left inclusive, right exclusive)
         }
 
+        Submit a direct file cache get_or_set benchmark job:
+        {
+          "job_type": "file_cache_get_or_set",
+          "file_prefix": "<prefix>",
+          "num_threads": <count>,
+          "num_files": <count>,
+          "repeat": <count>,                   // get_or_set calls per 
generated cache key
+          "size_bytes_perfile": <size>,        // Virtual file size used to 
clip offset + size
+          "get_or_set_offset": [<left>, <right>],
+          "get_or_set_size": [<left>, <right>],
+          "cache_type": <type>,                // NORMAL | TTL | INDEX | 
DISPOSABLE
+          "expiration": <timestamp>            // Required for TTL cache type
+        }
+
       GET /get_job_status/<job_id>
         Retrieve the status of a submitted job.
         Parameters:
@@ -570,8 +585,21 @@ std::string get_usage(const std::string& progname) {
 }
 
 // Job configuration structure
+// Kind of benchmark a submitted job runs, selected by the optional "job_type" field.
+enum class JobType { FILE_CACHE, FILE_CACHE_GET_OR_SET };
+
+// Returns the "job_type" string for `type`; values outside the enum yield "unknown".
+static std::string job_type_to_string(JobType type) {
+    const char* name = "unknown";
+    switch (type) {
+    case JobType::FILE_CACHE:
+        name = "file_cache";
+        break;
+    case JobType::FILE_CACHE_GET_OR_SET:
+        name = "file_cache_get_or_set";
+        break;
+    }
+    return name;
+}
+
 struct JobConfig {
     // Default value initialization
+    JobType job_type = JobType::FILE_CACHE;
     int64_t size_bytes_perfile = 1024 * 1024;
     int32_t write_iops = 0;
     int32_t read_iops = 0;
@@ -586,6 +614,10 @@ struct JobConfig {
     int64_t read_offset_right = 0;
     int64_t read_length_left = 0;
     int64_t read_length_right = 0;
+    int64_t get_or_set_offset_left = 0;
+    int64_t get_or_set_offset_right = 1;
+    int64_t get_or_set_size_left = 4096;
+    int64_t get_or_set_size_right = 4097;
     bool write_file_cache = true;
     bool bvar_enable = false;
 
@@ -599,21 +631,46 @@ struct JobConfig {
             throw std::runtime_error("JSON parse error json args=" + json_str);
         }
 
+        config.job_type = parse_job_type(d);
+
         // Basic validation
         validate(d);
 
         // Use helper functions to parse each field
         parse_basic_fields(d, config);
         parse_cache_settings(d, config);
-        parse_read_settings(d, config);
 
-        // Additional validation
-        validate_config(config);
+        if (config.job_type == JobType::FILE_CACHE_GET_OR_SET) {
+            parse_get_or_set_fields(d, config);
+            validate_get_or_set_config(config);
+        } else {
+            parse_read_settings(d, config);
+
+            // Additional validation
+            validate_config(config);
+        }
 
         return config;
     }
 
 private:
+    // Maps the optional "job_type" member to a JobType. A request without it keeps the
+    // original file_cache benchmark; a non-string or unrecognized value is rejected with
+    // std::runtime_error.
+    static JobType parse_job_type(const rapidjson::Document& d) {
+        if (!d.HasMember("job_type")) {
+            return JobType::FILE_CACHE;
+        }
+        const rapidjson::Value& raw = d["job_type"];
+        if (!raw.IsString()) {
+            throw std::runtime_error("job_type must be a string");
+        }
+        const std::string name = raw.GetString();
+        if (name == "file_cache_get_or_set") {
+            return JobType::FILE_CACHE_GET_OR_SET;
+        }
+        if (name == "file_cache") {
+            return JobType::FILE_CACHE;
+        }
+        throw std::runtime_error("unsupported job_type: " + name);
+    }
+
     // Validate the JSON document
     static void validate(const rapidjson::Document& json_data) {
         if (!json_data.HasMember("file_prefix") || 
!json_data["file_prefix"].IsString() ||
@@ -681,6 +738,32 @@ private:
         }
     }
 
+    // Reads the optional two-element integer array member `name` of `d` into `left` and
+    // `right`. Returns false and leaves the outputs untouched when the member is missing
+    // or is not a two-element array, so the caller can fall back to an alias key.
+    // Throws std::runtime_error when an element is not an int64: calling GetInt64() on
+    // any other type trips RAPIDJSON_ASSERT instead of reporting a usable error.
+    static bool parse_int64_range_member(const rapidjson::Document& d, const char* name,
+                                         int64_t& left, int64_t& right) {
+        if (!d.HasMember(name)) {
+            return false;
+        }
+        const rapidjson::Value& bounds = d[name];
+        if (!bounds.IsArray() || bounds.Size() != 2) {
+            return false;
+        }
+        if (!bounds[0].IsInt64() || !bounds[1].IsInt64()) {
+            throw std::runtime_error(std::string(name) + " must be an array of two integers");
+        }
+        left = bounds[0].GetInt64();
+        right = bounds[1].GetInt64();
+        return true;
+    }
+
+    // Parses the offset and size ranges of a file_cache_get_or_set job. The dedicated
+    // get_or_set_offset / get_or_set_size keys take precedence; read_offset / read_length
+    // are accepted as fallbacks, and the JobConfig defaults stay when neither is given.
+    // Range bounds are checked afterwards by validate_get_or_set_config().
+    static void parse_get_or_set_fields(const rapidjson::Document& d, JobConfig& config) {
+        if (!parse_int64_range_member(d, "get_or_set_offset", config.get_or_set_offset_left,
+                                      config.get_or_set_offset_right)) {
+            parse_int64_range_member(d, "read_offset", config.get_or_set_offset_left,
+                                     config.get_or_set_offset_right);
+        }
+        if (!parse_int64_range_member(d, "get_or_set_size", config.get_or_set_size_left,
+                                      config.get_or_set_size_right)) {
+            parse_int64_range_member(d, "read_length", config.get_or_set_size_left,
+                                     config.get_or_set_size_right);
+        }
+    }
+
     // Parse read-related settings
     static void parse_read_settings(const rapidjson::Document& d, JobConfig& 
config) {
         if (config.read_iops > 0) {
@@ -731,8 +814,50 @@ private:
         }
     }
 
+    // Sanity-checks a parsed file_cache_get_or_set job configuration. Throws
+    // std::runtime_error describing the first violated constraint; the offset and size
+    // ranges are half-open [left, right).
+    static void validate_get_or_set_config(const JobConfig& config) {
+        const auto require = [](bool ok, const char* message) {
+            if (!ok) {
+                throw std::runtime_error(message);
+            }
+        };
+
+        require(config.num_threads > 0 && config.num_threads <= 10000,
+                "num_threads must be between 1 and 10000");
+        require(config.num_files > 0, "num_files must be positive");
+        require(config.repeat > 0, "repeat must be positive");
+        require(config.size_bytes_perfile > 0, "size_bytes_perfile must be positive");
+        require(config.get_or_set_offset_left >= 0 &&
+                        config.get_or_set_offset_left < config.get_or_set_offset_right,
+                "get_or_set_offset must be [left, right) with 0 <= left < right");
+        require(config.get_or_set_size_left > 0 &&
+                        config.get_or_set_size_left < config.get_or_set_size_right,
+                "get_or_set_size must be [left, right) with 0 < left < right");
+
+        const std::string& type = config.cache_type;
+        const bool known_type =
+                type == "NORMAL" || type == "TTL" || type == "INDEX" || type == "DISPOSABLE";
+        require(known_type,
+                "cache_type must be NORMAL, TTL, INDEX, or DISPOSABLE for get_or_set jobs");
+        // TTL entries need an expiration; other cache types ignore this check.
+        require(type != "TTL" || config.expiration > 0,
+                "expiration must be positive when cache type is TTL");
+    }
+
 public:
     std::string to_string() const {
+        if (job_type == JobType::FILE_CACHE_GET_OR_SET) {
+            return fmt::format(
+                    "job_type: {}, num_threads: {}, num_files: {}, 
file_prefix: {}, "
+                    "size_bytes_perfile: {}, repeat: {}, cache_type: {}, 
expiration: {}, "
+                    "get_or_set_offset: [{}, {}), get_or_set_size: [{}, {})",
+                    job_type_to_string(job_type), num_threads, num_files,
+                    get_prefix() + file_prefix, size_bytes_perfile, repeat, 
cache_type, expiration,
+                    get_or_set_offset_left, get_or_set_offset_right, 
get_or_set_size_left,
+                    get_or_set_size_right);
+        }
         return fmt::format(
                 "size_bytes_perfile: {}, write_iops: {}, read_iops: {}, 
num_threads: {}, "
                 "num_files: {}, file_prefix: {}, write_file_cache: {}, 
write_batch_size: {}, "
@@ -781,6 +906,10 @@ struct Job {
         int64_t lock_wait_timer = 0;
         int64_t get_timer = 0;
         int64_t set_timer = 0;
+        int64_t get_or_set_ops = 0;
+        int64_t get_or_set_blocks = 0;
+        double get_or_set_qps = 0;
+        double avg_get_or_set_us = 0;
     } stats;
 
     // Record associated file information for the job
@@ -1291,6 +1420,11 @@ private:
         JobConfig& config = job.config;
         LOG(INFO) << "Executing job " << job_id << " with config: " << 
config.to_string();
 
+        if (config.job_type == JobType::FILE_CACHE_GET_OR_SET) {
+            execute_file_cache_get_or_set(job);
+            return;
+        }
+
         // Generate multiple keys
         std::vector<std::string> keys;
         keys.reserve(config.num_files);
@@ -1329,6 +1463,27 @@ private:
         LOG(INFO) << "Job " << job_id << " execution completed";
     }
 
+    // Translates a cache_type string into the FileCacheType stored in the CacheContext.
+    // "TTL", "INDEX" and "DISPOSABLE" map to their namesakes; anything else is NORMAL.
+    static doris::io::FileCacheType parse_file_cache_type(const std::string& cache_type) {
+        using doris::io::FileCacheType;
+        FileCacheType result = FileCacheType::NORMAL;
+        if (cache_type == "TTL") {
+            result = FileCacheType::TTL;
+        } else if (cache_type == "INDEX") {
+            result = FileCacheType::INDEX;
+        } else if (cache_type == "DISPOSABLE") {
+            result = FileCacheType::DISPOSABLE;
+        }
+        return result;
+    }
+
+    // Draws a value uniformly from the half-open range [left, right) and returns it as a
+    // size_t. Callers are expected to pass 0 <= left < right. A single-value range is
+    // returned directly without consuming anything from `gen`.
+    static size_t pick_range_value(std::mt19937_64& gen, int64_t left, int64_t right) {
+        const int64_t last = right - 1;
+        if (last != left) {
+            std::uniform_int_distribution<int64_t> distribution(left, last);
+            return static_cast<size_t>(distribution(gen));
+        }
+        return static_cast<size_t>(left);
+    }
+
     doris::S3ClientConf create_s3_client_conf(const JobConfig& config) {
         doris::S3ClientConf s3_conf;
         s3_conf.max_connections = std::max(256, config.num_threads * 4);
@@ -1431,6 +1586,123 @@ private:
         LOG(INFO) << "Total write time: " << job.stats.total_write_time << " 
seconds";
     }
 
+    // Runs a file_cache_get_or_set job. For each of config.num_files generated keys, a pool
+    // task issues config.repeat BlockFileCache::get_or_set calls whose offset and size are
+    // drawn from the configured [left, right) ranges and clipped to size_bytes_perfile.
+    // Per-task counters are merged into job.stats once every task has finished.
+    //
+    // Throws std::runtime_error carrying the first task failure if any task failed.
+    void execute_file_cache_get_or_set(Job& job) {
+        const JobConfig& config = job.config;
+        LOG(INFO) << "Executing file_cache_get_or_set job " << job.job_id
+                  << " with config: " << config.to_string();
+
+        std::vector<std::string> keys;
+        keys.reserve(config.num_files);
+        for (int i = 0; i < config.num_files; ++i) {
+            keys.push_back(get_prefix() + config.file_prefix + "/" + job.job_id + "_" +
+                           std::to_string(i));
+        }
+
+        // Identical for every task, so resolve it once instead of once per key.
+        const doris::io::FileCacheType cache_type = parse_file_cache_type(config.cache_type);
+
+        // State shared with the pool tasks, which capture it by reference. It is declared
+        // before `pool` so it is destroyed after the pool.
+        // NOTE(review): that ordering only protects an early exit from this function if
+        // BenchThreadPool's destructor waits for in-flight tasks — confirm.
+        std::mutex stats_mutex;
+        std::mutex error_mutex;
+        doris::io::ReadStatistics total_stats;
+        std::atomic<int64_t> total_ops {0};
+        std::atomic<int64_t> total_blocks {0};
+        std::atomic<bool> has_error {false};
+        std::string first_error;
+        // Flags the job as failed and keeps the first non-empty failure message.
+        auto record_error = [&](const std::string& message) {
+            has_error.store(true, std::memory_order_relaxed);
+            std::lock_guard<std::mutex> lock(error_mutex);
+            if (first_error.empty()) {
+                first_error = message;
+            }
+        };
+
+        BenchThreadPool pool(config.num_threads);
+        std::vector<std::future<void>> futures;
+        futures.reserve(keys.size());
+
+        doris::MonotonicStopWatch stopwatch;
+        stopwatch.start();
+        for (const auto& key : keys) {
+            futures.push_back(pool.enqueue([&, key]() {
+                // The job is already going to be reported as failed; skip further calls.
+                if (has_error.load(std::memory_order_relaxed)) {
+                    return;
+                }
+                try {
+                    doris::io::ReadStatistics local_stats;
+                    int64_t local_ops = 0;
+                    int64_t local_blocks = 0;
+
+                    auto hash = doris::io::BlockFileCache::hash(key);
+                    doris::io::BlockFileCache* cache =
+                            doris::io::FileCacheFactory::instance()->get_by_path(hash);
+                    if (cache == nullptr) {
+                        throw std::runtime_error("No file cache instance found for key " + key);
+                    }
+
+                    // Seeded from the key, so each key draws its own (offset, size) sequence.
+                    std::mt19937_64 gen(std::hash<std::string> {}(key));
+                    const auto file_size = static_cast<size_t>(config.size_bytes_perfile);
+                    for (int64_t i = 0; i < config.repeat; ++i) {
+                        size_t offset = pick_range_value(gen, config.get_or_set_offset_left,
+                                                         config.get_or_set_offset_right);
+                        size_t size = pick_range_value(gen, config.get_or_set_size_left,
+                                                       config.get_or_set_size_right);
+                        // Clip the request to the virtual file [0, size_bytes_perfile).
+                        if (offset >= file_size) {
+                            offset = file_size - 1;
+                        }
+                        if (offset + size > file_size) {
+                            size = file_size - offset;
+                        }
+
+                        doris::MonotonicStopWatch single_sw;
+                        single_sw.start();
+                        doris::io::CacheContext cache_context;
+                        cache_context.stats = &local_stats;
+                        cache_context.cache_type = cache_type;
+                        cache_context.expiration_time = config.expiration;
+                        auto holder = cache->get_or_set(hash, offset, size, cache_context);
+                        single_sw.stop();
+                        local_stats.cache_get_or_set_timer += single_sw.elapsed_time();
+                        local_blocks += holder.file_blocks.size();
+                        ++local_ops;
+                        microbench_get_or_set_latency << (single_sw.elapsed_time() / 1000);
+                    }
+
+                    {
+                        std::lock_guard<std::mutex> lock(stats_mutex);
+                        total_stats.cache_get_or_set_timer += local_stats.cache_get_or_set_timer;
+                        total_stats.lock_wait_timer += local_stats.lock_wait_timer;
+                        total_stats.get_timer += local_stats.get_timer;
+                        total_stats.set_timer += local_stats.set_timer;
+                    }
+                    total_ops.fetch_add(local_ops, std::memory_order_relaxed);
+                    total_blocks.fetch_add(local_blocks, std::memory_order_relaxed);
+                } catch (const std::exception& e) {
+                    record_error(e.what());
+                } catch (...) {
+                    // Keep every exception inside the task. If one reached the future,
+                    // future.get() below would presumably rethrow it and leave this function
+                    // while other tasks still use the shared state above.
+                    record_error("file_cache_get_or_set task failed with a non-std exception");
+                }
+            }));
+        }
+
+        for (auto& future : futures) {
+            future.get();
+        }
+        stopwatch.stop();
+        if (has_error.load(std::memory_order_relaxed)) {
+            throw std::runtime_error(first_error.empty() ? "file_cache_get_or_set task failed"
+                                                         : first_error);
+        }
+
+        const double elapsed_sec = stopwatch.elapsed_time() / 1e9;
+        job.stats.total_read_time = std::to_string(elapsed_sec) + " seconds";
+        job.stats.cache_get_or_set_timer = total_stats.cache_get_or_set_timer;
+        job.stats.lock_wait_timer = total_stats.lock_wait_timer;
+        job.stats.get_timer = total_stats.get_timer;
+        job.stats.set_timer = total_stats.set_timer;
+        job.stats.get_or_set_ops = total_ops.load(std::memory_order_relaxed);
+        job.stats.get_or_set_blocks = total_blocks.load(std::memory_order_relaxed);
+        job.stats.get_or_set_qps =
+                elapsed_sec > 0 ? static_cast<double>(job.stats.get_or_set_ops) / elapsed_sec : 0;
+        // cache_get_or_set_timer accumulates elapsed_time() values; / 1000 gives the _us figure.
+        job.stats.avg_get_or_set_us =
+                job.stats.get_or_set_ops > 0
+                        ? static_cast<double>(job.stats.cache_get_or_set_timer) /
+                                  job.stats.get_or_set_ops / 1000
+                        : 0;
+
+        LOG(INFO) << "Completed file_cache_get_or_set job " << job.job_id
+                  << ", ops=" << job.stats.get_or_set_ops
+                  << ", blocks=" << job.stats.get_or_set_blocks << ", elapsed_sec=" << elapsed_sec
+                  << ", qps=" << job.stats.get_or_set_qps;
+    }
+
     // Execute read tasks
     void execute_read_tasks(const std::vector<std::string>& keys, Job& job, 
JobConfig& config) {
         LOG(INFO) << "Starting read tasks for job " << job.job_id << ", 
num_keys=" << keys.size()
@@ -1883,6 +2155,10 @@ public:
             rapidjson::Document::AllocatorType& allocator = d.GetAllocator();
 
             d.AddMember("job_id", rapidjson::Value(job.job_id.c_str(), 
allocator), allocator);
+            if (job.config.job_type != JobType::FILE_CACHE) {
+                auto job_type = job_type_to_string(job.config.job_type);
+                d.AddMember("job_type", rapidjson::Value(job_type.c_str(), 
allocator), allocator);
+            }
             d.AddMember("status",
                         
rapidjson::Value(get_status_string(job.status).c_str(), allocator),
                         allocator);
@@ -1897,10 +2173,18 @@ public:
             }
 
             // Add configuration information
-            add_config_info(d, allocator, job.config);
+            if (job.config.job_type == JobType::FILE_CACHE_GET_OR_SET) {
+                add_get_or_set_config_info(d, allocator, job.config);
+            } else {
+                add_config_info(d, allocator, job.config);
+            }
 
             // Add statistics information
-            add_stats_info(d, allocator, job.stats);
+            if (job.config.job_type == JobType::FILE_CACHE_GET_OR_SET) {
+                add_get_or_set_stats_info(d, allocator, job.stats);
+            } else {
+                add_stats_info(d, allocator, job.stats);
+            }
 
             // Add file records (if requested)
             if (files_value) {
@@ -1984,6 +2268,11 @@ public:
                 job_obj.AddMember("file_prefix",
                                   
rapidjson::Value(job->config.file_prefix.c_str(), allocator),
                                   allocator);
+                if (job->config.job_type != JobType::FILE_CACHE) {
+                    auto job_type = job_type_to_string(job->config.job_type);
+                    job_obj.AddMember("job_type", 
rapidjson::Value(job_type.c_str(), allocator),
+                                      allocator);
+                }
 
                 jobs_array.PushBack(job_obj, allocator);
             }
@@ -2524,6 +2813,34 @@ private:
         doc.AddMember("config", config_obj, allocator);
     }
 
+    // Serializes the settings of a file_cache_get_or_set job into a "config" object on
+    // `doc`; the offset and size ranges are emitted as two-element [left, right] arrays.
+    void add_get_or_set_config_info(rapidjson::Document& doc,
+                                    rapidjson::Document::AllocatorType& allocator,
+                                    const JobConfig& config) {
+        const auto make_range = [&allocator](int64_t left, int64_t right) {
+            rapidjson::Value range(rapidjson::kArrayType);
+            range.PushBack(left, allocator);
+            range.PushBack(right, allocator);
+            return range;
+        };
+
+        rapidjson::Value config_obj(rapidjson::kObjectType);
+        config_obj.AddMember("num_threads", config.num_threads, allocator);
+        config_obj.AddMember("num_files", config.num_files, allocator);
+        config_obj.AddMember("file_prefix", rapidjson::Value(config.file_prefix.c_str(), allocator),
+                             allocator);
+        config_obj.AddMember("size_bytes_perfile", config.size_bytes_perfile, allocator);
+        config_obj.AddMember("repeat", config.repeat, allocator);
+        config_obj.AddMember("cache_type", rapidjson::Value(config.cache_type.c_str(), allocator),
+                             allocator);
+        config_obj.AddMember("expiration", config.expiration, allocator);
+        config_obj.AddMember("bvar_enable", config.bvar_enable, allocator);
+        config_obj.AddMember(
+                "get_or_set_offset",
+                make_range(config.get_or_set_offset_left, config.get_or_set_offset_right),
+                allocator);
+        config_obj.AddMember("get_or_set_size",
+                             make_range(config.get_or_set_size_left, config.get_or_set_size_right),
+                             allocator);
+
+        doc.AddMember("config", config_obj, allocator);
+    }
+
     // Add statistics information to JSON response
     void add_stats_info(rapidjson::Document& doc, 
rapidjson::Document::AllocatorType& allocator,
                         const Job::Statistics& stats) {
@@ -2568,6 +2885,28 @@ private:
         doc.AddMember("statistics", stats_obj, allocator);
     }
 
+    // Serializes the counters of a file_cache_get_or_set job into a "statistics" object on
+    // `doc`. Integer counters and timers are emitted as unsigned JSON numbers.
+    void add_get_or_set_stats_info(rapidjson::Document& doc,
+                                   rapidjson::Document::AllocatorType& allocator,
+                                   const Job::Statistics& stats) {
+        rapidjson::Value stats_obj(rapidjson::kObjectType);
+        const auto add_counter = [&stats_obj, &allocator](const char* name, int64_t value) {
+            stats_obj.AddMember(rapidjson::StringRef(name), static_cast<uint64_t>(value),
+                                allocator);
+        };
+
+        stats_obj.AddMember("total_time", rapidjson::Value(stats.total_read_time.c_str(), allocator),
+                            allocator);
+        add_counter("get_or_set_ops", stats.get_or_set_ops);
+        add_counter("get_or_set_blocks", stats.get_or_set_blocks);
+        stats_obj.AddMember("get_or_set_qps", stats.get_or_set_qps, allocator);
+        stats_obj.AddMember("avg_get_or_set_us", stats.avg_get_or_set_us, allocator);
+        add_counter("cache_get_or_set_timer", stats.cache_get_or_set_timer);
+        add_counter("lock_wait_timer", stats.lock_wait_timer);
+        add_counter("get_timer", stats.get_timer);
+        add_counter("set_timer", stats.set_timer);
+
+        doc.AddMember("statistics", stats_obj, allocator);
+    }
+
     // Add file records to JSON response
     void add_file_records(rapidjson::Document& doc, 
rapidjson::Document::AllocatorType& allocator,
                           const std::vector<FileInfo>& file_records, size_t 
max_files) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to