This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch lazy-reader in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit ef9aab9c061648b08c351766ef37ddd12d0012c8 Author: Xuanwo <[email protected]> AuthorDate: Thu Oct 26 22:08:20 2023 +0800 fix read to end Signed-off-by: Xuanwo <[email protected]> --- core/src/raw/oio/read/api.rs | 102 +++++++++++++++++++------------------------ 1 file changed, 45 insertions(+), 57 deletions(-) diff --git a/core/src/raw/oio/read/api.rs b/core/src/raw/oio/read/api.rs index 3fd1459da..0b9aaf770 100644 --- a/core/src/raw/oio/read/api.rs +++ b/core/src/raw/oio/read/api.rs @@ -206,6 +206,8 @@ pub trait ReadExt: Read { reader: self, buf, start, + length: start, + next: MIN_READ_TO_END_GROW_SIZE, } } } @@ -266,12 +268,19 @@ where } } +/// The MIN read to end grow size. +const MIN_READ_TO_END_GROW_SIZE: usize = 8 * 1024; +/// The MAX read to end grow size. +const MAX_READ_TO_END_GROW_SIZE: usize = 4 * 1024 * 1024; + /// Make this future `!Unpin` for compatibility with async trait methods. #[pin_project(!Unpin)] pub struct ReadToEndFuture<'a, R: Read + Unpin + ?Sized> { reader: &'a mut R, buf: &'a mut Vec<u8>, start: usize, + length: usize, + next: usize, } impl<R> Future for ReadToEndFuture<'_, R> @@ -283,40 +292,39 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<usize>> { let this = self.project(); - let mut g = ReadToEndGuard { - len: this.buf.len(), - buf: this.buf, - next: MIN_READ_TO_END_GROW_SIZE, - }; - loop { - if g.buf.capacity() - g.buf.len() < g.next { - g.buf.reserve(g.next); + if this.buf.capacity() == *this.length { + this.buf.reserve(*this.next); // # Safety // // We make sure that the length of buf is maintained correctly. #[allow(clippy::uninit_vec)] unsafe { - g.buf.set_len(g.buf.capacity()); + this.buf.set_len(this.buf.capacity()); } } - let buf = &mut g.buf[g.len..]; + let buf = &mut this.buf[*this.length..]; match ready!(this.reader.poll_read(cx, buf)) { - Ok(0) => return Poll::Ready(Ok(g.len - *this.start)), + Ok(0) => { + unsafe { + this.buf.set_len(*this.length); + } + return Poll::Ready(Ok(*this.length - *this.start)); + } Ok(n) => { - g.next = if n >= g.next { - cmp::min(g.next.saturating_mul(2), MAX_READ_TO_END_GROW_SIZE) - } else if n >= g.next / 2 { - g.next + *this.next = if n >= *this.next { + cmp::min((*this.next).saturating_mul(2), MAX_READ_TO_END_GROW_SIZE) + } else if n >= *this.next / 2 { + *this.next } else { - cmp::max(g.next.saturating_div(2), MIN_READ_TO_END_GROW_SIZE) + cmp::max((*this.next).saturating_div(2), MIN_READ_TO_END_GROW_SIZE) }; // We can't allow bogus values from read. If it is too large, the returned vec could have its length // set past its capacity, or if it overflows the vec could be shortened which could create an invalid // string if this is called via read_to_string. assert!(n <= buf.len()); - g.len += n; + *this.length += n; } Err(e) => return Poll::Ready(Err(e)), } @@ -324,28 +332,6 @@ where } } -const MIN_READ_TO_END_GROW_SIZE: usize = 8 * 1024; -const MAX_READ_TO_END_GROW_SIZE: usize = 4 * 1024 * 1024; - -/// ReadToEndGuard makes sure that the buf length is maintained correctly. -struct ReadToEndGuard<'a> { - buf: &'a mut Vec<u8>, - /// Store the real length of buf. - len: usize, - next: usize, -} - -impl Drop for ReadToEndGuard<'_> { - /// # Safety - /// - /// We make sure that the length of buf is maintained correctly. - fn drop(&mut self) { - unsafe { - self.buf.set_len(self.len); - } - } -} - /// BlockingReader is a boxed dyn `BlockingRead`. pub type BlockingReader = Box<dyn BlockingRead>; @@ -371,42 +357,44 @@ pub trait BlockingRead: Send + Sync { /// Read all data of current reader to the end of buf. fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<usize> { - let start_len = buf.len(); - let mut g = ReadToEndGuard { - len: buf.len(), - buf, - next: MIN_READ_TO_END_GROW_SIZE, - }; + let start = buf.len(); + let mut next = MAX_READ_TO_END_GROW_SIZE; + let mut length = start; loop { - if g.buf.capacity() - g.buf.len() < g.next { - g.buf.reserve(g.next); + if buf.capacity() == length { + buf.reserve(next); // # Safety // // We make sure that the length of buf is maintained correctly. #[allow(clippy::uninit_vec)] unsafe { - g.buf.set_len(g.buf.capacity()); + buf.set_len(buf.capacity()); } } - let buf = &mut g.buf[g.len..]; - match self.read(buf) { - Ok(0) => return Ok(g.len - start_len), + let bs = &mut buf[length..]; + match self.read(bs) { + Ok(0) => { + unsafe { + buf.set_len(length); + } + return Ok(length - start); + } Ok(n) => { - g.next = if n >= g.next { - cmp::min(g.next.saturating_mul(2), MAX_READ_TO_END_GROW_SIZE) - } else if n >= g.next / 2 { - g.next + next = if n >= next { + cmp::min(next.saturating_mul(2), MAX_READ_TO_END_GROW_SIZE) + } else if n >= next / 2 { + next } else { - cmp::max(g.next.saturating_div(2), MIN_READ_TO_END_GROW_SIZE) + cmp::max(next.saturating_div(2), MIN_READ_TO_END_GROW_SIZE) }; // We can't allow bogus values from read. If it is too large, the returned vec could have its length // set past its capacity, or if it overflows the vec could be shortened which could create an invalid // string if this is called via read_to_string. assert!(n <= buf.len()); - g.len += n; + length += n; } Err(e) => return Err(e), }
