This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 78689d71053 branch-3.1: [enhancement](cloud) Add checker and more bvar
for cloud restore #54912 #54994 #55161 (#55174)
78689d71053 is described below
commit 78689d71053fa0c2ff3161e16f66d2c6d31b514f
Author: Uniqueyou <[email protected]>
AuthorDate: Wed Aug 27 17:54:19 2025 +0800
branch-3.1: [enhancement](cloud) Add checker and more bvar for cloud
restore #54912 #54994 #55161 (#55174)
Cherry-picked from #54912, #54994 and #55161.
---------
Co-authored-by: xy720 <[email protected]>
---
be/src/cloud/cloud_meta_mgr.cpp | 4 +-
cloud/src/common/bvars.cpp | 6 ++
cloud/src/common/bvars.h | 19 +++++
cloud/src/common/config.h | 1 +
cloud/src/meta-service/meta_service.cpp | 37 ++++++++-
cloud/src/recycler/checker.cpp | 113 +++++++++++++++++++++++++-
cloud/src/recycler/checker.h | 2 +
cloud/src/recycler/recycler.cpp | 1 +
cloud/test/meta_service_test.cpp | 56 ++++++++++++-
cloud/test/recycler_test.cpp | 137 +++++++++++++++++++++++++++++++-
gensrc/proto/cloud.proto | 9 ++-
11 files changed, 373 insertions(+), 12 deletions(-)
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index 833786e4157..7e8f07d26cb 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -1205,6 +1205,7 @@ Status CloudMetaMgr::prepare_restore_job(const
TabletMetaPB& tablet_meta) {
req.set_cloud_unique_id(config::cloud_unique_id);
req.set_tablet_id(tablet_meta.tablet_id());
req.set_expiration(config::snapshot_expire_time_sec);
+ req.set_action(RestoreJobRequest::PREPARE);
doris_tablet_meta_to_cloud(req.mutable_tablet_meta(),
std::move(tablet_meta));
return retry_rpc("prepare restore job", req, &resp,
&MetaService_Stub::prepare_restore_job);
@@ -1216,6 +1217,7 @@ Status CloudMetaMgr::commit_restore_job(const int64_t
tablet_id) {
RestoreJobResponse resp;
req.set_cloud_unique_id(config::cloud_unique_id);
req.set_tablet_id(tablet_id);
+ req.set_action(RestoreJobRequest::COMMIT);
return retry_rpc("commit restore job", req, &resp,
&MetaService_Stub::commit_restore_job);
}
@@ -1227,7 +1229,7 @@ Status CloudMetaMgr::finish_restore_job(const int64_t
tablet_id, bool is_complet
RestoreJobResponse resp;
req.set_cloud_unique_id(config::cloud_unique_id);
req.set_tablet_id(tablet_id);
- req.set_is_completed(is_completed);
+ req.set_action(is_completed ? RestoreJobRequest::COMPLETE :
RestoreJobRequest::ABORT);
return retry_rpc("finish restore job", req, &resp,
&MetaService_Stub::finish_restore_job);
}
diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index 1678f29bdff..eb8618d4048 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -231,6 +231,12 @@ BvarStatusWithTag<int64_t>
g_bvar_inverted_checker_leaked_delete_bitmaps("checke
BvarStatusWithTag<int64_t>
g_bvar_inverted_checker_abnormal_delete_bitmaps("checker",
"abnormal_delete_bitmaps");
BvarStatusWithTag<int64_t>
g_bvar_inverted_checker_delete_bitmaps_scanned("checker",
"delete_bitmap_keys_scanned");
BvarStatusWithTag<int64_t>
g_bvar_max_rowsets_with_useless_delete_bitmap_version("checker",
"max_rowsets_with_useless_delete_bitmap_version");
+// Restore job checker metrics, tagged by instance_id and published by
+// InstanceChecker::do_restore_job_check(): the number of restore job keys found in each
+// state, plus the number of PREPARED/COMMITTED jobs whose ctime_s is more than 3600s old.
+BvarStatusWithTag<int64_t>
g_bvar_checker_restore_job_prepared_state("checker",
"restore_job_prepared_state");
+BvarStatusWithTag<int64_t>
g_bvar_checker_restore_job_committed_state("checker",
"restore_job_committed_state");
+BvarStatusWithTag<int64_t> g_bvar_checker_restore_job_dropped_state("checker",
"restore_job_dropped_state");
+BvarStatusWithTag<int64_t>
g_bvar_checker_restore_job_completed_state("checker",
"restore_job_completed_state");
+BvarStatusWithTag<int64_t>
g_bvar_checker_restore_job_recycling_state("checker",
"restore_job_recycling_state");
+BvarStatusWithTag<int64_t>
g_bvar_checker_restore_job_cost_many_time("checker",
"restore_job_cost_many_time");
// rpc kv rw count
// get_rowset
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index 1525cc260d9..9d7dfa232f0 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -374,6 +374,13 @@ extern BvarStatusWithTag<int64_t>
g_bvar_inverted_checker_abnormal_delete_bitmap
extern BvarStatusWithTag<int64_t>
g_bvar_inverted_checker_delete_bitmaps_scanned;
extern BvarStatusWithTag<int64_t>
g_bvar_max_rowsets_with_useless_delete_bitmap_version;
+// Restore job checker metrics, tagged by instance_id and published by
+// InstanceChecker::do_restore_job_check(): per-state restore job counts and the number of
+// long-running PREPARED/COMMITTED jobs (cost_many_time).
+extern BvarStatusWithTag<int64_t> g_bvar_checker_restore_job_prepared_state;
+extern BvarStatusWithTag<int64_t> g_bvar_checker_restore_job_committed_state;
+extern BvarStatusWithTag<int64_t> g_bvar_checker_restore_job_dropped_state;
+extern BvarStatusWithTag<int64_t> g_bvar_checker_restore_job_completed_state;
+extern BvarStatusWithTag<int64_t> g_bvar_checker_restore_job_recycling_state;
+extern BvarStatusWithTag<int64_t> g_bvar_checker_restore_job_cost_many_time;
+
// rpc kv
extern mBvarInt64Adder g_bvar_rpc_kv_get_rowset_get_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_get_version_get_counter;
@@ -421,6 +428,12 @@ extern mBvarInt64Adder
g_bvar_rpc_kv_commit_partition_put_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_commit_partition_del_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_drop_partition_get_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_drop_partition_put_counter;
+// Presumably KV get/put call counters for the prepare/commit/finish restore job RPCs,
+// following the surrounding g_bvar_rpc_kv_<rpc>_<op>_counter naming.
+// NOTE(review): the bvars.cpp hunk of this change does not add definitions for these;
+// confirm they are defined before any code references them.
+extern mBvarInt64Adder g_bvar_rpc_kv_prepare_restore_job_get_counter;
+extern mBvarInt64Adder g_bvar_rpc_kv_prepare_restore_job_put_counter;
+extern mBvarInt64Adder g_bvar_rpc_kv_commit_restore_job_get_counter;
+extern mBvarInt64Adder g_bvar_rpc_kv_commit_restore_job_put_counter;
+extern mBvarInt64Adder g_bvar_rpc_kv_finish_restore_job_get_counter;
+extern mBvarInt64Adder g_bvar_rpc_kv_finish_restore_job_put_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_check_kv_get_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_get_obj_store_info_get_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_alter_storage_vault_get_counter;
@@ -527,6 +540,12 @@ extern mBvarInt64Adder
g_bvar_rpc_kv_commit_partition_put_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_commit_partition_del_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_drop_partition_get_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_drop_partition_put_bytes;
+// Presumably KV get/put byte counters for the prepare/commit/finish restore job RPCs,
+// following the surrounding g_bvar_rpc_kv_<rpc>_<op>_bytes naming.
+// NOTE(review): the bvars.cpp hunk of this change does not add definitions for these;
+// confirm they are defined before any code references them.
+extern mBvarInt64Adder g_bvar_rpc_kv_prepare_restore_job_get_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_prepare_restore_job_put_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_commit_restore_job_get_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_commit_restore_job_put_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_finish_restore_job_get_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_finish_restore_job_put_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_check_kv_get_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_get_obj_store_info_get_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_alter_storage_vault_get_bytes;
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index 351ade480a1..4cd46c605fa 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -118,6 +118,7 @@ CONF_mInt64(recycle_task_threshold_seconds, "10800"); // 3h
CONF_Bool(force_immediate_recycle, "false");
CONF_mBool(enable_mow_job_key_check, "false");
+// When true, Checker::start() also runs InstanceChecker::do_restore_job_check().
+CONF_mBool(enable_restore_job_check, "false");
CONF_mBool(enable_checker_for_meta_key_check, "false");
CONF_mInt64(mow_job_key_check_expiration_diff_seconds, "600"); // 10min
diff --git a/cloud/src/meta-service/meta_service.cpp
b/cloud/src/meta-service/meta_service.cpp
index 71bee7225de..c54c1e30b95 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -998,6 +998,13 @@ void
MetaServiceImpl::prepare_restore_job(::google::protobuf::RpcController* con
RestoreJobResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(prepare_restore_job);
+ if (request->action() != RestoreJobRequest::PREPARE) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "invalid action, expected PREPARE but got " +
+ RestoreJobRequest::Action_Name(request->action());
+ return;
+ }
+
if (!request->has_tablet_id()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty tablet_id";
@@ -1179,6 +1186,13 @@ void
MetaServiceImpl::commit_restore_job(::google::protobuf::RpcController* cont
RestoreJobResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(commit_restore_job);
+ if (request->action() != RestoreJobRequest::COMMIT) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "invalid action, expected COMMIT but got " +
+ RestoreJobRequest::Action_Name(request->action());
+ return;
+ }
+
if (!request->has_tablet_id()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty tablet_id";
@@ -1516,6 +1530,14 @@ void
MetaServiceImpl::finish_restore_job(::google::protobuf::RpcController* cont
RestoreJobResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(finish_restore_job);
+ if (request->action() != RestoreJobRequest::COMPLETE &&
+ request->action() != RestoreJobRequest::ABORT) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "invalid action, expected COMPLETE or ABORT but got " +
+ RestoreJobRequest::Action_Name(request->action());
+ return;
+ }
+
if (!request->has_tablet_id()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty tablet_id";
@@ -1570,7 +1592,6 @@ void
MetaServiceImpl::finish_restore_job(::google::protobuf::RpcController* cont
return;
}
- bool is_completed = request->has_is_completed() && request->is_completed();
if (restore_job_pb.state() == RestoreJobCloudPB::DROPPED ||
restore_job_pb.state() == RestoreJobCloudPB::COMPLETED) {
LOG_INFO("restore job already finished")
@@ -1587,7 +1608,8 @@ void
MetaServiceImpl::finish_restore_job(::google::protobuf::RpcController* cont
return;
} else {
// PREPARED, COMMITTED state
- if (is_completed && restore_job_pb.state() !=
RestoreJobCloudPB::COMMITTED) {
+ if (request->action() == RestoreJobRequest::COMPLETE &&
+ restore_job_pb.state() != RestoreJobCloudPB::COMMITTED) {
// Only allow COMMITTED -> COMPLETED
code = MetaServiceCode::INVALID_ARGUMENT;
msg = fmt::format("restore tablet {} in invalid state to complete,
state: {}",
@@ -1595,10 +1617,21 @@ void
MetaServiceImpl::finish_restore_job(::google::protobuf::RpcController* cont
RestoreJobCloudPB::State_Name(restore_job_pb.state()));
return;
}
+ if (request->action() == RestoreJobRequest::ABORT &&
+ (restore_job_pb.state() != RestoreJobCloudPB::PREPARED &&
+ restore_job_pb.state() != RestoreJobCloudPB::COMMITTED)) {
+ // Only allow PREPARED/COMMITTED -> DROPPED
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = fmt::format("restore tablet {} in invalid state to abort,
state: {}",
+ tablet_idx.tablet_id(),
+
RestoreJobCloudPB::State_Name(restore_job_pb.state()));
+ return;
+ }
}
// 2. update restore job
std::string to_save_val;
+ bool is_completed = request->action() == RestoreJobRequest::COMPLETE;
restore_job_pb.set_state(is_completed ? RestoreJobCloudPB::COMPLETED
: RestoreJobCloudPB::DROPPED);
restore_job_pb.set_need_recycle_data(!is_completed);
diff --git a/cloud/src/recycler/checker.cpp b/cloud/src/recycler/checker.cpp
index bd1a5add4a0..544d6dfb52f 100644
--- a/cloud/src/recycler/checker.cpp
+++ b/cloud/src/recycler/checker.cpp
@@ -198,6 +198,12 @@ int Checker::start() {
}
}
+ if (config::enable_restore_job_check) {
+ if (int ret = checker->do_restore_job_check(); ret != 0) {
+ success = false;
+ }
+ }
+
if (config::enable_delete_bitmap_storage_optimize_v2_check) {
if (int ret =
checker->do_delete_bitmap_storage_optimize_check(2 /*version*/);
ret != 0) {
@@ -1375,7 +1381,7 @@ int
InstanceChecker::check_inverted_index_file_storage_format_v1(
// Garbage data leak
// clang-format off
LOG(WARNING) << "rowset_index_cache_v1.segment_ids don't contains
segment_id, rowset should be recycled,"
- << " key = " << file_path
+ << " key = " << file_path
<< " segment_id = " << segment_id;
// clang-format on
return 1;
@@ -1385,7 +1391,7 @@ int
InstanceChecker::check_inverted_index_file_storage_format_v1(
// Garbage data leak
// clang-format off
LOG(WARNING) << "rowset_index_cache_v1.index_ids don't contains
index_id_with_suffix_name,"
- << " rowset with inde meta should be recycled, key=" <<
file_path
+ << " rowset with inde meta should be recycled, key=" <<
file_path
<< " index_id_with_suffix_name=" <<
index_id_with_suffix_name;
// clang-format on
return 1;
@@ -1758,4 +1764,107 @@ int InstanceChecker::do_mow_job_key_check() {
return 0;
}
+// Scans every restore job key of this instance (job_restore_tablet_key over tablet ids
+// [0, INT64_MAX)) and counts the jobs per RestoreJobCloudPB state. Jobs that are still
+// PREPARED or COMMITTED more than COST_MANY_THRESHOLD seconds after ctime_s are counted as
+// "cost many time" and logged. All counts are published to the g_bvar_checker_restore_job_*
+// bvars (tagged with instance_id_) when the function returns, including early error returns.
+//
+// Returns 0 on success (also when the checker is stopped mid-scan) and -1 if a transaction
+// cannot be created, a range read fails, or a value cannot be parsed as RestoreJobCloudPB.
+int InstanceChecker::do_restore_job_check() {
+    int64_t num_prepared = 0;
+    int64_t num_committed = 0;
+    int64_t num_dropped = 0;
+    int64_t num_completed = 0;
+    int64_t num_recycling = 0;
+    int64_t num_cost_many_time = 0;
+    // A restore job still PREPARED/COMMITTED this many seconds after creation is reported.
+    constexpr int64_t COST_MANY_THRESHOLD = 3600; // 1 hour
+
+    using namespace std::chrono;
+    auto start_time = steady_clock::now();
+    DORIS_CLOUD_DEFER {
+        g_bvar_checker_restore_job_prepared_state.put(instance_id_, num_prepared);
+        g_bvar_checker_restore_job_committed_state.put(instance_id_, num_committed);
+        g_bvar_checker_restore_job_dropped_state.put(instance_id_, num_dropped);
+        g_bvar_checker_restore_job_completed_state.put(instance_id_, num_completed);
+        g_bvar_checker_restore_job_recycling_state.put(instance_id_, num_recycling);
+        g_bvar_checker_restore_job_cost_many_time.put(instance_id_, num_cost_many_time);
+        auto cost_ms =
+                duration_cast<std::chrono::milliseconds>(steady_clock::now() - start_time).count();
+        LOG(INFO) << "check instance restore jobs finished, cost=" << cost_ms
+                  << "ms. instance_id=" << instance_id_ << " num_prepared=" << num_prepared
+                  << " num_committed=" << num_committed << " num_dropped=" << num_dropped
+                  << " num_completed=" << num_completed << " num_recycling=" << num_recycling
+                  << " num_cost_many_time=" << num_cost_many_time;
+    };
+
+    LOG_INFO("begin to check restore jobs").tag("instance_id", instance_id_);
+
+    JobRestoreTabletKeyInfo restore_job_key_info0 {instance_id_, 0};
+    JobRestoreTabletKeyInfo restore_job_key_info1 {instance_id_, INT64_MAX};
+    std::string begin;
+    std::string end;
+    job_restore_tablet_key(restore_job_key_info0, &begin);
+    job_restore_tablet_key(restore_job_key_info1, &end);
+    // Read the clock once so every job in this scan is judged against the same instant.
+    const int64_t now_s = ::time(nullptr);
+    std::unique_ptr<RangeGetIterator> it;
+    do {
+        std::unique_ptr<Transaction> txn;
+        TxnErrorCode err = txn_kv_->create_txn(&txn);
+        if (err != TxnErrorCode::TXN_OK) {
+            LOG(WARNING) << "failed to create txn, err=" << err;
+            return -1;
+        }
+        err = txn->get(begin, end, &it);
+        if (err != TxnErrorCode::TXN_OK) {
+            LOG(WARNING) << "failed to get restore job key, err=" << err;
+            return -1;
+        }
+        if (!it->has_next()) {
+            break;
+        }
+        while (it->has_next()) {
+            auto [k, v] = it->next();
+            RestoreJobCloudPB restore_job_pb;
+            if (!restore_job_pb.ParseFromArray(v.data(), v.size())) {
+                LOG_WARNING("malformed restore job value").tag("key", hex(k));
+                return -1;
+            }
+
+            const auto state = restore_job_pb.state();
+            switch (state) {
+            case RestoreJobCloudPB::PREPARED:
+                ++num_prepared;
+                break;
+            case RestoreJobCloudPB::COMMITTED:
+                ++num_committed;
+                break;
+            case RestoreJobCloudPB::DROPPED:
+                ++num_dropped;
+                break;
+            case RestoreJobCloudPB::COMPLETED:
+                ++num_completed;
+                break;
+            case RestoreJobCloudPB::RECYCLING:
+                ++num_recycling;
+                break;
+            default:
+                break;
+            }
+
+            if ((state == RestoreJobCloudPB::PREPARED || state == RestoreJobCloudPB::COMMITTED) &&
+                now_s > restore_job_pb.ctime_s() + COST_MANY_THRESHOLD) {
+                // The job has been in progress for longer than COST_MANY_THRESHOLD seconds.
+                ++num_cost_many_time;
+                LOG_WARNING("restore job cost too many time")
+                        .tag("key", hex(k))
+                        .tag("tablet_id", restore_job_pb.tablet_id())
+                        .tag("state", RestoreJobCloudPB::State_Name(state))
+                        .tag("ctime_s", restore_job_pb.ctime_s())
+                        .tag("mtime_s", restore_job_pb.mtime_s());
+            }
+
+            if (!it->has_next()) {
+                // Resume the next batch right after the last key of this one.
+                begin = k;
+                begin.push_back('\x00');
+                break;
+            }
+        }
+    } while (it->more() && !stopped());
+    return 0;
+}
+
} // namespace doris::cloud
diff --git a/cloud/src/recycler/checker.h b/cloud/src/recycler/checker.h
index fb8f084297d..436e1302b07 100644
--- a/cloud/src/recycler/checker.h
+++ b/cloud/src/recycler/checker.h
@@ -107,6 +107,8 @@ public:
int do_mow_job_key_check();
+ // Counts this instance's restore jobs per state and reports the counts through the
+ // g_bvar_checker_restore_job_* bvars. Returns 0 on success, -1 on a KV or parse error.
+ int do_restore_job_check();
+
// If there are multiple buckets, return the minimum lifecycle; if there
are no buckets (i.e.
// all accessors are HdfsAccessor), return INT64_MAX.
// Return 0 if success, otherwise error
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 5d3e4c5713d..fab7de75c2a 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -2706,6 +2706,7 @@ int InstanceRecycler::recycle_restore_jobs() {
LOG_WARNING("failed to commit txn: {}", err);
return -1;
}
+ return 0;
}
std::string restore_job_rs_key0 =
job_restore_rowset_key({instance_id_, tablet_id, 0});
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index 0e1175085af..a52df83e3b1 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -10431,6 +10431,11 @@ TEST(MetaServiceTest, RestoreJobTest) {
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn),
TxnErrorCode::TXN_OK);
txn->put(meta_tablet_idx_key({instance_id, tablet_id}),
tablet_idx_val);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+ // empty action
+ meta_service->prepare_restore_job(&cntl, &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
+ ASSERT_TRUE(res.status().msg().find("invalid action") !=
std::string::npos);
+ req.set_action(RestoreJobRequest::PREPARE);
// empty tablet id
meta_service->prepare_restore_job(&cntl, &req, &res, nullptr);
@@ -10443,6 +10448,13 @@ TEST(MetaServiceTest, RestoreJobTest) {
meta_service->prepare_restore_job(&cntl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
ASSERT_EQ(res.status().msg(), "no tablet meta");
+
+ // check key existence
+ std::string restore_job_key = job_restore_tablet_key({instance_id,
tablet_id});
+ std::string val;
+ ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn),
TxnErrorCode::TXN_OK);
+ ASSERT_EQ(txn->get(restore_job_key, &val),
TxnErrorCode::TXN_KEY_NOT_FOUND);
+
req.Clear();
res.Clear();
}
@@ -10454,6 +10466,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
req.set_tablet_id(tablet_id);
req.set_expiration(time(nullptr) + 3600);
+ req.set_action(RestoreJobRequest::PREPARE);
// set tablet meta
auto* tablet_meta = req.mutable_tablet_meta();
@@ -10494,6 +10507,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
txn->put(meta_tablet_idx_key({instance_id, tablet_id}),
tablet_idx_val);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
req.set_tablet_id(tablet_id);
+ req.set_action(RestoreJobRequest::PREPARE);
// set tablet meta
auto* tablet_meta = req.mutable_tablet_meta();
@@ -10536,10 +10550,23 @@ TEST(MetaServiceTest, RestoreJobTest) {
// invalid args commit restore job
{
reset_meta_service();
+ // empty action
+ meta_service->commit_restore_job(&cntl, &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
+ ASSERT_TRUE(res.status().msg().find("invalid action") !=
std::string::npos);
+ req.set_action(RestoreJobRequest::COMMIT);
+
// empty tablet_id
meta_service->commit_restore_job(&cntl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
ASSERT_EQ(res.status().msg(), "empty tablet_id");
+
+ // check key existence
+ std::string restore_job_key = job_restore_tablet_key({instance_id,
tablet_id});
+ std::string val;
+ ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn),
TxnErrorCode::TXN_OK);
+ ASSERT_EQ(txn->get(restore_job_key, &val),
TxnErrorCode::TXN_KEY_NOT_FOUND);
+
req.Clear();
res.Clear();
}
@@ -10551,6 +10578,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
req.set_tablet_id(tablet_id);
+ req.set_action(RestoreJobRequest::COMMIT);
meta_service->commit_restore_job(&cntl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
ASSERT_EQ(res.status().msg(), "restore job not exists or has been
recycled");
@@ -10569,6 +10597,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
RestoreJobResponse make_res;
make_req.set_tablet_id(tablet_id);
make_req.set_expiration(time(nullptr) + 3600);
+ make_req.set_action(RestoreJobRequest::PREPARE);
auto* tablet_meta = make_req.mutable_tablet_meta();
tablet_meta->set_table_id(table_id);
tablet_meta->set_index_id(index_id);
@@ -10594,6 +10623,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
// commit_restore_job
req.set_tablet_id(tablet_id);
+ req.set_action(RestoreJobRequest::COMMIT);
meta_service->commit_restore_job(&cntl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) <<
res.status().msg();
std::string tablet_key =
@@ -10639,6 +10669,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
RestoreJobResponse make_res;
make_req.set_tablet_id(tablet_id);
make_req.set_expiration(time(nullptr) + 3600);
+ make_req.set_action(RestoreJobRequest::PREPARE);
auto* tablet_meta = make_req.mutable_tablet_meta();
tablet_meta->set_table_id(table_id);
@@ -10670,6 +10701,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
// commit_restore_job
req.set_tablet_id(tablet_id);
+ req.set_action(RestoreJobRequest::COMMIT);
meta_service->commit_restore_job(&cntl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) <<
res.status().msg();
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn),
TxnErrorCode::TXN_OK);
@@ -10703,10 +10735,23 @@ TEST(MetaServiceTest, RestoreJobTest) {
// invalid args finish restore job
{
reset_meta_service();
+ // empty action: finish_restore_job must reject a request without COMPLETE/ABORT
+ meta_service->finish_restore_job(&cntl, &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
+ ASSERT_TRUE(res.status().msg().find("invalid action") !=
std::string::npos);
+ req.set_action(RestoreJobRequest::COMPLETE);
+
// empty tablet_id
meta_service->finish_restore_job(&cntl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
ASSERT_EQ(res.status().msg(), "empty tablet_id");
+
+ // check key existence
+ std::string restore_job_key = job_restore_tablet_key({instance_id,
tablet_id});
+ std::string val;
+ ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn),
TxnErrorCode::TXN_OK);
+ ASSERT_EQ(txn->get(restore_job_key, &val),
TxnErrorCode::TXN_KEY_NOT_FOUND);
+
req.Clear();
res.Clear();
}
@@ -10718,6 +10763,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
req.set_tablet_id(tablet_id);
+ req.set_action(RestoreJobRequest::COMPLETE);
meta_service->finish_restore_job(&cntl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
ASSERT_EQ(res.status().msg(), "restore job not exists or has been
recycled");
@@ -10736,6 +10782,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
RestoreJobResponse make_res;
make_req.set_tablet_id(tablet_id);
make_req.set_expiration(time(nullptr) + 3600);
+ make_req.set_action(RestoreJobRequest::PREPARE);
auto* tablet_meta = make_req.mutable_tablet_meta();
tablet_meta->set_table_id(table_id);
@@ -10761,6 +10808,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
ASSERT_EQ(txn->get(restore_job_rs_key, &val), TxnErrorCode::TXN_OK);
req.set_tablet_id(tablet_id);
+ req.set_action(RestoreJobRequest::COMMIT);
meta_service->commit_restore_job(&cntl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) <<
res.status().msg();
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn),
TxnErrorCode::TXN_OK);
@@ -10769,7 +10817,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
// finish_restore_job to COMPLETED
req.set_tablet_id(tablet_id);
- req.set_is_completed(true);
+ req.set_action(RestoreJobRequest::COMPLETE);
meta_service->finish_restore_job(&cntl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) <<
res.status().msg();
@@ -10802,6 +10850,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
RestoreJobResponse make_res;
make_req.set_tablet_id(tablet_id);
make_req.set_expiration(time(nullptr) + 3600);
+ make_req.set_action(RestoreJobRequest::PREPARE);
auto* tablet_meta = make_req.mutable_tablet_meta();
tablet_meta->set_table_id(table_id);
@@ -10823,7 +10872,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
// finish_restore_job to DROPPED
req.set_tablet_id(tablet_id);
- req.set_is_completed(false);
+ req.set_action(RestoreJobRequest::ABORT);
meta_service->finish_restore_job(&cntl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) <<
res.status().msg();
@@ -10856,6 +10905,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
RestoreJobResponse make_res;
make_req.set_tablet_id(tablet_id);
make_req.set_expiration(time(nullptr) + 3600);
+ make_req.set_action(RestoreJobRequest::PREPARE);
// set tablet meta
auto* tablet_meta = make_req.mutable_tablet_meta();
@@ -10872,7 +10922,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
// finish_restore_job to COMPLETED
req.set_tablet_id(tablet_id);
- req.set_is_completed(true);
+ req.set_action(RestoreJobRequest::COMPLETE);
meta_service->finish_restore_job(&cntl, &req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
ASSERT_TRUE(res.status().msg().find("invalid state to complete") !=
std::string::npos);
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index 8330e52ae00..99cea3d72fd 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -650,7 +650,8 @@ static int create_restore_job_rowset(TxnKv* txn_kv,
StorageVaultAccessor* access
return 0;
}
-static int create_restore_job_tablet(TxnKv* txn_kv, int64_t tablet_id) {
+static int create_restore_job_tablet(TxnKv* txn_kv, int64_t tablet_id,
+ RestoreJobCloudPB::State state) {
std::string key;
std::string val;
@@ -661,7 +662,7 @@ static int create_restore_job_tablet(TxnKv* txn_kv, int64_t
tablet_id) {
restore_job_pb.set_tablet_id(tablet_id);
restore_job_pb.set_ctime_s(::time(nullptr) - 3600);
restore_job_pb.set_expired_at_s(0);
- restore_job_pb.set_state(RestoreJobCloudPB::DROPPED);
+ restore_job_pb.set_state(state);
restore_job_pb.SerializeToString(&val);
std::unique_ptr<Transaction> txn;
@@ -1837,7 +1838,9 @@ TEST(RecyclerTest, recycle_restore_jobs) {
for (int i = 0; i < 20; ++i) {
int64_t tablet_id = tablet_id_base + i;
ASSERT_EQ(create_tablet(txn_kv.get(), table_id, i, partition_id,
tablet_id), 0);
- create_restore_job_tablet(txn_kv.get(), tablet_id);
+ // create restore job for recycle
+ ASSERT_EQ(create_restore_job_tablet(txn_kv.get(), tablet_id,
RestoreJobCloudPB::COMPLETED),
+ 0);
for (int j = 0; j < 5; ++j) {
ASSERT_EQ(
create_restore_job_rowset(txn_kv.get(), accessor.get(),
"recycle_restore_jobs",
@@ -1846,6 +1849,7 @@ TEST(RecyclerTest, recycle_restore_jobs) {
}
}
+ // not recycle and change restore job from COMPLETED to RECYCLING
ASSERT_EQ(recycler.recycle_restore_jobs(), 0);
std::unique_ptr<Transaction> txn;
@@ -1855,6 +1859,21 @@ TEST(RecyclerTest, recycle_restore_jobs) {
auto begin_key = job_restore_tablet_key({instance_id, 0});
auto end_key = job_restore_tablet_key({instance_id, INT64_MAX});
ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
+ ASSERT_EQ(it->size(), 20);
+
+ begin_key = job_restore_rowset_key({instance_id, 0, 0});
+ end_key = job_restore_rowset_key({instance_id, INT64_MAX, 0});
+ ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
+ ASSERT_EQ(it->size(), 100);
+
+ // recycle restore job with status RECYCLING
+ ASSERT_EQ(recycler.recycle_restore_jobs(), 0);
+
+ ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+ begin_key = job_restore_tablet_key({instance_id, 0});
+ end_key = job_restore_tablet_key({instance_id, INT64_MAX});
+ ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
ASSERT_EQ(it->size(), 0);
begin_key = job_restore_rowset_key({instance_id, 0, 0});
@@ -4120,6 +4139,67 @@ TEST(CheckerTest, check_job_key) {
ASSERT_EQ(checker.do_mow_job_key_check(), -1);
}
+// Seeds PREPARED (30 min old), COMMITTED (2 h old) and COMPLETED restore jobs and checks
+// that InstanceChecker::do_restore_job_check() scans them successfully.
+TEST(CheckerTest, do_restore_job_check) {
+    // config::enable_restore_job_check is process-global: remember it and restore it on exit
+    // so enabling it here does not leak into later tests in this binary.
+    const bool saved_enable_restore_job_check = config::enable_restore_job_check;
+    config::enable_restore_job_check = true;
+    std::string instance_id = "test_do_restore_job_check";
+    auto sp = SyncPoint::get_instance();
+    DORIS_CLOUD_DEFER {
+        config::enable_restore_job_check = saved_enable_restore_job_check;
+        SyncPoint::get_instance()->clear_all_call_backs();
+    };
+    sp->set_call_back("get_instance_id", [&](auto&& args) {
+        auto* ret = try_any_cast_ret<std::string>(args);
+        ret->first = instance_id;
+        ret->second = true;
+    });
+    sp->enable_processing();
+
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    InstanceInfoPB instance;
+    instance.set_instance_id(instance_id);
+    auto obj_info = instance.add_obj_info();
+    obj_info->set_id("1");
+    InstanceChecker checker(txn_kv, instance_id);
+    ASSERT_EQ(checker.init(instance), 0);
+
+    // Prepare test data: simulate restore jobs in different states
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(TxnErrorCode::TXN_OK, txn_kv->create_txn(&txn));
+
+    // Add a PREPARED restore job
+    RestoreJobCloudPB prepared_job;
+    prepared_job.set_tablet_id(10001);
+    prepared_job.set_state(RestoreJobCloudPB::PREPARED);
+    prepared_job.set_ctime_s(::time(nullptr) - 1800); // 30 minutes ago
+    std::string prepared_key;
+    job_restore_tablet_key({instance_id, prepared_job.tablet_id()}, &prepared_key);
+    txn->put(prepared_key, prepared_job.SerializeAsString());
+
+    // Add a COMMITTED restore job (older than the checker's 1 hour threshold)
+    RestoreJobCloudPB committed_job;
+    committed_job.set_tablet_id(10002);
+    committed_job.set_state(RestoreJobCloudPB::COMMITTED);
+    committed_job.set_ctime_s(::time(nullptr) - 7200); // 2 hours ago
+    std::string committed_key;
+    job_restore_tablet_key({instance_id, committed_job.tablet_id()}, &committed_key);
+    txn->put(committed_key, committed_job.SerializeAsString());
+
+    // Add a COMPLETED restore job
+    RestoreJobCloudPB completed_job;
+    completed_job.set_tablet_id(10003);
+    completed_job.set_state(RestoreJobCloudPB::COMPLETED);
+    completed_job.set_ctime_s(::time(nullptr) - 3600); // 1 hour ago
+    std::string completed_key;
+    job_restore_tablet_key({instance_id, completed_job.tablet_id()}, &completed_key);
+    txn->put(completed_key, completed_job.SerializeAsString());
+
+    ASSERT_EQ(TxnErrorCode::TXN_OK, txn->commit());
+
+    // Run the check
+    ASSERT_EQ(checker.do_restore_job_check(), 0);
+}
+
TEST(CheckerTest, delete_bitmap_storage_optimize_v2_check_normal) {
auto txn_kv = std::make_shared<MemTxnKv>();
ASSERT_EQ(txn_kv->init(), 0);
@@ -5574,6 +5654,57 @@ TEST(RecyclerTest,
concurrent_recycle_txn_label_failure_test) {
<< "ms" << std::endl;
check_multiple_txn_info_kvs(txn_kv, 5000);
}
+
+// A COMPLETED restore job takes two recycle passes: the first moves it to RECYCLING and
+// keeps its rowset keys, the second removes the job key (same lifecycle that the
+// recycle_restore_jobs test checks for many tablets).
+TEST(RecyclerTest, recycle_restore_job_complete_state) {
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    InstanceInfoPB instance;
+    instance.set_instance_id(instance_id);
+    auto obj_info = instance.add_obj_info();
+    obj_info->set_id("recycle_restore_job_transaction");
+    obj_info->set_ak(config::test_s3_ak);
+    obj_info->set_sk(config::test_s3_sk);
+    obj_info->set_endpoint(config::test_s3_endpoint);
+    obj_info->set_region(config::test_s3_region);
+    obj_info->set_bucket(config::test_s3_bucket);
+    obj_info->set_prefix("recycle_restore_job_transaction");
+
+    InstanceRecycler recycler(txn_kv, instance, thread_group,
+                              std::make_shared<TxnLazyCommitter>(txn_kv));
+    ASSERT_EQ(recycler.init(), 0);
+    auto accessor = recycler.accessor_map_.begin()->second;
+
+    int64_t tablet_id = 9876;
+    std::string key;
+    JobRestoreTabletKeyInfo key_info {instance_id, tablet_id};
+    job_restore_tablet_key(key_info, &key);
+
+    RestoreJobCloudPB restore_job_pb;
+    restore_job_pb.set_tablet_id(tablet_id);
+    restore_job_pb.set_ctime_s(::time(nullptr) - 3600);
+    restore_job_pb.set_expired_at_s(0);
+    // set job state to COMPLETED
+    restore_job_pb.set_state(RestoreJobCloudPB::COMPLETED);
+
+    std::string val = restore_job_pb.SerializeAsString();
+    std::unique_ptr<Transaction> setup_txn;
+    ASSERT_EQ(txn_kv->create_txn(&setup_txn), TxnErrorCode::TXN_OK);
+    setup_txn->put(key, val);
+    ASSERT_EQ(setup_txn->commit(), TxnErrorCode::TXN_OK);
+
+    for (int i = 0; i < 3; i++) {
+        ASSERT_EQ(create_restore_job_rowset(txn_kv.get(), accessor.get(),
+                                            "recycle_restore_job_transaction", tablet_id, i),
+                  0);
+    }
+
+    auto rowset_begin = job_restore_rowset_key({instance_id, 0, 0});
+    auto rowset_end = job_restore_rowset_key({instance_id, INT64_MAX, 0});
+
+    // First pass: the job is not deleted yet; it moves from COMPLETED to RECYCLING and its
+    // restore rowset keys are kept.
+    ASSERT_EQ(recycler.recycle_restore_jobs(), 0);
+    {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        std::string job_val;
+        ASSERT_EQ(txn->get(key, &job_val), TxnErrorCode::TXN_OK);
+        RestoreJobCloudPB job_pb;
+        ASSERT_TRUE(job_pb.ParseFromString(job_val));
+        ASSERT_EQ(job_pb.state(), RestoreJobCloudPB::RECYCLING);
+
+        std::unique_ptr<RangeGetIterator> it;
+        ASSERT_EQ(txn->get(rowset_begin, rowset_end, &it), TxnErrorCode::TXN_OK);
+        ASSERT_EQ(it->size(), 3);
+    }
+
+    // Second pass: the RECYCLING job is recycled and its key removed.
+    ASSERT_EQ(recycler.recycle_restore_jobs(), 0);
+    {
+        std::unique_ptr<Transaction> txn;
+        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+        std::string job_val;
+        ASSERT_EQ(txn->get(key, &job_val), TxnErrorCode::TXN_KEY_NOT_FOUND);
+    }
+}
+
TEST(RecyclerTest, concurrent_recycle_txn_label_conflict_test) {
config::label_keep_max_second = 0;
config::recycle_pool_parallelism = 20;
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 6b1e697559d..2f6c076f2c4 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -1121,12 +1121,19 @@ message PartitionResponse {
}
message RestoreJobRequest {
+ // Operation requested from a restore job RPC. MetaService accepts only PREPARE in
+ // prepare_restore_job, only COMMIT in commit_restore_job, and only COMPLETE or ABORT in
+ // finish_restore_job; any other value (including the unset default) is INVALID_ARGUMENT.
+ enum Action {
+ UNKNOWN = 0; // default when the field is unset
+ PREPARE = 1;
+ COMMIT = 2;
+ ABORT = 3;
+ COMPLETE = 4;
+ }
optional string cloud_unique_id = 1;
optional int64 tablet_id = 2;
optional doris.TabletMetaCloudPB tablet_meta = 3;
optional int64 expiration = 4;
optional string request_ip = 5;
- optional bool is_completed = 6;
+ // NOTE(review): field number 6 previously carried `bool is_completed`. Both are varints,
+ // so a request from a sender built before this change decodes as action=PREPARE (true)
+ // or UNKNOWN (false/unset) and is rejected; confirm BE and MetaService are upgraded together.
+ optional Action action = 6;
}
message RestoreJobResponse {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]