This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch poll-write in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit ace5c58b07dd5a77226e92ca42d2c46f6ab5f875
Author: Xuanwo <[email protected]>
AuthorDate: Mon Sep 11 16:30:10 2023 +0800

    Fix fs abort

    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/services/fs/writer.rs | 31 +++++++++++++++++++++++++------
 1 file changed, 25 insertions(+), 6 deletions(-)

diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index 26105115b..6964b4150 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -63,12 +63,6 @@ impl oio::Write for FsWriter<tokio::fs::File> {
             .map_err(parse_io_error)
     }
 
-    fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
-        self.f = None;
-
-        Poll::Ready(Ok(()))
-    }
-
     fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
         loop {
             if let Some(fut) = self.fut.as_mut() {
@@ -93,6 +87,31 @@ impl oio::Write for FsWriter<tokio::fs::File> {
             }));
         }
     }
+
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+        loop {
+            if let Some(fut) = self.fut.as_mut() {
+                let res = ready!(fut.poll_unpin(cx));
+                self.fut = None;
+                return Poll::Ready(res);
+            }
+
+            let _ = self.f.take().expect("FsWriter must be initialized");
+            let tmp_path = self.tmp_path.clone();
+            self.fut = Some(Box::pin(async move {
+                if let Some(tmp_path) = &tmp_path {
+                    tokio::fs::remove_file(tmp_path)
+                        .await
+                        .map_err(parse_io_error)
+                } else {
+                    Err(Error::new(
+                        ErrorKind::Unsupported,
+                        "Fs doesn't support abort if atomic_write_dir is not set",
+                    ))
+                }
+            }));
+        }
+    }
 }
 
 impl oio::BlockingWrite for FsWriter<std::fs::File> {
