This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 332cdb83d feat: cos multipart uploads write (#2712)
332cdb83d is described below

commit 332cdb83da71372604e8ba4fe21ea7821ed5f365
Author: parkma99 <[email protected]>
AuthorDate: Wed Jul 26 13:29:04 2023 +0800

    feat: cos multipart uploads write (#2712)
    
    * init cos multipart uploads write
    
    * feat: update comment
---
 core/src/services/cos/backend.rs |   6 +-
 core/src/services/cos/core.rs    |  21 +++--
 core/src/services/cos/writer.rs  | 165 ++++++++++-----------------------------
 3 files changed, 54 insertions(+), 138 deletions(-)

diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs
index d83a8a376..78236f2f6 100644
--- a/core/src/services/cos/backend.rs
+++ b/core/src/services/cos/backend.rs
@@ -268,7 +268,7 @@ pub struct CosBackend {
 impl Accessor for CosBackend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = CosWriter;
+    type Writer = oio::MultipartUploadWriter<CosWriter>;
     type BlockingWriter = ();
     type Appender = CosAppender;
     type Pager = CosPager;
@@ -294,6 +294,7 @@ impl Accessor for CosBackend {
                 write_can_sink: true,
                 write_with_content_type: true,
                 write_with_cache_control: true,
+                write_with_content_disposition: true,
                 write_without_content_length: true,
 
                 append: true,
@@ -323,7 +324,7 @@ impl Accessor for CosBackend {
     async fn create_dir(&self, path: &str, _: OpCreateDir) -> 
Result<RpCreateDir> {
         let mut req =
             self.core
-                .cos_put_object_request(path, Some(0), None, None, 
AsyncBody::Empty)?;
+                .cos_put_object_request(path, Some(0), None, None, None, 
AsyncBody::Empty)?;
 
         self.core.sign(&mut req).await?;
 
@@ -437,6 +438,7 @@ impl Accessor for CosBackend {
                 path,
                 None,
                 v.content_type(),
+                v.content_disposition(),
                 v.cache_control(),
                 AsyncBody::Empty,
             )?,
diff --git a/core/src/services/cos/core.rs b/core/src/services/cos/core.rs
index 56081cb18..6a81c7d7e 100644
--- a/core/src/services/cos/core.rs
+++ b/core/src/services/cos/core.rs
@@ -153,6 +153,7 @@ impl CosCore {
         path: &str,
         size: Option<u64>,
         content_type: Option<&str>,
+        content_disposition: Option<&str>,
         cache_control: Option<&str>,
         body: AsyncBody,
     ) -> Result<Request<AsyncBody>> {
@@ -168,7 +169,9 @@ impl CosCore {
         if let Some(cache_control) = cache_control {
             req = req.header(CACHE_CONTROL, cache_control)
         }
-
+        if let Some(pos) = content_disposition {
+            req = req.header(CONTENT_DISPOSITION, pos)
+        }
         if let Some(mime) = content_type {
             req = req.header(CONTENT_TYPE, mime)
         }
@@ -368,7 +371,7 @@ impl CosCore {
         path: &str,
         upload_id: &str,
         part_number: usize,
-        size: Option<u64>,
+        size: u64,
         body: AsyncBody,
     ) -> Result<Response<IncomingAsyncBody>> {
         let p = build_abs_path(&self.root, path);
@@ -382,11 +385,7 @@ impl CosCore {
         );
 
         let mut req = Request::put(&url);
-
-        if let Some(size) = size {
-            req = req.header(CONTENT_LENGTH, size);
-        }
-
+        req = req.header(CONTENT_LENGTH, size);
         // Set body
         let mut req = req.body(body).map_err(new_request_build_error)?;
 
@@ -399,7 +398,7 @@ impl CosCore {
         &self,
         path: &str,
         upload_id: &str,
-        parts: &[CompleteMultipartUploadRequestPart],
+        parts: Vec<CompleteMultipartUploadRequestPart>,
     ) -> Result<Response<IncomingAsyncBody>> {
         let p = build_abs_path(&self.root, path);
 
@@ -412,10 +411,8 @@ impl CosCore {
 
         let req = Request::post(&url);
 
-        let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest 
{
-            part: parts.to_vec(),
-        })
-        .map_err(new_xml_deserialize_error)?;
+        let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest 
{ part: parts })
+            .map_err(new_xml_deserialize_error)?;
         // Make sure content length has been set to avoid post with chunked 
encoding.
         let req = req.header(CONTENT_LENGTH, content.len());
         // Set content-type to `application/xml` to avoid mixed with form post.
diff --git a/core/src/services/cos/writer.rs b/core/src/services/cos/writer.rs
index 676e123cf..702680ea3 100644
--- a/core/src/services/cos/writer.rs
+++ b/core/src/services/cos/writer.rs
@@ -19,7 +19,6 @@ use std::sync::Arc;
 
 use async_trait::async_trait;
 use bytes::Buf;
-use bytes::Bytes;
 use http::StatusCode;
 
 use super::core::*;
@@ -32,33 +31,29 @@ pub struct CosWriter {
 
     op: OpWrite,
     path: String,
-    upload_id: Option<String>,
-
-    parts: Vec<CompleteMultipartUploadRequestPart>,
-    buffer: oio::VectorCursor,
-    buffer_size: usize,
 }
 
 impl CosWriter {
-    pub fn new(core: Arc<CosCore>, path: &str, op: OpWrite) -> Self {
-        let buffer_size = core.write_min_size;
-        CosWriter {
+    pub fn new(core: Arc<CosCore>, path: &str, op: OpWrite) -> 
oio::MultipartUploadWriter<Self> {
+        let write_min_size = core.write_min_size;
+        let total_size = op.content_length();
+        let cos_writer = CosWriter {
             core,
             path: path.to_string(),
             op,
-
-            upload_id: None,
-            parts: vec![],
-            buffer: oio::VectorCursor::new(),
-            buffer_size,
-        }
+        };
+        oio::MultipartUploadWriter::new(cos_writer, 
total_size).with_write_min_size(write_min_size)
     }
+}
 
-    async fn write_oneshot(&self, size: u64, body: AsyncBody) -> Result<()> {
+#[async_trait]
+impl oio::MultipartUploadWrite for CosWriter {
+    async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
         let mut req = self.core.cos_put_object_request(
             &self.path,
             Some(size),
             self.op.content_type(),
+            self.op.content_disposition(),
             self.op.cache_control(),
             body,
         )?;
@@ -78,7 +73,7 @@ impl CosWriter {
         }
     }
 
-    async fn initiate_upload(&self) -> Result<String> {
+    async fn initiate_part(&self) -> Result<String> {
         let resp = self
             .core
             .cos_initiate_multipart_upload(
@@ -107,20 +102,16 @@ impl CosWriter {
     async fn write_part(
         &self,
         upload_id: &str,
-        bs: Bytes,
-    ) -> Result<CompleteMultipartUploadRequestPart> {
+        part_number: usize,
+        size: u64,
+        body: AsyncBody,
+    ) -> Result<oio::MultipartUploadPart> {
         // COS requires part number must between [1..=10000]
-        let part_number = self.parts.len() + 1;
+        let part_number = part_number + 1;
 
         let resp = self
             .core
-            .cos_upload_part_request(
-                &self.path,
-                upload_id,
-                part_number,
-                Some(bs.len() as u64),
-                AsyncBody::Bytes(bs),
-            )
+            .cos_upload_part_request(&self.path, upload_id, part_number, size, 
body)
             .await?;
 
         let status = resp.status();
@@ -138,78 +129,43 @@ impl CosWriter {
 
                 resp.into_body().consume().await?;
 
-                Ok(CompleteMultipartUploadRequestPart { part_number, etag })
+                Ok(oio::MultipartUploadPart { part_number, etag })
             }
             _ => Err(parse_error(resp).await?),
         }
     }
-}
 
-#[async_trait]
-impl oio::Write for CosWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        let upload_id = match &self.upload_id {
-            Some(upload_id) => upload_id,
-            None => {
-                if self.op.content_length().unwrap_or_default() == bs.len() as 
u64 {
-                    return self
-                        .write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs))
-                        .await;
-                } else {
-                    let upload_id = self.initiate_upload().await?;
-                    self.upload_id = Some(upload_id);
-                    self.upload_id.as_deref().unwrap()
-                }
-            }
-        };
+    async fn complete_part(
+        &self,
+        upload_id: &str,
+        parts: &[oio::MultipartUploadPart],
+    ) -> Result<()> {
+        let parts = parts
+            .iter()
+            .map(|p| CompleteMultipartUploadRequestPart {
+                part_number: p.part_number,
+                etag: p.etag.clone(),
+            })
+            .collect();
 
-        // Ignore empty bytes
-        if bs.is_empty() {
-            return Ok(());
-        }
+        let resp = self
+            .core
+            .cos_complete_multipart_upload(&self.path, upload_id, parts)
+            .await?;
 
-        self.buffer.push(bs);
-        // Return directly if the buffer is not full
-        if self.buffer.len() <= self.buffer_size {
-            return Ok(());
-        }
+        let status = resp.status();
 
-        let bs = self.buffer.peak_at_least(self.buffer_size);
-        let size = bs.len();
+        match status {
+            StatusCode::OK => {
+                resp.into_body().consume().await?;
 
-        match self.write_part(upload_id, bs).await {
-            Ok(part) => {
-                self.buffer.take(size);
-                self.parts.push(part);
                 Ok(())
             }
-            Err(e) => {
-                // If the upload fails, we should pop the given bs to make sure
-                // write is re-enter safe.
-                self.buffer.pop();
-                Err(e)
-            }
-        }
-    }
-
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
-        if self.op.content_length().unwrap_or_default() == size {
-            return self.write_oneshot(size, AsyncBody::Stream(s)).await;
-        } else {
-            return Err(Error::new(
-                ErrorKind::Unsupported,
-                "COS does not support streaming multipart upload",
-            ));
+            _ => Err(parse_error(resp).await?),
         }
     }
 
-    async fn abort(&mut self) -> Result<()> {
-        let upload_id = if let Some(upload_id) = &self.upload_id {
-            upload_id
-        } else {
-            return Ok(());
-        };
-
+    async fn abort_part(&self, upload_id: &str) -> Result<()> {
         let resp = self
             .core
             .cos_abort_multipart_upload(&self.path, upload_id)
@@ -224,43 +180,4 @@ impl oio::Write for CosWriter {
             _ => Err(parse_error(resp).await?),
         }
     }
-
-    async fn close(&mut self) -> Result<()> {
-        let upload_id = if let Some(upload_id) = &self.upload_id {
-            upload_id
-        } else {
-            return Ok(());
-        };
-
-        // Make sure internal buffer has been flushed.
-        if !self.buffer.is_empty() {
-            let bs = self.buffer.peak_exact(self.buffer.len());
-
-            match self.write_part(upload_id, bs).await {
-                Ok(part) => {
-                    self.buffer.clear();
-                    self.parts.push(part);
-                }
-                Err(e) => {
-                    return Err(e);
-                }
-            }
-        }
-
-        let resp = self
-            .core
-            .cos_complete_multipart_upload(&self.path, upload_id, &self.parts)
-            .await?;
-
-        let status = resp.status();
-
-        match status {
-            StatusCode::OK => {
-                resp.into_body().consume().await?;
-
-                Ok(())
-            }
-            _ => Err(parse_error(resp).await?),
-        }
-    }
 }

Reply via email to