This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch poll-write
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 34fa1e9b7a2c1e3af15efd88bd998797ce26843f
Author: Xuanwo <[email protected]>
AuthorDate: Fri Sep 8 15:39:51 2023 +0800

    save work
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/services/ftp/writer.rs | 46 +++++++++++++++++++++++++++++------------
 1 file changed, 33 insertions(+), 13 deletions(-)

diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index 9fcfd989d..34ed85b37 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -16,7 +16,9 @@
 // under the License.
 
 use async_trait::async_trait;
+use futures::future::BoxFuture;
 use futures::AsyncWriteExt;
+use std::task::{ready, Context, Poll};
 
 use super::backend::FtpBackend;
 use crate::raw::*;
@@ -25,41 +27,59 @@ use crate::*;
 pub struct FtpWriter {
     backend: FtpBackend,
     path: String,
+
+    fut: Option<BoxFuture<'static, Result<usize>>>,
 }
 
 /// # TODO
 ///
 /// Writer is not implemented correctly.
 ///
-/// After we can use datastream, we should return it directly.
+/// After we can use data stream, we should return it directly.
 impl FtpWriter {
     pub fn new(backend: FtpBackend, path: String) -> Self {
-        FtpWriter { backend, path }
+        FtpWriter {
+            backend,
+            path,
+            fut: None,
+        }
     }
 }
 
 #[async_trait]
 impl oio::Write for FtpWriter {
     fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
-        let size = bs.remaining();
-        let bs = bs.copy_to_bytes(size);
+        loop {
+            if let Some(fut) = self.fut.as_mut() {
+                let res = ready!(fut.poll_unpin(cx));
+                self.fut = None;
+                return Poll::Ready(res);
+            }
 
-        let mut ftp_stream = self.backend.ftp_connect(Operation::Write).await?;
-        let mut data_stream = ftp_stream.append_with_stream(&self.path).await?;
-        data_stream.write_all(&bs).await.map_err(|err| {
-            Error::new(ErrorKind::Unexpected, "copy from ftp stream").set_source(err)
-        })?;
+            let size = bs.remaining();
+            let bs = bs.copy_to_bytes(size);
 
-        ftp_stream.finalize_put_stream(data_stream).await?;
+            let path = self.path.clone();
+            let backend = self.backend.clone();
+            let fut = async {
+                let mut ftp_stream = backend.ftp_connect(Operation::Write).await?;
+                let mut data_stream = ftp_stream.append_with_stream(&path).await?;
+                data_stream.write_all(&bs).await.map_err(|err| {
+                    Error::new(ErrorKind::Unexpected, "copy from ftp stream").set_source(err)
+                })?;
 
-        Ok(size)
+                ftp_stream.finalize_put_stream(data_stream).await?;
+                Ok(size)
+            };
+            self.fut = Some(Box::pin(fut));
+        }
     }
 
     fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        Ok(())
+        Poll::Ready(Ok(()))
     }
 
     fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        Ok(())
+        Poll::Ready(Ok(()))
     }
 }

Reply via email to