This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch poll-write in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 34fa1e9b7a2c1e3af15efd88bd998797ce26843f
Author: Xuanwo <[email protected]>
AuthorDate: Fri Sep 8 15:39:51 2023 +0800

    save work

    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/services/ftp/writer.rs | 46 +++++++++++++++++++++++++++++------------
 1 file changed, 33 insertions(+), 13 deletions(-)

diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index 9fcfd989d..34ed85b37 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -16,7 +16,9 @@
 // under the License.
 
 use async_trait::async_trait;
+use futures::future::BoxFuture;
 use futures::AsyncWriteExt;
+use std::task::{ready, Context, Poll};
 
 use super::backend::FtpBackend;
 use crate::raw::*;
@@ -25,41 +27,59 @@ use crate::*;
 pub struct FtpWriter {
     backend: FtpBackend,
     path: String,
+
+    fut: Option<BoxFuture<'static, Result<usize>>>,
 }
 
 /// # TODO
 ///
 /// Writer is not implemented correctly.
 ///
-/// After we can use datastream, we should return it directly.
+/// After we can use data stream, we should return it directly.
 impl FtpWriter {
     pub fn new(backend: FtpBackend, path: String) -> Self {
-        FtpWriter { backend, path }
+        FtpWriter {
+            backend,
+            path,
+            fut: None,
+        }
     }
 }
 
 #[async_trait]
 impl oio::Write for FtpWriter {
     fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
-        let size = bs.remaining();
-        let bs = bs.copy_to_bytes(size);
+        loop {
+            if let Some(fut) = self.fut.as_mut() {
+                let res = ready!(fut.poll_unpin(cx));
+                self.fut = None;
+                return Poll::Ready(res);
+            }
 
-        let mut ftp_stream = self.backend.ftp_connect(Operation::Write).await?;
-        let mut data_stream = ftp_stream.append_with_stream(&self.path).await?;
-        data_stream.write_all(&bs).await.map_err(|err| {
-            Error::new(ErrorKind::Unexpected, "copy from ftp stream").set_source(err)
-        })?;
+            let size = bs.remaining();
+            let bs = bs.copy_to_bytes(size);
 
-        ftp_stream.finalize_put_stream(data_stream).await?;
+            let path = self.path.clone();
+            let backend = self.backend.clone();
+            let fut = async {
+                let mut ftp_stream = backend.ftp_connect(Operation::Write).await?;
+                let mut data_stream = ftp_stream.append_with_stream(&path).await?;
+                data_stream.write_all(&bs).await.map_err(|err| {
+                    Error::new(ErrorKind::Unexpected, "copy from ftp stream").set_source(err)
+                })?;
 
-        Ok(size)
+                ftp_stream.finalize_put_stream(data_stream).await?;
+                Ok(size)
+            };
+            self.fut = Some(Box::pin(fut));
+        }
     }
 
     fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        Ok(())
+        Poll::Ready(Ok(()))
     }
 
     fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        Ok(())
+        Poll::Ready(Ok(()))
     }
 }
