This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch bb8-to-fastpool
in repository https://gitbox.apache.org/repos/asf/opendal.git

commit 3debc8f621ae7d88fde5825289561f48a3a6b13b
Author: tison <[email protected]>
AuthorDate: Tue Nov 25 19:29:01 2025 +0800

    for sftp
    
    Signed-off-by: tison <[email protected]>
---
 core/src/services/sftp/backend.rs |  20 +++----
 core/src/services/sftp/core.rs    | 115 ++++++++++++++++++++------------------
 core/src/services/sftp/reader.rs  |   6 +-
 3 files changed, 73 insertions(+), 68 deletions(-)

diff --git a/core/src/services/sftp/backend.rs 
b/core/src/services/sftp/backend.rs
index ca2e17a81..384c0b3e4 100644
--- a/core/src/services/sftp/backend.rs
+++ b/core/src/services/sftp/backend.rs
@@ -21,7 +21,6 @@ use std::path::PathBuf;
 use std::sync::Arc;
 
 use log::debug;
-use mea::once::OnceCell;
 use openssh::KnownHosts;
 use tokio::io::AsyncSeekExt;
 
@@ -187,17 +186,14 @@ impl Builder for SftpBuilder {
                 ..Default::default()
             });
 
-        let accessor_info = Arc::new(info);
-        let core = Arc::new(SftpCore {
-            info: accessor_info,
+        let core = Arc::new(SftpCore::new(
+            Arc::new(info),
             endpoint,
             root,
             user,
-            key: self.config.key.clone(),
+            self.config.key.clone(),
             known_hosts_strategy,
-
-            client: OnceCell::new(),
-        });
+        ));
 
         debug!("sftp backend finished: {:?}", &self);
         Ok(SftpBackend { core })
