This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git


The following commit(s) were added to refs/heads/main by this push:
     new 7a0504b  Retry streaming get requests (#15) (#383)
7a0504b is described below

commit 7a0504b4924fcecee17d768fd7190b8f71b0877f
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu May 29 09:27:49 2025 +0100

    Retry streaming get requests (#15) (#383)
    
    * Retry streaming get requests (#15)
    
    * Add tests
    
    * Fix size hint
    
    * Fix wasm32
---
 src/aws/client.rs         |  16 +-
 src/aws/dynamo.rs         |   9 +-
 src/aws/mod.rs            |  11 +-
 src/azure/client.rs       |  18 +-
 src/client/get.rs         | 595 +++++++++++++++++++++++++++++++++-------------
 src/client/http/body.rs   |   8 +
 src/client/mock_server.rs |  20 +-
 src/client/retry.rs       | 209 ++++++++--------
 src/gcp/client.rs         |  16 +-
 src/http/client.rs        |  16 +-
 10 files changed, 633 insertions(+), 285 deletions(-)

diff --git a/src/aws/client.rs b/src/aws/client.rs
index a1fb463..ce5a091 100644
--- a/src/aws/client.rs
+++ b/src/aws/client.rs
@@ -27,7 +27,7 @@ use crate::client::get::GetClient;
 use crate::client::header::{get_etag, HeaderConfig};
 use crate::client::header::{get_put_result, get_version};
 use crate::client::list::ListClient;
-use crate::client::retry::RetryExt;
+use crate::client::retry::{RetryContext, RetryExt};
 use crate::client::s3::{
     CompleteMultipartUpload, CompleteMultipartUploadResult, CopyPartResult,
     InitiateMultipartUploadResult, ListResponse, PartMetadata,
@@ -837,8 +837,17 @@ impl GetClient for S3Client {
         user_defined_metadata_prefix: Some(USER_DEFINED_METADATA_HEADER_PREFIX),
     };
 
+    fn retry_config(&self) -> &RetryConfig {
+        &self.config.retry_config
+    }
+
     /// Make an S3 GET request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html>
-    async fn get_request(&self, path: &Path, options: GetOptions) -> Result<HttpResponse> {
+    async fn get_request(
+        &self,
+        ctx: &mut RetryContext,
+        path: &Path,
+        options: GetOptions,
+    ) -> Result<HttpResponse> {
         let credential = self.config.get_session_credential().await?;
         let url = self.config.path_url(path);
         let method = match options.head {
@@ -863,7 +872,8 @@ impl GetClient for S3Client {
         let response = builder
             .with_get_options(options)
             .with_aws_sigv4(credential.authorizer(), None)
-            .send_retry(&self.config.retry_config)
+            .retryable_request()
+            .send(ctx)
             .await
             .map_err(|e| e.error(STORE, path.to_string()))?;
 
diff --git a/src/aws/dynamo.rs b/src/aws/dynamo.rs
index 2823860..a6775ef 100644
--- a/src/aws/dynamo.rs
+++ b/src/aws/dynamo.rs
@@ -20,6 +20,7 @@
 use std::borrow::Cow;
 use std::collections::HashMap;
 use std::future::Future;
+use std::sync::Arc;
 use std::time::{Duration, Instant};
 
 use chrono::Utc;
@@ -181,7 +182,7 @@ impl DynamoCommit {
 
     pub(crate) async fn copy_if_not_exists(
         &self,
-        client: &S3Client,
+        client: &Arc<S3Client>,
         from: &Path,
         to: &Path,
     ) -> Result<()> {
@@ -195,7 +196,7 @@ impl DynamoCommit {
     #[allow(clippy::future_not_send)] // Generics confound this lint
     pub(crate) async fn conditional_op<F, Fut, T>(
         &self,
-        client: &S3Client,
+        client: &Arc<S3Client>,
         to: &Path,
         etag: Option<&str>,
         op: F,
@@ -348,7 +349,7 @@ enum TryLockResult {
 }
 
 /// Validates that `path` has the given `etag` or doesn't exist if `None`
-async fn check_precondition(client: &S3Client, path: &Path, etag: Option<&str>) -> Result<()> {
+async fn check_precondition(client: &Arc<S3Client>, path: &Path, etag: Option<&str>) -> Result<()> {
     let options = GetOptions {
         head: true,
         ..Default::default()
@@ -543,7 +544,7 @@ mod tests {
     ///
     /// This is a function called by s3_test to avoid test concurrency issues
     pub(crate) async fn integration_test(integration: &AmazonS3, d: 
&DynamoCommit) {
-        let client = integration.client.as_ref();
+        let client = &integration.client;
 
         let src = Path::from("dynamo_path_src");
         integration.put(&src, "asd".into()).await.unwrap();
diff --git a/src/aws/mod.rs b/src/aws/mod.rs
index 83edf84..6a5b849 100644
--- a/src/aws/mod.rs
+++ b/src/aws/mod.rs
@@ -512,6 +512,7 @@ impl PaginatedListStore for AmazonS3 {
 mod tests {
     use super::*;
     use crate::client::get::GetClient;
+    use crate::client::retry::RetryContext;
     use crate::client::SpawnedReqwestConnector;
     use crate::integration::*;
     use crate::tests::*;
@@ -761,11 +762,14 @@ mod tests {
         upload.complete().await.unwrap();
 
         for location in &locations {
+            let mut context = 
RetryContext::new(&store.client.config.retry_config);
+
             let res = store
                 .client
-                .get_request(location, GetOptions::default())
+                .get_request(&mut context, location, GetOptions::default())
                 .await
                 .unwrap();
+
             let headers = res.headers();
             assert_eq!(
                 headers
@@ -817,11 +821,14 @@ mod tests {
 
         // Test get with sse-c.
         for location in &locations {
+            let mut context = 
RetryContext::new(&store.client.config.retry_config);
+
             let res = store
                 .client
-                .get_request(location, GetOptions::default())
+                .get_request(&mut context, location, GetOptions::default())
                 .await
                 .unwrap();
+
             let headers = res.headers();
             assert_eq!(
                 headers
diff --git a/src/azure/client.rs b/src/azure/client.rs
index 329cdd4..428a99b 100644
--- a/src/azure/client.rs
+++ b/src/azure/client.rs
@@ -22,7 +22,7 @@ use crate::client::builder::HttpRequestBuilder;
 use crate::client::get::GetClient;
 use crate::client::header::{get_put_result, HeaderConfig};
 use crate::client::list::ListClient;
-use crate::client::retry::RetryExt;
+use crate::client::retry::{RetryContext, RetryExt};
 use crate::client::{GetOptionsExt, HttpClient, HttpError, HttpRequest, 
HttpResponse};
 use crate::list::{PaginatedListOptions, PaginatedListResult};
 use crate::multipart::PartId;
@@ -896,10 +896,19 @@ impl GetClient for AzureClient {
         user_defined_metadata_prefix: 
Some(USER_DEFINED_METADATA_HEADER_PREFIX),
     };
 
+    fn retry_config(&self) -> &RetryConfig {
+        &self.config.retry_config
+    }
+
     /// Make an Azure GET request
     /// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob>
     /// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties>
-    async fn get_request(&self, path: &Path, options: GetOptions) -> Result<HttpResponse> {
+    async fn get_request(
+        &self,
+        ctx: &mut RetryContext,
+        path: &Path,
+        options: GetOptions,
+    ) -> Result<HttpResponse> {
         // As of 2024-01-02, Azure does not support suffix requests,
         // so we should fail fast here rather than sending one
         if let Some(GetRange::Suffix(_)) = options.range.as_ref() {
@@ -929,12 +938,13 @@ impl GetClient for AzureClient {
             .as_deref()
             .map(|c| c.sensitive_request())
             .unwrap_or_default();
+
         let response = builder
             .with_get_options(options)
             .with_azure_authorization(&credential, &self.config.account)
-            .retryable(&self.config.retry_config)
+            .retryable_request()
             .sensitive(sensitive)
-            .send()
+            .send(ctx)
             .await
             .map_err(|source| {
                 let path = path.as_ref().into();
diff --git a/src/client/get.rs b/src/client/get.rs
index 4c65c6d..51d4e1b 100644
--- a/src/client/get.rs
+++ b/src/client/get.rs
@@ -15,20 +15,28 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::ops::Range;
-
-use crate::client::header::{header_meta, HeaderConfig};
-use crate::client::HttpResponse;
+use crate::client::header::{get_etag, header_meta, HeaderConfig};
+use crate::client::retry::RetryContext;
+use crate::client::{HttpResponse, HttpResponseBody};
 use crate::path::Path;
-use crate::{Attribute, Attributes, GetOptions, GetRange, GetResult, 
GetResultPayload, Result};
+use crate::{
+    Attribute, Attributes, GetOptions, GetRange, GetResult, GetResultPayload, 
ObjectMeta, Result,
+    RetryConfig,
+};
 use async_trait::async_trait;
-use futures::{StreamExt, TryStreamExt};
+use bytes::Bytes;
+use futures::stream::BoxStream;
+use futures::StreamExt;
 use http::header::{
     CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE, 
CONTENT_RANGE,
     CONTENT_TYPE,
 };
 use http::StatusCode;
+use http_body_util::BodyExt;
 use reqwest::header::ToStrError;
+use std::ops::Range;
+use std::sync::Arc;
+use tracing::info;
 
 /// A client that can perform a get request
 #[async_trait]
@@ -38,7 +46,14 @@ pub(crate) trait GetClient: Send + Sync + 'static {
     /// Configure the [`HeaderConfig`] for this client
     const HEADER_CONFIG: HeaderConfig;
 
-    async fn get_request(&self, path: &Path, options: GetOptions) -> 
Result<HttpResponse>;
+    fn retry_config(&self) -> &RetryConfig;
+
+    async fn get_request(
+        &self,
+        ctx: &mut RetryContext,
+        path: &Path,
+        options: GetOptions,
+    ) -> Result<HttpResponse>;
 }
 
 /// Extension trait for [`GetClient`] that adds common retrieval functionality
@@ -48,20 +63,16 @@ pub(crate) trait GetClientExt {
 }
 
 #[async_trait]
-impl<T: GetClient> GetClientExt for T {
+impl<T: GetClient> GetClientExt for Arc<T> {
     async fn get_opts(&self, location: &Path, options: GetOptions) -> 
Result<GetResult> {
-        let range = options.range.clone();
-        if let Some(r) = range.as_ref() {
-            r.is_valid().map_err(|e| crate::Error::Generic {
-                store: T::STORE,
-                source: Box::new(e),
-            })?;
-        }
-        let response = self.get_request(location, options).await?;
-        get_result::<T>(location, range, response).map_err(|e| 
crate::Error::Generic {
-            store: T::STORE,
-            source: Box::new(e),
-        })
+        let ctx = GetContext {
+            location: location.clone(),
+            options,
+            client: Self::clone(self),
+            retry_ctx: RetryContext::new(self.retry_config()),
+        };
+
+        ctx.get_result().await
     }
 }
 
@@ -145,40 +156,132 @@ enum GetResultError {
     },
 }
 
-fn get_result<T: GetClient>(
-    location: &Path,
-    range: Option<GetRange>,
-    response: HttpResponse,
-) -> Result<GetResult, GetResultError> {
-    let mut meta = header_meta(location, response.headers(), 
T::HEADER_CONFIG)?;
+/// Retry context for a streaming get request
+struct GetContext<T: GetClient> {
+    client: Arc<T>,
+    location: Path,
+    options: GetOptions,
+    retry_ctx: RetryContext,
+}
 
-    // ensure that we receive the range we asked for
-    let range = if let Some(expected) = range {
-        if response.status() != StatusCode::PARTIAL_CONTENT {
-            return Err(GetResultError::NotPartial);
+impl<T: GetClient> GetContext<T> {
+    async fn get_result(mut self) -> Result<GetResult> {
+        if let Some(r) = &self.options.range {
+            r.is_valid().map_err(Self::err)?;
         }
 
-        let val = response
-            .headers()
-            .get(CONTENT_RANGE)
-            .ok_or(GetResultError::NoContentRange)?;
+        let request = self
+            .client
+            .get_request(&mut self.retry_ctx, &self.location, 
self.options.clone())
+            .await?;
+
+        let (parts, body) = request.into_parts();
+        let (range, meta) = get_range_meta(
+            T::HEADER_CONFIG,
+            &self.location,
+            self.options.range.as_ref(),
+            &parts,
+        )
+        .map_err(Self::err)?;
+
+        let attributes = get_attributes(T::HEADER_CONFIG, 
&parts.headers).map_err(Self::err)?;
+        let stream = self.retry_stream(body, meta.e_tag.clone(), 
range.clone());
+
+        Ok(GetResult {
+            payload: GetResultPayload::Stream(stream),
+            meta,
+            range,
+            attributes,
+        })
+    }
 
-        let value = val
-            .to_str()
-            .map_err(|source| GetResultError::InvalidContentRange { source })?;
+    fn retry_stream(
+        self,
+        body: HttpResponseBody,
+        etag: Option<String>,
+        range: Range<u64>,
+    ) -> BoxStream<'static, Result<Bytes>> {
+        futures::stream::try_unfold(
+            (self, body, etag, range),
+            |(mut ctx, mut body, etag, mut range)| async move {
+                while let Some(ret) = body.frame().await {
+                    match (ret, &etag) {
+                        (Ok(frame), _) => match frame.into_data() {
+                            Ok(bytes) => {
+                                range.start += bytes.len() as u64;
+                                return Ok(Some((bytes, (ctx, body, etag, 
range))));
+                            }
+                            Err(_) => continue, // Isn't data frame
+                        },
+                        // Retry all response body errors
+                        (Err(e), Some(etag)) if !ctx.retry_ctx.exhausted() => {
+                            let sleep = ctx.retry_ctx.backoff();
+                            info!(
+                                "Encountered error while reading response body: {}. Retrying in {}s",
+                                e,
+                                sleep.as_secs_f32()
+                            );
+
+                            tokio::time::sleep(sleep).await;
+
+                            let options = GetOptions {
+                                range: Some(GetRange::Bounded(range.clone())),
+                                ..ctx.options.clone()
+                            };
+
+                            // Note: this will potentially retry internally if applicable
+                            let request = ctx
+                                .client
+                                .get_request(&mut ctx.retry_ctx, 
&ctx.location, options)
+                                .await
+                                .map_err(Self::err)?;
+
+                            let (parts, retry_body) = request.into_parts();
+                            let retry_etag = 
get_etag(&parts.headers).map_err(Self::err)?;
+
+                            if etag != &retry_etag {
+                                // Return the original error
+                                return Err(Self::err(e));
+                            }
+
+                            body = retry_body;
+                        }
+                        (Err(e), _) => return Err(Self::err(e)),
+                    }
+                }
+                Ok(None)
+            },
+        )
+            .boxed()
+    }
+
+    fn err<E: std::error::Error + Send + Sync + 'static>(e: E) -> crate::Error {
+        crate::Error::Generic {
+            store: T::STORE,
+            source: Box::new(e),
+        }
+    }
+}
 
-        let value = ContentRange::from_str(value).ok_or_else(|| {
-            let value = value.into();
-            GetResultError::ParseContentRange { value }
-        })?;
+fn get_range_meta(
+    cfg: HeaderConfig,
+    location: &Path,
+    range: Option<&GetRange>,
+    response: &http::response::Parts,
+) -> Result<(Range<u64>, ObjectMeta), GetResultError> {
+    let mut meta = header_meta(location, &response.headers, cfg)?;
+    let range = if let Some(expected) = range {
+        if response.status != StatusCode::PARTIAL_CONTENT {
+            return Err(GetResultError::NotPartial);
+        }
 
+        let value = parse_range(&response.headers)?;
         let actual = value.range;
 
-        // Update size to reflect full size of object (#5272)
+        // Update size to reflect the full size of the object (#5272)
         meta.size = value.size;
 
         let expected = expected.as_range(meta.size)?;
-
         if actual != expected {
             return Err(GetResultError::UnexpectedRange { expected, actual });
         }
@@ -188,6 +291,30 @@ fn get_result<T: GetClient>(
         0..meta.size
     };
 
+    Ok((range, meta))
+}
+
+/// Extracts the [CONTENT_RANGE] header
+fn parse_range(headers: &http::HeaderMap) -> Result<ContentRange, 
GetResultError> {
+    let val = headers
+        .get(CONTENT_RANGE)
+        .ok_or(GetResultError::NoContentRange)?;
+
+    let value = val
+        .to_str()
+        .map_err(|source| GetResultError::InvalidContentRange { source })?;
+
+    ContentRange::from_str(value).ok_or_else(|| {
+        let value = value.into();
+        GetResultError::ParseContentRange { value }
+    })
+}
+
+/// Extracts [`Attributes`] from the response headers
+fn get_attributes(
+    cfg: HeaderConfig,
+    headers: &http::HeaderMap,
+) -> Result<Attributes, GetResultError> {
     macro_rules! parse_attributes {
         ($headers:expr, $(($header:expr, $attr:expr, $map_err:expr)),*) => {{
             let mut attributes = Attributes::new();
@@ -202,7 +329,7 @@ fn get_result<T: GetClient>(
     }
 
     let mut attributes = parse_attributes!(
-        response.headers(),
+        headers,
         (CACHE_CONTROL, Attribute::CacheControl, |source| {
             GetResultError::InvalidCacheControl { source }
         }),
@@ -223,8 +350,8 @@ fn get_result<T: GetClient>(
     );
 
     // Add attributes that match the user-defined metadata prefix (e.g. x-amz-meta-)
-    if let Some(prefix) = T::HEADER_CONFIG.user_defined_metadata_prefix {
-        for (key, val) in response.headers() {
+    if let Some(prefix) = cfg.user_defined_metadata_prefix {
+        for (key, val) in headers {
             if let Some(suffix) = key.as_str().strip_prefix(prefix) {
                 if let Ok(val_str) = val.to_str() {
                     attributes.insert(
@@ -239,22 +366,7 @@ fn get_result<T: GetClient>(
             }
         }
     }
-
-    let stream = response
-        .into_body()
-        .bytes_stream()
-        .map_err(|source| crate::Error::Generic {
-            store: T::STORE,
-            source: Box::new(source),
-        })
-        .boxed();
-
-    Ok(GetResult {
-        range,
-        meta,
-        attributes,
-        payload: GetResultPayload::Stream(stream),
-    })
+    Ok(attributes)
 }
 
 #[cfg(test)]
@@ -262,41 +374,17 @@ mod tests {
     use super::*;
     use http::header::*;
 
-    struct TestClient {}
-
-    #[async_trait]
-    impl GetClient for TestClient {
-        const STORE: &'static str = "TEST";
-
-        const HEADER_CONFIG: HeaderConfig = HeaderConfig {
-            etag_required: false,
-            last_modified_required: false,
-            version_header: None,
-            user_defined_metadata_prefix: Some("x-test-meta-"),
-        };
-
-        async fn get_request(&self, _: &Path, _: GetOptions) -> 
Result<HttpResponse> {
-            unimplemented!()
-        }
-    }
-
     fn make_response(
         object_size: usize,
-        range: Option<Range<usize>>,
         status: StatusCode,
         content_range: Option<&str>,
         headers: Option<Vec<(&str, &str)>>,
-    ) -> HttpResponse {
+    ) -> http::response::Parts {
         let mut builder = http::Response::builder();
         if let Some(range) = content_range {
             builder = builder.header(CONTENT_RANGE, range);
         }
 
-        let body = match range {
-            Some(range) => vec![0_u8; range.end - range.start],
-            None => vec![0_u8; object_size],
-        };
-
         if let Some(headers) = headers {
             for (key, value) in headers {
                 builder = builder.header(key, value);
@@ -306,124 +394,305 @@ mod tests {
         builder
             .status(status)
             .header(CONTENT_LENGTH, object_size)
-            .body(body.into())
+            .body(())
             .unwrap()
+            .into_parts()
+            .0
     }
 
+    const CFG: HeaderConfig = HeaderConfig {
+        etag_required: false,
+        last_modified_required: false,
+        version_header: None,
+        user_defined_metadata_prefix: Some("x-test-meta-"),
+    };
+
     #[tokio::test]
-    async fn test_get_result() {
+    async fn test_get_range_meta() {
         let path = Path::from("test");
 
-        let resp = make_response(12, None, StatusCode::OK, None, None);
-        let res = get_result::<TestClient>(&path, None, resp).unwrap();
-        assert_eq!(res.meta.size, 12);
-        assert_eq!(res.range, 0..12);
-        let bytes = res.bytes().await.unwrap();
-        assert_eq!(bytes.len(), 12);
+        let resp = make_response(12, StatusCode::OK, None, None);
+        let (range, meta) = get_range_meta(CFG, &path, None, &resp).unwrap();
+        assert_eq!(meta.size, 12);
+        assert_eq!(range, 0..12);
 
         let get_range = GetRange::from(2..3);
 
-        let resp = make_response(
-            12,
-            Some(2..3),
-            StatusCode::PARTIAL_CONTENT,
-            Some("bytes 2-2/12"),
-            None,
-        );
-        let res = get_result::<TestClient>(&path, Some(get_range.clone()), 
resp).unwrap();
-        assert_eq!(res.meta.size, 12);
-        assert_eq!(res.range, 2..3);
-        let bytes = res.bytes().await.unwrap();
-        assert_eq!(bytes.len(), 1);
-
-        let resp = make_response(12, Some(2..3), StatusCode::OK, None, None);
-        let err = get_result::<TestClient>(&path, Some(get_range.clone()), 
resp).unwrap_err();
+        let resp = make_response(12, StatusCode::PARTIAL_CONTENT, Some("bytes 
2-2/12"), None);
+        let (range, meta) = get_range_meta(CFG, &path, Some(&get_range), 
&resp).unwrap();
+        assert_eq!(meta.size, 12);
+        assert_eq!(range, 2..3);
+
+        let resp = make_response(12, StatusCode::OK, None, None);
+        let err = get_range_meta(CFG, &path, Some(&get_range), 
&resp).unwrap_err();
         assert_eq!(
             err.to_string(),
             "Received non-partial response when range requested"
         );
 
-        let resp = make_response(
-            12,
-            Some(2..3),
-            StatusCode::PARTIAL_CONTENT,
-            Some("bytes 2-3/12"),
-            None,
-        );
-        let err = get_result::<TestClient>(&path, Some(get_range.clone()), 
resp).unwrap_err();
+        let resp = make_response(12, StatusCode::PARTIAL_CONTENT, Some("bytes 
2-3/12"), None);
+        let err = get_range_meta(CFG, &path, Some(&get_range), 
&resp).unwrap_err();
         assert_eq!(err.to_string(), "Requested 2..3, got 2..4");
 
-        let resp = make_response(
-            12,
-            Some(2..3),
-            StatusCode::PARTIAL_CONTENT,
-            Some("bytes 2-2/*"),
-            None,
-        );
-        let err = get_result::<TestClient>(&path, Some(get_range.clone()), 
resp).unwrap_err();
+        let resp = make_response(12, StatusCode::PARTIAL_CONTENT, Some("bytes 
2-2/*"), None);
+        let err = get_range_meta(CFG, &path, Some(&get_range), 
&resp).unwrap_err();
         assert_eq!(
             err.to_string(),
             "Failed to parse value for CONTENT_RANGE header: \"bytes 2-2/*\""
         );
 
-        let resp = make_response(12, Some(2..3), StatusCode::PARTIAL_CONTENT, 
None, None);
-        let err = get_result::<TestClient>(&path, Some(get_range.clone()), 
resp).unwrap_err();
+        let resp = make_response(12, StatusCode::PARTIAL_CONTENT, None, None);
+        let err = get_range_meta(CFG, &path, Some(&get_range), 
&resp).unwrap_err();
         assert_eq!(
             err.to_string(),
             "Content-Range header not present in partial response"
         );
 
-        let resp = make_response(
-            2,
-            Some(2..3),
-            StatusCode::PARTIAL_CONTENT,
-            Some("bytes 2-3/2"),
-            None,
-        );
-        let err = get_result::<TestClient>(&path, Some(get_range.clone()), 
resp).unwrap_err();
+        let resp = make_response(2, StatusCode::PARTIAL_CONTENT, Some("bytes 
2-3/2"), None);
+        let err = get_range_meta(CFG, &path, Some(&get_range), 
&resp).unwrap_err();
         assert_eq!(
             err.to_string(),
             "Wanted range starting at 2, but object was only 2 bytes long"
         );
 
-        let resp = make_response(
-            6,
-            Some(2..6),
-            StatusCode::PARTIAL_CONTENT,
-            Some("bytes 2-5/6"),
-            None,
-        );
-        let res = get_result::<TestClient>(&path, Some(GetRange::Suffix(4)), 
resp).unwrap();
-        assert_eq!(res.meta.size, 6);
-        assert_eq!(res.range, 2..6);
-        let bytes = res.bytes().await.unwrap();
-        assert_eq!(bytes.len(), 4);
+        let resp = make_response(6, StatusCode::PARTIAL_CONTENT, Some("bytes 
2-5/6"), None);
+        let (range, meta) = get_range_meta(CFG, &path, 
Some(&GetRange::Suffix(4)), &resp).unwrap();
+        assert_eq!(meta.size, 6);
+        assert_eq!(range, 2..6);
 
-        let resp = make_response(
-            6,
-            Some(2..6),
-            StatusCode::PARTIAL_CONTENT,
-            Some("bytes 2-3/6"),
-            None,
-        );
-        let err = get_result::<TestClient>(&path, Some(GetRange::Suffix(4)), 
resp).unwrap_err();
+        let resp = make_response(6, StatusCode::PARTIAL_CONTENT, Some("bytes 
2-3/6"), None);
+        let err = get_range_meta(CFG, &path, Some(&GetRange::Suffix(4)), 
&resp).unwrap_err();
         assert_eq!(err.to_string(), "Requested 2..6, got 2..4");
+    }
 
+    #[test]
+    fn test_get_attributes() {
         let resp = make_response(
             12,
-            None,
             StatusCode::OK,
             None,
             Some(vec![("x-test-meta-foo", "bar")]),
         );
-        let res = get_result::<TestClient>(&path, None, resp).unwrap();
-        assert_eq!(res.meta.size, 12);
-        assert_eq!(res.range, 0..12);
+
+        let attributes = get_attributes(CFG, &resp.headers).unwrap();
         assert_eq!(
-            res.attributes.get(&Attribute::Metadata("foo".into())),
+            attributes.get(&Attribute::Metadata("foo".into())),
             Some(&"bar".into())
         );
-        let bytes = res.bytes().await.unwrap();
-        assert_eq!(bytes.len(), 12);
+    }
+}
+#[cfg(all(test, feature = "http", not(target_arch = "wasm32")))]
+mod http_tests {
+    use crate::client::mock_server::MockServer;
+    use crate::client::{HttpError, HttpErrorKind, HttpResponseBody};
+    use crate::http::HttpBuilder;
+    use crate::path::Path;
+    use crate::{ClientOptions, ObjectStore, RetryConfig};
+    use bytes::Bytes;
+    use futures::FutureExt;
+    use http::header::{CONTENT_LENGTH, CONTENT_RANGE, ETAG, RANGE};
+    use http::{Response, StatusCode};
+    use hyper::body::Frame;
+    use std::pin::Pin;
+    use std::task::{ready, Context, Poll};
+    use std::time::Duration;
+
+    #[derive(Debug, thiserror::Error)]
+    #[error("ChunkedErr")]
+    struct ChunkedErr {}
+
+    /// A Body from a list of results
+    ///
+    /// Sleeps between each frame to avoid the HTTP Server coalescing the frames
+    struct Chunked {
+        chunks: std::vec::IntoIter<Result<Bytes, ()>>,
+        sleep: Option<Pin<Box<tokio::time::Sleep>>>,
+    }
+
+    impl Chunked {
+        fn new(v: Vec<Result<Bytes, ()>>) -> Self {
+            Self {
+                chunks: v.into_iter(),
+                sleep: None,
+            }
+        }
+    }
+
+    impl hyper::body::Body for Chunked {
+        type Data = Bytes;
+        type Error = HttpError;
+
+        fn poll_frame(
+            mut self: Pin<&mut Self>,
+            cx: &mut Context<'_>,
+        ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
+            if let Some(sleep) = &mut self.sleep {
+                ready!(sleep.poll_unpin(cx));
+                self.sleep = None;
+            }
+
+            Poll::Ready(match self.chunks.next() {
+                None => None,
+                Some(Ok(b)) => {
+                    self.sleep = 
Some(Box::pin(tokio::time::sleep(Duration::from_millis(1))));
+                    Some(Ok(Frame::data(b)))
+                }
+                Some(Err(_)) => 
Some(Err(HttpError::new(HttpErrorKind::Unknown, ChunkedErr {}))),
+            })
+        }
+    }
+
+    impl From<Chunked> for HttpResponseBody {
+        fn from(value: Chunked) -> Self {
+            Self::new(value)
+        }
+    }
+
+    #[tokio::test]
+    async fn test_stream_retry() {
+        let mock = MockServer::new().await;
+        let retry = RetryConfig {
+            backoff: Default::default(),
+            max_retries: 3,
+            retry_timeout: Duration::from_secs(1000),
+        };
+
+        let options = ClientOptions::new().with_allow_http(true);
+        let store = HttpBuilder::new()
+            .with_client_options(options)
+            .with_retry(retry)
+            .with_url(mock.url())
+            .build()
+            .unwrap();
+
+        let path = Path::from("test");
+
+        // Test basic
+        let resp = Response::builder()
+            .header(CONTENT_LENGTH, 11)
+            .header(ETAG, "123")
+            .body("Hello World".to_string())
+            .unwrap();
+
+        mock.push(resp);
+
+        let b = store.get(&path).await.unwrap().bytes().await.unwrap();
+        assert_eq!(b.as_ref(), b"Hello World");
+
+        // Should retry with range
+        mock.push(
+            Response::builder()
+                .header(CONTENT_LENGTH, 10)
+                .header(ETAG, "123")
+                .body(Chunked::new(vec![
+                    Ok(Bytes::from_static(b"banana")),
+                    Err(()),
+                ]))
+                .unwrap(),
+        );
+
+        mock.push_fn(|req| {
+            assert_eq!(
+                req.headers().get(RANGE).unwrap().to_str().unwrap(),
+                "bytes=6-9"
+            );
+
+            Response::builder()
+                .status(StatusCode::PARTIAL_CONTENT)
+                .header(CONTENT_LENGTH, 3)
+                .header(ETAG, "123")
+                .header(CONTENT_RANGE, "bytes 6-9/10")
+                .body("123".to_string())
+                .unwrap()
+        });
+
+        let ret = store.get(&path).await.unwrap().bytes().await.unwrap();
+        assert_eq!(ret.as_ref(), b"banana123");
+
+        // Should retry multiple times
+        mock.push(
+            Response::builder()
+                .header(CONTENT_LENGTH, 20)
+                .header(ETAG, "foo")
+                .body(Chunked::new(vec![
+                    Ok(Bytes::from_static(b"hello")),
+                    Err(()),
+                ]))
+                .unwrap(),
+        );
+
+        mock.push_fn(|req| {
+            assert_eq!(
+                req.headers().get(RANGE).unwrap().to_str().unwrap(),
+                "bytes=5-19"
+            );
+
+            Response::builder()
+                .status(StatusCode::PARTIAL_CONTENT)
+                .header(CONTENT_LENGTH, 15)
+                .header(ETAG, "foo")
+                .header(CONTENT_RANGE, "bytes 5-19/20")
+                .body(Chunked::new(vec![Ok(Bytes::from_static(b"baz")), 
Err(())]))
+                .unwrap()
+        });
+
+        mock.push_fn::<_, String>(|req| {
+            assert_eq!(
+                req.headers().get(RANGE).unwrap().to_str().unwrap(),
+                "bytes=8-19"
+            );
+            Response::builder()
+                .status(StatusCode::BAD_GATEWAY)
+                .body("ignored".to_string())
+                .unwrap()
+        });
+
+        mock.push_fn(|req| {
+            assert_eq!(
+                req.headers().get(RANGE).unwrap().to_str().unwrap(),
+                "bytes=8-19"
+            );
+
+            Response::builder()
+                .status(StatusCode::PARTIAL_CONTENT)
+                .header(CONTENT_LENGTH, 12)
+                .header(ETAG, "foo")
+                .header(CONTENT_RANGE, "bytes 8-19/20")
+                .body("123456789012".to_string())
+                .unwrap()
+        });
+
+        let ret = store.get(&path).await.unwrap().bytes().await.unwrap();
+        assert_eq!(ret.as_ref(), b"hellobaz123456789012");
+
+        // Should abort if etag doesn't match
+        mock.push(
+            Response::builder()
+                .header(CONTENT_LENGTH, 12)
+                .header(ETAG, "foo")
+                .body(Chunked::new(vec![Ok(Bytes::from_static(b"test")), 
Err(())]))
+                .unwrap(),
+        );
+
+        mock.push_fn(|req| {
+            assert_eq!(
+                req.headers().get(RANGE).unwrap().to_str().unwrap(),
+                "bytes=4-11"
+            );
+
+            Response::builder()
+                .status(StatusCode::PARTIAL_CONTENT)
+                .header(CONTENT_LENGTH, 7)
+                .header(ETAG, "baz")
+                .header(CONTENT_RANGE, "bytes 4-11/12")
+                .body("1234567".to_string())
+                .unwrap()
+        });
+
+        let err = store.get(&path).await.unwrap().bytes().await.unwrap_err();
+        assert_eq!(
+            err.to_string(),
+            "Generic HTTP error: HTTP error: request or response body error"
+        );
     }
 }
diff --git a/src/client/http/body.rs b/src/client/http/body.rs
index e696904..504bebe 100644
--- a/src/client/http/body.rs
+++ b/src/client/http/body.rs
@@ -213,6 +213,14 @@ impl Body for HttpResponseBody {
     ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
         Pin::new(&mut self.0).poll_frame(cx)
     }
+
+    fn is_end_stream(&self) -> bool {
+        self.0.is_end_stream()
+    }
+
+    fn size_hint(&self) -> SizeHint {
+        self.0.size_hint()
+    }
 }
 
 impl From<Bytes> for HttpResponseBody {
diff --git a/src/client/mock_server.rs b/src/client/mock_server.rs
index 8be4a72..9caf731 100644
--- a/src/client/mock_server.rs
+++ b/src/client/mock_server.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::client::{HttpResponse, HttpResponseBody};
 use futures::future::BoxFuture;
 use futures::FutureExt;
 use hyper::body::Incoming;
@@ -33,7 +34,7 @@ use tokio::sync::oneshot;
 use tokio::task::{JoinHandle, JoinSet};
 
 pub(crate) type ResponseFn =
-    Box<dyn FnOnce(Request<Incoming>) -> BoxFuture<'static, Response<String>> + Send>;
+    Box<dyn FnOnce(Request<Incoming>) -> BoxFuture<'static, HttpResponse> + Send>;
 
 /// A mock server
 pub(crate) struct MockServer {
@@ -76,7 +77,7 @@ impl MockServer {
                                 async move {
                                     Ok::<_, Infallible>(match next {
                                         Some(r) => r(req).await,
-                                        None => Response::new("Hello World".to_string()),
+                                        None => HttpResponse::new("Hello World".to_string().into()),
                                     })
                                 }
                             }),
@@ -102,16 +103,18 @@ impl MockServer {
     }
 
     /// Add a response
-    pub(crate) fn push(&self, response: Response<String>) {
-        self.push_fn(|_| response)
+    pub(crate) fn push<B: Into<HttpResponseBody>>(&self, response: Response<B>) {
+        let resp = response.map(Into::into);
+        self.push_fn(|_| resp)
     }
 
     /// Add a response function
-    pub(crate) fn push_fn<F>(&self, f: F)
+    pub(crate) fn push_fn<F, B>(&self, f: F)
     where
-        F: FnOnce(Request<Incoming>) -> Response<String> + Send + 'static,
+        F: FnOnce(Request<Incoming>) -> Response<B> + Send + 'static,
+        B: Into<HttpResponseBody>,
     {
-        let f = Box::new(|req| async move { f(req) }.boxed());
+        let f = Box::new(|req| async move { f(req).map(Into::into) }.boxed());
         self.responses.lock().push_back(f)
     }
 
@@ -120,7 +123,8 @@ impl MockServer {
         F: FnOnce(Request<Incoming>) -> Fut + Send + 'static,
         Fut: Future<Output = Response<String>> + Send + 'static,
     {
-        self.responses.lock().push_back(Box::new(|r| f(r).boxed()))
+        let f = Box::new(|r| f(r).map(|b| b.map(Into::into)).boxed());
+        self.responses.lock().push_back(f)
     }
 
     /// Shutdown the mock server
diff --git a/src/client/retry.rs b/src/client/retry.rs
index 7df4f9e..fb09813 100644
--- a/src/client/retry.rs
+++ b/src/client/retry.rs
@@ -67,30 +67,35 @@ impl std::fmt::Display for RetryError {
 }
 
 /// Context of the retry loop
-struct RetryContext {
-    method: Method,
-    uri: Option<Uri>,
+///
+/// Most use-cases should use [`RetryExt`] and [`RetryableRequestBuilder`], however,
+/// [`RetryContext`] allows preserving retry state across multiple [`RetryableRequest`]
+pub(crate) struct RetryContext {
+    backoff: Backoff,
     retries: usize,
     max_retries: usize,
-    start: Instant,
     retry_timeout: Duration,
+    start: Instant,
 }
 
 impl RetryContext {
-    fn err(self, error: RequestError) -> RetryError {
-        RetryError(Box::new(RetryErrorImpl {
-            uri: self.uri,
-            method: self.method,
-            retries: self.retries,
-            max_retries: self.max_retries,
-            elapsed: self.start.elapsed(),
-            retry_timeout: self.retry_timeout,
-            inner: error,
-        }))
+    pub(crate) fn new(config: &RetryConfig) -> Self {
+        Self {
+            max_retries: config.max_retries,
+            retry_timeout: config.retry_timeout,
+            backoff: Backoff::new(&config.backoff),
+            retries: 0,
+            start: Instant::now(),
+        }
+    }
+
+    pub(crate) fn exhausted(&self) -> bool {
+        self.retries >= self.max_retries || self.start.elapsed() > self.retry_timeout
     }
 
-    fn exhausted(&self) -> bool {
-        self.retries == self.max_retries || self.start.elapsed() > self.retry_timeout
+    pub(crate) fn backoff(&mut self) -> Duration {
+        self.retries += 1;
+        self.backoff.next()
     }
 }
 
@@ -248,81 +253,89 @@ fn body_contains_error(response_body: &str) -> bool {
     response_body.contains("InternalError") || response_body.contains("SlowDown")
 }
 
-pub(crate) struct RetryableRequest {
-    client: HttpClient,
-    request: HttpRequest,
-
-    max_retries: usize,
-    retry_timeout: Duration,
-    backoff: Backoff,
-
-    sensitive: bool,
-    idempotent: Option<bool>,
-    retry_on_conflict: bool,
-    payload: Option<PutPayload>,
-
-    retry_error_body: bool,
+/// Combines a [`RetryableRequest`] with a [`RetryContext`]
+pub(crate) struct RetryableRequestBuilder {
+    request: RetryableRequest,
+    context: RetryContext,
 }
 
-impl RetryableRequest {
+impl RetryableRequestBuilder {
     /// Set whether this request is idempotent
     ///
     /// An idempotent request will be retried on timeout even if the request
     /// method is not [safe](https://datatracker.ietf.org/doc/html/rfc7231#section-4.2.1)
-    pub(crate) fn idempotent(self, idempotent: bool) -> Self {
-        Self {
-            idempotent: Some(idempotent),
-            ..self
-        }
+    pub(crate) fn idempotent(mut self, idempotent: bool) -> Self {
+        self.request.idempotent = Some(idempotent);
+        self
     }
 
     /// Set whether this request should be retried on a 409 Conflict response.
     #[cfg(feature = "aws")]
-    pub(crate) fn retry_on_conflict(self, retry_on_conflict: bool) -> Self {
-        Self {
-            retry_on_conflict,
-            ..self
-        }
+    pub(crate) fn retry_on_conflict(mut self, retry_on_conflict: bool) -> Self {
+        self.request.retry_on_conflict = retry_on_conflict;
+        self
     }
 
     /// Set whether this request contains sensitive data
     ///
     /// This will avoid printing out the URL in error messages
     #[allow(unused)]
-    pub(crate) fn sensitive(self, sensitive: bool) -> Self {
-        Self { sensitive, ..self }
+    pub(crate) fn sensitive(mut self, sensitive: bool) -> Self {
+        self.request.sensitive = sensitive;
+        self
     }
 
     /// Provide a [`PutPayload`]
-    pub(crate) fn payload(self, payload: Option<PutPayload>) -> Self {
-        Self { payload, ..self }
+    pub(crate) fn payload(mut self, payload: Option<PutPayload>) -> Self {
+        self.request.payload = payload;
+        self
     }
 
     #[allow(unused)]
-    pub(crate) fn retry_error_body(self, retry_error_body: bool) -> Self {
-        Self {
-            retry_error_body,
-            ..self
-        }
+    pub(crate) fn retry_error_body(mut self, retry_error_body: bool) -> Self {
+        self.request.retry_error_body = retry_error_body;
+        self
     }
 
-    pub(crate) async fn send(self) -> Result<HttpResponse> {
-        let mut ctx = RetryContext {
-            retries: 0,
-            uri: (!self.sensitive).then(|| self.request.uri().clone()),
-            method: self.request.method().clone(),
-            max_retries: self.max_retries,
-            start: Instant::now(),
-            retry_timeout: self.retry_timeout,
-        };
+    pub(crate) async fn send(mut self) -> Result<HttpResponse> {
+        self.request.send(&mut self.context).await
+    }
+}
 
-        let mut backoff = self.backoff;
-        let is_idempotent = self
-            .idempotent
-            .unwrap_or_else(|| self.request.method().is_safe());
+/// A retryable request
+pub(crate) struct RetryableRequest {
+    client: HttpClient,
+    http: HttpRequest,
 
+    sensitive: bool,
+    idempotent: Option<bool>,
+    retry_on_conflict: bool,
+    payload: Option<PutPayload>,
+
+    retry_error_body: bool,
+}
+
+impl RetryableRequest {
+    #[allow(unused)]
+    pub(crate) fn sensitive(self, sensitive: bool) -> Self {
+        Self { sensitive, ..self }
+    }
+
+    fn err(&self, error: RequestError, ctx: &RetryContext) -> RetryError {
+        RetryError(Box::new(RetryErrorImpl {
+            uri: (!self.sensitive).then(|| self.http.uri().clone()),
+            method: self.http.method().clone(),
+            retries: ctx.retries,
+            max_retries: ctx.max_retries,
+            elapsed: ctx.start.elapsed(),
+            retry_timeout: ctx.retry_timeout,
+            inner: error,
+        }))
+    }
+
+    pub(crate) async fn send(self, ctx: &mut RetryContext) -> Result<HttpResponse> {
         loop {
-            let mut request = self.request.clone();
+            let mut request = self.http.clone();
 
             if let Some(payload) = &self.payload {
                 *request.body_mut() = payload.clone().into();
@@ -343,7 +356,7 @@ impl RetryableRequest {
                         let (parts, body) = r.into_parts();
                         let body = match body.text().await {
                             Ok(body) => body,
-                            Err(e) => return Err(ctx.err(RequestError::Http(e))),
+                            Err(e) => return Err(self.err(RequestError::Http(e), ctx)),
                         };
 
                         if !body_contains_error(&body) {
@@ -352,11 +365,10 @@ impl RetryableRequest {
                         } else {
                             // Retry as if this was a 5xx response
                             if ctx.exhausted() {
-                                return Err(ctx.err(RequestError::Response { body, status }));
+                                return Err(self.err(RequestError::Response { body, status }, ctx));
                             }
 
-                            let sleep = backoff.next();
-                            ctx.retries += 1;
+                            let sleep = ctx.backoff();
                             info!(
                                 "Encountered a response status of {} but body contains Error, backing off for {} seconds, retry {} of {}",
                                 status,
@@ -367,15 +379,18 @@ impl RetryableRequest {
                             tokio::time::sleep(sleep).await;
                         }
                     } else if status == StatusCode::NOT_MODIFIED {
-                        return Err(ctx.err(RequestError::Status { status, body: None }));
+                        return Err(self.err(RequestError::Status { status, body: None }, ctx));
                     } else if status.is_redirection() {
                         let is_bare_redirect = !r.headers().contains_key(LOCATION);
                         return match is_bare_redirect {
-                            true => Err(ctx.err(RequestError::BareRedirect)),
-                            false => Err(ctx.err(RequestError::Status {
-                                body: None,
-                                status: r.status(),
-                            })),
+                            true => Err(self.err(RequestError::BareRedirect, ctx)),
+                            false => Err(self.err(
+                                RequestError::Status {
+                                    body: None,
+                                    status: r.status(),
+                                },
+                                ctx,
+                            )),
                         };
                     } else {
                         let status = r.status();
@@ -393,11 +408,10 @@ impl RetryableRequest {
                                 },
                                 false => RequestError::Status { status, body: None },
                             };
-                            return Err(ctx.err(source));
+                            return Err(self.err(source, ctx));
                         };
 
-                        let sleep = backoff.next();
-                        ctx.retries += 1;
+                        let sleep = ctx.backoff();
                         info!(
                             "Encountered server error, backing off for {} seconds, retry {} of {}",
                             sleep.as_secs_f32(),
@@ -408,7 +422,9 @@ impl RetryableRequest {
                     }
                 }
                 Err(e) => {
-                    // let e = sanitize_err(e);
+                    let is_idempotent = self
+                        .idempotent
+                        .unwrap_or_else(|| self.http.method().is_safe());
 
                     let do_retry = match e.kind() {
                         HttpErrorKind::Connect | HttpErrorKind::Request => true, // Request not sent, can retry
@@ -416,14 +432,10 @@ impl RetryableRequest {
                         HttpErrorKind::Unknown | HttpErrorKind::Decode => false,
                     };
 
-                    if ctx.retries == ctx.max_retries
-                        || ctx.start.elapsed() > ctx.retry_timeout
-                        || !do_retry
-                    {
-                        return Err(ctx.err(RequestError::Http(e)));
+                    if ctx.exhausted() || !do_retry {
+                        return Err(self.err(RequestError::Http(e), ctx));
                     }
-                    let sleep = backoff.next();
-                    ctx.retries += 1;
+                    let sleep = ctx.backoff();
                     info!(
                         "Encountered transport error backing off for {} seconds, retry {} of {}: {}",
                         sleep.as_secs_f32(),
@@ -439,8 +451,11 @@ impl RetryableRequest {
 }
 
 pub(crate) trait RetryExt {
+    /// Return a [`RetryableRequestBuilder`]
+    fn retryable(self, config: &RetryConfig) -> RetryableRequestBuilder;
+
     /// Return a [`RetryableRequest`]
-    fn retryable(self, config: &RetryConfig) -> RetryableRequest;
+    fn retryable_request(self) -> RetryableRequest;
 
     /// Dispatch a request with the given retry configuration
     ///
@@ -451,16 +466,20 @@ pub(crate) trait RetryExt {
 }
 
 impl RetryExt for HttpRequestBuilder {
-    fn retryable(self, config: &RetryConfig) -> RetryableRequest {
+    fn retryable(self, config: &RetryConfig) -> RetryableRequestBuilder {
+        RetryableRequestBuilder {
+            request: self.retryable_request(),
+            context: RetryContext::new(config),
+        }
+    }
+
+    fn retryable_request(self) -> RetryableRequest {
         let (client, request) = self.into_parts();
         let request = request.expect("request must be valid");
 
         RetryableRequest {
             client,
-            request,
-            max_retries: config.max_retries,
-            retry_timeout: config.retry_timeout,
-            backoff: Backoff::new(&config.backoff),
+            http: request,
             idempotent: None,
             payload: None,
             sensitive: false,
@@ -524,7 +543,7 @@ mod tests {
         let r = do_request().await.unwrap();
         assert_eq!(r.status(), StatusCode::OK);
 
-        // Returns client errors immediately with status message
+        // Returns client errors immediately with a status message
         mock.push(
             Response::builder()
                 .status(StatusCode::BAD_REQUEST)
@@ -645,13 +664,13 @@ mod tests {
         );
 
         // Panic results in an incomplete message error in the client
-        mock.push_fn(|_| panic!());
+        mock.push_fn::<_, String>(|_| panic!());
         let r = do_request().await.unwrap();
         assert_eq!(r.status(), StatusCode::OK);
 
         // Gives up after retrying multiple panics
         for _ in 0..=retry.max_retries {
-            mock.push_fn(|_| panic!());
+            mock.push_fn::<_, String>(|_| panic!());
         }
         let e = do_request().await.unwrap_err().to_string();
         assert!(
@@ -710,7 +729,7 @@ mod tests {
         assert!(!err.contains("SENSITIVE"), "{err}");
 
         for _ in 0..=retry.max_retries {
-            mock.push_fn(|_| panic!());
+            mock.push_fn::<_, String>(|_| panic!());
         }
 
         let req = client
diff --git a/src/gcp/client.rs b/src/gcp/client.rs
index f815085..0378843 100644
--- a/src/gcp/client.rs
+++ b/src/gcp/client.rs
@@ -19,7 +19,7 @@ use crate::client::builder::HttpRequestBuilder;
 use crate::client::get::GetClient;
 use crate::client::header::{get_put_result, get_version, HeaderConfig};
 use crate::client::list::ListClient;
-use crate::client::retry::RetryExt;
+use crate::client::retry::{RetryContext, RetryExt};
 use crate::client::s3::{
     CompleteMultipartUpload, CompleteMultipartUploadResult, InitiateMultipartUploadResult,
     ListResponse,
@@ -617,8 +617,17 @@ impl GetClient for GoogleCloudStorageClient {
         user_defined_metadata_prefix: Some(USER_DEFINED_METADATA_HEADER_PREFIX),
     };
 
+    fn retry_config(&self) -> &RetryConfig {
+        &self.config.retry_config
+    }
+
     /// Perform a get request <https://cloud.google.com/storage/docs/xml-api/get-object-download>
-    async fn get_request(&self, path: &Path, options: GetOptions) -> Result<HttpResponse> {
+    async fn get_request(
+        &self,
+        ctx: &mut RetryContext,
+        path: &Path,
+        options: GetOptions,
+    ) -> Result<HttpResponse> {
         let credential = self.get_credential().await?;
         let url = self.object_url(path);
 
@@ -636,7 +645,8 @@ impl GetClient for GoogleCloudStorageClient {
         let response = request
             .with_bearer_auth(credential.as_deref())
             .with_get_options(options)
-            .send_retry(&self.config.retry_config)
+            .retryable_request()
+            .send(ctx)
             .await
             .map_err(|source| {
                 let path = path.as_ref().into();
diff --git a/src/http/client.rs b/src/http/client.rs
index 8a96e74..272f7c6 100644
--- a/src/http/client.rs
+++ b/src/http/client.rs
@@ -18,7 +18,7 @@
 use super::STORE;
 use crate::client::get::GetClient;
 use crate::client::header::HeaderConfig;
-use crate::client::retry::{self, RetryConfig, RetryExt};
+use crate::client::retry::{self, RetryConfig, RetryContext, RetryExt};
 use crate::client::{GetOptionsExt, HttpClient, HttpError, HttpResponse};
 use crate::path::{Path, DELIMITER};
 use crate::util::deserialize_rfc1123;
@@ -353,7 +353,16 @@ impl GetClient for Client {
         user_defined_metadata_prefix: None,
     };
 
-    async fn get_request(&self, path: &Path, options: GetOptions) -> Result<HttpResponse> {
+    fn retry_config(&self) -> &RetryConfig {
+        &self.retry_config
+    }
+
+    async fn get_request(
+        &self,
+        ctx: &mut RetryContext,
+        path: &Path,
+        options: GetOptions,
+    ) -> Result<HttpResponse> {
         let url = self.path_url(path);
         let method = match options.head {
             true => Method::HEAD,
@@ -364,7 +373,8 @@ impl GetClient for Client {
 
         let res = builder
             .with_get_options(options)
-            .send_retry(&self.retry_config)
+            .retryable_request()
+            .send(ctx)
             .await
             .map_err(|source| match source.status() {
                 // Some stores return METHOD_NOT_ALLOWED for get on directories

Reply via email to