This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 535e1208d feat: Add content range support for RpRead (#3777)
535e1208d is described below

commit 535e1208d79ab3e217afed4122037c7789986708
Author: Xuanwo <[email protected]>
AuthorDate: Tue Dec 19 10:45:10 2023 +0800

    feat: Add content range support for RpRead (#3777)
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/raw/oio/read/range_read.rs      | 92 +++++++++++++++++++++-----------
 core/src/raw/rps.rs                      | 24 ++++++++-
 core/src/services/azblob/backend.rs      | 11 +++-
 core/src/services/azdls/backend.rs       | 11 +++-
 core/src/services/azfile/backend.rs      | 11 +++-
 core/src/services/b2/backend.rs          | 11 +++-
 core/src/services/cos/backend.rs         | 11 +++-
 core/src/services/dropbox/backend.rs     |  5 +-
 core/src/services/gdrive/backend.rs      |  6 ++-
 core/src/services/ghac/backend.rs        | 11 +++-
 core/src/services/http/backend.rs        | 11 +++-
 core/src/services/huggingface/backend.rs | 11 +++-
 core/src/services/obs/backend.rs         | 11 +++-
 core/src/services/onedrive/backend.rs    |  6 ++-
 core/src/services/oss/backend.rs         | 11 +++-
 core/src/services/s3/backend.rs          | 11 +++-
 core/src/services/seafile/backend.rs     |  6 ++-
 core/src/services/swift/backend.rs       | 11 +++-
 core/src/services/webdav/backend.rs      | 11 +++-
 core/src/services/webhdfs/backend.rs     | 11 +++-
 20 files changed, 230 insertions(+), 63 deletions(-)

diff --git a/core/src/raw/oio/read/range_read.rs 
b/core/src/raw/oio/read/range_read.rs
index 3759c3dee..b267402d2 100644
--- a/core/src/raw/oio/read/range_read.rs
+++ b/core/src/raw/oio/read/range_read.rs
@@ -96,11 +96,14 @@ where
         }
     }
 
-    /// Fill current reader's range by total_size.
-    fn fill_range(&mut self, total_size: u64) -> Result<()> {
+    /// Ensure current reader's offset is valid via total_size.
+    fn ensure_offset(&mut self, total_size: u64) -> Result<()> {
         (self.offset, self.size) = match (self.offset, self.size) {
             (None, Some(size)) => {
                 if size > total_size {
+                    // If we return an error, we should reset
+                    // state to Idle so that we can retry it.
+                    self.state = State::Idle;
                     return Err(Error::new(
                         ErrorKind::InvalidInput,
                         "read to a negative or overflowing position is 
invalid",
@@ -125,6 +128,48 @@ where
         Ok(())
     }
 
+    /// Ensure size will use the information returned by RpRead to calculate 
the correct size for reader.
+    ///
+    /// - If `RpRead` returns `range`, we can calculate the correct size by 
`range.size()`.
+    /// - If `RpRead` returns `size`, we can use it as the returned body's 
size.
+    fn ensure_size(&mut self, total_size: Option<u64>, content_size: 
Option<u64>) {
+        if let Some(total_size) = total_size {
+            // It's valid for a reader to seek to a position beyond the 
content length.
+            // We should return `Ok(0)` instead of an error in this case to 
align with fs behavior.
+            let size = total_size
+                .checked_sub(self.offset.expect("reader offset must be valid"))
+                .unwrap_or_default();
+
+            // Ensure size when:
+            //
+            // - reader's size is unknown.
+            // - reader's size is larger than file's size.
+            if self.size.is_none() || Some(size) < self.size {
+                self.size = Some(size);
+                return;
+            }
+        }
+
+        if let Some(content_size) = content_size {
+            if content_size == 0 {
+                // Skip setting size if content size is 0 since it could be 
invalid.
+                //
+                // For example, a user seeks to `u64::MAX` and calls read.
+                return;
+            }
+
+            let calculated_size = content_size + self.cur;
+
+            // Ensure size when:
+            //
+            // - reader's size is unknown.
+            // - reader's size is larger than file's size.
+            if self.size.is_none() || Some(calculated_size) < self.size {
+                self.size = Some(calculated_size);
+            }
+        }
+    }
+
     /// Calculate the current range, maybe sent as next read request.
     ///
     /// # Panics
@@ -256,12 +301,7 @@ where
                 })?;
 
                 let length = rp.into_metadata().content_length();
-                self.fill_range(length).map_err(|err| {
-                    // If stat future returns an error, we should reset
-                    // state to Idle so that we can retry it.
-                    self.state = State::Idle;
-                    err
-                })?;
+                self.ensure_offset(length)?;
 
                 self.state = State::Idle;
                 self.poll_read(cx, buf)
@@ -274,12 +314,8 @@ where
                     err
                 })?;
 
-                // Set size if read returns size hint.
-                if let Some(size) = rp.size() {
-                    if size != 0 && self.size.is_none() {
-                        self.size = Some(size + self.cur);
-                    }
-                }
+                self.ensure_size(rp.range().unwrap_or_default().size(), 
rp.size());
+
                 self.state = State::Read(r);
                 self.poll_read(cx, buf)
             }
@@ -339,7 +375,7 @@ where
                 })?;
 
                 let length = rp.into_metadata().content_length();
