This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch fix-etcd-invalid-auth
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 0a1ad3c48848d3977d0b7a3de1361da3cc067481
Author: Xuanwo <[email protected]>
AuthorDate: Tue Sep 12 21:19:46 2023 +0800

    feat: Enable etcd connection pool
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/Cargo.toml                   |  2 +-
 core/src/services/etcd/backend.rs | 51 ++++++++++++++++++++++++++++++++++-----
 2 files changed, 46 insertions(+), 7 deletions(-)

diff --git a/core/Cargo.toml b/core/Cargo.toml
index c2cf34eac..74e9512ab 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -119,7 +119,7 @@ services-cos = [
 ]
 services-dashmap = ["dep:dashmap"]
 services-dropbox = []
-services-etcd = ["dep:etcd-client"]
+services-etcd = ["dep:etcd-client", "dep:bb8"]
 services-foundationdb = ["dep:foundationdb"]
 services-fs = ["tokio/fs"]
 services-ftp = ["dep:suppaftp", "dep:lazy-regex", "dep:bb8", "dep:async-tls"]
diff --git a/core/src/services/etcd/backend.rs b/core/src/services/etcd/backend.rs
index e5a0334be..c26af3f00 100644
--- a/core/src/services/etcd/backend.rs
+++ b/core/src/services/etcd/backend.rs
@@ -20,6 +20,7 @@ use std::fmt::Debug;
 use std::fmt::Formatter;
 
 use async_trait::async_trait;
+use bb8::{PooledConnection, RunError};
 use etcd_client::Certificate;
 use etcd_client::Client;
 use etcd_client::ConnectOptions;
@@ -230,11 +231,37 @@ impl EtcdBuilder {
 /// Backend for etcd services.
 pub type EtcdBackend = kv::Backend<Adapter>;
 
+#[derive(Clone)]
+pub struct Manager {
+    endpoints: Vec<String>,
+    options: ConnectOptions,
+}
+
+#[async_trait]
+impl bb8::ManageConnection for Manager {
+    type Connection = Client;
+    type Error = Error;
+
+    async fn connect(&self) -> std::result::Result<Self::Connection, Self::Error> {
+        Ok(Client::connect(self.endpoints.clone(), Some(self.options.clone())).await?)
+    }
+
+    async fn is_valid(&self, conn: &mut Self::Connection) -> std::result::Result<(), Self::Error> {
+        let _ = conn.status().await?;
+        Ok(())
+    }
+
+    /// Always allow reuse conn.
+    fn has_broken(&self, _: &mut Self::Connection) -> bool {
+        false
+    }
+}
+
 #[derive(Clone)]
 pub struct Adapter {
     endpoints: Vec<String>,
-    client: OnceCell<Client>,
     options: ConnectOptions,
+    client: OnceCell<bb8::Pool<Manager>>,
 }
 
 // implement `Debug` manually, or password may be leaked.
@@ -249,14 +276,26 @@ impl Debug for Adapter {
 }
 
 impl Adapter {
-    async fn conn(&self) -> Result<Client> {
-        Ok(self
+    async fn conn(&self) -> Result<PooledConnection<'static, Manager>> {
+        let client = self
             .client
             .get_or_try_init(|| async {
-                Client::connect(self.endpoints.clone(), Some(self.options.clone())).await
+                bb8::Pool::builder()
+                    .max_size(64)
+                    .build(Manager {
+                        endpoints: self.endpoints.clone(),
+                        options: self.options.clone(),
+                    })
+                    .await
             })
-            .await?
-            .clone())
+            .await?;
+
+        client.get_owned().await.map_err(|err| match err {
+            RunError::User(err) => err,
+            RunError::TimedOut => {
+                Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary()
+            }
+        })
     }
 }
 

Reply via email to