This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 0705bfadc10 [feature](Recycler) Parallelize s3 delete operations and recycle_tablet (#37630)
0705bfadc10 is described below
commit 0705bfadc108d3452d8c828a31b5b76111153548
Author: AlexYue <[email protected]>
AuthorDate: Fri Jul 19 22:39:01 2024 +0800
[feature](Recycler) Parallelize s3 delete operations and recycle_tablet (#37630)
Previously the recycler processed each instance in a single thread, which
left the work insufficiently parallel even though it consists largely of
network IO operations. This PR splits the recycle tasks into stages that
can run in parallel, and makes the delete operations themselves parallel.
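
As an orientation aid, here is a minimal usage sketch of the SyncExecutor added in
cloud/src/recycler/sync_executor.h, modeled on the tests added in cloud/test/util_test.cpp.
The function name, pool size, and task bodies are illustrative only; SimpleThreadPool,
SyncExecutor::add(), and SyncExecutor::when_all() are the pieces this patch provides.

    #include <memory>
    #include <vector>

    #include "common/simple_thread_pool.h"
    #include "recycler/sync_executor.h"

    using namespace doris::cloud;

    int example_recycle_batch() {
        // Worker pool; the patch sizes its real pools with config::recycle_pool_parallelism.
        auto pool = std::make_shared<SimpleThreadPool>(8);
        pool->start();

        // A completed task whose result matches the cancel predicate flips the internal
        // stop token, so tasks that have not started yet are skipped and when_all()
        // reports finished == false.
        SyncExecutor<int> executor(pool, "example recycle batch",
                                   [](const int& ret) { return ret != 0; });
        executor.add([] { return 0; })   // e.g. delete one batch of objects
                .add([] { return 0; });  // e.g. recycle one tablet

        bool finished = true;
        std::vector<int> rets = executor.when_all(&finished);
        for (int r : rets) {
            if (r != 0) return -1;       // propagate the first task failure
        }
        return finished ? 0 : -1;        // -1 if the batch was cancelled midway
    }

This mirrors how do_recycle(), delete_rowset_data(), and recycle_tablet() in the diff below
consume the results: collect the per-task return codes, then fold in the finished flag.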
---
cloud/src/common/config.h | 2 +
cloud/src/recycler/azure_obj_client.cpp | 6 +-
cloud/src/recycler/azure_obj_client.h | 5 +-
cloud/src/recycler/obj_storage_client.cpp | 30 ++++-
cloud/src/recycler/obj_storage_client.h | 11 +-
cloud/src/recycler/recycler.cpp | 180 ++++++++++++++++++++++++------
cloud/src/recycler/recycler.h | 23 +++-
cloud/src/recycler/recycler_service.cpp | 9 +-
cloud/src/recycler/s3_accessor.cpp | 14 ++-
cloud/src/recycler/s3_accessor.h | 1 +
cloud/src/recycler/s3_obj_client.cpp | 6 +-
cloud/src/recycler/s3_obj_client.h | 5 +-
cloud/src/recycler/sync_executor.h | 125 +++++++++++++++++++++
cloud/src/recycler/util.h | 1 +
cloud/test/CMakeLists.txt | 8 +-
cloud/test/recycler_test.cpp | 55 +++++----
cloud/test/util_test.cpp | 150 ++++++++++++++++++++++++-
17 files changed, 545 insertions(+), 86 deletions(-)
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index c5786bedff7..cb4bee9648e 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -62,6 +62,8 @@ CONF_Strings(recycle_whitelist, ""); // Comma seprated list
CONF_Strings(recycle_blacklist, ""); // Comma seprated list
CONF_mInt32(instance_recycler_worker_pool_size, "1");
CONF_Bool(enable_checker, "false");
+// The parallelism for parallel recycle operation
+CONF_Int32(recycle_pool_parallelism, "10");
// Currently only used for recycler test
CONF_Bool(enable_inverted_check, "false");
// interval for scanning instances to do checks and inspections
diff --git a/cloud/src/recycler/azure_obj_client.cpp b/cloud/src/recycler/azure_obj_client.cpp
index 8674768fcd8..6a20bff0950 100644
--- a/cloud/src/recycler/azure_obj_client.cpp
+++ b/cloud/src/recycler/azure_obj_client.cpp
@@ -215,7 +215,8 @@ std::unique_ptr<ObjectListIterator> AzureObjClient::list_objects(ObjectStoragePa
// You can find out the num in https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id
// > Each batch request supports a maximum of 256 subrequests.
ObjectStorageResponse AzureObjClient::delete_objects(const std::string& bucket,
-                                                     std::vector<std::string> keys) {
+                                                     std::vector<std::string> keys,
+                                                     ObjClientOptions option) {
if (keys.empty()) {
return {0};
}
@@ -289,8 +290,9 @@ ObjectStorageResponse AzureObjClient::delete_object(ObjectStoragePathRef path) {
}
ObjectStorageResponse AzureObjClient::delete_objects_recursively(ObjectStoragePathRef path,
+                                                                 ObjClientOptions option,
int64_t expiration_time) {
-    return delete_objects_recursively_(path, expiration_time, BlobBatchMaxOperations);
+    return delete_objects_recursively_(path, option, expiration_time, BlobBatchMaxOperations);
}
ObjectStorageResponse AzureObjClient::get_life_cycle(const std::string& bucket,
diff --git a/cloud/src/recycler/azure_obj_client.h b/cloud/src/recycler/azure_obj_client.h
index 49b54ca8c6d..96212d720a3 100644
--- a/cloud/src/recycler/azure_obj_client.h
+++ b/cloud/src/recycler/azure_obj_client.h
@@ -38,12 +38,13 @@ public:
std::unique_ptr<ObjectListIterator> list_objects(ObjectStoragePathRef path) override;
-    ObjectStorageResponse delete_objects(const std::string& bucket,
-                                         std::vector<std::string> keys) override;
+    ObjectStorageResponse delete_objects(const std::string& bucket, std::vector<std::string> keys,
+                                         ObjClientOptions option) override;
ObjectStorageResponse delete_object(ObjectStoragePathRef path) override;
ObjectStorageResponse delete_objects_recursively(ObjectStoragePathRef path,
+ ObjClientOptions option,
int64_t expiration_time = 0) override;
ObjectStorageResponse get_life_cycle(const std::string& bucket,
diff --git a/cloud/src/recycler/obj_storage_client.cpp b/cloud/src/recycler/obj_storage_client.cpp
index 1dd6435214d..855fa110a4c 100644
--- a/cloud/src/recycler/obj_storage_client.cpp
+++ b/cloud/src/recycler/obj_storage_client.cpp
@@ -18,10 +18,13 @@
#include "recycler/obj_storage_client.h"
#include "cpp/sync_point.h"
+#include "recycler/sync_executor.h"
+#include "recycler/util.h"
namespace doris::cloud {
ObjectStorageResponse ObjStorageClient::delete_objects_recursively_(ObjectStoragePathRef path,
+                                                                    const ObjClientOptions& option,
int64_t expired_time,
size_t batch_size) {
TEST_SYNC_POINT_CALLBACK("ObjStorageClient::delete_objects_recursively_", &batch_size);
@@ -29,6 +32,10 @@ ObjectStorageResponse ObjStorageClient::delete_objects_recursively_(ObjectStorag
ObjectStorageResponse ret;
std::vector<std::string> keys;
+ SyncExecutor<int> concurrent_delete_executor(
+ option.executor,
+            fmt::format("delete objects under bucket {}, path {}", path.bucket, path.key),
+ [](const int& ret) { return ret != 0; });
for (auto obj = list_iter->next(); obj.has_value(); obj = list_iter->next()) {
if (expired_time > 0 && obj->mtime_s > expired_time) {
@@ -39,20 +46,31 @@ ObjectStorageResponse ObjStorageClient::delete_objects_recursively_(ObjectStorag
if (keys.size() < batch_size) {
continue;
}
-
- ret = delete_objects(path.bucket, std::move(keys));
- if (ret.ret != 0) {
- return ret;
- }
+        concurrent_delete_executor.add([this, &path, k = std::move(keys), option]() mutable {
+ return delete_objects(path.bucket, std::move(k), option).ret;
+ });
}
if (!list_iter->is_valid()) {
+ bool finished;
+ concurrent_delete_executor.when_all(&finished);
return {-1};
}
if (!keys.empty()) {
- return delete_objects(path.bucket, std::move(keys));
+        concurrent_delete_executor.add([this, &path, k = std::move(keys), option]() mutable {
+ return delete_objects(path.bucket, std::move(k), option).ret;
+ });
}
+ bool finished = true;
+ std::vector<int> rets = concurrent_delete_executor.when_all(&finished);
+ for (int r : rets) {
+ if (r != 0) {
+ ret = -1;
+ }
+ }
+
+ ret = finished ? ret : -1;
return ret;
}
diff --git a/cloud/src/recycler/obj_storage_client.h b/cloud/src/recycler/obj_storage_client.h
index 955a8c174e5..fc0211820d1 100644
--- a/cloud/src/recycler/obj_storage_client.h
+++ b/cloud/src/recycler/obj_storage_client.h
@@ -51,6 +51,12 @@ public:
virtual std::optional<ObjectMeta> next() = 0;
};
+class SimpleThreadPool;
+struct ObjClientOptions {
+ bool prefetch {true};
+ std::shared_ptr<SimpleThreadPool> executor;
+};
+
class ObjStorageClient {
public:
ObjStorageClient() = default;
@@ -71,7 +77,8 @@ public:
// According to the bucket and prefix specified by the user, it performs batch deletion based on the object names in the object array.
virtual ObjectStorageResponse delete_objects(const std::string& bucket,
-                                             std::vector<std::string> keys) = 0;
+ std::vector<std::string> keys,
+ ObjClientOptions option) = 0;
// Delete the file named key in the object storage bucket.
virtual ObjectStorageResponse delete_object(ObjectStoragePathRef path) = 0;
@@ -79,6 +86,7 @@ public:
// According to the prefix, recursively delete all objects under the prefix.
// If `expiration_time` > 0, only delete objects with mtime earlier than `expiration_time`.
virtual ObjectStorageResponse delete_objects_recursively(ObjectStoragePathRef path,
+                                                    ObjClientOptions option,
int64_t expiration_time = 0) = 0;
// Get the objects' expiration time on the bucket
@@ -91,6 +99,7 @@ public:
protected:
ObjectStorageResponse delete_objects_recursively_(ObjectStoragePathRef path,
+                                                      const ObjClientOptions& option,
int64_t expiration_time,
size_t batch_size);
};
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 204ff8d2f18..0b2267e601d 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -47,6 +47,7 @@
#include "cpp/sync_point.h"
#include "meta-service/keys.h"
#include "recycler/recycler_service.h"
+#include "recycler/sync_executor.h"
#include "recycler/util.h"
namespace doris::cloud {
@@ -166,6 +167,16 @@ static inline void check_recycle_task(const std::string& instance_id, const std:
Recycler::Recycler(std::shared_ptr<TxnKv> txn_kv) : txn_kv_(std::move(txn_kv)) {
ip_port_ = std::string(butil::my_ip_cstr()) + ":" + std::to_string(config::brpc_listen_port);
+    auto s3_producer_pool = std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+    s3_producer_pool->start();
+    auto recycle_tablet_pool = std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+    recycle_tablet_pool->start();
+    auto group_recycle_function_pool =
+            std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+    group_recycle_function_pool->start();
+    _thread_pool_group =
+            RecyclerThreadPoolGroup(std::move(s3_producer_pool), std::move(recycle_tablet_pool),
+ std::move(group_recycle_function_pool));
}
Recycler::~Recycler() {
@@ -225,7 +236,8 @@ void Recycler::recycle_callback() {
// skip instance in recycling
if (recycling_instance_map_.count(instance_id)) continue;
}
-        auto instance_recycler = std::make_shared<InstanceRecycler>(txn_kv_, instance);
+        auto instance_recycler =
+                std::make_shared<InstanceRecycler>(txn_kv_, instance, _thread_pool_group);
if (instance_recycler->init() != 0) {
LOG(WARNING) << "failed to init instance recycler, instance_id=" << instance_id;
continue;
@@ -441,11 +453,13 @@ private:
std::unordered_set<Key, HashOfKey> schemas_without_inverted_index_;
};
-InstanceRecycler::InstanceRecycler(std::shared_ptr<TxnKv> txn_kv, const InstanceInfoPB& instance)
+InstanceRecycler::InstanceRecycler(std::shared_ptr<TxnKv> txn_kv, const InstanceInfoPB& instance,
+                                   RecyclerThreadPoolGroup thread_pool_group)
: txn_kv_(std::move(txn_kv)),
instance_id_(instance.instance_id()),
instance_info_(instance),
-          inverted_index_id_cache_(std::make_unique<InvertedIndexIdCache>(instance_id_, txn_kv_)) {}
+          inverted_index_id_cache_(std::make_unique<InvertedIndexIdCache>(instance_id_, txn_kv_)),
+ _thread_pool_group(std::move(thread_pool_group)) {}
InstanceRecycler::~InstanceRecycler() = default;
@@ -542,22 +556,51 @@ int InstanceRecycler::init() {
return init_storage_vault_accessors();
}
+template <typename... Func>
+auto task_wrapper(Func... funcs) -> std::function<int()> {
+ return [funcs...]() {
+ return [](std::initializer_list<int> numbers) {
+ int i = 0;
+ for (int num : numbers) {
+ if (num != 0) {
+ i = num;
+ }
+ }
+ return i;
+ }({funcs()...});
+ };
+}
+
int InstanceRecycler::do_recycle() {
TEST_SYNC_POINT("InstanceRecycler.do_recycle");
if (instance_info_.status() == InstanceInfoPB::DELETED) {
return recycle_deleted_instance();
} else if (instance_info_.status() == InstanceInfoPB::NORMAL) {
- int ret = recycle_indexes();
- if (recycle_partitions() != 0) ret = -1;
- if (recycle_tmp_rowsets() != 0) ret = -1;
- if (recycle_rowsets() != 0) ret = -1;
- if (abort_timeout_txn() != 0) ret = -1;
- if (recycle_expired_txn_label() != 0) ret = -1;
- if (recycle_copy_jobs() != 0) ret = -1;
- if (recycle_stage() != 0) ret = -1;
- if (recycle_expired_stage_objects() != 0) ret = -1;
- if (recycle_versions() != 0) ret = -1;
- return ret;
+        SyncExecutor<int> sync_executor(_thread_pool_group.group_recycle_function_pool,
+                                        fmt::format("instance id {}", instance_id_),
+                                        [](int r) { return r != 0; });
+        sync_executor
+                .add(task_wrapper(
+                        [this]() -> int { return InstanceRecycler::recycle_indexes(); },
+                        [this]() -> int { return InstanceRecycler::recycle_partitions(); },
+                        [this]() -> int { return InstanceRecycler::recycle_tmp_rowsets(); },
+                        [this]() -> int { return InstanceRecycler::recycle_rowsets(); }))
+                .add(task_wrapper(
+                        [this]() { return InstanceRecycler::abort_timeout_txn(); },
+                        [this]() { return InstanceRecycler::recycle_expired_txn_label(); }))
+                .add(task_wrapper([this]() { return InstanceRecycler::recycle_copy_jobs(); }))
+                .add(task_wrapper([this]() { return InstanceRecycler::recycle_stage(); }))
+                .add(task_wrapper(
+                        [this]() { return InstanceRecycler::recycle_expired_stage_objects(); }))
+                .add(task_wrapper([this]() { return InstanceRecycler::recycle_versions(); }));
+ bool finished = true;
+ std::vector<int> rets = sync_executor.when_all(&finished);
+ for (int ret : rets) {
+ if (ret != 0) {
+ return ret;
+ }
+ }
+ return finished ? 0 : -1;
} else {
LOG(WARNING) << "invalid instance status: " << instance_info_.status()
<< " instance_id=" << instance_id_;
@@ -1012,7 +1055,7 @@ int InstanceRecycler::recycle_versions() {
int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id,
int64_t partition_id,
bool is_empty_tablet) {
int num_scanned = 0;
- int num_recycled = 0;
+ std::atomic_int num_recycled = 0;
std::string tablet_key_begin, tablet_key_end;
std::string stats_key_begin, stats_key_end;
@@ -1054,12 +1097,20 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, int64_
.tag("num_recycled", num_recycled);
});
+ // The first string_view represents the tablet key which has been recycled
+    // The second bool represents whether the following fdb's tablet key deletion could be done using range move or not
+ using TabletKeyPair = std::pair<std::string_view, bool>;
+ SyncExecutor<TabletKeyPair> sync_executor(
+ _thread_pool_group.recycle_tablet_pool,
+            fmt::format("recycle tablets, tablet id {}, index id {}, partition id {}", table_id,
+ index_id, partition_id),
+ [](const TabletKeyPair& k) { return k.first.empty(); });
+
// Elements in `tablet_keys` has the same lifetime as `it` in `scan_and_recycle`
- std::vector<std::string_view> tablet_keys;
std::vector<std::string> tablet_idx_keys;
std::vector<std::string> init_rs_keys;
- bool use_range_remove = true;
auto recycle_func = [&, is_empty_tablet, this](std::string_view k,
std::string_view v) -> int {
+ bool use_range_remove = true;
++num_scanned;
doris::TabletMetaCloudPB tablet_meta_pb;
if (!tablet_meta_pb.ParseFromArray(v.data(), v.size())) {
@@ -1070,13 +1121,19 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, int64_
int64_t tablet_id = tablet_meta_pb.tablet_id();
tablet_idx_keys.push_back(meta_tablet_idx_key({instance_id_, tablet_id}));
if (!is_empty_tablet) {
- if (recycle_tablet(tablet_id) != 0) {
- LOG_WARNING("failed to recycle tablet")
- .tag("instance_id", instance_id_)
- .tag("tablet_id", tablet_id);
- use_range_remove = false;
- return -1;
- }
+            sync_executor.add([this, &num_recycled, tid = tablet_id, range_move = use_range_remove,
+ k]() mutable -> TabletKeyPair {
+ if (recycle_tablet(tid) != 0) {
+ LOG_WARNING("failed to recycle tablet")
+ .tag("instance_id", instance_id_)
+ .tag("tablet_id", tid);
+ range_move = false;
+ return {std::string_view(), range_move};
+ }
+ ++num_recycled;
+ LOG_INFO("k is {}, is empty {}", k, k.empty());
+ return {k, range_move};
+ });
} else {
// Empty tablet only has a [0-1] init rowset
init_rs_keys.push_back(meta_rowset_key({instance_id_, tablet_id, 1}));
@@ -1100,19 +1157,38 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, int64_
}
return true;
}());
+ sync_executor.add([k]() mutable -> TabletKeyPair {
+ LOG_INFO("k is {}, is empty {}", k, k.empty());
+ return {k, true};
+ });
+ ++num_recycled;
}
- ++num_recycled;
- tablet_keys.push_back(k);
return 0;
};
+ // TODO(AlexYue): Add one ut to cover use_range_remove = false
auto loop_done = [&, this]() -> int {
+ bool finished = true;
+ auto tablet_keys = sync_executor.when_all(&finished);
+ if (!finished) {
+            LOG_WARNING("failed to recycle tablet").tag("instance_id", instance_id_);
+ return -1;
+ }
+ sync_executor.reset();
if (tablet_keys.empty() && tablet_idx_keys.empty()) return 0;
+ // sort the vector using key's order
+ std::sort(tablet_keys.begin(), tablet_keys.end(),
+              [](const auto& prev, const auto& last) { return prev.first < last.first; });
+ bool use_range_remove = true;
+ for (auto& [_, remove] : tablet_keys) {
+ if (!remove) {
+ use_range_remove = remove;
+ break;
+ }
+ }
std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, [&](int*) {
- tablet_keys.clear();
tablet_idx_keys.clear();
init_rs_keys.clear();
- use_range_remove = true;
});
std::unique_ptr<Transaction> txn;
if (txn_kv_->create_txn(&txn) != TxnErrorCode::TXN_OK) {
@@ -1122,10 +1198,10 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, int64_
std::string tablet_key_end;
if (!tablet_keys.empty()) {
if (use_range_remove) {
- tablet_key_end = std::string(tablet_keys.back()) + '\x00';
- txn->remove(tablet_keys.front(), tablet_key_end);
+ tablet_key_end = std::string(tablet_keys.back().first) +
'\x00';
+ txn->remove(tablet_keys.front().first, tablet_key_end);
} else {
- for (auto k : tablet_keys) {
+ for (auto& [k, _] : tablet_keys) {
txn->remove(k);
}
}
@@ -1312,13 +1388,26 @@ int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaClou
}
}
}
+
+    SyncExecutor<int> concurrent_delete_executor(_thread_pool_group.s3_producer_pool,
+                                                 "delete_rowset_data",
+                                                 [](const int& ret) { return ret != 0; });
for (auto& [resource_id, file_paths] : resource_file_paths) {
- auto& accessor = accessor_map_[resource_id];
- DCHECK(accessor);
- if (accessor->delete_files(file_paths) != 0) {
+        concurrent_delete_executor.add([&, rid = &resource_id, paths = &file_paths]() -> int {
+ auto& accessor = accessor_map_[*rid];
+ DCHECK(accessor);
+ return accessor->delete_files(*paths);
+ });
+ }
+ bool finished = true;
+ std::vector<int> rets = concurrent_delete_executor.when_all(&finished);
+ for (int r : rets) {
+ if (r != 0) {
ret = -1;
+ break;
}
}
+ ret = finished ? ret : -1;
return ret;
}
@@ -1377,15 +1466,32 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) {
ret = -1;
}
+ SyncExecutor<int> concurrent_delete_executor(
+ _thread_pool_group.s3_producer_pool,
+ fmt::format("delete tablet {} s3 rowset", tablet_id),
+ [](const int& ret) { return ret != 0; });
+
// delete all rowset data in this tablet
for (auto& [_, accessor] : accessor_map_) {
- if (accessor->delete_directory(tablet_path_prefix(tablet_id)) != 0) {
-            LOG(WARNING) << "failed to delete rowset data of tablet " << tablet_id
- << " s3_path=" << accessor->uri();
+ concurrent_delete_executor.add([&, accessor_ptr = &accessor]() {
+            if ((*accessor_ptr)->delete_directory(tablet_path_prefix(tablet_id)) != 0) {
+                LOG(WARNING) << "failed to delete rowset data of tablet " << tablet_id
+ << " s3_path=" << accessor->uri();
+ return -1;
+ }
+ return 0;
+ });
+ }
+ bool finished = true;
+ std::vector<int> rets = concurrent_delete_executor.when_all(&finished);
+ for (int r : rets) {
+ if (r != 0) {
ret = -1;
}
}
+ ret = finished ? ret : -1;
+
if (ret == 0) {
// All object files under tablet have been deleted
std::lock_guard lock(recycled_tablets_mtx_);
diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h
index 1085d9362d5..950c0193f77 100644
--- a/cloud/src/recycler/recycler.h
+++ b/cloud/src/recycler/recycler.h
@@ -40,6 +40,24 @@ class TxnKv;
class InstanceRecycler;
class StorageVaultAccessor;
class Checker;
+class SimpleThreadPool;
+struct RecyclerThreadPoolGroup {
+ RecyclerThreadPoolGroup() = default;
+ RecyclerThreadPoolGroup(std::shared_ptr<SimpleThreadPool> s3_producer_pool,
+                            std::shared_ptr<SimpleThreadPool> recycle_tablet_pool,
+                            std::shared_ptr<SimpleThreadPool> group_recycle_function_pool)
+            : s3_producer_pool(std::move(s3_producer_pool)),
+              recycle_tablet_pool(std::move(recycle_tablet_pool)),
+              group_recycle_function_pool(std::move(group_recycle_function_pool)) {}
+    ~RecyclerThreadPoolGroup() = default;
+    RecyclerThreadPoolGroup(const RecyclerThreadPoolGroup&) = default;
+    RecyclerThreadPoolGroup& operator=(RecyclerThreadPoolGroup& other) = default;
+    RecyclerThreadPoolGroup& operator=(RecyclerThreadPoolGroup&& other) = default;
+ RecyclerThreadPoolGroup(RecyclerThreadPoolGroup&&) = default;
+ std::shared_ptr<SimpleThreadPool> s3_producer_pool;
+ std::shared_ptr<SimpleThreadPool> recycle_tablet_pool;
+ std::shared_ptr<SimpleThreadPool> group_recycle_function_pool;
+};
class Recycler {
public:
@@ -83,11 +101,13 @@ private:
WhiteBlackList instance_filter_;
std::unique_ptr<Checker> checker_;
+ RecyclerThreadPoolGroup _thread_pool_group;
};
class InstanceRecycler {
public:
-    explicit InstanceRecycler(std::shared_ptr<TxnKv> txn_kv, const InstanceInfoPB& instance);
+    explicit InstanceRecycler(std::shared_ptr<TxnKv> txn_kv, const InstanceInfoPB& instance,
+ RecyclerThreadPoolGroup thread_pool_group);
~InstanceRecycler();
// returns 0 for success otherwise error
@@ -219,6 +239,7 @@ private:
std::mutex recycle_tasks_mutex;
// <task_name, start_time>>
std::map<std::string, int64_t> running_recycle_tasks;
+ RecyclerThreadPoolGroup _thread_pool_group;
};
} // namespace doris::cloud
diff --git a/cloud/src/recycler/recycler_service.cpp b/cloud/src/recycler/recycler_service.cpp
index f0b8959a517..52c510fb2e7 100644
--- a/cloud/src/recycler/recycler_service.cpp
+++ b/cloud/src/recycler/recycler_service.cpp
@@ -151,7 +151,8 @@ void RecyclerServiceImpl::check_instance(const std::string& instance_id, MetaSer
}
void recycle_copy_jobs(const std::shared_ptr<TxnKv>& txn_kv, const std::string& instance_id,
- MetaServiceCode& code, std::string& msg) {
+ MetaServiceCode& code, std::string& msg,
+ RecyclerThreadPoolGroup thread_pool_group) {
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
@@ -188,13 +189,13 @@ void recycle_copy_jobs(const std::shared_ptr<TxnKv>& txn_kv, const std::string&
return;
}
}
- auto recycler = std::make_unique<InstanceRecycler>(txn_kv, instance);
+
+    auto recycler = std::make_unique<InstanceRecycler>(txn_kv, instance, thread_pool_group);
if (recycler->init() != 0) {
LOG(WARNING) << "failed to init InstanceRecycler recycle_copy_jobs on
instance "
<< instance_id;
return;
}
-
std::thread worker([recycler = std::move(recycler), instance_id] {
LOG(INFO) << "manually trigger recycle_copy_jobs on instance " <<
instance_id;
recycler->recycle_copy_jobs();
@@ -332,7 +333,7 @@ void RecyclerServiceImpl::http(::google::protobuf::RpcController* controller,
status_code = 400;
return;
}
- recycle_copy_jobs(txn_kv_, *instance_id, code, msg);
+        recycle_copy_jobs(txn_kv_, *instance_id, code, msg, recycler_->_thread_pool_group);
response_body = msg;
return;
}
diff --git a/cloud/src/recycler/s3_accessor.cpp b/cloud/src/recycler/s3_accessor.cpp
index c993e0fb547..2c983a5fa06 100644
--- a/cloud/src/recycler/s3_accessor.cpp
+++ b/cloud/src/recycler/s3_accessor.cpp
@@ -36,6 +36,7 @@
#include "common/config.h"
#include "common/encryption_util.h"
#include "common/logging.h"
+#include "common/simple_thread_pool.h"
#include "common/string_util.h"
#include "common/util.h"
#include "cpp/obj_retry_strategy.h"
@@ -234,7 +235,15 @@ int S3Accessor::create(S3Conf conf, std::shared_ptr<S3Accessor>* accessor) {
return (*accessor)->init();
}
+static std::shared_ptr<SimpleThreadPool> worker_pool;
+
int S3Accessor::init() {
+ static std::once_flag log_annotated_tags_key_once;
+ std::call_once(log_annotated_tags_key_once, [&]() {
+ LOG_INFO("start s3 accessor parallel worker pool");
+        worker_pool = std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+ worker_pool->start();
+ });
switch (conf_.provider) {
case S3Conf::AZURE: {
#ifdef USE_AZURE
@@ -291,7 +300,7 @@ int S3Accessor::delete_prefix_impl(const std::string& path_prefix, int64_t expir
LOG_INFO("delete prefix").tag("uri", to_uri(path_prefix));
return obj_client_
->delete_objects_recursively({.bucket = conf_.bucket, .key = get_key(path_prefix)},
-                                         expiration_time)
+                                         {.executor = worker_pool}, expiration_time)
.ret;
}
@@ -333,7 +342,8 @@ int S3Accessor::delete_files(const std::vector<std::string>& paths) {
keys.emplace_back(get_key(path));
}
- return obj_client_->delete_objects(conf_.bucket, std::move(keys)).ret;
+    return obj_client_->delete_objects(conf_.bucket, std::move(keys), {.executor = worker_pool})
+ .ret;
}
int S3Accessor::delete_file(const std::string& path) {
diff --git a/cloud/src/recycler/s3_accessor.h b/cloud/src/recycler/s3_accessor.h
index 91f75765a9b..41adc93f04f 100644
--- a/cloud/src/recycler/s3_accessor.h
+++ b/cloud/src/recycler/s3_accessor.h
@@ -37,6 +37,7 @@ class S3RateLimiterHolder;
enum class S3RateLimitType;
namespace cloud {
class ObjectStoreInfoPB;
+class SimpleThreadPool;
namespace s3_bvar {
extern bvar::LatencyRecorder s3_get_latency;
diff --git a/cloud/src/recycler/s3_obj_client.cpp b/cloud/src/recycler/s3_obj_client.cpp
index d1307de53b7..fc0c7e9e901 100644
--- a/cloud/src/recycler/s3_obj_client.cpp
+++ b/cloud/src/recycler/s3_obj_client.cpp
@@ -201,7 +201,8 @@ std::unique_ptr<ObjectListIterator> S3ObjClient::list_objects(ObjectStoragePathR
}
ObjectStorageResponse S3ObjClient::delete_objects(const std::string& bucket,
-                                                  std::vector<std::string> keys) {
+                                                  std::vector<std::string> keys,
+ ObjClientOptions option) {
if (keys.empty()) {
return {0};
}
@@ -295,8 +296,9 @@ ObjectStorageResponse S3ObjClient::delete_object(ObjectStoragePathRef path) {
}
ObjectStorageResponse S3ObjClient::delete_objects_recursively(ObjectStoragePathRef path,
+                                                        ObjClientOptions option,
int64_t expiration_time) {
-    return delete_objects_recursively_(path, expiration_time, MaxDeleteBatch);
+    return delete_objects_recursively_(path, option, expiration_time, MaxDeleteBatch);
}
ObjectStorageResponse S3ObjClient::get_life_cycle(const std::string& bucket,
diff --git a/cloud/src/recycler/s3_obj_client.h b/cloud/src/recycler/s3_obj_client.h
index 7804d081816..c486fdc18dc 100644
--- a/cloud/src/recycler/s3_obj_client.h
+++ b/cloud/src/recycler/s3_obj_client.h
@@ -39,12 +39,13 @@ public:
std::unique_ptr<ObjectListIterator> list_objects(ObjectStoragePathRef path) override;
-    ObjectStorageResponse delete_objects(const std::string& bucket,
-                                         std::vector<std::string> keys) override;
+    ObjectStorageResponse delete_objects(const std::string& bucket, std::vector<std::string> keys,
+ ObjClientOptions option) override;
ObjectStorageResponse delete_object(ObjectStoragePathRef path) override;
ObjectStorageResponse delete_objects_recursively(ObjectStoragePathRef path,
+ ObjClientOptions option,
int64_t expiration_time = 0) override;
ObjectStorageResponse get_life_cycle(const std::string& bucket,
diff --git a/cloud/src/recycler/sync_executor.h b/cloud/src/recycler/sync_executor.h
new file mode 100644
index 00000000000..d7009a99ed4
--- /dev/null
+++ b/cloud/src/recycler/sync_executor.h
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <bthread/countdown_event.h>
+#include <fmt/core.h>
+#include <gen_cpp/cloud.pb.h>
+#include <glog/logging.h>
+
+#include <future>
+#include <iostream>
+#include <string>
+
+#include "common/logging.h"
+#include "common/simple_thread_pool.h"
+
+namespace doris::cloud {
+
+template <typename T>
+class SyncExecutor {
+public:
+ SyncExecutor(
+ std::shared_ptr<SimpleThreadPool> pool, std::string name_tag,
+            std::function<bool(const T&)> cancel = [](const T& /**/) { return false; })
+            : _pool(std::move(pool)), _cancel(std::move(cancel)), _name_tag(std::move(name_tag)) {}
+ auto add(std::function<T()> callback) -> SyncExecutor<T>& {
+        auto task = std::make_unique<Task>(std::move(callback), _cancel, _count);
+        _count.add_count();
+        // The actual task logic would be wrapped by one promise and passed to the threadpool.
+        // The result would be returned by the future once the task is finished.
+ // Or the task would be invalid if the whole task is cancelled.
+ int r = _pool->submit([this, t = task.get()]() { (*t)(_stop_token); });
+ CHECK(r == 0);
+ _res.emplace_back(std::move(task));
+ return *this;
+ }
+ std::vector<T> when_all(bool* finished) {
+ timespec current_time;
+ auto current_time_second = time(nullptr);
+ current_time.tv_sec = current_time_second + 300;
+ current_time.tv_nsec = 0;
+ auto msg = fmt::format("{} has already taken 5 min", _name_tag);
+ while (0 != _count.timed_wait(current_time)) {
+ current_time.tv_sec += 300;
+ LOG(WARNING) << msg;
+ }
+ *finished = !_stop_token;
+ std::vector<T> res;
+ res.reserve(_res.size());
+ for (auto& task : _res) {
+ if (!task->valid()) {
+ *finished = false;
+ return res;
+ }
+ res.emplace_back((*task).get());
+ }
+ return res;
+ }
+ void reset() {
+ _res.clear();
+ _stop_token = false;
+ }
+
+private:
+ class Task {
+ public:
+ Task(std::function<T()> callback, std::function<bool(const T&)> cancel,
+ bthread::CountdownEvent& count)
+ : _callback(std::move(callback)),
+ _cancel(std::move(cancel)),
+ _count(count),
+ _fut(_pro.get_future()) {}
+ void operator()(std::atomic_bool& stop_token) {
+ std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
+                                                                  [&](int*) { _count.signal(); });
+ if (stop_token) {
+ _valid = false;
+ return;
+ }
+ T t = _callback();
+            // We'll return this task result to user even if this task return error
+ // So we don't set _valid to false here
+ if (_cancel(t)) {
+ stop_token = true;
+ }
+ _pro.set_value(std::move(t));
+ }
+ bool valid() { return _valid; }
+ T get() { return _fut.get(); }
+
+ private:
+        // It's guarantted that the valid function can only be called inside SyncExecutor's `when_all()` function
+        // and only be called when the _count.timed_wait function returned. So there would be no data race for
+ // _valid then it doesn't need to be one atomic bool.
+ bool _valid = true;
+ std::function<T()> _callback;
+ std::function<bool(const T&)> _cancel;
+ std::promise<T> _pro;
+ bthread::CountdownEvent& _count;
+ std::future<T> _fut;
+ };
+ std::vector<std::unique_ptr<Task>> _res;
+    // use CountdownEvent to do periodically log using CountdownEvent::time_wait()
+ bthread::CountdownEvent _count {0};
+ std::atomic_bool _stop_token {false};
+ std::shared_ptr<SimpleThreadPool> _pool;
+ std::function<bool(const T&)> _cancel;
+ std::string _name_tag;
+};
+} // namespace doris::cloud
\ No newline at end of file
diff --git a/cloud/src/recycler/util.h b/cloud/src/recycler/util.h
index 6c62bcaf7b0..b63090062bf 100644
--- a/cloud/src/recycler/util.h
+++ b/cloud/src/recycler/util.h
@@ -19,6 +19,7 @@
#include <fmt/core.h>
#include <gen_cpp/cloud.pb.h>
+#include <glog/logging.h>
#include <string>
diff --git a/cloud/test/CMakeLists.txt b/cloud/test/CMakeLists.txt
index a27f7a1c538..ba5e2909918 100644
--- a/cloud/test/CMakeLists.txt
+++ b/cloud/test/CMakeLists.txt
@@ -50,10 +50,10 @@ add_executable(s3_accessor_test s3_accessor_test.cpp)
add_executable(hdfs_accessor_test hdfs_accessor_test.cpp)
-add_executable(util_test util_test.cpp)
-
add_executable(stopwatch_test stopwatch_test.cpp)
+add_executable(util_test util_test.cpp)
+
add_executable(network_util_test network_util_test.cpp)
message("Meta-service test dependencies: ${TEST_LINK_LIBS}")
@@ -85,10 +85,10 @@ target_link_libraries(s3_accessor_test ${TEST_LINK_LIBS})
target_link_libraries(hdfs_accessor_test ${TEST_LINK_LIBS})
-target_link_libraries(util_test ${TEST_LINK_LIBS})
-
target_link_libraries(stopwatch_test ${TEST_LINK_LIBS})
+target_link_libraries(util_test ${TEST_LINK_LIBS})
+
target_link_libraries(network_util_test ${TEST_LINK_LIBS})
# FDB related tests need to be linked with libfdb_c
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index 3f9d1175746..ca8ffbcee61 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -29,6 +29,7 @@
#include "common/config.h"
#include "common/logging.h"
+#include "common/simple_thread_pool.h"
#include "common/util.h"
#include "cpp/sync_point.h"
#include "meta-service/keys.h"
@@ -48,6 +49,8 @@ static const std::string instance_id = "instance_id_recycle_test";
static int64_t current_time = 0;
static constexpr int64_t db_id = 1000;
+static doris::cloud::RecyclerThreadPoolGroup thread_group;
+
int main(int argc, char** argv) {
auto conf_file = "doris_cloud.conf";
if (!cloud::config::init(conf_file, true)) {
@@ -63,6 +66,16 @@ int main(int argc, char** argv) {
current_time = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
::testing::InitGoogleTest(&argc, argv);
+    auto s3_producer_pool = std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+    s3_producer_pool->start();
+    auto recycle_tablet_pool = std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+    recycle_tablet_pool->start();
+    auto group_recycle_function_pool =
+            std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+    group_recycle_function_pool->start();
+    thread_group =
+            RecyclerThreadPoolGroup(std::move(s3_producer_pool), std::move(recycle_tablet_pool),
+ std::move(group_recycle_function_pool));
return RUN_ALL_TESTS();
}
@@ -618,7 +631,7 @@ TEST(RecyclerTest, recycle_empty) {
obj_info->set_bucket(config::test_s3_bucket);
obj_info->set_prefix("recycle_empty");
- InstanceRecycler recycler(txn_kv, instance);
+ InstanceRecycler recycler(txn_kv, instance, thread_group);
ASSERT_EQ(recycler.init(), 0);
ASSERT_EQ(recycler.recycle_rowsets(), 0);
@@ -651,7 +664,7 @@ TEST(RecyclerTest, recycle_rowsets) {
sp->set_call_back("InvertedIndexIdCache::insert2", [&](auto&&) { ++insert_inverted_index; });
sp->enable_processing();
- InstanceRecycler recycler(txn_kv, instance);
+ InstanceRecycler recycler(txn_kv, instance, thread_group);
ASSERT_EQ(recycler.init(), 0);
std::vector<doris::TabletSchemaCloudPB> schemas;
@@ -716,7 +729,7 @@ TEST(RecyclerTest, bench_recycle_rowsets) {
config::instance_recycler_worker_pool_size = 10;
config::recycle_task_threshold_seconds = 0;
- InstanceRecycler recycler(txn_kv, instance);
+ InstanceRecycler recycler(txn_kv, instance, thread_group);
ASSERT_EQ(recycler.init(), 0);
auto sp = SyncPoint::get_instance();
@@ -799,7 +812,7 @@ TEST(RecyclerTest, recycle_tmp_rowsets) {
sp->set_call_back("InvertedIndexIdCache::insert2", [&](auto&&) { ++insert_inverted_index; });
sp->enable_processing();
- InstanceRecycler recycler(txn_kv, instance);
+ InstanceRecycler recycler(txn_kv, instance, thread_group);
ASSERT_EQ(recycler.init(), 0);
std::vector<doris::TabletSchemaCloudPB> schemas;
@@ -861,7 +874,7 @@ TEST(RecyclerTest, recycle_tablet) {
obj_info->set_bucket(config::test_s3_bucket);
obj_info->set_prefix("recycle_tablet");
- InstanceRecycler recycler(txn_kv, instance);
+ InstanceRecycler recycler(txn_kv, instance, thread_group);
ASSERT_EQ(recycler.init(), 0);
std::vector<doris::TabletSchemaCloudPB> schemas;
@@ -936,7 +949,7 @@ TEST(RecyclerTest, recycle_indexes) {
obj_info->set_bucket(config::test_s3_bucket);
obj_info->set_prefix("recycle_indexes");
- InstanceRecycler recycler(txn_kv, instance);
+ InstanceRecycler recycler(txn_kv, instance, thread_group);
ASSERT_EQ(recycler.init(), 0);
std::vector<doris::TabletSchemaCloudPB> schemas;
@@ -1048,7 +1061,7 @@ TEST(RecyclerTest, recycle_partitions) {
obj_info->set_bucket(config::test_s3_bucket);
obj_info->set_prefix("recycle_partitions");
- InstanceRecycler recycler(txn_kv, instance);
+ InstanceRecycler recycler(txn_kv, instance, thread_group);
ASSERT_EQ(recycler.init(), 0);
std::vector<doris::TabletSchemaCloudPB> schemas;
@@ -1159,7 +1172,7 @@ TEST(RecyclerTest, recycle_versions) {
InstanceInfoPB instance;
instance.set_instance_id(instance_id);
- InstanceRecycler recycler(txn_kv, instance);
+ InstanceRecycler recycler(txn_kv, instance, thread_group);
ASSERT_EQ(recycler.init(), 0);
// Recycle all partitions in table except 30006
ASSERT_EQ(recycler.recycle_partitions(), 0);
@@ -1228,7 +1241,7 @@ TEST(RecyclerTest, abort_timeout_txn) {
}
InstanceInfoPB instance;
instance.set_instance_id(mock_instance);
- InstanceRecycler recycler(txn_kv, instance);
+ InstanceRecycler recycler(txn_kv, instance, thread_group);
ASSERT_EQ(recycler.init(), 0);
sleep(1);
ASSERT_EQ(recycler.abort_timeout_txn(), 0);
@@ -1271,7 +1284,7 @@ TEST(RecyclerTest, abort_timeout_txn_and_rebegin) {
}
InstanceInfoPB instance;
instance.set_instance_id(mock_instance);
- InstanceRecycler recycler(txn_kv, instance);
+ InstanceRecycler recycler(txn_kv, instance, thread_group);
ASSERT_EQ(recycler.init(), 0);
sleep(1);
ASSERT_EQ(recycler.abort_timeout_txn(), 0);
@@ -1338,7 +1351,7 @@ TEST(RecyclerTest, recycle_expired_txn_label) {
}
InstanceInfoPB instance;
instance.set_instance_id(mock_instance);
- InstanceRecycler recycler(txn_kv, instance);
+ InstanceRecycler recycler(txn_kv, instance, thread_group);
ASSERT_EQ(recycler.init(), 0);
recycler.abort_timeout_txn();
TxnInfoPB txn_info_pb;
@@ -1389,7 +1402,7 @@ TEST(RecyclerTest, recycle_expired_txn_label) {
}
InstanceInfoPB instance;
instance.set_instance_id(mock_instance);
- InstanceRecycler recycler(txn_kv, instance);
+ InstanceRecycler recycler(txn_kv, instance, thread_group);
ASSERT_EQ(recycler.init(), 0);
sleep(1);
recycler.abort_timeout_txn();
@@ -1441,7 +1454,7 @@ TEST(RecyclerTest, recycle_expired_txn_label) {
}
InstanceInfoPB instance;
instance.set_instance_id(mock_instance);
- InstanceRecycler recycler(txn_kv, instance);
+ InstanceRecycler recycler(txn_kv, instance, thread_group);
ASSERT_EQ(recycler.init(), 0);
sleep(1);
recycler.abort_timeout_txn();
@@ -1500,7 +1513,7 @@ TEST(RecyclerTest, recycle_expired_txn_label) {
}
InstanceInfoPB instance;
instance.set_instance_id(mock_instance);
- InstanceRecycler recycler(txn_kv, instance);
+ InstanceRecycler recycler(txn_kv, instance, thread_group);
ASSERT_EQ(recycler.init(), 0);
sleep(1);
recycler.abort_timeout_txn();
@@ -1637,7 +1650,7 @@ TEST(RecyclerTest, recycle_copy_jobs) {
InstanceInfoPB instance_info;
create_instance(internal_stage_id, external_stage_id, instance_info);
- InstanceRecycler recycler(txn_kv, instance_info);
+ InstanceRecycler recycler(txn_kv, instance_info, thread_group);
ASSERT_EQ(recycler.init(), 0);
auto internal_accessor = recycler.accessor_map_.find(internal_stage_id)->second;
@@ -1796,7 +1809,7 @@ TEST(RecyclerTest, recycle_batch_copy_jobs) {
InstanceInfoPB instance_info;
create_instance(internal_stage_id, external_stage_id, instance_info);
- InstanceRecycler recycler(txn_kv, instance_info);
+ InstanceRecycler recycler(txn_kv, instance_info, thread_group);
ASSERT_EQ(recycler.init(), 0);
const auto& internal_accessor = recycler.accessor_map_.find(internal_stage_id)->second;
@@ -1910,7 +1923,7 @@ TEST(RecyclerTest, recycle_stage) {
instance.set_instance_id(mock_instance);
instance.add_obj_info()->CopyFrom(object_info);
- InstanceRecycler recycler(txn_kv, instance);
+ InstanceRecycler recycler(txn_kv, instance, thread_group);
ASSERT_EQ(recycler.init(), 0);
auto accessor = recycler.accessor_map_.begin()->second;
for (int i = 0; i < 10; ++i) {
@@ -1970,7 +1983,7 @@ TEST(RecyclerTest, recycle_deleted_instance) {
InstanceInfoPB instance_info;
create_instance(internal_stage_id, external_stage_id, instance_info);
- InstanceRecycler recycler(txn_kv, instance_info);
+ InstanceRecycler recycler(txn_kv, instance_info, thread_group);
ASSERT_EQ(recycler.init(), 0);
// create txn key
for (size_t i = 0; i < 100; i++) {
@@ -2555,7 +2568,7 @@ TEST(RecyclerTest, delete_rowset_data) {
}
{
- InstanceRecycler recycler(txn_kv, instance);
+ InstanceRecycler recycler(txn_kv, instance, thread_group);
ASSERT_EQ(recycler.init(), 0);
auto accessor = recycler.accessor_map_.begin()->second;
int64_t txn_id_base = 114115;
@@ -2589,7 +2602,7 @@ TEST(RecyclerTest, delete_rowset_data) {
tmp_obj_info->set_bucket(config::test_s3_bucket);
tmp_obj_info->set_prefix(resource_id);
- InstanceRecycler recycler(txn_kv, tmp_instance);
+ InstanceRecycler recycler(txn_kv, tmp_instance, thread_group);
ASSERT_EQ(recycler.init(), 0);
auto accessor = recycler.accessor_map_.begin()->second;
// Delete multiple rowset files using one series of RowsetPB
@@ -2609,7 +2622,7 @@ TEST(RecyclerTest, delete_rowset_data) {
ASSERT_FALSE(list_iter->has_next());
}
{
- InstanceRecycler recycler(txn_kv, instance);
+ InstanceRecycler recycler(txn_kv, instance, thread_group);
ASSERT_EQ(recycler.init(), 0);
auto accessor = recycler.accessor_map_.begin()->second;
// Delete multiple rowset files using one series of RowsetPB
diff --git a/cloud/test/util_test.cpp b/cloud/test/util_test.cpp
index 02921170761..c88ef555f82 100644
--- a/cloud/test/util_test.cpp
+++ b/cloud/test/util_test.cpp
@@ -15,17 +15,36 @@
// specific language governing permissions and limitations
// under the License.
+#include "recycler/util.h"
+
+#include <chrono>
#include <string>
+#include <string_view>
+#include <thread>
#include <tuple>
#include <vector>
#include "common/config.h"
+#include "common/logging.h"
+#include "common/simple_thread_pool.h"
#include "common/string_util.h"
-#include "glog/logging.h"
#include "gtest/gtest.h"
+#include "recycler/recycler.h"
+#include "recycler/sync_executor.h"
+
+using namespace doris::cloud;
int main(int argc, char** argv) {
- doris::cloud::config::init(nullptr, true);
+ const auto* conf_file = "doris_cloud.conf";
+ if (!config::init(conf_file, true)) {
+        std::cerr << "failed to init config file, conf=" << conf_file << std::endl;
+ return -1;
+ }
+ if (!::init_glog("util")) {
+ std::cerr << "failed to init glog" << std::endl;
+ return -1;
+ }
+
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
@@ -89,3 +108,130 @@ TEST(StringUtilTest, test_string_strip) {
// clang-format on
}
+
+template <typename... Func>
+auto task_wrapper(Func... funcs) -> std::function<int()> {
+ return [funcs...]() {
+ return [](std::initializer_list<int> numbers) {
+ int i = 0;
+ for (int num : numbers) {
+ if (num != 0) {
+ i = num;
+ }
+ }
+ return i;
+ }({funcs()...});
+ };
+}
+
+TEST(UtilTest, stage_wrapper) {
+ std::function<int()> func1 = []() { return 0; };
+ std::function<int()> func2 = []() { return -1; };
+ std::function<int()> func3 = []() { return 0; };
+ auto f = task_wrapper(func1, func2, func3);
+ ASSERT_EQ(-1, f());
+
+ f = task_wrapper(func1, func3);
+ ASSERT_EQ(0, f());
+}
+
+TEST(UtilTest, delay) {
+    auto s3_producer_pool = std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+ s3_producer_pool->start();
+ // test normal execute
+ {
+ SyncExecutor<int> sync_executor(s3_producer_pool, "normal test",
+ [](int k) { return k == -1; });
+ auto f1 = []() { return -1; };
+ auto f2 = []() {
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ return 1;
+ };
+ sync_executor.add(f2);
+ sync_executor.add(f2);
+ sync_executor.add(f1);
+ bool finished = true;
+ std::vector<int> res = sync_executor.when_all(&finished);
+ ASSERT_EQ(finished, false);
+ ASSERT_EQ(3, res.size());
+ }
+ // test normal execute
+ {
+ SyncExecutor<std::string_view> sync_executor(
+ s3_producer_pool, "normal test",
+ [](const std::string_view k) { return k.empty(); });
+ auto f1 = []() { return ""; };
+ auto f2 = []() {
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ return "fake";
+ };
+ sync_executor.add(f2);
+ sync_executor.add(f2);
+ sync_executor.add(f1);
+ bool finished = true;
+ auto res = sync_executor.when_all(&finished);
+ ASSERT_EQ(finished, false);
+ ASSERT_EQ(3, res.size());
+ }
+}
+
+TEST(UtilTest, normal) {
+    auto s3_producer_pool = std::make_shared<SimpleThreadPool>(config::recycle_pool_parallelism);
+ s3_producer_pool->start();
+ // test normal execute
+ {
+ SyncExecutor<int> sync_executor(s3_producer_pool, "normal test",
+ [](int k) { return k == -1; });
+ auto f1 = []() { return 1; };
+ sync_executor.add(f1);
+ sync_executor.add(f1);
+ sync_executor.add(f1);
+ bool finished = true;
+ std::vector<int> res = sync_executor.when_all(&finished);
+ ASSERT_EQ(3, res.size());
+ ASSERT_EQ(finished, true);
+        std::for_each(res.begin(), res.end(), [](auto&& n) { ASSERT_EQ(1, n); });
+ }
+ // test when error happen
+ {
+ SyncExecutor<int> sync_executor(s3_producer_pool, "normal test",
+ [](int k) { return k == -1; });
+ auto f1 = []() { return 1; };
+ sync_executor._stop_token = true;
+ sync_executor.add(f1);
+ sync_executor.add(f1);
+ sync_executor.add(f1);
+ bool finished = true;
+ std::vector<int> res = sync_executor.when_all(&finished);
+ ASSERT_EQ(finished, false);
+ ASSERT_EQ(0, res.size());
+ }
+ {
+ SyncExecutor<int> sync_executor(s3_producer_pool, "normal test",
+ [](int k) { return k == -1; });
+ auto f1 = []() { return 1; };
+ auto cancel = []() { return -1; };
+ sync_executor.add(f1);
+ sync_executor.add(f1);
+ sync_executor.add(f1);
+ sync_executor.add(cancel);
+ bool finished = true;
+ std::vector<int> res = sync_executor.when_all(&finished);
+ ASSERT_EQ(finished, false);
+ }
+ // test string_view
+ {
+ SyncExecutor<std::string_view> sync_executor(
+ s3_producer_pool, "normal test",
+ [](const std::string_view k) { return k.empty(); });
+ std::string s = "Hello World";
+ auto f1 = [&s]() { return std::string_view(s); };
+ sync_executor.add(f1);
+ sync_executor.add(f1);
+ sync_executor.add(f1);
+ bool finished = true;
+ std::vector<std::string_view> res = sync_executor.when_all(&finished);
+ ASSERT_EQ(3, res.size());
+        std::for_each(res.begin(), res.end(), [&s](auto&& n) { ASSERT_EQ(s, n); });
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]