This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a5fdfb9bb90 [chore](cloud) Add a defer utility `DORIS_CLOUD_DEFER`
(#52041)
a5fdfb9bb90 is described below
commit a5fdfb9bb90c3b4cbe34ef6e72fd2bb98a064fb7
Author: walter <[email protected]>
AuthorDate: Mon Jun 23 22:04:56 2025 +0800
[chore](cloud) Add a defer utility `DORIS_CLOUD_DEFER` (#52041)
---
.clang-format-ignore | 1 +
cloud/src/common/defer.h | 90 ++++++++++++++++++
cloud/src/common/network_util.cpp | 5 +-
cloud/src/meta-service/meta_service.cpp | 9 +-
cloud/src/meta-service/meta_service_helper.h | 39 ++++----
cloud/src/meta-service/meta_service_job.cpp | 36 ++++----
cloud/src/meta-service/meta_service_resource.cpp | 43 ++++-----
cloud/src/meta-service/meta_service_txn.cpp | 25 +++--
cloud/src/meta-service/txn_kv.cpp | 10 +-
cloud/src/recycler/checker.cpp | 12 +--
cloud/src/recycler/hdfs_accessor.cpp | 5 +-
cloud/src/recycler/recycler.cpp | 95 +++++++++----------
cloud/src/recycler/recycler_service.cpp | 34 ++++---
cloud/src/recycler/sync_executor.h | 10 +-
cloud/src/recycler/util.h | 11 ++-
cloud/src/resource-manager/resource_manager.cpp | 30 +++---
cloud/test/encryption_test.cpp | 26 ++++--
cloud/test/http_encode_key_test.cpp | 6 +-
cloud/test/meta_server_test.cpp | 5 +-
cloud/test/meta_service_http_test.cpp | 16 ++--
cloud/test/meta_service_job_test.cpp | 86 ++++++++++-------
cloud/test/meta_service_test.cpp | 95 +++++++++++--------
cloud/test/recycler_test.cpp | 112 +++++++++++++----------
cloud/test/schema_kv_test.cpp | 29 +++---
cloud/test/txn_kv_test.cpp | 11 ++-
25 files changed, 502 insertions(+), 339 deletions(-)
diff --git a/.clang-format-ignore b/.clang-format-ignore
index fbbc4cb0446..5f10e2a2212 100644
--- a/.clang-format-ignore
+++ b/.clang-format-ignore
@@ -9,3 +9,4 @@ be/src/util/sse2neon.h
be/src/util/mustache/mustache.h
be/src/util/mustache/mustache.cc
be/src/util/utf8_check.cpp
+cloud/src/common/defer.h
diff --git a/cloud/src/common/defer.h b/cloud/src/common/defer.h
new file mode 100644
index 00000000000..ab087aeb2fb
--- /dev/null
+++ b/cloud/src/common/defer.h
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <type_traits>
+#include <utility>
+
+namespace doris::cloud {
+
+// A simple RAII class to defer the execution of a function until the end of
the
+// scope.
+template <typename Fn>
+requires std::is_invocable_v<Fn>
+class DeferFn {
+public:
+ DeferFn(Fn &&fn) : fn_(std::move(fn)) {}
+ DeferFn(const DeferFn &) = delete;
+ DeferFn &operator=(const DeferFn &) = delete;
+ ~DeferFn() { fn_(); }
+
+private:
+ Fn fn_;
+};
+
+} // namespace doris::cloud
+
+// A macro to create a DeferFn object that will execute the given function
+// when it goes out of scope. This is useful for cleanup tasks or finalization
+// actions that should always run, regardless of how the scope is exited (e.g.
+// normal return, exception thrown, etc.).
+//
+// Usage:
+// DORIS_CLOUD_DEFER {
+// // Code to execute at the end of the scope
+// };
+//
+#define DORIS_CLOUD_DEFER_IMPL(line, counter)
\
+ ::doris::cloud::DeferFn defer_fn_##line##_##counter = [&]()
+#define DORIS_CLOUD_DEFER_EXPAND(line, counter)
\
+ DORIS_CLOUD_DEFER_IMPL(line, counter)
+#define DORIS_CLOUD_DEFER DORIS_CLOUD_DEFER_EXPAND(__LINE__, __COUNTER__)
+
+// A macro to create a DeferFn object that will execute the given function
+// with additional parameters when it goes out of scope. This is useful for
+// cleanup tasks or finalization actions that should always run, regardless of
+// how the scope is exited (e.g. normal return, exception thrown, etc.).
+//
+// Usage:
+// DORIS_CLOUD_DEFER_COPY(param1, param2) {
+// // Code to execute at the end of the scope, using param1 and param2
+// };
+//
+// Note: The parameters are captured by copy, so they can be used safely, for
+// example,
+// void foo(int &a, int &b) {
+// DORIS_CLOUD_DEFER_COPY(a, b) mutable {
+// a += 1;
+// b *= 2;
+// };
+// }
+//
+// int x = 1, y = 2;
+// foo(x, y);
+// assert(x == 1 && y == 2);
+//
+// The captured parameters are passed by value, so they modifications inside
the
+// deferred function do not affect the original variables outside the scope, or
+// the modifications after the definition of the defer function will not affect
+// the captured values.
+#define DORIS_CLOUD_DEFER_COPY_IMPL(line, counter, ...)
\
+ ::doris::cloud::DeferFn defer_fn_##line##_##counter = [&, __VA_ARGS__ ]()
+#define DORIS_CLOUD_DEFER_COPY_EXPAND(line, counter, ...)
\
+ DORIS_CLOUD_DEFER_COPY_IMPL(line, counter, __VA_ARGS__)
+#define DORIS_CLOUD_DEFER_COPY(...)
\
+ DORIS_CLOUD_DEFER_COPY_EXPAND(__LINE__, __COUNTER__, __VA_ARGS__)
diff --git a/cloud/src/common/network_util.cpp
b/cloud/src/common/network_util.cpp
index afec16200e0..5ac8483cc14 100644
--- a/cloud/src/common/network_util.cpp
+++ b/cloud/src/common/network_util.cpp
@@ -30,6 +30,7 @@
#include <vector>
#include "common/config.h"
+#include "common/defer.h"
#include "common/logging.h"
namespace doris::cloud {
@@ -161,7 +162,7 @@ static bool get_hosts_v4(std::vector<InetAddress>* hosts) {
std::string get_local_ip(const std::string& priority_networks) {
std::string localhost_str = butil::my_ip_cstr();
- std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
[&localhost_str](int*) {
+ DORIS_CLOUD_DEFER {
// Check if ip eq 127.0.0.1, ms/recycler exit
LOG(INFO) << "get the IP for ms is " << localhost_str;
if (config::enable_loopback_address_for_ms || localhost_str !=
"127.0.0.1") return;
@@ -170,7 +171,7 @@ std::string get_local_ip(const std::string&
priority_networks) {
<< "please set priority_networks with a CIDR expression
in doris_cloud.conf "
<< "to choose a non-loopback address accordingly";
exit(-1);
- });
+ };
if (priority_networks == "") {
LOG(INFO) << "use butil::my_ip_cstr(), local host ip=" <<
localhost_str;
return localhost_str;
diff --git a/cloud/src/meta-service/meta_service.cpp
b/cloud/src/meta-service/meta_service.cpp
index 172c8af129c..528b94d5d4c 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -1543,11 +1543,10 @@ void internal_get_rowset(Transaction* txn, int64_t
start, int64_t end,
std::unique_ptr<RangeGetIterator> it;
int num_rowsets = 0;
- std::unique_ptr<int, std::function<void(int*)>> defer_log_range(
- (int*)0x01, [key0, key1, &num_rowsets](int*) {
- LOG(INFO) << "get rowset meta, num_rowsets=" << num_rowsets <<
" range=["
- << hex(key0) << "," << hex(key1) << "]";
- });
+ DORIS_CLOUD_DEFER_COPY(key0, key1) {
+ LOG(INFO) << "get rowset meta, num_rowsets=" << num_rowsets << "
range=[" << hex(key0)
+ << "," << hex(key1) << "]";
+ };
std::stringstream ss;
do {
diff --git a/cloud/src/meta-service/meta_service_helper.h
b/cloud/src/meta-service/meta_service_helper.h
index cd9ed2f7f1d..5e724eae14f 100644
--- a/cloud/src/meta-service/meta_service_helper.h
+++ b/cloud/src/meta-service/meta_service_helper.h
@@ -28,6 +28,7 @@
#include "common/bvars.h"
#include "common/config.h"
+#include "common/defer.h"
#include "common/logging.h"
#include "common/stopwatch.h"
#include "common/util.h"
@@ -225,25 +226,25 @@ inline MetaServiceCode cast_as(TxnErrorCode code) {
}
}
-#define RPC_PREPROCESS(func_name)
\
- StopWatch sw;
\
- auto ctrl = static_cast<brpc::Controller*>(controller);
\
- begin_rpc(#func_name, ctrl, request);
\
- brpc::ClosureGuard closure_guard(done);
\
- [[maybe_unused]] std::stringstream ss;
\
- [[maybe_unused]] MetaServiceCode code = MetaServiceCode::OK;
\
- [[maybe_unused]] std::string msg;
\
- [[maybe_unused]] std::string instance_id;
\
- [[maybe_unused]] bool drop_request = false;
\
- std::unique_ptr<int, std::function<void(int*)>> defer_status((int*)0x01,
[&](int*) { \
- response->mutable_status()->set_code(code);
\
- response->mutable_status()->set_msg(msg);
\
- finish_rpc(#func_name, ctrl, response);
\
- closure_guard.reset(nullptr);
\
- if (config::use_detailed_metrics && !instance_id.empty() &&
!drop_request) { \
- g_bvar_ms_##func_name.put(instance_id, sw.elapsed_us());
\
- }
\
- });
+#define RPC_PREPROCESS(func_name)
\
+ StopWatch sw;
\
+ auto ctrl = static_cast<brpc::Controller*>(controller);
\
+ begin_rpc(#func_name, ctrl, request);
\
+ brpc::ClosureGuard closure_guard(done);
\
+ [[maybe_unused]] std::stringstream ss;
\
+ [[maybe_unused]] MetaServiceCode code = MetaServiceCode::OK;
\
+ [[maybe_unused]] std::string msg;
\
+ [[maybe_unused]] std::string instance_id;
\
+ [[maybe_unused]] bool drop_request = false;
\
+ DORIS_CLOUD_DEFER {
\
+ response->mutable_status()->set_code(code);
\
+ response->mutable_status()->set_msg(msg);
\
+ finish_rpc(#func_name, ctrl, response);
\
+ closure_guard.reset(nullptr);
\
+ if (config::use_detailed_metrics && !instance_id.empty() &&
!drop_request) { \
+ g_bvar_ms_##func_name.put(instance_id, sw.elapsed_us());
\
+ }
\
+ };
#define RPC_RATE_LIMIT(func_name)
\
if (config::enable_rate_limit && config::use_detailed_metrics &&
!instance_id.empty()) { \
diff --git a/cloud/src/meta-service/meta_service_job.cpp
b/cloud/src/meta-service/meta_service_job.cpp
index 1e019ac2728..25f3957e325 100644
--- a/cloud/src/meta-service/meta_service_job.cpp
+++ b/cloud/src/meta-service/meta_service_job.cpp
@@ -454,17 +454,16 @@ void
MetaServiceImpl::start_tablet_job(::google::protobuf::RpcController* contro
}
bool need_commit = false;
- std::unique_ptr<int, std::function<void(int*)>> defer_commit(
- (int*)0x01, [&ss, &txn, &code, &msg, &need_commit](int*) {
- if (!need_commit) return;
- TxnErrorCode err = txn->commit();
- if (err != TxnErrorCode::TXN_OK) {
- code = cast_as<ErrCategory::COMMIT>(err);
- ss << "failed to commit job kv, err=" << err;
- msg = ss.str();
- return;
- }
- });
+ DORIS_CLOUD_DEFER {
+ if (!need_commit) return;
+ TxnErrorCode err = txn->commit();
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::COMMIT>(err);
+ ss << "failed to commit job kv, err=" << err;
+ msg = ss.str();
+ return;
+ }
+ };
if (!request->job().compaction().empty()) {
start_compaction_job(code, msg, ss, txn, request, response,
instance_id, need_commit);
@@ -930,11 +929,10 @@ void process_compaction_job(MetaServiceCode& code,
std::string& msg, std::string
std::unique_ptr<RangeGetIterator> it;
int num_rowsets = 0;
- std::unique_ptr<int, std::function<void(int*)>> defer_log_range(
- (int*)0x01, [&rs_start, &rs_end, &num_rowsets, &instance_id](int*)
{
- INSTANCE_LOG(INFO) << "get rowset meta, num_rowsets=" <<
num_rowsets << " range=["
- << hex(rs_start) << "," << hex(rs_end) <<
"]";
- });
+ DORIS_CLOUD_DEFER {
+ INSTANCE_LOG(INFO) << "get rowset meta, num_rowsets=" << num_rowsets
<< " range=["
+ << hex(rs_start) << "," << hex(rs_end) << "]";
+ };
auto rs_start1 = rs_start;
do {
@@ -1544,9 +1542,7 @@ void
MetaServiceImpl::finish_tablet_job(::google::protobuf::RpcController* contr
<< " job=" << proto_to_json(recorded_job);
FinishTabletJobRequest_Action action = request->action();
- std::unique_ptr<int, std::function<void(int*)>> defer_commit((int*)0x01,
[&ss, &txn, &code,
-
&msg, &need_commit,
-
&action](int*) {
+ DORIS_CLOUD_DEFER {
if (!need_commit) return;
TxnErrorCode err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
@@ -1564,7 +1560,7 @@ void
MetaServiceImpl::finish_tablet_job(::google::protobuf::RpcController* contr
msg = ss.str();
return;
}
- });
+ };
std::string use_version =
delete_bitmap_lock_white_list_->get_delete_bitmap_lock_version(instance_id);
LOG(INFO) << "finish_tablet_job instance_id=" << instance_id << "
use_version=" << use_version;
diff --git a/cloud/src/meta-service/meta_service_resource.cpp
b/cloud/src/meta-service/meta_service_resource.cpp
index 8c8b0646c94..44389a41e2b 100644
--- a/cloud/src/meta-service/meta_service_resource.cpp
+++ b/cloud/src/meta-service/meta_service_resource.cpp
@@ -1780,17 +1780,16 @@ void
MetaServiceImpl::alter_instance(google::protobuf::RpcController* controller
std::string msg = "OK";
[[maybe_unused]] std::stringstream ss;
std::string instance_id = request->has_instance_id() ?
request->instance_id() : "";
- std::unique_ptr<int, std::function<void(int*)>> defer_status(
- (int*)0x01, [&code, &msg, &response, &ctrl, &closure_guard, &sw,
&instance_id](int*) {
- response->mutable_status()->set_code(code);
- response->mutable_status()->set_msg(msg);
- LOG(INFO) << (code == MetaServiceCode::OK ? "succ to " :
"failed to ")
- << __PRETTY_FUNCTION__ << " " << ctrl->remote_side()
<< " " << msg;
- closure_guard.reset(nullptr);
- if (config::use_detailed_metrics && !instance_id.empty()) {
- g_bvar_ms_alter_instance.put(instance_id, sw.elapsed_us());
- }
- });
+ DORIS_CLOUD_DEFER {
+ response->mutable_status()->set_code(code);
+ response->mutable_status()->set_msg(msg);
+ LOG(INFO) << (code == MetaServiceCode::OK ? "succ to " : "failed to ")
+ << __PRETTY_FUNCTION__ << " " << ctrl->remote_side() << " "
<< msg;
+ closure_guard.reset(nullptr);
+ if (config::use_detailed_metrics && !instance_id.empty()) {
+ g_bvar_ms_alter_instance.put(instance_id, sw.elapsed_us());
+ }
+ };
std::pair<MetaServiceCode, std::string> ret;
switch (request->op()) {
@@ -3039,18 +3038,16 @@ void
MetaServiceImpl::drop_stage(google::protobuf::RpcController* controller,
std::string msg = "OK";
std::string instance_id;
bool drop_request = false;
- std::unique_ptr<int, std::function<void(int*)>> defer_status(
- (int*)0x01, [&ret, &code, &msg, &response, &ctrl, &closure_guard,
&sw, &instance_id,
- &drop_request](int*) {
- response->mutable_status()->set_code(code);
- response->mutable_status()->set_msg(msg);
- LOG(INFO) << (ret == 0 ? "succ to " : "failed to ") <<
__PRETTY_FUNCTION__ << " "
- << ctrl->remote_side() << " " << msg;
- closure_guard.reset(nullptr);
- if (config::use_detailed_metrics && !instance_id.empty() &&
!drop_request) {
- g_bvar_ms_drop_stage.put(instance_id, sw.elapsed_us());
- }
- });
+ DORIS_CLOUD_DEFER {
+ response->mutable_status()->set_code(code);
+ response->mutable_status()->set_msg(msg);
+ LOG(INFO) << (ret == 0 ? "succ to " : "failed to ") <<
__PRETTY_FUNCTION__ << " "
+ << ctrl->remote_side() << " " << msg;
+ closure_guard.reset(nullptr);
+ if (config::use_detailed_metrics && !instance_id.empty() &&
!drop_request) {
+ g_bvar_ms_drop_stage.put(instance_id, sw.elapsed_us());
+ }
+ };
std::string cloud_unique_id = request->has_cloud_unique_id() ?
request->cloud_unique_id() : "";
if (cloud_unique_id.empty()) {
diff --git a/cloud/src/meta-service/meta_service_txn.cpp
b/cloud/src/meta-service/meta_service_txn.cpp
index 04d12657ed5..1709d867fd0 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -817,12 +817,10 @@ void scan_tmp_rowset(
meta_rowset_tmp_key(rs_tmp_key_info1, &rs_tmp_key1);
int num_rowsets = 0;
- std::unique_ptr<int, std::function<void(int*)>> defer_log_range(
- (int*)0x01, [rs_tmp_key0, rs_tmp_key1, &num_rowsets,
&txn_id](int*) {
- LOG(INFO) << "get tmp rowset meta, txn_id=" << txn_id
- << " num_rowsets=" << num_rowsets << " range=[" <<
hex(rs_tmp_key0) << ","
- << hex(rs_tmp_key1) << ")";
- });
+ DORIS_CLOUD_DEFER_COPY(rs_tmp_key0, rs_tmp_key1) {
+ LOG(INFO) << "get tmp rowset meta, txn_id=" << txn_id << "
num_rowsets=" << num_rowsets
+ << " range=[" << hex(rs_tmp_key0) << "," << hex(rs_tmp_key1)
<< ")";
+ };
std::unique_ptr<RangeGetIterator> it;
do {
@@ -1573,11 +1571,11 @@ void commit_txn_eventually(
MetaServiceCode& code, std::string& msg, const std::string&
instance_id, int64_t db_id,
const std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>&
tmp_rowsets_meta) {
StopWatch sw;
- std::unique_ptr<int, std::function<void(int*)>> defer_status((int*)0x01,
[&](int*) {
+ DORIS_CLOUD_DEFER {
if (config::use_detailed_metrics && !instance_id.empty()) {
g_bvar_ms_commit_txn_eventually.put(instance_id, sw.elapsed_us());
}
- });
+ };
std::stringstream ss;
TxnErrorCode err = TxnErrorCode::TXN_OK;
@@ -2000,12 +1998,11 @@ void commit_txn_with_sub_txn(const CommitTxnRequest*
request, CommitTxnResponse*
std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>
tmp_rowsets_meta;
int num_rowsets = 0;
- std::unique_ptr<int, std::function<void(int*)>> defer_log_range(
- (int*)0x01, [rs_tmp_key0, rs_tmp_key1, &num_rowsets, &txn_id,
&sub_txn_id](int*) {
- LOG(INFO) << "get tmp rowset meta, txn_id=" << txn_id
- << ", sub_txn_id=" << sub_txn_id << "
num_rowsets=" << num_rowsets
- << " range=[" << hex(rs_tmp_key0) << "," <<
hex(rs_tmp_key1) << ")";
- });
+ DORIS_CLOUD_DEFER_COPY(rs_tmp_key_info0, rs_tmp_key_info1) {
+ LOG(INFO) << "get tmp rowset meta, txn_id=" << txn_id << ",
sub_txn_id=" << sub_txn_id
+ << " num_rowsets=" << num_rowsets << " range=[" <<
hex(rs_tmp_key0) << ","
+ << hex(rs_tmp_key1) << ")";
+ };
std::unique_ptr<RangeGetIterator> it;
do {
diff --git a/cloud/src/meta-service/txn_kv.cpp
b/cloud/src/meta-service/txn_kv.cpp
index a328538ed78..ad9b62bb0c1 100644
--- a/cloud/src/meta-service/txn_kv.cpp
+++ b/cloud/src/meta-service/txn_kv.cpp
@@ -34,6 +34,7 @@
#include "common/bvars.h"
#include "common/config.h"
+#include "common/defer.h"
#include "common/logging.h"
#include "common/stopwatch.h"
#include "common/util.h"
@@ -416,8 +417,9 @@ TxnErrorCode Transaction::get(std::string_view begin,
std::string_view end,
int limit) {
StopWatch sw;
approximate_bytes_ += begin.size() + end.size();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [&sw](int*) { g_bvar_txn_kv_range_get <<
sw.elapsed_us(); });
+ DORIS_CLOUD_DEFER {
+ g_bvar_txn_kv_range_get << sw.elapsed_us();
+ };
FDBFuture* fut = fdb_transaction_get_range(
txn_, FDB_KEYSEL_FIRST_GREATER_OR_EQUAL((uint8_t*)begin.data(),
begin.size()),
@@ -555,10 +557,10 @@ TxnErrorCode Transaction::commit() {
TxnErrorCode Transaction::get_read_version(int64_t* version) {
StopWatch sw;
auto* fut = fdb_transaction_get_read_version(txn_);
- std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, [fut,
&sw](...) {
+ DORIS_CLOUD_DEFER {
fdb_future_destroy(fut);
g_bvar_txn_kv_get_read_version << sw.elapsed_us();
- });
+ };
RETURN_IF_ERROR(await_future(fut));
auto err = fdb_future_get_error(fut);
TEST_SYNC_POINT_CALLBACK("transaction:get_read_version:get_err", &err);
diff --git a/cloud/src/recycler/checker.cpp b/cloud/src/recycler/checker.cpp
index d8c2c313183..c78e0fb47db 100644
--- a/cloud/src/recycler/checker.cpp
+++ b/cloud/src/recycler/checker.cpp
@@ -486,7 +486,7 @@ int InstanceChecker::do_check() {
long instance_volume = 0;
using namespace std::chrono;
auto start_time = steady_clock::now();
- std::unique_ptr<int, std::function<void(int*)>>
defer_log_statistics((int*)0x01, [&](int*) {
+ DORIS_CLOUD_DEFER {
auto cost = duration<float>(steady_clock::now() - start_time).count();
LOG(INFO) << "check instance objects finished, cost=" << cost
<< "s. instance_id=" << instance_id_ << " num_scanned=" <<
num_scanned
@@ -499,7 +499,7 @@ int InstanceChecker::do_check() {
g_bvar_checker_check_cost_s.put(instance_id_, static_cast<long>(cost));
// FIXME(plat1ko): What if some list operation failed?
g_bvar_checker_instance_volume.put(instance_id_, instance_volume);
- });
+ };
struct TabletFiles {
int64_t tablet_id {0};
@@ -645,14 +645,14 @@ int InstanceChecker::do_inverted_check() {
long num_file_leak = 0;
using namespace std::chrono;
auto start_time = steady_clock::now();
- std::unique_ptr<int, std::function<void(int*)>>
defer_log_statistics((int*)0x01, [&](int*) {
+ DORIS_CLOUD_DEFER {
g_bvar_inverted_checker_num_scanned.put(instance_id_, num_scanned);
g_bvar_inverted_checker_num_check_failed.put(instance_id_,
num_file_leak);
auto cost = duration<float>(steady_clock::now() - start_time).count();
LOG(INFO) << "inverted check instance objects finished, cost=" << cost
<< "s. instance_id=" << instance_id_ << " num_scanned=" <<
num_scanned
<< " num_file_leak=" << num_file_leak;
- });
+ };
struct TabletRowsets {
int64_t tablet_id {0};
@@ -939,7 +939,7 @@ int InstanceChecker::do_delete_bitmap_inverted_check() {
int64_t leaked_delete_bitmaps {0};
auto start_time = std::chrono::steady_clock::now();
- std::unique_ptr<int, std::function<void(int*)>>
defer_log_statistics((int*)0x01, [&](int*) {
+ DORIS_CLOUD_DEFER {
g_bvar_inverted_checker_leaked_delete_bitmaps.put(instance_id_,
leaked_delete_bitmaps);
g_bvar_inverted_checker_abnormal_delete_bitmaps.put(instance_id_,
abnormal_delete_bitmaps);
g_bvar_inverted_checker_delete_bitmaps_scanned.put(instance_id_,
total_delete_bitmap_keys);
@@ -960,7 +960,7 @@ int InstanceChecker::do_delete_bitmap_inverted_check() {
"passed. cost={} ms, total_delete_bitmap_keys={}",
instance_id_, cost, total_delete_bitmap_keys);
}
- });
+ };
struct TabletsRowsetsCache {
int64_t tablet_id {-1};
diff --git a/cloud/src/recycler/hdfs_accessor.cpp
b/cloud/src/recycler/hdfs_accessor.cpp
index 024acd0efe7..0ff9523e073 100644
--- a/cloud/src/recycler/hdfs_accessor.cpp
+++ b/cloud/src/recycler/hdfs_accessor.cpp
@@ -32,6 +32,7 @@
#include <string_view>
#include "common/config.h"
+#include "common/defer.h"
#include "common/logging.h"
#include "common/string_util.h"
#include "cpp/sync_point.h"
@@ -533,12 +534,12 @@ int HdfsAccessor::put_file(const std::string&
relative_path, const std::string&
return -1;
}
- std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
[&](int*) {
+ DORIS_CLOUD_DEFER {
if (file) {
SCOPED_BVAR_LATENCY(hdfs_close_latency);
hdfsCloseFile(fs_.get(), file);
}
- });
+ };
int64_t written_bytes = 0;
{
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 1224f8fdeb7..9c81b0364fd 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -35,6 +35,7 @@
#include <string_view>
#include <utility>
+#include "common/defer.h"
#include "common/stopwatch.h"
#include "meta-service/meta_service.h"
#include "meta-service/meta_service_helper.h"
@@ -705,12 +706,12 @@ int InstanceRecycler::recycle_deleted_instance() {
int ret = 0;
auto start_time = steady_clock::now();
- std::unique_ptr<int, std::function<void(int*)>>
defer_log_statistics((int*)0x01, [&](int*) {
+ DORIS_CLOUD_DEFER {
auto cost = duration<float>(steady_clock::now() - start_time).count();
LOG(INFO) << (ret == 0 ? "successfully" : "failed to")
<< " recycle deleted instance, cost=" << cost
<< "s, instance_id=" << instance_id_;
- });
+ };
// delete all remote data
for (auto& [_, accessor] : accessor_map_) {
@@ -818,7 +819,7 @@ int InstanceRecycler::recycle_indexes() {
int64_t start_time =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
register_recycle_task(task_name, start_time);
- std::unique_ptr<int, std::function<void(int*)>>
defer_log_statistics((int*)0x01, [&](int*) {
+ DORIS_CLOUD_DEFER {
unregister_recycle_task(task_name);
int64_t cost =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() -
start_time;
@@ -827,7 +828,7 @@ int InstanceRecycler::recycle_indexes() {
.tag("num_scanned", num_scanned)
.tag("num_expired", num_expired)
.tag("num_recycled", num_recycled);
- });
+ };
int64_t earlest_ts = std::numeric_limits<int64_t>::max();
@@ -920,8 +921,9 @@ int InstanceRecycler::recycle_indexes() {
auto loop_done = [&index_keys, this]() -> int {
if (index_keys.empty()) return 0;
- std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
- [&](int*) {
index_keys.clear(); });
+ DORIS_CLOUD_DEFER {
+ index_keys.clear();
+ };
if (0 != txn_remove(txn_kv_.get(), index_keys)) {
LOG(WARNING) << "failed to delete recycle index kv, instance_id="
<< instance_id_;
return -1;
@@ -1035,7 +1037,7 @@ int InstanceRecycler::recycle_partitions() {
int64_t start_time =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
register_recycle_task(task_name, start_time);
- std::unique_ptr<int, std::function<void(int*)>>
defer_log_statistics((int*)0x01, [&](int*) {
+ DORIS_CLOUD_DEFER {
unregister_recycle_task(task_name);
int64_t cost =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() -
start_time;
@@ -1044,7 +1046,7 @@ int InstanceRecycler::recycle_partitions() {
.tag("num_scanned", num_scanned)
.tag("num_expired", num_expired)
.tag("num_recycled", num_recycled);
- });
+ };
int64_t earlest_ts = std::numeric_limits<int64_t>::max();
@@ -1152,10 +1154,10 @@ int InstanceRecycler::recycle_partitions() {
auto loop_done = [&partition_keys, &partition_version_keys, this]() -> int
{
if (partition_keys.empty()) return 0;
- std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
[&](int*) {
+ DORIS_CLOUD_DEFER {
partition_keys.clear();
partition_version_keys.clear();
- });
+ };
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
@@ -1188,13 +1190,13 @@ int InstanceRecycler::recycle_versions() {
auto start_time = steady_clock::now();
- std::unique_ptr<int, std::function<void(int*)>>
defer_log_statistics((int*)0x01, [&](int*) {
+ DORIS_CLOUD_DEFER {
auto cost = duration<float>(steady_clock::now() - start_time).count();
LOG_INFO("recycle table and partition versions finished, cost={}s",
cost)
.tag("instance_id", instance_id_)
.tag("num_scanned", num_scanned)
.tag("num_recycled", num_recycled);
- });
+ };
auto version_key_begin = partition_version_key({instance_id_, 0, 0, 0});
auto version_key_end = partition_version_key({instance_id_, INT64_MAX, 0,
0});
@@ -1307,7 +1309,7 @@ int InstanceRecycler::recycle_tablets(int64_t table_id,
int64_t index_id, int64_
auto start_time = steady_clock::now();
- std::unique_ptr<int, std::function<void(int*)>>
defer_log_statistics((int*)0x01, [&](int*) {
+ DORIS_CLOUD_DEFER {
auto cost = duration<float>(steady_clock::now() - start_time).count();
LOG_INFO("recycle tablets of " + tablet_belongs + " finished,
cost={}s", cost)
.tag("instance_id", instance_id_)
@@ -1316,7 +1318,7 @@ int InstanceRecycler::recycle_tablets(int64_t table_id,
int64_t index_id, int64_
.tag("partition_id", partition_id)
.tag("num_scanned", num_scanned)
.tag("num_recycled", num_recycled);
- });
+ };
// The first string_view represents the tablet key which has been recycled
// The second bool represents whether the following fdb's tablet key
deletion could be done using range move or not
@@ -1412,10 +1414,10 @@ int InstanceRecycler::recycle_tablets(int64_t table_id,
int64_t index_id, int64_
break;
}
}
- std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
[&](int*) {
+ DORIS_CLOUD_DEFER {
tablet_idx_keys.clear();
init_rs_keys.clear();
- });
+ };
std::unique_ptr<Transaction> txn;
if (txn_kv_->create_txn(&txn) != TxnErrorCode::TXN_OK) {
LOG(WARNING) << "failed to delete tablet meta kv, instance_id=" <<
instance_id_;
@@ -1735,7 +1737,7 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) {
int64_t min_rowset_expiration_time = INT64_MAX;
int64_t max_rowset_expiration_time = 0;
- std::unique_ptr<int, std::function<void(int*)>>
defer_log_statistics((int*)0x01, [&](int*) {
+ DORIS_CLOUD_DEFER {
auto cost = duration<float>(steady_clock::now() - start_time).count();
LOG_INFO("recycle the rowsets of dropped tablet finished, cost={}s",
cost)
.tag("instance_id", instance_id_)
@@ -1750,7 +1752,7 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) {
.tag("min rowset expiration time", min_rowset_expiration_time)
.tag("max rowset expiration time", max_rowset_expiration_time)
.tag("ret", ret);
- });
+ };
std::unique_ptr<Transaction> txn;
if (txn_kv_->create_txn(&txn) != TxnErrorCode::TXN_OK) {
@@ -1939,7 +1941,7 @@ int InstanceRecycler::recycle_rowsets() {
int64_t start_time =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
register_recycle_task(task_name, start_time);
- std::unique_ptr<int, std::function<void(int*)>>
defer_log_statistics((int*)0x01, [&](int*) {
+ DORIS_CLOUD_DEFER {
unregister_recycle_task(task_name);
int64_t cost =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() -
start_time;
@@ -1954,7 +1956,7 @@ int InstanceRecycler::recycle_rowsets() {
.tag("total_rowset_meta_key_size_scanned",
total_rowset_key_size)
.tag("total_rowset_meta_value_size_scanned",
total_rowset_value_size)
.tag("expired_rowset_meta_size", expired_rowset_size);
- });
+ };
std::vector<std::string> rowset_keys;
std::vector<doris::RowsetMetaCloudPB> rowsets;
@@ -2250,7 +2252,7 @@ int InstanceRecycler::recycle_tmp_rowsets() {
int64_t start_time =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
register_recycle_task(task_name, start_time);
- std::unique_ptr<int, std::function<void(int*)>>
defer_log_statistics((int*)0x01, [&](int*) {
+ DORIS_CLOUD_DEFER {
unregister_recycle_task(task_name);
int64_t cost =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() -
start_time;
@@ -2262,7 +2264,7 @@ int InstanceRecycler::recycle_tmp_rowsets() {
.tag("total_rowset_meta_key_size_scanned",
total_rowset_key_size)
.tag("total_rowset_meta_value_size_scanned",
total_rowset_value_size)
.tag("expired_rowset_meta_size_recycled", expired_rowset_size);
- });
+ };
// Elements in `tmp_rowset_keys` has the same lifetime as `it`
std::vector<std::string_view> tmp_rowset_keys;
@@ -2346,10 +2348,10 @@ int InstanceRecycler::recycle_tmp_rowsets() {
};
auto loop_done = [&tmp_rowset_keys, &tmp_rowsets, &num_recycled, this]()
-> int {
- std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
[&](int*) {
+ DORIS_CLOUD_DEFER {
tmp_rowset_keys.clear();
tmp_rowsets.clear();
- });
+ };
if (delete_rowset_data(tmp_rowsets, RowsetRecyclingState::TMP_ROWSET)
!= 0) {
LOG(WARNING) << "failed to delete tmp rowset data, instance_id="
<< instance_id_;
return -1;
@@ -2375,12 +2377,11 @@ int InstanceRecycler::scan_and_recycle(
int64_t cnt = 0;
int get_range_retried = 0;
std::string err;
- std::unique_ptr<int, std::function<void(int*)>> defer_log(
- (int*)0x01, [begin, end, &err, &ret, &cnt,
&get_range_retried](int*) {
- LOG(INFO) << "finish scan_and_recycle key_range=[" <<
hex(begin) << "," << hex(end)
- << ") num_scanned=" << cnt << " get_range_retried="
<< get_range_retried
- << " ret=" << ret << " err=" << err;
- });
+ DORIS_CLOUD_DEFER_COPY(begin, end) {
+ LOG(INFO) << "finish scan_and_recycle key_range=[" << hex(begin) <<
"," << hex(end)
+ << ") num_scanned=" << cnt << " get_range_retried=" <<
get_range_retried
+ << " ret=" << ret << " err=" << err;
+ };
std::unique_ptr<RangeGetIterator> it;
do {
@@ -2445,7 +2446,7 @@ int InstanceRecycler::abort_timeout_txn() {
int64_t start_time =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
register_recycle_task(task_name, start_time);
- std::unique_ptr<int, std::function<void(int*)>>
defer_log_statistics((int*)0x01, [&](int*) {
+ DORIS_CLOUD_DEFER {
unregister_recycle_task(task_name);
int64_t cost =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() -
start_time;
@@ -2455,7 +2456,7 @@ int InstanceRecycler::abort_timeout_txn() {
.tag("num_timeout", num_timeout)
.tag("num_abort", num_abort)
.tag("num_advance", num_advance);
- });
+ };
int64_t current_time =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
@@ -2585,7 +2586,7 @@ int InstanceRecycler::recycle_expired_txn_label() {
int64_t start_time =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
register_recycle_task(task_name, start_time);
- std::unique_ptr<int, std::function<void(int*)>>
defer_log_statistics((int*)0x01, [&](int*) {
+ DORIS_CLOUD_DEFER {
unregister_recycle_task(task_name);
int64_t cost =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() -
start_time;
@@ -2594,7 +2595,7 @@ int InstanceRecycler::recycle_expired_txn_label() {
.tag("num_scanned", num_scanned)
.tag("num_expired", num_expired)
.tag("num_recycled", num_recycled);
- });
+ };
int64_t earlest_ts = std::numeric_limits<int64_t>::max();
auto calc_expiration = [&earlest_ts, this](const RecycleTxnPB&
recycle_txn_pb) {
@@ -2717,8 +2718,9 @@ int InstanceRecycler::recycle_expired_txn_label() {
};
auto loop_done = [&]() -> int {
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [&](int*) { recycle_txn_info_keys.clear(); });
+ DORIS_CLOUD_DEFER {
+ recycle_txn_info_keys.clear();
+ };
TEST_SYNC_POINT_CALLBACK(
"InstanceRecycler::recycle_expired_txn_label.check_recycle_txn_info_keys",
&recycle_txn_info_keys);
@@ -2814,11 +2816,11 @@ struct BatchObjStoreAccessor {
private:
void consume() {
- std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
[this](int*) {
+ DORIS_CLOUD_DEFER {
paths_.clear();
copy_file_keys_.clear();
batch_count_++;
- });
+ };
LOG_INFO("begin to delete {} internal stage objects in batch {}",
paths_.size(),
batch_count_);
StopWatch sw;
@@ -2884,7 +2886,7 @@ int InstanceRecycler::recycle_copy_jobs() {
int64_t start_time =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
register_recycle_task(task_name, start_time);
- std::unique_ptr<int, std::function<void(int*)>>
defer_log_statistics((int*)0x01, [&](int*) {
+ DORIS_CLOUD_DEFER {
unregister_recycle_task(task_name);
int64_t cost =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() -
start_time;
@@ -2894,7 +2896,7 @@ int InstanceRecycler::recycle_copy_jobs() {
.tag("num_finished", num_finished)
.tag("num_expired", num_expired)
.tag("num_recycled", num_recycled);
- });
+ };
CopyJobKeyInfo key_info0 {instance_id_, "", 0, "", 0};
CopyJobKeyInfo key_info1 {instance_id_, "\xff", 0, "", 0};
@@ -3125,7 +3127,7 @@ int InstanceRecycler::recycle_stage() {
int64_t start_time =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
register_recycle_task(task_name, start_time);
- std::unique_ptr<int, std::function<void(int*)>>
defer_log_statistics((int*)0x01, [&](int*) {
+ DORIS_CLOUD_DEFER {
unregister_recycle_task(task_name);
int64_t cost =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() -
start_time;
@@ -3133,7 +3135,7 @@ int InstanceRecycler::recycle_stage() {
.tag("instance_id", instance_id_)
.tag("num_scanned", num_scanned)
.tag("num_recycled", num_recycled);
- });
+ };
RecycleStageKeyInfo key_info0 {instance_id_, ""};
RecycleStageKeyInfo key_info1 {instance_id_, "\xff"};
@@ -3205,8 +3207,9 @@ int InstanceRecycler::recycle_stage() {
auto loop_done = [&stage_keys, this]() -> int {
if (stage_keys.empty()) return 0;
- std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
- [&](int*) {
stage_keys.clear(); });
+ DORIS_CLOUD_DEFER {
+ stage_keys.clear();
+ };
if (0 != txn_remove(txn_kv_.get(), stage_keys)) {
LOG(WARNING) << "failed to delete recycle partition kv,
instance_id=" << instance_id_;
return -1;
@@ -3222,11 +3225,11 @@ int InstanceRecycler::recycle_expired_stage_objects() {
int64_t start_time =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
- std::unique_ptr<int, std::function<void(int*)>>
defer_log_statistics((int*)0x01, [&](int*) {
+ DORIS_CLOUD_DEFER {
int64_t cost =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() -
start_time;
LOG_INFO("recycle expired stage objects, cost={}s",
cost).tag("instance_id", instance_id_);
- });
+ };
int ret = 0;
for (const auto& stage : instance_info_.stages()) {
std::stringstream ss;
diff --git a/cloud/src/recycler/recycler_service.cpp
b/cloud/src/recycler/recycler_service.cpp
index 8f3a736671c..c77357c764e 100644
--- a/cloud/src/recycler/recycler_service.cpp
+++ b/cloud/src/recycler/recycler_service.cpp
@@ -24,6 +24,7 @@
#include <google/protobuf/util/json_util.h>
#include "common/config.h"
+#include "common/defer.h"
#include "common/logging.h"
#include "common/util.h"
#include "cpp/s3_rate_limiter.h"
@@ -60,14 +61,12 @@ void
RecyclerServiceImpl::recycle_instance(::google::protobuf::RpcController* co
brpc::ClosureGuard closure_guard(done);
MetaServiceCode code = MetaServiceCode::OK;
std::string msg = "OK";
- std::unique_ptr<int, std::function<void(int*)>> defer_status(
- (int*)0x01, [&code, &msg, &response, &ctrl](int*) {
- response->mutable_status()->set_code(code);
- response->mutable_status()->set_msg(msg);
- LOG(INFO) << (code == MetaServiceCode::OK ? "succ to " :
"failed to ")
- << "recycle_instance"
- << " " << ctrl->remote_side() << " " << msg;
- });
+ DORIS_CLOUD_DEFER {
+ response->mutable_status()->set_code(code);
+ response->mutable_status()->set_msg(msg);
+ LOG(INFO) << (code == MetaServiceCode::OK ? "succ to " : "failed to ")
<< "recycle_instance"
+ << " " << ctrl->remote_side() << " " << msg;
+ };
std::vector<InstanceInfoPB> instances;
instances.reserve(request->instance_ids_size());
@@ -275,16 +274,15 @@ void
RecyclerServiceImpl::http(::google::protobuf::RpcController* controller,
std::string req;
std::string response_body;
std::string request_body;
- std::unique_ptr<int, std::function<void(int*)>> defer_status(
- (int*)0x01, [&code, &msg, &status_code, &response_body, &cntl,
&req](int*) {
- status_code = std::get<0>(convert_ms_code_to_http_code(code));
- LOG(INFO) << (code == MetaServiceCode::OK ? "succ to " :
"failed to ") << "http"
- << " " << cntl->remote_side() << " request=\n"
- << req << "\n ret=" << code << " msg=" << msg;
- cntl->http_response().set_status_code(status_code);
- cntl->response_attachment().append(response_body);
- cntl->response_attachment().append("\n");
- });
+ DORIS_CLOUD_DEFER {
+ status_code = std::get<0>(convert_ms_code_to_http_code(code));
+ LOG(INFO) << (code == MetaServiceCode::OK ? "succ to " : "failed to ")
<< "http"
+ << " " << cntl->remote_side() << " request=\n"
+ << req << "\n ret=" << code << " msg=" << msg;
+ cntl->http_response().set_status_code(status_code);
+ cntl->response_attachment().append(response_body);
+ cntl->response_attachment().append("\n");
+ };
// Prepare input request info
auto unresolved_path = cntl->http_request().unresolved_path();
diff --git a/cloud/src/recycler/sync_executor.h
b/cloud/src/recycler/sync_executor.h
index 909f36a56c4..95650e5316a 100644
--- a/cloud/src/recycler/sync_executor.h
+++ b/cloud/src/recycler/sync_executor.h
@@ -27,6 +27,7 @@
#include <future>
#include <string>
+#include "common/defer.h"
#include "common/simple_thread_pool.h"
namespace doris::cloud {
@@ -50,7 +51,9 @@ public:
return *this;
}
std::vector<T> when_all(bool* finished) {
- std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
[&](int*) { _reset(); });
+ DORIS_CLOUD_DEFER {
+ _reset();
+ };
timespec current_time;
auto current_time_second = time(nullptr);
current_time.tv_sec = current_time_second + 300;
@@ -103,8 +106,9 @@ private:
_count(count),
_fut(_pro.get_future()) {}
void operator()(std::atomic_bool& stop_token) {
- std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
- [&](int*) {
_count.signal(); });
+ DORIS_CLOUD_DEFER {
+ _count.signal();
+ };
if (stop_token) {
_valid = false;
return;
diff --git a/cloud/src/recycler/util.h b/cloud/src/recycler/util.h
index 5aa929c2398..6efa0bec76e 100644
--- a/cloud/src/recycler/util.h
+++ b/cloud/src/recycler/util.h
@@ -23,13 +23,16 @@
#include <string>
+#include "common/defer.h"
+
namespace doris::cloud {
// The time unit is the same with BE: us
-#define SCOPED_BVAR_LATENCY(bvar_item) \
- StopWatch sw; \
- std::unique_ptr<int, std::function<void(int*)>> defer( \
- (int*)0x01, [&](int*) { bvar_item << sw.elapsed_us(); });
+#define SCOPED_BVAR_LATENCY(bvar_item) \
+ StopWatch sw; \
+ DORIS_CLOUD_DEFER { \
+ bvar_item << sw.elapsed_us(); \
+ };
class TxnKv;
diff --git a/cloud/src/resource-manager/resource_manager.cpp
b/cloud/src/resource-manager/resource_manager.cpp
index cd0381b7189..99296d025cc 100644
--- a/cloud/src/resource-manager/resource_manager.cpp
+++ b/cloud/src/resource-manager/resource_manager.cpp
@@ -53,11 +53,10 @@ int ResourceManager::init() {
std::unique_ptr<RangeGetIterator> it;
int num_instances = 0;
- std::unique_ptr<int, std::function<void(int*)>> defer_log_range(
- (int*)0x01, [key0, key1, &num_instances](int*) {
- LOG(INFO) << "get instances, num_instances=" << num_instances
<< " range=["
- << hex(key0) << "," << hex(key1) << "]";
- });
+ DORIS_CLOUD_DEFER_COPY(key0, key1) {
+ LOG(INFO) << "get instances, num_instances=" << num_instances << "
range=[" << hex(key0)
+ << "," << hex(key1) << "]";
+ };
// instance_id instance
std::vector<std::tuple<std::string, InstanceInfoPB>> instances;
@@ -312,8 +311,9 @@ std::pair<MetaServiceCode, std::string>
ResourceManager::add_cluster(const std::
std::string msg;
std::stringstream ss;
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [&msg](int*) { LOG(INFO) << "add_cluster err=" << msg;
});
+ DORIS_CLOUD_DEFER {
+ LOG(INFO) << "add_cluster err=" << msg;
+ };
// just check cluster_name not empty in add_cluster
if (!check_cluster_params_valid(cluster.cluster, &msg, true, true)) {
@@ -856,8 +856,9 @@ std::string ResourceManager::modify_nodes(const
std::string& instance_id,
const std::vector<NodeInfo>& to_del)
{
std::string msg;
std::stringstream ss;
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [&msg](int*) { LOG(INFO) << "modify_nodes err=" <<
msg; });
+ DORIS_CLOUD_DEFER {
+ LOG(INFO) << "modify_nodes err=" << msg;
+ };
if ((to_add.size() && to_del.size()) || (!to_add.size() &&
!to_del.size())) {
msg = "to_add and to_del both empty or both not empty";
@@ -1220,12 +1221,11 @@ std::pair<MetaServiceCode, std::string>
ResourceManager::refresh_instance(
LOG(INFO) << "begin to refresh instance, instance_id=" << instance_id << "
seq=" << ++seq;
std::pair<MetaServiceCode, std::string> ret0 {MetaServiceCode::OK, "OK"};
auto& [code, msg] = ret0;
- std::unique_ptr<int, std::function<void(int*)>> defer_log(
- (int*)0x01, [&ret0, &instance_id](int*) {
- LOG(INFO) << (std::get<0>(ret0) == MetaServiceCode::OK ? "succ
to " : "failed to ")
- << "refresh_instance, instance_id=" << instance_id
- << " code=" << std::get<0>(ret0) << " msg=" <<
std::get<1>(ret0);
- });
+ DORIS_CLOUD_DEFER {
+ LOG(INFO) << (std::get<0>(ret0) == MetaServiceCode::OK ? "succ to " :
"failed to ")
+ << "refresh_instance, instance_id=" << instance_id
+ << " code=" << std::get<0>(ret0) << " msg=" <<
std::get<1>(ret0);
+ };
std::unique_ptr<Transaction> txn0;
TxnErrorCode err = txn_kv_->create_txn(&txn0);
diff --git a/cloud/test/encryption_test.cpp b/cloud/test/encryption_test.cpp
index ed29177ac27..072e6a820cf 100644
--- a/cloud/test/encryption_test.cpp
+++ b/cloud/test/encryption_test.cpp
@@ -19,6 +19,7 @@
#include <gtest/gtest.h>
#include "common/config.h"
+#include "common/defer.h"
#include "common/encryption_util.h"
#include "common/logging.h"
#include "common/util.h"
@@ -186,8 +187,9 @@ TEST(EncryptionTest, RootKeyTestWithKms2) {
{
// mock falied to generate key
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("alikms::generate_data_key", [](auto&& args) {
auto* ret = try_any_cast_ret<int>(args);
ret->first = -1;
@@ -208,8 +210,9 @@ TEST(EncryptionTest, RootKeyTestWithKms2) {
{
// mock succ to generate key
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [&](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("alikms::generate_data_key", [&](auto&& args) {
auto* ciphertext = try_any_cast<std::string*>(args[0]);
*ciphertext = mock_encoded_ciphertext;
@@ -249,8 +252,9 @@ TEST(EncryptionTest, RootKeyTestWithKms2) {
// mock abnormal decryption
auto* sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("alikms::decrypt", [](auto&& args) {
auto* ret = try_any_cast_ret<int>(args);
ret->first = -1;
@@ -267,8 +271,9 @@ TEST(EncryptionTest, RootKeyTestWithKms2) {
// Decryption succeeded
{
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("alikms::decrypt", [&](auto&& args) {
auto* output = try_any_cast<std::string*>(args[0]);
*output = mock_encoded_plaintext;
@@ -320,8 +325,9 @@ TEST(EncryptionTest, RootKeyTestWithKms3) {
std::string mock_encoded_ciphertext = mock_encoded_plaintext;
// mock succ to generate key
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("alikms::generate_data_key", [&](auto&& args) {
auto* ciphertext = try_any_cast<std::string*>(args[0]);
*ciphertext = mock_encoded_ciphertext;
diff --git a/cloud/test/http_encode_key_test.cpp
b/cloud/test/http_encode_key_test.cpp
index d2456824743..58fdb3a4c97 100644
--- a/cloud/test/http_encode_key_test.cpp
+++ b/cloud/test/http_encode_key_test.cpp
@@ -19,6 +19,7 @@
#include <gen_cpp/cloud.pb.h>
#include <gtest/gtest.h>
+#include "common/defer.h"
#include "common/logging.h"
#include "cpp/sync_point.h"
#include "meta-service/keys.h"
@@ -95,8 +96,9 @@ v v v v
v
// test empty body branch
auto sp = doris::SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
doris::SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ doris::SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("process_http_encode_key::empty_body", [](auto&& args) {
auto* body = doris::try_any_cast<std::string*>(args[0]);
body->clear();
diff --git a/cloud/test/meta_server_test.cpp b/cloud/test/meta_server_test.cpp
index 15fd50785ac..8de04af4a8a 100644
--- a/cloud/test/meta_server_test.cpp
+++ b/cloud/test/meta_server_test.cpp
@@ -33,6 +33,7 @@
#include <thread>
#include "common/config.h"
+#include "common/defer.h"
#include "common/logging.h"
#include "cpp/sync_point.h"
#include "meta-service/keys.h"
@@ -170,12 +171,12 @@ TEST(MetaServerTest, StartAndStop) {
// use structured binding for point alias (avoid multi lines of
declaration)
auto [meta_server_start_1, meta_server_start_2, meta_server_start_3] = sps;
sp->enable_processing();
- std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, [&](...)
{
+ DORIS_CLOUD_DEFER {
for (auto& i : sps) {
sp->clear_call_back(i);
} // redundant
sp->disable_processing();
- });
+ };
auto foo = [](auto&& args) {
auto* ret = try_any_cast<int*>(args[0]);
diff --git a/cloud/test/meta_service_http_test.cpp
b/cloud/test/meta_service_http_test.cpp
index c16e8e705df..f267cc8261b 100644
--- a/cloud/test/meta_service_http_test.cpp
+++ b/cloud/test/meta_service_http_test.cpp
@@ -41,6 +41,7 @@
#include "common/config.h"
#include "common/configbase.h"
+#include "common/defer.h"
#include "common/logging.h"
#include "common/util.h"
#include "cpp/sync_point.h"
@@ -1004,8 +1005,9 @@ TEST(MetaServiceHttpTest, AlterIamTest) {
auto cloud_unique_id = "test_cloud_unique_id";
std::string instance_id = "alter_iam_test_instance_id";
[[maybe_unused]] auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
@@ -1523,8 +1525,9 @@ TEST(MetaServiceHttpTest, TxnLazyCommit) {
TEST(MetaServiceHttpTest, get_stage_response_sk) {
auto sp = SyncPoint::get_instance();
sp->enable_processing();
- std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
- [&](...) {
sp->disable_processing(); });
+ DORIS_CLOUD_DEFER {
+ sp->disable_processing();
+ };
GetStageResponse res;
auto* stage = res.add_stage();
@@ -1557,8 +1560,9 @@ TEST(MetaServiceHttpTest, get_stage_response_sk) {
TEST(MetaServiceHttpTest, get_obj_store_info_response_sk) {
auto sp = SyncPoint::get_instance();
sp->enable_processing();
- std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
- [&](...) {
sp->disable_processing(); });
+ DORIS_CLOUD_DEFER {
+ sp->disable_processing();
+ };
GetObjStoreInfoResponse res;
auto* obj_info = res.add_obj_info();
diff --git a/cloud/test/meta_service_job_test.cpp
b/cloud/test/meta_service_job_test.cpp
index a81e9c843d6..caa2db13c41 100644
--- a/cloud/test/meta_service_job_test.cpp
+++ b/cloud/test/meta_service_job_test.cpp
@@ -30,6 +30,7 @@
#include <random>
#include <string>
+#include "common/defer.h"
#include "common/util.h"
#include "cpp/sync_point.h"
#include "meta-service/keys.h"
@@ -313,8 +314,9 @@ void finish_schema_change_job(
TEST(MetaServiceJobTest, StartCompactionArguments) {
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
@@ -376,8 +378,9 @@ TEST(MetaServiceJobTest, StartCompactionArguments) {
TEST(MetaServiceJobTest, StartFullCompaction) {
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
@@ -431,8 +434,9 @@ TEST(MetaServiceJobTest, StartFullCompaction) {
TEST(MetaServiceJobTest, StartSchemaChangeArguments) {
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
@@ -536,8 +540,9 @@ TEST(MetaServiceJobTest, StartSchemaChangeArguments) {
TEST(MetaServiceJobTest, ProcessCompactionArguments) {
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
@@ -635,8 +640,9 @@ TEST(MetaServiceJobTest, ProcessCompactionArguments) {
TEST(MetaServiceJobTest, ProcessSchemaChangeArguments) {
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
@@ -779,8 +785,9 @@ TEST(MetaServiceJobTest, CompactionJobTest) {
// meta_service->resource_mgr().reset(); // Do not use resource manager
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
@@ -1155,8 +1162,9 @@ void check_compaction_key(MetaServiceProxy* meta_service,
std::string instance_i
TEST(MetaServiceJobTest, DeleteBitmapUpdateLockCompatibilityTest) {
auto meta_service = get_meta_service();
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
@@ -2666,8 +2674,9 @@ TEST(MetaServiceJobTest,
DeleteBitmapUpdateLockCompatibilityTest) {
TEST(MetaServiceJobTest, CompactionJobWithMoWTest) {
auto meta_service = get_meta_service();
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
@@ -2937,8 +2946,9 @@ TEST(MetaServiceJobTest, SchemaChangeJobTest) {
// meta_service->resource_mgr().reset(); // Do not use resource manager
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
@@ -3161,8 +3171,9 @@ TEST(MetaServiceJobTest, RetrySchemaChangeJobTest) {
// meta_service->resource_mgr().reset(); // Do not use resource manager
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
@@ -3326,8 +3337,9 @@ TEST(MetaServiceJobTest, SchemaChangeJobWithMoWTest) {
// meta_service->resource_mgr().reset(); // Do not use resource manager
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
@@ -3488,8 +3500,9 @@ TEST(MetaServiceJobTest, ConcurrentCompactionTest) {
// meta_service->resource_mgr().reset(); // Do not use resource manager
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
@@ -3839,8 +3852,9 @@ TEST(MetaServiceJobTest, ParallelCumuCompactionTest) {
// meta_service->resource_mgr().reset(); // Do not use resource manager
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
@@ -3958,8 +3972,9 @@ TEST(MetaServiceJobTest, SchemaChangeJobPersistTest) {
auto meta_service = get_meta_service();
auto* sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
@@ -4025,8 +4040,9 @@ TEST(MetaServiceJobTest, DoCompactionWhenSC) {
auto meta_service = get_meta_service();
auto* sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
@@ -4112,8 +4128,9 @@ TEST(MetaServiceJobTest, ReStartSC) {
auto meta_service = get_meta_service();
auto* sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
@@ -4157,8 +4174,9 @@ TEST(MetaServiceJobTest, CancelSC) {
auto meta_service = get_meta_service();
auto* sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index f3d3323f476..c7d1edc0ee1 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -351,8 +351,9 @@ TEST(MetaServiceTest, GetInstanceIdTest) {
const std::string& cloud_unique_id);
auto meta_service = get_meta_service();
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("get_instance_id_err", [&](auto&& args) {
std::string* err = try_any_cast<std::string*>(args[0]);
*err = "can't find node from cache";
@@ -1183,8 +1184,9 @@ TEST(MetaServiceTest, BeginTxnTest) {
std::condition_variable go_cv;
bool go = false;
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
std::atomic<int32_t> count_txn1 = {0};
std::atomic<int32_t> count_txn2 = {0};
@@ -1330,8 +1332,9 @@ TEST(MetaServiceTest, BeginTxnTest) {
std::condition_variable go_cv;
bool go = false;
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
std::atomic<int32_t> count_txn1 = {0};
std::atomic<int32_t> count_txn2 = {0};
@@ -3453,8 +3456,9 @@ TEST(MetaServiceTest, CopyJobTest) {
std::string instance_id = "copy_job_test_instance_id";
[[maybe_unused]] auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
@@ -3931,8 +3935,9 @@ TEST(MetaServiceTest, StageTest) {
auto cloud_unique_id = "test_cloud_unique_id";
std::string instance_id = "stage_test_instance_id";
[[maybe_unused]] auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
@@ -4269,8 +4274,9 @@ TEST(MetaServiceTest, GetIamTest) {
auto cloud_unique_id = "test_cloud_unique_id";
std::string instance_id = "get_iam_test_instance_id";
[[maybe_unused]] auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
@@ -4352,8 +4358,9 @@ TEST(MetaServiceTest, AlterRamTest) {
auto cloud_unique_id = "test_cloud_unique_id";
std::string instance_id = "alter_iam_test_instance_id";
[[maybe_unused]] auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
@@ -4661,8 +4668,9 @@ void remove_delete_bitmap_lock(MetaServiceProxy*
meta_service, int64_t table_id)
TEST(MetaServiceTest, GetDeleteBitmapUpdateLock) {
auto meta_service = get_meta_service();
[[maybe_unused]] auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
remove_delete_bitmap_lock(meta_service.get(), 1);
remove_delete_bitmap_lock(meta_service.get(), 2);
int64_t table_id = 9;
@@ -4919,10 +4927,10 @@ TEST(MetaServiceTest,
GetDeleteBitmapUpdateLockTabletStatsNormal) {
std::string instance_id = "test_get_delete_bitmap_update_lock_normal";
[[maybe_unused]] auto* sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
[](int*) {
+ DORIS_CLOUD_DEFER {
SyncPoint::get_instance()->disable_processing();
SyncPoint::get_instance()->clear_all_call_backs();
- });
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
@@ -4969,10 +4977,10 @@ TEST(MetaServiceTest,
GetDeleteBitmapUpdateLockTabletStatsLockExpired) {
// the reading of tablet stats
std::string instance_id =
"test_get_delete_bitmap_update_lock_abnormal1";
[[maybe_unused]] auto* sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
[](int*) {
+ DORIS_CLOUD_DEFER {
SyncPoint::get_instance()->disable_processing();
SyncPoint::get_instance()->clear_all_call_backs();
- });
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
@@ -5012,10 +5020,10 @@ TEST(MetaServiceTest,
GetDeleteBitmapUpdateLockTabletStatsLockExpired) {
// the reading of tablet stats
std::string instance_id =
"test_get_delete_bitmap_update_lock_abnormal2";
[[maybe_unused]] auto* sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
[](int*) {
+ DORIS_CLOUD_DEFER {
SyncPoint::get_instance()->disable_processing();
SyncPoint::get_instance()->clear_all_call_backs();
- });
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
@@ -5055,10 +5063,10 @@ TEST(MetaServiceTest,
GetDeleteBitmapUpdateLockTabletStatsError) {
// 2.3 abnormal path, meeting error when reading tablets' stats
std::string instance_id =
"test_get_delete_bitmap_update_lock_abnormal3";
[[maybe_unused]] auto* sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
[](int*) {
+ DORIS_CLOUD_DEFER {
SyncPoint::get_instance()->disable_processing();
SyncPoint::get_instance()->clear_all_call_backs();
- });
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
@@ -5095,10 +5103,10 @@ TEST(MetaServiceTest,
GetDeleteBitmapUpdateLockTabletStatsError) {
// this should not fail if lock is not expired
std::string instance_id =
"test_get_delete_bitmap_update_lock_abnormal4";
[[maybe_unused]] auto* sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
[](int*) {
+ DORIS_CLOUD_DEFER {
SyncPoint::get_instance()->disable_processing();
SyncPoint::get_instance()->clear_all_call_backs();
- });
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
@@ -6395,8 +6403,9 @@ void
update_delete_bitmap_with_remove_pre(MetaServiceProxy* meta_service, int64_
TEST(MetaServiceTest, UpdateDeleteBitmapWithRemovePreDeleteBitmap) {
auto meta_service = get_meta_service();
[[maybe_unused]] auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
update_delete_bitmap_with_remove_pre(meta_service.get(), 200, 202);
@@ -6982,8 +6991,9 @@ TEST(MetaServiceTest, BatchGetVersionFallback) {
constexpr size_t N = 100;
size_t i = 0;
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("batch_get_version_err", [&](auto&& args) {
if (i++ == N / 10) {
*try_any_cast<TxnErrorCode*>(args) = TxnErrorCode::TXN_TOO_OLD;
@@ -7028,8 +7038,9 @@ TEST(MetaServiceTest, IsDroppedTablet) {
auto meta_service = get_meta_service();
std::string instance_id = "IsDroppedTablet";
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
@@ -7109,8 +7120,9 @@ TEST(MetaServiceTest, IndexRequest) {
auto meta_service = get_meta_service();
std::string instance_id = "IndexRequest";
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
@@ -7352,8 +7364,9 @@ TEST(MetaServiceTest, PartitionRequest) {
auto meta_service = get_meta_service();
std::string instance_id = "PartitionRequest";
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
@@ -9206,8 +9219,9 @@ TEST(MetaServiceTest, UpdateTmpRowsetTest) {
std::string instance_id = "update_rowset_meta_test_instance_id";
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
@@ -9625,8 +9639,9 @@ TEST(MetaServiceTest, CheckJobExisted) {
std::string instance_id = "check_job_existed_instance_id";
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index 7711631ab25..12f548f8022 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -790,8 +790,9 @@ TEST(RecyclerTest, recycle_rowsets) {
int insert_no_inverted_index = 0;
int insert_inverted_index = 0;
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("InvertedIndexIdCache::insert1", [&](auto&&) {
++insert_no_inverted_index; });
sp->set_call_back("InvertedIndexIdCache::insert2", [&](auto&&) {
++insert_inverted_index; });
sp->enable_processing();
@@ -867,8 +868,9 @@ TEST(RecyclerTest, bench_recycle_rowsets) {
ASSERT_EQ(recycler.init(), 0);
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("memkv::Transaction::get", [](auto&& args) {
auto* limit = try_any_cast<int*>(args[0]);
*limit = 100;
@@ -940,8 +942,9 @@ TEST(RecyclerTest, recycle_tmp_rowsets) {
int insert_no_inverted_index = 0;
int insert_inverted_index = 0;
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("InvertedIndexIdCache::insert1", [&](auto&&) {
++insert_no_inverted_index; });
sp->set_call_back("InvertedIndexIdCache::insert2", [&](auto&&) {
++insert_inverted_index; });
sp->enable_processing();
@@ -2025,8 +2028,9 @@ TEST(RecyclerTest, recycle_copy_jobs) {
TEST(RecyclerTest, recycle_batch_copy_jobs) {
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("MockAccessor::delete_files", [](auto&& args) {
auto* ret = try_any_cast_ret<int>(args);
ret->first = -1;
@@ -2148,8 +2152,9 @@ TEST(RecyclerTest, recycle_batch_copy_jobs) {
TEST(RecyclerTest, recycle_stage) {
[[maybe_unused]] auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
auto txn_kv =
std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>());
ASSERT_NE(txn_kv.get(), nullptr);
ASSERT_EQ(txn_kv->init(), 0);
@@ -2446,8 +2451,9 @@ TEST(CheckerTest, normal_inverted_check) {
},
&guard);
sp->enable_processing();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->disable_processing(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->disable_processing();
+ };
InstanceChecker checker(txn_kv, instance_id);
ASSERT_EQ(checker.init(instance), 0);
@@ -2495,8 +2501,9 @@ TEST(CheckerTest, DISABLED_abnormal_inverted_check) {
},
&guard);
sp->enable_processing();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->disable_processing(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->disable_processing();
+ };
InstanceChecker checker(txn_kv, instance_id);
ASSERT_EQ(checker.init(instance), 0);
@@ -2701,8 +2708,9 @@ TEST(CheckerTest, do_inspect) {
{
// empty job info
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("Checker:do_inspect", [](auto&& args) {
auto last_ctime = *try_any_cast<int64_t*>(args[0]);
ASSERT_EQ(last_ctime, 11111);
@@ -2723,8 +2731,9 @@ TEST(CheckerTest, do_inspect) {
ASSERT_EQ(TxnErrorCode::TXN_OK, txn->commit());
checker.do_inspect(instance);
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("Checker:do_inspect", [](auto&& args) {
ASSERT_EQ(*try_any_cast<int64_t*>(args[0]), 11111);
});
@@ -2739,8 +2748,9 @@ TEST(CheckerTest, do_inspect) {
job_info.set_instance_id(instance_id);
job_info.set_last_ctime_ms(12345);
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("Checker:do_inspect", [](auto&& args) {
ASSERT_EQ(*try_any_cast<int64_t*>(args[0]), 12345);
});
@@ -2763,8 +2773,9 @@ TEST(CheckerTest, do_inspect) {
job_info.set_instance_id(instance_id);
job_info.set_last_ctime_ms(now - expiration_ms - 10);
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
bool alarm = false;
sp->set_call_back("Checker:do_inspect", [&alarm](auto&&) { alarm =
true; });
@@ -2880,8 +2891,9 @@ TEST(CheckerTest, delete_bitmap_inverted_check_abnormal) {
std::map<std::int64_t, std::set<std::tuple<std::string, int64_t, int64_t>>>
expected_leaked_delete_bitmaps {}, real_leaked_delete_bitmaps {};
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back(
"InstanceChecker::do_delete_bitmap_inverted_check.get_abnormal_delete_bitmap",
[&real_abnormal_delete_bitmaps](auto&& args) {
@@ -3105,8 +3117,9 @@ TEST(CheckerTest,
delete_bitmap_storage_optimize_check_abnormal) {
std::map<std::int64_t, std::set<std::string>> expected_abnormal_rowsets {};
std::map<std::int64_t, std::set<std::string>> real_abnormal_rowsets {};
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("InstanceChecker::check_delete_bitmap_storage_optimize.get_abnormal_rowset",
[&real_abnormal_rowsets](auto&& args) {
int64_t tablet_id = *try_any_cast<int64_t*>(args[0]);
@@ -3213,8 +3226,9 @@ TEST(CheckerTest, check_compaction_key) {
config::delete_bitmap_lock_v2_white_list = "*";
std::string instance_id = "test_check_compaction_key";
[[maybe_unused]] auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
@@ -3330,8 +3344,9 @@ TEST(CheckerTest,
delete_bitmap_storage_optimize_v2_check_abnormal) {
std::map<std::int64_t, std::set<std::string>> expected_abnormal_rowsets {};
std::map<std::int64_t, std::set<std::string>> real_abnormal_rowsets {};
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back(
"InstanceChecker::check_delete_bitmap_storage_optimize_v2.get_abnormal_rowset",
[&](auto&& args) {
@@ -3646,11 +3661,11 @@ TEST(RecyclerTest,
delete_rowset_data_without_inverted_index_storage_format) {
TEST(RecyclerTest, init_vault_accessor_failed_test) {
auto* sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
[&sp](int*) {
+ DORIS_CLOUD_DEFER {
sp->clear_all_call_backs();
sp->clear_trace();
sp->disable_processing();
- });
+ };
auto txn_kv = std::make_shared<MemTxnKv>();
EXPECT_EQ(txn_kv->init(), 0);
@@ -3781,11 +3796,11 @@ TEST(RecyclerTest, init_vault_accessor_failed_test) {
TEST(RecyclerTest, recycle_tablet_without_resource_id) {
auto* sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
[&sp](int*) {
+ DORIS_CLOUD_DEFER {
sp->clear_all_call_backs();
sp->clear_trace();
sp->disable_processing();
- });
+ };
auto txn_kv = std::make_shared<MemTxnKv>();
EXPECT_EQ(txn_kv->init(), 0);
@@ -3863,11 +3878,11 @@ TEST(RecyclerTest, recycle_tablet_without_resource_id) {
TEST(RecyclerTest, recycle_tablet_with_wrong_resource_id) {
auto* sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
[&sp](int*) {
+ DORIS_CLOUD_DEFER {
sp->clear_all_call_backs();
sp->clear_trace();
sp->disable_processing();
- });
+ };
auto txn_kv = std::make_shared<MemTxnKv>();
EXPECT_EQ(txn_kv->init(), 0);
@@ -3945,11 +3960,11 @@ TEST(RecyclerTest,
recycle_tablet_with_wrong_resource_id) {
TEST(RecyclerTest, init_all_vault_accessors_failed_test) {
auto* sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
[&sp](int*) {
+ DORIS_CLOUD_DEFER {
sp->clear_all_call_backs();
sp->clear_trace();
sp->disable_processing();
- });
+ };
auto txn_kv = std::make_shared<MemTxnKv>();
EXPECT_EQ(txn_kv->init(), 0);
@@ -3991,11 +4006,11 @@ TEST(RecyclerTest,
init_all_vault_accessors_failed_test) {
TEST(RecyclerTest, recycler_storage_vault_white_list_test) {
auto* sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
[&sp](int*) {
+ DORIS_CLOUD_DEFER {
sp->clear_all_call_backs();
sp->clear_trace();
sp->disable_processing();
- });
+ };
auto txn_kv = std::make_shared<MemTxnKv>();
EXPECT_EQ(txn_kv->init(), 0);
@@ -4125,11 +4140,11 @@ TEST(RecyclerTest,
recycler_storage_vault_white_list_test) {
TEST(RecyclerTest, delete_tmp_rowset_data_with_idx_v1) {
auto* sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
[&sp](int*) {
+ DORIS_CLOUD_DEFER {
sp->clear_all_call_backs();
sp->clear_trace();
sp->disable_processing();
- });
+ };
auto txn_kv = std::make_shared<MemTxnKv>();
ASSERT_EQ(txn_kv->init(), 0);
@@ -4204,11 +4219,11 @@ TEST(RecyclerTest, delete_tmp_rowset_data_with_idx_v1) {
TEST(RecyclerTest, delete_tmp_rowset_data_with_idx_v2) {
auto* sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
[&sp](int*) {
+ DORIS_CLOUD_DEFER {
sp->clear_all_call_backs();
sp->clear_trace();
sp->disable_processing();
- });
+ };
auto txn_kv = std::make_shared<MemTxnKv>();
ASSERT_EQ(txn_kv->init(), 0);
@@ -4282,11 +4297,11 @@ TEST(RecyclerTest, delete_tmp_rowset_data_with_idx_v2) {
TEST(RecyclerTest, delete_tmp_rowset_without_resource_id) {
auto* sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
[&sp](int*) {
+ DORIS_CLOUD_DEFER {
sp->clear_all_call_backs();
sp->clear_trace();
sp->disable_processing();
- });
+ };
auto txn_kv = std::make_shared<MemTxnKv>();
ASSERT_EQ(txn_kv->init(), 0);
@@ -4681,8 +4696,9 @@ TEST(RecyclerTest,
concurrent_recycle_txn_label_failure_test) {
check_multiple_txn_info_kvs(txn_kv, 20000);
auto* sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("InstanceRecycler::recycle_expired_txn_label.check_recycle_txn_info_keys",
[](auto&& args) {
auto* recycle_txn_info_keys =
diff --git a/cloud/test/schema_kv_test.cpp b/cloud/test/schema_kv_test.cpp
index 2ab8261949e..1840598b40e 100644
--- a/cloud/test/schema_kv_test.cpp
+++ b/cloud/test/schema_kv_test.cpp
@@ -24,6 +24,7 @@
#include <random>
#include "common/config.h"
+#include "common/defer.h"
#include "cpp/sync_point.h"
#include "meta-service/keys.h"
#include "meta-service/meta_service.h"
@@ -114,8 +115,9 @@ TEST(DetachSchemaKVTest, TabletTest) {
// meta_service->resource_mgr().reset(); // Do not use resource manager
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
@@ -396,8 +398,9 @@ TEST(DetachSchemaKVTest, RowsetTest) {
// meta_service->resource_mgr().reset(); // Do not use resource manager
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
@@ -527,9 +530,9 @@ TEST(DetachSchemaKVTest, RowsetTest) {
}
// check get rowset response
auto get_rowset_res =
google::protobuf::Arena::CreateMessage<GetRowsetResponse>(arena);
- std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
[&](int*) {
+ DORIS_CLOUD_DEFER {
if (!arena) delete get_rowset_res;
- });
+ };
get_rowset(meta_service.get(), table_id, index_id, partition_id,
tablet_id,
*get_rowset_res);
ASSERT_EQ(get_rowset_res->rowset_meta_size(), schema_versions.size());
@@ -578,8 +581,9 @@ TEST(DetachSchemaKVTest, InsertExistedRowsetTest) {
// meta_service->resource_mgr().reset(); // Do not use resource manager
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
@@ -633,9 +637,9 @@ TEST(DetachSchemaKVTest, InsertExistedRowsetTest) {
tablet_id, next_rowset_id(), 1));
auto committed_rowset = create_rowset(txn_id, tablet_id,
next_rowset_id(), 2, 2);
auto res =
google::protobuf::Arena::CreateMessage<CreateRowsetResponse>(arena);
- std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01,
[&](int*) {
+ DORIS_CLOUD_DEFER {
if (!arena) delete res;
- });
+ };
prepare_rowset(meta_service.get(), committed_rowset, *res);
ASSERT_EQ(res->status().code(), MetaServiceCode::OK);
res->Clear();
@@ -671,8 +675,9 @@ TEST(SchemaKVTest, InsertExistedRowsetTest) {
// meta_service->resource_mgr().reset(); // Do not use resource manager
auto sp = SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
diff --git a/cloud/test/txn_kv_test.cpp b/cloud/test/txn_kv_test.cpp
index 27088f3da72..78c8c18da42 100644
--- a/cloud/test/txn_kv_test.cpp
+++ b/cloud/test/txn_kv_test.cpp
@@ -30,6 +30,7 @@
#include <thread>
#include "common/config.h"
+#include "common/defer.h"
#include "common/stopwatch.h"
#include "common/util.h"
#include "cpp/sync_point.h"
@@ -284,8 +285,9 @@ TEST(TxnKvTest, PutLargeValueTest) {
auto txn_kv = std::make_shared<MemTxnKv>();
auto sp = doris::SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
doris::SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ doris::SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->enable_processing();
doris::TabletSchemaCloudPB schema;
@@ -573,8 +575,9 @@ TEST(TxnKvTest, FullRangeGetIterator) {
encode_int64(INT64_MAX, &end);
auto* sp = doris::SyncPoint::get_instance();
- std::unique_ptr<int, std::function<void(int*)>> defer(
- (int*)0x01, [](int*) {
doris::SyncPoint::get_instance()->clear_all_call_backs(); });
+ DORIS_CLOUD_DEFER {
+ doris::SyncPoint::get_instance()->clear_all_call_backs();
+ };
sp->enable_processing();
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]