This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 4dabc4682 refactor(services/obs): migrate obs to reqsign-core v2
(#7169)
4dabc4682 is described below
commit 4dabc4682f2e81d10975090860452b0fdd94e97e
Author: Qinxuan Chen <[email protected]>
AuthorDate: Mon Feb 23 13:55:18 2026 +0800
refactor(services/obs): migrate obs to reqsign-core v2 (#7169)
---
core/Cargo.lock | 24 +++++++++--
core/services/obs/Cargo.toml | 8 ++--
core/services/obs/src/backend.rs | 42 ++++++++++--------
core/services/obs/src/core.rs | 93 ++++++++++++++++------------------------
core/services/obs/src/writer.rs | 23 +++++-----
5 files changed, 98 insertions(+), 92 deletions(-)
diff --git a/core/Cargo.lock b/core/Cargo.lock
index eda8e61f5..d8d971590 100644
--- a/core/Cargo.lock
+++ b/core/Cargo.lock
@@ -6838,7 +6838,10 @@ dependencies = [
"log",
"opendal-core",
"quick-xml",
- "reqsign",
+ "reqsign-core",
+ "reqsign-file-read-tokio",
+ "reqsign-http-send-reqwest",
+ "reqsign-huaweicloud-obs",
"serde",
"tokio",
]
@@ -8643,7 +8646,6 @@ dependencies = [
"http 1.4.0",
"jsonwebtoken",
"log",
- "once_cell",
"percent-encoding",
"rand 0.8.5",
"reqwest",
@@ -8717,9 +8719,9 @@ dependencies = [
[[package]]
name = "reqsign-file-read-tokio"
-version = "2.0.1"
+version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "669ea66036266a9ac371d2e63cc7d345e69994da0168b4e6f3487fe21e126f76"
+checksum = "702f12a867bf8e507de907fa0f4d75b96469ace7edd33fcc1fc8a8ef58f3c8d2"
dependencies = [
"anyhow",
"async-trait",
@@ -8744,6 +8746,20 @@ dependencies = [
"wasm-bindgen-futures",
]
+[[package]]
+name = "reqsign-huaweicloud-obs"
+version = "2.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5ef663039ba605fb73ca2837215ed082fc264fd815395c72fa93b31c46a32081"
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "http 1.4.0",
+ "log",
+ "percent-encoding",
+ "reqsign-core",
+]
+
[[package]]
name = "reqwest"
version = "0.12.24"
diff --git a/core/services/obs/Cargo.toml b/core/services/obs/Cargo.toml
index 3bd2582c0..3d828df60 100644
--- a/core/services/obs/Cargo.toml
+++ b/core/services/obs/Cargo.toml
@@ -36,10 +36,10 @@ http = { workspace = true }
log = { workspace = true }
opendal-core = { path = "../../core", version = "0.55.0", default-features =
false }
quick-xml = { workspace = true, features = ["serialize", "overlapped-lists"] }
-reqsign = { workspace = true, features = [
- "services-huaweicloud",
- "reqwest_request",
-] }
+reqsign-core = "2.0.2"
+reqsign-file-read-tokio = "2.0.2"
+reqsign-http-send-reqwest = "2.0.1"
+reqsign-huaweicloud-obs = "2.0.2"
serde = { workspace = true, features = ["derive"] }
[dev-dependencies]
diff --git a/core/services/obs/src/backend.rs b/core/services/obs/src/backend.rs
index a4aef9b4f..42bcbe2ba 100644
--- a/core/services/obs/src/backend.rs
+++ b/core/services/obs/src/backend.rs
@@ -23,9 +23,17 @@ use http::Response;
use http::StatusCode;
use http::Uri;
use log::debug;
-use reqsign::HuaweicloudObsConfig;
-use reqsign::HuaweicloudObsCredentialLoader;
-use reqsign::HuaweicloudObsSigner;
+use opendal_core::raw::*;
+use opendal_core::*;
+use reqsign_core::Context;
+use reqsign_core::OsEnv;
+use reqsign_core::ProvideCredentialChain;
+use reqsign_core::Signer;
+use reqsign_file_read_tokio::TokioFileRead;
+use reqsign_http_send_reqwest::ReqwestHttpSend;
+use reqsign_huaweicloud_obs::EnvCredentialProvider;
+use reqsign_huaweicloud_obs::RequestSigner;
+use reqsign_huaweicloud_obs::StaticCredentialProvider;
use super::OBS_SCHEME;
use super::config::ObsConfig;
@@ -36,8 +44,6 @@ use super::error::parse_error;
use super::lister::ObsLister;
use super::writer::ObsWriter;
use super::writer::ObsWriters;
-use opendal_core::raw::*;
-use opendal_core::*;
/// Huawei-Cloud Object Storage Service (OBS) support
#[doc = include_str!("docs.md")]
@@ -169,20 +175,18 @@ impl Builder for ObsBuilder {
};
debug!("backend use endpoint {}", &endpoint);
- let mut cfg = HuaweicloudObsConfig::default();
- // Load cfg from env first.
- cfg = cfg.from_env();
+ let ctx = Context::new()
+ .with_file_read(TokioFileRead)
+
.with_http_send(ReqwestHttpSend::new(GLOBAL_REQWEST_CLIENT.clone()))
+ .with_env(OsEnv);
- if let Some(v) = self.config.access_key_id {
- cfg.access_key_id = Some(v);
- }
+ let mut provider =
ProvideCredentialChain::new().push(EnvCredentialProvider::new());
- if let Some(v) = self.config.secret_access_key {
- cfg.secret_access_key = Some(v);
+ if let (Some(ak), Some(sk)) = (&self.config.access_key_id,
&self.config.secret_access_key) {
+ let static_provider = StaticCredentialProvider::new(ak, sk);
+ provider = provider.push_front(static_provider);
}
- let loader = HuaweicloudObsCredentialLoader::new(cfg);
-
// Set the bucket name in CanonicalizedResource.
// 1. If the bucket is bound to a user domain name, use the user
domain name as the bucket name,
// for example, `/obs.ccc.com/object`. `obs.ccc.com` is the user
domain name bound to the bucket.
@@ -190,7 +194,8 @@ impl Builder for ObsBuilder {
//
// Please refer to this doc for more details:
// https://support.huaweicloud.com/intl/en-us/api-obs/obs_04_0010.html
- let signer = HuaweicloudObsSigner::new(if is_obs_default { &bucket }
else { &endpoint });
+ let request_signer = RequestSigner::new(if is_obs_default { &bucket }
else { &endpoint });
+ let signer = Signer::new(ctx, provider, request_signer);
debug!("backend build finished");
Ok(ObsBackend {
@@ -252,7 +257,6 @@ impl Builder for ObsBuilder {
root,
endpoint: format!("{}://{}", &scheme, &endpoint),
signer,
- loader,
}),
})
}
@@ -390,8 +394,8 @@ impl Access for ObsBackend {
"operation is not supported",
)),
};
- let mut req = req?;
- self.core.sign_query(&mut req, args.expire()).await?;
+ let req = req?;
+ let req = self.core.sign_query(req, args.expire()).await?;
// We don't need this request anymore, consume it directly.
let (parts, _) = req.into_parts();
diff --git a/core/services/obs/src/core.rs b/core/services/obs/src/core.rs
index 2b754ff70..722ef0054 100644
--- a/core/services/obs/src/core.rs
+++ b/core/services/obs/src/core.rs
@@ -27,14 +27,12 @@ use http::header::CONTENT_LENGTH;
use http::header::CONTENT_TYPE;
use http::header::IF_MATCH;
use http::header::IF_NONE_MATCH;
-use reqsign::HuaweicloudObsCredential;
-use reqsign::HuaweicloudObsCredentialLoader;
-use reqsign::HuaweicloudObsSigner;
-use serde::Deserialize;
-use serde::Serialize;
-
use opendal_core::raw::*;
use opendal_core::*;
+use reqsign_core::Signer;
+use reqsign_huaweicloud_obs::Credential;
+use serde::Deserialize;
+use serde::Serialize;
pub mod constants {
pub const X_OBS_META_PREFIX: &str = "x-obs-meta-";
@@ -47,8 +45,7 @@ pub struct ObsCore {
pub root: String,
pub endpoint: String,
- pub signer: HuaweicloudObsSigner,
- pub loader: HuaweicloudObsCredentialLoader,
+ pub signer: Signer<Credential>,
}
impl Debug for ObsCore {
@@ -62,40 +59,26 @@ impl Debug for ObsCore {
}
impl ObsCore {
- async fn load_credential(&self) ->
Result<Option<HuaweicloudObsCredential>> {
- let cred = self
- .loader
- .load()
- .await
- .map_err(new_request_credential_error)?;
-
- if let Some(cred) = cred {
- Ok(Some(cred))
- } else {
- Ok(None)
- }
- }
+ pub async fn sign<T>(&self, req: Request<T>) -> Result<Request<T>> {
+ let (mut parts, body) = req.into_parts();
- pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
- let cred = if let Some(cred) = self.load_credential().await? {
- cred
- } else {
- return Ok(());
- };
+ self.signer
+ .sign(&mut parts, None)
+ .await
+ .map_err(|e| new_request_sign_error(e.into()))?;
- self.signer.sign(req, &cred).map_err(new_request_sign_error)
+ Ok(Request::from_parts(parts, body))
}
- pub async fn sign_query<T>(&self, req: &mut Request<T>, duration:
Duration) -> Result<()> {
- let cred = if let Some(cred) = self.load_credential().await? {
- cred
- } else {
- return Ok(());
- };
+ pub async fn sign_query<T>(&self, req: Request<T>, duration: Duration) ->
Result<Request<T>> {
+ let (mut parts, body) = req.into_parts();
self.signer
- .sign_query(req, duration, &cred)
- .map_err(new_request_sign_error)
+ .sign(&mut parts, Some(duration))
+ .await
+ .map_err(|e| new_request_sign_error(e.into()))?;
+
+ Ok(Request::from_parts(parts, body))
}
#[inline]
@@ -111,9 +94,9 @@ impl ObsCore {
range: BytesRange,
args: &OpRead,
) -> Result<Response<HttpBody>> {
- let mut req = self.obs_get_object_request(path, range, args)?;
+ let req = self.obs_get_object_request(path, range, args)?;
- self.sign(&mut req).await?;
+ let req = self.sign(req).await?;
self.info.http_client().fetch(req).await
}
@@ -190,9 +173,9 @@ impl ObsCore {
}
pub async fn obs_head_object(&self, path: &str, args: &OpStat) ->
Result<Response<Buffer>> {
- let mut req = self.obs_head_object_request(path, args)?;
+ let req = self.obs_head_object_request(path, args)?;
- self.sign(&mut req).await?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -230,12 +213,12 @@ impl ObsCore {
let req = Request::delete(&url);
- let mut req = req
+ let req = req
.extension(Operation::Delete)
.body(Buffer::new())
.map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -287,13 +270,13 @@ impl ObsCore {
let source = format!("/{}/{}", self.bucket,
percent_encode_path(&source));
let url = format!("{}/{}", self.endpoint,
percent_encode_path(&target));
- let mut req = Request::put(&url)
+ let req = Request::put(&url)
.extension(Operation::Copy)
.header("x-obs-copy-source", &source)
.body(Buffer::new())
.map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -321,12 +304,12 @@ impl ObsCore {
url = url.push("marker", next_marker);
}
- let mut req = Request::get(url.finish())
+ let req = Request::get(url.finish())
.extension(Operation::List)
.body(Buffer::new())
.map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -344,12 +327,12 @@ impl ObsCore {
req = req.header(CONTENT_TYPE, mime)
}
- let mut req = req
+ let req = req
.extension(Operation::Write)
.body(Buffer::new())
.map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -377,13 +360,13 @@ impl ObsCore {
req = req.header(CONTENT_LENGTH, size);
}
- let mut req = req
+ let req = req
.extension(Operation::Write)
// Set body
.body(body)
.map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -413,12 +396,12 @@ impl ObsCore {
// Set content-type to `application/xml` to avoid mixed with form post.
let req = req.header(CONTENT_TYPE, "application/xml");
- let mut req = req
+ let req = req
.extension(Operation::Write)
.body(Buffer::from(Bytes::from(content)))
.map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -437,12 +420,12 @@ impl ObsCore {
percent_encode_path(upload_id)
);
- let mut req = Request::delete(&url)
+ let req = Request::delete(&url)
.extension(Operation::Write)
.body(Buffer::new())
.map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
+ let req = self.sign(req).await?;
self.send(req).await
}
}
@@ -500,7 +483,7 @@ pub struct CompleteMultipartUploadRequestPart {
/// Output of `CompleteMultipartUpload` operation
#[derive(Debug, Default, Deserialize)]
-#[serde[default, rename_all = "PascalCase"]]
+#[serde(default, rename_all = "PascalCase")]
pub struct CompleteMultipartUploadResult {
pub location: String,
pub bucket: String,
diff --git a/core/services/obs/src/writer.rs b/core/services/obs/src/writer.rs
index 22657d2a4..983e76b68 100644
--- a/core/services/obs/src/writer.rs
+++ b/core/services/obs/src/writer.rs
@@ -21,12 +21,11 @@ use bytes::Buf;
use http::HeaderMap;
use http::HeaderValue;
use http::StatusCode;
+use opendal_core::raw::*;
+use opendal_core::*;
use super::core::*;
use super::error::parse_error;
-use opendal_core::raw::oio::MultipartPart;
-use opendal_core::raw::*;
-use opendal_core::*;
pub type ObsWriters = TwoWays<oio::MultipartWriter<ObsWriter>,
oio::AppendWriter<ObsWriter>>;
@@ -64,11 +63,11 @@ impl ObsWriter {
impl oio::MultipartWrite for ObsWriter {
async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
- let mut req = self
+ let req = self
.core
.obs_put_object_request(&self.path, Some(size), &self.op, body)?;
- self.core.sign(&mut req).await?;
+ let req = self.core.sign(req).await?;
let resp = self.core.send(req).await?;
@@ -110,7 +109,7 @@ impl oio::MultipartWrite for ObsWriter {
part_number: usize,
size: u64,
body: Buffer,
- ) -> Result<MultipartPart> {
+ ) -> Result<oio::MultipartPart> {
// Obs service requires part number must between [1..=10000]
let part_number = part_number + 1;
@@ -132,7 +131,7 @@ impl oio::MultipartWrite for ObsWriter {
})?
.to_string();
- Ok(MultipartPart {
+ Ok(oio::MultipartPart {
part_number,
etag,
checksum: None,
@@ -143,7 +142,11 @@ impl oio::MultipartWrite for ObsWriter {
}
}
- async fn complete_part(&self, upload_id: &str, parts: &[MultipartPart]) ->
Result<Metadata> {
+ async fn complete_part(
+ &self,
+ upload_id: &str,
+ parts: &[oio::MultipartPart],
+ ) -> Result<Metadata> {
let parts = parts
.iter()
.map(|p| CompleteMultipartUploadRequestPart {
@@ -210,11 +213,11 @@ impl oio::AppendWrite for ObsWriter {
}
async fn append(&self, offset: u64, size: u64, body: Buffer) ->
Result<Metadata> {
- let mut req = self
+ let req = self
.core
.obs_append_object_request(&self.path, offset, size, &self.op,
body)?;
- self.core.sign(&mut req).await?;
+ let req = self.core.sign(req).await?;
let resp = self.core.send(req).await?;