This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new ada9062b refactor(services/azdfs): Migrate to async reqsign (#1903)
ada9062b is described below
commit ada9062b0dc884a6d9eac78ea88ffdcb76e538e4
Author: Xuanwo <[email protected]>
AuthorDate: Tue Apr 11 13:56:15 2023 +0800
refactor(services/azdfs): Migrate to async reqsign (#1903)
Signed-off-by: Xuanwo <[email protected]>
---
core/src/services/azdfs/backend.rs | 300 ++++++-------------------------------
core/src/services/azdfs/core.rs | 270 +++++++++++++++++++++++++++++++++
core/src/services/azdfs/mod.rs | 1 +
core/src/services/azdfs/pager.rs | 20 +--
core/src/services/azdfs/writer.rs | 28 ++--
5 files changed, 334 insertions(+), 285 deletions(-)
diff --git a/core/src/services/azdfs/backend.rs b/core/src/services/azdfs/backend.rs
index 5a810ac2..62c0ef64 100644
--- a/core/src/services/azdfs/backend.rs
+++ b/core/src/services/azdfs/backend.rs
@@ -18,19 +18,16 @@
use std::collections::HashMap;
use std::fmt::Debug;
use std::fmt::Formatter;
-use std::fmt::Write;
use std::sync::Arc;
use async_trait::async_trait;
-use http::header::CONTENT_DISPOSITION;
-use http::header::CONTENT_LENGTH;
-use http::header::CONTENT_TYPE;
-use http::Request;
-use http::Response;
use http::StatusCode;
use log::debug;
-use reqsign::AzureStorageSigner;
+use reqsign_0_9::AzureStorageConfig;
+use reqsign_0_9::AzureStorageLoader;
+use reqsign_0_9::AzureStorageSigner;
+use super::core::AzdfsCore;
use super::error::parse_error;
use super::pager::AzdfsPager;
use super::writer::AzdfsWriter;
@@ -252,35 +249,28 @@ impl Builder for AzdfsBuilder {
})?
};
- let mut signer_builder = AzureStorageSigner::builder();
- let mut account_name = None;
- if let (Some(name), Some(key)) = (&self.account_name, &self.account_key) {
- account_name = Some(name.clone());
- signer_builder.account_name(name).account_key(key);
- } else if let Some(key) = &self.account_key {
- account_name = infer_storage_name_from_endpoint(endpoint.as_str());
- signer_builder
- .account_name(account_name.as_ref().unwrap_or(&String::new()))
- .account_key(key);
- }
+ let config_loader = AzureStorageConfig {
+ account_name: self
+ .account_name
+ .clone()
+ .or_else(|| infer_storage_name_from_endpoint(endpoint.as_str())),
+ account_key: self.account_key.clone(),
+ sas_token: None,
+ };
- let signer = signer_builder.build().map_err(|e| {
- Error::new(ErrorKind::ConfigInvalid, "build AzureStorageSigner")
- .with_operation("Builder::build")
- .with_context("service", Scheme::Azdfs)
- .with_context("endpoint", &endpoint)
- .with_context("container", filesystem.as_str())
- .set_source(e)
- })?;
+ let cred_loader = AzureStorageLoader::new(config_loader);
+ let signer = AzureStorageSigner::new();
debug!("backend build finished: {:?}", &self);
Ok(AzdfsBackend {
- root,
- endpoint,
- signer: Arc::new(signer),
- filesystem: self.filesystem.clone(),
- client,
- _account_name: account_name.unwrap_or_default(),
+ core: Arc::new(AzdfsCore {
+ filesystem: self.filesystem.clone(),
+ root,
+ endpoint,
+ client,
+ loader: cred_loader,
+ signer,
+ }),
})
}
@@ -300,13 +290,7 @@ impl Builder for AzdfsBuilder {
/// Backend for azblob services.
#[derive(Debug, Clone)]
pub struct AzdfsBackend {
- filesystem: String,
- // TODO: remove pub after https://github.com/apache/incubator-opendal/issues/1427
- pub client: HttpClient,
- root: String, // root will be "/" or /abc/
- endpoint: String,
- pub signer: Arc<AzureStorageSigner>,
- _account_name: String,
+ core: Arc<AzdfsCore>,
}
#[async_trait]
@@ -321,8 +305,8 @@ impl Accessor for AzdfsBackend {
fn info(&self) -> AccessorInfo {
let mut am = AccessorInfo::default();
am.set_scheme(Scheme::Azdfs)
- .set_root(&self.root)
- .set_name(&self.filesystem)
+ .set_root(&self.core.root)
+ .set_name(&self.core.filesystem)
.set_capabilities(
AccessorCapability::Read | AccessorCapability::Write | AccessorCapability::List,
)
@@ -338,11 +322,13 @@ impl Accessor for AzdfsBackend {
_ => unimplemented!("not supported object mode"),
};
- let mut req = self.azdfs_create_request(path, resource, None, None, AsyncBody::Empty)?;
+ let mut req =
+ self.core
+ .azdfs_create_request(path, resource, None, None, AsyncBody::Empty)?;
- self.signer.sign(&mut req).map_err(new_request_sign_error)?;
+ self.core.sign(&mut req).await?;
- let resp = self.client.send(req).await?;
+ let resp = self.core.send(req).await?;
let status = resp.status();
@@ -356,7 +342,7 @@ impl Accessor for AzdfsBackend {
}
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
- let resp = self.azdfs_read(path, args.range()).await?;
+ let resp = self.core.azdfs_read(path, args.range()).await?;
let status = resp.status();
@@ -379,7 +365,7 @@ impl Accessor for AzdfsBackend {
Ok((
RpWrite::default(),
- AzdfsWriter::new(self.clone(), args, path.to_string()),
+ AzdfsWriter::new(self.core.clone(), args, path.to_string()),
))
}
@@ -389,7 +375,7 @@ impl Accessor for AzdfsBackend {
return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
}
- let resp = self.azdfs_get_properties(path).await?;
+ let resp = self.core.azdfs_get_properties(path).await?;
let status = resp.status();
@@ -403,7 +389,7 @@ impl Accessor for AzdfsBackend {
}
async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
- let resp = self.azdfs_delete(path).await?;
+ let resp = self.core.azdfs_delete(path).await?;
let status = resp.status();
@@ -414,216 +400,19 @@ impl Accessor for AzdfsBackend {
}
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
- let op = AzdfsPager::new(
- Arc::new(self.clone()),
- self.root.clone(),
- path.to_string(),
- args.limit(),
- );
+ let op = AzdfsPager::new(self.core.clone(), path.to_string(), args.limit());
Ok((RpList::default(), op))
}
}
-impl AzdfsBackend {
- async fn azdfs_read(
- &self,
- path: &str,
- range: BytesRange,
- ) -> Result<Response<IncomingAsyncBody>> {
- let p = build_abs_path(&self.root, path);
-
- let url = format!(
- "{}/{}/{}",
- self.endpoint,
- self.filesystem,
- percent_encode_path(&p)
- );
-
- let mut req = Request::get(&url);
-
- if !range.is_full() {
- // azblob doesn't support read with suffix range.
- //
- // ref: https://learn.microsoft.com/en-us/rest/api/storageservices/specifying-the-range-header-for-blob-service-operations
- if range.offset().is_none() && range.size().is_some() {
- return Err(Error::new(
- ErrorKind::Unsupported,
- "azblob doesn't support read with suffix range",
- ));
- }
-
- req = req.header(http::header::RANGE, range.to_header());
- }
-
- let mut req = req
- .body(AsyncBody::Empty)
- .map_err(new_request_build_error)?;
-
- self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-
- self.client.send(req).await
- }
-
- /// resource should be one of `file` or `directory`
- ///
- /// ref: https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/create
- pub fn azdfs_create_request(
- &self,
- path: &str,
- resource: &str,
- content_type: Option<&str>,
- content_disposition: Option<&str>,
- body: AsyncBody,
- ) -> Result<Request<AsyncBody>> {
- let p = build_abs_path(&self.root, path)
- .trim_end_matches('/')
- .to_string();
-
- let url = format!(
- "{}/{}/{}?resource={resource}",
- self.endpoint,
- self.filesystem,
- percent_encode_path(&p)
- );
-
- let mut req = Request::put(&url);
-
- // Content length must be 0 for create request.
- req = req.header(CONTENT_LENGTH, 0);
-
- if let Some(ty) = content_type {
- req = req.header(CONTENT_TYPE, ty)
- }
-
- if let Some(pos) = content_disposition {
- req = req.header(CONTENT_DISPOSITION, pos)
- }
-
- // Set body
- let req = req.body(body).map_err(new_request_build_error)?;
-
- Ok(req)
- }
-
- /// ref: https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update
- pub fn azdfs_update_request(
- &self,
- path: &str,
- size: Option<usize>,
- body: AsyncBody,
- ) -> Result<Request<AsyncBody>> {
- let p = build_abs_path(&self.root, path);
-
- // - close: Make this is the final action to this file.
- // - flush: Flush the file directly.
- let url = format!(
- "{}/{}/{}?action=append&close=true&flush=true&position=0",
- self.endpoint,
- self.filesystem,
- percent_encode_path(&p)
- );
-
- let mut req = Request::patch(&url);
-
- if let Some(size) = size {
- req = req.header(CONTENT_LENGTH, size)
- }
-
- // Set body
- let req = req.body(body).map_err(new_request_build_error)?;
-
- Ok(req)
- }
-
- async fn azdfs_get_properties(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
- let p = build_abs_path(&self.root, path)
- .trim_end_matches('/')
- .to_string();
-
- let url = format!(
- "{}/{}/{}?action=getStatus",
- self.endpoint,
- self.filesystem,
- percent_encode_path(&p)
- );
-
- let req = Request::head(&url);
-
- let mut req = req
- .body(AsyncBody::Empty)
- .map_err(new_request_build_error)?;
-
- self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-
- self.client.send(req).await
- }
-
- async fn azdfs_delete(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
- let p = build_abs_path(&self.root, path)
- .trim_end_matches('/')
- .to_string();
-
- let url = format!(
- "{}/{}/{}",
- self.endpoint,
- self.filesystem,
- percent_encode_path(&p)
- );
-
- let req = Request::delete(&url);
-
- let mut req = req
- .body(AsyncBody::Empty)
- .map_err(new_request_build_error)?;
-
- self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-
- self.client.send(req).await
- }
-
- pub(crate) async fn azdfs_list(
- &self,
- path: &str,
- continuation: &str,
- limit: Option<usize>,
- ) -> Result<Response<IncomingAsyncBody>> {
- let p = build_abs_path(&self.root, path)
- .trim_end_matches('/')
- .to_string();
-
- let mut url = format!(
- "{}/{}?resource=filesystem&recursive=false",
- self.endpoint, self.filesystem
- );
- if !p.is_empty() {
- write!(url, "&directory={}", percent_encode_path(&p))
- .expect("write into string must succeed");
- }
- if let Some(limit) = limit {
- write!(url, "&maxresults={limit}").expect("write into string must succeed");
- }
- if !continuation.is_empty() {
- write!(url, "&continuation={continuation}").expect("write into string must succeed");
- }
-
- let mut req = Request::get(&url)
- .body(AsyncBody::Empty)
- .map_err(new_request_build_error)?;
-
- self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-
- self.client.send(req).await
- }
-}
-
fn infer_storage_name_from_endpoint(endpoint: &str) -> Option<String> {
- let _endpoint: &str = endpoint
+ let endpoint: &str = endpoint
.strip_prefix("http://")
.or_else(|| endpoint.strip_prefix("https://"))
.unwrap_or(endpoint);
- let mut parts = _endpoint.splitn(2, '.');
+ let mut parts = endpoint.splitn(2, '.');
let storage_name = parts.next();
let endpoint_suffix = parts
.next()
@@ -672,13 +461,11 @@ mod tests {
.expect("build azdfs should be succeeded.");
assert_eq!(
- azdfs.endpoint,
+ azdfs.core.endpoint,
"https://storagesample.dfs.core.chinacloudapi.cn"
);
- assert_eq!(azdfs._account_name, "storagesample".to_string());
-
- assert_eq!(azdfs.filesystem, "filesystem".to_string());
+ assert_eq!(azdfs.core.filesystem, "filesystem".to_string());
assert_eq!(
azdfs_builder.account_key.unwrap(),
@@ -695,11 +482,12 @@ mod tests {
.build()
.expect("build azdfs should be succeeded.");
- assert_eq!(azdfs.endpoint, "https://storagesample.dfs.core.windows.net");
-
- assert_eq!(azdfs._account_name, "".to_string());
+ assert_eq!(
+ azdfs.core.endpoint,
+ "https://storagesample.dfs.core.windows.net"
+ );
- assert_eq!(azdfs.filesystem, "filesystem".to_string());
+ assert_eq!(azdfs.core.filesystem, "filesystem".to_string());
assert_eq!(azdfs_builder.account_key, None);
}
diff --git a/core/src/services/azdfs/core.rs b/core/src/services/azdfs/core.rs
new file mode 100644
index 00000000..2b905112
--- /dev/null
+++ b/core/src/services/azdfs/core.rs
@@ -0,0 +1,270 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt;
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::fmt::Write;
+
+use http::header::CONTENT_DISPOSITION;
+use http::header::CONTENT_LENGTH;
+use http::header::CONTENT_TYPE;
+use http::Request;
+use http::Response;
+use reqsign_0_9::AzureStorageCredential;
+use reqsign_0_9::AzureStorageLoader;
+use reqsign_0_9::AzureStorageSigner;
+
+use crate::raw::*;
+use crate::*;
+
+pub struct AzdfsCore {
+ pub filesystem: String,
+ pub root: String,
+ pub endpoint: String,
+
+ pub client: HttpClient,
+ pub loader: AzureStorageLoader,
+ pub signer: AzureStorageSigner,
+}
+
+impl Debug for AzdfsCore {
+ fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+ f.debug_struct("AzdfsCore")
+ .field("filesystem", &self.filesystem)
+ .field("root", &self.root)
+ .field("endpoint", &self.endpoint)
+ .finish_non_exhaustive()
+ }
+}
+
+impl AzdfsCore {
+ async fn load_credential(&self) -> Result<AzureStorageCredential> {
+ let cred = self
+ .loader
+ .load()
+ .await
+ .map_err(new_request_credential_error)?;
+
+ if let Some(cred) = cred {
+ Ok(cred)
+ } else {
+ Err(Error::new(
+ ErrorKind::ConfigInvalid,
+ "no valid credential found",
+ ))
+ }
+ }
+
+ pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
+ let cred = self.load_credential().await?;
+ self.signer.sign(req, &cred).map_err(new_request_sign_error)
+ }
+
+ #[inline]
+ pub async fn send(&self, req: Request<AsyncBody>) -> Result<Response<IncomingAsyncBody>> {
+ self.client.send(req).await
+ }
+}
+
+impl AzdfsCore {
+ pub async fn azdfs_read(
+ &self,
+ path: &str,
+ range: BytesRange,
+ ) -> Result<Response<IncomingAsyncBody>> {
+ let p = build_abs_path(&self.root, path);
+
+ let url = format!(
+ "{}/{}/{}",
+ self.endpoint,
+ self.filesystem,
+ percent_encode_path(&p)
+ );
+
+ let mut req = Request::get(&url);
+
+ if !range.is_full() {
+ // azblob doesn't support read with suffix range.
+ //
+ // ref: https://learn.microsoft.com/en-us/rest/api/storageservices/specifying-the-range-header-for-blob-service-operations
+ if range.offset().is_none() && range.size().is_some() {
+ return Err(Error::new(
+ ErrorKind::Unsupported,
+ "azblob doesn't support read with suffix range",
+ ));
+ }
+
+ req = req.header(http::header::RANGE, range.to_header());
+ }
+
+ let mut req = req
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ self.sign(&mut req).await?;
+ self.send(req).await
+ }
+
+ /// resource should be one of `file` or `directory`
+ ///
+ /// ref: https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/create
+ pub fn azdfs_create_request(
+ &self,
+ path: &str,
+ resource: &str,
+ content_type: Option<&str>,
+ content_disposition: Option<&str>,
+ body: AsyncBody,
+ ) -> Result<Request<AsyncBody>> {
+ let p = build_abs_path(&self.root, path)
+ .trim_end_matches('/')
+ .to_string();
+
+ let url = format!(
+ "{}/{}/{}?resource={resource}",
+ self.endpoint,
+ self.filesystem,
+ percent_encode_path(&p)
+ );
+
+ let mut req = Request::put(&url);
+
+ // Content length must be 0 for create request.
+ req = req.header(CONTENT_LENGTH, 0);
+
+ if let Some(ty) = content_type {
+ req = req.header(CONTENT_TYPE, ty)
+ }
+
+ if let Some(pos) = content_disposition {
+ req = req.header(CONTENT_DISPOSITION, pos)
+ }
+
+ // Set body
+ let req = req.body(body).map_err(new_request_build_error)?;
+
+ Ok(req)
+ }
+
+ /// ref: https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update
+ pub fn azdfs_update_request(
+ &self,
+ path: &str,
+ size: Option<usize>,
+ body: AsyncBody,
+ ) -> Result<Request<AsyncBody>> {
+ let p = build_abs_path(&self.root, path);
+
+ // - close: Make this is the final action to this file.
+ // - flush: Flush the file directly.
+ let url = format!(
+ "{}/{}/{}?action=append&close=true&flush=true&position=0",
+ self.endpoint,
+ self.filesystem,
+ percent_encode_path(&p)
+ );
+
+ let mut req = Request::patch(&url);
+
+ if let Some(size) = size {
+ req = req.header(CONTENT_LENGTH, size)
+ }
+
+ // Set body
+ let req = req.body(body).map_err(new_request_build_error)?;
+
+ Ok(req)
+ }
+
+ pub async fn azdfs_get_properties(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
+ let p = build_abs_path(&self.root, path)
+ .trim_end_matches('/')
+ .to_string();
+
+ let url = format!(
+ "{}/{}/{}?action=getStatus",
+ self.endpoint,
+ self.filesystem,
+ percent_encode_path(&p)
+ );
+
+ let req = Request::head(&url);
+
+ let mut req = req
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ self.sign(&mut req).await?;
+ self.client.send(req).await
+ }
+
+ pub async fn azdfs_delete(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
+ let p = build_abs_path(&self.root, path)
+ .trim_end_matches('/')
+ .to_string();
+
+ let url = format!(
+ "{}/{}/{}",
+ self.endpoint,
+ self.filesystem,
+ percent_encode_path(&p)
+ );
+
+ let req = Request::delete(&url);
+
+ let mut req = req
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ self.sign(&mut req).await?;
+ self.send(req).await
+ }
+
+ pub async fn azdfs_list(
+ &self,
+ path: &str,
+ continuation: &str,
+ limit: Option<usize>,
+ ) -> Result<Response<IncomingAsyncBody>> {
+ let p = build_abs_path(&self.root, path)
+ .trim_end_matches('/')
+ .to_string();
+
+ let mut url = format!(
+ "{}/{}?resource=filesystem&recursive=false",
+ self.endpoint, self.filesystem
+ );
+ if !p.is_empty() {
+ write!(url, "&directory={}", percent_encode_path(&p))
+ .expect("write into string must succeed");
+ }
+ if let Some(limit) = limit {
+ write!(url, "&maxresults={limit}").expect("write into string must succeed");
+ }
+ if !continuation.is_empty() {
+ write!(url, "&continuation={continuation}").expect("write into string must succeed");
+ }
+
+ let mut req = Request::get(&url)
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ self.sign(&mut req).await?;
+ self.send(req).await
+ }
+}
diff --git a/core/src/services/azdfs/mod.rs b/core/src/services/azdfs/mod.rs
index 730bd99e..db015ca9 100644
--- a/core/src/services/azdfs/mod.rs
+++ b/core/src/services/azdfs/mod.rs
@@ -18,6 +18,7 @@
mod backend;
pub use backend::AzdfsBuilder as Azdfs;
+mod core;
mod error;
mod pager;
mod writer;
diff --git a/core/src/services/azdfs/pager.rs b/core/src/services/azdfs/pager.rs
index 5f856a0b..4f414a04 100644
--- a/core/src/services/azdfs/pager.rs
+++ b/core/src/services/azdfs/pager.rs
@@ -23,14 +23,14 @@ use serde_json::de;
use time::format_description::well_known::Rfc2822;
use time::OffsetDateTime;
-use super::backend::AzdfsBackend;
+use super::core::AzdfsCore;
use super::error::parse_error;
use crate::raw::*;
use crate::*;
pub struct AzdfsPager {
- backend: Arc<AzdfsBackend>,
- root: String,
+ core: Arc<AzdfsCore>,
+
path: String,
limit: Option<usize>,
@@ -39,15 +39,9 @@ pub struct AzdfsPager {
}
impl AzdfsPager {
- pub fn new(
- backend: Arc<AzdfsBackend>,
- root: String,
- path: String,
- limit: Option<usize>,
- ) -> Self {
+ pub fn new(core: Arc<AzdfsCore>, path: String, limit: Option<usize>) -> Self {
Self {
- backend,
- root,
+ core,
path,
limit,
@@ -65,7 +59,7 @@ impl oio::Page for AzdfsPager {
}
let resp = self
- .backend
+ .core
.azdfs_list(&self.path, &self.continuation, self.limit)
.await?;
@@ -123,7 +117,7 @@ impl oio::Page for AzdfsPager {
})?,
);
- let mut path = build_rel_path(&self.root, &object.name);
+ let mut path = build_rel_path(&self.core.root, &object.name);
if mode == EntryMode::DIR {
path += "/"
};
diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs
index 2d85f9fb..460f1487 100644
--- a/core/src/services/azdfs/writer.rs
+++ b/core/src/services/azdfs/writer.rs
@@ -15,33 +15,35 @@
// specific language governing permissions and limitations
// under the License.
+use std::sync::Arc;
+
use async_trait::async_trait;
use bytes::Bytes;
use http::StatusCode;
-use super::backend::AzdfsBackend;
+use super::core::AzdfsCore;
use super::error::parse_error;
use crate::ops::OpWrite;
use crate::raw::*;
use crate::*;
pub struct AzdfsWriter {
- backend: AzdfsBackend,
+ core: Arc<AzdfsCore>,
op: OpWrite,
path: String,
}
impl AzdfsWriter {
- pub fn new(backend: AzdfsBackend, op: OpWrite, path: String) -> Self {
- AzdfsWriter { backend, op, path }
+ pub fn new(core: Arc<AzdfsCore>, op: OpWrite, path: String) -> Self {
+ AzdfsWriter { core, op, path }
}
}
#[async_trait]
impl oio::Write for AzdfsWriter {
async fn write(&mut self, bs: Bytes) -> Result<()> {
- let mut req = self.backend.azdfs_create_request(
+ let mut req = self.core.azdfs_create_request(
&self.path,
"file",
self.op.content_type(),
@@ -49,12 +51,9 @@ impl oio::Write for AzdfsWriter {
AsyncBody::Empty,
)?;
- self.backend
- .signer
- .sign(&mut req)
- .map_err(new_request_sign_error)?;
+ self.core.sign(&mut req).await?;
- let resp = self.backend.client.send(req).await?;
+ let resp = self.core.send(req).await?;
let status = resp.status();
match status {
@@ -69,15 +68,12 @@ impl oio::Write for AzdfsWriter {
}
let mut req =
- self.backend
+ self.core
.azdfs_update_request(&self.path, Some(bs.len()), AsyncBody::Bytes(bs))?;
- self.backend
- .signer
- .sign(&mut req)
- .map_err(new_request_sign_error)?;
+ self.core.sign(&mut req).await?;
- let resp = self.backend.client.send(req).await?;
+ let resp = self.core.send(req).await?;
let status = resp.status();
match status {