This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch stream-based-write
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 5f365cd21f01c61e101a7366c576ce2abb704d9e
Author: Xuanwo <[email protected]>
AuthorDate: Wed Aug 30 18:25:59 2023 +0800

    migrate ftp
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/services/ftp/backend.rs | 36 ++++++++++++++++++++++++------------
 core/src/services/ftp/writer.rs  | 29 +++++++++++++++--------------
 2 files changed, 39 insertions(+), 26 deletions(-)

diff --git a/core/src/services/ftp/backend.rs b/core/src/services/ftp/backend.rs
index ed960c5f7..b059844b6 100644
--- a/core/src/services/ftp/backend.rs
+++ b/core/src/services/ftp/backend.rs
@@ -264,7 +264,7 @@ impl Debug for FtpBackend {
 impl Accessor for FtpBackend {
     type Reader = FtpReader;
     type BlockingReader = ();
-    type Writer = FtpWriter;
+    type Writer = oio::TwoWaysWriter<FtpWriter, 
oio::AtLeastBufWriter<FtpWriter>>;
     type BlockingWriter = ();
     type Pager = FtpPager;
     type BlockingPager = ();
@@ -352,13 +352,6 @@ impl Accessor for FtpBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        if args.content_length().is_none() {
-            return Err(Error::new(
-                ErrorKind::Unsupported,
-                "write without content length is not supported",
-            ));
-        }
-
         // Ensure the parent dir exists.
         let parent = get_parent(path);
         let paths: Vec<&str> = parent.split('/').collect();
@@ -381,10 +374,29 @@ impl Accessor for FtpBackend {
             }
         }
 
-        Ok((
-            RpWrite::new(),
-            FtpWriter::new(self.clone(), path.to_string()),
-        ))
+        let w = if args.content_length().is_some() {
+            FtpWriter::new(self.clone(), path.to_string())
+        } else if args.append() {
+            return Err(Error::new(
+                ErrorKind::Unsupported,
+                "ftp write with append is not supported yet, refer to 
https://github.com/apache/incubator-opendal/issues/2977 for more details",
+            ));
+        } else {
+            return Err(Error::new(
+                ErrorKind::Unsupported,
+                "ftp write without content-length is not supported yet, refer 
to https://github.com/apache/incubator-opendal/issues/2978 for more details",
+            ));
+        };
+
+        let w = if let Some(buffer_size) = args.buffer_size() {
+            oio::TwoWaysWriter::Two(
+                oio::AtLeastBufWriter::new(w, 
buffer_size).with_total_size(args.content_length()),
+            )
+        } else {
+            oio::TwoWaysWriter::One(w)
+        };
+
+        Ok((RpWrite::default(), w))
     }
 
     async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index 6ac3f38c8..734fd4f74 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -20,6 +20,7 @@ use bytes::Bytes;
 use futures::AsyncWriteExt;
 
 use super::backend::FtpBackend;
+use crate::raw::oio::StreamExt;
 use crate::raw::*;
 use crate::*;
 
@@ -37,28 +38,28 @@ impl FtpWriter {
     pub fn new(backend: FtpBackend, path: String) -> Self {
         FtpWriter { backend, path }
     }
+}
 
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+#[async_trait]
+impl oio::Write for FtpWriter {
+    /// TODO
+    ///
+    /// This implement is not reentrant which doesn't fulfill the contract of 
`Write`.
+    /// We should polish it after we can use datastream.
+    async fn write(&mut self, mut s: oio::Streamer) -> Result<()> {
         let mut ftp_stream = self.backend.ftp_connect(Operation::Write).await?;
         let mut data_stream = ftp_stream.append_with_stream(&self.path).await?;
-        data_stream.write_all(&bs).await.map_err(|err| {
-            Error::new(ErrorKind::Unexpected, "copy from ftp 
stream").set_source(err)
-        })?;
+
+        while let Some(bs) = s.next().await.transpose()? {
+            data_stream.write_all(&bs).await.map_err(|err| {
+                Error::new(ErrorKind::Unexpected, "copy from ftp 
stream").set_source(err)
+            })?;
+        }
 
         ftp_stream.finalize_put_stream(data_stream).await?;
 
         Ok(())
     }
-}
-
-#[async_trait]
-impl oio::Write for FtpWriter {
-    async fn write(&mut self, _s: oio::Streamer) -> Result<()> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "Write::sink is not supported",
-        ))
-    }
 
     async fn abort(&mut self) -> Result<()> {
         Ok(())

Reply via email to