This is an automated email from the ASF dual-hosted git repository.
jayzhan211 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git
The following commit(s) were added to refs/heads/main by this push:
new 4d042dc fix(retry): respect range header on retry (#690)
4d042dc is described below
commit 4d042dc6136e8eccdc559979663f6773419e83d3
Author: dentiny <[email protected]>
AuthorDate: Wed May 6 05:44:25 2026 -0700
fix(retry): respect range header on retry (#690)
* fix(retry): respect range header on retry
* clippy
---
src/client/get.rs | 96 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-
1 file changed, 95 insertions(+), 1 deletion(-)
diff --git a/src/client/get.rs b/src/client/get.rs
index 80b3f15..044919b 100644
--- a/src/client/get.rs
+++ b/src/client/get.rs
@@ -244,7 +244,48 @@ impl<T: GetClient> GetContext<T> {
return Err(Self::err(e));
}
- body = retry_body;
+ // Validate the Content-Range of the retry response
+ let content_range =
+ parse_range(&parts.headers).map_err(Self::err)?;
+ let actual = content_range.range;
+
+ // Exact match — use body as-is
+ if actual == range {
+ body = retry_body;
+ } else if actual.start <= range.start && actual.end >= range.end
+ {
+ // Received range is a superset for requested content,
+ // skip leading bytes to align to the needed offset.
+ let skip = (range.start - actual.start) as usize;
+ let mut skipped = 0;
+ let mut retry_body = retry_body;
+ while skipped < skip {
+ let frame = retry_body.frame().await
+ .ok_or_else(|| Self::err(GetResultError::UnexpectedRange {
+ expected: range.clone(), actual: actual.clone(),
+ }))?
+ .map_err(Self::err)?;
+ let Some(bytes) = frame.into_data().ok() else { continue };
+ let remaining = skip - skipped;
+ if bytes.len() <= remaining {
+ skipped += bytes.len();
+ } else {
+ let keep = bytes.slice(remaining..);
+ range.start += keep.len() as u64;
+ body = retry_body;
+ let etag = Some(etag.clone());
+ return Ok(Some((keep, (ctx, body, etag, range))));
+ }
+ }
+ body = retry_body;
+ } else {
+ return Err(Self::err(
+ GetResultError::UnexpectedRange {
+ expected: range,
+ actual,
+ },
+ ));
+ }
}
(Err(e), _) => return Err(Self::err(e)),
}
@@ -772,4 +813,57 @@ mod http_tests {
"Generic HTTP error: HTTP error: request or response body error"
);
}
+
+ #[tokio::test]
+ async fn test_retry_validate_content_range() {
+ let mock = MockServer::new().await;
+ let retry = RetryConfig {
+ backoff: Default::default(),
+ max_retries: 3,
+ retry_timeout: Duration::from_secs(1000),
+ };
+
+ let options = ClientOptions::new().with_allow_http(true);
+ let store = HttpBuilder::new()
+ .with_client_options(options)
+ .with_retry(retry)
+ .with_url(mock.url())
+ .build()
+ .unwrap();
+
+ let path = Path::from("test");
+
+ mock.push(
+ Response::builder()
+ .header(CONTENT_LENGTH, 10)
+ .header(ETAG, "abc")
+ .body(Chunked::new(vec![
+ Ok(Bytes::from_static(b"hello")),
+ Err(()),
+ ]))
+ .unwrap(),
+ );
+
+ mock.push_fn(|req| {
+ assert_eq!(
+ req.headers().get(RANGE).unwrap().to_str().unwrap(),
+ "bytes=5-9"
+ );
+
+ Response::builder()
+ .status(StatusCode::PARTIAL_CONTENT)
+ .header(CONTENT_LENGTH, 10)
+ .header(ETAG, "abc")
+ .header(CONTENT_RANGE, "bytes 0-9/10")
+ .body("helloworld".to_string())
+ .unwrap()
+ });
+
+ let result = store.get(&path).await.unwrap().bytes().await.unwrap();
+ assert_eq!(
+ result.as_ref(),
+ b"helloworld",
+ "expected correct 10-byte content"
+ );
+ }
}