This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new cec4d72211 feat: implement concurrent `RangeWriter` (#3923)
cec4d72211 is described below
commit cec4d72211ca3e182c7ddfd719035223f0af16fd
Author: Weny Xu <[email protected]>
AuthorDate: Sat Jan 6 13:48:37 2024 +0900
feat: implement concurrent `RangeWriter` (#3923)
* feat: implement concurrent `RangeWriter`
* chore: rename `written` to `offset`
---
core/src/raw/oio/write/range_write.rs | 198 ++++++++++++++++++----------------
core/src/services/gcs/backend.rs | 3 +-
2 files changed, 106 insertions(+), 95 deletions(-)
diff --git a/core/src/raw/oio/write/range_write.rs b/core/src/raw/oio/write/range_write.rs
index 8e6c72f61e..7e2f9f0883 100644
--- a/core/src/raw/oio/write/range_write.rs
+++ b/core/src/raw/oio/write/range_write.rs
@@ -15,12 +15,16 @@
// specific language governing permissions and limitations
// under the License.
+use std::pin::Pin;
+use std::sync::Arc;
use std::task::ready;
use std::task::Context;
use std::task::Poll;
use async_trait::async_trait;
+use futures::Future;
use futures::FutureExt;
+use futures::StreamExt;
use crate::raw::oio::WriteBuf;
use crate::raw::*;
@@ -64,7 +68,7 @@ pub trait RangeWrite: Send + Sync + Unpin + 'static {
async fn write_range(
&self,
location: &str,
- written: u64,
+ offset: u64,
size: u64,
body: AsyncBody,
) -> Result<()>;
@@ -73,7 +77,7 @@ pub trait RangeWrite: Send + Sync + Unpin + 'static {
async fn complete_range(
&self,
location: &str,
- written: u64,
+ offset: u64,
size: u64,
body: AsyncBody,
) -> Result<()>;
@@ -82,105 +86,123 @@ pub trait RangeWrite: Send + Sync + Unpin + 'static {
async fn abort_range(&self, location: &str) -> Result<()>;
}
+struct WriteRangeFuture(BoxedFuture<Result<u64>>);
+
+/// # Safety
+///
+/// wasm32 is a special target that we only have one event-loop for this WriteRangeFuture.
+unsafe impl Send for WriteRangeFuture {}
+
+/// # Safety
+///
+/// We will only take `&mut Self` reference for WriteRangeFuture.
+unsafe impl Sync for WriteRangeFuture {}
+
+impl Future for WriteRangeFuture {
+ type Output = Result<u64>;
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ self.get_mut().0.poll_unpin(cx)
+ }
+}
+
/// RangeWriter will implements [`Write`] based on range write.
pub struct RangeWriter<W: RangeWrite> {
location: Option<String>,
- written: u64,
+ next_offset: u64,
buffer: Option<oio::ChunkedBytes>,
+ futures: ConcurrentFutures<WriteRangeFuture>,
- state: State<W>,
+ w: Arc<W>,
+ state: State,
}
-enum State<W> {
- Idle(Option<W>),
- Init(BoxedFuture<(W, Result<String>)>),
- Write(BoxedFuture<(W, Result<u64>)>),
- Complete(BoxedFuture<(W, Result<()>)>),
- Abort(BoxedFuture<(W, Result<()>)>),
+enum State {
+ Idle,
+ Init(BoxedFuture<Result<String>>),
+ Complete(BoxedFuture<Result<()>>),
+ Abort(BoxedFuture<Result<()>>),
}
/// # Safety
///
/// wasm32 is a special target that we only have one event-loop for this state.
-unsafe impl<S: RangeWrite> Send for State<S> {}
+unsafe impl Send for State {}
/// # Safety
///
/// We will only take `&mut Self` reference for State.
-unsafe impl<W: RangeWrite> Sync for State<W> {}
+unsafe impl Sync for State {}
impl<W: RangeWrite> RangeWriter<W> {
/// Create a new MultipartUploadWriter.
- pub fn new(inner: W) -> Self {
+ pub fn new(inner: W, concurrent: usize) -> Self {
Self {
- state: State::Idle(Some(inner)),
+ state: State::Idle,
+ w: Arc::new(inner),
+ futures: ConcurrentFutures::new(1.max(concurrent)),
buffer: None,
location: None,
- written: 0,
+ next_offset: 0,
}
}
+
+ fn fill_cache(&mut self, bs: &dyn WriteBuf) -> usize {
+ let size = bs.remaining();
+ let bs = oio::ChunkedBytes::from_vec(bs.vectored_bytes(size));
+ assert!(self.buffer.is_none());
+ self.buffer = Some(bs);
+ size
+ }
}
impl<W: RangeWrite> oio::Write for RangeWriter<W> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn WriteBuf) ->
Poll<Result<usize>> {
loop {
match &mut self.state {
- State::Idle(w) => {
+ State::Idle => {
match self.location.clone() {
Some(location) => {
- let written = self.written;
-
- let buffer = self.buffer.clone().expect("cache
must be valid").clone();
- let w = w.take().expect("writer must be valid");
- self.state = State::Write(Box::pin(async move {
- let size = buffer.len() as u64;
- let res = w
- .write_range(
+ if self.futures.has_remaining() {
+ let cache = self.buffer.take().expect("cache
must be valid");
+ let size = cache.len() as u64;
+ let offset = self.next_offset;
+ self.next_offset += size;
+ let w = self.w.clone();
+
self.futures.push(WriteRangeFuture(Box::pin(async move {
+ w.write_range(
&location,
- written,
+ offset,
size,
- AsyncBody::ChunkedBytes(buffer),
+ AsyncBody::ChunkedBytes(cache),
)
- .await;
-
- (w, res.map(|_| size))
- }));
+ .await
+ .map(|_| size)
+ })));
+ let size = self.fill_cache(bs);
+ return Poll::Ready(Ok(size));
+ } else if let Some(size) =
ready!(self.futures.poll_next_unpin(cx)) {
+ self.next_offset += size?;
+ }
}
None => {
// Fill cache with the first write.
if self.buffer.is_none() {
- let size = bs.remaining();
- let cb =
oio::ChunkedBytes::from_vec(bs.vectored_bytes(size));
- self.buffer = Some(cb);
+ let size = self.fill_cache(bs);
return Poll::Ready(Ok(size));
}
- let w = w.take().expect("writer must be valid");
- self.state = State::Init(Box::pin(async move {
- let location = w.initiate_range().await;
- (w, location)
- }));
+ let w = self.w.clone();
+ self.state =
+ State::Init(Box::pin(async move {
w.initiate_range().await }));
}
}
}
State::Init(fut) => {
- let (w, res) = ready!(fut.poll_unpin(cx));
- self.state = State::Idle(Some(w));
+ let res = ready!(fut.poll_unpin(cx));
+ self.state = State::Idle;
self.location = Some(res?);
}
- State::Write(fut) => {
- let (w, size) = ready!(fut.as_mut().poll(cx));
- self.state = State::Idle(Some(w));
- // Update the written.
- self.written += size?;
-
- // Replace the cache when last write succeeded
- let size = bs.remaining();
- let cb =
oio::ChunkedBytes::from_vec(bs.vectored_bytes(size));
- self.buffer = Some(cb);
- return Poll::Ready(Ok(size));
- }
State::Complete(_) => {
unreachable!("RangeWriter must not go into State::Complete
during poll_write")
}
@@ -194,23 +216,26 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {
fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
loop {
match &mut self.state {
- State::Idle(w) => {
- let w = w.take().expect("writer must be valid");
+ State::Idle => {
+ let w = self.w.clone();
match self.location.clone() {
Some(location) => {
- let written = self.written;
- match self.buffer.clone() {
+ if !self.futures.is_empty() {
+ while let Some(size) =
ready!(self.futures.poll_next_unpin(cx)) {
+ self.next_offset += size?;
+ }
+ }
+ match self.buffer.take() {
Some(bs) => {
+ let offset = self.next_offset;
self.state =
State::Complete(Box::pin(async move {
- let res = w
- .complete_range(
- &location,
- written,
- bs.len() as u64,
- AsyncBody::ChunkedBytes(bs),
- )
- .await;
- (w, res)
+ w.complete_range(
+ &location,
+ offset,
+ bs.len() as u64,
+ AsyncBody::ChunkedBytes(bs),
+ )
+ .await
}));
}
None => {
@@ -222,17 +247,13 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {
Some(bs) => {
self.state = State::Complete(Box::pin(async
move {
let size = bs.len();
- let res = w
- .write_once(size as u64,
AsyncBody::ChunkedBytes(bs))
- .await;
- (w, res)
+ w.write_once(size as u64,
AsyncBody::ChunkedBytes(bs)).await
}));
}
None => {
// Call write_once if there is no data in buffer and no location.
self.state = State::Complete(Box::pin(async
move {
- let res = w.write_once(0,
AsyncBody::Empty).await;
- (w, res)
+ w.write_once(0, AsyncBody::Empty).await
}));
}
},
@@ -241,12 +262,9 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {
State::Init(_) => {
unreachable!("RangeWriter must not go into State::Init
during poll_close")
}
- State::Write(_) => {
- unreachable!("RangeWriter must not go into State::Write
during poll_close")
- }
State::Complete(fut) => {
- let (w, res) = ready!(fut.poll_unpin(cx));
- self.state = State::Idle(Some(w));
+ let res = ready!(fut.poll_unpin(cx));
+ self.state = State::Idle;
return Poll::Ready(res);
}
State::Abort(_) => {
@@ -259,32 +277,24 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {
fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
loop {
match &mut self.state {
- State::Idle(w) => {
- let w = w.take().unwrap();
- match self.location.clone() {
- Some(location) => {
- let fut = async move {
- let res = w.abort_range(&location).await;
-
- (w, res)
- };
- self.state = State::Abort(Box::pin(fut));
- }
- None => return Poll::Ready(Ok(())),
+ State::Idle => match self.location.clone() {
+ Some(location) => {
+ let w = self.w.clone();
+ self.futures.clear();
+ self.state =
+ State::Abort(Box::pin(async move {
w.abort_range(&location).await }));
}
- }
+ None => return Poll::Ready(Ok(())),
+ },
State::Init(_) => {
unreachable!("RangeWriter must not go into State::Init
during poll_close")
}
- State::Write(_) => {
- unreachable!("RangeWriter must not go into State::Write
during poll_close")
- }
State::Complete(_) => {
unreachable!("RangeWriter must not go into State::Complete
during poll_close")
}
State::Abort(fut) => {
- let (w, res) = ready!(fut.poll_unpin(cx));
- self.state = State::Idle(Some(w));
+ let res = ready!(fut.poll_unpin(cx));
+ self.state = State::Idle;
// We should check res first before clean up cache.
res?;
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index 3142f8a9b8..79ae7a8ee4 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -420,8 +420,9 @@ impl Accessor for GcsBackend {
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
+ let concurrent = args.concurrent();
let w = GcsWriter::new(self.core.clone(), path, args);
- let w = oio::RangeWriter::new(w);
+ let w = oio::RangeWriter::new(w, concurrent);
Ok((RpWrite::default(), w))
}