This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch poll-write
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit a20afd7b06dfc08755559e970e4fd40354e4ca5b
Author: Xuanwo <[email protected]>
AuthorDate: Fri Sep 8 15:31:10 2023 +0800

    Fix build
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/services/azdfs/writer.rs |  5 +--
 core/src/services/fs/writer.rs    | 65 +++++++++++++++++++++++++++------------
 2 files changed, 49 insertions(+), 21 deletions(-)

diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs
index 100065a7e..774c9ee4b 100644
--- a/core/src/services/azdfs/writer.rs
+++ b/core/src/services/azdfs/writer.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use std::sync::Arc;
+use std::task::{Context, Poll};
 
 use async_trait::async_trait;
 use http::StatusCode;
@@ -90,10 +91,10 @@ impl oio::Write for AzdfsWriter {
     }
 
     fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        Ok(())
+        Poll::Ready(Ok(()))
     }
 
     fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        Ok(())
+        Poll::Ready(Ok(()))
     }
 }
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index 3d94a8888..a71de17d5 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -17,9 +17,13 @@
 
 use std::io::Write;
 use std::path::PathBuf;
+use std::pin::Pin;
+use std::task::{ready, Context, Poll};
 
 use async_trait::async_trait;
-use tokio::io::AsyncWriteExt;
+use futures::future::BoxFuture;
+use futures::FutureExt;
+use tokio::io::AsyncWrite;
 
 use super::error::parse_io_error;
 use crate::raw::*;
@@ -28,7 +32,9 @@ use crate::*;
 pub struct FsWriter<F> {
     target_path: PathBuf,
     tmp_path: Option<PathBuf>,
-    f: F,
+
+    f: Option<F>,
+    fut: Option<BoxFuture<'static, Result<()>>>,
 }
 
 impl<F> FsWriter<F> {
@@ -37,6 +43,7 @@ impl<F> FsWriter<F> {
             target_path,
             tmp_path,
             f,
+            fut: None,
         }
     }
 }
@@ -44,39 +51,59 @@ impl<F> FsWriter<F> {
 #[async_trait]
 impl oio::Write for FsWriter<tokio::fs::File> {
     fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
-        self.f.write(bs.chunk()).await.map_err(parse_io_error)
+        let f = self.f.as_mut().expect("FsWriter must be initialized");
+
+        Pin::new(f)
+            .poll_write(cx, bs.chunk())
+            .map_err(parse_io_error)
     }
 
-    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "output writer doesn't support abort",
-        ))
+    fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+        self.f = None;
+
+        Poll::Ready(Ok(()))
     }
 
     fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        self.f.sync_all().await.map_err(parse_io_error)?;
+        loop {
+            if let Some(fut) = self.fut.as_mut() {
+                let res = ready!(fut.poll_unpin(cx));
+                self.fut = None;
+                return Poll::Ready(res);
+            }
 
-        if let Some(tmp_path) = &self.tmp_path {
-            tokio::fs::rename(tmp_path, &self.target_path)
-                .await
-                .map_err(parse_io_error)?;
-        }
+            let f = self.f.take().expect("FsWriter must be initialized");
+            let tmp_path = self.tmp_path.clone();
+            let target_path = self.target_path.clone();
+            self.fut = Some(Box::pin(async {
+                f.sync_all().await.map_err(parse_io_error)?;
 
-        Ok(())
+                if let Some(tmp_path) = &tmp_path {
+                    tokio::fs::rename(tmp_path, &target_path)
+                        .await
+                        .map_err(parse_io_error)?;
+                }
+
+                Ok(())
+            }));
+        }
     }
 }
 
 impl oio::BlockingWrite for FsWriter<std::fs::File> {
     fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
-        self.f.write(bs.chunk()).map_err(parse_io_error)
+        let f = self.f.as_mut().expect("FsWriter must be initialized");
+
+        f.write(bs.chunk()).map_err(parse_io_error)
     }
 
     fn close(&mut self) -> Result<()> {
-        self.f.sync_all().map_err(parse_io_error)?;
+        if let Some(f) = self.f.take() {
+            f.sync_all().map_err(parse_io_error)?;
 
-        if let Some(tmp_path) = &self.tmp_path {
-            std::fs::rename(tmp_path, &self.target_path).map_err(parse_io_error)?;
+            if let Some(tmp_path) = &self.tmp_path {
+                std::fs::rename(tmp_path, &self.target_path).map_err(parse_io_error)?;
+            }
         }
 
         Ok(())

Reply via email to