This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 098d8f594b2 [fix](recycler) Fix missing provider in S3Conf (#37468)
098d8f594b2 is described below
commit 098d8f594b2a62fe28b4b1bbfb4e36d498d4abaf
Author: plat1ko <[email protected]>
AuthorDate: Tue Jul 9 00:31:18 2024 +0800
[fix](recycler) Fix missing provider in S3Conf (#37468)
Fix the missing provider field in S3Conf, which is used to determine which
object storage client to use.
---
cloud/src/recycler/recycler.cpp | 110 +++++++++++++------------------------
cloud/src/recycler/s3_accessor.cpp | 31 ++++++-----
cloud/src/recycler/s3_accessor.h | 3 +-
3 files changed, 59 insertions(+), 85 deletions(-)
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 7d3aa5ad794..5887b3759a0 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -2233,49 +2233,35 @@ int InstanceRecycler::init_copy_job_accessor(const
std::string& stage_id,
}
#else
// init s3 accessor and add to accessor map
- bool found = false;
- ObjectStoreInfoPB object_store_info;
- StagePB::StageAccessType stage_access_type = StagePB::AKSK;
- for (auto& s : instance_info_.stages()) {
- if (s.stage_id() == stage_id) {
- object_store_info = s.obj_info();
- if (s.has_access_type()) {
- stage_access_type = s.access_type();
- }
- found = true;
- break;
- }
- }
- if (!found) {
+ auto stage_it =
+ std::find_if(instance_info_.stages().begin(),
instance_info_.stages().end(),
+ [&stage_id](auto&& stage) { return stage.stage_id()
== stage_id; });
+
+ if (stage_it == instance_info_.stages().end()) {
LOG(INFO) << "Recycle nonexisted stage copy jobs. instance_id=" <<
instance_id_
<< ", stage_id=" << stage_id << ", stage_type=" <<
stage_type;
return 1;
}
+
+ const auto& object_store_info = stage_it->obj_info();
+ auto stage_access_type = stage_it->has_access_type() ?
stage_it->access_type() : StagePB::AKSK;
+
S3Conf s3_conf;
if (stage_type == StagePB::EXTERNAL) {
- s3_conf.endpoint = object_store_info.endpoint();
- s3_conf.region = object_store_info.region();
- s3_conf.bucket = object_store_info.bucket();
- s3_conf.prefix = object_store_info.prefix();
if (stage_access_type == StagePB::AKSK) {
- s3_conf.ak = object_store_info.ak();
- s3_conf.sk = object_store_info.sk();
- if (object_store_info.has_encryption_info()) {
- AkSkPair plain_ak_sk_pair;
- int ret = decrypt_ak_sk_helper(object_store_info.ak(),
object_store_info.sk(),
-
object_store_info.encryption_info(),
- &plain_ak_sk_pair);
- if (ret != 0) {
- LOG(WARNING) << "fail to decrypt ak sk. instance_id: " <<
instance_id_
- << " obj_info: " <<
proto_to_json(object_store_info);
- return -1;
- }
- s3_conf.ak = std::move(plain_ak_sk_pair.first);
- s3_conf.sk = std::move(plain_ak_sk_pair.second);
+ auto conf = S3Conf::from_obj_store_info(object_store_info);
+ if (!conf) {
+ return -1;
}
+
+ s3_conf = std::move(*conf);
} else if (stage_access_type == StagePB::BUCKET_ACL) {
- s3_conf.ak = instance_info_.ram_user().ak();
- s3_conf.sk = instance_info_.ram_user().sk();
+ auto conf = S3Conf::from_obj_store_info(object_store_info, true /*
skip_aksk */);
+ if (!conf) {
+ return -1;
+ }
+
+ s3_conf = std::move(*conf);
if (instance_info_.ram_user().has_encryption_info()) {
AkSkPair plain_ak_sk_pair;
int ret = decrypt_ak_sk_helper(
@@ -2288,6 +2274,9 @@ int InstanceRecycler::init_copy_job_accessor(const
std::string& stage_id,
}
s3_conf.ak = std::move(plain_ak_sk_pair.first);
s3_conf.sk = std::move(plain_ak_sk_pair.second);
+ } else {
+ s3_conf.ak = instance_info_.ram_user().ak();
+ s3_conf.sk = instance_info_.ram_user().sk();
}
} else {
LOG(INFO) << "Unsupported stage access type=" << stage_access_type
@@ -2300,24 +2289,14 @@ int InstanceRecycler::init_copy_job_accessor(const
std::string& stage_id,
LOG(WARNING) << "invalid idx: " << idx;
return -1;
}
- auto& old_obj = instance_info_.obj_info()[idx - 1];
- s3_conf.ak = old_obj.ak();
- s3_conf.sk = old_obj.sk();
- if (old_obj.has_encryption_info()) {
- AkSkPair plain_ak_sk_pair;
- int ret = decrypt_ak_sk_helper(old_obj.ak(), old_obj.sk(),
old_obj.encryption_info(),
- &plain_ak_sk_pair);
- if (ret != 0) {
- LOG(WARNING) << "fail to decrypt ak sk. instance_id: " <<
instance_id_
- << " obj_info: " << proto_to_json(old_obj);
- return -1;
- }
- s3_conf.ak = std::move(plain_ak_sk_pair.first);
- s3_conf.sk = std::move(plain_ak_sk_pair.second);
+
+ const auto& old_obj = instance_info_.obj_info()[idx - 1];
+ auto conf = S3Conf::from_obj_store_info(old_obj);
+ if (!conf) {
+ return -1;
}
- s3_conf.endpoint = old_obj.endpoint();
- s3_conf.region = old_obj.region();
- s3_conf.bucket = old_obj.bucket();
+
+ s3_conf = std::move(*conf);
s3_conf.prefix = object_store_info.prefix();
} else {
LOG(WARNING) << "unknown stage type " << stage_type;
@@ -2468,28 +2447,17 @@ int InstanceRecycler::recycle_expired_stage_objects() {
LOG(WARNING) << "invalid idx: " << idx << ", id: " <<
stage.obj_info().id();
continue;
}
- auto& old_obj = instance_info_.obj_info()[idx - 1];
- S3Conf s3_conf;
- s3_conf.ak = old_obj.ak();
- s3_conf.sk = old_obj.sk();
- if (old_obj.has_encryption_info()) {
- AkSkPair plain_ak_sk_pair;
- int ret1 = decrypt_ak_sk_helper(old_obj.ak(), old_obj.sk(),
old_obj.encryption_info(),
- &plain_ak_sk_pair);
- if (ret1 != 0) {
- LOG(WARNING) << "fail to decrypt ak sk "
- << "obj_info:" << proto_to_json(old_obj);
- } else {
- s3_conf.ak = std::move(plain_ak_sk_pair.first);
- s3_conf.sk = std::move(plain_ak_sk_pair.second);
- }
+
+ const auto& old_obj = instance_info_.obj_info()[idx - 1];
+ auto s3_conf = S3Conf::from_obj_store_info(old_obj);
+ if (!s3_conf) {
+ LOG(WARNING) << "failed to init accessor";
+ continue;
}
- s3_conf.endpoint = old_obj.endpoint();
- s3_conf.region = old_obj.region();
- s3_conf.bucket = old_obj.bucket();
- s3_conf.prefix = stage.obj_info().prefix();
+
+ s3_conf->prefix = stage.obj_info().prefix();
std::shared_ptr<S3Accessor> accessor;
- int ret1 = S3Accessor::create(std::move(s3_conf), &accessor);
+ int ret1 = S3Accessor::create(std::move(*s3_conf), &accessor);
if (ret1 != 0) {
LOG(WARNING) << "failed to init s3 accessor ret=" << ret1;
ret = -1;
diff --git a/cloud/src/recycler/s3_accessor.cpp
b/cloud/src/recycler/s3_accessor.cpp
index 5bf37a52199..1f43f6c6b0e 100644
--- a/cloud/src/recycler/s3_accessor.cpp
+++ b/cloud/src/recycler/s3_accessor.cpp
@@ -153,7 +153,8 @@ private:
size_t prefix_length_;
};
-std::optional<S3Conf> S3Conf::from_obj_store_info(const ObjectStoreInfoPB&
obj_info) {
+std::optional<S3Conf> S3Conf::from_obj_store_info(const ObjectStoreInfoPB&
obj_info,
+ bool skip_aksk) {
S3Conf s3_conf;
switch (obj_info.provider()) {
@@ -175,20 +176,22 @@ std::optional<S3Conf> S3Conf::from_obj_store_info(const
ObjectStoreInfoPB& obj_i
return std::nullopt;
}
- if (obj_info.has_encryption_info()) {
- AkSkPair plain_ak_sk_pair;
- int ret = decrypt_ak_sk_helper(obj_info.ak(), obj_info.sk(),
obj_info.encryption_info(),
- &plain_ak_sk_pair);
- if (ret != 0) {
- LOG_WARNING("fail to decrypt ak sk").tag("obj_info",
proto_to_json(obj_info));
- return std::nullopt;
+ if (!skip_aksk) {
+ if (obj_info.has_encryption_info()) {
+ AkSkPair plain_ak_sk_pair;
+ int ret = decrypt_ak_sk_helper(obj_info.ak(), obj_info.sk(),
obj_info.encryption_info(),
+ &plain_ak_sk_pair);
+ if (ret != 0) {
+ LOG_WARNING("fail to decrypt ak sk").tag("obj_info",
proto_to_json(obj_info));
+ return std::nullopt;
+ } else {
+ s3_conf.ak = std::move(plain_ak_sk_pair.first);
+ s3_conf.sk = std::move(plain_ak_sk_pair.second);
+ }
} else {
- s3_conf.ak = std::move(plain_ak_sk_pair.first);
- s3_conf.sk = std::move(plain_ak_sk_pair.second);
+ s3_conf.ak = obj_info.ak();
+ s3_conf.sk = obj_info.sk();
}
- } else {
- s3_conf.ak = obj_info.ak();
- s3_conf.sk = obj_info.sk();
}
s3_conf.endpoint = obj_info.endpoint();
@@ -234,6 +237,8 @@ int S3Accessor::init() {
conf_.ak, conf_.bucket);
auto container_client =
std::make_shared<Azure::Storage::Blobs::BlobContainerClient>(uri_, cred);
+ // uri format for debug:
${scheme}://${ak}.blob.core.windows.net/${bucket}/${prefix}
+ uri_ = uri_ + '/' + conf_.prefix;
obj_client_ =
std::make_shared<AzureObjClient>(std::move(container_client));
return 0;
}
diff --git a/cloud/src/recycler/s3_accessor.h b/cloud/src/recycler/s3_accessor.h
index 5d50e4abab9..8e9b53b4392 100644
--- a/cloud/src/recycler/s3_accessor.h
+++ b/cloud/src/recycler/s3_accessor.h
@@ -50,7 +50,8 @@ struct S3Conf {
Provider provider;
- static std::optional<S3Conf> from_obj_store_info(const ObjectStoreInfoPB&
obj_info);
+ static std::optional<S3Conf> from_obj_store_info(const ObjectStoreInfoPB&
obj_info,
+ bool skip_aksk = false);
};
class S3Accessor : public StorageVaultAccessor {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]