This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new ada9062b refactor(services/azdfs): Migrate to async reqsign (#1903)
ada9062b is described below

commit ada9062b0dc884a6d9eac78ea88ffdcb76e538e4
Author: Xuanwo <[email protected]>
AuthorDate: Tue Apr 11 13:56:15 2023 +0800

    refactor(services/azdfs): Migrate to async reqsign (#1903)
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/services/azdfs/backend.rs | 300 ++++++-------------------------------
 core/src/services/azdfs/core.rs    | 270 +++++++++++++++++++++++++++++++++
 core/src/services/azdfs/mod.rs     |   1 +
 core/src/services/azdfs/pager.rs   |  20 +--
 core/src/services/azdfs/writer.rs  |  28 ++--
 5 files changed, 334 insertions(+), 285 deletions(-)

diff --git a/core/src/services/azdfs/backend.rs b/core/src/services/azdfs/backend.rs
index 5a810ac2..62c0ef64 100644
--- a/core/src/services/azdfs/backend.rs
+++ b/core/src/services/azdfs/backend.rs
@@ -18,19 +18,16 @@
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::fmt::Formatter;
-use std::fmt::Write;
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use http::header::CONTENT_DISPOSITION;
-use http::header::CONTENT_LENGTH;
-use http::header::CONTENT_TYPE;
-use http::Request;
-use http::Response;
 use http::StatusCode;
 use log::debug;
-use reqsign::AzureStorageSigner;
+use reqsign_0_9::AzureStorageConfig;
+use reqsign_0_9::AzureStorageLoader;
+use reqsign_0_9::AzureStorageSigner;
 
+use super::core::AzdfsCore;
 use super::error::parse_error;
 use super::pager::AzdfsPager;
 use super::writer::AzdfsWriter;
@@ -252,35 +249,28 @@ impl Builder for AzdfsBuilder {
             })?
         };
 
-        let mut signer_builder = AzureStorageSigner::builder();
-        let mut account_name = None;
-        if let (Some(name), Some(key)) = (&self.account_name, &self.account_key) {
-            account_name = Some(name.clone());
-            signer_builder.account_name(name).account_key(key);
-        } else if let Some(key) = &self.account_key {
-            account_name = infer_storage_name_from_endpoint(endpoint.as_str());
-            signer_builder
-                .account_name(account_name.as_ref().unwrap_or(&String::new()))
-                .account_key(key);
-        }
+        let config_loader = AzureStorageConfig {
+            account_name: self
+                .account_name
+                .clone()
+                .or_else(|| infer_storage_name_from_endpoint(endpoint.as_str())),
+            account_key: self.account_key.clone(),
+            sas_token: None,
+        };
 
-        let signer = signer_builder.build().map_err(|e| {
-            Error::new(ErrorKind::ConfigInvalid, "build AzureStorageSigner")
-                .with_operation("Builder::build")
-                .with_context("service", Scheme::Azdfs)
-                .with_context("endpoint", &endpoint)
-                .with_context("container", filesystem.as_str())
-                .set_source(e)
-        })?;
+        let cred_loader = AzureStorageLoader::new(config_loader);
+        let signer = AzureStorageSigner::new();
 
         debug!("backend build finished: {:?}", &self);
         Ok(AzdfsBackend {
-            root,
-            endpoint,
-            signer: Arc::new(signer),
-            filesystem: self.filesystem.clone(),
-            client,
-            _account_name: account_name.unwrap_or_default(),
+            core: Arc::new(AzdfsCore {
+                filesystem: self.filesystem.clone(),
+                root,
+                endpoint,
+                client,
+                loader: cred_loader,
+                signer,
+            }),
         })
     }
 
