tustvold commented on code in PR #7183: URL: https://github.com/apache/arrow-rs/pull/7183#discussion_r1966911968
########## object_store/src/client/body.rs: ########## @@ -0,0 +1,213 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::client::connection::{HttpError, HttpErrorKind}; +use crate::{collect_bytes, PutPayload}; +use bytes::Bytes; +use futures::stream::BoxStream; +use futures::StreamExt; +use http_body_util::combinators::BoxBody; +use http_body_util::{BodyExt, Full}; +use hyper::body::{Body, Frame, SizeHint}; +use serde::de::DeserializeOwned; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// An HTTP Request +pub type HttpRequest = http::Request<HttpRequestBody>; + +/// The [`Body`] of an [`HttpRequest`] +#[derive(Debug, Clone)] +pub struct HttpRequestBody(Inner); Review Comment: We define fixed body types as we need HttpClient to be dyn-compatible (it also makes things slightly easier to follow) ########## object_store/src/client/connection.rs: ########## @@ -0,0 +1,222 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::client::body::{HttpRequest, HttpResponse}; +use crate::client::builder::{HttpRequestBuilder, RequestBuilderError}; +use crate::client::HttpResponseBody; +use crate::ClientOptions; +use async_trait::async_trait; +use http::{Method, Uri}; +use http_body_util::BodyExt; +use std::error::Error; +use std::sync::Arc; + +/// An HTTP protocol error +/// +/// Clients should return this when an HTTP request fails to be completed, e.g. because +/// of a connection issue. This does **not** include HTTP requests that are return +/// non 2xx Status Codes, as these should instead be returned as an [`HttpResponse`] +/// with the appropriate status code set. 
+#[derive(Debug, thiserror::Error)] +#[error("HTTP error: {source}")] +pub struct HttpError { + kind: HttpErrorKind, + #[source] + source: Box<dyn Error + Send + Sync>, +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[non_exhaustive] +pub enum HttpErrorKind { + /// Request timed out + Timeout, + /// An error occurred whilst connecting to the remote + Connect, + /// An error occurred whilst making the request + Request, + /// An error occurred whilst decoding the response + Decode, + /// The request was aborted + Interrupted, + /// An unknown error occurred + Unknown, +} + +impl HttpError { + /// Create a new [`HttpError`] with the optional status code + pub fn new<E>(kind: HttpErrorKind, e: E) -> Self + where + E: Error + Send + Sync + 'static, + { + Self { + kind, + source: Box::new(e), + } + } + + pub(crate) fn reqwest(e: reqwest::Error) -> Self { + let mut kind = if e.is_timeout() { + HttpErrorKind::Timeout + } else if e.is_connect() { + HttpErrorKind::Connect + } else if e.is_decode() { + HttpErrorKind::Decode + } else { + HttpErrorKind::Unknown + }; + + // Reqwest error variants aren't great, attempt to refine them + let mut source = e.source(); Review Comment: This logic is moved out of retry.rs and reframed in terms of HttpErrorKind ########## object_store/src/client/builder.rs: ########## @@ -0,0 +1,275 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::client::connection::HttpErrorKind; +use crate::client::{HttpClient, HttpError, HttpRequest, HttpRequestBody}; +use http::header::{Entry, InvalidHeaderName, InvalidHeaderValue, OccupiedEntry}; +use http::uri::{InvalidUri, PathAndQuery}; +use http::{HeaderMap, HeaderName, HeaderValue, Method, Uri}; +use serde::Serialize; +use std::borrow::Borrow; + +#[derive(Debug, thiserror::Error)] +pub(crate) enum RequestBuilderError { + #[error("Invalid URI")] + InvalidUri(#[from] InvalidUri), + + #[error("Invalid Header Value")] + InvalidHeaderValue(#[from] InvalidHeaderValue), + + #[error("Invalid Header Name")] + InvalidHeaderName(#[from] InvalidHeaderName), + + #[error("JSON serialization error")] + SerdeJson(#[from] serde_json::Error), + + #[error("URL serialization error")] + SerdeUrl(#[from] serde_urlencoded::ser::Error), +} + +impl From<RequestBuilderError> for HttpError { + fn from(value: RequestBuilderError) -> Self { + Self::new(HttpErrorKind::Request, value) + } +} + +impl From<std::convert::Infallible> for RequestBuilderError { + fn from(value: std::convert::Infallible) -> Self { + match value {} + } +} + +pub(crate) struct HttpRequestBuilder { Review Comment: This is a crate-private builder that provides an interface similar to `reqwest` but which can then be used with the `HttpClient`. Unfortunately `reqwest::Request` is largely opaque and I couldn't see an obvious way to make use of it. 
########## object_store/src/client/connection.rs: ########## @@ -0,0 +1,222 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::client::body::{HttpRequest, HttpResponse}; +use crate::client::builder::{HttpRequestBuilder, RequestBuilderError}; +use crate::client::HttpResponseBody; +use crate::ClientOptions; +use async_trait::async_trait; +use http::{Method, Uri}; +use http_body_util::BodyExt; +use std::error::Error; +use std::sync::Arc; + +/// An HTTP protocol error +/// +/// Clients should return this when an HTTP request fails to be completed, e.g. because +/// of a connection issue. This does **not** include HTTP requests that are return +/// non 2xx Status Codes, as these should instead be returned as an [`HttpResponse`] +/// with the appropriate status code set. 
+#[derive(Debug, thiserror::Error)] +#[error("HTTP error: {source}")] +pub struct HttpError { + kind: HttpErrorKind, + #[source] + source: Box<dyn Error + Send + Sync>, +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[non_exhaustive] +pub enum HttpErrorKind { Review Comment: This allows the retry logic to work on top of arbitrary HttpClients ########## object_store/src/client/builder.rs: ########## @@ -0,0 +1,275 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::client::connection::HttpErrorKind; +use crate::client::{HttpClient, HttpError, HttpRequest, HttpRequestBody}; +use http::header::{Entry, InvalidHeaderName, InvalidHeaderValue, OccupiedEntry}; +use http::uri::{InvalidUri, PathAndQuery}; +use http::{HeaderMap, HeaderName, HeaderValue, Method, Uri}; +use serde::Serialize; +use std::borrow::Borrow; + +#[derive(Debug, thiserror::Error)] +pub(crate) enum RequestBuilderError { + #[error("Invalid URI")] + InvalidUri(#[from] InvalidUri), + + #[error("Invalid Header Value")] + InvalidHeaderValue(#[from] InvalidHeaderValue), + + #[error("Invalid Header Name")] + InvalidHeaderName(#[from] InvalidHeaderName), + + #[error("JSON serialization error")] + SerdeJson(#[from] serde_json::Error), + + #[error("URL serialization error")] + SerdeUrl(#[from] serde_urlencoded::ser::Error), +} + +impl From<RequestBuilderError> for HttpError { + fn from(value: RequestBuilderError) -> Self { + Self::new(HttpErrorKind::Request, value) + } +} + +impl From<std::convert::Infallible> for RequestBuilderError { + fn from(value: std::convert::Infallible) -> Self { + match value {} + } +} + +pub(crate) struct HttpRequestBuilder { + client: HttpClient, + request: Result<HttpRequest, RequestBuilderError>, +} + +impl HttpRequestBuilder { + pub(crate) fn new(client: HttpClient) -> Self { + Self { + client, + request: Ok(HttpRequest::new(HttpRequestBody::empty())), + } + } + + pub(crate) fn from_parts(client: HttpClient, request: HttpRequest) -> Self { + Self { + client, + request: Ok(request), + } + } + + pub(crate) fn method(mut self, method: Method) -> Self { + if let Ok(r) = &mut self.request { + *r.method_mut() = method; + } + self + } + + pub(crate) fn uri<U>(mut self, url: U) -> Self + where + U: TryInto<Uri>, + U::Error: Into<RequestBuilderError>, + { + match (url.try_into(), &mut self.request) { + (Ok(uri), Ok(r)) => *r.uri_mut() = uri, + (Err(e), Ok(_)) => self.request = Err(e.into()), + (_, Err(_)) => {} + } + self + } + + 
pub(crate) fn header<K, V>(mut self, name: K, value: V) -> Self + where + K: TryInto<HeaderName>, + K::Error: Into<RequestBuilderError>, + V: TryInto<HeaderValue>, + V::Error: Into<RequestBuilderError>, + { + match (name.try_into(), value.try_into(), &mut self.request) { + (Ok(name), Ok(value), Ok(r)) => { + r.headers_mut().insert(name, value); + } + (Err(e), _, Ok(_)) => self.request = Err(e.into()), + (_, Err(e), Ok(_)) => self.request = Err(e.into()), + (_, _, Err(_)) => {} + } + self + } + + pub(crate) fn headers(mut self, headers: HeaderMap) -> Self { + if let Ok(ref mut req) = self.request { + // IntoIter of HeaderMap yields (Option<HeaderName>, HeaderValue). + // The first time a name is yielded, it will be Some(name), and if + // there are more values with the same name, the next yield will be + // None. + + let mut prev_entry: Option<OccupiedEntry<'_, _>> = None; + for (key, value) in headers { + match key { + Some(key) => match req.headers_mut().entry(key) { + Entry::Occupied(mut e) => { + e.insert(value); + prev_entry = Some(e); + } + Entry::Vacant(e) => { + let e = e.insert_entry(value); + prev_entry = Some(e); + } + }, + None => match prev_entry { + Some(ref mut entry) => { + entry.append(value); + } + None => unreachable!("HeaderMap::into_iter yielded None first"), + }, + } + } + } + self + } + + #[cfg(feature = "gcp")] + pub(crate) fn bearer_auth(mut self, token: &str) -> Self { + let value = HeaderValue::try_from(format!("Bearer {}", token)); + match (value, &mut self.request) { + (Ok(mut v), Ok(r)) => { + v.set_sensitive(true); + r.headers_mut().insert(http::header::AUTHORIZATION, v); + } + (Err(e), Ok(_)) => self.request = Err(e.into()), + (_, Err(_)) => {} + } + self + } + + #[cfg(any(feature = "aws", feature = "gcp"))] + pub(crate) fn json<S: Serialize>(mut self, s: S) -> Self { + match (serde_json::to_vec(&s), &mut self.request) { + (Ok(json), Ok(request)) => { + *request.body_mut() = json.into(); + } + (Err(e), Ok(_)) => self.request = 
Err(e.into()), + (_, Err(_)) => {} + } + self + } + + #[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))] + pub(crate) fn query<T: Serialize + ?Sized>(mut self, query: &T) -> Self { + let mut error = None; + if let Ok(ref mut req) = self.request { + let mut out = format!("{}?", req.uri().path()); + let mut encoder = form_urlencoded::Serializer::new(&mut out); + let serializer = serde_urlencoded::Serializer::new(&mut encoder); + + if let Err(err) = query.serialize(serializer) { + error = Some(err.into()); + } + + match PathAndQuery::from_maybe_shared(out) { + Ok(p) => { + let mut parts = req.uri().clone().into_parts(); + parts.path_and_query = Some(p); + *req.uri_mut() = Uri::from_parts(parts).unwrap(); + } + Err(err) => error = Some(err.into()), + } + } + if let Some(err) = error { + self.request = Err(err); + } + self + } + + #[cfg(any(feature = "gcp", feature = "azure"))] + pub(crate) fn form<T: Serialize>(mut self, form: T) -> Self { + let mut error = None; + if let Ok(ref mut req) = self.request { + match serde_urlencoded::to_string(form) { + Ok(body) => { + req.headers_mut().insert( + http::header::CONTENT_TYPE, + HeaderValue::from_static("application/x-www-form-urlencoded"), + ); + *req.body_mut() = body.into(); + } + Err(err) => error = Some(err.into()), + } + } + if let Some(err) = error { + self.request = Err(err); + } + self + } + + pub(crate) fn body(mut self, b: impl Into<HttpRequestBody>) -> Self { + if let Ok(r) = &mut self.request { + *r.body_mut() = b.into(); + } + self + } + + pub(crate) fn into_parts(self) -> (HttpClient, Result<HttpRequest, RequestBuilderError>) { + (self.client, self.request) + } +} + +pub(crate) fn add_query_pairs<I, K, V>(uri: &mut Uri, query_pairs: I) Review Comment: It is a shame that http::Request uses http::Uri which lacks the ergonomic niceties of Url, so we require some additional shenanigans ########## object_store/src/client/retry.rs: ########## @@ -460,12 +443,12 @@ pub(crate) trait RetryExt { /// #
Panic /// /// This will panic if the request body is a stream - fn send_retry(self, config: &RetryConfig) -> BoxFuture<'static, Result<Response>>; + fn send_retry(self, config: &RetryConfig) -> BoxFuture<'static, Result<HttpResponse>>; } -impl RetryExt for reqwest::RequestBuilder { +impl RetryExt for HttpRequestBuilder { Review Comment: We could theoretically remove the extension trait, as HttpRequestBuilder is not a foreign type. I opted to keep it for now though. ########## object_store/src/client/retry.rs: ########## @@ -18,60 +18,118 @@ //! A shared HTTP client implementation incorporating retries use crate::client::backoff::{Backoff, BackoffConfig}; +use crate::client::builder::HttpRequestBuilder; +use crate::client::connection::HttpErrorKind; +use crate::client::{HttpClient, HttpError, HttpRequest, HttpResponse}; use crate::PutPayload; use futures::future::BoxFuture; +use http::{Method, Uri}; use reqwest::header::LOCATION; -use reqwest::{Client, Request, Response, StatusCode}; -use std::error::Error as StdError; +use reqwest::StatusCode; use std::time::{Duration, Instant}; -use tracing::{debug, info}; +use tracing::info; /// Retry request error #[derive(Debug, thiserror::Error)] -pub enum Error { - #[error("Received redirect without LOCATION, this normally indicates an incorrectly configured region")] +pub struct RetryError { + method: Method, + uri: Option<Uri>, + retries: usize, + max_retries: usize, + elapsed: Duration, + retry_timeout: Duration, + inner: RequestError, +} + +impl std::fmt::Display for RetryError { Review Comment: This somewhat lays the groundwork for #6377 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
