This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 54fb2f526 chore: use fastpool as object pool impl  (#6820)
54fb2f526 is described below

commit 54fb2f526483972ad4cea40dfb4bb19e78fb1fb5
Author: tison <[email protected]>
AuthorDate: Tue Nov 25 20:32:11 2025 +0800

    chore: use fastpool as object pool impl  (#6820)
    
    * chore: use fastpool as object pool impl
    
    Signed-off-by: tison <[email protected]>
    
    * for redis
    
    Signed-off-by: tison <[email protected]>
    
    * for sftp
    
    Signed-off-by: tison <[email protected]>
    
    * for memcached
    
    Signed-off-by: tison <[email protected]>
    
    * run format
    
    Signed-off-by: tison <[email protected]>
    
    * fixup compile
    
    Signed-off-by: tison <[email protected]>
    
    * Revert "run format"
    
    This reverts commit 052cb4cca1cf49a0577dcfbba3c641fe6637a704.
    
    ---------
    
    Signed-off-by: tison <[email protected]>
---
 core/Cargo.lock                        |  23 ++++---
 core/Cargo.toml                        |  14 ++--
 core/src/services/etcd/backend.rs      |  10 +--
 core/src/services/etcd/core.rs         |  98 +++++++++++++++-------------
 core/src/services/ftp/backend.rs       |  20 +++---
 core/src/services/ftp/core.rs          |  74 ++++++++++-----------
 core/src/services/ftp/deleter.rs       |   4 +-
 core/src/services/ftp/err.rs           |   2 +-
 core/src/services/ftp/reader.rs        |  12 ++--
 core/src/services/ftp/writer.rs        |  14 ++--
 core/src/services/memcached/backend.rs |  28 +++-----
 core/src/services/memcached/binary.rs  |   1 +
 core/src/services/memcached/config.rs  |   2 +-
 core/src/services/memcached/core.rs    |  92 +++++++++++++-------------
 core/src/services/redis/backend.rs     |  38 +++++------
 core/src/services/redis/config.rs      |   2 +-
 core/src/services/redis/core.rs        | 111 +++++++++++++++----------------
 core/src/services/sftp/backend.rs      |  20 +++---
 core/src/services/sftp/core.rs         | 115 ++++++++++++++++++---------------
 core/src/services/sftp/reader.rs       |   6 +-
 20 files changed, 332 insertions(+), 354 deletions(-)
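
The same migration is applied to etcd, ftp, memcached, redis, and sftp: the
bb8 ManageConnection impl becomes a fastpool ManageObject impl (connect
becomes create; is_valid and has_broken collapse into is_recyclable), the
lazily initialized OnceCell<bb8::Pool> becomes a bounded::Pool built eagerly
in a new constructor, and pool acquisition gains an explicit 10-second
timeout via tokio::select!. Below is a minimal sketch of that pattern;
MyConn, MyManager, and MyCore are hypothetical stand-ins, String is only a
placeholder error type, and the fastpool calls mirror what the diff below
uses rather than being checked against the crate's documentation.

use std::sync::Arc;
use std::time::Duration;

use fastpool::{ManageObject, ObjectStatus, bounded};

// Hypothetical connection type standing in for the per-service clients.
struct MyConn;

impl MyConn {
    async fn ping(&mut self) -> Result<(), String> {
        Ok(())
    }
}

// Pool manager: creates connections and decides whether they can be reused.
struct MyManager;

impl ManageObject for MyManager {
    type Object = MyConn;
    type Error = String;

    // bb8's `connect` becomes `create`.
    async fn create(&self) -> Result<Self::Object, Self::Error> {
        Ok(MyConn)
    }

    // bb8's `is_valid`/`has_broken` pair collapses into `is_recyclable`.
    async fn is_recyclable(
        &self,
        o: &mut Self::Object,
        _: &ObjectStatus,
    ) -> Result<(), Self::Error> {
        o.ping().await
    }
}

struct MyCore {
    pool: Arc<bounded::Pool<MyManager>>,
}

impl MyCore {
    fn new() -> Self {
        // The pool is built eagerly in the constructor instead of behind a OnceCell.
        let pool = bounded::Pool::new(bounded::PoolConfig::new(64), MyManager);
        Self { pool }
    }

    async fn conn(&self) -> Result<bounded::Object<MyManager>, String> {
        // The acquisition timeout is now the caller's job: race the pool
        // against a 10-second sleep, as each service's conn() does below.
        let fut = self.pool.get();
        tokio::select! {
            _ = tokio::time::sleep(Duration::from_secs(10)) => {
                Err("connection request: timeout".to_string())
            }
            result = fut => result,
        }
    }
}

Unlike bb8, there is no built-in RunError::TimedOut variant to match on,
which is why every service's conn() now carries the tokio::select! wrapper
and why the tokio "macros" feature is enabled in Cargo.toml.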

diff --git a/core/Cargo.lock b/core/Cargo.lock
index 7ab3d1485..d07ed620c 100644
--- a/core/Cargo.lock
+++ b/core/Cargo.lock
@@ -1156,17 +1156,6 @@ version = "1.8.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "55248b47b0caf0546f7988906588779981c43bb1bc9d0c44087278f80cdb44ba"
 
-[[package]]
-name = "bb8"
-version = "0.9.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "212d8b8e1a22743d9241575c6ba822cf9c8fef34771c86ab7e477a4fbfd254e5"
-dependencies = [
- "futures-util",
- "parking_lot 0.12.5",
- "tokio",
-]
-
 [[package]]
 name = "bcrypt"
 version = "0.15.1"
@@ -2885,6 +2874,16 @@ dependencies = [
  "paste",
 ]
 
+[[package]]
+name = "fastpool"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a6777b4743839a42fd32141d95d7adc4c98e3a1d5200a2598cf32ffd102c81e1"
+dependencies = [
+ "mea",
+ "scopeguard",
+]
+
 [[package]]
 name = "fastrace"
 version = "0.7.14"
@@ -5351,7 +5350,6 @@ dependencies = [
  "await-tree",
  "backon",
  "base64 0.22.1",
- "bb8",
  "bytes",
  "cacache",
  "compio",
@@ -5362,6 +5360,7 @@ dependencies = [
  "dotenvy",
  "etcd-client",
  "fastmetrics",
+ "fastpool",
  "fastrace",
  "fastrace-jaeger",
  "flume",
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 16224b93b..a05c7a597 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -141,12 +141,12 @@ services-d1 = []
 services-dashmap = ["dep:dashmap"]
 services-dbfs = []
 services-dropbox = []
-services-etcd = ["dep:etcd-client", "dep:bb8"]
+services-etcd = ["dep:etcd-client", "dep:fastpool"]
 services-foundationdb = ["dep:foundationdb"]
 services-fs = ["tokio/fs", "internal-tokio-rt"]
 services-ftp = [
   "dep:suppaftp",
-  "dep:bb8",
+  "dep:fastpool",
   "dep:futures-rustls",
   "dep:rustls-native-certs",
 ]
@@ -167,7 +167,7 @@ services-ipfs = ["dep:prost"]
 services-ipmfs = []
 services-koofr = []
 services-lakefs = []
-services-memcached = ["dep:bb8"]
+services-memcached = ["dep:fastpool"]
 services-memory = []
 services-mini-moka = ["dep:mini-moka"]
 services-moka = ["dep:moka"]
@@ -195,7 +195,7 @@ services-pcloud = []
 services-persy = ["dep:persy", "internal-tokio-rt"]
 services-postgresql = ["dep:sqlx", "sqlx?/postgres"]
 services-redb = ["dep:redb", "internal-tokio-rt"]
-services-redis = ["dep:redis", "dep:bb8", "redis?/tokio-rustls-comp"]
+services-redis = ["dep:redis", "dep:fastpool", "redis?/tokio-rustls-comp"]
 services-redis-native-tls = ["services-redis", "redis?/tokio-native-tls-comp"]
 services-rocksdb = ["dep:rocksdb", "internal-tokio-rt"]
 services-s3 = [
@@ -206,7 +206,7 @@ services-s3 = [
   "dep:crc32c",
 ]
 services-seafile = []
-services-sftp = ["dep:openssh", "dep:openssh-sftp-client", "dep:bb8"]
+services-sftp = ["dep:openssh", "dep:openssh-sftp-client", "dep:fastpool"]
 services-sled = ["dep:sled", "internal-tokio-rt"]
 services-sqlite = ["dep:sqlx", "sqlx?/sqlite", "dep:ouroboros"]
 services-surrealdb = ["dep:surrealdb"]
@@ -260,7 +260,7 @@ reqwest = { version = "0.12.24", features = [
 ], default-features = false }
 serde = { version = "1", features = ["derive"] }
 serde_json = "1"
-tokio = { version = "1.48", features = ["io-util"] }
+tokio = { version = "1.48", features = ["macros", "io-util"] }
 url = "2.5"
 uuid = { version = "1", features = ["serde", "v4"] }
 
@@ -272,7 +272,7 @@ rand = { version = "0.8", optional = true }
 
 # Services
 # general dependencies.
-bb8 = { version = "0.9", optional = true }
+fastpool = { version = "1.0.2", optional = true }
 prost = { version = "0.13", optional = true }
 sha1 = { version = "0.10.6", optional = true }
 sha2 = { version = "0.10", optional = true }
diff --git a/core/src/services/etcd/backend.rs b/core/src/services/etcd/backend.rs
index 8a9d916f0..317345240 100644
--- a/core/src/services/etcd/backend.rs
+++ b/core/src/services/etcd/backend.rs
@@ -21,7 +21,6 @@ use etcd_client::Certificate;
 use etcd_client::ConnectOptions;
 use etcd_client::Identity;
 use etcd_client::TlsOptions;
-use mea::once::OnceCell;
 
 use super::ETCD_SCHEME;
 use super::config::EtcdConfig;
@@ -158,14 +157,7 @@ impl Builder for EtcdBuilder {
                 .as_str(),
         );
 
-        let client = OnceCell::new();
-
-        let core = EtcdCore {
-            endpoints,
-            client,
-            options,
-        };
-
+        let core = EtcdCore::new(endpoints, options);
         Ok(EtcdBackend::new(core, &root))
     }
 }
diff --git a/core/src/services/etcd/core.rs b/core/src/services/etcd/core.rs
index fcb07a73d..24b6eea6e 100644
--- a/core/src/services/etcd/core.rs
+++ b/core/src/services/etcd/core.rs
@@ -15,17 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::fmt::Debug;
-
-use bb8::Pool;
-use bb8::PooledConnection;
-use bb8::RunError;
-use etcd_client::Client;
-use etcd_client::ConnectOptions;
-use mea::once::OnceCell;
-
 use crate::services::etcd::error::format_etcd_error;
 use crate::{Buffer, Error, ErrorKind, Result};
+use etcd_client::Client;
+use etcd_client::ConnectOptions;
+use fastpool::ManageObject;
+use fastpool::ObjectStatus;
+use fastpool::bounded;
+use std::fmt::Debug;
+use std::sync::Arc;
+use std::time::Duration;
 
 pub mod constants {
     pub const DEFAULT_ETCD_ENDPOINTS: &str = "http://127.0.0.1:2379";
@@ -37,34 +36,33 @@ pub struct Manager {
     options: ConnectOptions,
 }
 
-impl bb8::ManageConnection for Manager {
-    type Connection = Client;
+impl ManageObject for Manager {
+    type Object = Client;
     type Error = Error;
 
-    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
-        let conn = Client::connect(self.endpoints.clone(), Some(self.options.clone()))
+    async fn create(&self) -> Result<Self::Object, Self::Error> {
+        Client::connect(self.endpoints.clone(), Some(self.options.clone()))
             .await
-            .map_err(format_etcd_error)?;
-
-        Ok(conn)
-    }
-
-    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
-        let _ = conn.status().await.map_err(format_etcd_error)?;
-        Ok(())
+            .map_err(format_etcd_error)
     }
 
-    /// Always allow reuse conn.
-    fn has_broken(&self, _: &mut Self::Connection) -> bool {
-        false
+    async fn is_recyclable(
+        &self,
+        o: &mut Self::Object,
+        _: &ObjectStatus,
+    ) -> Result<(), Self::Error> {
+        match o.status().await {
+            Ok(_) => Ok(()),
+            Err(err) => Err(format_etcd_error(err)),
+        }
     }
 }
 
 #[derive(Clone)]
 pub struct EtcdCore {
-    pub endpoints: Vec<String>,
-    pub options: ConnectOptions,
-    pub client: OnceCell<Pool<Manager>>,
+    endpoints: Vec<String>,
+    options: ConnectOptions,
+    client: Arc<bounded::Pool<Manager>>,
 }
 
 impl Debug for EtcdCore {
@@ -77,26 +75,34 @@ impl Debug for EtcdCore {
 }
 
 impl EtcdCore {
-    pub async fn conn(&self) -> Result<PooledConnection<'static, Manager>> {
-        let client = self
-            .client
-            .get_or_try_init(|| async {
-                Pool::builder()
-                    .max_size(64)
-                    .build(Manager {
-                        endpoints: self.endpoints.clone(),
-                        options: self.options.clone(),
-                    })
-                    .await
-            })
-            .await?;
-
-        client.get_owned().await.map_err(|err| match err {
-            RunError::User(err) => err,
-            RunError::TimedOut => {
-                Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary()
+    pub fn new(endpoints: Vec<String>, options: ConnectOptions) -> Self {
+        let client = bounded::Pool::new(
+            bounded::PoolConfig::new(64),
+            Manager {
+                endpoints: endpoints.clone(),
+                options: options.clone(),
+            },
+        );
+
+        Self {
+            endpoints,
+            options,
+            client,
+        }
+    }
+
+    pub async fn conn(&self) -> Result<bounded::Object<Manager>> {
+        let fut = self.client.get();
+
+        tokio::select! {
+            _ = tokio::time::sleep(Duration::from_secs(10)) => {
+                Err(Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary())
+            }
+            result = fut => match result {
+                Ok(conn) => Ok(conn),
+                Err(err) => Err(err),
             }
-        })
+        }
     }
 
     pub async fn has_prefix(&self, prefix: &str) -> Result<bool> {
diff --git a/core/src/services/ftp/backend.rs b/core/src/services/ftp/backend.rs
index 0b0c87b13..555487717 100644
--- a/core/src/services/ftp/backend.rs
+++ b/core/src/services/ftp/backend.rs
@@ -21,7 +21,6 @@ use std::sync::Arc;
 
 use http::Uri;
 use log::debug;
-use mea::once::OnceCell;
 use services::ftp::core::Manager;
 use suppaftp::FtpError;
 use suppaftp::Status;
@@ -32,7 +31,7 @@ use super::FTP_SCHEME;
 use super::config::FtpConfig;
 use super::core::FtpCore;
 use super::deleter::FtpDeleter;
-use super::err::parse_error;
+use super::err::format_ftp_error;
 use super::lister::FtpLister;
 use super::reader::FtpReader;
 use super::writer::FtpWriter;
@@ -165,6 +164,7 @@ impl Builder for FtpBuilder {
 
                 ..Default::default()
             });
+
         let manager = Manager {
             endpoint: endpoint.clone(),
             root: root.clone(),
@@ -172,12 +172,8 @@ impl Builder for FtpBuilder {
             password: password.clone(),
             enable_secure,
         };
-        let core = Arc::new(FtpCore {
-            info: accessor_info.into(),
-            manager,
-            pool: OnceCell::new(),
-        });
 
+        let core = Arc::new(FtpCore::new(accessor_info.into(), manager.clone()));
         Ok(FtpBackend { core })
     }
 }
@@ -201,7 +197,7 @@ impl Access for FtpBackend {
     type Deleter = oio::OneShotDeleter<FtpDeleter>;
 
     fn info(&self) -> Arc<AccessorInfo> {
-        self.core.info.clone()
+        self.core.info()
     }
 
     async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
@@ -221,7 +217,7 @@ impl Access for FtpBackend {
                 }))
                 | Ok(()) => (),
                 Err(e) => {
-                    return Err(parse_error(e));
+                    return Err(format_ftp_error(e));
                 }
             }
         }
@@ -277,7 +273,7 @@ impl Access for FtpBackend {
                 }))
                 | Ok(()) => (),
                 Err(e) => {
-                    return Err(parse_error(e));
+                    return Err(format_ftp_error(e));
                 }
             }
         }
@@ -299,7 +295,7 @@ impl Access for FtpBackend {
         let mut ftp_stream = self.core.ftp_connect(Operation::List).await?;
 
         let pathname = if path == "/" { None } else { Some(path) };
-        let files = ftp_stream.list(pathname).await.map_err(parse_error)?;
+        let files = ftp_stream.list(pathname).await.map_err(format_ftp_error)?;
 
         Ok((
             RpList::default(),
@@ -316,7 +312,7 @@ impl FtpBackend {
 
         let pathname = if parent == "/" { None } else { Some(parent) };
 
-        let resp = ftp_stream.list(pathname).await.map_err(parse_error)?;
+        let resp = ftp_stream.list(pathname).await.map_err(format_ftp_error)?;
 
         // Get stat of file.
         let mut files = resp
diff --git a/core/src/services/ftp/core.rs b/core/src/services/ftp/core.rs
index 42d427096..afe227add 100644
--- a/core/src/services/ftp/core.rs
+++ b/core/src/services/ftp/core.rs
@@ -15,14 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::sync::Arc;
-
-use bb8::Pool;
-use bb8::PooledConnection;
-use bb8::RunError;
+use fastpool::{ManageObject, ObjectStatus, bounded};
 use futures_rustls::TlsConnector;
-use mea::once::OnceCell;
 use raw::Operation;
+use std::sync::Arc;
+use std::time::Duration;
 use suppaftp::AsyncRustlsConnector;
 use suppaftp::AsyncRustlsFtpStream;
 use suppaftp::FtpError;
@@ -31,35 +28,37 @@ use suppaftp::Status;
 use suppaftp::rustls::ClientConfig;
 use suppaftp::types::FileType;
 
-use super::err::parse_error;
+use super::err::format_ftp_error;
 use crate::raw::AccessorInfo;
 use crate::*;
 
 pub struct FtpCore {
-    pub info: Arc<AccessorInfo>,
-    pub manager: Manager,
-    pub pool: OnceCell<Pool<Manager>>,
+    info: Arc<AccessorInfo>,
+    pool: Arc<bounded::Pool<Manager>>,
 }
 
 impl FtpCore {
-    pub async fn ftp_connect(&self, _: Operation) -> Result<PooledConnection<'static, Manager>> {
-        let pool = self
-            .pool
-            .get_or_try_init(|| async {
-                Pool::builder()
-                    .max_size(64)
-                    .build(self.manager.clone())
-                    .await
-            })
-            .await
-            .map_err(parse_error)?;
-
-        pool.get_owned().await.map_err(|err| match err {
-            RunError::User(err) => parse_error(err),
-            RunError::TimedOut => {
-                Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary()
+    pub fn new(info: Arc<AccessorInfo>, manager: Manager) -> Self {
+        let pool = bounded::Pool::new(bounded::PoolConfig::new(64), manager);
+        Self { info, pool }
+    }
+
+    pub fn info(&self) -> Arc<AccessorInfo> {
+        self.info.clone()
+    }
+
+    pub async fn ftp_connect(&self, _: Operation) -> Result<bounded::Object<Manager>> {
+        let fut = self.pool.get();
+
+        tokio::select! {
+            _ = tokio::time::sleep(Duration::from_secs(10)) => {
+                Err(Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary())
+            }
+            result = fut => match result {
+                Ok(conn) => Ok(conn),
+                Err(err) => Err(format_ftp_error(err)),
             }
-        })
+        }
     }
 }
 
@@ -72,11 +71,11 @@ pub struct Manager {
     pub enable_secure: bool,
 }
 
-impl bb8::ManageConnection for Manager {
-    type Connection = AsyncRustlsFtpStream;
+impl ManageObject for Manager {
+    type Object = AsyncRustlsFtpStream;
     type Error = FtpError;
 
-    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
+    async fn create(&self) -> Result<Self::Object, Self::Error> {
         let stream = ImplAsyncFtpStream::connect(&self.endpoint).await?;
         // switch to secure mode if ssl/tls is on.
         let mut ftp_stream = if self.enable_secure {
@@ -123,14 +122,11 @@ impl bb8::ManageConnection for Manager {
         Ok(ftp_stream)
     }
 
-    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
-        conn.noop().await
-    }
-
-    /// Don't allow reuse conn.
-    ///
-    /// We need to investigate why reuse conn will cause error.
-    fn has_broken(&self, _: &mut Self::Connection) -> bool {
-        true
+    async fn is_recyclable(
+        &self,
+        o: &mut Self::Object,
+        _: &ObjectStatus,
+    ) -> Result<(), Self::Error> {
+        o.noop().await
     }
 }
diff --git a/core/src/services/ftp/deleter.rs b/core/src/services/ftp/deleter.rs
index 38a5b9fdd..10b688912 100644
--- a/core/src/services/ftp/deleter.rs
+++ b/core/src/services/ftp/deleter.rs
@@ -22,7 +22,7 @@ use suppaftp::Status;
 use suppaftp::types::Response;
 
 use super::core::FtpCore;
-use super::err::parse_error;
+use super::err::format_ftp_error;
 use crate::raw::*;
 use crate::*;
 
@@ -53,7 +53,7 @@ impl oio::OneShotDelete for FtpDeleter {
             }))
             | Ok(_) => (),
             Err(e) => {
-                return Err(parse_error(e));
+                return Err(format_ftp_error(e));
             }
         }
 
diff --git a/core/src/services/ftp/err.rs b/core/src/services/ftp/err.rs
index eac889880..a2ea33674 100644
--- a/core/src/services/ftp/err.rs
+++ b/core/src/services/ftp/err.rs
@@ -21,7 +21,7 @@ use suppaftp::Status;
 use crate::Error;
 use crate::ErrorKind;
 
-pub(super) fn parse_error(err: FtpError) -> Error {
+pub(super) fn format_ftp_error(err: FtpError) -> Error {
     let (kind, retryable) = match err {
         // Allow retry for error
         //
diff --git a/core/src/services/ftp/reader.rs b/core/src/services/ftp/reader.rs
index bebc53d8f..144b03d2b 100644
--- a/core/src/services/ftp/reader.rs
+++ b/core/src/services/ftp/reader.rs
@@ -15,19 +15,19 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use bb8::PooledConnection;
 use bytes::BytesMut;
+use fastpool::bounded;
 use futures::AsyncRead;
 use futures::AsyncReadExt;
 
 use super::core::Manager;
-use super::err::parse_error;
+use super::err::format_ftp_error;
 use crate::raw::*;
 use crate::*;
 
 pub struct FtpReader {
     /// Keep the connection alive while data stream is alive.
-    _ftp_stream: PooledConnection<'static, Manager>,
+    _ftp_stream: bounded::Object<Manager>,
 
     data_stream: Box<dyn AsyncRead + Sync + Send + Unpin + 'static>,
     chunk: usize,
@@ -41,7 +41,7 @@ unsafe impl Sync for FtpReader {}
 
 impl FtpReader {
     pub async fn new(
-        mut ftp_stream: PooledConnection<'static, Manager>,
+        mut ftp_stream: bounded::Object<Manager>,
         path: String,
         args: OpRead,
     ) -> Result<Self> {
@@ -53,12 +53,12 @@ impl FtpReader {
             ftp_stream
                 .resume_transfer(offset as usize)
                 .await
-                .map_err(parse_error)?;
+                .map_err(format_ftp_error)?;
         }
         let ds = ftp_stream
             .retr_as_stream(path)
             .await
-            .map_err(parse_error)?
+            .map_err(format_ftp_error)?
             .take(size as _);
 
         Ok(Self {
diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index 52db47378..838e0dda7 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -15,20 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use bb8::PooledConnection;
 use bytes::Buf;
+use fastpool::bounded;
 use futures::AsyncWrite;
 use futures::AsyncWriteExt;
 
 use super::core::Manager;
-use super::err::parse_error;
+use super::err::format_ftp_error;
 use crate::raw::*;
 use crate::*;
 
 pub struct FtpWriter {
     target_path: String,
     tmp_path: Option<String>,
-    ftp_stream: PooledConnection<'static, Manager>,
+    ftp_stream: bounded::Object<Manager>,
     data_stream: Option<Box<dyn AsyncWrite + Sync + Send + Unpin + 'static>>,
 }
 
@@ -44,7 +44,7 @@ unsafe impl Sync for FtpWriter {}
 /// After we can use data stream, we should return it directly.
 impl FtpWriter {
     pub fn new(
-        ftp_stream: PooledConnection<'static, Manager>,
+        ftp_stream: bounded::Object<Manager>,
         target_path: String,
         tmp_path: Option<String>,
     ) -> Self {
@@ -70,7 +70,7 @@ impl oio::Write for FtpWriter {
                 self.ftp_stream
                     .append_with_stream(path)
                     .await
-                    .map_err(parse_error)?,
+                    .map_err(format_ftp_error)?,
             ));
         }
 
@@ -100,13 +100,13 @@ impl oio::Write for FtpWriter {
             self.ftp_stream
                 .finalize_put_stream(data_stream)
                 .await
-                .map_err(parse_error)?;
+                .map_err(format_ftp_error)?;
 
             if let Some(tmp_path) = &self.tmp_path {
                 self.ftp_stream
                     .rename(tmp_path, &self.target_path)
                     .await
-                    .map_err(parse_error)?;
+                    .map_err(format_ftp_error)?;
             }
         }
 
diff --git a/core/src/services/memcached/backend.rs b/core/src/services/memcached/backend.rs
index 22ce701c6..cbf6aa5c4 100644
--- a/core/src/services/memcached/backend.rs
+++ b/core/src/services/memcached/backend.rs
@@ -18,8 +18,6 @@
 use std::sync::Arc;
 use std::time::Duration;
 
-use mea::once::OnceCell;
-
 use super::MEMCACHED_SCHEME;
 use super::config::MemcachedConfig;
 use super::core::*;
@@ -85,7 +83,7 @@ impl MemcachedBuilder {
     ///
     /// Will panic if `max_size` is 0.
     #[must_use]
-    pub fn connection_pool_max_size(mut self, max_size: u32) -> Self {
+    pub fn connection_pool_max_size(mut self, max_size: usize) -> Self {
         assert!(max_size > 0, "max_size must be greater than zero!");
         self.config.connection_pool_max_size = Some(max_size);
         self
@@ -144,23 +142,15 @@ impl Builder for MemcachedBuilder {
         };
         let endpoint = format!("{host}:{port}",);
 
-        let root = normalize_root(
-            self.config
-                .root
-                .clone()
-                .unwrap_or_else(|| "/".to_string())
-                .as_str(),
-        );
-
-        let conn = OnceCell::new();
-        Ok(MemcachedBackend::new(MemcachedCore {
-            conn,
+        let root = normalize_root(self.config.root.unwrap_or_else(|| "/".to_string()).as_str());
+
+        Ok(MemcachedBackend::new(MemcachedCore::new(
             endpoint,
-            username: self.config.username.clone(),
-            password: self.config.password.clone(),
-            default_ttl: self.config.default_ttl,
-            connection_pool_max_size: self.config.connection_pool_max_size,
-        })
+            self.config.username,
+            self.config.password,
+            self.config.default_ttl,
+            self.config.connection_pool_max_size,
+        ))
         .with_normalized_root(root))
     }
 }
diff --git a/core/src/services/memcached/binary.rs b/core/src/services/memcached/binary.rs
index e3db22abc..d433f3fce 100644
--- a/core/src/services/memcached/binary.rs
+++ b/core/src/services/memcached/binary.rs
@@ -97,6 +97,7 @@ pub struct Response {
     value: Vec<u8>,
 }
 
+#[derive(Debug)]
 pub struct Connection {
     io: BufReader<TcpStream>,
 }
diff --git a/core/src/services/memcached/config.rs b/core/src/services/memcached/config.rs
index 8b82537d7..ac8990a1d 100644
--- a/core/src/services/memcached/config.rs
+++ b/core/src/services/memcached/config.rs
@@ -46,7 +46,7 @@ pub struct MemcachedConfig {
     /// The maximum number of connections allowed.
     ///
     /// default is 10
-    pub connection_pool_max_size: Option<u32>,
+    pub connection_pool_max_size: Option<usize>,
 }
 
 impl Debug for MemcachedConfig {
diff --git a/core/src/services/memcached/core.rs b/core/src/services/memcached/core.rs
index add582f80..c88d7b0be 100644
--- a/core/src/services/memcached/core.rs
+++ b/core/src/services/memcached/core.rs
@@ -15,19 +15,18 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use fastpool::{ManageObject, ObjectStatus, bounded};
+use std::sync::Arc;
 use std::time::Duration;
-
-use bb8::RunError;
-use mea::once::OnceCell;
 use tokio::net::TcpStream;
 
 use super::binary;
 use crate::raw::*;
 use crate::*;
 
-/// A `bb8::ManageConnection` for `memcache_async::ascii::Protocol`.
-#[derive(Clone, Debug)]
-pub struct MemcacheConnectionManager {
+/// A connection manager for `memcache_async::ascii::Protocol`.
+#[derive(Clone)]
+struct MemcacheConnectionManager {
     address: String,
     username: Option<String>,
     password: Option<String>,
@@ -43,12 +42,12 @@ impl MemcacheConnectionManager {
     }
 }
 
-impl bb8::ManageConnection for MemcacheConnectionManager {
-    type Connection = binary::Connection;
+impl ManageObject for MemcacheConnectionManager {
+    type Object = binary::Connection;
     type Error = Error;
 
     /// TODO: Implement unix stream support.
-    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
+    async fn create(&self) -> Result<Self::Object, Self::Error> {
         let conn = TcpStream::connect(&self.address)
             .await
             .map_err(new_std_io_error)?;
@@ -60,53 +59,52 @@ impl bb8::ManageConnection for MemcacheConnectionManager {
         Ok(conn)
     }
 
-    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
-        conn.version().await.map(|_| ())
-    }
-
-    fn has_broken(&self, _: &mut Self::Connection) -> bool {
-        false
+    async fn is_recyclable(
+        &self,
+        o: &mut Self::Object,
+        _: &ObjectStatus,
+    ) -> Result<(), Self::Error> {
+        match o.version().await {
+            Ok(_) => Ok(()),
+            Err(err) => Err(err),
+        }
     }
 }
 
 #[derive(Clone, Debug)]
 pub struct MemcachedCore {
-    pub conn: OnceCell<bb8::Pool<MemcacheConnectionManager>>,
-    pub endpoint: String,
-    pub username: Option<String>,
-    pub password: Option<String>,
-    pub default_ttl: Option<Duration>,
-    pub connection_pool_max_size: Option<u32>,
+    default_ttl: Option<Duration>,
+    conn: Arc<bounded::Pool<MemcacheConnectionManager>>,
 }
 
 impl MemcachedCore {
-    async fn conn(&self) -> Result<bb8::PooledConnection<'_, MemcacheConnectionManager>> {
-        let pool = self
-            .conn
-            .get_or_try_init(|| async {
-                let mgr = MemcacheConnectionManager::new(
-                    &self.endpoint,
-                    self.username.clone(),
-                    self.password.clone(),
-                );
-
-                bb8::Pool::builder()
-                    .max_size(self.connection_pool_max_size.unwrap_or(10))
-                    .build(mgr)
-                    .await
-                    .map_err(|err| {
-                        Error::new(ErrorKind::ConfigInvalid, "connect to memcached failed")
-                            .set_source(err)
-                    })
-            })
-            .await?;
-
-        pool.get().await.map_err(|err| match err {
-            RunError::TimedOut => {
-                Error::new(ErrorKind::Unexpected, "get connection from pool failed").set_temporary()
+    pub fn new(
+        endpoint: String,
+        username: Option<String>,
+        password: Option<String>,
+        default_ttl: Option<Duration>,
+        connection_pool_max_size: Option<usize>,
+    ) -> Self {
+        let conn = bounded::Pool::new(
+            bounded::PoolConfig::new(connection_pool_max_size.unwrap_or(10)),
+            MemcacheConnectionManager::new(endpoint.as_str(), username, password),
+        );
+
+        Self { default_ttl, conn }
+    }
+
+    async fn conn(&self) -> Result<bounded::Object<MemcacheConnectionManager>> {
+        let fut = self.conn.get();
+
+        tokio::select! {
+            _ = tokio::time::sleep(Duration::from_secs(10)) => {
+                Err(Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary())
             }
-            RunError::User(err) => err,
-        })
+            result = fut => match result {
+                Ok(conn) => Ok(conn),
+                Err(err) => Err(err),
+            }
+        }
     }
 
     pub async fn get(&self, key: &str) -> Result<Option<Buffer>> {
diff --git a/core/src/services/redis/backend.rs b/core/src/services/redis/backend.rs
index 5365422cc..e9b3d58a1 100644
--- a/core/src/services/redis/backend.rs
+++ b/core/src/services/redis/backend.rs
@@ -20,7 +20,6 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use http::Uri;
-use mea::once::OnceCell;
 use redis::Client;
 use redis::ConnectionAddr;
 use redis::ConnectionInfo;
@@ -133,7 +132,7 @@ impl RedisBuilder {
     ///
     /// Will panic if `max_size` is 0.
     #[must_use]
-    pub fn connection_pool_max_size(mut self, max_size: u32) -> Self {
+    pub fn connection_pool_max_size(mut self, max_size: usize) -> Self {
         assert!(max_size > 0, "max_size must be greater than zero!");
         self.config.connection_pool_max_size = Some(max_size);
         self
@@ -166,16 +165,13 @@ impl Builder for RedisBuilder {
             }
             let client = client_builder.build().map_err(format_redis_error)?;
 
-            let conn = OnceCell::new();
-
-            Ok(RedisBackend::new(RedisCore {
-                addr: endpoints,
-                client: None,
-                cluster_client: Some(client),
-                conn,
-                default_ttl: self.config.default_ttl,
-                connection_pool_max_size: self.config.connection_pool_max_size,
-            })
+            Ok(RedisBackend::new(RedisCore::new(
+                endpoints,
+                None,
+                Some(client),
+                self.config.default_ttl,
+                self.config.connection_pool_max_size,
+            ))
             .with_normalized_root(root))
         } else {
             let endpoint = self
@@ -193,15 +189,13 @@ impl Builder for RedisBuilder {
                         .set_source(e)
                 })?;
 
-            let conn = OnceCell::new();
-            Ok(RedisBackend::new(RedisCore {
-                addr: endpoint,
-                client: Some(client),
-                cluster_client: None,
-                conn,
-                default_ttl: self.config.default_ttl,
-                connection_pool_max_size: self.config.connection_pool_max_size,
-            })
+            Ok(RedisBackend::new(RedisCore::new(
+                endpoint,
+                Some(client),
+                None,
+                self.config.default_ttl,
+                self.config.connection_pool_max_size,
+            ))
             .with_normalized_root(root))
         }
     }
@@ -277,7 +271,7 @@ impl RedisBackend {
     fn new(core: RedisCore) -> Self {
         let info = AccessorInfo::default();
         info.set_scheme(REDIS_SCHEME);
-        info.set_name(&core.addr);
+        info.set_name(core.addr());
         info.set_root("/");
         info.set_native_capability(Capability {
             read: true,
diff --git a/core/src/services/redis/config.rs b/core/src/services/redis/config.rs
index c5d134f80..1f07f4a04 100644
--- a/core/src/services/redis/config.rs
+++ b/core/src/services/redis/config.rs
@@ -40,7 +40,7 @@ pub struct RedisConfig {
     /// The maximum number of connections allowed.
     ///
     /// default is 10
-    pub connection_pool_max_size: Option<u32>,
+    pub connection_pool_max_size: Option<usize>,
     /// the username to connect redis service.
     ///
     /// default is None
diff --git a/core/src/services/redis/core.rs b/core/src/services/redis/core.rs
index 21f609f51..f0f5de20a 100644
--- a/core/src/services/redis/core.rs
+++ b/core/src/services/redis/core.rs
@@ -16,11 +16,12 @@
 // under the License.
 
 use std::fmt::Debug;
+use std::sync::Arc;
 use std::time::Duration;
 
-use bb8::RunError;
+use crate::*;
 use bytes::Bytes;
-use mea::once::OnceCell;
+use fastpool::{ManageObject, ObjectStatus, bounded};
 use redis::AsyncCommands;
 use redis::Client;
 use redis::Cmd;
@@ -33,8 +34,6 @@ use redis::aio::ConnectionManager;
 use redis::cluster::ClusterClient;
 use redis::cluster_async::ClusterConnection;
 
-use crate::*;
-
 #[derive(Clone)]
 pub enum RedisConnection {
     Normal(ConnectionManager),
@@ -71,15 +70,15 @@ impl ConnectionLike for RedisConnection {
 
 #[derive(Clone)]
 pub struct RedisConnectionManager {
-    pub client: Option<Client>,
-    pub cluster_client: Option<ClusterClient>,
+    client: Option<Client>,
+    cluster_client: Option<ClusterClient>,
 }
 
-impl bb8::ManageConnection for RedisConnectionManager {
-    type Connection = RedisConnection;
+impl ManageObject for RedisConnectionManager {
+    type Object = RedisConnection;
     type Error = Error;
 
-    async fn connect(&self) -> Result<RedisConnection, Self::Error> {
+    async fn create(&self) -> Result<RedisConnection, Self::Error> {
         if let Some(client) = self.client.clone() {
             ConnectionManager::new(client.clone())
                 .await
@@ -96,30 +95,27 @@ impl bb8::ManageConnection for RedisConnectionManager {
         }
     }
 
-    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
-        let pong: String = conn.ping().await.map_err(format_redis_error)?;
-
-        if pong == "PONG" {
-            Ok(())
-        } else {
-            Err(Error::new(ErrorKind::Unexpected, "PING ERROR"))
+    async fn is_recyclable(
+        &self,
+        o: &mut Self::Object,
+        _: &ObjectStatus,
+    ) -> Result<(), Self::Error> {
+        match o.ping::<String>().await {
+            Ok(ref pong) => match pong.as_bytes() {
+                b"PONG" => Ok(()),
+                _ => Err(Error::new(ErrorKind::Unexpected, "PING ERROR")),
+            },
+            Err(err) => Err(format_redis_error(err)),
         }
     }
-
-    fn has_broken(&self, _: &mut Self::Connection) -> bool {
-        false
-    }
 }
 
 /// RedisCore holds the Redis connection and configuration
 #[derive(Clone)]
 pub struct RedisCore {
-    pub addr: String,
-    pub client: Option<Client>,
-    pub cluster_client: Option<ClusterClient>,
-    pub conn: OnceCell<bb8::Pool<RedisConnectionManager>>,
-    pub default_ttl: Option<Duration>,
-    pub connection_pool_max_size: Option<u32>,
+    addr: String,
+    conn: Arc<bounded::Pool<RedisConnectionManager>>,
+    default_ttl: Option<Duration>,
 }
 
 impl Debug for RedisCore {
@@ -131,38 +127,43 @@ impl Debug for RedisCore {
 }
 
 impl RedisCore {
-    pub async fn conn(&self) -> Result<bb8::PooledConnection<'_, RedisConnectionManager>> {
-        let pool = self
-            .conn
-            .get_or_try_init(|| async {
-                bb8::Pool::builder()
-                    .max_size(self.connection_pool_max_size.unwrap_or(10))
-                    .build(self.get_redis_connection_manager())
-                    .await
-                    .map_err(|err| {
-                        Error::new(ErrorKind::ConfigInvalid, "connect to redis failed")
-                            .set_source(err)
-                    })
-            })
-            .await?;
-        pool.get().await.map_err(|err| match err {
-            RunError::TimedOut => {
-                Error::new(ErrorKind::Unexpected, "get connection from pool failed").set_temporary()
-            }
-            RunError::User(err) => err,
-        })
+    pub fn new(
+        endpoint: String,
+        client: Option<Client>,
+        cluster_client: Option<ClusterClient>,
+        default_ttl: Option<Duration>,
+        connection_pool_max_size: Option<usize>,
+    ) -> Self {
+        let manager = RedisConnectionManager {
+            client,
+            cluster_client,
+        };
+        let pool = bounded::Pool::new(
+            bounded::PoolConfig::new(connection_pool_max_size.unwrap_or(10)),
+            manager,
+        );
+
+        Self {
+            addr: endpoint,
+            conn: pool,
+            default_ttl,
+        }
+    }
+
+    pub fn addr(&self) -> &str {
+        &self.addr
     }
 
-    pub fn get_redis_connection_manager(&self) -> RedisConnectionManager {
-        if let Some(_client) = self.client.clone() {
-            RedisConnectionManager {
-                client: self.client.clone(),
-                cluster_client: None,
+    pub async fn conn(&self) -> Result<bounded::Object<RedisConnectionManager>> {
+        let fut = self.conn.get();
+
+        tokio::select! {
+            _ = tokio::time::sleep(Duration::from_secs(10)) => {
+                Err(Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary())
             }
-        } else {
-            RedisConnectionManager {
-                client: None,
-                cluster_client: self.cluster_client.clone(),
+            result = fut => match result {
+                Ok(conn) => Ok(conn),
+                Err(err) => Err(err),
             }
         }
     }
diff --git a/core/src/services/sftp/backend.rs b/core/src/services/sftp/backend.rs
index ca2e17a81..384c0b3e4 100644
--- a/core/src/services/sftp/backend.rs
+++ b/core/src/services/sftp/backend.rs
@@ -21,7 +21,6 @@ use std::path::PathBuf;
 use std::sync::Arc;
 
 use log::debug;
-use mea::once::OnceCell;
 use openssh::KnownHosts;
 use tokio::io::AsyncSeekExt;
 
@@ -187,17 +186,14 @@ impl Builder for SftpBuilder {
                 ..Default::default()
             });
 
-        let accessor_info = Arc::new(info);
-        let core = Arc::new(SftpCore {
-            info: accessor_info,
+        let core = Arc::new(SftpCore::new(
+            Arc::new(info),
             endpoint,
             root,
             user,
-            key: self.config.key.clone(),
+            self.config.key.clone(),
             known_hosts_strategy,
-
-            client: OnceCell::new(),
-        });
+        ));
 
         debug!("sftp backend finished: {:?}", &self);
         Ok(SftpBackend { core })
@@ -319,11 +315,11 @@ impl Access for SftpBackend {
         let dir = match fs.open_dir(&file_path).await {
             Ok(dir) => dir,
             Err(e) => {
-                if is_not_found(&e) {
-                    return Ok((RpList::default(), None));
+                return if is_not_found(&e) {
+                    Ok((RpList::default(), None))
                 } else {
-                    return Err(parse_sftp_error(e));
-                }
+                    Err(parse_sftp_error(e))
+                };
             }
         }
         .read_dir();
diff --git a/core/src/services/sftp/core.rs b/core/src/services/sftp/core.rs
index 327cf57af..2520a03d9 100644
--- a/core/src/services/sftp/core.rs
+++ b/core/src/services/sftp/core.rs
@@ -15,35 +15,28 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::fmt::Debug;
-use std::path::Path;
-use std::path::PathBuf;
-use std::sync::Arc;
-
-use bb8::PooledConnection;
-use bb8::RunError;
-use log::debug;
-use mea::once::OnceCell;
-use openssh::KnownHosts;
-use openssh::SessionBuilder;
-use openssh_sftp_client::Sftp;
-use openssh_sftp_client::SftpOptions;
-
 use super::error::is_sftp_protocol_error;
 use super::error::parse_sftp_error;
 use super::error::parse_ssh_error;
 use crate::raw::*;
 use crate::*;
+use fastpool::{ManageObject, ObjectStatus, bounded};
+use log::debug;
+use openssh::KnownHosts;
+use openssh::SessionBuilder;
+use openssh_sftp_client::Sftp;
+use openssh_sftp_client::SftpOptions;
+use std::fmt::Debug;
+use std::path::Path;
+use std::path::PathBuf;
+use std::sync::Arc;
+use std::time::Duration;
 
 pub struct SftpCore {
     pub info: Arc<AccessorInfo>,
     pub endpoint: String,
     pub root: String,
-    pub user: Option<String>,
-    pub key: Option<String>,
-    pub known_hosts_strategy: KnownHosts,
-
-    pub client: OnceCell<bb8::Pool<Manager>>,
+    client: Arc<bounded::Pool<Manager>>,
 }
 
 impl Debug for SftpCore {
@@ -56,29 +49,45 @@ impl Debug for SftpCore {
 }
 
 impl SftpCore {
-    pub async fn connect(&self) -> Result<PooledConnection<'static, Manager>> {
-        let client = self
-            .client
-            .get_or_try_init(|| async {
-                bb8::Pool::builder()
-                    .max_size(64)
-                    .build(Manager {
-                        endpoint: self.endpoint.clone(),
-                        root: self.root.clone(),
-                        user: self.user.clone(),
-                        key: self.key.clone(),
-                        known_hosts_strategy: self.known_hosts_strategy.clone(),
-                    })
-                    .await
-            })
-            .await?;
-
-        client.get_owned().await.map_err(|err| match err {
-            RunError::User(err) => err,
-            RunError::TimedOut => {
-                Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary()
+    pub fn new(
+        info: Arc<AccessorInfo>,
+        endpoint: String,
+        root: String,
+        user: Option<String>,
+        key: Option<String>,
+        known_hosts_strategy: KnownHosts,
+    ) -> Self {
+        let client = bounded::Pool::new(
+            bounded::PoolConfig::new(64),
+            Manager {
+                endpoint: endpoint.clone(),
+                root: root.clone(),
+                user,
+                key,
+                known_hosts_strategy,
+            },
+        );
+
+        SftpCore {
+            info,
+            endpoint,
+            root,
+            client,
+        }
+    }
+
+    pub async fn connect(&self) -> Result<bounded::Object<Manager>> {
+        let fut = self.client.get();
+
+        tokio::select! {
+            _ = tokio::time::sleep(Duration::from_secs(10)) => {
+                Err(Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary())
+            }
+            result = fut => match result {
+                Ok(conn) => Ok(conn),
+                Err(err) => Err(err),
             }
-        })
+        }
     }
 }
 
@@ -90,11 +99,11 @@ pub struct Manager {
     known_hosts_strategy: KnownHosts,
 }
 
-impl bb8::ManageConnection for Manager {
-    type Connection = Sftp;
+impl ManageObject for Manager {
+    type Object = Sftp;
     type Error = Error;
 
-    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
+    async fn create(&self) -> Result<Self::Object, Self::Error> {
         let mut session = SessionBuilder::default();
 
         if let Some(user) = &self.user {
@@ -140,14 +149,14 @@ impl bb8::ManageConnection for Manager {
     }
 
     // Check if connect valid by checking the root path.
-    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
-        let _ = conn.fs().metadata("./").await.map_err(parse_sftp_error)?;
-
-        Ok(())
-    }
-
-    /// Always allow reuse conn.
-    fn has_broken(&self, _: &mut Self::Connection) -> bool {
-        false
+    async fn is_recyclable(
+        &self,
+        o: &mut Self::Object,
+        _: &ObjectStatus,
+    ) -> Result<(), Self::Error> {
+        match o.fs().metadata("./").await {
+            Ok(_) => Ok(()),
+            Err(e) => Err(parse_sftp_error(e)),
+        }
     }
 }
diff --git a/core/src/services/sftp/reader.rs b/core/src/services/sftp/reader.rs
index 781f27dd7..676389123 100644
--- a/core/src/services/sftp/reader.rs
+++ b/core/src/services/sftp/reader.rs
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use bb8::PooledConnection;
 use bytes::BytesMut;
+use fastpool::bounded;
 use openssh_sftp_client::file::File;
 
 use super::core::Manager;
@@ -26,7 +26,7 @@ use crate::*;
 
 pub struct SftpReader {
     /// Keep the connection alive while data stream is alive.
-    _conn: PooledConnection<'static, Manager>,
+    _conn: bounded::Object<Manager>,
 
     file: File,
     chunk: usize,
@@ -36,7 +36,7 @@ pub struct SftpReader {
 }
 
 impl SftpReader {
-    pub fn new(conn: PooledConnection<'static, Manager>, file: File, size: Option<u64>) -> Self {
+    pub fn new(conn: bounded::Object<Manager>, file: File, size: Option<u64>) -> Self {
         Self {
             _conn: conn,
             file,

