This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new aefe7675 refactor(services/azblob): Adopt new reqsign  (#1902)
aefe7675 is described below

commit aefe7675c9e6d3768bcd013e8f4bdf9633db1001
Author: Xuanwo <[email protected]>
AuthorDate: Tue Apr 11 13:31:21 2023 +0800

    refactor(services/azblob): Adopt new reqsign  (#1902)
    
    * refactor: Change presign to async for future refactor
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix unit test
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * refactor(services/azblob): Adopt new reqsign
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 Cargo.lock                          |  34 +++-
 core/Cargo.toml                     |   2 +
 core/src/raw/http_util/error.rs     |  10 +
 core/src/raw/http_util/mod.rs       |   1 +
 core/src/services/azblob/backend.rs | 358 +++++-------------------------------
 core/src/services/azblob/core.rs    | 317 +++++++++++++++++++++++++++++++
 core/src/services/azblob/mod.rs     |   1 +
 core/src/services/azblob/pager.rs   |  18 +-
 core/src/services/azblob/writer.rs  |  19 +-
 9 files changed, 431 insertions(+), 329 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index c57103ae..b4c0a267 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2251,7 +2251,8 @@ dependencies = [
  "quick-xml 0.27.1",
  "rand 0.8.5",
  "redis",
- "reqsign",
+ "reqsign 0.8.5 (registry+https://github.com/rust-lang/crates.io-index)",
+ "reqsign 0.8.5 
(git+https://github.com/Xuanwo/reqsign?rev=c1e44223a984a612b63c80ee8092f0c089ff62bd)",
  "reqwest",
  "rocksdb",
  "serde",
@@ -3161,6 +3162,37 @@ dependencies = [
  "ureq",
 ]
 
+[[package]]
+name = "reqsign"
+version = "0.8.5"
+source = 
"git+https://github.com/Xuanwo/reqsign?rev=c1e44223a984a612b63c80ee8092f0c089ff62bd#c1e44223a984a612b63c80ee8092f0c089ff62bd";
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "base64 0.21.0",
+ "bytes",
+ "chrono",
+ "dirs 5.0.0",
+ "form_urlencoded",
+ "hex",
+ "hmac",
+ "http",
+ "jsonwebtoken",
+ "log",
+ "once_cell",
+ "percent-encoding",
+ "quick-xml 0.28.1",
+ "rand 0.8.5",
+ "reqwest",
+ "rsa",
+ "rust-ini",
+ "serde",
+ "serde_json",
+ "sha1",
+ "sha2",
+ "tokio",
+]
+
 [[package]]
 name = "reqwest"
 version = "0.11.15"
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 48d5fffa..de7dcbae 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -118,7 +118,9 @@ redis = { version = "0.22", features = [
   "tokio-comp",
   "connection-manager",
 ], optional = true }
+# NOTE: we keep this for service migration one by one. And finally we will 
replace reqsign by v0.9.
 reqsign = "0.8.5"
+reqsign_0_9 = { package = "reqsign", git = 
"https://github.com/Xuanwo/reqsign";, rev = 
"c1e44223a984a612b63c80ee8092f0c089ff62bd" }
 reqwest = { version = "0.11.13", features = [
   "multipart",
   "stream",
diff --git a/core/src/raw/http_util/error.rs b/core/src/raw/http_util/error.rs
index 65c338fc..72543768 100644
--- a/core/src/raw/http_util/error.rs
+++ b/core/src/raw/http_util/error.rs
@@ -94,6 +94,16 @@ pub fn new_request_build_error(err: http::Error) -> Error {
         .set_source(err)
 }
 
+/// Create a new error happened during signing request.
+pub fn new_request_credential_error(err: anyhow::Error) -> Error {
+    Error::new(
+        ErrorKind::Unexpected,
+        "loading credentail to sign http request",
+    )
+    .with_operation("reqsign::LoadCredential")
+    .set_source(err)
+}
+
 /// Create a new error happened during signing request.
 pub fn new_request_sign_error(err: anyhow::Error) -> Error {
     Error::new(ErrorKind::Unexpected, "signing http request")
diff --git a/core/src/raw/http_util/mod.rs b/core/src/raw/http_util/mod.rs
index 9cb60a04..ed5acaca 100644
--- a/core/src/raw/http_util/mod.rs
+++ b/core/src/raw/http_util/mod.rs
@@ -48,6 +48,7 @@ pub use uri::percent_encode_path;
 
 mod error;
 pub use error::new_request_build_error;
+pub use error::new_request_credential_error;
 pub use error::new_request_sign_error;
 pub use error::parse_error_response;
 pub use error::ErrorResponse;
diff --git a/core/src/services/azblob/backend.rs 
b/core/src/services/azblob/backend.rs
index 20b2d596..5bde2591 100644
--- a/core/src/services/azblob/backend.rs
+++ b/core/src/services/azblob/backend.rs
@@ -18,34 +18,26 @@
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::fmt::Formatter;
-use std::fmt::Write;
-use std::str::FromStr;
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use http::header::HeaderName;
-use http::header::CONTENT_LENGTH;
 use http::header::CONTENT_TYPE;
-use http::Request;
-use http::Response;
 use http::StatusCode;
-use http::Uri;
 use log::debug;
-use reqsign::AzureStorageSigner;
+use reqsign_0_9::AzureStorageConfig;
+use reqsign_0_9::AzureStorageLoader;
+use reqsign_0_9::AzureStorageSigner;
 
 use super::batch::parse_batch_delete_response;
-use super::batch::BatchDeleteRequestBuilder;
 use super::error::parse_error;
 use super::pager::AzblobPager;
 use super::writer::AzblobWriter;
 use crate::ops::*;
 use crate::raw::*;
+use crate::services::azblob::core::AzblobCore;
 use crate::types::Metadata;
 use crate::*;
 
-const X_MS_BLOB_TYPE: &str = "x-ms-blob-type";
-const X_MS_COPY_SOURCE: &str = "x-ms-copy-source";
-
 /// Known endpoint suffix Azure Storage Blob services resource URI syntax.
 /// Azure public cloud: https://accountname.blob.core.windows.net
 /// Azure US Government: https://accountname.blob.core.usgovcloudapi.net
@@ -385,64 +377,43 @@ impl Builder for AzblobBuilder {
             })?
         };
 
-        let mut signer_builder = AzureStorageSigner::builder();
-        let mut account_name: Option<String> = None;
-        if let Some(sas_token) = &self.sas_token {
-            signer_builder.security_token(sas_token);
-            match &self.account_name {
-                Some(name) => account_name = Some(name.clone()),
-                None => {
-                    account_name = 
infer_storage_name_from_endpoint(endpoint.as_str());
-                }
-            }
-        } else if let (Some(name), Some(key)) = (&self.account_name, 
&self.account_key) {
-            account_name = Some(name.clone());
-            signer_builder.account_name(name).account_key(key);
-        } else if let Some(key) = &self.account_key {
-            account_name = infer_storage_name_from_endpoint(endpoint.as_str());
-            signer_builder
-                .account_name(account_name.as_ref().unwrap_or(&String::new()))
-                .account_key(key);
-        }
+        let config_loader = AzureStorageConfig {
+            account_name: self
+                .account_name
+                .clone()
+                .or_else(|| 
infer_storage_name_from_endpoint(endpoint.as_str())),
+            account_key: self.account_key.clone(),
+            sas_token: self.sas_token.clone(),
+        };
 
-        let signer = signer_builder.clone().build().map_err(|e| {
-            Error::new(ErrorKind::ConfigInvalid, "build AzureStorageSigner")
-                .with_operation("Builder::build")
-                .with_context("service", Scheme::Azblob)
-                .with_context("endpoint", &endpoint)
-                .with_context("container", container.as_str())
-                .set_source(e)
-        })?;
-        signer_builder.omit_service_version();
-        let sub_req_signer = signer_builder.build().map_err(|e| {
-            Error::new(ErrorKind::ConfigInvalid, "build AzureStorageSigner")
-                .with_operation("Builder::build")
-                .with_context("service", Scheme::Azblob)
-                .with_context("endpoint", &endpoint)
-                .with_context("container", container.as_str())
-                .set_source(e)
-        })?;
+        let cred_loader = AzureStorageLoader::new(config_loader);
+
+        let signer = AzureStorageSigner::new();
+        let batch_signer = AzureStorageSigner::new().omit_service_version();
 
         debug!("backend build finished: {:?}", &self);
         Ok(AzblobBackend {
-            root,
-            endpoint,
-            signer: Arc::new(signer),
-            batch_signer: Arc::new(sub_req_signer),
-            container: self.container.clone(),
-            client,
-            _account_name: account_name.unwrap_or_default(),
+            core: Arc::new(AzblobCore {
+                root,
+                endpoint,
+                container: self.container.clone(),
+
+                client,
+                loader: cred_loader,
+                signer,
+                batch_signer,
+            }),
         })
     }
 }
 
 fn infer_storage_name_from_endpoint(endpoint: &str) -> Option<String> {
-    let _endpoint: &str = endpoint
+    let endpoint: &str = endpoint
         .strip_prefix("http://";)
         .or_else(|| endpoint.strip_prefix("https://";))
         .unwrap_or(endpoint);
 
-    let mut parts = _endpoint.splitn(2, '.');
+    let mut parts = endpoint.splitn(2, '.');
     let storage_name = parts.next();
     let endpoint_suffix = parts
         .next()
@@ -463,14 +434,7 @@ fn infer_storage_name_from_endpoint(endpoint: &str) -> 
Option<String> {
 /// Backend for azblob services.
 #[derive(Debug, Clone)]
 pub struct AzblobBackend {
-    container: String,
-    // TODO: remove pub after 
https://github.com/apache/incubator-opendal/issues/1427
-    pub client: HttpClient,
-    root: String, // root will be "/" or /abc/
-    endpoint: String,
-    pub signer: Arc<AzureStorageSigner>,
-    pub batch_signer: Arc<AzureStorageSigner>,
-    _account_name: String,
+    core: Arc<AzblobCore>,
 }
 
 #[async_trait]
@@ -488,8 +452,8 @@ impl Accessor for AzblobBackend {
 
         let mut am = AccessorInfo::default();
         am.set_scheme(Scheme::Azblob)
-            .set_root(&self.root)
-            .set_name(&self.container)
+            .set_root(&self.core.root)
+            .set_name(&self.core.container)
             .set_max_batch_operations(AZBLOB_BATCH_LIMIT)
             .set_capabilities(Read | Write | List | Scan | Batch | Copy)
             .set_hints(ReadStreamable);
@@ -498,11 +462,13 @@ impl Accessor for AzblobBackend {
     }
 
     async fn create(&self, path: &str, _: OpCreate) -> Result<RpCreate> {
-        let mut req = self.azblob_put_blob_request(path, Some(0), None, 
AsyncBody::Empty)?;
+        let mut req = self
+            .core
+            .azblob_put_blob_request(path, Some(0), None, AsyncBody::Empty)?;
 
-        self.signer.sign(&mut req).map_err(new_request_sign_error)?;
+        self.core.sign(&mut req).await?;
 
-        let resp = self.client.send(req).await?;
+        let resp = self.core.send(req).await?;
 
         let status = resp.status();
 
@@ -516,7 +482,7 @@ impl Accessor for AzblobBackend {
     }
 
     async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
-        let resp = self.azblob_get_blob(path, args.range()).await?;
+        let resp = self.core.azblob_get_blob(path, args.range()).await?;
 
         let status = resp.status();
 
@@ -540,12 +506,12 @@ impl Accessor for AzblobBackend {
 
         Ok((
             RpWrite::default(),
-            AzblobWriter::new(self.clone(), args, path.to_string()),
+            AzblobWriter::new(self.core.clone(), args, path.to_string()),
         ))
     }
 
     async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> 
Result<RpCopy> {
-        let resp = self.azblob_copy_blob(from, to).await?;
+        let resp = self.core.azblob_copy_blob(from, to).await?;
 
         let status = resp.status();
 
@@ -564,7 +530,7 @@ impl Accessor for AzblobBackend {
             return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
         }
 
-        let resp = self.azblob_get_blob_properties(path).await?;
+        let resp = self.core.azblob_get_blob_properties(path).await?;
 
         let status = resp.status();
 
@@ -578,7 +544,7 @@ impl Accessor for AzblobBackend {
     }
 
     async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
-        let resp = self.azblob_delete_blob(path).await?;
+        let resp = self.core.azblob_delete_blob(path).await?;
 
         let status = resp.status();
 
@@ -590,8 +556,7 @@ impl Accessor for AzblobBackend {
 
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::Pager)> {
         let op = AzblobPager::new(
-            Arc::new(self.clone()),
-            self.root.clone(),
+            self.core.clone(),
             path.to_string(),
             "/".to_string(),
             args.limit(),
@@ -602,8 +567,7 @@ impl Accessor for AzblobBackend {
 
     async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, 
Self::Pager)> {
         let op = AzblobPager::new(
-            Arc::new(self.clone()),
-            self.root.clone(),
+            self.core.clone(),
             path.to_string(),
             "".to_string(),
             args.limit(),
@@ -622,7 +586,7 @@ impl Accessor for AzblobBackend {
             ));
         }
         // construct and complete batch request
-        let resp = self.azblob_batch_delete(&paths).await?;
+        let resp = self.core.azblob_batch_delete(&paths).await?;
 
         // check response status
         if resp.status() != StatusCode::ACCEPTED {
@@ -669,222 +633,6 @@ impl Accessor for AzblobBackend {
     }
 }
 
-impl AzblobBackend {
-    async fn azblob_get_blob(
-        &self,
-        path: &str,
-        range: BytesRange,
-    ) -> Result<Response<IncomingAsyncBody>> {
-        let p = build_abs_path(&self.root, path);
-
-        let url = format!(
-            "{}/{}/{}",
-            self.endpoint,
-            self.container,
-            percent_encode_path(&p)
-        );
-
-        let mut req = Request::get(&url);
-
-        if !range.is_full() {
-            // azblob doesn't support read with suffix range.
-            //
-            // ref: 
https://learn.microsoft.com/en-us/rest/api/storageservices/specifying-the-range-header-for-blob-service-operations
-            if range.offset().is_none() && range.size().is_some() {
-                return Err(Error::new(
-                    ErrorKind::Unsupported,
-                    "azblob doesn't support read with suffix range",
-                ));
-            }
-
-            req = req.header(http::header::RANGE, range.to_header());
-        }
-
-        let mut req = req
-            .body(AsyncBody::Empty)
-            .map_err(new_request_build_error)?;
-
-        self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-
-        self.client.send(req).await
-    }
-
-    pub fn azblob_put_blob_request(
-        &self,
-        path: &str,
-        size: Option<usize>,
-        content_type: Option<&str>,
-        body: AsyncBody,
-    ) -> Result<Request<AsyncBody>> {
-        let p = build_abs_path(&self.root, path);
-
-        let url = format!(
-            "{}/{}/{}",
-            self.endpoint,
-            self.container,
-            percent_encode_path(&p)
-        );
-
-        let mut req = Request::put(&url);
-
-        if let Some(size) = size {
-            req = req.header(CONTENT_LENGTH, size)
-        }
-
-        if let Some(ty) = content_type {
-            req = req.header(CONTENT_TYPE, ty)
-        }
-
-        req = req.header(HeaderName::from_static(X_MS_BLOB_TYPE), "BlockBlob");
-
-        // Set body
-        let req = req.body(body).map_err(new_request_build_error)?;
-
-        Ok(req)
-    }
-
-    async fn azblob_get_blob_properties(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
-        let p = build_abs_path(&self.root, path);
-
-        let url = format!(
-            "{}/{}/{}",
-            self.endpoint,
-            self.container,
-            percent_encode_path(&p)
-        );
-
-        let req = Request::head(&url);
-
-        let mut req = req
-            .body(AsyncBody::Empty)
-            .map_err(new_request_build_error)?;
-
-        self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-
-        self.client.send(req).await
-    }
-
-    async fn azblob_delete_blob(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
-        let p = build_abs_path(&self.root, path);
-
-        let url = format!(
-            "{}/{}/{}",
-            self.endpoint,
-            self.container,
-            percent_encode_path(&p)
-        );
-
-        let req = Request::delete(&url);
-
-        let mut req = req
-            .body(AsyncBody::Empty)
-            .map_err(new_request_build_error)?;
-
-        self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-
-        self.client.send(req).await
-    }
-
-    async fn azblob_copy_blob(&self, from: &str, to: &str) -> 
Result<Response<IncomingAsyncBody>> {
-        let source = build_abs_path(&self.root, from);
-        let target = build_abs_path(&self.root, to);
-
-        let source = format!(
-            "{}/{}/{}",
-            self.endpoint,
-            self.container,
-            percent_encode_path(&source)
-        );
-        let target = format!(
-            "{}/{}/{}",
-            self.endpoint,
-            self.container,
-            percent_encode_path(&target)
-        );
-
-        let mut req = Request::put(&target)
-            .header(X_MS_COPY_SOURCE, source)
-            .body(AsyncBody::Empty)
-            .map_err(new_request_build_error)?;
-
-        self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-
-        self.client.send(req).await
-    }
-
-    pub(crate) async fn azblob_list_blobs(
-        &self,
-        path: &str,
-        next_marker: &str,
-        delimiter: &str,
-        limit: Option<usize>,
-    ) -> Result<Response<IncomingAsyncBody>> {
-        let p = build_abs_path(&self.root, path);
-
-        let mut url = format!(
-            "{}/{}?restype=container&comp=list",
-            self.endpoint, self.container
-        );
-        if !p.is_empty() {
-            write!(url, "&prefix={}", percent_encode_path(&p))
-                .expect("write into string must succeed");
-        }
-        if let Some(limit) = limit {
-            write!(url, "&maxresults={limit}").expect("write into string must 
succeed");
-        }
-        if !delimiter.is_empty() {
-            write!(url, "&delimiter={delimiter}").expect("write into string 
must succeed");
-        }
-        if !next_marker.is_empty() {
-            write!(url, "&marker={next_marker}").expect("write into string 
must succeed");
-        }
-
-        let mut req = Request::get(&url)
-            .body(AsyncBody::Empty)
-            .map_err(new_request_build_error)?;
-
-        self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-
-        self.client.send(req).await
-    }
-
-    async fn azblob_batch_delete(&self, paths: &[String]) -> 
Result<Response<IncomingAsyncBody>> {
-        // init batch request
-        let url = format!(
-            "{}/{}?restype=container&comp=batch",
-            self.endpoint, self.container
-        );
-        let mut batch_delete_req_builder = 
BatchDeleteRequestBuilder::new(&url);
-
-        for path in paths.iter() {
-            // build sub requests
-            let p = build_abs_path(&self.root, path);
-            let encoded_path = percent_encode_path(&p);
-
-            let url = Uri::from_str(&format!(
-                "{}/{}/{}",
-                self.endpoint, self.container, encoded_path
-            ))
-            .unwrap();
-
-            let mut sub_req = Request::delete(&url.to_string())
-                .header(CONTENT_LENGTH, 0)
-                .body(AsyncBody::Empty)
-                .map_err(new_request_build_error)?;
-            self.batch_signer
-                .sign(&mut sub_req)
-                .map_err(new_request_sign_error)?;
-
-            batch_delete_req_builder.append(sub_req);
-        }
-
-        let mut req = batch_delete_req_builder.try_into_req()?;
-        self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-
-        self.client.send(req).await
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::AzblobBuilder;
@@ -916,13 +664,11 @@ mod tests {
             .expect("build azblob should be succeeded.");
 
         assert_eq!(
-            azblob.endpoint,
+            azblob.core.endpoint,
             "https://storagesample.blob.core.chinacloudapi.cn";
         );
 
-        assert_eq!(azblob._account_name, "storagesample".to_string());
-
-        assert_eq!(azblob.container, "container".to_string());
+        assert_eq!(azblob.core.container, "container".to_string());
 
         assert_eq!(
             azblob_builder.account_key.unwrap(),
@@ -940,13 +686,11 @@ mod tests {
             .expect("build azblob should be succeeded.");
 
         assert_eq!(
-            azblob.endpoint,
+            azblob.core.endpoint,
             "https://storagesample.blob.core.windows.net";
         );
 
-        assert_eq!(azblob._account_name, "".to_string());
-
-        assert_eq!(azblob.container, "container".to_string());
+        assert_eq!(azblob.core.container, "container".to_string());
 
         assert_eq!(azblob_builder.account_key, None);
     }
@@ -964,13 +708,11 @@ mod tests {
             .expect("build azblob should be succeeded.");
 
         assert_eq!(
-            azblob.endpoint,
+            azblob.core.endpoint,
             "https://storagesample.blob.core.usgovcloudapi.net";
         );
 
-        assert_eq!(azblob._account_name, "storagesample".to_string());
-
-        assert_eq!(azblob.container, "container".to_string());
+        assert_eq!(azblob.core.container, "container".to_string());
 
         assert_eq!(
             azblob_builder.account_key.unwrap(),
diff --git a/core/src/services/azblob/core.rs b/core/src/services/azblob/core.rs
new file mode 100644
index 00000000..ddeb33c3
--- /dev/null
+++ b/core/src/services/azblob/core.rs
@@ -0,0 +1,317 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt;
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::fmt::Write;
+use std::str::FromStr;
+
+use http::header::HeaderName;
+use http::header::CONTENT_LENGTH;
+use http::header::CONTENT_TYPE;
+use http::Request;
+use http::Response;
+use http::Uri;
+use reqsign_0_9::AzureStorageCredential;
+use reqsign_0_9::AzureStorageLoader;
+use reqsign_0_9::AzureStorageSigner;
+
+use super::batch::BatchDeleteRequestBuilder;
+use crate::raw::*;
+use crate::*;
+
+const X_MS_BLOB_TYPE: &str = "x-ms-blob-type";
+const X_MS_COPY_SOURCE: &str = "x-ms-copy-source";
+
+pub struct AzblobCore {
+    pub container: String,
+    pub root: String,
+    pub endpoint: String,
+
+    pub client: HttpClient,
+    pub loader: AzureStorageLoader,
+    pub signer: AzureStorageSigner,
+    pub batch_signer: AzureStorageSigner,
+}
+
+impl Debug for AzblobCore {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        f.debug_struct("AzblobCore")
+            .field("container", &self.container)
+            .field("root", &self.root)
+            .field("endpoint", &self.endpoint)
+            .finish_non_exhaustive()
+    }
+}
+
+impl AzblobCore {
+    async fn load_credential(&self) -> Result<AzureStorageCredential> {
+        let cred = self
+            .loader
+            .load()
+            .await
+            .map_err(new_request_credential_error)?;
+
+        if let Some(cred) = cred {
+            Ok(cred)
+        } else {
+            Err(Error::new(
+                ErrorKind::ConfigInvalid,
+                "no valid credential found",
+            ))
+        }
+    }
+
+    pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
+        let cred = self.load_credential().await?;
+        self.signer.sign(req, &cred).map_err(new_request_sign_error)
+    }
+
+    async fn batch_sign<T>(&self, req: &mut Request<T>) -> Result<()> {
+        let cred = self.load_credential().await?;
+        self.batch_signer
+            .sign(req, &cred)
+            .map_err(new_request_sign_error)
+    }
+
+    #[inline]
+    pub async fn send(&self, req: Request<AsyncBody>) -> 
Result<Response<IncomingAsyncBody>> {
+        self.client.send(req).await
+    }
+}
+
+impl AzblobCore {
+    pub async fn azblob_get_blob(
+        &self,
+        path: &str,
+        range: BytesRange,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "{}/{}/{}",
+            self.endpoint,
+            self.container,
+            percent_encode_path(&p)
+        );
+
+        let mut req = Request::get(&url);
+
+        if !range.is_full() {
+            // azblob doesn't support read with suffix range.
+            //
+            // ref: 
https://learn.microsoft.com/en-us/rest/api/storageservices/specifying-the-range-header-for-blob-service-operations
+            if range.offset().is_none() && range.size().is_some() {
+                return Err(Error::new(
+                    ErrorKind::Unsupported,
+                    "azblob doesn't support read with suffix range",
+                ));
+            }
+
+            req = req.header(http::header::RANGE, range.to_header());
+        }
+
+        let mut req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+
+        self.send(req).await
+    }
+
+    pub fn azblob_put_blob_request(
+        &self,
+        path: &str,
+        size: Option<usize>,
+        content_type: Option<&str>,
+        body: AsyncBody,
+    ) -> Result<Request<AsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "{}/{}/{}",
+            self.endpoint,
+            self.container,
+            percent_encode_path(&p)
+        );
+
+        let mut req = Request::put(&url);
+
+        if let Some(size) = size {
+            req = req.header(CONTENT_LENGTH, size)
+        }
+
+        if let Some(ty) = content_type {
+            req = req.header(CONTENT_TYPE, ty)
+        }
+
+        req = req.header(HeaderName::from_static(X_MS_BLOB_TYPE), "BlockBlob");
+
+        // Set body
+        let req = req.body(body).map_err(new_request_build_error)?;
+
+        Ok(req)
+    }
+
+    pub async fn azblob_get_blob_properties(
+        &self,
+        path: &str,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "{}/{}/{}",
+            self.endpoint,
+            self.container,
+            percent_encode_path(&p)
+        );
+
+        let req = Request::head(&url);
+
+        let mut req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+        self.send(req).await
+    }
+
+    pub async fn azblob_delete_blob(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "{}/{}/{}",
+            self.endpoint,
+            self.container,
+            percent_encode_path(&p)
+        );
+
+        let req = Request::delete(&url);
+
+        let mut req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+        self.send(req).await
+    }
+
+    pub async fn azblob_copy_blob(
+        &self,
+        from: &str,
+        to: &str,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let source = build_abs_path(&self.root, from);
+        let target = build_abs_path(&self.root, to);
+
+        let source = format!(
+            "{}/{}/{}",
+            self.endpoint,
+            self.container,
+            percent_encode_path(&source)
+        );
+        let target = format!(
+            "{}/{}/{}",
+            self.endpoint,
+            self.container,
+            percent_encode_path(&target)
+        );
+
+        let mut req = Request::put(&target)
+            .header(X_MS_COPY_SOURCE, source)
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+        self.send(req).await
+    }
+
+    pub async fn azblob_list_blobs(
+        &self,
+        path: &str,
+        next_marker: &str,
+        delimiter: &str,
+        limit: Option<usize>,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        let mut url = format!(
+            "{}/{}?restype=container&comp=list",
+            self.endpoint, self.container
+        );
+        if !p.is_empty() {
+            write!(url, "&prefix={}", percent_encode_path(&p))
+                .expect("write into string must succeed");
+        }
+        if let Some(limit) = limit {
+            write!(url, "&maxresults={limit}").expect("write into string must 
succeed");
+        }
+        if !delimiter.is_empty() {
+            write!(url, "&delimiter={delimiter}").expect("write into string 
must succeed");
+        }
+        if !next_marker.is_empty() {
+            write!(url, "&marker={next_marker}").expect("write into string 
must succeed");
+        }
+
+        let mut req = Request::get(&url)
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+        self.send(req).await
+    }
+
+    pub async fn azblob_batch_delete(
+        &self,
+        paths: &[String],
+    ) -> Result<Response<IncomingAsyncBody>> {
+        // init batch request
+        let url = format!(
+            "{}/{}?restype=container&comp=batch",
+            self.endpoint, self.container
+        );
+        let mut batch_delete_req_builder = 
BatchDeleteRequestBuilder::new(&url);
+
+        for path in paths.iter() {
+            // build sub requests
+            let p = build_abs_path(&self.root, path);
+            let encoded_path = percent_encode_path(&p);
+
+            let url = Uri::from_str(&format!(
+                "{}/{}/{}",
+                self.endpoint, self.container, encoded_path
+            ))
+            .unwrap();
+
+            let mut sub_req = Request::delete(&url.to_string())
+                .header(CONTENT_LENGTH, 0)
+                .body(AsyncBody::Empty)
+                .map_err(new_request_build_error)?;
+
+            self.batch_sign(&mut sub_req).await?;
+
+            batch_delete_req_builder.append(sub_req);
+        }
+
+        let mut req = batch_delete_req_builder.try_into_req()?;
+
+        self.sign(&mut req).await?;
+        self.send(req).await
+    }
+}
diff --git a/core/src/services/azblob/mod.rs b/core/src/services/azblob/mod.rs
index c403b95e..7edcfde3 100644
--- a/core/src/services/azblob/mod.rs
+++ b/core/src/services/azblob/mod.rs
@@ -19,6 +19,7 @@ mod backend;
 pub use backend::AzblobBuilder as Azblob;
 
 mod batch;
+mod core;
 mod error;
 mod pager;
 mod writer;
diff --git a/core/src/services/azblob/pager.rs 
b/core/src/services/azblob/pager.rs
index a1ac7b86..b0fad0e6 100644
--- a/core/src/services/azblob/pager.rs
+++ b/core/src/services/azblob/pager.rs
@@ -24,14 +24,14 @@ use serde::Deserialize;
 use time::format_description::well_known::Rfc2822;
 use time::OffsetDateTime;
 
-use super::backend::AzblobBackend;
+use super::core::AzblobCore;
 use super::error::parse_error;
 use crate::raw::*;
 use crate::*;
 
 pub struct AzblobPager {
-    backend: Arc<AzblobBackend>,
-    root: String,
+    core: Arc<AzblobCore>,
+
     path: String,
     delimiter: String,
     limit: Option<usize>,
@@ -42,15 +42,13 @@ pub struct AzblobPager {
 
 impl AzblobPager {
     pub fn new(
-        backend: Arc<AzblobBackend>,
-        root: String,
+        core: Arc<AzblobCore>,
         path: String,
         delimiter: String,
         limit: Option<usize>,
     ) -> Self {
         Self {
-            backend,
-            root,
+            core,
             path,
             delimiter,
             limit,
@@ -69,7 +67,7 @@ impl oio::Page for AzblobPager {
         }
 
         let resp = self
-            .backend
+            .core
             .azblob_list_blobs(&self.path, &self.next_marker, &self.delimiter, 
self.limit)
             .await?;
 
@@ -96,7 +94,7 @@ impl oio::Page for AzblobPager {
 
         for prefix in prefixes {
             let de = oio::Entry::new(
-                &build_rel_path(&self.root, &prefix.name),
+                &build_rel_path(&self.core.root, &prefix.name),
                 Metadata::new(EntryMode::DIR),
             );
 
@@ -128,7 +126,7 @@ impl oio::Page for AzblobPager {
                         })?,
                 );
 
-            let de = oio::Entry::new(&build_rel_path(&self.root, 
&object.name), meta);
+            let de = oio::Entry::new(&build_rel_path(&self.core.root, 
&object.name), meta);
 
             entries.push(de);
         }
diff --git a/core/src/services/azblob/writer.rs 
b/core/src/services/azblob/writer.rs
index 8cad24c6..84a7e3dc 100644
--- a/core/src/services/azblob/writer.rs
+++ b/core/src/services/azblob/writer.rs
@@ -15,45 +15,44 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::sync::Arc;
+
 use async_trait::async_trait;
 use bytes::Bytes;
 use http::StatusCode;
 
-use super::backend::AzblobBackend;
+use super::core::AzblobCore;
 use super::error::parse_error;
 use crate::ops::OpWrite;
 use crate::raw::*;
 use crate::*;
 
 pub struct AzblobWriter {
-    backend: AzblobBackend,
+    core: Arc<AzblobCore>,
 
     op: OpWrite,
     path: String,
 }
 
 impl AzblobWriter {
-    pub fn new(backend: AzblobBackend, op: OpWrite, path: String) -> Self {
-        AzblobWriter { backend, op, path }
+    pub fn new(core: Arc<AzblobCore>, op: OpWrite, path: String) -> Self {
+        AzblobWriter { core, op, path }
     }
 }
 
 #[async_trait]
 impl oio::Write for AzblobWriter {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
-        let mut req = self.backend.azblob_put_blob_request(
+        let mut req = self.core.azblob_put_blob_request(
             &self.path,
             Some(bs.len()),
             self.op.content_type(),
             AsyncBody::Bytes(bs),
         )?;
 
-        self.backend
-            .signer
-            .sign(&mut req)
-            .map_err(new_request_sign_error)?;
+        self.core.sign(&mut req).await?;
 
-        let resp = self.backend.client.send(req).await?;
+        let resp = self.core.send(req).await?;
 
         let status = resp.status();
 


Reply via email to