This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 2893a49b7 refactor(services/azure): migrate azblob azdls azfile to
reqsign v2 (#7226)
2893a49b7 is described below
commit 2893a49b7bd9aaf6d5a4ec7b42998363cb45603b
Author: Xuanwo <[email protected]>
AuthorDate: Thu Feb 26 17:09:55 2026 +0800
refactor(services/azure): migrate azblob azdls azfile to reqsign v2 (#7226)
* refactor(services/azure): migrate azblob azdls azfile to reqsign v2
* fix(services/ghac): migrate azblob signer wiring to reqsign v2
---
core/Cargo.lock | 42 ++++++++++--
core/services/azblob/Cargo.toml | 8 +--
core/services/azblob/src/backend.rs | 84 +++++++++++++++--------
core/services/azblob/src/core.rs | 121 +++++++++++++++-------------------
core/services/azdls/Cargo.toml | 8 +--
core/services/azdls/src/backend.rs | 92 +++++++++++++++++++-------
core/services/azdls/src/core.rs | 74 +++++++++------------
core/services/azfile/Cargo.toml | 8 +--
core/services/azfile/src/backend.rs | 64 +++++++++++++-----
core/services/azfile/src/core.rs | 78 ++++++++++------------
core/services/azure-common/Cargo.toml | 1 -
core/services/azure-common/src/lib.rs | 34 ++++++++--
core/services/ghac/Cargo.toml | 3 +-
core/services/ghac/src/writer.rs | 18 ++---
14 files changed, 383 insertions(+), 252 deletions(-)
diff --git a/core/Cargo.lock b/core/Cargo.lock
index d8d971590..13e66cb98 100644
--- a/core/Cargo.lock
+++ b/core/Cargo.lock
@@ -6371,7 +6371,10 @@ dependencies = [
"opendal-service-azure-common",
"pretty_assertions",
"quick-xml",
- "reqsign",
+ "reqsign-azure-storage",
+ "reqsign-core",
+ "reqsign-file-read-tokio",
+ "reqsign-http-send-reqwest",
"serde",
"serde_json",
"sha2",
@@ -6389,7 +6392,10 @@ dependencies = [
"opendal-core",
"opendal-service-azure-common",
"quick-xml",
- "reqsign",
+ "reqsign-azure-storage",
+ "reqsign-core",
+ "reqsign-file-read-tokio",
+ "reqsign-http-send-reqwest",
"serde",
"serde_json",
"tokio",
@@ -6405,7 +6411,10 @@ dependencies = [
"opendal-core",
"opendal-service-azure-common",
"quick-xml",
- "reqsign",
+ "reqsign-azure-storage",
+ "reqsign-core",
+ "reqsign-file-read-tokio",
+ "reqsign-http-send-reqwest",
"serde",
"tokio",
]
@@ -6416,7 +6425,6 @@ version = "0.55.0"
dependencies = [
"http 1.4.0",
"opendal-core",
- "reqsign",
]
[[package]]
@@ -6624,7 +6632,8 @@ dependencies = [
"opendal-core",
"opendal-service-azblob",
"prost 0.13.5",
- "reqsign",
+ "reqsign-azure-storage",
+ "reqsign-core",
"serde",
"serde_json",
"sha2",
@@ -8695,6 +8704,29 @@ dependencies = [
"sha1",
]
+[[package]]
+name = "reqsign-azure-storage"
+version = "2.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "153687178cff1ba9fe13d5f43c7364a3b0cf70751b9ef44ae03d4385c7558085"
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "base64 0.22.1",
+ "bytes",
+ "form_urlencoded",
+ "http 1.4.0",
+ "jsonwebtoken",
+ "log",
+ "pem",
+ "percent-encoding",
+ "reqsign-core",
+ "rsa",
+ "serde",
+ "serde_json",
+ "sha1",
+]
+
[[package]]
name = "reqsign-core"
version = "2.0.2"
diff --git a/core/services/azblob/Cargo.toml b/core/services/azblob/Cargo.toml
index 87274adf4..a0dffe4d0 100644
--- a/core/services/azblob/Cargo.toml
+++ b/core/services/azblob/Cargo.toml
@@ -38,10 +38,10 @@ log = { workspace = true }
opendal-core = { path = "../../core", version = "0.55.0", default-features =
false }
opendal-service-azure-common = { path = "../azure-common", version = "0.55.0" }
quick-xml = { workspace = true, features = ["serialize", "overlapped-lists"] }
-reqsign = { workspace = true, features = [
- "reqwest_request",
- "services-azblob",
-] }
+reqsign-azure-storage = { version = "2.0.2", default-features = false }
+reqsign-core = { version = "2.0.1", default-features = false }
+reqsign-file-read-tokio = { version = "2.0.1", default-features = false }
+reqsign-http-send-reqwest = { version = "2.0.1", default-features = false }
serde = { workspace = true, features = ["derive"] }
sha2 = { workspace = true }
uuid = { workspace = true, features = ["v4", "serde"] }
diff --git a/core/services/azblob/src/backend.rs
b/core/services/azblob/src/backend.rs
index 9222b5276..69def7c3a 100644
--- a/core/services/azblob/src/backend.rs
+++ b/core/services/azblob/src/backend.rs
@@ -23,9 +23,16 @@ use base64::prelude::BASE64_STANDARD;
use http::Response;
use http::StatusCode;
use log::debug;
-use reqsign::AzureStorageConfig;
-use reqsign::AzureStorageLoader;
-use reqsign::AzureStorageSigner;
+use reqsign_azure_storage::DefaultCredentialProvider;
+use reqsign_azure_storage::RequestSigner;
+use reqsign_azure_storage::StaticCredentialProvider;
+use reqsign_core::Context;
+use reqsign_core::Env as _;
+use reqsign_core::OsEnv;
+use reqsign_core::Signer;
+use reqsign_core::StaticEnv;
+use reqsign_file_read_tokio::TokioFileRead;
+use reqsign_http_send_reqwest::ReqwestHttpSend;
use sha2::Digest;
use sha2::Sha256;
@@ -42,13 +49,14 @@ use super::writer::AzblobWriters;
use opendal_core::raw::*;
use opendal_core::*;
use opendal_service_azure_common::{
- AzureStorageService, azure_account_name_from_endpoint,
azure_config_from_connection_string,
+ AzureStorageConfig as AzureConnectionConfig, AzureStorageService,
+ azure_account_name_from_endpoint, azure_config_from_connection_string,
};
const AZBLOB_BATCH_LIMIT: usize = 256;
-impl From<AzureStorageConfig> for AzblobConfig {
- fn from(value: AzureStorageConfig) -> Self {
+impl From<AzureConnectionConfig> for AzblobConfig {
+ fn from(value: AzureConnectionConfig) -> Self {
Self {
endpoint: value.endpoint,
account_name: value.account_name,
@@ -297,23 +305,23 @@ impl Builder for AzblobBuilder {
}?;
debug!("backend use endpoint {}", &container);
- #[cfg(target_arch = "wasm32")]
- let mut config_loader = AzureStorageConfig::default();
- #[cfg(not(target_arch = "wasm32"))]
- let mut config_loader = AzureStorageConfig::default().from_env();
-
- if let Some(v) = self
+ let account_name = self
.config
.account_name
.clone()
- .or_else(|| azure_account_name_from_endpoint(endpoint.as_str()))
- {
- config_loader.account_name = Some(v);
+ .or_else(|| azure_account_name_from_endpoint(endpoint.as_str()));
+
+ let os_env = OsEnv;
+ let mut envs = os_env.vars();
+
+ if let Some(v) = &account_name {
+ envs.insert("AZBLOB_ACCOUNT_NAME".to_string(), v.clone());
+ envs.insert("AZURE_STORAGE_ACCOUNT_NAME".to_string(), v.clone());
}
- if let Some(v) = self.config.account_key.clone() {
+ if let Some(v) = &self.config.account_key {
// Validate that account_key can be decoded as base64
- if let Err(e) = BASE64_STANDARD.decode(&v) {
+ if let Err(e) = BASE64_STANDARD.decode(v) {
return Err(Error::new(
ErrorKind::ConfigInvalid,
format!("invalid account_key: cannot decode as base64:
{e}"),
@@ -322,11 +330,12 @@ impl Builder for AzblobBuilder {
.with_context("service", AZBLOB_SCHEME)
.with_context("key", "account_key"));
}
- config_loader.account_key = Some(v);
+ envs.insert("AZBLOB_ACCOUNT_KEY".to_string(), v.clone());
+ envs.insert("AZURE_STORAGE_ACCOUNT_KEY".to_string(), v.clone());
}
- if let Some(v) = self.config.sas_token.clone() {
- config_loader.sas_token = Some(v);
+ if let Some(v) = &self.config.sas_token {
+ envs.insert("AZURE_STORAGE_SAS_TOKEN".to_string(), v.clone());
}
let encryption_key =
@@ -360,9 +369,34 @@ impl Builder for AzblobBuilder {
}
};
- let cred_loader = AzureStorageLoader::new(config_loader);
+ let ctx = Context::new()
+ .with_file_read(TokioFileRead)
+
.with_http_send(ReqwestHttpSend::new(GLOBAL_REQWEST_CLIENT.clone()))
+ .with_env(StaticEnv {
+ home_dir: os_env.home_dir(),
+ envs,
+ });
+
+ let mut credential = DefaultCredentialProvider::new();
+
+ if let (Some(account_name), Some(account_key)) =
+ (account_name.as_deref(), self.config.account_key.as_deref())
+ {
+ credential =
credential.push_front(StaticCredentialProvider::new_shared_key(
+ account_name,
+ account_key,
+ ));
+ }
+
+ if let Some(sas_token) = self.config.sas_token.as_deref() {
+ credential =
credential.push_front(StaticCredentialProvider::new_sas_token(sas_token));
+ }
- let signer = AzureStorageSigner::new();
+ let signer = Signer::new(
+ ctx,
+ credential,
+ RequestSigner::new().with_service_sas_permissions("racwd"),
+ );
Ok(AzblobBackend {
core: Arc::new(AzblobCore {
@@ -422,7 +456,6 @@ impl Builder for AzblobBuilder {
encryption_algorithm,
container: self.config.container.clone(),
- loader: cred_loader,
signer,
}),
})
@@ -551,9 +584,8 @@ impl Access for AzblobBackend {
)),
};
- let mut req = req?;
-
- self.core.sign_query(&mut req).await?;
+ let req = req?;
+ let req = self.core.sign_query(req).await?;
let (parts, _) = req.into_parts();
diff --git a/core/services/azblob/src/core.rs b/core/services/azblob/src/core.rs
index 792e1ee02..67091e5e8 100644
--- a/core/services/azblob/src/core.rs
+++ b/core/services/azblob/src/core.rs
@@ -32,9 +32,8 @@ use http::header::IF_MATCH;
use http::header::IF_MODIFIED_SINCE;
use http::header::IF_NONE_MATCH;
use http::header::IF_UNMODIFIED_SINCE;
-use reqsign::AzureStorageCredential;
-use reqsign::AzureStorageLoader;
-use reqsign::AzureStorageSigner;
+use reqsign_azure_storage::Credential;
+use reqsign_core::Signer;
use serde::Deserialize;
use serde::Serialize;
use uuid::Uuid;
@@ -69,8 +68,7 @@ pub struct AzblobCore {
pub encryption_key: Option<HeaderValue>,
pub encryption_key_sha256: Option<HeaderValue>,
pub encryption_algorithm: Option<HeaderValue>,
- pub loader: AzureStorageLoader,
- pub signer: AzureStorageSigner,
+ pub signer: Signer<Credential>,
}
impl Debug for AzblobCore {
@@ -84,35 +82,22 @@ impl Debug for AzblobCore {
}
impl AzblobCore {
- async fn load_credential(&self) -> Result<AzureStorageCredential> {
- let cred = self
- .loader
- .load()
- .await
- .map_err(new_request_credential_error)?;
-
- if let Some(cred) = cred {
- Ok(cred)
- } else {
- Err(Error::new(
- ErrorKind::ConfigInvalid,
- "no valid credential found",
- ))
- }
- }
-
- pub async fn sign_query<T>(&self, req: &mut Request<T>) -> Result<()> {
- let cred = self.load_credential().await?;
+ pub async fn sign_query<T>(&self, req: Request<T>) -> Result<Request<T>> {
+ let (mut parts, body) = req.into_parts();
self.signer
- .sign_query(req, Duration::from_secs(3600), &cred)
- .map_err(new_request_sign_error)
+ .sign(&mut parts, Some(Duration::from_secs(3600)))
+ .await
+ .map_err(|e| new_request_sign_error(e.into()))?;
+
+ Ok(Request::from_parts(parts, body))
}
- pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
- let cred = self.load_credential().await?;
+ pub async fn sign<T>(&self, req: Request<T>) -> Result<Request<T>> {
+ let (mut parts, body) = req.into_parts();
+
// Insert x-ms-version header for normal requests.
- req.headers_mut().insert(
+ parts.headers.insert(
HeaderName::from_static(constants::X_MS_VERSION),
// 2022-11-02 is the version supported by Azurite V3 and
// used by Azure Portal, We use this version to make
@@ -121,12 +106,24 @@ impl AzblobCore {
// In the future, we could allow users to configure this value.
HeaderValue::from_static("2022-11-02"),
);
- self.signer.sign(req, &cred).map_err(new_request_sign_error)
+
+ self.signer
+ .sign(&mut parts, None)
+ .await
+ .map_err(|e| new_request_sign_error(e.into()))?;
+
+ Ok(Request::from_parts(parts, body))
}
- async fn batch_sign<T>(&self, req: &mut Request<T>) -> Result<()> {
- let cred = self.load_credential().await?;
- self.signer.sign(req, &cred).map_err(new_request_sign_error)
+ async fn batch_sign<T>(&self, req: Request<T>) -> Result<Request<T>> {
+ let (mut parts, body) = req.into_parts();
+
+ self.signer
+ .sign(&mut parts, None)
+ .await
+ .map_err(|e| new_request_sign_error(e.into()))?;
+
+ Ok(Request::from_parts(parts, body))
}
#[inline]
@@ -230,9 +227,8 @@ impl AzblobCore {
range: BytesRange,
args: &OpRead,
) -> Result<Response<HttpBody>> {
- let mut req = self.azblob_get_blob_request(path, range, args)?;
-
- self.sign(&mut req).await?;
+ let req = self.azblob_get_blob_request(path, range, args)?;
+ let req = self.sign(req).await?;
self.info.http_client().fetch(req).await
}
@@ -297,9 +293,8 @@ impl AzblobCore {
args: &OpWrite,
body: Buffer,
) -> Result<Response<Buffer>> {
- let mut req = self.azblob_put_blob_request(path, size, args, body)?;
-
- self.sign(&mut req).await?;
+ let req = self.azblob_put_blob_request(path, size, args, body)?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -359,9 +354,8 @@ impl AzblobCore {
path: &str,
args: &OpWrite,
) -> Result<Response<Buffer>> {
- let mut req = self.azblob_init_appendable_blob_request(path, args)?;
-
- self.sign(&mut req).await?;
+ let req = self.azblob_init_appendable_blob_request(path, args)?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -407,9 +401,8 @@ impl AzblobCore {
size: u64,
body: Buffer,
) -> Result<Response<Buffer>> {
- let mut req = self.azblob_append_blob_request(path, position, size,
body)?;
-
- self.sign(&mut req).await?;
+ let req = self.azblob_append_blob_request(path, position, size, body)?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -462,9 +455,8 @@ impl AzblobCore {
args: &OpWrite,
body: Buffer,
) -> Result<Response<Buffer>> {
- let mut req = self.azblob_put_block_request(path, block_id, size,
args, body)?;
-
- self.sign(&mut req).await?;
+ let req = self.azblob_put_block_request(path, block_id, size, args,
body)?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -511,10 +503,8 @@ impl AzblobCore {
block_ids: Vec<Uuid>,
args: &OpWrite,
) -> Result<Response<Buffer>> {
- let mut req = self.azblob_complete_put_block_list_request(path,
block_ids, args)?;
-
- self.sign(&mut req).await?;
-
+ let req = self.azblob_complete_put_block_list_request(path, block_ids,
args)?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -545,9 +535,8 @@ impl AzblobCore {
path: &str,
args: &OpStat,
) -> Result<Response<Buffer>> {
- let mut req = self.azblob_head_blob_request(path, args)?;
-
- self.sign(&mut req).await?;
+ let req = self.azblob_head_blob_request(path, args)?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -560,9 +549,8 @@ impl AzblobCore {
}
pub async fn azblob_delete_blob(&self, path: &str) ->
Result<Response<Buffer>> {
- let mut req = self.azblob_delete_blob_request(path)?;
-
- self.sign(&mut req).await?;
+ let req = self.azblob_delete_blob_request(path)?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -584,12 +572,12 @@ impl AzblobCore {
req = req.header(IF_NONE_MATCH, "*");
}
- let mut req = req
+ let req = req
.extension(Operation::Copy)
.body(Buffer::new())
.map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -618,12 +606,12 @@ impl AzblobCore {
url = url.push("marker", next_marker);
}
- let mut req = Request::get(url.finish())
+ let req = Request::get(url.finish())
.extension(Operation::List)
.body(Buffer::new())
.map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -636,8 +624,8 @@ impl AzblobCore {
let mut multipart = Multipart::new();
for (idx, path) in paths.iter().enumerate() {
- let mut req = self.azblob_delete_blob_request(path)?;
- self.batch_sign(&mut req).await?;
+ let req = self.azblob_delete_blob_request(path)?;
+ let req = self.batch_sign(req).await?;
multipart = multipart.part(
MixedPart::from_request(req).part_header("content-id".parse().unwrap(),
idx.into()),
@@ -645,9 +633,8 @@ impl AzblobCore {
}
let req = Request::post(url);
- let mut req = multipart.apply(req)?;
-
- self.sign(&mut req).await?;
+ let req = multipart.apply(req)?;
+ let req = self.sign(req).await?;
self.send(req).await
}
}
diff --git a/core/services/azdls/Cargo.toml b/core/services/azdls/Cargo.toml
index 820594838..cd7af72a2 100644
--- a/core/services/azdls/Cargo.toml
+++ b/core/services/azdls/Cargo.toml
@@ -37,10 +37,10 @@ log = { workspace = true }
opendal-core = { path = "../../core", version = "0.55.0", default-features =
false }
opendal-service-azure-common = { path = "../azure-common", version = "0.55.0" }
quick-xml = { workspace = true, features = ["serialize", "overlapped-lists"] }
-reqsign = { workspace = true, features = [
- "reqwest_request",
- "services-azblob",
-] }
+reqsign-azure-storage = { version = "2.0.2", default-features = false }
+reqsign-core = { version = "2.0.1", default-features = false }
+reqsign-file-read-tokio = { version = "2.0.1", default-features = false }
+reqsign-http-send-reqwest = { version = "2.0.1", default-features = false }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
diff --git a/core/services/azdls/src/backend.rs
b/core/services/azdls/src/backend.rs
index 7f6120a78..e162cd437 100644
--- a/core/services/azdls/src/backend.rs
+++ b/core/services/azdls/src/backend.rs
@@ -21,9 +21,16 @@ use std::sync::Arc;
use http::Response;
use http::StatusCode;
use log::debug;
-use reqsign::AzureStorageConfig;
-use reqsign::AzureStorageLoader;
-use reqsign::AzureStorageSigner;
+use reqsign_azure_storage::DefaultCredentialProvider;
+use reqsign_azure_storage::RequestSigner;
+use reqsign_azure_storage::StaticCredentialProvider;
+use reqsign_core::Context;
+use reqsign_core::Env as _;
+use reqsign_core::OsEnv;
+use reqsign_core::Signer;
+use reqsign_core::StaticEnv;
+use reqsign_file_read_tokio::TokioFileRead;
+use reqsign_http_send_reqwest::ReqwestHttpSend;
use super::AZDLS_SCHEME;
use super::config::AzdlsConfig;
@@ -37,11 +44,12 @@ use super::writer::AzdlsWriters;
use opendal_core::raw::*;
use opendal_core::*;
use opendal_service_azure_common::{
- AzureStorageService, azure_account_name_from_endpoint,
azure_config_from_connection_string,
+ AzureStorageConfig as AzureConnectionConfig, AzureStorageService,
+ azure_account_name_from_endpoint, azure_config_from_connection_string,
};
-impl From<AzureStorageConfig> for AzdlsConfig {
- fn from(config: AzureStorageConfig) -> Self {
+impl From<AzureConnectionConfig> for AzdlsConfig {
+ fn from(config: AzureConnectionConfig) -> Self {
AzdlsConfig {
endpoint: config.endpoint,
account_name: config.account_name,
@@ -252,23 +260,62 @@ impl Builder for AzdlsBuilder {
}?;
debug!("backend use endpoint {}", &endpoint);
- let config_loader = AzureStorageConfig {
- account_name: self
- .config
- .account_name
- .clone()
- .or_else(||
azure_account_name_from_endpoint(endpoint.as_str())),
- account_key: self.config.account_key.clone(),
- sas_token: self.config.sas_token,
- client_id: self.config.client_id.clone(),
- client_secret: self.config.client_secret.clone(),
- tenant_id: self.config.tenant_id.clone(),
- authority_host: self.config.authority_host.clone(),
- ..Default::default()
- };
+ let account_name = self
+ .config
+ .account_name
+ .clone()
+ .or_else(|| azure_account_name_from_endpoint(endpoint.as_str()));
+
+ let mut envs = std::collections::HashMap::new();
+
+ if let Some(v) = &account_name {
+ envs.insert("AZBLOB_ACCOUNT_NAME".to_string(), v.clone());
+ envs.insert("AZURE_STORAGE_ACCOUNT_NAME".to_string(), v.clone());
+ }
+ if let Some(v) = &self.config.account_key {
+ envs.insert("AZBLOB_ACCOUNT_KEY".to_string(), v.clone());
+ envs.insert("AZURE_STORAGE_ACCOUNT_KEY".to_string(), v.clone());
+ }
+ if let Some(v) = &self.config.sas_token {
+ envs.insert("AZURE_STORAGE_SAS_TOKEN".to_string(), v.clone());
+ }
+ if let Some(v) = &self.config.client_id {
+ envs.insert("AZURE_CLIENT_ID".to_string(), v.clone());
+ }
+ if let Some(v) = &self.config.client_secret {
+ envs.insert("AZURE_CLIENT_SECRET".to_string(), v.clone());
+ }
+ if let Some(v) = &self.config.tenant_id {
+ envs.insert("AZURE_TENANT_ID".to_string(), v.clone());
+ }
+ if let Some(v) = &self.config.authority_host {
+ envs.insert("AZURE_AUTHORITY_HOST".to_string(), v.clone());
+ }
+
+ let os_env = OsEnv;
+ let ctx = Context::new()
+ .with_file_read(TokioFileRead)
+
.with_http_send(ReqwestHttpSend::new(GLOBAL_REQWEST_CLIENT.clone()))
+ .with_env(StaticEnv {
+ home_dir: os_env.home_dir(),
+ envs,
+ });
+
+ let mut credential = DefaultCredentialProvider::new();
+
+ if let (Some(account_name), Some(account_key)) =
+ (account_name.as_deref(), self.config.account_key.as_deref())
+ {
+ credential =
credential.push_front(StaticCredentialProvider::new_shared_key(
+ account_name,
+ account_key,
+ ));
+ }
+ if let Some(sas_token) = self.config.sas_token.as_deref() {
+ credential =
credential.push_front(StaticCredentialProvider::new_sas_token(sas_token));
+ }
- let cred_loader = AzureStorageLoader::new(config_loader);
- let signer = AzureStorageSigner::new();
+ let signer = Signer::new(ctx, credential, RequestSigner::new());
Ok(AzdlsBackend {
core: Arc::new(AzdlsCore {
info: {
@@ -307,7 +354,6 @@ impl Builder for AzdlsBuilder {
root,
endpoint,
enable_hns: self.config.enable_hns,
- loader: cred_loader,
signer,
}),
})
diff --git a/core/services/azdls/src/core.rs b/core/services/azdls/src/core.rs
index cc8d27622..1d6642e2b 100644
--- a/core/services/azdls/src/core.rs
+++ b/core/services/azdls/src/core.rs
@@ -27,9 +27,8 @@ use http::header::CONTENT_DISPOSITION;
use http::header::CONTENT_LENGTH;
use http::header::CONTENT_TYPE;
use http::header::IF_NONE_MATCH;
-use reqsign::AzureStorageCredential;
-use reqsign::AzureStorageLoader;
-use reqsign::AzureStorageSigner;
+use reqsign_azure_storage::Credential;
+use reqsign_core::Signer;
use super::error::parse_error;
use opendal_core::raw::*;
@@ -49,8 +48,7 @@ pub struct AzdlsCore {
pub endpoint: String,
pub enable_hns: bool,
- pub loader: AzureStorageLoader,
- pub signer: AzureStorageSigner,
+ pub signer: Signer<Credential>,
}
impl Debug for AzdlsCore {
@@ -65,27 +63,11 @@ impl Debug for AzdlsCore {
}
impl AzdlsCore {
- async fn load_credential(&self) -> Result<AzureStorageCredential> {
- let cred = self
- .loader
- .load()
- .await
- .map_err(new_request_credential_error)?;
-
- if let Some(cred) = cred {
- Ok(cred)
- } else {
- Err(Error::new(
- ErrorKind::ConfigInvalid,
- "no valid credential found",
- ))
- }
- }
+ pub async fn sign<T>(&self, req: Request<T>) -> Result<Request<T>> {
+ let (mut parts, body) = req.into_parts();
- pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
- let cred = self.load_credential().await?;
// Insert x-ms-version header for normal requests.
- req.headers_mut().insert(
+ parts.headers.insert(
HeaderName::from_static(X_MS_VERSION),
// 2022-11-02 is the version supported by Azurite V3 and
// used by Azure Portal, We use this version to make
@@ -94,7 +76,13 @@ impl AzdlsCore {
// In the future, we could allow users to configure this value.
HeaderValue::from_static("2022-11-02"),
);
- self.signer.sign(req, &cred).map_err(new_request_sign_error)
+
+ self.signer
+ .sign(&mut parts, None)
+ .await
+ .map_err(|e| new_request_sign_error(e.into()))?;
+
+ Ok(Request::from_parts(parts, body))
}
#[inline]
@@ -120,12 +108,12 @@ impl AzdlsCore {
req = req.header(http::header::RANGE, range.to_header());
}
- let mut req = req
+ let req = req
.extension(Operation::Read)
.body(Buffer::new())
.map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
+ let req = self.sign(req).await?;
self.info.http_client().fetch(req).await
}
@@ -176,12 +164,12 @@ impl AzdlsCore {
Operation::Write
};
- let mut req = req
+ let req = req
.extension(operation)
.body(Buffer::new())
.map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -198,14 +186,14 @@ impl AzdlsCore {
let source_path = format!("/{}/{}", self.filesystem,
percent_encode_path(&source));
- let mut req = Request::put(&url)
+ let req = Request::put(&url)
.header(X_MS_RENAME_SOURCE, source_path)
.header(CONTENT_LENGTH, 0)
.extension(Operation::Rename)
.body(Buffer::new())
.map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -242,12 +230,12 @@ impl AzdlsCore {
req = req.header(CONTENT_LENGTH, size)
}
- let mut req = req
+ let req = req
.extension(Operation::Write)
.body(body)
.map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -272,13 +260,13 @@ impl AzdlsCore {
url.push_str("&close=true");
}
- let mut req = Request::patch(&url)
+ let req = Request::patch(&url)
.header(CONTENT_LENGTH, 0)
.extension(Operation::Write)
.body(Buffer::new())
.map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -296,12 +284,12 @@ impl AzdlsCore {
let req = Request::head(&url);
- let mut req = req
+ let req = req
.extension(Operation::Stat)
.body(Buffer::new())
.map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -360,12 +348,12 @@ impl AzdlsCore {
percent_encode_path(&p)
);
- let mut req = Request::delete(&url)
+ let req = Request::delete(&url)
.extension(Operation::Delete)
.body(Buffer::new())
.map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -394,12 +382,12 @@ impl AzdlsCore {
url = url.push("continuation",
&percent_encode_path(&continuation));
}
- let mut req = Request::delete(url.finish())
+ let req = Request::delete(url.finish())
.extension(Operation::Delete)
.body(Buffer::new())
.map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
+ let req = self.sign(req).await?;
let resp = self.send(req).await?;
let status = resp.status();
@@ -446,12 +434,12 @@ impl AzdlsCore {
url = url.push("continuation", &percent_encode_path(continuation));
}
- let mut req = Request::get(url.finish())
+ let req = Request::get(url.finish())
.extension(Operation::List)
.body(Buffer::new())
.map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
+ let req = self.sign(req).await?;
self.send(req).await
}
diff --git a/core/services/azfile/Cargo.toml b/core/services/azfile/Cargo.toml
index 4741324dc..13e1f3be3 100644
--- a/core/services/azfile/Cargo.toml
+++ b/core/services/azfile/Cargo.toml
@@ -37,10 +37,10 @@ log = { workspace = true }
opendal-core = { path = "../../core", version = "0.55.0", default-features =
false }
opendal-service-azure-common = { path = "../azure-common", version = "0.55.0" }
quick-xml = { workspace = true, features = ["serialize", "overlapped-lists"] }
-reqsign = { workspace = true, features = [
- "reqwest_request",
- "services-azblob",
-] }
+reqsign-azure-storage = { version = "2.0.2", default-features = false }
+reqsign-core = { version = "2.0.1", default-features = false }
+reqsign-file-read-tokio = { version = "2.0.1", default-features = false }
+reqsign-http-send-reqwest = { version = "2.0.1", default-features = false }
serde = { workspace = true, features = ["derive"] }
[dev-dependencies]
diff --git a/core/services/azfile/src/backend.rs
b/core/services/azfile/src/backend.rs
index 47a0394b2..6c8d7b492 100644
--- a/core/services/azfile/src/backend.rs
+++ b/core/services/azfile/src/backend.rs
@@ -21,9 +21,16 @@ use std::sync::Arc;
use http::Response;
use http::StatusCode;
use log::debug;
-use reqsign::AzureStorageConfig;
-use reqsign::AzureStorageLoader;
-use reqsign::AzureStorageSigner;
+use reqsign_azure_storage::DefaultCredentialProvider;
+use reqsign_azure_storage::RequestSigner;
+use reqsign_azure_storage::StaticCredentialProvider;
+use reqsign_core::Context;
+use reqsign_core::Env as _;
+use reqsign_core::OsEnv;
+use reqsign_core::Signer;
+use reqsign_core::StaticEnv;
+use reqsign_file_read_tokio::TokioFileRead;
+use reqsign_http_send_reqwest::ReqwestHttpSend;
use super::AZFILE_SCHEME;
use super::config::AzfileConfig;
@@ -37,11 +44,12 @@ use super::writer::AzfileWriters;
use opendal_core::raw::*;
use opendal_core::*;
use opendal_service_azure_common::{
- AzureStorageService, azure_account_name_from_endpoint,
azure_config_from_connection_string,
+ AzureStorageConfig as AzureConnectionConfig, AzureStorageService,
+ azure_account_name_from_endpoint, azure_config_from_connection_string,
};
-impl From<AzureStorageConfig> for AzfileConfig {
- fn from(config: AzureStorageConfig) -> Self {
+impl From<AzureConnectionConfig> for AzfileConfig {
+ fn from(config: AzureConnectionConfig) -> Self {
AzfileConfig {
account_name: config.account_name,
account_key: config.account_key,
@@ -185,15 +193,42 @@ impl Builder for AzfileBuilder {
),
}?;
- let config_loader = AzureStorageConfig {
- account_name: Some(account_name),
- account_key: self.config.account_key.clone(),
- sas_token: self.config.sas_token.clone(),
- ..Default::default()
- };
+ let mut envs = std::collections::HashMap::new();
+ envs.insert("AZBLOB_ACCOUNT_NAME".to_string(), account_name.clone());
+ envs.insert(
+ "AZURE_STORAGE_ACCOUNT_NAME".to_string(),
+ account_name.clone(),
+ );
+
+ if let Some(v) = &self.config.account_key {
+ envs.insert("AZBLOB_ACCOUNT_KEY".to_string(), v.clone());
+ envs.insert("AZURE_STORAGE_ACCOUNT_KEY".to_string(), v.clone());
+ }
+ if let Some(v) = &self.config.sas_token {
+ envs.insert("AZURE_STORAGE_SAS_TOKEN".to_string(), v.clone());
+ }
+
+ let os_env = OsEnv;
+ let ctx = Context::new()
+ .with_file_read(TokioFileRead)
+
.with_http_send(ReqwestHttpSend::new(GLOBAL_REQWEST_CLIENT.clone()))
+ .with_env(StaticEnv {
+ home_dir: os_env.home_dir(),
+ envs,
+ });
+
+ let mut credential = DefaultCredentialProvider::new();
+ if let Some(account_key) = self.config.account_key.as_deref() {
+ credential =
credential.push_front(StaticCredentialProvider::new_shared_key(
+ &account_name,
+ account_key,
+ ));
+ }
+ if let Some(sas_token) = self.config.sas_token.as_deref() {
+ credential =
credential.push_front(StaticCredentialProvider::new_sas_token(sas_token));
+ }
- let cred_loader = AzureStorageLoader::new(config_loader);
- let signer = AzureStorageSigner::new();
+ let signer = Signer::new(ctx, credential, RequestSigner::new());
Ok(AzfileBackend {
core: Arc::new(AzfileCore {
info: {
@@ -223,7 +258,6 @@ impl Builder for AzfileBuilder {
},
root,
endpoint,
- loader: cred_loader,
signer,
share_name: self.config.share_name.clone(),
}),
diff --git a/core/services/azfile/src/core.rs b/core/services/azfile/src/core.rs
index da4257089..22342ed19 100644
--- a/core/services/azfile/src/core.rs
+++ b/core/services/azfile/src/core.rs
@@ -28,9 +28,8 @@ use http::header::CONTENT_DISPOSITION;
use http::header::CONTENT_LENGTH;
use http::header::CONTENT_TYPE;
use http::header::RANGE;
-use reqsign::AzureStorageCredential;
-use reqsign::AzureStorageLoader;
-use reqsign::AzureStorageSigner;
+use reqsign_azure_storage::Credential;
+use reqsign_core::Signer;
use super::error::parse_error;
use opendal_core::raw::*;
@@ -49,8 +48,7 @@ pub struct AzfileCore {
pub root: String,
pub endpoint: String,
pub share_name: String,
- pub loader: AzureStorageLoader,
- pub signer: AzureStorageSigner,
+ pub signer: Signer<Credential>,
}
impl Debug for AzfileCore {
@@ -64,32 +62,22 @@ impl Debug for AzfileCore {
}
impl AzfileCore {
- async fn load_credential(&self) -> Result<AzureStorageCredential> {
- let cred = self
- .loader
- .load()
- .await
- .map_err(new_request_credential_error)?;
-
- if let Some(cred) = cred {
- Ok(cred)
- } else {
- Err(Error::new(
- ErrorKind::ConfigInvalid,
- "no valid credential found",
- ))
- }
- }
+ pub async fn sign<T>(&self, req: Request<T>) -> Result<Request<T>> {
+ let (mut parts, body) = req.into_parts();
- pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
- let cred = self.load_credential().await?;
// Insert x-ms-version header for normal requests.
- req.headers_mut().insert(
+ parts.headers.insert(
HeaderName::from_static(X_MS_VERSION),
// consistent with azdls and azblob
HeaderValue::from_static("2022-11-02"),
);
- self.signer.sign(req, &cred).map_err(new_request_sign_error)
+
+ self.signer
+ .sign(&mut parts, None)
+ .await
+ .map_err(|e| new_request_sign_error(e.into()))?;
+
+ Ok(Request::from_parts(parts, body))
}
#[inline]
@@ -115,8 +103,8 @@ impl AzfileCore {
let req = req.extension(Operation::Read);
- let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
+ let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
+ let req = self.sign(req).await?;
self.info.http_client().fetch(req).await
}
@@ -164,8 +152,8 @@ impl AzfileCore {
let req = req.extension(Operation::Write);
- let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
+ let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -200,8 +188,8 @@ impl AzfileCore {
let req = req.extension(Operation::Write);
- let mut req = req.body(body).map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
+ let req = req.body(body).map_err(new_request_build_error)?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -218,8 +206,8 @@ impl AzfileCore {
let req = req.extension(Operation::Stat);
- let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
+ let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -237,8 +225,8 @@ impl AzfileCore {
let req = req.extension(Operation::Stat);
- let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
+ let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -289,8 +277,8 @@ impl AzfileCore {
let req = req.extension(Operation::Rename);
- let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
+ let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -312,8 +300,8 @@ impl AzfileCore {
let req = req.extension(Operation::CreateDir);
- let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
+ let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -333,8 +321,8 @@ impl AzfileCore {
let req = req.extension(Operation::Delete);
- let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
+ let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -354,8 +342,8 @@ impl AzfileCore {
let req = req.extension(Operation::Delete);
- let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
+ let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
+ let req = self.sign(req).await?;
self.send(req).await
}
@@ -393,8 +381,8 @@ impl AzfileCore {
let req = req.extension(Operation::List);
- let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
- self.sign(&mut req).await?;
+ let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
+ let req = self.sign(req).await?;
self.send(req).await
}
diff --git a/core/services/azure-common/Cargo.toml b/core/services/azure-common/Cargo.toml
index 0f989c108..68a668ba3 100644
--- a/core/services/azure-common/Cargo.toml
+++ b/core/services/azure-common/Cargo.toml
@@ -33,4 +33,3 @@ all-features = true
[dependencies]
http = { workspace = true }
opendal-core = { path = "../../core", version = "0.55.0", default-features = false }
-reqsign = { workspace = true }
diff --git a/core/services/azure-common/src/lib.rs b/core/services/azure-common/src/lib.rs
index e380539df..55e9cc90b 100644
--- a/core/services/azure-common/src/lib.rs
+++ b/core/services/azure-common/src/lib.rs
@@ -28,7 +28,32 @@ use std::collections::HashMap;
use http::Uri;
use http::response::Parts;
use opendal_core::{Error, ErrorKind, Result};
-use reqsign::{AzureStorageConfig, AzureStorageCredential};
+
+/// Configuration parsed from Azure storage connection string.
+#[derive(Debug, Default, Clone, PartialEq, Eq)]
+pub struct AzureStorageConfig {
+ /// Storage account name.
+ pub account_name: Option<String>,
+ /// Storage account shared key.
+ pub account_key: Option<String>,
+ /// Shared access signature token.
+ pub sas_token: Option<String>,
+ /// Service endpoint.
+ pub endpoint: Option<String>,
+ /// OAuth client id.
+ pub client_id: Option<String>,
+ /// OAuth client secret.
+ pub client_secret: Option<String>,
+ /// OAuth tenant id.
+ pub tenant_id: Option<String>,
+ /// OAuth authority host.
+ pub authority_host: Option<String>,
+}
+
+enum AzureStorageCredential {
+ SharedAccessSignature(String),
+ SharedKey(String, String),
+}
/// Parses an [Azure connection string][1] into a configuration object.
///
@@ -229,9 +254,6 @@ fn set_credentials(config: &mut AzureStorageConfig, creds: AzureStorageCredentia
config.account_name = Some(account_name);
config.account_key = Some(account_key);
}
- AzureStorageCredential::BearerToken(_, _) => {
- // Bearer tokens shouldn't be passed via connection strings.
- }
}
}
@@ -327,12 +349,12 @@ fn censor_sas_uri(uri: &Uri) -> String {
#[cfg(test)]
mod tests {
use http::Uri;
- use reqsign::AzureStorageConfig;
use super::censor_sas_uri;
use super::{
- AzureStorageService, azure_account_name_from_endpoint, azure_config_from_connection_string,
+ AzureStorageConfig, AzureStorageService, azure_account_name_from_endpoint,
+ azure_config_from_connection_string,
};
#[test]
diff --git a/core/services/ghac/Cargo.toml b/core/services/ghac/Cargo.toml
index 40a6417ae..a9f003abb 100644
--- a/core/services/ghac/Cargo.toml
+++ b/core/services/ghac/Cargo.toml
@@ -38,7 +38,8 @@ log = { workspace = true }
opendal-core = { path = "../../core", version = "0.55.0", default-features = false }
opendal-service-azblob = { path = "../azblob", version = "0.55.0", default-features = false }
prost = { version = "0.13", default-features = false }
-reqsign = { workspace = true }
+reqsign-azure-storage = { version = "2.0.2", default-features = false }
+reqsign-core = { version = "2.0.1", default-features = false }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
sha2 = { workspace = true }
diff --git a/core/services/ghac/src/writer.rs b/core/services/ghac/src/writer.rs
index bf9a2e515..d54eb914e 100644
--- a/core/services/ghac/src/writer.rs
+++ b/core/services/ghac/src/writer.rs
@@ -25,6 +25,10 @@ use opendal_core::raw::*;
use opendal_core::*;
use opendal_service_azblob::core::AzblobCore;
use opendal_service_azblob::writer::AzblobWriter;
+use reqsign_azure_storage::RequestSigner;
+use reqsign_azure_storage::StaticCredentialProvider;
+use reqsign_core::Context;
+use reqsign_core::Signer;
pub struct GhacWriter(pub TwoWays<GhacWriterV1, GhacWriterV2>);
@@ -66,6 +70,11 @@ impl GhacWriter {
)
.with_context("url", &url));
};
+ let signer = Signer::new(
+ Context::new(),
+ StaticCredentialProvider::new_sas_token(query),
+ RequestSigner::new(),
+ );
let azure_core = Arc::new(AzblobCore {
info: {
let am = AccessorInfo::default();
@@ -113,14 +122,7 @@ impl GhacWriter {
encryption_key: None,
encryption_key_sha256: None,
encryption_algorithm: None,
- loader: {
- let config = reqsign::AzureStorageConfig {
- sas_token: Some(query.to_string()),
- ..Default::default()
- };
- reqsign::AzureStorageLoader::new(config)
- },
- signer: { reqsign::AzureStorageSigner::new() },
+ signer,
});
let w = AzblobWriter::new(azure_core, OpWrite::default(), path.to_string());
let writer = oio::BlockWriter::new(core.info.clone(), w, 4);