This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch stream-based-write in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 5f365cd21f01c61e101a7366c576ce2abb704d9e Author: Xuanwo <[email protected]> AuthorDate: Wed Aug 30 18:25:59 2023 +0800 migrate ftp Signed-off-by: Xuanwo <[email protected]> --- core/src/services/ftp/backend.rs | 36 ++++++++++++++++++++++++------------ core/src/services/ftp/writer.rs | 29 +++++++++++++++-------------- 2 files changed, 39 insertions(+), 26 deletions(-) diff --git a/core/src/services/ftp/backend.rs b/core/src/services/ftp/backend.rs index ed960c5f7..b059844b6 100644 --- a/core/src/services/ftp/backend.rs +++ b/core/src/services/ftp/backend.rs @@ -264,7 +264,7 @@ impl Debug for FtpBackend { impl Accessor for FtpBackend { type Reader = FtpReader; type BlockingReader = (); - type Writer = FtpWriter; + type Writer = oio::TwoWaysWriter<FtpWriter, oio::AtLeastBufWriter<FtpWriter>>; type BlockingWriter = (); type Pager = FtpPager; type BlockingPager = (); @@ -352,13 +352,6 @@ impl Accessor for FtpBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - if args.content_length().is_none() { - return Err(Error::new( - ErrorKind::Unsupported, - "write without content length is not supported", - )); - } - // Ensure the parent dir exists. 
let parent = get_parent(path); let paths: Vec<&str> = parent.split('/').collect(); @@ -381,10 +374,29 @@ impl Accessor for FtpBackend { } } - Ok(( - RpWrite::new(), - FtpWriter::new(self.clone(), path.to_string()), - )) + let w = if args.content_length().is_some() { + FtpWriter::new(self.clone(), path.to_string()) + } else if args.append() { + return Err(Error::new( + ErrorKind::Unsupported, + "ftp write with append is not supported yet, refer to https://github.com/apache/incubator-opendal/issues/2977 for more details", + )); + } else { + return Err(Error::new( + ErrorKind::Unsupported, + "ftp write without content-length is not supported yet, refer to https://github.com/apache/incubator-opendal/issues/2978 for more details", + )); + }; + + let w = if let Some(buffer_size) = args.buffer_size() { + oio::TwoWaysWriter::Two( + oio::AtLeastBufWriter::new(w, buffer_size).with_total_size(args.content_length()), + ) + } else { + oio::TwoWaysWriter::One(w) + }; + + Ok((RpWrite::default(), w)) } async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> { diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs index 6ac3f38c8..734fd4f74 100644 --- a/core/src/services/ftp/writer.rs +++ b/core/src/services/ftp/writer.rs @@ -20,6 +20,7 @@ use bytes::Bytes; use futures::AsyncWriteExt; use super::backend::FtpBackend; +use crate::raw::oio::StreamExt; use crate::raw::*; use crate::*; @@ -37,28 +38,28 @@ impl FtpWriter { pub fn new(backend: FtpBackend, path: String) -> Self { FtpWriter { backend, path } } +} - async fn write(&mut self, bs: Bytes) -> Result<()> { +#[async_trait] +impl oio::Write for FtpWriter { + /// TODO + /// + /// This implementation is not reentrant, which doesn't fulfill the contract of `Write`. + /// We should polish it after we can use datastream. 
+ async fn write(&mut self, mut s: oio::Streamer) -> Result<()> { let mut ftp_stream = self.backend.ftp_connect(Operation::Write).await?; let mut data_stream = ftp_stream.append_with_stream(&self.path).await?; - data_stream.write_all(&bs).await.map_err(|err| { - Error::new(ErrorKind::Unexpected, "copy from ftp stream").set_source(err) - })?; + + while let Some(bs) = s.next().await.transpose()? { + data_stream.write_all(&bs).await.map_err(|err| { + Error::new(ErrorKind::Unexpected, "copy from ftp stream").set_source(err) + })?; + } ftp_stream.finalize_put_stream(data_stream).await?; Ok(()) } -} - -#[async_trait] -impl oio::Write for FtpWriter { - async fn write(&mut self, _s: oio::Streamer) -> Result<()> { - Err(Error::new( - ErrorKind::Unsupported, - "Write::sink is not supported", - )) - } async fn abort(&mut self) -> Result<()> { Ok(())
