This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 78689d71053 branch-3.1: [enhancement](cloud) Add checker and more bvar 
for cloud restore #54912 #54994 #55161 (#55174)
78689d71053 is described below

commit 78689d71053fa0c2ff3161e16f66d2c6d31b514f
Author: Uniqueyou <[email protected]>
AuthorDate: Wed Aug 27 17:54:19 2025 +0800

    branch-3.1: [enhancement](cloud) Add checker and more bvar for cloud 
restore #54912 #54994 #55161 (#55174)
    
    picked #54912 #54994 #55161
    
    ---------
    
    Co-authored-by: xy720 <[email protected]>
---
 be/src/cloud/cloud_meta_mgr.cpp         |   4 +-
 cloud/src/common/bvars.cpp              |   6 ++
 cloud/src/common/bvars.h                |  19 +++++
 cloud/src/common/config.h               |   1 +
 cloud/src/meta-service/meta_service.cpp |  37 ++++++++-
 cloud/src/recycler/checker.cpp          | 113 +++++++++++++++++++++++++-
 cloud/src/recycler/checker.h            |   2 +
 cloud/src/recycler/recycler.cpp         |   1 +
 cloud/test/meta_service_test.cpp        |  56 ++++++++++++-
 cloud/test/recycler_test.cpp            | 137 +++++++++++++++++++++++++++++++-
 gensrc/proto/cloud.proto                |   9 ++-
 11 files changed, 373 insertions(+), 12 deletions(-)

diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index 833786e4157..7e8f07d26cb 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -1205,6 +1205,7 @@ Status CloudMetaMgr::prepare_restore_job(const 
TabletMetaPB& tablet_meta) {
     req.set_cloud_unique_id(config::cloud_unique_id);
     req.set_tablet_id(tablet_meta.tablet_id());
     req.set_expiration(config::snapshot_expire_time_sec);
+    req.set_action(RestoreJobRequest::PREPARE);
 
     doris_tablet_meta_to_cloud(req.mutable_tablet_meta(), 
std::move(tablet_meta));
     return retry_rpc("prepare restore job", req, &resp, 
&MetaService_Stub::prepare_restore_job);
@@ -1216,6 +1217,7 @@ Status CloudMetaMgr::commit_restore_job(const int64_t 
tablet_id) {
     RestoreJobResponse resp;
     req.set_cloud_unique_id(config::cloud_unique_id);
     req.set_tablet_id(tablet_id);
+    req.set_action(RestoreJobRequest::COMMIT);
 
     return retry_rpc("commit restore job", req, &resp, 
&MetaService_Stub::commit_restore_job);
 }
@@ -1227,7 +1229,7 @@ Status CloudMetaMgr::finish_restore_job(const int64_t 
tablet_id, bool is_complet
     RestoreJobResponse resp;
     req.set_cloud_unique_id(config::cloud_unique_id);
     req.set_tablet_id(tablet_id);
-    req.set_is_completed(is_completed);
+    req.set_action(is_completed ? RestoreJobRequest::COMPLETE : 
RestoreJobRequest::ABORT);
 
     return retry_rpc("finish restore job", req, &resp, 
&MetaService_Stub::finish_restore_job);
 }
diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index 1678f29bdff..eb8618d4048 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -231,6 +231,12 @@ BvarStatusWithTag<int64_t> 
g_bvar_inverted_checker_leaked_delete_bitmaps("checke
 BvarStatusWithTag<int64_t> 
g_bvar_inverted_checker_abnormal_delete_bitmaps("checker", 
"abnormal_delete_bitmaps");
 BvarStatusWithTag<int64_t> 
g_bvar_inverted_checker_delete_bitmaps_scanned("checker", 
"delete_bitmap_keys_scanned");
 BvarStatusWithTag<int64_t> 
g_bvar_max_rowsets_with_useless_delete_bitmap_version("checker", 
"max_rowsets_with_useless_delete_bitmap_version");
+BvarStatusWithTag<int64_t> 
g_bvar_checker_restore_job_prepared_state("checker", 
"restore_job_prepared_state");
+BvarStatusWithTag<int64_t> 
g_bvar_checker_restore_job_committed_state("checker", 
"restore_job_committed_state");
+BvarStatusWithTag<int64_t> g_bvar_checker_restore_job_dropped_state("checker", 
"restore_job_dropped_state");
+BvarStatusWithTag<int64_t> 
g_bvar_checker_restore_job_completed_state("checker", 
"restore_job_completed_state");
+BvarStatusWithTag<int64_t> 
g_bvar_checker_restore_job_recycling_state("checker", 
"restore_job_recycling_state");
+BvarStatusWithTag<int64_t> 
g_bvar_checker_restore_job_cost_many_time("checker", 
"restore_job_cost_many_time");
 
 // rpc kv rw count
 // get_rowset
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index 1525cc260d9..9d7dfa232f0 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -374,6 +374,13 @@ extern BvarStatusWithTag<int64_t> 
g_bvar_inverted_checker_abnormal_delete_bitmap
 extern BvarStatusWithTag<int64_t> 
g_bvar_inverted_checker_delete_bitmaps_scanned;
 extern BvarStatusWithTag<int64_t> 
g_bvar_max_rowsets_with_useless_delete_bitmap_version;
 
+extern BvarStatusWithTag<int64_t> g_bvar_checker_restore_job_prepared_state;
+extern BvarStatusWithTag<int64_t> g_bvar_checker_restore_job_committed_state;
+extern BvarStatusWithTag<int64_t> g_bvar_checker_restore_job_dropped_state;
+extern BvarStatusWithTag<int64_t> g_bvar_checker_restore_job_completed_state;
+extern BvarStatusWithTag<int64_t> g_bvar_checker_restore_job_recycling_state;
+extern BvarStatusWithTag<int64_t> g_bvar_checker_restore_job_cost_many_time;
+
 // rpc kv
 extern mBvarInt64Adder g_bvar_rpc_kv_get_rowset_get_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_get_version_get_counter;
@@ -421,6 +428,12 @@ extern mBvarInt64Adder 
g_bvar_rpc_kv_commit_partition_put_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_commit_partition_del_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_drop_partition_get_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_drop_partition_put_counter;
+extern mBvarInt64Adder g_bvar_rpc_kv_prepare_restore_job_get_counter;
+extern mBvarInt64Adder g_bvar_rpc_kv_prepare_restore_job_put_counter;
+extern mBvarInt64Adder g_bvar_rpc_kv_commit_restore_job_get_counter;
+extern mBvarInt64Adder g_bvar_rpc_kv_commit_restore_job_put_counter;
+extern mBvarInt64Adder g_bvar_rpc_kv_finish_restore_job_get_counter;
+extern mBvarInt64Adder g_bvar_rpc_kv_finish_restore_job_put_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_check_kv_get_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_get_obj_store_info_get_counter;
 extern mBvarInt64Adder g_bvar_rpc_kv_alter_storage_vault_get_counter;
@@ -527,6 +540,12 @@ extern mBvarInt64Adder 
g_bvar_rpc_kv_commit_partition_put_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_commit_partition_del_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_drop_partition_get_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_drop_partition_put_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_prepare_restore_job_get_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_prepare_restore_job_put_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_commit_restore_job_get_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_commit_restore_job_put_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_finish_restore_job_get_bytes;
+extern mBvarInt64Adder g_bvar_rpc_kv_finish_restore_job_put_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_check_kv_get_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_get_obj_store_info_get_bytes;
 extern mBvarInt64Adder g_bvar_rpc_kv_alter_storage_vault_get_bytes;
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index 351ade480a1..4cd46c605fa 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -118,6 +118,7 @@ CONF_mInt64(recycle_task_threshold_seconds, "10800"); // 3h
 CONF_Bool(force_immediate_recycle, "false");
 
 CONF_mBool(enable_mow_job_key_check, "false");
+CONF_mBool(enable_restore_job_check, "false");
 
 CONF_mBool(enable_checker_for_meta_key_check, "false");
 CONF_mInt64(mow_job_key_check_expiration_diff_seconds, "600"); // 10min
diff --git a/cloud/src/meta-service/meta_service.cpp 
b/cloud/src/meta-service/meta_service.cpp
index 71bee7225de..c54c1e30b95 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -998,6 +998,13 @@ void 
MetaServiceImpl::prepare_restore_job(::google::protobuf::RpcController* con
                                           RestoreJobResponse* response,
                                           ::google::protobuf::Closure* done) {
     RPC_PREPROCESS(prepare_restore_job);
+    if (request->action() != RestoreJobRequest::PREPARE) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "invalid action, expected PREPARE but got " +
+              RestoreJobRequest::Action_Name(request->action());
+        return;
+    }
+
     if (!request->has_tablet_id()) {
         code = MetaServiceCode::INVALID_ARGUMENT;
         msg = "empty tablet_id";
@@ -1179,6 +1186,13 @@ void 
MetaServiceImpl::commit_restore_job(::google::protobuf::RpcController* cont
                                          RestoreJobResponse* response,
                                          ::google::protobuf::Closure* done) {
     RPC_PREPROCESS(commit_restore_job);
+    if (request->action() != RestoreJobRequest::COMMIT) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "invalid action, expected COMMIT but got " +
+              RestoreJobRequest::Action_Name(request->action());
+        return;
+    }
+
     if (!request->has_tablet_id()) {
         code = MetaServiceCode::INVALID_ARGUMENT;
         msg = "empty tablet_id";
@@ -1516,6 +1530,14 @@ void 
MetaServiceImpl::finish_restore_job(::google::protobuf::RpcController* cont
                                          RestoreJobResponse* response,
                                          ::google::protobuf::Closure* done) {
     RPC_PREPROCESS(finish_restore_job);
+    if (request->action() != RestoreJobRequest::COMPLETE &&
+        request->action() != RestoreJobRequest::ABORT) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "invalid action, expected COMPLETE or ABORT but got " +
+              RestoreJobRequest::Action_Name(request->action());
+        return;
+    }
+
     if (!request->has_tablet_id()) {
         code = MetaServiceCode::INVALID_ARGUMENT;
         msg = "empty tablet_id";
@@ -1570,7 +1592,6 @@ void 
MetaServiceImpl::finish_restore_job(::google::protobuf::RpcController* cont
         return;
     }
 
-    bool is_completed = request->has_is_completed() && request->is_completed();
     if (restore_job_pb.state() == RestoreJobCloudPB::DROPPED ||
         restore_job_pb.state() == RestoreJobCloudPB::COMPLETED) {
         LOG_INFO("restore job already finished")
@@ -1587,7 +1608,8 @@ void 
MetaServiceImpl::finish_restore_job(::google::protobuf::RpcController* cont
         return;
     } else {
         // PREPARED, COMMITTED state
-        if (is_completed && restore_job_pb.state() != 
RestoreJobCloudPB::COMMITTED) {
+        if (request->action() == RestoreJobRequest::COMPLETE &&
+            restore_job_pb.state() != RestoreJobCloudPB::COMMITTED) {
             // Only allow COMMITTED -> COMPLETED
             code = MetaServiceCode::INVALID_ARGUMENT;
             msg = fmt::format("restore tablet {} in invalid state to complete, 
state: {}",
@@ -1595,10 +1617,21 @@ void 
MetaServiceImpl::finish_restore_job(::google::protobuf::RpcController* cont
                               
RestoreJobCloudPB::State_Name(restore_job_pb.state()));
             return;
         }
+        if (request->action() == RestoreJobRequest::ABORT &&
+            (restore_job_pb.state() != RestoreJobCloudPB::PREPARED &&
+             restore_job_pb.state() != RestoreJobCloudPB::COMMITTED)) {
+            // Only allow PREPARED/COMMITTED -> DROPPED
+            code = MetaServiceCode::INVALID_ARGUMENT;
+            msg = fmt::format("restore tablet {} in invalid state to abort, 
state: {}",
+                              tablet_idx.tablet_id(),
+                              
RestoreJobCloudPB::State_Name(restore_job_pb.state()));
+            return;
+        }
     }
 
     // 2. update restore job
     std::string to_save_val;
+    bool is_completed = request->action() == RestoreJobRequest::COMPLETE;
     restore_job_pb.set_state(is_completed ? RestoreJobCloudPB::COMPLETED
                                           : RestoreJobCloudPB::DROPPED);
     restore_job_pb.set_need_recycle_data(!is_completed);
diff --git a/cloud/src/recycler/checker.cpp b/cloud/src/recycler/checker.cpp
index bd1a5add4a0..544d6dfb52f 100644
--- a/cloud/src/recycler/checker.cpp
+++ b/cloud/src/recycler/checker.cpp
@@ -198,6 +198,12 @@ int Checker::start() {
                 }
             }
 
+            if (config::enable_restore_job_check) {
+                if (int ret = checker->do_restore_job_check(); ret != 0) {
+                    success = false;
+                }
+            }
+
             if (config::enable_delete_bitmap_storage_optimize_v2_check) {
                 if (int ret = 
checker->do_delete_bitmap_storage_optimize_check(2 /*version*/);
                     ret != 0) {
@@ -1375,7 +1381,7 @@ int 
InstanceChecker::check_inverted_index_file_storage_format_v1(
         // Garbage data leak
         // clang-format off
         LOG(WARNING) << "rowset_index_cache_v1.segment_ids don't contains 
segment_id, rowset should be recycled,"
-                     << " key = " << file_path 
+                     << " key = " << file_path
                      << " segment_id = " << segment_id;
         // clang-format on
         return 1;
@@ -1385,7 +1391,7 @@ int 
InstanceChecker::check_inverted_index_file_storage_format_v1(
         // Garbage data leak
         // clang-format off
         LOG(WARNING) << "rowset_index_cache_v1.index_ids don't contains 
index_id_with_suffix_name,"
-                     << " rowset with inde meta should be recycled, key=" << 
file_path 
+                     << " rowset with inde meta should be recycled, key=" << 
file_path
                      << " index_id_with_suffix_name=" << 
index_id_with_suffix_name;
         // clang-format on
         return 1;
@@ -1758,4 +1764,121 @@ int InstanceChecker::do_mow_job_key_check() {
     return 0;
 }
 
+// Scan every restore job (job_restore_tablet_key) of this instance and count the
+// jobs in each RestoreJobCloudPB state. PREPARED/COMMITTED jobs whose ctime_s is
+// more than COST_MANY_THRESHOLD seconds old are additionally counted (and logged)
+// as "cost many time", since such restores look stuck.
+//
+// The counts are exported via the g_bvar_checker_restore_job_* bvars (tagged with
+// instance_id_) and logged when the function returns, including on error paths.
+//
+// Returns 0 on success (or when the checker is stopped mid-scan), -1 if a
+// transaction cannot be created, a range read fails, or a value fails to parse.
+int InstanceChecker::do_restore_job_check() {
+    int64_t num_prepared = 0;
+    int64_t num_committed = 0;
+    int64_t num_dropped = 0;
+    int64_t num_completed = 0;
+    int64_t num_recycling = 0;
+    int64_t num_cost_many_time = 0;
+    // Age (seconds since ctime_s) after which an unfinished restore job is reported.
+    constexpr int64_t COST_MANY_THRESHOLD = 3600;
+
+    using namespace std::chrono;
+    auto start_time = steady_clock::now();
+    DORIS_CLOUD_DEFER {
+        g_bvar_checker_restore_job_prepared_state.put(instance_id_, num_prepared);
+        g_bvar_checker_restore_job_committed_state.put(instance_id_, num_committed);
+        g_bvar_checker_restore_job_dropped_state.put(instance_id_, num_dropped);
+        g_bvar_checker_restore_job_completed_state.put(instance_id_, num_completed);
+        g_bvar_checker_restore_job_recycling_state.put(instance_id_, num_recycling);
+        g_bvar_checker_restore_job_cost_many_time.put(instance_id_, num_cost_many_time);
+        auto cost_ms =
+                duration_cast<std::chrono::milliseconds>(steady_clock::now() - start_time).count();
+        LOG(INFO) << "check instance restore jobs finished, cost=" << cost_ms
+                  << "ms. instance_id=" << instance_id_ << " num_prepared=" << num_prepared
+                  << " num_committed=" << num_committed << " num_dropped=" << num_dropped
+                  << " num_completed=" << num_completed << " num_recycling=" << num_recycling
+                  << " num_cost_many_time=" << num_cost_many_time;
+    };
+
+    LOG_INFO("begin to check restore jobs").tag("instance_id", instance_id_);
+
+    // Judge every job against the same reference time, however many range reads
+    // the scan takes.
+    const int64_t now_s = ::time(nullptr);
+
+    JobRestoreTabletKeyInfo restore_job_key_info0 {instance_id_, 0};
+    JobRestoreTabletKeyInfo restore_job_key_info1 {instance_id_, INT64_MAX};
+    std::string begin;
+    std::string end;
+    job_restore_tablet_key(restore_job_key_info0, &begin);
+    job_restore_tablet_key(restore_job_key_info1, &end);
+    std::unique_ptr<RangeGetIterator> it;
+    do {
+        std::unique_ptr<Transaction> txn;
+        TxnErrorCode err = txn_kv_->create_txn(&txn);
+        if (err != TxnErrorCode::TXN_OK) {
+            LOG(WARNING) << "failed to create txn, err=" << err;
+            return -1;
+        }
+        err = txn->get(begin, end, &it);
+        if (err != TxnErrorCode::TXN_OK) {
+            LOG(WARNING) << "failed to get restore job key, err=" << err;
+            return -1;
+        }
+        if (!it->has_next()) {
+            break;
+        }
+        while (it->has_next()) {
+            auto [k, v] = it->next();
+            RestoreJobCloudPB restore_job_pb;
+            if (!restore_job_pb.ParseFromArray(v.data(), v.size())) {
+                LOG_WARNING("malformed restore job value").tag("key", hex(k));
+                return -1;
+            }
+
+            switch (restore_job_pb.state()) {
+            case RestoreJobCloudPB::PREPARED:
+                ++num_prepared;
+                break;
+            case RestoreJobCloudPB::COMMITTED:
+                ++num_committed;
+                break;
+            case RestoreJobCloudPB::DROPPED:
+                ++num_dropped;
+                break;
+            case RestoreJobCloudPB::COMPLETED:
+                ++num_completed;
+                break;
+            case RestoreJobCloudPB::RECYCLING:
+                ++num_recycling;
+                break;
+            default:
+                break;
+            }
+
+            // Only unfinished jobs (PREPARED/COMMITTED) can be stuck.
+            if ((restore_job_pb.state() == RestoreJobCloudPB::PREPARED ||
+                 restore_job_pb.state() == RestoreJobCloudPB::COMMITTED) &&
+                now_s > restore_job_pb.ctime_s() + COST_MANY_THRESHOLD) {
+                ++num_cost_many_time;
+                LOG_WARNING("restore job cost too many time")
+                        .tag("key", hex(k))
+                        .tag("tablet_id", restore_job_pb.tablet_id())
+                        .tag("state", RestoreJobCloudPB::State_Name(restore_job_pb.state()))
+                        .tag("ctime_s", restore_job_pb.ctime_s())
+                        .tag("mtime_s", restore_job_pb.mtime_s());
+            }
+
+            if (!it->has_next()) {
+                begin = k;
+                begin.push_back('\x00'); // Update to next smallest key for iteration
+                break;
+            }
+        }
+    } while (it->more() && !stopped());
+    return 0;
+}
+
 } // namespace doris::cloud
diff --git a/cloud/src/recycler/checker.h b/cloud/src/recycler/checker.h
index fb8f084297d..436e1302b07 100644
--- a/cloud/src/recycler/checker.h
+++ b/cloud/src/recycler/checker.h
@@ -107,6 +107,8 @@ public:
 
     int do_mow_job_key_check();
 
+    int do_restore_job_check();
+
     // If there are multiple buckets, return the minimum lifecycle; if there 
are no buckets (i.e.
     // all accessors are HdfsAccessor), return INT64_MAX.
     // Return 0 if success, otherwise error
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 5d3e4c5713d..fab7de75c2a 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -2706,6 +2706,7 @@ int InstanceRecycler::recycle_restore_jobs() {
                 LOG_WARNING("failed to commit txn: {}", err);
                 return -1;
             }
+            return 0;
         }
 
         std::string restore_job_rs_key0 = 
job_restore_rowset_key({instance_id_, tablet_id, 0});
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index 0e1175085af..a52df83e3b1 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -10431,6 +10431,11 @@ TEST(MetaServiceTest, RestoreJobTest) {
         ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), 
TxnErrorCode::TXN_OK);
         txn->put(meta_tablet_idx_key({instance_id, tablet_id}), 
tablet_idx_val);
         ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+        // empty action
+        meta_service->prepare_restore_job(&cntl, &req, &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
+        ASSERT_TRUE(res.status().msg().find("invalid action") != 
std::string::npos);
+        req.set_action(RestoreJobRequest::PREPARE);
 
         // empty tablet id
         meta_service->prepare_restore_job(&cntl, &req, &res, nullptr);
@@ -10443,6 +10448,13 @@ TEST(MetaServiceTest, RestoreJobTest) {
         meta_service->prepare_restore_job(&cntl, &req, &res, nullptr);
         ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
         ASSERT_EQ(res.status().msg(), "no tablet meta");
+
+        // check key existence
+        std::string restore_job_key = job_restore_tablet_key({instance_id, 
tablet_id});
+        std::string val;
+        ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), 
TxnErrorCode::TXN_OK);
+        ASSERT_EQ(txn->get(restore_job_key, &val), 
TxnErrorCode::TXN_KEY_NOT_FOUND);
+
         req.Clear();
         res.Clear();
     }
@@ -10454,6 +10466,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
         ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
         req.set_tablet_id(tablet_id);
         req.set_expiration(time(nullptr) + 3600);
+        req.set_action(RestoreJobRequest::PREPARE);
 
         // set tablet meta
         auto* tablet_meta = req.mutable_tablet_meta();
@@ -10494,6 +10507,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
         txn->put(meta_tablet_idx_key({instance_id, tablet_id}), 
tablet_idx_val);
         ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
         req.set_tablet_id(tablet_id);
+        req.set_action(RestoreJobRequest::PREPARE);
 
         // set tablet meta
         auto* tablet_meta = req.mutable_tablet_meta();
@@ -10536,10 +10550,23 @@ TEST(MetaServiceTest, RestoreJobTest) {
     // invalid args commit restore job
     {
         reset_meta_service();
+        // empty action
+        meta_service->commit_restore_job(&cntl, &req, &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
+        ASSERT_TRUE(res.status().msg().find("invalid action") != 
std::string::npos);
+        req.set_action(RestoreJobRequest::COMMIT);
+
         // empty tablet_id
         meta_service->commit_restore_job(&cntl, &req, &res, nullptr);
         ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
         ASSERT_EQ(res.status().msg(), "empty tablet_id");
+
+        // check key existence
+        std::string restore_job_key = job_restore_tablet_key({instance_id, 
tablet_id});
+        std::string val;
+        ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), 
TxnErrorCode::TXN_OK);
+        ASSERT_EQ(txn->get(restore_job_key, &val), 
TxnErrorCode::TXN_KEY_NOT_FOUND);
+
         req.Clear();
         res.Clear();
     }
@@ -10551,6 +10578,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
         ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
 
         req.set_tablet_id(tablet_id);
+        req.set_action(RestoreJobRequest::COMMIT);
         meta_service->commit_restore_job(&cntl, &req, &res, nullptr);
         ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
         ASSERT_EQ(res.status().msg(), "restore job not exists or has been 
recycled");
@@ -10569,6 +10597,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
         RestoreJobResponse make_res;
         make_req.set_tablet_id(tablet_id);
         make_req.set_expiration(time(nullptr) + 3600);
+        make_req.set_action(RestoreJobRequest::PREPARE);
         auto* tablet_meta = make_req.mutable_tablet_meta();
         tablet_meta->set_table_id(table_id);
         tablet_meta->set_index_id(index_id);
@@ -10594,6 +10623,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
 
         // commit_restore_job
         req.set_tablet_id(tablet_id);
+        req.set_action(RestoreJobRequest::COMMIT);
         meta_service->commit_restore_job(&cntl, &req, &res, nullptr);
         ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << 
res.status().msg();
         std::string tablet_key =
@@ -10639,6 +10669,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
         RestoreJobResponse make_res;
         make_req.set_tablet_id(tablet_id);
         make_req.set_expiration(time(nullptr) + 3600);
+        make_req.set_action(RestoreJobRequest::PREPARE);
 
         auto* tablet_meta = make_req.mutable_tablet_meta();
         tablet_meta->set_table_id(table_id);
@@ -10670,6 +10701,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
 
         // commit_restore_job
         req.set_tablet_id(tablet_id);
+        req.set_action(RestoreJobRequest::COMMIT);
         meta_service->commit_restore_job(&cntl, &req, &res, nullptr);
         ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << 
res.status().msg();
         ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), 
TxnErrorCode::TXN_OK);
@@ -10703,10 +10735,23 @@ TEST(MetaServiceTest, RestoreJobTest) {
     // invalid args finish restore job
     {
         reset_meta_service();
+        // empty action
+        meta_service->finish_restore_job(&cntl, &req, &res, nullptr);
+        ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
+        ASSERT_TRUE(res.status().msg().find("invalid action") != std::string::npos);
+        req.set_action(RestoreJobRequest::COMPLETE);
+
         // empty tablet_id
         meta_service->finish_restore_job(&cntl, &req, &res, nullptr);
         ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
         ASSERT_EQ(res.status().msg(), "empty tablet_id");
+
+        // check key existence
+        std::string restore_job_key = job_restore_tablet_key({instance_id, tablet_id});
+        std::string val;
+        ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
+        ASSERT_EQ(txn->get(restore_job_key, &val), TxnErrorCode::TXN_KEY_NOT_FOUND);
+
         req.Clear();
         res.Clear();
     }
@@ -10718,6 +10763,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
         ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
 
         req.set_tablet_id(tablet_id);
+        req.set_action(RestoreJobRequest::COMPLETE);
         meta_service->finish_restore_job(&cntl, &req, &res, nullptr);
         ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
         ASSERT_EQ(res.status().msg(), "restore job not exists or has been 
recycled");
@@ -10736,6 +10782,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
         RestoreJobResponse make_res;
         make_req.set_tablet_id(tablet_id);
         make_req.set_expiration(time(nullptr) + 3600);
+        make_req.set_action(RestoreJobRequest::PREPARE);
 
         auto* tablet_meta = make_req.mutable_tablet_meta();
         tablet_meta->set_table_id(table_id);
@@ -10761,6 +10808,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
         ASSERT_EQ(txn->get(restore_job_rs_key, &val), TxnErrorCode::TXN_OK);
 
         req.set_tablet_id(tablet_id);
+        req.set_action(RestoreJobRequest::COMMIT);
         meta_service->commit_restore_job(&cntl, &req, &res, nullptr);
         ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << 
res.status().msg();
         ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), 
TxnErrorCode::TXN_OK);
@@ -10769,7 +10817,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
 
         // finish_restore_job to COMPLETED
         req.set_tablet_id(tablet_id);
-        req.set_is_completed(true);
+        req.set_action(RestoreJobRequest::COMPLETE);
         meta_service->finish_restore_job(&cntl, &req, &res, nullptr);
         ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << 
res.status().msg();
 
@@ -10802,6 +10850,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
         RestoreJobResponse make_res;
         make_req.set_tablet_id(tablet_id);
         make_req.set_expiration(time(nullptr) + 3600);
+        make_req.set_action(RestoreJobRequest::PREPARE);
 
         auto* tablet_meta = make_req.mutable_tablet_meta();
         tablet_meta->set_table_id(table_id);
@@ -10823,7 +10872,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
 
         // finish_restore_job to DROPPED
         req.set_tablet_id(tablet_id);
-        req.set_is_completed(false);
+        req.set_action(RestoreJobRequest::ABORT);
         meta_service->finish_restore_job(&cntl, &req, &res, nullptr);
         ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << 
res.status().msg();
 
@@ -10856,6 +10905,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
         RestoreJobResponse make_res;
         make_req.set_tablet_id(tablet_id);
         make_req.set_expiration(time(nullptr) + 3600);
+        make_req.set_action(RestoreJobRequest::PREPARE);
 
         // set tablet meta
         auto* tablet_meta = make_req.mutable_tablet_meta();
@@ -10872,7 +10922,7 @@ TEST(MetaServiceTest, RestoreJobTest) {
 
         // finish_restore_job to COMPLETED
         req.set_tablet_id(tablet_id);
-        req.set_is_completed(true);
+        req.set_action(RestoreJobRequest::COMPLETE);
         meta_service->finish_restore_job(&cntl, &req, &res, nullptr);
         ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT);
         ASSERT_TRUE(res.status().msg().find("invalid state to complete") != 
std::string::npos);
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index 8330e52ae00..99cea3d72fd 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -650,7 +650,8 @@ static int create_restore_job_rowset(TxnKv* txn_kv, 
StorageVaultAccessor* access
     return 0;
 }
 
-static int create_restore_job_tablet(TxnKv* txn_kv, int64_t tablet_id) {
+static int create_restore_job_tablet(TxnKv* txn_kv, int64_t tablet_id,
+                                     RestoreJobCloudPB::State state) {
     std::string key;
     std::string val;
 
@@ -661,7 +662,7 @@ static int create_restore_job_tablet(TxnKv* txn_kv, int64_t 
tablet_id) {
     restore_job_pb.set_tablet_id(tablet_id);
     restore_job_pb.set_ctime_s(::time(nullptr) - 3600);
     restore_job_pb.set_expired_at_s(0);
-    restore_job_pb.set_state(RestoreJobCloudPB::DROPPED);
+    restore_job_pb.set_state(state);
     restore_job_pb.SerializeToString(&val);
 
     std::unique_ptr<Transaction> txn;
@@ -1837,7 +1838,9 @@ TEST(RecyclerTest, recycle_restore_jobs) {
     for (int i = 0; i < 20; ++i) {
         int64_t tablet_id = tablet_id_base + i;
         ASSERT_EQ(create_tablet(txn_kv.get(), table_id, i, partition_id, 
tablet_id), 0);
-        create_restore_job_tablet(txn_kv.get(), tablet_id);
+        // create restore job for recycle
+        ASSERT_EQ(create_restore_job_tablet(txn_kv.get(), tablet_id, 
RestoreJobCloudPB::COMPLETED),
+                  0);
         for (int j = 0; j < 5; ++j) {
             ASSERT_EQ(
                     create_restore_job_rowset(txn_kv.get(), accessor.get(), 
"recycle_restore_jobs",
@@ -1846,6 +1849,7 @@ TEST(RecyclerTest, recycle_restore_jobs) {
         }
     }
 
+    // not recycle and change restore job from COMPLETED to RECYCLING
     ASSERT_EQ(recycler.recycle_restore_jobs(), 0);
 
     std::unique_ptr<Transaction> txn;
@@ -1855,6 +1859,21 @@ TEST(RecyclerTest, recycle_restore_jobs) {
     auto begin_key = job_restore_tablet_key({instance_id, 0});
     auto end_key = job_restore_tablet_key({instance_id, INT64_MAX});
     ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
+    ASSERT_EQ(it->size(), 20);
+
+    begin_key = job_restore_rowset_key({instance_id, 0, 0});
+    end_key = job_restore_rowset_key({instance_id, INT64_MAX, 0});
+    ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
+    ASSERT_EQ(it->size(), 100);
+
+    // recycle restore job with status RECYCLING
+    ASSERT_EQ(recycler.recycle_restore_jobs(), 0);
+
+    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+    begin_key = job_restore_tablet_key({instance_id, 0});
+    end_key = job_restore_tablet_key({instance_id, INT64_MAX});
+    ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
     ASSERT_EQ(it->size(), 0);
 
     begin_key = job_restore_rowset_key({instance_id, 0, 0});
@@ -4120,6 +4139,67 @@ TEST(CheckerTest, check_job_key) {
     ASSERT_EQ(checker.do_mow_job_key_check(), -1);
 }
 
+TEST(CheckerTest, do_restore_job_check) { // run checker over one restore job per state
+    config::enable_restore_job_check = true; // NOTE(review): never reset after this test
+    std::string instance_id = "test_do_restore_job_check";
+    [[maybe_unused]] auto sp = SyncPoint::get_instance(); // sp is used; attr is redundant
+    DORIS_CLOUD_DEFER {
+        SyncPoint::get_instance()->clear_all_call_backs();
+    };
+    sp->set_call_back("get_instance_id", [&](auto&& args) { // hook result -> instance_id
+        auto* ret = try_any_cast_ret<std::string>(args);
+        ret->first = instance_id;
+        ret->second = true;
+    });
+    sp->enable_processing();
+
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    InstanceInfoPB instance;
+    instance.set_instance_id(instance_id);
+    auto obj_info = instance.add_obj_info();
+    obj_info->set_id("1");
+    InstanceChecker checker(txn_kv, instance_id);
+    ASSERT_EQ(checker.init(instance), 0);
+
+    // Prepare test data: one restore job per state, all written in a single txn
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(TxnErrorCode::TXN_OK, txn_kv->create_txn(&txn));
+
+    // Add a PREPARED restore job
+    RestoreJobCloudPB prepared_job;
+    prepared_job.set_tablet_id(10001);
+    prepared_job.set_state(RestoreJobCloudPB::PREPARED);
+    prepared_job.set_ctime_s(::time(nullptr) - 1800); // 30 minutes ago
+    std::string prepared_key;
+    job_restore_tablet_key({instance_id, prepared_job.tablet_id()}, 
&prepared_key);
+    txn->put(prepared_key, prepared_job.SerializeAsString());
+
+    // Add a COMMITTED restore job
+    RestoreJobCloudPB committed_job;
+    committed_job.set_tablet_id(10002);
+    committed_job.set_state(RestoreJobCloudPB::COMMITTED);
+    committed_job.set_ctime_s(::time(nullptr) - 7200); // 2 hours ago
+    std::string committed_key;
+    job_restore_tablet_key({instance_id, committed_job.tablet_id()}, 
&committed_key);
+    txn->put(committed_key, committed_job.SerializeAsString());
+
+    // Add a COMPLETED restore job
+    RestoreJobCloudPB completed_job;
+    completed_job.set_tablet_id(10003);
+    completed_job.set_state(RestoreJobCloudPB::COMPLETED);
+    completed_job.set_ctime_s(::time(nullptr) - 3600); // 1 hour ago
+    std::string completed_key;
+    job_restore_tablet_key({instance_id, completed_job.tablet_id()}, 
&completed_key);
+    txn->put(completed_key, completed_job.SerializeAsString());
+
+    ASSERT_EQ(TxnErrorCode::TXN_OK, txn->commit());
+
+    // Run the check; NOTE(review): only the return code is asserted, not per-job results
+    ASSERT_EQ(checker.do_restore_job_check(), 0);
+}
+
 TEST(CheckerTest, delete_bitmap_storage_optimize_v2_check_normal) {
     auto txn_kv = std::make_shared<MemTxnKv>();
     ASSERT_EQ(txn_kv->init(), 0);
@@ -5574,6 +5654,57 @@ TEST(RecyclerTest, 
concurrent_recycle_txn_label_failure_test) {
               << "ms" << std::endl;
     check_multiple_txn_info_kvs(txn_kv, 5000);
 }
+
+TEST(RecyclerTest, recycle_restore_job_complete_state) { // one COMPLETED job + 3 rowsets
+    // cloud::config::fdb_cluster_file_path = "fdb.cluster";
+    // auto txn_kv = 
std::dynamic_pointer_cast<cloud::TxnKv>(std::make_shared<cloud::FdbTxnKv>());
+    // txn_kv->init();
+    auto txn_kv = std::make_shared<MemTxnKv>(); // in-memory KV; FDB variant commented out above
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    InstanceInfoPB instance;
+    instance.set_instance_id(instance_id); // instance_id not declared in this test (file scope?)
+    auto obj_info = instance.add_obj_info();
+    obj_info->set_id("recycle_restore_job_transaction");
+    obj_info->set_ak(config::test_s3_ak);
+    obj_info->set_sk(config::test_s3_sk);
+    obj_info->set_endpoint(config::test_s3_endpoint);
+    obj_info->set_region(config::test_s3_region);
+    obj_info->set_bucket(config::test_s3_bucket);
+    obj_info->set_prefix("recycle_restore_job_transaction");
+
+    InstanceRecycler recycler(txn_kv, instance, thread_group, // thread_group: also from outside
+                              std::make_shared<TxnLazyCommitter>(txn_kv));
+    ASSERT_EQ(recycler.init(), 0);
+    auto accessor = recycler.accessor_map_.begin()->second; // NOTE(review): map not checked non-empty
+
+    int64_t tablet_id = 9876;
+    std::string key;
+    JobRestoreTabletKeyInfo key_info {instance_id, tablet_id};
+    job_restore_tablet_key(key_info, &key);
+
+    RestoreJobCloudPB restore_job_pb;
+    restore_job_pb.set_tablet_id(tablet_id);
+    restore_job_pb.set_ctime_s(::time(nullptr) - 3600);
+    restore_job_pb.set_expired_at_s(0);
+    // set job state to COMPLETED
+    restore_job_pb.set_state(RestoreJobCloudPB::COMPLETED);
+
+    std::string val = restore_job_pb.SerializeAsString();
+    std::unique_ptr<Transaction> setup_txn;
+    ASSERT_EQ(txn_kv->create_txn(&setup_txn), TxnErrorCode::TXN_OK);
+    setup_txn->put(key, val);
+    ASSERT_EQ(setup_txn->commit(), TxnErrorCode::TXN_OK);
+
+    for (int i = 0; i < 3; i++) {
+        ASSERT_EQ(create_restore_job_rowset(txn_kv.get(), accessor.get(),
+                                            "recycle_restore_job_transaction", 
tablet_id, i),
+                  0);
+    }
+    // NOTE(review): only the return code is asserted; the job/rowset keys are not re-read.
+    ASSERT_EQ(recycler.recycle_restore_jobs(), 0);
+}
+
 TEST(RecyclerTest, concurrent_recycle_txn_label_conflict_test) {
     config::label_keep_max_second = 0;
     config::recycle_pool_parallelism = 20;
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 6b1e697559d..2f6c076f2c4 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -1121,12 +1121,19 @@ message PartitionResponse {
 }
 
 message RestoreJobRequest {
+    enum Action { // restore-job step requested; takes over field 6 from removed is_completed
+        UNKONWN  = 0; // NOTE(review): misspelling of UNKNOWN; renaming changes generated symbols
+        PREPARE = 1;
+        COMMIT = 2;
+        ABORT = 3;
+        COMPLETE = 4;
+    }
     optional string cloud_unique_id = 1;
     optional int64 tablet_id = 2;
     optional doris.TabletMetaCloudPB tablet_meta = 3;
     optional int64 expiration = 4;
     optional string request_ip = 5;
-    optional bool is_completed = 6;
+    optional Action action = 6; // NOTE(review): bool/enum share varint encoding; an old sender's is_completed=true reads as PREPARE
 }
 
 message RestoreJobResponse {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to