This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch bb8-to-fastpool
in repository https://gitbox.apache.org/repos/asf/opendal.git

commit 8105434d3577fceda07d071ee665270f45b25437
Author: tison <[email protected]>
AuthorDate: Tue Nov 25 19:03:55 2025 +0800

    chore: use fastpool as object pool impl
    
    Signed-off-by: tison <[email protected]>
---
 core/Cargo.lock                   |  23 +++++----
 core/Cargo.toml                   |  12 ++---
 core/src/services/etcd/backend.rs |  10 +---
 core/src/services/etcd/core.rs    | 100 ++++++++++++++++++++------------------
 core/src/services/ftp/backend.rs  |  20 +++-----
 core/src/services/ftp/core.rs     |  74 +++++++++++++---------------
 core/src/services/ftp/deleter.rs  |   4 +-
 core/src/services/ftp/err.rs      |   2 +-
 core/src/services/ftp/reader.rs   |  12 ++---
 core/src/services/ftp/writer.rs   |  14 +++---
 10 files changed, 130 insertions(+), 141 deletions(-)

diff --git a/core/Cargo.lock b/core/Cargo.lock
index 7ab3d1485..d07ed620c 100644
--- a/core/Cargo.lock
+++ b/core/Cargo.lock
@@ -1156,17 +1156,6 @@ version = "1.8.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "55248b47b0caf0546f7988906588779981c43bb1bc9d0c44087278f80cdb44ba"
 
-[[package]]
-name = "bb8"
-version = "0.9.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "212d8b8e1a22743d9241575c6ba822cf9c8fef34771c86ab7e477a4fbfd254e5"
-dependencies = [
- "futures-util",
- "parking_lot 0.12.5",
- "tokio",
-]
-
 [[package]]
 name = "bcrypt"
 version = "0.15.1"
@@ -2885,6 +2874,16 @@ dependencies = [
  "paste",
 ]
 
+[[package]]
+name = "fastpool"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a6777b4743839a42fd32141d95d7adc4c98e3a1d5200a2598cf32ffd102c81e1"
+dependencies = [
+ "mea",
+ "scopeguard",
+]
+
 [[package]]
 name = "fastrace"
 version = "0.7.14"
