This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch refactor-multipart
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit b8874de8ef9b39768e650ae4ff394a55061e58f7
Author: Xuanwo <[email protected]>
AuthorDate: Thu Aug 31 14:18:54 2023 +0800

    refactor: Merge MultipartUpload and OneshotWrite for supported services
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/raw/oio/write/multipart_upload_write.rs | 77 +++++++++++++++++-------
 core/src/services/cos/backend.rs                 |  6 +-
 core/src/services/cos/writer.rs                  | 17 ++----
 core/src/services/obs/backend.rs                 |  6 +-
 core/src/services/obs/writer.rs                  | 31 ++++++++--
 core/src/services/oss/backend.rs                 |  6 +-
 core/src/services/oss/writer.rs                  | 33 ++++++++--
 core/src/services/s3/backend.rs                  |  6 +-
 core/src/services/s3/writer.rs                   | 28 ++++++++-
 9 files changed, 148 insertions(+), 62 deletions(-)

diff --git a/core/src/raw/oio/write/multipart_upload_write.rs 
b/core/src/raw/oio/write/multipart_upload_write.rs
index c013b24c3..81d306681 100644
--- a/core/src/raw/oio/write/multipart_upload_write.rs
+++ b/core/src/raw/oio/write/multipart_upload_write.rs
@@ -32,6 +32,11 @@ use crate::*;
 /// - Expose `MultipartUploadWriter` as `Accessor::Writer`
 #[async_trait]
 pub trait MultipartUploadWrite: Send + Sync + Unpin {
+    /// write_once write all data at once.
+    ///
+    /// Implementations should make sure that the data is written correctly at 
once.
+    async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()>;
+
     /// initiate_part will call start a multipart upload and return the upload 
id.
     ///
     /// MultipartUploadWriter will call this when:
@@ -89,6 +94,8 @@ pub struct MultipartUploadWriter<W: MultipartUploadWrite> {
 
     upload_id: Option<String>,
     parts: Vec<MultipartUploadPart>,
+
+    cache: Option<(u64, AsyncBody)>,
 }
 
 impl<W: MultipartUploadWrite> MultipartUploadWriter<W> {
@@ -99,6 +106,7 @@ impl<W: MultipartUploadWrite> MultipartUploadWriter<W> {
 
             upload_id: None,
             parts: Vec::new(),
+            cache: None,
         }
     }
 
@@ -121,47 +129,74 @@ where
     W: MultipartUploadWrite,
 {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
-        let upload_id = self.upload_id().await?;
+        let (size, body) = match self.cache.take() {
+            Some(cache) => {
+                self.cache = Some((bs.len() as u64, AsyncBody::Bytes(bs)));
+                cache
+            }
+            None => {
+                self.cache = Some((bs.len() as u64, AsyncBody::Bytes(bs)));
+                return Ok(());
+            }
+        };
 
-        let size = bs.len();
+        let upload_id = self.upload_id().await?;
 
         self.inner
-            .write_part(
-                &upload_id,
-                self.parts.len(),
-                size as u64,
-                AsyncBody::Bytes(bs),
-            )
+            .write_part(&upload_id, self.parts.len(), size, body)
             .await
             .map(|v| self.parts.push(v))
     }
 
     async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+        let (size, body) = match self.cache.take() {
+            Some(cache) => {
+                self.cache = Some((size, AsyncBody::Stream(s)));
+                cache
+            }
+            None => {
+                self.cache = Some((size, AsyncBody::Stream(s)));
+                return Ok(());
+            }
+        };
+
         let upload_id = self.upload_id().await?;
 
         self.inner
-            .write_part(&upload_id, self.parts.len(), size, 
AsyncBody::Stream(s))
+            .write_part(&upload_id, self.parts.len(), size, body)
             .await
             .map(|v| self.parts.push(v))
     }
 
     async fn close(&mut self) -> Result<()> {
-        let upload_id = if let Some(upload_id) = &self.upload_id {
-            upload_id
-        } else {
-            return Ok(());
-        };
+        match self.upload_id.take() {
+            Some(upload_id) => {
+                if let Some((size, body)) = self.cache.take() {
+                    self.inner
+                        .write_part(&upload_id, self.parts.len(), size, body)
+                        .await
+                        .map(|v| self.parts.push(v))?;
+                }
+
+                self.inner.complete_part(&upload_id, &self.parts).await
+            }
+            None => {
+                if let Some((size, body)) = self.cache.take() {
+                    self.inner.write_once(size, body).await?;
+                }
 
-        self.inner.complete_part(upload_id, &self.parts).await
+                Ok(())
+            }
+        }
     }
 
     async fn abort(&mut self) -> Result<()> {
-        let upload_id = if let Some(upload_id) = &self.upload_id {
-            upload_id
-        } else {
-            return Ok(());
-        };
+        // Cleanup existing cache.
+        self.cache = None;
 
-        self.inner.abort_part(upload_id).await
+        match self.upload_id.take() {
+            Some(upload_id) => self.inner.abort_part(&upload_id).await,
+            None => Ok(()),
+        }
     }
 }
diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs
index 3360d9dd5..13e9b29ca 100644
--- a/core/src/services/cos/backend.rs
+++ b/core/src/services/cos/backend.rs
@@ -338,11 +338,9 @@ impl Accessor for CosBackend {
         let writer = CosWriter::new(self.core.clone(), path, args.clone());
 
         let w = if args.append() {
-            CosWriters::Three(oio::AppendObjectWriter::new(writer))
-        } else if args.content_length().is_some() {
-            CosWriters::One(oio::OneShotWriter::new(writer))
+            CosWriters::Two(oio::AppendObjectWriter::new(writer))
         } else {
-            CosWriters::Two(oio::MultipartUploadWriter::new(writer))
+            CosWriters::One(oio::MultipartUploadWriter::new(writer))
         };
 
         let w = if let Some(buffer_size) = args.buffer_size() {
diff --git a/core/src/services/cos/writer.rs b/core/src/services/cos/writer.rs
index af2a6ebd0..bd29be2eb 100644
--- a/core/src/services/cos/writer.rs
+++ b/core/src/services/cos/writer.rs
@@ -23,15 +23,11 @@ use http::StatusCode;
 
 use super::core::*;
 use super::error::parse_error;
-use crate::raw::oio::Streamer;
 use crate::raw::*;
 use crate::*;
 
-pub type CosWriters = oio::ThreeWaysWriter<
-    oio::OneShotWriter<CosWriter>,
-    oio::MultipartUploadWriter<CosWriter>,
-    oio::AppendObjectWriter<CosWriter>,
->;
+pub type CosWriters =
+    oio::TwoWaysWriter<oio::MultipartUploadWriter<CosWriter>, 
oio::AppendObjectWriter<CosWriter>>;
 
 pub struct CosWriter {
     core: Arc<CosCore>,
@@ -51,15 +47,15 @@ impl CosWriter {
 }
 
 #[async_trait]
-impl oio::OneShotWrite for CosWriter {
-    async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> {
+impl oio::MultipartUploadWrite for CosWriter {
+    async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
         let mut req = self.core.cos_put_object_request(
             &self.path,
             Some(size),
             self.op.content_type(),
             self.op.content_disposition(),
             self.op.cache_control(),
-            AsyncBody::Stream(stream),
+            body,
         )?;
 
         self.core.sign(&mut req).await?;
@@ -76,10 +72,7 @@ impl oio::OneShotWrite for CosWriter {
             _ => Err(parse_error(resp).await?),
         }
     }
-}
 
-#[async_trait]
-impl oio::MultipartUploadWrite for CosWriter {
     async fn initiate_part(&self) -> Result<String> {
         let resp = self
             .core
diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs
index 6600ddd9b..d8b4a33aa 100644
--- a/core/src/services/obs/backend.rs
+++ b/core/src/services/obs/backend.rs
@@ -376,11 +376,9 @@ impl Accessor for ObsBackend {
         let writer = ObsWriter::new(self.core.clone(), path, args.clone());
 
         let w = if args.append() {
-            ObsWriters::Three(oio::AppendObjectWriter::new(writer))
-        } else if args.content_length().is_some() {
-            ObsWriters::One(oio::OneShotWriter::new(writer))
+            ObsWriters::Two(oio::AppendObjectWriter::new(writer))
         } else {
-            ObsWriters::Two(oio::MultipartUploadWriter::new(writer))
+            ObsWriters::One(oio::MultipartUploadWriter::new(writer))
         };
 
         let w = if let Some(buffer_size) = args.buffer_size() {
diff --git a/core/src/services/obs/writer.rs b/core/src/services/obs/writer.rs
index d3b1e119f..7445613be 100644
--- a/core/src/services/obs/writer.rs
+++ b/core/src/services/obs/writer.rs
@@ -28,11 +28,8 @@ use crate::raw::oio::Streamer;
 use crate::raw::*;
 use crate::*;
 
-pub type ObsWriters = oio::ThreeWaysWriter<
-    oio::OneShotWriter<ObsWriter>,
-    oio::MultipartUploadWriter<ObsWriter>,
-    oio::AppendObjectWriter<ObsWriter>,
->;
+pub type ObsWriters =
+    oio::TwoWaysWriter<oio::MultipartUploadWriter<ObsWriter>, 
oio::AppendObjectWriter<ObsWriter>>;
 
 pub struct ObsWriter {
     core: Arc<ObsCore>,
@@ -80,6 +77,30 @@ impl oio::OneShotWrite for ObsWriter {
 
 #[async_trait]
 impl oio::MultipartUploadWrite for ObsWriter {
+    async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
+        let mut req = self.core.obs_put_object_request(
+            &self.path,
+            Some(size),
+            self.op.content_type(),
+            self.op.cache_control(),
+            body,
+        )?;
+
+        self.core.sign(&mut req).await?;
+
+        let resp = self.core.send(req).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::CREATED | StatusCode::OK => {
+                resp.into_body().consume().await?;
+                Ok(())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
     async fn initiate_part(&self) -> Result<String> {
         let resp = self
             .core
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index ee1131ee1..2ea5fe9e6 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -474,11 +474,9 @@ impl Accessor for OssBackend {
         let writer = OssWriter::new(self.core.clone(), path, args.clone());
 
         let w = if args.append() {
-            OssWriters::Three(oio::AppendObjectWriter::new(writer))
-        } else if args.content_length().is_some() {
-            OssWriters::One(oio::OneShotWriter::new(writer))
+            OssWriters::Two(oio::AppendObjectWriter::new(writer))
         } else {
-            OssWriters::Two(oio::MultipartUploadWriter::new(writer))
+            OssWriters::One(oio::MultipartUploadWriter::new(writer))
         };
 
         let w = if let Some(buffer_size) = args.buffer_size() {
diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs
index 27aa09011..fd759df88 100644
--- a/core/src/services/oss/writer.rs
+++ b/core/src/services/oss/writer.rs
@@ -27,11 +27,8 @@ use crate::raw::oio::Streamer;
 use crate::raw::*;
 use crate::*;
 
-pub type OssWriters = oio::ThreeWaysWriter<
-    oio::OneShotWriter<OssWriter>,
-    oio::MultipartUploadWriter<OssWriter>,
-    oio::AppendObjectWriter<OssWriter>,
->;
+pub type OssWriters =
+    oio::TwoWaysWriter<oio::MultipartUploadWriter<OssWriter>, 
oio::AppendObjectWriter<OssWriter>>;
 
 pub struct OssWriter {
     core: Arc<OssCore>,
@@ -81,6 +78,32 @@ impl oio::OneShotWrite for OssWriter {
 
 #[async_trait]
 impl oio::MultipartUploadWrite for OssWriter {
+    async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
+        let mut req = self.core.oss_put_object_request(
+            &self.path,
+            Some(size),
+            self.op.content_type(),
+            self.op.content_disposition(),
+            self.op.cache_control(),
+            body,
+            false,
+        )?;
+
+        self.core.sign(&mut req).await?;
+
+        let resp = self.core.send(req).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::CREATED | StatusCode::OK => {
+                resp.into_body().consume().await?;
+                Ok(())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
     async fn initiate_part(&self) -> Result<String> {
         let resp = self
             .core
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index 600fac3f3..36344da80 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -982,11 +982,7 @@ impl Accessor for S3Backend {
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
         let writer = S3Writer::new(self.core.clone(), path, args.clone());
 
-        let w = if args.content_length().is_some() {
-            S3Writers::One(oio::OneShotWriter::new(writer))
-        } else {
-            S3Writers::Two(oio::MultipartUploadWriter::new(writer))
-        };
+        let w = oio::MultipartUploadWriter::new(writer);
 
         let w = if let Some(buffer_size) = args.buffer_size() {
             let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs
index a27341d71..6bda47cdb 100644
--- a/core/src/services/s3/writer.rs
+++ b/core/src/services/s3/writer.rs
@@ -27,8 +27,7 @@ use crate::raw::oio::Streamer;
 use crate::raw::*;
 use crate::*;
 
-pub type S3Writers =
-    oio::TwoWaysWriter<oio::OneShotWriter<S3Writer>, 
oio::MultipartUploadWriter<S3Writer>>;
+pub type S3Writers = oio::MultipartUploadWriter<S3Writer>;
 
 pub struct S3Writer {
     core: Arc<S3Core>,
@@ -77,6 +76,31 @@ impl oio::OneShotWrite for S3Writer {
 
 #[async_trait]
 impl oio::MultipartUploadWrite for S3Writer {
+    async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
+        let mut req = self.core.s3_put_object_request(
+            &self.path,
+            Some(size),
+            self.op.content_type(),
+            self.op.content_disposition(),
+            self.op.cache_control(),
+            body,
+        )?;
+
+        self.core.sign(&mut req).await?;
+
+        let resp = self.core.send(req).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::CREATED | StatusCode::OK => {
+                resp.into_body().consume().await?;
+                Ok(())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
     async fn initiate_part(&self) -> Result<String> {
         let resp = self
             .core

Reply via email to