This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch buffer-refactor in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 73fa30953b8b50682c8bae68325f7bc38d650f94 Author: Xuanwo <[email protected]> AuthorDate: Tue Aug 22 16:12:15 2023 +0800 Add compose Signed-off-by: Xuanwo <[email protected]> --- core/src/raw/oio/write/at_least_buf_write.rs | 1 + core/src/raw/oio/write/compose_write.rs | 136 +++++++++++++++++++++++++++ core/src/raw/oio/write/mod.rs | 5 +- core/src/raw/oio/write/two_ways_write.rs | 64 ------------- core/src/services/cos/backend.rs | 4 +- core/src/services/obs/backend.rs | 4 +- core/src/services/oss/backend.rs | 4 +- 7 files changed, 146 insertions(+), 72 deletions(-) diff --git a/core/src/raw/oio/write/at_least_buf_write.rs b/core/src/raw/oio/write/at_least_buf_write.rs index 91da42bbc..7a295c313 100644 --- a/core/src/raw/oio/write/at_least_buf_write.rs +++ b/core/src/raw/oio/write/at_least_buf_write.rs @@ -27,6 +27,7 @@ use bytes::Bytes; pub struct AtLeastBufWriter<W: oio::Write> { inner: W, + /// The size for buffer, we will flush the underlying storage if the buffer is full. size: usize, buf: oio::ChunkedCursor, } diff --git a/core/src/raw/oio/write/compose_write.rs b/core/src/raw/oio/write/compose_write.rs new file mode 100644 index 000000000..30f70fa86 --- /dev/null +++ b/core/src/raw/oio/write/compose_write.rs @@ -0,0 +1,136 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`type_alias_impl_trait`](https://github.com/rust-lang/rust/issues/63063) is not stable yet. +//! So we can't write the following code: +//! +//! ```rust,skip +//! impl Accessor for S3Backend { +//! type Writer = impl oio::Write; +//! } +//! ``` +//! +//! Which means we have to write the type directly like: +//! +//! ```rust,skip +//! impl Accessor for OssBackend { +//! type Writer = oio::TwoWaysWriter< +//! oio::MultipartUploadWriter<OssWriter>, +//! oio::AppendObjectWriter<OssWriter>, +//! >; +//! } +//! ``` +//! +//! This module is used to provide some enums for the above code. We should remove this module once +//! type_alias_impl_trait has been stabilized. + +use async_trait::async_trait; +use bytes::Bytes; + +use crate::raw::oio::Streamer; +use crate::raw::*; +use crate::*; + +/// TwoWaysWriter is used to implement [`Write`] based on two ways. +/// +/// Users can wrap two different writers together. +pub enum TwoWaysWriter<ONE: oio::Write, TWO: oio::Write> { + /// The first type for the [`TwoWaysWriter`]. + One(ONE), + /// The second type for the [`TwoWaysWriter`]. 
+ Two(TWO), +} + +#[async_trait] +impl<ONE: oio::Write, TWO: oio::Write> oio::Write for TwoWaysWriter<ONE, TWO> { + async fn write(&mut self, bs: Bytes) -> Result<()> { + match self { + Self::One(one) => one.write(bs).await, + Self::Two(two) => two.write(bs).await, + } + } + + async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> { + match self { + Self::One(one) => one.sink(size, s).await, + Self::Two(two) => two.sink(size, s).await, + } + } + + async fn abort(&mut self) -> Result<()> { + match self { + Self::One(one) => one.abort().await, + Self::Two(two) => two.abort().await, + } + } + + async fn close(&mut self) -> Result<()> { + match self { + Self::One(one) => one.close().await, + Self::Two(two) => two.close().await, + } + } +} + +/// ThreeWaysWriter is used to implement [`Write`] based on three ways. +/// +/// Users can wrap three different writers together. +pub enum ThreeWaysWriter<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> { + /// The first type for the [`ThreeWaysWriter`]. + One(ONE), + /// The second type for the [`ThreeWaysWriter`]. + Two(TWO), + /// The third type for the [`ThreeWaysWriter`]. 
+ Three(THREE), +} + +#[async_trait] +impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write + for ThreeWaysWriter<ONE, TWO, THREE> +{ + async fn write(&mut self, bs: Bytes) -> Result<()> { + match self { + Self::One(one) => one.write(bs).await, + Self::Two(two) => two.write(bs).await, + Self::Three(three) => three.write(bs).await, + } + } + + async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> { + match self { + Self::One(one) => one.sink(size, s).await, + Self::Two(two) => two.sink(size, s).await, + Self::Three(three) => three.sink(size, s).await, + } + } + + async fn abort(&mut self) -> Result<()> { + match self { + Self::One(one) => one.abort().await, + Self::Two(two) => two.abort().await, + Self::Three(three) => three.abort().await, + } + } + + async fn close(&mut self) -> Result<()> { + match self { + Self::One(one) => one.close().await, + Self::Two(two) => two.close().await, + Self::Three(three) => three.close().await, + } + } +} diff --git a/core/src/raw/oio/write/mod.rs b/core/src/raw/oio/write/mod.rs index 7994e2078..49dd94b2b 100644 --- a/core/src/raw/oio/write/mod.rs +++ b/core/src/raw/oio/write/mod.rs @@ -22,8 +22,9 @@ pub use api::Write; pub use api::WriteOperation; pub use api::Writer; -mod two_ways_write; -pub use two_ways_write::TwoWaysWriter; +mod compose_write; +pub use compose_write::ThreeWaysWriter; +pub use compose_write::TwoWaysWriter; mod multipart_upload_write; pub use multipart_upload_write::MultipartUploadPart; diff --git a/core/src/raw/oio/write/two_ways_write.rs b/core/src/raw/oio/write/two_ways_write.rs deleted file mode 100644 index 470655783..000000000 --- a/core/src/raw/oio/write/two_ways_write.rs +++ /dev/null @@ -1,64 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. 
The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use async_trait::async_trait; -use bytes::Bytes; - -use crate::raw::oio::Streamer; -use crate::raw::*; -use crate::*; - -/// TwoWaysWrite is used to implement [`Write`] based on two ways. -/// -/// Users can wrap two different writers together. -pub enum TwoWaysWriter<L: oio::Write, R: oio::Write> { - /// The left side for the [`TwoWaysWriter`]. - Left(L), - /// The right side for the [`TwoWaysWriter`]. - Right(R), -} - -#[async_trait] -impl<L: oio::Write, R: oio::Write> oio::Write for TwoWaysWriter<L, R> { - async fn write(&mut self, bs: Bytes) -> Result<()> { - match self { - Self::Left(l) => l.write(bs).await, - Self::Right(r) => r.write(bs).await, - } - } - - async fn sink(&mut self, size: u64, s: Streamer) -> Result<()> { - match self { - Self::Left(l) => l.sink(size, s).await, - Self::Right(r) => r.sink(size, s).await, - } - } - - async fn abort(&mut self) -> Result<()> { - match self { - Self::Left(l) => l.abort().await, - Self::Right(r) => r.abort().await, - } - } - - async fn close(&mut self) -> Result<()> { - match self { - Self::Left(l) => l.close().await, - Self::Right(r) => r.close().await, - } - } -} diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs index 62a968ea0..851e79e53 100644 --- a/core/src/services/cos/backend.rs +++ b/core/src/services/cos/backend.rs @@ -362,11 +362,11 @@ impl Accessor for CosBackend { let w = 
oio::AppendObjectWriter::new(writer).with_write_min_size(self.core.write_min_size); - oio::TwoWaysWriter::Right(w) + oio::TwoWaysWriter::Two(w) } else { let w = oio::MultipartUploadWriter::new(writer); - oio::TwoWaysWriter::Left(w) + oio::TwoWaysWriter::One(w) }; return Ok((RpWrite::default(), tw)); diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs index ce2e83057..dc7676d4a 100644 --- a/core/src/services/obs/backend.rs +++ b/core/src/services/obs/backend.rs @@ -456,11 +456,11 @@ impl Accessor for ObsBackend { let w = oio::AppendObjectWriter::new(writer).with_write_min_size(self.core.write_min_size); - oio::TwoWaysWriter::Right(w) + oio::TwoWaysWriter::Two(w) } else { let w = oio::MultipartUploadWriter::new(writer); - oio::TwoWaysWriter::Left(w) + oio::TwoWaysWriter::One(w) }; return Ok((RpWrite::default(), tw)); diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs index 1b6d1f33e..029b3b52b 100644 --- a/core/src/services/oss/backend.rs +++ b/core/src/services/oss/backend.rs @@ -484,11 +484,11 @@ impl Accessor for OssBackend { let w = oio::AppendObjectWriter::new(writer).with_write_min_size(self.core.write_min_size); - oio::TwoWaysWriter::Right(w) + oio::TwoWaysWriter::Two(w) } else { let w = oio::MultipartUploadWriter::new(writer); - oio::TwoWaysWriter::Left(w) + oio::TwoWaysWriter::One(w) }; return Ok((RpWrite::default(), tw));
