crepererum commented on code in PR #383:
URL: https://github.com/apache/arrow-rs-object-store/pull/383#discussion_r2108581276


##########
src/client/get.rs:
##########
@@ -145,40 +156,132 @@ enum GetResultError {
     },
 }
 
-fn get_result<T: GetClient>(
-    location: &Path,
-    range: Option<GetRange>,
-    response: HttpResponse,
-) -> Result<GetResult, GetResultError> {
-    let mut meta = header_meta(location, response.headers(), T::HEADER_CONFIG)?;
+/// Retry context for a streaming get request
+struct GetContext<T: GetClient> {
+    client: Arc<T>,
+    location: Path,
+    options: GetOptions,
+    retry_ctx: RetryContext,
+}
 
-    // ensure that we receive the range we asked for
-    let range = if let Some(expected) = range {
-        if response.status() != StatusCode::PARTIAL_CONTENT {
-            return Err(GetResultError::NotPartial);
+impl<T: GetClient> GetContext<T> {
+    async fn get_result(mut self) -> Result<GetResult> {
+        if let Some(r) = &self.options.range {
+            r.is_valid().map_err(Self::err)?;
         }
 
-        let val = response
-            .headers()
-            .get(CONTENT_RANGE)
-            .ok_or(GetResultError::NoContentRange)?;
+        let request = self
+            .client
+            .get_request(&mut self.retry_ctx, &self.location, self.options.clone())
+            .await?;
+
+        let (parts, body) = request.into_parts();
+        let (range, meta) = get_range_meta(
+            T::HEADER_CONFIG,
+            &self.location,
+            self.options.range.as_ref(),
+            &parts,
+        )
+        .map_err(Self::err)?;
+
+        let attributes = get_attributes(T::HEADER_CONFIG, &parts.headers).map_err(Self::err)?;
+        let stream = self.retry_stream(body, meta.e_tag.clone(), range.clone());
+
+        Ok(GetResult {
+            payload: GetResultPayload::Stream(stream),
+            meta,
+            range,
+            attributes,
+        })
+    }
 
-        let value = val
-            .to_str()
-            .map_err(|source| GetResultError::InvalidContentRange { source })?;
+    fn retry_stream(
+        self,
+        body: HttpResponseBody,
+        etag: Option<String>,
+        range: Range<u64>,
+    ) -> BoxStream<'static, Result<Bytes>> {
+        futures::stream::try_unfold(
+            (self, body, etag, range),
+            |(mut ctx, mut body, etag, mut range)| async move {
+                while let Some(ret) = body.frame().await {
+                    match (ret, &etag) {
+                        (Ok(frame), _) => match frame.into_data() {
+                            Ok(bytes) => {
+                                range.start += bytes.len() as u64;
+                                return Ok(Some((bytes, (ctx, body, etag, range))));
+                            }
+                            Err(_) => continue, // Isn't data frame
+                        },
+                        // Retry all response body errors
+                        (Err(e), Some(etag)) if !ctx.retry_ctx.exhausted() => {
+                            let sleep = ctx.retry_ctx.backoff();
+                            info!(
+                                "Encountered error while reading response 
body: {}. Retrying in {}s",
+                                e,
+                                sleep.as_secs_f32()
+                            );
+
+                            tokio::time::sleep(sleep).await;
+
+                            let options = GetOptions {
+                                range: Some(GetRange::Bounded(range.clone())),
+                                ..ctx.options.clone()
+                            };
+
+                            // Note: this will potentially retry internally if applicable
+                            let request = ctx
+                                .client
+                                .get_request(&mut ctx.retry_ctx, &ctx.location, options)
+                                .await
+                                .map_err(Self::err)?;
+
+                            let (parts, retry_body) = request.into_parts();
+                            let retry_etag = get_etag(&parts.headers).map_err(Self::err)?;
+
+                            if etag != &retry_etag {

Review Comment:
   :+1: 



##########
src/client/get.rs:
##########
@@ -145,40 +156,132 @@ enum GetResultError {
     },
 }
 
-fn get_result<T: GetClient>(
-    location: &Path,
-    range: Option<GetRange>,
-    response: HttpResponse,
-) -> Result<GetResult, GetResultError> {
-    let mut meta = header_meta(location, response.headers(), T::HEADER_CONFIG)?;
+/// Retry context for a streaming get request
+struct GetContext<T: GetClient> {
+    client: Arc<T>,
+    location: Path,
+    options: GetOptions,
+    retry_ctx: RetryContext,
+}
 
-    // ensure that we receive the range we asked for
-    let range = if let Some(expected) = range {
-        if response.status() != StatusCode::PARTIAL_CONTENT {
-            return Err(GetResultError::NotPartial);
+impl<T: GetClient> GetContext<T> {
+    async fn get_result(mut self) -> Result<GetResult> {
+        if let Some(r) = &self.options.range {
+            r.is_valid().map_err(Self::err)?;
         }
 
-        let val = response
-            .headers()
-            .get(CONTENT_RANGE)
-            .ok_or(GetResultError::NoContentRange)?;
+        let request = self
+            .client
+            .get_request(&mut self.retry_ctx, &self.location, self.options.clone())
+            .await?;
+
+        let (parts, body) = request.into_parts();
+        let (range, meta) = get_range_meta(
+            T::HEADER_CONFIG,
+            &self.location,
+            self.options.range.as_ref(),
+            &parts,
+        )
+        .map_err(Self::err)?;
+
+        let attributes = get_attributes(T::HEADER_CONFIG, &parts.headers).map_err(Self::err)?;
+        let stream = self.retry_stream(body, meta.e_tag.clone(), range.clone());
+
+        Ok(GetResult {
+            payload: GetResultPayload::Stream(stream),
+            meta,
+            range,
+            attributes,
+        })
+    }
 
-        let value = val
-            .to_str()
-            .map_err(|source| GetResultError::InvalidContentRange { source })?;
+    fn retry_stream(
+        self,
+        body: HttpResponseBody,
+        etag: Option<String>,
+        range: Range<u64>,
+    ) -> BoxStream<'static, Result<Bytes>> {
+        futures::stream::try_unfold(
+            (self, body, etag, range),
+            |(mut ctx, mut body, etag, mut range)| async move {

Review Comment:
   I think that code is rather elegant :+1: 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to