This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch lazy-reader in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 07be9dc2db9df1fd18a398d23478431c59300af8 Author: Xuanwo <[email protected]> AuthorDate: Thu Oct 26 22:37:08 2023 +0800 Fix 416 not handled Signed-off-by: Xuanwo <[email protected]> --- core/src/raw/http_util/body.rs | 16 +++++++++++++++- core/src/raw/oio/read/range_read.rs | 8 ++++---- core/src/services/azblob/backend.rs | 1 + core/src/services/cos/backend.rs | 1 + core/src/services/gcs/backend.rs | 2 ++ core/src/services/http/backend.rs | 1 + core/src/services/obs/backend.rs | 1 + core/src/services/oss/backend.rs | 1 + core/src/services/s3/backend.rs | 1 + 9 files changed, 27 insertions(+), 5 deletions(-) diff --git a/core/src/raw/http_util/body.rs b/core/src/raw/http_util/body.rs index 474f74897..8b7d5da5a 100644 --- a/core/src/raw/http_util/body.rs +++ b/core/src/raw/http_util/body.rs @@ -78,6 +78,16 @@ impl IncomingAsyncBody { } } + /// Create an empty IncomingAsyncBody. + pub(crate) fn empty() -> Self { + Self { + inner: Box::new(()), + size: Some(0), + consumed: 0, + chunk: None, + } + } + /// Consume the entire body. pub async fn consume(mut self) -> Result<()> { use oio::ReadExt; @@ -145,7 +155,7 @@ impl IncomingAsyncBody { impl oio::Read for IncomingAsyncBody { fn poll_read(&mut self, cx: &mut Context<'_>, mut buf: &mut [u8]) -> Poll<Result<usize>> { - if buf.is_empty() { + if buf.is_empty() || self.size == Some(0) { return Poll::Ready(Ok(0)); } @@ -179,6 +189,10 @@ impl oio::Read for IncomingAsyncBody { } fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> { + if self.size == Some(0) { + return Poll::Ready(None); + } + if let Some(bs) = self.chunk.take() { return Poll::Ready(Some(Ok(bs))); } diff --git a/core/src/raw/oio/read/range_read.rs b/core/src/raw/oio/read/range_read.rs index e6967cea5..205b89a6f 100644 --- a/core/src/raw/oio/read/range_read.rs +++ b/core/src/raw/oio/read/range_read.rs @@ -233,7 +233,7 @@ where match &mut self.state { State::Idle => { // Sanity check for normal cases. - if buf.is_empty() || self.cur > self.size.unwrap_or(u64::MAX) { + if buf.is_empty() || self.cur >= self.size.unwrap_or(u64::MAX) { return Poll::Ready(Ok(0)); } @@ -362,7 +362,7 @@ where match &mut self.state { State::Idle => { // Sanity check for normal cases. - if self.cur > self.size.unwrap_or(u64::MAX) { + if self.cur >= self.size.unwrap_or(u64::MAX) { return Poll::Ready(None); } @@ -428,7 +428,7 @@ where match &mut self.state { State::Idle => { // Sanity check for normal cases. - if buf.is_empty() || self.cur > self.size.unwrap_or(u64::MAX) { + if buf.is_empty() || self.cur >= self.size.unwrap_or(u64::MAX) { return Ok(0); } @@ -528,7 +528,7 @@ where match &mut self.state { State::Idle => { // Sanity check for normal cases. - if self.cur > self.size.unwrap_or(u64::MAX) { + if self.cur >= self.size.unwrap_or(u64::MAX) { return None; } diff --git a/core/src/services/azblob/backend.rs b/core/src/services/azblob/backend.rs index 375a375d8..6e8dd69ee 100644 --- a/core/src/services/azblob/backend.rs +++ b/core/src/services/azblob/backend.rs @@ -587,6 +587,7 @@ impl Accessor for AzblobBackend { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())), + StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), IncomingAsyncBody::empty())), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs index 5ffe4be23..aef237a1a 100644 --- a/core/src/services/cos/backend.rs +++ b/core/src/services/cos/backend.rs @@ -333,6 +333,7 @@ impl Accessor for CosBackend { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())), + StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), IncomingAsyncBody::empty())), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs index c667920d9..40f7f6a9b 100644 --- a/core/src/services/gcs/backend.rs +++ b/core/src/services/gcs/backend.rs @@ -390,6 +390,8 @@ impl Accessor for GcsBackend { if resp.status().is_success() { Ok((RpRead::new(), resp.into_body())) + } else if resp.status() == StatusCode::RANGE_NOT_SATISFIABLE { + Ok((RpRead::new(), IncomingAsyncBody::empty())) } else { Err(parse_error(resp).await?) } diff --git a/core/src/services/http/backend.rs b/core/src/services/http/backend.rs index 2877d5aa2..6b2388862 100644 --- a/core/src/services/http/backend.rs +++ b/core/src/services/http/backend.rs @@ -238,6 +238,7 @@ impl Accessor for HttpBackend { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())), + StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), IncomingAsyncBody::empty())), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs index 18bcefb52..98e51b9db 100644 --- a/core/src/services/obs/backend.rs +++ b/core/src/services/obs/backend.rs @@ -360,6 +360,7 @@ impl Accessor for ObsBackend { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())), + StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), IncomingAsyncBody::empty())), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs index 043993699..12f2d2a25 100644 --- a/core/src/services/oss/backend.rs +++ b/core/src/services/oss/backend.rs @@ -473,6 +473,7 @@ impl Accessor for OssBackend { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())), + StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), IncomingAsyncBody::empty())), _ => Err(parse_error(resp).await?), } } diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index e6b862982..eeadf18d7 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -970,6 +970,7 @@ impl Accessor for S3Backend { match status { StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok((RpRead::new(), resp.into_body())), + StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), IncomingAsyncBody::empty())), _ => Err(parse_error(resp).await?), } }
