This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch refactor-writer
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 5fb041f9825d437641a4e136da996489244db9ed
Author: Xuanwo <[email protected]>
AuthorDate: Mon Sep 4 18:03:27 2023 +0800

    Save code
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/benches/oio/write.rs                    |   8 +-
 core/src/raw/oio/cursor.rs                   |  14 ---
 core/src/raw/oio/write/at_least_buf_write.rs |   2 +-
 core/src/raw/oio/write/exact_buf_write.rs    | 173 +++++++++++++++------------
 4 files changed, 107 insertions(+), 90 deletions(-)

diff --git a/core/benches/oio/write.rs b/core/benches/oio/write.rs
index 6e26ce7e0..506a974f0 100644
--- a/core/benches/oio/write.rs
+++ b/core/benches/oio/write.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use bytes::{Buf, Bytes};
 use criterion::Criterion;
 use once_cell::sync::Lazy;
 use opendal::raw::oio::AtLeastBufWriter;
@@ -71,7 +72,12 @@ pub fn bench_exact_buf_write(c: &mut Criterion) {
         group.bench_with_input(size.to_string(), &content, |b, content| {
             b.to_async(&*TOKIO).iter(|| async {
                 let mut w = ExactBufWriter::new(BlackHoleWriter, 256 * 1024);
-                w.write(content.clone()).await.unwrap();
+
+                let mut bs = Bytes::from(content.clone());
+                while !bs.is_empty() {
+                    let n = w.write(bs.clone()).await.unwrap();
+                    bs.advance(n as usize);
+                }
                 w.close().await.unwrap();
             })
         });
diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs
index 88a713068..796b2a349 100644
--- a/core/src/raw/oio/cursor.rs
+++ b/core/src/raw/oio/cursor.rs
@@ -343,20 +343,6 @@ impl oio::Stream for ChunkedCursor {
     }
 }
 
-impl oio::Read for ChunkedCursor {
-    fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
-        todo!()
-    }
-
-    fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<Result<u64>> {
-        todo!()
-    }
-
-    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
-        todo!()
-    }
-}
-
 /// VectorCursor is the cursor for [`Vec<Bytes>`] that implements [`oio::Stream`]
 pub struct VectorCursor {
     inner: VecDeque<Bytes>,
diff --git a/core/src/raw/oio/write/at_least_buf_write.rs b/core/src/raw/oio/write/at_least_buf_write.rs
index 99c20ee46..ebfff8ffd 100644
--- a/core/src/raw/oio/write/at_least_buf_write.rs
+++ b/core/src/raw/oio/write/at_least_buf_write.rs
@@ -90,7 +90,7 @@ impl<W: oio::Write> oio::Write for AtLeastBufWriter<W> {
             })
     }
 