@@ -319,11 +315,11 @@ impl Access for SftpBackend {
         let dir = match fs.open_dir(&file_path).await {
             Ok(dir) => dir,
             Err(e) => {
-                if is_not_found(&e) {
-                    return Ok((RpList::default(), None));
+                return if is_not_found(&e) {
+                    Ok((RpList::default(), None))
                 } else {
-                    return Err(parse_sftp_error(e));
-                }
+                    Err(parse_sftp_error(e))
+                };
             }
         }
         .read_dir();
diff --git a/core/src/services/sftp/core.rs b/core/src/services/sftp/core.rs
index 327cf57af..2520a03d9 100644
--- a/core/src/services/sftp/core.rs
+++ b/core/src/services/sftp/core.rs
@@ -15,35 +15,28 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::fmt::Debug;
-use std::path::Path;
-use std::path::PathBuf;
-use std::sync::Arc;
-
-use bb8::PooledConnection;
-use bb8::RunError;
-use log::debug;
-use mea::once::OnceCell;
-use openssh::KnownHosts;
-use openssh::SessionBuilder;
-use openssh_sftp_client::Sftp;
-use openssh_sftp_client::SftpOptions;
-
 use super::error::is_sftp_protocol_error;
 use super::error::parse_sftp_error;
 use super::error::parse_ssh_error;
 use crate::raw::*;
 use crate::*;
+use fastpool::{ManageObject, ObjectStatus, bounded};
+use log::debug;
+use openssh::KnownHosts;
+use openssh::SessionBuilder;
+use openssh_sftp_client::Sftp;
+use openssh_sftp_client::SftpOptions;
+use std::fmt::Debug;
+use std::path::Path;
+use std::path::PathBuf;
+use std::sync::Arc;
+use std::time::Duration;
 
 pub struct SftpCore {
     pub info: Arc<AccessorInfo>,
     pub endpoint: String,
     pub root: String,
-    pub user: Option<String>,
-    pub key: Option<String>,
-    pub known_hosts_strategy: KnownHosts,
-
-    pub client: OnceCell<bb8::Pool<Manager>>,
+    client: Arc<bounded::Pool<Manager>>,
 }
 
 impl Debug for SftpCore {
@@ -56,29 +49,45 @@ impl Debug for SftpCore {
 }
 
 impl SftpCore {
-    pub async fn connect(&self) -> Result<PooledConnection<'static, Manager>> {
-        let client = self
-            .client
-            .get_or_try_init(|| async {
-                bb8::Pool::builder()
-                    .max_size(64)
-                    .build(Manager {
-                        endpoint: self.endpoint.clone(),
-                        root: self.root.clone(),
-                        user: self.user.clone(),
-                        key: self.key.clone(),
-                        known_hosts_strategy: 
self.known_hosts_strategy.clone(),
-                    })
-                    .await
-            })
-            .await?;
-
-        client.get_owned().await.map_err(|err| match err {
-            RunError::User(err) => err,
-            RunError::TimedOut => {
-                Error::new(ErrorKind::Unexpected, "connection request: 
timeout").set_temporary()
+    pub fn new(
+        info: Arc<AccessorInfo>,
+        endpoint: String,
+        root: String,
+        user: Option<String>,
+        key: Option<String>,
+        known_hosts_strategy: KnownHosts,
+    ) -> Self {
+        let client = bounded::Pool::new(
+            bounded::PoolConfig::new(64),
+            Manager {
+                endpoint: endpoint.clone(),
+                root: root.clone(),
+                user,
+                key,
+                known_hosts_strategy,
+            },
+        );
+
+        SftpCore {
+            info,
+            endpoint,
+            root,
+            client,
+        }
+    }
+
+    pub async fn connect(&self) -> Result<bounded::Object<Manager>> {
+        let fut = self.client.get();
+
+        tokio::select! {
+            _ = tokio::time::sleep(Duration::from_secs(10)) => {
+                Err(Error::new(ErrorKind::Unexpected, "connection request: 
timeout").set_temporary())
+            }
+            result = fut => match result {
+                Ok(conn) => Ok(conn),
+                Err(err) => Err(err),
             }
-        })
+        }
     }
 }
 
@@ -90,11 +99,11 @@ pub struct Manager {
     known_hosts_strategy: KnownHosts,
 }
 
-impl bb8::ManageConnection for Manager {
-    type Connection = Sftp;
+impl ManageObject for Manager {
+    type Object = Sftp;
     type Error = Error;
 
-    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
+    async fn create(&self) -> Result<Self::Object, Self::Error> {
         let mut session = SessionBuilder::default();
 
         if let Some(user) = &self.user {
@@ -140,14 +149,14 @@ impl bb8::ManageConnection for Manager {
     }
 
     // Check if connect valid by checking the root path.
-    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), 
Self::Error> {
-        let _ = conn.fs().metadata("./").await.map_err(parse_sftp_error)?;
-
-        Ok(())
-    }
-
-    /// Always allow reuse conn.
-    fn has_broken(&self, _: &mut Self::Connection) -> bool {
-        false
+    async fn is_recyclable(
+        &self,
+        o: &mut Self::Object,
+        _: &ObjectStatus,
+    ) -> Result<(), Self::Error> {
+        match o.fs().metadata("./").await {
+            Ok(_) => Ok(()),
+            Err(e) => Err(parse_sftp_error(e)),
+        }
     }
 }
diff --git a/core/src/services/sftp/reader.rs b/core/src/services/sftp/reader.rs
index 781f27dd7..676389123 100644
--- a/core/src/services/sftp/reader.rs
+++ b/core/src/services/sftp/reader.rs
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use bb8::PooledConnection;
 use bytes::BytesMut;
+use fastpool::bounded;
 use openssh_sftp_client::file::File;
 
 use super::core::Manager;
@@ -26,7 +26,7 @@ use crate::*;
 
 pub struct SftpReader {
     /// Keep the connection alive while data stream is alive.
-    _conn: PooledConnection<'static, Manager>,
+    _conn: bounded::Object<Manager>,
 
     file: File,
     chunk: usize,
@@ -36,7 +36,7 @@ pub struct SftpReader {
 }
 
 impl SftpReader {
-    pub fn new(conn: PooledConnection<'static, Manager>, file: File, size: 
Option<u64>) -> Self {
+    pub fn new(conn: bounded::Object<Manager>, file: File, size: Option<u64>) 
-> Self {
         Self {
             _conn: conn,
             file,

Reply via email to