This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch refactor-writer in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 4d71d9c777929b11b1b4b6eae2551b3a9f470f59 Author: Xuanwo <[email protected]> AuthorDate: Wed Apr 19 15:58:17 2023 +0800 Save work Signed-off-by: Xuanwo <[email protected]> --- core/src/raw/oio/cursor.rs | 129 +++++++++++++++++++ core/src/raw/oio/mod.rs | 1 + core/src/raw/oio/write.rs | 28 +++-- core/src/services/azblob/backend.rs | 4 +- core/src/services/azdfs/backend.rs | 4 +- core/src/services/ftp/backend.rs | 4 +- core/src/services/gcs/backend.rs | 25 +--- core/src/services/gcs/core.rs | 10 +- core/src/services/gcs/writer.rs | 180 +++++++++++++++++---------- core/src/services/ghac/backend.rs | 4 +- core/src/services/ipmfs/backend.rs | 4 +- core/src/services/obs/backend.rs | 10 +- core/src/services/obs/core.rs | 5 - core/src/services/obs/writer.rs | 1 - core/src/services/oss/backend.rs | 18 +-- core/src/services/oss/writer.rs | 96 +++++++++++--- core/src/services/s3/backend.rs | 32 +---- core/src/services/s3/core.rs | 5 - core/src/services/s3/writer.rs | 119 ++++++++++++++---- core/src/services/webdav/backend.rs | 4 +- core/src/services/webhdfs/backend.rs | 4 +- core/src/types/operator/blocking_operator.rs | 18 ++- core/src/types/operator/operator.rs | 18 ++- core/src/types/ops.rs | 32 ++--- core/src/types/writer.rs | 4 +- 25 files changed, 510 insertions(+), 249 deletions(-) diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs index c0282ffd..f8db0539 100644 --- a/core/src/raw/oio/cursor.rs +++ b/core/src/raw/oio/cursor.rs @@ -15,12 +15,15 @@ // specific language governing permissions and limitations // under the License. 
+use std::collections::VecDeque; use std::io::Read; use std::io::SeekFrom; use std::task::Context; use std::task::Poll; +use bytes::Buf; use bytes::Bytes; +use bytes::BytesMut; use crate::raw::*; use crate::*; @@ -138,3 +141,129 @@ impl oio::BlockingRead for Cursor { } } } + +/// VectorCursor is the cursor for [`Vec<Bytes>`] that implements [`oio::Read`] +pub struct VectorCursor { + inner: VecDeque<Bytes>, + size: usize, +} + +impl VectorCursor { + /// Create a new vector cursor. + pub fn new() -> Self { + Self { + inner: VecDeque::new(), + size: 0, + } + } + + /// Returns `true` if current vector is empty. + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + /// Return current bytes size of current vector. + pub fn len(&self) -> usize { + self.size + } + + /// Push a new bytes into vector cursor. + pub fn push(&mut self, bs: Bytes) { + self.size += bs.len(); + self.inner.push_back(bs); + } + + /// Pop a bytes from vector cursor. + pub fn pop(&mut self) { + let bs = self.inner.pop_back(); + self.size -= bs.expect("poped bytes must exist").len() + } + + /// Clear the entire vector. + pub fn clear(&mut self) { + self.inner.clear(); + self.size = 0; + } + + /// Peak will read and copy n bytes from current cursor without + /// change it's content. + /// + /// # Panics + /// + /// Panics if n is larger than current size. + /// + /// # TODO + /// + /// Optimize to avoid data copy. + pub fn peak(&self, n: usize) -> Bytes { + assert!(n <= self.size, "peak size must smamller than current size"); + + // Avoid data copy if n is smaller than first chunk. + if self.inner[0].len() >= n { + return self.inner[0].slice(..n); + } + + let mut bs = BytesMut::with_capacity(n); + let mut n = n; + for b in &self.inner { + if n == 0 { + break; + } + let len = b.len().min(n); + bs.extend_from_slice(&b[..len]); + n -= len; + } + bs.freeze() + } + + /// Take will consume n bytes from current cursor. + /// + /// # Panics + /// + /// Panics if n is larger than current size. 
+ pub fn take(&mut self, n: usize) { + assert!(n <= self.size, "take size must smamller than current size"); + + // Update current size + self.size -= n; + + let mut n = n; + while n > 0 { + assert!(!self.inner.is_empty(), "inner must not be empty"); + + if self.inner[0].len() <= n { + n -= self.inner[0].len(); + self.inner.pop_front(); + } else { + self.inner[0].advance(n); + n = 0; + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_vector_cursor() { + let mut vc = VectorCursor::new(); + + vc.push(Bytes::from("hello")); + vc.push(Bytes::from("world")); + + assert_eq!(vc.peak(1), Bytes::from("h")); + assert_eq!(vc.peak(1), Bytes::from("h")); + assert_eq!(vc.peak(4), Bytes::from("hell")); + assert_eq!(vc.peak(6), Bytes::from("hellow")); + assert_eq!(vc.peak(10), Bytes::from("helloworld")); + + vc.take(1); + assert_eq!(vc.peak(1), Bytes::from("e")); + vc.take(1); + assert_eq!(vc.peak(1), Bytes::from("l")); + vc.take(5); + assert_eq!(vc.peak(1), Bytes::from("r")); + } +} diff --git a/core/src/raw/oio/mod.rs b/core/src/raw/oio/mod.rs index f82af4ca..56b48dda 100644 --- a/core/src/raw/oio/mod.rs +++ b/core/src/raw/oio/mod.rs @@ -43,6 +43,7 @@ pub use write::Writer; mod cursor; pub use cursor::Cursor; +pub use cursor::VectorCursor; mod into_streamable; pub use into_streamable::into_streamable_reader; diff --git a/core/src/raw/oio/write.rs b/core/src/raw/oio/write.rs index 05f93a93..c74e0d90 100644 --- a/core/src/raw/oio/write.rs +++ b/core/src/raw/oio/write.rs @@ -76,23 +76,33 @@ impl From<WriteOperation> for &'static str { pub type Writer = Box<dyn Write>; /// Write is the trait that OpenDAL returns to callers. +/// +/// # Notes +/// +/// There are two possible two cases: +/// +/// - Sized: The total size of the object is known in advance. +/// - Unsized: The total size of the object is unknown in advance. +/// +/// And it's possible that the given bs length is less than the total +/// content length. 
Users will call write multiple times to write +/// the whole data. #[async_trait] pub trait Write: Unpin + Send + Sync { - /// Write whole content at once. + /// Write given bytes into writer. + /// + /// # Notes /// - /// To append multiple bytes together, use `append` instead. + /// It's possible that the given bs length is less than the total + /// content length. And users will call write multiple times. + /// + /// Please make sure `write` is safe to re-enter. async fn write(&mut self, bs: Bytes) -> Result<()>; /// Append bytes to the writer. - /// - /// It is highly recommended to align the length of the input bytes - /// into blocks of 4MiB (except the last block) for better performance - /// and compatibility. async fn append(&mut self, bs: Bytes) -> Result<()>; - /// Abort the pending appendable writer. - /// #note - /// This method is only applicable to writers opened in append mode. + /// Abort the pending writer. async fn abort(&mut self) -> Result<()>; /// Close the writer and make sure all data has been flushed. 
diff --git a/core/src/services/azblob/backend.rs b/core/src/services/azblob/backend.rs index 5daa104e..4e5f89b4 100644 --- a/core/src/services/azblob/backend.rs +++ b/core/src/services/azblob/backend.rs @@ -497,10 +497,10 @@ impl Accessor for AzblobBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - if args.append() { + if args.content_length().is_none() { return Err(Error::new( ErrorKind::Unsupported, - "append write is not supported", + "write without content length is not supported", )); } diff --git a/core/src/services/azdfs/backend.rs b/core/src/services/azdfs/backend.rs index 5c0b68db..1dbe605e 100644 --- a/core/src/services/azdfs/backend.rs +++ b/core/src/services/azdfs/backend.rs @@ -354,10 +354,10 @@ impl Accessor for AzdfsBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - if args.append() { + if args.content_length().is_none() { return Err(Error::new( ErrorKind::Unsupported, - "append write is not supported", + "write without content length is not supported", )); } diff --git a/core/src/services/ftp/backend.rs b/core/src/services/ftp/backend.rs index f509eff2..5dd4b0ff 100644 --- a/core/src/services/ftp/backend.rs +++ b/core/src/services/ftp/backend.rs @@ -388,10 +388,10 @@ impl Accessor for FtpBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - if args.append() { + if args.content_length().is_none() { return Err(Error::new( ErrorKind::Unsupported, - "append write is not supported", + "write without content length is not supported", )); } diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs index 7e865dcb..aba03063 100644 --- a/core/src/services/gcs/backend.rs +++ b/core/src/services/gcs/backend.rs @@ -410,32 +410,9 @@ impl Accessor for GcsBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - let upload_location = if args.append() { - let 
resp = self.core.gcs_initiate_resumable_upload(path).await?; - let status = resp.status(); - - match status { - StatusCode::OK => { - let bs = parse_location(resp.headers()) - .expect("Failed to retrieve location of resumable upload"); - if let Some(location) = bs { - Some(String::from(location)) - } else { - return Err(Error::new( - ErrorKind::NotFound, - "location is not in the response header", - )); - } - } - _ => return Err(parse_error(resp).await?), - } - } else { - None - }; - Ok(( RpWrite::default(), - GcsWriter::new(self.core.clone(), args, path.to_string(), upload_location), + GcsWriter::new(self.core.clone(), path, args), )) } diff --git a/core/src/services/gcs/core.rs b/core/src/services/gcs/core.rs index 18376acd..907d0ef4 100644 --- a/core/src/services/gcs/core.rs +++ b/core/src/services/gcs/core.rs @@ -322,7 +322,7 @@ impl GcsCore { &self, location: &str, size: u64, - written_bytes: u64, + written: u64, is_last_part: bool, body: AsyncBody, ) -> Result<Request<AsyncBody>> { @@ -331,12 +331,12 @@ impl GcsCore { let range_header = if is_last_part { format!( "bytes {}-{}/{}", - written_bytes, - written_bytes + size - 1, - written_bytes + size + written, + written + size - 1, + written + size ) } else { - format!("bytes {}-{}/*", written_bytes, written_bytes + size - 1) + format!("bytes {}-{}/*", written, written + size - 1) }; req = req diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs index b47b2051..19fbd6d5 100644 --- a/core/src/services/gcs/writer.rs +++ b/core/src/services/gcs/writer.rs @@ -29,37 +29,40 @@ use crate::*; pub struct GcsWriter { core: Arc<GcsCore>, - - op: OpWrite, path: String, + op: OpWrite, + location: Option<String>, - written_bytes: u64, - is_last_part_written: bool, - last: Option<Bytes>, + written: u64, + buffer: oio::VectorCursor, + buffer_size: usize, } impl GcsWriter { - pub fn new( - core: Arc<GcsCore>, - op: OpWrite, - path: String, - upload_location: Option<String>, - ) -> Self { + pub fn 
new(core: Arc<GcsCore>, path: &str, op: OpWrite) -> Self { GcsWriter { core, + path: path.to_string(), op, - path, - location: upload_location, - written_bytes: 0, - is_last_part_written: false, - last: None, + + location: None, + written: 0, + buffer: oio::VectorCursor::new(), + // The chunk size should be a multiple of 256 KiB + // (256 x 1024 bytes), unless it's the last chunk + // that completes the upload. + // + // Larger chunk sizes typically make uploads faster, + // but note that there's a tradeoff between speed and + // memory usage. It's recommended that you use at least + // 8 MiB for the chunk size. + // + // TODO: allow this value to be configured. + buffer_size: 8 * 1024 * 1024, } } -} -#[async_trait] -impl oio::Write for GcsWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write_oneshot(&self, bs: Bytes) -> Result<()> { let mut req = self.core.gcs_insert_object_request( &percent_encode_path(&self.path), Some(bs.len()), @@ -82,80 +85,123 @@ impl oio::Write for GcsWriter { } } - async fn append(&mut self, bs: Bytes) -> Result<()> { - let location = if let Some(location) = &self.location { - location - } else { - return Ok(()); - }; + async fn initiate_upload(&self) -> Result<String> { + let resp = self.core.gcs_initiate_resumable_upload(&self.path).await?; + let status = resp.status(); - let result = if let Some(last) = &self.last { - let bytes_to_upload = last.slice(0..last.len()); - let part_size = bytes_to_upload.len() as u64; - let is_last_part = part_size % (256 * 1024) != 0; - let mut req = self.core.gcs_upload_in_resumable_upload( - location, - part_size, - self.written_bytes, - is_last_part, - AsyncBody::Bytes(bytes_to_upload), - )?; - - self.core.sign(&mut req).await?; - - let resp = self.core.send(req).await?; - - let status = resp.status(); - - match status { - StatusCode::OK | StatusCode::PERMANENT_REDIRECT => { - if is_last_part { - self.is_last_part_written = true - } else { - self.written_bytes += part_size; - } 
- Ok(()) + match status { + StatusCode::OK => { + let bs = parse_location(resp.headers())?; + if let Some(location) = bs { + Ok(location.to_string()) + } else { + Err(Error::new( + ErrorKind::Unexpected, + "location is not in the response header", + )) + } + } + _ => Err(parse_error(resp).await?), + } + } + + async fn write_part(&self, location: &str, bs: Bytes) -> Result<()> { + let mut req = self.core.gcs_upload_in_resumable_upload( + location, + bs.len() as u64, + self.written, + false, + AsyncBody::Bytes(bs), + )?; + + self.core.sign(&mut req).await?; + + let resp = self.core.send(req).await?; + + let status = resp.status(); + match status { + StatusCode::OK | StatusCode::PERMANENT_REDIRECT => Ok(()), + _ => Err(parse_error(resp).await?), + } + } +} + +#[async_trait] +impl oio::Write for GcsWriter { + async fn write(&mut self, bs: Bytes) -> Result<()> { + let location = match &self.location { + Some(location) => location, + None => { + if self.op.content_length().unwrap_or_default() == bs.len() as u64 + && self.written == 0 + { + return self.write_oneshot(bs).await; + } else { + let location = self.initiate_upload().await?; + self.location = Some(location.clone()); + self.location.as_deref().unwrap() } - _ => Err(parse_error(resp).await?), } - } else { - Ok(()) }; - self.last = Some(bs.slice(0..bs.len())); - return result; + // Ignore empty bytes + if bs.len() == 0 { + return Ok(()); + } + + self.buffer.push(bs); + // Return directly if the buffer is not full + if self.buffer.len() <= self.buffer_size { + return Ok(()); + } + + let bs = self.buffer.peak(self.buffer_size); + + match self.write_part(location, bs).await { + Ok(_) => { + self.buffer.take(self.buffer_size); + self.written += self.buffer_size as u64; + Ok(()) + } + Err(e) => { + // If the upload fails, we should pop the given bs to make sure + // write is re-enter safe. 
+ self.buffer.pop(); + Err(e) + } + } } + async fn append(&mut self, bs: Bytes) -> Result<()> { + self.write(bs).await + } + + // TODO: we can cancel the upload by sending a DELETE request to the location async fn abort(&mut self) -> Result<()> { Ok(()) } async fn close(&mut self) -> Result<()> { - if self.is_last_part_written { - return Ok(()); - } - let location = if let Some(location) = &self.location { location } else { return Ok(()); }; - let bs = self - .last - .as_ref() - .expect("failed to get the previously uploaded part"); + let bs = self.buffer.peak(self.buffer.len()); let resp = self .core - .gcs_complete_resumable_upload(location, self.written_bytes, bs.slice(0..bs.len())) + .gcs_complete_resumable_upload(location, self.written, bs) .await?; let status = resp.status(); - match status { StatusCode::OK => { resp.into_body().consume().await?; + + self.location = None; + self.buffer.clear(); Ok(()) } _ => Err(parse_error(resp).await?), diff --git a/core/src/services/ghac/backend.rs b/core/src/services/ghac/backend.rs index 159b0022..c5afeb81 100644 --- a/core/src/services/ghac/backend.rs +++ b/core/src/services/ghac/backend.rs @@ -391,10 +391,10 @@ impl Accessor for GhacBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - if args.append() { + if args.content_length().is_none() { return Err(Error::new( ErrorKind::Unsupported, - "append write is not supported", + "write without content length is not supported", )); } diff --git a/core/src/services/ipmfs/backend.rs b/core/src/services/ipmfs/backend.rs index c2e59f84..9e9377b8 100644 --- a/core/src/services/ipmfs/backend.rs +++ b/core/src/services/ipmfs/backend.rs @@ -110,10 +110,10 @@ impl Accessor for IpmfsBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - if args.append() { + if args.content_length().is_none() { return Err(Error::new( ErrorKind::Unsupported, - "append write is not supported", + "write without 
content length is not supported", )); } diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs index 3412999e..c8223d1a 100644 --- a/core/src/services/obs/backend.rs +++ b/core/src/services/obs/backend.rs @@ -315,9 +315,9 @@ impl Accessor for ObsBackend { } async fn create_dir(&self, path: &str, _: OpCreate) -> Result<RpCreate> { - let mut req = - self.core - .obs_put_object_request(path, Some(0), None, None, AsyncBody::Empty)?; + let mut req = self + .core + .obs_put_object_request(path, Some(0), None, AsyncBody::Empty)?; self.core.sign(&mut req).await?; @@ -352,10 +352,10 @@ impl Accessor for ObsBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - if args.append() { + if args.content_length().is_none() { return Err(Error::new( ErrorKind::Unsupported, - "append write is not supported", + "write without content length is not supported", )); } diff --git a/core/src/services/obs/core.rs b/core/src/services/obs/core.rs index c61f1f02..191ec95b 100644 --- a/core/src/services/obs/core.rs +++ b/core/src/services/obs/core.rs @@ -116,7 +116,6 @@ impl ObsCore { path: &str, size: Option<usize>, content_type: Option<&str>, - if_match: Option<&str>, body: AsyncBody, ) -> Result<Request<AsyncBody>> { let p = build_abs_path(&self.root, path); @@ -125,10 +124,6 @@ impl ObsCore { let mut req = Request::put(&url); - if let Some(if_match) = if_match { - req = req.header(IF_MATCH, if_match); - } - if let Some(size) = size { req = req.header(CONTENT_LENGTH, size) } diff --git a/core/src/services/obs/writer.rs b/core/src/services/obs/writer.rs index 080e0919..21c685c7 100644 --- a/core/src/services/obs/writer.rs +++ b/core/src/services/obs/writer.rs @@ -47,7 +47,6 @@ impl oio::Write for ObsWriter { &self.path, Some(bs.len()), self.op.content_type(), - self.op.if_match(), AsyncBody::Bytes(bs), )?; diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs index b3077c7b..4e298fb6 100644 --- 
a/core/src/services/oss/backend.rs +++ b/core/src/services/oss/backend.rs @@ -407,25 +407,9 @@ impl Accessor for OssBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - let upload_id = if args.append() { - let resp = self.core.oss_initiate_upload(path, &args).await?; - match resp.status() { - StatusCode::OK => { - let bs = resp.into_body().bytes().await?; - let result: InitiateMultipartUploadResult = - quick_xml::de::from_reader(bs.reader()) - .map_err(new_xml_deserialize_error)?; - Some(result.upload_id) - } - _ => return Err(parse_error(resp).await?), - } - } else { - None - }; - Ok(( RpWrite::default(), - OssWriter::new(self.core.clone(), args, path.to_string(), upload_id), + OssWriter::new(self.core.clone(), &path, args), )) } diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs index 3c56226c..095ba6e4 100644 --- a/core/src/services/oss/writer.rs +++ b/core/src/services/oss/writer.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use async_trait::async_trait; -use bytes::Bytes; +use bytes::{Buf, Bytes}; use http::StatusCode; use super::core::*; @@ -33,24 +33,33 @@ pub struct OssWriter { op: OpWrite, path: String, upload_id: Option<String>, + parts: Vec<MultipartUploadPart>, + buffer: oio::VectorCursor, + buffer_size: usize, } impl OssWriter { - pub fn new(core: Arc<OssCore>, op: OpWrite, path: String, upload_id: Option<String>) -> Self { + pub fn new(core: Arc<OssCore>, path: &str, op: OpWrite) -> Self { OssWriter { core, + path: path.to_string(), op, - path, - upload_id, + + upload_id: None, parts: vec![], + buffer: oio::VectorCursor::new(), + // The part size must be 5 MiB to 5 GiB. There is no minimum + // size limit on the last part of your multipart upload. + // + // We pick the default value as 8 MiB for better thoughput. + // + // TODO: allow this value to be configured. 
+ buffer_size: 8 * 1024 * 1024, } } -} -#[async_trait] -impl oio::Write for OssWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + async fn write_oneshot(&self, bs: Bytes) -> Result<()> { let mut req = self.core.oss_put_object_request( &self.path, Some(bs.len()), @@ -76,10 +85,20 @@ impl oio::Write for OssWriter { } } - async fn append(&mut self, bs: Bytes) -> Result<()> { - let upload_id = self.upload_id.as_ref().expect( - "Writer doesn't have upload id, but users trying to call append, must be buggy", - ); + async fn initiate_upload(&self) -> Result<String> { + let resp = self.core.oss_initiate_upload(&self.path, &self.op).await?; + match resp.status() { + StatusCode::OK => { + let bs = resp.into_body().bytes().await?; + let result: InitiateMultipartUploadResult = + quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?; + Ok(result.upload_id) + } + _ => return Err(parse_error(resp).await?), + } + } + + async fn write_part(&self, upload_id: &str, bs: Bytes) -> Result<MultipartUploadPart> { // Aliyun OSS requires part number must between [1..=10000] let part_number = self.parts.len() + 1; let mut req = self @@ -108,13 +127,62 @@ impl oio::Write for OssWriter { })? 
.to_string(); resp.into_body().consume().await?; - self.parts.push(MultipartUploadPart { part_number, etag }); - Ok(()) + Ok(MultipartUploadPart { part_number, etag }) } _ => Err(parse_error(resp).await?), } } +} + +#[async_trait] +impl oio::Write for OssWriter { + async fn write(&mut self, bs: Bytes) -> Result<()> { + let upload_id = match &self.upload_id { + Some(upload_id) => upload_id, + None => { + if self.op.content_length().unwrap_or_default() == bs.len() as u64 { + return self.write_oneshot(bs).await; + } else { + let upload_id = self.initiate_upload().await?; + self.upload_id = Some(upload_id.clone()); + self.upload_id.as_deref().unwrap() + } + } + }; + + // Ignore empty bytes + if bs.len() == 0 { + return Ok(()); + } + + self.buffer.push(bs); + // Return directly if the buffer is not full + if self.buffer.len() <= self.buffer_size { + return Ok(()); + } + + let bs = self.buffer.peak(self.buffer_size); + + match self.write_part(upload_id, bs).await { + Ok(part) => { + self.buffer.take(self.buffer_size); + self.parts.push(part); + Ok(()) + } + Err(e) => { + // If the upload fails, we should pop the given bs to make sure + // write is re-enter safe. + self.buffer.pop(); + Err(e) + } + } + } + + async fn append(&mut self, bs: Bytes) -> Result<()> { + todo!() + } + // TODO: we can cancel the upload by sending an abort request. 
async fn abort(&mut self) -> Result<()> { Err(Error::new( ErrorKind::Unsupported, diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index da2f1181..7eb7b1f9 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -959,39 +959,9 @@ impl Accessor for S3Backend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - let upload_id = if args.append() { - let resp = self - .core - .s3_initiate_multipart_upload( - path, - args.content_type(), - args.content_disposition(), - args.cache_control(), - args.if_match(), - ) - .await?; - - let status = resp.status(); - - match status { - StatusCode::OK => { - let bs = resp.into_body().bytes().await?; - - let result: InitiateMultipartUploadResult = - quick_xml::de::from_reader(bs.reader()) - .map_err(new_xml_deserialize_error)?; - - Some(result.upload_id) - } - _ => return Err(parse_error(resp).await?), - } - } else { - None - }; - Ok(( RpWrite::default(), - S3Writer::new(self.core.clone(), args, path.to_string(), upload_id), + S3Writer::new(self.core.clone(), &path, args), )) } diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs index c618b87d..4f1c36a2 100644 --- a/core/src/services/s3/core.rs +++ b/core/src/services/s3/core.rs @@ -489,7 +489,6 @@ impl S3Core { content_type: Option<&str>, content_disposition: Option<&str>, cache_control: Option<&str>, - if_match: Option<&str>, ) -> Result<Response<IncomingAsyncBody>> { let p = build_abs_path(&self.root, path); @@ -509,10 +508,6 @@ impl S3Core { req = req.header(CACHE_CONTROL, cache_control) } - if let Some(if_match) = if_match { - req = req.header(IF_MATCH, if_match) - } - // Set storage class header if let Some(v) = &self.default_storage_class { req = req.header(HeaderName::from_static(constants::X_AMZ_STORAGE_CLASS), v); diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs index d75906f9..5887cafc 100644 --- 
a/core/src/services/s3/writer.rs +++ b/core/src/services/s3/writer.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use async_trait::async_trait; -use bytes::Bytes; +use bytes::{Buf, Bytes}; use http::StatusCode; use super::core::*; @@ -32,32 +32,34 @@ pub struct S3Writer { op: OpWrite, path: String, - upload_id: Option<String>, + parts: Vec<CompleteMultipartUploadRequestPart>, + buffer: oio::VectorCursor, + buffer_size: usize, } impl S3Writer { - pub fn new(core: Arc<S3Core>, op: OpWrite, path: String, upload_id: Option<String>) -> Self { + pub fn new(core: Arc<S3Core>, path: &str, op: OpWrite) -> Self { S3Writer { core, - + path: path.to_string(), op, - path, - upload_id, + + upload_id: None, parts: vec![], + buffer: oio::VectorCursor::new(), + // The part size must be 5 MiB to 5 GiB. There is no minimum + // size limit on the last part of your multipart upload. + // + // We pick the default value as 8 MiB for better thoughput. + // + // TODO: allow this value to be configured. + buffer_size: 8 * 1024 * 1024, } } -} - -#[async_trait] -impl oio::Write for S3Writer { - async fn write(&mut self, bs: Bytes) -> Result<()> { - debug_assert!( - self.upload_id.is_none(), - "Writer initiated with upload id, but users trying to call write, must be buggy" - ); + async fn write_oneshot(&self, bs: Bytes) -> Result<()> { let mut req = self.core.s3_put_object_request( &self.path, Some(bs.len()), @@ -82,10 +84,37 @@ impl oio::Write for S3Writer { } } - async fn append(&mut self, bs: Bytes) -> Result<()> { - let upload_id = self.upload_id.as_ref().expect( - "Writer doesn't have upload id, but users trying to call append, must be buggy", - ); + async fn initiate_upload(&self) -> Result<String> { + let resp = self + .core + .s3_initiate_multipart_upload( + &self.path, + self.op.content_type(), + self.op.content_disposition(), + self.op.cache_control(), + ) + .await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + let bs = resp.into_body().bytes().await?; + + let 
result: InitiateMultipartUploadResult = + quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?; + + Ok(result.upload_id) + } + _ => return Err(parse_error(resp).await?), + } + } + + async fn write_part( + &self, + upload_id: &str, + bs: Bytes, + ) -> Result<CompleteMultipartUploadRequestPart> { // AWS S3 requires part number must between [1..=10000] let part_number = self.parts.len() + 1; @@ -116,15 +145,61 @@ impl oio::Write for S3Writer { resp.into_body().consume().await?; - self.parts - .push(CompleteMultipartUploadRequestPart { part_number, etag }); + Ok(CompleteMultipartUploadRequestPart { part_number, etag }) + } + _ => Err(parse_error(resp).await?), + } + } +} + +#[async_trait] +impl oio::Write for S3Writer { + async fn write(&mut self, bs: Bytes) -> Result<()> { + let upload_id = match &self.upload_id { + Some(upload_id) => upload_id, + None => { + if self.op.content_length().unwrap_or_default() == bs.len() as u64 { + return self.write_oneshot(bs).await; + } else { + let upload_id = self.initiate_upload().await?; + self.upload_id = Some(upload_id.clone()); + self.upload_id.as_deref().unwrap() + } + } + }; + + // Ignore empty bytes + if bs.len() == 0 { + return Ok(()); + } + + self.buffer.push(bs); + // Return directly if the buffer is not full + if self.buffer.len() <= self.buffer_size { + return Ok(()); + } + + let bs = self.buffer.peak(self.buffer_size); + match self.write_part(upload_id, bs).await { + Ok(part) => { + self.buffer.take(self.buffer_size); + self.parts.push(part); Ok(()) } - _ => Err(parse_error(resp).await?), + Err(e) => { + // If the upload fails, we should pop the given bs to make sure + // write is re-enter safe. 
+ self.buffer.pop(); + Err(e) + } } } + async fn append(&mut self, bs: Bytes) -> Result<()> { + todo!() + } + async fn abort(&mut self) -> Result<()> { let upload_id = if let Some(upload_id) = &self.upload_id { upload_id diff --git a/core/src/services/webdav/backend.rs b/core/src/services/webdav/backend.rs index 23032855..2f872229 100644 --- a/core/src/services/webdav/backend.rs +++ b/core/src/services/webdav/backend.rs @@ -300,10 +300,10 @@ impl Accessor for WebdavBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - if args.append() { + if args.content_length().is_none() { return Err(Error::new( ErrorKind::Unsupported, - "append write is not supported", + "write without content length is not supported", )); } diff --git a/core/src/services/webhdfs/backend.rs b/core/src/services/webhdfs/backend.rs index 094bef24..3a8bb7c2 100644 --- a/core/src/services/webhdfs/backend.rs +++ b/core/src/services/webhdfs/backend.rs @@ -519,10 +519,10 @@ impl Accessor for WebhdfsBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - if args.append() { + if args.content_length().is_none() { return Err(Error::new( ErrorKind::Unsupported, - "append write is not supported", + "write without content length is not supported", )); } diff --git a/core/src/types/operator/blocking_operator.rs b/core/src/types/operator/blocking_operator.rs index c6bae2dd..a985474d 100644 --- a/core/src/types/operator/blocking_operator.rs +++ b/core/src/types/operator/blocking_operator.rs @@ -443,7 +443,12 @@ impl BlockingOperator { /// # } /// ``` pub fn write(&self, path: &str, bs: impl Into<Bytes>) -> Result<()> { - self.write_with(path, OpWrite::new(), bs) + let bs = bs.into(); + self.write_with( + path, + OpWrite::new().with_content_length(bs.len() as u64), + bs, + ) } /// Copy a file from `from` to `to`. 
@@ -594,8 +599,11 @@ impl BlockingOperator { ); } - let (_, mut w) = self.inner().blocking_write(&path, args)?; - w.write(bs.into())?; + let bs = bs.into(); + let (_, mut w) = self + .inner() + .blocking_write(&path, args.with_content_length(bs.len() as u64))?; + w.write(bs)?; w.close()?; Ok(()) @@ -636,8 +644,8 @@ impl BlockingOperator { ); } - let op = OpWrite::default().with_append(); - BlockingWriter::create_dir(self.inner().clone(), &path, op) + let op = OpWrite::default(); + BlockingWriter::create(self.inner().clone(), &path, op) } /// Delete given path. diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs index 8156dcc8..0d46ebad 100644 --- a/core/src/types/operator/operator.rs +++ b/core/src/types/operator/operator.rs @@ -631,7 +631,13 @@ impl Operator { /// # } /// ``` pub async fn write(&self, path: &str, bs: impl Into<Bytes>) -> Result<()> { - self.write_with(path, OpWrite::new(), bs).await + let bs = bs.into(); + self.write_with( + path, + OpWrite::new().with_content_length(bs.len() as u64), + bs, + ) + .await } /// Copy a file from `from` to `to`. @@ -817,7 +823,7 @@ impl Operator { ); } - Writer::create_dir(self.inner().clone(), &path, args.with_append()).await + Writer::create(self.inner().clone(), &path, args).await } /// Write data with extra options. @@ -854,8 +860,12 @@ impl Operator { ); } - let (_, mut w) = self.inner().write(&path, args).await?; - w.write(bs.into()).await?; + let bs = bs.into(); + let (_, mut w) = self + .inner() + .write(&path, args.with_content_length(bs.len() as u64)) + .await?; + w.write(bs).await?; w.close().await?; Ok(()) diff --git a/core/src/types/ops.rs b/core/src/types/ops.rs index 616316a5..63180d5a 100644 --- a/core/src/types/ops.rs +++ b/core/src/types/ops.rs @@ -320,12 +320,10 @@ impl OpStat { /// Args for `write` operation. 
#[derive(Debug, Clone, Default)] pub struct OpWrite { - append: bool, - + content_length: Option<u64>, content_type: Option<String>, content_disposition: Option<String>, cache_control: Option<String>, - if_match: Option<String>, } impl OpWrite { @@ -336,13 +334,20 @@ impl OpWrite { Self::default() } - pub(crate) fn with_append(mut self) -> Self { - self.append = true; - self + /// Get the content length from op. + /// + /// The content length is the total length of the data to be written. + pub fn content_length(&self) -> Option<u64> { + self.content_length } - pub(crate) fn append(&self) -> bool { - self.append + /// Set the content length of op. + /// + /// If the content length is not set, the content length will be + /// calculated automatically by buffering part of data. + pub fn with_content_length(mut self, content_length: u64) -> Self { + self.content_length = Some(content_length); + self } /// Get the content type from option @@ -377,17 +382,6 @@ impl OpWrite { self.cache_control = Some(cache_control.to_string()); self } - - /// Set the If-Match of the option - pub fn with_if_match(mut self, if_match: &str) -> Self { - self.if_match = Some(if_match.to_string()); - self - } - - /// Get If-Match from option - pub fn if_match(&self) -> Option<&str> { - self.if_match.as_deref() - } } /// Args for `copy` operation. diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs index dc4d2bfd..3705f4d6 100644 --- a/core/src/types/writer.rs +++ b/core/src/types/writer.rs @@ -52,7 +52,7 @@ impl Writer { /// /// We don't want to expose those details to users so keep this function /// in crate only. 
- pub(crate) async fn create_dir(acc: FusedAccessor, path: &str, op: OpWrite) -> Result<Self> { + pub(crate) async fn create(acc: FusedAccessor, path: &str, op: OpWrite) -> Result<Self> { let (_, w) = acc.write(path, op).await?; Ok(Writer { @@ -198,7 +198,7 @@ impl BlockingWriter { /// /// We don't want to expose those details to users so keep this function /// in crate only. - pub(crate) fn create_dir(acc: FusedAccessor, path: &str, op: OpWrite) -> Result<Self> { + pub(crate) fn create(acc: FusedAccessor, path: &str, op: OpWrite) -> Result<Self> { let (_, w) = acc.blocking_write(path, op)?; Ok(BlockingWriter { inner: w })
