tustvold commented on code in PR #383:
URL: https://github.com/apache/arrow-rs-object-store/pull/383#discussion_r2107625817


##########
src/client/get.rs:
##########
@@ -145,40 +156,133 @@ enum GetResultError {
     },
 }
 
-fn get_result<T: GetClient>(
-    location: &Path,
-    range: Option<GetRange>,
-    response: HttpResponse,
-) -> Result<GetResult, GetResultError> {
-    let mut meta = header_meta(location, response.headers(), T::HEADER_CONFIG)?;
+/// Retry context for a streaming get request
+struct GetContext<T: GetClient> {
+    client: Arc<T>,
+    location: Path,
+    options: GetOptions,
+    retry_ctx: RetryContext,
+}
 
-    // ensure that we receive the range we asked for
-    let range = if let Some(expected) = range {
-        if response.status() != StatusCode::PARTIAL_CONTENT {
-            return Err(GetResultError::NotPartial);
+impl<T: GetClient> GetContext<T> {
+    async fn get_result(mut self) -> Result<GetResult> {
+        if let Some(r) = &self.options.range {
+            r.is_valid().map_err(Self::err)?;
         }
 
-        let val = response
-            .headers()
-            .get(CONTENT_RANGE)
-            .ok_or(GetResultError::NoContentRange)?;
+        let request = self
+            .client
+            .get_request(&mut self.retry_ctx, &self.location, self.options.clone())
+            .await?;
+
+        let (parts, body) = request.into_parts();
+        let (range, meta) = get_range_meta(
+            T::HEADER_CONFIG,
+            &self.location,
+            self.options.range.as_ref(),
+            &parts,
+        )
+        .map_err(Self::err)?;
+
+        let attributes = get_attributes(T::HEADER_CONFIG, &parts.headers).map_err(Self::err)?;
+        let stream = self.retry_stream(body, meta.e_tag.clone(), range.clone());
+
+        Ok(GetResult {
+            payload: GetResultPayload::Stream(stream),
+            meta,
+            range,
+            attributes,
+        })
+    }
 
-        let value = val
-            .to_str()
-            .map_err(|source| GetResultError::InvalidContentRange { source })?;
+    fn retry_stream(
+        self,
+        body: HttpResponseBody,
+        etag: Option<String>,
+        range: Range<u64>,
+    ) -> BoxStream<'static, Result<Bytes>> {
+        futures::stream::try_unfold(
+            (self, body, etag, range),
+            |(mut ctx, mut body, etag, mut range)| async move {
+                while let Some(ret) = body.frame().await {
+                    match (ret, &etag) {
+                        (Ok(frame), _) => match frame.into_data() {
+                            Ok(bytes) => {
+                                range.start += bytes.len() as u64;
+                                return Ok(Some((bytes, (ctx, body, etag, range))));
+                            }
+                            Err(_) => continue, // Isn't data frame
+                        },
+                        // Retry all response body errors
+                        (Err(e), Some(etag)) if !ctx.retry_ctx.exhausted() => {
+                            let sleep = ctx.retry_ctx.backoff();
+                            info!(
+                                "Encountered error while reading response body: {}. Retrying in {}s",
+                                e,
+                                sleep.as_secs_f32()
+                            );
+
+                            tokio::time::sleep(sleep).await;
+
+                            let options = GetOptions {
+                                range: Some(GetRange::Bounded(range.clone())),
+                                ..ctx.options.clone()
+                            };
+
+                            // Note: this will potentially retry internally if applicable
+                            let request = ctx
+                                .client
+                                .get_request(&mut ctx.retry_ctx, &ctx.location, options)
+                                .await
+                                .map_err(Self::err)?;
+
+                            let (parts, retry_body) = request.into_parts();
+                            let retry_etag = get_etag(&parts.headers).map_err(Self::err)?;
+
+                            if etag != &retry_etag {
+                                // Return the original error
+                                return Err(Self::err(e));
+                            }
+
+                            body = retry_body;
+                        }
+                        (Err(e), _) => return Err(Self::err(e)),
+                    }
+                }
+                Ok(None)
+            },
+        )
+        .map_err(Self::err)
+        .boxed()
+    }
 
-        let value = ContentRange::from_str(value).ok_or_else(|| {
-            let value = value.into();
-            GetResultError::ParseContentRange { value }
-        })?;
+    fn err<E: std::error::Error + Send + Sync + 'static>(e: E) -> crate::Error {
+        crate::Error::Generic {
+            store: T::STORE,
+            source: Box::new(e),
+        }
+    }
+}
+
+fn get_range_meta(

Review Comment:
   I opted to split up `get_result`, as it was relatively hard to follow as a single function of more than 100 lines.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to