@@ -300,13 +290,7 @@ impl Builder for AzdfsBuilder {
 /// Backend for azblob services.
 #[derive(Debug, Clone)]
 pub struct AzdfsBackend {
-    filesystem: String,
-    // TODO: remove pub after https://github.com/apache/incubator-opendal/issues/1427
-    pub client: HttpClient,
-    root: String, // root will be "/" or /abc/
-    endpoint: String,
-    pub signer: Arc<AzureStorageSigner>,
-    _account_name: String,
+    core: Arc<AzdfsCore>,
 }
 
 #[async_trait]
@@ -321,8 +305,8 @@ impl Accessor for AzdfsBackend {
     fn info(&self) -> AccessorInfo {
         let mut am = AccessorInfo::default();
         am.set_scheme(Scheme::Azdfs)
-            .set_root(&self.root)
-            .set_name(&self.filesystem)
+            .set_root(&self.core.root)
+            .set_name(&self.core.filesystem)
             .set_capabilities(
                 AccessorCapability::Read | AccessorCapability::Write | AccessorCapability::List,
             )
@@ -338,11 +322,13 @@ impl Accessor for AzdfsBackend {
             _ => unimplemented!("not supported object mode"),
         };
 
-        let mut req = self.azdfs_create_request(path, resource, None, None, AsyncBody::Empty)?;
+        let mut req =
+            self.core
+                .azdfs_create_request(path, resource, None, None, AsyncBody::Empty)?;
 
-        self.signer.sign(&mut req).map_err(new_request_sign_error)?;
+        self.core.sign(&mut req).await?;
 
-        let resp = self.client.send(req).await?;
+        let resp = self.core.send(req).await?;
 
         let status = resp.status();
 
@@ -356,7 +342,7 @@ impl Accessor for AzdfsBackend {
     }
 
     async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
-        let resp = self.azdfs_read(path, args.range()).await?;
+        let resp = self.core.azdfs_read(path, args.range()).await?;
 
         let status = resp.status();
 
@@ -379,7 +365,7 @@ impl Accessor for AzdfsBackend {
 
         Ok((
             RpWrite::default(),
-            AzdfsWriter::new(self.clone(), args, path.to_string()),
+            AzdfsWriter::new(self.core.clone(), args, path.to_string()),
         ))
     }
 
@@ -389,7 +375,7 @@ impl Accessor for AzdfsBackend {
             return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
         }
 
-        let resp = self.azdfs_get_properties(path).await?;
+        let resp = self.core.azdfs_get_properties(path).await?;
 
         let status = resp.status();
 
@@ -403,7 +389,7 @@ impl Accessor for AzdfsBackend {
     }
 
     async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
-        let resp = self.azdfs_delete(path).await?;
+        let resp = self.core.azdfs_delete(path).await?;
 
         let status = resp.status();
 
@@ -414,216 +400,19 @@ impl Accessor for AzdfsBackend {
     }
 
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
-        let op = AzdfsPager::new(
-            Arc::new(self.clone()),
-            self.root.clone(),
-            path.to_string(),
-            args.limit(),
-        );
+        let op = AzdfsPager::new(self.core.clone(), path.to_string(), args.limit());
 
         Ok((RpList::default(), op))
     }
 }
 
-impl AzdfsBackend {
-    async fn azdfs_read(
-        &self,
-        path: &str,
-        range: BytesRange,
-    ) -> Result<Response<IncomingAsyncBody>> {
-        let p = build_abs_path(&self.root, path);
-
-        let url = format!(
-            "{}/{}/{}",
-            self.endpoint,
-            self.filesystem,
-            percent_encode_path(&p)
-        );
-
-        let mut req = Request::get(&url);
-
-        if !range.is_full() {
-            // azblob doesn't support read with suffix range.
-            //
-            // ref: https://learn.microsoft.com/en-us/rest/api/storageservices/specifying-the-range-header-for-blob-service-operations
-            if range.offset().is_none() && range.size().is_some() {
-                return Err(Error::new(
-                    ErrorKind::Unsupported,
-                    "azblob doesn't support read with suffix range",
-                ));
-            }
-
-            req = req.header(http::header::RANGE, range.to_header());
-        }
-
-        let mut req = req
-            .body(AsyncBody::Empty)
-            .map_err(new_request_build_error)?;
-
-        self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-
-        self.client.send(req).await
-    }
-
-    /// resource should be one of `file` or `directory`
-    ///
-    /// ref: https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/create
-    pub fn azdfs_create_request(
-        &self,
-        path: &str,
-        resource: &str,
-        content_type: Option<&str>,
-        content_disposition: Option<&str>,
-        body: AsyncBody,
-    ) -> Result<Request<AsyncBody>> {
-        let p = build_abs_path(&self.root, path)
-            .trim_end_matches('/')
-            .to_string();
-
-        let url = format!(
-            "{}/{}/{}?resource={resource}",
-            self.endpoint,
-            self.filesystem,
-            percent_encode_path(&p)
-        );
-
-        let mut req = Request::put(&url);
-
-        // Content length must be 0 for create request.
-        req = req.header(CONTENT_LENGTH, 0);
-
-        if let Some(ty) = content_type {
-            req = req.header(CONTENT_TYPE, ty)
-        }
-
-        if let Some(pos) = content_disposition {
-            req = req.header(CONTENT_DISPOSITION, pos)
-        }
-
-        // Set body
-        let req = req.body(body).map_err(new_request_build_error)?;
-
-        Ok(req)
-    }
-
-    /// ref: https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update
-    pub fn azdfs_update_request(
-        &self,
-        path: &str,
-        size: Option<usize>,
-        body: AsyncBody,
-    ) -> Result<Request<AsyncBody>> {
-        let p = build_abs_path(&self.root, path);
-
-        // - close: Make this is the final action to this file.
-        // - flush: Flush the file directly.
-        let url = format!(
-            "{}/{}/{}?action=append&close=true&flush=true&position=0",
-            self.endpoint,
-            self.filesystem,
-            percent_encode_path(&p)
-        );
-
-        let mut req = Request::patch(&url);
-
-        if let Some(size) = size {
-            req = req.header(CONTENT_LENGTH, size)
-        }
-
-        // Set body
-        let req = req.body(body).map_err(new_request_build_error)?;
-
-        Ok(req)
-    }
-
-    async fn azdfs_get_properties(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
-        let p = build_abs_path(&self.root, path)
-            .trim_end_matches('/')
-            .to_string();
-
-        let url = format!(
-            "{}/{}/{}?action=getStatus",
-            self.endpoint,
-            self.filesystem,
-            percent_encode_path(&p)
-        );
-
-        let req = Request::head(&url);
-
-        let mut req = req
-            .body(AsyncBody::Empty)
-            .map_err(new_request_build_error)?;
-
-        self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-
-        self.client.send(req).await
-    }
-
-    async fn azdfs_delete(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
-        let p = build_abs_path(&self.root, path)
-            .trim_end_matches('/')
-            .to_string();
-
-        let url = format!(
-            "{}/{}/{}",
-            self.endpoint,
-            self.filesystem,
-            percent_encode_path(&p)
-        );
-
-        let req = Request::delete(&url);
-
-        let mut req = req
-            .body(AsyncBody::Empty)
-            .map_err(new_request_build_error)?;
-
-        self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-
-        self.client.send(req).await
-    }
-
-    pub(crate) async fn azdfs_list(
-        &self,
-        path: &str,
-        continuation: &str,
-        limit: Option<usize>,
-    ) -> Result<Response<IncomingAsyncBody>> {
-        let p = build_abs_path(&self.root, path)
-            .trim_end_matches('/')
-            .to_string();
-
-        let mut url = format!(
-            "{}/{}?resource=filesystem&recursive=false",
-            self.endpoint, self.filesystem
-        );
-        if !p.is_empty() {
-            write!(url, "&directory={}", percent_encode_path(&p))
-                .expect("write into string must succeed");
-        }
-        if let Some(limit) = limit {
-            write!(url, "&maxresults={limit}").expect("write into string must succeed");
-        }
-        if !continuation.is_empty() {
-            write!(url, "&continuation={continuation}").expect("write into string must succeed");
-        }
-
-        let mut req = Request::get(&url)
-            .body(AsyncBody::Empty)
-            .map_err(new_request_build_error)?;
-
-        self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-
-        self.client.send(req).await
-    }
-}
-
 fn infer_storage_name_from_endpoint(endpoint: &str) -> Option<String> {
-    let _endpoint: &str = endpoint
+    let endpoint: &str = endpoint
         .strip_prefix("http://")
         .or_else(|| endpoint.strip_prefix("https://"))
         .unwrap_or(endpoint);
 
-    let mut parts = _endpoint.splitn(2, '.');
+    let mut parts = endpoint.splitn(2, '.');
     let storage_name = parts.next();
     let endpoint_suffix = parts
         .next()
@@ -672,13 +461,11 @@ mod tests {
             .expect("build azdfs should be succeeded.");
 
         assert_eq!(
-            azdfs.endpoint,
+            azdfs.core.endpoint,
             "https://storagesample.dfs.core.chinacloudapi.cn"
         );
 
-        assert_eq!(azdfs._account_name, "storagesample".to_string());
-
-        assert_eq!(azdfs.filesystem, "filesystem".to_string());
+        assert_eq!(azdfs.core.filesystem, "filesystem".to_string());
 
         assert_eq!(
             azdfs_builder.account_key.unwrap(),
@@ -695,11 +482,12 @@ mod tests {
             .build()
             .expect("build azdfs should be succeeded.");
 
-        assert_eq!(azdfs.endpoint, "https://storagesample.dfs.core.windows.net");
-
-        assert_eq!(azdfs._account_name, "".to_string());
+        assert_eq!(
+            azdfs.core.endpoint,
+            "https://storagesample.dfs.core.windows.net"
+        );
 
-        assert_eq!(azdfs.filesystem, "filesystem".to_string());
+        assert_eq!(azdfs.core.filesystem, "filesystem".to_string());
 
         assert_eq!(azdfs_builder.account_key, None);
     }
diff --git a/core/src/services/azdfs/core.rs b/core/src/services/azdfs/core.rs
new file mode 100644
index 00000000..2b905112
--- /dev/null
+++ b/core/src/services/azdfs/core.rs
@@ -0,0 +1,270 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt;
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::fmt::Write;
+
+use http::header::CONTENT_DISPOSITION;
+use http::header::CONTENT_LENGTH;
+use http::header::CONTENT_TYPE;
+use http::Request;
+use http::Response;
+use reqsign_0_9::AzureStorageCredential;
+use reqsign_0_9::AzureStorageLoader;
+use reqsign_0_9::AzureStorageSigner;
+
+use crate::raw::*;
+use crate::*;
+
+pub struct AzdfsCore {
+    pub filesystem: String,
+    pub root: String,
+    pub endpoint: String,
+
+    pub client: HttpClient,
+    pub loader: AzureStorageLoader,
+    pub signer: AzureStorageSigner,
+}
+
+impl Debug for AzdfsCore {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        f.debug_struct("AzdfsCore")
+            .field("filesystem", &self.filesystem)
+            .field("root", &self.root)
+            .field("endpoint", &self.endpoint)
+            .finish_non_exhaustive()
+    }
+}
+
+impl AzdfsCore {
+    async fn load_credential(&self) -> Result<AzureStorageCredential> {
+        let cred = self
+            .loader
+            .load()
+            .await
+            .map_err(new_request_credential_error)?;
+
+        if let Some(cred) = cred {
+            Ok(cred)
+        } else {
+            Err(Error::new(
+                ErrorKind::ConfigInvalid,
+                "no valid credential found",
+            ))
+        }
+    }
+
+    pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
+        let cred = self.load_credential().await?;
+        self.signer.sign(req, &cred).map_err(new_request_sign_error)
+    }
+
+    #[inline]
+    pub async fn send(&self, req: Request<AsyncBody>) -> Result<Response<IncomingAsyncBody>> {
+        self.client.send(req).await
+    }
+}
+
+impl AzdfsCore {
+    pub async fn azdfs_read(
+        &self,
+        path: &str,
+        range: BytesRange,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "{}/{}/{}",
+            self.endpoint,
+            self.filesystem,
+            percent_encode_path(&p)
+        );
+
+        let mut req = Request::get(&url);
+
+        if !range.is_full() {
+            // azblob doesn't support read with suffix range.
+            //
+            // ref: https://learn.microsoft.com/en-us/rest/api/storageservices/specifying-the-range-header-for-blob-service-operations
+            if range.offset().is_none() && range.size().is_some() {
+                return Err(Error::new(
+                    ErrorKind::Unsupported,
+                    "azblob doesn't support read with suffix range",
+                ));
+            }
+
+            req = req.header(http::header::RANGE, range.to_header());
+        }
+
+        let mut req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+        self.send(req).await
+    }
+
+    /// resource should be one of `file` or `directory`
+    ///
+    /// ref: https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/create
+    pub fn azdfs_create_request(
+        &self,
+        path: &str,
+        resource: &str,
+        content_type: Option<&str>,
+        content_disposition: Option<&str>,
+        body: AsyncBody,
+    ) -> Result<Request<AsyncBody>> {
+        let p = build_abs_path(&self.root, path)
+            .trim_end_matches('/')
+            .to_string();
+
+        let url = format!(
+            "{}/{}/{}?resource={resource}",
+            self.endpoint,
+            self.filesystem,
+            percent_encode_path(&p)
+        );
+
+        let mut req = Request::put(&url);
+
+        // Content length must be 0 for create request.
+        req = req.header(CONTENT_LENGTH, 0);
+
+        if let Some(ty) = content_type {
+            req = req.header(CONTENT_TYPE, ty)
+        }
+
+        if let Some(pos) = content_disposition {
+            req = req.header(CONTENT_DISPOSITION, pos)
+        }
+
+        // Set body
+        let req = req.body(body).map_err(new_request_build_error)?;
+
+        Ok(req)
+    }
+
+    /// ref: https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update
+    pub fn azdfs_update_request(
+        &self,
+        path: &str,
+        size: Option<usize>,
+        body: AsyncBody,
+    ) -> Result<Request<AsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        // - close: Make this is the final action to this file.
+        // - flush: Flush the file directly.
+        let url = format!(
+            "{}/{}/{}?action=append&close=true&flush=true&position=0",
+            self.endpoint,
+            self.filesystem,
+            percent_encode_path(&p)
+        );
+
+        let mut req = Request::patch(&url);
+
+        if let Some(size) = size {
+            req = req.header(CONTENT_LENGTH, size)
+        }
+
+        // Set body
+        let req = req.body(body).map_err(new_request_build_error)?;
+
+        Ok(req)
+    }
+
+    pub async fn azdfs_get_properties(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
+        let p = build_abs_path(&self.root, path)
+            .trim_end_matches('/')
+            .to_string();
+
+        let url = format!(
+            "{}/{}/{}?action=getStatus",
+            self.endpoint,
+            self.filesystem,
+            percent_encode_path(&p)
+        );
+
+        let req = Request::head(&url);
+
+        let mut req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+        self.client.send(req).await
+    }
+
+    pub async fn azdfs_delete(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
+        let p = build_abs_path(&self.root, path)
+            .trim_end_matches('/')
+            .to_string();
+
+        let url = format!(
+            "{}/{}/{}",
+            self.endpoint,
+            self.filesystem,
+            percent_encode_path(&p)
+        );
+
+        let req = Request::delete(&url);
+
+        let mut req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+        self.send(req).await
+    }
+
+    pub async fn azdfs_list(
+        &self,
+        path: &str,
+        continuation: &str,
+        limit: Option<usize>,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let p = build_abs_path(&self.root, path)
+            .trim_end_matches('/')
+            .to_string();
+
+        let mut url = format!(
+            "{}/{}?resource=filesystem&recursive=false",
+            self.endpoint, self.filesystem
+        );
+        if !p.is_empty() {
+            write!(url, "&directory={}", percent_encode_path(&p))
+                .expect("write into string must succeed");
+        }
+        if let Some(limit) = limit {
+            write!(url, "&maxresults={limit}").expect("write into string must succeed");
+        }
+        if !continuation.is_empty() {
+            write!(url, "&continuation={continuation}").expect("write into string must succeed");
+        }
+
+        let mut req = Request::get(&url)
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+        self.send(req).await
+    }
+}
diff --git a/core/src/services/azdfs/mod.rs b/core/src/services/azdfs/mod.rs
index 730bd99e..db015ca9 100644
--- a/core/src/services/azdfs/mod.rs
+++ b/core/src/services/azdfs/mod.rs
@@ -18,6 +18,7 @@
 mod backend;
 pub use backend::AzdfsBuilder as Azdfs;
 
