This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch write_can_multig
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 1fd07e3c68bd03d5ddadbcc600529fbd307ea9b8
Author: Xuanwo <[email protected]>
AuthorDate: Wed Sep 13 08:36:46 2023 +0800

    delay write for oneshot
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/raw/oio/write/one_shot_write.rs     | 68 ++++++++++++++++++++--------
 core/src/services/azblob/writer.rs           |  6 +--
 core/src/services/azdfs/writer.rs            | 15 +++---
 core/src/services/dropbox/writer.rs          |  9 ++--
 core/src/services/gdrive/writer.rs           |  4 +-
 core/src/services/ipmfs/backend.rs           |  6 +--
 core/src/services/ipmfs/writer.rs            |  5 +-
 core/src/services/onedrive/writer.rs         |  4 +-
 core/src/services/supabase/writer.rs         | 11 +++--
 core/src/services/vercel_artifacts/writer.rs | 11 +++--
 core/src/services/wasabi/writer.rs           | 10 ++--
 core/src/services/webdav/writer.rs           | 10 ++--
 core/src/services/webhdfs/writer.rs          | 10 ++--
 13 files changed, 103 insertions(+), 66 deletions(-)

diff --git a/core/src/raw/oio/write/one_shot_write.rs 
b/core/src/raw/oio/write/one_shot_write.rs
index c56679e19..4efb3655b 100644
--- a/core/src/raw/oio/write/one_shot_write.rs
+++ b/core/src/raw/oio/write/one_shot_write.rs
@@ -20,7 +20,6 @@ use std::task::Context;
 use std::task::Poll;
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use futures::future::BoxFuture;
 
 use crate::raw::*;
@@ -37,17 +36,18 @@ pub trait OneShotWrite: Send + Sync + Unpin + 'static {
     /// write_once write all data at once.
     ///
     /// Implementations should make sure that the data is written correctly at 
