This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 8eea918a5 Fix retry logic (#2573) (#2572) (#2574)
8eea918a5 is described below

commit 8eea918a537bb0d82c5bc7734df4b8d2a8274268
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu Aug 25 14:14:20 2022 +0100

    Fix retry logic (#2573) (#2572) (#2574)
    
    * Fix retry logic (#2573) (#2572)
    
    * Fix logical conflicts
    
    * Rework tests
---
 object_store/Cargo.toml          |   1 +
 object_store/src/aws/client.rs   |  50 ++++----
 object_store/src/aws/mod.rs      |   2 +-
 object_store/src/azure/client.rs |  37 +++---
 object_store/src/azure/mod.rs    |   2 +-
 object_store/src/client/oauth.rs |  13 ++-
 object_store/src/client/retry.rs | 242 +++++++++++++++++++++++++++++++++++++--
 object_store/src/gcp.rs          |  83 ++++----------
 8 files changed, 297 insertions(+), 133 deletions(-)

diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml
index b5c5ef6ed..2be233c83 100644
--- a/object_store/Cargo.toml
+++ b/object_store/Cargo.toml
@@ -63,3 +63,4 @@ dotenv = "0.15.0"
 tempfile = "3.1.0"
 futures-test = "0.3"
 rand = "0.8"
+hyper = { version = "0.14", features = ["server"] }
diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index 36ba9ad12..d8ab3bba8 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -52,36 +52,48 @@ const STRICT_PATH_ENCODE_SET: AsciiSet = 
STRICT_ENCODE_SET.remove(b'/');
 pub(crate) enum Error {
     #[snafu(display("Error performing get request {}: {}", path, source))]
     GetRequest {
+        source: crate::client::retry::Error,
+        path: String,
+    },
+
+    #[snafu(display("Error fetching get response body {}: {}", path, source))]
+    GetResponseBody {
         source: reqwest::Error,
         path: String,
     },
 
     #[snafu(display("Error performing put request {}: {}", path, source))]
     PutRequest {
-        source: reqwest::Error,
+        source: crate::client::retry::Error,
         path: String,
     },
 
     #[snafu(display("Error performing delete request {}: {}", path, source))]
     DeleteRequest {
-        source: reqwest::Error,
+        source: crate::client::retry::Error,
         path: String,
     },
 
     #[snafu(display("Error performing copy request {}: {}", path, source))]
     CopyRequest {
-        source: reqwest::Error,
+        source: crate::client::retry::Error,
         path: String,
     },
 
     #[snafu(display("Error performing list request: {}", source))]
-    ListRequest { source: reqwest::Error },
+    ListRequest { source: crate::client::retry::Error },
+
+    #[snafu(display("Error getting list response body: {}", source))]
+    ListResponseBody { source: reqwest::Error },
 
     #[snafu(display("Error performing create multipart request: {}", source))]
-    CreateMultipartRequest { source: reqwest::Error },
+    CreateMultipartRequest { source: crate::client::retry::Error },
+
+    #[snafu(display("Error getting create multipart response body: {}", 
source))]
+    CreateMultipartResponseBody { source: reqwest::Error },
 
     #[snafu(display("Error performing complete multipart request: {}", 
source))]
-    CompleteMultipartRequest { source: reqwest::Error },
+    CompleteMultipartRequest { source: crate::client::retry::Error },
 
     #[snafu(display("Got invalid list response: {}", source))]
     InvalidListResponse { source: quick_xml::de::DeError },
