This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch poll-write
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 919d554ce0a627455ae85ab2b4cc5ad256d168f2
Author: Xuanwo <[email protected]>
AuthorDate: Fri Sep 8 17:27:08 2023 +0800

    Build pass
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/benches/oio/utils.rs                        | 13 ++---
 core/benches/oio/write.rs                        |  2 +-
 core/src/raw/oio/write/api.rs                    |  4 +-
 core/src/raw/oio/write/append_object_write.rs    | 11 ++---
 core/src/raw/oio/write/exact_buf_write.rs        |  4 +-
 core/src/raw/oio/write/multipart_upload_write.rs | 22 ++++-----
 core/src/raw/oio/write/one_shot_write.rs         |  4 +-
 core/src/services/fs/writer.rs                   |  4 +-
 core/src/services/ftp/writer.rs                  |  3 +-
 core/src/services/ghac/writer.rs                 | 60 +++++++++++++-----------
 core/src/services/sftp/writer.rs                 | 11 +++--
 11 files changed, 74 insertions(+), 64 deletions(-)

diff --git a/core/benches/oio/utils.rs b/core/benches/oio/utils.rs
index 67b4fd451..b3ea04027 100644
--- a/core/benches/oio/utils.rs
+++ b/core/benches/oio/utils.rs
@@ -20,6 +20,7 @@ use bytes::Bytes;
 use opendal::raw::oio;
 use rand::prelude::ThreadRng;
 use rand::RngCore;
+use std::task::{Context, Poll};
 
 /// BlackHoleWriter will discard all data written to it so we can measure the buffer's cost.
 pub struct BlackHoleWriter;
@@ -30,16 +31,16 @@ impl oio::Write for BlackHoleWriter {
         &mut self,
         cx: &mut Context<'_>,
         bs: &dyn oio::WriteBuf,
-    ) -> opendal::Result<usize> {
-        Ok(bs.remaining())
+    ) -> Poll<opendal::Result<usize>> {
+        Poll::Ready(Ok(bs.remaining()))
     }
 
-    fn poll_abort(&mut self, cx: &mut Context<'_>) -> opendal::Result<()> {
-        Ok(())
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<opendal::Result<()>> {
+        Poll::Ready(Ok(()))
     }
 
-    fn poll_close(&mut self, cx: &mut Context<'_>) -> opendal::Result<()> {
-        Ok(())
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<opendal::Result<()>> {
+        Poll::Ready(Ok(()))
     }
 }
 
diff --git a/core/benches/oio/write.rs b/core/benches/oio/write.rs
index 8da2a6dd0..3ab71cdcf 100644
--- a/core/benches/oio/write.rs
+++ b/core/benches/oio/write.rs
@@ -18,8 +18,8 @@
 use bytes::Buf;
 use criterion::Criterion;
 use once_cell::sync::Lazy;
-use opendal::raw::oio::ExactBufWriter;
 use opendal::raw::oio::Write;
+use opendal::raw::oio::{ExactBufWriter, WriteExt};
 use rand::thread_rng;
 use size::Size;
 
diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs
index 41bd93491..c05f061c7 100644
--- a/core/src/raw/oio/write/api.rs
+++ b/core/src/raw/oio/write/api.rs
@@ -166,8 +166,8 @@ where
     type Output = Result<usize>;
 
     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<usize>> {
-        let this = self.project();
-        Pin::new(this.writer).poll_write(cx, this.buf)
+        let mut this = self.project();
+        Pin::new(this.writer).poll_write(cx, *this.buf)
     }
 }
 
diff --git a/core/src/raw/oio/write/append_object_write.rs b/core/src/raw/oio/write/append_object_write.rs
index 393254097..5cbaedb37 100644
--- a/core/src/raw/oio/write/append_object_write.rs
+++ b/core/src/raw/oio/write/append_object_write.rs
@@ -32,7 +32,7 @@ use crate::*;
 /// - `AppendObjectWriter` impl `Write`
 /// - Expose `AppendObjectWriter` as `Accessor::Writer`
 #[async_trait]
-pub trait AppendObjectWrite: Send + Sync + Unpin {
+pub trait AppendObjectWrite: Send + Sync + Unpin + 'static {
     /// Get the current offset of the append object.
     ///
     /// Returns `0` if the object is not exist.
@@ -90,9 +90,9 @@ where
                             let bs = bs.copy_to_bytes(size);
 
                             self.state = State::Append(Box::pin(async move {
-                                w.append(offset, size as u64, AsyncBody::Bytes(bs)).await?;
+                                let res = w.append(offset, size as u64, AsyncBody::Bytes(bs)).await;
 
-                                (w, Ok(size))
+                                (w, res.map(|_| size))
                             }));
                         }
                         None => {
@@ -110,11 +110,10 @@ where
                     self.offset = Some(offset?);
                 }
                 State::Append(fut) => {
-                    let (w, res) = ready!(fut.as_mut().poll(cx));
+                    let (w, size) = ready!(fut.as_mut().poll(cx));
                     self.state = State::Idle(Some(w));
 
-                    let size = res?;
-                    return Poll::Ready(Ok(size));
+                    return Poll::Ready(Ok(size?));
                 }
             }
         }
diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs
index f1615346b..636242280 100644
--- a/core/src/raw/oio/write/exact_buf_write.rs
+++ b/core/src/raw/oio/write/exact_buf_write.rs
@@ -164,11 +164,11 @@ mod tests {
         }
 
         fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-            Ok(())
+            Poll::Ready(Ok(()))
         }
 
         fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-            Ok(())
+            Poll::Ready(Ok(()))
         }
     }
 
diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs
index 97b2f0220..af6f825ac 100644
--- a/core/src/raw/oio/write/multipart_upload_write.rs
+++ b/core/src/raw/oio/write/multipart_upload_write.rs
@@ -33,7 +33,7 @@ use crate::*;
 /// - `MultipartUploadWriter` impl `Write`
 /// - Expose `MultipartUploadWriter` as `Accessor::Writer`
 #[async_trait]
-pub trait MultipartUploadWrite: Send + Sync + Unpin {
+pub trait MultipartUploadWrite: Send + Sync + Unpin + 'static {
     /// initiate_part will call start a multipart upload and return the upload id.
     ///
     /// MultipartUploadWriter will call this when:
@@ -72,6 +72,7 @@ pub trait MultipartUploadWrite: Send + Sync + Unpin {
 ///
 /// - `part_number` is the index of the part, starting from 0.
 /// - `etag` is the `ETag` of the part.
+#[derive(Clone)]
 pub struct MultipartUploadPart {
     /// The number of the part, starting from 0.
     pub part_number: usize,
@@ -91,7 +92,7 @@ pub struct MultipartUploadWriter<W: MultipartUploadWrite> {
 enum State<W> {
     Idle(Option<W>),
     Init(BoxFuture<'static, (W, Result<String>)>),
-    Write(BoxFuture<'static, (W, Result<(usize, MultipartUploadPart)>)>),
+    Write(BoxFuture<'static, (W, usize, Result<MultipartUploadPart>)>),
     Close(BoxFuture<'static, (W, Result<()>)>),
     Abort(BoxFuture<'static, (W, Result<()>)>),
 }
@@ -138,9 +139,9 @@ where
                                         size as u64,
                                         AsyncBody::Bytes(bs),
                                     )
-                                    .await?;
+                                    .await;
 
-                                (w, Ok((size, part)))
+                                (w, size, part)
                             }));
                         }
                         None => {
@@ -157,12 +158,11 @@ where
                     self.upload_id = Some(Arc::new(upload_id?));
                 }
                 State::Write(fut) => {
-                    let (w, res) = ready!(fut.as_mut().poll(cx));
+                    let (w, size, part) = ready!(fut.as_mut().poll(cx));
                     self.state = State::Idle(Some(w));
 
-                    let (written, part) = res?;
-                    self.parts.push(part);
-                    return Poll::Ready(Ok(written));
+                    self.parts.push(part?);
+                    return Poll::Ready(Ok(size));
                 }
                 State::Close(_) => {
                     unreachable!(
@@ -183,11 +183,11 @@ where
             match &mut self.state {
                 State::Idle(w) => {
                     let w = w.take().expect("writer must be valid");
-                    match &self.upload_id {
+                    match self.upload_id.clone() {
                         Some(upload_id) => {
                             let parts = self.parts.clone();
                             self.state = State::Close(Box::pin(async move {
-                                let res = w.complete_part(&upload_id, &self.parts).await;
+                                let res = w.complete_part(&upload_id, &parts).await;
                                 (w, res)
                             }));
                         }
@@ -217,7 +217,7 @@ where
             match &mut self.state {
                 State::Idle(w) => {
                     let w = w.take().expect("writer must be valid");
-                    match &self.upload_id {
+                    match self.upload_id.clone() {
                         Some(upload_id) => {
                             self.state = State::Close(Box::pin(async move {
                                 let res = w.abort_part(&upload_id).await;
diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs
index 6cc083491..b798da5a4 100644
--- a/core/src/raw/oio/write/one_shot_write.rs
+++ b/core/src/raw/oio/write/one_shot_write.rs
@@ -30,7 +30,7 @@ use crate::*;
 ///
 /// The layout after adopting [`OneShotWrite`]:
 #[async_trait]
-pub trait OneShotWrite: Send + Sync + Unpin {
+pub trait OneShotWrite: Send + Sync + Unpin + 'static {
     /// write_once write all data at once.
     ///
     /// Implementations should make sure that the data is written correctly at once.
@@ -71,7 +71,7 @@ impl<W: OneShotWrite> oio::Write for OneShotWriter<W> {
 
                     let size = bs.remaining();
                     let bs = bs.copy_to_bytes(size);
-                    let fut = async {
+                    let fut = async move {
                         let res = w.write_once(bs).await;
 
                         (w, res.map(|_| size))
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index 1a0aa49eb..26105115b 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -42,7 +42,7 @@ impl<F> FsWriter<F> {
         Self {
             target_path,
             tmp_path,
-            f,
+            f: Some(f),
             fut: None,
         }
     }
@@ -80,7 +80,7 @@ impl oio::Write for FsWriter<tokio::fs::File> {
             let f = self.f.take().expect("FsWriter must be initialized");
             let tmp_path = self.tmp_path.clone();
             let target_path = self.target_path.clone();
-            self.fut = Some(Box::pin(async {
+            self.fut = Some(Box::pin(async move {
                 f.sync_all().await.map_err(parse_io_error)?;
 
                 if let Some(tmp_path) = &tmp_path {
diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index ec3c3f0e2..5a759aa21 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -18,6 +18,7 @@
 use async_trait::async_trait;
 use futures::future::BoxFuture;
 use futures::AsyncWriteExt;
+use futures::FutureExt;
 use std::task::{ready, Context, Poll};
 
 use super::backend::FtpBackend;
@@ -66,7 +67,7 @@ impl oio::Write for FtpWriter {
 
             let path = self.path.clone();
             let backend = self.backend.clone();
-            let fut = async {
+            let fut = async move {
                 let mut ftp_stream = backend.ftp_connect(Operation::Write).await?;
                 let mut data_stream = ftp_stream.append_with_stream(&path).await?;
                 data_stream.write_all(&bs).await.map_err(|err| {
diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs
index c780c2710..582a150aa 100644
--- a/core/src/services/ghac/writer.rs
+++ b/core/src/services/ghac/writer.rs
@@ -64,21 +64,24 @@ impl oio::Write for GhacWriter {
                     let size = bs.remaining();
                     let bs = bs.copy_to_bytes(size);
 
-                    let fut = async {
-                        let req = backend
-                            .ghac_upload(cache_id, size as u64, AsyncBody::Bytes(bs))
-                            .await?;
-
-                        let resp = backend.client.send(req).await?;
-
-                        let res = if resp.status().is_success() {
-                            resp.into_body().consume().await?;
-                            Ok(size)
-                        } else {
-                            Err(parse_error(resp)
-                                .await
-                                .map(|err| err.with_operation("Backend::ghac_upload"))?)
-                        };
+                    let fut = async move {
+                        let res = async {
+                            let req = backend
+                                .ghac_upload(cache_id, size as u64, AsyncBody::Bytes(bs))
+                                .await?;
+
+                            let resp = backend.client.send(req).await?;
+
+                            if resp.status().is_success() {
+                                resp.into_body().consume().await?;
+                                Ok(size)
+                            } else {
+                                Err(parse_error(resp)
+                                    .await
+                                    .map(|err| err.with_operation("Backend::ghac_upload"))?)
+                            }
+                        }
+                        .await;
 
                         (backend, res)
                     };
@@ -114,18 +117,21 @@ impl oio::Write for GhacWriter {
                     let cache_id = self.cache_id;
                     let size = self.size;
 
-                    let fut = async {
-                        let req = backend.ghac_commit(cache_id, size).await?;
-                        let resp = backend.client.send(req).await?;
-
-                        let res = if resp.status().is_success() {
-                            resp.into_body().consume().await?;
-                            Ok(())
-                        } else {
-                            Err(parse_error(resp)
-                                .await
-                                .map(|err| err.with_operation("Backend::ghac_commit"))?)
-                        };
+                    let fut = async move {
+                        let res = async {
+                            let req = backend.ghac_commit(cache_id, size).await?;
+                            let resp = backend.client.send(req).await?;
+
+                            if resp.status().is_success() {
+                                resp.into_body().consume().await?;
+                                Ok(size as usize)
+                            } else {
+                                Err(parse_error(resp)
+                                    .await
+                                    .map(|err| err.with_operation("Backend::ghac_commit"))?)
+                            }
+                        }
+                        .await;
 
                         (backend, res)
                     };
diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs
index fb1ab0a3e..455e0fdbc 100644
--- a/core/src/services/sftp/writer.rs
+++ b/core/src/services/sftp/writer.rs
@@ -37,9 +37,11 @@ impl SftpWriter {
 #[async_trait]
 impl oio::Write for SftpWriter {
     fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
-        Pin::new(&mut self.file)
-            .poll_write(cx, bs.chunk())
-            .map_err(Error::from)
+        // Pin::new(&mut self.file)
+        //     .poll_write(cx, bs.chunk())
+        //     .map_err(Error::from)
+
+        todo!()
     }
 
     fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
@@ -47,6 +49,7 @@ impl oio::Write for SftpWriter {
     }
 
     fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        Pin::new(&mut self.file).poll_flush(cx).map_err(Error::from)
+        // Pin::new(&mut self.file).poll_flush(cx).map_err(Error::from)
+        todo!()
     }
 }

Reply via email to