-                self.fill_range(length)?;
+                self.ensure_offset(length)?;
 
                 self.state = State::Idle;
                 self.poll_seek(cx, pos)
@@ -391,7 +427,7 @@ where
                 })?;
 
                 let length = rp.into_metadata().content_length();
-                self.fill_range(length)?;
+                self.ensure_offset(length)?;
 
                 self.state = State::Idle;
                 self.poll_next(cx)
@@ -405,11 +441,8 @@ where
                 })?;
 
                 // Set size if read returns size hint.
-                if let Some(size) = rp.size() {
-                    if size != 0 && self.size.is_none() {
-                        self.size = Some(size + self.cur);
-                    }
-                }
+                self.ensure_size(rp.range().unwrap_or_default().size(), 
rp.size());
+
                 self.state = State::Read(r);
                 self.poll_next(cx)
             }
@@ -450,17 +483,13 @@ where
                     let rp = self.stat_action()?;
 
                     let length = rp.into_metadata().content_length();
-                    self.fill_range(length)?;
+                    self.ensure_offset(length)?;
                 }
 
                 let (rp, r) = self.read_action()?;
 
                 // Set size if read returns size hint.
-                if let Some(size) = rp.size() {
-                    if size != 0 && self.size.is_none() {
-                        self.size = Some(size + self.cur);
-                    }
-                }
+                self.ensure_size(rp.range().unwrap_or_default().size(), 
rp.size());
 
                 self.state = State::Read(r);
                 self.read(buf)
@@ -502,7 +531,7 @@ where
                         } else {
                             let rp = self.stat_action()?;
                             let length = rp.into_metadata().content_length();
-                            self.fill_range(length)?;
+                            self.ensure_offset(length)?;
 
                             let size = self.size.expect("size must be valid 
after fill_range");
                             (size as i64, n)
@@ -561,13 +590,16 @@ where
                     };
 
                     let length = rp.into_metadata().content_length();
-                    if let Err(err) = self.fill_range(length) {
+                    if let Err(err) = self.ensure_offset(length) {
                         return Some(Err(err));
                     }
                 }
 
                 let r = match self.read_action() {
-                    Ok((_, r)) => r,
+                    Ok((rp, r)) => {
+                        
self.ensure_size(rp.range().unwrap_or_default().size(), rp.size());
+                        r
+                    }
                     Err(err) => return Some(Err(err)),
                 };
                 self.state = State::Read(r);
diff --git a/core/src/raw/rps.rs b/core/src/raw/rps.rs
index 4cba4b8cc..7df8ee896 100644
--- a/core/src/raw/rps.rs
+++ b/core/src/raw/rps.rs
@@ -17,6 +17,7 @@
 
 use http::Request;
 
+use crate::raw::*;
 use crate::*;
 
 /// Reply for `create_dir` operation
@@ -107,6 +108,14 @@ pub struct RpRead {
     /// It's ok to leave size as empty, but it's recommended to set size if 
possible. We will use
     /// this size as hint to do some optimization like avoid an extra stat or 
read.
     size: Option<u64>,
+    /// Range is the range of the reader returned by this read operation.
+    ///
+    /// - `Some(range)` means the reader's content range inside the whole file.
+    /// - `None` means the reader's content range is unknown.
+    ///
+    /// It's ok to leave range as empty, but it's recommended to set range if 
possible. We will use
+    /// this range as hint to do some optimization like avoid an extra stat or 
read.
+    range: Option<BytesContentRange>,
 }
 
 impl RpRead {
@@ -128,6 +137,20 @@ impl RpRead {
         self.size = size;
         self
     }
+
+    /// Get the range of the reader returned by this read operation.
+    ///
+    /// - `Some(range)` means the reader has content range inside the whole 
file.
+    /// - `None` means the reader's content range is unknown.
+    pub fn range(&self) -> Option<BytesContentRange> {
+        self.range
+    }
+
+    /// Set the range of the reader returned by this read operation.
+    pub fn with_range(mut self, range: Option<BytesContentRange>) -> Self {
+        self.range = range;
+        self
+    }
 }
 
 /// Reply for `batch` operation.
@@ -231,7 +254,6 @@ mod tests {
     use http::Uri;
 
     use super::*;
-    use crate::raw::*;
 
     #[test]
     fn test_presigned_request_convert() -> Result<()> {
diff --git a/core/src/services/azblob/backend.rs 
b/core/src/services/azblob/backend.rs
index bf77111e0..fdce556bb 100644
--- a/core/src/services/azblob/backend.rs
+++ b/core/src/services/azblob/backend.rs
@@ -600,9 +600,16 @@ impl Accessor for AzblobBackend {
         match status {
             StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
                 let size = parse_content_length(resp.headers())?;
-                Ok((RpRead::new().with_size(size), resp.into_body()))
+                let range = parse_content_range(resp.headers())?;
+                Ok((
+                    RpRead::new().with_size(size).with_range(range),
+                    resp.into_body(),
+                ))
+            }
+            StatusCode::RANGE_NOT_SATISFIABLE => {
+                resp.into_body().consume().await?;
+                Ok((RpRead::new().with_size(Some(0)), 
IncomingAsyncBody::empty()))
             }
-            StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), 
IncomingAsyncBody::empty())),
             _ => Err(parse_error(resp).await?),
         }
     }
