This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch fix-etcd-invalid-auth in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 0a1ad3c48848d3977d0b7a3de1361da3cc067481 Author: Xuanwo <[email protected]> AuthorDate: Tue Sep 12 21:19:46 2023 +0800 feat: Enable etcd connection pool Signed-off-by: Xuanwo <[email protected]> --- core/Cargo.toml | 2 +- core/src/services/etcd/backend.rs | 51 ++++++++++++++++++++++++++++++++++----- 2 files changed, 46 insertions(+), 7 deletions(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index c2cf34eac..74e9512ab 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -119,7 +119,7 @@ services-cos = [ ] services-dashmap = ["dep:dashmap"] services-dropbox = [] -services-etcd = ["dep:etcd-client"] +services-etcd = ["dep:etcd-client", "dep:bb8"] services-foundationdb = ["dep:foundationdb"] services-fs = ["tokio/fs"] services-ftp = ["dep:suppaftp", "dep:lazy-regex", "dep:bb8", "dep:async-tls"] diff --git a/core/src/services/etcd/backend.rs b/core/src/services/etcd/backend.rs index e5a0334be..c26af3f00 100644 --- a/core/src/services/etcd/backend.rs +++ b/core/src/services/etcd/backend.rs @@ -20,6 +20,7 @@ use std::fmt::Debug; use std::fmt::Formatter; use async_trait::async_trait; +use bb8::{PooledConnection, RunError}; use etcd_client::Certificate; use etcd_client::Client; use etcd_client::ConnectOptions; @@ -230,11 +231,37 @@ impl EtcdBuilder { /// Backend for etcd services. pub type EtcdBackend = kv::Backend<Adapter>; +#[derive(Clone)] +pub struct Manager { + endpoints: Vec<String>, + options: ConnectOptions, +} + +#[async_trait] +impl bb8::ManageConnection for Manager { + type Connection = Client; + type Error = Error; + + async fn connect(&self) -> std::result::Result<Self::Connection, Self::Error> { + Ok(Client::connect(self.endpoints.clone(), Some(self.options.clone())).await?) + } + + async fn is_valid(&self, conn: &mut Self::Connection) -> std::result::Result<(), Self::Error> { + let _ = conn.status().await?; + Ok(()) + } + + /// Always allow reuse conn. + fn has_broken(&self, _: &mut Self::Connection) -> bool { + false + } +} + #[derive(Clone)] pub struct Adapter { endpoints: Vec<String>, - client: OnceCell<Client>, options: ConnectOptions, + client: OnceCell<bb8::Pool<Manager>>, } // implement `Debug` manually, or password may be leaked. @@ -249,14 +276,26 @@ impl Debug for Adapter { } impl Adapter { - async fn conn(&self) -> Result<Client> { - Ok(self + async fn conn(&self) -> Result<PooledConnection<'static, Manager>> { + let client = self .client .get_or_try_init(|| async { - Client::connect(self.endpoints.clone(), Some(self.options.clone())).await + bb8::Pool::builder() + .max_size(64) + .build(Manager { + endpoints: self.endpoints.clone(), + options: self.options.clone(), + }) + .await }) - .await? - .clone()) + .await?; + + client.get_owned().await.map_err(|err| match err { + RunError::User(err) => err, + RunError::TimedOut => { + Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary() + } + }) } }
