This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 332cdb83d feat: cos multipart uploads write (#2712)
332cdb83d is described below
commit 332cdb83da71372604e8ba4fe21ea7821ed5f365
Author: parkma99 <[email protected]>
AuthorDate: Wed Jul 26 13:29:04 2023 +0800
feat: cos multipart uploads write (#2712)
* init cos multipart uploads write
* feat: update comment
---
core/src/services/cos/backend.rs | 6 +-
core/src/services/cos/core.rs | 21 +++--
core/src/services/cos/writer.rs | 165 ++++++++++-----------------------------
3 files changed, 54 insertions(+), 138 deletions(-)
diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs
index d83a8a376..78236f2f6 100644
--- a/core/src/services/cos/backend.rs
+++ b/core/src/services/cos/backend.rs
@@ -268,7 +268,7 @@ pub struct CosBackend {
impl Accessor for CosBackend {
type Reader = IncomingAsyncBody;
type BlockingReader = ();
- type Writer = CosWriter;
+ type Writer = oio::MultipartUploadWriter<CosWriter>;
type BlockingWriter = ();
type Appender = CosAppender;
type Pager = CosPager;
@@ -294,6 +294,7 @@ impl Accessor for CosBackend {
write_can_sink: true,
write_with_content_type: true,
write_with_cache_control: true,
+ write_with_content_disposition: true,
write_without_content_length: true,
append: true,
@@ -323,7 +324,7 @@ impl Accessor for CosBackend {
async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
let mut req =
self.core
- .cos_put_object_request(path, Some(0), None, None, AsyncBody::Empty)?;
+ .cos_put_object_request(path, Some(0), None, None, None, AsyncBody::Empty)?;
self.core.sign(&mut req).await?;
@@ -437,6 +438,7 @@ impl Accessor for CosBackend {
path,
None,
v.content_type(),
+ v.content_disposition(),
v.cache_control(),
AsyncBody::Empty,
)?,
diff --git a/core/src/services/cos/core.rs b/core/src/services/cos/core.rs
index 56081cb18..6a81c7d7e 100644
--- a/core/src/services/cos/core.rs
+++ b/core/src/services/cos/core.rs
@@ -153,6 +153,7 @@ impl CosCore {
path: &str,
size: Option<u64>,
content_type: Option<&str>,
+ content_disposition: Option<&str>,
cache_control: Option<&str>,
body: AsyncBody,
) -> Result<Request<AsyncBody>> {
@@ -168,7 +169,9 @@ impl CosCore {
if let Some(cache_control) = cache_control {
req = req.header(CACHE_CONTROL, cache_control)
}
-
+ if let Some(pos) = content_disposition {
+ req = req.header(CONTENT_DISPOSITION, pos)
+ }
if let Some(mime) = content_type {
req = req.header(CONTENT_TYPE, mime)
}
@@ -368,7 +371,7 @@ impl CosCore {
path: &str,
upload_id: &str,
part_number: usize,
- size: Option<u64>,
+ size: u64,
body: AsyncBody,
) -> Result<Response<IncomingAsyncBody>> {
let p = build_abs_path(&self.root, path);
@@ -382,11 +385,7 @@ impl CosCore {
);
let mut req = Request::put(&url);
-
- if let Some(size) = size {
- req = req.header(CONTENT_LENGTH, size);
- }
-
+ req = req.header(CONTENT_LENGTH, size);
// Set body
let mut req = req.body(body).map_err(new_request_build_error)?;
@@ -399,7 +398,7 @@ impl CosCore {
&self,
path: &str,
upload_id: &str,
- parts: &[CompleteMultipartUploadRequestPart],
+ parts: Vec<CompleteMultipartUploadRequestPart>,
) -> Result<Response<IncomingAsyncBody>> {
let p = build_abs_path(&self.root, path);
@@ -412,10 +411,8 @@ impl CosCore {
let req = Request::post(&url);
- let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest {
- part: parts.to_vec(),
- })
- .map_err(new_xml_deserialize_error)?;
+ let content = quick_xml::se::to_string(&CompleteMultipartUploadRequest { part: parts })
+ .map_err(new_xml_deserialize_error)?;
// Make sure content length has been set to avoid post with chunked encoding.
let req = req.header(CONTENT_LENGTH, content.len());
// Set content-type to `application/xml` to avoid mixed with form post.
diff --git a/core/src/services/cos/writer.rs b/core/src/services/cos/writer.rs
index 676e123cf..702680ea3 100644
--- a/core/src/services/cos/writer.rs
+++ b/core/src/services/cos/writer.rs
@@ -19,7 +19,6 @@ use std::sync::Arc;
use async_trait::async_trait;
use bytes::Buf;
-use bytes::Bytes;
use http::StatusCode;
use super::core::*;
@@ -32,33 +31,29 @@ pub struct CosWriter {
op: OpWrite,
path: String,
- upload_id: Option<String>,
-
- parts: Vec<CompleteMultipartUploadRequestPart>,
- buffer: oio::VectorCursor,
- buffer_size: usize,
}
impl CosWriter {
- pub fn new(core: Arc<CosCore>, path: &str, op: OpWrite) -> Self {
- let buffer_size = core.write_min_size;
- CosWriter {
+ pub fn new(core: Arc<CosCore>, path: &str, op: OpWrite) -> oio::MultipartUploadWriter<Self> {
+ let write_min_size = core.write_min_size;
+ let total_size = op.content_length();
+ let cos_writer = CosWriter {
core,
path: path.to_string(),
op,
-
- upload_id: None,
- parts: vec![],
- buffer: oio::VectorCursor::new(),
- buffer_size,
- }
+ };
+ oio::MultipartUploadWriter::new(cos_writer, total_size).with_write_min_size(write_min_size)
}
+}
- async fn write_oneshot(&self, size: u64, body: AsyncBody) -> Result<()> {
+#[async_trait]
+impl oio::MultipartUploadWrite for CosWriter {
+ async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
let mut req = self.core.cos_put_object_request(
&self.path,
Some(size),
self.op.content_type(),
+ self.op.content_disposition(),
self.op.cache_control(),
body,
)?;
@@ -78,7 +73,7 @@ impl CosWriter {
}
}
- async fn initiate_upload(&self) -> Result<String> {
+ async fn initiate_part(&self) -> Result<String> {
let resp = self
.core
.cos_initiate_multipart_upload(
@@ -107,20 +102,16 @@ impl CosWriter {
async fn write_part(
&self,
upload_id: &str,
- bs: Bytes,
- ) -> Result<CompleteMultipartUploadRequestPart> {
+ part_number: usize,
+ size: u64,
+ body: AsyncBody,
+ ) -> Result<oio::MultipartUploadPart> {
// COS requires part number must between [1..=10000]
- let part_number = self.parts.len() + 1;
+ let part_number = part_number + 1;
let resp = self
.core
- .cos_upload_part_request(
- &self.path,
- upload_id,
- part_number,
- Some(bs.len() as u64),
- AsyncBody::Bytes(bs),
- )
+ .cos_upload_part_request(&self.path, upload_id, part_number, size, body)
.await?;
let status = resp.status();
@@ -138,78 +129,43 @@ impl CosWriter {
resp.into_body().consume().await?;
- Ok(CompleteMultipartUploadRequestPart { part_number, etag })
+ Ok(oio::MultipartUploadPart { part_number, etag })
}
_ => Err(parse_error(resp).await?),
}
}
-}
-#[async_trait]
-impl oio::Write for CosWriter {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
- let upload_id = match &self.upload_id {
- Some(upload_id) => upload_id,
- None => {
- if self.op.content_length().unwrap_or_default() == bs.len() as u64 {
- return self
- .write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs))
- .await;
- } else {
- let upload_id = self.initiate_upload().await?;
- self.upload_id = Some(upload_id);
- self.upload_id.as_deref().unwrap()
- }
- }
- };
+ async fn complete_part(
+ &self,
+ upload_id: &str,
+ parts: &[oio::MultipartUploadPart],
+ ) -> Result<()> {
+ let parts = parts
+ .iter()
+ .map(|p| CompleteMultipartUploadRequestPart {
+ part_number: p.part_number,
+ etag: p.etag.clone(),
+ })
+ .collect();
- // Ignore empty bytes
- if bs.is_empty() {
- return Ok(());
- }
+ let resp = self
+ .core
+ .cos_complete_multipart_upload(&self.path, upload_id, parts)
+ .await?;
- self.buffer.push(bs);
- // Return directly if the buffer is not full
- if self.buffer.len() <= self.buffer_size {
- return Ok(());
- }
+ let status = resp.status();
- let bs = self.buffer.peak_at_least(self.buffer_size);
- let size = bs.len();
+ match status {
+ StatusCode::OK => {
+ resp.into_body().consume().await?;
- match self.write_part(upload_id, bs).await {
- Ok(part) => {
- self.buffer.take(size);
- self.parts.push(part);
Ok(())
}
- Err(e) => {
- // If the upload fails, we should pop the given bs to make sure
- // write is re-enter safe.
- self.buffer.pop();
- Err(e)
- }
- }
- }
-
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
- if self.op.content_length().unwrap_or_default() == size {
- return self.write_oneshot(size, AsyncBody::Stream(s)).await;
- } else {
- return Err(Error::new(
- ErrorKind::Unsupported,
- "COS does not support streaming multipart upload",
- ));
+ _ => Err(parse_error(resp).await?),
}
}
- async fn abort(&mut self) -> Result<()> {
- let upload_id = if let Some(upload_id) = &self.upload_id {
- upload_id
- } else {
- return Ok(());
- };
-
+ async fn abort_part(&self, upload_id: &str) -> Result<()> {
let resp = self
.core
.cos_abort_multipart_upload(&self.path, upload_id)
@@ -224,43 +180,4 @@ impl oio::Write for CosWriter {
_ => Err(parse_error(resp).await?),
}
}
-
- async fn close(&mut self) -> Result<()> {
- let upload_id = if let Some(upload_id) = &self.upload_id {
- upload_id
- } else {
- return Ok(());
- };
-
- // Make sure internal buffer has been flushed.
- if !self.buffer.is_empty() {
- let bs = self.buffer.peak_exact(self.buffer.len());
-
- match self.write_part(upload_id, bs).await {
- Ok(part) => {
- self.buffer.clear();
- self.parts.push(part);
- }
- Err(e) => {
- return Err(e);
- }
- }
- }
-
- let resp = self
- .core
- .cos_complete_multipart_upload(&self.path, upload_id, &self.parts)
- .await?;
-
- let status = resp.status();
-
- match status {
- StatusCode::OK => {
- resp.into_body().consume().await?;
-
- Ok(())
- }
- _ => Err(parse_error(resp).await?),
- }
- }
}