diff --git a/core/src/services/azdls/backend.rs 
b/core/src/services/azdls/backend.rs
index 164a2d7f7..77240e72e 100644
--- a/core/src/services/azdls/backend.rs
+++ b/core/src/services/azdls/backend.rs
@@ -293,9 +293,16 @@ impl Accessor for AzdlsBackend {
         match status {
             StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
                 let size = parse_content_length(resp.headers())?;
-                Ok((RpRead::new().with_size(size), resp.into_body()))
+                let range = parse_content_range(resp.headers())?;
+                Ok((
+                    RpRead::new().with_size(size).with_range(range),
+                    resp.into_body(),
+                ))
+            }
+            StatusCode::RANGE_NOT_SATISFIABLE => {
+                resp.into_body().consume().await?;
+                Ok((RpRead::new().with_size(Some(0)), 
IncomingAsyncBody::empty()))
             }
-            StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), 
IncomingAsyncBody::empty())),
             _ => Err(parse_error(resp).await?),
         }
     }
diff --git a/core/src/services/azfile/backend.rs 
b/core/src/services/azfile/backend.rs
index f8b7f9418..92cc4f2ab 100644
--- a/core/src/services/azfile/backend.rs
+++ b/core/src/services/azfile/backend.rs
@@ -316,9 +316,16 @@ impl Accessor for AzfileBackend {
         match status {
             StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
                 let size = parse_content_length(resp.headers())?;
-                Ok((RpRead::new().with_size(size), resp.into_body()))
+                let range = parse_content_range(resp.headers())?;
+                Ok((
+                    RpRead::new().with_size(size).with_range(range),
+                    resp.into_body(),
+                ))
+            }
+            StatusCode::RANGE_NOT_SATISFIABLE => {
+                resp.into_body().consume().await?;
+                Ok((RpRead::new().with_size(Some(0)), 
IncomingAsyncBody::empty()))
             }
-            StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), 
IncomingAsyncBody::empty())),
             _ => Err(parse_error(resp).await?),
         }
     }
diff --git a/core/src/services/b2/backend.rs b/core/src/services/b2/backend.rs
index 4c6c8660d..f85d4aefc 100644
--- a/core/src/services/b2/backend.rs
+++ b/core/src/services/b2/backend.rs
@@ -330,9 +330,16 @@ impl Accessor for B2Backend {
         match status {
             StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
                 let size = parse_content_length(resp.headers())?;
-                Ok((RpRead::new().with_size(size), resp.into_body()))
+                let range = parse_content_range(resp.headers())?;
+                Ok((
+                    RpRead::new().with_size(size).with_range(range),
+                    resp.into_body(),
+                ))
+            }
+            StatusCode::RANGE_NOT_SATISFIABLE => {
+                resp.into_body().consume().await?;
+                Ok((RpRead::new().with_size(Some(0)), 
IncomingAsyncBody::empty()))
             }
-            StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), 
IncomingAsyncBody::empty())),
             _ => Err(parse_error(resp).await?),
         }
     }
diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs
index 1abc9c5a6..a31e9818d 100644
--- a/core/src/services/cos/backend.rs
+++ b/core/src/services/cos/backend.rs
@@ -309,9 +309,16 @@ impl Accessor for CosBackend {
         match status {
             StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
                 let size = parse_content_length(resp.headers())?;
-                Ok((RpRead::new().with_size(size), resp.into_body()))
+                let range = parse_content_range(resp.headers())?;
+                Ok((
+                    RpRead::new().with_size(size).with_range(range),
+                    resp.into_body(),
+                ))
+            }
+            StatusCode::RANGE_NOT_SATISFIABLE => {
+                resp.into_body().consume().await?;
+                Ok((RpRead::new().with_size(Some(0)), 
IncomingAsyncBody::empty()))
             }
-            StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), 
IncomingAsyncBody::empty())),
             _ => Err(parse_error(resp).await?),
         }
     }
diff --git a/core/src/services/dropbox/backend.rs 
b/core/src/services/dropbox/backend.rs
index 652b89b48..9774deed7 100644
--- a/core/src/services/dropbox/backend.rs
+++ b/core/src/services/dropbox/backend.rs
@@ -102,7 +102,10 @@ impl Accessor for DropboxBackend {
         let status = resp.status();
         match status {
             StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), 
resp.into_body())),
-            StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), 
IncomingAsyncBody::empty())),
+            StatusCode::RANGE_NOT_SATISFIABLE => {
+                resp.into_body().consume().await?;
+                Ok((RpRead::new().with_size(Some(0)), 
IncomingAsyncBody::empty()))
+            }
             _ => Err(parse_error(resp).await?),
         }
     }
diff --git a/core/src/services/gdrive/backend.rs 
b/core/src/services/gdrive/backend.rs
index c2e6a874e..eb38e75d5 100644
--- a/core/src/services/gdrive/backend.rs
+++ b/core/src/services/gdrive/backend.rs
@@ -115,7 +115,11 @@ impl Accessor for GdriveBackend {
         match status {
             StatusCode::OK => {
                 let size = parse_content_length(resp.headers())?;
-                Ok((RpRead::new().with_size(size), resp.into_body()))
+                let range = parse_content_range(resp.headers())?;
+                Ok((
+                    RpRead::new().with_size(size).with_range(range),
+                    resp.into_body(),
+                ))
             }
             _ => Err(parse_error(resp).await?),
         }
diff --git a/core/src/services/ghac/backend.rs 
b/core/src/services/ghac/backend.rs
index cd5047781..546ca7ec7 100644
--- a/core/src/services/ghac/backend.rs
+++ b/core/src/services/ghac/backend.rs
@@ -275,9 +275,16 @@ impl Accessor for GhacBackend {
         match status {
             StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
                 let size = parse_content_length(resp.headers())?;
-                Ok((RpRead::new().with_size(size), resp.into_body()))
+                let range = parse_content_range(resp.headers())?;
+                Ok((
+                    RpRead::new().with_size(size).with_range(range),
+                    resp.into_body(),
+                ))
+            }
+            StatusCode::RANGE_NOT_SATISFIABLE => {
+                resp.into_body().consume().await?;
+                Ok((RpRead::new().with_size(Some(0)), 
IncomingAsyncBody::empty()))
             }
-            StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), 
IncomingAsyncBody::empty())),
             _ => Err(parse_error(resp).await?),
         }
     }
