This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new b31a58438 feat: oss multipart uploads write (#2723)
b31a58438 is described below
commit b31a58438c591af63bcec3cf7d67e1ef30f8ac1e
Author: parkma99 <[email protected]>
AuthorDate: Thu Jul 27 23:09:09 2023 +0800
feat: oss multipart uploads write (#2723)
---
core/src/services/oss/backend.rs | 3 +-
core/src/services/oss/core.rs | 58 ++++++------
core/src/services/oss/writer.rs | 184 ++++++++++++++++-----------------------
3 files changed, 108 insertions(+), 137 deletions(-)
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index 370c7d2e7..14cc08c2f 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -386,7 +386,7 @@ pub struct OssBackend {
impl Accessor for OssBackend {
type Reader = IncomingAsyncBody;
type BlockingReader = ();
- type Writer = OssWriter;
+ type Writer = oio::MultipartUploadWriter<OssWriter>;
type BlockingWriter = ();
type Appender = OssAppender;
type Pager = OssPager;
@@ -412,6 +412,7 @@ impl Accessor for OssBackend {
write_can_sink: true,
write_with_cache_control: true,
write_with_content_type: true,
+ write_with_content_disposition: true,
write_without_content_length: true,
delete: true,
create_dir: true,
diff --git a/core/src/services/oss/core.rs b/core/src/services/oss/core.rs
index da2628a3a..11279f9e1 100644
--- a/core/src/services/oss/core.rs
+++ b/core/src/services/oss/core.rs
@@ -487,27 +487,13 @@ impl OssCore {
}
pub async fn oss_initiate_upload(
- &self,
- path: &str,
- args: &OpWrite,
- ) -> Result<Response<IncomingAsyncBody>> {
- let cache_control = args.cache_control();
- let req = self
- .oss_initiate_upload_request(path, None, None, cache_control,
AsyncBody::Empty, false)
- .await?;
- self.send(req).await
- }
-
- /// Creates a request that initiates multipart upload
- async fn oss_initiate_upload_request(
&self,
path: &str,
content_type: Option<&str>,
content_disposition: Option<&str>,
cache_control: Option<&str>,
- body: AsyncBody,
is_presign: bool,
- ) -> Result<Request<AsyncBody>> {
+ ) -> Result<Response<IncomingAsyncBody>> {
let path = build_abs_path(&self.root, path);
let endpoint = self.get_endpoint(is_presign);
let url = format!("{}/{}?uploads", endpoint,
percent_encode_path(&path));
@@ -522,9 +508,11 @@ impl OssCore {
req = req.header(CACHE_CONTROL, cache_control);
}
req = self.insert_sse_headers(req);
- let mut req = req.body(body).map_err(new_request_build_error)?;
+ let mut req = req
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
self.sign(&mut req).await?;
- Ok(req)
+ self.send(req).await
}
/// Creates a request to upload a part
@@ -534,9 +522,9 @@ impl OssCore {
upload_id: &str,
part_number: usize,
is_presign: bool,
- size: Option<u64>,
+ size: u64,
body: AsyncBody,
- ) -> Result<Request<AsyncBody>> {
+ ) -> Result<Response<IncomingAsyncBody>> {
let p = build_abs_path(&self.root, path);
let endpoint = self.get_endpoint(is_presign);
@@ -549,13 +537,10 @@ impl OssCore {
);
let mut req = Request::put(&url);
-
- if let Some(size) = size {
- req = req.header(CONTENT_LENGTH, size);
- }
+ req = req.header(CONTENT_LENGTH, size);
let mut req = req.body(body).map_err(new_request_build_error)?;
self.sign(&mut req).await?;
- Ok(req)
+ self.send(req).await
}
pub async fn oss_complete_multipart_upload_request(
@@ -563,7 +548,7 @@ impl OssCore {
path: &str,
upload_id: &str,
is_presign: bool,
- parts: &[MultipartUploadPart],
+ parts: Vec<MultipartUploadPart>,
) -> Result<Response<IncomingAsyncBody>> {
let p = build_abs_path(&self.root, path);
let endpoint = self.get_endpoint(is_presign);
@@ -592,6 +577,29 @@ impl OssCore {
self.sign(&mut req).await?;
self.send(req).await
}
+
+ /// Abort an on-going multipart upload.
+ /// reference docs https://www.alibabacloud.com/help/zh/oss/developer-reference/abortmultipartupload
+ pub async fn oss_abort_multipart_upload(
+ &self,
+ path: &str,
+ upload_id: &str,
+ ) -> Result<Response<IncomingAsyncBody>> {
+ let p = build_abs_path(&self.root, path);
+
+ let url = format!(
+ "{}/{}?uploadId={}",
+ self.endpoint,
+ percent_encode_path(&p),
+ percent_encode_path(upload_id)
+ );
+
+ let mut req = Request::delete(&url)
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+ self.sign(&mut req).await?;
+ self.send(req).await
+ }
}
/// Request of DeleteObjects.
diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs
index d81fbebf1..f2f737d65 100644
--- a/core/src/services/oss/writer.rs
+++ b/core/src/services/oss/writer.rs
@@ -19,7 +19,6 @@ use std::sync::Arc;
use async_trait::async_trait;
use bytes::Buf;
-use bytes::Bytes;
use http::StatusCode;
use super::core::*;
@@ -32,29 +31,28 @@ pub struct OssWriter {
op: OpWrite,
path: String,
- upload_id: Option<String>,
-
- parts: Vec<MultipartUploadPart>,
- buffer: oio::VectorCursor,
- buffer_size: usize,
}
impl OssWriter {
- pub fn new(core: Arc<OssCore>, path: &str, op: OpWrite) -> Self {
- let buffer_size = core.write_min_size;
- OssWriter {
+ pub fn new(
+ core: Arc<OssCore>,
+ path: &str,
+ op: OpWrite,
+ ) -> oio::MultipartUploadWriter<OssWriter> {
+ let write_min_size = core.write_min_size;
+ let total_size = op.content_length();
+ let oss_writer = OssWriter {
core,
path: path.to_string(),
op,
-
- upload_id: None,
- parts: vec![],
- buffer: oio::VectorCursor::new(),
- buffer_size,
- }
+ };
+ oio::MultipartUploadWriter::new(oss_writer,
total_size).with_write_min_size(write_min_size)
}
+}
- async fn write_oneshot(&self, size: u64, body: AsyncBody) -> Result<()> {
+#[async_trait]
+impl oio::MultipartUploadWrite for OssWriter {
+ async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
let mut req = self.core.oss_put_object_request(
&self.path,
Some(size),
@@ -80,38 +78,51 @@ impl OssWriter {
}
}
- async fn initiate_upload(&self) -> Result<String> {
- let resp = self.core.oss_initiate_upload(&self.path, &self.op).await?;
- match resp.status() {
+ async fn initiate_part(&self) -> Result<String> {
+ let resp = self
+ .core
+ .oss_initiate_upload(
+ &self.path,
+ self.op.content_type(),
+ self.op.content_disposition(),
+ self.op.cache_control(),
+ false,
+ )
+ .await?;
+
+ let status = resp.status();
+
+ match status {
StatusCode::OK => {
let bs = resp.into_body().bytes().await?;
+
let result: InitiateMultipartUploadResult =
quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
+
Ok(result.upload_id)
}
_ => Err(parse_error(resp).await?),
}
}
- async fn write_part(&self, upload_id: &str, bs: Bytes) ->
Result<MultipartUploadPart> {
- // Aliyun OSS requires part number must between [1..=10000]
- let part_number = self.parts.len() + 1;
- let mut req = self
+ async fn write_part(
+ &self,
+ upload_id: &str,
+ part_number: usize,
+ size: u64,
+ body: AsyncBody,
+ ) -> Result<oio::MultipartUploadPart> {
+ // OSS requires the part number to be within [1..=10000]
+ let part_number = part_number + 1;
+
+ let resp = self
.core
- .oss_upload_part_request(
- &self.path,
- upload_id,
- part_number,
- false,
- Some(bs.len() as u64),
- AsyncBody::Bytes(bs),
- )
+ .oss_upload_part_request(&self.path, upload_id, part_number,
false, size, body)
.await?;
- self.core.sign(&mut req).await?;
+ let status = resp.status();
- let resp = self.core.send(req).await?;
- match resp.status() {
+ match status {
StatusCode::OK => {
let etag = parse_etag(resp.headers())?
.ok_or_else(|| {
@@ -121,103 +132,54 @@ impl OssWriter {
)
})?
.to_string();
+
resp.into_body().consume().await?;
- Ok(MultipartUploadPart { part_number, etag })
+
+ Ok(oio::MultipartUploadPart { part_number, etag })
}
_ => Err(parse_error(resp).await?),
}
}
-}
-#[async_trait]
-impl oio::Write for OssWriter {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
- let upload_id = match &self.upload_id {
- Some(upload_id) => upload_id,
- None => {
- if self.op.content_length().unwrap_or_default() == bs.len() as
u64 {
- return self
- .write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs))
- .await;
- } else {
- let upload_id = self.initiate_upload().await?;
- self.upload_id = Some(upload_id);
- self.upload_id.as_deref().unwrap()
- }
- }
- };
+ async fn complete_part(
+ &self,
+ upload_id: &str,
+ parts: &[oio::MultipartUploadPart],
+ ) -> Result<()> {
+ let parts = parts
+ .iter()
+ .map(|p| MultipartUploadPart {
+ part_number: p.part_number,
+ etag: p.etag.clone(),
+ })
+ .collect();
- // Ignore empty bytes
- if bs.is_empty() {
- return Ok(());
- }
+ let resp = self
+ .core
+ .oss_complete_multipart_upload_request(&self.path, upload_id,
false, parts)
+ .await?;
- self.buffer.push(bs);
- // Return directly if the buffer is not full
- if self.buffer.len() <= self.buffer_size {
- return Ok(());
- }
+ let status = resp.status();
- let bs = self.buffer.peak_at_least(self.buffer_size);
- let size = bs.len();
+ match status {
+ StatusCode::OK => {
+ resp.into_body().consume().await?;
- match self.write_part(upload_id, bs).await {
- Ok(part) => {
- self.buffer.take(size);
- self.parts.push(part);
Ok(())
}
- Err(e) => {
- // If the upload fails, we should pop the given bs to make sure
- // write is re-enter safe.
- self.buffer.pop();
- Err(e)
- }
+ _ => Err(parse_error(resp).await?),
}
}
- async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
- self.write_oneshot(size, AsyncBody::Stream(s)).await
- }
-
- // TODO: we can cancel the upload by sending an abort request.
- async fn abort(&mut self) -> Result<()> {
- Err(Error::new(
- ErrorKind::Unsupported,
- "output writer doesn't support abort",
- ))
- }
-
- async fn close(&mut self) -> Result<()> {
- let upload_id = if let Some(upload_id) = &self.upload_id {
- upload_id
- } else {
- return Ok(());
- };
-
- // Make sure internal buffer has been flushed.
- if !self.buffer.is_empty() {
- let bs = self.buffer.peak_exact(self.buffer.len());
-
- match self.write_part(upload_id, bs).await {
- Ok(part) => {
- self.buffer.clear();
- self.parts.push(part);
- }
- Err(e) => {
- return Err(e);
- }
- }
- }
-
+ async fn abort_part(&self, upload_id: &str) -> Result<()> {
let resp = self
.core
- .oss_complete_multipart_upload_request(&self.path, upload_id,
false, &self.parts)
+ .oss_abort_multipart_upload(&self.path, upload_id)
.await?;
match resp.status() {
- StatusCode::OK => {
+ // OSS returns code 204 if abort succeeds.
+ StatusCode::NO_CONTENT => {
resp.into_body().consume().await?;
-
Ok(())
}
_ => Err(parse_error(resp).await?),