freemandealer commented on code in PR #58922:
URL: https://github.com/apache/doris/pull/58922#discussion_r2608831727
##########
be/src/io/tools/file_cache_microbench.cpp:
##########
@@ -835,10 +836,373 @@ struct Job {
}
};
-// Job manager
+namespace microbenchService {
+class JobManager;
+
+class ServiceStateMachine {
+public:
+ static constexpr uint32_t JobUnit = 0b100;
+ static constexpr uint32_t StateMask = 0b11;
+ static constexpr uint32_t Idle = 0b11;
+ static constexpr uint32_t Reloading = 0b01;
+ static constexpr uint32_t Stopped = 0b00;
+
+ // idempotent handle
+ static constexpr uint32_t ReplicateReload = 0b10;
+
+ uint32_t finish_one_job() {
+ uint32_t old_s = _state.load();
+ while (true) {
+ DCHECK((old_s >> 2) > 0);
+ uint32_t new_s = old_s - JobUnit;
+ if (_state.compare_exchange_weak(old_s, new_s)) {
+ return new_s;
+ }
+ }
+ }
+
+ uint32_t run_one_job() {
+ uint32_t old_s = _state.load();
+ while (true) {
+ if ((old_s & StateMask) != Idle) {
+ return old_s;
+ }
+ uint32_t new_s = old_s + JobUnit;
+ if (_state.compare_exchange_weak(old_s, new_s)) {
+ return new_s;
+ }
+ }
+ }
+
+ uint32_t reload_request() {
+ uint32_t old_s = _state.load();
+ while (true) {
+ uint32_t state = old_s & StateMask;
+ if (state == Stopped) {
+ return old_s;
+ }
+ if (state == Reloading) {
+ return ReplicateReload;
+ }
+
+ uint32_t new_s = (old_s & ~StateMask) | Reloading;
+ if (_state.compare_exchange_weak(old_s, new_s)) {
+ return new_s;
+ }
+ }
+ }
+
+ uint32_t finish_reload() {
+ uint32_t expected = Reloading;
+ if (_state.compare_exchange_strong(expected, Idle)) {
+ return Idle;
+ }
+ return expected;
+ }
+
+ uint32_t service_start() {
+ uint32_t expected = Stopped;
+ if (_state.compare_exchange_strong(expected, Idle)) {
+ return Idle;
+ }
+ return expected;
+ }
+
+ uint32_t service_stop() {
+ uint32_t expected = Idle;
+ if (_state.compare_exchange_strong(expected, Stopped)) {
+ return Stopped;
+ }
+ return expected;
+ }
+
+ uint32_t get_state() const { return _state.load(); }
+
+ uint32_t can_submit_job() const { return (_state.load() & StateMask) ==
Idle; }
+
+ bool can_do_reload() const {
+ uint32_t current_state = _state.load();
+ return ((current_state & StateMask) == Reloading) && ((current_state
>> 2) == 0);
+ }
+
+private:
+ std::atomic<uint32_t> _state {Stopped};
+};
+
+using StateSPtr = std::shared_ptr<ServiceStateMachine>;
+
+class BenchEnvManager : public std::enable_shared_from_this<BenchEnvManager> {
+public:
+ BenchEnvManager(std::string_view doris_home) : _doris_home(doris_home) {}
+
+ ~BenchEnvManager() { stop_reload_worker(); }
+
+ doris::Status load_config() {
+ std::string conffile = std::string(_doris_home) + "/conf/be.conf";
+ if (!doris::config::init(conffile.c_str(), true, true, true)) {
+ return Status::InternalError("Error reading config file");
+ }
+
+ std::string custom_conffile = doris::config::custom_config_dir +
"/be_custom.conf";
+ if (!doris::config::init(custom_conffile.c_str(), true, false, false))
{
+ return Status::InternalError("Error reading custom config file");
+ }
+
+ if (!doris::config::enable_file_cache) {
+ return Status::InternalError("config::enbale_file_cache should be
true!");
+ }
+
+ config::group_commit_wal_max_disk_limit = "100M";
+ LOG(INFO) << "Obj config. ak=" << doris::config::test_s3_ak
+ << " sk=" << doris::config::test_s3_sk
+ << " region=" << doris::config::test_s3_region
+ << " endpoint=" << doris::config::test_s3_endpoint
+ << " bucket=" << doris::config::test_s3_bucket;
+
+ LOG(INFO) << "File cache config. enable_file_cache=" <<
doris::config::enable_file_cache
+ << " file_cache_path=" << doris::config::file_cache_path
+ << " file_cache_each_block_size=" <<
doris::config::file_cache_each_block_size
+ << " clear_file_cache=" << doris::config::clear_file_cache
+ << " enable_file_cache_query_limit="
+ << doris::config::enable_file_cache_query_limit
+ << " file_cache_enter_disk_resource_limit_mode_percent="
+ <<
doris::config::file_cache_enter_disk_resource_limit_mode_percent
+ << " file_cache_exit_disk_resource_limit_mode_percent="
+ <<
doris::config::file_cache_exit_disk_resource_limit_mode_percent
+ << " enable_read_cache_file_directly="
+ << doris::config::enable_read_cache_file_directly
+ << " file_cache_enable_evict_from_other_queue_by_size="
+ <<
doris::config::file_cache_enable_evict_from_other_queue_by_size
+ << " file_cache_error_log_limit_bytes="
+ << doris::config::file_cache_error_log_limit_bytes
+ << " cache_lock_wait_long_tail_threshold_us="
+ << doris::config::cache_lock_wait_long_tail_threshold_us
+ << " cache_lock_held_long_tail_threshold_us="
+ << doris::config::cache_lock_held_long_tail_threshold_us
+ << " file_cache_remove_block_qps_limit="
+ << doris::config::file_cache_remove_block_qps_limit
+ << " enable_evict_file_cache_in_advance="
+ << doris::config::enable_evict_file_cache_in_advance
+ << " file_cache_enter_need_evict_cache_in_advance_percent="
+ <<
doris::config::file_cache_enter_need_evict_cache_in_advance_percent
+ << " file_cache_exit_need_evict_cache_in_advance_percent="
+ <<
doris::config::file_cache_exit_need_evict_cache_in_advance_percent
+ << " file_cache_evict_in_advance_interval_ms="
+ << doris::config::file_cache_evict_in_advance_interval_ms
+ << " file_cache_evict_in_advance_batch_bytes="
+ << doris::config::file_cache_evict_in_advance_batch_bytes;
+
+ LOG(INFO) << "S3 writer config. s3_file_writer_log_interval_second="
+ << doris::config::s3_file_writer_log_interval_second
+ << " s3_write_buffer_size=" <<
doris::config::s3_write_buffer_size
+ << " enable_flush_file_cache_async="
+ << doris::config::enable_flush_file_cache_async;
+
+ return Status::OK();
+ }
+
+ doris::Status global_load_once_env() {
+ doris::CpuInfo::init();
+ doris::DiskInfo::init();
+ doris::MemInfo::init();
+
+ LOG(INFO) << doris::CpuInfo::debug_string();
+ LOG(INFO) << doris::DiskInfo::debug_string();
+ LOG(INFO) << doris::MemInfo::debug_string();
+
+ return Status::OK();
+ }
+
+ doris::Status load_bench_exec_env() {
+ SCOPED_INIT_THREAD_CONTEXT();
+
+ std::vector<doris::StorePath> paths;
+ auto olap_res =
doris::parse_conf_store_paths(doris::config::storage_root_path, &paths);
+ if (!olap_res) {
+ LOG(ERROR) << "parse config storage path failed, path="
+ << doris::config::storage_root_path;
+ exit(-1);
+ }
+
+ std::vector<doris::StorePath> spill_paths;
+ if (doris::config::spill_storage_root_path.empty()) {
+ doris::config::spill_storage_root_path =
doris::config::storage_root_path;
+ }
+ olap_res =
+
doris::parse_conf_store_paths(doris::config::spill_storage_root_path,
&spill_paths);
+ if (!olap_res) {
+ LOG(ERROR) << "parse config spill storage path failed, path="
+ << doris::config::spill_storage_root_path;
+ exit(-1);
+ }
+ std::set<std::string> broken_paths;
+
doris::parse_conf_broken_store_paths(doris::config::broken_storage_path,
&broken_paths);
+
+ auto it = paths.begin();
+ for (; it != paths.end();) {
+ if (broken_paths.contains(it->path)) {
+ if (doris::config::ignore_broken_disk) {
+ LOG(WARNING) << "ignore broken disk, path = " << it->path;
+ it = paths.erase(it);
+ } else {
+ LOG(ERROR) << "a broken disk is found " << it->path;
+ exit(-1);
+ }
+ } else if (!doris::check_datapath_rw(it->path)) {
+ if (doris::config::ignore_broken_disk) {
+ LOG(WARNING) << "read write test file failed, path=" <<
it->path;
+ it = paths.erase(it);
+ } else {
+ LOG(ERROR) << "read write test file failed, path=" <<
it->path;
+ // if only one disk and the disk is full, also need exit
because rocksdb will open failed
+ exit(-1);
+ }
+ } else {
+ ++it;
+ }
+ }
+
+ if (paths.empty()) {
+ LOG(ERROR) << "All disks are broken, exit.";
+ exit(-1);
+ }
+
+ it = spill_paths.begin();
+ for (; it != spill_paths.end();) {
+ if (!doris::check_datapath_rw(it->path)) {
+ if (doris::config::ignore_broken_disk) {
+ LOG(WARNING) << "read write test file failed, path=" <<
it->path;
+ it = spill_paths.erase(it);
+ } else {
+ LOG(ERROR) << "read write test file failed, path=" <<
it->path;
+ exit(-1);
+ }
+ } else {
+ ++it;
+ }
+ }
+ if (spill_paths.empty()) {
+ LOG(ERROR) << "All spill disks are broken, exit.";
+ exit(-1);
+ }
+
+ auto* exec_env = doris::ExecEnv::GetInstance();
+ auto status = doris::ExecEnv::init(exec_env, paths, spill_paths,
broken_paths);
+ if (!status.ok()) {
+ return status;
+ }
+
+ std::unique_ptr<doris::ThreadPool> s3_upload_pool;
+
static_cast<void>(doris::ThreadPoolBuilder("MicrobenchS3FileUploadThreadPool")
+ .set_min_threads(256)
+ .set_max_threads(512)
+ .build(&s3_upload_pool));
+ exec_env->set_s3_file_upload_thread_pool(std::move(s3_upload_pool));
+
exec_env->set_file_cache_open_fd_cache(std::make_unique<doris::io::FDCache>());
+ return Status::OK();
+ }
+
+ doris::Status release_bench_exec_env() {
+ auto* exec_env = doris::ExecEnv::GetInstance();
+ if (auto* file_cache_factory = exec_env->file_cache_factory()) {
Review Comment:
Should this also be conditional on the config flag, e.g. `if (... && config::clear_file_cache)`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]