This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a67f1f1e8 Fix ObjectMeta::size for range requests (#5272) (#5276)
5a67f1f1e8 is described below

commit 5a67f1f1e8fda9329df7edda42f82219a975ddc7
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Wed Jan 3 19:34:16 2024 +0000

    Fix ObjectMeta::size for range requests (#5272) (#5276)
    
    * Fix ObjectMeta::size for range requests (#5272)
    
    * Docs
    
    * Update object_store/src/lib.rs
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * Add tests
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 object_store/src/client/get.rs | 243 +++++++++++++++++++++++++++++++++++++----
 object_store/src/lib.rs        |  17 ++-
 2 files changed, 236 insertions(+), 24 deletions(-)

diff --git a/object_store/src/client/get.rs b/object_store/src/client/get.rs
index 5f9cac9b42..b7e7f24b29 100644
--- a/object_store/src/client/get.rs
+++ b/object_store/src/client/get.rs
@@ -15,13 +15,18 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::ops::Range;
+
 use crate::client::header::{header_meta, HeaderConfig};
 use crate::path::Path;
-use crate::{Error, GetOptions, GetResult};
-use crate::{GetResultPayload, Result};
+use crate::{Error, GetOptions, GetResult, GetResultPayload, Result};
 use async_trait::async_trait;
 use futures::{StreamExt, TryStreamExt};
+use hyper::header::CONTENT_RANGE;
+use hyper::StatusCode;
+use reqwest::header::ToStrError;
 use reqwest::Response;
+use snafu::{ensure, OptionExt, ResultExt, Snafu};
 
 /// A client that can perform a get request
 #[async_trait]
@@ -45,25 +50,221 @@ impl<T: GetClient> GetClientExt for T {
     async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
         let range = options.range.clone();
         let response = self.get_request(location, options).await?;
-        let meta = header_meta(location, response.headers(), T::HEADER_CONFIG).map_err(|e| {
-            Error::Generic {
-                store: T::STORE,
-                source: Box::new(e),
-            }
-        })?;
-
-        let stream = response
-            .bytes_stream()
-            .map_err(|source| Error::Generic {
-                store: T::STORE,
-                source: Box::new(source),
-            })
-            .boxed();
-
-        Ok(GetResult {
-            range: range.unwrap_or(0..meta.size),
-            payload: GetResultPayload::Stream(stream),
-            meta,
+        get_result::<T>(location, range, response).map_err(|e| crate::Error::Generic {
+            store: T::STORE,
+            source: Box::new(e),
+        })
+    }
+}
+
+struct ContentRange {
+    /// The range of the object returned
+    range: Range<usize>,
+    /// The total size of the object being requested
+    size: usize,
+}
+
+impl ContentRange {
+    /// Parse a content range of the form `bytes <range-start>-<range-end>/<size>`
+    ///
+    /// <https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Range>
+    fn from_str(s: &str) -> Option<Self> {
+        let rem = s.trim().strip_prefix("bytes ")?;
+        let (range, size) = rem.split_once('/')?;
+        let size = size.parse().ok()?;
+
+        let (start_s, end_s) = range.split_once('-')?;
+
+        let start = start_s.parse().ok()?;
+        let end: usize = end_s.parse().ok()?;
+
+        Some(Self {
+            size,
+            range: start..end + 1,
         })
     }
 }
+
+/// A specialized `Error` for get-related errors
+#[derive(Debug, Snafu)]
+#[allow(missing_docs)]
+enum GetResultError {
+    #[snafu(context(false))]
+    Header {
+        source: crate::client::header::Error,
+    },
+
+    #[snafu(display("Received non-partial response when range requested"))]
+    NotPartial,
+
+    #[snafu(display("Content-Range header not present in partial response"))]
+    NoContentRange,
+
+    #[snafu(display("Failed to parse value for CONTENT_RANGE header: \"{value}\""))]
+    ParseContentRange { value: String },
+
+    #[snafu(display("Content-Range header contained non UTF-8 characters"))]
+    InvalidContentRange { source: ToStrError },
+
+    #[snafu(display("Requested {expected:?}, got {actual:?}"))]
+    UnexpectedRange {
+        expected: Range<usize>,
+        actual: Range<usize>,
+    },
+}
+
+fn get_result<T: GetClient>(
+    location: &Path,
+    range: Option<Range<usize>>,
+    response: Response,
+) -> Result<GetResult, GetResultError> {
+    let mut meta = header_meta(location, response.headers(), T::HEADER_CONFIG)?;
+
+    // ensure that we receive the range we asked for
+    let range = if let Some(expected) = range {
+        ensure!(
+            response.status() == StatusCode::PARTIAL_CONTENT,
+            NotPartialSnafu
+        );
+        let val = response
+            .headers()
+            .get(CONTENT_RANGE)
+            .context(NoContentRangeSnafu)?;
+
+        let value = val.to_str().context(InvalidContentRangeSnafu)?;
+        let value = ContentRange::from_str(value).context(ParseContentRangeSnafu { value })?;
+        let actual = value.range;
+
+        ensure!(
+            actual == expected,
+            UnexpectedRangeSnafu { expected, actual }
+        );
+
+        // Update size to reflect full size of object (#5272)
+        meta.size = value.size;
+        actual
+    } else {
+        0..meta.size
+    };
+
+    let stream = response
+        .bytes_stream()
+        .map_err(|source| Error::Generic {
+            store: T::STORE,
+            source: Box::new(source),
+        })
+        .boxed();
+
+    Ok(GetResult {
+        range,
+        meta,
+        payload: GetResultPayload::Stream(stream),
+    })
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use hyper::http;
+    use hyper::http::header::*;
+
+    struct TestClient {}
+
+    #[async_trait]
+    impl GetClient for TestClient {
+        const STORE: &'static str = "TEST";
+
+        const HEADER_CONFIG: HeaderConfig = HeaderConfig {
+            etag_required: false,
+            last_modified_required: false,
+            version_header: None,
+        };
+
+        async fn get_request(&self, _: &Path, _: GetOptions) -> Result<Response> {
+            unimplemented!()
+        }
+    }
+
+    fn make_response(
+        object_size: usize,
+        range: Option<Range<usize>>,
+        status: StatusCode,
+        content_range: Option<&str>,
+    ) -> Response {
+        let mut builder = http::Response::builder();
+        if let Some(range) = content_range {
+            builder = builder.header(CONTENT_RANGE, range);
+        }
+
+        let body = match range {
+            Some(range) => vec![0_u8; range.end - range.start],
+            None => vec![0_u8; object_size],
+        };
+
+        builder
+            .status(status)
+            .header(CONTENT_LENGTH, object_size)
+            .body(body)
+            .unwrap()
+            .into()
+    }
+
+    #[tokio::test]
+    async fn test_get_result() {
+        let path = Path::from("test");
+
+        let resp = make_response(12, None, StatusCode::OK, None);
+        let res = get_result::<TestClient>(&path, None, resp).unwrap();
+        assert_eq!(res.meta.size, 12);
+        assert_eq!(res.range, 0..12);
+        let bytes = res.bytes().await.unwrap();
+        assert_eq!(bytes.len(), 12);
+
+        let resp = make_response(
+            12,
+            Some(2..3),
+            StatusCode::PARTIAL_CONTENT,
+            Some("bytes 2-2/12"),
+        );
+        let res = get_result::<TestClient>(&path, Some(2..3), resp).unwrap();
+        assert_eq!(res.meta.size, 12);
+        assert_eq!(res.range, 2..3);
+        let bytes = res.bytes().await.unwrap();
+        assert_eq!(bytes.len(), 1);
+
+        let resp = make_response(12, Some(2..3), StatusCode::OK, None);
+        let err = get_result::<TestClient>(&path, Some(2..3), resp).unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "Received non-partial response when range requested"
+        );
+
+        let resp = make_response(
+            12,
+            Some(2..3),
+            StatusCode::PARTIAL_CONTENT,
+            Some("bytes 2-3/12"),
+        );
+        let err = get_result::<TestClient>(&path, Some(2..3), resp).unwrap_err();
+        assert_eq!(err.to_string(), "Requested 2..3, got 2..4");
+
+        let resp = make_response(
+            12,
+            Some(2..3),
+            StatusCode::PARTIAL_CONTENT,
+            Some("bytes 2-2/*"),
+        );
+        let err = get_result::<TestClient>(&path, Some(2..3), resp).unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "Failed to parse value for CONTENT_RANGE header: \"bytes 2-2/*\""
+        );
+
+        let resp = make_response(12, Some(2..3), StatusCode::PARTIAL_CONTENT, None);
+        let err = get_result::<TestClient>(&path, Some(2..3), resp).unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "Content-Range header not present in partial response"
+        );
+    }
+}
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 632e949582..b438254bdd 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -1303,12 +1303,23 @@ mod tests {
         let range = 3..7;
         let range_result = storage.get_range(&location, range.clone()).await;
 
+        let bytes = range_result.unwrap();
+        assert_eq!(bytes, expected_data.slice(range.clone()));
+
+        let opts = GetOptions {
+            range: Some(2..5),
+            ..Default::default()
+        };
+        let result = storage.get_opts(&location, opts).await.unwrap();
+        // Data is `"arbitrary data"`, length 14 bytes
+        assert_eq!(result.meta.size, 14); // Should return full object size (#5272)
+        assert_eq!(result.range, 2..5);
+        let bytes = result.bytes().await.unwrap();
+        assert_eq!(bytes, b"bit".as_ref());
+
         let out_of_range = 200..300;
         let out_of_range_result = storage.get_range(&location, out_of_range).await;
 
-        let bytes = range_result.unwrap();
-        assert_eq!(bytes, expected_data.slice(range));
-
         // Should be a non-fatal error
         out_of_range_result.unwrap_err();
 

Reply via email to