tustvold commented on code in PR #383: URL: https://github.com/apache/arrow-rs-object-store/pull/383#discussion_r2107625817
########## src/client/get.rs: ########## @@ -145,40 +156,133 @@ enum GetResultError { }, } -fn get_result<T: GetClient>( - location: &Path, - range: Option<GetRange>, - response: HttpResponse, -) -> Result<GetResult, GetResultError> { - let mut meta = header_meta(location, response.headers(), T::HEADER_CONFIG)?; +/// Retry context for a streaming get request +struct GetContext<T: GetClient> { + client: Arc<T>, + location: Path, + options: GetOptions, + retry_ctx: RetryContext, +} - // ensure that we receive the range we asked for - let range = if let Some(expected) = range { - if response.status() != StatusCode::PARTIAL_CONTENT { - return Err(GetResultError::NotPartial); +impl<T: GetClient> GetContext<T> { + async fn get_result(mut self) -> Result<GetResult> { + if let Some(r) = &self.options.range { + r.is_valid().map_err(Self::err)?; } - let val = response - .headers() - .get(CONTENT_RANGE) - .ok_or(GetResultError::NoContentRange)?; + let request = self + .client + .get_request(&mut self.retry_ctx, &self.location, self.options.clone()) + .await?; + + let (parts, body) = request.into_parts(); + let (range, meta) = get_range_meta( + T::HEADER_CONFIG, + &self.location, + self.options.range.as_ref(), + &parts, + ) + .map_err(Self::err)?; + + let attributes = get_attributes(T::HEADER_CONFIG, &parts.headers).map_err(Self::err)?; + let stream = self.retry_stream(body, meta.e_tag.clone(), range.clone()); + + Ok(GetResult { + payload: GetResultPayload::Stream(stream), + meta, + range, + attributes, + }) + } - let value = val - .to_str() - .map_err(|source| GetResultError::InvalidContentRange { source })?; + fn retry_stream( + self, + body: HttpResponseBody, + etag: Option<String>, + range: Range<u64>, + ) -> BoxStream<'static, Result<Bytes>> { + futures::stream::try_unfold( + (self, body, etag, range), + |(mut ctx, mut body, etag, mut range)| async move { + while let Some(ret) = body.frame().await { + match (ret, &etag) { + (Ok(frame), _) => match frame.into_data() { + 
Ok(bytes) => { + range.start += bytes.len() as u64; + return Ok(Some((bytes, (ctx, body, etag, range)))); + } + Err(_) => continue, // Isn't data frame + }, + // Retry all response body errors + (Err(e), Some(etag)) if !ctx.retry_ctx.exhausted() => { + let sleep = ctx.retry_ctx.backoff(); + info!( + "Encountered error while reading response body: {}. Retrying in {}s", + e, + sleep.as_secs_f32() + ); + + tokio::time::sleep(sleep).await; + + let options = GetOptions { + range: Some(GetRange::Bounded(range.clone())), + ..ctx.options.clone() + }; + + // Note: this will potentially retry internally if applicable + let request = ctx + .client + .get_request(&mut ctx.retry_ctx, &ctx.location, options) + .await + .map_err(Self::err)?; + + let (parts, retry_body) = request.into_parts(); + let retry_etag = get_etag(&parts.headers).map_err(Self::err)?; + + if etag != &retry_etag { + // Return the original error + return Err(Self::err(e)); + } + + body = retry_body; + } + (Err(e), _) => return Err(Self::err(e)), + } + } + Ok(None) + }, + ) + .map_err(Self::err) + .boxed() + } - let value = ContentRange::from_str(value).ok_or_else(|| { - let value = value.into(); - GetResultError::ParseContentRange { value } - })?; + fn err<E: std::error::Error + Send + Sync + 'static>(e: E) -> crate::Error { + crate::Error::Generic { + store: T::STORE, + source: Box::new(e), + } + } +} + +fn get_range_meta( Review Comment: I opted to split up get_result as it was relatively hard to follow, being a single >100 line function -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org