This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 9535d0222 refactor: Polish RangeWrite implementation to remove the extra buffer logic (#3038)
9535d0222 is described below
commit 9535d02227f50449073f2b5acd2f9169e71f5a23
Author: Xuanwo <[email protected]>
AuthorDate: Tue Sep 12 20:29:22 2023 +0800
refactor: Polish RangeWrite implementation to remove the extra buffer logic (#3038)
* Build pass
Signed-off-by: Xuanwo <[email protected]>
* Save code
Signed-off-by: Xuanwo <[email protected]>
* Fix panic
Signed-off-by: Xuanwo <[email protected]>
* Fix gcs
Signed-off-by: Xuanwo <[email protected]>
* Remove chunked cursor
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
core/src/raw/http_util/body.rs | 4 +
core/src/raw/http_util/client.rs | 1 +
core/src/raw/http_util/multipart.rs | 1 +
core/src/raw/oio/buf/chunked_bytes.rs | 22 +-
core/src/raw/oio/cursor.rs | 378 -----------------------
core/src/raw/oio/mod.rs | 1 -
core/src/raw/oio/write/multipart_upload_write.rs | 17 +-
core/src/raw/oio/write/range_write.rs | 151 +++++----
core/src/services/gcs/backend.rs | 14 +-
core/src/services/gcs/core.rs | 3 +
core/src/services/gcs/writer.rs | 15 +-
11 files changed, 126 insertions(+), 481 deletions(-)
diff --git a/core/src/raw/http_util/body.rs b/core/src/raw/http_util/body.rs
index 7583f4437..474f74897 100644
--- a/core/src/raw/http_util/body.rs
+++ b/core/src/raw/http_util/body.rs
@@ -40,6 +40,10 @@ pub enum AsyncBody {
Empty,
/// Body with bytes.
Bytes(Bytes),
+ /// Body with chunked bytes.
+ ///
+ /// This is nearly the same as `Stream`, but avoids an extra box.
+ ChunkedBytes(oio::ChunkedBytes),
/// Body with stream.
Stream(oio::Streamer),
}
diff --git a/core/src/raw/http_util/client.rs b/core/src/raw/http_util/client.rs
index d0926d1cd..783eaf8fb 100644
--- a/core/src/raw/http_util/client.rs
+++ b/core/src/raw/http_util/client.rs
@@ -101,6 +101,7 @@ impl HttpClient {
req_builder = match body {
AsyncBody::Empty => req_builder.body(reqwest::Body::from("")),
AsyncBody::Bytes(bs) => req_builder.body(reqwest::Body::from(bs)),
+ AsyncBody::ChunkedBytes(bs) =>
req_builder.body(reqwest::Body::wrap_stream(bs)),
AsyncBody::Stream(s) =>
req_builder.body(reqwest::Body::wrap_stream(s)),
};
diff --git a/core/src/raw/http_util/multipart.rs
b/core/src/raw/http_util/multipart.rs
index edc061c1f..a6c6f7652 100644
--- a/core/src/raw/http_util/multipart.rs
+++ b/core/src/raw/http_util/multipart.rs
@@ -410,6 +410,7 @@ impl MixedPart {
bs.len() as u64,
Some(Box::new(oio::Cursor::from(bs)) as Streamer),
),
+ AsyncBody::ChunkedBytes(bs) => (bs.len() as u64, Some(Box::new(bs)
as Streamer)),
AsyncBody::Stream(stream) => {
let len = parts
.headers
diff --git a/core/src/raw/oio/buf/chunked_bytes.rs
b/core/src/raw/oio/buf/chunked_bytes.rs
index 82447dc62..810c0dfda 100644
--- a/core/src/raw/oio/buf/chunked_bytes.rs
+++ b/core/src/raw/oio/buf/chunked_bytes.rs
@@ -16,9 +16,11 @@
// under the License.
use bytes::{Bytes, BytesMut};
+use futures::Stream;
use std::cmp::min;
use std::collections::VecDeque;
use std::io::IoSlice;
+use std::pin::Pin;
use std::task::{Context, Poll};
use crate::raw::*;
@@ -74,9 +76,9 @@ impl ChunkedBytes {
/// Reference:
<https://doc.rust-lang.org/stable/std/collections/struct.VecDeque.html#impl-From%3CVec%3CT,+A%3E%3E-for-VecDeque%3CT,+A%3E>
pub fn from_vec(bs: Vec<Bytes>) -> Self {
Self {
+ size: bs.iter().map(|v| v.len()).sum(),
frozen: bs.into(),
active: BytesMut::new(),
- size: 0,
chunk_size: DEFAULT_CHUNK_SIZE,
}
@@ -332,6 +334,24 @@ impl oio::Stream for ChunkedBytes {
}
}
+impl Stream for ChunkedBytes {
+ type Item = Result<Bytes>;
+
+ fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) ->
Poll<Option<Self::Item>> {
+ match self.frozen.pop_front() {
+ Some(bs) => {
+ self.size -= bs.len();
+ Poll::Ready(Some(Ok(bs)))
+ }
+ None if !self.active.is_empty() => {
+ self.size -= self.active.len();
+ Poll::Ready(Some(Ok(self.active.split().freeze())))
+ }
+ None => Poll::Ready(None),
+ }
+ }
+}
+
#[cfg(test)]
mod tests {
use log::debug;
diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs
index ddead52c9..0cc4e42a5 100644
--- a/core/src/raw/oio/cursor.rs
+++ b/core/src/raw/oio/cursor.rs
@@ -15,8 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-use std::cmp::Ordering;
-use std::collections::VecDeque;
use std::io::Read;
use std::io::SeekFrom;
use std::task::Context;
@@ -174,379 +172,3 @@ impl oio::Stream for Cursor {
Poll::Ready(Ok(()))
}
}
-
-/// ChunkedCursor is used represents a non-contiguous bytes in memory.
-///
-/// This is useful when we buffer users' random writes without copy.
ChunkedCursor implements
-/// [`oio::Stream`] so it can be used in [`oio::Write::copy_from`] directly.
-///
-/// # TODO
-///
-/// we can do some compaction during runtime. For example, merge 4K data
-/// into the same bytes instead.
-#[derive(Clone)]
-pub struct ChunkedCursor {
- inner: VecDeque<Bytes>,
- idx: usize,
-}
-
-impl Default for ChunkedCursor {
- fn default() -> Self {
- Self::new()
- }
-}
-
-impl ChunkedCursor {
- /// Create a new chunked cursor.
- pub fn new() -> Self {
- Self {
- inner: VecDeque::new(),
- idx: 0,
- }
- }
-
- /// Returns `true` if current cursor is empty.
- pub fn is_empty(&self) -> bool {
- self.inner.len() <= self.idx
- }
-
- /// Return current bytes size of cursor.
- pub fn len(&self) -> usize {
- self.inner.iter().skip(self.idx).map(|v| v.len()).sum()
- }
-
- /// Clear the entire cursor.
- pub fn clear(&mut self) {
- self.idx = 0;
- self.inner.clear();
- }
-
- /// Push a new bytes into vector cursor.
- pub fn push(&mut self, bs: Bytes) {
- self.inner.push_back(bs);
- }
-
- /// split_off will split the cursor into two cursors at given size.
- ///
- /// After split, `self` will contains the `0..at` part and the returned
cursor contains
- /// `at..` parts.
- ///
- /// # Panics
- ///
- /// - Panics if `at > len`
- /// - Panics if `idx != 0`, the cursor must be reset before split.
- pub fn split_off(&mut self, at: usize) -> Self {
- assert!(
- at <= self.len(),
- "split_off at must smaller than current size"
- );
- assert_eq!(self.idx, 0, "split_off must reset cursor first");
-
- let mut chunks = VecDeque::new();
- let mut size = self.len() - at;
-
- while let Some(mut bs) = self.inner.pop_back() {
- match size.cmp(&bs.len()) {
- Ordering::Less => {
- let remaining = bs.split_off(bs.len() - size);
- chunks.push_front(remaining);
- self.inner.push_back(bs);
- break;
- }
- Ordering::Equal => {
- chunks.push_front(bs);
- break;
- }
- Ordering::Greater => {
- size -= bs.len();
- chunks.push_front(bs);
- }
- }
- }
-
- Self {
- inner: chunks,
- idx: 0,
- }
- }
-
- /// split_to will split the cursor into two cursors at given size.
- ///
- /// After split, `self` will contains the `at..` part and the returned
cursor contains
- /// `0..at` parts.
- ///
- /// # Panics
- ///
- /// - Panics if `at > len`
- /// - Panics if `idx != 0`, the cursor must be reset before split.
- pub fn split_to(&mut self, at: usize) -> Self {
- assert!(
- at <= self.len(),
- "split_to at must smaller than current size"
- );
- assert_eq!(self.idx, 0, "split_to must reset cursor first");
-
- let mut chunks = VecDeque::new();
- let mut size = at;
-
- while let Some(mut bs) = self.inner.pop_front() {
- match size.cmp(&bs.len()) {
- Ordering::Less => {
- let remaining = bs.split_off(size);
- chunks.push_back(bs);
- self.inner.push_front(remaining);
- break;
- }
- Ordering::Equal => {
- chunks.push_back(bs);
- break;
- }
- Ordering::Greater => {
- size -= bs.len();
- chunks.push_back(bs);
- }
- }
- }
-
- Self {
- inner: chunks,
- idx: 0,
- }
- }
-
- #[cfg(test)]
- fn concat(&self) -> Bytes {
- let mut bs = bytes::BytesMut::new();
- for v in self.inner.iter().skip(self.idx) {
- bs.extend_from_slice(v);
- }
- bs.freeze()
- }
-}
-
-impl oio::Stream for ChunkedCursor {
- fn poll_next(&mut self, _: &mut Context<'_>) ->
Poll<Option<Result<Bytes>>> {
- if self.is_empty() {
- return Poll::Ready(None);
- }
-
- let bs = self.inner[self.idx].clone();
- self.idx += 1;
- Poll::Ready(Some(Ok(bs)))
- }
-
- fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
- self.idx = 0;
- Poll::Ready(Ok(()))
- }
-}
-
-#[cfg(test)]
-mod tests {
- use pretty_assertions::assert_eq;
- use rand::thread_rng;
- use rand::Rng;
- use rand::RngCore;
- use sha2::Digest;
- use sha2::Sha256;
-
- use super::*;
- use crate::raw::oio::StreamExt;
-
- #[tokio::test]
- async fn test_chunked_cursor() -> Result<()> {
- let mut c = ChunkedCursor::new();
-
- c.push(Bytes::from("hello"));
- assert_eq!(c.len(), 5);
- assert!(!c.is_empty());
-
- c.push(Bytes::from("world"));
- assert_eq!(c.len(), 10);
- assert!(!c.is_empty());
-
- let bs = c.next().await.unwrap().unwrap();
- assert_eq!(bs, Bytes::from("hello"));
- assert_eq!(c.len(), 5);
- assert!(!c.is_empty());
-
- let bs = c.next().await.unwrap().unwrap();
- assert_eq!(bs, Bytes::from("world"));
- assert_eq!(c.len(), 0);
- assert!(c.is_empty());
-
- c.reset().await?;
- assert_eq!(c.len(), 10);
- assert!(!c.is_empty());
-
- c.clear();
- assert_eq!(c.len(), 0);
- assert!(c.is_empty());
-
- Ok(())
- }
-
- #[test]
- fn test_chunked_cursor_split_to() {
- let mut base = ChunkedCursor::new();
- base.push(Bytes::from("Hello"));
- base.push(Bytes::from("Wor"));
- base.push(Bytes::from("ld"));
-
- // Case 1: split less than first chunk
- let mut c1 = base.clone();
- let c2 = c1.split_to(3);
-
- assert_eq!(c1.len(), 7);
- assert_eq!(
- &c1.inner,
- &[Bytes::from("lo"), Bytes::from("Wor"), Bytes::from("ld")]
- );
-
- assert_eq!(c2.len(), 3);
- assert_eq!(&c2.inner, &[Bytes::from("Hel")]);
-
- // Case 2: split larger than first chunk
- let mut c1 = base.clone();
- let c2 = c1.split_to(6);
-
- assert_eq!(c1.len(), 4);
- assert_eq!(&c1.inner, &[Bytes::from("or"), Bytes::from("ld")]);
-
- assert_eq!(c2.len(), 6);
- assert_eq!(&c2.inner, &[Bytes::from("Hello"), Bytes::from("W")]);
-
- // Case 3: split at chunk edge
- let mut c1 = base.clone();
- let c2 = c1.split_to(8);
-
- assert_eq!(c1.len(), 2);
- assert_eq!(&c1.inner, &[Bytes::from("ld")]);
-
- assert_eq!(c2.len(), 8);
- assert_eq!(&c2.inner, &[Bytes::from("Hello"), Bytes::from("Wor")]);
- }
-
- #[test]
- fn test_chunked_cursor_split_off() {
- let mut base = ChunkedCursor::new();
- base.push(Bytes::from("Hello"));
- base.push(Bytes::from("Wor"));
- base.push(Bytes::from("ld"));
-
- // Case 1: split less than first chunk
- let mut c1 = base.clone();
- let c2 = c1.split_off(3);
-
- assert_eq!(c1.len(), 3);
- assert_eq!(&c1.inner, &[Bytes::from("Hel")]);
-
- assert_eq!(c2.len(), 7);
- assert_eq!(
- &c2.inner,
- &[Bytes::from("lo"), Bytes::from("Wor"), Bytes::from("ld")]
- );
-
- // Case 2: split larger than first chunk
- let mut c1 = base.clone();
- let c2 = c1.split_off(6);
-
- assert_eq!(c1.len(), 6);
- assert_eq!(&c1.inner, &[Bytes::from("Hello"), Bytes::from("W")]);
-
- assert_eq!(c2.len(), 4);
- assert_eq!(&c2.inner, &[Bytes::from("or"), Bytes::from("ld")]);
-
- // Case 3: split at chunk edge
- let mut c1 = base.clone();
- let c2 = c1.split_off(8);
-
- assert_eq!(c1.len(), 8);
- assert_eq!(&c1.inner, &[Bytes::from("Hello"), Bytes::from("Wor")]);
-
- assert_eq!(c2.len(), 2);
- assert_eq!(&c2.inner, &[Bytes::from("ld")]);
- }
-
- #[test]
- fn test_fuzz_chunked_cursor_split_to() {
- let mut rng = thread_rng();
- let mut expected = vec![];
- let mut total_size = 0;
-
- let mut cursor = ChunkedCursor::new();
-
- // Build Cursor
- let count = rng.gen_range(1..1000);
- for _ in 0..count {
- let size = rng.gen_range(1..100);
- let mut content = vec![0; size];
- rng.fill_bytes(&mut content);
- total_size += size;
-
- expected.extend_from_slice(&content);
- cursor.push(Bytes::from(content));
- }
-
- // Test Cursor
- for _ in 0..count {
- let mut cursor = cursor.clone();
-
- let at = rng.gen_range(0..total_size);
- let to = cursor.split_to(at);
-
- assert_eq!(cursor.len(), total_size - at);
- assert_eq!(
- format!("{:x}", Sha256::digest(&cursor.concat())),
- format!("{:x}", Sha256::digest(&expected[at..])),
- );
-
- assert_eq!(to.len(), at);
- assert_eq!(
- format!("{:x}", Sha256::digest(&to.concat())),
- format!("{:x}", Sha256::digest(&expected[0..at])),
- );
- }
- }
-
- #[test]
- fn test_fuzz_chunked_cursor_split_off() {
- let mut rng = thread_rng();
- let mut expected = vec![];
- let mut total_size = 0;
-
- let mut cursor = ChunkedCursor::new();
-
- // Build Cursor
- let count = rng.gen_range(1..1000);
- for _ in 0..count {
- let size = rng.gen_range(1..100);
- let mut content = vec![0; size];
- rng.fill_bytes(&mut content);
- total_size += size;
-
- expected.extend_from_slice(&content);
- cursor.push(Bytes::from(content));
- }
-
- // Test Cursor
- for _ in 0..count {
- let mut cursor = cursor.clone();
-
- let at = rng.gen_range(0..total_size);
- let off = cursor.split_off(at);
-
- assert_eq!(cursor.len(), at);
- assert_eq!(
- format!("{:x}", Sha256::digest(&cursor.concat())),
- format!("{:x}", Sha256::digest(&expected[..at])),
- );
-
- assert_eq!(off.len(), total_size - at);
- assert_eq!(
- format!("{:x}", Sha256::digest(&off.concat())),
- format!("{:x}", Sha256::digest(&expected[at..])),
- );
- }
- }
-}
diff --git a/core/src/raw/oio/mod.rs b/core/src/raw/oio/mod.rs
index 29cf8e474..9e52acfd2 100644
--- a/core/src/raw/oio/mod.rs
+++ b/core/src/raw/oio/mod.rs
@@ -35,7 +35,6 @@ mod page;
pub use page::*;
mod cursor;
-pub use cursor::ChunkedCursor;
pub use cursor::Cursor;
mod entry;
diff --git a/core/src/raw/oio/write/multipart_upload_write.rs
b/core/src/raw/oio/write/multipart_upload_write.rs
index ec42275f0..6edeb96a8 100644
--- a/core/src/raw/oio/write/multipart_upload_write.rs
+++ b/core/src/raw/oio/write/multipart_upload_write.rs
@@ -21,7 +21,6 @@ use std::task::Context;
use std::task::Poll;
use async_trait::async_trait;
-use bytes::Bytes;
use futures::future::BoxFuture;
use crate::raw::*;
@@ -109,7 +108,7 @@ pub struct MultipartUploadPart {
pub struct MultipartUploadWriter<W: MultipartUploadWrite> {
state: State<W>,
- cache: Option<Bytes>,
+ cache: Option<oio::ChunkedBytes>,
upload_id: Option<Arc<String>>,
parts: Vec<MultipartUploadPart>,
}
@@ -163,7 +162,7 @@ where
&upload_id,
part_number,
size as u64,
- AsyncBody::Bytes(bs),
+ AsyncBody::ChunkedBytes(bs),
)
.await;
@@ -174,7 +173,8 @@ where
// Fill cache with the first write.
if self.cache.is_none() {
let size = bs.remaining();
- self.cache = Some(bs.bytes(size));
+ let cb =
oio::ChunkedBytes::from_vec(bs.vectored_bytes(size));
+ self.cache = Some(cb);
return Poll::Ready(Ok(size));
}
@@ -197,7 +197,8 @@ where
self.parts.push(part?);
// Replace the cache when last write succeeded
let size = bs.remaining();
- self.cache = Some(bs.bytes(size));
+ let cb =
oio::ChunkedBytes::from_vec(bs.vectored_bytes(size));
+ self.cache = Some(cb);
return Poll::Ready(Ok(size));
}
State::Close(_) => {
@@ -232,7 +233,7 @@ where
&upload_id,
parts.len(),
size as u64,
- AsyncBody::Bytes(bs),
+ AsyncBody::ChunkedBytes(bs),
)
.await;
(w, part)
@@ -250,7 +251,9 @@ where
Some(bs) => {
self.state = State::Close(Box::pin(async move {
let size = bs.len();
- let res = w.write_once(size as u64,
AsyncBody::Bytes(bs)).await;
+ let res = w
+ .write_once(size as u64,
AsyncBody::ChunkedBytes(bs))
+ .await;
(w, res)
}));
}
diff --git a/core/src/raw/oio/write/range_write.rs
b/core/src/raw/oio/write/range_write.rs
index 9dfe73197..9ae017a5e 100644
--- a/core/src/raw/oio/write/range_write.rs
+++ b/core/src/raw/oio/write/range_write.rs
@@ -50,6 +50,13 @@ use crate::*;
/// - Expose `RangeWriter` as `Accessor::Writer`
#[async_trait]
pub trait RangeWrite: Send + Sync + Unpin + 'static {
+ /// write_once is used to write the data to underlying storage at once.
+ ///
+ /// RangeWriter will call this API when:
+ ///
+ /// - All the data has been written to the buffer and we can perform the upload at once.
+ async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()>;
+
/// Initiate range the range write, the returning value is the location.
async fn initiate_range(&self) -> Result<String>;
@@ -79,8 +86,7 @@ pub trait RangeWrite: Send + Sync + Unpin + 'static {
pub struct RangeWriter<W: RangeWrite> {
location: Option<String>,
written: u64,
- align_size: usize,
- align_buffer: oio::ChunkedCursor,
+ buffer: Option<oio::ChunkedBytes>,
state: State<W>,
}
@@ -88,8 +94,7 @@ pub struct RangeWriter<W: RangeWrite> {
enum State<W> {
Idle(Option<W>),
Init(BoxFuture<'static, (W, Result<String>)>),
- /// The returning value is (consume size, written size).
- Write(BoxFuture<'static, (W, Result<(usize, u64)>)>),
+ Write(BoxFuture<'static, (W, Result<u64>)>),
Complete(BoxFuture<'static, (W, Result<()>)>),
Abort(BoxFuture<'static, (W, Result<()>)>),
}
@@ -105,25 +110,11 @@ impl<W: RangeWrite> RangeWriter<W> {
Self {
state: State::Idle(Some(inner)),
+ buffer: None,
location: None,
written: 0,
- align_size: 256 * 1024,
- align_buffer: oio::ChunkedCursor::default(),
}
}
-
- /// Set the align size.
- ///
- /// The size is default to 256 KiB.
- ///
- /// # Note
- ///
- /// Please don't mix this with the buffer size. Align size is usually the
hard
- /// limit for the service to accept the chunk.
- pub fn with_align_size(mut self, size: usize) -> Self {
- self.align_size = size;
- self
- }
}
impl<W: RangeWrite> oio::Write for RangeWriter<W> {
@@ -133,51 +124,38 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {
State::Idle(w) => {
match self.location.clone() {
Some(location) => {
- let remaining = bs.remaining();
- let current_size = self.align_buffer.len();
- let mut total_size = current_size + remaining;
-
- if total_size <= self.align_size {
- let bs = bs.bytes(remaining);
- self.align_buffer.push(bs);
- return Poll::Ready(Ok(remaining));
- }
- // If total_size is aligned, we need to write one
less chunk to make sure
- // that the file has at least one chunk during
complete stage.
- if total_size % self.align_size == 0 {
- total_size -= self.align_size;
- }
-
- let consume = total_size - total_size %
self.align_size - current_size;
- let mut align_buffer = self.align_buffer.clone();
- let bs = bs.bytes(consume);
- align_buffer.push(bs);
-
let written = self.written;
- let w = w.take().unwrap();
- let fut = async move {
- let size = align_buffer.len() as u64;
+
+ let buffer = self.buffer.clone().expect("cache
must be valid").clone();
+ let w = w.take().expect("writer must be valid");
+ self.state = State::Write(Box::pin(async move {
+ let size = buffer.len() as u64;
let res = w
.write_range(
&location,
written,
size,
-
AsyncBody::Stream(Box::new(align_buffer)),
+ AsyncBody::ChunkedBytes(buffer),
)
.await;
- (w, res.map(|_| (consume, size)))
- };
- self.state = State::Write(Box::pin(fut));
+ (w, res.map(|_| size))
+ }));
}
None => {
- let w = w.take().unwrap();
- let fut = async move {
- let res = w.initiate_range().await;
+ // Fill the buffer with the first write.
+ if self.buffer.is_none() {
+ let size = bs.remaining();
+ let cb =
oio::ChunkedBytes::from_vec(bs.vectored_bytes(size));
+ self.buffer = Some(cb);
+ return Poll::Ready(Ok(size));
+ }
- (w, res)
- };
- self.state = State::Init(Box::pin(fut));
+ let w = w.take().expect("writer must be valid");
+ self.state = State::Init(Box::pin(async move {
+ let location = w.initiate_range().await;
+ (w, location)
+ }));
}
}
}
@@ -187,15 +165,16 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {
self.location = Some(res?);
}
State::Write(fut) => {
- let (w, res) = ready!(fut.poll_unpin(cx));
+ let (w, size) = ready!(fut.as_mut().poll(cx));
self.state = State::Idle(Some(w));
- let (consume, written) = res?;
- self.written += written;
- self.align_buffer.clear();
- // It's possible that the buffer is already aligned, no
bytes has been consumed.
- if consume != 0 {
- return Poll::Ready(Ok(consume));
- }
+ // Update the written.
+ self.written += size?;
+
+ // Replace the buffer with the current write once the last write has succeeded.
+ let size = bs.remaining();
+ let cb =
oio::ChunkedBytes::from_vec(bs.vectored_bytes(size));
+ self.buffer = Some(cb);
+ return Poll::Ready(Ok(size));
}
State::Complete(_) => {
unreachable!("RangeWriter must not go into State::Complete
during poll_write")
@@ -211,28 +190,41 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {
loop {
match &mut self.state {
State::Idle(w) => {
- let w = w.take().unwrap();
+ let w = w.take().expect("writer must be valid");
match self.location.clone() {
Some(location) => {
- let align_buffer = self.align_buffer.clone();
-
let written = self.written;
- let fut = async move {
- let size = align_buffer.len() as u64;
- let res = w
- .complete_range(
- &location,
- written,
- size,
-
AsyncBody::Stream(Box::new(align_buffer)),
- )
- .await;
-
- (w, res)
- };
- self.state = State::Complete(Box::pin(fut));
+ match self.buffer.clone() {
+ Some(bs) => {
+ self.state =
State::Complete(Box::pin(async move {
+ let res = w
+ .complete_range(
+ &location,
+ written,
+ bs.len() as u64,
+ AsyncBody::ChunkedBytes(bs),
+ )
+ .await;
+ (w, res)
+ }));
+ }
+ None => {
+ unreachable!("It's must be bug that
RangeWrite is in State::Idle with no cache but has location")
+ }
+ }
}
- None => return Poll::Ready(Ok(())),
+ None => match self.buffer.clone() {
+ Some(bs) => {
+ self.state = State::Complete(Box::pin(async
move {
+ let size = bs.len();
+ let res = w
+ .write_once(size as u64,
AsyncBody::ChunkedBytes(bs))
+ .await;
+ (w, res)
+ }));
+ }
+ None => return Poll::Ready(Ok(())),
+ },
}
}
State::Init(_) => {
@@ -244,7 +236,6 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {
State::Complete(fut) => {
let (w, res) = ready!(fut.poll_unpin(cx));
self.state = State::Idle(Some(w));
- self.align_buffer.clear();
return Poll::Ready(res);
}
State::Abort(_) => {
@@ -283,7 +274,7 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {
State::Abort(fut) => {
let (w, res) = ready!(fut.poll_unpin(cx));
self.state = State::Idle(Some(w));
- self.align_buffer.clear();
+ self.buffer = None;
return Poll::Ready(res);
}
}
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index f91c792a0..9e26cdce1 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use std::cmp::max;
use std::collections::HashMap;
use std::fmt::Debug;
use std::fmt::Formatter;
@@ -337,7 +338,7 @@ pub struct GcsBackend {
impl Accessor for GcsBackend {
type Reader = IncomingAsyncBody;
type BlockingReader = ();
- type Writer = GcsWriters;
+ type Writer = oio::TwoWaysWriter<GcsWriters,
oio::ExactBufWriter<GcsWriters>>;
type BlockingWriter = ();
type Pager = GcsPager;
type BlockingPager = ();
@@ -420,10 +421,15 @@ impl Accessor for GcsBackend {
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
let w = GcsWriter::new(self.core.clone(), path, args.clone());
- let w = if args.content_length().is_some() {
- GcsWriters::One(oio::OneShotWriter::new(w))
+ let w = oio::RangeWriter::new(w);
+
+ let w = if let Some(buffer_size) = args.buffer_size() {
+ // FIXME: we should align with 256KiB instead.
+ let buffer_size = max(DEFAULT_WRITE_FIXED_SIZE, buffer_size);
+
+ oio::TwoWaysWriter::Two(oio::ExactBufWriter::new(w, buffer_size))
} else {
- GcsWriters::Two(oio::RangeWriter::new(w))
+ oio::TwoWaysWriter::One(w)
};
Ok((RpWrite::default(), w))
diff --git a/core/src/services/gcs/core.rs b/core/src/services/gcs/core.rs
index 06d772e06..40acf42bc 100644
--- a/core/src/services/gcs/core.rs
+++ b/core/src/services/gcs/core.rs
@@ -299,6 +299,9 @@ impl GcsCore {
AsyncBody::Bytes(bytes) => {
media_part = media_part.content(bytes);
}
+ AsyncBody::ChunkedBytes(bs) => {
+ media_part = media_part.stream(bs.len() as u64,
Box::new(bs));
+ }
AsyncBody::Stream(stream) => {
media_part = media_part.stream(size.unwrap(), stream);
}
diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs
index 1b22acd7b..d55d6d7a9 100644
--- a/core/src/services/gcs/writer.rs
+++ b/core/src/services/gcs/writer.rs
@@ -18,7 +18,6 @@
use std::sync::Arc;
use async_trait::async_trait;
-use bytes::Bytes;
use http::StatusCode;
use super::core::GcsCore;
@@ -26,8 +25,7 @@ use super::error::parse_error;
use crate::raw::*;
use crate::*;
-pub type GcsWriters =
- oio::TwoWaysWriter<oio::OneShotWriter<GcsWriter>,
oio::RangeWriter<GcsWriter>>;
+pub type GcsWriters = oio::RangeWriter<GcsWriter>;
pub struct GcsWriter {
core: Arc<GcsCore>,
@@ -46,13 +44,13 @@ impl GcsWriter {
}
#[async_trait]
-impl oio::OneShotWrite for GcsWriter {
- async fn write_once(&self, bs: Bytes) -> Result<()> {
+impl oio::RangeWrite for GcsWriter {
+ async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
let mut req = self.core.gcs_insert_object_request(
&percent_encode_path(&self.path),
- Some(bs.len() as u64),
+ Some(size),
&self.op,
- AsyncBody::Bytes(bs),
+ body,
)?;
self.core.sign(&mut req).await?;
@@ -69,10 +67,7 @@ impl oio::OneShotWrite for GcsWriter {
_ => Err(parse_error(resp).await?),
}
}
-}
-#[async_trait]
-impl oio::RangeWrite for GcsWriter {
async fn initiate_range(&self) -> Result<String> {
let resp = self.core.gcs_initiate_resumable_upload(&self.path).await?;
let status = resp.status();