diff --git a/core/src/services/http/backend.rs 
b/core/src/services/http/backend.rs
index 9b1da6f23..f295333ba 100644
--- a/core/src/services/http/backend.rs
+++ b/core/src/services/http/backend.rs
@@ -259,9 +259,16 @@ impl Accessor for HttpBackend {
         match status {
             StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
                 let size = parse_content_length(resp.headers())?;
-                Ok((RpRead::new().with_size(size), resp.into_body()))
+                let range = parse_content_range(resp.headers())?;
+                Ok((
+                    RpRead::new().with_size(size).with_range(range),
+                    resp.into_body(),
+                ))
+            }
+            StatusCode::RANGE_NOT_SATISFIABLE => {
+                resp.into_body().consume().await?;
+                Ok((RpRead::new().with_size(Some(0)), 
IncomingAsyncBody::empty()))
             }
-            StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), 
IncomingAsyncBody::empty())),
             _ => Err(parse_error(resp).await?),
         }
     }
diff --git a/core/src/services/huggingface/backend.rs 
b/core/src/services/huggingface/backend.rs
index b9d98867e..0974395ea 100644
--- a/core/src/services/huggingface/backend.rs
+++ b/core/src/services/huggingface/backend.rs
@@ -276,9 +276,16 @@ impl Accessor for HuggingfaceBackend {
         match status {
             StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
                 let size = parse_content_length(resp.headers())?;
-                Ok((RpRead::new().with_size(size), resp.into_body()))
+                let range = parse_content_range(resp.headers())?;
+                Ok((
+                    RpRead::new().with_size(size).with_range(range),
+                    resp.into_body(),
+                ))
+            }
+            StatusCode::RANGE_NOT_SATISFIABLE => {
+                resp.into_body().consume().await?;
+                Ok((RpRead::new().with_size(Some(0)), 
IncomingAsyncBody::empty()))
             }
-            StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), 
IncomingAsyncBody::empty())),
             _ => Err(parse_error(resp).await?),
         }
     }
diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs
index d72cca73c..ce562dd51 100644
--- a/core/src/services/obs/backend.rs
+++ b/core/src/services/obs/backend.rs
@@ -336,9 +336,16 @@ impl Accessor for ObsBackend {
         match status {
             StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
                 let size = parse_content_length(resp.headers())?;
-                Ok((RpRead::new().with_size(size), resp.into_body()))
+                let range = parse_content_range(resp.headers())?;
+                Ok((
+                    RpRead::new().with_size(size).with_range(range),
+                    resp.into_body(),
+                ))
+            }
+            StatusCode::RANGE_NOT_SATISFIABLE => {
+                resp.into_body().consume().await?;
+                Ok((RpRead::new().with_size(Some(0)), 
IncomingAsyncBody::empty()))
             }
-            StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), 
IncomingAsyncBody::empty())),
             _ => Err(parse_error(resp).await?),
         }
     }
diff --git a/core/src/services/onedrive/backend.rs 
b/core/src/services/onedrive/backend.rs
index db7ed22c3..f491c9810 100644
--- a/core/src/services/onedrive/backend.rs
+++ b/core/src/services/onedrive/backend.rs
@@ -94,7 +94,11 @@ impl Accessor for OnedriveBackend {
         match status {
             StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
                 let size = parse_content_length(resp.headers())?;
-                Ok((RpRead::new().with_size(size), resp.into_body()))
+                let range = parse_content_range(resp.headers())?;
+                Ok((
+                    RpRead::new().with_size(size).with_range(range),
+                    resp.into_body(),
+                ))
             }
             _ => Err(parse_error(resp).await?),
         }
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index 42a8b9d19..6530453bd 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -458,9 +458,16 @@ impl Accessor for OssBackend {
         match status {
             StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
                 let size = parse_content_length(resp.headers())?;
-                Ok((RpRead::new().with_size(size), resp.into_body()))
+                let range = parse_content_range(resp.headers())?;
+                Ok((
+                    RpRead::new().with_size(size).with_range(range),
+                    resp.into_body(),
+                ))
+            }
+            StatusCode::RANGE_NOT_SATISFIABLE => {
+                resp.into_body().consume().await?;
+                Ok((RpRead::new().with_size(Some(0)), 
IncomingAsyncBody::empty()))
             }
-            StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), 
IncomingAsyncBody::empty())),
             _ => Err(parse_error(resp).await?),
         }
     }
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index 4d9522620..6bde0532c 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -1039,9 +1039,16 @@ impl Accessor for S3Backend {
         match status {
             StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
                 let size = parse_content_length(resp.headers())?;
-                Ok((RpRead::new().with_size(size), resp.into_body()))
+                let range = parse_content_range(resp.headers())?;
+                Ok((
+                    RpRead::new().with_size(size).with_range(range),
+                    resp.into_body(),
+                ))
+            }
+            StatusCode::RANGE_NOT_SATISFIABLE => {
+                resp.into_body().consume().await?;
+                Ok((RpRead::new().with_size(Some(0)), 
IncomingAsyncBody::empty()))
             }
-            StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), 
IncomingAsyncBody::empty())),
             _ => Err(parse_error(resp).await?),
         }
     }