@@ -259,10 +271,6 @@ impl S3Client {
             .with_aws_sigv4(credential.as_ref(), &self.config.region, "s3")
             .send_retry(&self.config.retry_config)
             .await
-            .context(GetRequestSnafu {
-                path: path.as_ref(),
-            })?
-            .error_for_status()
             .context(GetRequestSnafu {
                 path: path.as_ref(),
             })?;
@@ -290,10 +298,6 @@ impl S3Client {
             .with_aws_sigv4(credential.as_ref(), &self.config.region, "s3")
             .send_retry(&self.config.retry_config)
             .await
-            .context(PutRequestSnafu {
-                path: path.as_ref(),
-            })?
-            .error_for_status()
             .context(PutRequestSnafu {
                 path: path.as_ref(),
             })?;
@@ -316,10 +320,6 @@ impl S3Client {
             .with_aws_sigv4(credential.as_ref(), &self.config.region, "s3")
             .send_retry(&self.config.retry_config)
             .await
-            .context(DeleteRequestSnafu {
-                path: path.as_ref(),
-            })?
-            .error_for_status()
             .context(DeleteRequestSnafu {
                 path: path.as_ref(),
             })?;
@@ -339,10 +339,6 @@ impl S3Client {
             .with_aws_sigv4(credential.as_ref(), &self.config.region, "s3")
             .send_retry(&self.config.retry_config)
             .await
-            .context(CopyRequestSnafu {
-                path: from.as_ref(),
-            })?
-            .error_for_status()
             .context(CopyRequestSnafu {
                 path: from.as_ref(),
             })?;
@@ -385,11 +381,9 @@ impl S3Client {
             .send_retry(&self.config.retry_config)
             .await
             .context(ListRequestSnafu)?
-            .error_for_status()
-            .context(ListRequestSnafu)?
             .bytes()
             .await
-            .context(ListRequestSnafu)?;
+            .context(ListResponseBodySnafu)?;
 
         let mut response: ListResponse = 
quick_xml::de::from_reader(response.reader())
             .context(InvalidListResponseSnafu)?;
@@ -430,11 +424,9 @@ impl S3Client {
             .send_retry(&self.config.retry_config)
             .await
             .context(CreateMultipartRequestSnafu)?
-            .error_for_status()
-            .context(CreateMultipartRequestSnafu)?
             .bytes()
             .await
-            .context(CreateMultipartRequestSnafu)?;
+            .context(CreateMultipartResponseBodySnafu)?;
 
         let response: InitiateMultipart = 
quick_xml::de::from_reader(response.reader())
             .context(InvalidMultipartResponseSnafu)?;
@@ -470,8 +462,6 @@ impl S3Client {
             .with_aws_sigv4(credential.as_ref(), &self.config.region, "s3")
             .send_retry(&self.config.retry_config)
             .await
-            .context(CompleteMultipartRequestSnafu)?
-            .error_for_status()
             .context(CompleteMultipartRequestSnafu)?;
 
         Ok(())
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index f7955ade1..ab90afa5d 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -175,7 +175,7 @@ impl ObjectStore for AmazonS3 {
             .await?
             .bytes()
             .await
-            .map_err(|source| client::Error::GetRequest {
+            .map_err(|source| client::Error::GetResponseBody {
                 source,
                 path: location.to_string(),
             })?;
diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs
index 5f37ea951..722f6768a 100644
--- a/object_store/src/azure/client.rs
+++ b/object_store/src/azure/client.rs
@@ -41,30 +41,39 @@ use url::Url;
 pub(crate) enum Error {
     #[snafu(display("Error performing get request {}: {}", path, source))]
     GetRequest {
+        source: crate::client::retry::Error,
+        path: String,
+    },
+
+    #[snafu(display("Error getting get response body {}: {}", path, source))]
+    GetResponseBody {
         source: reqwest::Error,
         path: String,
     },
 
     #[snafu(display("Error performing put request {}: {}", path, source))]
     PutRequest {
-        source: reqwest::Error,
+        source: crate::client::retry::Error,
         path: String,
     },
 
     #[snafu(display("Error performing delete request {}: {}", path, source))]
     DeleteRequest {
-        source: reqwest::Error,
+        source: crate::client::retry::Error,
         path: String,
     },
 
     #[snafu(display("Error performing copy request {}: {}", path, source))]
     CopyRequest {
-        source: reqwest::Error,
+        source: crate::client::retry::Error,
         path: String,
     },
 
     #[snafu(display("Error performing list request: {}", source))]
-    ListRequest { source: reqwest::Error },
+    ListRequest { source: crate::client::retry::Error },
+
+    #[snafu(display("Error getting list response body: {}", source))]
+    ListResponseBody { source: reqwest::Error },
 
     #[snafu(display("Error performing create multipart request: {}", source))]
     CreateMultipartRequest { source: reqwest::Error },
@@ -218,10 +227,6 @@ impl AzureClient {
             .with_azure_authorization(&credential, &self.config.account)
             .send_retry(&self.config.retry_config)
             .await
-            .context(PutRequestSnafu {
-                path: path.as_ref(),
-            })?
-            .error_for_status()
             .context(PutRequestSnafu {
                 path: path.as_ref(),
             })?;
@@ -259,10 +264,6 @@ impl AzureClient {
             .with_azure_authorization(&credential, &self.config.account)
             .send_retry(&self.config.retry_config)
             .await
-            .context(GetRequestSnafu {
-                path: path.as_ref(),
-            })?
-            .error_for_status()
             .context(GetRequestSnafu {
                 path: path.as_ref(),
             })?;
@@ -286,10 +287,6 @@ impl AzureClient {
             .with_azure_authorization(&credential, &self.config.account)
             .send_retry(&self.config.retry_config)
             .await
-            .context(DeleteRequestSnafu {
-                path: path.as_ref(),
-            })?
-            .error_for_status()
             .context(DeleteRequestSnafu {
                 path: path.as_ref(),
             })?;
@@ -328,10 +325,6 @@ impl AzureClient {
             .with_azure_authorization(&credential, &self.config.account)
             .send_retry(&self.config.retry_config)
             .await
-            .context(CopyRequestSnafu {
-                path: from.as_ref(),
-            })?
-            .error_for_status()
             .context(CopyRequestSnafu {
                 path: from.as_ref(),
             })?;
@@ -373,11 +366,9 @@ impl AzureClient {
             .send_retry(&self.config.retry_config)
             .await
             .context(ListRequestSnafu)?
-            .error_for_status()
-            .context(ListRequestSnafu)?
             .bytes()
             .await
-            .context(ListRequestSnafu)?;
+            .context(ListResponseBodySnafu)?;
 
         let mut response: ListResultInternal =
             quick_xml::de::from_reader(response.reader())
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index 53e7ed606..c659e1f80 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -188,7 +188,7 @@ impl ObjectStore for MicrosoftAzure {
             .await?
             .bytes()
             .await
-            .map_err(|source| client::Error::GetRequest {
+            .map_err(|source| client::Error::GetResponseBody {
                 source,
                 path: location.to_string(),
             })?;
diff --git a/object_store/src/client/oauth.rs b/object_store/src/client/oauth.rs
index 220940629..6b3acea10 100644
--- a/object_store/src/client/oauth.rs
+++ b/object_store/src/client/oauth.rs
@@ -45,7 +45,10 @@ pub enum Error {
     UnsupportedKey { encoding: String },
 
     #[snafu(display("Error performing token request: {}", source))]
-    TokenRequest { source: reqwest::Error },
+    TokenRequest { source: crate::client::retry::Error },
+
+    #[snafu(display("Error getting token response body: {}", source))]
+    TokenResponseBody { source: reqwest::Error },
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -181,11 +184,9 @@ impl OAuthProvider {
             .send_retry(retry)
             .await
             .context(TokenRequestSnafu)?
-            .error_for_status()
-            .context(TokenRequestSnafu)?
             .json()
             .await
-            .context(TokenRequestSnafu)?;
+            .context(TokenResponseBodySnafu)?;
 
         let token = TemporaryToken {
             token: response.access_token,
@@ -289,10 +290,10 @@ impl ClientSecretOAuthProvider {
             .await
             .context(TokenRequestSnafu)?
             .error_for_status()
-            .context(TokenRequestSnafu)?
+            .context(TokenResponseBodySnafu)?
             .json()
             .await
-            .context(TokenRequestSnafu)?;
+            .context(TokenResponseBodySnafu)?;
 
         let token = TemporaryToken {
             token: response.access_token,
diff --git a/object_store/src/client/retry.rs b/object_store/src/client/retry.rs
index c4dd6ee93..44d7835a5 100644
--- a/object_store/src/client/retry.rs
+++ b/object_store/src/client/retry.rs
@@ -20,10 +20,55 @@
 use crate::client::backoff::{Backoff, BackoffConfig};
 use futures::future::BoxFuture;
 use futures::FutureExt;
-use reqwest::{Response, Result};
+use reqwest::{Response, StatusCode};
+use snafu::Snafu;
 use std::time::{Duration, Instant};
 use tracing::info;
 
+/// Retry request error
+#[derive(Debug, Snafu)]
+#[snafu(display(
+    "response error \"{}\", after {} retries: {}",
+    message,
+    retries,
+    source
+))]
+pub struct Error {
+    retries: usize,
+    message: String,
+    source: reqwest::Error,
+}
+
+impl Error {
+    /// Returns the status code associated with this error if any
+    pub fn status(&self) -> Option<StatusCode> {
+        self.source.status()
+    }
+}
+
+impl From<Error> for std::io::Error {
+    fn from(err: Error) -> Self {
+        use std::io::ErrorKind;
+        if err.source.is_builder() || err.source.is_request() {
+            Self::new(ErrorKind::InvalidInput, err)
+        } else if let Some(s) = err.source.status() {
+            match s {
+                StatusCode::NOT_FOUND => Self::new(ErrorKind::NotFound, err),
+                StatusCode::BAD_REQUEST => Self::new(ErrorKind::InvalidInput, 
err),
+                _ => Self::new(ErrorKind::Other, err),
+            }
+        } else if err.source.is_timeout() {
+            Self::new(ErrorKind::TimedOut, err)
+        } else if err.source.is_connect() {
+            Self::new(ErrorKind::NotConnected, err)
+        } else {
+            Self::new(ErrorKind::Other, err)
+        }
+    }
+}
+
+pub type Result<T, E = Error> = std::result::Result<T, E>;
+
 /// Contains the configuration for how to respond to server errors
 ///
 /// By default they will be retried up to some limit, using exponential
@@ -85,22 +130,195 @@ impl RetryExt for reqwest::RequestBuilder {
             loop {
                 let s = self.try_clone().expect("request body must be 
cloneable");
                 match s.send().await {
-                    Err(e)
-                        if retries < max_retries
-                            && now.elapsed() < retry_timeout
-                            && e.status()
-                                .map(|s| s.is_server_error())
-                                .unwrap_or(false) =>
+                    Ok(r) => match r.error_for_status_ref() {
+                        Ok(_) => return Ok(r),
+                        Err(e) => {
+                            let status = r.status();
+
+                            if retries == max_retries
+                                || now.elapsed() > retry_timeout
+                                || !status.is_server_error() {
+
+                                // Get the response message if returned a 
client error
+                                let message = match status.is_client_error() {
+                                    true => match r.text().await {
+                                        Ok(message) if !message.is_empty() => 
message,
+                                        Ok(_) => "No Body".to_string(),
+                                        Err(e) => format!("error getting 
response body: {}", e)
+                                    }
+                                    false => status.to_string(),
+                                };
+
+                                return Err(Error{
+                                    message,
+                                    retries,
+                                    source: e,
+                                })
+
+                            }
+
+                            let sleep = backoff.next();
+                            retries += 1;
+                            info!("Encountered server error, backing off for 
{} seconds, retry {} of {}", sleep.as_secs_f32(), retries, max_retries);
+                            tokio::time::sleep(sleep).await;
+                        }
+                    },
+                    Err(e) =>
                     {
-                        let sleep = backoff.next();
-                        retries += 1;
-                        info!("Encountered server error, backing off for {} 
seconds, retry {} of {}", sleep.as_secs_f32(), retries, max_retries);
-                        tokio::time::sleep(sleep).await;
+                        return Err(Error{
+                            retries,
+                            message: "request error".to_string(),
+                            source: e
+                        })
                     }
-                    r => return r,
                 }
             }
         }
         .boxed()
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use crate::client::retry::RetryExt;
+    use crate::RetryConfig;
+    use hyper::header::LOCATION;
+    use hyper::service::{make_service_fn, service_fn};
+    use hyper::{Body, Response, Server};
+    use parking_lot::Mutex;
+    use reqwest::{Client, Method, StatusCode};
+    use std::collections::VecDeque;
+    use std::convert::Infallible;
+    use std::net::SocketAddr;
+    use std::sync::Arc;
+    use std::time::Duration;
+
+    #[tokio::test]
+    async fn test_retry() {
+        let responses: Arc<Mutex<VecDeque<Response<Body>>>> =
+            Arc::new(Mutex::new(VecDeque::with_capacity(10)));
+
+        let r = Arc::clone(&responses);
+        let make_service = make_service_fn(move |_conn| {
+            let r = Arc::clone(&r);
+            async move {
+                Ok::<_, Infallible>(service_fn(move |_req| {
+                    let r = Arc::clone(&r);
+                    async move {
+                        Ok::<_, Infallible>(match r.lock().pop_front() {
+                            Some(r) => r,
+                            None => Response::new(Body::from("Hello World")),
+                        })
+                    }
+                }))
+            }
+        });
+
+        let (tx, rx) = tokio::sync::oneshot::channel::<()>();
+        let server =
+            Server::bind(&SocketAddr::from(([127, 0, 0, 1], 
0))).serve(make_service);
+
+        let url = format!("http://{}";, server.local_addr());
+
+        let server_handle = tokio::spawn(async move {
+            server
+                .with_graceful_shutdown(async {
+                    rx.await.ok();
+                })
+                .await
+                .unwrap()
+        });
+
+        let retry = RetryConfig {
+            backoff: Default::default(),
+            max_retries: 2,
+            retry_timeout: Duration::from_secs(1000),
+        };
+
+        let client = Client::new();
+        let do_request = || client.request(Method::GET, 
&url).send_retry(&retry);
+
+        // Simple request should work
+        let r = do_request().await.unwrap();
+        assert_eq!(r.status(), StatusCode::OK);
+
+        // Returns client errors immediately with status message
+        responses.lock().push_back(
+            Response::builder()
+                .status(StatusCode::BAD_REQUEST)
+                .body(Body::from("cupcakes"))
+                .unwrap(),
+        );
+
+        let e = do_request().await.unwrap_err();
+        assert_eq!(e.status().unwrap(), StatusCode::BAD_REQUEST);
+        assert_eq!(e.retries, 0);
+        assert_eq!(&e.message, "cupcakes");
+
+        // Handles client errors with no payload
+        responses.lock().push_back(
+            Response::builder()
+                .status(StatusCode::BAD_REQUEST)
+                .body(Body::empty())
+                .unwrap(),
+        );
+
+        let e = do_request().await.unwrap_err();
+        assert_eq!(e.status().unwrap(), StatusCode::BAD_REQUEST);
+        assert_eq!(e.retries, 0);
+        assert_eq!(&e.message, "No Body");
+
+        // Should retry server error request
+        responses.lock().push_back(
+            Response::builder()
+                .status(StatusCode::BAD_GATEWAY)
+                .body(Body::empty())
+                .unwrap(),
+        );
+
+        let r = do_request().await.unwrap();
+        assert_eq!(r.status(), StatusCode::OK);
+
+        // Accepts 204 status code
+        responses.lock().push_back(
+            Response::builder()
+                .status(StatusCode::NO_CONTENT)
+                .body(Body::empty())
+                .unwrap(),
+        );
+
+        let r = do_request().await.unwrap();
+        assert_eq!(r.status(), StatusCode::NO_CONTENT);
+
+        // Follows redirects
+        responses.lock().push_back(
+            Response::builder()
+                .status(StatusCode::FOUND)
+                .header(LOCATION, "/foo")
+                .body(Body::empty())
+                .unwrap(),
+        );
+
+        let r = do_request().await.unwrap();
+        assert_eq!(r.status(), StatusCode::OK);
+        assert_eq!(r.url().path(), "/foo");
+
+        // Gives up after the retrying the specified number of times
+        for _ in 0..=retry.max_retries {
+            responses.lock().push_back(
+                Response::builder()
+                    .status(StatusCode::BAD_GATEWAY)
+                    .body(Body::from("ignored"))
+                    .unwrap(),
+            );
+        }
+
+        let e = do_request().await.unwrap_err();
+        assert_eq!(e.retries, retry.max_retries);
+        assert_eq!(e.message, "502 Bad Gateway");
+
+        // Shutdown
+        let _ = tx.send(());
+        server_handle.await.unwrap();
+    }
+}
diff --git a/object_store/src/gcp.rs b/object_store/src/gcp.rs
index c9bb63359..e9c7d0249 100644
--- a/object_store/src/gcp.rs
+++ b/object_store/src/gcp.rs
@@ -72,28 +72,40 @@ enum Error {
     },
 
     #[snafu(display("Error performing list request: {}", source))]
-    ListRequest { source: reqwest::Error },
+    ListRequest { source: crate::client::retry::Error },
+
+    #[snafu(display("Error getting list response body: {}", source))]
+    ListResponseBody { source: reqwest::Error },
 
     #[snafu(display("Error performing get request {}: {}", path, source))]
     GetRequest {
+        source: crate::client::retry::Error,
+        path: String,
+    },
+
+    #[snafu(display("Error getting get response body {}: {}", path, source))]
+    GetResponseBody {
         source: reqwest::Error,
         path: String,
     },
 
     #[snafu(display("Error performing delete request {}: {}", path, source))]
     DeleteRequest {
-        source: reqwest::Error,
+        source: crate::client::retry::Error,
         path: String,
     },
 
     #[snafu(display("Error performing copy request {}: {}", path, source))]
     CopyRequest {
-        source: reqwest::Error,
+        source: crate::client::retry::Error,
         path: String,
     },
 
     #[snafu(display("Error performing put request: {}", source))]
-    PutRequest { source: reqwest::Error },
+    PutRequest { source: crate::client::retry::Error },
+
+    #[snafu(display("Error getting put response body: {}", source))]
+    PutResponseBody { source: reqwest::Error },
 
     #[snafu(display("Error decoding object size: {}", source))]
     InvalidSize { source: std::num::ParseIntError },
@@ -269,10 +281,6 @@ impl GoogleCloudStorageClient {
             .query(&[("alt", alt)])
             .send_retry(&self.retry_config)
             .await
-            .context(GetRequestSnafu {
-                path: path.as_ref(),
-            })?
-            .error_for_status()
             .context(GetRequestSnafu {
                 path: path.as_ref(),
             })?;
@@ -297,8 +305,6 @@ impl GoogleCloudStorageClient {
             .body(payload)
             .send_retry(&self.retry_config)
             .await
-            .context(PutRequestSnafu)?
-            .error_for_status()
             .context(PutRequestSnafu)?;
 
         Ok(())
@@ -318,11 +324,9 @@ impl GoogleCloudStorageClient {
             .query(&[("uploads", "")])
             .send_retry(&self.retry_config)
             .await
-            .context(PutRequestSnafu)?
-            .error_for_status()
             .context(PutRequestSnafu)?;
 
-        let data = response.bytes().await.context(PutRequestSnafu)?;
+        let data = response.bytes().await.context(PutResponseBodySnafu)?;
         let result: InitiateMultipartUploadResult = quick_xml::de::from_reader(
             data.as_ref().reader(),
         )
@@ -352,8 +356,6 @@ impl GoogleCloudStorageClient {
             .query(&[("uploadId", multipart_id)])
             .send_retry(&self.retry_config)
             .await
-            .context(PutRequestSnafu)?
-            .error_for_status()
             .context(PutRequestSnafu)?;
 
         Ok(())
@@ -369,10 +371,6 @@ impl GoogleCloudStorageClient {
             .bearer_auth(token)
             .send_retry(&self.retry_config)
             .await
-            .context(DeleteRequestSnafu {
-                path: path.as_ref(),
-            })?
-            .error_for_status()
             .context(DeleteRequestSnafu {
                 path: path.as_ref(),
             })?;
@@ -412,10 +410,6 @@ impl GoogleCloudStorageClient {
             .bearer_auth(token)
             .send_retry(&self.retry_config)
             .await
-            .context(CopyRequestSnafu {
-                path: from.as_ref(),
-            })?
-            .error_for_status()
             .context(CopyRequestSnafu {
                 path: from.as_ref(),
             })?;
@@ -462,11 +456,9 @@ impl GoogleCloudStorageClient {
             .send_retry(&self.retry_config)
             .await
             .context(ListRequestSnafu)?
-            .error_for_status()
-            .context(ListRequestSnafu)?
             .json()
             .await
-            .context(ListRequestSnafu)?;
+            .context(ListResponseBodySnafu)?;
 
         Ok(response)
     }
@@ -489,27 +481,6 @@ impl GoogleCloudStorageClient {
     }
 }
 
-fn reqwest_error_as_io(err: reqwest::Error) -> io::Error {
-    if err.is_builder() || err.is_request() {
-        io::Error::new(io::ErrorKind::InvalidInput, err)
-    } else if err.is_status() {
-        match err.status() {
-            Some(StatusCode::NOT_FOUND) => 
io::Error::new(io::ErrorKind::NotFound, err),
-            Some(StatusCode::BAD_REQUEST) => {
-                io::Error::new(io::ErrorKind::InvalidInput, err)
-            }
-            Some(_) => io::Error::new(io::ErrorKind::Other, err),
-            None => io::Error::new(io::ErrorKind::Other, err),
-        }
-    } else if err.is_timeout() {
-        io::Error::new(io::ErrorKind::TimedOut, err)
-    } else if err.is_connect() {
-        io::Error::new(io::ErrorKind::NotConnected, err)
-    } else {
-        io::Error::new(io::ErrorKind::Other, err)
-    }
-}
-
 struct GCSMultipartUpload {
     client: Arc<GoogleCloudStorageClient>,
     encoded_path: String,
@@ -549,10 +520,7 @@ impl CloudMultiPartUploadImpl for GCSMultipartUpload {
             .header(header::CONTENT_LENGTH, format!("{}", buf.len()))
             .body(buf)
             .send_retry(&self.client.retry_config)
-            .await
-            .map_err(reqwest_error_as_io)?
-            .error_for_status()
-            .map_err(reqwest_error_as_io)?;
+            .await?;
 
         let content_id = response
             .headers()
@@ -609,10 +577,7 @@ impl CloudMultiPartUploadImpl for GCSMultipartUpload {
             .query(&[("uploadId", upload_id)])
             .body(data)
             .send_retry(&self.client.retry_config)
-            .await
-            .map_err(reqwest_error_as_io)?
-            .error_for_status()
-            .map_err(reqwest_error_as_io)?;
+            .await?;
 
         Ok(())
     }
@@ -672,14 +637,14 @@ impl ObjectStore for GoogleCloudStorage {
             .client
             .get_request(location, Some(range), false)
             .await?;
-        Ok(response.bytes().await.context(GetRequestSnafu {
+        Ok(response.bytes().await.context(GetResponseBodySnafu {
             path: location.as_ref(),
         })?)
     }
 
     async fn head(&self, location: &Path) -> Result<ObjectMeta> {
         let response = self.client.get_request(location, None, true).await?;
-        let object = response.json().await.context(GetRequestSnafu {
+        let object = response.json().await.context(GetResponseBodySnafu {
             path: location.as_ref(),
         })?;
         convert_object_meta(&object)
@@ -1057,9 +1022,7 @@ mod test {
             .unwrap_err()
             .to_string();
         assert!(
-            err.contains(
-                "Error performing put request: HTTP status client error (404 
Not Found)"
-            ),
+            err.contains("HTTP status client error (404 Not Found)"),
             "{}",
             err
         )

Reply via email to