This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch poll-write
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit a5e53461769aa6b357f50bf7c7a078065d674cb6
Author: Xuanwo <[email protected]>
AuthorDate: Fri Sep 8 16:30:06 2023 +0800

    Save work
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/services/dropbox/writer.rs          | 20 ++++++-----------
 core/src/services/gdrive/writer.rs           | 31 ++++++---------------------
 core/src/services/onedrive/writer.rs         | 19 +++++------------
 core/src/services/vercel_artifacts/writer.rs | 24 ++++++---------------
 core/src/services/webdav/writer.rs           | 32 ++++++++--------------------
 core/src/services/webhdfs/writer.rs          | 20 ++++++-----------
 6 files changed, 39 insertions(+), 107 deletions(-)

diff --git a/core/src/services/dropbox/writer.rs b/core/src/services/dropbox/writer.rs
index c7887a7d7..5e3e19a92 100644
--- a/core/src/services/dropbox/writer.rs
+++ b/core/src/services/dropbox/writer.rs
@@ -18,6 +18,7 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
+use bytes::Bytes;
 use http::StatusCode;
 
 use super::core::DropboxCore;
@@ -37,10 +38,9 @@ impl DropboxWriter {
     }
 }
 
-#[async_trait]
-impl oio::Write for DropboxWriter {
-    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
-        let size = bs.remaining();
+impl oio::OneShotWrite for DropboxWriter {
+    async fn write_once(&self, bs: Bytes) -> Result<()> {
+        let size = bs.len();
 
         let resp = self
             .core
@@ -48,24 +48,16 @@ impl oio::Write for DropboxWriter {
                 &self.path,
                 Some(size),
                 self.op.content_type(),
-                AsyncBody::Bytes(bs.copy_to_bytes(size)),
+                AsyncBody::Bytes(bs),
             )
             .await?;
         let status = resp.status();
         match status {
             StatusCode::OK => {
                 resp.into_body().consume().await?;
-                Ok(size)
+                Ok(())
             }
             _ => Err(parse_error(resp).await?),
         }
     }
-
-    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        Ok(())
-    }
-
-    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        Ok(())
-    }
 }
diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs
index b733863a8..f2f0b4e1c 100644
--- a/core/src/services/gdrive/writer.rs
+++ b/core/src/services/gdrive/writer.rs
@@ -49,7 +49,7 @@ impl GdriveWriter {
     ///
     /// This is used for small objects.
     /// And should overwrite the object if it already exists.
-    pub async fn write_create(&mut self, size: u64, body: Bytes) -> Result<()> {
+    pub async fn write_create(&self, size: u64, body: Bytes) -> Result<()> {
         let resp = self
             .core
             .gdrive_upload_simple_request(&self.path, size, body)
@@ -59,13 +59,7 @@ impl GdriveWriter {
 
         match status {
             StatusCode::OK | StatusCode::CREATED => {
-                let bs = resp.into_body().bytes().await?;
-
-                let file = serde_json::from_slice::<GdriveFile>(&bs)
-                    .map_err(new_json_deserialize_error)?;
-
-                self.file_id = Some(file.id);
-
+                resp.into_body().consume().await?;
                 Ok(())
             }
             _ => Err(parse_error(resp).await?),
@@ -93,26 +87,15 @@ impl GdriveWriter {
     }
 }
 
-#[async_trait]
-impl oio::Write for GdriveWriter {
-    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
-        let size = bs.remaining();
+impl oio::OneShotWrite for GdriveWriter {
+    async fn write_once(&self, bs: Bytes) -> Result<()> {
+        let size = bs.len();
         if self.file_id.is_none() {
-            self.write_create(size as u64, bs.copy_to_bytes(size))
-                .await?;
+            self.write_create(size as u64, bs).await?;
         } else {
-            self.write_overwrite(size as u64, bs.copy_to_bytes(size))
-                .await?;
+            self.write_overwrite(size as u64, bs).await?;
         }
 
-        Ok(size)
-    }
-
-    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        Ok(())
-    }
-
-    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
         Ok(())
     }
 }
diff --git a/core/src/services/onedrive/writer.rs b/core/src/services/onedrive/writer.rs
index 46aa50a21..76fc734d1 100644
--- a/core/src/services/onedrive/writer.rs
+++ b/core/src/services/onedrive/writer.rs
@@ -23,6 +23,7 @@ use super::backend::OnedriveBackend;
 use super::error::parse_error;
 use super::graph_model::OneDriveUploadSessionCreationRequestBody;
 use super::graph_model::OneDriveUploadSessionCreationResponseBody;
+use crate::raw::oio::OneShotWriter;
 use crate::raw::*;
 use crate::*;
 
@@ -43,11 +44,9 @@ impl OneDriveWriter {
     }
 }
 
-#[async_trait]
-impl oio::Write for OneDriveWriter {
-    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
-        let size = bs.remaining();
-        let bs = bs.copy_to_bytes(size);
+impl oio::OneShotWrite for OneDriveWriter {
+    async fn write_once(&self, bs: Bytes) -> Result<()> {
+        let size = bs.len();
 
         if size <= Self::MAX_SIMPLE_SIZE {
             self.write_simple(bs).await?;
@@ -55,20 +54,12 @@ impl oio::Write for OneDriveWriter {
             self.write_chunked(bs).await?;
         }
 
-        Ok(size)
-    }
-
-    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        Ok(())
-    }
-
-    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
         Ok(())
     }
 }
 
 impl OneDriveWriter {
-    async fn write_simple(&mut self, bs: Bytes) -> Result<()> {
+    async fn write_simple(&self, bs: Bytes) -> Result<()> {
         let resp = self
             .backend
             .onedrive_upload_simple(
diff --git a/core/src/services/vercel_artifacts/writer.rs b/core/src/services/vercel_artifacts/writer.rs
index 6579374a7..596c933cf 100644
--- a/core/src/services/vercel_artifacts/writer.rs
+++ b/core/src/services/vercel_artifacts/writer.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use async_trait::async_trait;
+use bytes::Bytes;
 use http::StatusCode;
 
 use super::backend::VercelArtifactsBackend;
@@ -36,18 +37,13 @@ impl VercelArtifactsWriter {
     }
 }
 
-#[async_trait]
-impl oio::Write for VercelArtifactsWriter {
-    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
-        let size = bs.remaining();
+impl oio::OneShotWrite for VercelArtifactsWriter {
+    async fn write_once(&self, bs: Bytes) -> Result<()> {
+        let size = bs.len();
 
         let resp = self
             .backend
-            .vercel_artifacts_put(
-                self.path.as_str(),
-                self.op.content_length().unwrap(),
-                AsyncBody::Bytes(bs.copy_to_bytes(size)),
-            )
+            .vercel_artifacts_put(self.path.as_str(), size as u64, AsyncBody::Bytes(bs))
             .await?;
 
         let status = resp.status();
@@ -55,17 +51,9 @@ impl oio::Write for VercelArtifactsWriter {
         match status {
             StatusCode::OK | StatusCode::ACCEPTED => {
                 resp.into_body().consume().await?;
-                Ok(size)
+                Ok(())
             }
             _ => Err(parse_error(resp).await?),
         }
     }
-
-    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        Ok(())
-    }
-
-    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        Ok(())
-    }
 }
diff --git a/core/src/services/webdav/writer.rs b/core/src/services/webdav/writer.rs
index eaff3d8b4..5b6ea319f 100644
--- a/core/src/services/webdav/writer.rs
+++ b/core/src/services/webdav/writer.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use async_trait::async_trait;
+use bytes::Bytes;
 use http::StatusCode;
 
 use super::backend::WebdavBackend;
@@ -34,16 +35,21 @@ impl WebdavWriter {
     pub fn new(backend: WebdavBackend, op: OpWrite, path: String) -> Self {
         WebdavWriter { backend, op, path }
     }
+}
+
+#[async_trait]
+impl oio::OneShotWrite for WebdavWriter {
+    async fn write_once(&self, bs: Bytes) -> Result<()> {
+        let size = bs.len();
 
-    async fn write_oneshot(&mut self, size: u64, body: AsyncBody) -> Result<()> {
         let resp = self
             .backend
             .webdav_put(
                 &self.path,
-                Some(size),
+                Some(size as u64),
                 self.op.content_type(),
                 self.op.content_disposition(),
-                body,
+                AsyncBody::Bytes(bs),
             )
             .await?;
 
@@ -58,23 +64,3 @@ impl WebdavWriter {
         }
     }
 }
-
-#[async_trait]
-impl oio::Write for WebdavWriter {
-    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
-        let size = bs.remaining();
-
-        self.write_oneshot(size as u64, AsyncBody::Bytes(bs.copy_to_bytes(size)))
-            .await?;
-
-        Ok(size)
-    }
-
-    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        Ok(())
-    }
-
-    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        Ok(())
-    }
-}
diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs
index 0c7a9e10b..22901b496 100644
--- a/core/src/services/webhdfs/writer.rs
+++ b/core/src/services/webhdfs/writer.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use async_trait::async_trait;
+use bytes::Bytes;
 use http::StatusCode;
 
 use super::backend::WebhdfsBackend;
@@ -36,10 +37,9 @@ impl WebhdfsWriter {
     }
 }
 
-#[async_trait]
-impl oio::Write for WebhdfsWriter {
-    fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
-        let size = bs.remaining();
+impl oio::OneShotWrite for WebhdfsWriter {
+    async fn write_once(&self, bs: Bytes) -> Result<()> {
+        let size = bs.len();
 
         let req = self
             .backend
@@ -47,7 +47,7 @@ impl oio::Write for WebhdfsWriter {
                 &self.path,
                 Some(size),
                 self.op.content_type(),
-                AsyncBody::Bytes(bs.copy_to_bytes(size)),
+                AsyncBody::Bytes(bs),
             )
             .await?;
 
@@ -57,17 +57,9 @@ impl oio::Write for WebhdfsWriter {
         match status {
             StatusCode::CREATED | StatusCode::OK => {
                 resp.into_body().consume().await?;
-                Ok(size)
+                Ok(())
             }
             _ => Err(parse_error(resp).await?),
         }
     }
-
-    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        Ok(())
-    }
-
-    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        Ok(())
-    }
 }

Reply via email to