This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new cdb7b6f15bd Update reqwest 0.12 and http 1.0 (#5536)
cdb7b6f15bd is described below
commit cdb7b6f15bd4cfb67adc51640186bd6aa86913f5
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Fri Mar 29 03:51:47 2024 +0000
Update reqwest 0.12 and http 1.0 (#5536)
---
object_store/Cargo.toml | 8 ++--
object_store/src/aws/credential.rs | 18 ++++----
object_store/src/azure/credential.rs | 28 ++++++------
object_store/src/client/mock_server.rs | 82 ++++++++++++++++++++--------------
object_store/src/client/retry.rs | 35 ++++++++-------
object_store/src/parse.rs | 6 +--
6 files changed, 99 insertions(+), 78 deletions(-)
diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml
index a1e80ce51de..79813a0ea1f 100644
--- a/object_store/Cargo.toml
+++ b/object_store/Cargo.toml
@@ -45,12 +45,12 @@ walkdir = "2"
# Cloud storage support
base64 = { version = "0.22", default-features = false, features = ["std"],
optional = true }
-hyper = { version = "0.14", default-features = false, optional = true }
+hyper = { version = "1.2", default-features = false, optional = true }
quick-xml = { version = "0.31.0", features = ["serialize",
"overlapped-lists"], optional = true }
serde = { version = "1.0", default-features = false, features = ["derive"],
optional = true }
serde_json = { version = "1.0", default-features = false, optional = true }
rand = { version = "0.8", default-features = false, features = ["std",
"std_rng"], optional = true }
-reqwest = { version = "0.11", default-features = false, features =
["rustls-tls-native-roots"], optional = true }
+reqwest = { version = "0.12", default-features = false, features =
["rustls-tls-native-roots", "http2"], optional = true }
ring = { version = "0.17", default-features = false, features = ["std"],
optional = true }
rustls-pemfile = { version = "2.0", default-features = false, features =
["std"], optional = true }
tokio = { version = "1.25.0", features = ["sync", "macros", "rt", "time",
"io-util"] }
@@ -69,7 +69,9 @@ tls-webpki-roots = ["reqwest?/rustls-tls-webpki-roots"]
[dev-dependencies] # In alphabetical order
futures-test = "0.3"
-hyper = { version = "0.14.24", features = ["server"] }
+hyper = { version = "1.2", features = ["server"] }
+hyper-util = "0.1"
+http-body-util = "0.1"
rand = "0.8"
tempfile = "3.1.0"
diff --git a/object_store/src/aws/credential.rs
b/object_store/src/aws/credential.rs
index f8614f4f563..dd7fa5b41da 100644
--- a/object_store/src/aws/credential.rs
+++ b/object_store/src/aws/credential.rs
@@ -738,7 +738,7 @@ struct CreateSessionOutput {
mod tests {
use super::*;
use crate::client::mock_server::MockServer;
- use hyper::{Body, Response};
+ use hyper::Response;
use reqwest::{Client, Method};
use std::env;
@@ -939,7 +939,7 @@ mod tests {
#[tokio::test]
async fn test_mock() {
- let server = MockServer::new();
+ let server = MockServer::new().await;
const IMDSV2_HEADER: &str = "X-aws-ec2-metadata-token";
@@ -955,7 +955,7 @@ mod tests {
server.push_fn(|req| {
assert_eq!(req.uri().path(), "/latest/api/token");
assert_eq!(req.method(), &Method::PUT);
- Response::new(Body::from("cupcakes"))
+ Response::new("cupcakes".to_string())
});
server.push_fn(|req| {
assert_eq!(
@@ -965,14 +965,14 @@ mod tests {
assert_eq!(req.method(), &Method::GET);
let t =
req.headers().get(IMDSV2_HEADER).unwrap().to_str().unwrap();
assert_eq!(t, "cupcakes");
- Response::new(Body::from("myrole"))
+ Response::new("myrole".to_string())
});
server.push_fn(|req| {
assert_eq!(req.uri().path(),
"/latest/meta-data/iam/security-credentials/myrole");
assert_eq!(req.method(), &Method::GET);
let t =
req.headers().get(IMDSV2_HEADER).unwrap().to_str().unwrap();
assert_eq!(t, "cupcakes");
-
Response::new(Body::from(r#"{"AccessKeyId":"KEYID","Code":"Success","Expiration":"2022-08-30T10:51:04Z","LastUpdated":"2022-08-30T10:21:04Z","SecretAccessKey":"SECRET","Token":"TOKEN","Type":"AWS-HMAC"}"#))
+
Response::new(r#"{"AccessKeyId":"KEYID","Code":"Success","Expiration":"2022-08-30T10:51:04Z","LastUpdated":"2022-08-30T10:21:04Z","SecretAccessKey":"SECRET","Token":"TOKEN","Type":"AWS-HMAC"}"#.to_string())
});
let creds = instance_creds(&client, &retry_config, endpoint, true)
@@ -989,7 +989,7 @@ mod tests {
assert_eq!(req.method(), &Method::PUT);
Response::builder()
.status(StatusCode::FORBIDDEN)
- .body(Body::empty())
+ .body(String::new())
.unwrap()
});
server.push_fn(|req| {
@@ -999,13 +999,13 @@ mod tests {
);
assert_eq!(req.method(), &Method::GET);
assert!(req.headers().get(IMDSV2_HEADER).is_none());
- Response::new(Body::from("myrole"))
+ Response::new("myrole".to_string())
});
server.push_fn(|req| {
assert_eq!(req.uri().path(),
"/latest/meta-data/iam/security-credentials/myrole");
assert_eq!(req.method(), &Method::GET);
assert!(req.headers().get(IMDSV2_HEADER).is_none());
-
Response::new(Body::from(r#"{"AccessKeyId":"KEYID","Code":"Success","Expiration":"2022-08-30T10:51:04Z","LastUpdated":"2022-08-30T10:21:04Z","SecretAccessKey":"SECRET","Token":"TOKEN","Type":"AWS-HMAC"}"#))
+
Response::new(r#"{"AccessKeyId":"KEYID","Code":"Success","Expiration":"2022-08-30T10:51:04Z","LastUpdated":"2022-08-30T10:21:04Z","SecretAccessKey":"SECRET","Token":"TOKEN","Type":"AWS-HMAC"}"#.to_string())
});
let creds = instance_creds(&client, &retry_config, endpoint, true)
@@ -1020,7 +1020,7 @@ mod tests {
server.push(
Response::builder()
.status(StatusCode::FORBIDDEN)
- .body(Body::empty())
+ .body(String::new())
.unwrap(),
);
diff --git a/object_store/src/azure/credential.rs
b/object_store/src/azure/credential.rs
index 9360831974c..6dc3141b08c 100644
--- a/object_store/src/azure/credential.rs
+++ b/object_store/src/azure/credential.rs
@@ -930,8 +930,8 @@ impl CredentialProvider for AzureCliCredential {
#[cfg(test)]
mod tests {
use futures::executor::block_on;
- use hyper::body::to_bytes;
- use hyper::{Body, Response, StatusCode};
+ use http_body_util::BodyExt;
+ use hyper::{Response, StatusCode};
use reqwest::{Client, Method};
use tempfile::NamedTempFile;
@@ -942,7 +942,7 @@ mod tests {
#[tokio::test]
async fn test_managed_identity() {
- let server = MockServer::new();
+ let server = MockServer::new().await;
std::env::set_var(MSI_SECRET_ENV_KEY, "env-secret");
@@ -964,7 +964,7 @@ mod tests {
assert_eq!(t, "env-secret");
let t = req.headers().get("metadata").unwrap().to_str().unwrap();
assert_eq!(t, "true");
- Response::new(Body::from(
+ Response::new(
r#"
{
"access_token": "TOKEN",
@@ -975,8 +975,9 @@ mod tests {
"resource": "https://management.azure.com/",
"token_type": "Bearer"
}
- "#,
- ))
+ "#
+ .to_string(),
+ )
});
let credential = ImdsManagedIdentityProvider::new(
@@ -999,7 +1000,7 @@ mod tests {
#[tokio::test]
async fn test_workload_identity() {
- let server = MockServer::new();
+ let server = MockServer::new().await;
let tokenfile = NamedTempFile::new().unwrap();
let tenant = "tenant";
std::fs::write(tokenfile.path(), "federated-token").unwrap();
@@ -1012,10 +1013,10 @@ mod tests {
server.push_fn(move |req| {
assert_eq!(req.uri().path(),
format!("/{tenant}/oauth2/v2.0/token"));
assert_eq!(req.method(), &Method::POST);
- let body = block_on(to_bytes(req.into_body())).unwrap();
+ let body = block_on(async move {
req.into_body().collect().await.unwrap().to_bytes() });
let body = String::from_utf8(body.to_vec()).unwrap();
assert!(body.contains("federated-token"));
- Response::new(Body::from(
+ Response::new(
r#"
{
"access_token": "TOKEN",
@@ -1026,8 +1027,9 @@ mod tests {
"resource": "https://management.azure.com/",
"token_type": "Bearer"
}
- "#,
- ))
+ "#
+ .to_string(),
+ )
});
let credential = WorkloadIdentityOAuthProvider::new(
@@ -1050,7 +1052,7 @@ mod tests {
#[tokio::test]
async fn test_no_credentials() {
- let server = MockServer::new();
+ let server = MockServer::new().await;
let endpoint = server.url();
let store = MicrosoftAzureBuilder::new()
@@ -1068,7 +1070,7 @@ mod tests {
assert!(req.headers().get("Authorization").is_none());
Response::builder()
.status(StatusCode::NOT_FOUND)
- .body(Body::from("not found"))
+ .body("not found".to_string())
.unwrap()
});
diff --git a/object_store/src/client/mock_server.rs
b/object_store/src/client/mock_server.rs
index 70b856186d7..aa5a9e0ab4d 100644
--- a/object_store/src/client/mock_server.rs
+++ b/object_store/src/client/mock_server.rs
@@ -17,18 +17,23 @@
use futures::future::BoxFuture;
use futures::FutureExt;
-use hyper::service::{make_service_fn, service_fn};
-use hyper::{Body, Request, Response, Server};
+use hyper::body::Incoming;
+use hyper::server::conn::http1;
+use hyper::service::service_fn;
+use hyper::{Request, Response};
+use hyper_util::rt::TokioIo;
use parking_lot::Mutex;
use std::collections::VecDeque;
use std::convert::Infallible;
use std::future::Future;
use std::net::SocketAddr;
use std::sync::Arc;
+use tokio::net::TcpListener;
use tokio::sync::oneshot;
-use tokio::task::JoinHandle;
+use tokio::task::{JoinHandle, JoinSet};
-pub type ResponseFn = Box<dyn FnOnce(Request<Body>) -> BoxFuture<'static,
Response<Body>> + Send>;
+pub type ResponseFn =
+ Box<dyn FnOnce(Request<Incoming>) -> BoxFuture<'static, Response<String>>
+ Send>;
/// A mock server
pub struct MockServer {
@@ -39,39 +44,48 @@ pub struct MockServer {
}
impl MockServer {
- pub fn new() -> Self {
+ pub async fn new() -> Self {
let responses: Arc<Mutex<VecDeque<ResponseFn>>> =
Arc::new(Mutex::new(VecDeque::with_capacity(10)));
- let r = Arc::clone(&responses);
- let make_service = make_service_fn(move |_conn| {
- let r = Arc::clone(&r);
- async move {
- Ok::<_, Infallible>(service_fn(move |req| {
- let r = Arc::clone(&r);
- let next = r.lock().pop_front();
- async move {
- Ok::<_, Infallible>(match next {
- Some(r) => r(req).await,
- None => Response::new(Body::from("Hello World")),
- })
- }
- }))
- }
- });
+ let addr = SocketAddr::from(([127, 0, 0, 1], 0));
+ let listener = TcpListener::bind(addr).await.unwrap();
- let (shutdown, rx) = oneshot::channel::<()>();
- let server = Server::bind(&SocketAddr::from(([127, 0, 0, 1],
0))).serve(make_service);
+ let (shutdown, mut rx) = oneshot::channel::<()>();
- let url = format!("http://{}", server.local_addr());
+ let url = format!("http://{}", listener.local_addr().unwrap());
+ let r = Arc::clone(&responses);
let handle = tokio::spawn(async move {
- server
- .with_graceful_shutdown(async {
- rx.await.ok();
- })
- .await
- .unwrap()
+ let mut set = JoinSet::new();
+
+ loop {
+ let (stream, _) = tokio::select! {
+ conn = listener.accept() => conn.unwrap(),
+ _ = &mut rx => break,
+ };
+
+ let r = Arc::clone(&r);
+ set.spawn(async move {
+ let _ = http1::Builder::new()
+ .serve_connection(
+ TokioIo::new(stream),
+ service_fn(move |req| {
+ let r = Arc::clone(&r);
+ let next = r.lock().pop_front();
+ async move {
+ Ok::<_, Infallible>(match next {
+ Some(r) => r(req).await,
+ None => Response::new("Hello
World".to_string()),
+ })
+ }
+ }),
+ )
+ .await;
+ });
+ }
+
+ set.abort_all();
});
Self {
@@ -88,14 +102,14 @@ impl MockServer {
}
/// Add a response
- pub fn push(&self, response: Response<Body>) {
+ pub fn push(&self, response: Response<String>) {
self.push_fn(|_| response)
}
/// Add a response function
pub fn push_fn<F>(&self, f: F)
where
- F: FnOnce(Request<Body>) -> Response<Body> + Send + 'static,
+ F: FnOnce(Request<Incoming>) -> Response<String> + Send + 'static,
{
let f = Box::new(|req| async move { f(req) }.boxed());
self.responses.lock().push_back(f)
@@ -103,8 +117,8 @@ impl MockServer {
pub fn push_async_fn<F, Fut>(&self, f: F)
where
- F: FnOnce(Request<Body>) -> Fut + Send + 'static,
- Fut: Future<Output = Response<Body>> + Send + 'static,
+ F: FnOnce(Request<Incoming>) -> Fut + Send + 'static,
+ Fut: Future<Output = Response<String>> + Send + 'static,
{
self.responses.lock().push_back(Box::new(|r| f(r).boxed()))
}
diff --git a/object_store/src/client/retry.rs b/object_store/src/client/retry.rs
index fbd3645d278..c4f52989a52 100644
--- a/object_store/src/client/retry.rs
+++ b/object_store/src/client/retry.rs
@@ -259,13 +259,16 @@ impl RetryExt for reqwest::RequestBuilder {
Err(e) =>
{
let mut do_retry = false;
- if req.method().is_safe() && e.is_timeout() {
+ if e.is_connect() || ( req.method().is_safe() &&
e.is_timeout()) {
do_retry = true
- } else if let Some(source) = e.source() {
- if let Some(e) =
source.downcast_ref::<hyper::Error>() {
- if e.is_connect() || e.is_closed() ||
e.is_incomplete_message() {
- do_retry = true;
+ } else {
+ let mut source = e.source();
+ while let Some(e) = source {
+ if let Some(e) =
e.downcast_ref::<hyper::Error>() {
+ do_retry = e.is_closed() ||
e.is_incomplete_message();
+ break
}
+ source = e.source();
}
}
@@ -305,13 +308,13 @@ mod tests {
use crate::client::retry::{Error, RetryExt};
use crate::RetryConfig;
use hyper::header::LOCATION;
- use hyper::{Body, Response};
+ use hyper::Response;
use reqwest::{Client, Method, StatusCode};
use std::time::Duration;
#[tokio::test]
async fn test_retry() {
- let mock = MockServer::new();
+ let mock = MockServer::new().await;
let retry = RetryConfig {
backoff: Default::default(),
@@ -334,7 +337,7 @@ mod tests {
mock.push(
Response::builder()
.status(StatusCode::BAD_REQUEST)
- .body(Body::from("cupcakes"))
+ .body("cupcakes".to_string())
.unwrap(),
);
@@ -350,7 +353,7 @@ mod tests {
mock.push(
Response::builder()
.status(StatusCode::BAD_REQUEST)
- .body(Body::empty())
+ .body(String::new())
.unwrap(),
);
@@ -366,7 +369,7 @@ mod tests {
mock.push(
Response::builder()
.status(StatusCode::BAD_GATEWAY)
- .body(Body::empty())
+ .body(String::new())
.unwrap(),
);
@@ -377,7 +380,7 @@ mod tests {
mock.push(
Response::builder()
.status(StatusCode::NO_CONTENT)
- .body(Body::empty())
+ .body(String::new())
.unwrap(),
);
@@ -389,7 +392,7 @@ mod tests {
Response::builder()
.status(StatusCode::FOUND)
.header(LOCATION, "/foo")
- .body(Body::empty())
+ .body(String::new())
.unwrap(),
);
@@ -402,7 +405,7 @@ mod tests {
Response::builder()
.status(StatusCode::FOUND)
.header(LOCATION, "/bar")
- .body(Body::empty())
+ .body(String::new())
.unwrap(),
);
@@ -416,7 +419,7 @@ mod tests {
Response::builder()
.status(StatusCode::FOUND)
.header(LOCATION, "/bar")
- .body(Body::empty())
+ .body(String::new())
.unwrap(),
);
}
@@ -428,7 +431,7 @@ mod tests {
mock.push(
Response::builder()
.status(StatusCode::FOUND)
- .body(Body::empty())
+ .body(String::new())
.unwrap(),
);
@@ -441,7 +444,7 @@ mod tests {
mock.push(
Response::builder()
.status(StatusCode::BAD_GATEWAY)
- .body(Body::from("ignored"))
+ .body("ignored".to_string())
.unwrap(),
);
}
diff --git a/object_store/src/parse.rs b/object_store/src/parse.rs
index 116c2ad2ac0..5549fd3a3e5 100644
--- a/object_store/src/parse.rs
+++ b/object_store/src/parse.rs
@@ -311,14 +311,14 @@ mod tests {
#[cfg(feature = "http")]
async fn test_url_http() {
use crate::client::mock_server::MockServer;
- use hyper::{header::USER_AGENT, Body, Response};
+ use hyper::{header::USER_AGENT, Response};
- let server = MockServer::new();
+ let server = MockServer::new().await;
server.push_fn(|r| {
assert_eq!(r.uri().path(), "/foo/bar");
assert_eq!(r.headers().get(USER_AGENT).unwrap(), "test_url");
- Response::new(Body::empty())
+ Response::new(String::new())
});
let test = format!("{}/foo/bar", server.url());