This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch exact-buf-write in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit d7306f5b9f75ebe48fef80c1536a447cb8ff04b2 Author: Xuanwo <[email protected]> AuthorDate: Wed Aug 23 18:51:23 2023 +0800 Implement exact buf write Signed-off-by: Xuanwo <[email protected]> --- core/src/raw/oio/cursor.rs | 55 +++++---- core/src/raw/oio/write/exact_buf_write.rs | 184 ++++++++++++++++++++++++++++++ core/src/raw/oio/write/mod.rs | 9 +- 3 files changed, 222 insertions(+), 26 deletions(-) diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs index ec4589d18..592517855 100644 --- a/core/src/raw/oio/cursor.rs +++ b/core/src/raw/oio/cursor.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::cmp::Ordering; use std::collections::VecDeque; use std::io::Read; use std::io::SeekFrom; @@ -230,7 +231,7 @@ impl ChunkedCursor { /// /// - Panics if `at > len` /// - Panics if `idx != 0`, the cursor must be reset before split. - pub fn split_off(&mut self, mut at: usize) -> Self { + pub fn split_off(&mut self, at: usize) -> Self { assert!( at <= self.len(), "split_off at must smaller than current size" @@ -241,17 +242,21 @@ impl ChunkedCursor { let mut size = self.len() - at; while let Some(mut bs) = self.inner.pop_back() { - if size > bs.len() { - size -= bs.len(); - chunks.push_front(bs); - } else if size == bs.len() { - chunks.push_front(bs); - break; - } else { - let remaining = bs.split_off(bs.len() - size); - chunks.push_front(remaining); - self.inner.push_back(bs); - break; + match size.cmp(&bs.len()) { + Ordering::Less => { + let remaining = bs.split_off(bs.len() - size); + chunks.push_front(remaining); + self.inner.push_back(bs); + break; + } + Ordering::Equal => { + chunks.push_front(bs); + break; + } + Ordering::Greater => { + size -= bs.len(); + chunks.push_front(bs); + } } } @@ -281,17 +286,21 @@ impl ChunkedCursor { let mut size = at; while let Some(mut bs) = self.inner.pop_front() { - if size > bs.len() { - size -= bs.len(); - chunks.push_back(bs); - } else if size == bs.len() 
{ - chunks.push_back(bs); - break; - } else { - let remaining = bs.split_off(size); - chunks.push_back(bs); - self.inner.push_front(remaining); - break; + match size.cmp(&bs.len()) { + Ordering::Less => { + let remaining = bs.split_off(size); + chunks.push_back(bs); + self.inner.push_front(remaining); + break; + } + Ordering::Equal => { + chunks.push_back(bs); + break; + } + Ordering::Greater => { + size -= bs.len(); + chunks.push_back(bs); + } } } diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs new file mode 100644 index 000000000..f1cba001b --- /dev/null +++ b/core/src/raw/oio/write/exact_buf_write.rs @@ -0,0 +1,184 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::raw::oio::{StreamExt, Streamer}; +use crate::raw::*; +use crate::*; +use async_trait::async_trait; +use bytes::Bytes; +use std::cmp::min; + +/// ExactBufWriter is used to implement [`oio::Write`] based on exact buffer strategy: flush the +/// underlying storage when the buffered size is exactly the same as the buffer size. +/// +/// ExactBufWriter makes sure that the size of the data written to the underlying storage is exactly +/// `buffer_size` bytes. 
It's useful when the underlying storage requires every write to have a fixed size. +/// +/// For example, R2 requires that all parts be the same size except the last part. +/// +/// ## Notes +/// +/// ExactBufWriter is not a good choice for most cases, because it will cause more network requests. +pub struct ExactBufWriter<W: oio::Write> { + inner: W, + + /// The size of the buffer; we flush to the underlying storage once this many bytes are buffered. + buffer_size: usize, + buffer: oio::ChunkedCursor, + + buffer_stream: Option<Streamer>, +} + +impl<W: oio::Write> ExactBufWriter<W> { + /// Create a new exact buf writer. + pub fn new(inner: W, buffer_size: usize) -> Self { + Self { + inner, + buffer_size, + buffer: oio::ChunkedCursor::new(), + buffer_stream: None, + } + } + + /// Next bytes is used to fetch bytes from buffer or input streamer. + /// + /// We need this function because we need to make sure our write is reentrant. + /// We can't mutate state unless we are sure that the write is successful. + async fn next_bytes(&mut self, s: &mut Streamer) -> Option<Result<Bytes>> { + match self.buffer_stream.as_mut() { + None => s.next().await, + Some(bs) => match bs.next().await { + None => { + self.buffer_stream = None; + s.next().await + } + Some(v) => Some(v), + }, + } + } + + fn chain_stream(&mut self, s: Streamer) { + self.buffer_stream = match self.buffer_stream.take() { + Some(stream) => Some(Box::new(stream.chain(s))), + None => Some(s), + } + } +} + +#[async_trait] +impl<W: oio::Write> oio::Write for ExactBufWriter<W> { + async fn write(&mut self, bs: Bytes) -> Result<()> { + self.sink(bs.len() as u64, Box::new(oio::Cursor::from(bs))) + .await + } + + async fn sink(&mut self, size: u64, mut s: Streamer) -> Result<()> { + // Collect the stream into buffer directly if the buffer is not full. 
+ if self.buffer.len() as u64 + size < self.buffer_size as u64 { + self.buffer.push(s.collect().await?); + return Ok(()); + } + + if self.buffer.len() > self.buffer_size { + let buf = self.buffer.clone(); + let to_write = self.buffer.split_to(self.buffer_size); + return self + .inner + .sink(to_write.len() as u64, Box::new(to_write)) + .await + // Replace buffer with remaining if the write is successful. + .map(|_| { + self.buffer = buf; + self.chain_stream(s); + }); + } + + let mut buf = self.buffer.clone(); + while buf.len() < self.buffer_size { + let bs = self.next_bytes(&mut s).await.transpose()?; + match bs { + None => break, + Some(bs) => buf.push(bs), + } + } + + // Return directly if the buffer is not full. + // + // We don't need to chain stream here because it must be consumed. + if buf.len() < self.buffer_size { + self.buffer = buf; + return Ok(()); + } + + let to_write = buf.split_to(self.buffer_size); + self.inner + .sink(to_write.len() as u64, Box::new(to_write)) + .await + // Replace buffer with remaining if the write is successful. + .map(|_| { + self.buffer = buf; + self.chain_stream(s); + }) + } + + async fn abort(&mut self) -> Result<()> { + self.buffer.clear(); + self.buffer_stream = None; + + self.inner.abort().await + } + + async fn close(&mut self) -> Result<()> { + loop { + if let Some(stream) = self.buffer_stream.as_mut() { + let bs = stream.next().await.transpose()?; + match bs { + None => { + self.buffer_stream = None; + break; + } + Some(bs) => self.buffer.push(bs), + } + } + + let mut buf = self.buffer.clone(); + if buf.len() >= self.buffer_size { + let to_write = buf.split_to(self.buffer_size); + self.inner + .sink(to_write.len() as u64, Box::new(to_write)) + .await + // Replace buffer with remaining if the write is successful. + .map(|_| { + self.buffer = buf; + })? 
+ } + } + + while !self.buffer.is_empty() { + let mut buf = self.buffer.clone(); + let to_write = buf.split_to(min(self.buffer_size, buf.len())); + + self.inner + .sink(to_write.len() as u64, Box::new(to_write)) + .await + // Replace buffer with remaining if the write is successful. + .map(|_| self.buffer = buf)?; + } + + self.inner.close().await + } +} diff --git a/core/src/raw/oio/write/mod.rs b/core/src/raw/oio/write/mod.rs index 49dd94b2b..d06bacb2c 100644 --- a/core/src/raw/oio/write/mod.rs +++ b/core/src/raw/oio/write/mod.rs @@ -35,9 +35,12 @@ mod append_object_write; pub use append_object_write::AppendObjectWrite; pub use append_object_write::AppendObjectWriter; -mod at_least_buf_write; -pub use at_least_buf_write::AtLeastBufWriter; - mod one_shot_write; pub use one_shot_write::OneShotWrite; pub use one_shot_write::OneShotWriter; + +mod at_least_buf_write; +pub use at_least_buf_write::AtLeastBufWriter; + +mod exact_buf_write; +pub use exact_buf_write::ExactBufWriter;
