This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch write_can_multig in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit ec28cb41082bdaa1be5236206fbae1bfec946f2c Author: Xuanwo <[email protected]> AuthorDate: Wed Sep 13 09:24:03 2023 +0800 Fix ftp Signed-off-by: Xuanwo <[email protected]> --- core/src/services/ftp/backend.rs | 11 ++++---- core/src/services/ftp/writer.rs | 60 ++++++++++------------------------------ 2 files changed, 21 insertions(+), 50 deletions(-) diff --git a/core/src/services/ftp/backend.rs b/core/src/services/ftp/backend.rs index 5e33d8e36..2fe8ffe9a 100644 --- a/core/src/services/ftp/backend.rs +++ b/core/src/services/ftp/backend.rs @@ -42,6 +42,7 @@ use super::pager::FtpPager; use super::util::FtpReader; use super::writer::FtpWriter; use crate::raw::*; +use crate::services::ftp::writer::FtpWriters; use crate::*; /// FTP and FTPS services support. @@ -264,7 +265,7 @@ impl Debug for FtpBackend { impl Accessor for FtpBackend { type Reader = FtpReader; type BlockingReader = (); - type Writer = FtpWriter; + type Writer = FtpWriters; type BlockingWriter = (); type Pager = FtpPager; type BlockingPager = (); @@ -374,10 +375,10 @@ impl Accessor for FtpBackend { } } - Ok(( - RpWrite::new(), - FtpWriter::new(self.clone(), path.to_string()), - )) + let w = FtpWriter::new(self.clone(), path.to_string()); + let w = oio::OneShotWriter::new(w); + + Ok((RpWrite::new(), w)) } async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> { diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs index 4b2658907..79ae8e55d 100644 --- a/core/src/services/ftp/writer.rs +++ b/core/src/services/ftp/writer.rs @@ -15,24 +15,19 @@ // specific language governing permissions and limitations // under the License. -use std::task::ready; -use std::task::Context; -use std::task::Poll; - use async_trait::async_trait; -use futures::future::BoxFuture; use futures::AsyncWriteExt; -use futures::FutureExt; use super::backend::FtpBackend; +use crate::raw::oio::WriteBuf; use crate::raw::*; use crate::*; +pub type FtpWriters = oio::OneShotWriter<FtpWriter>; + pub struct FtpWriter { backend: FtpBackend, path: String, - - fut: Option<BoxFuture<'static, Result<usize>>>, } /// # TODO @@ -42,11 +37,7 @@ pub struct FtpWriter { /// After we can use data stream, we should return it directly. impl FtpWriter { pub fn new(backend: FtpBackend, path: String) -> Self { - FtpWriter { - backend, - path, - fut: None, - } + FtpWriter { backend, path } } } @@ -56,39 +47,18 @@ impl FtpWriter { unsafe impl Sync for FtpWriter {} #[async_trait] -impl oio::Write for FtpWriter { - fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { - loop { - if let Some(fut) = self.fut.as_mut() { - let res = ready!(fut.poll_unpin(cx)); - self.fut = None; - return Poll::Ready(res); - } - - let size = bs.remaining(); - let bs = bs.bytes(size); +impl oio::OneShotWrite for FtpWriter { + async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> { + let size = bs.remaining(); + let bs = bs.bytes(size); - let path = self.path.clone(); - let backend = self.backend.clone(); - let fut = async move { - let mut ftp_stream = backend.ftp_connect(Operation::Write).await?; - let mut data_stream = ftp_stream.append_with_stream(&path).await?; - data_stream.write_all(&bs).await.map_err(|err| { - Error::new(ErrorKind::Unexpected, "copy from ftp stream").set_source(err) - })?; - - ftp_stream.finalize_put_stream(data_stream).await?; - Ok(size) - }; - self.fut = Some(Box::pin(fut)); - } - } - - fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> { - Poll::Ready(Ok(())) - } + let mut ftp_stream = self.backend.ftp_connect(Operation::Write).await?; + let mut data_stream = ftp_stream.append_with_stream(&self.path).await?; + data_stream.write_all(&bs).await.map_err(|err| { + Error::new(ErrorKind::Unexpected, "copy from ftp stream").set_source(err) + })?; - fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> { - Poll::Ready(Ok(())) + ftp_stream.finalize_put_stream(data_stream).await?; + Ok(()) } }
