This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new d740edbbe refactor(core): Split buffer logic from underlying storage
operations (#2903)
d740edbbe is described below
commit d740edbbed29aa199946ed07ef94ee0d78ae32a2
Author: Xuanwo <[email protected]>
AuthorDate: Wed Aug 23 12:52:36 2023 +0800
refactor(core): Split buffer logic from underlying storage operations
(#2903)
* feat: Add AtLeastBufWrite
Signed-off-by: Xuanwo <[email protected]>
* Refactor
Signed-off-by: Xuanwo <[email protected]>
* Add compose
Signed-off-by: Xuanwo <[email protected]>
* refactor
Signed-off-by: Xuanwo <[email protected]>
* Fix typo
Signed-off-by: Xuanwo <[email protected]>
* Fix test
Signed-off-by: Xuanwo <[email protected]>
* Avoid exeeed of buffer size
Signed-off-by: Xuanwo <[email protected]>
* Fix unit test
Signed-off-by: Xuanwo <[email protected]>
* Disable fuzz test for now
Signed-off-by: Xuanwo <[email protected]>
* use aws cli to create bucket
Signed-off-by: Xuanwo <[email protected]>
* Disable r2 instead
Signed-off-by: Xuanwo <[email protected]>
* Fix fuzz test
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
.github/workflows/service_test_s3.yml | 79 ++++++-------
core/fuzz/fuzz_writer.rs | 4 +-
core/src/raw/oio/cursor.rs | 93 ++++++++++++++-
core/src/raw/oio/mod.rs | 1 +
core/src/raw/oio/stream/api.rs | 79 ++++++++++++-
core/src/raw/oio/write/append_object_write.rs | 79 +------------
core/src/raw/oio/write/at_least_buf_write.rs | 125 ++++++++++++++++++++
core/src/raw/oio/write/compose_write.rs | 136 ++++++++++++++++++++++
core/src/raw/oio/write/mod.rs | 12 +-
core/src/raw/oio/write/multipart_upload_write.rs | 139 ++++-------------------
core/src/raw/oio/write/one_shot_write.rs | 69 +++++++++++
core/src/raw/oio/write/two_ways_write.rs | 64 -----------
core/src/raw/ops.rs | 21 ++++
core/src/services/cos/backend.rs | 58 ++++------
core/src/services/cos/core.rs | 1 -
core/src/services/cos/writer.rs | 16 ++-
core/src/services/obs/backend.rs | 60 ++++------
core/src/services/obs/core.rs | 1 -
core/src/services/obs/writer.rs | 18 ++-
core/src/services/oss/backend.rs | 44 +++----
core/src/services/oss/core.rs | 1 -
core/src/services/oss/writer.rs | 16 ++-
core/src/services/s3/backend.rs | 42 ++++---
core/src/services/s3/core.rs | 1 -
core/src/services/s3/writer.rs | 24 ++--
core/src/types/operator/operator_futures.rs | 26 +++++
core/tests/behavior/write.rs | 4 +-
27 files changed, 775 insertions(+), 438 deletions(-)
diff --git a/.github/workflows/service_test_s3.yml
b/.github/workflows/service_test_s3.yml
index 80ed0ee4e..e066e375b 100644
--- a/.github/workflows/service_test_s3.yml
+++ b/.github/workflows/service_test_s3.yml
@@ -147,17 +147,18 @@ jobs:
run: |
docker-compose -f docker-compose-minio.yml up -d
- name: Setup test bucket
+ env:
+ AWS_ACCESS_KEY_ID: "minioadmin"
+ AWS_SECRET_ACCESS_KEY: "minioadmin"
+ AWS_EC2_METADATA_DISABLED: "true"
run: |
+ aws --endpoint-url http://127.0.0.1:9000/ s3 mb s3://test
+
curl -O https://dl.min.io/client/mc/release/linux-amd64/mc
chmod +x mc
./mc alias set local http://127.0.0.1:9000/ minioadmin minioadmin
- ./mc mb local/test
./mc anonymous set public local/test
- while :; do
- echo "waiting minio to be ready"
- [[ "$(./mc ping --count 1 local | awk '{print $6}' | tr -d '\n')"
== "errors=1" ]] || break
- sleep 1
- done
+
- name: Setup Rust toolchain
uses: ./.github/actions/setup
with:
@@ -173,35 +174,37 @@ jobs:
OPENDAL_S3_ALLOW_ANONYMOUS: on
OPENDAL_S3_REGION: us-east-1
- r2:
- runs-on: ubuntu-latest
- if: github.event_name == 'push' ||
!github.event.pull_request.head.repo.fork
- steps:
- - uses: actions/checkout@v3
- - name: Setup Rust toolchain
- uses: ./.github/actions/setup
- with:
- need-nextest: true
-
- - name: Load secret
- id: op-load-secret
- uses: 1password/load-secrets-action@v1
- with:
- export-env: true
- env:
- OP_SERVICE_ACCOUNT_TOKEN: ${{ secrets.OP_SERVICE_ACCOUNT_TOKEN }}
- OPENDAL_S3_TEST: op://services/r2/test
- OPENDAL_S3_BUCKET: op://services/r2/bucket
- OPENDAL_S3_ENDPOINT: op://services/r2/endpoint
- OPENDAL_S3_ACCESS_KEY_ID: op://services/r2/access_key_id
- OPENDAL_S3_SECRET_ACCESS_KEY: op://services/r2/secret_access_key
-
- - name: Test
- shell: bash
- working-directory: core
- run: cargo nextest run s3
- env:
- OPENDAL_S3_REGION: auto
- # This is the R2's limitation
- # Refer to
https://opendal.apache.org/docs/services/s3#compatible-services for more
information
- OPENDAL_S3_BATCH_MAX_OPERATIONS: 700
+# Disable this test until we addressed
https://github.com/apache/incubator-opendal/issues/2904
+#
+# r2:
+# runs-on: ubuntu-latest
+# if: github.event_name == 'push' ||
!github.event.pull_request.head.repo.fork
+# steps:
+# - uses: actions/checkout@v3
+# - name: Setup Rust toolchain
+# uses: ./.github/actions/setup
+# with:
+# need-nextest: true
+#
+# - name: Load secret
+# id: op-load-secret
+# uses: 1password/load-secrets-action@v1
+# with:
+# export-env: true
+# env:
+# OP_SERVICE_ACCOUNT_TOKEN: ${{ secrets.OP_SERVICE_ACCOUNT_TOKEN }}
+# OPENDAL_S3_TEST: op://services/r2/test
+# OPENDAL_S3_BUCKET: op://services/r2/bucket
+# OPENDAL_S3_ENDPOINT: op://services/r2/endpoint
+# OPENDAL_S3_ACCESS_KEY_ID: op://services/r2/access_key_id
+# OPENDAL_S3_SECRET_ACCESS_KEY: op://services/r2/secret_access_key
+#
+# - name: Test
+# shell: bash
+# working-directory: core
+# run: cargo nextest run s3
+# env:
+# OPENDAL_S3_REGION: auto
+# # This is the R2's limitation
+# # Refer to
https://opendal.apache.org/docs/services/s3#compatible-services for more
information
+# OPENDAL_S3_BATCH_MAX_OPERATIONS: 700
diff --git a/core/fuzz/fuzz_writer.rs b/core/fuzz/fuzz_writer.rs
index 4c2151035..2f992b919 100644
--- a/core/fuzz/fuzz_writer.rs
+++ b/core/fuzz/fuzz_writer.rs
@@ -107,7 +107,7 @@ async fn fuzz_writer(op: Operator, input: FuzzInput) ->
Result<()> {
let checker = WriteChecker::new(total_size);
- let mut writer = op.writer(&path).await?;
+ let mut writer = op.writer_with(&path).buffer_size(8 * 1024 * 1024).await?;
for chunk in &checker.chunks {
writer.write(chunk.clone()).await?;
@@ -132,7 +132,7 @@ fuzz_target!(|input: FuzzInput| {
runtime.block_on(async {
fuzz_writer(op, input.clone())
.await
- .unwrap_or_else(|_| panic!("fuzz reader must succeed"));
+ .unwrap_or_else(|_| panic!("fuzz writer must succeed"));
})
}
});
diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs
index 1940c40cd..848bc1786 100644
--- a/core/src/raw/oio/cursor.rs
+++ b/core/src/raw/oio/cursor.rs
@@ -45,6 +45,11 @@ impl Cursor {
let len = self.pos.min(self.inner.len() as u64) as usize;
&self.inner.as_ref()[len..]
}
+
+ /// Return the length of remaining slice.
+ pub fn len(&self) -> usize {
+ self.inner.len() - self.pos as usize
+ }
}
impl From<Bytes> for Cursor {
@@ -148,7 +153,93 @@ impl oio::BlockingRead for Cursor {
}
}
-/// VectorCursor is the cursor for [`Vec<Bytes>`] that implements [`oio::Read`]
+impl oio::Stream for Cursor {
+ fn poll_next(&mut self, _: &mut Context<'_>) ->
Poll<Option<Result<Bytes>>> {
+ if self.is_empty() {
+ return Poll::Ready(None);
+ }
+
+ let bs = self.inner.clone();
+ self.pos += bs.len() as u64;
+ Poll::Ready(Some(Ok(bs)))
+ }
+
+ fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+ self.pos = 0;
+ Poll::Ready(Ok(()))
+ }
+}
+
+/// # TODO
+///
+/// we can do some compaction during runtime. For example, merge 4K data
+/// into the same bytes instead.
+#[derive(Clone)]
+pub struct ChunkedCursor {
+ inner: VecDeque<Bytes>,
+ idx: usize,
+}
+
+impl Default for ChunkedCursor {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl ChunkedCursor {
+ /// Create a new chunked cursor.
+ pub fn new() -> Self {
+ Self {
+ inner: VecDeque::new(),
+ idx: 0,
+ }
+ }
+
+ /// Returns `true` if current cursor is empty.
+ pub fn is_empty(&self) -> bool {
+ self.inner.len() <= self.idx
+ }
+
+ /// Return current bytes size of cursor.
+ pub fn len(&self) -> usize {
+ self.inner.iter().skip(self.idx).map(|v| v.len()).sum()
+ }
+
+ /// Reset current cursor to start.
+ pub fn reset(&mut self) {
+ self.idx = 0;
+ }
+
+ /// Clear the entire cursor.
+ pub fn clear(&mut self) {
+ self.idx = 0;
+ self.inner.clear();
+ }
+
+ /// Push a new bytes into vector cursor.
+ pub fn push(&mut self, bs: Bytes) {
+ self.inner.push_back(bs);
+ }
+}
+
+impl oio::Stream for ChunkedCursor {
+ fn poll_next(&mut self, _: &mut Context<'_>) ->
Poll<Option<Result<Bytes>>> {
+ if self.is_empty() {
+ return Poll::Ready(None);
+ }
+
+ let bs = self.inner[self.idx].clone();
+ self.idx += 1;
+ Poll::Ready(Some(Ok(bs)))
+ }
+
+ fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+ self.reset();
+ Poll::Ready(Ok(()))
+ }
+}
+
+/// VectorCursor is the cursor for [`Vec<Bytes>`] that implements
[`oio::Stream`]
pub struct VectorCursor {
inner: VecDeque<Bytes>,
size: usize,
diff --git a/core/src/raw/oio/mod.rs b/core/src/raw/oio/mod.rs
index 5a4729fe8..1b24bec9c 100644
--- a/core/src/raw/oio/mod.rs
+++ b/core/src/raw/oio/mod.rs
@@ -35,6 +35,7 @@ mod page;
pub use page::*;
mod cursor;
+pub use cursor::ChunkedCursor;
pub use cursor::Cursor;
pub use cursor::VectorCursor;
diff --git a/core/src/raw/oio/stream/api.rs b/core/src/raw/oio/stream/api.rs
index 7345564a2..495a4fb22 100644
--- a/core/src/raw/oio/stream/api.rs
+++ b/core/src/raw/oio/stream/api.rs
@@ -18,10 +18,10 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
-use std::task::Context;
use std::task::Poll;
+use std::task::{ready, Context};
-use bytes::Bytes;
+use bytes::{Bytes, BytesMut};
use pin_project::pin_project;
use crate::*;
@@ -135,6 +135,29 @@ pub trait StreamExt: Stream {
fn reset(&mut self) -> ResetFuture<'_, Self> {
ResetFuture { inner: self }
}
+
+ /// Chain this stream with another stream.
+ fn chain<S>(self, other: S) -> Chain<Self, S>
+ where
+ Self: Sized,
+ S: Stream,
+ {
+ Chain {
+ first: Some(self),
+ second: other,
+ }
+ }
+
+ /// Collect all items from this stream into a single bytes.
+ fn collect(self) -> Collect<Self>
+ where
+ Self: Sized,
+ {
+ Collect {
+ stream: self,
+ buf: BytesMut::new(),
+ }
+ }
}
/// Make this future `!Unpin` for compatibility with async trait methods.
@@ -172,3 +195,55 @@ where
Pin::new(this.inner).poll_reset(cx)
}
}
+
+/// Stream for the [`chain`](StreamExt::chain) method.
+#[must_use = "streams do nothing unless polled"]
+pub struct Chain<S1: Stream, S2: Stream> {
+ first: Option<S1>,
+ second: S2,
+}
+
+impl<S1: Stream, S2: Stream> Stream for Chain<S1, S2> {
+ fn poll_next(&mut self, cx: &mut Context<'_>) ->
Poll<Option<Result<Bytes>>> {
+ if let Some(first) = self.first.as_mut() {
+ if let Some(item) = ready!(first.poll_next(cx)) {
+ return Poll::Ready(Some(item));
+ }
+
+ self.first = None;
+ }
+ self.second.poll_next(cx)
+ }
+
+ fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+ Poll::Ready(Err(Error::new(
+ ErrorKind::Unsupported,
+ "chained stream doesn't support reset",
+ )))
+ }
+}
+
+/// Stream for the [`collect`](StreamExt::collect) method.
+#[must_use = "streams do nothing unless polled"]
+pub struct Collect<S> {
+ stream: S,
+ buf: BytesMut,
+}
+
+impl<S> Future for Collect<S>
+where
+ S: Stream,
+{
+ type Output = Result<Bytes>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) ->
Poll<Self::Output> {
+ let mut this = self.as_mut();
+ loop {
+ match ready!(this.stream.poll_next(cx)) {
+ Some(Ok(bs)) => this.buf.extend(bs),
+ Some(Err(err)) => return Poll::Ready(Err(err)),
+ None => return Poll::Ready(Ok(self.buf.split().freeze())),
+ }
+ }
+ }
+}
diff --git a/core/src/raw/oio/write/append_object_write.rs
b/core/src/raw/oio/write/append_object_write.rs
index 5d6eda9da..07fa546cc 100644
--- a/core/src/raw/oio/write/append_object_write.rs
+++ b/core/src/raw/oio/write/append_object_write.rs
@@ -22,8 +22,6 @@ use crate::raw::oio::Streamer;
use crate::raw::*;
use crate::*;
-const DEFAULT_WRITE_MIN_SIZE: usize = 8 * 1024 * 1024;
-
/// AppendObjectWrite is used to implement [`Write`] based on append
/// object. By implementing AppendObjectWrite, services don't need to
/// care about the details of buffering and uploading parts.
@@ -53,8 +51,6 @@ pub struct AppendObjectWriter<W: AppendObjectWrite> {
inner: W,
offset: Option<u64>,
- buffer: oio::VectorCursor,
- buffer_size: usize,
}
impl<W: AppendObjectWrite> AppendObjectWriter<W> {
@@ -63,24 +59,9 @@ impl<W: AppendObjectWrite> AppendObjectWriter<W> {
Self {
inner,
offset: None,
- buffer: oio::VectorCursor::new(),
- buffer_size: DEFAULT_WRITE_MIN_SIZE,
}
}
- /// Configure the write_min_size.
- ///
- /// write_min_size is used to control the size of internal buffer.
- ///
- /// AppendObjectWriter will flush the buffer to storage when
- /// the size of buffer is larger than write_min_size.
- ///
- /// This value is default to 8 MiB.
- pub fn with_write_min_size(mut self, v: usize) -> Self {
- self.buffer_size = v;
- self
- }
-
async fn offset(&mut self) -> Result<u64> {
if let Some(offset) = self.offset {
return Ok(offset);
@@ -101,72 +82,24 @@ where
async fn write(&mut self, bs: Bytes) -> Result<()> {
let offset = self.offset().await?;
- // Ignore empty bytes
- if bs.is_empty() {
- return Ok(());
- }
-
- self.buffer.push(bs);
- // Return directly if the buffer is not full
- if self.buffer.len() <= self.buffer_size {
- return Ok(());
- }
-
- let bs = self.buffer.peak_all();
- let size = bs.len();
+ let size = bs.len() as u64;
- match self
- .inner
- .append(offset, size as u64, AsyncBody::Bytes(bs))
+ self.inner
+ .append(offset, size, AsyncBody::Bytes(bs))
.await
- {
- Ok(_) => {
- self.buffer.take(size);
- self.offset = Some(offset + size as u64);
- Ok(())
- }
- Err(e) => {
- // If the upload fails, we should pop the given bs to make sure
- // write is re-enter safe.
- self.buffer.pop();
- Err(e)
- }
- }
+ .map(|_| self.offset = Some(offset + size))
}
async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
- if !self.buffer.is_empty() {
- return Err(Error::new(
- ErrorKind::InvalidInput,
- "Writer::sink should not be used mixed with existing buffer",
- ));
- }
-
let offset = self.offset().await?;
self.inner
.append(offset, size, AsyncBody::Stream(s))
- .await?;
- self.offset = Some(offset + size);
-
- Ok(())
+ .await
+ .map(|_| self.offset = Some(offset + size))
}
async fn close(&mut self) -> Result<()> {
- // Make sure internal buffer has been flushed.
- if !self.buffer.is_empty() {
- let bs = self.buffer.peak_exact(self.buffer.len());
-
- let offset = self.offset().await?;
- let size = bs.len() as u64;
- self.inner
- .append(offset, size, AsyncBody::Bytes(bs))
- .await?;
-
- self.buffer.clear();
- self.offset = Some(offset + size);
- }
-
Ok(())
}
diff --git a/core/src/raw/oio/write/at_least_buf_write.rs
b/core/src/raw/oio/write/at_least_buf_write.rs
new file mode 100644
index 000000000..d484127d7
--- /dev/null
+++ b/core/src/raw/oio/write/at_least_buf_write.rs
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::raw::oio::{StreamExt, Streamer};
+use crate::raw::*;
+use crate::*;
+use async_trait::async_trait;
+use bytes::Bytes;
+
+/// AtLeastBufWrite is used to implement [`Write`] based on at least buffer.
+///
+/// Users can wrap a writer and a buffer together.
+pub struct AtLeastBufWriter<W: oio::Write> {
+ inner: W,
+
+ /// The total size of the data.
+ ///
+ /// If the total size is known, we will write to underlying storage
directly without buffer it
+ /// when possible.
+ total_size: Option<u64>,
+
+ /// The size for buffer, we will flush the underlying storage if the
buffer is full.
+ buffer_size: usize,
+ buffer: oio::ChunkedCursor,
+}
+
+impl<W: oio::Write> AtLeastBufWriter<W> {
+ /// Create a new at least buf writer.
+ pub fn new(inner: W, buffer_size: usize) -> Self {
+ Self {
+ inner,
+ total_size: None,
+ buffer_size,
+ buffer: oio::ChunkedCursor::new(),
+ }
+ }
+
+ /// Configure the total size for writer.
+ pub fn with_total_size(mut self, total_size: Option<u64>) -> Self {
+ self.total_size = total_size;
+ self
+ }
+}
+
+#[async_trait]
+impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> {
+ async fn write(&mut self, bs: Bytes) -> Result<()> {
+ // If total size is known and equals to given bytes, we can write it
directly.
+ if let Some(total_size) = self.total_size {
+ if total_size == bs.len() as u64 {
+ return self.inner.write(bs).await;
+ }
+ }
+
+ // Push the bytes into the buffer if the buffer is not full.
+ if self.buffer.len() + bs.len() < self.buffer_size {
+ self.buffer.push(bs);
+ return Ok(());
+ }
+
+ let mut buf = self.buffer.clone();
+ buf.push(bs);
+
+ self.inner
+ .sink(buf.len() as u64, Box::new(buf))
+ .await
+ // Clear buffer if the write is successful.
+ .map(|_| self.buffer.clear())
+ }
+
+ async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
+ // If total size is known and equals to given stream, we can write it
directly.
+ if let Some(total_size) = self.total_size {
+ if total_size == size {
+ return self.inner.sink(size, s).await;
+ }
+ }
+
+ // Push the bytes into the buffer if the buffer is not full.
+ if self.buffer.len() as u64 + size < self.buffer_size as u64 {
+ self.buffer.push(s.collect().await?);
+ return Ok(());
+ }
+
+ let buf = self.buffer.clone();
+ let buffer_size = buf.len() as u64;
+ let stream = buf.chain(s);
+
+ self.inner
+ .sink(buffer_size + size, Box::new(stream))
+ .await
+ // Clear buffer if the write is successful.
+ .map(|_| self.buffer.clear())
+ }
+
+ async fn abort(&mut self) -> Result<()> {
+ self.buffer.clear();
+ self.inner.abort().await
+ }
+
+ async fn close(&mut self) -> Result<()> {
+ if !self.buffer.is_empty() {
+ self.inner
+ .sink(self.buffer.len() as u64, Box::new(self.buffer.clone()))
+ .await?;
+ self.buffer.clear();
+ }
+
+ self.inner.close().await
+ }
+}
diff --git a/core/src/raw/oio/write/compose_write.rs
b/core/src/raw/oio/write/compose_write.rs
new file mode 100644
index 000000000..043df2978
--- /dev/null
+++ b/core/src/raw/oio/write/compose_write.rs
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`type_alias_impl_trait`](https://github.com/rust-lang/rust/issues/63063)
is not stable yet.
+//! So we can't write the following code:
+//!
+//! ```txt
+//! impl Accessor for S3Backend {
+//! type Writer = impl oio::Write;
+//! }
+//! ```
+//!
+//! Which means we have to write the type directly like:
+//!
+//! ```txt
+//! impl Accessor for OssBackend {
+//! type Writer = oio::TwoWaysWriter<
+//! oio::MultipartUploadWriter<OssWriter>,
+//! oio::AppendObjectWriter<OssWriter>,
+//! >;
+//! }
+//! ```
+//!
+//! This module is used to provide some enums for the above code. We should
remove this module once
+//! type_alias_impl_trait has been stabilized.
+
+use async_trait::async_trait;
+use bytes::Bytes;
+
+use crate::raw::oio::Streamer;
+use crate::raw::*;
+use crate::*;
+
+/// TwoWaysWrite is used to implement [`Write`] based on two ways.
+///
+/// Users can wrap two different writers together.
+pub enum TwoWaysWriter<ONE: oio::Write, TWO: oio::Write> {
+ /// The first type for the [`TwoWaysWriter`].
+ One(ONE),
+ /// The second type for the [`TwoWaysWriter`].
+ Two(TWO),
+}
+
+#[async_trait]
+impl<ONE: oio::Write, TWO: oio::Write> oio::Write for TwoWaysWriter<ONE, TWO> {
+ async fn write(&mut self, bs: Bytes) -> Result<()> {
+ match self {
+ Self::One(one) => one.write(bs).await,
+ Self::Two(two) => two.write(bs).await,
+ }
+ }
+
+ async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
+ match self {
+ Self::One(one) => one.sink(size, s).await,
+ Self::Two(two) => two.sink(size, s).await,
+ }
+ }
+
+ async fn abort(&mut self) -> Result<()> {
+ match self {
+ Self::One(one) => one.abort().await,
+ Self::Two(two) => two.abort().await,
+ }
+ }
+
+ async fn close(&mut self) -> Result<()> {
+ match self {
+ Self::One(one) => one.close().await,
+ Self::Two(two) => two.close().await,
+ }
+ }
+}
+
+/// ThreeWaysWriter is used to implement [`Write`] based on three ways.
+///
+/// Users can wrap three different writers together.
+pub enum ThreeWaysWriter<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> {
+ /// The first type for the [`ThreeWaysWriter`].
+ One(ONE),
+ /// The second type for the [`ThreeWaysWriter`].
+ Two(TWO),
+ /// The third type for the [`ThreeWaysWriter`].
+ Three(THREE),
+}
+
+#[async_trait]
+impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write
+ for ThreeWaysWriter<ONE, TWO, THREE>
+{
+ async fn write(&mut self, bs: Bytes) -> Result<()> {
+ match self {
+ Self::One(one) => one.write(bs).await,
+ Self::Two(two) => two.write(bs).await,
+ Self::Three(three) => three.write(bs).await,
+ }
+ }
+
+ async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
+ match self {
+ Self::One(one) => one.sink(size, s).await,
+ Self::Two(two) => two.sink(size, s).await,
+ Self::Three(three) => three.sink(size, s).await,
+ }
+ }
+
+ async fn abort(&mut self) -> Result<()> {
+ match self {
+ Self::One(one) => one.abort().await,
+ Self::Two(two) => two.abort().await,
+ Self::Three(three) => three.abort().await,
+ }
+ }
+
+ async fn close(&mut self) -> Result<()> {
+ match self {
+ Self::One(one) => one.close().await,
+ Self::Two(two) => two.close().await,
+ Self::Three(three) => three.close().await,
+ }
+ }
+}
diff --git a/core/src/raw/oio/write/mod.rs b/core/src/raw/oio/write/mod.rs
index b13b3e2c7..49dd94b2b 100644
--- a/core/src/raw/oio/write/mod.rs
+++ b/core/src/raw/oio/write/mod.rs
@@ -22,8 +22,9 @@ pub use api::Write;
pub use api::WriteOperation;
pub use api::Writer;
-mod two_ways_write;
-pub use two_ways_write::TwoWaysWriter;
+mod compose_write;
+pub use compose_write::ThreeWaysWriter;
+pub use compose_write::TwoWaysWriter;
mod multipart_upload_write;
pub use multipart_upload_write::MultipartUploadPart;
@@ -33,3 +34,10 @@ pub use multipart_upload_write::MultipartUploadWriter;
mod append_object_write;
pub use append_object_write::AppendObjectWrite;
pub use append_object_write::AppendObjectWriter;
+
+mod at_least_buf_write;
+pub use at_least_buf_write::AtLeastBufWriter;
+
+mod one_shot_write;
+pub use one_shot_write::OneShotWrite;
+pub use one_shot_write::OneShotWriter;
diff --git a/core/src/raw/oio/write/multipart_upload_write.rs
b/core/src/raw/oio/write/multipart_upload_write.rs
index 8ca10c462..c013b24c3 100644
--- a/core/src/raw/oio/write/multipart_upload_write.rs
+++ b/core/src/raw/oio/write/multipart_upload_write.rs
@@ -21,8 +21,6 @@ use bytes::Bytes;
use crate::raw::*;
use crate::*;
-const DEFAULT_WRITE_MIN_SIZE: usize = 8 * 1024 * 1024;
-
/// MultipartUploadWrite is used to implement [`Write`] based on multipart
/// uploads. By implementing MultipartUploadWrite, services don't need to
/// care about the details of buffering and uploading parts.
@@ -34,12 +32,6 @@ const DEFAULT_WRITE_MIN_SIZE: usize = 8 * 1024 * 1024;
/// - Expose `MultipartUploadWriter` as `Accessor::Writer`
#[async_trait]
pub trait MultipartUploadWrite: Send + Sync + Unpin {
- /// write_once write all data at once.
- ///
- /// MultipartUploadWriter will call this API when the size of data is
- /// already known.
- async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()>;
-
/// initiate_part will call start a multipart upload and return the upload
id.
///
/// MultipartUploadWriter will call this when:
@@ -94,39 +86,32 @@ pub struct MultipartUploadPart {
/// - Allow users to switch to un-buffered mode if users write 16MiB every
time.
pub struct MultipartUploadWriter<W: MultipartUploadWrite> {
inner: W,
- total_size: Option<u64>,
upload_id: Option<String>,
parts: Vec<MultipartUploadPart>,
- buffer: oio::VectorCursor,
- buffer_size: usize,
}
impl<W: MultipartUploadWrite> MultipartUploadWriter<W> {
/// Create a new MultipartUploadWriter.
- pub fn new(inner: W, total_size: Option<u64>) -> Self {
+ pub fn new(inner: W) -> Self {
Self {
inner,
- total_size,
upload_id: None,
parts: Vec::new(),
- buffer: oio::VectorCursor::new(),
- buffer_size: DEFAULT_WRITE_MIN_SIZE,
}
}
- /// Configure the write_min_size.
- ///
- /// write_min_size is used to control the size of internal buffer.
- ///
- /// MultipartUploadWriter will flush the buffer to upload a part when
- /// the size of buffer is larger than write_min_size.
- ///
- /// This value is default to 8 MiB (as recommended by AWS).
- pub fn with_write_min_size(mut self, v: usize) -> Self {
- self.buffer_size = v;
- self
+ /// Get the upload id. Initiate a new multipart upload if the upload id is
empty.
+ pub async fn upload_id(&mut self) -> Result<String> {
+ match &self.upload_id {
+ Some(upload_id) => Ok(upload_id.to_string()),
+ None => {
+ let upload_id = self.inner.initiate_part().await?;
+ self.upload_id = Some(upload_id.clone());
+ Ok(upload_id)
+ }
+ }
}
}
@@ -136,88 +121,28 @@ where
W: MultipartUploadWrite,
{
async fn write(&mut self, bs: Bytes) -> Result<()> {
- let upload_id = match &self.upload_id {
- Some(upload_id) => upload_id,
- None => {
- if self.total_size.unwrap_or_default() == bs.len() as u64 {
- return self
- .inner
- .write_once(bs.len() as u64, AsyncBody::Bytes(bs))
- .await;
- }
-
- let upload_id = self.inner.initiate_part().await?;
- self.upload_id = Some(upload_id);
- self.upload_id.as_deref().unwrap()
- }
- };
+ let upload_id = self.upload_id().await?;
- // Ignore empty bytes
- if bs.is_empty() {
- return Ok(());
- }
-
- self.buffer.push(bs);
- // Return directly if the buffer is not full
- if self.buffer.len() <= self.buffer_size {
- return Ok(());
- }
-
- let bs = self.buffer.peak_at_least(self.buffer_size);
let size = bs.len();
- match self
- .inner
+ self.inner
.write_part(
- upload_id,
+ &upload_id,
self.parts.len(),
size as u64,
AsyncBody::Bytes(bs),
)
.await
- {
- Ok(part) => {
- self.buffer.take(size);
- self.parts.push(part);
- Ok(())
- }
- Err(e) => {
- // If the upload fails, we should pop the given bs to make sure
- // write is re-enter safe.
- self.buffer.pop();
- Err(e)
- }
- }
+ .map(|v| self.parts.push(v))
}
async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
- if !self.buffer.is_empty() {
- return Err(Error::new(
- ErrorKind::InvalidInput,
- "Writer::sink should not be used mixed with existing buffer",
- ));
- }
-
- let upload_id = match &self.upload_id {
- Some(upload_id) => upload_id,
- None => {
- if self.total_size.unwrap_or_default() == size {
- return self.inner.write_once(size,
AsyncBody::Stream(s)).await;
- }
-
- let upload_id = self.inner.initiate_part().await?;
- self.upload_id = Some(upload_id);
- self.upload_id.as_deref().unwrap()
- }
- };
+ let upload_id = self.upload_id().await?;
- let part = self
- .inner
- .write_part(upload_id, self.parts.len(), size,
AsyncBody::Stream(s))
- .await?;
- self.parts.push(part);
-
- Ok(())
+ self.inner
+ .write_part(&upload_id, self.parts.len(), size,
AsyncBody::Stream(s))
+ .await
+ .map(|v| self.parts.push(v))
}
async fn close(&mut self) -> Result<()> {
@@ -227,30 +152,6 @@ where
return Ok(());
};
- // Make sure internal buffer has been flushed.
- if !self.buffer.is_empty() {
- let bs = self.buffer.peak_exact(self.buffer.len());
-
- match self
- .inner
- .write_part(
- upload_id,
- self.parts.len(),
- bs.len() as u64,
- AsyncBody::Bytes(bs),
- )
- .await
- {
- Ok(part) => {
- self.buffer.clear();
- self.parts.push(part);
- }
- Err(e) => {
- return Err(e);
- }
- }
- }
-
self.inner.complete_part(upload_id, &self.parts).await
}
diff --git a/core/src/raw/oio/write/one_shot_write.rs
b/core/src/raw/oio/write/one_shot_write.rs
new file mode 100644
index 000000000..c2ad25c83
--- /dev/null
+++ b/core/src/raw/oio/write/one_shot_write.rs
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::raw::*;
+use crate::*;
+use async_trait::async_trait;
+use bytes::Bytes;
+
+/// OneShotWrite is used to implement [`Write`] based on one shot operation.
+/// By implementing OneShotWrite, services don't need to care about the
details.
+///
+/// For example, S3 `PUT Object` and fs `write_all`.
+///
+/// The layout after adopting [`OneShotWrite`]:
+#[async_trait]
+pub trait OneShotWrite: Send + Sync + Unpin {
+ /// write_once write all data at once.
+ ///
+ /// Implementations should make sure that the data is written correctly at
once.
+ async fn write_once(&self, size: u64, stream: oio::Streamer) -> Result<()>;
+}
+
+/// OneShotWrite is used to implement [`Write`] based on one shot.
+pub struct OneShotWriter<W: OneShotWrite> {
+ inner: W,
+}
+
+impl<W: OneShotWrite> OneShotWriter<W> {
+ /// Create a new one shot writer.
+ pub fn new(inner: W) -> Self {
+ Self { inner }
+ }
+}
+
+#[async_trait]
+impl<W: OneShotWrite> oio::Write for OneShotWriter<W> {
+ async fn write(&mut self, bs: Bytes) -> Result<()> {
+ let cursor = oio::Cursor::from(bs);
+ self.inner
+ .write_once(cursor.len() as u64, Box::new(cursor))
+ .await
+ }
+
+ async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+ self.inner.write_once(size, s).await
+ }
+
+ async fn abort(&mut self) -> Result<()> {
+ Ok(())
+ }
+
+ async fn close(&mut self) -> Result<()> {
+ Ok(())
+ }
+}
diff --git a/core/src/raw/oio/write/two_ways_write.rs
b/core/src/raw/oio/write/two_ways_write.rs
deleted file mode 100644
index 470655783..000000000
--- a/core/src/raw/oio/write/two_ways_write.rs
+++ /dev/null
@@ -1,64 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use async_trait::async_trait;
-use bytes::Bytes;
-
-use crate::raw::oio::Streamer;
-use crate::raw::*;
-use crate::*;
-
-/// TwoWaysWrite is used to implement [`Write`] based on two ways.
-///
-/// Users can wrap two different writers together.
-pub enum TwoWaysWriter<L: oio::Write, R: oio::Write> {
- /// The left side for the [`TwoWaysWriter`].
- Left(L),
- /// The right side for the [`TwoWaysWriter`].
- Right(R),
-}
-
-#[async_trait]
-impl<L: oio::Write, R: oio::Write> oio::Write for TwoWaysWriter<L, R> {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
- match self {
- Self::Left(l) => l.write(bs).await,
- Self::Right(r) => r.write(bs).await,
- }
- }
-
- async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> {
- match self {
- Self::Left(l) => l.sink(size, s).await,
- Self::Right(r) => r.sink(size, s).await,
- }
- }
-
- async fn abort(&mut self) -> Result<()> {
- match self {
- Self::Left(l) => l.abort().await,
- Self::Right(r) => r.abort().await,
- }
- }
-
- async fn close(&mut self) -> Result<()> {
- match self {
- Self::Left(l) => l.close().await,
- Self::Right(r) => r.close().await,
- }
- }
-}
diff --git a/core/src/raw/ops.rs b/core/src/raw/ops.rs
index 2617768da..0404ed468 100644
--- a/core/src/raw/ops.rs
+++ b/core/src/raw/ops.rs
@@ -406,6 +406,7 @@ impl OpStat {
pub struct OpWrite {
append: bool,
+ buffer_size: Option<usize>,
content_length: Option<u64>,
content_type: Option<String>,
content_disposition: Option<String>,
@@ -439,6 +440,26 @@ impl OpWrite {
self
}
+ /// Get the buffer size from op.
+ ///
+ /// The buffer size is used by service to decide the buffer size of the
underlying writer.
+ pub fn buffer_size(&self) -> Option<usize> {
+ self.buffer_size
+ }
+
+ /// Set the buffer size of op.
+ ///
+ /// If buffer size is set, the data will be buffered by the underlying
writer.
+ ///
+ /// ## NOTE
+ ///
+ /// Service could have their own minimum buffer size while perform write
operations like
+ /// multipart uploads. So the buffer size may be larger than the given
buffer size.
+ pub fn with_buffer_size(mut self, buffer_size: usize) -> Self {
+ self.buffer_size = Some(buffer_size);
+ self
+ }
+
/// Get the content length from op.
///
/// The content length is the total length of the data to be written.
diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs
index 9dc4b1b66..66aa65449 100644
--- a/core/src/services/cos/backend.rs
+++ b/core/src/services/cos/backend.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use std::cmp::max;
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
@@ -32,9 +33,13 @@ use super::error::parse_error;
use super::pager::CosPager;
use super::writer::CosWriter;
use crate::raw::*;
+use crate::services::cos::writer::CosWriters;
use crate::*;
-const DEFAULT_WRITE_MIN_SIZE: usize = 1024 * 1024;
+/// The minimum multipart size of COS is 1 MiB.
+///
+/// ref: <https://www.tencentcloud.com/document/product/436/14112>
+const MINIMUM_MULTIPART_SIZE: usize = 1024 * 1024;
/// Tencent-Cloud COS services support.
#[doc = include_str!("docs.md")]
@@ -47,10 +52,6 @@ pub struct CosBuilder {
bucket: Option<String>,
http_client: Option<HttpClient>,
- /// the part size of cos multipart upload, which should be 1 MB to 5 GB.
- /// There is no minimum size limit on the last part of your multipart
upload
- write_min_size: Option<usize>,
-
disable_config_load: bool,
}
@@ -125,14 +126,6 @@ impl CosBuilder {
self
}
- /// set the minimum size of unsized write, it should be greater than 1 MB.
- /// Reference: [Upload Part | Tencent
Cloud](https://www.tencentcloud.com/document/product/436/7750)
- pub fn write_min_size(&mut self, write_min_size: usize) -> &mut Self {
- self.write_min_size = Some(write_min_size);
-
- self
- }
-
/// Disable config load so that opendal will not load config from
/// environment.
///
@@ -168,8 +161,6 @@ impl Builder for CosBuilder {
map.get("endpoint").map(|v| builder.endpoint(v));
map.get("secret_id").map(|v| builder.secret_id(v));
map.get("secret_key").map(|v| builder.secret_key(v));
- map.get("write_min_size")
- .map(|v| builder.write_min_size(v.parse().expect("input must be a
number")));
builder
}
@@ -233,14 +224,6 @@ impl Builder for CosBuilder {
let cred_loader = TencentCosCredentialLoader::new(client.client(),
cfg);
let signer = TencentCosSigner::new();
- let write_min_size =
self.write_min_size.unwrap_or(DEFAULT_WRITE_MIN_SIZE);
- if write_min_size < 1024 * 1024 {
- return Err(Error::new(
- ErrorKind::ConfigInvalid,
- "The write minimum buffer size is misconfigured",
- )
- .with_context("service", Scheme::Cos));
- }
debug!("backend build finished");
Ok(CosBackend {
@@ -251,7 +234,6 @@ impl Builder for CosBuilder {
signer,
loader: cred_loader,
client,
- write_min_size,
}),
})
}
@@ -267,10 +249,7 @@ pub struct CosBackend {
impl Accessor for CosBackend {
type Reader = IncomingAsyncBody;
type BlockingReader = ();
- type Writer = oio::TwoWaysWriter<
- oio::MultipartUploadWriter<CosWriter>,
- oio::AppendObjectWriter<CosWriter>,
- >;
+ type Writer = oio::TwoWaysWriter<CosWriters,
oio::AtLeastBufWriter<CosWriters>>;
type BlockingWriter = ();
type Pager = CosPager;
type BlockingPager = ();
@@ -358,19 +337,26 @@ impl Accessor for CosBackend {
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
let writer = CosWriter::new(self.core.clone(), path, args.clone());
- let tw = if args.append() {
+ let w = if args.append() {
+ CosWriters::Three(oio::AppendObjectWriter::new(writer))
+ } else if args.content_length().is_some() {
+ CosWriters::One(oio::OneShotWriter::new(writer))
+ } else {
+ CosWriters::Two(oio::MultipartUploadWriter::new(writer))
+ };
+
+ let w = if let Some(buffer_size) = args.buffer_size() {
+ let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
+
let w =
-
oio::AppendObjectWriter::new(writer).with_write_min_size(self.core.write_min_size);
+ oio::AtLeastBufWriter::new(w,
buffer_size).with_total_size(args.content_length());
- oio::TwoWaysWriter::Right(w)
+ oio::TwoWaysWriter::Two(w)
} else {
- let w = oio::MultipartUploadWriter::new(writer,
args.content_length())
- .with_write_min_size(self.core.write_min_size);
-
- oio::TwoWaysWriter::Left(w)
+ oio::TwoWaysWriter::One(w)
};
- return Ok((RpWrite::default(), tw));
+ Ok((RpWrite::default(), w))
}
async fn copy(&self, from: &str, to: &str, _args: OpCopy) ->
Result<RpCopy> {
diff --git a/core/src/services/cos/core.rs b/core/src/services/cos/core.rs
index e288a447c..490dada6e 100644
--- a/core/src/services/cos/core.rs
+++ b/core/src/services/cos/core.rs
@@ -45,7 +45,6 @@ pub struct CosCore {
pub signer: TencentCosSigner,
pub loader: TencentCosCredentialLoader,
pub client: HttpClient,
- pub write_min_size: usize,
}
impl Debug for CosCore {
diff --git a/core/src/services/cos/writer.rs b/core/src/services/cos/writer.rs
index e64bc23a1..af2a6ebd0 100644
--- a/core/src/services/cos/writer.rs
+++ b/core/src/services/cos/writer.rs
@@ -23,9 +23,16 @@ use http::StatusCode;
use super::core::*;
use super::error::parse_error;
+use crate::raw::oio::Streamer;
use crate::raw::*;
use crate::*;
+pub type CosWriters = oio::ThreeWaysWriter<
+ oio::OneShotWriter<CosWriter>,
+ oio::MultipartUploadWriter<CosWriter>,
+ oio::AppendObjectWriter<CosWriter>,
+>;
+
pub struct CosWriter {
core: Arc<CosCore>,
@@ -44,15 +51,15 @@ impl CosWriter {
}
#[async_trait]
-impl oio::MultipartUploadWrite for CosWriter {
- async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
+impl oio::OneShotWrite for CosWriter {
+ async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> {
let mut req = self.core.cos_put_object_request(
&self.path,
Some(size),
self.op.content_type(),
self.op.content_disposition(),
self.op.cache_control(),
- body,
+ AsyncBody::Stream(stream),
)?;
self.core.sign(&mut req).await?;
@@ -69,7 +76,10 @@ impl oio::MultipartUploadWrite for CosWriter {
_ => Err(parse_error(resp).await?),
}
}
+}
+#[async_trait]
+impl oio::MultipartUploadWrite for CosWriter {
async fn initiate_part(&self) -> Result<String> {
let resp = self
.core
diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs
index e90308eda..78ecf0976 100644
--- a/core/src/services/obs/backend.rs
+++ b/core/src/services/obs/backend.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use std::cmp::max;
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
@@ -32,8 +33,14 @@ use super::error::parse_error;
use super::pager::ObsPager;
use super::writer::ObsWriter;
use crate::raw::*;
+use crate::services::obs::writer::ObsWriters;
use crate::*;
+/// The minimum multipart size of OBS is 5 MiB.
+///
+/// ref:
<https://support.huaweicloud.com/intl/en-us/ugobs-obs/obs_41_0021.html>
+const MINIMUM_MULTIPART_SIZE: usize = 5 * 1024 * 1024;
+
/// Huawei Cloud OBS services support.
///
/// # Capabilities
@@ -91,9 +98,6 @@ use crate::*;
/// Ok(())
/// }
/// ```
-
-const DEFAULT_WRITE_MIN_SIZE: usize = 100 * 1024;
-
/// Huawei-Cloud Object Storage Service (OBS) support
#[derive(Default, Clone)]
pub struct ObsBuilder {
@@ -103,9 +107,6 @@ pub struct ObsBuilder {
secret_access_key: Option<String>,
bucket: Option<String>,
http_client: Option<HttpClient>,
- /// the part size of obs multipart upload, which should be 100 KiB to 5
GiB.
- /// There is no minimum size limit on the last part of your multipart
upload
- write_min_size: Option<usize>,
}
impl Debug for ObsBuilder {
@@ -190,14 +191,6 @@ impl ObsBuilder {
self.http_client = Some(client);
self
}
-
- /// set the minimum size of unsized write, it should be greater than 100
KB.
- /// Reference: [Huawei Obs multipart upload
limits](https://support.huaweicloud.com/intl/en-us/api-obs/obs_04_0099.html)
- pub fn write_min_size(&mut self, write_min_size: usize) -> &mut Self {
- self.write_min_size = Some(write_min_size);
-
- self
- }
}
impl Builder for ObsBuilder {
@@ -213,8 +206,6 @@ impl Builder for ObsBuilder {
map.get("access_key_id").map(|v| builder.access_key_id(v));
map.get("secret_access_key")
.map(|v| builder.secret_access_key(v));
- map.get("write_min_size")
- .map(|v| builder.write_min_size(v.parse().expect("input must be a
number")));
builder
}
@@ -296,14 +287,6 @@ impl Builder for ObsBuilder {
&endpoint
}
});
- let write_min_size =
self.write_min_size.unwrap_or(DEFAULT_WRITE_MIN_SIZE);
- if write_min_size < 100 * 1024 {
- return Err(Error::new(
- ErrorKind::ConfigInvalid,
- "The write minimum buffer size is misconfigured",
- )
- .with_context("service", Scheme::Obs));
- }
debug!("backend build finished");
Ok(ObsBackend {
@@ -314,7 +297,6 @@ impl Builder for ObsBuilder {
signer,
loader,
client,
- write_min_size,
}),
})
}
@@ -330,10 +312,7 @@ pub struct ObsBackend {
impl Accessor for ObsBackend {
type Reader = IncomingAsyncBody;
type BlockingReader = ();
- type Writer = oio::TwoWaysWriter<
- oio::MultipartUploadWriter<ObsWriter>,
- oio::AppendObjectWriter<ObsWriter>,
- >;
+ type Writer = oio::TwoWaysWriter<ObsWriters,
oio::AtLeastBufWriter<ObsWriters>>;
type BlockingWriter = ();
type Pager = ObsPager;
type BlockingPager = ();
@@ -452,19 +431,26 @@ impl Accessor for ObsBackend {
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
let writer = ObsWriter::new(self.core.clone(), path, args.clone());
- let tw = if args.append() {
+ let w = if args.append() {
+ ObsWriters::Three(oio::AppendObjectWriter::new(writer))
+ } else if args.content_length().is_some() {
+ ObsWriters::One(oio::OneShotWriter::new(writer))
+ } else {
+ ObsWriters::Two(oio::MultipartUploadWriter::new(writer))
+ };
+
+ let w = if let Some(buffer_size) = args.buffer_size() {
+ let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
+
let w =
-
oio::AppendObjectWriter::new(writer).with_write_min_size(self.core.write_min_size);
+ oio::AtLeastBufWriter::new(w,
buffer_size).with_total_size(args.content_length());
- oio::TwoWaysWriter::Right(w)
+ oio::TwoWaysWriter::Two(w)
} else {
- let w = oio::MultipartUploadWriter::new(writer,
args.content_length())
- .with_write_min_size(self.core.write_min_size);
-
- oio::TwoWaysWriter::Left(w)
+ oio::TwoWaysWriter::One(w)
};
- return Ok((RpWrite::default(), tw));
+ Ok((RpWrite::default(), w))
}
async fn copy(&self, from: &str, to: &str, _args: OpCopy) ->
Result<RpCopy> {
diff --git a/core/src/services/obs/core.rs b/core/src/services/obs/core.rs
index 52604fd50..3d8ba1daf 100644
--- a/core/src/services/obs/core.rs
+++ b/core/src/services/obs/core.rs
@@ -45,7 +45,6 @@ pub struct ObsCore {
pub signer: HuaweicloudObsSigner,
pub loader: HuaweicloudObsCredentialLoader,
pub client: HttpClient,
- pub write_min_size: usize,
}
impl Debug for ObsCore {
diff --git a/core/src/services/obs/writer.rs b/core/src/services/obs/writer.rs
index efcfd3ab6..f8078a955 100644
--- a/core/src/services/obs/writer.rs
+++ b/core/src/services/obs/writer.rs
@@ -23,10 +23,16 @@ use http::StatusCode;
use super::core::*;
use super::error::parse_error;
-use crate::raw::oio::MultipartUploadPart;
+use crate::raw::oio::{MultipartUploadPart, Streamer};
use crate::raw::*;
use crate::*;
+pub type ObsWriters = oio::ThreeWaysWriter<
+ oio::OneShotWriter<ObsWriter>,
+ oio::MultipartUploadWriter<ObsWriter>,
+ oio::AppendObjectWriter<ObsWriter>,
+>;
+
pub struct ObsWriter {
core: Arc<ObsCore>,
@@ -43,15 +49,16 @@ impl ObsWriter {
}
}
}
+
#[async_trait]
-impl oio::MultipartUploadWrite for ObsWriter {
- async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
+impl oio::OneShotWrite for ObsWriter {
+ async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> {
let mut req = self.core.obs_put_object_request(
&self.path,
Some(size),
self.op.content_type(),
self.op.cache_control(),
- body,
+ AsyncBody::Stream(stream),
)?;
self.core.sign(&mut req).await?;
@@ -68,7 +75,10 @@ impl oio::MultipartUploadWrite for ObsWriter {
_ => Err(parse_error(resp).await?),
}
}
+}
+#[async_trait]
+impl oio::MultipartUploadWrite for ObsWriter {
async fn initiate_part(&self) -> Result<String> {
let resp = self
.core
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index 6704d158b..1aef92b77 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use std::cmp::max;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::Debug;
@@ -35,9 +36,13 @@ use super::error::parse_error;
use super::pager::OssPager;
use super::writer::OssWriter;
use crate::raw::*;
+use crate::services::oss::writer::OssWriters;
use crate::*;
-const DEFAULT_WRITE_MIN_SIZE: usize = 8 * 1024 * 1024;
+/// The minimum multipart size of OSS is 100 KiB.
+///
+/// ref:
<https://www.alibabacloud.com/help/en/oss/user-guide/multipart-upload-12>
+const MINIMUM_MULTIPART_SIZE: usize = 100 * 1024;
const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1000;
/// Aliyun Object Storage Service (OSS) support
@@ -343,14 +348,6 @@ impl Builder for OssBuilder {
let signer = AliyunOssSigner::new(bucket);
- let write_min_size =
self.write_min_size.unwrap_or(DEFAULT_WRITE_MIN_SIZE);
- if write_min_size < 5 * 1024 * 1024 {
- return Err(Error::new(
- ErrorKind::ConfigInvalid,
- "The write minimum buffer size is misconfigured",
- )
- .with_context("service", Scheme::Oss));
- }
let batch_max_operations = self
.batch_max_operations
.unwrap_or(DEFAULT_BATCH_MAX_OPERATIONS);
@@ -368,7 +365,6 @@ impl Builder for OssBuilder {
client,
server_side_encryption,
server_side_encryption_key_id,
- write_min_size,
batch_max_operations,
}),
})
@@ -385,10 +381,7 @@ pub struct OssBackend {
impl Accessor for OssBackend {
type Reader = IncomingAsyncBody;
type BlockingReader = ();
- type Writer = oio::TwoWaysWriter<
- oio::MultipartUploadWriter<OssWriter>,
- oio::AppendObjectWriter<OssWriter>,
- >;
+ type Writer = oio::TwoWaysWriter<OssWriters,
oio::AtLeastBufWriter<OssWriters>>;
type BlockingWriter = ();
type Pager = OssPager;
type BlockingPager = ();
@@ -480,19 +473,26 @@ impl Accessor for OssBackend {
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
let writer = OssWriter::new(self.core.clone(), path, args.clone());
- let tw = if args.append() {
+ let w = if args.append() {
+ OssWriters::Three(oio::AppendObjectWriter::new(writer))
+ } else if args.content_length().is_some() {
+ OssWriters::One(oio::OneShotWriter::new(writer))
+ } else {
+ OssWriters::Two(oio::MultipartUploadWriter::new(writer))
+ };
+
+ let w = if let Some(buffer_size) = args.buffer_size() {
+ let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
+
let w =
-
oio::AppendObjectWriter::new(writer).with_write_min_size(self.core.write_min_size);
+ oio::AtLeastBufWriter::new(w,
buffer_size).with_total_size(args.content_length());
- oio::TwoWaysWriter::Right(w)
+ oio::TwoWaysWriter::Two(w)
} else {
- let w = oio::MultipartUploadWriter::new(writer,
args.content_length())
- .with_write_min_size(self.core.write_min_size);
-
- oio::TwoWaysWriter::Left(w)
+ oio::TwoWaysWriter::One(w)
};
- return Ok((RpWrite::default(), tw));
+ Ok((RpWrite::default(), w))
}
async fn copy(&self, from: &str, to: &str, _args: OpCopy) ->
Result<RpCopy> {
diff --git a/core/src/services/oss/core.rs b/core/src/services/oss/core.rs
index 6c570fb17..a81db97fa 100644
--- a/core/src/services/oss/core.rs
+++ b/core/src/services/oss/core.rs
@@ -64,7 +64,6 @@ pub struct OssCore {
pub client: HttpClient,
pub loader: AliyunLoader,
pub signer: AliyunOssSigner,
- pub write_min_size: usize,
pub batch_max_operations: usize,
}
diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs
index 9faad249f..27aa09011 100644
--- a/core/src/services/oss/writer.rs
+++ b/core/src/services/oss/writer.rs
@@ -23,9 +23,16 @@ use http::StatusCode;
use super::core::*;
use super::error::parse_error;
+use crate::raw::oio::Streamer;
use crate::raw::*;
use crate::*;
+pub type OssWriters = oio::ThreeWaysWriter<
+ oio::OneShotWriter<OssWriter>,
+ oio::MultipartUploadWriter<OssWriter>,
+ oio::AppendObjectWriter<OssWriter>,
+>;
+
pub struct OssWriter {
core: Arc<OssCore>,
@@ -44,15 +51,15 @@ impl OssWriter {
}
#[async_trait]
-impl oio::MultipartUploadWrite for OssWriter {
- async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
+impl oio::OneShotWrite for OssWriter {
+ async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> {
let mut req = self.core.oss_put_object_request(
&self.path,
Some(size),
self.op.content_type(),
self.op.content_disposition(),
self.op.cache_control(),
- body,
+ AsyncBody::Stream(stream),
false,
)?;
@@ -70,7 +77,10 @@ impl oio::MultipartUploadWrite for OssWriter {
_ => Err(parse_error(resp).await?),
}
}
+}
+#[async_trait]
+impl oio::MultipartUploadWrite for OssWriter {
async fn initiate_part(&self) -> Result<String> {
let resp = self
.core
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index 83ff8a7d0..700359009 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use std::cmp::max;
use std::collections::HashMap;
use std::fmt::Debug;
use std::fmt::Formatter;
@@ -43,6 +44,7 @@ use super::error::parse_s3_error_code;
use super::pager::S3Pager;
use super::writer::S3Writer;
use crate::raw::*;
+use crate::services::s3::writer::S3Writers;
use crate::*;
/// Allow constructing correct region endpoint if user gives a global endpoint.
@@ -56,7 +58,10 @@ static ENDPOINT_TEMPLATES: Lazy<HashMap<&'static str,
&'static str>> = Lazy::new
m
});
-const DEFAULT_WRITE_MIN_SIZE: usize = 8 * 1024 * 1024;
+/// The minimum multipart size of S3 is 5 MiB.
+///
+/// ref: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
+const MINIMUM_MULTIPART_SIZE: usize = 5 * 1024 * 1024;
const DEFAULT_BATCH_MAX_OPERATIONS: usize = 1000;
/// Aws S3 and compatible services (including minio, digitalocean space,
Tencent Cloud Object Storage(COS) and so on) support.
@@ -847,14 +852,6 @@ impl Builder for S3Builder {
let signer = AwsV4Signer::new("s3", ®ion);
- let write_min_size =
self.write_min_size.unwrap_or(DEFAULT_WRITE_MIN_SIZE);
- if write_min_size < 5 * 1024 * 1024 {
- return Err(Error::new(
- ErrorKind::ConfigInvalid,
- "The write minimum buffer size is misconfigured",
- )
- .with_context("service", Scheme::S3));
- }
let batch_max_operations = self
.batch_max_operations
.unwrap_or(DEFAULT_BATCH_MAX_OPERATIONS);
@@ -874,7 +871,6 @@ impl Builder for S3Builder {
signer,
loader,
client,
- write_min_size,
batch_max_operations,
}),
})
@@ -891,7 +887,7 @@ pub struct S3Backend {
impl Accessor for S3Backend {
type Reader = IncomingAsyncBody;
type BlockingReader = ();
- type Writer = oio::MultipartUploadWriter<S3Writer>;
+ type Writer = oio::TwoWaysWriter<S3Writers,
oio::AtLeastBufWriter<S3Writers>>;
type BlockingWriter = ();
type Pager = S3Pager;
type BlockingPager = ();
@@ -979,10 +975,26 @@ impl Accessor for S3Backend {
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
- Ok((
- RpWrite::default(),
- S3Writer::new(self.core.clone(), path, args),
- ))
+ let writer = S3Writer::new(self.core.clone(), path, args.clone());
+
+ let w = if args.content_length().is_some() {
+ S3Writers::One(oio::OneShotWriter::new(writer))
+ } else {
+ S3Writers::Two(oio::MultipartUploadWriter::new(writer))
+ };
+
+ let w = if let Some(buffer_size) = args.buffer_size() {
+ let buffer_size = max(MINIMUM_MULTIPART_SIZE, buffer_size);
+
+ let w =
+ oio::AtLeastBufWriter::new(w,
buffer_size).with_total_size(args.content_length());
+
+ oio::TwoWaysWriter::Two(w)
+ } else {
+ oio::TwoWaysWriter::One(w)
+ };
+
+ Ok((RpWrite::default(), w))
}
async fn copy(&self, from: &str, to: &str, _args: OpCopy) ->
Result<RpCopy> {
diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs
index 36321280f..89989bcb7 100644
--- a/core/src/services/s3/core.rs
+++ b/core/src/services/s3/core.rs
@@ -82,7 +82,6 @@ pub struct S3Core {
pub signer: AwsV4Signer,
pub loader: Box<dyn AwsCredentialLoad>,
pub client: HttpClient,
- pub write_min_size: usize,
pub batch_max_operations: usize,
}
diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs
index 508716d14..a27341d71 100644
--- a/core/src/services/s3/writer.rs
+++ b/core/src/services/s3/writer.rs
@@ -23,9 +23,13 @@ use http::StatusCode;
use super::core::*;
use super::error::parse_error;
+use crate::raw::oio::Streamer;
use crate::raw::*;
use crate::*;
+pub type S3Writers =
+ oio::TwoWaysWriter<oio::OneShotWriter<S3Writer>,
oio::MultipartUploadWriter<S3Writer>>;
+
pub struct S3Writer {
core: Arc<S3Core>,
@@ -34,30 +38,25 @@ pub struct S3Writer {
}
impl S3Writer {
- pub fn new(core: Arc<S3Core>, path: &str, op: OpWrite) ->
oio::MultipartUploadWriter<Self> {
- let write_min_size = core.write_min_size;
-
- let total_size = op.content_length();
- let s3_writer = S3Writer {
+ pub fn new(core: Arc<S3Core>, path: &str, op: OpWrite) -> Self {
+ S3Writer {
core,
path: path.to_string(),
op,
- };
-
- oio::MultipartUploadWriter::new(s3_writer,
total_size).with_write_min_size(write_min_size)
+ }
}
}
#[async_trait]
-impl oio::MultipartUploadWrite for S3Writer {
- async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
+impl oio::OneShotWrite for S3Writer {
+ async fn write_once(&self, size: u64, stream: Streamer) -> Result<()> {
let mut req = self.core.s3_put_object_request(
&self.path,
Some(size),
self.op.content_type(),
self.op.content_disposition(),
self.op.cache_control(),
- body,
+ AsyncBody::Stream(stream),
)?;
self.core.sign(&mut req).await?;
@@ -74,7 +73,10 @@ impl oio::MultipartUploadWrite for S3Writer {
_ => Err(parse_error(resp).await?),
}
}
+}
+#[async_trait]
+impl oio::MultipartUploadWrite for S3Writer {
async fn initiate_part(&self) -> Result<String> {
let resp = self
.core
diff --git a/core/src/types/operator/operator_futures.rs
b/core/src/types/operator/operator_futures.rs
index d3676f1ce..10a9c061c 100644
--- a/core/src/types/operator/operator_futures.rs
+++ b/core/src/types/operator/operator_futures.rs
@@ -395,6 +395,19 @@ impl FutureWrite {
self
}
+ /// Set the buffer size of op.
+ ///
+ /// If buffer size is set, the data will be buffered by the underlying
writer.
+ ///
+ /// ## NOTE
+ ///
+ /// Service could have their own minimum buffer size while perform write
operations like
+ /// multipart uploads. So the buffer size may be larger than the given
buffer size.
+ pub fn buffer_size(mut self, v: usize) -> Self {
+ self.0 = self.0.map_args(|(args, bs)| (args.with_buffer_size(v), bs));
+ self
+ }
+
/// Set the content length of op.
///
/// If the content length is not set, the content length will be
@@ -457,6 +470,19 @@ impl FutureWriter {
self
}
+ /// Set the buffer size of op.
+ ///
+ /// If buffer size is set, the data will be buffered by the underlying
writer.
+ ///
+ /// ## NOTE
+ ///
+ /// Service could have their own minimum buffer size while perform write
operations like
+ /// multipart uploads. So the buffer size may be larger than the given
buffer size.
+ pub fn buffer_size(mut self, v: usize) -> Self {
+ self.0 = self.0.map_args(|args| args.with_buffer_size(v));
+ self
+ }
+
/// Set the content length of op.
///
/// If the content length is not set, the content length will be
diff --git a/core/tests/behavior/write.rs b/core/tests/behavior/write.rs
index a1effab43..ae8825984 100644
--- a/core/tests/behavior/write.rs
+++ b/core/tests/behavior/write.rs
@@ -1233,7 +1233,7 @@ pub async fn test_writer_futures_copy(op: Operator) ->
Result<()> {
let (content, size): (Vec<u8>, usize) =
gen_bytes_with_range(10 * 1024 * 1024..20 * 1024 * 1024);
- let mut w = op.writer(&path).await?;
+ let mut w = op.writer_with(&path).buffer_size(8 * 1024 * 1024).await?;
// Wrap a buf reader here to make sure content is read in 1MiB chunks.
let mut cursor = BufReader::with_capacity(1024 * 1024,
Cursor::new(content.clone()));
@@ -1266,7 +1266,7 @@ pub async fn test_fuzz_unsized_writer(op: Operator) ->
Result<()> {
let mut fuzzer = ObjectWriterFuzzer::new(&path, None);
- let mut w = op.writer(&path).await?;
+ let mut w = op.writer_with(&path).buffer_size(8 * 1024 * 1024).await?;
for _ in 0..100 {
match fuzzer.fuzz() {