This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 4442e420a0f branch-4.1: [test](microbench) Add some api of get_or_set #64691 (#65018)
4442e420a0f is described below
commit 4442e420a0f0b450b1a54a698ac143224dfd5637
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jul 1 17:23:29 2026 +0800
branch-4.1: [test](microbench) Add some api of get_or_set #64691 (#65018)
Cherry-picked from #64691
Co-authored-by: chunping <[email protected]>
---
be/src/io/tools/file_cache_microbench.cpp | 349 +++++++++++++++++++++++++++++-
1 file changed, 344 insertions(+), 5 deletions(-)
diff --git a/be/src/io/tools/file_cache_microbench.cpp b/be/src/io/tools/file_cache_microbench.cpp
index 390782a4c8d..824c4276337 100644
--- a/be/src/io/tools/file_cache_microbench.cpp
+++ b/be/src/io/tools/file_cache_microbench.cpp
@@ -94,6 +94,7 @@ using namespace doris;
bvar::LatencyRecorder microbench_write_latency("file_cache_microbench_append");
bvar::LatencyRecorder microbench_read_latency("file_cache_microbench_read_at");
+bvar::LatencyRecorder
microbench_get_or_set_latency("file_cache_microbench_get_or_set");
const std::string HIDDEN_PREFIX = "test_file_cache_microbench/";
const char PAD_CHAR = 'x';
@@ -515,6 +516,20 @@ std::string get_usage(const std::string& progname) {
"read_length": [<left>, <right>] // Range for reading length
(left inclusive, right exclusive)
}
+ Submit a direct file cache get_or_set benchmark job:
+ {
+ "job_type": "file_cache_get_or_set",
+ "file_prefix": "<prefix>",
+ "num_threads": <count>,
+ "num_files": <count>,
+ "repeat": <count>, // get_or_set calls per
generated cache key
+ "size_bytes_perfile": <size>, // Virtual file size used to
clip offset + size
+ "get_or_set_offset": [<left>, <right>],
+ "get_or_set_size": [<left>, <right>],
+ "cache_type": <type>, // NORMAL | TTL | INDEX |
DISPOSABLE
+ "expiration": <timestamp> // Required for TTL cache type
+ }
+
GET /get_job_status/<job_id>
Retrieve the status of a submitted job.
Parameters:
@@ -570,8 +585,21 @@ std::string get_usage(const std::string& progname) {
}
// Job configuration structure
+// Kinds of benchmark a submitted job can run.
+enum class JobType { FILE_CACHE, FILE_CACHE_GET_OR_SET };
+
+// Maps a JobType to the name used for it in the job JSON ("job_type").
+// Returns "unknown" for a value that matches no enumerator.
+static std::string job_type_to_string(JobType type) {
+    if (type == JobType::FILE_CACHE_GET_OR_SET) {
+        return "file_cache_get_or_set";
+    }
+    if (type == JobType::FILE_CACHE) {
+        return "file_cache";
+    }
+    return "unknown";
+}
+
struct JobConfig {
// Default value initialization
+ JobType job_type = JobType::FILE_CACHE;
int64_t size_bytes_perfile = 1024 * 1024;
int32_t write_iops = 0;
int32_t read_iops = 0;
@@ -586,6 +614,10 @@ struct JobConfig {
int64_t read_offset_right = 0;
int64_t read_length_left = 0;
int64_t read_length_right = 0;
+ int64_t get_or_set_offset_left = 0;
+ int64_t get_or_set_offset_right = 1;
+ int64_t get_or_set_size_left = 4096;
+ int64_t get_or_set_size_right = 4097;
bool write_file_cache = true;
bool bvar_enable = false;
@@ -599,21 +631,46 @@ struct JobConfig {
throw std::runtime_error("JSON parse error json args=" + json_str);
}
+ config.job_type = parse_job_type(d);
+
// Basic validation
validate(d);
// Use helper functions to parse each field
parse_basic_fields(d, config);
parse_cache_settings(d, config);
- parse_read_settings(d, config);
- // Additional validation
- validate_config(config);
+ if (config.job_type == JobType::FILE_CACHE_GET_OR_SET) {
+ parse_get_or_set_fields(d, config);
+ validate_get_or_set_config(config);
+ } else {
+ parse_read_settings(d, config);
+
+ // Additional validation
+ validate_config(config);
+ }
return config;
}
private:
+    // Determine which benchmark the request asks for.
+    //
+    // Returns JobType::FILE_CACHE when "job_type" is absent. Throws
+    // std::runtime_error when the document root is not a JSON object, when
+    // "job_type" is not a string, or when it names an unsupported job type.
+    static JobType parse_job_type(const rapidjson::Document& d) {
+        // from_json() calls this before validate(), so this is the first member
+        // lookup on the document. rapidjson asserts on member access to a
+        // non-object value (e.g. a request body of "[]" or "1"), so reject such
+        // roots with a regular error instead.
+        if (!d.IsObject()) {
+            throw std::runtime_error("job config must be a JSON object");
+        }
+
+        // Single lookup instead of HasMember() followed by repeated operator[].
+        const auto it = d.FindMember("job_type");
+        if (it == d.MemberEnd()) {
+            return JobType::FILE_CACHE;
+        }
+        if (!it->value.IsString()) {
+            throw std::runtime_error("job_type must be a string");
+        }
+
+        const std::string type = it->value.GetString();
+        if (type == "file_cache") {
+            return JobType::FILE_CACHE;
+        }
+        if (type == "file_cache_get_or_set") {
+            return JobType::FILE_CACHE_GET_OR_SET;
+        }
+        throw std::runtime_error("unsupported job_type: " + type);
+    }
+
// Validate the JSON document
static void validate(const rapidjson::Document& json_data) {
if (!json_data.HasMember("file_prefix") ||
!json_data["file_prefix"].IsString() ||
@@ -681,6 +738,32 @@ private:
}
}
+    // Parse the [left, right) offset and size ranges of a get_or_set job.
+    //
+    // "get_or_set_offset" / "get_or_set_size" are preferred; when one of them is
+    // absent or is not a two-element array, "read_offset" / "read_length" is used
+    // instead. If neither is usable the defaults already stored in `config` are
+    // kept. Throws std::runtime_error when a selected array holds a non-integer.
+    static void parse_get_or_set_fields(const rapidjson::Document& d, JobConfig& config) {
+        // Reads member `name` into (left, right). Returns false, leaving both
+        // untouched, when the member is missing or is not a two-element array,
+        // so the caller can try the fallback field.
+        auto read_range = [&d](const char* name, int64_t& left, int64_t& right) {
+            const auto it = d.FindMember(name);
+            if (it == d.MemberEnd() || !it->value.IsArray() || it->value.Size() != 2) {
+                return false;
+            }
+            const rapidjson::Value& range = it->value;
+            // GetInt64() asserts on strings, floats, etc., so check the element
+            // types first and report bad input as a regular error.
+            if (!range[0].IsInt64() || !range[1].IsInt64()) {
+                throw std::runtime_error(std::string(name) + " must be an array of two integers");
+            }
+            left = range[0].GetInt64();
+            right = range[1].GetInt64();
+            return true;
+        };
+
+        if (!read_range("get_or_set_offset", config.get_or_set_offset_left,
+                        config.get_or_set_offset_right)) {
+            read_range("read_offset", config.get_or_set_offset_left,
+                       config.get_or_set_offset_right);
+        }
+
+        if (!read_range("get_or_set_size", config.get_or_set_size_left,
+                        config.get_or_set_size_right)) {
+            read_range("read_length", config.get_or_set_size_left, config.get_or_set_size_right);
+        }
+    }
+
// Parse read-related settings
static void parse_read_settings(const rapidjson::Document& d, JobConfig&
config) {
if (config.read_iops > 0) {
@@ -731,8 +814,50 @@ private:
}
}
+    // Check the fields a file_cache_get_or_set job depends on.
+    // Throws std::runtime_error describing the first constraint that is violated.
+    static void validate_get_or_set_config(const JobConfig& config) {
+        auto reject_if = [](bool violated, const char* message) {
+            if (violated) {
+                throw std::runtime_error(message);
+            }
+        };
+
+        const bool threads_in_range = config.num_threads > 0 && config.num_threads <= 10000;
+        reject_if(!threads_in_range, "num_threads must be between 1 and 10000");
+        reject_if(config.num_files <= 0, "num_files must be positive");
+        reject_if(config.repeat <= 0, "repeat must be positive");
+        reject_if(config.size_bytes_perfile <= 0, "size_bytes_perfile must be positive");
+
+        // Both ranges are half-open, so left must be strictly below right.
+        const bool offset_range_ok = config.get_or_set_offset_left >= 0 &&
+                                     config.get_or_set_offset_left < config.get_or_set_offset_right;
+        reject_if(!offset_range_ok,
+                  "get_or_set_offset must be [left, right) with 0 <= left < right");
+
+        const bool size_range_ok = config.get_or_set_size_left > 0 &&
+                                   config.get_or_set_size_left < config.get_or_set_size_right;
+        reject_if(!size_range_ok, "get_or_set_size must be [left, right) with 0 < left < right");
+
+        const bool known_cache_type = config.cache_type == "NORMAL" ||
+                                      config.cache_type == "TTL" ||
+                                      config.cache_type == "INDEX" ||
+                                      config.cache_type == "DISPOSABLE";
+        reject_if(!known_cache_type,
+                  "cache_type must be NORMAL, TTL, INDEX, or DISPOSABLE for get_or_set jobs");
+
+        const bool ttl_without_expiration = config.cache_type == "TTL" && config.expiration <= 0;
+        reject_if(ttl_without_expiration, "expiration must be positive when cache type is TTL");
+    }
+
public:
std::string to_string() const {
+ if (job_type == JobType::FILE_CACHE_GET_OR_SET) {
+ return fmt::format(
+ "job_type: {}, num_threads: {}, num_files: {},
file_prefix: {}, "
+ "size_bytes_perfile: {}, repeat: {}, cache_type: {},
expiration: {}, "
+ "get_or_set_offset: [{}, {}), get_or_set_size: [{}, {})",
+ job_type_to_string(job_type), num_threads, num_files,
+ get_prefix() + file_prefix, size_bytes_perfile, repeat,
cache_type, expiration,
+ get_or_set_offset_left, get_or_set_offset_right,
get_or_set_size_left,
+ get_or_set_size_right);
+ }
return fmt::format(
"size_bytes_perfile: {}, write_iops: {}, read_iops: {},
num_threads: {}, "
"num_files: {}, file_prefix: {}, write_file_cache: {},
write_batch_size: {}, "
@@ -781,6 +906,10 @@ struct Job {
int64_t lock_wait_timer = 0;
int64_t get_timer = 0;
int64_t set_timer = 0;
+ int64_t get_or_set_ops = 0;
+ int64_t get_or_set_blocks = 0;
+ double get_or_set_qps = 0;
+ double avg_get_or_set_us = 0;
} stats;
// Record associated file information for the job
@@ -1291,6 +1420,11 @@ private:
JobConfig& config = job.config;
LOG(INFO) << "Executing job " << job_id << " with config: " <<
config.to_string();
+ if (config.job_type == JobType::FILE_CACHE_GET_OR_SET) {
+ execute_file_cache_get_or_set(job);
+ return;
+ }
+
// Generate multiple keys
std::vector<std::string> keys;
keys.reserve(config.num_files);
@@ -1329,6 +1463,27 @@ private:
LOG(INFO) << "Job " << job_id << " execution completed";
}
+    // Translate the cache_type string of a job config into a FileCacheType.
+    // Any string other than "TTL", "INDEX" or "DISPOSABLE" maps to NORMAL.
+    static doris::io::FileCacheType parse_file_cache_type(const std::string& cache_type) {
+        using doris::io::FileCacheType;
+
+        FileCacheType parsed = FileCacheType::NORMAL;
+        if (cache_type == "TTL") {
+            parsed = FileCacheType::TTL;
+        } else if (cache_type == "INDEX") {
+            parsed = FileCacheType::INDEX;
+        } else if (cache_type == "DISPOSABLE") {
+            parsed = FileCacheType::DISPOSABLE;
+        }
+        return parsed;
+    }
+
+    // Draw a value from the half-open range [left, right) using `gen`.
+    // A range holding exactly one value returns it without consuming randomness.
+    static size_t pick_range_value(std::mt19937_64& gen, int64_t left, int64_t right) {
+        const int64_t last = right - 1;
+        if (last != left) {
+            // uniform_int_distribution takes a closed interval, hence `last`.
+            std::uniform_int_distribution<int64_t> pick(left, last);
+            return static_cast<size_t>(pick(gen));
+        }
+        return static_cast<size_t>(left);
+    }
+
doris::S3ClientConf create_s3_client_conf(const JobConfig& config) {
doris::S3ClientConf s3_conf;
s3_conf.max_connections = std::max(256, config.num_threads * 4);
@@ -1431,6 +1586,123 @@ private:
LOG(INFO) << "Total write time: " << job.stats.total_write_time << "
seconds";
}
+    // Run a file_cache_get_or_set job.
+    //
+    // Generates config.num_files cache keys and submits one pool task per key.
+    // Each task calls BlockFileCache::get_or_set config.repeat times with an
+    // offset and size drawn from the configured ranges, clipped so the request
+    // stays inside [0, size_bytes_perfile). Totals and derived throughput are
+    // written to job.stats.
+    //
+    // Throws std::runtime_error carrying the first recorded task error when any
+    // task failed.
+    void execute_file_cache_get_or_set(Job& job) {
+        const JobConfig& config = job.config;
+        LOG(INFO) << "Executing file_cache_get_or_set job " << job.job_id
+                  << " with config: " << config.to_string();
+
+        std::vector<std::string> keys;
+        keys.reserve(config.num_files);
+        for (int i = 0; i < config.num_files; ++i) {
+            keys.push_back(get_prefix() + config.file_prefix + "/" + job.job_id + "_" +
+                           std::to_string(i));
+        }
+
+        // The cache type depends only on the config, so resolve it once here
+        // rather than once per task.
+        const doris::io::FileCacheType cache_type = parse_file_cache_type(config.cache_type);
+
+        // State the tasks capture by reference. It is declared before the pool
+        // so that, if this function unwinds early (e.g. enqueue() throws), the
+        // pool is destroyed first and this state outlives it.
+        // NOTE(review): relies on ~BenchThreadPool() waiting for running tasks — confirm.
+        std::mutex stats_mutex;
+        std::mutex error_mutex;
+        doris::io::ReadStatistics total_stats;
+        std::atomic<int64_t> total_ops {0};
+        std::atomic<int64_t> total_blocks {0};
+        std::atomic<bool> has_error {false};
+        std::string first_error;
+
+        BenchThreadPool pool(config.num_threads);
+        std::vector<std::future<void>> futures;
+        futures.reserve(keys.size());
+
+        doris::MonotonicStopWatch stopwatch;
+        stopwatch.start();
+        for (const auto& key : keys) {
+            futures.push_back(pool.enqueue([&, key]() {
+                try {
+                    doris::io::ReadStatistics local_stats;
+                    int64_t local_ops = 0;
+                    int64_t local_blocks = 0;
+
+                    auto hash = doris::io::BlockFileCache::hash(key);
+                    doris::io::BlockFileCache* cache =
+                            doris::io::FileCacheFactory::instance()->get_by_path(hash);
+                    if (cache == nullptr) {
+                        throw std::runtime_error("No file cache instance found for key " + key);
+                    }
+
+                    // Seed the generator from the key so each key gets its own sequence.
+                    std::mt19937_64 gen(std::hash<std::string> {}(key));
+                    for (int64_t i = 0; i < config.repeat; ++i) {
+                        size_t offset = pick_range_value(gen, config.get_or_set_offset_left,
+                                                         config.get_or_set_offset_right);
+                        size_t size = pick_range_value(gen, config.get_or_set_size_left,
+                                                       config.get_or_set_size_right);
+                        // Clip the request to the virtual file size.
+                        if (offset >= static_cast<size_t>(config.size_bytes_perfile)) {
+                            offset = static_cast<size_t>(config.size_bytes_perfile - 1);
+                        }
+                        if (offset + size > static_cast<size_t>(config.size_bytes_perfile)) {
+                            size = static_cast<size_t>(config.size_bytes_perfile) - offset;
+                        }
+
+                        doris::MonotonicStopWatch single_sw;
+                        single_sw.start();
+                        doris::io::CacheContext cache_context;
+                        cache_context.stats = &local_stats;
+                        cache_context.cache_type = cache_type;
+                        cache_context.expiration_time = config.expiration;
+                        auto holder = cache->get_or_set(hash, offset, size, cache_context);
+                        single_sw.stop();
+                        local_stats.cache_get_or_set_timer += single_sw.elapsed_time();
+                        local_blocks += holder.file_blocks.size();
+                        ++local_ops;
+                        // elapsed_time() is in nanoseconds; the recorder takes microseconds.
+                        microbench_get_or_set_latency << (single_sw.elapsed_time() / 1000);
+                    }
+
+                    // Merge this task's totals into the job-wide totals.
+                    {
+                        std::lock_guard<std::mutex> lock(stats_mutex);
+                        total_stats.cache_get_or_set_timer += local_stats.cache_get_or_set_timer;
+                        total_stats.lock_wait_timer += local_stats.lock_wait_timer;
+                        total_stats.get_timer += local_stats.get_timer;
+                        total_stats.set_timer += local_stats.set_timer;
+                    }
+                    total_ops.fetch_add(local_ops, std::memory_order_relaxed);
+                    total_blocks.fetch_add(local_blocks, std::memory_order_relaxed);
+                } catch (const std::exception& e) {
+                    // Record only the first error; it is rethrown after all tasks finish.
+                    has_error.store(true, std::memory_order_relaxed);
+                    std::lock_guard<std::mutex> lock(error_mutex);
+                    if (first_error.empty()) {
+                        first_error = e.what();
+                    }
+                }
+            }));
+        }
+
+        for (auto& future : futures) {
+            future.get();
+        }
+        stopwatch.stop();
+        if (has_error.load(std::memory_order_relaxed)) {
+            throw std::runtime_error(first_error.empty() ? "file_cache_get_or_set task failed"
+                                                         : first_error);
+        }
+
+        const double elapsed_sec = stopwatch.elapsed_time() / 1e9;
+        job.stats.total_read_time = std::to_string(elapsed_sec) + " seconds";
+        job.stats.cache_get_or_set_timer = total_stats.cache_get_or_set_timer;
+        job.stats.lock_wait_timer = total_stats.lock_wait_timer;
+        job.stats.get_timer = total_stats.get_timer;
+        job.stats.set_timer = total_stats.set_timer;
+        job.stats.get_or_set_ops = total_ops.load(std::memory_order_relaxed);
+        job.stats.get_or_set_blocks = total_blocks.load(std::memory_order_relaxed);
+        job.stats.get_or_set_qps =
+                elapsed_sec > 0 ? static_cast<double>(job.stats.get_or_set_ops) / elapsed_sec : 0;
+        // Mean time per call in microseconds (the timer is in nanoseconds).
+        job.stats.avg_get_or_set_us =
+                job.stats.get_or_set_ops > 0
+                        ? static_cast<double>(job.stats.cache_get_or_set_timer) /
+                                  job.stats.get_or_set_ops / 1000
+                        : 0;
+
+        LOG(INFO) << "Completed file_cache_get_or_set job " << job.job_id
+                  << ", ops=" << job.stats.get_or_set_ops
+                  << ", blocks=" << job.stats.get_or_set_blocks << ", elapsed_sec=" << elapsed_sec
+                  << ", qps=" << job.stats.get_or_set_qps;
+    }
+
// Execute read tasks
void execute_read_tasks(const std::vector<std::string>& keys, Job& job,
JobConfig& config) {
LOG(INFO) << "Starting read tasks for job " << job.job_id << ",
num_keys=" << keys.size()
@@ -1883,6 +2155,10 @@ public:
rapidjson::Document::AllocatorType& allocator = d.GetAllocator();
d.AddMember("job_id", rapidjson::Value(job.job_id.c_str(),
allocator), allocator);
+ if (job.config.job_type != JobType::FILE_CACHE) {
+ auto job_type = job_type_to_string(job.config.job_type);
+ d.AddMember("job_type", rapidjson::Value(job_type.c_str(),
allocator), allocator);
+ }
d.AddMember("status",
rapidjson::Value(get_status_string(job.status).c_str(), allocator),
allocator);
@@ -1897,10 +2173,18 @@ public:
}
// Add configuration information
- add_config_info(d, allocator, job.config);
+ if (job.config.job_type == JobType::FILE_CACHE_GET_OR_SET) {
+ add_get_or_set_config_info(d, allocator, job.config);
+ } else {
+ add_config_info(d, allocator, job.config);
+ }
// Add statistics information
- add_stats_info(d, allocator, job.stats);
+ if (job.config.job_type == JobType::FILE_CACHE_GET_OR_SET) {
+ add_get_or_set_stats_info(d, allocator, job.stats);
+ } else {
+ add_stats_info(d, allocator, job.stats);
+ }
// Add file records (if requested)
if (files_value) {
@@ -1984,6 +2268,11 @@ public:
job_obj.AddMember("file_prefix",
rapidjson::Value(job->config.file_prefix.c_str(), allocator),
allocator);
+ if (job->config.job_type != JobType::FILE_CACHE) {
+ auto job_type = job_type_to_string(job->config.job_type);
+ job_obj.AddMember("job_type",
rapidjson::Value(job_type.c_str(), allocator),
+ allocator);
+ }
jobs_array.PushBack(job_obj, allocator);
}
@@ -2524,6 +2813,34 @@ private:
doc.AddMember("config", config_obj, allocator);
}
+    // Add the configuration of a get_or_set job to the JSON response as "config".
+    void add_get_or_set_config_info(rapidjson::Document& doc,
+                                    rapidjson::Document::AllocatorType& allocator,
+                                    const JobConfig& config) {
+        // Build the two [left, right) range arrays first; they are attached last.
+        rapidjson::Value offset_range(rapidjson::kArrayType);
+        offset_range.PushBack(config.get_or_set_offset_left, allocator);
+        offset_range.PushBack(config.get_or_set_offset_right, allocator);
+
+        rapidjson::Value size_range(rapidjson::kArrayType);
+        size_range.PushBack(config.get_or_set_size_left, allocator);
+        size_range.PushBack(config.get_or_set_size_right, allocator);
+
+        // Strings are copied into the document's allocator.
+        rapidjson::Value prefix_value(config.file_prefix.c_str(), allocator);
+        rapidjson::Value cache_type_value(config.cache_type.c_str(), allocator);
+
+        rapidjson::Value cfg(rapidjson::kObjectType);
+        cfg.AddMember("num_threads", config.num_threads, allocator);
+        cfg.AddMember("num_files", config.num_files, allocator);
+        cfg.AddMember("file_prefix", prefix_value, allocator);
+        cfg.AddMember("size_bytes_perfile", config.size_bytes_perfile, allocator);
+        cfg.AddMember("repeat", config.repeat, allocator);
+        cfg.AddMember("cache_type", cache_type_value, allocator);
+        cfg.AddMember("expiration", config.expiration, allocator);
+        cfg.AddMember("bvar_enable", config.bvar_enable, allocator);
+        cfg.AddMember("get_or_set_offset", offset_range, allocator);
+        cfg.AddMember("get_or_set_size", size_range, allocator);
+
+        doc.AddMember("config", cfg, allocator);
+    }
+
// Add statistics information to JSON response
void add_stats_info(rapidjson::Document& doc,
rapidjson::Document::AllocatorType& allocator,
const Job::Statistics& stats) {
@@ -2568,6 +2885,28 @@ private:
doc.AddMember("statistics", stats_obj, allocator);
}
+    // Add the statistics of a get_or_set job to the JSON response as "statistics".
+    void add_get_or_set_stats_info(rapidjson::Document& doc,
+                                   rapidjson::Document::AllocatorType& allocator,
+                                   const Job::Statistics& stats) {
+        rapidjson::Value out(rapidjson::kObjectType);
+
+        // Counters and timers are emitted as unsigned 64-bit numbers.
+        auto add_u64 = [&out, &allocator](rapidjson::Value::StringRefType key, auto value) {
+            out.AddMember(key, static_cast<uint64_t>(value), allocator);
+        };
+
+        rapidjson::Value total_time(stats.total_read_time.c_str(), allocator);
+        out.AddMember("total_time", total_time, allocator);
+        add_u64("get_or_set_ops", stats.get_or_set_ops);
+        add_u64("get_or_set_blocks", stats.get_or_set_blocks);
+        out.AddMember("get_or_set_qps", stats.get_or_set_qps, allocator);
+        out.AddMember("avg_get_or_set_us", stats.avg_get_or_set_us, allocator);
+        add_u64("cache_get_or_set_timer", stats.cache_get_or_set_timer);
+        add_u64("lock_wait_timer", stats.lock_wait_timer);
+        add_u64("get_timer", stats.get_timer);
+        add_u64("set_timer", stats.set_timer);
+
+        doc.AddMember("statistics", out, allocator);
+    }
+
// Add file records to JSON response
void add_file_records(rapidjson::Document& doc,
rapidjson::Document::AllocatorType& allocator,
const std::vector<FileInfo>& file_records, size_t
max_files) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]