This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch write_can_multig
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit ec28cb41082bdaa1be5236206fbae1bfec946f2c
Author: Xuanwo <[email protected]>
AuthorDate: Wed Sep 13 09:24:03 2023 +0800

    Fix ftp
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/services/ftp/backend.rs | 11 ++++----
 core/src/services/ftp/writer.rs  | 60 ++++++++++------------------------------
 2 files changed, 21 insertions(+), 50 deletions(-)

diff --git a/core/src/services/ftp/backend.rs b/core/src/services/ftp/backend.rs
index 5e33d8e36..2fe8ffe9a 100644
--- a/core/src/services/ftp/backend.rs
+++ b/core/src/services/ftp/backend.rs
@@ -42,6 +42,7 @@ use super::pager::FtpPager;
 use super::util::FtpReader;
 use super::writer::FtpWriter;
 use crate::raw::*;
+use crate::services::ftp::writer::FtpWriters;
 use crate::*;
 
 /// FTP and FTPS services support.
@@ -264,7 +265,7 @@ impl Debug for FtpBackend {
 impl Accessor for FtpBackend {
     type Reader = FtpReader;
     type BlockingReader = ();
-    type Writer = FtpWriter;
+    type Writer = FtpWriters;
     type BlockingWriter = ();
     type Pager = FtpPager;
     type BlockingPager = ();
@@ -374,10 +375,10 @@ impl Accessor for FtpBackend {
             }
         }
 
-        Ok((
-            RpWrite::new(),
-            FtpWriter::new(self.clone(), path.to_string()),
-        ))
+        let w = FtpWriter::new(self.clone(), path.to_string());
+        let w = oio::OneShotWriter::new(w);
+
+        Ok((RpWrite::new(), w))
     }
 
     async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index 4b2658907..79ae8e55d 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -15,24 +15,19 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::task::ready;
-use std::task::Context;
-use std::task::Poll;
-
 use async_trait::async_trait;
-use futures::future::BoxFuture;
 use futures::AsyncWriteExt;
-use futures::FutureExt;
 
 use super::backend::FtpBackend;
+use crate::raw::oio::WriteBuf;
 use crate::raw::*;
 use crate::*;
 
+pub type FtpWriters = oio::OneShotWriter<FtpWriter>;
+
 pub struct FtpWriter {
     backend: FtpBackend,
     path: String,
-
-    fut: Option<BoxFuture<'static, Result<usize>>>,
 }
 
 /// # TODO
@@ -42,11 +37,7 @@ pub struct FtpWriter {
 /// After we can use data stream, we should return it directly.
 impl FtpWriter {
     pub fn new(backend: FtpBackend, path: String) -> Self {
-        FtpWriter {
-            backend,
-            path,
-            fut: None,
-        }
+        FtpWriter { backend, path }
     }
 }
 
@@ -56,39 +47,18 @@ impl FtpWriter {
 unsafe impl Sync for FtpWriter {}
 
 #[async_trait]
-impl oio::Write for FtpWriter {
-    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
-        loop {
-            if let Some(fut) = self.fut.as_mut() {
-                let res = ready!(fut.poll_unpin(cx));
-                self.fut = None;
-                return Poll::Ready(res);
-            }
-
-            let size = bs.remaining();
-            let bs = bs.bytes(size);
+impl oio::OneShotWrite for FtpWriter {
+    async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> {
+        let size = bs.remaining();
+        let bs = bs.bytes(size);
 
-            let path = self.path.clone();
-            let backend = self.backend.clone();
-            let fut = async move {
-                let mut ftp_stream = backend.ftp_connect(Operation::Write).await?;
-                let mut data_stream = ftp_stream.append_with_stream(&path).await?;
-                data_stream.write_all(&bs).await.map_err(|err| {
-                    Error::new(ErrorKind::Unexpected, "copy from ftp stream").set_source(err)
-                })?;
-
-                ftp_stream.finalize_put_stream(data_stream).await?;
-                Ok(size)
-            };
-            self.fut = Some(Box::pin(fut));
-        }
-    }
-
-    fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
-        Poll::Ready(Ok(()))
-    }
+        let mut ftp_stream = self.backend.ftp_connect(Operation::Write).await?;
+        let mut data_stream = ftp_stream.append_with_stream(&self.path).await?;
+        data_stream.write_all(&bs).await.map_err(|err| {
+            Error::new(ErrorKind::Unexpected, "copy from ftp stream").set_source(err)
+        })?;
 
-    fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
-        Poll::Ready(Ok(()))
+        ftp_stream.finalize_put_stream(data_stream).await?;
+        Ok(())
     }
 }

Reply via email to