This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e23a2fd1cd [feat](snapshot) Support storage vault for clone instance 
(#63217)
9e23a2fd1cd is described below

commit 9e23a2fd1cdd4565492ac408dc862b325a230403
Author: Yixuan Wang <[email protected]>
AuthorDate: Fri May 15 11:11:24 2026 +0800

    [feat](snapshot) Support storage vault for clone instance (#63217)
---
 cloud/src/meta-service/meta_service_resource.cpp | 520 ++++++++++++++++++-----
 cloud/test/resource_test.cpp                     | 166 ++++++++
 2 files changed, 589 insertions(+), 97 deletions(-)

diff --git a/cloud/src/meta-service/meta_service_resource.cpp 
b/cloud/src/meta-service/meta_service_resource.cpp
index 1c511488673..ed36b54322c 100644
--- a/cloud/src/meta-service/meta_service_resource.cpp
+++ b/cloud/src/meta-service/meta_service_resource.cpp
@@ -325,6 +325,96 @@ static int find_cascade_instances(TxnKv* txn_kv, const 
std::string& root_instanc
     return 0;
 }
 
+static int find_storage_vault_position_by_id(const InstanceInfoPB& instance,
+                                             std::string_view vault_id) {
+    auto id_itr =
+            std::find(instance.resource_ids().begin(), 
instance.resource_ids().end(), vault_id);
+    if (id_itr == instance.resource_ids().end()) {
+        return -1;
+    }
+    return static_cast<int>(id_itr - instance.resource_ids().begin());
+}
+
+static int find_storage_vault_id_by_name(const InstanceInfoPB& instance,
+                                         std::string_view vault_name, 
std::string* vault_id) {
+    auto name_itr = std::find_if(
+            instance.storage_vault_names().begin(), 
instance.storage_vault_names().end(),
+            [&](const auto& current_name) { return current_name == vault_name; 
});
+    if (name_itr == instance.storage_vault_names().end()) {
+        return -1;
+    }
+    int pos = static_cast<int>(name_itr - 
instance.storage_vault_names().begin());
+    *vault_id = instance.resource_ids().Get(pos);
+    return 0;
+}
+
+static int alter_instance_obj_store_info_by_id(InstanceInfoPB& instance,
+                                               std::string_view target_obj_id, 
std::string_view ak,
+                                               std::string_view sk, 
std::string_view role_arn,
+                                               std::string_view external_id,
+                                               const EncryptionInfoPB& 
encryption_info,
+                                               MetaServiceCode& code, 
std::string& msg) {
+    auto& obj_info = 
const_cast<std::decay_t<decltype(instance.obj_info())>&>(instance.obj_info());
+    for (auto& it : obj_info) {
+        if (it.id() != target_obj_id) {
+            continue;
+        }
+
+        if (role_arn.empty()) {
+            if (it.ak() == ak && it.sk() == sk) {
+                code = MetaServiceCode::OK;
+                msg = "ak/sk not changed";
+                return 1;
+            }
+            it.clear_role_arn();
+            it.clear_external_id();
+            it.clear_cred_provider_type();
+
+            it.set_ak(std::string(ak));
+            it.set_sk(std::string(sk));
+            it.mutable_encryption_info()->CopyFrom(encryption_info);
+        } else {
+            if (!ak.empty() || !sk.empty()) {
+                code = MetaServiceCode::INVALID_ARGUMENT;
+                msg = "invaild argument, both set ak/sk and role_arn is not 
allowed";
+                LOG(INFO) << msg;
+                return -1;
+            }
+
+            if (it.provider() != ObjectStoreInfoPB::S3) {
+                code = MetaServiceCode::INVALID_ARGUMENT;
+                msg = "role_arn is only supported for s3 provider";
+                LOG(INFO) << msg << " provider=" << it.provider();
+                return -1;
+            }
+
+            if (it.role_arn() == role_arn && it.external_id() == external_id) {
+                code = MetaServiceCode::OK;
+                msg = "ak/sk not changed";
+                return 1;
+            }
+            it.clear_ak();
+            it.clear_sk();
+            it.clear_encryption_info();
+
+            it.set_role_arn(std::string(role_arn));
+            it.set_external_id(std::string(external_id));
+            it.set_cred_provider_type(CredProviderTypePB::INSTANCE_PROFILE);
+        }
+
+        auto now_time = std::chrono::system_clock::now();
+        uint64_t time =
+                
std::chrono::duration_cast<std::chrono::seconds>(now_time.time_since_epoch())
+                        .count();
+        it.set_mtime(time);
+        return 0;
+    }
+
+    code = MetaServiceCode::INVALID_ARGUMENT;
+    msg = fmt::format("obj info id={} not found", target_obj_id);
+    return -1;
+}
+
 // Helper function to update AK/SK for a single instance
 // Returns 0 on success, -1 on error
 static int update_instance_ak_sk(InstanceInfoPB& instance, const 
UpdateAkSkRequest* request,
@@ -805,9 +895,11 @@ static bool vault_exist(const InstanceInfoPB& instance, 
const std::string& new_v
     return false;
 }
 
-static int alter_hdfs_storage_vault(InstanceInfoPB& instance, 
std::unique_ptr<Transaction>& txn,
-                                    const StorageVaultPB& vault, 
MetaServiceCode& code,
-                                    std::string& msg, 
AlterObjStoreInfoResponse* response) {
+static int alter_hdfs_storage_vault_by_id(InstanceInfoPB& instance,
+                                          std::unique_ptr<Transaction>& txn,
+                                          std::string_view target_vault_id,
+                                          const StorageVaultPB& vault, 
MetaServiceCode& code,
+                                          std::string& msg, 
AlterObjStoreInfoResponse* response) {
     if (!vault.has_hdfs_info()) {
         code = MetaServiceCode::INVALID_ARGUMENT;
         std::stringstream ss;
@@ -825,19 +917,25 @@ static int alter_hdfs_storage_vault(InstanceInfoPB& 
instance, std::unique_ptr<Tr
         return -1;
     }
     const auto& name = vault.name();
-    // Here we try to get mutable iter since we might need to alter the vault 
name
-    auto name_itr = 
std::find_if(instance.mutable_storage_vault_names()->begin(),
-                                 instance.mutable_storage_vault_names()->end(),
-                                 [&](const auto& vault_name) { return name == 
vault_name; });
-    if (name_itr == instance.storage_vault_names().end()) {
+    int pos = find_storage_vault_position_by_id(instance, target_vault_id);
+    if (pos < 0) {
         code = MetaServiceCode::INVALID_ARGUMENT;
         std::stringstream ss;
-        ss << "invalid storage vault name, not found, name =" << name;
+        ss << "invalid storage vault id, not found, id =" << target_vault_id;
         msg = ss.str();
         return -1;
     }
-    auto pos = name_itr - instance.storage_vault_names().begin();
-    std::string vault_id = instance.resource_ids().begin()[pos];
+    auto* storage_vault_names = instance.mutable_storage_vault_names();
+    auto* name_ptr = storage_vault_names->Mutable(pos);
+    DCHECK(name_ptr != nullptr);
+    const std::string old_name = *name_ptr;
+    if (old_name != name) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = fmt::format("storage vault id={} name mismatch, expected={}, 
actual={}",
+                          target_vault_id, name, old_name);
+        return -1;
+    }
+    std::string vault_id = instance.resource_ids().Get(pos);
     auto vault_key = storage_vault_key({instance.instance_id(), vault_id});
     std::string val;
 
@@ -881,7 +979,10 @@ static int alter_hdfs_storage_vault(InstanceInfoPB& 
instance, std::unique_ptr<Tr
         }
 
         new_vault.set_name(vault.alter_name());
-        *name_itr = vault.alter_name();
+        *name_ptr = vault.alter_name();
+        if (instance.default_storage_vault_id() == vault_id) {
+            instance.set_default_storage_vault_name(vault.alter_name());
+        }
     }
     auto* alter_hdfs_info = new_vault.mutable_hdfs_info();
     if (hdfs_info.build_conf().has_hdfs_kerberos_keytab()) {
@@ -918,9 +1019,24 @@ static int alter_hdfs_storage_vault(InstanceInfoPB& 
instance, std::unique_ptr<Tr
     return 0;
 }
 
-static int alter_s3_storage_vault(InstanceInfoPB& instance, 
std::unique_ptr<Transaction>& txn,
-                                  const StorageVaultPB& vault, 
MetaServiceCode& code,
-                                  std::string& msg, AlterObjStoreInfoResponse* 
response) {
+static int alter_hdfs_storage_vault(InstanceInfoPB& instance, 
std::unique_ptr<Transaction>& txn,
+                                    const StorageVaultPB& vault, 
MetaServiceCode& code,
+                                    std::string& msg, 
AlterObjStoreInfoResponse* response) {
+    std::string vault_id;
+    if (find_storage_vault_id_by_name(instance, vault.name(), &vault_id) != 0) 
{
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        std::stringstream ss;
+        ss << "invalid storage vault name, not found, name =" << vault.name();
+        msg = ss.str();
+        return -1;
+    }
+    return alter_hdfs_storage_vault_by_id(instance, txn, vault_id, vault, 
code, msg, response);
+}
+
+static int alter_s3_storage_vault_by_id(InstanceInfoPB& instance, 
std::unique_ptr<Transaction>& txn,
+                                        std::string_view target_vault_id,
+                                        const StorageVaultPB& vault, 
MetaServiceCode& code,
+                                        std::string& msg, 
AlterObjStoreInfoResponse* response) {
     if (!vault.has_obj_info()) {
         code = MetaServiceCode::INVALID_ARGUMENT;
         std::stringstream ss;
@@ -939,19 +1055,25 @@ static int alter_s3_storage_vault(InstanceInfoPB& 
instance, std::unique_ptr<Tran
     }
 
     const auto& name = vault.name();
-    // Here we try to get mutable iter since we might need to alter the vault 
name
-    auto name_itr = 
std::find_if(instance.mutable_storage_vault_names()->begin(),
-                                 instance.mutable_storage_vault_names()->end(),
-                                 [&](const auto& vault_name) { return name == 
vault_name; });
-    if (name_itr == instance.storage_vault_names().end()) {
+    int pos = find_storage_vault_position_by_id(instance, target_vault_id);
+    if (pos < 0) {
         code = MetaServiceCode::INVALID_ARGUMENT;
         std::stringstream ss;
-        ss << "invalid storage vault name, not found, name =" << name;
+        ss << "invalid storage vault id, not found, id =" << target_vault_id;
         msg = ss.str();
         return -1;
     }
-    auto pos = name_itr - instance.storage_vault_names().begin();
-    std::string vault_id = instance.resource_ids().begin()[pos];
+    auto* storage_vault_names = instance.mutable_storage_vault_names();
+    auto* name_ptr = storage_vault_names->Mutable(pos);
+    DCHECK(name_ptr != nullptr);
+    const std::string old_name = *name_ptr;
+    if (old_name != name) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = fmt::format("storage vault id={} name mismatch, expected={}, 
actual={}",
+                          target_vault_id, name, old_name);
+        return -1;
+    }
+    std::string vault_id = instance.resource_ids().Get(pos);
     auto vault_key = storage_vault_key({instance.instance_id(), vault_id});
     std::string val;
 
@@ -995,7 +1117,10 @@ static int alter_s3_storage_vault(InstanceInfoPB& 
instance, std::unique_ptr<Tran
         }
 
         new_vault.set_name(vault.alter_name());
-        *name_itr = vault.alter_name();
+        *name_ptr = vault.alter_name();
+        if (instance.default_storage_vault_id() == vault_id) {
+            instance.set_default_storage_vault_name(vault.alter_name());
+        }
     }
 
     if (obj_info.has_role_arn() && (obj_info.has_ak() || obj_info.has_sk())) {
@@ -1074,6 +1199,20 @@ static int alter_s3_storage_vault(InstanceInfoPB& 
instance, std::unique_ptr<Tran
     return 0;
 }
 
+static int alter_s3_storage_vault(InstanceInfoPB& instance, 
std::unique_ptr<Transaction>& txn,
+                                  const StorageVaultPB& vault, 
MetaServiceCode& code,
+                                  std::string& msg, AlterObjStoreInfoResponse* 
response) {
+    std::string vault_id;
+    if (find_storage_vault_id_by_name(instance, vault.name(), &vault_id) != 0) 
{
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        std::stringstream ss;
+        ss << "invalid storage vault name, not found, name =" << vault.name();
+        msg = ss.str();
+        return -1;
+    }
+    return alter_s3_storage_vault_by_id(instance, txn, vault_id, vault, code, 
msg, response);
+}
+
 struct ObjectStorageDesc {
     std::string& ak;
     std::string& sk;
@@ -1307,6 +1446,17 @@ void 
MetaServiceImpl::alter_storage_vault(google::protobuf::RpcController* contr
         return;
     }
 
+    bool supports_cascade = request->op() == 
AlterObjStoreInfoRequest::ALTER_S3_VAULT ||
+                            request->op() == 
AlterObjStoreInfoRequest::ALTER_HDFS_VAULT;
+    std::string root_vault_id;
+    if (supports_cascade &&
+        find_storage_vault_id_by_name(instance, request->vault().name(), 
&root_vault_id) != 0) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = fmt::format("invalid storage vault name, not found, name ={}",
+                          request->vault().name());
+        return;
+    }
+
     switch (request->op()) {
     case AlterObjStoreInfoRequest::ADD_S3_VAULT: {
         if (!instance.enable_storage_vault()) {
@@ -1494,6 +1644,115 @@ void 
MetaServiceImpl::alter_storage_vault(google::protobuf::RpcController* contr
         code = cast_as<ErrCategory::COMMIT>(err);
         msg = fmt::format("failed to commit kv txn, err={}", err);
         LOG(WARNING) << msg;
+        return;
+    }
+
+    async_notify_refresh_instance(txn_kv_, instance_id, true);
+
+    if (!supports_cascade) {
+        return;
+    }
+
+    if (!instance.has_snapshot_switch_status() ||
+        instance.snapshot_switch_status() == SNAPSHOT_SWITCH_DISABLED) {
+        LOG(INFO) << "snapshot disabled for instance_id=" << instance_id
+                  << ", skip cascade updating derived instances after 
alter_storage_vault";
+        return;
+    }
+
+    std::vector<std::string> cascade_instance_ids;
+    if (find_cascade_instances(txn_kv_.get(), instance_id, 
&cascade_instance_ids) != 0) {
+        LOG(WARNING) << "failed to find derived instances for storage vault 
cascade, instance_id="
+                     << instance_id;
+        return;
+    }
+
+    for (const auto& cascade_id : cascade_instance_ids) {
+        std::unique_ptr<Transaction> cascade_txn;
+        TxnErrorCode cascade_err = txn_kv_->create_txn(&cascade_txn);
+        if (cascade_err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::CREATE>(cascade_err);
+            msg = fmt::format(
+                    "failed to create txn for derived storage vault update, 
instance_id={}, "
+                    "err={}",
+                    cascade_id, cascade_err);
+            LOG(WARNING) << msg;
+            return;
+        }
+
+        std::string cascade_key;
+        std::string cascade_val;
+        instance_key({cascade_id}, &cascade_key);
+        cascade_err = cascade_txn->get(cascade_key, &cascade_val);
+        if (cascade_err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::READ>(cascade_err);
+            msg = fmt::format(
+                    "failed to get derived instance for storage vault update, 
instance_id={}, "
+                    "err={}",
+                    cascade_id, cascade_err);
+            LOG(WARNING) << msg;
+            return;
+        }
+
+        InstanceInfoPB cascade_instance;
+        if (!cascade_instance.ParseFromString(cascade_val)) {
+            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+            msg = fmt::format(
+                    "failed to parse derived InstanceInfoPB for storage vault 
update, "
+                    "instance_id={}",
+                    cascade_id);
+            LOG(WARNING) << msg;
+            return;
+        }
+
+        MetaServiceCode cascade_code = MetaServiceCode::OK;
+        std::string cascade_msg;
+        AlterObjStoreInfoResponse cascade_response;
+        int ret = -1;
+        if (request->op() == AlterObjStoreInfoRequest::ALTER_S3_VAULT) {
+            ret = alter_s3_storage_vault_by_id(cascade_instance, cascade_txn, 
root_vault_id,
+                                               request->vault(), cascade_code, 
cascade_msg,
+                                               &cascade_response);
+        } else if (request->op() == 
AlterObjStoreInfoRequest::ALTER_HDFS_VAULT) {
+            ret = alter_hdfs_storage_vault_by_id(cascade_instance, 
cascade_txn, root_vault_id,
+                                                 request->vault(), 
cascade_code, cascade_msg,
+                                                 &cascade_response);
+        }
+        if (ret != 0) {
+            code = cascade_code;
+            msg = fmt::format(
+                    "failed to cascade storage vault update, instance_id={}, 
vault_id={}, msg={}",
+                    cascade_id, root_vault_id, cascade_msg);
+            LOG(WARNING) << msg << " code=" << static_cast<int>(code);
+            return;
+        }
+
+        cascade_val = cascade_instance.SerializeAsString();
+        if (cascade_val.empty()) {
+            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+            msg = fmt::format(
+                    "failed to serialize derived instance after storage vault 
update, "
+                    "instance_id={}",
+                    cascade_id);
+            LOG(WARNING) << msg;
+            return;
+        }
+
+        cascade_txn->atomic_add(system_meta_service_instance_update_key(), 1);
+        cascade_txn->put(cascade_key, cascade_val);
+        cascade_err = cascade_txn->commit();
+        if (cascade_err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::COMMIT>(cascade_err);
+            msg = fmt::format(
+                    "failed to commit derived storage vault update, 
instance_id={}, err={}",
+                    cascade_id, cascade_err);
+            LOG(WARNING) << msg;
+            return;
+        }
+
+        async_notify_refresh_instance(txn_kv_, cascade_id, true);
+        LOG(INFO) << "cascade storage vault update finished, 
root_instance_id=" << instance_id
+                  << " derived_instance_id=" << cascade_id << " vault_id=" << 
root_vault_id;
     }
 }
 
@@ -1585,73 +1844,28 @@ void 
MetaServiceImpl::alter_obj_store_info(google::protobuf::RpcController* cont
         return;
     }
 
+    bool supports_cascade = request->op() == 
AlterObjStoreInfoRequest::ALTER_OBJ_INFO ||
+                            request->op() == 
AlterObjStoreInfoRequest::LEGACY_UPDATE_AK_SK;
+    std::string root_obj_id =
+            request->has_obj() && request->obj().has_id() ? 
request->obj().id() : "0";
+
     switch (request->op()) {
     case AlterObjStoreInfoRequest::LEGACY_UPDATE_AK_SK:
     case AlterObjStoreInfoRequest::ALTER_OBJ_INFO: {
-        // get id
-        std::string id = request->obj().has_id() ? request->obj().id() : "0";
-        int idx = std::stoi(id);
+        int idx = std::stoi(root_obj_id);
         if (idx < 1 || idx > instance.obj_info().size()) {
             // err
             code = MetaServiceCode::INVALID_ARGUMENT;
             msg = "id invalid, please check it";
             return;
         }
-        auto& obj_info =
-                
const_cast<std::decay_t<decltype(instance.obj_info())>&>(instance.obj_info());
-        for (auto& it : obj_info) {
-            if (std::stoi(it.id()) == idx) {
-                if (role_arn.empty()) {
-                    if (it.ak() == ak && it.sk() == sk) {
-                        // not change, just return ok
-                        code = MetaServiceCode::OK;
-                        msg = "ak/sk not changed";
-                        return;
-                    }
-                    it.clear_role_arn();
-                    it.clear_external_id();
-                    it.clear_cred_provider_type();
-
-                    it.set_ak(ak);
-                    it.set_sk(sk);
-                    it.mutable_encryption_info()->CopyFrom(encryption_info);
-                } else {
-                    if (!ak.empty() || !sk.empty()) {
-                        code = MetaServiceCode::INVALID_ARGUMENT;
-                        msg = "invaild argument, both set ak/sk and role_arn 
is not allowed";
-                        LOG(INFO) << msg;
-                        return;
-                    }
-
-                    if (it.provider() != ObjectStoreInfoPB::S3) {
-                        code = MetaServiceCode::INVALID_ARGUMENT;
-                        msg = "role_arn is only supported for s3 provider";
-                        LOG(INFO) << msg << " provider=" << it.provider();
-                        return;
-                    }
-
-                    if (it.role_arn() == role_arn && it.external_id() == 
external_id &&
-                        get_cred_provider_type(it) == 
get_cred_provider_type(request->obj())) {
-                        // not change, just return ok
-                        code = MetaServiceCode::OK;
-                        msg = "ak/sk not changed";
-                        return;
-                    }
-                    it.clear_ak();
-                    it.clear_sk();
-                    it.clear_encryption_info();
-
-                    it.set_role_arn(role_arn);
-                    it.set_external_id(external_id);
-                    
it.set_cred_provider_type(get_cred_provider_type(request->obj()));
-                }
-
-                auto now_time = std::chrono::system_clock::now();
-                uint64_t time = 
std::chrono::duration_cast<std::chrono::seconds>(
-                                        now_time.time_since_epoch())
-                                        .count();
-                it.set_mtime(time);
+        int ret = alter_instance_obj_store_info_by_id(instance, root_obj_id, 
ak, sk, role_arn,
+                                                      external_id, 
encryption_info, code, msg);
+        if (ret != 0) {
+            if (ret > 0) {
+                return;
             }
+            return;
         }
     } break;
     case AlterObjStoreInfoRequest::ADD_OBJ_INFO: {
@@ -1727,6 +1941,105 @@ void 
MetaServiceImpl::alter_obj_store_info(google::protobuf::RpcController* cont
         code = cast_as<ErrCategory::COMMIT>(err);
         msg = fmt::format("failed to commit kv txn, err={}", err);
         LOG(WARNING) << msg;
+        return;
+    }
+
+    async_notify_refresh_instance(txn_kv_, instance_id, true);
+
+    if (!supports_cascade) {
+        return;
+    }
+
+    if (!instance.has_snapshot_switch_status() ||
+        instance.snapshot_switch_status() == SNAPSHOT_SWITCH_DISABLED) {
+        LOG(INFO) << "snapshot disabled for instance_id=" << instance_id
+                  << ", skip cascade updating derived instances after 
alter_obj_store_info";
+        return;
+    }
+
+    std::vector<std::string> cascade_instance_ids;
+    if (find_cascade_instances(txn_kv_.get(), instance_id, 
&cascade_instance_ids) != 0) {
+        LOG(WARNING) << "failed to find derived instances for obj store 
cascade, instance_id="
+                     << instance_id;
+        return;
+    }
+
+    for (const auto& cascade_id : cascade_instance_ids) {
+        std::unique_ptr<Transaction> cascade_txn;
+        TxnErrorCode cascade_err = txn_kv_->create_txn(&cascade_txn);
+        if (cascade_err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::CREATE>(cascade_err);
+            msg = fmt::format(
+                    "failed to create txn for derived obj store update, 
instance_id={}, err={}",
+                    cascade_id, cascade_err);
+            LOG(WARNING) << msg;
+            return;
+        }
+
+        std::string cascade_key;
+        std::string cascade_val;
+        instance_key({cascade_id}, &cascade_key);
+        cascade_err = cascade_txn->get(cascade_key, &cascade_val);
+        if (cascade_err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::READ>(cascade_err);
+            msg = fmt::format(
+                    "failed to get derived instance for obj store update, 
instance_id={}, err={}",
+                    cascade_id, cascade_err);
+            LOG(WARNING) << msg;
+            return;
+        }
+
+        InstanceInfoPB cascade_instance;
+        if (!cascade_instance.ParseFromString(cascade_val)) {
+            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+            msg = fmt::format(
+                    "failed to parse derived InstanceInfoPB for obj store 
update, instance_id={}",
+                    cascade_id);
+            LOG(WARNING) << msg;
+            return;
+        }
+
+        MetaServiceCode cascade_code = MetaServiceCode::OK;
+        std::string cascade_msg;
+        int ret = alter_instance_obj_store_info_by_id(cascade_instance, 
root_obj_id, ak, sk,
+                                                      role_arn, external_id, 
encryption_info,
+                                                      cascade_code, 
cascade_msg);
+        if (ret != 0) {
+            if (ret < 0) {
+                code = cascade_code;
+                msg = fmt::format(
+                        "failed to cascade obj store update, instance_id={}, 
obj_info_id={}, "
+                        "msg={}",
+                        cascade_id, root_obj_id, cascade_msg);
+                LOG(WARNING) << msg << " code=" << static_cast<int>(code);
+                return;
+            }
+        }
+
+        cascade_val = cascade_instance.SerializeAsString();
+        if (cascade_val.empty()) {
+            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+            msg = fmt::format(
+                    "failed to serialize derived instance after obj store 
update, instance_id={}",
+                    cascade_id);
+            LOG(WARNING) << msg;
+            return;
+        }
+
+        cascade_txn->atomic_add(system_meta_service_instance_update_key(), 1);
+        cascade_txn->put(cascade_key, cascade_val);
+        cascade_err = cascade_txn->commit();
+        if (cascade_err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::COMMIT>(cascade_err);
+            msg = fmt::format("failed to commit derived obj store update, 
instance_id={}, err={}",
+                              cascade_id, cascade_err);
+            LOG(WARNING) << msg;
+            return;
+        }
+
+        async_notify_refresh_instance(txn_kv_, cascade_id, true);
+        LOG(INFO) << "cascade obj store update finished, root_instance_id=" << 
instance_id
+                  << " derived_instance_id=" << cascade_id << " obj_info_id=" 
<< root_obj_id;
     }
 }
 
@@ -1849,9 +2162,11 @@ void 
MetaServiceImpl::update_ak_sk(google::protobuf::RpcController* controller,
         std::unique_ptr<Transaction> cascade_txn;
         TxnErrorCode cascade_err = txn_kv_->create_txn(&cascade_txn);
         if (cascade_err != TxnErrorCode::TXN_OK) {
-            LOG(WARNING) << "failed to create txn for derived instance, 
instance_id=" << cascade_id
-                         << " err=" << cascade_err;
-            continue;
+            code = cast_as<ErrCategory::CREATE>(cascade_err);
+            msg = fmt::format("failed to create txn for derived instance, 
instance_id={}, err={}",
+                              cascade_id, cascade_err);
+            LOG(WARNING) << msg;
+            return;
         }
 
         InstanceKeyInfo cascade_key_info {cascade_id};
@@ -1861,15 +2176,20 @@ void 
MetaServiceImpl::update_ak_sk(google::protobuf::RpcController* controller,
 
         cascade_err = cascade_txn->get(cascade_key, &cascade_val);
         if (cascade_err != TxnErrorCode::TXN_OK) {
-            LOG(WARNING) << "failed to get derived instance, instance_id=" << 
cascade_id
-                         << " err=" << cascade_err;
-            continue;
+            code = cast_as<ErrCategory::READ>(cascade_err);
+            msg = fmt::format("failed to get derived instance, instance_id={}, 
err={}", cascade_id,
+                              cascade_err);
+            LOG(WARNING) << msg;
+            return;
         }
 
         InstanceInfoPB cascade_instance;
         if (!cascade_instance.ParseFromString(cascade_val)) {
-            LOG(WARNING) << "failed to parse InstanceInfoPB for derived 
instance_id=" << cascade_id;
-            continue;
+            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+            msg = fmt::format("failed to parse InstanceInfoPB for derived 
instance_id={}",
+                              cascade_id);
+            LOG(WARNING) << msg;
+            return;
         }
 
         // Update the cascade instance using helper function
@@ -1878,24 +2198,30 @@ void 
MetaServiceImpl::update_ak_sk(google::protobuf::RpcController* controller,
         std::string cascade_msg;
         if (update_instance_ak_sk(cascade_instance, request, time, 
cascade_code, cascade_msg,
                                   cascade_update_record) != 0) {
-            LOG(WARNING) << "failed to update derived instance, instance_id=" 
<< cascade_id
-                         << " msg=" << cascade_msg;
-            continue;
+            code = cascade_code;
+            msg = fmt::format("failed to update derived instance, 
instance_id={}, msg={}",
+                              cascade_id, cascade_msg);
+            LOG(WARNING) << msg << " code=" << static_cast<int>(code);
+            return;
         }
 
         cascade_val = cascade_instance.SerializeAsString();
         if (cascade_val.empty()) {
-            LOG(WARNING) << "failed to serialize derived instance, 
instance_id=" << cascade_id;
-            continue;
+            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+            msg = fmt::format("failed to serialize derived instance, 
instance_id={}", cascade_id);
+            LOG(WARNING) << msg;
+            return;
         }
 
         cascade_txn->put(cascade_key, cascade_val);
 
         cascade_err = cascade_txn->commit();
         if (cascade_err != TxnErrorCode::TXN_OK) {
-            LOG(WARNING) << "failed to commit derived instance txn, 
instance_id=" << cascade_id
-                         << " err=" << cascade_err;
-            continue;
+            code = cast_as<ErrCategory::COMMIT>(cascade_err);
+            msg = fmt::format("failed to commit derived instance txn, 
instance_id={}, err={}",
+                              cascade_id, cascade_err);
+            LOG(WARNING) << msg;
+            return;
         }
 
         async_notify_refresh_instance(txn_kv_, cascade_id, true);
diff --git a/cloud/test/resource_test.cpp b/cloud/test/resource_test.cpp
index ee3f8598df8..2c02d9bfccf 100644
--- a/cloud/test/resource_test.cpp
+++ b/cloud/test/resource_test.cpp
@@ -567,6 +567,84 @@ static void verify_instance_aksk(MetaServiceProxy* 
meta_service, const std::stri
     EXPECT_EQ(instance.obj_info(0).sk(), expected_sk);
 }
 
+static void create_instance_with_storage_vault(MetaServiceProxy* meta_service,
+                                               const std::string& instance_id,
+                                               const std::string& 
source_instance_id,
+                                               const std::string& vault_id,
+                                               const std::string& vault_name, 
const std::string& ak,
+                                               const std::string& sk, bool 
enable_snapshot = true) {
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+    InstanceInfoPB instance;
+    instance.set_instance_id(instance_id);
+    instance.set_enable_storage_vault(true);
+    instance.add_resource_ids(vault_id);
+    instance.add_storage_vault_names(vault_name);
+    instance.set_default_storage_vault_id(vault_id);
+    instance.set_default_storage_vault_name(vault_name);
+
+    std::optional<Versionstamp> snapshot_version;
+    if (!source_instance_id.empty()) {
+        instance.set_source_instance_id(source_instance_id);
+        snapshot_version = next_test_snapshot_versionstamp();
+        instance.set_source_snapshot_id(snapshot_version->to_string());
+    }
+    instance.set_snapshot_switch_status(enable_snapshot ? SNAPSHOT_SWITCH_ON
+                                                        : 
SNAPSHOT_SWITCH_DISABLED);
+
+    StorageVaultPB vault;
+    vault.set_id(vault_id);
+    vault.set_name(vault_name);
+    auto* obj_info = vault.mutable_obj_info();
+    obj_info->set_id(vault_id);
+    obj_info->set_ak(ak);
+    obj_info->set_sk(sk);
+    obj_info->set_bucket("bucket");
+    obj_info->set_prefix("prefix");
+    obj_info->set_endpoint("endpoint");
+    obj_info->set_external_endpoint("external-endpoint");
+    obj_info->set_region("region");
+    obj_info->set_provider(ObjectStoreInfoPB::S3);
+
+    txn->put(instance_key({instance_id}), instance.SerializeAsString());
+    txn->put(storage_vault_key({instance_id, vault_id}), 
vault.SerializeAsString());
+
+    if (snapshot_version.has_value()) {
+        versioned::SnapshotReferenceKeyInfo ref_key_info {source_instance_id, 
*snapshot_version,
+                                                          instance_id};
+        txn->put(versioned::snapshot_reference_key(ref_key_info), "");
+    }
+    ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+}
+
+static void verify_storage_vault(MetaServiceProxy* meta_service, const 
std::string& instance_id,
+                                 const std::string& vault_id, const 
std::string& expected_name,
+                                 const std::string& expected_ak, const 
std::string& expected_sk) {
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+    std::string val;
+    ASSERT_EQ(txn->get(storage_vault_key({instance_id, vault_id}), &val), 
TxnErrorCode::TXN_OK);
+
+    StorageVaultPB vault;
+    ASSERT_TRUE(vault.ParseFromString(val));
+    ASSERT_TRUE(vault.has_obj_info());
+    EXPECT_EQ(vault.name(), expected_name);
+    EXPECT_EQ(vault.obj_info().ak(), expected_ak);
+    EXPECT_EQ(vault.obj_info().sk(), expected_sk);
+
+    ASSERT_EQ(txn->get(instance_key({instance_id}), &val), 
TxnErrorCode::TXN_OK);
+    InstanceInfoPB instance;
+    ASSERT_TRUE(instance.ParseFromString(val));
+    ASSERT_EQ(instance.resource_ids_size(), 1);
+    ASSERT_EQ(instance.storage_vault_names_size(), 1);
+    EXPECT_EQ(instance.resource_ids(0), vault_id);
+    EXPECT_EQ(instance.storage_vault_names(0), expected_name);
+    EXPECT_EQ(instance.default_storage_vault_id(), vault_id);
+    EXPECT_EQ(instance.default_storage_vault_name(), expected_name);
+}
+
 // Test AK/SK cascade update: two-level cascade
 TEST(AkSkCascadeTest, TwoLevelCascade) {
     auto meta_service = get_meta_service();
@@ -856,6 +934,94 @@ TEST(AkSkCascadeTest, ChildWithoutObjInfo) {
     sp->clear_all_call_backs();
 }
 
+TEST(StorageVaultCascadeTest, AlterS3VaultCascadesToDerivedInstances) {
+    auto meta_service = get_meta_service();
+
+    auto sp = SyncPoint::get_instance();
+    sp->enable_processing();
+    sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) {
+        auto* ret = try_any_cast<int*>(args[0]);
+        *ret = 0;
+        auto* key = try_any_cast<std::string*>(args[1]);
+        *key = "selectdbselectdbselectdbselectdb";
+        auto* key_id = try_any_cast<int64_t*>(args[2]);
+        *key_id = 1;
+    });
+
+    create_instance_with_storage_vault(meta_service.get(), "vault_parent", "", 
"2", "vault_old",
+                                       "old_ak", "old_sk");
+    create_instance_with_storage_vault(meta_service.get(), "vault_child", 
"vault_parent", "2",
+                                       "vault_old", "old_ak", "old_sk");
+
+    AlterObjStoreInfoRequest req;
+    req.set_cloud_unique_id("1:vault_parent:test");
+    req.set_op(AlterObjStoreInfoRequest::ALTER_S3_VAULT);
+    StorageVaultPB vault;
+    vault.set_name("vault_old");
+    vault.set_alter_name("vault_new");
+    vault.mutable_obj_info()->set_ak("new_ak");
+    vault.mutable_obj_info()->set_sk("new_sk");
+    req.mutable_vault()->CopyFrom(vault);
+
+    brpc::Controller cntl;
+    AlterObjStoreInfoResponse res;
+    meta_service->alter_storage_vault(&cntl, &req, &res, nullptr);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();
+
+    std::string cipher_sk = "HNAGUf23voYuuqV2BCX9Tw==";
+    verify_storage_vault(meta_service.get(), "vault_parent", "2", "vault_new", 
"new_ak", cipher_sk);
+    verify_storage_vault(meta_service.get(), "vault_child", "2", "vault_new", 
"new_ak", cipher_sk);
+
+    sp->disable_processing();
+    sp->clear_all_call_backs();
+}
+
+TEST(StorageVaultCascadeTest, SnapshotDisabledSkipsCascade) {
+    auto meta_service = get_meta_service();
+
+    auto sp = SyncPoint::get_instance();
+    sp->enable_processing();
+    sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) {
+        auto* ret = try_any_cast<int*>(args[0]);
+        *ret = 0;
+        auto* key = try_any_cast<std::string*>(args[1]);
+        *key = "selectdbselectdbselectdbselectdb";
+        auto* key_id = try_any_cast<int64_t*>(args[2]);
+        *key_id = 1;
+    });
+
+    create_instance_with_storage_vault(meta_service.get(), 
"vault_parent_disabled", "", "2",
+                                       "vault_old", "old_ak", "old_sk",
+                                       /*enable_snapshot=*/false);
+    create_instance_with_storage_vault(meta_service.get(), 
"vault_child_disabled",
+                                       "vault_parent_disabled", "2", 
"vault_old", "old_ak",
+                                       "old_sk");
+
+    AlterObjStoreInfoRequest req;
+    req.set_cloud_unique_id("1:vault_parent_disabled:test");
+    req.set_op(AlterObjStoreInfoRequest::ALTER_S3_VAULT);
+    StorageVaultPB vault;
+    vault.set_name("vault_old");
+    vault.set_alter_name("vault_new");
+    vault.mutable_obj_info()->set_ak("new_ak");
+    vault.mutable_obj_info()->set_sk("new_sk");
+    req.mutable_vault()->CopyFrom(vault);
+
+    brpc::Controller cntl;
+    AlterObjStoreInfoResponse res;
+    meta_service->alter_storage_vault(&cntl, &req, &res, nullptr);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg();
+
+    std::string cipher_sk = "HNAGUf23voYuuqV2BCX9Tw==";
+    verify_storage_vault(meta_service.get(), "vault_parent_disabled", "2", 
"vault_new", "new_ak",
+                         cipher_sk);
+    verify_storage_vault(meta_service.get(), "vault_child_disabled", "2", 
"vault_old", "old_ak",
+                         "old_sk");
+
+    sp->disable_processing();
+    sp->clear_all_call_backs();
+}
+
 TEST(ResourceTest, RollbackInstance) {
     auto sp = SyncPoint::get_instance();
     sp->set_call_back("encrypt_ak_sk:get_encryption_key", [](auto&& args) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to