This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d9ec456a7a [improvement](cloud) Add enable_recycler config to skip recycler dynamically (#63286)
2d9ec456a7a is described below

commit 2d9ec456a7ad2e3a4877fef47ed51f04c4bb0f1a
Author: meiyi <[email protected]>
AuthorDate: Tue May 19 16:19:01 2026 +0800

    [improvement](cloud) Add enable_recycler config to skip recycler dynamically (#63286)
---
 cloud/src/common/config.h       |  2 +
 cloud/src/recycler/recycler.cpp | 45 ++++++++++++++---------
 cloud/test/recycler_test.cpp    | 81 +++++++++++++++++++++++++++++++++++++++++
 3 files changed, 110 insertions(+), 18 deletions(-)

diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index f54f65f9202..cdc8b5cd190 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -443,4 +443,6 @@ CONF_Bool(enable_instance_update_watcher, "true");
 CONF_mBool(advance_txn_lazy_commit_during_reads, "true");
 CONF_mBool(wait_txn_lazy_commit_during_reads, "true");
 
+// Whether to enable recycler. If false, the recycler will skip scanning instances to pending queue.
+CONF_mBool(enable_recycler, "true");
 } // namespace doris::cloud::config
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 401d938a1ae..84e92fa04c1 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -249,26 +249,30 @@ void Recycler::instance_scanner_callback() {
     std::this_thread::sleep_for(
             
std::chrono::seconds(config::recycler_sleep_before_scheduling_seconds));
     while (!stopped()) {
-        std::vector<InstanceInfoPB> instances;
-        get_all_instances(txn_kv_.get(), instances);
-        // TODO(plat1ko): delete job recycle kv of non-existent instances
-        LOG(INFO) << "Recycler get instances: " << [&instances] {
-            std::stringstream ss;
-            for (auto& i : instances) ss << ' ' << i.instance_id();
-            return ss.str();
-        }();
-        if (!instances.empty()) {
-            // enqueue instances
-            std::lock_guard lock(mtx_);
-            for (auto& instance : instances) {
-                if (instance_filter_.filter_out(instance.instance_id())) 
continue;
-                auto [_, success] = 
pending_instance_set_.insert(instance.instance_id());
-                // skip instance already in pending queue
-                if (success) {
-                    pending_instance_queue_.push_back(std::move(instance));
+        if (config::enable_recycler) {
+            std::vector<InstanceInfoPB> instances;
+            get_all_instances(txn_kv_.get(), instances);
+            // TODO(plat1ko): delete job recycle kv of non-existent instances
+            LOG(INFO) << "Recycler get instances: " << [&instances] {
+                std::stringstream ss;
+                for (auto& i : instances) ss << ' ' << i.instance_id();
+                return ss.str();
+            }();
+            if (!instances.empty()) {
+                // enqueue instances
+                std::lock_guard lock(mtx_);
+                for (auto& instance : instances) {
+                    if (instance_filter_.filter_out(instance.instance_id())) 
continue;
+                    auto [_, success] = 
pending_instance_set_.insert(instance.instance_id());
+                    // skip instance already in pending queue
+                    if (success) {
+                        pending_instance_queue_.push_back(std::move(instance));
+                    }
                 }
+                pending_instance_cond_.notify_all();
             }
-            pending_instance_cond_.notify_all();
+        } else {
+            LOG(WARNING) << "Skip recycler since enable_recycler is false";
         }
         {
             std::unique_lock lock(mtx_);
@@ -298,6 +302,11 @@ void Recycler::recycle_callback() {
             // skip instance in recycling
             if (recycling_instance_map_.count(instance_id)) continue;
         }
+        if (!config::enable_recycler) {
+            LOG(WARNING) << "Skip recycle instance_id=" << instance_id
+                         << " since enable_recycler is false";
+            continue;
+        }
         auto instance_recycler = std::make_shared<InstanceRecycler>(
                 txn_kv_, instance, _thread_pool_group, txn_lazy_committer_);
 
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index ffe2401862b..022a83eb88f 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -8693,4 +8693,85 @@ TEST(RecyclerTest, 
recycle_tablet_with_delete_file_failure) {
         EXPECT_EQ(it->size(), 0) << "All recycle rowset keys should be 
deleted";
     }
 }
+
+TEST(RecyclerTest, enable_recycler_default_true) {
+    EXPECT_TRUE(config::enable_recycler);
+}
+
+TEST(RecyclerTest, enable_recycler_skip_instance_scanner) {
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    bool old_val = config::enable_recycler;
+    config::enable_recycler = false;
+    DORIS_CLOUD_DEFER {
+        config::enable_recycler = old_val;
+    };
+
+    int64_t old_recycle_interval = config::recycle_interval_seconds;
+    config::recycle_interval_seconds = 0;
+    DORIS_CLOUD_DEFER {
+        config::recycle_interval_seconds = old_recycle_interval;
+    };
+
+    int64_t old_sleep = config::recycler_sleep_before_scheduling_seconds;
+    config::recycler_sleep_before_scheduling_seconds = 0;
+    DORIS_CLOUD_DEFER {
+        config::recycler_sleep_before_scheduling_seconds = old_sleep;
+    };
+
+    Recycler recycler(txn_kv);
+    std::thread t([&]() { recycler.instance_scanner_callback(); });
+
+    // Let the callback complete one iteration:
+    //   sleep(0) -> check enable_recycler (false, skip) -> wait_for(0, timeout)
+    std::this_thread::sleep_for(std::chrono::milliseconds(100));
+
+    recycler.stopped_ = true;
+    recycler.notifier_.notify_all();
+    t.join();
+
+    EXPECT_TRUE(recycler.pending_instance_queue_.empty());
+}
+
+TEST(RecyclerTest, enable_recycler_skip_recycle_callback) {
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    bool old_val = config::enable_recycler;
+    config::enable_recycler = false;
+    DORIS_CLOUD_DEFER {
+        config::enable_recycler = old_val;
+    };
+
+    Recycler recycler(txn_kv);
+
+    InstanceInfoPB instance;
+    instance.set_instance_id("test_instance");
+    recycler.pending_instance_queue_.push_back(instance);
+    recycler.pending_instance_set_.insert("test_instance");
+
+    std::thread t([&]() { recycler.recycle_callback(); });
+
+    // Wait until the callback has popped the instance from the queue.
+    // Can not wait on pending_instance_cond_ here because the callback does
+    // not notify after popping, which may cause a deadlock: both the main
+    // thread and the callback end up waiting on the same CV with different
+    // predicates and no one will wake them up.
+    while (true) {
+        {
+            std::lock_guard lock(recycler.mtx_);
+            if (recycler.pending_instance_queue_.empty()) break;
+        }
+        std::this_thread::yield();
+    }
+
+    recycler.stopped_ = true;
+    recycler.pending_instance_cond_.notify_all();
+    t.join();
+
+    EXPECT_TRUE(recycler.pending_instance_queue_.empty());
+    EXPECT_TRUE(recycler.pending_instance_set_.empty());
+    EXPECT_TRUE(recycler.recycling_instance_map_.empty());
+}
 } // namespace doris::cloud


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to