This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new cec4d72211 feat: implement concurrent `RangeWriter` (#3923)
cec4d72211 is described below

commit cec4d72211ca3e182c7ddfd719035223f0af16fd
Author: Weny Xu <[email protected]>
AuthorDate: Sat Jan 6 13:48:37 2024 +0900

    feat: implement concurrent `RangeWriter` (#3923)
    
    * feat: implement concurrent `RangeWriter`
    
    * chore: rename `written` to `offset`
---
 core/src/raw/oio/write/range_write.rs | 198 ++++++++++++++++++----------------
 core/src/services/gcs/backend.rs      |   3 +-
 2 files changed, 106 insertions(+), 95 deletions(-)

diff --git a/core/src/raw/oio/write/range_write.rs b/core/src/raw/oio/write/range_write.rs
index 8e6c72f61e..7e2f9f0883 100644
--- a/core/src/raw/oio/write/range_write.rs
+++ b/core/src/raw/oio/write/range_write.rs
@@ -15,12 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::pin::Pin;
+use std::sync::Arc;
 use std::task::ready;
 use std::task::Context;
 use std::task::Poll;
 
 use async_trait::async_trait;
+use futures::Future;
 use futures::FutureExt;
+use futures::StreamExt;
 
 use crate::raw::oio::WriteBuf;
 use crate::raw::*;
@@ -64,7 +68,7 @@ pub trait RangeWrite: Send + Sync + Unpin + 'static {
     async fn write_range(
         &self,
         location: &str,
-        written: u64,
+        offset: u64,
         size: u64,
         body: AsyncBody,
     ) -> Result<()>;
@@ -73,7 +77,7 @@ pub trait RangeWrite: Send + Sync + Unpin + 'static {
     async fn complete_range(
         &self,
         location: &str,
-        written: u64,
+        offset: u64,
         size: u64,
         body: AsyncBody,
     ) -> Result<()>;
@@ -82,105 +86,123 @@ pub trait RangeWrite: Send + Sync + Unpin + 'static {
     async fn abort_range(&self, location: &str) -> Result<()>;
 }
 
+struct WriteRangeFuture(BoxedFuture<Result<u64>>);
+
+/// # Safety
+///
+/// wasm32 is a special target that we only have one event-loop for this WriteRangeFuture.
+unsafe impl Send for WriteRangeFuture {}
+
+/// # Safety
+///
+/// We will only take `&mut Self` reference for WriteRangeFuture.
+unsafe impl Sync for WriteRangeFuture {}
+
+impl Future for WriteRangeFuture {
+    type Output = Result<u64>;
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        self.get_mut().0.poll_unpin(cx)
+    }
+}
+
 /// RangeWriter will implements [`Write`] based on range write.
 pub struct RangeWriter<W: RangeWrite> {
     location: Option<String>,
-    written: u64,
+    next_offset: u64,
     buffer: Option<oio::ChunkedBytes>,
+    futures: ConcurrentFutures<WriteRangeFuture>,
 
-    state: State<W>,
+    w: Arc<W>,
+    state: State,
 }
 
