This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch stream-based-write in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 42f6e98a73162405d8576412eb2e3c687f2dd340 Author: Xuanwo <[email protected]> AuthorDate: Wed Aug 30 18:14:40 2023 +0800 Migrate azdfs Signed-off-by: Xuanwo <[email protected]> --- core/src/services/azblob/backend.rs | 2 +- core/src/services/azdfs/backend.rs | 31 ++++++++++++++++++------- core/src/services/azdfs/core.rs | 2 +- core/src/services/azdfs/writer.rs | 45 ++++++++++++++++--------------------- 4 files changed, 44 insertions(+), 36 deletions(-) diff --git a/core/src/services/azblob/backend.rs b/core/src/services/azblob/backend.rs index 3a437b6c8..76aa65480 100644 --- a/core/src/services/azblob/backend.rs +++ b/core/src/services/azblob/backend.rs @@ -613,7 +613,7 @@ impl Accessor for AzblobBackend { } else { return Err(Error::new( ErrorKind::Unsupported, - "azblob write without neither content-length nor append is not supported yet", + "azblob write with block blobs is not supported yet, refer to https://github.com/apache/incubator-opendal/issues/2113 for more details", )); }; diff --git a/core/src/services/azdfs/backend.rs b/core/src/services/azdfs/backend.rs index 3882f4fe2..9edfe3e6d 100644 --- a/core/src/services/azdfs/backend.rs +++ b/core/src/services/azdfs/backend.rs @@ -32,6 +32,7 @@ use super::error::parse_error; use super::pager::AzdfsPager; use super::writer::AzdfsWriter; use crate::raw::*; +use crate::services::azdfs::writer::AzdfsWriters; use crate::*; /// Known endpoint suffix Azure Data Lake Storage Gen2 URI syntax. @@ -230,7 +231,7 @@ pub struct AzdfsBackend { impl Accessor for AzdfsBackend { type Reader = IncomingAsyncBody; type BlockingReader = (); - type Writer = AzdfsWriter; + type Writer = oio::TwoWaysWriter<AzdfsWriters, oio::AtLeastBufWriter<AzdfsWriters>>; type BlockingWriter = (); type Pager = AzdfsPager; type BlockingPager = (); @@ -296,17 +297,31 @@ impl Accessor for AzdfsBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - if args.content_length().is_none() { + let writer = AzdfsWriter::new(self.core.clone(), path, args.clone()); + + let w = if args.content_length().is_some() { + oio::OneShotWriter::new(writer) + } else if args.append() { return Err(Error::new( ErrorKind::Unsupported, - "write without content length is not supported", + "azdfs write with append is not supported yet, refer to https://github.com/apache/incubator-opendal/issues/2977 for more details", )); - } + } else { + return Err(Error::new( + ErrorKind::Unsupported, + "azdfs write without content-length is not supported yet, refer to https://github.com/apache/incubator-opendal/issues/2978 for more details", + )); + }; + + let w = if let Some(buffer_size) = args.buffer_size() { + oio::TwoWaysWriter::Two( + oio::AtLeastBufWriter::new(w, buffer_size).with_total_size(args.content_length()), + ) + } else { + oio::TwoWaysWriter::One(w) + }; - Ok(( - RpWrite::default(), - AzdfsWriter::new(self.core.clone(), args, path.to_string()), - )) + Ok((RpWrite::default(), w)) } async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> { diff --git a/core/src/services/azdfs/core.rs b/core/src/services/azdfs/core.rs index 409216d17..61bab62b3 100644 --- a/core/src/services/azdfs/core.rs +++ b/core/src/services/azdfs/core.rs @@ -204,7 +204,7 @@ impl AzdfsCore { pub fn azdfs_update_request( &self, path: &str, - size: Option<usize>, + size: Option<u64>, body: AsyncBody, ) -> Result<Request<AsyncBody>> { let p = build_abs_path(&self.root, path); diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs index e6ae3f84a..9f5d014c0 100644 --- a/core/src/services/azdfs/writer.rs +++ b/core/src/services/azdfs/writer.rs @@ -18,27 +18,36 @@ use std::sync::Arc; use async_trait::async_trait; -use bytes::Bytes; use http::StatusCode; use super::core::AzdfsCore; use super::error::parse_error; +use crate::raw::oio::{Stream, Streamer}; use crate::raw::*; use crate::*; +pub type AzdfsWriters = oio::OneShotWriter<AzdfsWriter>; + pub struct AzdfsWriter { core: Arc<AzdfsCore>, - op: OpWrite, path: String, + op: OpWrite, } impl AzdfsWriter { - pub fn new(core: Arc<AzdfsCore>, op: OpWrite, path: String) -> Self { - AzdfsWriter { core, op, path } + pub fn new(core: Arc<AzdfsCore>, path: &str, op: OpWrite) -> Self { + AzdfsWriter { + core, + path: path.to_string(), + op, + } } +} - async fn write(&mut self, bs: Bytes) -> Result<()> { +#[async_trait] +impl oio::OneShotWrite for AzdfsWriter { + async fn write_once(&self, stream: Streamer) -> Result<()> { let mut req = self.core.azdfs_create_request( &self.path, "file", @@ -63,9 +72,11 @@ impl AzdfsWriter { } } - let mut req = - self.core - .azdfs_update_request(&self.path, Some(bs.len()), AsyncBody::Bytes(bs))?; + let mut req = self.core.azdfs_update_request( + &self.path, + Some(stream.size()), + AsyncBody::Stream(stream), + )?; self.core.sign(&mut req).await?; @@ -83,21 +94,3 @@ impl AzdfsWriter { } } } - -#[async_trait] -impl oio::Write for AzdfsWriter { - async fn write(&mut self, _s: oio::Streamer) -> Result<()> { - Err(Error::new( - ErrorKind::Unsupported, - "Write::sink is not supported", - )) - } - - async fn abort(&mut self) -> Result<()> { - Ok(()) - } - - async fn close(&mut self) -> Result<()> { - Ok(()) - } -}
