This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch new-reqsign in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 3254cde4265fb1d80ffb8dd3e1d58c8f31a2ea68 Author: Xuanwo <[email protected]> AuthorDate: Tue Apr 11 13:09:25 2023 +0800 refactor(services/azblob): Adopt new reqsign Signed-off-by: Xuanwo <[email protected]> --- Cargo.lock | 34 +++- core/Cargo.toml | 2 + core/src/raw/http_util/error.rs | 10 + core/src/raw/http_util/mod.rs | 1 + core/src/services/azblob/backend.rs | 358 +++++------------------------------- core/src/services/azblob/core.rs | 317 +++++++++++++++++++++++++++++++ core/src/services/azblob/mod.rs | 1 + core/src/services/azblob/pager.rs | 18 +- core/src/services/azblob/writer.rs | 19 +- 9 files changed, 431 insertions(+), 329 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c57103ae..b4c0a267 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2251,7 +2251,8 @@ dependencies = [ "quick-xml 0.27.1", "rand 0.8.5", "redis", - "reqsign", + "reqsign 0.8.5 (registry+https://github.com/rust-lang/crates.io-index)", + "reqsign 0.8.5 (git+https://github.com/Xuanwo/reqsign?rev=c1e44223a984a612b63c80ee8092f0c089ff62bd)", "reqwest", "rocksdb", "serde", @@ -3161,6 +3162,37 @@ dependencies = [ "ureq", ] +[[package]] +name = "reqsign" +version = "0.8.5" +source = "git+https://github.com/Xuanwo/reqsign?rev=c1e44223a984a612b63c80ee8092f0c089ff62bd#c1e44223a984a612b63c80ee8092f0c089ff62bd" +dependencies = [ + "anyhow", + "async-trait", + "base64 0.21.0", + "bytes", + "chrono", + "dirs 5.0.0", + "form_urlencoded", + "hex", + "hmac", + "http", + "jsonwebtoken", + "log", + "once_cell", + "percent-encoding", + "quick-xml 0.28.1", + "rand 0.8.5", + "reqwest", + "rsa", + "rust-ini", + "serde", + "serde_json", + "sha1", + "sha2", + "tokio", +] + [[package]] name = "reqwest" version = "0.11.15" diff --git a/core/Cargo.toml b/core/Cargo.toml index 48d5fffa..de7dcbae 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -118,7 +118,9 @@ redis = { version = "0.22", features = [ "tokio-comp", "connection-manager", ], optional = true } +# NOTE: we keep this for service migration one by one. And finally we will replace reqsign by v0.9. reqsign = "0.8.5" +reqsign_0_9 = { package = "reqsign", git = "https://github.com/Xuanwo/reqsign", rev = "c1e44223a984a612b63c80ee8092f0c089ff62bd" } reqwest = { version = "0.11.13", features = [ "multipart", "stream", diff --git a/core/src/raw/http_util/error.rs b/core/src/raw/http_util/error.rs index 65c338fc..72543768 100644 --- a/core/src/raw/http_util/error.rs +++ b/core/src/raw/http_util/error.rs @@ -94,6 +94,16 @@ pub fn new_request_build_error(err: http::Error) -> Error { .set_source(err) } +/// Create a new error happened during signing request. +pub fn new_request_credential_error(err: anyhow::Error) -> Error { + Error::new( + ErrorKind::Unexpected, + "loading credentail to sign http request", + ) + .with_operation("reqsign::LoadCredential") + .set_source(err) +} + /// Create a new error happened during signing request. pub fn new_request_sign_error(err: anyhow::Error) -> Error { Error::new(ErrorKind::Unexpected, "signing http request") diff --git a/core/src/raw/http_util/mod.rs b/core/src/raw/http_util/mod.rs index 9cb60a04..ed5acaca 100644 --- a/core/src/raw/http_util/mod.rs +++ b/core/src/raw/http_util/mod.rs @@ -48,6 +48,7 @@ pub use uri::percent_encode_path; mod error; pub use error::new_request_build_error; +pub use error::new_request_credential_error; pub use error::new_request_sign_error; pub use error::parse_error_response; pub use error::ErrorResponse; diff --git a/core/src/services/azblob/backend.rs b/core/src/services/azblob/backend.rs index 20b2d596..5bde2591 100644 --- a/core/src/services/azblob/backend.rs +++ b/core/src/services/azblob/backend.rs @@ -18,34 +18,26 @@ use std::collections::HashMap; use std::fmt::Debug; use std::fmt::Formatter; -use std::fmt::Write; -use std::str::FromStr; use std::sync::Arc; use async_trait::async_trait; -use http::header::HeaderName; -use http::header::CONTENT_LENGTH; use http::header::CONTENT_TYPE; -use http::Request; -use http::Response; use http::StatusCode; -use http::Uri; use log::debug; -use reqsign::AzureStorageSigner; +use reqsign_0_9::AzureStorageConfig; +use reqsign_0_9::AzureStorageLoader; +use reqsign_0_9::AzureStorageSigner; use super::batch::parse_batch_delete_response; -use super::batch::BatchDeleteRequestBuilder; use super::error::parse_error; use super::pager::AzblobPager; use super::writer::AzblobWriter; use crate::ops::*; use crate::raw::*; +use crate::services::azblob::core::AzblobCore; use crate::types::Metadata; use crate::*; -const X_MS_BLOB_TYPE: &str = "x-ms-blob-type"; -const X_MS_COPY_SOURCE: &str = "x-ms-copy-source"; - /// Known endpoint suffix Azure Storage Blob services resource URI syntax. /// Azure public cloud: https://accountname.blob.core.windows.net /// Azure US Government: https://accountname.blob.core.usgovcloudapi.net @@ -385,64 +377,43 @@ impl Builder for AzblobBuilder { })? }; - let mut signer_builder = AzureStorageSigner::builder(); - let mut account_name: Option<String> = None; - if let Some(sas_token) = &self.sas_token { - signer_builder.security_token(sas_token); - match &self.account_name { - Some(name) => account_name = Some(name.clone()), - None => { - account_name = infer_storage_name_from_endpoint(endpoint.as_str()); - } - } - } else if let (Some(name), Some(key)) = (&self.account_name, &self.account_key) { - account_name = Some(name.clone()); - signer_builder.account_name(name).account_key(key); - } else if let Some(key) = &self.account_key { - account_name = infer_storage_name_from_endpoint(endpoint.as_str()); - signer_builder - .account_name(account_name.as_ref().unwrap_or(&String::new())) - .account_key(key); - } + let config_loader = AzureStorageConfig { + account_name: self + .account_name + .clone() + .or_else(|| infer_storage_name_from_endpoint(endpoint.as_str())), + account_key: self.account_key.clone(), + sas_token: self.sas_token.clone(), + }; - let signer = signer_builder.clone().build().map_err(|e| { - Error::new(ErrorKind::ConfigInvalid, "build AzureStorageSigner") - .with_operation("Builder::build") - .with_context("service", Scheme::Azblob) - .with_context("endpoint", &endpoint) - .with_context("container", container.as_str()) - .set_source(e) - })?; - signer_builder.omit_service_version(); - let sub_req_signer = signer_builder.build().map_err(|e| { - Error::new(ErrorKind::ConfigInvalid, "build AzureStorageSigner") - .with_operation("Builder::build") - .with_context("service", Scheme::Azblob) - .with_context("endpoint", &endpoint) - .with_context("container", container.as_str()) - .set_source(e) - })?; + let cred_loader = AzureStorageLoader::new(config_loader); + + let signer = AzureStorageSigner::new(); + let batch_signer = AzureStorageSigner::new().omit_service_version(); debug!("backend build finished: {:?}", &self); Ok(AzblobBackend { - root, - endpoint, - signer: Arc::new(signer), - batch_signer: Arc::new(sub_req_signer), - container: self.container.clone(), - client, - _account_name: account_name.unwrap_or_default(), + core: Arc::new(AzblobCore { + root, + endpoint, + container: self.container.clone(), + + client, + loader: cred_loader, + signer, + batch_signer, + }), }) } } fn infer_storage_name_from_endpoint(endpoint: &str) -> Option<String> { - let _endpoint: &str = endpoint + let endpoint: &str = endpoint .strip_prefix("http://") .or_else(|| endpoint.strip_prefix("https://")) .unwrap_or(endpoint); - let mut parts = _endpoint.splitn(2, '.'); + let mut parts = endpoint.splitn(2, '.'); let storage_name = parts.next(); let endpoint_suffix = parts .next() @@ -463,14 +434,7 @@ fn infer_storage_name_from_endpoint(endpoint: &str) -> Option<String> { /// Backend for azblob services. #[derive(Debug, Clone)] pub struct AzblobBackend { - container: String, - // TODO: remove pub after https://github.com/apache/incubator-opendal/issues/1427 - pub client: HttpClient, - root: String, // root will be "/" or /abc/ - endpoint: String, - pub signer: Arc<AzureStorageSigner>, - pub batch_signer: Arc<AzureStorageSigner>, - _account_name: String, + core: Arc<AzblobCore>, } #[async_trait] @@ -488,8 +452,8 @@ impl Accessor for AzblobBackend { let mut am = AccessorInfo::default(); am.set_scheme(Scheme::Azblob) - .set_root(&self.root) - .set_name(&self.container) + .set_root(&self.core.root) + .set_name(&self.core.container) .set_max_batch_operations(AZBLOB_BATCH_LIMIT) .set_capabilities(Read | Write | List | Scan | Batch | Copy) .set_hints(ReadStreamable); @@ -498,11 +462,13 @@ impl Accessor for AzblobBackend { } async fn create(&self, path: &str, _: OpCreate) -> Result<RpCreate> { - let mut req = self.azblob_put_blob_request(path, Some(0), None, AsyncBody::Empty)?; + let mut req = self + .core + .azblob_put_blob_request(path, Some(0), None, AsyncBody::Empty)?; - self.signer.sign(&mut req).map_err(new_request_sign_error)?; + self.core.sign(&mut req).await?; - let resp = self.client.send(req).await?; + let resp = self.core.send(req).await?; let status = resp.status(); @@ -516,7 +482,7 @@ impl Accessor for AzblobBackend { } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - let resp = self.azblob_get_blob(path, args.range()).await?; + let resp = self.core.azblob_get_blob(path, args.range()).await?; let status = resp.status(); @@ -540,12 +506,12 @@ impl Accessor for AzblobBackend { Ok(( RpWrite::default(), - AzblobWriter::new(self.clone(), args, path.to_string()), + AzblobWriter::new(self.core.clone(), args, path.to_string()), )) } async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> { - let resp = self.azblob_copy_blob(from, to).await?; + let resp = self.core.azblob_copy_blob(from, to).await?; let status = resp.status(); @@ -564,7 +530,7 @@ impl Accessor for AzblobBackend { return Ok(RpStat::new(Metadata::new(EntryMode::DIR))); } - let resp = self.azblob_get_blob_properties(path).await?; + let resp = self.core.azblob_get_blob_properties(path).await?; let status = resp.status(); @@ -578,7 +544,7 @@ impl Accessor for AzblobBackend { } async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> { - let resp = self.azblob_delete_blob(path).await?; + let resp = self.core.azblob_delete_blob(path).await?; let status = resp.status(); @@ -590,8 +556,7 @@ impl Accessor for AzblobBackend { async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { let op = AzblobPager::new( - Arc::new(self.clone()), - self.root.clone(), + self.core.clone(), path.to_string(), "/".to_string(), args.limit(), @@ -602,8 +567,7 @@ impl Accessor for AzblobBackend { async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> { let op = AzblobPager::new( - Arc::new(self.clone()), - self.root.clone(), + self.core.clone(), path.to_string(), "".to_string(), args.limit(), @@ -622,7 +586,7 @@ impl Accessor for AzblobBackend { )); } // construct and complete batch request - let resp = self.azblob_batch_delete(&paths).await?; + let resp = self.core.azblob_batch_delete(&paths).await?; // check response status if resp.status() != StatusCode::ACCEPTED { @@ -669,222 +633,6 @@ impl Accessor for AzblobBackend { } } -impl AzblobBackend { - async fn azblob_get_blob( - &self, - path: &str, - range: BytesRange, - ) -> Result<Response<IncomingAsyncBody>> { - let p = build_abs_path(&self.root, path); - - let url = format!( - "{}/{}/{}", - self.endpoint, - self.container, - percent_encode_path(&p) - ); - - let mut req = Request::get(&url); - - if !range.is_full() { - // azblob doesn't support read with suffix range. - // - // ref: https://learn.microsoft.com/en-us/rest/api/storageservices/specifying-the-range-header-for-blob-service-operations - if range.offset().is_none() && range.size().is_some() { - return Err(Error::new( - ErrorKind::Unsupported, - "azblob doesn't support read with suffix range", - )); - } - - req = req.header(http::header::RANGE, range.to_header()); - } - - let mut req = req - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.signer.sign(&mut req).map_err(new_request_sign_error)?; - - self.client.send(req).await - } - - pub fn azblob_put_blob_request( - &self, - path: &str, - size: Option<usize>, - content_type: Option<&str>, - body: AsyncBody, - ) -> Result<Request<AsyncBody>> { - let p = build_abs_path(&self.root, path); - - let url = format!( - "{}/{}/{}", - self.endpoint, - self.container, - percent_encode_path(&p) - ); - - let mut req = Request::put(&url); - - if let Some(size) = size { - req = req.header(CONTENT_LENGTH, size) - } - - if let Some(ty) = content_type { - req = req.header(CONTENT_TYPE, ty) - } - - req = req.header(HeaderName::from_static(X_MS_BLOB_TYPE), "BlockBlob"); - - // Set body - let req = req.body(body).map_err(new_request_build_error)?; - - Ok(req) - } - - async fn azblob_get_blob_properties(&self, path: &str) -> Result<Response<IncomingAsyncBody>> { - let p = build_abs_path(&self.root, path); - - let url = format!( - "{}/{}/{}", - self.endpoint, - self.container, - percent_encode_path(&p) - ); - - let req = Request::head(&url); - - let mut req = req - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.signer.sign(&mut req).map_err(new_request_sign_error)?; - - self.client.send(req).await - } - - async fn azblob_delete_blob(&self, path: &str) -> Result<Response<IncomingAsyncBody>> { - let p = build_abs_path(&self.root, path); - - let url = format!( - "{}/{}/{}", - self.endpoint, - self.container, - percent_encode_path(&p) - ); - - let req = Request::delete(&url); - - let mut req = req - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.signer.sign(&mut req).map_err(new_request_sign_error)?; - - self.client.send(req).await - } - - async fn azblob_copy_blob(&self, from: &str, to: &str) -> Result<Response<IncomingAsyncBody>> { - let source = build_abs_path(&self.root, from); - let target = build_abs_path(&self.root, to); - - let source = format!( - "{}/{}/{}", - self.endpoint, - self.container, - percent_encode_path(&source) - ); - let target = format!( - "{}/{}/{}", - self.endpoint, - self.container, - percent_encode_path(&target) - ); - - let mut req = Request::put(&target) - .header(X_MS_COPY_SOURCE, source) - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.signer.sign(&mut req).map_err(new_request_sign_error)?; - - self.client.send(req).await - } - - pub(crate) async fn azblob_list_blobs( - &self, - path: &str, - next_marker: &str, - delimiter: &str, - limit: Option<usize>, - ) -> Result<Response<IncomingAsyncBody>> { - let p = build_abs_path(&self.root, path); - - let mut url = format!( - "{}/{}?restype=container&comp=list", - self.endpoint, self.container - ); - if !p.is_empty() { - write!(url, "&prefix={}", percent_encode_path(&p)) - .expect("write into string must succeed"); - } - if let Some(limit) = limit { - write!(url, "&maxresults={limit}").expect("write into string must succeed"); - } - if !delimiter.is_empty() { - write!(url, "&delimiter={delimiter}").expect("write into string must succeed"); - } - if !next_marker.is_empty() { - write!(url, "&marker={next_marker}").expect("write into string must succeed"); - } - - let mut req = Request::get(&url) - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.signer.sign(&mut req).map_err(new_request_sign_error)?; - - self.client.send(req).await - } - - async fn azblob_batch_delete(&self, paths: &[String]) -> Result<Response<IncomingAsyncBody>> { - // init batch request - let url = format!( - "{}/{}?restype=container&comp=batch", - self.endpoint, self.container - ); - let mut batch_delete_req_builder = BatchDeleteRequestBuilder::new(&url); - - for path in paths.iter() { - // build sub requests - let p = build_abs_path(&self.root, path); - let encoded_path = percent_encode_path(&p); - - let url = Uri::from_str(&format!( - "{}/{}/{}", - self.endpoint, self.container, encoded_path - )) - .unwrap(); - - let mut sub_req = Request::delete(&url.to_string()) - .header(CONTENT_LENGTH, 0) - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - self.batch_signer - .sign(&mut sub_req) - .map_err(new_request_sign_error)?; - - batch_delete_req_builder.append(sub_req); - } - - let mut req = batch_delete_req_builder.try_into_req()?; - self.signer.sign(&mut req).map_err(new_request_sign_error)?; - - self.client.send(req).await - } -} - #[cfg(test)] mod tests { use super::AzblobBuilder; @@ -916,13 +664,11 @@ mod tests { .expect("build azblob should be succeeded."); assert_eq!( - azblob.endpoint, + azblob.core.endpoint, "https://storagesample.blob.core.chinacloudapi.cn" ); - assert_eq!(azblob._account_name, "storagesample".to_string()); - - assert_eq!(azblob.container, "container".to_string()); + assert_eq!(azblob.core.container, "container".to_string()); assert_eq!( azblob_builder.account_key.unwrap(), @@ -940,13 +686,11 @@ mod tests { .expect("build azblob should be succeeded."); assert_eq!( - azblob.endpoint, + azblob.core.endpoint, "https://storagesample.blob.core.windows.net" ); - assert_eq!(azblob._account_name, "".to_string()); - - assert_eq!(azblob.container, "container".to_string()); + assert_eq!(azblob.core.container, "container".to_string()); assert_eq!(azblob_builder.account_key, None); } @@ -964,13 +708,11 @@ mod tests { .expect("build azblob should be succeeded."); assert_eq!( - azblob.endpoint, + azblob.core.endpoint, "https://storagesample.blob.core.usgovcloudapi.net" ); - assert_eq!(azblob._account_name, "storagesample".to_string()); - - assert_eq!(azblob.container, "container".to_string()); + assert_eq!(azblob.core.container, "container".to_string()); assert_eq!( azblob_builder.account_key.unwrap(), diff --git a/core/src/services/azblob/core.rs b/core/src/services/azblob/core.rs new file mode 100644 index 00000000..ddeb33c3 --- /dev/null +++ b/core/src/services/azblob/core.rs @@ -0,0 +1,317 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt; +use std::fmt::Debug; +use std::fmt::Formatter; +use std::fmt::Write; +use std::str::FromStr; + +use http::header::HeaderName; +use http::header::CONTENT_LENGTH; +use http::header::CONTENT_TYPE; +use http::Request; +use http::Response; +use http::Uri; +use reqsign_0_9::AzureStorageCredential; +use reqsign_0_9::AzureStorageLoader; +use reqsign_0_9::AzureStorageSigner; + +use super::batch::BatchDeleteRequestBuilder; +use crate::raw::*; +use crate::*; + +const X_MS_BLOB_TYPE: &str = "x-ms-blob-type"; +const X_MS_COPY_SOURCE: &str = "x-ms-copy-source"; + +pub struct AzblobCore { + pub container: String, + pub root: String, + pub endpoint: String, + + pub client: HttpClient, + pub loader: AzureStorageLoader, + pub signer: AzureStorageSigner, + pub batch_signer: AzureStorageSigner, +} + +impl Debug for AzblobCore { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("AzblobCore") + .field("container", &self.container) + .field("root", &self.root) + .field("endpoint", &self.endpoint) + .finish_non_exhaustive() + } +} + +impl AzblobCore { + async fn load_credential(&self) -> Result<AzureStorageCredential> { + let cred = self + .loader + .load() + .await + .map_err(new_request_credential_error)?; + + if let Some(cred) = cred { + Ok(cred) + } else { + Err(Error::new( + ErrorKind::ConfigInvalid, + "no valid credential found", + )) + } + } + + pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> { + let cred = self.load_credential().await?; + self.signer.sign(req, &cred).map_err(new_request_sign_error) + } + + async fn batch_sign<T>(&self, req: &mut Request<T>) -> Result<()> { + let cred = self.load_credential().await?; + self.batch_signer + .sign(req, &cred) + .map_err(new_request_sign_error) + } + + #[inline] + pub async fn send(&self, req: Request<AsyncBody>) -> Result<Response<IncomingAsyncBody>> { + self.client.send(req).await + } +} + +impl AzblobCore { + pub async fn azblob_get_blob( + &self, + path: &str, + range: BytesRange, + ) -> Result<Response<IncomingAsyncBody>> { + let p = build_abs_path(&self.root, path); + + let url = format!( + "{}/{}/{}", + self.endpoint, + self.container, + percent_encode_path(&p) + ); + + let mut req = Request::get(&url); + + if !range.is_full() { + // azblob doesn't support read with suffix range. + // + // ref: https://learn.microsoft.com/en-us/rest/api/storageservices/specifying-the-range-header-for-blob-service-operations + if range.offset().is_none() && range.size().is_some() { + return Err(Error::new( + ErrorKind::Unsupported, + "azblob doesn't support read with suffix range", + )); + } + + req = req.header(http::header::RANGE, range.to_header()); + } + + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } + + pub fn azblob_put_blob_request( + &self, + path: &str, + size: Option<usize>, + content_type: Option<&str>, + body: AsyncBody, + ) -> Result<Request<AsyncBody>> { + let p = build_abs_path(&self.root, path); + + let url = format!( + "{}/{}/{}", + self.endpoint, + self.container, + percent_encode_path(&p) + ); + + let mut req = Request::put(&url); + + if let Some(size) = size { + req = req.header(CONTENT_LENGTH, size) + } + + if let Some(ty) = content_type { + req = req.header(CONTENT_TYPE, ty) + } + + req = req.header(HeaderName::from_static(X_MS_BLOB_TYPE), "BlockBlob"); + + // Set body + let req = req.body(body).map_err(new_request_build_error)?; + + Ok(req) + } + + pub async fn azblob_get_blob_properties( + &self, + path: &str, + ) -> Result<Response<IncomingAsyncBody>> { + let p = build_abs_path(&self.root, path); + + let url = format!( + "{}/{}/{}", + self.endpoint, + self.container, + percent_encode_path(&p) + ); + + let req = Request::head(&url); + + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + self.send(req).await + } + + pub async fn azblob_delete_blob(&self, path: &str) -> Result<Response<IncomingAsyncBody>> { + let p = build_abs_path(&self.root, path); + + let url = format!( + "{}/{}/{}", + self.endpoint, + self.container, + percent_encode_path(&p) + ); + + let req = Request::delete(&url); + + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + self.send(req).await + } + + pub async fn azblob_copy_blob( + &self, + from: &str, + to: &str, + ) -> Result<Response<IncomingAsyncBody>> { + let source = build_abs_path(&self.root, from); + let target = build_abs_path(&self.root, to); + + let source = format!( + "{}/{}/{}", + self.endpoint, + self.container, + percent_encode_path(&source) + ); + let target = format!( + "{}/{}/{}", + self.endpoint, + self.container, + percent_encode_path(&target) + ); + + let mut req = Request::put(&target) + .header(X_MS_COPY_SOURCE, source) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + self.send(req).await + } + + pub async fn azblob_list_blobs( + &self, + path: &str, + next_marker: &str, + delimiter: &str, + limit: Option<usize>, + ) -> Result<Response<IncomingAsyncBody>> { + let p = build_abs_path(&self.root, path); + + let mut url = format!( + "{}/{}?restype=container&comp=list", + self.endpoint, self.container + ); + if !p.is_empty() { + write!(url, "&prefix={}", percent_encode_path(&p)) + .expect("write into string must succeed"); + } + if let Some(limit) = limit { + write!(url, "&maxresults={limit}").expect("write into string must succeed"); + } + if !delimiter.is_empty() { + write!(url, "&delimiter={delimiter}").expect("write into string must succeed"); + } + if !next_marker.is_empty() { + write!(url, "&marker={next_marker}").expect("write into string must succeed"); + } + + let mut req = Request::get(&url) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + self.send(req).await + } + + pub async fn azblob_batch_delete( + &self, + paths: &[String], + ) -> Result<Response<IncomingAsyncBody>> { + // init batch request + let url = format!( + "{}/{}?restype=container&comp=batch", + self.endpoint, self.container + ); + let mut batch_delete_req_builder = BatchDeleteRequestBuilder::new(&url); + + for path in paths.iter() { + // build sub requests + let p = build_abs_path(&self.root, path); + let encoded_path = percent_encode_path(&p); + + let url = Uri::from_str(&format!( + "{}/{}/{}", + self.endpoint, self.container, encoded_path + )) + .unwrap(); + + let mut sub_req = Request::delete(&url.to_string()) + .header(CONTENT_LENGTH, 0) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.batch_sign(&mut sub_req).await?; + + batch_delete_req_builder.append(sub_req); + } + + let mut req = batch_delete_req_builder.try_into_req()?; + + self.sign(&mut req).await?; + self.send(req).await + } +} diff --git a/core/src/services/azblob/mod.rs b/core/src/services/azblob/mod.rs index c403b95e..7edcfde3 100644 --- a/core/src/services/azblob/mod.rs +++ b/core/src/services/azblob/mod.rs @@ -19,6 +19,7 @@ mod backend; pub use backend::AzblobBuilder as Azblob; mod batch; +mod core; mod error; mod pager; mod writer; diff --git a/core/src/services/azblob/pager.rs b/core/src/services/azblob/pager.rs index a1ac7b86..b0fad0e6 100644 --- a/core/src/services/azblob/pager.rs +++ b/core/src/services/azblob/pager.rs @@ -24,14 +24,14 @@ use serde::Deserialize; use time::format_description::well_known::Rfc2822; use time::OffsetDateTime; -use super::backend::AzblobBackend; +use super::core::AzblobCore; use super::error::parse_error; use crate::raw::*; use crate::*; pub struct AzblobPager { - backend: Arc<AzblobBackend>, - root: String, + core: Arc<AzblobCore>, + path: String, delimiter: String, limit: Option<usize>, @@ -42,15 +42,13 @@ pub struct AzblobPager { impl AzblobPager { pub fn new( - backend: Arc<AzblobBackend>, - root: String, + core: Arc<AzblobCore>, path: String, delimiter: String, limit: Option<usize>, ) -> Self { Self { - backend, - root, + core, path, delimiter, limit, @@ -69,7 +67,7 @@ impl oio::Page for AzblobPager { } let resp = self - .backend + .core .azblob_list_blobs(&self.path, &self.next_marker, &self.delimiter, self.limit) .await?; @@ -96,7 +94,7 @@ impl oio::Page for AzblobPager { for prefix in prefixes { let de = oio::Entry::new( - &build_rel_path(&self.root, &prefix.name), + &build_rel_path(&self.core.root, &prefix.name), Metadata::new(EntryMode::DIR), ); @@ -128,7 +126,7 @@ impl oio::Page for AzblobPager { })?, ); - let de = oio::Entry::new(&build_rel_path(&self.root, &object.name), meta); + let de = oio::Entry::new(&build_rel_path(&self.core.root, &object.name), meta); entries.push(de); } diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs index 8cad24c6..84a7e3dc 100644 --- a/core/src/services/azblob/writer.rs +++ b/core/src/services/azblob/writer.rs @@ -15,45 +15,44 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; + use async_trait::async_trait; use bytes::Bytes; use http::StatusCode; -use super::backend::AzblobBackend; +use super::core::AzblobCore; use super::error::parse_error; use crate::ops::OpWrite; use crate::raw::*; use crate::*; pub struct AzblobWriter { - backend: AzblobBackend, + core: Arc<AzblobCore>, op: OpWrite, path: String, } impl AzblobWriter { - pub fn new(backend: AzblobBackend, op: OpWrite, path: String) -> Self { - AzblobWriter { backend, op, path } + pub fn new(core: Arc<AzblobCore>, op: OpWrite, path: String) -> Self { + AzblobWriter { core, op, path } } } #[async_trait] impl oio::Write for AzblobWriter { async fn write(&mut self, bs: Bytes) -> Result<()> { - let mut req = self.backend.azblob_put_blob_request( + let mut req = self.core.azblob_put_blob_request( &self.path, Some(bs.len()), self.op.content_type(), AsyncBody::Bytes(bs), )?; - self.backend - .signer - .sign(&mut req) - .map_err(new_request_sign_error)?; + self.core.sign(&mut req).await?; - let resp = self.backend.client.send(req).await?; + let resp = self.core.send(req).await?; let status = resp.status();
