This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 082ebca8f feat(services/etcd): Enable etcd connection pool (#3041)
082ebca8f is described below

commit 082ebca8f624536967838a9fa904f11e05be5411
Author: Xuanwo <[email protected]>
AuthorDate: Tue Sep 12 22:03:39 2023 +0800

    feat(services/etcd): Enable etcd connection pool (#3041)
    
    * feat: Enable etcd connection pool
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix format
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix build
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix future resume
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix writer close is not safe for re-enter
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/Cargo.toml                           |  2 +-
 core/src/raw/adapters/kv/backend.rs       | 91 ++++++++++++++++++-------------
 core/src/raw/adapters/typed_kv/backend.rs | 51 +++++++++++------
 core/src/raw/oio/buf/chunked_bytes.rs     | 16 ++++--
 core/src/services/etcd/backend.rs         | 52 ++++++++++++++++--
 5 files changed, 143 insertions(+), 69 deletions(-)

diff --git a/core/Cargo.toml b/core/Cargo.toml
index c2cf34eac..74e9512ab 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -119,7 +119,7 @@ services-cos = [
 ]
 services-dashmap = ["dep:dashmap"]
 services-dropbox = []
-services-etcd = ["dep:etcd-client"]
+services-etcd = ["dep:etcd-client", "dep:bb8"]
 services-foundationdb = ["dep:foundationdb"]
 services-fs = ["tokio/fs"]
 services-ftp = ["dep:suppaftp", "dep:lazy-regex", "dep:bb8", "dep:async-tls"]
diff --git a/core/src/raw/adapters/kv/backend.rs 
b/core/src/raw/adapters/kv/backend.rs
index 2b843ad3e..db82e1cd6 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -21,6 +21,7 @@ use std::task::Context;
 use std::task::Poll;
 
 use async_trait::async_trait;
+use bytes::{Bytes, BytesMut};
 use futures::future::BoxFuture;
 use futures::FutureExt;
 
@@ -377,8 +378,7 @@ pub struct KvWriter<S> {
     kv: Arc<S>,
     path: String,
 
-    /// TODO: if kv supports append, we can use them directly.
-    buf: Option<Vec<u8>>,
+    buffer: Buffer,
     future: Option<BoxFuture<'static, Result<()>>>,
 }
 
@@ -387,12 +387,17 @@ impl<S> KvWriter<S> {
         KvWriter {
             kv,
             path,
-            buf: None,
+            buffer: Buffer::Active(BytesMut::new()),
             future: None,
         }
     }
 }
 
+enum Buffer {
+    Active(BytesMut),
+    Frozen(Bytes),
+}
+
 /// # Safety
 ///
 /// We will only take `&mut Self` reference for KvWriter.
@@ -400,7 +405,6 @@ unsafe impl<S: Adapter> Sync for KvWriter<S> {}
 
 #[async_trait]
 impl<S: Adapter> oio::Write for KvWriter<S> {
-    // TODO: we need to support append in the future.
     fn poll_write(&mut self, _: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Poll<Result<usize>> {
         if self.future.is_some() {
             self.future = None;
@@ -410,42 +414,33 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
             )));
         }
 
-        let size = bs.chunk().len();
-
-        let mut buf = self.buf.take().unwrap_or_else(|| 
Vec::with_capacity(size));
-        buf.extend_from_slice(bs.chunk());
-        self.buf = Some(buf);
-
-        Poll::Ready(Ok(size))
-    }
-
-    fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
-        if self.future.is_some() {
-            self.future = None;
-            return Poll::Ready(Err(Error::new(
-                ErrorKind::Unexpected,
-                "there is a future on going, it's maybe a bug to go into this 
case",
-            )));
+        match &mut self.buffer {
+            Buffer::Active(buf) => {
+                buf.extend_from_slice(bs.chunk());
+                Poll::Ready(Ok(bs.chunk().len()))
+            }
+            Buffer::Frozen(_) => unreachable!("KvWriter should not be frozen 
during poll_write"),
         }
-
-        self.buf = None;
-        Poll::Ready(Ok(()))
     }
 
     fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
         loop {
             match self.future.as_mut() {
                 Some(fut) => {
-                    ready!(fut.poll_unpin(cx))?;
+                    let res = ready!(fut.poll_unpin(cx));
                     self.future = None;
-                    return Poll::Ready(Ok(()));
+                    return Poll::Ready(res);
                 }
                 None => {
                     let kv = self.kv.clone();
                     let path = self.path.clone();
-                    let buf = match self.buf.take() {
-                        Some(buf) => buf,
-                        None => return Poll::Ready(Ok(())),
+                    let buf = match &mut self.buffer {
+                        Buffer::Active(buf) => {
+                            let buf = buf.split().freeze();
+                            self.buffer = Buffer::Frozen(buf.clone());
+                            buf
+                        }
+                        Buffer::Frozen(buf) => buf.clone(),
                     };
 
                     let fut = async move { kv.set(&path, &buf).await };
@@ -454,25 +449,43 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
             }
         }
     }
+
+    fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+        if self.future.is_some() {
+            self.future = None;
+            return Poll::Ready(Err(Error::new(
+                ErrorKind::Unexpected,
+                "there is a future on going, it's maybe a bug to go into this 
case",
+            )));
+        }
+
+        self.buffer = Buffer::Active(BytesMut::new());
+        Poll::Ready(Ok(()))
+    }
 }
 
 impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
     fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