@@ -5351,7 +5350,6 @@ dependencies = [
  "await-tree",
  "backon",
  "base64 0.22.1",
- "bb8",
  "bytes",
  "cacache",
  "compio",
@@ -5362,6 +5360,7 @@ dependencies = [
  "dotenvy",
  "etcd-client",
  "fastmetrics",
+ "fastpool",
  "fastrace",
  "fastrace-jaeger",
  "flume",
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 16224b93b..eccca34ef 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -141,12 +141,12 @@ services-d1 = []
 services-dashmap = ["dep:dashmap"]
 services-dbfs = []
 services-dropbox = []
-services-etcd = ["dep:etcd-client", "dep:bb8"]
+services-etcd = ["dep:etcd-client", "dep:fastpool"]
 services-foundationdb = ["dep:foundationdb"]
 services-fs = ["tokio/fs", "internal-tokio-rt"]
 services-ftp = [
   "dep:suppaftp",
-  "dep:bb8",
+  "dep:fastpool",
   "dep:futures-rustls",
   "dep:rustls-native-certs",
 ]
@@ -167,7 +167,7 @@ services-ipfs = ["dep:prost"]
 services-ipmfs = []
 services-koofr = []
 services-lakefs = []
-services-memcached = ["dep:bb8"]
+services-memcached = ["dep:fastpool"]
 services-memory = []
 services-mini-moka = ["dep:mini-moka"]
 services-moka = ["dep:moka"]
@@ -195,7 +195,7 @@ services-pcloud = []
 services-persy = ["dep:persy", "internal-tokio-rt"]
 services-postgresql = ["dep:sqlx", "sqlx?/postgres"]
 services-redb = ["dep:redb", "internal-tokio-rt"]
-services-redis = ["dep:redis", "dep:bb8", "redis?/tokio-rustls-comp"]
+services-redis = ["dep:redis", "dep:fastpool", "redis?/tokio-rustls-comp"]
 services-redis-native-tls = ["services-redis", "redis?/tokio-native-tls-comp"]
 services-rocksdb = ["dep:rocksdb", "internal-tokio-rt"]
 services-s3 = [
@@ -206,7 +206,7 @@ services-s3 = [
   "dep:crc32c",
 ]
 services-seafile = []
-services-sftp = ["dep:openssh", "dep:openssh-sftp-client", "dep:bb8"]
+services-sftp = ["dep:openssh", "dep:openssh-sftp-client", "dep:fastpool"]
 services-sled = ["dep:sled", "internal-tokio-rt"]
 services-sqlite = ["dep:sqlx", "sqlx?/sqlite", "dep:ouroboros"]
 services-surrealdb = ["dep:surrealdb"]
@@ -272,7 +272,7 @@ rand = { version = "0.8", optional = true }
 
 # Services
 # general dependencies.
-bb8 = { version = "0.9", optional = true }
+fastpool = { version = "1.0.2", optional = true }
 prost = { version = "0.13", optional = true }
 sha1 = { version = "0.10.6", optional = true }
 sha2 = { version = "0.10", optional = true }
diff --git a/core/src/services/etcd/backend.rs b/core/src/services/etcd/backend.rs
index 8a9d916f0..317345240 100644
--- a/core/src/services/etcd/backend.rs
+++ b/core/src/services/etcd/backend.rs
@@ -21,7 +21,6 @@ use etcd_client::Certificate;
 use etcd_client::ConnectOptions;
 use etcd_client::Identity;
 use etcd_client::TlsOptions;
-use mea::once::OnceCell;
 
 use super::ETCD_SCHEME;
 use super::config::EtcdConfig;
@@ -158,14 +157,7 @@ impl Builder for EtcdBuilder {
                 .as_str(),
         );
 
-        let client = OnceCell::new();
-
-        let core = EtcdCore {
-            endpoints,
-            client,
-            options,
-        };
-
+        let core = EtcdCore::new(endpoints, options);
         Ok(EtcdBackend::new(core, &root))
     }
 }
diff --git a/core/src/services/etcd/core.rs b/core/src/services/etcd/core.rs
index fcb07a73d..a2fa54b6f 100644
--- a/core/src/services/etcd/core.rs
+++ b/core/src/services/etcd/core.rs
@@ -15,56 +15,54 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::fmt::Debug;
-
-use bb8::Pool;
-use bb8::PooledConnection;
-use bb8::RunError;
-use etcd_client::Client;
-use etcd_client::ConnectOptions;
-use mea::once::OnceCell;
-
 use crate::services::etcd::error::format_etcd_error;
 use crate::{Buffer, Error, ErrorKind, Result};
+use etcd_client::Client;
+use etcd_client::ConnectOptions;
+use fastpool::ManageObject;
+use fastpool::ObjectStatus;
+use fastpool::bounded;
+use std::fmt::Debug;
+use std::sync::Arc;
+use std::time::Duration;
 
 pub mod constants {
     pub const DEFAULT_ETCD_ENDPOINTS: &str = "http://127.0.0.1:2379";
 }
 
 #[derive(Clone)]
-pub struct Manager {
+struct Manager {
     endpoints: Vec<String>,
     options: ConnectOptions,
 }
 
-impl bb8::ManageConnection for Manager {
-    type Connection = Client;
+impl ManageObject for Manager {
+    type Object = Client;
     type Error = Error;
 
-    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
-        let conn = Client::connect(self.endpoints.clone(), Some(self.options.clone()))
+    async fn create(&self) -> Result<Self::Object, Self::Error> {
+        Client::connect(self.endpoints.clone(), Some(self.options.clone()))
             .await
-            .map_err(format_etcd_error)?;
-
-        Ok(conn)
-    }
-
-    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
-        let _ = conn.status().await.map_err(format_etcd_error)?;
-        Ok(())
+            .map_err(format_etcd_error)
     }
 
-    /// Always allow reuse conn.
-    fn has_broken(&self, _: &mut Self::Connection) -> bool {
-        false
+    async fn is_recyclable(
+        &self,
+        o: &mut Self::Object,
+        _: &ObjectStatus,
+    ) -> Result<(), Self::Error> {
+        match o.status().await {
+            Ok(_) => Ok(()),
+            Err(err) => Err(format_etcd_error(err)),
+        }
     }
 }
 
 #[derive(Clone)]
 pub struct EtcdCore {
-    pub endpoints: Vec<String>,
-    pub options: ConnectOptions,
-    pub client: OnceCell<Pool<Manager>>,
+    endpoints: Vec<String>,
+    options: ConnectOptions,
+    client: Arc<bounded::Pool<Manager>>,
 }
 
 impl Debug for EtcdCore {
@@ -77,26 +75,34 @@ impl Debug for EtcdCore {
 }
 
 impl EtcdCore {
-    pub async fn conn(&self) -> Result<PooledConnection<'static, Manager>> {
-        let client = self
-            .client
-            .get_or_try_init(|| async {
-                Pool::builder()
-                    .max_size(64)
-                    .build(Manager {
-                        endpoints: self.endpoints.clone(),
-                        options: self.options.clone(),
-                    })
-                    .await
-            })
-            .await?;
-
-        client.get_owned().await.map_err(|err| match err {
-            RunError::User(err) => err,
-            RunError::TimedOut => {
-                Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary()
+    pub fn new(endpoints: Vec<String>, options: ConnectOptions) -> Self {
+        let client = bounded::Pool::new(
+            bounded::PoolConfig::new(64),
+            Manager {
+                endpoints: endpoints.clone(),
+                options: options.clone(),
+            },
+        );
+
+        Self {
+            endpoints,
+            options,
+            client,
+        }
+    }
+
+    pub async fn conn(&self) -> Result<bounded::Object<Manager>> {
+        let fut = self.client.get();
+
+        tokio::select! {
+            _ = tokio::time::sleep(Duration::from_secs(10)) => {
+                Err(Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary())
+            }
+            result = fut => match result {
+                Ok(conn) => Ok(conn),
+                Err(err) => Err(err),
             }
-        })
+        }
     }
 
     pub async fn has_prefix(&self, prefix: &str) -> Result<bool> {
diff --git a/core/src/services/ftp/backend.rs b/core/src/services/ftp/backend.rs
index 0b0c87b13..555487717 100644
--- a/core/src/services/ftp/backend.rs
+++ b/core/src/services/ftp/backend.rs
@@ -21,7 +21,6 @@ use std::sync::Arc;
 
 use http::Uri;
 use log::debug;
-use mea::once::OnceCell;
 use services::ftp::core::Manager;
 use suppaftp::FtpError;
 use suppaftp::Status;
@@ -32,7 +31,7 @@ use super::FTP_SCHEME;
 use super::config::FtpConfig;
 use super::core::FtpCore;
 use super::deleter::FtpDeleter;
-use super::err::parse_error;
+use super::err::format_ftp_error;
 use super::lister::FtpLister;
 use super::reader::FtpReader;
 use super::writer::FtpWriter;
@@ -165,6 +164,7 @@ impl Builder for FtpBuilder {
 
                 ..Default::default()
             });
+
         let manager = Manager {
             endpoint: endpoint.clone(),
             root: root.clone(),
@@ -172,12 +172,8 @@ impl Builder for FtpBuilder {
             password: password.clone(),
             enable_secure,
         };
-        let core = Arc::new(FtpCore {
-            info: accessor_info.into(),
-            manager,
-            pool: OnceCell::new(),
-        });
 
+        let core = Arc::new(FtpCore::new(accessor_info.into(), manager.clone()));
         Ok(FtpBackend { core })
     }
 }
@@ -201,7 +197,7 @@ impl Access for FtpBackend {
     type Deleter = oio::OneShotDeleter<FtpDeleter>;
 
     fn info(&self) -> Arc<AccessorInfo> {
-        self.core.info.clone()
+        self.core.info()
     }
 
     async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
@@ -221,7 +217,7 @@ impl Access for FtpBackend {
                 }))
                 | Ok(()) => (),
                 Err(e) => {
-                    return Err(parse_error(e));
+                    return Err(format_ftp_error(e));
                 }
             }
         }
@@ -277,7 +273,7 @@ impl Access for FtpBackend {
                 }))
                 | Ok(()) => (),
                 Err(e) => {
-                    return Err(parse_error(e));
+                    return Err(format_ftp_error(e));
                 }
             }
         }