once.
-    async fn write_once(&self, bs: Bytes) -> Result<()>;
+    async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()>;
 }
 
 /// OneShotWrite is used to implement [`Write`] based on one shot.
 pub struct OneShotWriter<W: OneShotWrite> {
     state: State<W>,
+    buffer: Option<oio::ChunkedBytes>,
 }
 
 enum State<W> {
     Idle(Option<W>),
-    Write(BoxFuture<'static, (W, Result<usize>)>),
+    Write(BoxFuture<'static, (W, Result<()>)>),
 }
 
 /// # Safety
@@ -60,45 +60,73 @@ impl<W: OneShotWrite> OneShotWriter<W> {
     pub fn new(inner: W) -> Self {
         Self {
             state: State::Idle(Some(inner)),
+            buffer: None,
         }
     }
 }
 
 #[async_trait]
 impl<W: OneShotWrite> oio::Write for OneShotWriter<W> {
-    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Poll<Result<usize>> {
+    fn poll_write(&mut self, _: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Poll<Result<usize>> {
+        loop {
+            match &mut self.state {
+                State::Idle(_) => {
+                    return match &self.buffer {
+                        Some(_) => Poll::Ready(Err(Error::new(
+                            ErrorKind::Unsupported,
+                            "OneShotWriter doesn't support multiple write",
+                        ))),
+                        None => {
+                            let size = bs.remaining();
+                            let bs = bs.vectored_bytes(size);
+                            self.buffer = 
Some(oio::ChunkedBytes::from_vec(bs));
+                            Poll::Ready(Ok(size))
+                        }
+                    }
+                }
+                State::Write(_) => {
+                    unreachable!("OneShotWriter must not go into State::Write 
during poll_write")
+                }
+            }
+        }
+    }
+
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
         loop {
             match &mut self.state {
                 State::Idle(w) => {
                     let w = w.take().expect("writer must be valid");
 
-                    let size = bs.remaining();
-                    let bs = bs.bytes(size);
-                    let fut = async move {
-                        let res = w.write_once(bs).await;
+                    match self.buffer.clone() {
+                        Some(bs) => {
+                            let fut = Box::pin(async move {
+                                let res = w.write_once(&bs).await;
 
-                        (w, res.map(|_| size))
-                    };
+                                (w, res)
+                            });
+                            self.state = State::Write(fut);
+                        }
+                        None => {
+                            let fut = Box::pin(async move {
+                                let res = w.write_once(&"".as_bytes()).await;
 
-                    self.state = State::Write(Box::pin(fut));
+                                (w, res)
+                            });
+                            self.state = State::Write(fut);
+                        }
+                    };
                 }
                 State::Write(fut) => {
-                    let (w, size) = ready!(fut.as_mut().poll(cx));
+                    let (w, res) = ready!(fut.as_mut().poll(cx));
                     self.state = State::Idle(Some(w));
-                    return Poll::Ready(size);
+                    return Poll::Ready(res);
                 }
             }
         }
     }
 
     fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
-        Poll::Ready(Err(Error::new(
-            ErrorKind::Unsupported,
-            "OneShotWriter doesn't support abort since all content has been 
flushed",
-        )))
-    }
-
-    fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+        self.buffer = None;
         Poll::Ready(Ok(()))
     }
 }
diff --git a/core/src/services/azblob/writer.rs 
b/core/src/services/azblob/writer.rs
index 029278ad5..301c9f733 100644
--- a/core/src/services/azblob/writer.rs
+++ b/core/src/services/azblob/writer.rs
@@ -18,7 +18,6 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use http::StatusCode;
 
 use super::core::AzblobCore;
@@ -46,13 +45,14 @@ impl AzblobWriter {
 
 #[async_trait]
 impl oio::OneShotWrite for AzblobWriter {
-    async fn write_once(&self, bs: Bytes) -> Result<()> {
+    async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> {
+        let bs = 
oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
         let mut req = self.core.azblob_put_blob_request(
             &self.path,
             Some(bs.len() as u64),
             self.op.content_type(),
             self.op.cache_control(),
-            AsyncBody::Bytes(bs),
+            AsyncBody::ChunkedBytes(bs),
         )?;
 
         self.core.sign(&mut req).await?;
diff --git a/core/src/services/azdfs/writer.rs 
b/core/src/services/azdfs/writer.rs
index 9e2a962f2..0424fe494 100644
--- a/core/src/services/azdfs/writer.rs
+++ b/core/src/services/azdfs/writer.rs
@@ -18,11 +18,11 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use http::StatusCode;
 
 use super::core::AzdfsCore;
 use super::error::parse_error;
+use crate::raw::oio::WriteBuf;
 use crate::raw::*;
 use crate::*;
 
@@ -41,7 +41,7 @@ impl AzdfsWriter {
 
 #[async_trait]
 impl oio::OneShotWrite for AzdfsWriter {
-    async fn write_once(&self, bs: Bytes) -> Result<()> {
+    async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> {
         let mut req = self.core.azdfs_create_request(
             &self.path,
             "file",
@@ -66,11 +66,12 @@ impl oio::OneShotWrite for AzdfsWriter {
             }
         }
 
-        let size = bs.len();
-
-        let mut req =
-            self.core
-                .azdfs_update_request(&self.path, Some(size), 
AsyncBody::Bytes(bs))?;
+        let bs = 
oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
+        let mut req = self.core.azdfs_update_request(
+            &self.path,
+            Some(bs.len()),
+            AsyncBody::ChunkedBytes(bs),
+        )?;
 
         self.core.sign(&mut req).await?;
 
diff --git a/core/src/services/dropbox/writer.rs 
b/core/src/services/dropbox/writer.rs
index 3a5c6cdd7..b5a4a69af 100644
--- a/core/src/services/dropbox/writer.rs
+++ b/core/src/services/dropbox/writer.rs
@@ -18,7 +18,6 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use http::StatusCode;
 
 use super::core::DropboxCore;
@@ -40,16 +39,16 @@ impl DropboxWriter {
 
 #[async_trait]
 impl oio::OneShotWrite for DropboxWriter {
-    async fn write_once(&self, bs: Bytes) -> Result<()> {
-        let size = bs.len();
+    async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> {
+        let bs = 
oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
 
         let resp = self
             .core
             .dropbox_update(
                 &self.path,
-                Some(size),
+                Some(bs.len()),
                 self.op.content_type(),
-                AsyncBody::Bytes(bs),
+                AsyncBody::ChunkedBytes(bs),
             )
             .await?;
         let status = resp.status();
diff --git a/core/src/services/gdrive/writer.rs 
b/core/src/services/gdrive/writer.rs
index e70f1754e..445ecb36f 100644
--- a/core/src/services/gdrive/writer.rs
+++ b/core/src/services/gdrive/writer.rs
@@ -23,6 +23,7 @@ use http::StatusCode;
 
 use super::core::GdriveCore;
 use super::error::parse_error;
+use crate::raw::oio::WriteBuf;
 use crate::raw::*;
 use crate::*;
 
@@ -88,7 +89,8 @@ impl GdriveWriter {
 
 #[async_trait]
 impl oio::OneShotWrite for GdriveWriter {
-    async fn write_once(&self, bs: Bytes) -> Result<()> {
+    async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> {
+        let bs = bs.bytes(bs.remaining());
         let size = bs.len();
         if self.file_id.is_none() {
             self.write_create(size as u64, bs).await?;
diff --git a/core/src/services/ipmfs/backend.rs 
b/core/src/services/ipmfs/backend.rs
index ac70369c9..e7999767b 100644
--- a/core/src/services/ipmfs/backend.rs
+++ b/core/src/services/ipmfs/backend.rs
@@ -21,7 +21,6 @@ use std::str;
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use http::Request;
 use http::Response;
 use http::StatusCode;
@@ -283,7 +282,7 @@ impl IpmfsBackend {
     pub async fn ipmfs_write(
         &self,
         path: &str,
-        body: Bytes,
+        body: oio::ChunkedBytes,
     ) -> Result<Response<IncomingAsyncBody>> {
         let p = build_rooted_abs_path(&self.root, path);
 
@@ -293,7 +292,8 @@ impl IpmfsBackend {
             percent_encode_path(&p)
         );
 
-        let multipart = 
Multipart::new().part(FormDataPart::new("data").content(body));
+        let multipart = Multipart::new()
+            .part(FormDataPart::new("data").stream(body.len() as u64, 
Box::new(body)));
 
         let req: http::request::Builder = Request::post(url);
         let req = multipart.apply(req)?;
diff --git a/core/src/services/ipmfs/writer.rs 
b/core/src/services/ipmfs/writer.rs
index e6a32ac1b..96d9dab18 100644
--- a/core/src/services/ipmfs/writer.rs
+++ b/core/src/services/ipmfs/writer.rs
@@ -16,11 +16,11 @@
 // under the License.
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use http::StatusCode;
 
 use super::backend::IpmfsBackend;
 use super::error::parse_error;
+use crate::raw::oio::WriteBuf;
 use crate::raw::*;
 use crate::*;
 
@@ -38,7 +38,8 @@ impl IpmfsWriter {
 
 #[async_trait]
 impl oio::OneShotWrite for IpmfsWriter {
-    async fn write_once(&self, bs: Bytes) -> Result<()> {
+    async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> {
+        let bs = 
oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
         let resp = self.backend.ipmfs_write(&self.path, bs).await?;
 
         let status = resp.status();
diff --git a/core/src/services/onedrive/writer.rs 
b/core/src/services/onedrive/writer.rs
index 5ad92ac63..326035acb 100644
--- a/core/src/services/onedrive/writer.rs
+++ b/core/src/services/onedrive/writer.rs
@@ -23,6 +23,7 @@ use super::backend::OnedriveBackend;
 use super::error::parse_error;
 use super::graph_model::OneDriveUploadSessionCreationRequestBody;
 use super::graph_model::OneDriveUploadSessionCreationResponseBody;
+use crate::raw::oio::WriteBuf;
 use crate::raw::*;
 use crate::*;
 
@@ -45,7 +46,8 @@ impl OneDriveWriter {
 
 #[async_trait]
 impl oio::OneShotWrite for OneDriveWriter {
-    async fn write_once(&self, bs: Bytes) -> Result<()> {
+    async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> {
+        let bs = bs.bytes(bs.remaining());
         let size = bs.len();
 
         if size <= Self::MAX_SIMPLE_SIZE {
diff --git a/core/src/services/supabase/writer.rs 
b/core/src/services/supabase/writer.rs
index c7fc6c8f8..b6929c4f5 100644
--- a/core/src/services/supabase/writer.rs
+++ b/core/src/services/supabase/writer.rs
@@ -18,11 +18,11 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use http::StatusCode;
 
 use super::core::*;
 use super::error::parse_error;
+use crate::raw::oio::WriteBuf;
 use crate::raw::*;
 use crate::*;
 
@@ -45,13 +45,14 @@ impl SupabaseWriter {
 
 #[async_trait]
 impl oio::OneShotWrite for SupabaseWriter {
-    async fn write_once(&self, bs: Bytes) -> Result<()> {
-        let size = bs.len();
+    async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> {
+        let bs = 
oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
+
         let mut req = self.core.supabase_upload_object_request(
             &self.path,
-            Some(size),
+            Some(bs.len()),
             self.op.content_type(),
-            AsyncBody::Bytes(bs),
+            AsyncBody::ChunkedBytes(bs),
         )?;
 
         self.core.sign(&mut req)?;
diff --git a/core/src/services/vercel_artifacts/writer.rs 
b/core/src/services/vercel_artifacts/writer.rs
index f62d76cf5..058e57041 100644
--- a/core/src/services/vercel_artifacts/writer.rs
+++ b/core/src/services/vercel_artifacts/writer.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use http::StatusCode;
 
 use super::backend::VercelArtifactsBackend;
@@ -43,12 +42,16 @@ impl VercelArtifactsWriter {
 
 #[async_trait]
 impl oio::OneShotWrite for VercelArtifactsWriter {
-    async fn write_once(&self, bs: Bytes) -> Result<()> {
-        let size = bs.len();
+    async fn write_once(&self, bs: &dyn oio::WriteBuf) -> Result<()> {
+        let bs = 
oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
 
         let resp = self
             .backend
-            .vercel_artifacts_put(self.path.as_str(), size as u64, 
AsyncBody::Bytes(bs))
+            .vercel_artifacts_put(
+                self.path.as_str(),
+                bs.len() as u64,
+                AsyncBody::ChunkedBytes(bs),
+            )
             .await?;
 
         let status = resp.status();
diff --git a/core/src/services/wasabi/writer.rs 
b/core/src/services/wasabi/writer.rs
index f9e27334e..cd76bf82c 100644
--- a/core/src/services/wasabi/writer.rs
+++ b/core/src/services/wasabi/writer.rs
@@ -18,11 +18,11 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use http::StatusCode;
 
 use super::core::*;
 use super::error::parse_error;
+use crate::raw::oio::WriteBuf;
 use crate::raw::*;
 use crate::*;
 
@@ -41,18 +41,18 @@ impl WasabiWriter {
 
 #[async_trait]
 impl oio::OneShotWrite for WasabiWriter {
-    async fn write_once(&self, bs: Bytes) -> Result<()> {
-        let size = bs.len();
+    async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> {
+        let bs = 
oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
 
         let resp = self
             .core
             .put_object(
                 &self.path,
-                Some(size),
+                Some(bs.len()),
                 self.op.content_type(),
                 self.op.content_disposition(),
                 self.op.cache_control(),
-                AsyncBody::Bytes(bs),
+                AsyncBody::ChunkedBytes(bs),
             )
             .await?;
 
diff --git a/core/src/services/webdav/writer.rs 
b/core/src/services/webdav/writer.rs
index 5b6ea319f..42de4fc0e 100644
--- a/core/src/services/webdav/writer.rs
+++ b/core/src/services/webdav/writer.rs
@@ -16,11 +16,11 @@
 // under the License.
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use http::StatusCode;
 
 use super::backend::WebdavBackend;
 use super::error::parse_error;
+use crate::raw::oio::WriteBuf;
 use crate::raw::*;
 use crate::*;
 
@@ -39,17 +39,17 @@ impl WebdavWriter {
 
 #[async_trait]
 impl oio::OneShotWrite for WebdavWriter {
-    async fn write_once(&self, bs: Bytes) -> Result<()> {
-        let size = bs.len();
+    async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> {
+        let bs = 
oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
 
         let resp = self
             .backend
             .webdav_put(
                 &self.path,
-                Some(size as u64),
+                Some(bs.len() as u64),
                 self.op.content_type(),
                 self.op.content_disposition(),
-                AsyncBody::Bytes(bs),
+                AsyncBody::ChunkedBytes(bs),
             )
             .await?;
 
diff --git a/core/src/services/webhdfs/writer.rs 
b/core/src/services/webhdfs/writer.rs
index b323c0173..38cd6d577 100644
--- a/core/src/services/webhdfs/writer.rs
+++ b/core/src/services/webhdfs/writer.rs
@@ -16,11 +16,11 @@
 // under the License.
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use http::StatusCode;
 
 use super::backend::WebhdfsBackend;
 use super::error::parse_error;
+use crate::raw::oio::WriteBuf;
 use crate::raw::*;
 use crate::*;
 
@@ -39,16 +39,16 @@ impl WebhdfsWriter {
 
 #[async_trait]
 impl oio::OneShotWrite for WebhdfsWriter {
-    async fn write_once(&self, bs: Bytes) -> Result<()> {
-        let size = bs.len();
+    async fn write_once(&self, bs: &dyn WriteBuf) -> Result<()> {
+        let bs = 
oio::ChunkedBytes::from_vec(bs.vectored_bytes(bs.remaining()));
 
         let req = self
             .backend
             .webhdfs_create_object_request(
                 &self.path,
-                Some(size),
+                Some(bs.len()),
                 self.op.content_type(),
-                AsyncBody::Bytes(bs),
+                AsyncBody::ChunkedBytes(bs),
             )
             .await?;
 

Reply via email to