This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 54fb2f526 chore: use fastpool as object pool impl (#6820)
54fb2f526 is described below
commit 54fb2f526483972ad4cea40dfb4bb19e78fb1fb5
Author: tison <[email protected]>
AuthorDate: Tue Nov 25 20:32:11 2025 +0800
chore: use fastpool as object pool impl (#6820)
* chore: use fastpool as object pool impl
Signed-off-by: tison <[email protected]>
* for redis
Signed-off-by: tison <[email protected]>
* for sftp
Signed-off-by: tison <[email protected]>
* for memcached
Signed-off-by: tison <[email protected]>
* run format
Signed-off-by: tison <[email protected]>
* fixup compile
Signed-off-by: tison <[email protected]>
* Revert "run format"
This reverts commit 052cb4cca1cf49a0577dcfbba3c641fe6637a704.
---------
Signed-off-by: tison <[email protected]>
---
core/Cargo.lock | 23 ++++---
core/Cargo.toml | 14 ++--
core/src/services/etcd/backend.rs | 10 +--
core/src/services/etcd/core.rs | 98 +++++++++++++++-------------
core/src/services/ftp/backend.rs | 20 +++---
core/src/services/ftp/core.rs | 74 ++++++++++-----------
core/src/services/ftp/deleter.rs | 4 +-
core/src/services/ftp/err.rs | 2 +-
core/src/services/ftp/reader.rs | 12 ++--
core/src/services/ftp/writer.rs | 14 ++--
core/src/services/memcached/backend.rs | 28 +++-----
core/src/services/memcached/binary.rs | 1 +
core/src/services/memcached/config.rs | 2 +-
core/src/services/memcached/core.rs | 92 +++++++++++++-------------
core/src/services/redis/backend.rs | 38 +++++------
core/src/services/redis/config.rs | 2 +-
core/src/services/redis/core.rs | 111 +++++++++++++++----------------
core/src/services/sftp/backend.rs | 20 +++---
core/src/services/sftp/core.rs | 115 ++++++++++++++++++---------------
core/src/services/sftp/reader.rs | 6 +-
20 files changed, 332 insertions(+), 354 deletions(-)
diff --git a/core/Cargo.lock b/core/Cargo.lock
index 7ab3d1485..d07ed620c 100644
--- a/core/Cargo.lock
+++ b/core/Cargo.lock
@@ -1156,17 +1156,6 @@ version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55248b47b0caf0546f7988906588779981c43bb1bc9d0c44087278f80cdb44ba"
-[[package]]
-name = "bb8"
-version = "0.9.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "212d8b8e1a22743d9241575c6ba822cf9c8fef34771c86ab7e477a4fbfd254e5"
-dependencies = [
- "futures-util",
- "parking_lot 0.12.5",
- "tokio",
-]
-
[[package]]
name = "bcrypt"
version = "0.15.1"
@@ -2885,6 +2874,16 @@ dependencies = [
"paste",
]
+[[package]]
+name = "fastpool"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a6777b4743839a42fd32141d95d7adc4c98e3a1d5200a2598cf32ffd102c81e1"
+dependencies = [
+ "mea",
+ "scopeguard",
+]
+
[[package]]
name = "fastrace"
version = "0.7.14"
@@ -5351,7 +5350,6 @@ dependencies = [
"await-tree",
"backon",
"base64 0.22.1",
- "bb8",
"bytes",
"cacache",
"compio",
@@ -5362,6 +5360,7 @@ dependencies = [
"dotenvy",
"etcd-client",
"fastmetrics",
+ "fastpool",
"fastrace",
"fastrace-jaeger",
"flume",
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 16224b93b..a05c7a597 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -141,12 +141,12 @@ services-d1 = []
services-dashmap = ["dep:dashmap"]
services-dbfs = []
services-dropbox = []
-services-etcd = ["dep:etcd-client", "dep:bb8"]
+services-etcd = ["dep:etcd-client", "dep:fastpool"]
services-foundationdb = ["dep:foundationdb"]
services-fs = ["tokio/fs", "internal-tokio-rt"]
services-ftp = [
"dep:suppaftp",
- "dep:bb8",
+ "dep:fastpool",
"dep:futures-rustls",
"dep:rustls-native-certs",
]
@@ -167,7 +167,7 @@ services-ipfs = ["dep:prost"]
services-ipmfs = []
services-koofr = []
services-lakefs = []
-services-memcached = ["dep:bb8"]
+services-memcached = ["dep:fastpool"]
services-memory = []
services-mini-moka = ["dep:mini-moka"]
services-moka = ["dep:moka"]
@@ -195,7 +195,7 @@ services-pcloud = []
services-persy = ["dep:persy", "internal-tokio-rt"]
services-postgresql = ["dep:sqlx", "sqlx?/postgres"]
services-redb = ["dep:redb", "internal-tokio-rt"]
-services-redis = ["dep:redis", "dep:bb8", "redis?/tokio-rustls-comp"]
+services-redis = ["dep:redis", "dep:fastpool", "redis?/tokio-rustls-comp"]
services-redis-native-tls = ["services-redis", "redis?/tokio-native-tls-comp"]
services-rocksdb = ["dep:rocksdb", "internal-tokio-rt"]
services-s3 = [
@@ -206,7 +206,7 @@ services-s3 = [
"dep:crc32c",
]
services-seafile = []
-services-sftp = ["dep:openssh", "dep:openssh-sftp-client", "dep:bb8"]
+services-sftp = ["dep:openssh", "dep:openssh-sftp-client", "dep:fastpool"]
services-sled = ["dep:sled", "internal-tokio-rt"]
services-sqlite = ["dep:sqlx", "sqlx?/sqlite", "dep:ouroboros"]
services-surrealdb = ["dep:surrealdb"]
@@ -260,7 +260,7 @@ reqwest = { version = "0.12.24", features = [
], default-features = false }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
-tokio = { version = "1.48", features = ["io-util"] }
+tokio = { version = "1.48", features = ["macros", "io-util"] }
url = "2.5"
uuid = { version = "1", features = ["serde", "v4"] }
@@ -272,7 +272,7 @@ rand = { version = "0.8", optional = true }
# Services
# general dependencies.
-bb8 = { version = "0.9", optional = true }
+fastpool = { version = "1.0.2", optional = true }
prost = { version = "0.13", optional = true }
sha1 = { version = "0.10.6", optional = true }
sha2 = { version = "0.10", optional = true }
diff --git a/core/src/services/etcd/backend.rs b/core/src/services/etcd/backend.rs
index 8a9d916f0..317345240 100644
--- a/core/src/services/etcd/backend.rs
+++ b/core/src/services/etcd/backend.rs
@@ -21,7 +21,6 @@ use etcd_client::Certificate;
use etcd_client::ConnectOptions;
use etcd_client::Identity;
use etcd_client::TlsOptions;
-use mea::once::OnceCell;
use super::ETCD_SCHEME;
use super::config::EtcdConfig;
@@ -158,14 +157,7 @@ impl Builder for EtcdBuilder {
.as_str(),
);
- let client = OnceCell::new();
-
- let core = EtcdCore {
- endpoints,
- client,
- options,
- };
-
+ let core = EtcdCore::new(endpoints, options);
Ok(EtcdBackend::new(core, &root))
}
}
diff --git a/core/src/services/etcd/core.rs b/core/src/services/etcd/core.rs
index fcb07a73d..24b6eea6e 100644
--- a/core/src/services/etcd/core.rs
+++ b/core/src/services/etcd/core.rs
@@ -15,17 +15,16 @@
// specific language governing permissions and limitations
// under the License.
-use std::fmt::Debug;
-
-use bb8::Pool;
-use bb8::PooledConnection;
-use bb8::RunError;
-use etcd_client::Client;
-use etcd_client::ConnectOptions;
-use mea::once::OnceCell;
-
use crate::services::etcd::error::format_etcd_error;
use crate::{Buffer, Error, ErrorKind, Result};
+use etcd_client::Client;
+use etcd_client::ConnectOptions;
+use fastpool::ManageObject;
+use fastpool::ObjectStatus;
+use fastpool::bounded;
+use std::fmt::Debug;
+use std::sync::Arc;
+use std::time::Duration;
pub mod constants {
pub const DEFAULT_ETCD_ENDPOINTS: &str = "http://127.0.0.1:2379";
@@ -37,34 +36,33 @@ pub struct Manager {
options: ConnectOptions,
}
-impl bb8::ManageConnection for Manager {
- type Connection = Client;
+impl ManageObject for Manager {
+ type Object = Client;
type Error = Error;
- async fn connect(&self) -> Result<Self::Connection, Self::Error> {
- let conn = Client::connect(self.endpoints.clone(), Some(self.options.clone()))
+ async fn create(&self) -> Result<Self::Object, Self::Error> {
+ Client::connect(self.endpoints.clone(), Some(self.options.clone()))
.await
- .map_err(format_etcd_error)?;
-
- Ok(conn)
- }
-
- async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
- let _ = conn.status().await.map_err(format_etcd_error)?;
- Ok(())
+ .map_err(format_etcd_error)
}
- /// Always allow reuse conn.
- fn has_broken(&self, _: &mut Self::Connection) -> bool {
- false
+ async fn is_recyclable(
+ &self,
+ o: &mut Self::Object,
+ _: &ObjectStatus,
+ ) -> Result<(), Self::Error> {
+ match o.status().await {
+ Ok(_) => Ok(()),
+ Err(err) => Err(format_etcd_error(err)),
+ }
}
}
#[derive(Clone)]
pub struct EtcdCore {
- pub endpoints: Vec<String>,
- pub options: ConnectOptions,
- pub client: OnceCell<Pool<Manager>>,
+ endpoints: Vec<String>,
+ options: ConnectOptions,
+ client: Arc<bounded::Pool<Manager>>,
}
impl Debug for EtcdCore {
@@ -77,26 +75,34 @@ impl Debug for EtcdCore {
}
impl EtcdCore {
- pub async fn conn(&self) -> Result<PooledConnection<'static, Manager>> {
- let client = self
- .client
- .get_or_try_init(|| async {
- Pool::builder()
- .max_size(64)
- .build(Manager {
- endpoints: self.endpoints.clone(),
- options: self.options.clone(),
- })
- .await
- })
- .await?;
-
- client.get_owned().await.map_err(|err| match err {
- RunError::User(err) => err,
- RunError::TimedOut => {
- Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary()
+ pub fn new(endpoints: Vec<String>, options: ConnectOptions) -> Self {
+ let client = bounded::Pool::new(
+ bounded::PoolConfig::new(64),
+ Manager {
+ endpoints: endpoints.clone(),
+ options: options.clone(),
+ },
+ );
+
+ Self {
+ endpoints,
+ options,
+ client,
+ }
+ }
+
+ pub async fn conn(&self) -> Result<bounded::Object<Manager>> {
+ let fut = self.client.get();
+
+ tokio::select! {
+ _ = tokio::time::sleep(Duration::from_secs(10)) => {
+ Err(Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary())
+ }
+ result = fut => match result {
+ Ok(conn) => Ok(conn),
+ Err(err) => Err(err),
}
- })
+ }
}
pub async fn has_prefix(&self, prefix: &str) -> Result<bool> {
diff --git a/core/src/services/ftp/backend.rs b/core/src/services/ftp/backend.rs
index 0b0c87b13..555487717 100644
--- a/core/src/services/ftp/backend.rs
+++ b/core/src/services/ftp/backend.rs
@@ -21,7 +21,6 @@ use std::sync::Arc;
use http::Uri;
use log::debug;
-use mea::once::OnceCell;
use services::ftp::core::Manager;
use suppaftp::FtpError;
use suppaftp::Status;
@@ -32,7 +31,7 @@ use super::FTP_SCHEME;
use super::config::FtpConfig;
use super::core::FtpCore;
use super::deleter::FtpDeleter;
-use super::err::parse_error;
+use super::err::format_ftp_error;
use super::lister::FtpLister;
use super::reader::FtpReader;
use super::writer::FtpWriter;
@@ -165,6 +164,7 @@ impl Builder for FtpBuilder {
..Default::default()
});
+
let manager = Manager {
endpoint: endpoint.clone(),
root: root.clone(),
@@ -172,12 +172,8 @@ impl Builder for FtpBuilder {
password: password.clone(),
enable_secure,
};
- let core = Arc::new(FtpCore {
- info: accessor_info.into(),
- manager,
- pool: OnceCell::new(),
- });
+ let core = Arc::new(FtpCore::new(accessor_info.into(), manager.clone()));
Ok(FtpBackend { core })
}
}
@@ -201,7 +197,7 @@ impl Access for FtpBackend {
type Deleter = oio::OneShotDeleter<FtpDeleter>;
fn info(&self) -> Arc<AccessorInfo> {
- self.core.info.clone()
+ self.core.info()
}
async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
@@ -221,7 +217,7 @@ impl Access for FtpBackend {
}))
| Ok(()) => (),
Err(e) => {
- return Err(parse_error(e));
+ return Err(format_ftp_error(e));
}
}
}
@@ -277,7 +273,7 @@ impl Access for FtpBackend {
}))
| Ok(()) => (),
Err(e) => {
- return Err(parse_error(e));
+ return Err(format_ftp_error(e));
}
}
}
@@ -299,7 +295,7 @@ impl Access for FtpBackend {
let mut ftp_stream = self.core.ftp_connect(Operation::List).await?;
let pathname = if path == "/" { None } else { Some(path) };
- let files = ftp_stream.list(pathname).await.map_err(parse_error)?;
+ let files = ftp_stream.list(pathname).await.map_err(format_ftp_error)?;
Ok((
RpList::default(),
@@ -316,7 +312,7 @@ impl FtpBackend {
let pathname = if parent == "/" { None } else { Some(parent) };
- let resp = ftp_stream.list(pathname).await.map_err(parse_error)?;
+ let resp = ftp_stream.list(pathname).await.map_err(format_ftp_error)?;
// Get stat of file.
let mut files = resp
diff --git a/core/src/services/ftp/core.rs b/core/src/services/ftp/core.rs
index 42d427096..afe227add 100644
--- a/core/src/services/ftp/core.rs
+++ b/core/src/services/ftp/core.rs
@@ -15,14 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-use std::sync::Arc;
-
-use bb8::Pool;
-use bb8::PooledConnection;
-use bb8::RunError;
+use fastpool::{ManageObject, ObjectStatus, bounded};
use futures_rustls::TlsConnector;
-use mea::once::OnceCell;
use raw::Operation;
+use std::sync::Arc;
+use std::time::Duration;
use suppaftp::AsyncRustlsConnector;
use suppaftp::AsyncRustlsFtpStream;
use suppaftp::FtpError;
@@ -31,35 +28,37 @@ use suppaftp::Status;
use suppaftp::rustls::ClientConfig;
use suppaftp::types::FileType;
-use super::err::parse_error;
+use super::err::format_ftp_error;
use crate::raw::AccessorInfo;
use crate::*;
pub struct FtpCore {
- pub info: Arc<AccessorInfo>,
- pub manager: Manager,
- pub pool: OnceCell<Pool<Manager>>,
+ info: Arc<AccessorInfo>,
+ pool: Arc<bounded::Pool<Manager>>,
}
impl FtpCore {
- pub async fn ftp_connect(&self, _: Operation) -> Result<PooledConnection<'static, Manager>> {
- let pool = self
- .pool
- .get_or_try_init(|| async {
- Pool::builder()
- .max_size(64)
- .build(self.manager.clone())
- .await
- })
- .await
- .map_err(parse_error)?;
-
- pool.get_owned().await.map_err(|err| match err {
- RunError::User(err) => parse_error(err),
- RunError::TimedOut => {
- Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary()
+ pub fn new(info: Arc<AccessorInfo>, manager: Manager) -> Self {
+ let pool = bounded::Pool::new(bounded::PoolConfig::new(64), manager);
+ Self { info, pool }
+ }
+
+ pub fn info(&self) -> Arc<AccessorInfo> {
+ self.info.clone()
+ }
+
+ pub async fn ftp_connect(&self, _: Operation) -> Result<bounded::Object<Manager>> {
+ let fut = self.pool.get();
+
+ tokio::select! {
+ _ = tokio::time::sleep(Duration::from_secs(10)) => {
+ Err(Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary())
+ }
+ result = fut => match result {
+ Ok(conn) => Ok(conn),
+ Err(err) => Err(format_ftp_error(err)),
}
- })
+ }
}
}
@@ -72,11 +71,11 @@ pub struct Manager {
pub enable_secure: bool,
}
-impl bb8::ManageConnection for Manager {
- type Connection = AsyncRustlsFtpStream;
+impl ManageObject for Manager {
+ type Object = AsyncRustlsFtpStream;
type Error = FtpError;
- async fn connect(&self) -> Result<Self::Connection, Self::Error> {
+ async fn create(&self) -> Result<Self::Object, Self::Error> {
let stream = ImplAsyncFtpStream::connect(&self.endpoint).await?;
// switch to secure mode if ssl/tls is on.
let mut ftp_stream = if self.enable_secure {
@@ -123,14 +122,11 @@ impl bb8::ManageConnection for Manager {
Ok(ftp_stream)
}
- async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
- conn.noop().await
- }
-
- /// Don't allow reuse conn.
- ///
- /// We need to investigate why reuse conn will cause error.
- fn has_broken(&self, _: &mut Self::Connection) -> bool {
- true
+ async fn is_recyclable(
+ &self,
+ o: &mut Self::Object,
+ _: &ObjectStatus,
+ ) -> Result<(), Self::Error> {
+ o.noop().await
}
}
diff --git a/core/src/services/ftp/deleter.rs b/core/src/services/ftp/deleter.rs
index 38a5b9fdd..10b688912 100644
--- a/core/src/services/ftp/deleter.rs
+++ b/core/src/services/ftp/deleter.rs
@@ -22,7 +22,7 @@ use suppaftp::Status;
use suppaftp::types::Response;
use super::core::FtpCore;
-use super::err::parse_error;
+use super::err::format_ftp_error;
use crate::raw::*;
use crate::*;
@@ -53,7 +53,7 @@ impl oio::OneShotDelete for FtpDeleter {
}))
| Ok(_) => (),
Err(e) => {
- return Err(parse_error(e));
+ return Err(format_ftp_error(e));
}
}
diff --git a/core/src/services/ftp/err.rs b/core/src/services/ftp/err.rs
index eac889880..a2ea33674 100644
--- a/core/src/services/ftp/err.rs
+++ b/core/src/services/ftp/err.rs
@@ -21,7 +21,7 @@ use suppaftp::Status;
use crate::Error;
use crate::ErrorKind;
-pub(super) fn parse_error(err: FtpError) -> Error {
+pub(super) fn format_ftp_error(err: FtpError) -> Error {
let (kind, retryable) = match err {
// Allow retry for error
//
diff --git a/core/src/services/ftp/reader.rs b/core/src/services/ftp/reader.rs
index bebc53d8f..144b03d2b 100644
--- a/core/src/services/ftp/reader.rs
+++ b/core/src/services/ftp/reader.rs
@@ -15,19 +15,19 @@
// specific language governing permissions and limitations
// under the License.
-use bb8::PooledConnection;
use bytes::BytesMut;
+use fastpool::bounded;
use futures::AsyncRead;
use futures::AsyncReadExt;
use super::core::Manager;
-use super::err::parse_error;
+use super::err::format_ftp_error;
use crate::raw::*;
use crate::*;
pub struct FtpReader {
/// Keep the connection alive while data stream is alive.
- _ftp_stream: PooledConnection<'static, Manager>,
+ _ftp_stream: bounded::Object<Manager>,
data_stream: Box<dyn AsyncRead + Sync + Send + Unpin + 'static>,
chunk: usize,
@@ -41,7 +41,7 @@ unsafe impl Sync for FtpReader {}
impl FtpReader {
pub async fn new(
- mut ftp_stream: PooledConnection<'static, Manager>,
+ mut ftp_stream: bounded::Object<Manager>,
path: String,
args: OpRead,
) -> Result<Self> {
@@ -53,12 +53,12 @@ impl FtpReader {
ftp_stream
.resume_transfer(offset as usize)
.await
- .map_err(parse_error)?;
+ .map_err(format_ftp_error)?;
}
let ds = ftp_stream
.retr_as_stream(path)
.await
- .map_err(parse_error)?
+ .map_err(format_ftp_error)?
.take(size as _);
Ok(Self {
diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index 52db47378..838e0dda7 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -15,20 +15,20 @@
// specific language governing permissions and limitations
// under the License.
-use bb8::PooledConnection;
use bytes::Buf;
+use fastpool::bounded;
use futures::AsyncWrite;
use futures::AsyncWriteExt;
use super::core::Manager;
-use super::err::parse_error;
+use super::err::format_ftp_error;
use crate::raw::*;
use crate::*;
pub struct FtpWriter {
target_path: String,
tmp_path: Option<String>,
- ftp_stream: PooledConnection<'static, Manager>,
+ ftp_stream: bounded::Object<Manager>,
data_stream: Option<Box<dyn AsyncWrite + Sync + Send + Unpin + 'static>>,
}
@@ -44,7 +44,7 @@ unsafe impl Sync for FtpWriter {}
/// After we can use data stream, we should return it directly.
impl FtpWriter {
pub fn new(
- ftp_stream: PooledConnection<'static, Manager>,
+ ftp_stream: bounded::Object<Manager>,
target_path: String,
tmp_path: Option<String>,
) -> Self {
@@ -70,7 +70,7 @@ impl oio::Write for FtpWriter {
self.ftp_stream
.append_with_stream(path)
.await
- .map_err(parse_error)?,
+ .map_err(format_ftp_error)?,
));
}
@@ -100,13 +100,13 @@ impl oio::Write for FtpWriter {
self.ftp_stream
.finalize_put_stream(data_stream)
.await
- .map_err(parse_error)?;
+ .map_err(format_ftp_error)?;
if let Some(tmp_path) = &self.tmp_path {
self.ftp_stream
.rename(tmp_path, &self.target_path)
.await
- .map_err(parse_error)?;
+ .map_err(format_ftp_error)?;
}
}
diff --git a/core/src/services/memcached/backend.rs b/core/src/services/memcached/backend.rs
index 22ce701c6..cbf6aa5c4 100644
--- a/core/src/services/memcached/backend.rs
+++ b/core/src/services/memcached/backend.rs
@@ -18,8 +18,6 @@
use std::sync::Arc;
use std::time::Duration;
-use mea::once::OnceCell;
-
use super::MEMCACHED_SCHEME;
use super::config::MemcachedConfig;
use super::core::*;
@@ -85,7 +83,7 @@ impl MemcachedBuilder {
///
/// Will panic if `max_size` is 0.
#[must_use]
- pub fn connection_pool_max_size(mut self, max_size: u32) -> Self {
+ pub fn connection_pool_max_size(mut self, max_size: usize) -> Self {
assert!(max_size > 0, "max_size must be greater than zero!");
self.config.connection_pool_max_size = Some(max_size);
self
@@ -144,23 +142,15 @@ impl Builder for MemcachedBuilder {
};
let endpoint = format!("{host}:{port}",);
- let root = normalize_root(
- self.config
- .root
- .clone()
- .unwrap_or_else(|| "/".to_string())
- .as_str(),
- );
-
- let conn = OnceCell::new();
- Ok(MemcachedBackend::new(MemcachedCore {
- conn,
+ let root = normalize_root(self.config.root.unwrap_or_else(|| "/".to_string()).as_str());
+
+ Ok(MemcachedBackend::new(MemcachedCore::new(
endpoint,
- username: self.config.username.clone(),
- password: self.config.password.clone(),
- default_ttl: self.config.default_ttl,
- connection_pool_max_size: self.config.connection_pool_max_size,
- })
+ self.config.username,
+ self.config.password,
+ self.config.default_ttl,
+ self.config.connection_pool_max_size,
+ ))
.with_normalized_root(root))
}
}
diff --git a/core/src/services/memcached/binary.rs b/core/src/services/memcached/binary.rs
index e3db22abc..d433f3fce 100644
--- a/core/src/services/memcached/binary.rs
+++ b/core/src/services/memcached/binary.rs
@@ -97,6 +97,7 @@ pub struct Response {
value: Vec<u8>,
}
+#[derive(Debug)]
pub struct Connection {
io: BufReader<TcpStream>,
}
diff --git a/core/src/services/memcached/config.rs b/core/src/services/memcached/config.rs
index 8b82537d7..ac8990a1d 100644
--- a/core/src/services/memcached/config.rs
+++ b/core/src/services/memcached/config.rs
@@ -46,7 +46,7 @@ pub struct MemcachedConfig {
/// The maximum number of connections allowed.
///
/// default is 10
- pub connection_pool_max_size: Option<u32>,
+ pub connection_pool_max_size: Option<usize>,
}
impl Debug for MemcachedConfig {
diff --git a/core/src/services/memcached/core.rs b/core/src/services/memcached/core.rs
index add582f80..c88d7b0be 100644
--- a/core/src/services/memcached/core.rs
+++ b/core/src/services/memcached/core.rs
@@ -15,19 +15,18 @@
// specific language governing permissions and limitations
// under the License.
+use fastpool::{ManageObject, ObjectStatus, bounded};
+use std::sync::Arc;
use std::time::Duration;
-
-use bb8::RunError;
-use mea::once::OnceCell;
use tokio::net::TcpStream;
use super::binary;
use crate::raw::*;
use crate::*;
-/// A `bb8::ManageConnection` for `memcache_async::ascii::Protocol`.
-#[derive(Clone, Debug)]
-pub struct MemcacheConnectionManager {
+/// A connection manager for `memcache_async::ascii::Protocol`.
+#[derive(Clone)]
+struct MemcacheConnectionManager {
address: String,
username: Option<String>,
password: Option<String>,
@@ -43,12 +42,12 @@ impl MemcacheConnectionManager {
}
}
-impl bb8::ManageConnection for MemcacheConnectionManager {
- type Connection = binary::Connection;
+impl ManageObject for MemcacheConnectionManager {
+ type Object = binary::Connection;
type Error = Error;
/// TODO: Implement unix stream support.
- async fn connect(&self) -> Result<Self::Connection, Self::Error> {
+ async fn create(&self) -> Result<Self::Object, Self::Error> {
let conn = TcpStream::connect(&self.address)
.await
.map_err(new_std_io_error)?;
@@ -60,53 +59,52 @@ impl bb8::ManageConnection for MemcacheConnectionManager {
Ok(conn)
}
- async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
- conn.version().await.map(|_| ())
- }
-
- fn has_broken(&self, _: &mut Self::Connection) -> bool {
- false
+ async fn is_recyclable(
+ &self,
+ o: &mut Self::Object,
+ _: &ObjectStatus,
+ ) -> Result<(), Self::Error> {
+ match o.version().await {
+ Ok(_) => Ok(()),
+ Err(err) => Err(err),
+ }
}
}
#[derive(Clone, Debug)]
pub struct MemcachedCore {
- pub conn: OnceCell<bb8::Pool<MemcacheConnectionManager>>,
- pub endpoint: String,
- pub username: Option<String>,
- pub password: Option<String>,
- pub default_ttl: Option<Duration>,
- pub connection_pool_max_size: Option<u32>,
+ default_ttl: Option<Duration>,
+ conn: Arc<bounded::Pool<MemcacheConnectionManager>>,
}
impl MemcachedCore {
- async fn conn(&self) -> Result<bb8::PooledConnection<'_, MemcacheConnectionManager>> {
- let pool = self
- .conn
- .get_or_try_init(|| async {
- let mgr = MemcacheConnectionManager::new(
- &self.endpoint,
- self.username.clone(),
- self.password.clone(),
- );
-
- bb8::Pool::builder()
- .max_size(self.connection_pool_max_size.unwrap_or(10))
- .build(mgr)
- .await
- .map_err(|err| {
- Error::new(ErrorKind::ConfigInvalid, "connect to memcached failed")
- .set_source(err)
- })
- })
- .await?;
-
- pool.get().await.map_err(|err| match err {
- RunError::TimedOut => {
- Error::new(ErrorKind::Unexpected, "get connection from pool failed").set_temporary()
+ pub fn new(
+ endpoint: String,
+ username: Option<String>,
+ password: Option<String>,
+ default_ttl: Option<Duration>,
+ connection_pool_max_size: Option<usize>,
+ ) -> Self {
+ let conn = bounded::Pool::new(
+ bounded::PoolConfig::new(connection_pool_max_size.unwrap_or(10)),
+ MemcacheConnectionManager::new(endpoint.as_str(), username, password),
+ );
+
+ Self { default_ttl, conn }
+ }
+
+ async fn conn(&self) -> Result<bounded::Object<MemcacheConnectionManager>> {
+ let fut = self.conn.get();
+
+ tokio::select! {
+ _ = tokio::time::sleep(Duration::from_secs(10)) => {
+ Err(Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary())
}
- RunError::User(err) => err,
- })
+ result = fut => match result {
+ Ok(conn) => Ok(conn),
+ Err(err) => Err(err),
+ }
+ }
}
pub async fn get(&self, key: &str) -> Result<Option<Buffer>> {
diff --git a/core/src/services/redis/backend.rs b/core/src/services/redis/backend.rs
index 5365422cc..e9b3d58a1 100644
--- a/core/src/services/redis/backend.rs
+++ b/core/src/services/redis/backend.rs
@@ -20,7 +20,6 @@ use std::sync::Arc;
use std::time::Duration;
use http::Uri;
-use mea::once::OnceCell;
use redis::Client;
use redis::ConnectionAddr;
use redis::ConnectionInfo;
@@ -133,7 +132,7 @@ impl RedisBuilder {
///
/// Will panic if `max_size` is 0.
#[must_use]
- pub fn connection_pool_max_size(mut self, max_size: u32) -> Self {
+ pub fn connection_pool_max_size(mut self, max_size: usize) -> Self {
assert!(max_size > 0, "max_size must be greater than zero!");
self.config.connection_pool_max_size = Some(max_size);
self
@@ -166,16 +165,13 @@ impl Builder for RedisBuilder {
}
let client = client_builder.build().map_err(format_redis_error)?;
- let conn = OnceCell::new();
-
- Ok(RedisBackend::new(RedisCore {
- addr: endpoints,
- client: None,
- cluster_client: Some(client),
- conn,
- default_ttl: self.config.default_ttl,
- connection_pool_max_size: self.config.connection_pool_max_size,
- })
+ Ok(RedisBackend::new(RedisCore::new(
+ endpoints,
+ None,
+ Some(client),
+ self.config.default_ttl,
+ self.config.connection_pool_max_size,
+ ))
.with_normalized_root(root))
} else {
let endpoint = self
@@ -193,15 +189,13 @@ impl Builder for RedisBuilder {
.set_source(e)
})?;
- let conn = OnceCell::new();
- Ok(RedisBackend::new(RedisCore {
- addr: endpoint,
- client: Some(client),
- cluster_client: None,
- conn,
- default_ttl: self.config.default_ttl,
- connection_pool_max_size: self.config.connection_pool_max_size,
- })
+ Ok(RedisBackend::new(RedisCore::new(
+ endpoint,
+ Some(client),
+ None,
+ self.config.default_ttl,
+ self.config.connection_pool_max_size,
+ ))
.with_normalized_root(root))
}
}
@@ -277,7 +271,7 @@ impl RedisBackend {
fn new(core: RedisCore) -> Self {
let info = AccessorInfo::default();
info.set_scheme(REDIS_SCHEME);
- info.set_name(&core.addr);
+ info.set_name(core.addr());
info.set_root("/");
info.set_native_capability(Capability {
read: true,
diff --git a/core/src/services/redis/config.rs b/core/src/services/redis/config.rs
index c5d134f80..1f07f4a04 100644
--- a/core/src/services/redis/config.rs
+++ b/core/src/services/redis/config.rs
@@ -40,7 +40,7 @@ pub struct RedisConfig {
/// The maximum number of connections allowed.
///
/// default is 10
- pub connection_pool_max_size: Option<u32>,
+ pub connection_pool_max_size: Option<usize>,
/// the username to connect redis service.
///
/// default is None
diff --git a/core/src/services/redis/core.rs b/core/src/services/redis/core.rs
index 21f609f51..f0f5de20a 100644
--- a/core/src/services/redis/core.rs
+++ b/core/src/services/redis/core.rs
@@ -16,11 +16,12 @@
// under the License.
use std::fmt::Debug;
+use std::sync::Arc;
use std::time::Duration;
-use bb8::RunError;
+use crate::*;
use bytes::Bytes;
-use mea::once::OnceCell;
+use fastpool::{ManageObject, ObjectStatus, bounded};
use redis::AsyncCommands;
use redis::Client;
use redis::Cmd;
@@ -33,8 +34,6 @@ use redis::aio::ConnectionManager;
use redis::cluster::ClusterClient;
use redis::cluster_async::ClusterConnection;
-use crate::*;
-
#[derive(Clone)]
pub enum RedisConnection {
Normal(ConnectionManager),
@@ -71,15 +70,15 @@ impl ConnectionLike for RedisConnection {
#[derive(Clone)]
pub struct RedisConnectionManager {
- pub client: Option<Client>,
- pub cluster_client: Option<ClusterClient>,
+ client: Option<Client>,
+ cluster_client: Option<ClusterClient>,
}
-impl bb8::ManageConnection for RedisConnectionManager {
- type Connection = RedisConnection;
+impl ManageObject for RedisConnectionManager {
+ type Object = RedisConnection;
type Error = Error;
- async fn connect(&self) -> Result<RedisConnection, Self::Error> {
+ async fn create(&self) -> Result<RedisConnection, Self::Error> {
if let Some(client) = self.client.clone() {
ConnectionManager::new(client.clone())
.await
@@ -96,30 +95,27 @@ impl bb8::ManageConnection for RedisConnectionManager {
}
}
- async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
- let pong: String = conn.ping().await.map_err(format_redis_error)?;
-
- if pong == "PONG" {
- Ok(())
- } else {
- Err(Error::new(ErrorKind::Unexpected, "PING ERROR"))
+ async fn is_recyclable(
+ &self,
+ o: &mut Self::Object,
+ _: &ObjectStatus,
+ ) -> Result<(), Self::Error> {
+ match o.ping::<String>().await {
+ Ok(ref pong) => match pong.as_bytes() {
+ b"PONG" => Ok(()),
+ _ => Err(Error::new(ErrorKind::Unexpected, "PING ERROR")),
+ },
+ Err(err) => Err(format_redis_error(err)),
}
}
-
- fn has_broken(&self, _: &mut Self::Connection) -> bool {
- false
- }
}
/// RedisCore holds the Redis connection and configuration
#[derive(Clone)]
pub struct RedisCore {
- pub addr: String,
- pub client: Option<Client>,
- pub cluster_client: Option<ClusterClient>,
- pub conn: OnceCell<bb8::Pool<RedisConnectionManager>>,
- pub default_ttl: Option<Duration>,
- pub connection_pool_max_size: Option<u32>,
+ addr: String,
+ conn: Arc<bounded::Pool<RedisConnectionManager>>,
+ default_ttl: Option<Duration>,
}
impl Debug for RedisCore {
@@ -131,38 +127,43 @@ impl Debug for RedisCore {
}
impl RedisCore {
- pub async fn conn(&self) -> Result<bb8::PooledConnection<'_, RedisConnectionManager>> {
- let pool = self
- .conn
- .get_or_try_init(|| async {
- bb8::Pool::builder()
- .max_size(self.connection_pool_max_size.unwrap_or(10))
- .build(self.get_redis_connection_manager())
- .await
- .map_err(|err| {
- Error::new(ErrorKind::ConfigInvalid, "connect to redis failed")
- .set_source(err)
- })
- })
- .await?;
- pool.get().await.map_err(|err| match err {
- RunError::TimedOut => {
- Error::new(ErrorKind::Unexpected, "get connection from pool failed").set_temporary()
- }
- RunError::User(err) => err,
- })
+ pub fn new(
+ endpoint: String,
+ client: Option<Client>,
+ cluster_client: Option<ClusterClient>,
+ default_ttl: Option<Duration>,
+ connection_pool_max_size: Option<usize>,
+ ) -> Self {
+ let manager = RedisConnectionManager {
+ client,
+ cluster_client,
+ };
+ let pool = bounded::Pool::new(
+ bounded::PoolConfig::new(connection_pool_max_size.unwrap_or(10)),
+ manager,
+ );
+
+ Self {
+ addr: endpoint,
+ conn: pool,
+ default_ttl,
+ }
+ }
+
+ pub fn addr(&self) -> &str {
+ &self.addr
}
- pub fn get_redis_connection_manager(&self) -> RedisConnectionManager {
- if let Some(_client) = self.client.clone() {
- RedisConnectionManager {
- client: self.client.clone(),
- cluster_client: None,
+ pub async fn conn(&self) -> Result<bounded::Object<RedisConnectionManager>> {
+ let fut = self.conn.get();
+
+ tokio::select! {
+ _ = tokio::time::sleep(Duration::from_secs(10)) => {
+ Err(Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary())
}
- } else {
- RedisConnectionManager {
- client: None,
- cluster_client: self.cluster_client.clone(),
+ result = fut => match result {
+ Ok(conn) => Ok(conn),
+ Err(err) => Err(err),
}
}
}
diff --git a/core/src/services/sftp/backend.rs b/core/src/services/sftp/backend.rs
index ca2e17a81..384c0b3e4 100644
--- a/core/src/services/sftp/backend.rs
+++ b/core/src/services/sftp/backend.rs
@@ -21,7 +21,6 @@ use std::path::PathBuf;
use std::sync::Arc;
use log::debug;
-use mea::once::OnceCell;
use openssh::KnownHosts;
use tokio::io::AsyncSeekExt;
@@ -187,17 +186,14 @@ impl Builder for SftpBuilder {
..Default::default()
});
- let accessor_info = Arc::new(info);
- let core = Arc::new(SftpCore {
- info: accessor_info,
+ let core = Arc::new(SftpCore::new(
+ Arc::new(info),
endpoint,
root,
user,
- key: self.config.key.clone(),
+ self.config.key.clone(),
known_hosts_strategy,
-
- client: OnceCell::new(),
- });
+ ));
debug!("sftp backend finished: {:?}", &self);
Ok(SftpBackend { core })
@@ -319,11 +315,11 @@ impl Access for SftpBackend {
let dir = match fs.open_dir(&file_path).await {
Ok(dir) => dir,
Err(e) => {
- if is_not_found(&e) {
- return Ok((RpList::default(), None));
+ return if is_not_found(&e) {
+ Ok((RpList::default(), None))
} else {
- return Err(parse_sftp_error(e));
- }
+ Err(parse_sftp_error(e))
+ };
}
}
.read_dir();
diff --git a/core/src/services/sftp/core.rs b/core/src/services/sftp/core.rs
index 327cf57af..2520a03d9 100644
--- a/core/src/services/sftp/core.rs
+++ b/core/src/services/sftp/core.rs
@@ -15,35 +15,28 @@
// specific language governing permissions and limitations
// under the License.
-use std::fmt::Debug;
-use std::path::Path;
-use std::path::PathBuf;
-use std::sync::Arc;
-
-use bb8::PooledConnection;
-use bb8::RunError;
-use log::debug;
-use mea::once::OnceCell;
-use openssh::KnownHosts;
-use openssh::SessionBuilder;
-use openssh_sftp_client::Sftp;
-use openssh_sftp_client::SftpOptions;
-
use super::error::is_sftp_protocol_error;
use super::error::parse_sftp_error;
use super::error::parse_ssh_error;
use crate::raw::*;
use crate::*;
+use fastpool::{ManageObject, ObjectStatus, bounded};
+use log::debug;
+use openssh::KnownHosts;
+use openssh::SessionBuilder;
+use openssh_sftp_client::Sftp;
+use openssh_sftp_client::SftpOptions;
+use std::fmt::Debug;
+use std::path::Path;
+use std::path::PathBuf;
+use std::sync::Arc;
+use std::time::Duration;
pub struct SftpCore {
pub info: Arc<AccessorInfo>,
pub endpoint: String,
pub root: String,
- pub user: Option<String>,
- pub key: Option<String>,
- pub known_hosts_strategy: KnownHosts,
-
- pub client: OnceCell<bb8::Pool<Manager>>,
+ client: Arc<bounded::Pool<Manager>>,
}
impl Debug for SftpCore {
@@ -56,29 +49,45 @@ impl Debug for SftpCore {
}
impl SftpCore {
- pub async fn connect(&self) -> Result<PooledConnection<'static, Manager>> {
- let client = self
- .client
- .get_or_try_init(|| async {
- bb8::Pool::builder()
- .max_size(64)
- .build(Manager {
- endpoint: self.endpoint.clone(),
- root: self.root.clone(),
- user: self.user.clone(),
- key: self.key.clone(),
- known_hosts_strategy:
self.known_hosts_strategy.clone(),
- })
- .await
- })
- .await?;
-
- client.get_owned().await.map_err(|err| match err {
- RunError::User(err) => err,
- RunError::TimedOut => {
- Error::new(ErrorKind::Unexpected, "connection request:
timeout").set_temporary()
+ pub fn new(
+ info: Arc<AccessorInfo>,
+ endpoint: String,
+ root: String,
+ user: Option<String>,
+ key: Option<String>,
+ known_hosts_strategy: KnownHosts,
+ ) -> Self {
+ let client = bounded::Pool::new(
+ bounded::PoolConfig::new(64),
+ Manager {
+ endpoint: endpoint.clone(),
+ root: root.clone(),
+ user,
+ key,
+ known_hosts_strategy,
+ },
+ );
+
+ SftpCore {
+ info,
+ endpoint,
+ root,
+ client,
+ }
+ }
+
+ pub async fn connect(&self) -> Result<bounded::Object<Manager>> {
+ let fut = self.client.get();
+
+ tokio::select! {
+ _ = tokio::time::sleep(Duration::from_secs(10)) => {
+ Err(Error::new(ErrorKind::Unexpected, "connection request:
timeout").set_temporary())
+ }
+ result = fut => match result {
+ Ok(conn) => Ok(conn),
+ Err(err) => Err(err),
}
- })
+ }
}
}
@@ -90,11 +99,11 @@ pub struct Manager {
known_hosts_strategy: KnownHosts,
}
-impl bb8::ManageConnection for Manager {
- type Connection = Sftp;
+impl ManageObject for Manager {
+ type Object = Sftp;
type Error = Error;
- async fn connect(&self) -> Result<Self::Connection, Self::Error> {
+ async fn create(&self) -> Result<Self::Object, Self::Error> {
let mut session = SessionBuilder::default();
if let Some(user) = &self.user {
@@ -140,14 +149,14 @@ impl bb8::ManageConnection for Manager {
}
// Check if connect valid by checking the root path.
- async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(),
Self::Error> {
- let _ = conn.fs().metadata("./").await.map_err(parse_sftp_error)?;
-
- Ok(())
- }
-
- /// Always allow reuse conn.
- fn has_broken(&self, _: &mut Self::Connection) -> bool {
- false
+ async fn is_recyclable(
+ &self,
+ o: &mut Self::Object,
+ _: &ObjectStatus,
+ ) -> Result<(), Self::Error> {
+ match o.fs().metadata("./").await {
+ Ok(_) => Ok(()),
+ Err(e) => Err(parse_sftp_error(e)),
+ }
}
}
diff --git a/core/src/services/sftp/reader.rs b/core/src/services/sftp/reader.rs
index 781f27dd7..676389123 100644
--- a/core/src/services/sftp/reader.rs
+++ b/core/src/services/sftp/reader.rs
@@ -15,8 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-use bb8::PooledConnection;
use bytes::BytesMut;
+use fastpool::bounded;
use openssh_sftp_client::file::File;
use super::core::Manager;
@@ -26,7 +26,7 @@ use crate::*;
pub struct SftpReader {
/// Keep the connection alive while data stream is alive.
- _conn: PooledConnection<'static, Manager>,
+ _conn: bounded::Object<Manager>,
file: File,
chunk: usize,
@@ -36,7 +36,7 @@ pub struct SftpReader {
}
impl SftpReader {
- pub fn new(conn: PooledConnection<'static, Manager>, file: File, size:
Option<u64>) -> Self {
+ pub fn new(conn: bounded::Object<Manager>, file: File, size: Option<u64>)
-> Self {
Self {
_conn: conn,
file,