This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch poll-write
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit a20afd7b06dfc08755559e970e4fd40354e4ca5b
Author: Xuanwo <[email protected]>
AuthorDate: Fri Sep 8 15:31:10 2023 +0800

    Fix build

    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/services/azdfs/writer.rs |  5 +--
 core/src/services/fs/writer.rs    | 65 +++++++++++++++++++++++++++------------
 2 files changed, 49 insertions(+), 21 deletions(-)

diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs
index 100065a7e..774c9ee4b 100644
--- a/core/src/services/azdfs/writer.rs
+++ b/core/src/services/azdfs/writer.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use std::sync::Arc;
+use std::task::{Context, Poll};
 
 use async_trait::async_trait;
 use http::StatusCode;
@@ -90,10 +91,10 @@ impl oio::Write for AzdfsWriter {
     }
 
     fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        Ok(())
+        Poll::Ready(Ok(()))
     }
 
     fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        Ok(())
+        Poll::Ready(Ok(()))
     }
 }
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index 3d94a8888..a71de17d5 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -17,9 +17,13 @@
 
 use std::io::Write;
 use std::path::PathBuf;
+use std::pin::Pin;
+use std::task::{ready, Context, Poll};
 
 use async_trait::async_trait;
-use tokio::io::AsyncWriteExt;
+use futures::future::BoxFuture;
+use futures::FutureExt;
+use tokio::io::AsyncWrite;
 
 use super::error::parse_io_error;
 use crate::raw::*;
@@ -28,7 +32,9 @@ use crate::*;
 pub struct FsWriter<F> {
     target_path: PathBuf,
     tmp_path: Option<PathBuf>,
-    f: F,
+
+    f: Option<F>,
+    fut: Option<BoxFuture<'static, Result<()>>>,
 }
 
 impl<F> FsWriter<F> {
@@ -37,6 +43,7 @@ impl<F> FsWriter<F> {
             target_path,
             tmp_path,
             f,
+            fut: None,
         }
     }
 }
@@ -44,39 +51,59 @@ impl<F> FsWriter<F> {
 #[async_trait]
 impl oio::Write for FsWriter<tokio::fs::File> {
     fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
-        self.f.write(bs.chunk()).await.map_err(parse_io_error)
+        let f = self.f.as_mut().expect("FsWriter must be initialized");
+
+        Pin::new(f)
+            .poll_write(cx, bs.chunk())
+            .map_err(parse_io_error)
     }
 
-    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "output writer doesn't support abort",
-        ))
+    fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+        self.f = None;
+
+        Poll::Ready(Ok(()))
     }
 
     fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        self.f.sync_all().await.map_err(parse_io_error)?;
+        loop {
+            if let Some(fut) = self.fut.as_mut() {
+                let res = ready!(fut.poll_unpin(cx));
+                self.fut = None;
+                return Poll::Ready(res);
+            }
 
-        if let Some(tmp_path) = &self.tmp_path {
-            tokio::fs::rename(tmp_path, &self.target_path)
-                .await
-                .map_err(parse_io_error)?;
-        }
+            let f = self.f.take().expect("FsWriter must be initialized");
+            let tmp_path = self.tmp_path.clone();
+            let target_path = self.target_path.clone();
+            self.fut = Some(Box::pin(async {
+                f.sync_all().await.map_err(parse_io_error)?;
 
-        Ok(())
+                if let Some(tmp_path) = &tmp_path {
+                    tokio::fs::rename(tmp_path, &target_path)
+                        .await
+                        .map_err(parse_io_error)?;
+                }
+
+                Ok(())
+            }));
+        }
     }
 }
 
 impl oio::BlockingWrite for FsWriter<std::fs::File> {
     fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
-        self.f.write(bs.chunk()).map_err(parse_io_error)
+        let f = self.f.as_mut().expect("FsWriter must be initialized");
+
+        f.write(bs.chunk()).map_err(parse_io_error)
     }
 
     fn close(&mut self) -> Result<()> {
-        self.f.sync_all().map_err(parse_io_error)?;
+        if let Some(f) = self.f.take() {
+            f.sync_all().map_err(parse_io_error)?;
 
-        if let Some(tmp_path) = &self.tmp_path {
-            std::fs::rename(tmp_path, &self.target_path).map_err(parse_io_error)?;
+            if let Some(tmp_path) = &self.tmp_path {
+                std::fs::rename(tmp_path, &self.target_path).map_err(parse_io_error)?;
+            }
         }
 
         Ok(())
