This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch lazy-reader in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 3556f64151e833085334389f4e6eae0d56aea034 Author: Xuanwo <[email protected]> AuthorDate: Wed Oct 25 17:30:32 2023 +0800 refactor by range Signed-off-by: Xuanwo <[email protected]> --- core/src/layers/complete.rs | 49 +-- .../raw/oio/read/into_seekable_read_by_range.rs | 396 ++++++++++++++++----- core/src/raw/ops.rs | 17 + 3 files changed, 323 insertions(+), 139 deletions(-) diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index 36502fff6..c0fe7b480 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -162,9 +162,7 @@ impl<A: Accessor> CompleteAccessor<A> { let seekable = capability.read_can_seek; let streamable = capability.read_can_next; - let range = args.range(); - let (rp, r) = self.inner.read(path, args).await?; - let content_length = rp.metadata().content_length(); + let (rp, r) = self.inner.read(path, args.clone()).await?; match (seekable, streamable) { (true, true) => Ok((rp, CompleteReader::AlreadyComplete(r))), @@ -173,24 +171,7 @@ impl<A: Accessor> CompleteAccessor<A> { Ok((rp, CompleteReader::NeedStreamable(r))) } _ => { - let (offset, size) = match (range.offset(), range.size()) { - (Some(offset), _) => (offset, content_length), - (None, None) => (0, content_length), - (None, Some(size)) => { - // TODO: we can read content range to calculate - // the total content length. - let om = self.inner.stat(path, OpStat::new()).await?.into_metadata(); - let total_size = om.content_length(); - let (offset, size) = if size > total_size { - (0, total_size) - } else { - (total_size - size, size) - }; - - (offset, size) - } - }; - let r = oio::into_seekable_read_by_range(self.inner.clone(), path, r, offset, size); + let r = oio::into_seekable_read_by_range(self.inner.clone(), path, args); if streamable { Ok((rp, CompleteReader::NeedSeekable(r))) @@ -215,9 +196,7 @@ impl<A: Accessor> CompleteAccessor<A> { let seekable = capability.read_can_seek; let streamable = capability.read_can_next; - let range = args.range(); - let (rp, r) = self.inner.blocking_read(path, args)?; - let content_length = rp.metadata().content_length(); + let (rp, r) = self.inner.blocking_read(path, args.clone())?; match (seekable, streamable) { (true, true) => Ok((rp, CompleteReader::AlreadyComplete(r))), @@ -226,27 +205,7 @@ impl<A: Accessor> CompleteAccessor<A> { Ok((rp, CompleteReader::NeedStreamable(r))) } _ => { - let (offset, size) = match (range.offset(), range.size()) { - (Some(offset), _) => (offset, content_length), - (None, None) => (0, content_length), - (None, Some(size)) => { - // TODO: we can read content range to calculate - // the total content length. - let om = self - .inner - .blocking_stat(path, OpStat::new())? - .into_metadata(); - let total_size = om.content_length(); - let (offset, size) = if size > total_size { - (0, total_size) - } else { - (total_size - size, size) - }; - - (offset, size) - } - }; - let r = oio::into_seekable_read_by_range(self.inner.clone(), path, r, offset, size); + let r = oio::into_seekable_read_by_range(self.inner.clone(), path, args); if streamable { Ok((rp, CompleteReader::NeedSeekable(r))) diff --git a/core/src/raw/oio/read/into_seekable_read_by_range.rs b/core/src/raw/oio/read/into_seekable_read_by_range.rs index 5b0c28823..2cfb0c2de 100644 --- a/core/src/raw/oio/read/into_seekable_read_by_range.rs +++ b/core/src/raw/oio/read/into_seekable_read_by_range.rs @@ -48,9 +48,15 @@ pub fn into_seekable_read_by_range<A: Accessor, R>( path: &str, op: OpRead, ) -> ByRangeSeekableReader<A, R> { + // Normalize range like `..` into `0..` to make sure offset is valid. + let (offset, size) = match (op.range().offset(), op.range().size()) { + (None, None) => (Some(0), None), + v => v, + }; + ByRangeSeekableReader { acc, - path: path.to_string(), + path: Arc::new(path.to_string()), op, offset, @@ -64,11 +70,11 @@ pub fn into_seekable_read_by_range<A: Accessor, R>( /// ByRangeReader that can do seek on non-seekable reader. pub struct ByRangeSeekableReader<A: Accessor, R> { acc: Arc<A>, - path: String, + path: Arc<String>, op: OpRead, - offset: u64, - size: u64, + offset: Option<u64>, + size: Option<u64>, cur: u64, state: State<R>, @@ -82,9 +88,9 @@ pub struct ByRangeSeekableReader<A: Accessor, R> { enum State<R> { Idle, - Stating(BoxFuture<'static, Result<RpStat>>), - Sending(BoxFuture<'static, Result<(RpRead, R)>>), - Reading(R), + SendStat(BoxFuture<'static, Result<RpStat>>), + SendRead(BoxFuture<'static, Result<(RpRead, R)>>), + Read(R), } /// Safety: State will only be accessed under &mut. @@ -94,30 +100,49 @@ impl<A, R> ByRangeSeekableReader<A, R> where A: Accessor, { - /// calculate the seek position. - /// - /// This operation will not update the `self.cur`. - fn seek_pos(&self, pos: SeekFrom) -> Result<u64> { - if let Some(last_pos) = self.last_seek_pos { - return Ok(last_pos); - } + /// Fill current reader's range by total_size. + fn fill_range(&mut self, total_size: u64) -> Result<()> { + (self.offset, self.size) = match (self.offset, self.size) { + (None, Some(size)) => { + if size > total_size { + return Err(Error::new( + ErrorKind::InvalidInput, + "read to a negative or overflowing position is invalid", + )); + } - let (base, amt) = match pos { - SeekFrom::Start(n) => (0, n as i64), - SeekFrom::End(n) => (self.size as i64, n), - SeekFrom::Current(n) => (self.cur as i64, n), - }; + (Some(total_size - size), Some(size)) + } + (Some(offset), None) => { + // It's valid for reader to seek to a position that out of the content length. + // We should return `Ok(0)` instead of an error at this case to align fs behavior. + let size = total_size.checked_sub(offset).unwrap_or_default(); - let n = match base.checked_add(amt) { - Some(n) if n >= 0 => n as u64, - _ => { - return Err(Error::new( - ErrorKind::InvalidInput, - "invalid seek to a negative or overflowing position", - )) + (Some(offset), Some(size)) + } + (Some(offset), Some(size)) => (Some(offset), Some(size)), + (None, None) => { + unreachable!("fill_range should not reach this case after normalization") } }; - Ok(n) + + Ok(()) + } + + /// Calculate the current range, maybe sent as next read request. + /// + /// # Panics + /// + /// Offset must be normalized before calling this function. + /// + /// - `..` should be transformed into `0..` + /// - `..size` should be transformed into `(total-size)..total`. + fn calculate_range(&self) -> BytesRange { + let offset = self + .offset + .expect("offset must be set before calculating range"); + + BytesRange::new(Some(offset + self.cur), self.size.map(|v| v - self.cur)) } } @@ -129,13 +154,37 @@ where fn read_future(&self) -> BoxFuture<'static, Result<(RpRead, R)>> { let acc = self.acc.clone(); let path = self.path.clone(); - let op = OpRead::default().with_range(BytesRange::new( - Some(self.offset + self.cur), - Some(self.size - self.cur), - )); + + let mut op = self.op.clone(); + // cur != 0 means we have read some data out, we should convert + // the op into deterministic to avoid ETag changes. + if self.cur != 0 { + op = op.into_deterministic(); + } + // Alter OpRead with correct calculated range. + op = op.with_range(self.calculate_range()); Box::pin(async move { acc.read(&path, op).await }) } + + fn stat_future(&self) -> BoxFuture<'static, Result<RpStat>> { + let acc = self.acc.clone(); + let path = self.path.clone(); + + // Handle if-match and if-none-match correctly. + let mut args = OpStat::default(); + // TODO: stat should support range to check if ETag matches. + if self.op.range().is_full() { + if let Some(v) = self.op.if_match() { + args = args.with_if_match(v); + } + if let Some(v) = self.op.if_none_match() { + args = args.with_if_none_match(v); + } + } + + Box::pin(async move { acc.stat(&path, args).await }) + } } impl<A, R> ByRangeSeekableReader<A, R> @@ -146,13 +195,37 @@ where fn read_action(&self) -> Result<(RpRead, R)> { let acc = self.acc.clone(); let path = self.path.clone(); - let op = OpRead::default().with_range(BytesRange::new( - Some(self.offset + self.cur), - Some(self.size - self.cur), - )); + + let mut op = self.op.clone(); + // cur != 0 means we have read some data out, we should convert + // the op into deterministic to avoid ETag changes. + if self.cur != 0 { + op = op.into_deterministic(); + } + // Alter OpRead with correct calculated range. + op = op.with_range(self.calculate_range()); acc.blocking_read(&path, op) } + + fn stat_action(&self) -> Result<RpStat> { + let acc = self.acc.clone(); + let path = self.path.clone(); + + // Handle if-match and if-none-match correctly. + let mut args = OpStat::default(); + // TODO: stat should support range to check if ETag matches. + if self.op.range().is_full() { + if let Some(v) = self.op.if_match() { + args = args.with_if_match(v); + } + if let Some(v) = self.op.if_none_match() { + args = args.with_if_none_match(v); + } + } + + acc.blocking_stat(&path, args) + } } impl<A, R> oio::Read for ByRangeSeekableReader<A, R> @@ -163,17 +236,41 @@ where fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> { match &mut self.state { State::Idle => { - if self.cur >= self.size { + // Sanity check for normal cases. + if buf.is_empty() || self.cur > self.size.unwrap_or(u64::MAX) { return Poll::Ready(Ok(0)); } - self.state = State::Sending(self.read_future()); + self.state = if self.offset.is_none() { + // Offset is none means we are doing tailing reading. + // we should stat first to get the correct offset. + State::SendStat(self.stat_future()) + } else { + State::SendRead(self.read_future()) + }; + + self.poll_read(cx, buf) + } + State::SendStat(fut) => { + let rp = ready!(Pin::new(fut).poll(cx)).map_err(|err| { + // If stat future returns an error, we should reset + // state to Idle so that we can retry it. + self.state = State::Idle; + err + })?; + + let length = rp.into_metadata().content_length(); + self.fill_range(length).map_err(|err| { + // If stat future returns an error, we should reset + // state to Idle so that we can retry it. + self.state = State::Idle; + err + })?; + + self.state = State::Idle; self.poll_read(cx, buf) } - State::Sending(fut) => { - // TODO - // - // we can use RpRead returned here to correct size. + State::SendRead(fut) => { let (_, r) = ready!(Pin::new(fut).poll(cx)).map_err(|err| { // If read future returns an error, we should reset // state to Idle so that we can retry it. @@ -181,10 +278,10 @@ where err })?; - self.state = State::Reading(r); + self.state = State::Read(r); self.poll_read(cx, buf) } - State::Reading(r) => match ready!(Pin::new(r).poll_read(cx, buf)) { + State::Read(r) => match ready!(Pin::new(r).poll_read(cx, buf)) { Ok(0) => { // Reset state to Idle after all data has been consumed. self.state = State::Idle; @@ -202,35 +299,65 @@ where } } - fn poll_seek(&mut self, _: &mut Context<'_>, pos: SeekFrom) -> Poll<Result<u64>> { - let seek_pos = self.seek_pos(pos)?; - self.last_seek_pos = Some(seek_pos); - + fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<Result<u64>> { match &mut self.state { State::Idle => { + let (base, amt) = match pos { + SeekFrom::Start(n) => (0, n as i64), + SeekFrom::End(n) => { + if let Some(size) = self.size { + (size as i64, n) + } else { + self.state = State::SendStat(self.stat_future()); + return self.poll_seek(cx, pos); + } + } + SeekFrom::Current(n) => (self.cur as i64, n), + }; + + let seek_pos = match base.checked_add(amt) { + Some(n) if n >= 0 => n as u64, + _ => { + return Poll::Ready(Err(Error::new( + ErrorKind::InvalidInput, + "invalid seek to a negative or overflowing position", + ))) + } + }; + self.cur = seek_pos; - self.last_seek_pos = None; Poll::Ready(Ok(self.cur)) } - State::Sending(_) => { + State::SendStat(fut) => { + let rp = ready!(Pin::new(fut).poll(cx)).map_err(|err| { + // If stat future returns an error, we should reset + // state to Idle so that we can retry it. + self.state = State::Idle; + err + })?; + + let length = rp.into_metadata().content_length(); + self.fill_range(length)?; + + self.state = State::Idle; + self.poll_seek(cx, pos) + } + State::SendRead(_) => { // It's impossible for us to go into this state while // poll_seek. We can just drop this future and check state. self.state = State::Idle; - - self.cur = seek_pos; - self.last_seek_pos = None; - Poll::Ready(Ok(self.cur)) + self.poll_seek(cx, pos) } - State::Reading(_) => { - if seek_pos == self.cur { - self.last_seek_pos = None; + State::Read(_) => { + // There is an optimization here that we can calculate if users trying to seek + // the same position, for example, `reader.seek(SeekFrom::Current(0))`. + // In this case, we can just return current position without dropping reader. + if pos == SeekFrom::Current(0) || pos == SeekFrom::Start(self.cur) { return Poll::Ready(Ok(self.cur)); } self.state = State::Idle; - self.cur = seek_pos; - self.last_seek_pos = None; - Poll::Ready(Ok(self.cur)) + self.poll_seek(cx, pos) } } } @@ -238,17 +365,36 @@ where fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> { match &mut self.state { State::Idle => { - if self.cur >= self.size { + // Sanity check for normal cases. + if self.cur > self.size.unwrap_or(u64::MAX) { return Poll::Ready(None); } - self.state = State::Sending(self.read_future()); + self.state = if self.offset.is_none() { + // Offset is none means we are doing tailing reading. + // we should stat first to get the correct offset. + State::SendStat(self.stat_future()) + } else { + State::SendRead(self.read_future()) + }; + self.poll_next(cx) } - State::Sending(fut) => { - // TODO - // - // we can use RpRead returned here to correct size. + State::SendStat(fut) => { + let rp = ready!(Pin::new(fut).poll(cx)).map_err(|err| { + // If stat future returns an error, we should reset + // state to Idle so that we can retry it. + self.state = State::Idle; + err + })?; + + let length = rp.into_metadata().content_length(); + self.fill_range(length)?; + + self.state = State::Idle; + self.poll_next(cx) + } + State::SendRead(fut) => { let (_, r) = ready!(Pin::new(fut).poll(cx)).map_err(|err| { // If read future returns an error, we should reset // state to Idle so that we can retry it. @@ -256,10 +402,10 @@ where err })?; - self.state = State::Reading(r); + self.state = State::Read(r); self.poll_next(cx) } - State::Reading(r) => match ready!(Pin::new(r).poll_next(cx)) { + State::Read(r) => match ready!(Pin::new(r).poll_next(cx)) { Some(Ok(bs)) => { self.cur += bs.len() as u64; Poll::Ready(Some(Ok(bs))) @@ -285,15 +431,25 @@ where fn read(&mut self, buf: &mut [u8]) -> Result<usize> { match &mut self.state { State::Idle => { - if self.cur >= self.size { + // Sanity check for normal cases. + if buf.is_empty() || self.cur > self.size.unwrap_or(u64::MAX) { return Ok(0); } + // Offset is none means we are doing tailing reading. + // we should stat first to get the correct offset. + if self.offset.is_none() { + let rp = self.stat_action()?; + + let length = rp.into_metadata().content_length(); + self.fill_range(length)?; + } + let (_, r) = self.read_action()?; - self.state = State::Reading(r); + self.state = State::Read(r); self.read(buf) } - State::Reading(r) => { + State::Read(r) => { match r.read(buf) { Ok(0) => { // Reset state to Idle after all data has been consumed. @@ -310,31 +466,64 @@ where } } } - State::Sending(_) => { - unreachable!("It's invalid to go into State::Sending for BlockingRead, please report this bug") + State::SendStat(_) => { + unreachable!("It's invalid to go into State::SendStat for BlockingRead, please report this bug") + } + State::SendRead(_) => { + unreachable!("It's invalid to go into State::SendRead for BlockingRead, please report this bug") } } } fn seek(&mut self, pos: SeekFrom) -> Result<u64> { - let seek_pos = self.seek_pos(pos)?; - match &mut self.state { State::Idle => { + let (base, amt) = match pos { + SeekFrom::Start(n) => (0, n as i64), + SeekFrom::End(n) => { + if let Some(size) = self.size { + (size as i64, n) + } else { + let rp = self.stat_action()?; + let length = rp.into_metadata().content_length(); + self.fill_range(length)?; + + let size = self.size.expect("size must be valid after fill_range"); + (size as i64, n) + } + } + SeekFrom::Current(n) => (self.cur as i64, n), + }; + + let seek_pos = match base.checked_add(amt) { + Some(n) if n >= 0 => n as u64, + _ => { + return Err(Error::new( + ErrorKind::InvalidInput, + "invalid seek to a negative or overflowing position", + )); + } + }; + self.cur = seek_pos; Ok(self.cur) } - State::Reading(_) => { - if seek_pos == self.cur { + State::Read(_) => { + // There is an optimization here that we can calculate if users trying to seek + // the same position, for example, `reader.seek(SeekFrom::Current(0))`. + // In this case, we can just return current position without dropping reader. + if pos == SeekFrom::Current(0) || pos == SeekFrom::Start(self.cur) { return Ok(self.cur); } self.state = State::Idle; - self.cur = seek_pos; - Ok(self.cur) + self.seek(pos) + } + State::SendStat(_) => { + unreachable!("It's invalid to go into State::SendStat for BlockingRead, please report this bug") } - State::Sending(_) => { - unreachable!("It's invalid to go into State::Sending for BlockingRead, please report this bug") + State::SendRead(_) => { + unreachable!("It's invalid to go into State::SendRead for BlockingRead, please report this bug") } } } @@ -342,18 +531,33 @@ where fn next(&mut self) -> Option<Result<Bytes>> { match &mut self.state { State::Idle => { - if self.cur >= self.size { + // Sanity check for normal cases. + if self.cur > self.size.unwrap_or(u64::MAX) { return None; } + // Offset is none means we are doing tailing reading. + // we should stat first to get the correct offset. + if self.offset.is_none() { + let rp = match self.stat_action() { + Ok(rp) => rp, + Err(err) => return Some(Err(err)), + }; + + let length = rp.into_metadata().content_length(); + if let Err(err) = self.fill_range(length) { + return Some(Err(err)); + } + } + let r = match self.read_action() { Ok((_, r)) => r, Err(err) => return Some(Err(err)), }; - self.state = State::Reading(r); + self.state = State::Read(r); self.next() } - State::Reading(r) => match r.next() { + State::Read(r) => match r.next() { Some(Ok(bs)) => { self.cur += bs.len() as u64; Some(Ok(bs)) @@ -367,8 +571,11 @@ where None } }, - State::Sending(_) => { - unreachable!("It's invalid to go into State::Sending for BlockingRead, please report this bug") + State::SendStat(_) => { + unreachable!("It's invalid to go into State::SendStat for BlockingRead, please report this bug") + } + State::SendRead(_) => { + unreachable!("It's invalid to go into State::SendRead for BlockingRead, please report this bug") } } } @@ -483,11 +690,11 @@ mod tests { let (bs, _) = gen_bytes(); let acc = Arc::new(MockReadService::new(bs.clone())); - let r = MockReader { - inner: futures::io::Cursor::new(bs.to_vec()), - }; - let mut r = - Box::new(into_seekable_read_by_range(acc, "x", r, 0, bs.len() as u64)) as oio::Reader; + let mut r = Box::new(into_seekable_read_by_range( + acc, + "x", + OpRead::default().with_range(BytesRange::from(..)), + )) as oio::Reader; let mut buf = Vec::new(); r.read_to_end(&mut buf).await?; @@ -518,10 +725,11 @@ mod tests { let (bs, _) = gen_bytes(); let acc = Arc::new(MockReadService::new(bs.clone())); - let r = MockReader { - inner: futures::io::Cursor::new(bs[4096..4096 + 4096].to_vec()), - }; - let mut r = Box::new(into_seekable_read_by_range(acc, "x", r, 4096, 4096)) as oio::Reader; + let mut r = Box::new(into_seekable_read_by_range( + acc, + "x", + OpRead::default().with_range(BytesRange::from(4096..4096 + 4096)), + )) as oio::Reader; let mut buf = Vec::new(); r.read_to_end(&mut buf).await?; diff --git a/core/src/raw/ops.rs b/core/src/raw/ops.rs index 1e60d329e..0eb3937c6 100644 --- a/core/src/raw/ops.rs +++ b/core/src/raw/ops.rs @@ -274,6 +274,23 @@ impl OpRead { Self::default() } + /// The into_deterministic function transforms the OpRead into a deterministic version. + /// + /// This API is utilized because it allows for internal optimizations such as dividing read + /// ranges or retrying the read request from where it failed. In these scenarios, the expected + /// `ETag` value differs from what users specify in `If-Match` or `If-None-Match`.Therefore, + /// we need to eliminate these conditional headers to ensure that the read operation is + /// deterministic. + /// + /// This API is not intended to be used by users and should never be exposed. + pub(crate) fn into_deterministic(self) -> Self { + Self { + if_match: None, + if_none_match: None, + ..self + } + } + /// Create a new OpRead with range. pub fn with_range(mut self, range: BytesRange) -> Self { self.br = range;
