This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 64f0e9ff refactor: Remove not used blocking http client (#1895)
64f0e9ff is described below

commit 64f0e9ff31a70c1bc6dd411ccab3927f1f91927e
Author: Xuanwo <[email protected]>
AuthorDate: Tue Apr 11 01:07:02 2023 +0800

    refactor: Remove not used blocking http client (#1895)
    
    * refactor: Remove not used blocking http client
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Polish naming
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix API
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/Cargo.toml                      |   1 -
 core/src/raw/http_util/body.rs       |  51 -------------
 core/src/raw/http_util/client.rs     | 137 +++--------------------------------
 core/src/raw/http_util/mod.rs        |   1 -
 core/src/raw/rps.rs                  |  11 +--
 core/src/services/azblob/backend.rs  |  14 ++--
 core/src/services/azblob/writer.rs   |   2 +-
 core/src/services/azdfs/backend.rs   |  10 +--
 core/src/services/azdfs/writer.rs    |   4 +-
 core/src/services/gcs/backend.rs     |  12 +--
 core/src/services/gcs/writer.rs      |   2 +-
 core/src/services/ghac/backend.rs    |  18 ++---
 core/src/services/ghac/writer.rs     |   4 +-
 core/src/services/http/backend.rs    |   4 +-
 core/src/services/ipfs/backend.rs    |   6 +-
 core/src/services/ipmfs/backend.rs   |  12 +--
 core/src/services/obs/backend.rs     |  12 +--
 core/src/services/obs/writer.rs      |   2 +-
 core/src/services/oss/backend.rs     |  18 ++---
 core/src/services/oss/writer.rs      |   4 +-
 core/src/services/s3/backend.rs      |  18 ++---
 core/src/services/s3/writer.rs       |   4 +-
 core/src/services/webdav/backend.rs  |  14 ++--
 core/src/services/webhdfs/backend.rs |  14 ++--
 core/src/services/webhdfs/writer.rs  |   2 +-
 25 files changed, 100 insertions(+), 277 deletions(-)

diff --git a/core/Cargo.toml b/core/Cargo.toml
index 17b49a54..48d5fffa 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -122,7 +122,6 @@ reqsign = "0.8.5"
 reqwest = { version = "0.11.13", features = [
   "multipart",
   "stream",
-  "blocking",
 ], default-features = false }
 rocksdb = { version = "0.20.1", default-features = false, optional = true }
 serde = { version = "1", features = ["derive"] }
diff --git a/core/src/raw/http_util/body.rs b/core/src/raw/http_util/body.rs
index 412143c2..b4d43247 100644
--- a/core/src/raw/http_util/body.rs
+++ b/core/src/raw/http_util/body.rs
@@ -33,57 +33,6 @@ use crate::Error;
 use crate::ErrorKind;
 use crate::Result;
 
-/// Body used in blocking HTTP requests.
-#[derive(Default)]
-pub enum Body {
-    /// An empty body.
-    #[default]
-    Empty,
-    /// Body with bytes.
-    Bytes(Bytes),
-}
-
-impl Body {
-    /// Consume the entire body.
-    pub fn consume(self) -> Result<()> {
-        Ok(())
-    }
-}
-
-impl From<Body> for reqwest::blocking::Body {
-    fn from(v: Body) -> Self {
-        match v {
-            Body::Empty => reqwest::blocking::Body::from(""),
-            Body::Bytes(bs) => reqwest::blocking::Body::from(bs),
-        }
-    }
-}
-
-/// IncomingBody carries the content returned by remote servers.
-///
-/// # Notes
-///
-/// Client SHOULD NEVER construct this body.
-pub struct IncomingBody {
-    /// # TODO
-    ///
-    /// hyper returns `impl Stream<Item = crate::Result<Bytes>>` but we can't
-    /// write the types in stable. So we will box here.
-    ///
-    /// After [TAIT](https://rust-lang.github.io/rfcs/2515-type_alias_impl_trait.html)
-    /// has been stable, we can change `IncomingAsyncBody` into `IncomingAsyncBody<S>`.
-    #[allow(unused)]
-    inner: reqwest::blocking::Response,
-    #[allow(unused)]
-    size: Option<u64>,
-}
-
-impl IncomingBody {
-    pub fn new(resp: reqwest::blocking::Response, size: Option<u64>) -> Self {
-        IncomingBody { inner: resp, size }
-    }
-}
-
 /// Body used in async HTTP requests.
 #[derive(Default)]
 pub enum AsyncBody {
diff --git a/core/src/raw/http_util/client.rs b/core/src/raw/http_util/client.rs
index 8bd2c318..bd4722ff 100644
--- a/core/src/raw/http_util/client.rs
+++ b/core/src/raw/http_util/client.rs
@@ -18,7 +18,6 @@
 use std::fmt::Debug;
 use std::fmt::Formatter;
 use std::str::FromStr;
-use std::thread;
 
 use futures::TryStreamExt;
 use http::Request;
@@ -27,10 +26,8 @@ use reqwest::redirect::Policy;
 use reqwest::Url;
 
 use super::body::IncomingAsyncBody;
-use super::body::IncomingBody;
 use super::parse_content_length;
 use super::AsyncBody;
-use super::Body;
 use crate::Error;
 use crate::ErrorKind;
 use crate::Result;
@@ -38,8 +35,7 @@ use crate::Result;
 /// HttpClient that used across opendal.
 #[derive(Clone)]
 pub struct HttpClient {
-    async_client: reqwest::Client,
-    blocking_client: reqwest::blocking::Client,
+    client: reqwest::Client,
 }
 
 /// We don't want users to know details about our clients.
@@ -52,25 +48,11 @@ impl Debug for HttpClient {
 impl HttpClient {
     /// Create a new http client in async context.
     pub fn new() -> Result<Self> {
-        Self::build(
-            reqwest::ClientBuilder::new(),
-            reqwest::blocking::ClientBuilder::new(),
-        )
+        Self::build(reqwest::ClientBuilder::new())
     }
 
     /// Build a new http client in async context.
-    pub fn build(
-        async_builder: reqwest::ClientBuilder,
-        blocking_builder: reqwest::blocking::ClientBuilder,
-    ) -> Result<Self> {
-        Ok(HttpClient {
-            async_client: Self::build_async_client(async_builder)?,
-            blocking_client: Self::build_blocking_client(blocking_builder)?,
-        })
-    }
-
-    /// Build a new blocking client with given builder.
-    fn build_async_client(mut builder: reqwest::ClientBuilder) -> Result<reqwest::Client> {
+    pub fn build(mut builder: reqwest::ClientBuilder) -> Result<Self> {
         // Make sure we don't enable auto gzip decompress.
         builder = builder.no_gzip();
         // Make sure we don't enable auto brotli decompress.
@@ -83,122 +65,25 @@ impl HttpClient {
         #[cfg(feature = "trust-dns")]
         let builder = builder.trust_dns(true);
 
-        builder.build().map_err(|err| {
-            Error::new(ErrorKind::Unexpected, "async client build failed").set_source(err)
+        Ok(Self {
+            client: builder.build().map_err(|err| {
+                Error::new(ErrorKind::Unexpected, "async client build failed").set_source(err)
+            })?,
         })
     }
 
-    /// Build a new blocking client with given builder.
-    ///
-    /// # Notes
-    ///
-    /// `reqwest::blocking::ClientBuilder::build` will panic if called
-    /// inside async context. So we need to spawn a thread to build it.
-    ///
-    /// And users SHOULD never call blocking clients in async context.
-    fn build_blocking_client(
-        mut builder: reqwest::blocking::ClientBuilder,
-    ) -> Result<reqwest::blocking::Client> {
-        // Make sure we don't enable auto gzip decompress.
-        builder = builder.no_gzip();
-        // Make sure we don't enable auto brotli decompress.
-        builder = builder.no_brotli();
-        // Make sure we don't enable auto deflate decompress.
-        builder = builder.no_deflate();
-        // Redirect will be handled by ourselves.
-        builder = builder.redirect(Policy::none());
-
-        #[cfg(feature = "trust-dns")]
-        let builder = builder.trust_dns(true);
-
-        thread::spawn(|| {
-            builder.build().map_err(|err| {
-                Error::new(ErrorKind::Unexpected, "blocking client build failed").set_source(err)
-            })
-        })
-        .join()
-        .expect("the thread of building blocking client join failed")
-    }
-
     /// Get the async client from http client.
-    pub async fn async_client(&self) -> reqwest::Client {
-        self.async_client.clone()
-    }
-
-    /// Get the blocking client from http client.
-    pub fn blocking_client(&self) -> reqwest::blocking::Client {
-        self.blocking_client.clone()
-    }
-
-    /// Send a request in blocking way.
-    pub fn send(&self, req: Request<Body>) -> Result<Response<IncomingBody>> {
-        let is_head = req.method() == http::Method::HEAD;
-        let (parts, body) = req.into_parts();
-
-        let mut req_builder = self
-            .blocking_client
-            .request(
-                parts.method,
-                Url::from_str(&parts.uri.to_string()).expect("input request url must be valid"),
-            )
-            .version(parts.version)
-            .headers(parts.headers);
-
-        req_builder = req_builder.body(body);
-
-        let resp = req_builder.send().map_err(|err| {
-            let is_temporary = !(
-                // Builder related error should not be retried.
-                err.is_builder() ||
-                // Error returned by RedirectPolicy.
-                //
-                // We don't set this by hand, just don't allow retry.
-                err.is_redirect() ||
-                 // We never use `Response::error_for_status`, just don't allow retry.
-                //
-                // Status should be checked by our services.
-                err.is_status()
-            );
-
-            let mut oerr = Error::new(ErrorKind::Unexpected, "send blocking request")
-                .with_operation("http_util::Client::send")
-                .set_source(err);
-            if is_temporary {
-                oerr = oerr.set_temporary();
-            }
-
-            oerr
-        })?;
-
-        // Get content length from header so that we can check it.
-        // If the request method is HEAD, we will ignore this.
-        let content_length = if is_head {
-            None
-        } else {
-            parse_content_length(resp.headers()).expect("response content length must be valid")
-        };
-
-        let mut hr = Response::builder()
-            .version(resp.version())
-            .status(resp.status());
-        for (k, v) in resp.headers().iter() {
-            hr = hr.header(k, v);
-        }
-
-        let body = IncomingBody::new(resp, content_length);
-
-        let resp = hr.body(body).expect("response must build succeed");
-
-        Ok(resp)
+    pub fn client(&self) -> reqwest::Client {
+        self.client.clone()
     }
 
     /// Send a request in async way.
-    pub async fn send_async(&self, req: Request<AsyncBody>) -> Result<Response<IncomingAsyncBody>> {
+    pub async fn send(&self, req: Request<AsyncBody>) -> Result<Response<IncomingAsyncBody>> {
         let is_head = req.method() == http::Method::HEAD;
         let (parts, body) = req.into_parts();
 
         let mut req_builder = self
-            .async_client
+            .client
             .request(
                 parts.method,
                 Url::from_str(&parts.uri.to_string()).expect("input request url must be valid"),
diff --git a/core/src/raw/http_util/mod.rs b/core/src/raw/http_util/mod.rs
index b2878bd0..9cb60a04 100644
--- a/core/src/raw/http_util/mod.rs
+++ b/core/src/raw/http_util/mod.rs
@@ -27,7 +27,6 @@ pub use client::HttpClient;
 
 mod body;
 pub use body::AsyncBody;
-pub use body::Body;
 pub use body::IncomingAsyncBody;
 
 mod header;
diff --git a/core/src/raw/rps.rs b/core/src/raw/rps.rs
index 7ce8ba0c..101310e7 100644
--- a/core/src/raw/rps.rs
+++ b/core/src/raw/rps.rs
@@ -247,16 +247,7 @@ mod tests {
             },
         };
 
-        let req: Request<AsyncBody> = pr.clone().into();
-        assert_eq!(Method::PATCH, req.method());
-        assert_eq!(
-            "https://opendal.apache.org/path/to/file",
-            req.uri().to_string()
-        );
-        assert_eq!("123", req.headers().get(CONTENT_LENGTH).unwrap());
-        assert_eq!("application/json", req.headers().get(CONTENT_TYPE).unwrap());
-
-        let req: Request<Body> = pr.into();
+        let req: Request<AsyncBody> = pr.into();
         assert_eq!(Method::PATCH, req.method());
         assert_eq!(
             "https://opendal.apache.org/path/to/file",
diff --git a/core/src/services/azblob/backend.rs b/core/src/services/azblob/backend.rs
index c5ec262e..20b2d596 100644
--- a/core/src/services/azblob/backend.rs
+++ b/core/src/services/azblob/backend.rs
@@ -502,7 +502,7 @@ impl Accessor for AzblobBackend {
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
 
-        let resp = self.client.send_async(req).await?;
+        let resp = self.client.send(req).await?;
 
         let status = resp.status();
 
@@ -706,7 +706,7 @@ impl AzblobBackend {
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     pub fn azblob_put_blob_request(
@@ -761,7 +761,7 @@ impl AzblobBackend {
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     async fn azblob_delete_blob(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
@@ -782,7 +782,7 @@ impl AzblobBackend {
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     async fn azblob_copy_blob(&self, from: &str, to: &str) -> Result<Response<IncomingAsyncBody>> {
@@ -809,7 +809,7 @@ impl AzblobBackend {
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     pub(crate) async fn azblob_list_blobs(
@@ -845,7 +845,7 @@ impl AzblobBackend {
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     async fn azblob_batch_delete(&self, paths: &[String]) -> Result<Response<IncomingAsyncBody>> {
@@ -881,7 +881,7 @@ impl AzblobBackend {
         let mut req = batch_delete_req_builder.try_into_req()?;
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 }
 
diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs
index e5c0a6f2..8cad24c6 100644
--- a/core/src/services/azblob/writer.rs
+++ b/core/src/services/azblob/writer.rs
@@ -53,7 +53,7 @@ impl oio::Write for AzblobWriter {
             .sign(&mut req)
             .map_err(new_request_sign_error)?;
 
-        let resp = self.backend.client.send_async(req).await?;
+        let resp = self.backend.client.send(req).await?;
 
         let status = resp.status();
 
diff --git a/core/src/services/azdfs/backend.rs b/core/src/services/azdfs/backend.rs
index f750d103..5a810ac2 100644
--- a/core/src/services/azdfs/backend.rs
+++ b/core/src/services/azdfs/backend.rs
@@ -342,7 +342,7 @@ impl Accessor for AzdfsBackend {
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
 
-        let resp = self.client.send_async(req).await?;
+        let resp = self.client.send(req).await?;
 
         let status = resp.status();
 
@@ -462,7 +462,7 @@ impl AzdfsBackend {
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     /// resource should be one of `file` or `directory`
@@ -556,7 +556,7 @@ impl AzdfsBackend {
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     async fn azdfs_delete(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
@@ -579,7 +579,7 @@ impl AzdfsBackend {
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     pub(crate) async fn azdfs_list(
@@ -613,7 +613,7 @@ impl AzdfsBackend {
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 }
 
diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs
index 327c82d2..2d85f9fb 100644
--- a/core/src/services/azdfs/writer.rs
+++ b/core/src/services/azdfs/writer.rs
@@ -54,7 +54,7 @@ impl oio::Write for AzdfsWriter {
             .sign(&mut req)
             .map_err(new_request_sign_error)?;
 
-        let resp = self.backend.client.send_async(req).await?;
+        let resp = self.backend.client.send(req).await?;
 
         let status = resp.status();
         match status {
@@ -77,7 +77,7 @@ impl oio::Write for AzdfsWriter {
             .sign(&mut req)
             .map_err(new_request_sign_error)?;
 
-        let resp = self.backend.client.send_async(req).await?;
+        let resp = self.backend.client.send(req).await?;
 
         let status = resp.status();
         match status {
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index 77314abe..eda03ae6 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -371,7 +371,7 @@ impl Accessor for GcsBackend {
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
 
-        let resp = self.client.send_async(req).await?;
+        let resp = self.client.send(req).await?;
 
         if resp.status().is_success() {
             resp.into_body().consume().await?;
@@ -420,7 +420,7 @@ impl Accessor for GcsBackend {
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
 
-        let resp = self.client.send_async(req).await?;
+        let resp = self.client.send(req).await?;
 
         if !resp.status().is_success() {
             return Err(parse_error(resp).await?);
@@ -535,7 +535,7 @@ impl GcsBackend {
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     pub fn gcs_insert_object_request(
@@ -588,7 +588,7 @@ impl GcsBackend {
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     async fn gcs_delete_object(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
@@ -607,7 +607,7 @@ impl GcsBackend {
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     pub(crate) async fn gcs_list_objects(
@@ -648,7 +648,7 @@ impl GcsBackend {
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 }
 
diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs
index 0b984f74..1dda8d9b 100644
--- a/core/src/services/gcs/writer.rs
+++ b/core/src/services/gcs/writer.rs
@@ -53,7 +53,7 @@ impl oio::Write for GcsWriter {
             .sign(&mut req)
             .map_err(new_request_sign_error)?;
 
-        let resp = self.backend.client.send_async(req).await?;
+        let resp = self.backend.client.send(req).await?;
 
         let status = resp.status();
 
diff --git a/core/src/services/ghac/backend.rs b/core/src/services/ghac/backend.rs
index 34ff3bc1..a3279b9d 100644
--- a/core/src/services/ghac/backend.rs
+++ b/core/src/services/ghac/backend.rs
@@ -319,7 +319,7 @@ impl Accessor for GhacBackend {
 
         let req = self.ghac_reserve(path).await?;
 
-        let resp = self.client.send_async(req).await?;
+        let resp = self.client.send(req).await?;
 
         let cache_id = if resp.status().is_success() {
             let slc = resp.into_body().bytes().await?;
@@ -340,7 +340,7 @@ impl Accessor for GhacBackend {
             .ghac_upload(cache_id, 1, AsyncBody::Bytes(Bytes::from_static(&[0])))
             .await?;
 
-        let resp = self.client.send_async(req).await?;
+        let resp = self.client.send(req).await?;
 
         if resp.status().is_success() {
             resp.into_body().consume().await?;
@@ -351,7 +351,7 @@ impl Accessor for GhacBackend {
         }
 
         let req = self.ghac_commit(cache_id, 1).await?;
-        let resp = self.client.send_async(req).await?;
+        let resp = self.client.send(req).await?;
 
         if resp.status().is_success() {
             resp.into_body().consume().await?;
@@ -366,7 +366,7 @@ impl Accessor for GhacBackend {
     async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
         let req = self.ghac_query(path).await?;
 
-        let resp = self.client.send_async(req).await?;
+        let resp = self.client.send(req).await?;
 
         let location = if resp.status() == StatusCode::OK {
             let slc = resp.into_body().bytes().await?;
@@ -378,7 +378,7 @@ impl Accessor for GhacBackend {
         };
 
         let req = self.ghac_get_location(&location, args.range()).await?;
-        let resp = self.client.send_async(req).await?;
+        let resp = self.client.send(req).await?;
 
         let status = resp.status();
         match status {
@@ -400,7 +400,7 @@ impl Accessor for GhacBackend {
 
         let req = self.ghac_reserve(path).await?;
 
-        let resp = self.client.send_async(req).await?;
+        let resp = self.client.send(req).await?;
 
         let cache_id = if resp.status().is_success() {
             let slc = resp.into_body().bytes().await?;
@@ -424,7 +424,7 @@ impl Accessor for GhacBackend {
 
         let req = self.ghac_query(path).await?;
 
-        let resp = self.client.send_async(req).await?;
+        let resp = self.client.send(req).await?;
 
         let location = if resp.status() == StatusCode::OK {
             let slc = resp.into_body().bytes().await?;
@@ -438,7 +438,7 @@ impl Accessor for GhacBackend {
         };
 
         let req = self.ghac_head_location(&location).await?;
-        let resp = self.client.send_async(req).await?;
+        let resp = self.client.send(req).await?;
 
         let status = resp.status();
         match status {
@@ -615,7 +615,7 @@ impl GhacBackend {
             .body(AsyncBody::Empty)
             .map_err(new_request_build_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 }
 
diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs
index a650d18d..0f2caa85 100644
--- a/core/src/services/ghac/writer.rs
+++ b/core/src/services/ghac/writer.rs
@@ -49,7 +49,7 @@ impl oio::Write for GhacWriter {
             .ghac_upload(self.cache_id, size, AsyncBody::Bytes(bs))
             .await?;
 
-        let resp = self.backend.client.send_async(req).await?;
+        let resp = self.backend.client.send(req).await?;
 
         if resp.status().is_success() {
             resp.into_body().consume().await?;
@@ -73,7 +73,7 @@ impl oio::Write for GhacWriter {
 
     async fn close(&mut self) -> Result<()> {
         let req = self.backend.ghac_commit(self.cache_id, self.size).await?;
-        let resp = self.backend.client.send_async(req).await?;
+        let resp = self.backend.client.send(req).await?;
 
         if resp.status().is_success() {
             resp.into_body().consume().await?;
diff --git a/core/src/services/http/backend.rs b/core/src/services/http/backend.rs
index 8a0e71bb..39fb10f5 100644
--- a/core/src/services/http/backend.rs
+++ b/core/src/services/http/backend.rs
@@ -318,7 +318,7 @@ impl HttpBackend {
             .body(AsyncBody::Empty)
             .map_err(new_request_build_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     async fn http_head(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
@@ -336,7 +336,7 @@ impl HttpBackend {
             .body(AsyncBody::Empty)
             .map_err(new_request_build_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 }
 
diff --git a/core/src/services/ipfs/backend.rs b/core/src/services/ipfs/backend.rs
index da83fdf7..b73108cc 100644
--- a/core/src/services/ipfs/backend.rs
+++ b/core/src/services/ipfs/backend.rs
@@ -414,7 +414,7 @@ impl IpfsBackend {
             .body(AsyncBody::Empty)
             .map_err(new_request_build_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     async fn ipfs_head(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
@@ -428,7 +428,7 @@ impl IpfsBackend {
             .body(AsyncBody::Empty)
             .map_err(new_request_build_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     async fn ipfs_list(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
@@ -448,7 +448,7 @@ impl IpfsBackend {
             .body(AsyncBody::Empty)
             .map_err(new_request_build_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 }
 
diff --git a/core/src/services/ipmfs/backend.rs b/core/src/services/ipmfs/backend.rs
index 967a6c29..9984a4d1 100644
--- a/core/src/services/ipmfs/backend.rs
+++ b/core/src/services/ipmfs/backend.rs
@@ -196,7 +196,7 @@ impl IpmfsBackend {
             .body(AsyncBody::Empty)
             .map_err(new_request_build_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     async fn ipmfs_read(
@@ -224,7 +224,7 @@ impl IpmfsBackend {
             .body(AsyncBody::Empty)
             .map_err(new_request_build_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     async fn ipmfs_rm(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
@@ -241,7 +241,7 @@ impl IpmfsBackend {
             .body(AsyncBody::Empty)
             .map_err(new_request_build_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     pub(crate) async fn ipmfs_ls(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
@@ -258,7 +258,7 @@ impl IpmfsBackend {
             .body(AsyncBody::Empty)
             .map_err(new_request_build_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     async fn ipmfs_mkdir(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
@@ -275,7 +275,7 @@ impl IpmfsBackend {
             .body(AsyncBody::Empty)
             .map_err(new_request_build_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     /// Support write from reader.
@@ -296,7 +296,7 @@ impl IpmfsBackend {
 
         let req = req.body(body).map_err(new_request_build_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 }
 
diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs
index 93ee55ff..840956fd 100644
--- a/core/src/services/obs/backend.rs
+++ b/core/src/services/obs/backend.rs
@@ -326,7 +326,7 @@ impl Accessor for ObsBackend {
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
 
-        let resp = self.client.send_async(req).await?;
+        let resp = self.client.send(req).await?;
 
         let status = resp.status();
 
@@ -451,7 +451,7 @@ impl ObsBackend {
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     pub fn obs_put_object_request(
@@ -496,7 +496,7 @@ impl ObsBackend {
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     async fn obs_delete_object(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
@@ -512,7 +512,7 @@ impl ObsBackend {
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     async fn obs_copy_object(&self, from: &str, to: &str) -> Result<Response<IncomingAsyncBody>> {
@@ -529,7 +529,7 @@ impl ObsBackend {
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     pub(crate) async fn obs_list_objects(
@@ -567,6 +567,6 @@ impl ObsBackend {
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 }
diff --git a/core/src/services/obs/writer.rs b/core/src/services/obs/writer.rs
index 97c16b1d..df7db515 100644
--- a/core/src/services/obs/writer.rs
+++ b/core/src/services/obs/writer.rs
@@ -53,7 +53,7 @@ impl oio::Write for ObsWriter {
             .sign(&mut req)
             .map_err(new_request_sign_error)?;
 
-        let resp = self.backend.client.send_async(req).await?;
+        let resp = self.backend.client.send(req).await?;
 
         let status = resp.status();
 
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index 1e141519..c6276982 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -739,14 +739,14 @@ impl OssBackend {
         let mut req = self.oss_get_object_request(path, range, false)?;
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     async fn oss_head_object(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
         let mut req = self.oss_head_object_request(path, false)?;
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     async fn oss_put_object(
@@ -767,7 +767,7 @@ impl OssBackend {
         )?;
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     async fn oss_copy_object(&self, from: &str, to: &str) -> Result<Response<IncomingAsyncBody>> {
@@ -787,7 +787,7 @@ impl OssBackend {
             .map_err(new_request_build_error)?;
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     pub(super) async fn oss_list_object(
@@ -800,13 +800,13 @@ impl OssBackend {
         let mut req = self.oss_list_object_request(path, token, delimiter, limit)?;
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     async fn oss_delete_object(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
         let mut req = self.oss_delete_object_request(path)?;
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     async fn oss_delete_objects(&self, paths: Vec<String>) -> Result<Response<IncomingAsyncBody>> {
@@ -837,7 +837,7 @@ impl OssBackend {
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     fn get_endpoint(&self, is_presign: bool) -> &str {
@@ -850,7 +850,7 @@ impl OssBackend {
 
     async fn oss_initiate_upload(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
         let req = self.oss_initiate_upload_request(path, None, None, AsyncBody::Empty, false)?;
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     /// Creates a request that initiates multipart upload
@@ -941,7 +941,7 @@ impl OssBackend {
             .map_err(new_request_build_error)?;
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 }
 
diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs
index ce7eb222..cb4ea23b 100644
--- a/core/src/services/oss/writer.rs
+++ b/core/src/services/oss/writer.rs
@@ -63,7 +63,7 @@ impl oio::Write for OssWriter {
             .sign(&mut req)
             .map_err(new_request_sign_error)?;
 
-        let resp = self.backend.client.send_async(req).await?;
+        let resp = self.backend.client.send(req).await?;
 
         let status = resp.status();
 
@@ -96,7 +96,7 @@ impl oio::Write for OssWriter {
             .sign(&mut req)
             .map_err(new_request_sign_error)?;
 
-        let resp = self.backend.client.send_async(req).await?;
+        let resp = self.backend.client.send(req).await?;
         match resp.status() {
             StatusCode::OK => {
                 let etag = parse_etag(resp.headers())?
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index d27f0049..e68c376f 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -1097,7 +1097,7 @@ impl Accessor for S3Backend {
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
 
-        let resp = self.client.send_async(req).await?;
+        let resp = self.client.send(req).await?;
 
         let status = resp.status();
 
@@ -1385,7 +1385,7 @@ impl S3Backend {
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     pub fn s3_put_object_request(
@@ -1457,7 +1457,7 @@ impl S3Backend {
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     async fn s3_delete_object(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
@@ -1471,7 +1471,7 @@ impl S3Backend {
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     async fn s3_copy_object(&self, from: &str, to: &str) -> 
Result<Response<IncomingAsyncBody>> {
@@ -1529,7 +1529,7 @@ impl S3Backend {
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     /// Make this functions as `pub(suber)` because `DirStream` depends
@@ -1570,7 +1570,7 @@ impl S3Backend {
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     async fn s3_initiate_multipart_upload(
@@ -1612,7 +1612,7 @@ impl S3Backend {
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     pub fn s3_upload_part_request(
@@ -1683,7 +1683,7 @@ impl S3Backend {
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     async fn s3_delete_objects(&self, paths: Vec<String>) -> 
Result<Response<IncomingAsyncBody>> {
@@ -1714,7 +1714,7 @@ impl S3Backend {
 
         self.signer.sign(&mut req).map_err(new_request_sign_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 }
 
diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs
index bbdb9045..8114f43d 100644
--- a/core/src/services/s3/writer.rs
+++ b/core/src/services/s3/writer.rs
@@ -70,7 +70,7 @@ impl oio::Write for S3Writer {
             .sign(&mut req)
             .map_err(new_request_sign_error)?;
 
-        let resp = self.backend.client.send_async(req).await?;
+        let resp = self.backend.client.send(req).await?;
 
         let status = resp.status();
 
@@ -103,7 +103,7 @@ impl oio::Write for S3Writer {
             .sign(&mut req)
             .map_err(new_request_sign_error)?;
 
-        let resp = self.backend.client.send_async(req).await?;
+        let resp = self.backend.client.send(req).await?;
 
         let status = resp.status();
 
diff --git a/core/src/services/webdav/backend.rs 
b/core/src/services/webdav/backend.rs
index ad80a0b3..2f7fb30c 100644
--- a/core/src/services/webdav/backend.rs
+++ b/core/src/services/webdav/backend.rs
@@ -448,7 +448,7 @@ impl WebdavBackend {
             .body(AsyncBody::Empty)
             .map_err(new_request_build_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     pub async fn webdav_put(
@@ -482,7 +482,7 @@ impl WebdavBackend {
         // Set body
         let req = req.body(body).map_err(new_request_build_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     async fn webdav_mkcol(
@@ -509,7 +509,7 @@ impl WebdavBackend {
 
         let req = req.body(body).map_err(new_request_build_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     async fn webdav_propfind(
@@ -548,7 +548,7 @@ impl WebdavBackend {
 
         let req = req.body(body).map_err(new_request_build_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     async fn webdav_delete(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
@@ -566,7 +566,7 @@ impl WebdavBackend {
             .body(AsyncBody::Empty)
             .map_err(new_request_build_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     async fn webdav_copy(&self, from: &str, to: &str) -> 
Result<Response<IncomingAsyncBody>> {
@@ -591,7 +591,7 @@ impl WebdavBackend {
             .body(AsyncBody::Empty)
             .map_err(new_request_build_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     async fn webdav_move(&self, from: &str, to: &str) -> 
Result<Response<IncomingAsyncBody>> {
@@ -616,7 +616,7 @@ impl WebdavBackend {
             .body(AsyncBody::Empty)
             .map_err(new_request_build_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     async fn create_internal(&self, abs_path: &str) -> Result<RpCreate> {
diff --git a/core/src/services/webhdfs/backend.rs 
b/core/src/services/webhdfs/backend.rs
index a9aa64aa..3bd32631 100644
--- a/core/src/services/webhdfs/backend.rs
+++ b/core/src/services/webhdfs/backend.rs
@@ -288,7 +288,7 @@ impl WebhdfsBackend {
             return Ok(req);
         }
 
-        let resp = self.client.send_async(req).await?;
+        let resp = self.client.send(req).await?;
 
         // should be a 307 TEMPORARY_REDIRECT
         if resp.status() != StatusCode::TEMPORARY_REDIRECT {
@@ -391,7 +391,7 @@ impl WebhdfsBackend {
         range: BytesRange,
     ) -> Result<Response<IncomingAsyncBody>> {
         let req = self.webhdfs_open_req(path, &range).await?;
-        let resp = self.client.send_async(req).await?;
+        let resp = self.client.send(req).await?;
 
         // this should be a 307 redirect
         if resp.status() != StatusCode::TEMPORARY_REDIRECT {
@@ -402,7 +402,7 @@ impl WebhdfsBackend {
         let re_req = Request::get(&re_url)
             .body(AsyncBody::Empty)
             .map_err(new_request_build_error)?;
-        self.client.send_async(re_req).await
+        self.client.send(re_req).await
     }
 
     async fn webhdfs_status_object(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
@@ -422,7 +422,7 @@ impl WebhdfsBackend {
             .body(AsyncBody::Empty)
             .map_err(new_request_build_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 
     async fn webhdfs_delete_object(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
@@ -441,7 +441,7 @@ impl WebhdfsBackend {
             .body(AsyncBody::Empty)
             .map_err(new_request_build_error)?;
 
-        self.client.send_async(req).await
+        self.client.send(req).await
     }
 }
 
@@ -564,7 +564,7 @@ impl Accessor for WebhdfsBackend {
             .webhdfs_create_object_req(&path, Some(0), None, AsyncBody::Empty)
             .await?;
 
-        let resp = self.client.send_async(req).await?;
+        let resp = self.client.send(req).await?;
 
         let status = resp.status();
 
@@ -666,7 +666,7 @@ impl Accessor for WebhdfsBackend {
         let path = path.trim_end_matches('/');
         let req = self.webhdfs_list_status_req(path)?;
 
-        let resp = self.client.send_async(req).await?;
+        let resp = self.client.send(req).await?;
         match resp.status() {
             StatusCode::OK => {
                 let body_bs = resp.into_body().bytes().await?;
diff --git a/core/src/services/webhdfs/writer.rs 
b/core/src/services/webhdfs/writer.rs
index d0db0f8f..8fea27a6 100644
--- a/core/src/services/webhdfs/writer.rs
+++ b/core/src/services/webhdfs/writer.rs
@@ -51,7 +51,7 @@ impl oio::Write for WebhdfsWriter {
             )
             .await?;
 
-        let resp = self.backend.client.send_async(req).await?;
+        let resp = self.backend.client.send(req).await?;
 
         let status = resp.status();
         match status {


Reply via email to