This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git
The following commit(s) were added to refs/heads/main by this push:
new 7a0504b Retry streaming get requests (#15) (#383)
7a0504b is described below
commit 7a0504b4924fcecee17d768fd7190b8f71b0877f
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu May 29 09:27:49 2025 +0100
Retry streaming get requests (#15) (#383)
* Retry streaming get requests (#15)
* Add tests
* Fix size hint
* Fix wasm32
---
src/aws/client.rs | 16 +-
src/aws/dynamo.rs | 9 +-
src/aws/mod.rs | 11 +-
src/azure/client.rs | 18 +-
src/client/get.rs | 595 +++++++++++++++++++++++++++++++++-------------
src/client/http/body.rs | 8 +
src/client/mock_server.rs | 20 +-
src/client/retry.rs | 209 ++++++++--------
src/gcp/client.rs | 16 +-
src/http/client.rs | 16 +-
10 files changed, 633 insertions(+), 285 deletions(-)
diff --git a/src/aws/client.rs b/src/aws/client.rs
index a1fb463..ce5a091 100644
--- a/src/aws/client.rs
+++ b/src/aws/client.rs
@@ -27,7 +27,7 @@ use crate::client::get::GetClient;
use crate::client::header::{get_etag, HeaderConfig};
use crate::client::header::{get_put_result, get_version};
use crate::client::list::ListClient;
-use crate::client::retry::RetryExt;
+use crate::client::retry::{RetryContext, RetryExt};
use crate::client::s3::{
CompleteMultipartUpload, CompleteMultipartUploadResult, CopyPartResult,
InitiateMultipartUploadResult, ListResponse, PartMetadata,
@@ -837,8 +837,17 @@ impl GetClient for S3Client {
user_defined_metadata_prefix:
Some(USER_DEFINED_METADATA_HEADER_PREFIX),
};
+ fn retry_config(&self) -> &RetryConfig {
+ &self.config.retry_config
+ }
+
/// Make an S3 GET request
<https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html>
- async fn get_request(&self, path: &Path, options: GetOptions) ->
Result<HttpResponse> {
+ async fn get_request(
+ &self,
+ ctx: &mut RetryContext,
+ path: &Path,
+ options: GetOptions,
+ ) -> Result<HttpResponse> {
let credential = self.config.get_session_credential().await?;
let url = self.config.path_url(path);
let method = match options.head {
@@ -863,7 +872,8 @@ impl GetClient for S3Client {
let response = builder
.with_get_options(options)
.with_aws_sigv4(credential.authorizer(), None)
- .send_retry(&self.config.retry_config)
+ .retryable_request()
+ .send(ctx)
.await
.map_err(|e| e.error(STORE, path.to_string()))?;
diff --git a/src/aws/dynamo.rs b/src/aws/dynamo.rs
index 2823860..a6775ef 100644
--- a/src/aws/dynamo.rs
+++ b/src/aws/dynamo.rs
@@ -20,6 +20,7 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::future::Future;
+use std::sync::Arc;
use std::time::{Duration, Instant};
use chrono::Utc;
@@ -181,7 +182,7 @@ impl DynamoCommit {
pub(crate) async fn copy_if_not_exists(
&self,
- client: &S3Client,
+ client: &Arc<S3Client>,
from: &Path,
to: &Path,
) -> Result<()> {
@@ -195,7 +196,7 @@ impl DynamoCommit {
#[allow(clippy::future_not_send)] // Generics confound this lint
pub(crate) async fn conditional_op<F, Fut, T>(
&self,
- client: &S3Client,
+ client: &Arc<S3Client>,
to: &Path,
etag: Option<&str>,
op: F,
@@ -348,7 +349,7 @@ enum TryLockResult {
}
/// Validates that `path` has the given `etag` or doesn't exist if `None`
-async fn check_precondition(client: &S3Client, path: &Path, etag:
Option<&str>) -> Result<()> {
+async fn check_precondition(client: &Arc<S3Client>, path: &Path, etag:
Option<&str>) -> Result<()> {
let options = GetOptions {
head: true,
..Default::default()
@@ -543,7 +544,7 @@ mod tests {
///
/// This is a function called by s3_test to avoid test concurrency issues
pub(crate) async fn integration_test(integration: &AmazonS3, d:
&DynamoCommit) {
- let client = integration.client.as_ref();
+ let client = &integration.client;
let src = Path::from("dynamo_path_src");
integration.put(&src, "asd".into()).await.unwrap();
diff --git a/src/aws/mod.rs b/src/aws/mod.rs
index 83edf84..6a5b849 100644
--- a/src/aws/mod.rs
+++ b/src/aws/mod.rs
@@ -512,6 +512,7 @@ impl PaginatedListStore for AmazonS3 {
mod tests {
use super::*;
use crate::client::get::GetClient;
+ use crate::client::retry::RetryContext;
use crate::client::SpawnedReqwestConnector;
use crate::integration::*;
use crate::tests::*;
@@ -761,11 +762,14 @@ mod tests {
upload.complete().await.unwrap();
for location in &locations {
+ let mut context =
RetryContext::new(&store.client.config.retry_config);
+
let res = store
.client
- .get_request(location, GetOptions::default())
+ .get_request(&mut context, location, GetOptions::default())
.await
.unwrap();
+
let headers = res.headers();
assert_eq!(
headers
@@ -817,11 +821,14 @@ mod tests {
// Test get with sse-c.
for location in &locations {
+ let mut context =
RetryContext::new(&store.client.config.retry_config);
+
let res = store
.client
- .get_request(location, GetOptions::default())
+ .get_request(&mut context, location, GetOptions::default())
.await
.unwrap();
+
let headers = res.headers();
assert_eq!(
headers
diff --git a/src/azure/client.rs b/src/azure/client.rs
index 329cdd4..428a99b 100644
--- a/src/azure/client.rs
+++ b/src/azure/client.rs
@@ -22,7 +22,7 @@ use crate::client::builder::HttpRequestBuilder;
use crate::client::get::GetClient;
use crate::client::header::{get_put_result, HeaderConfig};
use crate::client::list::ListClient;
-use crate::client::retry::RetryExt;
+use crate::client::retry::{RetryContext, RetryExt};
use crate::client::{GetOptionsExt, HttpClient, HttpError, HttpRequest,
HttpResponse};
use crate::list::{PaginatedListOptions, PaginatedListResult};
use crate::multipart::PartId;
@@ -896,10 +896,19 @@ impl GetClient for AzureClient {
user_defined_metadata_prefix:
Some(USER_DEFINED_METADATA_HEADER_PREFIX),
};
+ fn retry_config(&self) -> &RetryConfig {
+ &self.config.retry_config
+ }
+
/// Make an Azure GET request
/// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob>
///
<https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties>
- async fn get_request(&self, path: &Path, options: GetOptions) ->
Result<HttpResponse> {
+ async fn get_request(
+ &self,
+ ctx: &mut RetryContext,
+ path: &Path,
+ options: GetOptions,
+ ) -> Result<HttpResponse> {
// As of 2024-01-02, Azure does not support suffix requests,
// so we should fail fast here rather than sending one
if let Some(GetRange::Suffix(_)) = options.range.as_ref() {
@@ -929,12 +938,13 @@ impl GetClient for AzureClient {
.as_deref()
.map(|c| c.sensitive_request())
.unwrap_or_default();
+
let response = builder
.with_get_options(options)
.with_azure_authorization(&credential, &self.config.account)
- .retryable(&self.config.retry_config)
+ .retryable_request()
.sensitive(sensitive)
- .send()
+ .send(ctx)
.await
.map_err(|source| {
let path = path.as_ref().into();
diff --git a/src/client/get.rs b/src/client/get.rs
index 4c65c6d..51d4e1b 100644
--- a/src/client/get.rs
+++ b/src/client/get.rs
@@ -15,20 +15,28 @@
// specific language governing permissions and limitations
// under the License.
-use std::ops::Range;
-
-use crate::client::header::{header_meta, HeaderConfig};
-use crate::client::HttpResponse;
+use crate::client::header::{get_etag, header_meta, HeaderConfig};
+use crate::client::retry::RetryContext;
+use crate::client::{HttpResponse, HttpResponseBody};
use crate::path::Path;
-use crate::{Attribute, Attributes, GetOptions, GetRange, GetResult,
GetResultPayload, Result};
+use crate::{
+ Attribute, Attributes, GetOptions, GetRange, GetResult, GetResultPayload,
ObjectMeta, Result,
+ RetryConfig,
+};
use async_trait::async_trait;
-use futures::{StreamExt, TryStreamExt};
+use bytes::Bytes;
+use futures::stream::BoxStream;
+use futures::StreamExt;
use http::header::{
CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE,
CONTENT_RANGE,
CONTENT_TYPE,
};
use http::StatusCode;
+use http_body_util::BodyExt;
use reqwest::header::ToStrError;
+use std::ops::Range;
+use std::sync::Arc;
+use tracing::info;
/// A client that can perform a get request
#[async_trait]
@@ -38,7 +46,14 @@ pub(crate) trait GetClient: Send + Sync + 'static {
/// Configure the [`HeaderConfig`] for this client
const HEADER_CONFIG: HeaderConfig;
- async fn get_request(&self, path: &Path, options: GetOptions) ->
Result<HttpResponse>;
+ fn retry_config(&self) -> &RetryConfig;
+
+ async fn get_request(
+ &self,
+ ctx: &mut RetryContext,
+ path: &Path,
+ options: GetOptions,
+ ) -> Result<HttpResponse>;
}
/// Extension trait for [`GetClient`] that adds common retrieval functionality
@@ -48,20 +63,16 @@ pub(crate) trait GetClientExt {
}
#[async_trait]
-impl<T: GetClient> GetClientExt for T {
+impl<T: GetClient> GetClientExt for Arc<T> {
async fn get_opts(&self, location: &Path, options: GetOptions) ->
Result<GetResult> {
- let range = options.range.clone();
- if let Some(r) = range.as_ref() {
- r.is_valid().map_err(|e| crate::Error::Generic {
- store: T::STORE,
- source: Box::new(e),
- })?;
- }
- let response = self.get_request(location, options).await?;
- get_result::<T>(location, range, response).map_err(|e|
crate::Error::Generic {
- store: T::STORE,
- source: Box::new(e),
- })
+ let ctx = GetContext {
+ location: location.clone(),
+ options,
+ client: Self::clone(self),
+ retry_ctx: RetryContext::new(self.retry_config()),
+ };
+
+ ctx.get_result().await
}
}
@@ -145,40 +156,132 @@ enum GetResultError {
},
}
-fn get_result<T: GetClient>(
- location: &Path,
- range: Option<GetRange>,
- response: HttpResponse,
-) -> Result<GetResult, GetResultError> {
- let mut meta = header_meta(location, response.headers(),
T::HEADER_CONFIG)?;
+/// Retry context for a streaming get request
+struct GetContext<T: GetClient> {
+ client: Arc<T>,
+ location: Path,
+ options: GetOptions,
+ retry_ctx: RetryContext,
+}
- // ensure that we receive the range we asked for
- let range = if let Some(expected) = range {
- if response.status() != StatusCode::PARTIAL_CONTENT {
- return Err(GetResultError::NotPartial);
+impl<T: GetClient> GetContext<T> {
+ async fn get_result(mut self) -> Result<GetResult> {
+ if let Some(r) = &self.options.range {
+ r.is_valid().map_err(Self::err)?;
}
- let val = response
- .headers()
- .get(CONTENT_RANGE)
- .ok_or(GetResultError::NoContentRange)?;
+ let request = self
+ .client
+ .get_request(&mut self.retry_ctx, &self.location,
self.options.clone())
+ .await?;
+
+ let (parts, body) = request.into_parts();
+ let (range, meta) = get_range_meta(
+ T::HEADER_CONFIG,
+ &self.location,
+ self.options.range.as_ref(),
+ &parts,
+ )
+ .map_err(Self::err)?;
+
+ let attributes = get_attributes(T::HEADER_CONFIG,
&parts.headers).map_err(Self::err)?;
+ let stream = self.retry_stream(body, meta.e_tag.clone(),
range.clone());
+
+ Ok(GetResult {
+ payload: GetResultPayload::Stream(stream),
+ meta,
+ range,
+ attributes,
+ })
+ }
- let value = val
- .to_str()
- .map_err(|source| GetResultError::InvalidContentRange { source })?;
+ fn retry_stream(
+ self,
+ body: HttpResponseBody,
+ etag: Option<String>,
+ range: Range<u64>,
+ ) -> BoxStream<'static, Result<Bytes>> {
+ futures::stream::try_unfold(
+ (self, body, etag, range),
+ |(mut ctx, mut body, etag, mut range)| async move {
+ while let Some(ret) = body.frame().await {
+ match (ret, &etag) {
+ (Ok(frame), _) => match frame.into_data() {
+ Ok(bytes) => {
+ range.start += bytes.len() as u64;
+ return Ok(Some((bytes, (ctx, body, etag,
range))));
+ }
+ Err(_) => continue, // Isn't data frame
+ },
+ // Retry all response body errors
+ (Err(e), Some(etag)) if !ctx.retry_ctx.exhausted() => {
+ let sleep = ctx.retry_ctx.backoff();
+ info!(
+ "Encountered error while reading response body: {}. Retrying in {}s",
+ e,
+ sleep.as_secs_f32()
+ );
+
+ tokio::time::sleep(sleep).await;
+
+ let options = GetOptions {
+ range: Some(GetRange::Bounded(range.clone())),
+ ..ctx.options.clone()
+ };
+
+ // Note: this will potentially retry internally if applicable
+ let request = ctx
+ .client
+ .get_request(&mut ctx.retry_ctx,
&ctx.location, options)
+ .await
+ .map_err(Self::err)?;
+
+ let (parts, retry_body) = request.into_parts();
+ let retry_etag =
get_etag(&parts.headers).map_err(Self::err)?;
+
+ if etag != &retry_etag {
+ // Return the original error
+ return Err(Self::err(e));
+ }
+
+ body = retry_body;
+ }
+ (Err(e), _) => return Err(Self::err(e)),
+ }
+ }
+ Ok(None)
+ },
+ )
+ .boxed()
+ }
+
+ fn err<E: std::error::Error + Send + Sync + 'static>(e: E) -> crate::Error
{
+ crate::Error::Generic {
+ store: T::STORE,
+ source: Box::new(e),
+ }
+ }
+}
- let value = ContentRange::from_str(value).ok_or_else(|| {
- let value = value.into();
- GetResultError::ParseContentRange { value }
- })?;
+fn get_range_meta(
+ cfg: HeaderConfig,
+ location: &Path,
+ range: Option<&GetRange>,
+ response: &http::response::Parts,
+) -> Result<(Range<u64>, ObjectMeta), GetResultError> {
+ let mut meta = header_meta(location, &response.headers, cfg)?;
+ let range = if let Some(expected) = range {
+ if response.status != StatusCode::PARTIAL_CONTENT {
+ return Err(GetResultError::NotPartial);
+ }
+ let value = parse_range(&response.headers)?;
let actual = value.range;
- // Update size to reflect full size of object (#5272)
+ // Update size to reflect the full size of the object (#5272)
meta.size = value.size;
let expected = expected.as_range(meta.size)?;
-
if actual != expected {
return Err(GetResultError::UnexpectedRange { expected, actual });
}
@@ -188,6 +291,30 @@ fn get_result<T: GetClient>(
0..meta.size
};
+ Ok((range, meta))
+}
+
+/// Extracts the [CONTENT_RANGE] header
+fn parse_range(headers: &http::HeaderMap) -> Result<ContentRange,
GetResultError> {
+ let val = headers
+ .get(CONTENT_RANGE)
+ .ok_or(GetResultError::NoContentRange)?;
+
+ let value = val
+ .to_str()
+ .map_err(|source| GetResultError::InvalidContentRange { source })?;
+
+ ContentRange::from_str(value).ok_or_else(|| {
+ let value = value.into();
+ GetResultError::ParseContentRange { value }
+ })
+}
+
+/// Extracts [`Attributes`] from the response headers
+fn get_attributes(
+ cfg: HeaderConfig,
+ headers: &http::HeaderMap,
+) -> Result<Attributes, GetResultError> {
macro_rules! parse_attributes {
($headers:expr, $(($header:expr, $attr:expr, $map_err:expr)),*) => {{
let mut attributes = Attributes::new();
@@ -202,7 +329,7 @@ fn get_result<T: GetClient>(
}
let mut attributes = parse_attributes!(
- response.headers(),
+ headers,
(CACHE_CONTROL, Attribute::CacheControl, |source| {
GetResultError::InvalidCacheControl { source }
}),
@@ -223,8 +350,8 @@ fn get_result<T: GetClient>(
);
// Add attributes that match the user-defined metadata prefix (e.g.
x-amz-meta-)
- if let Some(prefix) = T::HEADER_CONFIG.user_defined_metadata_prefix {
- for (key, val) in response.headers() {
+ if let Some(prefix) = cfg.user_defined_metadata_prefix {
+ for (key, val) in headers {
if let Some(suffix) = key.as_str().strip_prefix(prefix) {
if let Ok(val_str) = val.to_str() {
attributes.insert(
@@ -239,22 +366,7 @@ fn get_result<T: GetClient>(
}
}
}
-
- let stream = response
- .into_body()
- .bytes_stream()
- .map_err(|source| crate::Error::Generic {
- store: T::STORE,
- source: Box::new(source),
- })
- .boxed();
-
- Ok(GetResult {
- range,
- meta,
- attributes,
- payload: GetResultPayload::Stream(stream),
- })
+ Ok(attributes)
}
#[cfg(test)]
@@ -262,41 +374,17 @@ mod tests {
use super::*;
use http::header::*;
- struct TestClient {}
-
- #[async_trait]
- impl GetClient for TestClient {
- const STORE: &'static str = "TEST";
-
- const HEADER_CONFIG: HeaderConfig = HeaderConfig {
- etag_required: false,
- last_modified_required: false,
- version_header: None,
- user_defined_metadata_prefix: Some("x-test-meta-"),
- };
-
- async fn get_request(&self, _: &Path, _: GetOptions) ->
Result<HttpResponse> {
- unimplemented!()
- }
- }
-
fn make_response(
object_size: usize,
- range: Option<Range<usize>>,
status: StatusCode,
content_range: Option<&str>,
headers: Option<Vec<(&str, &str)>>,
- ) -> HttpResponse {
+ ) -> http::response::Parts {
let mut builder = http::Response::builder();
if let Some(range) = content_range {
builder = builder.header(CONTENT_RANGE, range);
}
- let body = match range {
- Some(range) => vec![0_u8; range.end - range.start],
- None => vec![0_u8; object_size],
- };
-
if let Some(headers) = headers {
for (key, value) in headers {
builder = builder.header(key, value);
@@ -306,124 +394,305 @@ mod tests {
builder
.status(status)
.header(CONTENT_LENGTH, object_size)
- .body(body.into())
+ .body(())
.unwrap()
+ .into_parts()
+ .0
}
+ const CFG: HeaderConfig = HeaderConfig {
+ etag_required: false,
+ last_modified_required: false,
+ version_header: None,
+ user_defined_metadata_prefix: Some("x-test-meta-"),
+ };
+
#[tokio::test]
- async fn test_get_result() {
+ async fn test_get_range_meta() {
let path = Path::from("test");
- let resp = make_response(12, None, StatusCode::OK, None, None);
- let res = get_result::<TestClient>(&path, None, resp).unwrap();
- assert_eq!(res.meta.size, 12);
- assert_eq!(res.range, 0..12);
- let bytes = res.bytes().await.unwrap();
- assert_eq!(bytes.len(), 12);
+ let resp = make_response(12, StatusCode::OK, None, None);
+ let (range, meta) = get_range_meta(CFG, &path, None, &resp).unwrap();
+ assert_eq!(meta.size, 12);
+ assert_eq!(range, 0..12);
let get_range = GetRange::from(2..3);
- let resp = make_response(
- 12,
- Some(2..3),
- StatusCode::PARTIAL_CONTENT,
- Some("bytes 2-2/12"),
- None,
- );
- let res = get_result::<TestClient>(&path, Some(get_range.clone()),
resp).unwrap();
- assert_eq!(res.meta.size, 12);
- assert_eq!(res.range, 2..3);
- let bytes = res.bytes().await.unwrap();
- assert_eq!(bytes.len(), 1);
-
- let resp = make_response(12, Some(2..3), StatusCode::OK, None, None);
- let err = get_result::<TestClient>(&path, Some(get_range.clone()),
resp).unwrap_err();
+ let resp = make_response(12, StatusCode::PARTIAL_CONTENT, Some("bytes 2-2/12"), None);
+ let (range, meta) = get_range_meta(CFG, &path, Some(&get_range), &resp).unwrap();
+ assert_eq!(meta.size, 12);
+ assert_eq!(range, 2..3);
+
+ let resp = make_response(12, StatusCode::OK, None, None);
+ let err = get_range_meta(CFG, &path, Some(&get_range),
&resp).unwrap_err();
assert_eq!(
err.to_string(),
"Received non-partial response when range requested"
);
- let resp = make_response(
- 12,
- Some(2..3),
- StatusCode::PARTIAL_CONTENT,
- Some("bytes 2-3/12"),
- None,
- );
- let err = get_result::<TestClient>(&path, Some(get_range.clone()),
resp).unwrap_err();
+ let resp = make_response(12, StatusCode::PARTIAL_CONTENT, Some("bytes 2-3/12"), None);
+ let err = get_range_meta(CFG, &path, Some(&get_range), &resp).unwrap_err();
assert_eq!(err.to_string(), "Requested 2..3, got 2..4");
- let resp = make_response(
- 12,
- Some(2..3),
- StatusCode::PARTIAL_CONTENT,
- Some("bytes 2-2/*"),
- None,
- );
- let err = get_result::<TestClient>(&path, Some(get_range.clone()),
resp).unwrap_err();
+ let resp = make_response(12, StatusCode::PARTIAL_CONTENT, Some("bytes 2-2/*"), None);
+ let err = get_range_meta(CFG, &path, Some(&get_range), &resp).unwrap_err();
assert_eq!(
err.to_string(),
"Failed to parse value for CONTENT_RANGE header: \"bytes 2-2/*\""
);
- let resp = make_response(12, Some(2..3), StatusCode::PARTIAL_CONTENT,
None, None);
- let err = get_result::<TestClient>(&path, Some(get_range.clone()),
resp).unwrap_err();
+ let resp = make_response(12, StatusCode::PARTIAL_CONTENT, None, None);
+ let err = get_range_meta(CFG, &path, Some(&get_range),
&resp).unwrap_err();
assert_eq!(
err.to_string(),
"Content-Range header not present in partial response"
);
- let resp = make_response(
- 2,
- Some(2..3),
- StatusCode::PARTIAL_CONTENT,
- Some("bytes 2-3/2"),
- None,
- );
- let err = get_result::<TestClient>(&path, Some(get_range.clone()),
resp).unwrap_err();
+ let resp = make_response(2, StatusCode::PARTIAL_CONTENT, Some("bytes 2-3/2"), None);
+ let err = get_range_meta(CFG, &path, Some(&get_range), &resp).unwrap_err();
assert_eq!(
err.to_string(),
"Wanted range starting at 2, but object was only 2 bytes long"
);
- let resp = make_response(
- 6,
- Some(2..6),
- StatusCode::PARTIAL_CONTENT,
- Some("bytes 2-5/6"),
- None,
- );
- let res = get_result::<TestClient>(&path, Some(GetRange::Suffix(4)),
resp).unwrap();
- assert_eq!(res.meta.size, 6);
- assert_eq!(res.range, 2..6);
- let bytes = res.bytes().await.unwrap();
- assert_eq!(bytes.len(), 4);
+ let resp = make_response(6, StatusCode::PARTIAL_CONTENT, Some("bytes 2-5/6"), None);
+ let (range, meta) = get_range_meta(CFG, &path, Some(&GetRange::Suffix(4)), &resp).unwrap();
+ assert_eq!(meta.size, 6);
+ assert_eq!(range, 2..6);
- let resp = make_response(
- 6,
- Some(2..6),
- StatusCode::PARTIAL_CONTENT,
- Some("bytes 2-3/6"),
- None,
- );
- let err = get_result::<TestClient>(&path, Some(GetRange::Suffix(4)),
resp).unwrap_err();
+ let resp = make_response(6, StatusCode::PARTIAL_CONTENT, Some("bytes 2-3/6"), None);
+ let err = get_range_meta(CFG, &path, Some(&GetRange::Suffix(4)), &resp).unwrap_err();
assert_eq!(err.to_string(), "Requested 2..6, got 2..4");
+ }
+ #[test]
+ fn test_get_attributes() {
let resp = make_response(
12,
- None,
StatusCode::OK,
None,
Some(vec![("x-test-meta-foo", "bar")]),
);
- let res = get_result::<TestClient>(&path, None, resp).unwrap();
- assert_eq!(res.meta.size, 12);
- assert_eq!(res.range, 0..12);
+
+ let attributes = get_attributes(CFG, &resp.headers).unwrap();
assert_eq!(
- res.attributes.get(&Attribute::Metadata("foo".into())),
+ attributes.get(&Attribute::Metadata("foo".into())),
Some(&"bar".into())
);
- let bytes = res.bytes().await.unwrap();
- assert_eq!(bytes.len(), 12);
+ }
+}
+#[cfg(all(test, feature = "http", not(target_arch = "wasm32")))]
+mod http_tests {
+ use crate::client::mock_server::MockServer;
+ use crate::client::{HttpError, HttpErrorKind, HttpResponseBody};
+ use crate::http::HttpBuilder;
+ use crate::path::Path;
+ use crate::{ClientOptions, ObjectStore, RetryConfig};
+ use bytes::Bytes;
+ use futures::FutureExt;
+ use http::header::{CONTENT_LENGTH, CONTENT_RANGE, ETAG, RANGE};
+ use http::{Response, StatusCode};
+ use hyper::body::Frame;
+ use std::pin::Pin;
+ use std::task::{ready, Context, Poll};
+ use std::time::Duration;
+
+ #[derive(Debug, thiserror::Error)]
+ #[error("ChunkedErr")]
+ struct ChunkedErr {}
+
+ /// A Body from a list of results
+ ///
+ /// Sleeps between each frame to avoid the HTTP Server coalescing the
frames
+ struct Chunked {
+ chunks: std::vec::IntoIter<Result<Bytes, ()>>,
+ sleep: Option<Pin<Box<tokio::time::Sleep>>>,
+ }
+
+ impl Chunked {
+ fn new(v: Vec<Result<Bytes, ()>>) -> Self {
+ Self {
+ chunks: v.into_iter(),
+ sleep: None,
+ }
+ }
+ }
+
+ impl hyper::body::Body for Chunked {
+ type Data = Bytes;
+ type Error = HttpError;
+
+ fn poll_frame(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
+ if let Some(sleep) = &mut self.sleep {
+ ready!(sleep.poll_unpin(cx));
+ self.sleep = None;
+ }
+
+ Poll::Ready(match self.chunks.next() {
+ None => None,
+ Some(Ok(b)) => {
+ self.sleep =
Some(Box::pin(tokio::time::sleep(Duration::from_millis(1))));
+ Some(Ok(Frame::data(b)))
+ }
+ Some(Err(_)) =>
Some(Err(HttpError::new(HttpErrorKind::Unknown, ChunkedErr {}))),
+ })
+ }
+ }
+
+ impl From<Chunked> for HttpResponseBody {
+ fn from(value: Chunked) -> Self {
+ Self::new(value)
+ }
+ }
+
+ #[tokio::test]
+ async fn test_stream_retry() {
+ let mock = MockServer::new().await;
+ let retry = RetryConfig {
+ backoff: Default::default(),
+ max_retries: 3,
+ retry_timeout: Duration::from_secs(1000),
+ };
+
+ let options = ClientOptions::new().with_allow_http(true);
+ let store = HttpBuilder::new()
+ .with_client_options(options)
+ .with_retry(retry)
+ .with_url(mock.url())
+ .build()
+ .unwrap();
+
+ let path = Path::from("test");
+
+ // Test basic
+ let resp = Response::builder()
+ .header(CONTENT_LENGTH, 11)
+ .header(ETAG, "123")
+ .body("Hello World".to_string())
+ .unwrap();
+
+ mock.push(resp);
+
+ let b = store.get(&path).await.unwrap().bytes().await.unwrap();
+ assert_eq!(b.as_ref(), b"Hello World");
+
+ // Should retry with range
+ mock.push(
+ Response::builder()
+ .header(CONTENT_LENGTH, 10)
+ .header(ETAG, "123")
+ .body(Chunked::new(vec![
+ Ok(Bytes::from_static(b"banana")),
+ Err(()),
+ ]))
+ .unwrap(),
+ );
+
+ mock.push_fn(|req| {
+ assert_eq!(
+ req.headers().get(RANGE).unwrap().to_str().unwrap(),
+ "bytes=6-9"
+ );
+
+ Response::builder()
+ .status(StatusCode::PARTIAL_CONTENT)
+ .header(CONTENT_LENGTH, 3)
+ .header(ETAG, "123")
+ .header(CONTENT_RANGE, "bytes 6-9/10")
+ .body("123".to_string())
+ .unwrap()
+ });
+
+ let ret = store.get(&path).await.unwrap().bytes().await.unwrap();
+ assert_eq!(ret.as_ref(), b"banana123");
+
+ // Should retry multiple times
+ mock.push(
+ Response::builder()
+ .header(CONTENT_LENGTH, 20)
+ .header(ETAG, "foo")
+ .body(Chunked::new(vec![
+ Ok(Bytes::from_static(b"hello")),
+ Err(()),
+ ]))
+ .unwrap(),
+ );
+
+ mock.push_fn(|req| {
+ assert_eq!(
+ req.headers().get(RANGE).unwrap().to_str().unwrap(),
+ "bytes=5-19"
+ );
+
+ Response::builder()
+ .status(StatusCode::PARTIAL_CONTENT)
+ .header(CONTENT_LENGTH, 15)
+ .header(ETAG, "foo")
+ .header(CONTENT_RANGE, "bytes 5-19/20")
+ .body(Chunked::new(vec![Ok(Bytes::from_static(b"baz")),
Err(())]))
+ .unwrap()
+ });
+
+ mock.push_fn::<_, String>(|req| {
+ assert_eq!(
+ req.headers().get(RANGE).unwrap().to_str().unwrap(),
+ "bytes=8-19"
+ );
+ Response::builder()
+ .status(StatusCode::BAD_GATEWAY)
+ .body("ignored".to_string())
+ .unwrap()
+ });
+
+ mock.push_fn(|req| {
+ assert_eq!(
+ req.headers().get(RANGE).unwrap().to_str().unwrap(),
+ "bytes=8-19"
+ );
+
+ Response::builder()
+ .status(StatusCode::PARTIAL_CONTENT)
+ .header(CONTENT_LENGTH, 12)
+ .header(ETAG, "foo")
+ .header(CONTENT_RANGE, "bytes 8-19/20")
+ .body("123456789012".to_string())
+ .unwrap()
+ });
+
+ let ret = store.get(&path).await.unwrap().bytes().await.unwrap();
+ assert_eq!(ret.as_ref(), b"hellobaz123456789012");
+
+ // Should abort if etag doesn't match
+ mock.push(
+ Response::builder()
+ .header(CONTENT_LENGTH, 12)
+ .header(ETAG, "foo")
+ .body(Chunked::new(vec![Ok(Bytes::from_static(b"test")),
Err(())]))
+ .unwrap(),
+ );
+
+ mock.push_fn(|req| {
+ assert_eq!(
+ req.headers().get(RANGE).unwrap().to_str().unwrap(),
+ "bytes=4-11"
+ );
+
+ Response::builder()
+ .status(StatusCode::PARTIAL_CONTENT)
+ .header(CONTENT_LENGTH, 7)
+ .header(ETAG, "baz")
+ .header(CONTENT_RANGE, "bytes 4-11/12")
+ .body("1234567".to_string())
+ .unwrap()
+ });
+
+ let err = store.get(&path).await.unwrap().bytes().await.unwrap_err();
+ assert_eq!(
+ err.to_string(),
+ "Generic HTTP error: HTTP error: request or response body error"
+ );
}
}
diff --git a/src/client/http/body.rs b/src/client/http/body.rs
index e696904..504bebe 100644
--- a/src/client/http/body.rs
+++ b/src/client/http/body.rs
@@ -213,6 +213,14 @@ impl Body for HttpResponseBody {
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
Pin::new(&mut self.0).poll_frame(cx)
}
+
+ fn is_end_stream(&self) -> bool {
+ self.0.is_end_stream()
+ }
+
+ fn size_hint(&self) -> SizeHint {
+ self.0.size_hint()
+ }
}
impl From<Bytes> for HttpResponseBody {
diff --git a/src/client/mock_server.rs b/src/client/mock_server.rs
index 8be4a72..9caf731 100644
--- a/src/client/mock_server.rs
+++ b/src/client/mock_server.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use crate::client::{HttpResponse, HttpResponseBody};
use futures::future::BoxFuture;
use futures::FutureExt;
use hyper::body::Incoming;
@@ -33,7 +34,7 @@ use tokio::sync::oneshot;
use tokio::task::{JoinHandle, JoinSet};
pub(crate) type ResponseFn =
- Box<dyn FnOnce(Request<Incoming>) -> BoxFuture<'static, Response<String>> + Send>;
+ Box<dyn FnOnce(Request<Incoming>) -> BoxFuture<'static, HttpResponse> + Send>;
/// A mock server
pub(crate) struct MockServer {
@@ -76,7 +77,7 @@ impl MockServer {
async move {
Ok::<_, Infallible>(match next {
Some(r) => r(req).await,
- None => Response::new("Hello World".to_string()),
+ None => HttpResponse::new("Hello World".to_string().into()),
})
}
}),
@@ -102,16 +103,18 @@ impl MockServer {
}
/// Add a response
- pub(crate) fn push(&self, response: Response<String>) {
- self.push_fn(|_| response)
+ pub(crate) fn push<B: Into<HttpResponseBody>>(&self, response:
Response<B>) {
+ let resp = response.map(Into::into);
+ self.push_fn(|_| resp)
}
/// Add a response function
- pub(crate) fn push_fn<F>(&self, f: F)
+ pub(crate) fn push_fn<F, B>(&self, f: F)
where
- F: FnOnce(Request<Incoming>) -> Response<String> + Send + 'static,
+ F: FnOnce(Request<Incoming>) -> Response<B> + Send + 'static,
+ B: Into<HttpResponseBody>,
{
- let f = Box::new(|req| async move { f(req) }.boxed());
+ let f = Box::new(|req| async move { f(req).map(Into::into) }.boxed());
self.responses.lock().push_back(f)
}
@@ -120,7 +123,8 @@ impl MockServer {
F: FnOnce(Request<Incoming>) -> Fut + Send + 'static,
Fut: Future<Output = Response<String>> + Send + 'static,
{
- self.responses.lock().push_back(Box::new(|r| f(r).boxed()))
+ let f = Box::new(|r| f(r).map(|b| b.map(Into::into)).boxed());
+ self.responses.lock().push_back(f)
}
/// Shutdown the mock server
diff --git a/src/client/retry.rs b/src/client/retry.rs
index 7df4f9e..fb09813 100644
--- a/src/client/retry.rs
+++ b/src/client/retry.rs
@@ -67,30 +67,35 @@ impl std::fmt::Display for RetryError {
}
/// Context of the retry loop
-struct RetryContext {
- method: Method,
- uri: Option<Uri>,
+///
+/// Most use-cases should use [`RetryExt`] and [`RetryableRequestBuilder`], however,
+/// [`RetryContext`] allows preserving retry state across multiple [`RetryableRequest`]
+pub(crate) struct RetryContext {
+ backoff: Backoff,
retries: usize,
max_retries: usize,
- start: Instant,
retry_timeout: Duration,
+ start: Instant,
}
impl RetryContext {
- fn err(self, error: RequestError) -> RetryError {
- RetryError(Box::new(RetryErrorImpl {
- uri: self.uri,
- method: self.method,
- retries: self.retries,
- max_retries: self.max_retries,
- elapsed: self.start.elapsed(),
- retry_timeout: self.retry_timeout,
- inner: error,
- }))
+ pub(crate) fn new(config: &RetryConfig) -> Self {
+ Self {
+ max_retries: config.max_retries,
+ retry_timeout: config.retry_timeout,
+ backoff: Backoff::new(&config.backoff),
+ retries: 0,
+ start: Instant::now(),
+ }
+ }
+
+ pub(crate) fn exhausted(&self) -> bool {
+ self.retries >= self.max_retries || self.start.elapsed() >
self.retry_timeout
}
- fn exhausted(&self) -> bool {
- self.retries == self.max_retries || self.start.elapsed() >
self.retry_timeout
+ pub(crate) fn backoff(&mut self) -> Duration {
+ self.retries += 1;
+ self.backoff.next()
}
}
@@ -248,81 +253,89 @@ fn body_contains_error(response_body: &str) -> bool {
response_body.contains("InternalError") ||
response_body.contains("SlowDown")
}
-pub(crate) struct RetryableRequest {
- client: HttpClient,
- request: HttpRequest,
-
- max_retries: usize,
- retry_timeout: Duration,
- backoff: Backoff,
-
- sensitive: bool,
- idempotent: Option<bool>,
- retry_on_conflict: bool,
- payload: Option<PutPayload>,
-
- retry_error_body: bool,
+/// Combines a [`RetryableRequest`] with a [`RetryContext`]
+pub(crate) struct RetryableRequestBuilder {
+ request: RetryableRequest,
+ context: RetryContext,
}
-impl RetryableRequest {
+impl RetryableRequestBuilder {
/// Set whether this request is idempotent
///
/// An idempotent request will be retried on timeout even if the request
/// method is not
[safe](https://datatracker.ietf.org/doc/html/rfc7231#section-4.2.1)
- pub(crate) fn idempotent(self, idempotent: bool) -> Self {
- Self {
- idempotent: Some(idempotent),
- ..self
- }
+ pub(crate) fn idempotent(mut self, idempotent: bool) -> Self {
+ self.request.idempotent = Some(idempotent);
+ self
}
/// Set whether this request should be retried on a 409 Conflict response.
#[cfg(feature = "aws")]
- pub(crate) fn retry_on_conflict(self, retry_on_conflict: bool) -> Self {
- Self {
- retry_on_conflict,
- ..self
- }
+ pub(crate) fn retry_on_conflict(mut self, retry_on_conflict: bool) -> Self
{
+ self.request.retry_on_conflict = retry_on_conflict;
+ self
}
/// Set whether this request contains sensitive data
///
/// This will avoid printing out the URL in error messages
#[allow(unused)]
- pub(crate) fn sensitive(self, sensitive: bool) -> Self {
- Self { sensitive, ..self }
+ pub(crate) fn sensitive(mut self, sensitive: bool) -> Self {
+ self.request.sensitive = sensitive;
+ self
}
/// Provide a [`PutPayload`]
- pub(crate) fn payload(self, payload: Option<PutPayload>) -> Self {
- Self { payload, ..self }
+ pub(crate) fn payload(mut self, payload: Option<PutPayload>) -> Self {
+ self.request.payload = payload;
+ self
}
#[allow(unused)]
- pub(crate) fn retry_error_body(self, retry_error_body: bool) -> Self {
- Self {
- retry_error_body,
- ..self
- }
+ pub(crate) fn retry_error_body(mut self, retry_error_body: bool) -> Self {
+ self.request.retry_error_body = retry_error_body;
+ self
}
- pub(crate) async fn send(self) -> Result<HttpResponse> {
- let mut ctx = RetryContext {
- retries: 0,
- uri: (!self.sensitive).then(|| self.request.uri().clone()),
- method: self.request.method().clone(),
- max_retries: self.max_retries,
- start: Instant::now(),
- retry_timeout: self.retry_timeout,
- };
+ pub(crate) async fn send(mut self) -> Result<HttpResponse> {
+ self.request.send(&mut self.context).await
+ }
+}
- let mut backoff = self.backoff;
- let is_idempotent = self
- .idempotent
- .unwrap_or_else(|| self.request.method().is_safe());
+/// A retryable request
+pub(crate) struct RetryableRequest {
+ client: HttpClient,
+ http: HttpRequest,
+ sensitive: bool,
+ idempotent: Option<bool>,
+ retry_on_conflict: bool,
+ payload: Option<PutPayload>,
+
+ retry_error_body: bool,
+}
+
+impl RetryableRequest {
+ #[allow(unused)]
+ pub(crate) fn sensitive(self, sensitive: bool) -> Self {
+ Self { sensitive, ..self }
+ }
+
+ fn err(&self, error: RequestError, ctx: &RetryContext) -> RetryError {
+ RetryError(Box::new(RetryErrorImpl {
+ uri: (!self.sensitive).then(|| self.http.uri().clone()),
+ method: self.http.method().clone(),
+ retries: ctx.retries,
+ max_retries: ctx.max_retries,
+ elapsed: ctx.start.elapsed(),
+ retry_timeout: ctx.retry_timeout,
+ inner: error,
+ }))
+ }
+
+ pub(crate) async fn send(self, ctx: &mut RetryContext) ->
Result<HttpResponse> {
loop {
- let mut request = self.request.clone();
+ let mut request = self.http.clone();
if let Some(payload) = &self.payload {
*request.body_mut() = payload.clone().into();
@@ -343,7 +356,7 @@ impl RetryableRequest {
let (parts, body) = r.into_parts();
let body = match body.text().await {
Ok(body) => body,
- Err(e) => return
Err(ctx.err(RequestError::Http(e))),
+ Err(e) => return
Err(self.err(RequestError::Http(e), ctx)),
};
if !body_contains_error(&body) {
@@ -352,11 +365,10 @@ impl RetryableRequest {
} else {
// Retry as if this was a 5xx response
if ctx.exhausted() {
- return Err(ctx.err(RequestError::Response {
body, status }));
+ return Err(self.err(RequestError::Response {
body, status }, ctx));
}
- let sleep = backoff.next();
- ctx.retries += 1;
+ let sleep = ctx.backoff();
info!(
"Encountered a response status of {} but body
contains Error, backing off for {} seconds, retry {} of {}",
status,
@@ -367,15 +379,18 @@ impl RetryableRequest {
tokio::time::sleep(sleep).await;
}
} else if status == StatusCode::NOT_MODIFIED {
- return Err(ctx.err(RequestError::Status { status,
body: None }));
+ return Err(self.err(RequestError::Status { status,
body: None }, ctx));
} else if status.is_redirection() {
let is_bare_redirect =
!r.headers().contains_key(LOCATION);
return match is_bare_redirect {
- true => Err(ctx.err(RequestError::BareRedirect)),
- false => Err(ctx.err(RequestError::Status {
- body: None,
- status: r.status(),
- })),
+ true => Err(self.err(RequestError::BareRedirect,
ctx)),
+ false => Err(self.err(
+ RequestError::Status {
+ body: None,
+ status: r.status(),
+ },
+ ctx,
+ )),
};
} else {
let status = r.status();
@@ -393,11 +408,10 @@ impl RetryableRequest {
},
false => RequestError::Status { status, body:
None },
};
- return Err(ctx.err(source));
+ return Err(self.err(source, ctx));
};
- let sleep = backoff.next();
- ctx.retries += 1;
+ let sleep = ctx.backoff();
info!(
"Encountered server error, backing off for {}
seconds, retry {} of {}",
sleep.as_secs_f32(),
@@ -408,7 +422,9 @@ impl RetryableRequest {
}
}
Err(e) => {
- // let e = sanitize_err(e);
+ let is_idempotent = self
+ .idempotent
+ .unwrap_or_else(|| self.http.method().is_safe());
let do_retry = match e.kind() {
HttpErrorKind::Connect | HttpErrorKind::Request =>
true, // Request not sent, can retry
@@ -416,14 +432,10 @@ impl RetryableRequest {
HttpErrorKind::Unknown | HttpErrorKind::Decode =>
false,
};
- if ctx.retries == ctx.max_retries
- || ctx.start.elapsed() > ctx.retry_timeout
- || !do_retry
- {
- return Err(ctx.err(RequestError::Http(e)));
+ if ctx.exhausted() || !do_retry {
+ return Err(self.err(RequestError::Http(e), ctx));
}
- let sleep = backoff.next();
- ctx.retries += 1;
+ let sleep = ctx.backoff();
info!(
"Encountered transport error backing off for {}
seconds, retry {} of {}: {}",
sleep.as_secs_f32(),
@@ -439,8 +451,11 @@ impl RetryableRequest {
}
pub(crate) trait RetryExt {
+ /// Return a [`RetryableRequestBuilder`]
+ fn retryable(self, config: &RetryConfig) -> RetryableRequestBuilder;
+
/// Return a [`RetryableRequest`]
- fn retryable(self, config: &RetryConfig) -> RetryableRequest;
+ fn retryable_request(self) -> RetryableRequest;
/// Dispatch a request with the given retry configuration
///
@@ -451,16 +466,20 @@ pub(crate) trait RetryExt {
}
impl RetryExt for HttpRequestBuilder {
- fn retryable(self, config: &RetryConfig) -> RetryableRequest {
+ fn retryable(self, config: &RetryConfig) -> RetryableRequestBuilder {
+ RetryableRequestBuilder {
+ request: self.retryable_request(),
+ context: RetryContext::new(config),
+ }
+ }
+
+ fn retryable_request(self) -> RetryableRequest {
let (client, request) = self.into_parts();
let request = request.expect("request must be valid");
RetryableRequest {
client,
- request,
- max_retries: config.max_retries,
- retry_timeout: config.retry_timeout,
- backoff: Backoff::new(&config.backoff),
+ http: request,
idempotent: None,
payload: None,
sensitive: false,
@@ -524,7 +543,7 @@ mod tests {
let r = do_request().await.unwrap();
assert_eq!(r.status(), StatusCode::OK);
- // Returns client errors immediately with status message
+ // Returns client errors immediately with a status message
mock.push(
Response::builder()
.status(StatusCode::BAD_REQUEST)
@@ -645,13 +664,13 @@ mod tests {
);
// Panic results in an incomplete message error in the client
- mock.push_fn(|_| panic!());
+ mock.push_fn::<_, String>(|_| panic!());
let r = do_request().await.unwrap();
assert_eq!(r.status(), StatusCode::OK);
// Gives up after retrying multiple panics
for _ in 0..=retry.max_retries {
- mock.push_fn(|_| panic!());
+ mock.push_fn::<_, String>(|_| panic!());
}
let e = do_request().await.unwrap_err().to_string();
assert!(
@@ -710,7 +729,7 @@ mod tests {
assert!(!err.contains("SENSITIVE"), "{err}");
for _ in 0..=retry.max_retries {
- mock.push_fn(|_| panic!());
+ mock.push_fn::<_, String>(|_| panic!());
}
let req = client
diff --git a/src/gcp/client.rs b/src/gcp/client.rs
index f815085..0378843 100644
--- a/src/gcp/client.rs
+++ b/src/gcp/client.rs
@@ -19,7 +19,7 @@ use crate::client::builder::HttpRequestBuilder;
use crate::client::get::GetClient;
use crate::client::header::{get_put_result, get_version, HeaderConfig};
use crate::client::list::ListClient;
-use crate::client::retry::RetryExt;
+use crate::client::retry::{RetryContext, RetryExt};
use crate::client::s3::{
CompleteMultipartUpload, CompleteMultipartUploadResult,
InitiateMultipartUploadResult,
ListResponse,
@@ -617,8 +617,17 @@ impl GetClient for GoogleCloudStorageClient {
user_defined_metadata_prefix:
Some(USER_DEFINED_METADATA_HEADER_PREFIX),
};
+ fn retry_config(&self) -> &RetryConfig {
+ &self.config.retry_config
+ }
+
/// Perform a get request
<https://cloud.google.com/storage/docs/xml-api/get-object-download>
- async fn get_request(&self, path: &Path, options: GetOptions) ->
Result<HttpResponse> {
+ async fn get_request(
+ &self,
+ ctx: &mut RetryContext,
+ path: &Path,
+ options: GetOptions,
+ ) -> Result<HttpResponse> {
let credential = self.get_credential().await?;
let url = self.object_url(path);
@@ -636,7 +645,8 @@ impl GetClient for GoogleCloudStorageClient {
let response = request
.with_bearer_auth(credential.as_deref())
.with_get_options(options)
- .send_retry(&self.config.retry_config)
+ .retryable_request()
+ .send(ctx)
.await
.map_err(|source| {
let path = path.as_ref().into();
diff --git a/src/http/client.rs b/src/http/client.rs
index 8a96e74..272f7c6 100644
--- a/src/http/client.rs
+++ b/src/http/client.rs
@@ -18,7 +18,7 @@
use super::STORE;
use crate::client::get::GetClient;
use crate::client::header::HeaderConfig;
-use crate::client::retry::{self, RetryConfig, RetryExt};
+use crate::client::retry::{self, RetryConfig, RetryContext, RetryExt};
use crate::client::{GetOptionsExt, HttpClient, HttpError, HttpResponse};
use crate::path::{Path, DELIMITER};
use crate::util::deserialize_rfc1123;
@@ -353,7 +353,16 @@ impl GetClient for Client {
user_defined_metadata_prefix: None,
};
- async fn get_request(&self, path: &Path, options: GetOptions) ->
Result<HttpResponse> {
+ fn retry_config(&self) -> &RetryConfig {
+ &self.retry_config
+ }
+
+ async fn get_request(
+ &self,
+ ctx: &mut RetryContext,
+ path: &Path,
+ options: GetOptions,
+ ) -> Result<HttpResponse> {
let url = self.path_url(path);
let method = match options.head {
true => Method::HEAD,
@@ -364,7 +373,8 @@ impl GetClient for Client {
let res = builder
.with_get_options(options)
- .send_retry(&self.retry_config)
+ .retryable_request()
+ .send(ctx)
.await
.map_err(|source| match source.status() {
// Some stores return METHOD_NOT_ALLOWED for get on directories