-enum State<W> {
-    Idle(Option<W>),
-    Init(BoxedFuture<(W, Result<String>)>),
-    Write(BoxedFuture<(W, Result<u64>)>),
-    Complete(BoxedFuture<(W, Result<()>)>),
-    Abort(BoxedFuture<(W, Result<()>)>),
+enum State {
+    Idle,
+    Init(BoxedFuture<Result<String>>),
+    Complete(BoxedFuture<Result<()>>),
+    Abort(BoxedFuture<Result<()>>),
 }
 
 /// # Safety
 ///
 /// wasm32 is a special target that we only have one event-loop for this state.
-unsafe impl<S: RangeWrite> Send for State<S> {}
+unsafe impl Send for State {}
 
 /// # Safety
 ///
 /// We will only take `&mut Self` reference for State.
-unsafe impl<W: RangeWrite> Sync for State<W> {}
+unsafe impl Sync for State {}
 
 impl<W: RangeWrite> RangeWriter<W> {
     /// Create a new MultipartUploadWriter.
-    pub fn new(inner: W) -> Self {
+    pub fn new(inner: W, concurrent: usize) -> Self {
         Self {
-            state: State::Idle(Some(inner)),
+            state: State::Idle,
+            w: Arc::new(inner),
 
+            futures: ConcurrentFutures::new(1.max(concurrent)),
             buffer: None,
             location: None,
-            written: 0,
+            next_offset: 0,
         }
     }
+
+    fn fill_cache(&mut self, bs: &dyn WriteBuf) -> usize {
+        let size = bs.remaining();
+        let bs = oio::ChunkedBytes::from_vec(bs.vectored_bytes(size));
+        assert!(self.buffer.is_none());
+        self.buffer = Some(bs);
+        size
+    }
 }
 
 impl<W: RangeWrite> oio::Write for RangeWriter<W> {
     fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn WriteBuf) -> Poll<Result<usize>> {
         loop {
             match &mut self.state {
-                State::Idle(w) => {
+                State::Idle => {
                     match self.location.clone() {
                         Some(location) => {
-                            let written = self.written;
-
-                            let buffer = self.buffer.clone().expect("cache must be valid").clone();
-                            let w = w.take().expect("writer must be valid");
-                            self.state = State::Write(Box::pin(async move {
-                                let size = buffer.len() as u64;
-                                let res = w
-                                    .write_range(
+                            if self.futures.has_remaining() {
+                                let cache = self.buffer.take().expect("cache must be valid");
+                                let size = cache.len() as u64;
+                                let offset = self.next_offset;
+                                self.next_offset += size;
+                                let w = self.w.clone();
+                                self.futures.push(WriteRangeFuture(Box::pin(async move {
+                                    w.write_range(
                                         &location,
-                                        written,
+                                        offset,
                                         size,
-                                        AsyncBody::ChunkedBytes(buffer),
+                                        AsyncBody::ChunkedBytes(cache),
                                     )
-                                    .await;
-
-                                (w, res.map(|_| size))
-                            }));
+                                    .await
+                                    .map(|_| size)
+                                })));
+                                let size = self.fill_cache(bs);
+                                return Poll::Ready(Ok(size));
+                            } else if let Some(size) = ready!(self.futures.poll_next_unpin(cx)) {
+                                self.next_offset += size?;
+                            }
                         }
                         None => {
                             // Fill cache with the first write.
                             if self.buffer.is_none() {
-                                let size = bs.remaining();
-                                let cb = oio::ChunkedBytes::from_vec(bs.vectored_bytes(size));
-                                self.buffer = Some(cb);
+                                let size = self.fill_cache(bs);
                                 return Poll::Ready(Ok(size));
                             }
 
-                            let w = w.take().expect("writer must be valid");
-                            self.state = State::Init(Box::pin(async move {
-                                let location = w.initiate_range().await;
-                                (w, location)
-                            }));
+                            let w = self.w.clone();
+                            self.state =
+                                State::Init(Box::pin(async move { w.initiate_range().await }));
                         }
                     }
                 }
                 State::Init(fut) => {
-                    let (w, res) = ready!(fut.poll_unpin(cx));
-                    self.state = State::Idle(Some(w));
+                    let res = ready!(fut.poll_unpin(cx));
+                    self.state = State::Idle;
                     self.location = Some(res?);
                 }
-                State::Write(fut) => {
-                    let (w, size) = ready!(fut.as_mut().poll(cx));
-                    self.state = State::Idle(Some(w));
-                    // Update the written.
-                    self.written += size?;
-
-                    // Replace the cache when last write succeeded
-                    let size = bs.remaining();
-                    let cb = oio::ChunkedBytes::from_vec(bs.vectored_bytes(size));
-                    self.buffer = Some(cb);
-                    return Poll::Ready(Ok(size));
-                }
                 State::Complete(_) => {
                     unreachable!("RangeWriter must not go into State::Complete during poll_write")
                 }
@@ -194,23 +216,26 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {
     fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
         loop {
             match &mut self.state {
-                State::Idle(w) => {
-                    let w = w.take().expect("writer must be valid");
+                State::Idle => {
+                    let w = self.w.clone();
                     match self.location.clone() {
                         Some(location) => {
-                            let written = self.written;
-                            match self.buffer.clone() {
+                            if !self.futures.is_empty() {
+                                while let Some(size) = ready!(self.futures.poll_next_unpin(cx)) {
+                                    self.next_offset += size?;
+                                }
+                            }
+                            match self.buffer.take() {
                                 Some(bs) => {
+                                    let offset = self.next_offset;
                                     self.state = State::Complete(Box::pin(async move {
-                                        let res = w
-                                            .complete_range(
-                                                &location,
-                                                written,
-                                                bs.len() as u64,
-                                                AsyncBody::ChunkedBytes(bs),
-                                            )
-                                            .await;
-                                        (w, res)
+                                        w.complete_range(
+                                            &location,
+                                            offset,
+                                            bs.len() as u64,
+                                            AsyncBody::ChunkedBytes(bs),
+                                        )
+                                        .await
                                     }));
                                 }
                                 None => {
@@ -222,17 +247,13 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {
                             Some(bs) => {
                                 self.state = State::Complete(Box::pin(async move {
                                     let size = bs.len();
-                                    let res = w
-                                        .write_once(size as u64, AsyncBody::ChunkedBytes(bs))
-                                        .await;
-                                    (w, res)
+                                    w.write_once(size as u64, AsyncBody::ChunkedBytes(bs)).await
                                 }));
                             }
                             None => {
                                 // Call write_once if there is no data in buffer and no location.
                                 self.state = State::Complete(Box::pin(async move {
-                                    let res = w.write_once(0, AsyncBody::Empty).await;
-                                    (w, res)
+                                    w.write_once(0, AsyncBody::Empty).await
                                 }));
                             }
                         },
@@ -241,12 +262,9 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {
                 State::Init(_) => {
                     unreachable!("RangeWriter must not go into State::Init during poll_close")
                 }
-                State::Write(_) => {
-                    unreachable!("RangeWriter must not go into State::Write during poll_close")
-                }
                 State::Complete(fut) => {
-                    let (w, res) = ready!(fut.poll_unpin(cx));
-                    self.state = State::Idle(Some(w));
+                    let res = ready!(fut.poll_unpin(cx));
+                    self.state = State::Idle;
                     return Poll::Ready(res);
                 }
                 State::Abort(_) => {
@@ -259,32 +277,24 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {
     fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
         loop {
             match &mut self.state {
-                State::Idle(w) => {
-                    let w = w.take().unwrap();
-                    match self.location.clone() {
-                        Some(location) => {
-                            let fut = async move {
-                                let res = w.abort_range(&location).await;
-
-                                (w, res)
-                            };
-                            self.state = State::Abort(Box::pin(fut));
-                        }
-                        None => return Poll::Ready(Ok(())),
+                State::Idle => match self.location.clone() {
+                    Some(location) => {
+                        let w = self.w.clone();
+                        self.futures.clear();
+                        self.state =
+                            State::Abort(Box::pin(async move { w.abort_range(&location).await }));
                     }
-                }
+                    None => return Poll::Ready(Ok(())),
+                },
                 State::Init(_) => {
                     unreachable!("RangeWriter must not go into State::Init during poll_close")
                 }
-                State::Write(_) => {
-                    unreachable!("RangeWriter must not go into State::Write during poll_close")
-                }
                 State::Complete(_) => {
                     unreachable!("RangeWriter must not go into State::Complete during poll_close")
                 }
                 State::Abort(fut) => {
-                    let (w, res) = ready!(fut.poll_unpin(cx));
-                    self.state = State::Idle(Some(w));
+                    let res = ready!(fut.poll_unpin(cx));
+                    self.state = State::Idle;
                     // We should check res first before clean up cache.
                     res?;
 
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index 3142f8a9b8..79ae7a8ee4 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -420,8 +420,9 @@ impl Accessor for GcsBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+        let concurrent = args.concurrent();
         let w = GcsWriter::new(self.core.clone(), path, args);
-        let w = oio::RangeWriter::new(w);
+        let w = oio::RangeWriter::new(w, concurrent);
 
         Ok((RpWrite::default(), w))
     }

Reply via email to