This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch lazy-reader
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit ef9aab9c061648b08c351766ef37ddd12d0012c8
Author: Xuanwo <[email protected]>
AuthorDate: Thu Oct 26 22:08:20 2023 +0800

    fix read to end
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/raw/oio/read/api.rs | 102 +++++++++++++++++++------------------------
 1 file changed, 45 insertions(+), 57 deletions(-)

diff --git a/core/src/raw/oio/read/api.rs b/core/src/raw/oio/read/api.rs
index 3fd1459da..0b9aaf770 100644
--- a/core/src/raw/oio/read/api.rs
+++ b/core/src/raw/oio/read/api.rs
@@ -206,6 +206,8 @@ pub trait ReadExt: Read {
             reader: self,
             buf,
             start,
+            length: start,
+            next: MIN_READ_TO_END_GROW_SIZE,
         }
     }
 }
@@ -266,12 +268,19 @@ where
     }
 }
 
+/// The MIN read to end grow size.
+const MIN_READ_TO_END_GROW_SIZE: usize = 8 * 1024;
+/// The MAX read to end grow size.
+const MAX_READ_TO_END_GROW_SIZE: usize = 4 * 1024 * 1024;
+
 /// Make this future `!Unpin` for compatibility with async trait methods.
 #[pin_project(!Unpin)]
 pub struct ReadToEndFuture<'a, R: Read + Unpin + ?Sized> {
     reader: &'a mut R,
     buf: &'a mut Vec<u8>,
     start: usize,
+    length: usize,
+    next: usize,
 }
 
 impl<R> Future for ReadToEndFuture<'_, R>
@@ -283,40 +292,39 @@ where
     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<usize>> {
         let this = self.project();
 
-        let mut g = ReadToEndGuard {
-            len: this.buf.len(),
-            buf: this.buf,
-            next: MIN_READ_TO_END_GROW_SIZE,
-        };
-
         loop {
-            if g.buf.capacity() - g.buf.len() < g.next {
-                g.buf.reserve(g.next);
+            if this.buf.capacity() == *this.length {
+                this.buf.reserve(*this.next);
                 // # Safety
                 //
                 // We make sure that the length of buf is maintained correctly.
                 #[allow(clippy::uninit_vec)]
                 unsafe {
-                    g.buf.set_len(g.buf.capacity());
+                    this.buf.set_len(this.buf.capacity());
                 }
             }
 
-            let buf = &mut g.buf[g.len..];
+            let buf = &mut this.buf[*this.length..];
             match ready!(this.reader.poll_read(cx, buf)) {
-                Ok(0) => return Poll::Ready(Ok(g.len - *this.start)),
+                Ok(0) => {
+                    unsafe {
+                        this.buf.set_len(*this.length);
+                    }
+                    return Poll::Ready(Ok(*this.length - *this.start));
+                }
                 Ok(n) => {
-                    g.next = if n >= g.next {
-                        cmp::min(g.next.saturating_mul(2), MAX_READ_TO_END_GROW_SIZE)
-                    } else if n >= g.next / 2 {
-                        g.next
+                    *this.next = if n >= *this.next {
+                        cmp::min((*this.next).saturating_mul(2), MAX_READ_TO_END_GROW_SIZE)
+                    } else if n >= *this.next / 2 {
+                        *this.next
                     } else {
-                        cmp::max(g.next.saturating_div(2), MIN_READ_TO_END_GROW_SIZE)
+                        cmp::max((*this.next).saturating_div(2), MIN_READ_TO_END_GROW_SIZE)
                     };
                     // We can't allow bogus values from read. If it is too large, the returned vec could have its length
                     // set past its capacity, or if it overflows the vec could be shortened which could create an invalid
                     // string if this is called via read_to_string.
                     assert!(n <= buf.len());
-                    g.len += n;
+                    *this.length += n;
                 }
                 Err(e) => return Poll::Ready(Err(e)),
             }
@@ -324,28 +332,6 @@ where
     }
 }
 
-const MIN_READ_TO_END_GROW_SIZE: usize = 8 * 1024;
-const MAX_READ_TO_END_GROW_SIZE: usize = 4 * 1024 * 1024;
-
-/// ReadToEndGuard makes sure that the buf length is maintained correctly.
-struct ReadToEndGuard<'a> {
-    buf: &'a mut Vec<u8>,
-    /// Store the real length of buf.
-    len: usize,
-    next: usize,
-}
-
-impl Drop for ReadToEndGuard<'_> {
-    /// # Safety
-    ///
-    /// We make sure that the length of buf is maintained correctly.
-    fn drop(&mut self) {
-        unsafe {
-            self.buf.set_len(self.len);
-        }
-    }
-}
-
 /// BlockingReader is a boxed dyn `BlockingRead`.
 pub type BlockingReader = Box<dyn BlockingRead>;
 
@@ -371,42 +357,44 @@ pub trait BlockingRead: Send + Sync {
 
     /// Read all data of current reader to the end of buf.
     fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<usize> {
-        let start_len = buf.len();
-        let mut g = ReadToEndGuard {
-            len: buf.len(),
-            buf,
-            next: MIN_READ_TO_END_GROW_SIZE,
-        };
+        let start = buf.len();
+        let mut next = MAX_READ_TO_END_GROW_SIZE;
+        let mut length = start;
 
         loop {
-            if g.buf.capacity() - g.buf.len() < g.next {
-                g.buf.reserve(g.next);
+            if buf.capacity() == length {
+                buf.reserve(next);
                 // # Safety
                 //
                 // We make sure that the length of buf is maintained correctly.
                 #[allow(clippy::uninit_vec)]
                 unsafe {
-                    g.buf.set_len(g.buf.capacity());
+                    buf.set_len(buf.capacity());
                 }
             }
 
-            let buf = &mut g.buf[g.len..];
-            match self.read(buf) {
-                Ok(0) => return Ok(g.len - start_len),
+            let bs = &mut buf[length..];
+            match self.read(bs) {
+                Ok(0) => {
+                    unsafe {
+                        buf.set_len(length);
+                    }
+                    return Ok(length - start);
+                }
                 Ok(n) => {
-                    g.next = if n >= g.next {
-                        cmp::min(g.next.saturating_mul(2), MAX_READ_TO_END_GROW_SIZE)
-                    } else if n >= g.next / 2 {
-                        g.next
+                    next = if n >= next {
+                        cmp::min(next.saturating_mul(2), MAX_READ_TO_END_GROW_SIZE)
+                    } else if n >= next / 2 {
+                        next
                     } else {
-                        cmp::max(g.next.saturating_div(2), MIN_READ_TO_END_GROW_SIZE)
+                        cmp::max(next.saturating_div(2), MIN_READ_TO_END_GROW_SIZE)
                     };
 
                     // We can't allow bogus values from read. If it is too large, the returned vec could have its length
                     // set past its capacity, or if it overflows the vec could be shortened which could create an invalid
                     // string if this is called via read_to_string.
                     assert!(n <= buf.len());
-                    g.len += n;
+                    length += n;
                 }
                 Err(e) => return Err(e),
             }

Reply via email to