-    async fn pipe(&mut self, size: u64, s: oio::Reader) -> Result<u64> {
+    async fn pipe(&mut self, _: u64, s: oio::Reader) -> Result<u64> {
         todo!()
     }
 
diff --git a/core/src/raw/oio/write/exact_buf_write.rs b/core/src/raw/oio/write/exact_buf_write.rs
index e77330e0b..57118fce8 100644
--- a/core/src/raw/oio/write/exact_buf_write.rs
+++ b/core/src/raw/oio/write/exact_buf_write.rs
@@ -18,10 +18,10 @@
 use std::cmp::min;
 
 use async_trait::async_trait;
-use bytes::Bytes;
+use bytes::{Buf, BufMut, Bytes, BytesMut};
+use tokio::io::ReadBuf;
 
-use crate::raw::oio::StreamExt;
-use crate::raw::oio::Streamer;
+use crate::raw::oio::ReadExt;
 use crate::raw::*;
 use crate::*;
 
@@ -41,9 +41,7 @@ pub struct ExactBufWriter<W: oio::Write> {
 
     /// The size for buffer, we will flush the underlying storage at the size of this buffer.
     buffer_size: usize,
-    buffer: oio::ChunkedCursor,
-
-    buffer_stream: Option<Streamer>,
+    buffer: Buffer,
 }
 
 impl<W: oio::Write> ExactBufWriter<W> {
@@ -52,91 +50,108 @@ impl<W: oio::Write> ExactBufWriter<W> {
         Self {
             inner,
             buffer_size,
-            buffer: oio::ChunkedCursor::new(),
-            buffer_stream: None,
-        }
-    }
-
-    /// Next bytes is used to fetch bytes from buffer or input streamer.
-    ///
-    /// We need this function because we need to make sure our write is reentrant.
-    /// We can't mutate state unless we are sure that the write is successful.
-    async fn next_bytes(&mut self, s: &mut Streamer) -> Option<Result<Bytes>> {
-        match self.buffer_stream.as_mut() {
-            None => s.next().await,
-            Some(bs) => match bs.next().await {
-                None => {
-                    self.buffer_stream = None;
-                    s.next().await
-                }
-                Some(v) => Some(v),
-            },
+            buffer: Buffer::Filling(BytesMut::new()),
         }
     }
+}
 
-    fn chain_stream(&mut self, s: Streamer) {
-        self.buffer_stream = match self.buffer_stream.take() {
-            Some(stream) => Some(Box::new(stream.chain(s))),
-            None => Some(s),
-        }
-    }
+enum Buffer {
+    Filling(BytesMut),
+    Consuming(Bytes),
 }
 
 #[async_trait]
 impl<W: oio::Write> oio::Write for ExactBufWriter<W> {
-    async fn write(&mut self, bs: Bytes) -> Result<u64> {
-        self.pipe(bs.len() as u64, Box::new(oio::Cursor::from(bs)))
-            .await
+    async fn write(&mut self, mut bs: Bytes) -> Result<u64> {
+        loop {
+            match &mut self.buffer {
+                Buffer::Filling(fill) => {
+                    if fill.len() == self.buffer_size {
+                        self.buffer = Buffer::Consuming(fill.split().freeze());
+                        continue;
+                    }
+
+                    let size = min(self.buffer_size - fill.len(), bs.len());
+                    fill.extend_from_slice(&bs[..size]);
+                    bs.advance(size);
+                    return Ok(size as u64);
+                }
+                Buffer::Consuming(consume) => {
+                    // Make sure filled buffer has been flushed.
+                    //
+                    // TODO: maybe we can re-fill it after a successful write.
+                    while !consume.is_empty() {
+                        let n = self.inner.write(consume.clone()).await?;
+                        consume.advance(n as usize);
+                    }
+                    self.buffer = Buffer::Filling(BytesMut::new());
+                }
+            }
+        }
     }
 
-    /// # TODO
-    ///
-    /// We know every stream size, we can collect them into a buffer without chain them every time.
     async fn pipe(&mut self, _: u64, mut s: oio::Reader) -> Result<u64> {
-        todo!()
+        loop {
+            match &mut self.buffer {
+                Buffer::Filling(fill) => {
+                    if fill.len() == self.buffer_size {
+                        self.buffer = Buffer::Consuming(fill.split().freeze());
+                        continue;
+                    }
+
+                    // Reserve to buffer size.
+                    fill.reserve(self.buffer_size - fill.len());
+                    let dst = fill.spare_capacity_mut();
+                    let dst_len = dst.len();
+                    let mut buf = ReadBuf::uninit(dst);
+
+                    // Safety: the input buffer is created with_capacity(length).
+                    unsafe { buf.assume_init(dst_len) };
+
+                    let n = s.read(buf.filled_mut()).await?;
+
+                    // Safety: read makes sure this buffer has been filled.
+                    unsafe { fill.advance_mut(n) };
+
+                    return Ok(n as u64);
+                }
+                Buffer::Consuming(consume) => {
+                    // Make sure filled buffer has been flushed.
+                    //
+                    // TODO: maybe we can re-fill it after a successful write.
+                    while !consume.is_empty() {
+                        let n = self.inner.write(consume.clone()).await?;
+                        consume.advance(n as usize);
+                    }
+                    self.buffer = Buffer::Filling(BytesMut::new());
+                }
+            }
+        }
     }
 
     async fn abort(&mut self) -> Result<()> {
-        self.buffer.clear();
-        self.buffer_stream = None;
-
+        self.buffer = Buffer::Filling(BytesMut::new());
         self.inner.abort().await
     }
 
     async fn close(&mut self) -> Result<()> {
-        while let Some(stream) = self.buffer_stream.as_mut() {
-            let bs = stream.next().await.transpose()?;
-            match bs {
-                None => {
-                    self.buffer_stream = None;
+        loop {
+            match &mut self.buffer {
+                Buffer::Filling(fill) => {
+                    self.buffer = Buffer::Consuming(fill.split().freeze());
+                    continue;
                 }
-                Some(bs) => {
-                    self.buffer.push(bs);
+                Buffer::Consuming(consume) => {
+                    // Make sure filled buffer has been flushed.
+                    //
+                    // TODO: maybe we can re-fill it after a successful write.
+                    while !consume.is_empty() {
+                        let n = self.inner.write(consume.clone()).await?;
+                        consume.advance(n as usize);
+                    }
+                    break;
                 }
             }
-
-            if self.buffer.len() >= self.buffer_size {
-                let mut buf = self.buffer.clone();
-                let to_write = buf.split_to(self.buffer_size);
-                self.inner
-                    .pipe(to_write.len() as u64, Box::new(to_write))
-                    .await
-                    // Replace buffer with remaining if the write is successful.
-                    .map(|_| {
-                        self.buffer = buf;
-                    })?;
-            }
-        }
-
-        while !self.buffer.is_empty() {
-            let mut buf = self.buffer.clone();
-            let to_write = buf.split_to(min(self.buffer_size, buf.len()));
-
-            self.inner
-                .pipe(to_write.len() as u64, Box::new(to_write))
-                .await
-                // Replace buffer with remaining if the write is successful.
-                .map(|_| self.buffer = buf)?;
         }
 
         self.inner.close().await
@@ -172,7 +187,7 @@ mod tests {
 
         async fn pipe(&mut self, size: u64, mut s: oio::Reader) -> Result<u64> {
             let mut bs = vec![];
-            s.read_to_end(&mut bs).await?;
+            s.read_to_end(&mut bs).await.unwrap();
             assert_eq!(bs.len() as u64, size);
             self.write(bs.into()).await
         }
@@ -200,7 +215,12 @@ mod tests {
 
         let mut w = ExactBufWriter::new(MockWriter { buf: vec![] }, 10);
 
-        w.write(Bytes::from(expected.clone())).await?;
+        let mut bs = Bytes::from(expected.clone());
+        while !bs.is_empty() {
+            let n = w.write(bs.clone()).await?;
+            bs.advance(n as usize);
+        }
+
         w.close().await?;
 
         assert_eq!(w.inner.buf.len(), expected.len());
@@ -233,7 +253,12 @@ mod tests {
             rng.fill_bytes(&mut content);
 
             expected.extend_from_slice(&content);
-            writer.write(Bytes::from(content)).await?;
+
+            let mut bs = Bytes::from(content.clone());
+            while !bs.is_empty() {
+                let n = writer.write(bs.clone()).await?;
+                bs.advance(n as usize);
+            }
         }
         writer.close().await?;
 

Reply via email to