diff --git a/core/src/services/seafile/backend.rs 
b/core/src/services/seafile/backend.rs
index 63f2f7122..34641c567 100644
--- a/core/src/services/seafile/backend.rs
+++ b/core/src/services/seafile/backend.rs
@@ -298,7 +298,11 @@ impl Accessor for SeafileBackend {
         match status {
             StatusCode::OK => {
                 let size = parse_content_length(resp.headers())?;
-                Ok((RpRead::new().with_size(size), resp.into_body()))
+                let range = parse_content_range(resp.headers())?;
+                Ok((
+                    RpRead::new().with_size(size).with_range(range),
+                    resp.into_body(),
+                ))
             }
             _ => Err(parse_error(resp).await?),
         }
diff --git a/core/src/services/swift/backend.rs 
b/core/src/services/swift/backend.rs
index 6ae2a3b6a..01e123a4d 100644
--- a/core/src/services/swift/backend.rs
+++ b/core/src/services/swift/backend.rs
@@ -250,9 +250,16 @@ impl Accessor for SwiftBackend {
         match resp.status() {
             StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
                 let size = parse_content_length(resp.headers())?;
-                Ok((RpRead::new().with_size(size), resp.into_body()))
+                let range = parse_content_range(resp.headers())?;
+                Ok((
+                    RpRead::new().with_size(size).with_range(range),
+                    resp.into_body(),
+                ))
+            }
+            StatusCode::RANGE_NOT_SATISFIABLE => {
+                resp.into_body().consume().await?;
+                Ok((RpRead::new().with_size(Some(0)), 
IncomingAsyncBody::empty()))
             }
-            StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), 
IncomingAsyncBody::empty())),
             _ => Err(parse_error(resp).await?),
         }
     }
diff --git a/core/src/services/webdav/backend.rs 
b/core/src/services/webdav/backend.rs
index b751cf026..a7e461cb4 100644
--- a/core/src/services/webdav/backend.rs
+++ b/core/src/services/webdav/backend.rs
@@ -268,9 +268,16 @@ impl Accessor for WebdavBackend {
         match status {
             StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
                 let size = parse_content_length(resp.headers())?;
-                Ok((RpRead::new().with_size(size), resp.into_body()))
+                let range = parse_content_range(resp.headers())?;
+                Ok((
+                    RpRead::new().with_size(size).with_range(range),
+                    resp.into_body(),
+                ))
+            }
+            StatusCode::RANGE_NOT_SATISFIABLE => {
+                resp.into_body().consume().await?;
+                Ok((RpRead::new().with_size(Some(0)), 
IncomingAsyncBody::empty()))
             }
-            StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), 
IncomingAsyncBody::empty())),
             _ => Err(parse_error(resp).await?),
         }
     }
diff --git a/core/src/services/webhdfs/backend.rs 
b/core/src/services/webhdfs/backend.rs
index 8a6a9f297..2759c1e6a 100644
--- a/core/src/services/webhdfs/backend.rs
+++ b/core/src/services/webhdfs/backend.rs
@@ -466,7 +466,11 @@ impl Accessor for WebhdfsBackend {
         match resp.status() {
             StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
                 let size = parse_content_length(resp.headers())?;
-                Ok((RpRead::new().with_size(size), resp.into_body()))
+                let range = parse_content_range(resp.headers())?;
+                Ok((
+                    RpRead::new().with_size(size).with_range(range),
+                    resp.into_body(),
+                ))
             }
             // WebHDFS will returns 403 when range is outside of the end.
             StatusCode::FORBIDDEN => {
@@ -479,7 +483,10 @@ impl Accessor for WebhdfsBackend {
                     Err(parse_error_msg(parts, &s)?)
                 }
             }
-            StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), 
IncomingAsyncBody::empty())),
+            StatusCode::RANGE_NOT_SATISFIABLE => {
+                resp.into_body().consume().await?;
+                Ok((RpRead::new().with_size(Some(0)), 
IncomingAsyncBody::empty()))
+            }
             _ => Err(parse_error(resp).await?),
         }
     }

Reply via email to