+mod core;
 mod error;
 mod pager;
 mod writer;
diff --git a/core/src/services/azdfs/pager.rs b/core/src/services/azdfs/pager.rs
index 5f856a0b..4f414a04 100644
--- a/core/src/services/azdfs/pager.rs
+++ b/core/src/services/azdfs/pager.rs
@@ -23,14 +23,14 @@ use serde_json::de;
 use time::format_description::well_known::Rfc2822;
 use time::OffsetDateTime;
 
-use super::backend::AzdfsBackend;
+use super::core::AzdfsCore;
 use super::error::parse_error;
 use crate::raw::*;
 use crate::*;
 
 pub struct AzdfsPager {
-    backend: Arc<AzdfsBackend>,
-    root: String,
+    core: Arc<AzdfsCore>,
+
     path: String,
     limit: Option<usize>,
 
@@ -39,15 +39,9 @@ pub struct AzdfsPager {
 }
 
 impl AzdfsPager {
-    pub fn new(
-        backend: Arc<AzdfsBackend>,
-        root: String,
-        path: String,
-        limit: Option<usize>,
-    ) -> Self {
+    pub fn new(core: Arc<AzdfsCore>, path: String, limit: Option<usize>) -> Self {
         Self {
-            backend,
-            root,
+            core,
             path,
             limit,
 
@@ -65,7 +59,7 @@ impl oio::Page for AzdfsPager {
         }
 
         let resp = self
-            .backend
+            .core
             .azdfs_list(&self.path, &self.continuation, self.limit)
             .await?;
 
@@ -123,7 +117,7 @@ impl oio::Page for AzdfsPager {
                     })?,
                 );
 
-            let mut path = build_rel_path(&self.root, &object.name);
+            let mut path = build_rel_path(&self.core.root, &object.name);
             if mode == EntryMode::DIR {
                 path += "/"
             };
diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs
index 2d85f9fb..460f1487 100644
--- a/core/src/services/azdfs/writer.rs
+++ b/core/src/services/azdfs/writer.rs
@@ -15,33 +15,35 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::sync::Arc;
+
 use async_trait::async_trait;
 use bytes::Bytes;
 use http::StatusCode;
 
-use super::backend::AzdfsBackend;
+use super::core::AzdfsCore;
 use super::error::parse_error;
 use crate::ops::OpWrite;
 use crate::raw::*;
 use crate::*;
 
 pub struct AzdfsWriter {
-    backend: AzdfsBackend,
+    core: Arc<AzdfsCore>,
 
     op: OpWrite,
     path: String,
 }
 
 impl AzdfsWriter {
-    pub fn new(backend: AzdfsBackend, op: OpWrite, path: String) -> Self {
-        AzdfsWriter { backend, op, path }
+    pub fn new(core: Arc<AzdfsCore>, op: OpWrite, path: String) -> Self {
+        AzdfsWriter { core, op, path }
     }
 }
 
 #[async_trait]
 impl oio::Write for AzdfsWriter {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
-        let mut req = self.backend.azdfs_create_request(
+        let mut req = self.core.azdfs_create_request(
             &self.path,
             "file",
             self.op.content_type(),
@@ -49,12 +51,9 @@ impl oio::Write for AzdfsWriter {
             AsyncBody::Empty,
         )?;
 
-        self.backend
-            .signer
-            .sign(&mut req)
-            .map_err(new_request_sign_error)?;
+        self.core.sign(&mut req).await?;
 
-        let resp = self.backend.client.send(req).await?;
+        let resp = self.core.send(req).await?;
 
         let status = resp.status();
         match status {
@@ -69,15 +68,12 @@ impl oio::Write for AzdfsWriter {
         }
 
         let mut req =
-            self.backend
+            self.core
                 .azdfs_update_request(&self.path, Some(bs.len()), AsyncBody::Bytes(bs))?;
 
-        self.backend
-            .signer
-            .sign(&mut req)
-            .map_err(new_request_sign_error)?;
+        self.core.sign(&mut req).await?;
 
-        let resp = self.backend.client.send(req).await?;
+        let resp = self.core.send(req).await?;
 
         let status = resp.status();
         match status {

Reply via email to