alamb commented on code in PR #383: URL: https://github.com/apache/arrow-rs-object-store/pull/383#discussion_r2113767672
########## src/client/get.rs: ########## @@ -145,40 +156,133 @@ enum GetResultError { }, } -fn get_result<T: GetClient>( - location: &Path, - range: Option<GetRange>, - response: HttpResponse, -) -> Result<GetResult, GetResultError> { - let mut meta = header_meta(location, response.headers(), T::HEADER_CONFIG)?; +/// Retry context for a streaming get request +struct GetContext<T: GetClient> { + client: Arc<T>, + location: Path, + options: GetOptions, + retry_ctx: RetryContext, +} - // ensure that we receive the range we asked for - let range = if let Some(expected) = range { - if response.status() != StatusCode::PARTIAL_CONTENT { - return Err(GetResultError::NotPartial); +impl<T: GetClient> GetContext<T> { + async fn get_result(mut self) -> Result<GetResult> { + if let Some(r) = &self.options.range { + r.is_valid().map_err(Self::err)?; } - let val = response - .headers() - .get(CONTENT_RANGE) - .ok_or(GetResultError::NoContentRange)?; + let request = self + .client + .get_request(&mut self.retry_ctx, &self.location, self.options.clone()) + .await?; + + let (parts, body) = request.into_parts(); + let (range, meta) = get_range_meta( + T::HEADER_CONFIG, + &self.location, + self.options.range.as_ref(), + &parts, + ) + .map_err(Self::err)?; + + let attributes = get_attributes(T::HEADER_CONFIG, &parts.headers).map_err(Self::err)?; + let stream = self.retry_stream(body, meta.e_tag.clone(), range.clone()); + + Ok(GetResult { + payload: GetResultPayload::Stream(stream), + meta, + range, + attributes, + }) + } - let value = val - .to_str() - .map_err(|source| GetResultError::InvalidContentRange { source })?; + fn retry_stream( + self, + body: HttpResponseBody, + etag: Option<String>, + range: Range<u64>, + ) -> BoxStream<'static, Result<Bytes>> { + futures::stream::try_unfold( + (self, body, etag, range), + |(mut ctx, mut body, etag, mut range)| async move { + while let Some(ret) = body.frame().await { + match (ret, &etag) { + (Ok(frame), _) => match frame.into_data() { + 
Ok(bytes) => { + range.start += bytes.len() as u64; + return Ok(Some((bytes, (ctx, body, etag, range)))); + } + Err(_) => continue, // Isn't data frame + }, + // Retry all response body errors + (Err(e), Some(etag)) if !ctx.retry_ctx.exhausted() => { + let sleep = ctx.retry_ctx.backoff(); Review Comment: This strategy will make many classes of transient errors less severe, which is great. However, it is still possible to get timeout errors when actively reading a streaming response body, because the retry mechanism only kicks in after the timeout, even though progress was being made. At some point, it may make sense to detect when progress has been made and adjust the remaining retries / retry policy accordingly. - I filed https://github.com/apache/arrow-rs-object-store/issues/386 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org