This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch bb8-to-fastpool
in repository https://gitbox.apache.org/repos/asf/opendal.git

commit e7fc638f5ab369231be75d2b79285efab9edd4ce
Author: tison <[email protected]>
AuthorDate: Tue Nov 25 19:55:50 2025 +0800

    for memcached
    
    Signed-off-by: tison <[email protected]>
---
 core/src/services/memcached/backend.rs | 26 ++++------
 core/src/services/memcached/binary.rs  |  1 +
 core/src/services/memcached/config.rs  |  2 +-
 core/src/services/memcached/core.rs    | 92 +++++++++++++++++-----------------
 4 files changed, 56 insertions(+), 65 deletions(-)

diff --git a/core/src/services/memcached/backend.rs 
b/core/src/services/memcached/backend.rs
index 22ce701c6..22b5f835a 100644
--- a/core/src/services/memcached/backend.rs
+++ b/core/src/services/memcached/backend.rs
@@ -85,7 +85,7 @@ impl MemcachedBuilder {
     ///
     /// Will panic if `max_size` is 0.
     #[must_use]
-    pub fn connection_pool_max_size(mut self, max_size: u32) -> Self {
+    pub fn connection_pool_max_size(mut self, max_size: usize) -> Self {
         assert!(max_size > 0, "max_size must be greater than zero!");
         self.config.connection_pool_max_size = Some(max_size);
         self
@@ -144,23 +144,15 @@ impl Builder for MemcachedBuilder {
         };
         let endpoint = format!("{host}:{port}",);
 
-        let root = normalize_root(
-            self.config
-                .root
-                .clone()
-                .unwrap_or_else(|| "/".to_string())
-                .as_str(),
-        );
-
-        let conn = OnceCell::new();
-        Ok(MemcachedBackend::new(MemcachedCore {
-            conn,
+        let root = normalize_root(self.config.root.unwrap_or_else(|| 
"/".to_string()).as_str());
+
+        Ok(MemcachedBackend::new(MemcachedCore::new(
             endpoint,
-            username: self.config.username.clone(),
-            password: self.config.password.clone(),
-            default_ttl: self.config.default_ttl,
-            connection_pool_max_size: self.config.connection_pool_max_size,
-        })
+            self.config.username,
+            self.config.password,
+            self.config.default_ttl,
+            self.config.connection_pool_max_size,
+        ))
         .with_normalized_root(root))
     }
 }
diff --git a/core/src/services/memcached/binary.rs 
b/core/src/services/memcached/binary.rs
index e3db22abc..d433f3fce 100644
--- a/core/src/services/memcached/binary.rs
+++ b/core/src/services/memcached/binary.rs
@@ -97,6 +97,7 @@ pub struct Response {
     value: Vec<u8>,
 }
 
+#[derive(Debug)]
 pub struct Connection {
     io: BufReader<TcpStream>,
 }
diff --git a/core/src/services/memcached/config.rs 
b/core/src/services/memcached/config.rs
index 8b82537d7..ac8990a1d 100644
--- a/core/src/services/memcached/config.rs
+++ b/core/src/services/memcached/config.rs
@@ -46,7 +46,7 @@ pub struct MemcachedConfig {
     /// The maximum number of connections allowed.
     ///
     /// default is 10
-    pub connection_pool_max_size: Option<u32>,
+    pub connection_pool_max_size: Option<usize>,
 }
 
 impl Debug for MemcachedConfig {
diff --git a/core/src/services/memcached/core.rs 
b/core/src/services/memcached/core.rs
index add582f80..c88d7b0be 100644
--- a/core/src/services/memcached/core.rs
+++ b/core/src/services/memcached/core.rs
@@ -15,19 +15,18 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use fastpool::{ManageObject, ObjectStatus, bounded};
+use std::sync::Arc;
 use std::time::Duration;
-
-use bb8::RunError;
-use mea::once::OnceCell;
 use tokio::net::TcpStream;
 
 use super::binary;
 use crate::raw::*;
 use crate::*;
 
-/// A `bb8::ManageConnection` for `memcache_async::ascii::Protocol`.
-#[derive(Clone, Debug)]
-pub struct MemcacheConnectionManager {
+/// A connection manager for `memcache_async::ascii::Protocol`.
+#[derive(Clone)]
+struct MemcacheConnectionManager {
     address: String,
     username: Option<String>,
     password: Option<String>,
@@ -43,12 +42,12 @@ impl MemcacheConnectionManager {
     }
 }
 
-impl bb8::ManageConnection for MemcacheConnectionManager {
-    type Connection = binary::Connection;
+impl ManageObject for MemcacheConnectionManager {
+    type Object = binary::Connection;
     type Error = Error;
 
     /// TODO: Implement unix stream support.
-    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
+    async fn create(&self) -> Result<Self::Object, Self::Error> {
         let conn = TcpStream::connect(&self.address)
             .await
             .map_err(new_std_io_error)?;
@@ -60,53 +59,52 @@ impl bb8::ManageConnection for MemcacheConnectionManager {
         Ok(conn)
     }
 
-    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), 
Self::Error> {
-        conn.version().await.map(|_| ())
-    }
-
-    fn has_broken(&self, _: &mut Self::Connection) -> bool {
-        false
+    async fn is_recyclable(
+        &self,
+        o: &mut Self::Object,
+        _: &ObjectStatus,
+    ) -> Result<(), Self::Error> {
+        match o.version().await {
+            Ok(_) => Ok(()),
+            Err(err) => Err(err),
+        }
     }
 }
 
 #[derive(Clone, Debug)]
 pub struct MemcachedCore {
-    pub conn: OnceCell<bb8::Pool<MemcacheConnectionManager>>,
-    pub endpoint: String,
-    pub username: Option<String>,
-    pub password: Option<String>,
-    pub default_ttl: Option<Duration>,
-    pub connection_pool_max_size: Option<u32>,
+    default_ttl: Option<Duration>,
+    conn: Arc<bounded::Pool<MemcacheConnectionManager>>,
 }
 
 impl MemcachedCore {
-    async fn conn(&self) -> Result<bb8::PooledConnection<'_, 
MemcacheConnectionManager>> {
-        let pool = self
-            .conn
-            .get_or_try_init(|| async {
-                let mgr = MemcacheConnectionManager::new(
-                    &self.endpoint,
-                    self.username.clone(),
-                    self.password.clone(),
-                );
-
-                bb8::Pool::builder()
-                    .max_size(self.connection_pool_max_size.unwrap_or(10))
-                    .build(mgr)
-                    .await
-                    .map_err(|err| {
-                        Error::new(ErrorKind::ConfigInvalid, "connect to 
memcached failed")
-                            .set_source(err)
-                    })
-            })
-            .await?;
-
-        pool.get().await.map_err(|err| match err {
-            RunError::TimedOut => {
-                Error::new(ErrorKind::Unexpected, "get connection from pool 
failed").set_temporary()
+    pub fn new(
+        endpoint: String,
+        username: Option<String>,
+        password: Option<String>,
+        default_ttl: Option<Duration>,
+        connection_pool_max_size: Option<usize>,
+    ) -> Self {
+        let conn = bounded::Pool::new(
+            bounded::PoolConfig::new(connection_pool_max_size.unwrap_or(10)),
+            MemcacheConnectionManager::new(endpoint.as_str(), username, 
password),
+        );
+
+        Self { default_ttl, conn }
+    }
+
+    async fn conn(&self) -> Result<bounded::Object<MemcacheConnectionManager>> 
{
+        let fut = self.conn.get();
+
+        tokio::select! {
+            _ = tokio::time::sleep(Duration::from_secs(10)) => {
+                Err(Error::new(ErrorKind::Unexpected, "connection request: 
timeout").set_temporary())
             }
-            RunError::User(err) => err,
-        })
+            result = fut => match result {
+                Ok(conn) => Ok(conn),
+                Err(err) => Err(err),
+            }
+        }
     }
 
     pub async fn get(&self, key: &str) -> Result<Option<Buffer>> {

Reply via email to