alamb commented on code in PR #383:
URL:
https://github.com/apache/arrow-rs-object-store/pull/383#discussion_r2113767672
##########
src/client/get.rs:
##########
@@ -145,40 +156,133 @@ enum GetResultError {
},
}
-fn get_result<T: GetClient>(
- location: &Path,
- range: Option<GetRange>,
- response: HttpResponse,
-) -> Result<GetResult, GetResultError> {
- let mut meta = header_meta(location, response.headers(),
T::HEADER_CONFIG)?;
+/// Retry context for a streaming get request
+struct GetContext<T: GetClient> {
+ client: Arc<T>,
+ location: Path,
+ options: GetOptions,
+ retry_ctx: RetryContext,
+}
- // ensure that we receive the range we asked for
- let range = if let Some(expected) = range {
- if response.status() != StatusCode::PARTIAL_CONTENT {
- return Err(GetResultError::NotPartial);
+impl<T: GetClient> GetContext<T> {
+ async fn get_result(mut self) -> Result<GetResult> {
+ if let Some(r) = &self.options.range {
+ r.is_valid().map_err(Self::err)?;
}
- let val = response
- .headers()
- .get(CONTENT_RANGE)
- .ok_or(GetResultError::NoContentRange)?;
+ let request = self
+ .client
+ .get_request(&mut self.retry_ctx, &self.location,
self.options.clone())
+ .await?;
+
+ let (parts, body) = request.into_parts();
+ let (range, meta) = get_range_meta(
+ T::HEADER_CONFIG,
+ &self.location,
+ self.options.range.as_ref(),
+ &parts,
+ )
+ .map_err(Self::err)?;
+
+ let attributes = get_attributes(T::HEADER_CONFIG,
&parts.headers).map_err(Self::err)?;
+ let stream = self.retry_stream(body, meta.e_tag.clone(),
range.clone());
+
+ Ok(GetResult {
+ payload: GetResultPayload::Stream(stream),
+ meta,
+ range,
+ attributes,
+ })
+ }
- let value = val
- .to_str()
- .map_err(|source| GetResultError::InvalidContentRange { source })?;
+ fn retry_stream(
+ self,
+ body: HttpResponseBody,
+ etag: Option<String>,
+ range: Range<u64>,
+ ) -> BoxStream<'static, Result<Bytes>> {
+ futures::stream::try_unfold(
+ (self, body, etag, range),
+ |(mut ctx, mut body, etag, mut range)| async move {
+ while let Some(ret) = body.frame().await {
+ match (ret, &etag) {
+ (Ok(frame), _) => match frame.into_data() {
+ Ok(bytes) => {
+ range.start += bytes.len() as u64;
+ return Ok(Some((bytes, (ctx, body, etag,
range))));
+ }
+ Err(_) => continue, // Isn't data frame
+ },
+ // Retry all response body errors
+ (Err(e), Some(etag)) if !ctx.retry_ctx.exhausted() => {
+ let sleep = ctx.retry_ctx.backoff();
Review Comment:
This strategy will make many classes of transient errors less severe, which is
great.
However, it is still possible to get timeout errors while actively reading a
streaming response body, as the retry mechanism kicks in after the timeout, even
though progress was made.
At some point, it may make sense to detect when progress was made and adjust
the remaining retries / policy accordingly.
- I filed https://github.com/apache/arrow-rs-object-store/issues/386
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]