This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 64f0e9ff refactor: Remove not used blocking http client (#1895)
64f0e9ff is described below
commit 64f0e9ff31a70c1bc6dd411ccab3927f1f91927e
Author: Xuanwo <[email protected]>
AuthorDate: Tue Apr 11 01:07:02 2023 +0800
refactor: Remove not used blocking http client (#1895)
* refactor: Remove not used blocking http client
Signed-off-by: Xuanwo <[email protected]>
* Polish naming
Signed-off-by: Xuanwo <[email protected]>
* Fix API
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
core/Cargo.toml | 1 -
core/src/raw/http_util/body.rs | 51 -------------
core/src/raw/http_util/client.rs | 137 +++--------------------------------
core/src/raw/http_util/mod.rs | 1 -
core/src/raw/rps.rs | 11 +--
core/src/services/azblob/backend.rs | 14 ++--
core/src/services/azblob/writer.rs | 2 +-
core/src/services/azdfs/backend.rs | 10 +--
core/src/services/azdfs/writer.rs | 4 +-
core/src/services/gcs/backend.rs | 12 +--
core/src/services/gcs/writer.rs | 2 +-
core/src/services/ghac/backend.rs | 18 ++---
core/src/services/ghac/writer.rs | 4 +-
core/src/services/http/backend.rs | 4 +-
core/src/services/ipfs/backend.rs | 6 +-
core/src/services/ipmfs/backend.rs | 12 +--
core/src/services/obs/backend.rs | 12 +--
core/src/services/obs/writer.rs | 2 +-
core/src/services/oss/backend.rs | 18 ++---
core/src/services/oss/writer.rs | 4 +-
core/src/services/s3/backend.rs | 18 ++---
core/src/services/s3/writer.rs | 4 +-
core/src/services/webdav/backend.rs | 14 ++--
core/src/services/webhdfs/backend.rs | 14 ++--
core/src/services/webhdfs/writer.rs | 2 +-
25 files changed, 100 insertions(+), 277 deletions(-)
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 17b49a54..48d5fffa 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -122,7 +122,6 @@ reqsign = "0.8.5"
reqwest = { version = "0.11.13", features = [
"multipart",
"stream",
- "blocking",
], default-features = false }
rocksdb = { version = "0.20.1", default-features = false, optional = true }
serde = { version = "1", features = ["derive"] }
diff --git a/core/src/raw/http_util/body.rs b/core/src/raw/http_util/body.rs
index 412143c2..b4d43247 100644
--- a/core/src/raw/http_util/body.rs
+++ b/core/src/raw/http_util/body.rs
@@ -33,57 +33,6 @@ use crate::Error;
use crate::ErrorKind;
use crate::Result;
-/// Body used in blocking HTTP requests.
-#[derive(Default)]
-pub enum Body {
- /// An empty body.
- #[default]
- Empty,
- /// Body with bytes.
- Bytes(Bytes),
-}
-
-impl Body {
- /// Consume the entire body.
- pub fn consume(self) -> Result<()> {
- Ok(())
- }
-}
-
-impl From<Body> for reqwest::blocking::Body {
- fn from(v: Body) -> Self {
- match v {
- Body::Empty => reqwest::blocking::Body::from(""),
- Body::Bytes(bs) => reqwest::blocking::Body::from(bs),
- }
- }
-}
-
-/// IncomingBody carries the content returned by remote servers.
-///
-/// # Notes
-///
-/// Client SHOULD NEVER construct this body.
-pub struct IncomingBody {
- /// # TODO
- ///
- /// hyper returns `impl Stream<Item = crate::Result<Bytes>>` but we can't
- /// write the types in stable. So we will box here.
- ///
- /// After
[TAIT](https://rust-lang.github.io/rfcs/2515-type_alias_impl_trait.html)
- /// has been stable, we can change `IncomingAsyncBody` into
`IncomingAsyncBody<S>`.
- #[allow(unused)]
- inner: reqwest::blocking::Response,
- #[allow(unused)]
- size: Option<u64>,
-}
-
-impl IncomingBody {
- pub fn new(resp: reqwest::blocking::Response, size: Option<u64>) -> Self {
- IncomingBody { inner: resp, size }
- }
-}
-
/// Body used in async HTTP requests.
#[derive(Default)]
pub enum AsyncBody {
diff --git a/core/src/raw/http_util/client.rs b/core/src/raw/http_util/client.rs
index 8bd2c318..bd4722ff 100644
--- a/core/src/raw/http_util/client.rs
+++ b/core/src/raw/http_util/client.rs
@@ -18,7 +18,6 @@
use std::fmt::Debug;
use std::fmt::Formatter;
use std::str::FromStr;
-use std::thread;
use futures::TryStreamExt;
use http::Request;
@@ -27,10 +26,8 @@ use reqwest::redirect::Policy;
use reqwest::Url;
use super::body::IncomingAsyncBody;
-use super::body::IncomingBody;
use super::parse_content_length;
use super::AsyncBody;
-use super::Body;
use crate::Error;
use crate::ErrorKind;
use crate::Result;
@@ -38,8 +35,7 @@ use crate::Result;
/// HttpClient that used across opendal.
#[derive(Clone)]
pub struct HttpClient {
- async_client: reqwest::Client,
- blocking_client: reqwest::blocking::Client,
+ client: reqwest::Client,
}
/// We don't want users to know details about our clients.
@@ -52,25 +48,11 @@ impl Debug for HttpClient {
impl HttpClient {
/// Create a new http client in async context.
pub fn new() -> Result<Self> {
- Self::build(
- reqwest::ClientBuilder::new(),
- reqwest::blocking::ClientBuilder::new(),
- )
+ Self::build(reqwest::ClientBuilder::new())
}
/// Build a new http client in async context.
- pub fn build(
- async_builder: reqwest::ClientBuilder,
- blocking_builder: reqwest::blocking::ClientBuilder,
- ) -> Result<Self> {
- Ok(HttpClient {
- async_client: Self::build_async_client(async_builder)?,
- blocking_client: Self::build_blocking_client(blocking_builder)?,
- })
- }
-
- /// Build a new blocking client with given builder.
- fn build_async_client(mut builder: reqwest::ClientBuilder) ->
Result<reqwest::Client> {
+ pub fn build(mut builder: reqwest::ClientBuilder) -> Result<Self> {
// Make sure we don't enable auto gzip decompress.
builder = builder.no_gzip();
// Make sure we don't enable auto brotli decompress.
@@ -83,122 +65,25 @@ impl HttpClient {
#[cfg(feature = "trust-dns")]
let builder = builder.trust_dns(true);
- builder.build().map_err(|err| {
- Error::new(ErrorKind::Unexpected, "async client build
failed").set_source(err)
+ Ok(Self {
+ client: builder.build().map_err(|err| {
+ Error::new(ErrorKind::Unexpected, "async client build
failed").set_source(err)
+ })?,
})
}
- /// Build a new blocking client with given builder.
- ///
- /// # Notes
- ///
- /// `reqwest::blocking::ClientBuilder::build` will panic if called
- /// inside async context. So we need to spawn a thread to build it.
- ///
- /// And users SHOULD never call blocking clients in async context.
- fn build_blocking_client(
- mut builder: reqwest::blocking::ClientBuilder,
- ) -> Result<reqwest::blocking::Client> {
- // Make sure we don't enable auto gzip decompress.
- builder = builder.no_gzip();
- // Make sure we don't enable auto brotli decompress.
- builder = builder.no_brotli();
- // Make sure we don't enable auto deflate decompress.
- builder = builder.no_deflate();
- // Redirect will be handled by ourselves.
- builder = builder.redirect(Policy::none());
-
- #[cfg(feature = "trust-dns")]
- let builder = builder.trust_dns(true);
-
- thread::spawn(|| {
- builder.build().map_err(|err| {
- Error::new(ErrorKind::Unexpected, "blocking client build
failed").set_source(err)
- })
- })
- .join()
- .expect("the thread of building blocking client join failed")
- }
-
/// Get the async client from http client.
- pub async fn async_client(&self) -> reqwest::Client {
- self.async_client.clone()
- }
-
- /// Get the blocking client from http client.
- pub fn blocking_client(&self) -> reqwest::blocking::Client {
- self.blocking_client.clone()
- }
-
- /// Send a request in blocking way.
- pub fn send(&self, req: Request<Body>) -> Result<Response<IncomingBody>> {
- let is_head = req.method() == http::Method::HEAD;
- let (parts, body) = req.into_parts();
-
- let mut req_builder = self
- .blocking_client
- .request(
- parts.method,
- Url::from_str(&parts.uri.to_string()).expect("input request
url must be valid"),
- )
- .version(parts.version)
- .headers(parts.headers);
-
- req_builder = req_builder.body(body);
-
- let resp = req_builder.send().map_err(|err| {
- let is_temporary = !(
- // Builder related error should not be retried.
- err.is_builder() ||
- // Error returned by RedirectPolicy.
- //
- // We don't set this by hand, just don't allow retry.
- err.is_redirect() ||
- // We never use `Response::error_for_status`, just don't
allow retry.
- //
- // Status should be checked by our services.
- err.is_status()
- );
-
- let mut oerr = Error::new(ErrorKind::Unexpected, "send blocking
request")
- .with_operation("http_util::Client::send")
- .set_source(err);
- if is_temporary {
- oerr = oerr.set_temporary();
- }
-
- oerr
- })?;
-
- // Get content length from header so that we can check it.
- // If the request method is HEAD, we will ignore this.
- let content_length = if is_head {
- None
- } else {
- parse_content_length(resp.headers()).expect("response content
length must be valid")
- };
-
- let mut hr = Response::builder()
- .version(resp.version())
- .status(resp.status());
- for (k, v) in resp.headers().iter() {
- hr = hr.header(k, v);
- }
-
- let body = IncomingBody::new(resp, content_length);
-
- let resp = hr.body(body).expect("response must build succeed");
-
- Ok(resp)
+ pub fn client(&self) -> reqwest::Client {
+ self.client.clone()
}
/// Send a request in async way.
- pub async fn send_async(&self, req: Request<AsyncBody>) ->
Result<Response<IncomingAsyncBody>> {
+ pub async fn send(&self, req: Request<AsyncBody>) ->
Result<Response<IncomingAsyncBody>> {
let is_head = req.method() == http::Method::HEAD;
let (parts, body) = req.into_parts();
let mut req_builder = self
- .async_client
+ .client
.request(
parts.method,
Url::from_str(&parts.uri.to_string()).expect("input request
url must be valid"),
diff --git a/core/src/raw/http_util/mod.rs b/core/src/raw/http_util/mod.rs
index b2878bd0..9cb60a04 100644
--- a/core/src/raw/http_util/mod.rs
+++ b/core/src/raw/http_util/mod.rs
@@ -27,7 +27,6 @@ pub use client::HttpClient;
mod body;
pub use body::AsyncBody;
-pub use body::Body;
pub use body::IncomingAsyncBody;
mod header;
diff --git a/core/src/raw/rps.rs b/core/src/raw/rps.rs
index 7ce8ba0c..101310e7 100644
--- a/core/src/raw/rps.rs
+++ b/core/src/raw/rps.rs
@@ -247,16 +247,7 @@ mod tests {
},
};
- let req: Request<AsyncBody> = pr.clone().into();
- assert_eq!(Method::PATCH, req.method());
- assert_eq!(
- "https://opendal.apache.org/path/to/file",
- req.uri().to_string()
- );
- assert_eq!("123", req.headers().get(CONTENT_LENGTH).unwrap());
- assert_eq!("application/json",
req.headers().get(CONTENT_TYPE).unwrap());
-
- let req: Request<Body> = pr.into();
+ let req: Request<AsyncBody> = pr.into();
assert_eq!(Method::PATCH, req.method());
assert_eq!(
"https://opendal.apache.org/path/to/file",
diff --git a/core/src/services/azblob/backend.rs
b/core/src/services/azblob/backend.rs
index c5ec262e..20b2d596 100644
--- a/core/src/services/azblob/backend.rs
+++ b/core/src/services/azblob/backend.rs
@@ -502,7 +502,7 @@ impl Accessor for AzblobBackend {
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- let resp = self.client.send_async(req).await?;
+ let resp = self.client.send(req).await?;
let status = resp.status();
@@ -706,7 +706,7 @@ impl AzblobBackend {
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
pub fn azblob_put_blob_request(
@@ -761,7 +761,7 @@ impl AzblobBackend {
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
async fn azblob_delete_blob(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
@@ -782,7 +782,7 @@ impl AzblobBackend {
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
async fn azblob_copy_blob(&self, from: &str, to: &str) ->
Result<Response<IncomingAsyncBody>> {
@@ -809,7 +809,7 @@ impl AzblobBackend {
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
pub(crate) async fn azblob_list_blobs(
@@ -845,7 +845,7 @@ impl AzblobBackend {
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
async fn azblob_batch_delete(&self, paths: &[String]) ->
Result<Response<IncomingAsyncBody>> {
@@ -881,7 +881,7 @@ impl AzblobBackend {
let mut req = batch_delete_req_builder.try_into_req()?;
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
}
diff --git a/core/src/services/azblob/writer.rs
b/core/src/services/azblob/writer.rs
index e5c0a6f2..8cad24c6 100644
--- a/core/src/services/azblob/writer.rs
+++ b/core/src/services/azblob/writer.rs
@@ -53,7 +53,7 @@ impl oio::Write for AzblobWriter {
.sign(&mut req)
.map_err(new_request_sign_error)?;
- let resp = self.backend.client.send_async(req).await?;
+ let resp = self.backend.client.send(req).await?;
let status = resp.status();
diff --git a/core/src/services/azdfs/backend.rs
b/core/src/services/azdfs/backend.rs
index f750d103..5a810ac2 100644
--- a/core/src/services/azdfs/backend.rs
+++ b/core/src/services/azdfs/backend.rs
@@ -342,7 +342,7 @@ impl Accessor for AzdfsBackend {
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- let resp = self.client.send_async(req).await?;
+ let resp = self.client.send(req).await?;
let status = resp.status();
@@ -462,7 +462,7 @@ impl AzdfsBackend {
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
/// resource should be one of `file` or `directory`
@@ -556,7 +556,7 @@ impl AzdfsBackend {
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
async fn azdfs_delete(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
@@ -579,7 +579,7 @@ impl AzdfsBackend {
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
pub(crate) async fn azdfs_list(
@@ -613,7 +613,7 @@ impl AzdfsBackend {
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
}
diff --git a/core/src/services/azdfs/writer.rs
b/core/src/services/azdfs/writer.rs
index 327c82d2..2d85f9fb 100644
--- a/core/src/services/azdfs/writer.rs
+++ b/core/src/services/azdfs/writer.rs
@@ -54,7 +54,7 @@ impl oio::Write for AzdfsWriter {
.sign(&mut req)
.map_err(new_request_sign_error)?;
- let resp = self.backend.client.send_async(req).await?;
+ let resp = self.backend.client.send(req).await?;
let status = resp.status();
match status {
@@ -77,7 +77,7 @@ impl oio::Write for AzdfsWriter {
.sign(&mut req)
.map_err(new_request_sign_error)?;
- let resp = self.backend.client.send_async(req).await?;
+ let resp = self.backend.client.send(req).await?;
let status = resp.status();
match status {
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index 77314abe..eda03ae6 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -371,7 +371,7 @@ impl Accessor for GcsBackend {
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- let resp = self.client.send_async(req).await?;
+ let resp = self.client.send(req).await?;
if resp.status().is_success() {
resp.into_body().consume().await?;
@@ -420,7 +420,7 @@ impl Accessor for GcsBackend {
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- let resp = self.client.send_async(req).await?;
+ let resp = self.client.send(req).await?;
if !resp.status().is_success() {
return Err(parse_error(resp).await?);
@@ -535,7 +535,7 @@ impl GcsBackend {
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
pub fn gcs_insert_object_request(
@@ -588,7 +588,7 @@ impl GcsBackend {
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
async fn gcs_delete_object(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
@@ -607,7 +607,7 @@ impl GcsBackend {
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
pub(crate) async fn gcs_list_objects(
@@ -648,7 +648,7 @@ impl GcsBackend {
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
}
diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs
index 0b984f74..1dda8d9b 100644
--- a/core/src/services/gcs/writer.rs
+++ b/core/src/services/gcs/writer.rs
@@ -53,7 +53,7 @@ impl oio::Write for GcsWriter {
.sign(&mut req)
.map_err(new_request_sign_error)?;
- let resp = self.backend.client.send_async(req).await?;
+ let resp = self.backend.client.send(req).await?;
let status = resp.status();
diff --git a/core/src/services/ghac/backend.rs
b/core/src/services/ghac/backend.rs
index 34ff3bc1..a3279b9d 100644
--- a/core/src/services/ghac/backend.rs
+++ b/core/src/services/ghac/backend.rs
@@ -319,7 +319,7 @@ impl Accessor for GhacBackend {
let req = self.ghac_reserve(path).await?;
- let resp = self.client.send_async(req).await?;
+ let resp = self.client.send(req).await?;
let cache_id = if resp.status().is_success() {
let slc = resp.into_body().bytes().await?;
@@ -340,7 +340,7 @@ impl Accessor for GhacBackend {
.ghac_upload(cache_id, 1,
AsyncBody::Bytes(Bytes::from_static(&[0])))
.await?;
- let resp = self.client.send_async(req).await?;
+ let resp = self.client.send(req).await?;
if resp.status().is_success() {
resp.into_body().consume().await?;
@@ -351,7 +351,7 @@ impl Accessor for GhacBackend {
}
let req = self.ghac_commit(cache_id, 1).await?;
- let resp = self.client.send_async(req).await?;
+ let resp = self.client.send(req).await?;
if resp.status().is_success() {
resp.into_body().consume().await?;
@@ -366,7 +366,7 @@ impl Accessor for GhacBackend {
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead,
Self::Reader)> {
let req = self.ghac_query(path).await?;
- let resp = self.client.send_async(req).await?;
+ let resp = self.client.send(req).await?;
let location = if resp.status() == StatusCode::OK {
let slc = resp.into_body().bytes().await?;
@@ -378,7 +378,7 @@ impl Accessor for GhacBackend {
};
let req = self.ghac_get_location(&location, args.range()).await?;
- let resp = self.client.send_async(req).await?;
+ let resp = self.client.send(req).await?;
let status = resp.status();
match status {
@@ -400,7 +400,7 @@ impl Accessor for GhacBackend {
let req = self.ghac_reserve(path).await?;
- let resp = self.client.send_async(req).await?;
+ let resp = self.client.send(req).await?;
let cache_id = if resp.status().is_success() {
let slc = resp.into_body().bytes().await?;
@@ -424,7 +424,7 @@ impl Accessor for GhacBackend {
let req = self.ghac_query(path).await?;
- let resp = self.client.send_async(req).await?;
+ let resp = self.client.send(req).await?;
let location = if resp.status() == StatusCode::OK {
let slc = resp.into_body().bytes().await?;
@@ -438,7 +438,7 @@ impl Accessor for GhacBackend {
};
let req = self.ghac_head_location(&location).await?;
- let resp = self.client.send_async(req).await?;
+ let resp = self.client.send(req).await?;
let status = resp.status();
match status {
@@ -615,7 +615,7 @@ impl GhacBackend {
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
}
diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs
index a650d18d..0f2caa85 100644
--- a/core/src/services/ghac/writer.rs
+++ b/core/src/services/ghac/writer.rs
@@ -49,7 +49,7 @@ impl oio::Write for GhacWriter {
.ghac_upload(self.cache_id, size, AsyncBody::Bytes(bs))
.await?;
- let resp = self.backend.client.send_async(req).await?;
+ let resp = self.backend.client.send(req).await?;
if resp.status().is_success() {
resp.into_body().consume().await?;
@@ -73,7 +73,7 @@ impl oio::Write for GhacWriter {
async fn close(&mut self) -> Result<()> {
let req = self.backend.ghac_commit(self.cache_id, self.size).await?;
- let resp = self.backend.client.send_async(req).await?;
+ let resp = self.backend.client.send(req).await?;
if resp.status().is_success() {
resp.into_body().consume().await?;
diff --git a/core/src/services/http/backend.rs
b/core/src/services/http/backend.rs
index 8a0e71bb..39fb10f5 100644
--- a/core/src/services/http/backend.rs
+++ b/core/src/services/http/backend.rs
@@ -318,7 +318,7 @@ impl HttpBackend {
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
async fn http_head(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
@@ -336,7 +336,7 @@ impl HttpBackend {
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
}
diff --git a/core/src/services/ipfs/backend.rs
b/core/src/services/ipfs/backend.rs
index da83fdf7..b73108cc 100644
--- a/core/src/services/ipfs/backend.rs
+++ b/core/src/services/ipfs/backend.rs
@@ -414,7 +414,7 @@ impl IpfsBackend {
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
async fn ipfs_head(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
@@ -428,7 +428,7 @@ impl IpfsBackend {
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
async fn ipfs_list(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
@@ -448,7 +448,7 @@ impl IpfsBackend {
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
}
diff --git a/core/src/services/ipmfs/backend.rs
b/core/src/services/ipmfs/backend.rs
index 967a6c29..9984a4d1 100644
--- a/core/src/services/ipmfs/backend.rs
+++ b/core/src/services/ipmfs/backend.rs
@@ -196,7 +196,7 @@ impl IpmfsBackend {
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
async fn ipmfs_read(
@@ -224,7 +224,7 @@ impl IpmfsBackend {
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
async fn ipmfs_rm(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
@@ -241,7 +241,7 @@ impl IpmfsBackend {
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
pub(crate) async fn ipmfs_ls(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
@@ -258,7 +258,7 @@ impl IpmfsBackend {
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
async fn ipmfs_mkdir(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
@@ -275,7 +275,7 @@ impl IpmfsBackend {
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
/// Support write from reader.
@@ -296,7 +296,7 @@ impl IpmfsBackend {
let req = req.body(body).map_err(new_request_build_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
}
diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs
index 93ee55ff..840956fd 100644
--- a/core/src/services/obs/backend.rs
+++ b/core/src/services/obs/backend.rs
@@ -326,7 +326,7 @@ impl Accessor for ObsBackend {
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- let resp = self.client.send_async(req).await?;
+ let resp = self.client.send(req).await?;
let status = resp.status();
@@ -451,7 +451,7 @@ impl ObsBackend {
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
pub fn obs_put_object_request(
@@ -496,7 +496,7 @@ impl ObsBackend {
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
async fn obs_delete_object(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
@@ -512,7 +512,7 @@ impl ObsBackend {
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
async fn obs_copy_object(&self, from: &str, to: &str) ->
Result<Response<IncomingAsyncBody>> {
@@ -529,7 +529,7 @@ impl ObsBackend {
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
pub(crate) async fn obs_list_objects(
@@ -567,6 +567,6 @@ impl ObsBackend {
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
}
diff --git a/core/src/services/obs/writer.rs b/core/src/services/obs/writer.rs
index 97c16b1d..df7db515 100644
--- a/core/src/services/obs/writer.rs
+++ b/core/src/services/obs/writer.rs
@@ -53,7 +53,7 @@ impl oio::Write for ObsWriter {
.sign(&mut req)
.map_err(new_request_sign_error)?;
- let resp = self.backend.client.send_async(req).await?;
+ let resp = self.backend.client.send(req).await?;
let status = resp.status();
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index 1e141519..c6276982 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -739,14 +739,14 @@ impl OssBackend {
let mut req = self.oss_get_object_request(path, range, false)?;
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
async fn oss_head_object(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
let mut req = self.oss_head_object_request(path, false)?;
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
async fn oss_put_object(
@@ -767,7 +767,7 @@ impl OssBackend {
)?;
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
async fn oss_copy_object(&self, from: &str, to: &str) ->
Result<Response<IncomingAsyncBody>> {
@@ -787,7 +787,7 @@ impl OssBackend {
.map_err(new_request_build_error)?;
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
pub(super) async fn oss_list_object(
@@ -800,13 +800,13 @@ impl OssBackend {
let mut req = self.oss_list_object_request(path, token, delimiter,
limit)?;
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
async fn oss_delete_object(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
let mut req = self.oss_delete_object_request(path)?;
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
async fn oss_delete_objects(&self, paths: Vec<String>) ->
Result<Response<IncomingAsyncBody>> {
@@ -837,7 +837,7 @@ impl OssBackend {
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
fn get_endpoint(&self, is_presign: bool) -> &str {
@@ -850,7 +850,7 @@ impl OssBackend {
async fn oss_initiate_upload(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
let req = self.oss_initiate_upload_request(path, None, None,
AsyncBody::Empty, false)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
/// Creates a request that initiates multipart upload
@@ -941,7 +941,7 @@ impl OssBackend {
.map_err(new_request_build_error)?;
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
}
diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs
index ce7eb222..cb4ea23b 100644
--- a/core/src/services/oss/writer.rs
+++ b/core/src/services/oss/writer.rs
@@ -63,7 +63,7 @@ impl oio::Write for OssWriter {
.sign(&mut req)
.map_err(new_request_sign_error)?;
- let resp = self.backend.client.send_async(req).await?;
+ let resp = self.backend.client.send(req).await?;
let status = resp.status();
@@ -96,7 +96,7 @@ impl oio::Write for OssWriter {
.sign(&mut req)
.map_err(new_request_sign_error)?;
- let resp = self.backend.client.send_async(req).await?;
+ let resp = self.backend.client.send(req).await?;
match resp.status() {
StatusCode::OK => {
let etag = parse_etag(resp.headers())?
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index d27f0049..e68c376f 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -1097,7 +1097,7 @@ impl Accessor for S3Backend {
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- let resp = self.client.send_async(req).await?;
+ let resp = self.client.send(req).await?;
let status = resp.status();
@@ -1385,7 +1385,7 @@ impl S3Backend {
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
pub fn s3_put_object_request(
@@ -1457,7 +1457,7 @@ impl S3Backend {
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
async fn s3_delete_object(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
@@ -1471,7 +1471,7 @@ impl S3Backend {
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
async fn s3_copy_object(&self, from: &str, to: &str) ->
Result<Response<IncomingAsyncBody>> {
@@ -1529,7 +1529,7 @@ impl S3Backend {
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
/// Make this functions as `pub(suber)` because `DirStream` depends
@@ -1570,7 +1570,7 @@ impl S3Backend {
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
async fn s3_initiate_multipart_upload(
@@ -1612,7 +1612,7 @@ impl S3Backend {
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
pub fn s3_upload_part_request(
@@ -1683,7 +1683,7 @@ impl S3Backend {
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
async fn s3_delete_objects(&self, paths: Vec<String>) ->
Result<Response<IncomingAsyncBody>> {
@@ -1714,7 +1714,7 @@ impl S3Backend {
self.signer.sign(&mut req).map_err(new_request_sign_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
}
diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs
index bbdb9045..8114f43d 100644
--- a/core/src/services/s3/writer.rs
+++ b/core/src/services/s3/writer.rs
@@ -70,7 +70,7 @@ impl oio::Write for S3Writer {
.sign(&mut req)
.map_err(new_request_sign_error)?;
- let resp = self.backend.client.send_async(req).await?;
+ let resp = self.backend.client.send(req).await?;
let status = resp.status();
@@ -103,7 +103,7 @@ impl oio::Write for S3Writer {
.sign(&mut req)
.map_err(new_request_sign_error)?;
- let resp = self.backend.client.send_async(req).await?;
+ let resp = self.backend.client.send(req).await?;
let status = resp.status();
diff --git a/core/src/services/webdav/backend.rs
b/core/src/services/webdav/backend.rs
index ad80a0b3..2f7fb30c 100644
--- a/core/src/services/webdav/backend.rs
+++ b/core/src/services/webdav/backend.rs
@@ -448,7 +448,7 @@ impl WebdavBackend {
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
pub async fn webdav_put(
@@ -482,7 +482,7 @@ impl WebdavBackend {
// Set body
let req = req.body(body).map_err(new_request_build_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
async fn webdav_mkcol(
@@ -509,7 +509,7 @@ impl WebdavBackend {
let req = req.body(body).map_err(new_request_build_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
async fn webdav_propfind(
@@ -548,7 +548,7 @@ impl WebdavBackend {
let req = req.body(body).map_err(new_request_build_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
async fn webdav_delete(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
@@ -566,7 +566,7 @@ impl WebdavBackend {
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
async fn webdav_copy(&self, from: &str, to: &str) ->
Result<Response<IncomingAsyncBody>> {
@@ -591,7 +591,7 @@ impl WebdavBackend {
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
async fn webdav_move(&self, from: &str, to: &str) ->
Result<Response<IncomingAsyncBody>> {
@@ -616,7 +616,7 @@ impl WebdavBackend {
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
async fn create_internal(&self, abs_path: &str) -> Result<RpCreate> {
diff --git a/core/src/services/webhdfs/backend.rs
b/core/src/services/webhdfs/backend.rs
index a9aa64aa..3bd32631 100644
--- a/core/src/services/webhdfs/backend.rs
+++ b/core/src/services/webhdfs/backend.rs
@@ -288,7 +288,7 @@ impl WebhdfsBackend {
return Ok(req);
}
- let resp = self.client.send_async(req).await?;
+ let resp = self.client.send(req).await?;
// should be a 307 TEMPORARY_REDIRECT
if resp.status() != StatusCode::TEMPORARY_REDIRECT {
@@ -391,7 +391,7 @@ impl WebhdfsBackend {
range: BytesRange,
) -> Result<Response<IncomingAsyncBody>> {
let req = self.webhdfs_open_req(path, &range).await?;
- let resp = self.client.send_async(req).await?;
+ let resp = self.client.send(req).await?;
// this should be a 307 redirect
if resp.status() != StatusCode::TEMPORARY_REDIRECT {
@@ -402,7 +402,7 @@ impl WebhdfsBackend {
let re_req = Request::get(&re_url)
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
- self.client.send_async(re_req).await
+ self.client.send(re_req).await
}
async fn webhdfs_status_object(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
@@ -422,7 +422,7 @@ impl WebhdfsBackend {
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
async fn webhdfs_delete_object(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
@@ -441,7 +441,7 @@ impl WebhdfsBackend {
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
- self.client.send_async(req).await
+ self.client.send(req).await
}
}
@@ -564,7 +564,7 @@ impl Accessor for WebhdfsBackend {
.webhdfs_create_object_req(&path, Some(0), None, AsyncBody::Empty)
.await?;
- let resp = self.client.send_async(req).await?;
+ let resp = self.client.send(req).await?;
let status = resp.status();
@@ -666,7 +666,7 @@ impl Accessor for WebhdfsBackend {
let path = path.trim_end_matches('/');
let req = self.webhdfs_list_status_req(path)?;
- let resp = self.client.send_async(req).await?;
+ let resp = self.client.send(req).await?;
match resp.status() {
StatusCode::OK => {
let body_bs = resp.into_body().bytes().await?;
diff --git a/core/src/services/webhdfs/writer.rs
b/core/src/services/webhdfs/writer.rs
index d0db0f8f..8fea27a6 100644
--- a/core/src/services/webhdfs/writer.rs
+++ b/core/src/services/webhdfs/writer.rs
@@ -51,7 +51,7 @@ impl oio::Write for WebhdfsWriter {
)
.await?;
- let resp = self.backend.client.send_async(req).await?;
+ let resp = self.backend.client.send(req).await?;
let status = resp.status();
match status {