This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch buffer-refactor in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit c6a5d8ef7d2ad361c5c74a9469cbdbf916689241 Author: Xuanwo <[email protected]> AuthorDate: Tue Aug 22 15:49:37 2023 +0800 Refactor Signed-off-by: Xuanwo <[email protected]> --- core/src/raw/oio/cursor.rs | 22 ++++ core/src/raw/oio/write/at_least_buf_write.rs | 11 ++ core/src/raw/oio/write/mod.rs | 5 + core/src/raw/oio/write/multipart_upload_write.rs | 139 ++++------------------- core/src/raw/oio/write/one_shot_write.rs | 69 +++++++++++ core/src/services/cos/backend.rs | 3 +- core/src/services/cos/writer.rs | 10 +- core/src/services/obs/backend.rs | 3 +- core/src/services/obs/writer.rs | 12 +- core/src/services/oss/backend.rs | 3 +- core/src/services/oss/writer.rs | 10 +- core/src/services/s3/writer.rs | 15 +-- 12 files changed, 160 insertions(+), 142 deletions(-) diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs index 1080f78e9..7ef537025 100644 --- a/core/src/raw/oio/cursor.rs +++ b/core/src/raw/oio/cursor.rs @@ -45,6 +45,11 @@ impl Cursor { let len = self.pos.min(self.inner.len() as u64) as usize; &self.inner.as_ref()[len..] } + + /// Return the length of remaining slice. + pub fn len(&self) -> usize { + self.inner.len() - self.pos as usize + } } impl From<Bytes> for Cursor { @@ -148,6 +153,23 @@ impl oio::BlockingRead for Cursor { } } +impl oio::Stream for Cursor { + fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> { + if self.is_empty() { + return Poll::Ready(None); + } + + let bs = self.inner.clone(); + self.pos += bs.len() as u64; + Poll::Ready(Some(Ok(bs))) + } + + fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> { + self.pos = 0; + Poll::Ready(Ok(())) + } +} + /// # TODO /// /// we can do some compaction during runtime. For example, merge 4K data diff --git a/core/src/raw/oio/write/at_least_buf_write.rs b/core/src/raw/oio/write/at_least_buf_write.rs index 98a237970..91da42bbc 100644 --- a/core/src/raw/oio/write/at_least_buf_write.rs +++ b/core/src/raw/oio/write/at_least_buf_write.rs @@ -31,6 +31,17 @@ pub struct AtLeastBufWriter<W: oio::Write> { buf: oio::ChunkedCursor, } +impl<W: oio::Write> AtLeastBufWriter<W> { + /// Create a new at least buf writer. + pub fn new(inner: W, size: usize) -> Self { + Self { + inner, + size, + buf: oio::ChunkedCursor::new(), + } + } +} + #[async_trait] impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> { async fn write(&mut self, bs: Bytes) -> Result<()> { diff --git a/core/src/raw/oio/write/mod.rs b/core/src/raw/oio/write/mod.rs index fa49ea6e2..7994e2078 100644 --- a/core/src/raw/oio/write/mod.rs +++ b/core/src/raw/oio/write/mod.rs @@ -35,3 +35,8 @@ pub use append_object_write::AppendObjectWrite; pub use append_object_write::AppendObjectWriter; mod at_least_buf_write; +pub use at_least_buf_write::AtLeastBufWriter; + +mod one_shot_write; +pub use one_shot_write::OneShotWrite; +pub use one_shot_write::OneShotWriter; diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs index 8ca10c462..c013b24c3 100644 --- a/core/src/raw/oio/write/multipart_upload_write.rs +++ b/core/src/raw/oio/write/multipart_upload_write.rs @@ -21,8 +21,6 @@ use bytes::Bytes; use crate::raw::*; use crate::*; -const DEFAULT_WRITE_MIN_SIZE: usize = 8 * 1024 * 1024; - /// MultipartUploadWrite is used to implement [`Write`] based on multipart /// uploads. By implementing MultipartUploadWrite, services don't need to /// care about the details of buffering and uploading parts. @@ -34,12 +32,6 @@ const DEFAULT_WRITE_MIN_SIZE: usize = 8 * 1024 * 1024; /// - Expose `MultipartUploadWriter` as `Accessor::Writer` #[async_trait] pub trait MultipartUploadWrite: Send + Sync + Unpin { - /// write_once write all data at once. - /// - /// MultipartUploadWriter will call this API when the size of data is - /// already known. - async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()>; - /// initiate_part will call start a multipart upload and return the upload id. /// /// MultipartUploadWriter will call this when: @@ -94,39 +86,32 @@ pub struct MultipartUploadPart { /// - Allow users to switch to un-buffered mode if users write 16MiB every time. pub struct MultipartUploadWriter<W: MultipartUploadWrite> { inner: W, - total_size: Option<u64>, upload_id: Option<String>, parts: Vec<MultipartUploadPart>, - buffer: oio::VectorCursor, - buffer_size: usize, } impl<W: MultipartUploadWrite> MultipartUploadWriter<W> { /// Create a new MultipartUploadWriter. - pub fn new(inner: W, total_size: Option<u64>) -> Self { + pub fn new(inner: W) -> Self { Self { inner, - total_size, upload_id: None, parts: Vec::new(), - buffer: oio::VectorCursor::new(), - buffer_size: DEFAULT_WRITE_MIN_SIZE, } } - /// Configure the write_min_size. - /// - /// write_min_size is used to control the size of internal buffer. - /// - /// MultipartUploadWriter will flush the buffer to upload a part when - /// the size of buffer is larger than write_min_size. - /// - /// This value is default to 8 MiB (as recommended by AWS). - pub fn with_write_min_size(mut self, v: usize) -> Self { - self.buffer_size = v; - self + /// Get the upload id. Initiate a new multipart upload if the upload id is empty. + pub async fn upload_id(&mut self) -> Result<String> { + match &self.upload_id { + Some(upload_id) => Ok(upload_id.to_string()), + None => { + let upload_id = self.inner.initiate_part().await?; + self.upload_id = Some(upload_id.clone()); + Ok(upload_id) + } + } } } @@ -136,88 +121,28 @@ where W: MultipartUploadWrite, { async fn write(&mut self, bs: Bytes) -> Result<()> { - let upload_id = match &self.upload_id { - Some(upload_id) => upload_id, - None => { - if self.total_size.unwrap_or_default() == bs.len() as u64 { - return self - .inner - .write_once(bs.len() as u64, AsyncBody::Bytes(bs)) - .await; - } - - let upload_id = self.inner.initiate_part().await?; - self.upload_id = Some(upload_id); - self.upload_id.as_deref().unwrap() - } - }; + let upload_id = self.upload_id().await?; - // Ignore empty bytes - if bs.is_empty() { - return Ok(()); - } - - self.buffer.push(bs); - // Return directly if the buffer is not full - if self.buffer.len() <= self.buffer_size { - return Ok(()); - } - - let bs = self.buffer.peak_at_least(self.buffer_size); let size = bs.len(); - match self - .inner + self.inner .write_part( - upload_id, + &upload_id, self.parts.len(), size as u64, AsyncBody::Bytes(bs), ) .await - { - Ok(part) => { - self.buffer.take(size); - self.parts.push(part); - Ok(()) - } - Err(e) => { - // If the upload fails, we should pop the given bs to make sure - // write is re-enter safe. - self.buffer.pop(); - Err(e) - } - } + .map(|v| self.parts.push(v)) } async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { - if !self.buffer.is_empty() { - return Err(Error::new( - ErrorKind::InvalidInput, - "Writer::sink should not be used mixed with existing buffer", - )); - } - - let upload_id = match &self.upload_id { - Some(upload_id) => upload_id, - None => { - if self.total_size.unwrap_or_default() == size { - return self.inner.write_once(size, AsyncBody::Stream(s)).await; - } - - let upload_id = self.inner.initiate_part().await?; - self.upload_id = Some(upload_id); - self.upload_id.as_deref().unwrap() - } - }; + let upload_id = self.upload_id().await?; - let part = self - .inner - .write_part(upload_id, self.parts.len(), size, AsyncBody::Stream(s)) - .await?; - self.parts.push(part); - - Ok(()) + self.inner + .write_part(&upload_id, self.parts.len(), size, AsyncBody::Stream(s)) + .await + .map(|v| self.parts.push(v)) } async fn close(&mut self) -> Result<()> { @@ -227,30 +152,6 @@ where return Ok(()); }; - // Make sure internal buffer has been flushed. - if !self.buffer.is_empty() { - let bs = self.buffer.peak_exact(self.buffer.len()); - - match self - .inner - .write_part( - upload_id, - self.parts.len(), - bs.len() as u64, - AsyncBody::Bytes(bs), - ) - .await - { - Ok(part) => { - self.buffer.clear(); - self.parts.push(part); - } - Err(e) => { - return Err(e); - } - } - } - self.inner.complete_part(upload_id, &self.parts).await } diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs new file mode 100644 index 000000000..c2ad25c83 --- /dev/null +++ b/core/src/raw/oio/write/one_shot_write.rs @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::raw::*; +use crate::*; +use async_trait::async_trait; +use bytes::Bytes; + +/// OneShotWrite is used to implement [`Write`] based on one shot operation. +/// By implementing OneShotWrite, services don't need to care about the details. +/// +/// For example, S3 `PUT Object` and fs `write_all`. +/// +/// The layout after adopting [`OneShotWrite`]: +#[async_trait] +pub trait OneShotWrite: Send + Sync + Unpin { + /// write_once write all data at once. + /// + /// Implementations should make sure that the data is written correctly at once. + async fn write_once(&self, size: u64, stream: oio::Streamer) -> Result<()>; +} + +/// OneShotWrite is used to implement [`Write`] based on one shot. +pub struct OneShotWriter<W: OneShotWrite> { + inner: W, +} + +impl<W: OneShotWrite> OneShotWriter<W> { + /// Create a new one shot writer. + pub fn new(inner: W) -> Self { + Self { inner } + } +} + +#[async_trait] +impl<W: OneShotWrite> oio::Write for OneShotWriter<W> { + async fn write(&mut self, bs: Bytes) -> Result<()> { + let cursor = oio::Cursor::from(bs); + self.inner + .write_once(cursor.len() as u64, Box::new(cursor)) + .await + } + + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + self.inner.write_once(size, s).await + } + + async fn abort(&mut self) -> Result<()> { + Ok(()) + } + + async fn close(&mut self) -> Result<()> { + Ok(()) + } +} diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs index 9dc4b1b66..62a968ea0 100644 --- a/core/src/services/cos/backend.rs +++ b/core/src/services/cos/backend.rs @@ -364,8 +364,7 @@ impl Accessor for CosBackend { oio::TwoWaysWriter::Right(w) } else { - let w = oio::MultipartUploadWriter::new(writer, args.content_length()) - .with_write_min_size(self.core.write_min_size); + let w = oio::MultipartUploadWriter::new(writer); oio::TwoWaysWriter::Left(w) }; diff --git a/core/src/services/cos/writer.rs b/core/src/services/cos/writer.rs index e64bc23a1..cdcbb9b18 100644 --- a/core/src/services/cos/writer.rs +++ b/core/src/services/cos/writer.rs @@ -23,6 +23,7 @@ use http::StatusCode; use super::core::*; use super::error::parse_error; +use crate::raw::oio::Streamer; use crate::raw::*; use crate::*; @@ -44,15 +45,15 @@ impl CosWriter { } #[async_trait] -impl oio::MultipartUploadWrite for CosWriter { - async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> { +impl oio::OneShotWrite for CosWriter { + async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> { let mut req = self.core.cos_put_object_request( &self.path, Some(size), self.op.content_type(), self.op.content_disposition(), self.op.cache_control(), - body, + AsyncBody::Stream(stream), )?; self.core.sign(&mut req).await?; @@ -69,7 +70,10 @@ impl oio::MultipartUploadWrite for CosWriter { _ => Err(parse_error(resp).await?), } } +} +#[async_trait] +impl oio::MultipartUploadWrite for CosWriter { async fn initiate_part(&self) -> Result<String> { let resp = self .core diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs index e90308eda..ce2e83057 100644 --- a/core/src/services/obs/backend.rs +++ b/core/src/services/obs/backend.rs @@ -458,8 +458,7 @@ impl Accessor for ObsBackend { oio::TwoWaysWriter::Right(w) } else { - let w = oio::MultipartUploadWriter::new(writer, args.content_length()) - .with_write_min_size(self.core.write_min_size); + let w = oio::MultipartUploadWriter::new(writer); oio::TwoWaysWriter::Left(w) }; diff --git a/core/src/services/obs/writer.rs b/core/src/services/obs/writer.rs index efcfd3ab6..893a098d7 100644 --- a/core/src/services/obs/writer.rs +++ b/core/src/services/obs/writer.rs @@ -23,7 +23,7 @@ use http::StatusCode; use super::core::*; use super::error::parse_error; -use crate::raw::oio::MultipartUploadPart; +use crate::raw::oio::{MultipartUploadPart, Streamer}; use crate::raw::*; use crate::*; @@ -43,15 +43,16 @@ impl ObsWriter { } } } + #[async_trait] -impl oio::MultipartUploadWrite for ObsWriter { - async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> { +impl oio::OneShotWrite for ObsWriter { + async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> { let mut req = self.core.obs_put_object_request( &self.path, Some(size), self.op.content_type(), self.op.cache_control(), - body, + AsyncBody::Stream(stream), )?; self.core.sign(&mut req).await?; @@ -68,7 +69,10 @@ impl oio::MultipartUploadWrite for ObsWriter { _ => Err(parse_error(resp).await?), } } +} +#[async_trait] +impl oio::MultipartUploadWrite for ObsWriter { async fn initiate_part(&self) -> Result<String> { let resp = self .core diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs index 6704d158b..1b6d1f33e 100644 --- a/core/src/services/oss/backend.rs +++ b/core/src/services/oss/backend.rs @@ -486,8 +486,7 @@ impl Accessor for OssBackend { oio::TwoWaysWriter::Right(w) } else { - let w = oio::MultipartUploadWriter::new(writer, args.content_length()) - .with_write_min_size(self.core.write_min_size); + let w = oio::MultipartUploadWriter::new(writer); oio::TwoWaysWriter::Left(w) }; diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs index 9faad249f..ff9c8f9d2 100644 --- a/core/src/services/oss/writer.rs +++ b/core/src/services/oss/writer.rs @@ -23,6 +23,7 @@ use http::StatusCode; use super::core::*; use super::error::parse_error; +use crate::raw::oio::Streamer; use crate::raw::*; use crate::*; @@ -44,15 +45,15 @@ impl OssWriter { } #[async_trait] -impl oio::MultipartUploadWrite for OssWriter { - async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> { +impl oio::OneShotWrite for OssWriter { + async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> { let mut req = self.core.oss_put_object_request( &self.path, Some(size), self.op.content_type(), self.op.content_disposition(), self.op.cache_control(), - body, + AsyncBody::Stream(stream), false, )?; @@ -70,7 +71,10 @@ impl oio::MultipartUploadWrite for OssWriter { _ => Err(parse_error(resp).await?), } } +} +#[async_trait] +impl oio::MultipartUploadWrite for OssWriter { async fn initiate_part(&self) -> Result<String> { let resp = self .core diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs index 508716d14..c323dd526 100644 --- a/core/src/services/s3/writer.rs +++ b/core/src/services/s3/writer.rs @@ -23,6 +23,7 @@ use http::StatusCode; use super::core::*; use super::error::parse_error; +use crate::raw::oio::Streamer; use crate::raw::*; use crate::*; @@ -35,29 +36,26 @@ pub struct S3Writer { impl S3Writer { pub fn new(core: Arc<S3Core>, path: &str, op: OpWrite) -> oio::MultipartUploadWriter<Self> { - let write_min_size = core.write_min_size; - - let total_size = op.content_length(); let s3_writer = S3Writer { core, path: path.to_string(), op, }; - oio::MultipartUploadWriter::new(s3_writer, total_size).with_write_min_size(write_min_size) + oio::MultipartUploadWriter::new(s3_writer) } } #[async_trait] -impl oio::MultipartUploadWrite for S3Writer { - async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> { +impl oio::OneShotWrite for S3Writer { + async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> { let mut req = self.core.s3_put_object_request( &self.path, Some(size), self.op.content_type(), self.op.content_disposition(), self.op.cache_control(), - body, + AsyncBody::Stream(stream), )?; self.core.sign(&mut req).await?; @@ -74,7 +72,10 @@ impl oio::MultipartUploadWrite for S3Writer { _ => Err(parse_error(resp).await?), } } +} +#[async_trait] +impl oio::MultipartUploadWrite for S3Writer { async fn initiate_part(&self) -> Result<String> { let resp = self .core
