This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new aefe7675 refactor(services/azblob): Adopt new reqsign (#1902)
aefe7675 is described below
commit aefe7675c9e6d3768bcd013e8f4bdf9633db1001
Author: Xuanwo <[email protected]>
AuthorDate: Tue Apr 11 13:31:21 2023 +0800
refactor(services/azblob): Adopt new reqsign (#1902)
* refactor: Change presign to async for future refactor
Signed-off-by: Xuanwo <[email protected]>
* Fix unit test
Signed-off-by: Xuanwo <[email protected]>
* refactor(services/azblob): Adopt new reqsign
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
Cargo.lock | 34 +++-
core/Cargo.toml | 2 +
core/src/raw/http_util/error.rs | 10 +
core/src/raw/http_util/mod.rs | 1 +
core/src/services/azblob/backend.rs | 358 +++++-------------------------------
core/src/services/azblob/core.rs | 317 +++++++++++++++++++++++++++++++
core/src/services/azblob/mod.rs | 1 +
core/src/services/azblob/pager.rs | 18 +-
core/src/services/azblob/writer.rs | 19 +-
9 files changed, 431 insertions(+), 329 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index c57103ae..b4c0a267 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2251,7 +2251,8 @@ dependencies = [
"quick-xml 0.27.1",
"rand 0.8.5",
"redis",
- "reqsign",
+ "reqsign 0.8.5 (registry+https://github.com/rust-lang/crates.io-index)",
+ "reqsign 0.8.5
(git+https://github.com/Xuanwo/reqsign?rev=c1e44223a984a612b63c80ee8092f0c089ff62bd)",
"reqwest",
"rocksdb",
"serde",
@@ -3161,6 +3162,37 @@ dependencies = [
"ureq",
]
+[[package]]
+name = "reqsign"
+version = "0.8.5"
+source =
"git+https://github.com/Xuanwo/reqsign?rev=c1e44223a984a612b63c80ee8092f0c089ff62bd#c1e44223a984a612b63c80ee8092f0c089ff62bd"
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "base64 0.21.0",
+ "bytes",
+ "chrono",
+ "dirs 5.0.0",
+ "form_urlencoded",
+ "hex",
+ "hmac",
+ "http",
+ "jsonwebtoken",
+ "log",
+ "once_cell",
+ "percent-encoding",
+ "quick-xml 0.28.1",
+ "rand 0.8.5",
+ "reqwest",
+ "rsa",
+ "rust-ini",
+ "serde",
+ "serde_json",
+ "sha1",
+ "sha2",
+ "tokio",
+]
+
[[package]]
name = "reqwest"
version = "0.11.15"
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 48d5fffa..de7dcbae 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -118,7 +118,9 @@ redis = { version = "0.22", features = [
"tokio-comp",
"connection-manager",
], optional = true }
+# NOTE: we keep this for service migration one by one. And finally we will replace reqsign by v0.9.
reqsign = "0.8.5"
+reqsign_0_9 = { package = "reqsign", git =
"https://github.com/Xuanwo/reqsign", rev =
"c1e44223a984a612b63c80ee8092f0c089ff62bd" }
reqwest = { version = "0.11.13", features = [
"multipart",
"stream",
diff --git a/core/src/raw/http_util/error.rs b/core/src/raw/http_util/error.rs
index 65c338fc..72543768 100644
--- a/core/src/raw/http_util/error.rs
+++ b/core/src/raw/http_util/error.rs
@@ -94,6 +94,16 @@ pub fn new_request_build_error(err: http::Error) -> Error {
.set_source(err)
}
+/// Create a new error that happened while loading the credential used to sign a request.
+pub fn new_request_credential_error(err: anyhow::Error) -> Error {
+ Error::new(
+ ErrorKind::Unexpected,
+ "loading credentail to sign http request",
+ )
+ .with_operation("reqsign::LoadCredential")
+ .set_source(err)
+}
+
/// Create a new error happened during signing request.
pub fn new_request_sign_error(err: anyhow::Error) -> Error {
Error::new(ErrorKind::Unexpected, "signing http request")
diff --git a/core/src/raw/http_util/mod.rs b/core/src/raw/http_util/mod.rs
index 9cb60a04..ed5acaca 100644
--- a/core/src/raw/http_util/mod.rs
+++ b/core/src/raw/http_util/mod.rs
@@ -48,6 +48,7 @@ pub use uri::percent_encode_path;
mod error;
pub use error::new_request_build_error;
+pub use error::new_request_credential_error;
pub use error::new_request_sign_error;
pub use error::parse_error_response;
pub use error::ErrorResponse;
diff --git a/core/src/services/azblob/backend.rs
b/core/src/services/azblob/backend.rs
index 20b2d596..5bde2591 100644
--- a/core/src/services/azblob/backend.rs
+++ b/core/src/services/azblob/backend.rs
@@ -18,34 +18,26 @@
use std::collections::HashMap;
use std::fmt::Debug;
use std::fmt::Formatter;
-use std::fmt::Write;
-use std::str::FromStr;
use std::sync::Arc;
use async_trait::async_trait;
-use http::header::HeaderName;
-use http::header::CONTENT_LENGTH;
use http::header::CONTENT_TYPE;
-use http::Request;
-use http::Response;
use http::StatusCode;
-use http::Uri;
use log::debug;
-use reqsign::AzureStorageSigner;
+use reqsign_0_9::AzureStorageConfig;
+use reqsign_0_9::AzureStorageLoader;
+use reqsign_0_9::AzureStorageSigner;
use super::batch::parse_batch_delete_response;
-use super::batch::BatchDeleteRequestBuilder;
use super::error::parse_error;
use super::pager::AzblobPager;
use super::writer::AzblobWriter;
use crate::ops::*;
use crate::raw::*;
+use crate::services::azblob::core::AzblobCore;
use crate::types::Metadata;
use crate::*;
-const X_MS_BLOB_TYPE: &str = "x-ms-blob-type";
-const X_MS_COPY_SOURCE: &str = "x-ms-copy-source";
-
/// Known endpoint suffix Azure Storage Blob services resource URI syntax.
/// Azure public cloud: https://accountname.blob.core.windows.net
/// Azure US Government: https://accountname.blob.core.usgovcloudapi.net
@@ -385,64 +377,43 @@ impl Builder for AzblobBuilder {
})?
};
- let mut signer_builder = AzureStorageSigner::builder();
- let mut account_name: Option<String> = None;
- if let Some(sas_token) = &self.sas_token {
- signer_builder.security_token(sas_token);
- match &self.account_name {
- Some(name) => account_name = Some(name.clone()),
- None => {
- account_name =
infer_storage_name_from_endpoint(endpoint.as_str());
- }
- }
- } else if let (Some(name), Some(key)) = (&self.account_name,
&self.account_key) {
- account_name = Some(name.clone());
- signer_builder.account_name(name).account_key(key);
- } else if let Some(key) = &self.account_key {
- account_name = infer_storage_name_from_endpoint(endpoint.as_str());
- signer_builder
- .account_name(account_name.as_ref().unwrap_or(&String::new()))
- .account_key(key);
- }
+ let config_loader = AzureStorageConfig {
+ account_name: self
+ .account_name
+ .clone()
+ .or_else(||
infer_storage_name_from_endpoint(endpoint.as_str())),
+ account_key: self.account_key.clone(),
+ sas_token: self.sas_token.clone(),
+ };
- let signer = signer_builder.clone().build().map_err(|e| {
- Error::new(ErrorKind::ConfigInvalid, "build AzureStorageSigner")
- .with_operation("Builder::build")
- .with_context("service", Scheme::Azblob)
- .with_context("endpoint", &endpoint)
- .with_context("container", container.as_str())
- .set_source(e)
- })?;
- signer_builder.omit_service_version();
- let sub_req_signer = signer_builder.build().map_err(|e| {
- Error::new(ErrorKind::ConfigInvalid, "build AzureStorageSigner")
- .with_operation("Builder::build")
- .with_context("service", Scheme::Azblob)
- .with_context("endpoint", &endpoint)
- .with_context("container", container.as_str())
- .set_source(e)
- })?;
+ let cred_loader = AzureStorageLoader::new(config_loader);
+
+ let signer = AzureStorageSigner::new();
+ let batch_signer = AzureStorageSigner::new().omit_service_version();
debug!("backend build finished: {:?}", &self);
Ok(AzblobBackend {
- root,
- endpoint,
- signer: Arc::new(signer),
- batch_signer: Arc::new(sub_req_signer),
- container: self.container.clone(),
- client,
- _account_name: account_name.unwrap_or_default(),
+ core: Arc::new(AzblobCore {
+ root,
+ endpoint,
+ container: self.container.clone(),
+
+ client,
+ loader: cred_loader,
+ signer,
+ batch_signer,
+ }),
})
}
}
fn infer_storage_name_from_endpoint(endpoint: &str) -> Option<String> {
- let _endpoint: &str = endpoint
+ let endpoint: &str = endpoint
.strip_prefix("http://")
.or_else(|| endpoint.strip_prefix("https://"))
.unwrap_or(endpoint);
- let mut parts = _endpoint.splitn(2, '.');
+ let mut parts = endpoint.splitn(2, '.');
let storage_name = parts.next();
let endpoint_suffix = parts
.next()
@@ -463,14 +434,7 @@ fn infer_storage_name_from_endpoint(endpoint: &str) ->
Option<String> {
/// Backend for azblob services.
#[derive(Debug, Clone)]
pub struct AzblobBackend {
- container: String,
- // TODO: remove pub after https://github.com/apache/incubator-opendal/issues/1427
- pub client: HttpClient,
- root: String, // root will be "/" or /abc/
- endpoint: String,
- pub signer: Arc<AzureStorageSigner>,
- pub batch_signer: Arc<AzureStorageSigner>,
- _account_name: String,
+ core: Arc<AzblobCore>,
}
#[async_trait]
@@ -488,8 +452,8 @@ impl Accessor for AzblobBackend {
let mut am = AccessorInfo::default();
am.set_scheme(Scheme::Azblob)
- .set_root(&self.root)
- .set_name(&self.container)
+ .set_root(&self.core.root)
+ .set_name(&self.core.container)
.set_max_batch_operations(AZBLOB_BATCH_LIMIT)
.set_capabilities(Read | Write | List | Scan | Batch | Copy)
.set_hints(ReadStreamable);
@@ -498,11 +462,13 @@ impl Accessor for AzblobBackend {
}
async fn create(&self, path: &str, _: OpCreate) -> Result<RpCreate> {
- let mut req = self.azblob_put_blob_request(path, Some(0), None,
AsyncBody::Empty)?;
+ let mut req = self
+ .core
+ .azblob_put_blob_request(path, Some(0), None, AsyncBody::Empty)?;
- self.signer.sign(&mut req).map_err(new_request_sign_error)?;
+ self.core.sign(&mut req).await?;
- let resp = self.client.send(req).await?;
+ let resp = self.core.send(req).await?;
let status = resp.status();
@@ -516,7 +482,7 @@ impl Accessor for AzblobBackend {
}
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead,
Self::Reader)> {
- let resp = self.azblob_get_blob(path, args.range()).await?;
+ let resp = self.core.azblob_get_blob(path, args.range()).await?;
let status = resp.status();
@@ -540,12 +506,12 @@ impl Accessor for AzblobBackend {
Ok((
RpWrite::default(),
- AzblobWriter::new(self.clone(), args, path.to_string()),
+ AzblobWriter::new(self.core.clone(), args, path.to_string()),
))
}
async fn copy(&self, from: &str, to: &str, _args: OpCopy) ->
Result<RpCopy> {
- let resp = self.azblob_copy_blob(from, to).await?;
+ let resp = self.core.azblob_copy_blob(from, to).await?;
let status = resp.status();
@@ -564,7 +530,7 @@ impl Accessor for AzblobBackend {
return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
}
- let resp = self.azblob_get_blob_properties(path).await?;
+ let resp = self.core.azblob_get_blob_properties(path).await?;
let status = resp.status();
@@ -578,7 +544,7 @@ impl Accessor for AzblobBackend {
}
async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
- let resp = self.azblob_delete_blob(path).await?;
+ let resp = self.core.azblob_delete_blob(path).await?;
let status = resp.status();
@@ -590,8 +556,7 @@ impl Accessor for AzblobBackend {
async fn list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::Pager)> {
let op = AzblobPager::new(
- Arc::new(self.clone()),
- self.root.clone(),
+ self.core.clone(),
path.to_string(),
"/".to_string(),
args.limit(),
@@ -602,8 +567,7 @@ impl Accessor for AzblobBackend {
async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan,
Self::Pager)> {
let op = AzblobPager::new(
- Arc::new(self.clone()),
- self.root.clone(),
+ self.core.clone(),
path.to_string(),
"".to_string(),
args.limit(),
@@ -622,7 +586,7 @@ impl Accessor for AzblobBackend {
));
}
// construct and complete batch request
- let resp = self.azblob_batch_delete(&paths).await?;
+ let resp = self.core.azblob_batch_delete(&paths).await?;
// check response status
if resp.status() != StatusCode::ACCEPTED {
@@ -669,222 +633,6 @@ impl Accessor for AzblobBackend {
}
}
-impl AzblobBackend {
- async fn azblob_get_blob(
- &self,
- path: &str,
- range: BytesRange,
- ) -> Result<Response<IncomingAsyncBody>> {
- let p = build_abs_path(&self.root, path);
-
- let url = format!(
- "{}/{}/{}",
- self.endpoint,
- self.container,
- percent_encode_path(&p)
- );
-
- let mut req = Request::get(&url);
-
- if !range.is_full() {
- // azblob doesn't support read with suffix range.
- //
- // ref: https://learn.microsoft.com/en-us/rest/api/storageservices/specifying-the-range-header-for-blob-service-operations
- if range.offset().is_none() && range.size().is_some() {
- return Err(Error::new(
- ErrorKind::Unsupported,
- "azblob doesn't support read with suffix range",
- ));
- }
-
- req = req.header(http::header::RANGE, range.to_header());
- }
-
- let mut req = req
- .body(AsyncBody::Empty)
- .map_err(new_request_build_error)?;
-
- self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-
- self.client.send(req).await
- }
-
- pub fn azblob_put_blob_request(
- &self,
- path: &str,
- size: Option<usize>,
- content_type: Option<&str>,
- body: AsyncBody,
- ) -> Result<Request<AsyncBody>> {
- let p = build_abs_path(&self.root, path);
-
- let url = format!(
- "{}/{}/{}",
- self.endpoint,
- self.container,
- percent_encode_path(&p)
- );
-
- let mut req = Request::put(&url);
-
- if let Some(size) = size {
- req = req.header(CONTENT_LENGTH, size)
- }
-
- if let Some(ty) = content_type {
- req = req.header(CONTENT_TYPE, ty)
- }
-
- req = req.header(HeaderName::from_static(X_MS_BLOB_TYPE), "BlockBlob");
-
- // Set body
- let req = req.body(body).map_err(new_request_build_error)?;
-
- Ok(req)
- }
-
- async fn azblob_get_blob_properties(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
- let p = build_abs_path(&self.root, path);
-
- let url = format!(
- "{}/{}/{}",
- self.endpoint,
- self.container,
- percent_encode_path(&p)
- );
-
- let req = Request::head(&url);
-
- let mut req = req
- .body(AsyncBody::Empty)
- .map_err(new_request_build_error)?;
-
- self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-
- self.client.send(req).await
- }
-
- async fn azblob_delete_blob(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
- let p = build_abs_path(&self.root, path);
-
- let url = format!(
- "{}/{}/{}",
- self.endpoint,
- self.container,
- percent_encode_path(&p)
- );
-
- let req = Request::delete(&url);
-
- let mut req = req
- .body(AsyncBody::Empty)
- .map_err(new_request_build_error)?;
-
- self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-
- self.client.send(req).await
- }
-
- async fn azblob_copy_blob(&self, from: &str, to: &str) ->
Result<Response<IncomingAsyncBody>> {
- let source = build_abs_path(&self.root, from);
- let target = build_abs_path(&self.root, to);
-
- let source = format!(
- "{}/{}/{}",
- self.endpoint,
- self.container,
- percent_encode_path(&source)
- );
- let target = format!(
- "{}/{}/{}",
- self.endpoint,
- self.container,
- percent_encode_path(&target)
- );
-
- let mut req = Request::put(&target)
- .header(X_MS_COPY_SOURCE, source)
- .body(AsyncBody::Empty)
- .map_err(new_request_build_error)?;
-
- self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-
- self.client.send(req).await
- }
-
- pub(crate) async fn azblob_list_blobs(
- &self,
- path: &str,
- next_marker: &str,
- delimiter: &str,
- limit: Option<usize>,
- ) -> Result<Response<IncomingAsyncBody>> {
- let p = build_abs_path(&self.root, path);
-
- let mut url = format!(
- "{}/{}?restype=container&comp=list",
- self.endpoint, self.container
- );
- if !p.is_empty() {
- write!(url, "&prefix={}", percent_encode_path(&p))
- .expect("write into string must succeed");
- }
- if let Some(limit) = limit {
- write!(url, "&maxresults={limit}").expect("write into string must
succeed");
- }
- if !delimiter.is_empty() {
- write!(url, "&delimiter={delimiter}").expect("write into string
must succeed");
- }
- if !next_marker.is_empty() {
- write!(url, "&marker={next_marker}").expect("write into string
must succeed");
- }
-
- let mut req = Request::get(&url)
- .body(AsyncBody::Empty)
- .map_err(new_request_build_error)?;
-
- self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-
- self.client.send(req).await
- }
-
- async fn azblob_batch_delete(&self, paths: &[String]) ->
Result<Response<IncomingAsyncBody>> {
- // init batch request
- let url = format!(
- "{}/{}?restype=container&comp=batch",
- self.endpoint, self.container
- );
- let mut batch_delete_req_builder =
BatchDeleteRequestBuilder::new(&url);
-
- for path in paths.iter() {
- // build sub requests
- let p = build_abs_path(&self.root, path);
- let encoded_path = percent_encode_path(&p);
-
- let url = Uri::from_str(&format!(
- "{}/{}/{}",
- self.endpoint, self.container, encoded_path
- ))
- .unwrap();
-
- let mut sub_req = Request::delete(&url.to_string())
- .header(CONTENT_LENGTH, 0)
- .body(AsyncBody::Empty)
- .map_err(new_request_build_error)?;
- self.batch_signer
- .sign(&mut sub_req)
- .map_err(new_request_sign_error)?;
-
- batch_delete_req_builder.append(sub_req);
- }
-
- let mut req = batch_delete_req_builder.try_into_req()?;
- self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-
- self.client.send(req).await
- }
-}
-
#[cfg(test)]
mod tests {
use super::AzblobBuilder;
@@ -916,13 +664,11 @@ mod tests {
.expect("build azblob should be succeeded.");
assert_eq!(
- azblob.endpoint,
+ azblob.core.endpoint,
"https://storagesample.blob.core.chinacloudapi.cn"
);
- assert_eq!(azblob._account_name, "storagesample".to_string());
-
- assert_eq!(azblob.container, "container".to_string());
+ assert_eq!(azblob.core.container, "container".to_string());
assert_eq!(
azblob_builder.account_key.unwrap(),
@@ -940,13 +686,11 @@ mod tests {
.expect("build azblob should be succeeded.");
assert_eq!(
- azblob.endpoint,
+ azblob.core.endpoint,
"https://storagesample.blob.core.windows.net"
);
- assert_eq!(azblob._account_name, "".to_string());
-
- assert_eq!(azblob.container, "container".to_string());
+ assert_eq!(azblob.core.container, "container".to_string());
assert_eq!(azblob_builder.account_key, None);
}
@@ -964,13 +708,11 @@ mod tests {
.expect("build azblob should be succeeded.");
assert_eq!(
- azblob.endpoint,
+ azblob.core.endpoint,
"https://storagesample.blob.core.usgovcloudapi.net"
);
- assert_eq!(azblob._account_name, "storagesample".to_string());
-
- assert_eq!(azblob.container, "container".to_string());
+ assert_eq!(azblob.core.container, "container".to_string());
assert_eq!(
azblob_builder.account_key.unwrap(),
diff --git a/core/src/services/azblob/core.rs b/core/src/services/azblob/core.rs
new file mode 100644
index 00000000..ddeb33c3
--- /dev/null
+++ b/core/src/services/azblob/core.rs
@@ -0,0 +1,317 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt;
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::fmt::Write;
+use std::str::FromStr;
+
+use http::header::HeaderName;
+use http::header::CONTENT_LENGTH;
+use http::header::CONTENT_TYPE;
+use http::Request;
+use http::Response;
+use http::Uri;
+use reqsign_0_9::AzureStorageCredential;
+use reqsign_0_9::AzureStorageLoader;
+use reqsign_0_9::AzureStorageSigner;
+
+use super::batch::BatchDeleteRequestBuilder;
+use crate::raw::*;
+use crate::*;
+
+const X_MS_BLOB_TYPE: &str = "x-ms-blob-type";
+const X_MS_COPY_SOURCE: &str = "x-ms-copy-source";
+
+pub struct AzblobCore {
+ pub container: String,
+ pub root: String,
+ pub endpoint: String,
+
+ pub client: HttpClient,
+ pub loader: AzureStorageLoader,
+ pub signer: AzureStorageSigner,
+ pub batch_signer: AzureStorageSigner,
+}
+
+impl Debug for AzblobCore {
+ fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+ f.debug_struct("AzblobCore")
+ .field("container", &self.container)
+ .field("root", &self.root)
+ .field("endpoint", &self.endpoint)
+ .finish_non_exhaustive()
+ }
+}
+
+impl AzblobCore {
+ async fn load_credential(&self) -> Result<AzureStorageCredential> {
+ let cred = self
+ .loader
+ .load()
+ .await
+ .map_err(new_request_credential_error)?;
+
+ if let Some(cred) = cred {
+ Ok(cred)
+ } else {
+ Err(Error::new(
+ ErrorKind::ConfigInvalid,
+ "no valid credential found",
+ ))
+ }
+ }
+
+ pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
+ let cred = self.load_credential().await?;
+ self.signer.sign(req, &cred).map_err(new_request_sign_error)
+ }
+
+ async fn batch_sign<T>(&self, req: &mut Request<T>) -> Result<()> {
+ let cred = self.load_credential().await?;
+ self.batch_signer
+ .sign(req, &cred)
+ .map_err(new_request_sign_error)
+ }
+
+ #[inline]
+ pub async fn send(&self, req: Request<AsyncBody>) ->
Result<Response<IncomingAsyncBody>> {
+ self.client.send(req).await
+ }
+}
+
+impl AzblobCore {
+ pub async fn azblob_get_blob(
+ &self,
+ path: &str,
+ range: BytesRange,
+ ) -> Result<Response<IncomingAsyncBody>> {
+ let p = build_abs_path(&self.root, path);
+
+ let url = format!(
+ "{}/{}/{}",
+ self.endpoint,
+ self.container,
+ percent_encode_path(&p)
+ );
+
+ let mut req = Request::get(&url);
+
+ if !range.is_full() {
+ // azblob doesn't support read with suffix range.
+ //
+ // ref: https://learn.microsoft.com/en-us/rest/api/storageservices/specifying-the-range-header-for-blob-service-operations
+ if range.offset().is_none() && range.size().is_some() {
+ return Err(Error::new(
+ ErrorKind::Unsupported,
+ "azblob doesn't support read with suffix range",
+ ));
+ }
+
+ req = req.header(http::header::RANGE, range.to_header());
+ }
+
+ let mut req = req
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ self.sign(&mut req).await?;
+
+ self.send(req).await
+ }
+
+ pub fn azblob_put_blob_request(
+ &self,
+ path: &str,
+ size: Option<usize>,
+ content_type: Option<&str>,
+ body: AsyncBody,
+ ) -> Result<Request<AsyncBody>> {
+ let p = build_abs_path(&self.root, path);
+
+ let url = format!(
+ "{}/{}/{}",
+ self.endpoint,
+ self.container,
+ percent_encode_path(&p)
+ );
+
+ let mut req = Request::put(&url);
+
+ if let Some(size) = size {
+ req = req.header(CONTENT_LENGTH, size)
+ }
+
+ if let Some(ty) = content_type {
+ req = req.header(CONTENT_TYPE, ty)
+ }
+
+ req = req.header(HeaderName::from_static(X_MS_BLOB_TYPE), "BlockBlob");
+
+ // Set body
+ let req = req.body(body).map_err(new_request_build_error)?;
+
+ Ok(req)
+ }
+
+ pub async fn azblob_get_blob_properties(
+ &self,
+ path: &str,
+ ) -> Result<Response<IncomingAsyncBody>> {
+ let p = build_abs_path(&self.root, path);
+
+ let url = format!(
+ "{}/{}/{}",
+ self.endpoint,
+ self.container,
+ percent_encode_path(&p)
+ );
+
+ let req = Request::head(&url);
+
+ let mut req = req
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ self.sign(&mut req).await?;
+ self.send(req).await
+ }
+
+ pub async fn azblob_delete_blob(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
+ let p = build_abs_path(&self.root, path);
+
+ let url = format!(
+ "{}/{}/{}",
+ self.endpoint,
+ self.container,
+ percent_encode_path(&p)
+ );
+
+ let req = Request::delete(&url);
+
+ let mut req = req
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ self.sign(&mut req).await?;
+ self.send(req).await
+ }
+
+ pub async fn azblob_copy_blob(
+ &self,
+ from: &str,
+ to: &str,
+ ) -> Result<Response<IncomingAsyncBody>> {
+ let source = build_abs_path(&self.root, from);
+ let target = build_abs_path(&self.root, to);
+
+ let source = format!(
+ "{}/{}/{}",
+ self.endpoint,
+ self.container,
+ percent_encode_path(&source)
+ );
+ let target = format!(
+ "{}/{}/{}",
+ self.endpoint,
+ self.container,
+ percent_encode_path(&target)
+ );
+
+ let mut req = Request::put(&target)
+ .header(X_MS_COPY_SOURCE, source)
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ self.sign(&mut req).await?;
+ self.send(req).await
+ }
+
+ pub async fn azblob_list_blobs(
+ &self,
+ path: &str,
+ next_marker: &str,
+ delimiter: &str,
+ limit: Option<usize>,
+ ) -> Result<Response<IncomingAsyncBody>> {
+ let p = build_abs_path(&self.root, path);
+
+ let mut url = format!(
+ "{}/{}?restype=container&comp=list",
+ self.endpoint, self.container
+ );
+ if !p.is_empty() {
+ write!(url, "&prefix={}", percent_encode_path(&p))
+ .expect("write into string must succeed");
+ }
+ if let Some(limit) = limit {
+ write!(url, "&maxresults={limit}").expect("write into string must
succeed");
+ }
+ if !delimiter.is_empty() {
+ write!(url, "&delimiter={delimiter}").expect("write into string
must succeed");
+ }
+ if !next_marker.is_empty() {
+ write!(url, "&marker={next_marker}").expect("write into string
must succeed");
+ }
+
+ let mut req = Request::get(&url)
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ self.sign(&mut req).await?;
+ self.send(req).await
+ }
+
+ pub async fn azblob_batch_delete(
+ &self,
+ paths: &[String],
+ ) -> Result<Response<IncomingAsyncBody>> {
+ // init batch request
+ let url = format!(
+ "{}/{}?restype=container&comp=batch",
+ self.endpoint, self.container
+ );
+ let mut batch_delete_req_builder =
BatchDeleteRequestBuilder::new(&url);
+
+ for path in paths.iter() {
+ // build sub requests
+ let p = build_abs_path(&self.root, path);
+ let encoded_path = percent_encode_path(&p);
+
+ let url = Uri::from_str(&format!(
+ "{}/{}/{}",
+ self.endpoint, self.container, encoded_path
+ ))
+ .unwrap();
+
+ let mut sub_req = Request::delete(&url.to_string())
+ .header(CONTENT_LENGTH, 0)
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ self.batch_sign(&mut sub_req).await?;
+
+ batch_delete_req_builder.append(sub_req);
+ }
+
+ let mut req = batch_delete_req_builder.try_into_req()?;
+
+ self.sign(&mut req).await?;
+ self.send(req).await
+ }
+}
diff --git a/core/src/services/azblob/mod.rs b/core/src/services/azblob/mod.rs
index c403b95e..7edcfde3 100644
--- a/core/src/services/azblob/mod.rs
+++ b/core/src/services/azblob/mod.rs
@@ -19,6 +19,7 @@ mod backend;
pub use backend::AzblobBuilder as Azblob;
mod batch;
+mod core;
mod error;
mod pager;
mod writer;
diff --git a/core/src/services/azblob/pager.rs
b/core/src/services/azblob/pager.rs
index a1ac7b86..b0fad0e6 100644
--- a/core/src/services/azblob/pager.rs
+++ b/core/src/services/azblob/pager.rs
@@ -24,14 +24,14 @@ use serde::Deserialize;
use time::format_description::well_known::Rfc2822;
use time::OffsetDateTime;
-use super::backend::AzblobBackend;
+use super::core::AzblobCore;
use super::error::parse_error;
use crate::raw::*;
use crate::*;
pub struct AzblobPager {
- backend: Arc<AzblobBackend>,
- root: String,
+ core: Arc<AzblobCore>,
+
path: String,
delimiter: String,
limit: Option<usize>,
@@ -42,15 +42,13 @@ pub struct AzblobPager {
impl AzblobPager {
pub fn new(
- backend: Arc<AzblobBackend>,
- root: String,
+ core: Arc<AzblobCore>,
path: String,
delimiter: String,
limit: Option<usize>,
) -> Self {
Self {
- backend,
- root,
+ core,
path,
delimiter,
limit,
@@ -69,7 +67,7 @@ impl oio::Page for AzblobPager {
}
let resp = self
- .backend
+ .core
.azblob_list_blobs(&self.path, &self.next_marker, &self.delimiter,
self.limit)
.await?;
@@ -96,7 +94,7 @@ impl oio::Page for AzblobPager {
for prefix in prefixes {
let de = oio::Entry::new(
- &build_rel_path(&self.root, &prefix.name),
+ &build_rel_path(&self.core.root, &prefix.name),
Metadata::new(EntryMode::DIR),
);
@@ -128,7 +126,7 @@ impl oio::Page for AzblobPager {
})?,
);
- let de = oio::Entry::new(&build_rel_path(&self.root,
&object.name), meta);
+ let de = oio::Entry::new(&build_rel_path(&self.core.root,
&object.name), meta);
entries.push(de);
}
diff --git a/core/src/services/azblob/writer.rs
b/core/src/services/azblob/writer.rs
index 8cad24c6..84a7e3dc 100644
--- a/core/src/services/azblob/writer.rs
+++ b/core/src/services/azblob/writer.rs
@@ -15,45 +15,44 @@
// specific language governing permissions and limitations
// under the License.
+use std::sync::Arc;
+
use async_trait::async_trait;
use bytes::Bytes;
use http::StatusCode;
-use super::backend::AzblobBackend;
+use super::core::AzblobCore;
use super::error::parse_error;
use crate::ops::OpWrite;
use crate::raw::*;
use crate::*;
pub struct AzblobWriter {
- backend: AzblobBackend,
+ core: Arc<AzblobCore>,
op: OpWrite,
path: String,
}
impl AzblobWriter {
- pub fn new(backend: AzblobBackend, op: OpWrite, path: String) -> Self {
- AzblobWriter { backend, op, path }
+ pub fn new(core: Arc<AzblobCore>, op: OpWrite, path: String) -> Self {
+ AzblobWriter { core, op, path }
}
}
#[async_trait]
impl oio::Write for AzblobWriter {
async fn write(&mut self, bs: Bytes) -> Result<()> {
- let mut req = self.backend.azblob_put_blob_request(
+ let mut req = self.core.azblob_put_blob_request(
&self.path,
Some(bs.len()),
self.op.content_type(),
AsyncBody::Bytes(bs),
)?;
- self.backend
- .signer
- .sign(&mut req)
- .map_err(new_request_sign_error)?;
+ self.core.sign(&mut req).await?;
- let resp = self.backend.client.send(req).await?;
+ let resp = self.core.send(req).await?;
let status = resp.status();