This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 1dd76b2189c branch-4.0: [improvement](cloud) Add enable_recycler config to skip recycler dynamically #63286 (#63398)
1dd76b2189c is described below

commit 1dd76b2189c27fb3e44be7de9c340b33f848d9ef
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri May 22 23:53:51 2026 -0700

    branch-4.0: [improvement](cloud) Add enable_recycler config to skip recycler dynamically #63286 (#63398)
    
    Cherry-picked from #63286
    
    Co-authored-by: meiyi <[email protected]>
---
 cloud/src/common/config.h       |  2 +
 cloud/src/recycler/recycler.cpp | 45 ++++++++++++++---------
 cloud/test/recycler_test.cpp    | 81 +++++++++++++++++++++++++++++++++++++++++
 3 files changed, 110 insertions(+), 18 deletions(-)

diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index d5fd3a88363..35d38a4d416 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -435,4 +435,6 @@ CONF_Bool(enable_instance_update_watcher, "true");
 CONF_mBool(advance_txn_lazy_commit_during_reads, "true");
 CONF_mBool(wait_txn_lazy_commit_during_reads, "true");
 
+// Whether to enable recycler. If false, skip enqueueing instances and recycling queued ones.
+CONF_mBool(enable_recycler, "true");
 } // namespace doris::cloud::config
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index aab1daba813..548fcbb21f0 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -248,26 +248,30 @@ void Recycler::instance_scanner_callback() {
     std::this_thread::sleep_for(
             
std::chrono::seconds(config::recycler_sleep_before_scheduling_seconds));
     while (!stopped()) {
-        std::vector<InstanceInfoPB> instances;
-        get_all_instances(txn_kv_.get(), instances);
-        // TODO(plat1ko): delete job recycle kv of non-existent instances
-        LOG(INFO) << "Recycler get instances: " << [&instances] {
-            std::stringstream ss;
-            for (auto& i : instances) ss << ' ' << i.instance_id();
-            return ss.str();
-        }();
-        if (!instances.empty()) {
-            // enqueue instances
-            std::lock_guard lock(mtx_);
-            for (auto& instance : instances) {
-                if (instance_filter_.filter_out(instance.instance_id())) 
continue;
-                auto [_, success] = 
pending_instance_set_.insert(instance.instance_id());
-                // skip instance already in pending queue
-                if (success) {
-                    pending_instance_queue_.push_back(std::move(instance));
+        if (config::enable_recycler) {
+            std::vector<InstanceInfoPB> instances;
+            get_all_instances(txn_kv_.get(), instances);
+            // TODO(plat1ko): delete job recycle kv of non-existent instances
+            LOG(INFO) << "Recycler get instances: " << [&instances] {
+                std::stringstream ss;
+                for (auto& i : instances) ss << ' ' << i.instance_id();
+                return ss.str();
+            }();
+            if (!instances.empty()) {
+                // enqueue instances
+                std::lock_guard lock(mtx_);
+                for (auto& instance : instances) {
+                    if (instance_filter_.filter_out(instance.instance_id())) 
continue;
+                    auto [_, success] = 
pending_instance_set_.insert(instance.instance_id());
+                    // skip instance already in pending queue
+                    if (success) {
+                        pending_instance_queue_.push_back(std::move(instance));
+                    }
                 }
+                pending_instance_cond_.notify_all();
             }
-            pending_instance_cond_.notify_all();
+        } else {
+            LOG(WARNING) << "Skip recycler since enable_recycler is false";
         }
         {
             std::unique_lock lock(mtx_);
@@ -297,6 +301,11 @@ void Recycler::recycle_callback() {
             // skip instance in recycling
             if (recycling_instance_map_.count(instance_id)) continue;
         }
+        if (!config::enable_recycler) {
+            LOG(WARNING) << "Skip recycle instance_id=" << instance_id
+                         << " since enable_recycler is false";
+            continue;
+        }
         auto instance_recycler = std::make_shared<InstanceRecycler>(
                 txn_kv_, instance, _thread_pool_group, txn_lazy_committer_);
 
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index a538707a312..733f349d6cf 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -8603,4 +8603,85 @@ TEST(RecyclerTest, 
recycle_tablet_with_delete_file_failure) {
         EXPECT_EQ(it->size(), 0) << "All recycle rowset keys should be 
deleted";
     }
 }
+
+TEST(RecyclerTest, enable_recycler_default_true) {
+    EXPECT_EQ(config::enable_recycler, true); // shipped default keeps the recycler running
+}
+
+TEST(RecyclerTest, enable_recycler_skip_instance_scanner) {
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    bool old_val = config::enable_recycler;
+    config::enable_recycler = false;
+    DORIS_CLOUD_DEFER {
+        config::enable_recycler = old_val;
+    };
+    int64_t old_recycle_interval = config::recycle_interval_seconds;
+    config::recycle_interval_seconds = 0;
+    DORIS_CLOUD_DEFER {
+        config::recycle_interval_seconds = old_recycle_interval;
+    };
+    int64_t old_sleep = config::recycler_sleep_before_scheduling_seconds;
+    config::recycler_sleep_before_scheduling_seconds = 0;
+    DORIS_CLOUD_DEFER {
+        config::recycler_sleep_before_scheduling_seconds = old_sleep;
+    };
+
+    Recycler recycler(txn_kv);
+    std::thread t([&]() { recycler.instance_scanner_callback(); });
+    // Let the callback complete one iteration: sleep(0) -> skip (disabled) -> wait_for(0).
+    std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    // Set stopped_ under mtx_ (the scanner locks mtx_ before waiting) so no notify is lost.
+    {
+        std::lock_guard lock(recycler.mtx_);
+        recycler.stopped_ = true;
+    }
+    recycler.notifier_.notify_all();
+    t.join();
+    // NOTE(review): txn_kv holds no instances, so the queue stays empty even when the
+    // recycler is enabled; TODO seed an instance so this check can actually fail.
+    EXPECT_TRUE(recycler.pending_instance_queue_.empty());
+}
+
+TEST(RecyclerTest, enable_recycler_skip_recycle_callback) {
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    bool old_val = config::enable_recycler;
+    config::enable_recycler = false;
+    DORIS_CLOUD_DEFER {
+        config::enable_recycler = old_val;
+    };
+
+    Recycler recycler(txn_kv);
+
+    InstanceInfoPB instance;
+    instance.set_instance_id("test_instance");
+    recycler.pending_instance_queue_.push_back(instance);
+    recycler.pending_instance_set_.insert("test_instance");
+
+    std::thread t([&]() { recycler.recycle_callback(); });
+
+    // Poll until the callback has popped the instance. Waiting on pending_instance_cond_
+    // could deadlock: the callback does not notify it after popping from the queue.
+    auto queue_drained = [&] {
+        std::lock_guard lock(recycler.mtx_);
+        return recycler.pending_instance_queue_.empty();
+    };
+    while (!queue_drained()) std::this_thread::yield();
+
+    // Set stopped_ while holding mtx_: otherwise the callback can evaluate its wait predicate,
+    // see stopped_ == false and miss this notify before blocking, hanging t.join().
+    {
+        std::lock_guard lock(recycler.mtx_);
+        recycler.stopped_ = true;
+    }
+    recycler.pending_instance_cond_.notify_all();
+    t.join();
+
+    EXPECT_TRUE(recycler.pending_instance_queue_.empty());
+    EXPECT_TRUE(recycler.pending_instance_set_.empty());
+    EXPECT_TRUE(recycler.recycling_instance_map_.empty());
+}
 } // namespace doris::cloud


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to