This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new b69222340b feat(core): abstract HttpFetch trait for raw http client (#5184)
b69222340b is described below
commit b69222340b33d38f2fe9396f537f19139e3bae9d
Author: everpcpc <[email protected]>
AuthorDate: Wed Oct 16 17:08:44 2024 +0800
feat(core): abstract HttpFetch trait for raw http client (#5184)
---
core/src/raw/http_util/client.rs | 79 ++++++++++++++++++++++++++++++++--------
core/src/raw/http_util/mod.rs | 5 +++
core/src/services/cos/backend.rs | 2 +-
core/src/services/gcs/backend.rs | 2 +-
core/src/services/oss/backend.rs | 2 +-
core/src/services/s3/backend.rs | 8 ++--
core/src/services/s3/core.rs | 2 +-
7 files changed, 78 insertions(+), 22 deletions(-)
diff --git a/core/src/raw/http_util/client.rs b/core/src/raw/http_util/client.rs
index 7d756030da..b9eedc2bce 100644
--- a/core/src/raw/http_util/client.rs
+++ b/core/src/raw/http_util/client.rs
@@ -19,22 +19,36 @@ use std::fmt::Debug;
use std::fmt::Formatter;
use std::future;
use std::mem;
+use std::ops::Deref;
use std::str::FromStr;
+use std::sync::Arc;
+use futures::Future;
use futures::TryStreamExt;
use http::Request;
use http::Response;
+use once_cell::sync::Lazy;
use raw::oio::Read;
use super::parse_content_encoding;
use super::parse_content_length;
use super::HttpBody;
+use crate::raw::*;
use crate::*;
+/// Http client used across opendal for loading credentials.
+/// This is merely a temporary solution because reqsign requires a reqwest client to be passed.
+/// We will remove it after the next major version of reqsign, which will enable users to provide their own client.
+#[allow(dead_code)]
+pub(crate) static GLOBAL_REQWEST_CLIENT: Lazy<reqwest::Client> =
Lazy::new(reqwest::Client::new);
+
+/// HttpFetcher is a type erased [`HttpFetch`].
+pub type HttpFetcher = Arc<dyn HttpFetchDyn>;
+
/// HttpClient that used across opendal.
#[derive(Clone)]
pub struct HttpClient {
- client: reqwest::Client,
+ fetcher: HttpFetcher,
}
/// We don't want users to know details about our clients.
@@ -47,26 +61,24 @@ impl Debug for HttpClient {
impl HttpClient {
/// Create a new http client in async context.
pub fn new() -> Result<Self> {
- Self::build(reqwest::ClientBuilder::new())
+ let fetcher = Arc::new(reqwest::Client::new());
+ Ok(Self { fetcher })
}
/// Construct `Self` with given [`reqwest::Client`]
- pub fn with(client: reqwest::Client) -> Self {
- Self { client }
+ pub fn with(client: impl HttpFetch) -> Self {
+ let fetcher = Arc::new(client);
+ Self { fetcher }
}
/// Build a new http client in async context.
+ #[deprecated]
pub fn build(builder: reqwest::ClientBuilder) -> Result<Self> {
- Ok(Self {
- client: builder.build().map_err(|err| {
- Error::new(ErrorKind::Unexpected, "http client build
failed").set_source(err)
- })?,
- })
- }
-
- /// Get the async client from http client.
- pub fn client(&self) -> reqwest::Client {
- self.client.clone()
+ let client = builder.build().map_err(|err| {
+ Error::new(ErrorKind::Unexpected, "http client build
failed").set_source(err)
+ })?;
+ let fetcher = Arc::new(client);
+ Ok(Self { fetcher })
}
/// Send a request in async way.
@@ -78,6 +90,44 @@ impl HttpClient {
/// Fetch a request in async way.
pub async fn fetch(&self, req: Request<Buffer>) ->
Result<Response<HttpBody>> {
+ self.fetcher.fetch(req).await
+ }
+}
+
+/// HttpFetch is the trait to fetch a request in async way.
+/// User should implement this trait to provide their own http client.
+pub trait HttpFetch: Send + Sync + Unpin + 'static {
+ /// Fetch a request in async way.
+ fn fetch(
+ &self,
+ req: Request<Buffer>,
+ ) -> impl Future<Output = Result<Response<HttpBody>>> + MaybeSend;
+}
+
+/// HttpFetchDyn is the dyn version of [`HttpFetch`],
+/// which makes it possible to use it as `Arc<dyn HttpFetchDyn>`.
+/// Users should never implement this trait directly; implement `HttpFetch` instead.
+pub trait HttpFetchDyn: Send + Sync + Unpin + 'static {
+ /// The dyn version of [`HttpFetch::fetch`].
+ ///
+ /// This function returns a boxed future to make it object safe.
+ fn fetch_dyn(&self, req: Request<Buffer>) ->
BoxedFuture<Result<Response<HttpBody>>>;
+}
+
+impl<T: HttpFetch + ?Sized> HttpFetchDyn for T {
+ fn fetch_dyn(&self, req: Request<Buffer>) ->
BoxedFuture<Result<Response<HttpBody>>> {
+ Box::pin(self.fetch(req))
+ }
+}
+
+impl<T: HttpFetchDyn + ?Sized> HttpFetch for Arc<T> {
+ async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
+ self.deref().fetch_dyn(req).await
+ }
+}
+
+impl HttpFetch for reqwest::Client {
+ async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
// Uri stores all string alike data in `Bytes` which means
// the clone here is cheap.
let uri = req.uri().clone();
@@ -86,7 +136,6 @@ impl HttpClient {
let (parts, body) = req.into_parts();
let mut req_builder = self
- .client
.request(
parts.method,
reqwest::Url::from_str(&uri.to_string()).expect("input request
url must be valid"),
diff --git a/core/src/raw/http_util/mod.rs b/core/src/raw/http_util/mod.rs
index 965ea0bd49..226fb17b7d 100644
--- a/core/src/raw/http_util/mod.rs
+++ b/core/src/raw/http_util/mod.rs
@@ -24,6 +24,11 @@
mod client;
pub use client::HttpClient;
+pub use client::HttpFetch;
+
+/// temporary client used by several features
+#[allow(unused_imports)]
+pub(crate) use client::GLOBAL_REQWEST_CLIENT;
mod body;
pub use body::HttpBody;
diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs
index ff4db12ee4..6065dca95e 100644
--- a/core/src/services/cos/backend.rs
+++ b/core/src/services/cos/backend.rs
@@ -205,7 +205,7 @@ impl Builder for CosBuilder {
cfg.secret_key = Some(v);
}
- let cred_loader = TencentCosCredentialLoader::new(client.client(),
cfg);
+ let cred_loader =
TencentCosCredentialLoader::new(GLOBAL_REQWEST_CLIENT.clone(), cfg);
let signer = TencentCosSigner::new();
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index 74d837c4c8..4c7bc7eca6 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -292,7 +292,7 @@ impl Builder for GcsBuilder {
DEFAULT_GCS_SCOPE
};
- let mut token_loader = GoogleTokenLoader::new(scope, client.client());
+ let mut token_loader = GoogleTokenLoader::new(scope,
GLOBAL_REQWEST_CLIENT.clone());
if let Some(account) = &self.config.service_account {
token_loader = token_loader.with_service_account(account);
}
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index 6b064d3682..7e2d67b3ca 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -381,7 +381,7 @@ impl Builder for OssBuilder {
})?
};
- let loader = AliyunLoader::new(client.client(), cfg);
+ let loader = AliyunLoader::new(GLOBAL_REQWEST_CLIENT.clone(), cfg);
let signer = AliyunOssSigner::new(bucket);
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index c441f8ac45..57b3a2f2fa 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -800,7 +800,8 @@ impl Builder for S3Builder {
// If role_arn is set, we must use AssumeRoleLoad.
if let Some(role_arn) = self.config.role_arn {
// use current env as source credential loader.
- let default_loader = AwsDefaultLoader::new(client.client(),
cfg.clone());
+ let default_loader =
+ AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(),
cfg.clone());
// Build the config for assume role.
let mut assume_role_cfg = AwsConfig {
@@ -817,7 +818,7 @@ impl Builder for S3Builder {
}
let assume_role_loader = AwsAssumeRoleLoader::new(
- client.client(),
+ GLOBAL_REQWEST_CLIENT.clone().clone(),
assume_role_cfg,
Box::new(default_loader),
)
@@ -835,7 +836,8 @@ impl Builder for S3Builder {
let loader = match loader {
Some(v) => v,
None => {
- let mut default_loader =
AwsDefaultLoader::new(client.client(), cfg);
+ let mut default_loader =
+
AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg);
if self.config.disable_ec2_metadata {
default_loader =
default_loader.with_disable_ec2_metadata();
}
diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs
index 66bcb14e24..79ba662392 100644
--- a/core/src/services/s3/core.rs
+++ b/core/src/services/s3/core.rs
@@ -114,7 +114,7 @@ impl S3Core {
async fn load_credential(&self) -> Result<Option<AwsCredential>> {
let cred = self
.loader
- .load_credential(self.client.client())
+ .load_credential(GLOBAL_REQWEST_CLIENT.clone())
.await
.map_err(new_request_credential_error)?;