-        let size = bs.chunk().len();
-
-        let mut buf = self.buf.take().unwrap_or_else(|| 
Vec::with_capacity(size));
-        buf.extend_from_slice(bs.chunk());
-
-        self.buf = Some(buf);
-
-        Ok(size)
+        match &mut self.buffer {
+            Buffer::Active(buf) => {
+                buf.extend_from_slice(bs.chunk());
+                Ok(bs.chunk().len())
+            }
+            Buffer::Frozen(_) => unreachable!("KvWriter should not be frozen 
during poll_write"),
+        }
     }
 
     fn close(&mut self) -> Result<()> {
-        if let Some(buf) = self.buf.as_deref() {
-            self.kv.blocking_set(&self.path, buf)?;
-        }
+        let buf = match &mut self.buffer {
+            Buffer::Active(buf) => {
+                let buf = buf.split().freeze();
+                self.buffer = Buffer::Frozen(buf.clone());
+                buf
+            }
+            Buffer::Frozen(buf) => buf.clone(),
+        };
 
+        self.kv.blocking_set(&self.path, &buf)?;
         Ok(())
     }
 }
diff --git a/core/src/raw/adapters/typed_kv/backend.rs 
b/core/src/raw/adapters/typed_kv/backend.rs
index 4a2ae17ab..e04822cdd 100644
--- a/core/src/raw/adapters/typed_kv/backend.rs
+++ b/core/src/raw/adapters/typed_kv/backend.rs
@@ -368,6 +368,7 @@ pub struct KvWriter<S> {
 
     op: OpWrite,
     buf: Option<Vec<u8>>,
+    value: Option<Value>,
     future: Option<BoxFuture<'static, Result<()>>>,
 }
 
@@ -383,6 +384,7 @@ impl<S> KvWriter<S> {
             path,
             op,
             buf: None,
+            value: None,
             future: None,
         }
     }
