This is an automated email from the ASF dual-hosted git repository. tison pushed a commit to branch bb8-to-fastpool in repository https://gitbox.apache.org/repos/asf/opendal.git
commit 8105434d3577fceda07d071ee665270f45b25437 Author: tison <[email protected]> AuthorDate: Tue Nov 25 19:03:55 2025 +0800 chore: use fastpool as object pool impl Signed-off-by: tison <[email protected]> --- core/Cargo.lock | 23 +++++---- core/Cargo.toml | 12 ++--- core/src/services/etcd/backend.rs | 10 +--- core/src/services/etcd/core.rs | 100 ++++++++++++++++++++------------------ core/src/services/ftp/backend.rs | 20 +++----- core/src/services/ftp/core.rs | 74 +++++++++++++--------------- core/src/services/ftp/deleter.rs | 4 +- core/src/services/ftp/err.rs | 2 +- core/src/services/ftp/reader.rs | 12 ++--- core/src/services/ftp/writer.rs | 14 +++--- 10 files changed, 130 insertions(+), 141 deletions(-) diff --git a/core/Cargo.lock b/core/Cargo.lock index 7ab3d1485..d07ed620c 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -1156,17 +1156,6 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55248b47b0caf0546f7988906588779981c43bb1bc9d0c44087278f80cdb44ba" -[[package]] -name = "bb8" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "212d8b8e1a22743d9241575c6ba822cf9c8fef34771c86ab7e477a4fbfd254e5" -dependencies = [ - "futures-util", - "parking_lot 0.12.5", - "tokio", -] - [[package]] name = "bcrypt" version = "0.15.1" @@ -2885,6 +2874,16 @@ dependencies = [ "paste", ] +[[package]] +name = "fastpool" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6777b4743839a42fd32141d95d7adc4c98e3a1d5200a2598cf32ffd102c81e1" +dependencies = [ + "mea", + "scopeguard", +] + [[package]] name = "fastrace" version = "0.7.14" @@ -5351,7 +5350,6 @@ dependencies = [ "await-tree", "backon", "base64 0.22.1", - "bb8", "bytes", "cacache", "compio", @@ -5362,6 +5360,7 @@ dependencies = [ "dotenvy", "etcd-client", "fastmetrics", + "fastpool", "fastrace", "fastrace-jaeger", "flume", diff --git a/core/Cargo.toml b/core/Cargo.toml index 16224b93b..eccca34ef 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -141,12 +141,12 @@ services-d1 = [] services-dashmap = ["dep:dashmap"] services-dbfs = [] services-dropbox = [] -services-etcd = ["dep:etcd-client", "dep:bb8"] +services-etcd = ["dep:etcd-client", "dep:fastpool"] services-foundationdb = ["dep:foundationdb"] services-fs = ["tokio/fs", "internal-tokio-rt"] services-ftp = [ "dep:suppaftp", - "dep:bb8", + "dep:fastpool", "dep:futures-rustls", "dep:rustls-native-certs", ] @@ -167,7 +167,7 @@ services-ipfs = ["dep:prost"] services-ipmfs = [] services-koofr = [] services-lakefs = [] -services-memcached = ["dep:bb8"] +services-memcached = ["dep:fastpool"] services-memory = [] services-mini-moka = ["dep:mini-moka"] services-moka = ["dep:moka"] @@ -195,7 +195,7 @@ services-pcloud = [] services-persy = ["dep:persy", "internal-tokio-rt"] services-postgresql = ["dep:sqlx", "sqlx?/postgres"] services-redb = ["dep:redb", "internal-tokio-rt"] -services-redis = ["dep:redis", "dep:bb8", "redis?/tokio-rustls-comp"] +services-redis = ["dep:redis", "dep:fastpool", "redis?/tokio-rustls-comp"] services-redis-native-tls = ["services-redis", "redis?/tokio-native-tls-comp"] services-rocksdb = ["dep:rocksdb", "internal-tokio-rt"] services-s3 = [ @@ -206,7 +206,7 @@ services-s3 = [ "dep:crc32c", ] services-seafile = [] -services-sftp = ["dep:openssh", "dep:openssh-sftp-client", "dep:bb8"] +services-sftp = ["dep:openssh", "dep:openssh-sftp-client", "dep:fastpool"] services-sled = ["dep:sled", "internal-tokio-rt"] services-sqlite = ["dep:sqlx", "sqlx?/sqlite", "dep:ouroboros"] services-surrealdb = ["dep:surrealdb"] @@ -272,7 +272,7 @@ rand = { version = "0.8", optional = true } # Services # general dependencies. -bb8 = { version = "0.9", optional = true } +fastpool = { version = "1.0.2", optional = true } prost = { version = "0.13", optional = true } sha1 = { version = "0.10.6", optional = true } sha2 = { version = "0.10", optional = true } diff --git a/core/src/services/etcd/backend.rs b/core/src/services/etcd/backend.rs index 8a9d916f0..317345240 100644 --- a/core/src/services/etcd/backend.rs +++ b/core/src/services/etcd/backend.rs @@ -21,7 +21,6 @@ use etcd_client::Certificate; use etcd_client::ConnectOptions; use etcd_client::Identity; use etcd_client::TlsOptions; -use mea::once::OnceCell; use super::ETCD_SCHEME; use super::config::EtcdConfig; @@ -158,14 +157,7 @@ impl Builder for EtcdBuilder { .as_str(), ); - let client = OnceCell::new(); - - let core = EtcdCore { - endpoints, - client, - options, - }; - + let core = EtcdCore::new(endpoints, options); Ok(EtcdBackend::new(core, &root)) } } diff --git a/core/src/services/etcd/core.rs b/core/src/services/etcd/core.rs index fcb07a73d..a2fa54b6f 100644 --- a/core/src/services/etcd/core.rs +++ b/core/src/services/etcd/core.rs @@ -15,56 +15,54 @@ // specific language governing permissions and limitations // under the License. -use std::fmt::Debug; - -use bb8::Pool; -use bb8::PooledConnection; -use bb8::RunError; -use etcd_client::Client; -use etcd_client::ConnectOptions; -use mea::once::OnceCell; - use crate::services::etcd::error::format_etcd_error; use crate::{Buffer, Error, ErrorKind, Result}; +use etcd_client::Client; +use etcd_client::ConnectOptions; +use fastpool::ManageObject; +use fastpool::ObjectStatus; +use fastpool::bounded; +use std::fmt::Debug; +use std::sync::Arc; +use std::time::Duration; pub mod constants { pub const DEFAULT_ETCD_ENDPOINTS: &str = "http://127.0.0.1:2379"; } #[derive(Clone)] -pub struct Manager { +struct Manager { endpoints: Vec<String>, options: ConnectOptions, } -impl bb8::ManageConnection for Manager { - type Connection = Client; +impl ManageObject for Manager { + type Object = Client; type Error = Error; - async fn connect(&self) -> Result<Self::Connection, Self::Error> { - let conn = Client::connect(self.endpoints.clone(), Some(self.options.clone())) + async fn create(&self) -> Result<Self::Object, Self::Error> { + Client::connect(self.endpoints.clone(), Some(self.options.clone())) .await - .map_err(format_etcd_error)?; - - Ok(conn) - } - - async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> { - let _ = conn.status().await.map_err(format_etcd_error)?; - Ok(()) + .map_err(format_etcd_error) } - /// Always allow reuse conn. - fn has_broken(&self, _: &mut Self::Connection) -> bool { - false + async fn is_recyclable( + &self, + o: &mut Self::Object, + _: &ObjectStatus, + ) -> Result<(), Self::Error> { + match o.status().await { + Ok(_) => Ok(()), + Err(err) => Err(format_etcd_error(err)), + } } } #[derive(Clone)] pub struct EtcdCore { - pub endpoints: Vec<String>, - pub options: ConnectOptions, - pub client: OnceCell<Pool<Manager>>, + endpoints: Vec<String>, + options: ConnectOptions, + client: Arc<bounded::Pool<Manager>>, } impl Debug for EtcdCore { @@ -77,26 +75,34 @@ impl Debug for EtcdCore { } impl EtcdCore { - pub async fn conn(&self) -> Result<PooledConnection<'static, Manager>> { - let client = self - .client - .get_or_try_init(|| async { - Pool::builder() - .max_size(64) - .build(Manager { - endpoints: self.endpoints.clone(), - options: self.options.clone(), - }) - .await - }) - .await?; - - client.get_owned().await.map_err(|err| match err { - RunError::User(err) => err, - RunError::TimedOut => { - Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary() + pub fn new(endpoints: Vec<String>, options: ConnectOptions) -> Self { + let client = bounded::Pool::new( + bounded::PoolConfig::new(64), + Manager { + endpoints: endpoints.clone(), + options: options.clone(), + }, + ); + + Self { + endpoints, + options, + client, + } + } + + pub async fn conn(&self) -> Result<bounded::Object<Manager>> { + let fut = self.client.get(); + + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(10)) => { + Err(Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary()) + } + result = fut => match result { + Ok(conn) => Ok(conn), + Err(err) => Err(err), } - }) + } } pub async fn has_prefix(&self, prefix: &str) -> Result<bool> { diff --git a/core/src/services/ftp/backend.rs b/core/src/services/ftp/backend.rs index 0b0c87b13..555487717 100644 --- a/core/src/services/ftp/backend.rs +++ b/core/src/services/ftp/backend.rs @@ -21,7 +21,6 @@ use std::sync::Arc; use http::Uri; use log::debug; -use mea::once::OnceCell; use services::ftp::core::Manager; use suppaftp::FtpError; use suppaftp::Status; @@ -32,7 +31,7 @@ use super::FTP_SCHEME; use super::config::FtpConfig; use super::core::FtpCore; use super::deleter::FtpDeleter; -use super::err::parse_error; +use super::err::format_ftp_error; use super::lister::FtpLister; use super::reader::FtpReader; use super::writer::FtpWriter; @@ -165,6 +164,7 @@ impl Builder for FtpBuilder { ..Default::default() }); + let manager = Manager { endpoint: endpoint.clone(), root: root.clone(), @@ -172,12 +172,8 @@ impl Builder for FtpBuilder { password: password.clone(), enable_secure, }; - let core = Arc::new(FtpCore { - info: accessor_info.into(), - manager, - pool: OnceCell::new(), - }); + let core = Arc::new(FtpCore::new(accessor_info.into(), manager.clone())); Ok(FtpBackend { core }) } } @@ -201,7 +197,7 @@ impl Access for FtpBackend { type Deleter = oio::OneShotDeleter<FtpDeleter>; fn info(&self) -> Arc<AccessorInfo> { - self.core.info.clone() + self.core.info() } async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> { @@ -221,7 +217,7 @@ impl Access for FtpBackend { })) | Ok(()) => (), Err(e) => { - return Err(parse_error(e)); + return Err(format_ftp_error(e)); } } } @@ -277,7 +273,7 @@ impl Access for FtpBackend { })) | Ok(()) => (), Err(e) => { - return Err(parse_error(e)); + return Err(format_ftp_error(e)); } } } @@ -299,7 +295,7 @@ impl Access for FtpBackend { let mut ftp_stream = self.core.ftp_connect(Operation::List).await?; let pathname = if path == "/" { None } else { Some(path) }; - let files = ftp_stream.list(pathname).await.map_err(parse_error)?; + let files = ftp_stream.list(pathname).await.map_err(format_ftp_error)?; Ok(( RpList::default(), @@ -316,7 +312,7 @@ impl FtpBackend { let pathname = if parent == "/" { None } else { Some(parent) }; - let resp = ftp_stream.list(pathname).await.map_err(parse_error)?; + let resp = ftp_stream.list(pathname).await.map_err(format_ftp_error)?; // Get stat of file. let mut files = resp diff --git a/core/src/services/ftp/core.rs b/core/src/services/ftp/core.rs index 42d427096..afe227add 100644 --- a/core/src/services/ftp/core.rs +++ b/core/src/services/ftp/core.rs @@ -15,14 +15,11 @@ // specific language governing permissions and limitations // under the License. -use std::sync::Arc; - -use bb8::Pool; -use bb8::PooledConnection; -use bb8::RunError; +use fastpool::{ManageObject, ObjectStatus, bounded}; use futures_rustls::TlsConnector; -use mea::once::OnceCell; use raw::Operation; +use std::sync::Arc; +use std::time::Duration; use suppaftp::AsyncRustlsConnector; use suppaftp::AsyncRustlsFtpStream; use suppaftp::FtpError; @@ -31,35 +28,37 @@ use suppaftp::Status; use suppaftp::rustls::ClientConfig; use suppaftp::types::FileType; -use super::err::parse_error; +use super::err::format_ftp_error; use crate::raw::AccessorInfo; use crate::*; pub struct FtpCore { - pub info: Arc<AccessorInfo>, - pub manager: Manager, - pub pool: OnceCell<Pool<Manager>>, + info: Arc<AccessorInfo>, + pool: Arc<bounded::Pool<Manager>>, } impl FtpCore { - pub async fn ftp_connect(&self, _: Operation) -> Result<PooledConnection<'static, Manager>> { - let pool = self - .pool - .get_or_try_init(|| async { - Pool::builder() - .max_size(64) - .build(self.manager.clone()) - .await - }) - .await - .map_err(parse_error)?; - - pool.get_owned().await.map_err(|err| match err { - RunError::User(err) => parse_error(err), - RunError::TimedOut => { - Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary() + pub fn new(info: Arc<AccessorInfo>, manager: Manager) -> Self { + let pool = bounded::Pool::new(bounded::PoolConfig::new(64), manager); + Self { info, pool } + } + + pub fn info(&self) -> Arc<AccessorInfo> { + self.info.clone() + } + + pub async fn ftp_connect(&self, _: Operation) -> Result<bounded::Object<Manager>> { + let fut = self.pool.get(); + + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(10)) => { + Err(Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary()) + } + result = fut => match result { + Ok(conn) => Ok(conn), + Err(err) => Err(format_ftp_error(err)), } - }) + } } } @@ -72,11 +71,11 @@ pub struct Manager { pub enable_secure: bool, } -impl bb8::ManageConnection for Manager { - type Connection = AsyncRustlsFtpStream; +impl ManageObject for Manager { + type Object = AsyncRustlsFtpStream; type Error = FtpError; - async fn connect(&self) -> Result<Self::Connection, Self::Error> { + async fn create(&self) -> Result<Self::Object, Self::Error> { let stream = ImplAsyncFtpStream::connect(&self.endpoint).await?; // switch to secure mode if ssl/tls is on. let mut ftp_stream = if self.enable_secure { @@ -123,14 +122,11 @@ impl bb8::ManageConnection for Manager { Ok(ftp_stream) } - async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> { - conn.noop().await - } - - /// Don't allow reuse conn. - /// - /// We need to investigate why reuse conn will cause error. - fn has_broken(&self, _: &mut Self::Connection) -> bool { - true + async fn is_recyclable( + &self, + o: &mut Self::Object, + _: &ObjectStatus, + ) -> Result<(), Self::Error> { + o.noop().await } } diff --git a/core/src/services/ftp/deleter.rs b/core/src/services/ftp/deleter.rs index 38a5b9fdd..10b688912 100644 --- a/core/src/services/ftp/deleter.rs +++ b/core/src/services/ftp/deleter.rs @@ -22,7 +22,7 @@ use suppaftp::Status; use suppaftp::types::Response; use super::core::FtpCore; -use super::err::parse_error; +use super::err::format_ftp_error; use crate::raw::*; use crate::*; @@ -53,7 +53,7 @@ impl oio::OneShotDelete for FtpDeleter { })) | Ok(_) => (), Err(e) => { - return Err(parse_error(e)); + return Err(format_ftp_error(e)); } } diff --git a/core/src/services/ftp/err.rs b/core/src/services/ftp/err.rs index eac889880..a2ea33674 100644 --- a/core/src/services/ftp/err.rs +++ b/core/src/services/ftp/err.rs @@ -21,7 +21,7 @@ use suppaftp::Status; use crate::Error; use crate::ErrorKind; -pub(super) fn parse_error(err: FtpError) -> Error { +pub(super) fn format_ftp_error(err: FtpError) -> Error { let (kind, retryable) = match err { // Allow retry for error // diff --git a/core/src/services/ftp/reader.rs b/core/src/services/ftp/reader.rs index bebc53d8f..144b03d2b 100644 --- a/core/src/services/ftp/reader.rs +++ b/core/src/services/ftp/reader.rs @@ -15,19 +15,19 @@ // specific language governing permissions and limitations // under the License. -use bb8::PooledConnection; use bytes::BytesMut; +use fastpool::bounded; use futures::AsyncRead; use futures::AsyncReadExt; use super::core::Manager; -use super::err::parse_error; +use super::err::format_ftp_error; use crate::raw::*; use crate::*; pub struct FtpReader { /// Keep the connection alive while data stream is alive. - _ftp_stream: PooledConnection<'static, Manager>, + _ftp_stream: bounded::Object<Manager>, data_stream: Box<dyn AsyncRead + Sync + Send + Unpin + 'static>, chunk: usize, @@ -41,7 +41,7 @@ unsafe impl Sync for FtpReader {} impl FtpReader { pub async fn new( - mut ftp_stream: PooledConnection<'static, Manager>, + mut ftp_stream: bounded::Object<Manager>, path: String, args: OpRead, ) -> Result<Self> { @@ -53,12 +53,12 @@ impl FtpReader { ftp_stream .resume_transfer(offset as usize) .await - .map_err(parse_error)?; + .map_err(format_ftp_error)?; } let ds = ftp_stream .retr_as_stream(path) .await - .map_err(parse_error)? + .map_err(format_ftp_error)? .take(size as _); Ok(Self { diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs index 52db47378..838e0dda7 100644 --- a/core/src/services/ftp/writer.rs +++ b/core/src/services/ftp/writer.rs @@ -15,20 +15,20 @@ // specific language governing permissions and limitations // under the License. -use bb8::PooledConnection; use bytes::Buf; +use fastpool::bounded; use futures::AsyncWrite; use futures::AsyncWriteExt; use super::core::Manager; -use super::err::parse_error; +use super::err::format_ftp_error; use crate::raw::*; use crate::*; pub struct FtpWriter { target_path: String, tmp_path: Option<String>, - ftp_stream: PooledConnection<'static, Manager>, + ftp_stream: bounded::Object<Manager>, data_stream: Option<Box<dyn AsyncWrite + Sync + Send + Unpin + 'static>>, } @@ -44,7 +44,7 @@ unsafe impl Sync for FtpWriter {} /// After we can use data stream, we should return it directly. impl FtpWriter { pub fn new( - ftp_stream: PooledConnection<'static, Manager>, + ftp_stream: bounded::Object<Manager>, target_path: String, tmp_path: Option<String>, ) -> Self { @@ -70,7 +70,7 @@ impl oio::Write for FtpWriter { self.ftp_stream .append_with_stream(path) .await - .map_err(parse_error)?, + .map_err(format_ftp_error)?, )); } @@ -100,13 +100,13 @@ impl oio::Write for FtpWriter { self.ftp_stream .finalize_put_stream(data_stream) .await - .map_err(parse_error)?; + .map_err(format_ftp_error)?; if let Some(tmp_path) = &self.tmp_path { self.ftp_stream .rename(tmp_path, &self.target_path) .await - .map_err(parse_error)?; + .map_err(format_ftp_error)?; } }
