This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new b69222340b feat(core): abstract HttpFetch trait for raw http client (#5184)
b69222340b is described below

commit b69222340b33d38f2fe9396f537f19139e3bae9d
Author: everpcpc <[email protected]>
AuthorDate: Wed Oct 16 17:08:44 2024 +0800

    feat(core): abstract HttpFetch trait for raw http client (#5184)
---
 core/src/raw/http_util/client.rs | 79 ++++++++++++++++++++++++++++++++--------
 core/src/raw/http_util/mod.rs    |  5 +++
 core/src/services/cos/backend.rs |  2 +-
 core/src/services/gcs/backend.rs |  2 +-
 core/src/services/oss/backend.rs |  2 +-
 core/src/services/s3/backend.rs  |  8 ++--
 core/src/services/s3/core.rs     |  2 +-
 7 files changed, 78 insertions(+), 22 deletions(-)

diff --git a/core/src/raw/http_util/client.rs b/core/src/raw/http_util/client.rs
index 7d756030da..b9eedc2bce 100644
--- a/core/src/raw/http_util/client.rs
+++ b/core/src/raw/http_util/client.rs
@@ -19,22 +19,36 @@ use std::fmt::Debug;
 use std::fmt::Formatter;
 use std::future;
 use std::mem;
+use std::ops::Deref;
 use std::str::FromStr;
+use std::sync::Arc;
 
+use futures::Future;
 use futures::TryStreamExt;
 use http::Request;
 use http::Response;
+use once_cell::sync::Lazy;
 use raw::oio::Read;
 
 use super::parse_content_encoding;
 use super::parse_content_length;
 use super::HttpBody;
+use crate::raw::*;
 use crate::*;
 
+/// Http client used across opendal for loading credentials.
+/// This is merely a temporary solution because reqsign requires a reqwest client to be passed.
+/// We will remove it after the next major version of reqsign, which will enable users to provide their own client.
+#[allow(dead_code)]
+pub(crate) static GLOBAL_REQWEST_CLIENT: Lazy<reqwest::Client> = Lazy::new(reqwest::Client::new);
+
+/// HttpFetcher is a type erased [`HttpFetch`].
+pub type HttpFetcher = Arc<dyn HttpFetchDyn>;
+
 /// HttpClient that used across opendal.
 #[derive(Clone)]
 pub struct HttpClient {
-    client: reqwest::Client,
+    fetcher: HttpFetcher,
 }
 
 /// We don't want users to know details about our clients.
@@ -47,26 +61,24 @@ impl Debug for HttpClient {
 impl HttpClient {
     /// Create a new http client in async context.
     pub fn new() -> Result<Self> {
-        Self::build(reqwest::ClientBuilder::new())
+        let fetcher = Arc::new(reqwest::Client::new());
+        Ok(Self { fetcher })
     }
 
     /// Construct `Self` with given [`reqwest::Client`]
-    pub fn with(client: reqwest::Client) -> Self {
-        Self { client }
+    pub fn with(client: impl HttpFetch) -> Self {
+        let fetcher = Arc::new(client);
+        Self { fetcher }
     }
 
     /// Build a new http client in async context.
+    #[deprecated]
     pub fn build(builder: reqwest::ClientBuilder) -> Result<Self> {
-        Ok(Self {
-            client: builder.build().map_err(|err| {
-                Error::new(ErrorKind::Unexpected, "http client build failed").set_source(err)
-            })?,
-        })
-    }
-
-    /// Get the async client from http client.
-    pub fn client(&self) -> reqwest::Client {
-        self.client.clone()
+        let client = builder.build().map_err(|err| {
+            Error::new(ErrorKind::Unexpected, "http client build failed").set_source(err)
+        })?;
+        let fetcher = Arc::new(client);
+        Ok(Self { fetcher })
     }
 
     /// Send a request in async way.
@@ -78,6 +90,44 @@ impl HttpClient {
 
     /// Fetch a request in async way.
     pub async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
+        self.fetcher.fetch(req).await
+    }
+}
+
+/// HttpFetch is the trait to fetch a request in async way.
+/// User should implement this trait to provide their own http client.
+pub trait HttpFetch: Send + Sync + Unpin + 'static {
+    /// Fetch a request in async way.
+    fn fetch(
+        &self,
+        req: Request<Buffer>,
+    ) -> impl Future<Output = Result<Response<HttpBody>>> + MaybeSend;
+}
+
+/// HttpFetchDyn is the dyn version of [`HttpFetch`]
+/// which make it possible to use as `Arc<dyn HttpFetchDyn>`.
+/// User should never implement this trait, but use `HttpFetch` instead.
+pub trait HttpFetchDyn: Send + Sync + Unpin + 'static {
+    /// The dyn version of [`HttpFetch::fetch`].
+    ///
+    /// This function returns a boxed future to make it object safe.
+    fn fetch_dyn(&self, req: Request<Buffer>) -> BoxedFuture<Result<Response<HttpBody>>>;
+}
+
+impl<T: HttpFetch + ?Sized> HttpFetchDyn for T {
+    fn fetch_dyn(&self, req: Request<Buffer>) -> BoxedFuture<Result<Response<HttpBody>>> {
+        Box::pin(self.fetch(req))
+    }
+}
+
+impl<T: HttpFetchDyn + ?Sized> HttpFetch for Arc<T> {
+    async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
+        self.deref().fetch_dyn(req).await
+    }
+}
+
+impl HttpFetch for reqwest::Client {
+    async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
         // Uri stores all string alike data in `Bytes` which means
         // the clone here is cheap.
         let uri = req.uri().clone();
@@ -86,7 +136,6 @@ impl HttpClient {
         let (parts, body) = req.into_parts();
 
         let mut req_builder = self
-            .client
             .request(
                 parts.method,
                 reqwest::Url::from_str(&uri.to_string()).expect("input request url must be valid"),
diff --git a/core/src/raw/http_util/mod.rs b/core/src/raw/http_util/mod.rs
index 965ea0bd49..226fb17b7d 100644
--- a/core/src/raw/http_util/mod.rs
+++ b/core/src/raw/http_util/mod.rs
@@ -24,6 +24,11 @@
 
 mod client;
 pub use client::HttpClient;
+pub use client::HttpFetch;
+
+/// temporary client used by several features
+#[allow(unused_imports)]
+pub(crate) use client::GLOBAL_REQWEST_CLIENT;
 
 mod body;
 pub use body::HttpBody;
diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs
index ff4db12ee4..6065dca95e 100644
--- a/core/src/services/cos/backend.rs
+++ b/core/src/services/cos/backend.rs
@@ -205,7 +205,7 @@ impl Builder for CosBuilder {
             cfg.secret_key = Some(v);
         }
 
-        let cred_loader = TencentCosCredentialLoader::new(client.client(), cfg);
+        let cred_loader = TencentCosCredentialLoader::new(GLOBAL_REQWEST_CLIENT.clone(), cfg);
 
         let signer = TencentCosSigner::new();
 
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index 74d837c4c8..4c7bc7eca6 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -292,7 +292,7 @@ impl Builder for GcsBuilder {
             DEFAULT_GCS_SCOPE
         };
 
-        let mut token_loader = GoogleTokenLoader::new(scope, client.client());
+        let mut token_loader = GoogleTokenLoader::new(scope, GLOBAL_REQWEST_CLIENT.clone());
         if let Some(account) = &self.config.service_account {
             token_loader = token_loader.with_service_account(account);
         }
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index 6b064d3682..7e2d67b3ca 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -381,7 +381,7 @@ impl Builder for OssBuilder {
             })?
         };
 
-        let loader = AliyunLoader::new(client.client(), cfg);
+        let loader = AliyunLoader::new(GLOBAL_REQWEST_CLIENT.clone(), cfg);
 
         let signer = AliyunOssSigner::new(bucket);
 
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index c441f8ac45..57b3a2f2fa 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -800,7 +800,8 @@ impl Builder for S3Builder {
         // If role_arn is set, we must use AssumeRoleLoad.
         if let Some(role_arn) = self.config.role_arn {
             // use current env as source credential loader.
-            let default_loader = AwsDefaultLoader::new(client.client(), cfg.clone());
+            let default_loader =
+                AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg.clone());
 
             // Build the config for assume role.
             let mut assume_role_cfg = AwsConfig {
@@ -817,7 +818,7 @@ impl Builder for S3Builder {
             }
 
             let assume_role_loader = AwsAssumeRoleLoader::new(
-                client.client(),
+                GLOBAL_REQWEST_CLIENT.clone().clone(),
                 assume_role_cfg,
                 Box::new(default_loader),
             )
@@ -835,7 +836,8 @@ impl Builder for S3Builder {
         let loader = match loader {
             Some(v) => v,
             None => {
-                let mut default_loader = AwsDefaultLoader::new(client.client(), cfg);
+                let mut default_loader =
+                    AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg);
                 if self.config.disable_ec2_metadata {
                     default_loader = default_loader.with_disable_ec2_metadata();
                 }
diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs
index 66bcb14e24..79ba662392 100644
--- a/core/src/services/s3/core.rs
+++ b/core/src/services/s3/core.rs
@@ -114,7 +114,7 @@ impl S3Core {
     async fn load_credential(&self) -> Result<Option<AwsCredential>> {
         let cred = self
             .loader
-            .load_credential(self.client.client())
+            .load_credential(GLOBAL_REQWEST_CLIENT.clone())
             .await
             .map_err(new_request_credential_error)?;
 

Reply via email to