This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new cdb7b6f15bd Update reqwest 0.12 and http 1.0 (#5536)
cdb7b6f15bd is described below

commit cdb7b6f15bd4cfb67adc51640186bd6aa86913f5
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Fri Mar 29 03:51:47 2024 +0000

    Update reqwest 0.12 and http 1.0 (#5536)
---
 object_store/Cargo.toml                |  8 ++--
 object_store/src/aws/credential.rs     | 18 ++++----
 object_store/src/azure/credential.rs   | 28 ++++++------
 object_store/src/client/mock_server.rs | 82 ++++++++++++++++++++--------------
 object_store/src/client/retry.rs       | 35 ++++++++-------
 object_store/src/parse.rs              |  6 +--
 6 files changed, 99 insertions(+), 78 deletions(-)

diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml
index a1e80ce51de..79813a0ea1f 100644
--- a/object_store/Cargo.toml
+++ b/object_store/Cargo.toml
@@ -45,12 +45,12 @@ walkdir = "2"
 
 # Cloud storage support
 base64 = { version = "0.22", default-features = false, features = ["std"], 
optional = true }
-hyper = { version = "0.14", default-features = false, optional = true }
+hyper = { version = "1.2", default-features = false, optional = true }
 quick-xml = { version = "0.31.0", features = ["serialize", 
"overlapped-lists"], optional = true }
 serde = { version = "1.0", default-features = false, features = ["derive"], 
optional = true }
 serde_json = { version = "1.0", default-features = false, optional = true }
 rand = { version = "0.8", default-features = false, features = ["std", 
"std_rng"], optional = true }
-reqwest = { version = "0.11", default-features = false, features = 
["rustls-tls-native-roots"], optional = true }
+reqwest = { version = "0.12", default-features = false, features = 
["rustls-tls-native-roots", "http2"], optional = true }
 ring = { version = "0.17", default-features = false, features = ["std"], 
optional = true }
 rustls-pemfile = { version = "2.0", default-features = false, features = 
["std"], optional = true }
 tokio = { version = "1.25.0", features = ["sync", "macros", "rt", "time", 
"io-util"] }
@@ -69,7 +69,9 @@ tls-webpki-roots = ["reqwest?/rustls-tls-webpki-roots"]
 
 [dev-dependencies] # In alphabetical order
 futures-test = "0.3"
-hyper = { version = "0.14.24", features = ["server"] }
+hyper = { version = "1.2", features = ["server"] }
+hyper-util = "0.1"
+http-body-util = "0.1"
 rand = "0.8"
 tempfile = "3.1.0"
 
diff --git a/object_store/src/aws/credential.rs 
b/object_store/src/aws/credential.rs
index f8614f4f563..dd7fa5b41da 100644
--- a/object_store/src/aws/credential.rs
+++ b/object_store/src/aws/credential.rs
@@ -738,7 +738,7 @@ struct CreateSessionOutput {
 mod tests {
     use super::*;
     use crate::client::mock_server::MockServer;
-    use hyper::{Body, Response};
+    use hyper::Response;
     use reqwest::{Client, Method};
     use std::env;
 
@@ -939,7 +939,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_mock() {
-        let server = MockServer::new();
+        let server = MockServer::new().await;
 
         const IMDSV2_HEADER: &str = "X-aws-ec2-metadata-token";
 
@@ -955,7 +955,7 @@ mod tests {
         server.push_fn(|req| {
             assert_eq!(req.uri().path(), "/latest/api/token");
             assert_eq!(req.method(), &Method::PUT);
-            Response::new(Body::from("cupcakes"))
+            Response::new("cupcakes".to_string())
         });
         server.push_fn(|req| {
             assert_eq!(
@@ -965,14 +965,14 @@ mod tests {
             assert_eq!(req.method(), &Method::GET);
             let t = 
req.headers().get(IMDSV2_HEADER).unwrap().to_str().unwrap();
             assert_eq!(t, "cupcakes");
-            Response::new(Body::from("myrole"))
+            Response::new("myrole".to_string())
         });
         server.push_fn(|req| {
             assert_eq!(req.uri().path(), 
"/latest/meta-data/iam/security-credentials/myrole");
             assert_eq!(req.method(), &Method::GET);
             let t = 
req.headers().get(IMDSV2_HEADER).unwrap().to_str().unwrap();
             assert_eq!(t, "cupcakes");
-            
Response::new(Body::from(r#"{"AccessKeyId":"KEYID","Code":"Success","Expiration":"2022-08-30T10:51:04Z","LastUpdated":"2022-08-30T10:21:04Z","SecretAccessKey":"SECRET","Token":"TOKEN","Type":"AWS-HMAC"}"#))
+            
Response::new(r#"{"AccessKeyId":"KEYID","Code":"Success","Expiration":"2022-08-30T10:51:04Z","LastUpdated":"2022-08-30T10:21:04Z","SecretAccessKey":"SECRET","Token":"TOKEN","Type":"AWS-HMAC"}"#.to_string())
         });
 
         let creds = instance_creds(&client, &retry_config, endpoint, true)
@@ -989,7 +989,7 @@ mod tests {
             assert_eq!(req.method(), &Method::PUT);
             Response::builder()
                 .status(StatusCode::FORBIDDEN)
-                .body(Body::empty())
+                .body(String::new())
                 .unwrap()
         });
         server.push_fn(|req| {
@@ -999,13 +999,13 @@ mod tests {
             );
             assert_eq!(req.method(), &Method::GET);
             assert!(req.headers().get(IMDSV2_HEADER).is_none());
-            Response::new(Body::from("myrole"))
+            Response::new("myrole".to_string())
         });
         server.push_fn(|req| {
             assert_eq!(req.uri().path(), 
"/latest/meta-data/iam/security-credentials/myrole");
             assert_eq!(req.method(), &Method::GET);
             assert!(req.headers().get(IMDSV2_HEADER).is_none());
-            
Response::new(Body::from(r#"{"AccessKeyId":"KEYID","Code":"Success","Expiration":"2022-08-30T10:51:04Z","LastUpdated":"2022-08-30T10:21:04Z","SecretAccessKey":"SECRET","Token":"TOKEN","Type":"AWS-HMAC"}"#))
+            
Response::new(r#"{"AccessKeyId":"KEYID","Code":"Success","Expiration":"2022-08-30T10:51:04Z","LastUpdated":"2022-08-30T10:21:04Z","SecretAccessKey":"SECRET","Token":"TOKEN","Type":"AWS-HMAC"}"#.to_string())
         });
 
         let creds = instance_creds(&client, &retry_config, endpoint, true)
@@ -1020,7 +1020,7 @@ mod tests {
         server.push(
             Response::builder()
                 .status(StatusCode::FORBIDDEN)
-                .body(Body::empty())
+                .body(String::new())
                 .unwrap(),
         );
 
diff --git a/object_store/src/azure/credential.rs 
b/object_store/src/azure/credential.rs
index 9360831974c..6dc3141b08c 100644
--- a/object_store/src/azure/credential.rs
+++ b/object_store/src/azure/credential.rs
@@ -930,8 +930,8 @@ impl CredentialProvider for AzureCliCredential {
 #[cfg(test)]
 mod tests {
     use futures::executor::block_on;
-    use hyper::body::to_bytes;
-    use hyper::{Body, Response, StatusCode};
+    use http_body_util::BodyExt;
+    use hyper::{Response, StatusCode};
     use reqwest::{Client, Method};
     use tempfile::NamedTempFile;
 
@@ -942,7 +942,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_managed_identity() {
-        let server = MockServer::new();
+        let server = MockServer::new().await;
 
         std::env::set_var(MSI_SECRET_ENV_KEY, "env-secret");
 
@@ -964,7 +964,7 @@ mod tests {
             assert_eq!(t, "env-secret");
             let t = req.headers().get("metadata").unwrap().to_str().unwrap();
             assert_eq!(t, "true");
-            Response::new(Body::from(
+            Response::new(
                 r#"
             {
                 "access_token": "TOKEN",
@@ -975,8 +975,9 @@ mod tests {
                 "resource": "https://management.azure.com/";,
                 "token_type": "Bearer"
               }
-            "#,
-            ))
+            "#
+                .to_string(),
+            )
         });
 
         let credential = ImdsManagedIdentityProvider::new(
@@ -999,7 +1000,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_workload_identity() {
-        let server = MockServer::new();
+        let server = MockServer::new().await;
         let tokenfile = NamedTempFile::new().unwrap();
         let tenant = "tenant";
         std::fs::write(tokenfile.path(), "federated-token").unwrap();
@@ -1012,10 +1013,10 @@ mod tests {
         server.push_fn(move |req| {
             assert_eq!(req.uri().path(), 
format!("/{tenant}/oauth2/v2.0/token"));
             assert_eq!(req.method(), &Method::POST);
-            let body = block_on(to_bytes(req.into_body())).unwrap();
+            let body = block_on(async move { 
req.into_body().collect().await.unwrap().to_bytes() });
             let body = String::from_utf8(body.to_vec()).unwrap();
             assert!(body.contains("federated-token"));
-            Response::new(Body::from(
+            Response::new(
                 r#"
             {
                 "access_token": "TOKEN",
@@ -1026,8 +1027,9 @@ mod tests {
                 "resource": "https://management.azure.com/";,
                 "token_type": "Bearer"
               }
-            "#,
-            ))
+            "#
+                .to_string(),
+            )
         });
 
         let credential = WorkloadIdentityOAuthProvider::new(
@@ -1050,7 +1052,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_no_credentials() {
-        let server = MockServer::new();
+        let server = MockServer::new().await;
 
         let endpoint = server.url();
         let store = MicrosoftAzureBuilder::new()
@@ -1068,7 +1070,7 @@ mod tests {
             assert!(req.headers().get("Authorization").is_none());
             Response::builder()
                 .status(StatusCode::NOT_FOUND)
-                .body(Body::from("not found"))
+                .body("not found".to_string())
                 .unwrap()
         });
 
diff --git a/object_store/src/client/mock_server.rs 
b/object_store/src/client/mock_server.rs
index 70b856186d7..aa5a9e0ab4d 100644
--- a/object_store/src/client/mock_server.rs
+++ b/object_store/src/client/mock_server.rs
@@ -17,18 +17,23 @@
 
 use futures::future::BoxFuture;
 use futures::FutureExt;
-use hyper::service::{make_service_fn, service_fn};
-use hyper::{Body, Request, Response, Server};
+use hyper::body::Incoming;
+use hyper::server::conn::http1;
+use hyper::service::service_fn;
+use hyper::{Request, Response};
+use hyper_util::rt::TokioIo;
 use parking_lot::Mutex;
 use std::collections::VecDeque;
 use std::convert::Infallible;
 use std::future::Future;
 use std::net::SocketAddr;
 use std::sync::Arc;
+use tokio::net::TcpListener;
 use tokio::sync::oneshot;
-use tokio::task::JoinHandle;
+use tokio::task::{JoinHandle, JoinSet};
 
-pub type ResponseFn = Box<dyn FnOnce(Request<Body>) -> BoxFuture<'static, 
Response<Body>> + Send>;
+pub type ResponseFn =
+    Box<dyn FnOnce(Request<Incoming>) -> BoxFuture<'static, Response<String>> 
+ Send>;
 
 /// A mock server
 pub struct MockServer {
@@ -39,39 +44,48 @@ pub struct MockServer {
 }
 
 impl MockServer {
-    pub fn new() -> Self {
+    pub async fn new() -> Self {
         let responses: Arc<Mutex<VecDeque<ResponseFn>>> =
             Arc::new(Mutex::new(VecDeque::with_capacity(10)));
 
-        let r = Arc::clone(&responses);
-        let make_service = make_service_fn(move |_conn| {
-            let r = Arc::clone(&r);
-            async move {
-                Ok::<_, Infallible>(service_fn(move |req| {
-                    let r = Arc::clone(&r);
-                    let next = r.lock().pop_front();
-                    async move {
-                        Ok::<_, Infallible>(match next {
-                            Some(r) => r(req).await,
-                            None => Response::new(Body::from("Hello World")),
-                        })
-                    }
-                }))
-            }
-        });
+        let addr = SocketAddr::from(([127, 0, 0, 1], 0));
+        let listener = TcpListener::bind(addr).await.unwrap();
 
-        let (shutdown, rx) = oneshot::channel::<()>();
-        let server = Server::bind(&SocketAddr::from(([127, 0, 0, 1], 
0))).serve(make_service);
+        let (shutdown, mut rx) = oneshot::channel::<()>();
 
-        let url = format!("http://{}";, server.local_addr());
+        let url = format!("http://{}";, listener.local_addr().unwrap());
 
+        let r = Arc::clone(&responses);
         let handle = tokio::spawn(async move {
-            server
-                .with_graceful_shutdown(async {
-                    rx.await.ok();
-                })
-                .await
-                .unwrap()
+            let mut set = JoinSet::new();
+
+            loop {
+                let (stream, _) = tokio::select! {
+                    conn = listener.accept() => conn.unwrap(),
+                    _ = &mut rx => break,
+                };
+
+                let r = Arc::clone(&r);
+                set.spawn(async move {
+                    let _ = http1::Builder::new()
+                        .serve_connection(
+                            TokioIo::new(stream),
+                            service_fn(move |req| {
+                                let r = Arc::clone(&r);
+                                let next = r.lock().pop_front();
+                                async move {
+                                    Ok::<_, Infallible>(match next {
+                                        Some(r) => r(req).await,
+                                        None => Response::new("Hello 
World".to_string()),
+                                    })
+                                }
+                            }),
+                        )
+                        .await;
+                });
+            }
+
+            set.abort_all();
         });
 
         Self {
@@ -88,14 +102,14 @@ impl MockServer {
     }
 
     /// Add a response
-    pub fn push(&self, response: Response<Body>) {
+    pub fn push(&self, response: Response<String>) {
         self.push_fn(|_| response)
     }
 
     /// Add a response function
     pub fn push_fn<F>(&self, f: F)
     where
-        F: FnOnce(Request<Body>) -> Response<Body> + Send + 'static,
+        F: FnOnce(Request<Incoming>) -> Response<String> + Send + 'static,
     {
         let f = Box::new(|req| async move { f(req) }.boxed());
         self.responses.lock().push_back(f)
@@ -103,8 +117,8 @@ impl MockServer {
 
     pub fn push_async_fn<F, Fut>(&self, f: F)
     where
-        F: FnOnce(Request<Body>) -> Fut + Send + 'static,
-        Fut: Future<Output = Response<Body>> + Send + 'static,
+        F: FnOnce(Request<Incoming>) -> Fut + Send + 'static,
+        Fut: Future<Output = Response<String>> + Send + 'static,
     {
         self.responses.lock().push_back(Box::new(|r| f(r).boxed()))
     }
diff --git a/object_store/src/client/retry.rs b/object_store/src/client/retry.rs
index fbd3645d278..c4f52989a52 100644
--- a/object_store/src/client/retry.rs
+++ b/object_store/src/client/retry.rs
@@ -259,13 +259,16 @@ impl RetryExt for reqwest::RequestBuilder {
                     Err(e) =>
                     {
                         let mut do_retry = false;
-                        if req.method().is_safe() && e.is_timeout() {
+                        if e.is_connect() || ( req.method().is_safe() && 
e.is_timeout()) {
                             do_retry = true
-                        } else if let Some(source) = e.source() {
-                            if let Some(e) = 
source.downcast_ref::<hyper::Error>() {
-                                if e.is_connect() || e.is_closed() || 
e.is_incomplete_message() {
-                                    do_retry = true;
+                        } else {
+                            let mut source = e.source();
+                            while let Some(e) = source {
+                                if let Some(e) = 
e.downcast_ref::<hyper::Error>() {
+                                    do_retry = e.is_closed() || 
e.is_incomplete_message();
+                                    break
                                 }
+                                source = e.source();
                             }
                         }
 
@@ -305,13 +308,13 @@ mod tests {
     use crate::client::retry::{Error, RetryExt};
     use crate::RetryConfig;
     use hyper::header::LOCATION;
-    use hyper::{Body, Response};
+    use hyper::Response;
     use reqwest::{Client, Method, StatusCode};
     use std::time::Duration;
 
     #[tokio::test]
     async fn test_retry() {
-        let mock = MockServer::new();
+        let mock = MockServer::new().await;
 
         let retry = RetryConfig {
             backoff: Default::default(),
@@ -334,7 +337,7 @@ mod tests {
         mock.push(
             Response::builder()
                 .status(StatusCode::BAD_REQUEST)
-                .body(Body::from("cupcakes"))
+                .body("cupcakes".to_string())
                 .unwrap(),
         );
 
@@ -350,7 +353,7 @@ mod tests {
         mock.push(
             Response::builder()
                 .status(StatusCode::BAD_REQUEST)
-                .body(Body::empty())
+                .body(String::new())
                 .unwrap(),
         );
 
@@ -366,7 +369,7 @@ mod tests {
         mock.push(
             Response::builder()
                 .status(StatusCode::BAD_GATEWAY)
-                .body(Body::empty())
+                .body(String::new())
                 .unwrap(),
         );
 
@@ -377,7 +380,7 @@ mod tests {
         mock.push(
             Response::builder()
                 .status(StatusCode::NO_CONTENT)
-                .body(Body::empty())
+                .body(String::new())
                 .unwrap(),
         );
 
@@ -389,7 +392,7 @@ mod tests {
             Response::builder()
                 .status(StatusCode::FOUND)
                 .header(LOCATION, "/foo")
-                .body(Body::empty())
+                .body(String::new())
                 .unwrap(),
         );
 
@@ -402,7 +405,7 @@ mod tests {
             Response::builder()
                 .status(StatusCode::FOUND)
                 .header(LOCATION, "/bar")
-                .body(Body::empty())
+                .body(String::new())
                 .unwrap(),
         );
 
@@ -416,7 +419,7 @@ mod tests {
                 Response::builder()
                     .status(StatusCode::FOUND)
                     .header(LOCATION, "/bar")
-                    .body(Body::empty())
+                    .body(String::new())
                     .unwrap(),
             );
         }
@@ -428,7 +431,7 @@ mod tests {
         mock.push(
             Response::builder()
                 .status(StatusCode::FOUND)
-                .body(Body::empty())
+                .body(String::new())
                 .unwrap(),
         );
 
@@ -441,7 +444,7 @@ mod tests {
             mock.push(
                 Response::builder()
                     .status(StatusCode::BAD_GATEWAY)
-                    .body(Body::from("ignored"))
+                    .body("ignored".to_string())
                     .unwrap(),
             );
         }
diff --git a/object_store/src/parse.rs b/object_store/src/parse.rs
index 116c2ad2ac0..5549fd3a3e5 100644
--- a/object_store/src/parse.rs
+++ b/object_store/src/parse.rs
@@ -311,14 +311,14 @@ mod tests {
     #[cfg(feature = "http")]
     async fn test_url_http() {
         use crate::client::mock_server::MockServer;
-        use hyper::{header::USER_AGENT, Body, Response};
+        use hyper::{header::USER_AGENT, Response};
 
-        let server = MockServer::new();
+        let server = MockServer::new().await;
 
         server.push_fn(|r| {
             assert_eq!(r.uri().path(), "/foo/bar");
             assert_eq!(r.headers().get(USER_AGENT).unwrap(), "test_url");
-            Response::new(Body::empty())
+            Response::new(String::new())
         });
 
         let test = format!("{}/foo/bar", server.url());

Reply via email to