@@ -299,7 +295,7 @@ impl Access for FtpBackend {
         let mut ftp_stream = self.core.ftp_connect(Operation::List).await?;
 
         let pathname = if path == "/" { None } else { Some(path) };
-        let files = ftp_stream.list(pathname).await.map_err(parse_error)?;
+        let files = ftp_stream.list(pathname).await.map_err(format_ftp_error)?;
 
         Ok((
             RpList::default(),
@@ -316,7 +312,7 @@ impl FtpBackend {
 
         let pathname = if parent == "/" { None } else { Some(parent) };
 
-        let resp = ftp_stream.list(pathname).await.map_err(parse_error)?;
+        let resp = ftp_stream.list(pathname).await.map_err(format_ftp_error)?;
 
         // Get stat of file.
         let mut files = resp
diff --git a/core/src/services/ftp/core.rs b/core/src/services/ftp/core.rs
index 42d427096..afe227add 100644
--- a/core/src/services/ftp/core.rs
+++ b/core/src/services/ftp/core.rs
@@ -15,14 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::sync::Arc;
-
-use bb8::Pool;
-use bb8::PooledConnection;
-use bb8::RunError;
+use fastpool::{ManageObject, ObjectStatus, bounded};
 use futures_rustls::TlsConnector;
-use mea::once::OnceCell;
 use raw::Operation;
+use std::sync::Arc;
+use std::time::Duration;
 use suppaftp::AsyncRustlsConnector;
 use suppaftp::AsyncRustlsFtpStream;
 use suppaftp::FtpError;
@@ -31,35 +28,37 @@ use suppaftp::Status;
 use suppaftp::rustls::ClientConfig;
 use suppaftp::types::FileType;
 
-use super::err::parse_error;
+use super::err::format_ftp_error;
 use crate::raw::AccessorInfo;
 use crate::*;
 
 pub struct FtpCore {
-    pub info: Arc<AccessorInfo>,
-    pub manager: Manager,
-    pub pool: OnceCell<Pool<Manager>>,
+    info: Arc<AccessorInfo>,
+    pool: Arc<bounded::Pool<Manager>>,
 }
 
 impl FtpCore {
-    pub async fn ftp_connect(&self, _: Operation) -> Result<PooledConnection<'static, Manager>> {
-        let pool = self
-            .pool
-            .get_or_try_init(|| async {
-                Pool::builder()
-                    .max_size(64)
-                    .build(self.manager.clone())
-                    .await
-            })
-            .await
-            .map_err(parse_error)?;
-
-        pool.get_owned().await.map_err(|err| match err {
-            RunError::User(err) => parse_error(err),
-            RunError::TimedOut => {
-                Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary()
+    pub fn new(info: Arc<AccessorInfo>, manager: Manager) -> Self {
+        let pool = bounded::Pool::new(bounded::PoolConfig::new(64), manager);
+        Self { info, pool }
+    }
+
+    pub fn info(&self) -> Arc<AccessorInfo> {
+        self.info.clone()
+    }
+
+    pub async fn ftp_connect(&self, _: Operation) -> Result<bounded::Object<Manager>> {
+        let fut = self.pool.get();
+
+        tokio::select! {
+            _ = tokio::time::sleep(Duration::from_secs(10)) => {
+                Err(Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary())
+            }
+            result = fut => match result {
+                Ok(conn) => Ok(conn),
+                Err(err) => Err(format_ftp_error(err)),
             }
-        })
+        }
     }
 }
 
@@ -72,11 +71,11 @@ pub struct Manager {
     pub enable_secure: bool,
 }
 
-impl bb8::ManageConnection for Manager {
-    type Connection = AsyncRustlsFtpStream;
+impl ManageObject for Manager {
+    type Object = AsyncRustlsFtpStream;
     type Error = FtpError;
 
-    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
+    async fn create(&self) -> Result<Self::Object, Self::Error> {
         let stream = ImplAsyncFtpStream::connect(&self.endpoint).await?;
         // switch to secure mode if ssl/tls is on.
         let mut ftp_stream = if self.enable_secure {
@@ -123,14 +122,11 @@ impl bb8::ManageConnection for Manager {
         Ok(ftp_stream)
     }
 
-    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
-        conn.noop().await
-    }
-
-    /// Don't allow reuse conn.
-    ///
-    /// We need to investigate why reuse conn will cause error.
-    fn has_broken(&self, _: &mut Self::Connection) -> bool {
-        true
+    async fn is_recyclable(
+        &self,
+        o: &mut Self::Object,
+        _: &ObjectStatus,
+    ) -> Result<(), Self::Error> {
+        o.noop().await
     }
 }
diff --git a/core/src/services/ftp/deleter.rs b/core/src/services/ftp/deleter.rs
index 38a5b9fdd..10b688912 100644
--- a/core/src/services/ftp/deleter.rs
+++ b/core/src/services/ftp/deleter.rs
@@ -22,7 +22,7 @@ use suppaftp::Status;
 use suppaftp::types::Response;
 
 use super::core::FtpCore;
-use super::err::parse_error;
+use super::err::format_ftp_error;
 use crate::raw::*;
 use crate::*;
 
@@ -53,7 +53,7 @@ impl oio::OneShotDelete for FtpDeleter {
             }))
             | Ok(_) => (),
             Err(e) => {
-                return Err(parse_error(e));
+                return Err(format_ftp_error(e));
             }
         }
 
diff --git a/core/src/services/ftp/err.rs b/core/src/services/ftp/err.rs
index eac889880..a2ea33674 100644
--- a/core/src/services/ftp/err.rs
+++ b/core/src/services/ftp/err.rs
@@ -21,7 +21,7 @@ use suppaftp::Status;
 use crate::Error;
 use crate::ErrorKind;
 
-pub(super) fn parse_error(err: FtpError) -> Error {
+pub(super) fn format_ftp_error(err: FtpError) -> Error {
     let (kind, retryable) = match err {
         // Allow retry for error
         //
diff --git a/core/src/services/ftp/reader.rs b/core/src/services/ftp/reader.rs
index bebc53d8f..144b03d2b 100644
--- a/core/src/services/ftp/reader.rs
+++ b/core/src/services/ftp/reader.rs
@@ -15,19 +15,19 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use bb8::PooledConnection;
 use bytes::BytesMut;
+use fastpool::bounded;
 use futures::AsyncRead;
 use futures::AsyncReadExt;
 
 use super::core::Manager;
-use super::err::parse_error;
+use super::err::format_ftp_error;
 use crate::raw::*;
 use crate::*;
 
 pub struct FtpReader {
     /// Keep the connection alive while data stream is alive.
-    _ftp_stream: PooledConnection<'static, Manager>,
+    _ftp_stream: bounded::Object<Manager>,
 
     data_stream: Box<dyn AsyncRead + Sync + Send + Unpin + 'static>,
     chunk: usize,
@@ -41,7 +41,7 @@ unsafe impl Sync for FtpReader {}
 
 impl FtpReader {
     pub async fn new(
-        mut ftp_stream: PooledConnection<'static, Manager>,
+        mut ftp_stream: bounded::Object<Manager>,
         path: String,
         args: OpRead,
     ) -> Result<Self> {
@@ -53,12 +53,12 @@ impl FtpReader {
             ftp_stream
                 .resume_transfer(offset as usize)
                 .await
-                .map_err(parse_error)?;
+                .map_err(format_ftp_error)?;
         }
         let ds = ftp_stream
             .retr_as_stream(path)
             .await
-            .map_err(parse_error)?
+            .map_err(format_ftp_error)?
             .take(size as _);
 
         Ok(Self {
diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs
index 52db47378..838e0dda7 100644
--- a/core/src/services/ftp/writer.rs
+++ b/core/src/services/ftp/writer.rs
@@ -15,20 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use bb8::PooledConnection;
 use bytes::Buf;
+use fastpool::bounded;
 use futures::AsyncWrite;
 use futures::AsyncWriteExt;
 
 use super::core::Manager;
-use super::err::parse_error;
+use super::err::format_ftp_error;
 use crate::raw::*;
 use crate::*;
 
 pub struct FtpWriter {
     target_path: String,
     tmp_path: Option<String>,
-    ftp_stream: PooledConnection<'static, Manager>,
+    ftp_stream: bounded::Object<Manager>,
     data_stream: Option<Box<dyn AsyncWrite + Sync + Send + Unpin + 'static>>,
 }
 
@@ -44,7 +44,7 @@ unsafe impl Sync for FtpWriter {}
 /// After we can use data stream, we should return it directly.
 impl FtpWriter {
     pub fn new(
-        ftp_stream: PooledConnection<'static, Manager>,
+        ftp_stream: bounded::Object<Manager>,
         target_path: String,
         tmp_path: Option<String>,
     ) -> Self {
@@ -70,7 +70,7 @@ impl oio::Write for FtpWriter {
                 self.ftp_stream
                     .append_with_stream(path)
                     .await
-                    .map_err(parse_error)?,
+                    .map_err(format_ftp_error)?,
             ));
         }
 
@@ -100,13 +100,13 @@ impl oio::Write for FtpWriter {
             self.ftp_stream
                 .finalize_put_stream(data_stream)
                 .await
-                .map_err(parse_error)?;
+                .map_err(format_ftp_error)?;
 
             if let Some(tmp_path) = &self.tmp_path {
                 self.ftp_stream
                     .rename(tmp_path, &self.target_path)
                     .await
-                    .map_err(parse_error)?;
+                    .map_err(format_ftp_error)?;
             }
         }
 

Reply via email to