This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 8eea918a5 Fix retry logic (#2573) (#2572) (#2574)
8eea918a5 is described below
commit 8eea918a537bb0d82c5bc7734df4b8d2a8274268
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu Aug 25 14:14:20 2022 +0100
Fix retry logic (#2573) (#2572) (#2574)
* Fix retry logic (#2573) (#2572)
* Fix logical conflicts
* Rework tests
---
object_store/Cargo.toml | 1 +
object_store/src/aws/client.rs | 50 ++++----
object_store/src/aws/mod.rs | 2 +-
object_store/src/azure/client.rs | 37 +++---
object_store/src/azure/mod.rs | 2 +-
object_store/src/client/oauth.rs | 13 ++-
object_store/src/client/retry.rs | 242 +++++++++++++++++++++++++++++++++++++--
object_store/src/gcp.rs | 83 ++++----------
8 files changed, 297 insertions(+), 133 deletions(-)
diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml
index b5c5ef6ed..2be233c83 100644
--- a/object_store/Cargo.toml
+++ b/object_store/Cargo.toml
@@ -63,3 +63,4 @@ dotenv = "0.15.0"
tempfile = "3.1.0"
futures-test = "0.3"
rand = "0.8"
+hyper = { version = "0.14", features = ["server"] }
diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index 36ba9ad12..d8ab3bba8 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -52,36 +52,48 @@ const STRICT_PATH_ENCODE_SET: AsciiSet = STRICT_ENCODE_SET.remove(b'/');
pub(crate) enum Error {
#[snafu(display("Error performing get request {}: {}", path, source))]
GetRequest {
+ source: crate::client::retry::Error,
+ path: String,
+ },
+
+ #[snafu(display("Error fetching get response body {}: {}", path, source))]
+ GetResponseBody {
source: reqwest::Error,
path: String,
},
#[snafu(display("Error performing put request {}: {}", path, source))]
PutRequest {
- source: reqwest::Error,
+ source: crate::client::retry::Error,
path: String,
},
#[snafu(display("Error performing delete request {}: {}", path, source))]
DeleteRequest {
- source: reqwest::Error,
+ source: crate::client::retry::Error,
path: String,
},
#[snafu(display("Error performing copy request {}: {}", path, source))]
CopyRequest {
- source: reqwest::Error,
+ source: crate::client::retry::Error,
path: String,
},
#[snafu(display("Error performing list request: {}", source))]
- ListRequest { source: reqwest::Error },
+ ListRequest { source: crate::client::retry::Error },
+
+ #[snafu(display("Error getting list response body: {}", source))]
+ ListResponseBody { source: reqwest::Error },
#[snafu(display("Error performing create multipart request: {}", source))]
- CreateMultipartRequest { source: reqwest::Error },
+ CreateMultipartRequest { source: crate::client::retry::Error },
+
+ #[snafu(display("Error getting create multipart response body: {}", source))]
+ CreateMultipartResponseBody { source: reqwest::Error },
#[snafu(display("Error performing complete multipart request: {}", source))]
- CompleteMultipartRequest { source: reqwest::Error },
+ CompleteMultipartRequest { source: crate::client::retry::Error },
#[snafu(display("Got invalid list response: {}", source))]
InvalidListResponse { source: quick_xml::de::DeError },
@@ -259,10 +271,6 @@ impl S3Client {
.with_aws_sigv4(credential.as_ref(), &self.config.region, "s3")
.send_retry(&self.config.retry_config)
.await
- .context(GetRequestSnafu {
- path: path.as_ref(),
- })?
- .error_for_status()
.context(GetRequestSnafu {
path: path.as_ref(),
})?;
@@ -290,10 +298,6 @@ impl S3Client {
.with_aws_sigv4(credential.as_ref(), &self.config.region, "s3")
.send_retry(&self.config.retry_config)
.await
- .context(PutRequestSnafu {
- path: path.as_ref(),
- })?
- .error_for_status()
.context(PutRequestSnafu {
path: path.as_ref(),
})?;
@@ -316,10 +320,6 @@ impl S3Client {
.with_aws_sigv4(credential.as_ref(), &self.config.region, "s3")
.send_retry(&self.config.retry_config)
.await
- .context(DeleteRequestSnafu {
- path: path.as_ref(),
- })?
- .error_for_status()
.context(DeleteRequestSnafu {
path: path.as_ref(),
})?;
@@ -339,10 +339,6 @@ impl S3Client {
.with_aws_sigv4(credential.as_ref(), &self.config.region, "s3")
.send_retry(&self.config.retry_config)
.await
- .context(CopyRequestSnafu {
- path: from.as_ref(),
- })?
- .error_for_status()
.context(CopyRequestSnafu {
path: from.as_ref(),
})?;
@@ -385,11 +381,9 @@ impl S3Client {
.send_retry(&self.config.retry_config)
.await
.context(ListRequestSnafu)?
- .error_for_status()
- .context(ListRequestSnafu)?
.bytes()
.await
- .context(ListRequestSnafu)?;
+ .context(ListResponseBodySnafu)?;
let mut response: ListResponse =
quick_xml::de::from_reader(response.reader())
.context(InvalidListResponseSnafu)?;
@@ -430,11 +424,9 @@ impl S3Client {
.send_retry(&self.config.retry_config)
.await
.context(CreateMultipartRequestSnafu)?
- .error_for_status()
- .context(CreateMultipartRequestSnafu)?
.bytes()
.await
- .context(CreateMultipartRequestSnafu)?;
+ .context(CreateMultipartResponseBodySnafu)?;
let response: InitiateMultipart =
quick_xml::de::from_reader(response.reader())
.context(InvalidMultipartResponseSnafu)?;
@@ -470,8 +462,6 @@ impl S3Client {
.with_aws_sigv4(credential.as_ref(), &self.config.region, "s3")
.send_retry(&self.config.retry_config)
.await
- .context(CompleteMultipartRequestSnafu)?
- .error_for_status()
.context(CompleteMultipartRequestSnafu)?;
Ok(())
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index f7955ade1..ab90afa5d 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -175,7 +175,7 @@ impl ObjectStore for AmazonS3 {
.await?
.bytes()
.await
- .map_err(|source| client::Error::GetRequest {
+ .map_err(|source| client::Error::GetResponseBody {
source,
path: location.to_string(),
})?;
diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs
index 5f37ea951..722f6768a 100644
--- a/object_store/src/azure/client.rs
+++ b/object_store/src/azure/client.rs
@@ -41,30 +41,39 @@ use url::Url;
pub(crate) enum Error {
#[snafu(display("Error performing get request {}: {}", path, source))]
GetRequest {
+ source: crate::client::retry::Error,
+ path: String,
+ },
+
+ #[snafu(display("Error getting get response body {}: {}", path, source))]
+ GetResponseBody {
source: reqwest::Error,
path: String,
},
#[snafu(display("Error performing put request {}: {}", path, source))]
PutRequest {
- source: reqwest::Error,
+ source: crate::client::retry::Error,
path: String,
},
#[snafu(display("Error performing delete request {}: {}", path, source))]
DeleteRequest {
- source: reqwest::Error,
+ source: crate::client::retry::Error,
path: String,
},
#[snafu(display("Error performing copy request {}: {}", path, source))]
CopyRequest {
- source: reqwest::Error,
+ source: crate::client::retry::Error,
path: String,
},
#[snafu(display("Error performing list request: {}", source))]
- ListRequest { source: reqwest::Error },
+ ListRequest { source: crate::client::retry::Error },
+
+ #[snafu(display("Error getting list response body: {}", source))]
+ ListResponseBody { source: reqwest::Error },
#[snafu(display("Error performing create multipart request: {}", source))]
CreateMultipartRequest { source: reqwest::Error },
@@ -218,10 +227,6 @@ impl AzureClient {
.with_azure_authorization(&credential, &self.config.account)
.send_retry(&self.config.retry_config)
.await
- .context(PutRequestSnafu {
- path: path.as_ref(),
- })?
- .error_for_status()
.context(PutRequestSnafu {
path: path.as_ref(),
})?;
@@ -259,10 +264,6 @@ impl AzureClient {
.with_azure_authorization(&credential, &self.config.account)
.send_retry(&self.config.retry_config)
.await
- .context(GetRequestSnafu {
- path: path.as_ref(),
- })?
- .error_for_status()
.context(GetRequestSnafu {
path: path.as_ref(),
})?;
@@ -286,10 +287,6 @@ impl AzureClient {
.with_azure_authorization(&credential, &self.config.account)
.send_retry(&self.config.retry_config)
.await
- .context(DeleteRequestSnafu {
- path: path.as_ref(),
- })?
- .error_for_status()
.context(DeleteRequestSnafu {
path: path.as_ref(),
})?;
@@ -328,10 +325,6 @@ impl AzureClient {
.with_azure_authorization(&credential, &self.config.account)
.send_retry(&self.config.retry_config)
.await
- .context(CopyRequestSnafu {
- path: from.as_ref(),
- })?
- .error_for_status()
.context(CopyRequestSnafu {
path: from.as_ref(),
})?;
@@ -373,11 +366,9 @@ impl AzureClient {
.send_retry(&self.config.retry_config)
.await
.context(ListRequestSnafu)?
- .error_for_status()
- .context(ListRequestSnafu)?
.bytes()
.await
- .context(ListRequestSnafu)?;
+ .context(ListResponseBodySnafu)?;
let mut response: ListResultInternal =
quick_xml::de::from_reader(response.reader())
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index 53e7ed606..c659e1f80 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -188,7 +188,7 @@ impl ObjectStore for MicrosoftAzure {
.await?
.bytes()
.await
- .map_err(|source| client::Error::GetRequest {
+ .map_err(|source| client::Error::GetResponseBody {
source,
path: location.to_string(),
})?;
diff --git a/object_store/src/client/oauth.rs b/object_store/src/client/oauth.rs
index 220940629..6b3acea10 100644
--- a/object_store/src/client/oauth.rs
+++ b/object_store/src/client/oauth.rs
@@ -45,7 +45,10 @@ pub enum Error {
UnsupportedKey { encoding: String },
#[snafu(display("Error performing token request: {}", source))]
- TokenRequest { source: reqwest::Error },
+ TokenRequest { source: crate::client::retry::Error },
+
+ #[snafu(display("Error getting token response body: {}", source))]
+ TokenResponseBody { source: reqwest::Error },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -181,11 +184,9 @@ impl OAuthProvider {
.send_retry(retry)
.await
.context(TokenRequestSnafu)?
- .error_for_status()
- .context(TokenRequestSnafu)?
.json()
.await
- .context(TokenRequestSnafu)?;
+ .context(TokenResponseBodySnafu)?;
let token = TemporaryToken {
token: response.access_token,
@@ -289,10 +290,10 @@ impl ClientSecretOAuthProvider {
.await
.context(TokenRequestSnafu)?
.error_for_status()
- .context(TokenRequestSnafu)?
+ .context(TokenResponseBodySnafu)?
.json()
.await
- .context(TokenRequestSnafu)?;
+ .context(TokenResponseBodySnafu)?;
let token = TemporaryToken {
token: response.access_token,
diff --git a/object_store/src/client/retry.rs b/object_store/src/client/retry.rs
index c4dd6ee93..44d7835a5 100644
--- a/object_store/src/client/retry.rs
+++ b/object_store/src/client/retry.rs
@@ -20,10 +20,55 @@
use crate::client::backoff::{Backoff, BackoffConfig};
use futures::future::BoxFuture;
use futures::FutureExt;
-use reqwest::{Response, Result};
+use reqwest::{Response, StatusCode};
+use snafu::Snafu;
use std::time::{Duration, Instant};
use tracing::info;
+/// Retry request error
+#[derive(Debug, Snafu)]
+#[snafu(display(
+ "response error \"{}\", after {} retries: {}",
+ message,
+ retries,
+ source
+))]
+pub struct Error {
+ retries: usize,
+ message: String,
+ source: reqwest::Error,
+}
+
+impl Error {
+ /// Returns the status code associated with this error if any
+ pub fn status(&self) -> Option<StatusCode> {
+ self.source.status()
+ }
+}
+
+impl From<Error> for std::io::Error {
+ fn from(err: Error) -> Self {
+ use std::io::ErrorKind;
+ if err.source.is_builder() || err.source.is_request() {
+ Self::new(ErrorKind::InvalidInput, err)
+ } else if let Some(s) = err.source.status() {
+ match s {
+ StatusCode::NOT_FOUND => Self::new(ErrorKind::NotFound, err),
+ StatusCode::BAD_REQUEST => Self::new(ErrorKind::InvalidInput, err),
+ _ => Self::new(ErrorKind::Other, err),
+ }
+ } else if err.source.is_timeout() {
+ Self::new(ErrorKind::TimedOut, err)
+ } else if err.source.is_connect() {
+ Self::new(ErrorKind::NotConnected, err)
+ } else {
+ Self::new(ErrorKind::Other, err)
+ }
+ }
+}
+
+pub type Result<T, E = Error> = std::result::Result<T, E>;
+
/// Contains the configuration for how to respond to server errors
///
/// By default they will be retried up to some limit, using exponential
@@ -85,22 +130,195 @@ impl RetryExt for reqwest::RequestBuilder {
loop {
let s = self.try_clone().expect("request body must be cloneable");
match s.send().await {
- Err(e)
- if retries < max_retries
- && now.elapsed() < retry_timeout
- && e.status()
- .map(|s| s.is_server_error())
- .unwrap_or(false) =>
+ Ok(r) => match r.error_for_status_ref() {
+ Ok(_) => return Ok(r),
+ Err(e) => {
+ let status = r.status();
+
+ if retries == max_retries
+ || now.elapsed() > retry_timeout
+ || !status.is_server_error() {
+
+ // Get the response message if returned a client error
+ let message = match status.is_client_error() {
+ true => match r.text().await {
+ Ok(message) if !message.is_empty() => message,
+ Ok(_) => "No Body".to_string(),
+ Err(e) => format!("error getting response body: {}", e)
+ }
+ false => status.to_string(),
+ };
+
+ return Err(Error{
+ message,
+ retries,
+ source: e,
+ })
+
+ }
+
+ let sleep = backoff.next();
+ retries += 1;
+ info!("Encountered server error, backing off for {} seconds, retry {} of {}", sleep.as_secs_f32(), retries, max_retries);
+ tokio::time::sleep(sleep).await;
+ }
+ },
+ Err(e) =>
{
- let sleep = backoff.next();
- retries += 1;
- info!("Encountered server error, backing off for {} seconds, retry {} of {}", sleep.as_secs_f32(), retries, max_retries);
- tokio::time::sleep(sleep).await;
+ return Err(Error{
+ retries,
+ message: "request error".to_string(),
+ source: e
+ })
}
- r => return r,
}
}
}
.boxed()
}
}
+
+#[cfg(test)]
+mod tests {
+ use crate::client::retry::RetryExt;
+ use crate::RetryConfig;
+ use hyper::header::LOCATION;
+ use hyper::service::{make_service_fn, service_fn};
+ use hyper::{Body, Response, Server};
+ use parking_lot::Mutex;
+ use reqwest::{Client, Method, StatusCode};
+ use std::collections::VecDeque;
+ use std::convert::Infallible;
+ use std::net::SocketAddr;
+ use std::sync::Arc;
+ use std::time::Duration;
+
+ #[tokio::test]
+ async fn test_retry() {
+ let responses: Arc<Mutex<VecDeque<Response<Body>>>> =
+ Arc::new(Mutex::new(VecDeque::with_capacity(10)));
+
+ let r = Arc::clone(&responses);
+ let make_service = make_service_fn(move |_conn| {
+ let r = Arc::clone(&r);
+ async move {
+ Ok::<_, Infallible>(service_fn(move |_req| {
+ let r = Arc::clone(&r);
+ async move {
+ Ok::<_, Infallible>(match r.lock().pop_front() {
+ Some(r) => r,
+ None => Response::new(Body::from("Hello World")),
+ })
+ }
+ }))
+ }
+ });
+
+ let (tx, rx) = tokio::sync::oneshot::channel::<()>();
+ let server =
+ Server::bind(&SocketAddr::from(([127, 0, 0, 1], 0))).serve(make_service);
+
+ let url = format!("http://{}", server.local_addr());
+
+ let server_handle = tokio::spawn(async move {
+ server
+ .with_graceful_shutdown(async {
+ rx.await.ok();
+ })
+ .await
+ .unwrap()
+ });
+
+ let retry = RetryConfig {
+ backoff: Default::default(),
+ max_retries: 2,
+ retry_timeout: Duration::from_secs(1000),
+ };
+
+ let client = Client::new();
+ let do_request = || client.request(Method::GET, &url).send_retry(&retry);
+
+ // Simple request should work
+ let r = do_request().await.unwrap();
+ assert_eq!(r.status(), StatusCode::OK);
+
+ // Returns client errors immediately with status message
+ responses.lock().push_back(
+ Response::builder()
+ .status(StatusCode::BAD_REQUEST)
+ .body(Body::from("cupcakes"))
+ .unwrap(),
+ );
+
+ let e = do_request().await.unwrap_err();
+ assert_eq!(e.status().unwrap(), StatusCode::BAD_REQUEST);
+ assert_eq!(e.retries, 0);
+ assert_eq!(&e.message, "cupcakes");
+
+ // Handles client errors with no payload
+ responses.lock().push_back(
+ Response::builder()
+ .status(StatusCode::BAD_REQUEST)
+ .body(Body::empty())
+ .unwrap(),
+ );
+
+ let e = do_request().await.unwrap_err();
+ assert_eq!(e.status().unwrap(), StatusCode::BAD_REQUEST);
+ assert_eq!(e.retries, 0);
+ assert_eq!(&e.message, "No Body");
+
+ // Should retry server error request
+ responses.lock().push_back(
+ Response::builder()
+ .status(StatusCode::BAD_GATEWAY)
+ .body(Body::empty())
+ .unwrap(),
+ );
+
+ let r = do_request().await.unwrap();
+ assert_eq!(r.status(), StatusCode::OK);
+
+ // Accepts 204 status code
+ responses.lock().push_back(
+ Response::builder()
+ .status(StatusCode::NO_CONTENT)
+ .body(Body::empty())
+ .unwrap(),
+ );
+
+ let r = do_request().await.unwrap();
+ assert_eq!(r.status(), StatusCode::NO_CONTENT);
+
+ // Follows redirects
+ responses.lock().push_back(
+ Response::builder()
+ .status(StatusCode::FOUND)
+ .header(LOCATION, "/foo")
+ .body(Body::empty())
+ .unwrap(),
+ );
+
+ let r = do_request().await.unwrap();
+ assert_eq!(r.status(), StatusCode::OK);
+ assert_eq!(r.url().path(), "/foo");
+
+ // Gives up after the retrying the specified number of times
+ for _ in 0..=retry.max_retries {
+ responses.lock().push_back(
+ Response::builder()
+ .status(StatusCode::BAD_GATEWAY)
+ .body(Body::from("ignored"))
+ .unwrap(),
+ );
+ }
+
+ let e = do_request().await.unwrap_err();
+ assert_eq!(e.retries, retry.max_retries);
+ assert_eq!(e.message, "502 Bad Gateway");
+
+ // Shutdown
+ let _ = tx.send(());
+ server_handle.await.unwrap();
+ }
+}
diff --git a/object_store/src/gcp.rs b/object_store/src/gcp.rs
index c9bb63359..e9c7d0249 100644
--- a/object_store/src/gcp.rs
+++ b/object_store/src/gcp.rs
@@ -72,28 +72,40 @@ enum Error {
},
#[snafu(display("Error performing list request: {}", source))]
- ListRequest { source: reqwest::Error },
+ ListRequest { source: crate::client::retry::Error },
+
+ #[snafu(display("Error getting list response body: {}", source))]
+ ListResponseBody { source: reqwest::Error },
#[snafu(display("Error performing get request {}: {}", path, source))]
GetRequest {
+ source: crate::client::retry::Error,
+ path: String,
+ },
+
+ #[snafu(display("Error getting get response body {}: {}", path, source))]
+ GetResponseBody {
source: reqwest::Error,
path: String,
},
#[snafu(display("Error performing delete request {}: {}", path, source))]
DeleteRequest {
- source: reqwest::Error,
+ source: crate::client::retry::Error,
path: String,
},
#[snafu(display("Error performing copy request {}: {}", path, source))]
CopyRequest {
- source: reqwest::Error,
+ source: crate::client::retry::Error,
path: String,
},
#[snafu(display("Error performing put request: {}", source))]
- PutRequest { source: reqwest::Error },
+ PutRequest { source: crate::client::retry::Error },
+
+ #[snafu(display("Error getting put response body: {}", source))]
+ PutResponseBody { source: reqwest::Error },
#[snafu(display("Error decoding object size: {}", source))]
InvalidSize { source: std::num::ParseIntError },
@@ -269,10 +281,6 @@ impl GoogleCloudStorageClient {
.query(&[("alt", alt)])
.send_retry(&self.retry_config)
.await
- .context(GetRequestSnafu {
- path: path.as_ref(),
- })?
- .error_for_status()
.context(GetRequestSnafu {
path: path.as_ref(),
})?;
@@ -297,8 +305,6 @@ impl GoogleCloudStorageClient {
.body(payload)
.send_retry(&self.retry_config)
.await
- .context(PutRequestSnafu)?
- .error_for_status()
.context(PutRequestSnafu)?;
Ok(())
@@ -318,11 +324,9 @@ impl GoogleCloudStorageClient {
.query(&[("uploads", "")])
.send_retry(&self.retry_config)
.await
- .context(PutRequestSnafu)?
- .error_for_status()
.context(PutRequestSnafu)?;
- let data = response.bytes().await.context(PutRequestSnafu)?;
+ let data = response.bytes().await.context(PutResponseBodySnafu)?;
let result: InitiateMultipartUploadResult = quick_xml::de::from_reader(
data.as_ref().reader(),
)
@@ -352,8 +356,6 @@ impl GoogleCloudStorageClient {
.query(&[("uploadId", multipart_id)])
.send_retry(&self.retry_config)
.await
- .context(PutRequestSnafu)?
- .error_for_status()
.context(PutRequestSnafu)?;
Ok(())
@@ -369,10 +371,6 @@ impl GoogleCloudStorageClient {
.bearer_auth(token)
.send_retry(&self.retry_config)
.await
- .context(DeleteRequestSnafu {
- path: path.as_ref(),
- })?
- .error_for_status()
.context(DeleteRequestSnafu {
path: path.as_ref(),
})?;
@@ -412,10 +410,6 @@ impl GoogleCloudStorageClient {
.bearer_auth(token)
.send_retry(&self.retry_config)
.await
- .context(CopyRequestSnafu {
- path: from.as_ref(),
- })?
- .error_for_status()
.context(CopyRequestSnafu {
path: from.as_ref(),
})?;
@@ -462,11 +456,9 @@ impl GoogleCloudStorageClient {
.send_retry(&self.retry_config)
.await
.context(ListRequestSnafu)?
- .error_for_status()
- .context(ListRequestSnafu)?
.json()
.await
- .context(ListRequestSnafu)?;
+ .context(ListResponseBodySnafu)?;
Ok(response)
}
@@ -489,27 +481,6 @@ impl GoogleCloudStorageClient {
}
}
-fn reqwest_error_as_io(err: reqwest::Error) -> io::Error {
- if err.is_builder() || err.is_request() {
- io::Error::new(io::ErrorKind::InvalidInput, err)
- } else if err.is_status() {
- match err.status() {
- Some(StatusCode::NOT_FOUND) => io::Error::new(io::ErrorKind::NotFound, err),
- Some(StatusCode::BAD_REQUEST) => {
- io::Error::new(io::ErrorKind::InvalidInput, err)
- }
- Some(_) => io::Error::new(io::ErrorKind::Other, err),
- None => io::Error::new(io::ErrorKind::Other, err),
- }
- } else if err.is_timeout() {
- io::Error::new(io::ErrorKind::TimedOut, err)
- } else if err.is_connect() {
- io::Error::new(io::ErrorKind::NotConnected, err)
- } else {
- io::Error::new(io::ErrorKind::Other, err)
- }
-}
-
struct GCSMultipartUpload {
client: Arc<GoogleCloudStorageClient>,
encoded_path: String,
@@ -549,10 +520,7 @@ impl CloudMultiPartUploadImpl for GCSMultipartUpload {
.header(header::CONTENT_LENGTH, format!("{}", buf.len()))
.body(buf)
.send_retry(&self.client.retry_config)
- .await
- .map_err(reqwest_error_as_io)?
- .error_for_status()
- .map_err(reqwest_error_as_io)?;
+ .await?;
let content_id = response
.headers()
@@ -609,10 +577,7 @@ impl CloudMultiPartUploadImpl for GCSMultipartUpload {
.query(&[("uploadId", upload_id)])
.body(data)
.send_retry(&self.client.retry_config)
- .await
- .map_err(reqwest_error_as_io)?
- .error_for_status()
- .map_err(reqwest_error_as_io)?;
+ .await?;
Ok(())
}
@@ -672,14 +637,14 @@ impl ObjectStore for GoogleCloudStorage {
.client
.get_request(location, Some(range), false)
.await?;
- Ok(response.bytes().await.context(GetRequestSnafu {
+ Ok(response.bytes().await.context(GetResponseBodySnafu {
path: location.as_ref(),
})?)
}
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let response = self.client.get_request(location, None, true).await?;
- let object = response.json().await.context(GetRequestSnafu {
+ let object = response.json().await.context(GetResponseBodySnafu {
path: location.as_ref(),
})?;
convert_object_meta(&object)
@@ -1057,9 +1022,7 @@ mod test {
.unwrap_err()
.to_string();
assert!(
- err.contains(
- "Error performing put request: HTTP status client error (404 Not Found)"
- ),
+ err.contains("HTTP status client error (404 Not Found)"),
"{}",
err
)