This is an automated email from the ASF dual-hosted git repository. tison pushed a commit to branch bb8-to-fastpool in repository https://gitbox.apache.org/repos/asf/opendal.git
commit e7fc638f5ab369231be75d2b79285efab9edd4ce Author: tison <[email protected]> AuthorDate: Tue Nov 25 19:55:50 2025 +0800 for memcached Signed-off-by: tison <[email protected]> --- core/src/services/memcached/backend.rs | 26 ++++------ core/src/services/memcached/binary.rs | 1 + core/src/services/memcached/config.rs | 2 +- core/src/services/memcached/core.rs | 92 +++++++++++++++++----------------- 4 files changed, 56 insertions(+), 65 deletions(-) diff --git a/core/src/services/memcached/backend.rs b/core/src/services/memcached/backend.rs index 22ce701c6..22b5f835a 100644 --- a/core/src/services/memcached/backend.rs +++ b/core/src/services/memcached/backend.rs @@ -85,7 +85,7 @@ impl MemcachedBuilder { /// /// Will panic if `max_size` is 0. #[must_use] - pub fn connection_pool_max_size(mut self, max_size: u32) -> Self { + pub fn connection_pool_max_size(mut self, max_size: usize) -> Self { assert!(max_size > 0, "max_size must be greater than zero!"); self.config.connection_pool_max_size = Some(max_size); self @@ -144,23 +144,15 @@ impl Builder for MemcachedBuilder { }; let endpoint = format!("{host}:{port}",); - let root = normalize_root( - self.config - .root - .clone() - .unwrap_or_else(|| "/".to_string()) - .as_str(), - ); - - let conn = OnceCell::new(); - Ok(MemcachedBackend::new(MemcachedCore { - conn, + let root = normalize_root(self.config.root.unwrap_or_else(|| "/".to_string()).as_str()); + + Ok(MemcachedBackend::new(MemcachedCore::new( endpoint, - username: self.config.username.clone(), - password: self.config.password.clone(), - default_ttl: self.config.default_ttl, - connection_pool_max_size: self.config.connection_pool_max_size, - }) + self.config.username, + self.config.password, + self.config.default_ttl, + self.config.connection_pool_max_size, + )) .with_normalized_root(root)) } } diff --git a/core/src/services/memcached/binary.rs b/core/src/services/memcached/binary.rs index e3db22abc..d433f3fce 100644 --- a/core/src/services/memcached/binary.rs +++ b/core/src/services/memcached/binary.rs @@ -97,6 +97,7 @@ pub struct Response { value: Vec<u8>, } +#[derive(Debug)] pub struct Connection { io: BufReader<TcpStream>, } diff --git a/core/src/services/memcached/config.rs b/core/src/services/memcached/config.rs index 8b82537d7..ac8990a1d 100644 --- a/core/src/services/memcached/config.rs +++ b/core/src/services/memcached/config.rs @@ -46,7 +46,7 @@ pub struct MemcachedConfig { /// The maximum number of connections allowed. /// /// default is 10 - pub connection_pool_max_size: Option<u32>, + pub connection_pool_max_size: Option<usize>, } impl Debug for MemcachedConfig { diff --git a/core/src/services/memcached/core.rs b/core/src/services/memcached/core.rs index add582f80..c88d7b0be 100644 --- a/core/src/services/memcached/core.rs +++ b/core/src/services/memcached/core.rs @@ -15,19 +15,18 @@ // specific language governing permissions and limitations // under the License. +use fastpool::{ManageObject, ObjectStatus, bounded}; +use std::sync::Arc; use std::time::Duration; - -use bb8::RunError; -use mea::once::OnceCell; use tokio::net::TcpStream; use super::binary; use crate::raw::*; use crate::*; -/// A `bb8::ManageConnection` for `memcache_async::ascii::Protocol`. -#[derive(Clone, Debug)] -pub struct MemcacheConnectionManager { +/// A connection manager for `memcache_async::ascii::Protocol`. +#[derive(Clone)] +struct MemcacheConnectionManager { address: String, username: Option<String>, password: Option<String>, @@ -43,12 +42,12 @@ impl MemcacheConnectionManager { } } -impl bb8::ManageConnection for MemcacheConnectionManager { - type Connection = binary::Connection; +impl ManageObject for MemcacheConnectionManager { + type Object = binary::Connection; type Error = Error; /// TODO: Implement unix stream support. - async fn connect(&self) -> Result<Self::Connection, Self::Error> { + async fn create(&self) -> Result<Self::Object, Self::Error> { let conn = TcpStream::connect(&self.address) .await .map_err(new_std_io_error)?; @@ -60,53 +59,52 @@ impl bb8::ManageConnection for MemcacheConnectionManager { Ok(conn) } - async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> { - conn.version().await.map(|_| ()) - } - - fn has_broken(&self, _: &mut Self::Connection) -> bool { - false + async fn is_recyclable( + &self, + o: &mut Self::Object, + _: &ObjectStatus, + ) -> Result<(), Self::Error> { + match o.version().await { + Ok(_) => Ok(()), + Err(err) => Err(err), + } } } #[derive(Clone, Debug)] pub struct MemcachedCore { - pub conn: OnceCell<bb8::Pool<MemcacheConnectionManager>>, - pub endpoint: String, - pub username: Option<String>, - pub password: Option<String>, - pub default_ttl: Option<Duration>, - pub connection_pool_max_size: Option<u32>, + default_ttl: Option<Duration>, + conn: Arc<bounded::Pool<MemcacheConnectionManager>>, } impl MemcachedCore { - async fn conn(&self) -> Result<bb8::PooledConnection<'_, MemcacheConnectionManager>> { - let pool = self - .conn - .get_or_try_init(|| async { - let mgr = MemcacheConnectionManager::new( - &self.endpoint, - self.username.clone(), - self.password.clone(), - ); - - bb8::Pool::builder() - .max_size(self.connection_pool_max_size.unwrap_or(10)) - .build(mgr) - .await - .map_err(|err| { - Error::new(ErrorKind::ConfigInvalid, "connect to memcached failed") - .set_source(err) - }) - }) - .await?; - - pool.get().await.map_err(|err| match err { - RunError::TimedOut => { - Error::new(ErrorKind::Unexpected, "get connection from pool failed").set_temporary() + pub fn new( + endpoint: String, + username: Option<String>, + password: Option<String>, + default_ttl: Option<Duration>, + connection_pool_max_size: Option<usize>, + ) -> Self { + let conn = bounded::Pool::new( + bounded::PoolConfig::new(connection_pool_max_size.unwrap_or(10)), + MemcacheConnectionManager::new(endpoint.as_str(), username, password), + ); + + Self { default_ttl, conn } + } + + async fn conn(&self) -> Result<bounded::Object<MemcacheConnectionManager>> { + let fut = self.conn.get(); + + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(10)) => { + Err(Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary()) } - RunError::User(err) => err, - }) + result = fut => match result { + Ok(conn) => Ok(conn), + Err(err) => Err(err), + } + } } pub async fn get(&self, key: &str) -> Result<Option<Buffer>> {
