This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 5a67f1f1e8 Fix ObjectMeta::size for range requests (#5272) (#5276)
5a67f1f1e8 is described below
commit 5a67f1f1e8fda9329df7edda42f82219a975ddc7
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Wed Jan 3 19:34:16 2024 +0000
Fix ObjectMeta::size for range requests (#5272) (#5276)
* Fix ObjectMeta::size for range requests (#5272)
* Docs
* Update object_store/src/lib.rs
Co-authored-by: Andrew Lamb <[email protected]>
* Add tests
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
object_store/src/client/get.rs | 243 +++++++++++++++++++++++++++++++++++++----
object_store/src/lib.rs | 17 ++-
2 files changed, 236 insertions(+), 24 deletions(-)
diff --git a/object_store/src/client/get.rs b/object_store/src/client/get.rs
index 5f9cac9b42..b7e7f24b29 100644
--- a/object_store/src/client/get.rs
+++ b/object_store/src/client/get.rs
@@ -15,13 +15,18 @@
// specific language governing permissions and limitations
// under the License.
+use std::ops::Range;
+
use crate::client::header::{header_meta, HeaderConfig};
use crate::path::Path;
-use crate::{Error, GetOptions, GetResult};
-use crate::{GetResultPayload, Result};
+use crate::{Error, GetOptions, GetResult, GetResultPayload, Result};
use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};
+use hyper::header::CONTENT_RANGE;
+use hyper::StatusCode;
+use reqwest::header::ToStrError;
use reqwest::Response;
+use snafu::{ensure, OptionExt, ResultExt, Snafu};
/// A client that can perform a get request
#[async_trait]
@@ -45,25 +50,221 @@ impl<T: GetClient> GetClientExt for T {
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
let range = options.range.clone();
let response = self.get_request(location, options).await?;
- let meta = header_meta(location, response.headers(), T::HEADER_CONFIG).map_err(|e| {
- Error::Generic {
- store: T::STORE,
- source: Box::new(e),
- }
- })?;
-
- let stream = response
- .bytes_stream()
- .map_err(|source| Error::Generic {
- store: T::STORE,
- source: Box::new(source),
- })
- .boxed();
-
- Ok(GetResult {
- range: range.unwrap_or(0..meta.size),
- payload: GetResultPayload::Stream(stream),
- meta,
+ // Check the response against the requested range and wrap any failure
+ // in the store-specific generic error
+ get_result::<T>(location, range, response).map_err(|e| Error::Generic {
+ store: T::STORE,
+ source: Box::new(e),
+ })
+ }
+}
+
+/// A parsed HTTP `Content-Range` header value, e.g. `bytes 0-99/1000`
+struct ContentRange {
+ /// The range of the object returned, as a half-open range
+ /// (the header's inclusive `range-end` plus one)
+ range: Range<usize>,
+ /// The total size of the object being requested
+ size: usize,
+}
+
+impl ContentRange {
+ /// Parse a content range of the form `bytes <range-start>-<range-end>/<size>`
+ ///
+ /// Returns `None` if the value is malformed, if the complete length is
+ /// unknown (`*`), or if the range is invalid per RFC 9110
+ /// (`range-end < range-start` or `range-end >= size`)
+ ///
+ /// <https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Range>
+ fn from_str(s: &str) -> Option<Self> {
+ let rem = s.trim().strip_prefix("bytes ")?;
+ let (range, size) = rem.split_once('/')?;
+ let size: usize = size.parse().ok()?;
+
+ let (start_s, end_s) = range.split_once('-')?;
+
+ let start: usize = start_s.parse().ok()?;
+ let end: usize = end_s.parse().ok()?;
+
+ // The header is server-controlled: reject invalid ranges instead of
+ // building an inverted range. Since `end < size`, `end + 1` cannot overflow.
+ if start > end || end >= size {
+ return None;
+ }
+
+ Some(Self {
+ size,
+ range: start..end + 1,
})
}
}
+
+/// A specialized `Error` for get-related errors
+#[derive(Debug, Snafu)]
+#[allow(missing_docs)]
+enum GetResultError {
+ #[snafu(context(false))]
+ Header {
+ source: crate::client::header::Error,
+ },
+
+ #[snafu(display("Received non-partial response when range requested"))]
+ NotPartial,
+
+ #[snafu(display("Content-Range header not present in partial response"))]
+ NoContentRange,
+
+ #[snafu(display("Failed to parse value for CONTENT_RANGE header: \"{value}\""))]
+ ParseContentRange { value: String },
+
+ #[snafu(display("Content-Range header contained non UTF-8 characters"))]
+ InvalidContentRange { source: ToStrError },
+
+ #[snafu(display("Requested {expected:?}, got {actual:?}"))]
+ UnexpectedRange {
+ expected: Range<usize>,
+ actual: Range<usize>,
+ },
+}
+
+/// Build a [`GetResult`] from `response`
+///
+/// When `range` is `Some`, the response must be `206 Partial Content` with a
+/// `Content-Range` header matching `range` exactly, and `meta.size` is taken
+/// from the header's complete length rather than the response headers (#5272)
+fn get_result<T: GetClient>(
+ location: &Path,
+ range: Option<Range<usize>>,
+ response: Response,
+) -> Result<GetResult, GetResultError> {
+ let mut meta = header_meta(location, response.headers(), T::HEADER_CONFIG)?;
+
+ // ensure that we receive the range we asked for
+ let range = if let Some(expected) = range {
+ ensure!(
+ response.status() == StatusCode::PARTIAL_CONTENT,
+ NotPartialSnafu
+ );
+ let val = response
+ .headers()
+ .get(CONTENT_RANGE)
+ .context(NoContentRangeSnafu)?;
+
+ let value = val.to_str().context(InvalidContentRangeSnafu)?;
+ let value = ContentRange::from_str(value).context(ParseContentRangeSnafu { value })?;
+ let actual = value.range;
+
+ ensure!(
+ actual == expected,
+ UnexpectedRangeSnafu { expected, actual }
+ );
+
+ // Update size to reflect full size of object (#5272)
+ meta.size = value.size;
+ actual
+ } else {
+ 0..meta.size
+ };
+
+ let stream = response
+ .bytes_stream()
+ .map_err(|source| Error::Generic {
+ store: T::STORE,
+ source: Box::new(source),
+ })
+ .boxed();
+
+ Ok(GetResult {
+ range,
+ meta,
+ payload: GetResultPayload::Stream(stream),
+ })
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use hyper::http;
+ use hyper::http::header::*;
+
+ struct TestClient {}
+
+ #[async_trait]
+ impl GetClient for TestClient {
+ const STORE: &'static str = "TEST";
+
+ const HEADER_CONFIG: HeaderConfig = HeaderConfig {
+ etag_required: false,
+ last_modified_required: false,
+ version_header: None,
+ };
+
+ async fn get_request(&self, _: &Path, _: GetOptions) -> Result<Response> {
+ unimplemented!()
+ }
+ }
+
+ /// Build a synthetic response for an object of `object_size` bytes whose
+ /// body covers `range` (or the whole object when `None`)
+ fn make_response(
+ object_size: usize,
+ range: Option<Range<usize>>,
+ status: StatusCode,
+ content_range: Option<&str>,
+ ) -> Response {
+ let mut builder = http::Response::builder();
+ if let Some(range) = content_range {
+ builder = builder.header(CONTENT_RANGE, range);
+ }
+
+ let body = match range {
+ Some(range) => vec![0_u8; range.end - range.start],
+ None => vec![0_u8; object_size],
+ };
+
+ // Like a real server, report the length of the body actually returned, so
+ // a partial response does not expose the full object size via
+ // Content-Length and the tests genuinely exercise Content-Range (#5272)
+ builder
+ .status(status)
+ .header(CONTENT_LENGTH, body.len())
+ .body(body)
+ .unwrap()
+ .into()
+ }
+
+ #[tokio::test]
+ async fn test_get_result() {
+ let path = Path::from("test");
+
+ let resp = make_response(12, None, StatusCode::OK, None);
+ let res = get_result::<TestClient>(&path, None, resp).unwrap();
+ assert_eq!(res.meta.size, 12);
+ assert_eq!(res.range, 0..12);
+ let bytes = res.bytes().await.unwrap();
+ assert_eq!(bytes.len(), 12);
+
+ let resp = make_response(
+ 12,
+ Some(2..3),
+ StatusCode::PARTIAL_CONTENT,
+ Some("bytes 2-2/12"),
+ );
+ let res = get_result::<TestClient>(&path, Some(2..3), resp).unwrap();
+ // Size must come from Content-Range, not the 1-byte Content-Length (#5272)
+ assert_eq!(res.meta.size, 12);
+ assert_eq!(res.range, 2..3);
+ let bytes = res.bytes().await.unwrap();
+ assert_eq!(bytes.len(), 1);
+
+ let resp = make_response(12, Some(2..3), StatusCode::OK, None);
+ let err = get_result::<TestClient>(&path, Some(2..3), resp).unwrap_err();
+ assert_eq!(
+ err.to_string(),
+ "Received non-partial response when range requested"
+ );
+
+ let resp = make_response(
+ 12,
+ Some(2..3),
+ StatusCode::PARTIAL_CONTENT,
+ Some("bytes 2-3/12"),
+ );
+ let err = get_result::<TestClient>(&path, Some(2..3), resp).unwrap_err();
+ assert_eq!(err.to_string(), "Requested 2..3, got 2..4");
+
+ let resp = make_response(
+ 12,
+ Some(2..3),
+ StatusCode::PARTIAL_CONTENT,
+ Some("bytes 2-2/*"),
+ );
+ let err = get_result::<TestClient>(&path, Some(2..3), resp).unwrap_err();
+ assert_eq!(
+ err.to_string(),
+ "Failed to parse value for CONTENT_RANGE header: \"bytes 2-2/*\""
+ );
+
+ let resp = make_response(12, Some(2..3), StatusCode::PARTIAL_CONTENT, None);
+ let err = get_result::<TestClient>(&path, Some(2..3), resp).unwrap_err();
+ assert_eq!(
+ err.to_string(),
+ "Content-Range header not present in partial response"
+ );
+ }
+}
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 632e949582..b438254bdd 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -1303,12 +1303,23 @@ mod tests {
let range = 3..7;
let range_result = storage.get_range(&location, range.clone()).await;
+ let bytes = range_result.unwrap();
+ assert_eq!(bytes, expected_data.slice(range.clone()));
+
+ let opts = GetOptions {
+ range: Some(2..5),
+ ..Default::default()
+ };
+ let result = storage.get_opts(&location, opts).await.unwrap();
+ // Data is `"arbitrary data"`, length 14 bytes
+ assert_eq!(result.meta.size, 14); // Should return full object size
(#5272)
+ assert_eq!(result.range, 2..5);
+ let bytes = result.bytes().await.unwrap();
+ assert_eq!(bytes, b"bit".as_ref());
+
let out_of_range = 200..300;
let out_of_range_result = storage.get_range(&location,
out_of_range).await;
- let bytes = range_result.unwrap();
- assert_eq!(bytes, expected_data.slice(range));
-
// Should be a non-fatal error
out_of_range_result.unwrap_err();