This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a9a87f86796 [chore](cloud) Support dynamic recycler instance filter
config (#63822)
a9a87f86796 is described below
commit a9a87f86796fa7432a5f66b6d2d83c8c953d3e82
Author: Yixuan Wang <[email protected]>
AuthorDate: Mon Jun 1 14:36:39 2026 +0800
[chore](cloud) Support dynamic recycler instance filter config (#63822)
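Make recycle_whitelist and recycle_blacklist runtime-mutable (new CONF_mStrings
macro) and read them directly from config when scanning instances, so runtime
config updates affect filtering without a restart.
To accept list values, update_config now treats a comma-separated segment that
contains no '=' as a continuation of the previous key's value (e.g.
"recycle_blacklist=a,b" sets the blacklist to both a and b).
Add a unit test for dynamic filter changes.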
---
cloud/src/common/config.h | 4 ++--
cloud/src/common/configbase.cpp | 22 ++++++++++++++++++----
cloud/src/common/configbase.h | 3 +++
cloud/src/recycler/recycler.cpp | 12 ++++++++++--
cloud/src/recycler/recycler.h | 2 --
cloud/test/recycler_test.cpp | 30 ++++++++++++++++++++++++++++++
6 files changed, 63 insertions(+), 10 deletions(-)
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index b14c079b8f8..e9149dec15d 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -96,9 +96,9 @@ CONF_mInt64(compacted_rowset_retention_seconds, "10800"); //
3h
CONF_mInt64(dropped_index_retention_seconds, "10800"); // 3h
CONF_mInt64(dropped_partition_retention_seconds, "10800"); // 3h
// Which instance should be recycled. If empty, recycle all instances.
-CONF_Strings(recycle_whitelist, ""); // Comma seprated list
+CONF_mStrings(recycle_whitelist, ""); // Comma separated list; mutable at runtime via update_config
// These instances will not be recycled, only effective when whitelist is
empty.
-CONF_Strings(recycle_blacklist, ""); // Comma seprated list
+CONF_mStrings(recycle_blacklist, ""); // Comma separated list; mutable at runtime via update_config
// IO worker thread pool concurrency: object list, delete
CONF_mInt32(instance_recycler_worker_pool_size, "32");
// Max number of delete tasks per batch when recycling objects.
diff --git a/cloud/src/common/configbase.cpp b/cloud/src/common/configbase.cpp
index f5e02eac64b..fb99b771953 100644
--- a/cloud/src/common/configbase.cpp
+++ b/cloud/src/common/configbase.cpp
@@ -408,6 +408,7 @@ bool do_set_config(const Register::Field& feild, const
std::string& value, bool
UPDATE_FIELD(feild, value, int32_t, need_persist);
UPDATE_FIELD(feild, value, int64_t, need_persist);
UPDATE_FIELD(feild, value, double, need_persist);
+ UPDATE_FIELD(feild, value, std::vector<std::string>, need_persist);
{
// add lock to ensure thread safe
std::unique_lock<std::shared_mutex> lock(mutable_string_config_lock);
@@ -504,17 +505,30 @@ std::pair<bool, std::string> update_config(const
std::string& configs, bool pers
std::unordered_map<std::string, std::string> conf_map;
std::istringstream ss(configs);
std::string conf;
+ std::string key;
+ std::string val;
+ auto add_config = [&]() {
+ if (!key.empty()) {
+ conf_map.emplace(std::move(key), std::move(val));
+ }
+ };
while (std::getline(ss, conf, ',')) {
auto pos = conf.find('=');
- if (pos == std::string::npos) {
+ if (pos == std::string::npos && key.empty()) {
return {false, fmt::format("config {} is invalid", conf)};
}
- std::string key = conf.substr(0, pos);
- std::string val = conf.substr(pos + 1);
+ if (pos == std::string::npos) {
+ trim(conf);
+ val += "," + conf;
+ continue;
+ }
+ add_config();
+ key = conf.substr(0, pos);
+ val = conf.substr(pos + 1);
trim(key);
trim(val);
- conf_map.emplace(std::move(key), std::move(val));
}
+ add_config();
return set_config(std::move(conf_map), persist, custom_conf_path);
}
diff --git a/cloud/src/common/configbase.h b/cloud/src/common/configbase.h
index b6eff9ae101..d937eb04b4e 100644
--- a/cloud/src/common/configbase.h
+++ b/cloud/src/common/configbase.h
@@ -107,6 +107,8 @@ public:
#define CONF_Doubles(name, defaultstr) DEFINE_FIELD(std::vector<double>, name,
defaultstr, false)
#define CONF_Strings(name, defaultstr) \
DEFINE_FIELD(std::vector<std::string>, name, defaultstr, false)
+#define CONF_mStrings(name, defaultstr) \
+ DEFINE_FIELD(std::vector<std::string>, name, defaultstr, true) // same as CONF_Strings, but passes true (mutable) like the other CONF_m* macros
#define CONF_mBool(name, defaultstr) DEFINE_FIELD(bool, name, defaultstr, true)
#define CONF_mInt16(name, defaultstr) DEFINE_FIELD(int16_t, name, defaultstr,
true)
#define CONF_mInt32(name, defaultstr) DEFINE_FIELD(int32_t, name, defaultstr,
true)
@@ -128,6 +130,7 @@ public:
#define CONF_Int64s(name, defaultstr) DECLARE_FIELD(std::vector<int64_t>, name)
#define CONF_Doubles(name, defaultstr) DECLARE_FIELD(std::vector<double>, name)
#define CONF_Strings(name, defaultstr) DECLARE_FIELD(std::vector<std::string>,
name)
+#define CONF_mStrings(name, defaultstr)
DECLARE_FIELD(std::vector<std::string>, name)
#define CONF_mBool(name, defaultstr) DECLARE_FIELD(bool, name)
#define CONF_mInt16(name, defaultstr) DECLARE_FIELD(int16_t, name)
#define CONF_mInt32(name, defaultstr) DECLARE_FIELD(int32_t, name)
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 84e92fa04c1..2e077e24abc 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -97,6 +97,15 @@ void sleep_for_packed_file_retry() {
std::this_thread::sleep_for(std::chrono::milliseconds(packed_file_retry_sleep_ms()));
}
+bool filter_out_instance(const std::string& instance_id) { // Returns true when instance_id must be skipped (not enqueued) by the instance scanner.
+ if (config::recycle_whitelist.empty()) { // Both lists are re-read from config on every call, so update_config changes apply without a restart. NOTE(review): the vectors are read without any lock, while do_set_config in this diff updates std::vector<std::string> fields outside the mutable_string_config_lock block; a concurrent update_config may race with this read — confirm how UPDATE_FIELD assigns.
+ return std::ranges::find(config::recycle_blacklist, instance_id) != // empty whitelist: skip only blacklisted instances
+ config::recycle_blacklist.end();
+ }
+ return std::ranges::find(config::recycle_whitelist, instance_id) == // non-empty whitelist: blacklist is ignored; skip anything not whitelisted
+ config::recycle_whitelist.end();
+}
+
} // namespace
// return 0 for success get a key, 1 for key not found, negative for error
@@ -262,7 +271,7 @@ void Recycler::instance_scanner_callback() {
// enqueue instances
std::lock_guard lock(mtx_);
for (auto& instance : instances) {
- if (instance_filter_.filter_out(instance.instance_id()))
continue;
+ if (filter_out_instance(instance.instance_id())) continue;
auto [_, success] =
pending_instance_set_.insert(instance.instance_id());
// skip instance already in pending queue
if (success) {
@@ -415,7 +424,6 @@ void Recycler::check_recycle_tasks() {
}
int Recycler::start(brpc::Server* server) {
- instance_filter_.reset(config::recycle_whitelist,
config::recycle_blacklist);
g_bvar_recycler_task_max_concurrency.set_value(config::recycle_concurrency);
S3Environment::getInstance();
diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h
index 84062f41039..04ec7f19637 100644
--- a/cloud/src/recycler/recycler.h
+++ b/cloud/src/recycler/recycler.h
@@ -40,7 +40,6 @@
#include "recycler/snapshot_chain_compactor.h"
#include "recycler/snapshot_data_migrator.h"
#include "recycler/storage_vault_accessor.h"
-#include "recycler/white_black_list.h"
#include "snapshot/snapshot_manager.h"
namespace brpc {
@@ -118,7 +117,6 @@ private:
std::string ip_port_;
- WhiteBlackList instance_filter_;
std::unique_ptr<Checker> checker_;
RecyclerThreadPoolGroup _thread_pool_group;
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index 022a83eb88f..319d3c5705b 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -8774,4 +8774,34 @@ TEST(RecyclerTest,
enable_recycler_skip_recycle_callback) {
EXPECT_TRUE(recycler.pending_instance_set_.empty());
EXPECT_TRUE(recycler.recycling_instance_map_.empty());
}
+
+TEST(RecyclerTest, RecycleInstanceFilterReadsConfigDynamically) { // Verifies filter_out_instance follows update_config changes without a restart.
+ auto old_whitelist = config::recycle_whitelist;
+ auto old_blacklist = config::recycle_blacklist;
+ DORIS_CLOUD_DEFER { // restore the original lists so later tests are unaffected
+ config::recycle_whitelist = old_whitelist;
+ config::recycle_blacklist = old_blacklist;
+ };
+
+ auto [succ, cause] = config::update_config("recycle_whitelist=", false,
""); // empty whitelist => blacklist mode
+ ASSERT_TRUE(succ) << cause;
+ std::tie(succ, cause) =
+ config::update_config("recycle_blacklist=instance1,instance2",
false, ""); // "instance2" has no '=', so update_config appends it to recycle_blacklist's value
+ ASSERT_TRUE(succ) << cause;
+ ASSERT_EQ(config::recycle_blacklist.size(), 2);
+ EXPECT_TRUE(filter_out_instance("instance1"));
+ EXPECT_TRUE(filter_out_instance("instance2"));
+ EXPECT_FALSE(filter_out_instance("instance3"));
+
+ std::tie(succ, cause) =
config::update_config("recycle_blacklist=instance2", false, ""); // shrinking the blacklist un-filters instance1
+ ASSERT_TRUE(succ) << cause;
+ EXPECT_FALSE(filter_out_instance("instance1"));
+ EXPECT_TRUE(filter_out_instance("instance2"));
+
+ std::tie(succ, cause) = config::update_config(
+ "recycle_whitelist=instance1,recycle_blacklist=instance1", false,
""); // a non-empty whitelist overrides the blacklist
+ ASSERT_TRUE(succ) << cause;
+ EXPECT_FALSE(filter_out_instance("instance1"));
+ EXPECT_TRUE(filter_out_instance("instance2"));
+}
} // namespace doris::cloud
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]