This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 0705bfadc10 [feature](Recycler) Parallelize s3 delete operations and 
recycle_tablet  (#37630)
0705bfadc10 is described below

commit 0705bfadc108d3452d8c828a31b5b76111153548
Author: AlexYue <[email protected]>
AuthorDate: Fri Jul 19 22:39:01 2024 +0800

    [feature](Recycler) Parallelize s3 delete operations and recycle_tablet  
(#37630)
    
    Previously the recycle procedure of an instance was single-threaded, so
    it was not sufficiently parallel even though it involves many network IO
    operations. This PR splits the recycle tasks into different stages that
    can run in parallel, and makes the delete operations parallel as well.
---
 cloud/src/common/config.h                 |   2 +
 cloud/src/recycler/azure_obj_client.cpp   |   6 +-
 cloud/src/recycler/azure_obj_client.h     |   5 +-
 cloud/src/recycler/obj_storage_client.cpp |  30 ++++-
 cloud/src/recycler/obj_storage_client.h   |  11 +-
 cloud/src/recycler/recycler.cpp           | 180 ++++++++++++++++++++++++------
 cloud/src/recycler/recycler.h             |  23 +++-
 cloud/src/recycler/recycler_service.cpp   |   9 +-
 cloud/src/recycler/s3_accessor.cpp        |  14 ++-
 cloud/src/recycler/s3_accessor.h          |   1 +
 cloud/src/recycler/s3_obj_client.cpp      |   6 +-
 cloud/src/recycler/s3_obj_client.h        |   5 +-
 cloud/src/recycler/sync_executor.h        | 125 +++++++++++++++++++++
 cloud/src/recycler/util.h                 |   1 +
 cloud/test/CMakeLists.txt                 |   8 +-
 cloud/test/recycler_test.cpp              |  55 +++++----
 cloud/test/util_test.cpp                  | 150 ++++++++++++++++++++++++-
 17 files changed, 545 insertions(+), 86 deletions(-)

diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index c5786bedff7..cb4bee9648e 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -62,6 +62,8 @@ CONF_Strings(recycle_whitelist, ""); // Comma seprated list
 CONF_Strings(recycle_blacklist, ""); // Comma seprated list
 CONF_mInt32(instance_recycler_worker_pool_size, "1");
 CONF_Bool(enable_checker, "false");
+// The parallelism for parallel recycle operation
+CONF_Int32(recycle_pool_parallelism, "10");
 // Currently only used for recycler test
 CONF_Bool(enable_inverted_check, "false");
 // interval for scanning instances to do checks and inspections
diff --git a/cloud/src/recycler/azure_obj_client.cpp 
b/cloud/src/recycler/azure_obj_client.cpp
index 8674768fcd8..6a20bff0950 100644
--- a/cloud/src/recycler/azure_obj_client.cpp
+++ b/cloud/src/recycler/azure_obj_client.cpp
@@ -215,7 +215,8 @@ std::unique_ptr<ObjectListIterator> 
AzureObjClient::list_objects(ObjectStoragePa
 // You can find out the num in 
https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id
 // > Each batch request supports a maximum of 256 subrequests.
 ObjectStorageResponse AzureObjClient::delete_objects(const std::string& bucket,
-                                                     std::vector<std::string> 
keys) {
+                                                     std::vector<std::string> 
keys,
+                                                     ObjClientOptions option) {
     if (keys.empty()) {
         return {0};
     }
@@ -289,8 +290,9 @@ ObjectStorageResponse 
AzureObjClient::delete_object(ObjectStoragePathRef path) {
 }
 
 ObjectStorageResponse 
AzureObjClient::delete_objects_recursively(ObjectStoragePathRef path,
+                                                                 
ObjClientOptions option,
                                                                  int64_t 
expiration_time) {
-    return delete_objects_recursively_(path, expiration_time, 
BlobBatchMaxOperations);
+    return delete_objects_recursively_(path, option, expiration_time, 
BlobBatchMaxOperations);
 }
 
 ObjectStorageResponse AzureObjClient::get_life_cycle(const std::string& bucket,
diff --git a/cloud/src/recycler/azure_obj_client.h 
b/cloud/src/recycler/azure_obj_client.h
index 49b54ca8c6d..96212d720a3 100644
--- a/cloud/src/recycler/azure_obj_client.h
+++ b/cloud/src/recycler/azure_obj_client.h
@@ -38,12 +38,13 @@ public:
 
     std::unique_ptr<ObjectListIterator> list_objects(ObjectStoragePathRef 
path) override;
 
-    ObjectStorageResponse delete_objects(const std::string& bucket,
-                                         std::vector<std::string> keys) 
override;
+    ObjectStorageResponse delete_objects(const std::string& bucket, 
std::vector<std::string> keys,
+                                         ObjClientOptions option) override;
 
     ObjectStorageResponse delete_object(ObjectStoragePathRef path) override;
 
     ObjectStorageResponse delete_objects_recursively(ObjectStoragePathRef path,
+                                                     ObjClientOptions option,
                                                      int64_t expiration_time = 
0) override;
 
     ObjectStorageResponse get_life_cycle(const std::string& bucket,
diff --git a/cloud/src/recycler/obj_storage_client.cpp 
b/cloud/src/recycler/obj_storage_client.cpp
index 1dd6435214d..855fa110a4c 100644
--- a/cloud/src/recycler/obj_storage_client.cpp
+++ b/cloud/src/recycler/obj_storage_client.cpp
@@ -18,10 +18,13 @@
 #include "recycler/obj_storage_client.h"
 
 #include "cpp/sync_point.h"
+#include "recycler/sync_executor.h"
+#include "recycler/util.h"
 
 namespace doris::cloud {
 
 ObjectStorageResponse 
ObjStorageClient::delete_objects_recursively_(ObjectStoragePathRef path,
+                                                                    const 
ObjClientOptions& option,
                                                                     int64_t 
expired_time,
                                                                     size_t 
batch_size) {
     TEST_SYNC_POINT_CALLBACK("ObjStorageClient::delete_objects_recursively_", 
&batch_size);
@@ -29,6 +32,10 @@ ObjectStorageResponse 
ObjStorageClient::delete_objects_recursively_(ObjectStorag
 
     ObjectStorageResponse ret;
     std::vector<std::string> keys;
+    SyncExecutor<int> concurrent_delete_executor(
+            option.executor,
+            fmt::format("delete objects under bucket {}, path {}", 
path.bucket, path.key),
+            [](const int& ret) { return ret != 0; });
 
     for (auto obj = list_iter->next(); obj.has_value(); obj = 
list_iter->next()) {
         if (expired_time > 0 && obj->mtime_s > expired_time) {
@@ -39,20 +46,31 @@ ObjectStorageResponse 
ObjStorageClient::delete_objects_recursively_(ObjectStorag
         if (keys.size() < batch_size) {
             continue;
         }
-
-        ret = delete_objects(path.bucket, std::move(keys));
-        if (ret.ret != 0) {
-            return ret;
-        }
+        concurrent_delete_executor.add([this, &path, k = std::move(keys), 
option]() mutable {
+            return delete_objects(path.bucket, std::move(k), option).ret;
+        });
     }
 
     if (!list_iter->is_valid()) {
+        bool finished;
+        concurrent_delete_executor.when_all(&finished);
         return {-1};
     }
 
     if (!keys.empty()) {
-        return delete_objects(path.bucket, std::move(keys));
+        concurrent_delete_executor.add([this, &path, k = std::move(keys), 
option]() mutable {
+            return delete_objects(path.bucket, std::move(k), option).ret;
+        });
     }
+    bool finished = true;
+    std::vector<int> rets = concurrent_delete_executor.when_all(&finished);
+    for (int r : rets) {
+        if (r != 0) {
+            ret = -1;
+        }
+    }
+
+    ret = finished ? ret : -1;
 
     return ret;
 }
diff --git a/cloud/src/recycler/obj_storage_client.h 
b/cloud/src/recycler/obj_storage_client.h
index 955a8c174e5..fc0211820d1 100644
--- a/cloud/src/recycler/obj_storage_client.h
+++ b/cloud/src/recycler/obj_storage_client.h
@@ -51,6 +51,12 @@ public:
     virtual std::optional<ObjectMeta> next() = 0;
 };
 
+class SimpleThreadPool;
+struct ObjClientOptions {
+    bool prefetch {true};
+    std::shared_ptr<SimpleThreadPool> executor;
+};
+
 class ObjStorageClient {
 public:
     ObjStorageClient() = default;
@@ -71,7 +77,8 @@ public:
 
     // According to the bucket and prefix specified by the user, it performs 
batch deletion based on the object names in the object array.
     virtual ObjectStorageResponse delete_objects(const std::string& bucket,
-                                                 std::vector<std::string> 
keys) = 0;
+                                                 std::vector<std::string> keys,
+                                                 ObjClientOptions option) = 0;
 
     // Delete the file named key in the object storage bucket.
     virtual ObjectStorageResponse delete_object(ObjectStoragePathRef path) = 0;
@@ -79,6 +86,7 @@ public:
     // According to the prefix, recursively delete all objects under the 
prefix.
     // If `expiration_time` > 0, only delete objects with mtime earlier than 
`expiration_time`.
     virtual ObjectStorageResponse 
delete_objects_recursively(ObjectStoragePathRef path,
+                                                             ObjClientOptions 
option,
                                                              int64_t 
expiration_time = 0) = 0;
 
     // Get the objects' expiration time on the bucket
@@ -91,6 +99,7 @@ public:
 
 protected:
     ObjectStorageResponse delete_objects_recursively_(ObjectStoragePathRef 
path,
+                                                      const ObjClientOptions& 
option,
                                                       int64_t expiration_time, 
size_t batch_size);
 };
 
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 204ff8d2f18..0b2267e601d 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -47,6 +47,7 @@
 #include "cpp/sync_point.h"
 #include "meta-service/keys.h"
 #include "recycler/recycler_service.h"
+#include "recycler/sync_executor.h"
 #include "recycler/util.h"
 
 namespace doris::cloud {
@@ -166,6 +167,16 @@ static inline void check_recycle_task(const std::string& 
instance_id, const std:
 
 Recycler::Recycler(std::shared_ptr<TxnKv> txn_kv) : txn_kv_(std::move(txn_kv)) 
{
     ip_port_ = std::string(butil::my_ip_cstr()) + ":" + 
std::to_string(config::brpc_listen_port);
+    auto s3_producer_pool = 
std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+    s3_producer_pool->start();
+    auto recycle_tablet_pool = 
std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+    recycle_tablet_pool->start();
+    auto group_recycle_function_pool =
+            
std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+    group_recycle_function_pool->start();
+    _thread_pool_group =
+            RecyclerThreadPoolGroup(std::move(s3_producer_pool), 
std::move(recycle_tablet_pool),
+                                    std::move(group_recycle_function_pool));
 }
 
 Recycler::~Recycler() {
@@ -225,7 +236,8 @@ void Recycler::recycle_callback() {
             // skip instance in recycling
             if (recycling_instance_map_.count(instance_id)) continue;
         }
-        auto instance_recycler = std::make_shared<InstanceRecycler>(txn_kv_, 
instance);
+        auto instance_recycler =
+                std::make_shared<InstanceRecycler>(txn_kv_, instance, 
_thread_pool_group);
         if (instance_recycler->init() != 0) {
             LOG(WARNING) << "failed to init instance recycler, instance_id=" 
<< instance_id;
             continue;
@@ -441,11 +453,13 @@ private:
     std::unordered_set<Key, HashOfKey> schemas_without_inverted_index_;
 };
 
-InstanceRecycler::InstanceRecycler(std::shared_ptr<TxnKv> txn_kv, const 
InstanceInfoPB& instance)
+InstanceRecycler::InstanceRecycler(std::shared_ptr<TxnKv> txn_kv, const 
InstanceInfoPB& instance,
+                                   RecyclerThreadPoolGroup thread_pool_group)
         : txn_kv_(std::move(txn_kv)),
           instance_id_(instance.instance_id()),
           instance_info_(instance),
-          
inverted_index_id_cache_(std::make_unique<InvertedIndexIdCache>(instance_id_, 
txn_kv_)) {}
+          
inverted_index_id_cache_(std::make_unique<InvertedIndexIdCache>(instance_id_, 
txn_kv_)),
+          _thread_pool_group(std::move(thread_pool_group)) {}
 
 InstanceRecycler::~InstanceRecycler() = default;
 
@@ -542,22 +556,51 @@ int InstanceRecycler::init() {
     return init_storage_vault_accessors();
 }
 
+template <typename... Func>
+auto task_wrapper(Func... funcs) -> std::function<int()> {
+    return [funcs...]() {
+        return [](std::initializer_list<int> numbers) {
+            int i = 0;
+            for (int num : numbers) {
+                if (num != 0) {
+                    i = num;
+                }
+            }
+            return i;
+        }({funcs()...});
+    };
+}
+
 int InstanceRecycler::do_recycle() {
     TEST_SYNC_POINT("InstanceRecycler.do_recycle");
     if (instance_info_.status() == InstanceInfoPB::DELETED) {
         return recycle_deleted_instance();
     } else if (instance_info_.status() == InstanceInfoPB::NORMAL) {
-        int ret = recycle_indexes();
-        if (recycle_partitions() != 0) ret = -1;
-        if (recycle_tmp_rowsets() != 0) ret = -1;
-        if (recycle_rowsets() != 0) ret = -1;
-        if (abort_timeout_txn() != 0) ret = -1;
-        if (recycle_expired_txn_label() != 0) ret = -1;
-        if (recycle_copy_jobs() != 0) ret = -1;
-        if (recycle_stage() != 0) ret = -1;
-        if (recycle_expired_stage_objects() != 0) ret = -1;
-        if (recycle_versions() != 0) ret = -1;
-        return ret;
+        SyncExecutor<int> 
sync_executor(_thread_pool_group.group_recycle_function_pool,
+                                        fmt::format("instance id {}", 
instance_id_),
+                                        [](int r) { return r != 0; });
+        sync_executor
+                .add(task_wrapper(
+                        [this]() -> int { return 
InstanceRecycler::recycle_indexes(); },
+                        [this]() -> int { return 
InstanceRecycler::recycle_partitions(); },
+                        [this]() -> int { return 
InstanceRecycler::recycle_tmp_rowsets(); },
+                        [this]() -> int { return 
InstanceRecycler::recycle_rowsets(); }))
+                .add(task_wrapper(
+                        [this]() { return 
InstanceRecycler::abort_timeout_txn(); },
+                        [this]() { return 
InstanceRecycler::recycle_expired_txn_label(); }))
+                .add(task_wrapper([this]() { return 
InstanceRecycler::recycle_copy_jobs(); }))
+                .add(task_wrapper([this]() { return 
InstanceRecycler::recycle_stage(); }))
+                .add(task_wrapper(
+                        [this]() { return 
InstanceRecycler::recycle_expired_stage_objects(); }))
+                .add(task_wrapper([this]() { return 
InstanceRecycler::recycle_versions(); }));
+        bool finished = true;
+        std::vector<int> rets = sync_executor.when_all(&finished);
+        for (int ret : rets) {
+            if (ret != 0) {
+                return ret;
+            }
+        }
+        return finished ? 0 : -1;
     } else {
         LOG(WARNING) << "invalid instance status: " << instance_info_.status()
                      << " instance_id=" << instance_id_;
@@ -1012,7 +1055,7 @@ int InstanceRecycler::recycle_versions() {
 int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, 
int64_t partition_id,
                                       bool is_empty_tablet) {
     int num_scanned = 0;
-    int num_recycled = 0;
+    std::atomic_int num_recycled = 0;
 
     std::string tablet_key_begin, tablet_key_end;
     std::string stats_key_begin, stats_key_end;
@@ -1054,12 +1097,20 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, 
int64_t index_id, int64_
                 .tag("num_recycled", num_recycled);
     });
 
+    // The first string_view represents the tablet key which has been recycled
+    // The second bool represents whether the following fdb's tablet key 
deletion could be done using range remove or not
+    using TabletKeyPair = std::pair<std::string_view, bool>;
+    SyncExecutor<TabletKeyPair> sync_executor(
+            _thread_pool_group.recycle_tablet_pool,
+            fmt::format("recycle tablets, tablet id {}, index id {}, partition 
id {}", table_id,
+                        index_id, partition_id),
+            [](const TabletKeyPair& k) { return k.first.empty(); });
+
     // Elements in `tablet_keys` has the same lifetime as `it` in 
`scan_and_recycle`
-    std::vector<std::string_view> tablet_keys;
     std::vector<std::string> tablet_idx_keys;
     std::vector<std::string> init_rs_keys;
-    bool use_range_remove = true;
     auto recycle_func = [&, is_empty_tablet, this](std::string_view k, 
std::string_view v) -> int {
+        bool use_range_remove = true;
         ++num_scanned;
         doris::TabletMetaCloudPB tablet_meta_pb;
         if (!tablet_meta_pb.ParseFromArray(v.data(), v.size())) {
@@ -1070,13 +1121,19 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, 
int64_t index_id, int64_
         int64_t tablet_id = tablet_meta_pb.tablet_id();
         tablet_idx_keys.push_back(meta_tablet_idx_key({instance_id_, 
tablet_id}));
         if (!is_empty_tablet) {
-            if (recycle_tablet(tablet_id) != 0) {
-                LOG_WARNING("failed to recycle tablet")
-                        .tag("instance_id", instance_id_)
-                        .tag("tablet_id", tablet_id);
-                use_range_remove = false;
-                return -1;
-            }
+            sync_executor.add([this, &num_recycled, tid = tablet_id, 
range_move = use_range_remove,
+                               k]() mutable -> TabletKeyPair {
+                if (recycle_tablet(tid) != 0) {
+                    LOG_WARNING("failed to recycle tablet")
+                            .tag("instance_id", instance_id_)
+                            .tag("tablet_id", tid);
+                    range_move = false;
+                    return {std::string_view(), range_move};
+                }
+                ++num_recycled;
+                LOG_INFO("k is {}, is empty {}", k, k.empty());
+                return {k, range_move};
+            });
         } else {
             // Empty tablet only has a [0-1] init rowset
             init_rs_keys.push_back(meta_rowset_key({instance_id_, tablet_id, 
1}));
@@ -1100,19 +1157,38 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, 
int64_t index_id, int64_
                 }
                 return true;
             }());
+            sync_executor.add([k]() mutable -> TabletKeyPair {
+                LOG_INFO("k is {}, is empty {}", k, k.empty());
+                return {k, true};
+            });
+            ++num_recycled;
         }
-        ++num_recycled;
-        tablet_keys.push_back(k);
         return 0;
     };
 
+    // TODO(AlexYue): Add one ut to cover use_range_remove = false
     auto loop_done = [&, this]() -> int {
+        bool finished = true;
+        auto tablet_keys = sync_executor.when_all(&finished);
+        if (!finished) {
+            LOG_WARNING("failed to recycle tablet").tag("instance_id", 
instance_id_);
+            return -1;
+        }
+        sync_executor.reset();
         if (tablet_keys.empty() && tablet_idx_keys.empty()) return 0;
+        // sort the vector using key's order
+        std::sort(tablet_keys.begin(), tablet_keys.end(),
+                  [](const auto& prev, const auto& last) { return prev.first < 
last.first; });
+        bool use_range_remove = true;
+        for (auto& [_, remove] : tablet_keys) {
+            if (!remove) {
+                use_range_remove = remove;
+                break;
+            }
+        }
         std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, 
[&](int*) {
-            tablet_keys.clear();
             tablet_idx_keys.clear();
             init_rs_keys.clear();
-            use_range_remove = true;
         });
         std::unique_ptr<Transaction> txn;
         if (txn_kv_->create_txn(&txn) != TxnErrorCode::TXN_OK) {
@@ -1122,10 +1198,10 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, 
int64_t index_id, int64_
         std::string tablet_key_end;
         if (!tablet_keys.empty()) {
             if (use_range_remove) {
-                tablet_key_end = std::string(tablet_keys.back()) + '\x00';
-                txn->remove(tablet_keys.front(), tablet_key_end);
+                tablet_key_end = std::string(tablet_keys.back().first) + 
'\x00';
+                txn->remove(tablet_keys.front().first, tablet_key_end);
             } else {
-                for (auto k : tablet_keys) {
+                for (auto& [k, _] : tablet_keys) {
                     txn->remove(k);
                 }
             }
@@ -1312,13 +1388,26 @@ int InstanceRecycler::delete_rowset_data(const 
std::vector<doris::RowsetMetaClou
             }
         }
     }
+
+    SyncExecutor<int> 
concurrent_delete_executor(_thread_pool_group.s3_producer_pool,
+                                                 "delete_rowset_data",
+                                                 [](const int& ret) { return 
ret != 0; });
     for (auto& [resource_id, file_paths] : resource_file_paths) {
-        auto& accessor = accessor_map_[resource_id];
-        DCHECK(accessor);
-        if (accessor->delete_files(file_paths) != 0) {
+        concurrent_delete_executor.add([&, rid = &resource_id, paths = 
&file_paths]() -> int {
+            auto& accessor = accessor_map_[*rid];
+            DCHECK(accessor);
+            return accessor->delete_files(*paths);
+        });
+    }
+    bool finished = true;
+    std::vector<int> rets = concurrent_delete_executor.when_all(&finished);
+    for (int r : rets) {
+        if (r != 0) {
             ret = -1;
+            break;
         }
     }
+    ret = finished ? ret : -1;
     return ret;
 }
 
@@ -1377,15 +1466,32 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) 
{
         ret = -1;
     }
 
+    SyncExecutor<int> concurrent_delete_executor(
+            _thread_pool_group.s3_producer_pool,
+            fmt::format("delete tablet {} s3 rowset", tablet_id),
+            [](const int& ret) { return ret != 0; });
+
     // delete all rowset data in this tablet
     for (auto& [_, accessor] : accessor_map_) {
-        if (accessor->delete_directory(tablet_path_prefix(tablet_id)) != 0) {
-            LOG(WARNING) << "failed to delete rowset data of tablet " << 
tablet_id
-                         << " s3_path=" << accessor->uri();
+        concurrent_delete_executor.add([&, accessor_ptr = &accessor]() {
+            if 
((*accessor_ptr)->delete_directory(tablet_path_prefix(tablet_id)) != 0) {
+                LOG(WARNING) << "failed to delete rowset data of tablet " << 
tablet_id
+                             << " s3_path=" << accessor->uri();
+                return -1;
+            }
+            return 0;
+        });
+    }
+    bool finished = true;
+    std::vector<int> rets = concurrent_delete_executor.when_all(&finished);
+    for (int r : rets) {
+        if (r != 0) {
             ret = -1;
         }
     }
 
+    ret = finished ? ret : -1;
+
     if (ret == 0) {
         // All object files under tablet have been deleted
         std::lock_guard lock(recycled_tablets_mtx_);
diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h
index 1085d9362d5..950c0193f77 100644
--- a/cloud/src/recycler/recycler.h
+++ b/cloud/src/recycler/recycler.h
@@ -40,6 +40,24 @@ class TxnKv;
 class InstanceRecycler;
 class StorageVaultAccessor;
 class Checker;
+class SimpleThreadPool;
+struct RecyclerThreadPoolGroup {
+    RecyclerThreadPoolGroup() = default;
+    RecyclerThreadPoolGroup(std::shared_ptr<SimpleThreadPool> s3_producer_pool,
+                            std::shared_ptr<SimpleThreadPool> 
recycle_tablet_pool,
+                            std::shared_ptr<SimpleThreadPool> 
group_recycle_function_pool)
+            : s3_producer_pool(std::move(s3_producer_pool)),
+              recycle_tablet_pool(std::move(recycle_tablet_pool)),
+              
group_recycle_function_pool(std::move(group_recycle_function_pool)) {}
+    ~RecyclerThreadPoolGroup() = default;
+    RecyclerThreadPoolGroup(const RecyclerThreadPoolGroup&) = default;
+    RecyclerThreadPoolGroup& operator=(RecyclerThreadPoolGroup& other) = 
default;
+    RecyclerThreadPoolGroup& operator=(RecyclerThreadPoolGroup&& other) = 
default;
+    RecyclerThreadPoolGroup(RecyclerThreadPoolGroup&&) = default;
+    std::shared_ptr<SimpleThreadPool> s3_producer_pool;
+    std::shared_ptr<SimpleThreadPool> recycle_tablet_pool;
+    std::shared_ptr<SimpleThreadPool> group_recycle_function_pool;
+};
 
 class Recycler {
 public:
@@ -83,11 +101,13 @@ private:
 
     WhiteBlackList instance_filter_;
     std::unique_ptr<Checker> checker_;
+    RecyclerThreadPoolGroup _thread_pool_group;
 };
 
 class InstanceRecycler {
 public:
-    explicit InstanceRecycler(std::shared_ptr<TxnKv> txn_kv, const 
InstanceInfoPB& instance);
+    explicit InstanceRecycler(std::shared_ptr<TxnKv> txn_kv, const 
InstanceInfoPB& instance,
+                              RecyclerThreadPoolGroup thread_pool_group);
     ~InstanceRecycler();
 
     // returns 0 for success otherwise error
@@ -219,6 +239,7 @@ private:
     std::mutex recycle_tasks_mutex;
     // <task_name, start_time>>
     std::map<std::string, int64_t> running_recycle_tasks;
+    RecyclerThreadPoolGroup _thread_pool_group;
 };
 
 } // namespace doris::cloud
diff --git a/cloud/src/recycler/recycler_service.cpp 
b/cloud/src/recycler/recycler_service.cpp
index f0b8959a517..52c510fb2e7 100644
--- a/cloud/src/recycler/recycler_service.cpp
+++ b/cloud/src/recycler/recycler_service.cpp
@@ -151,7 +151,8 @@ void RecyclerServiceImpl::check_instance(const std::string& 
instance_id, MetaSer
 }
 
 void recycle_copy_jobs(const std::shared_ptr<TxnKv>& txn_kv, const 
std::string& instance_id,
-                       MetaServiceCode& code, std::string& msg) {
+                       MetaServiceCode& code, std::string& msg,
+                       RecyclerThreadPoolGroup thread_pool_group) {
     std::unique_ptr<Transaction> txn;
     TxnErrorCode err = txn_kv->create_txn(&txn);
     if (err != TxnErrorCode::TXN_OK) {
@@ -188,13 +189,13 @@ void recycle_copy_jobs(const std::shared_ptr<TxnKv>& 
txn_kv, const std::string&
             return;
         }
     }
-    auto recycler = std::make_unique<InstanceRecycler>(txn_kv, instance);
+
+    auto recycler = std::make_unique<InstanceRecycler>(txn_kv, instance, 
thread_pool_group);
     if (recycler->init() != 0) {
         LOG(WARNING) << "failed to init InstanceRecycler recycle_copy_jobs on 
instance "
                      << instance_id;
         return;
     }
-
     std::thread worker([recycler = std::move(recycler), instance_id] {
         LOG(INFO) << "manually trigger recycle_copy_jobs on instance " << 
instance_id;
         recycler->recycle_copy_jobs();
@@ -332,7 +333,7 @@ void 
RecyclerServiceImpl::http(::google::protobuf::RpcController* controller,
             status_code = 400;
             return;
         }
-        recycle_copy_jobs(txn_kv_, *instance_id, code, msg);
+        recycle_copy_jobs(txn_kv_, *instance_id, code, msg, 
recycler_->_thread_pool_group);
         response_body = msg;
         return;
     }
diff --git a/cloud/src/recycler/s3_accessor.cpp 
b/cloud/src/recycler/s3_accessor.cpp
index c993e0fb547..2c983a5fa06 100644
--- a/cloud/src/recycler/s3_accessor.cpp
+++ b/cloud/src/recycler/s3_accessor.cpp
@@ -36,6 +36,7 @@
 #include "common/config.h"
 #include "common/encryption_util.h"
 #include "common/logging.h"
+#include "common/simple_thread_pool.h"
 #include "common/string_util.h"
 #include "common/util.h"
 #include "cpp/obj_retry_strategy.h"
@@ -234,7 +235,15 @@ int S3Accessor::create(S3Conf conf, 
std::shared_ptr<S3Accessor>* accessor) {
     return (*accessor)->init();
 }
 
+static std::shared_ptr<SimpleThreadPool> worker_pool;
+
 int S3Accessor::init() {
+    static std::once_flag log_annotated_tags_key_once;
+    std::call_once(log_annotated_tags_key_once, [&]() {
+        LOG_INFO("start s3 accessor parallel worker pool");
+        worker_pool = 
std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+        worker_pool->start();
+    });
     switch (conf_.provider) {
     case S3Conf::AZURE: {
 #ifdef USE_AZURE
@@ -291,7 +300,7 @@ int S3Accessor::delete_prefix_impl(const std::string& 
path_prefix, int64_t expir
     LOG_INFO("delete prefix").tag("uri", to_uri(path_prefix));
     return obj_client_
             ->delete_objects_recursively({.bucket = conf_.bucket, .key = 
get_key(path_prefix)},
-                                         expiration_time)
+                                         {.executor = worker_pool}, 
expiration_time)
             .ret;
 }
 
@@ -333,7 +342,8 @@ int S3Accessor::delete_files(const 
std::vector<std::string>& paths) {
         keys.emplace_back(get_key(path));
     }
 
-    return obj_client_->delete_objects(conf_.bucket, std::move(keys)).ret;
+    return obj_client_->delete_objects(conf_.bucket, std::move(keys), 
{.executor = worker_pool})
+            .ret;
 }
 
 int S3Accessor::delete_file(const std::string& path) {
diff --git a/cloud/src/recycler/s3_accessor.h b/cloud/src/recycler/s3_accessor.h
index 91f75765a9b..41adc93f04f 100644
--- a/cloud/src/recycler/s3_accessor.h
+++ b/cloud/src/recycler/s3_accessor.h
@@ -37,6 +37,7 @@ class S3RateLimiterHolder;
 enum class S3RateLimitType;
 namespace cloud {
 class ObjectStoreInfoPB;
+class SimpleThreadPool;
 
 namespace s3_bvar {
 extern bvar::LatencyRecorder s3_get_latency;
diff --git a/cloud/src/recycler/s3_obj_client.cpp 
b/cloud/src/recycler/s3_obj_client.cpp
index d1307de53b7..fc0c7e9e901 100644
--- a/cloud/src/recycler/s3_obj_client.cpp
+++ b/cloud/src/recycler/s3_obj_client.cpp
@@ -201,7 +201,8 @@ std::unique_ptr<ObjectListIterator> 
S3ObjClient::list_objects(ObjectStoragePathR
 }
 
 ObjectStorageResponse S3ObjClient::delete_objects(const std::string& bucket,
-                                                  std::vector<std::string> 
keys) {
+                                                  std::vector<std::string> 
keys,
+                                                  ObjClientOptions option) {
     if (keys.empty()) {
         return {0};
     }
@@ -295,8 +296,9 @@ ObjectStorageResponse 
S3ObjClient::delete_object(ObjectStoragePathRef path) {
 }
 
 ObjectStorageResponse 
S3ObjClient::delete_objects_recursively(ObjectStoragePathRef path,
+                                                              ObjClientOptions 
option,
                                                               int64_t 
expiration_time) {
-    return delete_objects_recursively_(path, expiration_time, MaxDeleteBatch);
+    return delete_objects_recursively_(path, option, expiration_time, 
MaxDeleteBatch);
 }
 
 ObjectStorageResponse S3ObjClient::get_life_cycle(const std::string& bucket,
diff --git a/cloud/src/recycler/s3_obj_client.h 
b/cloud/src/recycler/s3_obj_client.h
index 7804d081816..c486fdc18dc 100644
--- a/cloud/src/recycler/s3_obj_client.h
+++ b/cloud/src/recycler/s3_obj_client.h
@@ -39,12 +39,13 @@ public:
 
     std::unique_ptr<ObjectListIterator> list_objects(ObjectStoragePathRef 
path) override;
 
-    ObjectStorageResponse delete_objects(const std::string& bucket,
-                                         std::vector<std::string> keys) 
override;
+    ObjectStorageResponse delete_objects(const std::string& bucket, 
std::vector<std::string> keys,
+                                         ObjClientOptions option) override;
 
     ObjectStorageResponse delete_object(ObjectStoragePathRef path) override;
 
     ObjectStorageResponse delete_objects_recursively(ObjectStoragePathRef path,
+                                                     ObjClientOptions option,
                                                      int64_t expiration_time = 
0) override;
 
     ObjectStorageResponse get_life_cycle(const std::string& bucket,
diff --git a/cloud/src/recycler/sync_executor.h b/cloud/src/recycler/sync_executor.h
new file mode 100644
index 00000000000..d7009a99ed4
--- /dev/null
+++ b/cloud/src/recycler/sync_executor.h
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <bthread/countdown_event.h>
+#include <fmt/core.h>
+#include <gen_cpp/cloud.pb.h>
+#include <glog/logging.h>
+
+#include <future>
+#include <iostream>
+#include <string>
+
+#include "common/logging.h"
+#include "common/simple_thread_pool.h"
+
+namespace doris::cloud {
+
+template <typename T>
+class SyncExecutor {
+public:
+    SyncExecutor(
+            std::shared_ptr<SimpleThreadPool> pool, std::string name_tag,
+            std::function<bool(const T&)> cancel = [](const T& /**/) { return 
false; })
+            : _pool(std::move(pool)), _cancel(std::move(cancel)), 
_name_tag(std::move(name_tag)) {}
+    auto add(std::function<T()> callback) -> SyncExecutor<T>& {
+        auto task = std::make_unique<Task>(std::move(callback), _cancel, 
_count);
+        _count.add_count();
+        // The actual task logic would be wrapped by one promise and passed to the threadpool.
+        // The result would be returned by the future once the task is finished.
+        // Or the task would be invalid if the whole task is cancelled.
+        int r = _pool->submit([this, t = task.get()]() { (*t)(_stop_token); });
+        CHECK(r == 0);
+        _res.emplace_back(std::move(task));
+        return *this;
+    }
+    std::vector<T> when_all(bool* finished) {
+        timespec current_time;
+        auto current_time_second = time(nullptr);
+        current_time.tv_sec = current_time_second + 300;
+        current_time.tv_nsec = 0;
+        auto msg = fmt::format("{} has already taken 5 min", _name_tag);
+        while (0 != _count.timed_wait(current_time)) {
+            current_time.tv_sec += 300;
+            LOG(WARNING) << msg;
+        }
+        *finished = !_stop_token;
+        std::vector<T> res;
+        res.reserve(_res.size());
+        for (auto& task : _res) {
+            if (!task->valid()) {
+                *finished = false;
+                return res;
+            }
+            res.emplace_back((*task).get());
+        }
+        return res;
+    }
+    void reset() {
+        _res.clear();
+        _stop_token = false;
+    }
+
+private:
+    class Task {
+    public:
+        Task(std::function<T()> callback, std::function<bool(const T&)> cancel,
+             bthread::CountdownEvent& count)
+                : _callback(std::move(callback)),
+                  _cancel(std::move(cancel)),
+                  _count(count),
+                  _fut(_pro.get_future()) {}
+        void operator()(std::atomic_bool& stop_token) {
+            std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
+                                                                  [&](int*) { 
_count.signal(); });
+            if (stop_token) {
+                _valid = false;
+                return;
+            }
+            T t = _callback();
+            // We'll return this task result to user even if this task returns an error
+            // So we don't set _valid to false here
+            if (_cancel(t)) {
+                stop_token = true;
+            }
+            _pro.set_value(std::move(t));
+        }
+        bool valid() { return _valid; }
+        T get() { return _fut.get(); }
+
+    private:
+        // It's guaranteed that the valid function can only be called inside SyncExecutor's `when_all()` function
+        // and only be called when the _count.timed_wait function returned. So there would be no data race for
+        // _valid then it doesn't need to be one atomic bool.
+        bool _valid = true;
+        std::function<T()> _callback;
+        std::function<bool(const T&)> _cancel;
+        std::promise<T> _pro;
+        bthread::CountdownEvent& _count;
+        std::future<T> _fut;
+    };
+    std::vector<std::unique_ptr<Task>> _res;
+    // use CountdownEvent to log periodically using CountdownEvent::timed_wait()
+    bthread::CountdownEvent _count {0};
+    std::atomic_bool _stop_token {false};
+    std::shared_ptr<SimpleThreadPool> _pool;
+    std::function<bool(const T&)> _cancel;
+    std::string _name_tag;
+};
+} // namespace doris::cloud
\ No newline at end of file
diff --git a/cloud/src/recycler/util.h b/cloud/src/recycler/util.h
index 6c62bcaf7b0..b63090062bf 100644
--- a/cloud/src/recycler/util.h
+++ b/cloud/src/recycler/util.h
@@ -19,6 +19,7 @@
 
 #include <fmt/core.h>
 #include <gen_cpp/cloud.pb.h>
+#include <glog/logging.h>
 
 #include <string>
 
diff --git a/cloud/test/CMakeLists.txt b/cloud/test/CMakeLists.txt
index a27f7a1c538..ba5e2909918 100644
--- a/cloud/test/CMakeLists.txt
+++ b/cloud/test/CMakeLists.txt
@@ -50,10 +50,10 @@ add_executable(s3_accessor_test s3_accessor_test.cpp)
 
 add_executable(hdfs_accessor_test hdfs_accessor_test.cpp)
 
-add_executable(util_test util_test.cpp)
-
 add_executable(stopwatch_test stopwatch_test.cpp)
 
+add_executable(util_test util_test.cpp)
+
 add_executable(network_util_test network_util_test.cpp)
 
 message("Meta-service test dependencies: ${TEST_LINK_LIBS}")
@@ -85,10 +85,10 @@ target_link_libraries(s3_accessor_test ${TEST_LINK_LIBS})
 
 target_link_libraries(hdfs_accessor_test ${TEST_LINK_LIBS})
 
-target_link_libraries(util_test ${TEST_LINK_LIBS})
-
 target_link_libraries(stopwatch_test ${TEST_LINK_LIBS})
 
+target_link_libraries(util_test ${TEST_LINK_LIBS})
+
 target_link_libraries(network_util_test ${TEST_LINK_LIBS})
 
 # FDB related tests need to be linked with libfdb_c
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index 3f9d1175746..ca8ffbcee61 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -29,6 +29,7 @@
 
 #include "common/config.h"
 #include "common/logging.h"
+#include "common/simple_thread_pool.h"
 #include "common/util.h"
 #include "cpp/sync_point.h"
 #include "meta-service/keys.h"
@@ -48,6 +49,8 @@ static const std::string instance_id = 
"instance_id_recycle_test";
 static int64_t current_time = 0;
 static constexpr int64_t db_id = 1000;
 
+static doris::cloud::RecyclerThreadPoolGroup thread_group;
+
 int main(int argc, char** argv) {
     auto conf_file = "doris_cloud.conf";
     if (!cloud::config::init(conf_file, true)) {
@@ -63,6 +66,16 @@ int main(int argc, char** argv) {
     current_time = 
duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
 
     ::testing::InitGoogleTest(&argc, argv);
+    auto s3_producer_pool = 
std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+    s3_producer_pool->start();
+    auto recycle_tablet_pool = 
std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+    recycle_tablet_pool->start();
+    auto group_recycle_function_pool =
+            
std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+    group_recycle_function_pool->start();
+    thread_group =
+            RecyclerThreadPoolGroup(std::move(s3_producer_pool), 
std::move(recycle_tablet_pool),
+                                    std::move(group_recycle_function_pool));
     return RUN_ALL_TESTS();
 }
 
@@ -618,7 +631,7 @@ TEST(RecyclerTest, recycle_empty) {
     obj_info->set_bucket(config::test_s3_bucket);
     obj_info->set_prefix("recycle_empty");
 
-    InstanceRecycler recycler(txn_kv, instance);
+    InstanceRecycler recycler(txn_kv, instance, thread_group);
     ASSERT_EQ(recycler.init(), 0);
 
     ASSERT_EQ(recycler.recycle_rowsets(), 0);
@@ -651,7 +664,7 @@ TEST(RecyclerTest, recycle_rowsets) {
     sp->set_call_back("InvertedIndexIdCache::insert2", [&](auto&&) { 
++insert_inverted_index; });
     sp->enable_processing();
 
-    InstanceRecycler recycler(txn_kv, instance);
+    InstanceRecycler recycler(txn_kv, instance, thread_group);
     ASSERT_EQ(recycler.init(), 0);
 
     std::vector<doris::TabletSchemaCloudPB> schemas;
@@ -716,7 +729,7 @@ TEST(RecyclerTest, bench_recycle_rowsets) {
 
     config::instance_recycler_worker_pool_size = 10;
     config::recycle_task_threshold_seconds = 0;
-    InstanceRecycler recycler(txn_kv, instance);
+    InstanceRecycler recycler(txn_kv, instance, thread_group);
     ASSERT_EQ(recycler.init(), 0);
 
     auto sp = SyncPoint::get_instance();
@@ -799,7 +812,7 @@ TEST(RecyclerTest, recycle_tmp_rowsets) {
     sp->set_call_back("InvertedIndexIdCache::insert2", [&](auto&&) { 
++insert_inverted_index; });
     sp->enable_processing();
 
-    InstanceRecycler recycler(txn_kv, instance);
+    InstanceRecycler recycler(txn_kv, instance, thread_group);
     ASSERT_EQ(recycler.init(), 0);
 
     std::vector<doris::TabletSchemaCloudPB> schemas;
@@ -861,7 +874,7 @@ TEST(RecyclerTest, recycle_tablet) {
     obj_info->set_bucket(config::test_s3_bucket);
     obj_info->set_prefix("recycle_tablet");
 
-    InstanceRecycler recycler(txn_kv, instance);
+    InstanceRecycler recycler(txn_kv, instance, thread_group);
     ASSERT_EQ(recycler.init(), 0);
 
     std::vector<doris::TabletSchemaCloudPB> schemas;
@@ -936,7 +949,7 @@ TEST(RecyclerTest, recycle_indexes) {
     obj_info->set_bucket(config::test_s3_bucket);
     obj_info->set_prefix("recycle_indexes");
 
-    InstanceRecycler recycler(txn_kv, instance);
+    InstanceRecycler recycler(txn_kv, instance, thread_group);
     ASSERT_EQ(recycler.init(), 0);
 
     std::vector<doris::TabletSchemaCloudPB> schemas;
@@ -1048,7 +1061,7 @@ TEST(RecyclerTest, recycle_partitions) {
     obj_info->set_bucket(config::test_s3_bucket);
     obj_info->set_prefix("recycle_partitions");
 
-    InstanceRecycler recycler(txn_kv, instance);
+    InstanceRecycler recycler(txn_kv, instance, thread_group);
     ASSERT_EQ(recycler.init(), 0);
 
     std::vector<doris::TabletSchemaCloudPB> schemas;
@@ -1159,7 +1172,7 @@ TEST(RecyclerTest, recycle_versions) {
 
     InstanceInfoPB instance;
     instance.set_instance_id(instance_id);
-    InstanceRecycler recycler(txn_kv, instance);
+    InstanceRecycler recycler(txn_kv, instance, thread_group);
     ASSERT_EQ(recycler.init(), 0);
     // Recycle all partitions in table except 30006
     ASSERT_EQ(recycler.recycle_partitions(), 0);
@@ -1228,7 +1241,7 @@ TEST(RecyclerTest, abort_timeout_txn) {
     }
     InstanceInfoPB instance;
     instance.set_instance_id(mock_instance);
-    InstanceRecycler recycler(txn_kv, instance);
+    InstanceRecycler recycler(txn_kv, instance, thread_group);
     ASSERT_EQ(recycler.init(), 0);
     sleep(1);
     ASSERT_EQ(recycler.abort_timeout_txn(), 0);
@@ -1271,7 +1284,7 @@ TEST(RecyclerTest, abort_timeout_txn_and_rebegin) {
     }
     InstanceInfoPB instance;
     instance.set_instance_id(mock_instance);
-    InstanceRecycler recycler(txn_kv, instance);
+    InstanceRecycler recycler(txn_kv, instance, thread_group);
     ASSERT_EQ(recycler.init(), 0);
     sleep(1);
     ASSERT_EQ(recycler.abort_timeout_txn(), 0);
@@ -1338,7 +1351,7 @@ TEST(RecyclerTest, recycle_expired_txn_label) {
         }
         InstanceInfoPB instance;
         instance.set_instance_id(mock_instance);
-        InstanceRecycler recycler(txn_kv, instance);
+        InstanceRecycler recycler(txn_kv, instance, thread_group);
         ASSERT_EQ(recycler.init(), 0);
         recycler.abort_timeout_txn();
         TxnInfoPB txn_info_pb;
@@ -1389,7 +1402,7 @@ TEST(RecyclerTest, recycle_expired_txn_label) {
         }
         InstanceInfoPB instance;
         instance.set_instance_id(mock_instance);
-        InstanceRecycler recycler(txn_kv, instance);
+        InstanceRecycler recycler(txn_kv, instance, thread_group);
         ASSERT_EQ(recycler.init(), 0);
         sleep(1);
         recycler.abort_timeout_txn();
@@ -1441,7 +1454,7 @@ TEST(RecyclerTest, recycle_expired_txn_label) {
         }
         InstanceInfoPB instance;
         instance.set_instance_id(mock_instance);
-        InstanceRecycler recycler(txn_kv, instance);
+        InstanceRecycler recycler(txn_kv, instance, thread_group);
         ASSERT_EQ(recycler.init(), 0);
         sleep(1);
         recycler.abort_timeout_txn();
@@ -1500,7 +1513,7 @@ TEST(RecyclerTest, recycle_expired_txn_label) {
         }
         InstanceInfoPB instance;
         instance.set_instance_id(mock_instance);
-        InstanceRecycler recycler(txn_kv, instance);
+        InstanceRecycler recycler(txn_kv, instance, thread_group);
         ASSERT_EQ(recycler.init(), 0);
         sleep(1);
         recycler.abort_timeout_txn();
@@ -1637,7 +1650,7 @@ TEST(RecyclerTest, recycle_copy_jobs) {
 
     InstanceInfoPB instance_info;
     create_instance(internal_stage_id, external_stage_id, instance_info);
-    InstanceRecycler recycler(txn_kv, instance_info);
+    InstanceRecycler recycler(txn_kv, instance_info, thread_group);
     ASSERT_EQ(recycler.init(), 0);
     auto internal_accessor = 
recycler.accessor_map_.find(internal_stage_id)->second;
 
@@ -1796,7 +1809,7 @@ TEST(RecyclerTest, recycle_batch_copy_jobs) {
 
     InstanceInfoPB instance_info;
     create_instance(internal_stage_id, external_stage_id, instance_info);
-    InstanceRecycler recycler(txn_kv, instance_info);
+    InstanceRecycler recycler(txn_kv, instance_info, thread_group);
     ASSERT_EQ(recycler.init(), 0);
     const auto& internal_accessor = 
recycler.accessor_map_.find(internal_stage_id)->second;
 
@@ -1910,7 +1923,7 @@ TEST(RecyclerTest, recycle_stage) {
     instance.set_instance_id(mock_instance);
     instance.add_obj_info()->CopyFrom(object_info);
 
-    InstanceRecycler recycler(txn_kv, instance);
+    InstanceRecycler recycler(txn_kv, instance, thread_group);
     ASSERT_EQ(recycler.init(), 0);
     auto accessor = recycler.accessor_map_.begin()->second;
     for (int i = 0; i < 10; ++i) {
@@ -1970,7 +1983,7 @@ TEST(RecyclerTest, recycle_deleted_instance) {
 
     InstanceInfoPB instance_info;
     create_instance(internal_stage_id, external_stage_id, instance_info);
-    InstanceRecycler recycler(txn_kv, instance_info);
+    InstanceRecycler recycler(txn_kv, instance_info, thread_group);
     ASSERT_EQ(recycler.init(), 0);
     // create txn key
     for (size_t i = 0; i < 100; i++) {
@@ -2555,7 +2568,7 @@ TEST(RecyclerTest, delete_rowset_data) {
     }
 
     {
-        InstanceRecycler recycler(txn_kv, instance);
+        InstanceRecycler recycler(txn_kv, instance, thread_group);
         ASSERT_EQ(recycler.init(), 0);
         auto accessor = recycler.accessor_map_.begin()->second;
         int64_t txn_id_base = 114115;
@@ -2589,7 +2602,7 @@ TEST(RecyclerTest, delete_rowset_data) {
         tmp_obj_info->set_bucket(config::test_s3_bucket);
         tmp_obj_info->set_prefix(resource_id);
 
-        InstanceRecycler recycler(txn_kv, tmp_instance);
+        InstanceRecycler recycler(txn_kv, tmp_instance, thread_group);
         ASSERT_EQ(recycler.init(), 0);
         auto accessor = recycler.accessor_map_.begin()->second;
         // Delete multiple rowset files using one series of RowsetPB
@@ -2609,7 +2622,7 @@ TEST(RecyclerTest, delete_rowset_data) {
         ASSERT_FALSE(list_iter->has_next());
     }
     {
-        InstanceRecycler recycler(txn_kv, instance);
+        InstanceRecycler recycler(txn_kv, instance, thread_group);
         ASSERT_EQ(recycler.init(), 0);
         auto accessor = recycler.accessor_map_.begin()->second;
         // Delete multiple rowset files using one series of RowsetPB
diff --git a/cloud/test/util_test.cpp b/cloud/test/util_test.cpp
index 02921170761..c88ef555f82 100644
--- a/cloud/test/util_test.cpp
+++ b/cloud/test/util_test.cpp
@@ -15,17 +15,36 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "recycler/util.h"
+
+#include <chrono>
 #include <string>
+#include <string_view>
+#include <thread>
 #include <tuple>
 #include <vector>
 
 #include "common/config.h"
+#include "common/logging.h"
+#include "common/simple_thread_pool.h"
 #include "common/string_util.h"
-#include "glog/logging.h"
 #include "gtest/gtest.h"
+#include "recycler/recycler.h"
+#include "recycler/sync_executor.h"
+
+using namespace doris::cloud;
 
 int main(int argc, char** argv) {
-    doris::cloud::config::init(nullptr, true);
+    const auto* conf_file = "doris_cloud.conf";
+    if (!config::init(conf_file, true)) {
+        std::cerr << "failed to init config file, conf=" << conf_file << 
std::endl;
+        return -1;
+    }
+    if (!::init_glog("util")) {
+        std::cerr << "failed to init glog" << std::endl;
+        return -1;
+    }
+
     ::testing::InitGoogleTest(&argc, argv);
     return RUN_ALL_TESTS();
 }
@@ -89,3 +108,130 @@ TEST(StringUtilTest, test_string_strip) {
 
     // clang-format on
 }
+
+template <typename... Func>
+auto task_wrapper(Func... funcs) -> std::function<int()> {
+    return [funcs...]() {
+        return [](std::initializer_list<int> numbers) {
+            int i = 0;
+            for (int num : numbers) {
+                if (num != 0) {
+                    i = num;
+                }
+            }
+            return i;
+        }({funcs()...});
+    };
+}
+
+TEST(UtilTest, stage_wrapper) {
+    std::function<int()> func1 = []() { return 0; };
+    std::function<int()> func2 = []() { return -1; };
+    std::function<int()> func3 = []() { return 0; };
+    auto f = task_wrapper(func1, func2, func3);
+    ASSERT_EQ(-1, f());
+
+    f = task_wrapper(func1, func3);
+    ASSERT_EQ(0, f());
+}
+
+TEST(UtilTest, delay) {
+    auto s3_producer_pool = 
std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+    s3_producer_pool->start();
+    // test normal execute
+    {
+        SyncExecutor<int> sync_executor(s3_producer_pool, "normal test",
+                                        [](int k) { return k == -1; });
+        auto f1 = []() { return -1; };
+        auto f2 = []() {
+            std::this_thread::sleep_for(std::chrono::seconds(1));
+            return 1;
+        };
+        sync_executor.add(f2);
+        sync_executor.add(f2);
+        sync_executor.add(f1);
+        bool finished = true;
+        std::vector<int> res = sync_executor.when_all(&finished);
+        ASSERT_EQ(finished, false);
+        ASSERT_EQ(3, res.size());
+    }
+    // test normal execute
+    {
+        SyncExecutor<std::string_view> sync_executor(
+                s3_producer_pool, "normal test",
+                [](const std::string_view k) { return k.empty(); });
+        auto f1 = []() { return ""; };
+        auto f2 = []() {
+            std::this_thread::sleep_for(std::chrono::seconds(1));
+            return "fake";
+        };
+        sync_executor.add(f2);
+        sync_executor.add(f2);
+        sync_executor.add(f1);
+        bool finished = true;
+        auto res = sync_executor.when_all(&finished);
+        ASSERT_EQ(finished, false);
+        ASSERT_EQ(3, res.size());
+    }
+}
+
+TEST(UtilTest, normal) {
+    auto s3_producer_pool = 
std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+    s3_producer_pool->start();
+    // test normal execute
+    {
+        SyncExecutor<int> sync_executor(s3_producer_pool, "normal test",
+                                        [](int k) { return k == -1; });
+        auto f1 = []() { return 1; };
+        sync_executor.add(f1);
+        sync_executor.add(f1);
+        sync_executor.add(f1);
+        bool finished = true;
+        std::vector<int> res = sync_executor.when_all(&finished);
+        ASSERT_EQ(3, res.size());
+        ASSERT_EQ(finished, true);
+        std::for_each(res.begin(), res.end(), [](auto&& n) { ASSERT_EQ(1, n); 
});
+    }
+    // test when error happen
+    {
+        SyncExecutor<int> sync_executor(s3_producer_pool, "normal test",
+                                        [](int k) { return k == -1; });
+        auto f1 = []() { return 1; };
+        sync_executor._stop_token = true;
+        sync_executor.add(f1);
+        sync_executor.add(f1);
+        sync_executor.add(f1);
+        bool finished = true;
+        std::vector<int> res = sync_executor.when_all(&finished);
+        ASSERT_EQ(finished, false);
+        ASSERT_EQ(0, res.size());
+    }
+    {
+        SyncExecutor<int> sync_executor(s3_producer_pool, "normal test",
+                                        [](int k) { return k == -1; });
+        auto f1 = []() { return 1; };
+        auto cancel = []() { return -1; };
+        sync_executor.add(f1);
+        sync_executor.add(f1);
+        sync_executor.add(f1);
+        sync_executor.add(cancel);
+        bool finished = true;
+        std::vector<int> res = sync_executor.when_all(&finished);
+        ASSERT_EQ(finished, false);
+    }
+    // test string_view
+    {
+        SyncExecutor<std::string_view> sync_executor(
+                s3_producer_pool, "normal test",
+                [](const std::string_view k) { return k.empty(); });
+        std::string s = "Hello World";
+        auto f1 = [&s]() { return std::string_view(s); };
+        sync_executor.add(f1);
+        sync_executor.add(f1);
+        sync_executor.add(f1);
+        bool finished = true;
+        std::vector<std::string_view> res = sync_executor.when_all(&finished);
+        ASSERT_EQ(3, res.size());
+        std::for_each(res.begin(), res.end(), [&s](auto&& n) { ASSERT_EQ(s, 
n); });
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to