This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch stream-based-write in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 4238621dbba8bb9bfe11f797907e77485db38868 Author: Xuanwo <[email protected]> AuthorDate: Wed Aug 30 18:18:48 2023 +0800 Migrate dropbox Signed-off-by: Xuanwo <[email protected]> --- core/src/services/dropbox/backend.rs | 32 +++++++++++++++++++++------- core/src/services/dropbox/core.rs | 2 +- core/src/services/dropbox/writer.rs | 41 +++++++++++++++--------------------- 3 files changed, 42 insertions(+), 33 deletions(-) diff --git a/core/src/services/dropbox/backend.rs b/core/src/services/dropbox/backend.rs index ebb468728..a0315bed5 100644 --- a/core/src/services/dropbox/backend.rs +++ b/core/src/services/dropbox/backend.rs @@ -31,6 +31,7 @@ use super::error::parse_error; use super::writer::DropboxWriter; use crate::raw::*; use crate::services::dropbox::error::DropboxErrorResponse; +use crate::services::dropbox::writer::DropboxWriters; use crate::*; static BACKOFF: Lazy<ExponentialBuilder> = Lazy::new(|| { @@ -49,7 +50,7 @@ pub struct DropboxBackend { impl Accessor for DropboxBackend { type Reader = IncomingAsyncBody; type BlockingReader = (); - type Writer = DropboxWriter; + type Writer = oio::TwoWaysWriter<DropboxWriters, oio::AtLeastBufWriter<DropboxWriters>>; type BlockingWriter = (); type Pager = (); type BlockingPager = (); @@ -106,16 +107,31 @@ impl Accessor for DropboxBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - if args.content_length().is_none() { + let writer = DropboxWriter::new(self.core.clone(), path, args.clone()); + + let w = if args.content_length().is_some() { + oio::OneShotWriter::new(writer) + } else if args.append() { return Err(Error::new( ErrorKind::Unsupported, - "write without content length is not supported", + "dropbox write with append is not supported yet.", )); - } - Ok(( - RpWrite::default(), - DropboxWriter::new(self.core.clone(), args, String::from(path)), - )) + } else { + return Err(Error::new( + ErrorKind::Unsupported, + "dropbox write without content-length is not supported yet", + )); + }; + + let w = if let Some(buffer_size) = args.buffer_size() { + oio::TwoWaysWriter::Two( + oio::AtLeastBufWriter::new(w, buffer_size).with_total_size(args.content_length()), + ) + } else { + oio::TwoWaysWriter::One(w) + }; + + Ok((RpWrite::default(), w)) } async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> { diff --git a/core/src/services/dropbox/core.rs b/core/src/services/dropbox/core.rs index 4b3fba758..81d68563f 100644 --- a/core/src/services/dropbox/core.rs +++ b/core/src/services/dropbox/core.rs @@ -97,7 +97,7 @@ impl DropboxCore { pub async fn dropbox_update( &self, path: &str, - size: Option<usize>, + size: Option<u64>, content_type: Option<&str>, body: AsyncBody, ) -> Result<Response<IncomingAsyncBody>> { diff --git a/core/src/services/dropbox/writer.rs b/core/src/services/dropbox/writer.rs index 72309ff0d..a43c89f5a 100644 --- a/core/src/services/dropbox/writer.rs +++ b/core/src/services/dropbox/writer.rs @@ -23,31 +23,42 @@ use http::StatusCode; use super::core::DropboxCore; use super::error::parse_error; +use crate::raw::oio::Streamer; use crate::raw::*; use crate::*; +pub type DropboxWriters = oio::OneShotWriter<DropboxWriter>; + pub struct DropboxWriter { core: Arc<DropboxCore>, - op: OpWrite, path: String, + op: OpWrite, } impl DropboxWriter { - pub fn new(core: Arc<DropboxCore>, op: OpWrite, path: String) -> Self { - DropboxWriter { core, op, path } + pub fn new(core: Arc<DropboxCore>, path: &str, op: OpWrite) -> Self { + DropboxWriter { + core, + path: path.to_string(), + op, + } } +} - async fn write(&mut self, bs: Bytes) -> Result<()> { +#[async_trait] +impl oio::OneShotWrite for DropboxWriter { + async fn write_once(&self, stream: Streamer) -> Result<()> { let resp = self .core .dropbox_update( &self.path, - Some(bs.len()), + Some(stream.size()), self.op.content_type(), - AsyncBody::Bytes(bs), + AsyncBody::Stream(stream), ) .await?; let status = resp.status(); + match status { StatusCode::OK => { resp.into_body().consume().await?; @@ -57,21 +68,3 @@ impl DropboxWriter { } } } - -#[async_trait] -impl oio::Write for DropboxWriter { - async fn write(&mut self, _s: oio::Streamer) -> Result<()> { - Err(Error::new( - ErrorKind::Unsupported, - "Write::sink is not supported", - )) - } - - async fn abort(&mut self) -> Result<()> { - Ok(()) - } - - async fn close(&mut self) -> Result<()> { - Ok(()) - } -}
