This is an automated email from the ASF dual-hosted git repository. tison pushed a commit to branch bb8-to-fastpool in repository https://gitbox.apache.org/repos/asf/opendal.git
commit 3debc8f621ae7d88fde5825289561f48a3a6b13b Author: tison <[email protected]> AuthorDate: Tue Nov 25 19:29:01 2025 +0800 for sftp Signed-off-by: tison <[email protected]> --- core/src/services/sftp/backend.rs | 20 +++---- core/src/services/sftp/core.rs | 115 ++++++++++++++++++++------------------ core/src/services/sftp/reader.rs | 6 +- 3 files changed, 73 insertions(+), 68 deletions(-) diff --git a/core/src/services/sftp/backend.rs b/core/src/services/sftp/backend.rs index ca2e17a81..384c0b3e4 100644 --- a/core/src/services/sftp/backend.rs +++ b/core/src/services/sftp/backend.rs @@ -21,7 +21,6 @@ use std::path::PathBuf; use std::sync::Arc; use log::debug; -use mea::once::OnceCell; use openssh::KnownHosts; use tokio::io::AsyncSeekExt; @@ -187,17 +186,14 @@ impl Builder for SftpBuilder { ..Default::default() }); - let accessor_info = Arc::new(info); - let core = Arc::new(SftpCore { - info: accessor_info, + let core = Arc::new(SftpCore::new( + Arc::new(info), endpoint, root, user, - key: self.config.key.clone(), + self.config.key.clone(), known_hosts_strategy, - - client: OnceCell::new(), - }); + )); debug!("sftp backend finished: {:?}", &self); Ok(SftpBackend { core }) @@ -319,11 +315,11 @@ impl Access for SftpBackend { let dir = match fs.open_dir(&file_path).await { Ok(dir) => dir, Err(e) => { - if is_not_found(&e) { - return Ok((RpList::default(), None)); + return if is_not_found(&e) { + Ok((RpList::default(), None)) } else { - return Err(parse_sftp_error(e)); - } + Err(parse_sftp_error(e)) + }; } } .read_dir(); diff --git a/core/src/services/sftp/core.rs b/core/src/services/sftp/core.rs index 327cf57af..2520a03d9 100644 --- a/core/src/services/sftp/core.rs +++ b/core/src/services/sftp/core.rs @@ -15,35 +15,28 @@ // specific language governing permissions and limitations // under the License. -use std::fmt::Debug; -use std::path::Path; -use std::path::PathBuf; -use std::sync::Arc; - -use bb8::PooledConnection; -use bb8::RunError; -use log::debug; -use mea::once::OnceCell; -use openssh::KnownHosts; -use openssh::SessionBuilder; -use openssh_sftp_client::Sftp; -use openssh_sftp_client::SftpOptions; - use super::error::is_sftp_protocol_error; use super::error::parse_sftp_error; use super::error::parse_ssh_error; use crate::raw::*; use crate::*; +use fastpool::{ManageObject, ObjectStatus, bounded}; +use log::debug; +use openssh::KnownHosts; +use openssh::SessionBuilder; +use openssh_sftp_client::Sftp; +use openssh_sftp_client::SftpOptions; +use std::fmt::Debug; +use std::path::Path; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Duration; pub struct SftpCore { pub info: Arc<AccessorInfo>, pub endpoint: String, pub root: String, - pub user: Option<String>, - pub key: Option<String>, - pub known_hosts_strategy: KnownHosts, - - pub client: OnceCell<bb8::Pool<Manager>>, + client: Arc<bounded::Pool<Manager>>, } impl Debug for SftpCore { @@ -56,29 +49,45 @@ impl Debug for SftpCore { } impl SftpCore { - pub async fn connect(&self) -> Result<PooledConnection<'static, Manager>> { - let client = self - .client - .get_or_try_init(|| async { - bb8::Pool::builder() - .max_size(64) - .build(Manager { - endpoint: self.endpoint.clone(), - root: self.root.clone(), - user: self.user.clone(), - key: self.key.clone(), - known_hosts_strategy: self.known_hosts_strategy.clone(), - }) - .await - }) - .await?; - - client.get_owned().await.map_err(|err| match err { - RunError::User(err) => err, - RunError::TimedOut => { - Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary() + pub fn new( + info: Arc<AccessorInfo>, + endpoint: String, + root: String, + user: Option<String>, + key: Option<String>, + known_hosts_strategy: KnownHosts, + ) -> Self { + let client = bounded::Pool::new( + bounded::PoolConfig::new(64), + Manager { + endpoint: endpoint.clone(), + root: root.clone(), + user, + key, + known_hosts_strategy, + }, + ); + + SftpCore { + info, + endpoint, + root, + client, + } + } + + pub async fn connect(&self) -> Result<bounded::Object<Manager>> { + let fut = self.client.get(); + + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(10)) => { + Err(Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary()) + } + result = fut => match result { + Ok(conn) => Ok(conn), + Err(err) => Err(err), } - }) + } } } @@ -90,11 +99,11 @@ pub struct Manager { known_hosts_strategy: KnownHosts, } -impl bb8::ManageConnection for Manager { - type Connection = Sftp; +impl ManageObject for Manager { + type Object = Sftp; type Error = Error; - async fn connect(&self) -> Result<Self::Connection, Self::Error> { + async fn create(&self) -> Result<Self::Object, Self::Error> { let mut session = SessionBuilder::default(); if let Some(user) = &self.user { @@ -140,14 +149,14 @@ impl bb8::ManageConnection for Manager { } // Check if connect valid by checking the root path. - async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> { - let _ = conn.fs().metadata("./").await.map_err(parse_sftp_error)?; - - Ok(()) - } - - /// Always allow reuse conn. - fn has_broken(&self, _: &mut Self::Connection) -> bool { - false + async fn is_recyclable( + &self, + o: &mut Self::Object, + _: &ObjectStatus, + ) -> Result<(), Self::Error> { + match o.fs().metadata("./").await { + Ok(_) => Ok(()), + Err(e) => Err(parse_sftp_error(e)), + } } } diff --git a/core/src/services/sftp/reader.rs b/core/src/services/sftp/reader.rs index 781f27dd7..676389123 100644 --- a/core/src/services/sftp/reader.rs +++ b/core/src/services/sftp/reader.rs @@ -15,8 +15,8 @@ // specific language governing permissions and limitations // under the License. -use bb8::PooledConnection; use bytes::BytesMut; +use fastpool::bounded; use openssh_sftp_client::file::File; use super::core::Manager; @@ -26,7 +26,7 @@ use crate::*; pub struct SftpReader { /// Keep the connection alive while data stream is alive. - _conn: PooledConnection<'static, Manager>, + _conn: bounded::Object<Manager>, file: File, chunk: usize, @@ -36,7 +36,7 @@ pub struct SftpReader { } impl SftpReader { - pub fn new(conn: PooledConnection<'static, Manager>, file: File, size: Option<u64>) -> Self { + pub fn new(conn: bounded::Object<Manager>, file: File, size: Option<u64>) -> Self { Self { _conn: conn, file,
