This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch lazy-reader
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 3556f64151e833085334389f4e6eae0d56aea034
Author: Xuanwo <[email protected]>
AuthorDate: Wed Oct 25 17:30:32 2023 +0800

    refactor by range
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/layers/complete.rs                        |  49 +--
 .../raw/oio/read/into_seekable_read_by_range.rs    | 396 ++++++++++++++++-----
 core/src/raw/ops.rs                                |  17 +
 3 files changed, 323 insertions(+), 139 deletions(-)

diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index 36502fff6..c0fe7b480 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -162,9 +162,7 @@ impl<A: Accessor> CompleteAccessor<A> {
         let seekable = capability.read_can_seek;
         let streamable = capability.read_can_next;
 
-        let range = args.range();
-        let (rp, r) = self.inner.read(path, args).await?;
-        let content_length = rp.metadata().content_length();
+        let (rp, r) = self.inner.read(path, args.clone()).await?;
 
         match (seekable, streamable) {
             (true, true) => Ok((rp, CompleteReader::AlreadyComplete(r))),
@@ -173,24 +171,7 @@ impl<A: Accessor> CompleteAccessor<A> {
                 Ok((rp, CompleteReader::NeedStreamable(r)))
             }
             _ => {
-                let (offset, size) = match (range.offset(), range.size()) {
-                    (Some(offset), _) => (offset, content_length),
-                    (None, None) => (0, content_length),
-                    (None, Some(size)) => {
-                        // TODO: we can read content range to calculate
-                        // the total content length.
-                        let om = self.inner.stat(path, 
OpStat::new()).await?.into_metadata();
-                        let total_size = om.content_length();
-                        let (offset, size) = if size > total_size {
-                            (0, total_size)
-                        } else {
-                            (total_size - size, size)
-                        };
-
-                        (offset, size)
-                    }
-                };
-                let r = oio::into_seekable_read_by_range(self.inner.clone(), 
path, r, offset, size);
+                let r = oio::into_seekable_read_by_range(self.inner.clone(), 
path, args);
 
                 if streamable {
                     Ok((rp, CompleteReader::NeedSeekable(r)))
@@ -215,9 +196,7 @@ impl<A: Accessor> CompleteAccessor<A> {
         let seekable = capability.read_can_seek;
         let streamable = capability.read_can_next;
 
-        let range = args.range();
-        let (rp, r) = self.inner.blocking_read(path, args)?;
-        let content_length = rp.metadata().content_length();
+        let (rp, r) = self.inner.blocking_read(path, args.clone())?;
 
         match (seekable, streamable) {
             (true, true) => Ok((rp, CompleteReader::AlreadyComplete(r))),
@@ -226,27 +205,7 @@ impl<A: Accessor> CompleteAccessor<A> {
                 Ok((rp, CompleteReader::NeedStreamable(r)))
             }
             _ => {
-                let (offset, size) = match (range.offset(), range.size()) {
-                    (Some(offset), _) => (offset, content_length),
-                    (None, None) => (0, content_length),
-                    (None, Some(size)) => {
-                        // TODO: we can read content range to calculate
-                        // the total content length.
-                        let om = self
-                            .inner
-                            .blocking_stat(path, OpStat::new())?
-                            .into_metadata();
-                        let total_size = om.content_length();
-                        let (offset, size) = if size > total_size {
-                            (0, total_size)
-                        } else {
-                            (total_size - size, size)
-                        };
-
-                        (offset, size)
-                    }
-                };
-                let r = oio::into_seekable_read_by_range(self.inner.clone(), 
path, r, offset, size);
+                let r = oio::into_seekable_read_by_range(self.inner.clone(), 
path, args);
 
                 if streamable {
                     Ok((rp, CompleteReader::NeedSeekable(r)))
diff --git a/core/src/raw/oio/read/into_seekable_read_by_range.rs 
b/core/src/raw/oio/read/into_seekable_read_by_range.rs
index 5b0c28823..2cfb0c2de 100644
--- a/core/src/raw/oio/read/into_seekable_read_by_range.rs
+++ b/core/src/raw/oio/read/into_seekable_read_by_range.rs
@@ -48,9 +48,15 @@ pub fn into_seekable_read_by_range<A: Accessor, R>(
     path: &str,
     op: OpRead,
 ) -> ByRangeSeekableReader<A, R> {
+    // Normalize range like `..` into `0..` to make sure offset is valid.
+    let (offset, size) = match (op.range().offset(), op.range().size()) {
+        (None, None) => (Some(0), None),
+        v => v,
+    };
+
     ByRangeSeekableReader {
         acc,
-        path: path.to_string(),
+        path: Arc::new(path.to_string()),
         op,
 
         offset,
@@ -64,11 +70,11 @@ pub fn into_seekable_read_by_range<A: Accessor, R>(
 /// ByRangeReader that can do seek on non-seekable reader.
 pub struct ByRangeSeekableReader<A: Accessor, R> {
     acc: Arc<A>,
-    path: String,
+    path: Arc<String>,
     op: OpRead,
 
-    offset: u64,
-    size: u64,
+    offset: Option<u64>,
+    size: Option<u64>,
     cur: u64,
     state: State<R>,
 
@@ -82,9 +88,9 @@ pub struct ByRangeSeekableReader<A: Accessor, R> {
 
 enum State<R> {
     Idle,
-    Stating(BoxFuture<'static, Result<RpStat>>),
-    Sending(BoxFuture<'static, Result<(RpRead, R)>>),
-    Reading(R),
+    SendStat(BoxFuture<'static, Result<RpStat>>),
+    SendRead(BoxFuture<'static, Result<(RpRead, R)>>),
+    Read(R),
 }
 
 /// Safety: State will only be accessed under &mut.
@@ -94,30 +100,49 @@ impl<A, R> ByRangeSeekableReader<A, R>
 where
     A: Accessor,
 {
-    /// calculate the seek position.
-    ///
-    /// This operation will not update the `self.cur`.
-    fn seek_pos(&self, pos: SeekFrom) -> Result<u64> {
-        if let Some(last_pos) = self.last_seek_pos {
-            return Ok(last_pos);
-        }
+    /// Fill current reader's range by total_size.
+    fn fill_range(&mut self, total_size: u64) -> Result<()> {
+        (self.offset, self.size) = match (self.offset, self.size) {
+            (None, Some(size)) => {
+                if size > total_size {
+                    return Err(Error::new(
+                        ErrorKind::InvalidInput,
+                        "read to a negative or overflowing position is 
invalid",
+                    ));
+                }
 
-        let (base, amt) = match pos {
-            SeekFrom::Start(n) => (0, n as i64),
-            SeekFrom::End(n) => (self.size as i64, n),
-            SeekFrom::Current(n) => (self.cur as i64, n),
-        };
+                (Some(total_size - size), Some(size))
+            }
+            (Some(offset), None) => {
+                // It's valid for reader to seek to a position that out of the 
content length.
+                // We should return `Ok(0)` instead of an error at this case 
to align fs behavior.
+                let size = total_size.checked_sub(offset).unwrap_or_default();
 
-        let n = match base.checked_add(amt) {
-            Some(n) if n >= 0 => n as u64,
-            _ => {
-                return Err(Error::new(
-                    ErrorKind::InvalidInput,
-                    "invalid seek to a negative or overflowing position",
-                ))
+                (Some(offset), Some(size))
+            }
+            (Some(offset), Some(size)) => (Some(offset), Some(size)),
+            (None, None) => {
+                unreachable!("fill_range should not reach this case after 
normalization")
             }
         };
-        Ok(n)
+
+        Ok(())
+    }
+
+    /// Calculate the current range, maybe sent as next read request.
+    ///
+    /// # Panics
+    ///
+    /// Offset must be normalized before calling this function.
+    ///
+    /// - `..` should be transformed into `0..`
+    /// - `..size` should be transformed into `(total-size)..total`.
+    fn calculate_range(&self) -> BytesRange {
+        let offset = self
+            .offset
+            .expect("offset must be set before calculating range");
+
+        BytesRange::new(Some(offset + self.cur), self.size.map(|v| v - 
self.cur))
     }
 }
 
@@ -129,13 +154,37 @@ where
     fn read_future(&self) -> BoxFuture<'static, Result<(RpRead, R)>> {
         let acc = self.acc.clone();
         let path = self.path.clone();
-        let op = OpRead::default().with_range(BytesRange::new(
-            Some(self.offset + self.cur),
-            Some(self.size - self.cur),
-        ));
+
+        let mut op = self.op.clone();
+        // cur != 0 means we have read some data out, we should convert
+        // the op into deterministic to avoid ETag changes.
+        if self.cur != 0 {
+            op = op.into_deterministic();
+        }
+        // Alter OpRead with correct calculated range.
+        op = op.with_range(self.calculate_range());
 
         Box::pin(async move { acc.read(&path, op).await })
     }
+
+    fn stat_future(&self) -> BoxFuture<'static, Result<RpStat>> {
+        let acc = self.acc.clone();
+        let path = self.path.clone();
+
+        // Handle if-match and if-none-match correctly.
+        let mut args = OpStat::default();
+        // TODO: stat should support range to check if ETag matches.
+        if self.op.range().is_full() {
+            if let Some(v) = self.op.if_match() {
+                args = args.with_if_match(v);
+            }
+            if let Some(v) = self.op.if_none_match() {
+                args = args.with_if_none_match(v);
+            }
+        }
+
+        Box::pin(async move { acc.stat(&path, args).await })
+    }
 }
 
 impl<A, R> ByRangeSeekableReader<A, R>
@@ -146,13 +195,37 @@ where
     fn read_action(&self) -> Result<(RpRead, R)> {
         let acc = self.acc.clone();
         let path = self.path.clone();
-        let op = OpRead::default().with_range(BytesRange::new(
-            Some(self.offset + self.cur),
-            Some(self.size - self.cur),
-        ));
+
+        let mut op = self.op.clone();
+        // cur != 0 means we have read some data out, we should convert
+        // the op into deterministic to avoid ETag changes.
+        if self.cur != 0 {
+            op = op.into_deterministic();
+        }
+        // Alter OpRead with correct calculated range.
+        op = op.with_range(self.calculate_range());
 
         acc.blocking_read(&path, op)
     }
+
+    fn stat_action(&self) -> Result<RpStat> {
+        let acc = self.acc.clone();
+        let path = self.path.clone();
+
+        // Handle if-match and if-none-match correctly.
+        let mut args = OpStat::default();
+        // TODO: stat should support range to check if ETag matches.
+        if self.op.range().is_full() {
+            if let Some(v) = self.op.if_match() {
+                args = args.with_if_match(v);
+            }
+            if let Some(v) = self.op.if_none_match() {
+                args = args.with_if_none_match(v);
+            }
+        }
+
+        acc.blocking_stat(&path, args)
+    }
 }
 
 impl<A, R> oio::Read for ByRangeSeekableReader<A, R>
@@ -163,17 +236,41 @@ where
     fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> 
Poll<Result<usize>> {
         match &mut self.state {
             State::Idle => {
-                if self.cur >= self.size {
+                // Sanity check for normal cases.
+                if buf.is_empty() || self.cur > self.size.unwrap_or(u64::MAX) {
                     return Poll::Ready(Ok(0));
                 }
 
-                self.state = State::Sending(self.read_future());
+                self.state = if self.offset.is_none() {
+                    // Offset is none means we are doing tailing reading.
+                    // we should stat first to get the correct offset.
+                    State::SendStat(self.stat_future())
+                } else {
+                    State::SendRead(self.read_future())
+                };
+
+                self.poll_read(cx, buf)
+            }
+            State::SendStat(fut) => {
+                let rp = ready!(Pin::new(fut).poll(cx)).map_err(|err| {
+                    // If stat future returns an error, we should reset
+                    // state to Idle so that we can retry it.
+                    self.state = State::Idle;
+                    err
+                })?;
+
+                let length = rp.into_metadata().content_length();
+                self.fill_range(length).map_err(|err| {
+                    // If stat future returns an error, we should reset
+                    // state to Idle so that we can retry it.
+                    self.state = State::Idle;
+                    err
+                })?;
+
+                self.state = State::Idle;
                 self.poll_read(cx, buf)
             }
-            State::Sending(fut) => {
-                // TODO
-                //
-                // we can use RpRead returned here to correct size.
+            State::SendRead(fut) => {
                 let (_, r) = ready!(Pin::new(fut).poll(cx)).map_err(|err| {
                     // If read future returns an error, we should reset
                     // state to Idle so that we can retry it.
@@ -181,10 +278,10 @@ where
                     err
                 })?;
 
-                self.state = State::Reading(r);
+                self.state = State::Read(r);
                 self.poll_read(cx, buf)
             }
-            State::Reading(r) => match ready!(Pin::new(r).poll_read(cx, buf)) {
+            State::Read(r) => match ready!(Pin::new(r).poll_read(cx, buf)) {
                 Ok(0) => {
                     // Reset state to Idle after all data has been consumed.
                     self.state = State::Idle;
@@ -202,35 +299,65 @@ where
         }
     }
 
-    fn poll_seek(&mut self, _: &mut Context<'_>, pos: SeekFrom) -> 
Poll<Result<u64>> {
-        let seek_pos = self.seek_pos(pos)?;
-        self.last_seek_pos = Some(seek_pos);
-
+    fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> 
Poll<Result<u64>> {
         match &mut self.state {
             State::Idle => {
+                let (base, amt) = match pos {
+                    SeekFrom::Start(n) => (0, n as i64),
+                    SeekFrom::End(n) => {
+                        if let Some(size) = self.size {
+                            (size as i64, n)
+                        } else {
+                            self.state = State::SendStat(self.stat_future());
+                            return self.poll_seek(cx, pos);
+                        }
+                    }
+                    SeekFrom::Current(n) => (self.cur as i64, n),
+                };
+
+                let seek_pos = match base.checked_add(amt) {
+                    Some(n) if n >= 0 => n as u64,
+                    _ => {
+                        return Poll::Ready(Err(Error::new(
+                            ErrorKind::InvalidInput,
+                            "invalid seek to a negative or overflowing 
position",
+                        )))
+                    }
+                };
+
                 self.cur = seek_pos;
-                self.last_seek_pos = None;
                 Poll::Ready(Ok(self.cur))
             }
-            State::Sending(_) => {
+            State::SendStat(fut) => {
+                let rp = ready!(Pin::new(fut).poll(cx)).map_err(|err| {
+                    // If stat future returns an error, we should reset
+                    // state to Idle so that we can retry it.
+                    self.state = State::Idle;
+                    err
+                })?;
+
+                let length = rp.into_metadata().content_length();
+                self.fill_range(length)?;
+
+                self.state = State::Idle;
+                self.poll_seek(cx, pos)
+            }
+            State::SendRead(_) => {
                 // It's impossible for us to go into this state while
                 // poll_seek. We can just drop this future and check state.
                 self.state = State::Idle;
-
-                self.cur = seek_pos;
-                self.last_seek_pos = None;
-                Poll::Ready(Ok(self.cur))
+                self.poll_seek(cx, pos)
             }
-            State::Reading(_) => {
-                if seek_pos == self.cur {
-                    self.last_seek_pos = None;
+            State::Read(_) => {
+                // There is an optimization here that we can calculate if 
users trying to seek
+                // the same position, for example, 
`reader.seek(SeekFrom::Current(0))`.
+                // In this case, we can just return current position without 
dropping reader.
+                if pos == SeekFrom::Current(0) || pos == 
SeekFrom::Start(self.cur) {
                     return Poll::Ready(Ok(self.cur));
                 }
 
                 self.state = State::Idle;
-                self.cur = seek_pos;
-                self.last_seek_pos = None;
-                Poll::Ready(Ok(self.cur))
+                self.poll_seek(cx, pos)
             }
         }
     }
@@ -238,17 +365,36 @@ where
     fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Option<Result<Bytes>>> {
         match &mut self.state {
             State::Idle => {
-                if self.cur >= self.size {
+                // Sanity check for normal cases.
+                if self.cur > self.size.unwrap_or(u64::MAX) {
                     return Poll::Ready(None);
                 }
 
-                self.state = State::Sending(self.read_future());
+                self.state = if self.offset.is_none() {
+                    // Offset is none means we are doing tailing reading.
+                    // we should stat first to get the correct offset.
+                    State::SendStat(self.stat_future())
+                } else {
+                    State::SendRead(self.read_future())
+                };
+
                 self.poll_next(cx)
             }
-            State::Sending(fut) => {
-                // TODO
-                //
-                // we can use RpRead returned here to correct size.
+            State::SendStat(fut) => {
+                let rp = ready!(Pin::new(fut).poll(cx)).map_err(|err| {
+                    // If stat future returns an error, we should reset
+                    // state to Idle so that we can retry it.
+                    self.state = State::Idle;
+                    err
+                })?;
+
+                let length = rp.into_metadata().content_length();
+                self.fill_range(length)?;
+
+                self.state = State::Idle;
+                self.poll_next(cx)
+            }
+            State::SendRead(fut) => {
                 let (_, r) = ready!(Pin::new(fut).poll(cx)).map_err(|err| {
                     // If read future returns an error, we should reset
                     // state to Idle so that we can retry it.
@@ -256,10 +402,10 @@ where
                     err
                 })?;
 
-                self.state = State::Reading(r);
+                self.state = State::Read(r);
                 self.poll_next(cx)
             }
-            State::Reading(r) => match ready!(Pin::new(r).poll_next(cx)) {
+            State::Read(r) => match ready!(Pin::new(r).poll_next(cx)) {
                 Some(Ok(bs)) => {
                     self.cur += bs.len() as u64;
                     Poll::Ready(Some(Ok(bs)))
@@ -285,15 +431,25 @@ where
     fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
         match &mut self.state {
             State::Idle => {
-                if self.cur >= self.size {
+                // Sanity check for normal cases.
+                if buf.is_empty() || self.cur > self.size.unwrap_or(u64::MAX) {
                     return Ok(0);
                 }
 
+                // Offset is none means we are doing tailing reading.
+                // we should stat first to get the correct offset.
+                if self.offset.is_none() {
+                    let rp = self.stat_action()?;
+
+                    let length = rp.into_metadata().content_length();
+                    self.fill_range(length)?;
+                }
+
                 let (_, r) = self.read_action()?;
-                self.state = State::Reading(r);
+                self.state = State::Read(r);
                 self.read(buf)
             }
-            State::Reading(r) => {
+            State::Read(r) => {
                 match r.read(buf) {
                     Ok(0) => {
                         // Reset state to Idle after all data has been 
consumed.
@@ -310,31 +466,64 @@ where
                     }
                 }
             }
-            State::Sending(_) => {
-                unreachable!("It's invalid to go into State::Sending for 
BlockingRead, please report this bug")
+            State::SendStat(_) => {
+                unreachable!("It's invalid to go into State::SendStat for 
BlockingRead, please report this bug")
+            }
+            State::SendRead(_) => {
+                unreachable!("It's invalid to go into State::SendRead for 
BlockingRead, please report this bug")
             }
         }
     }
 
     fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
-        let seek_pos = self.seek_pos(pos)?;
-
         match &mut self.state {
             State::Idle => {
+                let (base, amt) = match pos {
+                    SeekFrom::Start(n) => (0, n as i64),
+                    SeekFrom::End(n) => {
+                        if let Some(size) = self.size {
+                            (size as i64, n)
+                        } else {
+                            let rp = self.stat_action()?;
+                            let length = rp.into_metadata().content_length();
+                            self.fill_range(length)?;
+
+                            let size = self.size.expect("size must be valid 
after fill_range");
+                            (size as i64, n)
+                        }
+                    }
+                    SeekFrom::Current(n) => (self.cur as i64, n),
+                };
+
+                let seek_pos = match base.checked_add(amt) {
+                    Some(n) if n >= 0 => n as u64,
+                    _ => {
+                        return Err(Error::new(
+                            ErrorKind::InvalidInput,
+                            "invalid seek to a negative or overflowing 
position",
+                        ));
+                    }
+                };
+
                 self.cur = seek_pos;
                 Ok(self.cur)
             }
-            State::Reading(_) => {
-                if seek_pos == self.cur {
+            State::Read(_) => {
+                // There is an optimization here that we can calculate if 
users trying to seek
+                // the same position, for example, 
`reader.seek(SeekFrom::Current(0))`.
+                // In this case, we can just return current position without 
dropping reader.
+                if pos == SeekFrom::Current(0) || pos == 
SeekFrom::Start(self.cur) {
                     return Ok(self.cur);
                 }
 
                 self.state = State::Idle;
-                self.cur = seek_pos;
-                Ok(self.cur)
+                self.seek(pos)
+            }
+            State::SendStat(_) => {
+                unreachable!("It's invalid to go into State::SendStat for 
BlockingRead, please report this bug")
             }
-            State::Sending(_) => {
-                unreachable!("It's invalid to go into State::Sending for 
BlockingRead, please report this bug")
+            State::SendRead(_) => {
+                unreachable!("It's invalid to go into State::SendRead for 
BlockingRead, please report this bug")
             }
         }
     }
@@ -342,18 +531,33 @@ where
     fn next(&mut self) -> Option<Result<Bytes>> {
         match &mut self.state {
             State::Idle => {
-                if self.cur >= self.size {
+                // Sanity check for normal cases.
+                if self.cur > self.size.unwrap_or(u64::MAX) {
                     return None;
                 }
 
+                // Offset is none means we are doing tailing reading.
+                // we should stat first to get the correct offset.
+                if self.offset.is_none() {
+                    let rp = match self.stat_action() {
+                        Ok(rp) => rp,
+                        Err(err) => return Some(Err(err)),
+                    };
+
+                    let length = rp.into_metadata().content_length();
+                    if let Err(err) = self.fill_range(length) {
+                        return Some(Err(err));
+                    }
+                }
+
                 let r = match self.read_action() {
                     Ok((_, r)) => r,
                     Err(err) => return Some(Err(err)),
                 };
-                self.state = State::Reading(r);
+                self.state = State::Read(r);
                 self.next()
             }
-            State::Reading(r) => match r.next() {
+            State::Read(r) => match r.next() {
                 Some(Ok(bs)) => {
                     self.cur += bs.len() as u64;
                     Some(Ok(bs))
@@ -367,8 +571,11 @@ where
                     None
                 }
             },
-            State::Sending(_) => {
-                unreachable!("It's invalid to go into State::Sending for 
BlockingRead, please report this bug")
+            State::SendStat(_) => {
+                unreachable!("It's invalid to go into State::SendStat for 
BlockingRead, please report this bug")
+            }
+            State::SendRead(_) => {
+                unreachable!("It's invalid to go into State::SendRead for 
BlockingRead, please report this bug")
             }
         }
     }
@@ -483,11 +690,11 @@ mod tests {
         let (bs, _) = gen_bytes();
         let acc = Arc::new(MockReadService::new(bs.clone()));
 
-        let r = MockReader {
-            inner: futures::io::Cursor::new(bs.to_vec()),
-        };
-        let mut r =
-            Box::new(into_seekable_read_by_range(acc, "x", r, 0, bs.len() as 
u64)) as oio::Reader;
+        let mut r = Box::new(into_seekable_read_by_range(
+            acc,
+            "x",
+            OpRead::default().with_range(BytesRange::from(..)),
+        )) as oio::Reader;
 
         let mut buf = Vec::new();
         r.read_to_end(&mut buf).await?;
@@ -518,10 +725,11 @@ mod tests {
         let (bs, _) = gen_bytes();
         let acc = Arc::new(MockReadService::new(bs.clone()));
 
-        let r = MockReader {
-            inner: futures::io::Cursor::new(bs[4096..4096 + 4096].to_vec()),
-        };
-        let mut r = Box::new(into_seekable_read_by_range(acc, "x", r, 4096, 
4096)) as oio::Reader;
+        let mut r = Box::new(into_seekable_read_by_range(
+            acc,
+            "x",
+            OpRead::default().with_range(BytesRange::from(4096..4096 + 4096)),
+        )) as oio::Reader;
 
         let mut buf = Vec::new();
         r.read_to_end(&mut buf).await?;
diff --git a/core/src/raw/ops.rs b/core/src/raw/ops.rs
index 1e60d329e..0eb3937c6 100644
--- a/core/src/raw/ops.rs
+++ b/core/src/raw/ops.rs
@@ -274,6 +274,23 @@ impl OpRead {
         Self::default()
     }
 
+    /// The into_deterministic function transforms the OpRead into a 
deterministic version.
+    ///
+    /// This API is utilized because it allows for internal optimizations such 
as dividing read
+    /// ranges or retrying the read request from where it failed. In these 
scenarios, the expected
+    /// `ETag` value differs from what users specify in `If-Match` or 
`If-None-Match`.Therefore,
+    /// we need to eliminate these conditional headers to ensure that the read 
operation is
+    /// deterministic.
+    ///
+    /// This API is not intended to be used by users and should never be 
exposed.
+    pub(crate) fn into_deterministic(self) -> Self {
+        Self {
+            if_match: None,
+            if_none_match: None,
+            ..self
+        }
+    }
+
     /// Create a new OpRead with range.
     pub fn with_range(mut self, range: BytesRange) -> Self {
         self.br = range;

Reply via email to