This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 535e1208d feat: Add content range support for RpRead (#3777)
535e1208d is described below
commit 535e1208d79ab3e217afed4122037c7789986708
Author: Xuanwo <[email protected]>
AuthorDate: Tue Dec 19 10:45:10 2023 +0800
feat: Add content range support for RpRead (#3777)
Signed-off-by: Xuanwo <[email protected]>
---
core/src/raw/oio/read/range_read.rs | 92 +++++++++++++++++++++-----------
core/src/raw/rps.rs | 24 ++++++++-
core/src/services/azblob/backend.rs | 11 +++-
core/src/services/azdls/backend.rs | 11 +++-
core/src/services/azfile/backend.rs | 11 +++-
core/src/services/b2/backend.rs | 11 +++-
core/src/services/cos/backend.rs | 11 +++-
core/src/services/dropbox/backend.rs | 5 +-
core/src/services/gdrive/backend.rs | 6 ++-
core/src/services/ghac/backend.rs | 11 +++-
core/src/services/http/backend.rs | 11 +++-
core/src/services/huggingface/backend.rs | 11 +++-
core/src/services/obs/backend.rs | 11 +++-
core/src/services/onedrive/backend.rs | 6 ++-
core/src/services/oss/backend.rs | 11 +++-
core/src/services/s3/backend.rs | 11 +++-
core/src/services/seafile/backend.rs | 6 ++-
core/src/services/swift/backend.rs | 11 +++-
core/src/services/webdav/backend.rs | 11 +++-
core/src/services/webhdfs/backend.rs | 11 +++-
20 files changed, 230 insertions(+), 63 deletions(-)
diff --git a/core/src/raw/oio/read/range_read.rs
b/core/src/raw/oio/read/range_read.rs
index 3759c3dee..b267402d2 100644
--- a/core/src/raw/oio/read/range_read.rs
+++ b/core/src/raw/oio/read/range_read.rs
@@ -96,11 +96,14 @@ where
}
}
- /// Fill current reader's range by total_size.
- fn fill_range(&mut self, total_size: u64) -> Result<()> {
+ /// Ensure current reader's offset is valid via total_size.
+ fn ensure_offset(&mut self, total_size: u64) -> Result<()> {
(self.offset, self.size) = match (self.offset, self.size) {
(None, Some(size)) => {
if size > total_size {
+ // If we return an error here, we should reset
+ // state to Idle so that we can retry it.
+ self.state = State::Idle;
return Err(Error::new(
ErrorKind::InvalidInput,
"read to a negative or overflowing position is
invalid",
@@ -125,6 +128,48 @@ where
Ok(())
}
+ /// Ensure size will use the information returned by RpRead to calculate
the correct size for reader.
+ ///
+ /// - If `RpRead` returns `range`, we can calculate the correct size by
`range.size()`.
+ /// - If `RpRead` returns `size`, we can use it as the returning body's
size.
+ fn ensure_size(&mut self, total_size: Option<u64>, content_size:
Option<u64>) {
+ if let Some(total_size) = total_size {
+ // It's valid for reader to seek to a position that is out of the
content length.
+ // We should return `Ok(0)` instead of an error in this case to
align fs behavior.
+ let size = total_size
+ .checked_sub(self.offset.expect("reader offset must be valid"))
+ .unwrap_or_default();
+
+ // Ensure size when:
+ //
+ // - reader's size is unknown.
+ // - reader's size is larger than file's size.
+ if self.size.is_none() || Some(size) < self.size {
+ self.size = Some(size);
+ return;
+ }
+ }
+
+ if let Some(content_size) = content_size {
+ if content_size == 0 {
+ // Skip size set if content size is 0 since it could be
invalid.
+ //
+ // For example, users seek to `u64::MAX` and calling read.
+ return;
+ }
+
+ let calculated_size = content_size + self.cur;
+
+ // Ensure size when:
+ //
+ // - reader's size is unknown.
+ // - reader's size is larger than file's size.
+ if self.size.is_none() || Some(calculated_size) < self.size {
+ self.size = Some(calculated_size);
+ }
+ }
+ }
+
/// Calculate the current range, maybe sent as next read request.
///
/// # Panics
@@ -256,12 +301,7 @@ where
})?;
let length = rp.into_metadata().content_length();
- self.fill_range(length).map_err(|err| {
- // If stat future returns an error, we should reset
- // state to Idle so that we can retry it.
- self.state = State::Idle;
- err
- })?;
+ self.ensure_offset(length)?;
self.state = State::Idle;
self.poll_read(cx, buf)
@@ -274,12 +314,8 @@ where
err
})?;
- // Set size if read returns size hint.
- if let Some(size) = rp.size() {
- if size != 0 && self.size.is_none() {
- self.size = Some(size + self.cur);
- }
- }
+ self.ensure_size(rp.range().unwrap_or_default().size(),
rp.size());
+
self.state = State::Read(r);
self.poll_read(cx, buf)
}
@@ -339,7 +375,7 @@ where
})?;
let length = rp.into_metadata().content_length();
- self.fill_range(length)?;
+ self.ensure_offset(length)?;
self.state = State::Idle;
self.poll_seek(cx, pos)
@@ -391,7 +427,7 @@ where
})?;
let length = rp.into_metadata().content_length();
- self.fill_range(length)?;
+ self.ensure_offset(length)?;
self.state = State::Idle;
self.poll_next(cx)
@@ -405,11 +441,8 @@ where
})?;
// Set size if read returns size hint.
- if let Some(size) = rp.size() {
- if size != 0 && self.size.is_none() {
- self.size = Some(size + self.cur);
- }
- }
+ self.ensure_size(rp.range().unwrap_or_default().size(),
rp.size());
+
self.state = State::Read(r);
self.poll_next(cx)
}
@@ -450,17 +483,13 @@ where
let rp = self.stat_action()?;
let length = rp.into_metadata().content_length();
- self.fill_range(length)?;
+ self.ensure_offset(length)?;
}
let (rp, r) = self.read_action()?;
// Set size if read returns size hint.
- if let Some(size) = rp.size() {
- if size != 0 && self.size.is_none() {
- self.size = Some(size + self.cur);
- }
- }
+ self.ensure_size(rp.range().unwrap_or_default().size(),
rp.size());
self.state = State::Read(r);
self.read(buf)
@@ -502,7 +531,7 @@ where
} else {
let rp = self.stat_action()?;
let length = rp.into_metadata().content_length();
- self.fill_range(length)?;
+ self.ensure_offset(length)?;
let size = self.size.expect("size must be valid
after fill_range");
(size as i64, n)
@@ -561,13 +590,16 @@ where
};
let length = rp.into_metadata().content_length();
- if let Err(err) = self.fill_range(length) {
+ if let Err(err) = self.ensure_offset(length) {
return Some(Err(err));
}
}
let r = match self.read_action() {
- Ok((_, r)) => r,
+ Ok((rp, r)) => {
+
self.ensure_size(rp.range().unwrap_or_default().size(), rp.size());
+ r
+ }
Err(err) => return Some(Err(err)),
};
self.state = State::Read(r);
diff --git a/core/src/raw/rps.rs b/core/src/raw/rps.rs
index 4cba4b8cc..7df8ee896 100644
--- a/core/src/raw/rps.rs
+++ b/core/src/raw/rps.rs
@@ -17,6 +17,7 @@
use http::Request;
+use crate::raw::*;
use crate::*;
/// Reply for `create_dir` operation
@@ -107,6 +108,14 @@ pub struct RpRead {
/// It's ok to leave size as empty, but it's recommended to set size if
possible. We will use
/// this size as hint to do some optimization like avoid an extra stat or
read.
size: Option<u64>,
+ /// Range is the range of the reader returned by this read operation.
+ ///
+ /// - `Some(range)` means the reader's content range inside the whole file.
+ /// - `None` means the reader's content range is unknown.
+ ///
+ /// It's ok to leave range as empty, but it's recommended to set range if
possible. We will use
+ /// this range as hint to do some optimization like avoid an extra stat or
read.
+ range: Option<BytesContentRange>,
}
impl RpRead {
@@ -128,6 +137,20 @@ impl RpRead {
self.size = size;
self
}
+
+ /// Get the range of the reader returned by this read operation.
+ ///
+ /// - `Some(range)` means the reader has content range inside the whole
file.
+ /// - `None` means the reader has unknown range.
+ pub fn range(&self) -> Option<BytesContentRange> {
+ self.range
+ }
+
+ /// Set the range of the reader returned by this read operation.
+ pub fn with_range(mut self, range: Option<BytesContentRange>) -> Self {
+ self.range = range;
+ self
+ }
}
/// Reply for `batch` operation.
@@ -231,7 +254,6 @@ mod tests {
use http::Uri;
use super::*;
- use crate::raw::*;
#[test]
fn test_presigned_request_convert() -> Result<()> {
diff --git a/core/src/services/azblob/backend.rs
b/core/src/services/azblob/backend.rs
index bf77111e0..fdce556bb 100644
--- a/core/src/services/azblob/backend.rs
+++ b/core/src/services/azblob/backend.rs
@@ -600,9 +600,16 @@ impl Accessor for AzblobBackend {
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
let size = parse_content_length(resp.headers())?;
- Ok((RpRead::new().with_size(size), resp.into_body()))
+ let range = parse_content_range(resp.headers())?;
+ Ok((
+ RpRead::new().with_size(size).with_range(range),
+ resp.into_body(),
+ ))
+ }
+ StatusCode::RANGE_NOT_SATISFIABLE => {
+ resp.into_body().consume().await?;
+ Ok((RpRead::new().with_size(Some(0)),
IncomingAsyncBody::empty()))
}
- StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(),
IncomingAsyncBody::empty())),
_ => Err(parse_error(resp).await?),
}
}
diff --git a/core/src/services/azdls/backend.rs
b/core/src/services/azdls/backend.rs
index 164a2d7f7..77240e72e 100644
--- a/core/src/services/azdls/backend.rs
+++ b/core/src/services/azdls/backend.rs
@@ -293,9 +293,16 @@ impl Accessor for AzdlsBackend {
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
let size = parse_content_length(resp.headers())?;
- Ok((RpRead::new().with_size(size), resp.into_body()))
+ let range = parse_content_range(resp.headers())?;
+ Ok((
+ RpRead::new().with_size(size).with_range(range),
+ resp.into_body(),
+ ))
+ }
+ StatusCode::RANGE_NOT_SATISFIABLE => {
+ resp.into_body().consume().await?;
+ Ok((RpRead::new().with_size(Some(0)),
IncomingAsyncBody::empty()))
}
- StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(),
IncomingAsyncBody::empty())),
_ => Err(parse_error(resp).await?),
}
}
diff --git a/core/src/services/azfile/backend.rs
b/core/src/services/azfile/backend.rs
index f8b7f9418..92cc4f2ab 100644
--- a/core/src/services/azfile/backend.rs
+++ b/core/src/services/azfile/backend.rs
@@ -316,9 +316,16 @@ impl Accessor for AzfileBackend {
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
let size = parse_content_length(resp.headers())?;
- Ok((RpRead::new().with_size(size), resp.into_body()))
+ let range = parse_content_range(resp.headers())?;
+ Ok((
+ RpRead::new().with_size(size).with_range(range),
+ resp.into_body(),
+ ))
+ }
+ StatusCode::RANGE_NOT_SATISFIABLE => {
+ resp.into_body().consume().await?;
+ Ok((RpRead::new().with_size(Some(0)),
IncomingAsyncBody::empty()))
}
- StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(),
IncomingAsyncBody::empty())),
_ => Err(parse_error(resp).await?),
}
}
diff --git a/core/src/services/b2/backend.rs b/core/src/services/b2/backend.rs
index 4c6c8660d..f85d4aefc 100644
--- a/core/src/services/b2/backend.rs
+++ b/core/src/services/b2/backend.rs
@@ -330,9 +330,16 @@ impl Accessor for B2Backend {
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
let size = parse_content_length(resp.headers())?;
- Ok((RpRead::new().with_size(size), resp.into_body()))
+ let range = parse_content_range(resp.headers())?;
+ Ok((
+ RpRead::new().with_size(size).with_range(range),
+ resp.into_body(),
+ ))
+ }
+ StatusCode::RANGE_NOT_SATISFIABLE => {
+ resp.into_body().consume().await?;
+ Ok((RpRead::new().with_size(Some(0)),
IncomingAsyncBody::empty()))
}
- StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(),
IncomingAsyncBody::empty())),
_ => Err(parse_error(resp).await?),
}
}
diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs
index 1abc9c5a6..a31e9818d 100644
--- a/core/src/services/cos/backend.rs
+++ b/core/src/services/cos/backend.rs
@@ -309,9 +309,16 @@ impl Accessor for CosBackend {
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
let size = parse_content_length(resp.headers())?;
- Ok((RpRead::new().with_size(size), resp.into_body()))
+ let range = parse_content_range(resp.headers())?;
+ Ok((
+ RpRead::new().with_size(size).with_range(range),
+ resp.into_body(),
+ ))
+ }
+ StatusCode::RANGE_NOT_SATISFIABLE => {
+ resp.into_body().consume().await?;
+ Ok((RpRead::new().with_size(Some(0)),
IncomingAsyncBody::empty()))
}
- StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(),
IncomingAsyncBody::empty())),
_ => Err(parse_error(resp).await?),
}
}
diff --git a/core/src/services/dropbox/backend.rs
b/core/src/services/dropbox/backend.rs
index 652b89b48..9774deed7 100644
--- a/core/src/services/dropbox/backend.rs
+++ b/core/src/services/dropbox/backend.rs
@@ -102,7 +102,10 @@ impl Accessor for DropboxBackend {
let status = resp.status();
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(),
resp.into_body())),
- StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(),
IncomingAsyncBody::empty())),
+ StatusCode::RANGE_NOT_SATISFIABLE => {
+ resp.into_body().consume().await?;
+ Ok((RpRead::new().with_size(Some(0)),
IncomingAsyncBody::empty()))
+ }
_ => Err(parse_error(resp).await?),
}
}
diff --git a/core/src/services/gdrive/backend.rs
b/core/src/services/gdrive/backend.rs
index c2e6a874e..eb38e75d5 100644
--- a/core/src/services/gdrive/backend.rs
+++ b/core/src/services/gdrive/backend.rs
@@ -115,7 +115,11 @@ impl Accessor for GdriveBackend {
match status {
StatusCode::OK => {
let size = parse_content_length(resp.headers())?;
- Ok((RpRead::new().with_size(size), resp.into_body()))
+ let range = parse_content_range(resp.headers())?;
+ Ok((
+ RpRead::new().with_size(size).with_range(range),
+ resp.into_body(),
+ ))
}
_ => Err(parse_error(resp).await?),
}
diff --git a/core/src/services/ghac/backend.rs
b/core/src/services/ghac/backend.rs
index cd5047781..546ca7ec7 100644
--- a/core/src/services/ghac/backend.rs
+++ b/core/src/services/ghac/backend.rs
@@ -275,9 +275,16 @@ impl Accessor for GhacBackend {
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
let size = parse_content_length(resp.headers())?;
- Ok((RpRead::new().with_size(size), resp.into_body()))
+ let range = parse_content_range(resp.headers())?;
+ Ok((
+ RpRead::new().with_size(size).with_range(range),
+ resp.into_body(),
+ ))
+ }
+ StatusCode::RANGE_NOT_SATISFIABLE => {
+ resp.into_body().consume().await?;
+ Ok((RpRead::new().with_size(Some(0)),
IncomingAsyncBody::empty()))
}
- StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(),
IncomingAsyncBody::empty())),
_ => Err(parse_error(resp).await?),
}
}
diff --git a/core/src/services/http/backend.rs
b/core/src/services/http/backend.rs
index 9b1da6f23..f295333ba 100644
--- a/core/src/services/http/backend.rs
+++ b/core/src/services/http/backend.rs
@@ -259,9 +259,16 @@ impl Accessor for HttpBackend {
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
let size = parse_content_length(resp.headers())?;
- Ok((RpRead::new().with_size(size), resp.into_body()))
+ let range = parse_content_range(resp.headers())?;
+ Ok((
+ RpRead::new().with_size(size).with_range(range),
+ resp.into_body(),
+ ))
+ }
+ StatusCode::RANGE_NOT_SATISFIABLE => {
+ resp.into_body().consume().await?;
+ Ok((RpRead::new().with_size(Some(0)),
IncomingAsyncBody::empty()))
}
- StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(),
IncomingAsyncBody::empty())),
_ => Err(parse_error(resp).await?),
}
}
diff --git a/core/src/services/huggingface/backend.rs
b/core/src/services/huggingface/backend.rs
index b9d98867e..0974395ea 100644
--- a/core/src/services/huggingface/backend.rs
+++ b/core/src/services/huggingface/backend.rs
@@ -276,9 +276,16 @@ impl Accessor for HuggingfaceBackend {
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
let size = parse_content_length(resp.headers())?;
- Ok((RpRead::new().with_size(size), resp.into_body()))
+ let range = parse_content_range(resp.headers())?;
+ Ok((
+ RpRead::new().with_size(size).with_range(range),
+ resp.into_body(),
+ ))
+ }
+ StatusCode::RANGE_NOT_SATISFIABLE => {
+ resp.into_body().consume().await?;
+ Ok((RpRead::new().with_size(Some(0)),
IncomingAsyncBody::empty()))
}
- StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(),
IncomingAsyncBody::empty())),
_ => Err(parse_error(resp).await?),
}
}
diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs
index d72cca73c..ce562dd51 100644
--- a/core/src/services/obs/backend.rs
+++ b/core/src/services/obs/backend.rs
@@ -336,9 +336,16 @@ impl Accessor for ObsBackend {
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
let size = parse_content_length(resp.headers())?;
- Ok((RpRead::new().with_size(size), resp.into_body()))
+ let range = parse_content_range(resp.headers())?;
+ Ok((
+ RpRead::new().with_size(size).with_range(range),
+ resp.into_body(),
+ ))
+ }
+ StatusCode::RANGE_NOT_SATISFIABLE => {
+ resp.into_body().consume().await?;
+ Ok((RpRead::new().with_size(Some(0)),
IncomingAsyncBody::empty()))
}
- StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(),
IncomingAsyncBody::empty())),
_ => Err(parse_error(resp).await?),
}
}
diff --git a/core/src/services/onedrive/backend.rs
b/core/src/services/onedrive/backend.rs
index db7ed22c3..f491c9810 100644
--- a/core/src/services/onedrive/backend.rs
+++ b/core/src/services/onedrive/backend.rs
@@ -94,7 +94,11 @@ impl Accessor for OnedriveBackend {
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
let size = parse_content_length(resp.headers())?;
- Ok((RpRead::new().with_size(size), resp.into_body()))
+ let range = parse_content_range(resp.headers())?;
+ Ok((
+ RpRead::new().with_size(size).with_range(range),
+ resp.into_body(),
+ ))
}
_ => Err(parse_error(resp).await?),
}
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index 42a8b9d19..6530453bd 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -458,9 +458,16 @@ impl Accessor for OssBackend {
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
let size = parse_content_length(resp.headers())?;
- Ok((RpRead::new().with_size(size), resp.into_body()))
+ let range = parse_content_range(resp.headers())?;
+ Ok((
+ RpRead::new().with_size(size).with_range(range),
+ resp.into_body(),
+ ))
+ }
+ StatusCode::RANGE_NOT_SATISFIABLE => {
+ resp.into_body().consume().await?;
+ Ok((RpRead::new().with_size(Some(0)),
IncomingAsyncBody::empty()))
}
- StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(),
IncomingAsyncBody::empty())),
_ => Err(parse_error(resp).await?),
}
}
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index 4d9522620..6bde0532c 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -1039,9 +1039,16 @@ impl Accessor for S3Backend {
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
let size = parse_content_length(resp.headers())?;
- Ok((RpRead::new().with_size(size), resp.into_body()))
+ let range = parse_content_range(resp.headers())?;
+ Ok((
+ RpRead::new().with_size(size).with_range(range),
+ resp.into_body(),
+ ))
+ }
+ StatusCode::RANGE_NOT_SATISFIABLE => {
+ resp.into_body().consume().await?;
+ Ok((RpRead::new().with_size(Some(0)),
IncomingAsyncBody::empty()))
}
- StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(),
IncomingAsyncBody::empty())),
_ => Err(parse_error(resp).await?),
}
}
diff --git a/core/src/services/seafile/backend.rs
b/core/src/services/seafile/backend.rs
index 63f2f7122..34641c567 100644
--- a/core/src/services/seafile/backend.rs
+++ b/core/src/services/seafile/backend.rs
@@ -298,7 +298,11 @@ impl Accessor for SeafileBackend {
match status {
StatusCode::OK => {
let size = parse_content_length(resp.headers())?;
- Ok((RpRead::new().with_size(size), resp.into_body()))
+ let range = parse_content_range(resp.headers())?;
+ Ok((
+ RpRead::new().with_size(size).with_range(range),
+ resp.into_body(),
+ ))
}
_ => Err(parse_error(resp).await?),
}
diff --git a/core/src/services/swift/backend.rs
b/core/src/services/swift/backend.rs
index 6ae2a3b6a..01e123a4d 100644
--- a/core/src/services/swift/backend.rs
+++ b/core/src/services/swift/backend.rs
@@ -250,9 +250,16 @@ impl Accessor for SwiftBackend {
match resp.status() {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
let size = parse_content_length(resp.headers())?;
- Ok((RpRead::new().with_size(size), resp.into_body()))
+ let range = parse_content_range(resp.headers())?;
+ Ok((
+ RpRead::new().with_size(size).with_range(range),
+ resp.into_body(),
+ ))
+ }
+ StatusCode::RANGE_NOT_SATISFIABLE => {
+ resp.into_body().consume().await?;
+ Ok((RpRead::new().with_size(Some(0)),
IncomingAsyncBody::empty()))
}
- StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(),
IncomingAsyncBody::empty())),
_ => Err(parse_error(resp).await?),
}
}
diff --git a/core/src/services/webdav/backend.rs
b/core/src/services/webdav/backend.rs
index b751cf026..a7e461cb4 100644
--- a/core/src/services/webdav/backend.rs
+++ b/core/src/services/webdav/backend.rs
@@ -268,9 +268,16 @@ impl Accessor for WebdavBackend {
match status {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
let size = parse_content_length(resp.headers())?;
- Ok((RpRead::new().with_size(size), resp.into_body()))
+ let range = parse_content_range(resp.headers())?;
+ Ok((
+ RpRead::new().with_size(size).with_range(range),
+ resp.into_body(),
+ ))
+ }
+ StatusCode::RANGE_NOT_SATISFIABLE => {
+ resp.into_body().consume().await?;
+ Ok((RpRead::new().with_size(Some(0)),
IncomingAsyncBody::empty()))
}
- StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(),
IncomingAsyncBody::empty())),
_ => Err(parse_error(resp).await?),
}
}
diff --git a/core/src/services/webhdfs/backend.rs
b/core/src/services/webhdfs/backend.rs
index 8a6a9f297..2759c1e6a 100644
--- a/core/src/services/webhdfs/backend.rs
+++ b/core/src/services/webhdfs/backend.rs
@@ -466,7 +466,11 @@ impl Accessor for WebhdfsBackend {
match resp.status() {
StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
let size = parse_content_length(resp.headers())?;
- Ok((RpRead::new().with_size(size), resp.into_body()))
+ let range = parse_content_range(resp.headers())?;
+ Ok((
+ RpRead::new().with_size(size).with_range(range),
+ resp.into_body(),
+ ))
}
// WebHDFS will returns 403 when range is outside of the end.
StatusCode::FORBIDDEN => {
@@ -479,7 +483,10 @@ impl Accessor for WebhdfsBackend {
Err(parse_error_msg(parts, &s)?)
}
}
- StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(),
IncomingAsyncBody::empty())),
+ StatusCode::RANGE_NOT_SATISFIABLE => {
+ resp.into_body().consume().await?;
+ Ok((RpRead::new().with_size(Some(0)),
IncomingAsyncBody::empty()))
+ }
_ => Err(parse_error(resp).await?),
}
}