@@ -412,7 +414,6 @@ impl<S> KvWriter<S> {
 
 #[async_trait]
 impl<S: Adapter> oio::Write for KvWriter<S> {
-    // TODO: we need to support append in the future.
     fn poll_write(&mut self, _: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Poll<Result<usize>> {
         if self.future.is_some() {
             self.future = None;
@@ -432,31 +433,25 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
         Poll::Ready(Ok(size))
     }
 
-    fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
-        if self.future.is_some() {
-            self.future = None;
-            return Poll::Ready(Err(Error::new(
-                ErrorKind::Unexpected,
-                "there is a future on going, it's maybe a bug to go into this 
case",
-            )));
-        }
-
-        self.buf = None;
-        Poll::Ready(Ok(()))
-    }
-
     fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
         loop {
             match self.future.as_mut() {
                 Some(fut) => {
-                    ready!(fut.poll_unpin(cx))?;
+                    let res = ready!(fut.poll_unpin(cx));
                     self.future = None;
-                    return Poll::Ready(Ok(()));
+                    return Poll::Ready(res);
                 }
                 None => {
                     let kv = self.kv.clone();
                     let path = self.path.clone();
-                    let value = self.build();
+                    let value = match &self.value {
+                        Some(value) => value.clone(),
+                        None => {
+                            let value = self.build();
+                            self.value = Some(value.clone());
+                            value
+                        }
+                    };
 
                     let fut = async move { kv.set(&path, value).await };
                     self.future = Some(Box::pin(fut));
@@ -464,6 +459,19 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
             }
         }
     }
+
+    fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+        if self.future.is_some() {
+            self.future = None;
+            return Poll::Ready(Err(Error::new(
+                ErrorKind::Unexpected,
+                "there is a future on going, it's maybe a bug to go into this 
case",
+            )));
+        }
+
+        self.buf = None;
+        Poll::Ready(Ok(()))
+    }
 }
 
 impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
@@ -480,7 +488,14 @@ impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
 
     fn close(&mut self) -> Result<()> {
         let kv = self.kv.clone();
-        let value = self.build();
+        let value = match &self.value {
+            Some(value) => value.clone(),
+            None => {
+                let value = self.build();
+                self.value = Some(value.clone());
+                value
+            }
+        };
 
         kv.blocking_set(&self.path, value)?;
         Ok(())
diff --git a/core/src/raw/oio/buf/chunked_bytes.rs 
b/core/src/raw/oio/buf/chunked_bytes.rs
index 810c0dfda..73e94b44c 100644
--- a/core/src/raw/oio/buf/chunked_bytes.rs
+++ b/core/src/raw/oio/buf/chunked_bytes.rs
@@ -15,13 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use bytes::{Bytes, BytesMut};
-use futures::Stream;
 use std::cmp::min;
 use std::collections::VecDeque;
 use std::io::IoSlice;
 use std::pin::Pin;
-use std::task::{Context, Poll};
+use std::task::Context;
+use std::task::Poll;
+
+use bytes::Bytes;
+use bytes::BytesMut;
+use futures::Stream;
 
 use crate::raw::*;
 use crate::*;
@@ -356,8 +359,11 @@ impl Stream for ChunkedBytes {
 mod tests {
     use log::debug;
     use pretty_assertions::assert_eq;
-    use rand::{thread_rng, Rng, RngCore};
-    use sha2::{Digest, Sha256};
+    use rand::thread_rng;
+    use rand::Rng;
+    use rand::RngCore;
+    use sha2::Digest;
+    use sha2::Sha256;
 
     use super::*;
     use crate::raw::oio::WriteBuf;
diff --git a/core/src/services/etcd/backend.rs 
b/core/src/services/etcd/backend.rs
index e5a0334be..ef318fca9 100644
--- a/core/src/services/etcd/backend.rs
+++ b/core/src/services/etcd/backend.rs
@@ -20,6 +20,8 @@ use std::fmt::Debug;
 use std::fmt::Formatter;
 
 use async_trait::async_trait;
+use bb8::PooledConnection;
+use bb8::RunError;
 use etcd_client::Certificate;
 use etcd_client::Client;
 use etcd_client::ConnectOptions;
@@ -230,11 +232,37 @@ impl EtcdBuilder {
 /// Backend for etcd services.
 pub type EtcdBackend = kv::Backend<Adapter>;
 
+#[derive(Clone)]
+pub struct Manager {
+    endpoints: Vec<String>,
+    options: ConnectOptions,
+}
+
+#[async_trait]
+impl bb8::ManageConnection for Manager {
+    type Connection = Client;
+    type Error = Error;
+
+    async fn connect(&self) -> std::result::Result<Self::Connection, 
Self::Error> {
+        Ok(Client::connect(self.endpoints.clone(), 
Some(self.options.clone())).await?)
+    }
+
+    async fn is_valid(&self, conn: &mut Self::Connection) -> 
std::result::Result<(), Self::Error> {
+        let _ = conn.status().await?;
+        Ok(())
+    }
+
+    /// Always allow reuse conn.
+    fn has_broken(&self, _: &mut Self::Connection) -> bool {
+        false
+    }
+}
+
 #[derive(Clone)]
 pub struct Adapter {
     endpoints: Vec<String>,
-    client: OnceCell<Client>,
     options: ConnectOptions,
+    client: OnceCell<bb8::Pool<Manager>>,
 }
 
 // implement `Debug` manually, or password may be leaked.
@@ -249,14 +277,26 @@ impl Debug for Adapter {
 }
 
 impl Adapter {
-    async fn conn(&self) -> Result<Client> {
-        Ok(self
+    async fn conn(&self) -> Result<PooledConnection<'static, Manager>> {
+        let client = self
             .client
             .get_or_try_init(|| async {
-                Client::connect(self.endpoints.clone(), 
Some(self.options.clone())).await
+                bb8::Pool::builder()
+                    .max_size(64)
+                    .build(Manager {
+                        endpoints: self.endpoints.clone(),
+                        options: self.options.clone(),
+                    })
+                    .await
             })
-            .await?
-            .clone())
+            .await?;
+
+        client.get_owned().await.map_err(|err| match err {
+            RunError::User(err) => err,
+            RunError::TimedOut => {
+                Error::new(ErrorKind::Unexpected, "connection request: 
timeout").set_temporary()
+            }
+        })
     }
 }
 

Reply via email to