This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 082ebca8f feat(services/etcd): Enable etcd connection pool (#3041)
082ebca8f is described below
commit 082ebca8f624536967838a9fa904f11e05be5411
Author: Xuanwo <[email protected]>
AuthorDate: Tue Sep 12 22:03:39 2023 +0800
feat(services/etcd): Enable etcd connection pool (#3041)
* feat: Enable etcd connection pool
Signed-off-by: Xuanwo <[email protected]>
* Fix format
Signed-off-by: Xuanwo <[email protected]>
* Fix build
Signed-off-by: Xuanwo <[email protected]>
* Fix future resume
Signed-off-by: Xuanwo <[email protected]>
* Fix writer close is not safe for re-enter
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
core/Cargo.toml | 2 +-
core/src/raw/adapters/kv/backend.rs | 91 ++++++++++++++++++-------------
core/src/raw/adapters/typed_kv/backend.rs | 51 +++++++++++------
core/src/raw/oio/buf/chunked_bytes.rs | 16 ++++--
core/src/services/etcd/backend.rs | 52 ++++++++++++++++--
5 files changed, 143 insertions(+), 69 deletions(-)
diff --git a/core/Cargo.toml b/core/Cargo.toml
index c2cf34eac..74e9512ab 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -119,7 +119,7 @@ services-cos = [
]
services-dashmap = ["dep:dashmap"]
services-dropbox = []
-services-etcd = ["dep:etcd-client"]
+services-etcd = ["dep:etcd-client", "dep:bb8"]
services-foundationdb = ["dep:foundationdb"]
services-fs = ["tokio/fs"]
services-ftp = ["dep:suppaftp", "dep:lazy-regex", "dep:bb8", "dep:async-tls"]
diff --git a/core/src/raw/adapters/kv/backend.rs
b/core/src/raw/adapters/kv/backend.rs
index 2b843ad3e..db82e1cd6 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -21,6 +21,7 @@ use std::task::Context;
use std::task::Poll;
use async_trait::async_trait;
+use bytes::{Bytes, BytesMut};
use futures::future::BoxFuture;
use futures::FutureExt;
@@ -377,8 +378,7 @@ pub struct KvWriter<S> {
kv: Arc<S>,
path: String,
- /// TODO: if kv supports append, we can use them directly.
- buf: Option<Vec<u8>>,
+ buffer: Buffer,
future: Option<BoxFuture<'static, Result<()>>>,
}
@@ -387,12 +387,17 @@ impl<S> KvWriter<S> {
KvWriter {
kv,
path,
- buf: None,
+ buffer: Buffer::Active(BytesMut::new()),
future: None,
}
}
}
+enum Buffer {
+ Active(BytesMut),
+ Frozen(Bytes),
+}
+
/// # Safety
///
/// We will only take `&mut Self` reference for KvWriter.
@@ -400,7 +405,6 @@ unsafe impl<S: Adapter> Sync for KvWriter<S> {}
#[async_trait]
impl<S: Adapter> oio::Write for KvWriter<S> {
- // TODO: we need to support append in the future.
fn poll_write(&mut self, _: &mut Context<'_>, bs: &dyn oio::WriteBuf) ->
Poll<Result<usize>> {
if self.future.is_some() {
self.future = None;
@@ -410,42 +414,33 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
)));
}
- let size = bs.chunk().len();
-
- let mut buf = self.buf.take().unwrap_or_else(||
Vec::with_capacity(size));
- buf.extend_from_slice(bs.chunk());
- self.buf = Some(buf);
-
- Poll::Ready(Ok(size))
- }
-
- fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
- if self.future.is_some() {
- self.future = None;
- return Poll::Ready(Err(Error::new(
- ErrorKind::Unexpected,
- "there is a future on going, it's maybe a bug to go into this
case",
- )));
+ match &mut self.buffer {
+ Buffer::Active(buf) => {
+ buf.extend_from_slice(bs.chunk());
+ Poll::Ready(Ok(bs.chunk().len()))
+ }
+ Buffer::Frozen(_) => unreachable!("KvWriter should not be frozen
during poll_write"),
}
-
- self.buf = None;
- Poll::Ready(Ok(()))
}
fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
loop {
match self.future.as_mut() {
Some(fut) => {
- ready!(fut.poll_unpin(cx))?;
+ let res = ready!(fut.poll_unpin(cx));
self.future = None;
- return Poll::Ready(Ok(()));
+ return Poll::Ready(res);
}
None => {
let kv = self.kv.clone();
let path = self.path.clone();
- let buf = match self.buf.take() {
- Some(buf) => buf,
- None => return Poll::Ready(Ok(())),
+ let buf = match &mut self.buffer {
+ Buffer::Active(buf) => {
+ let buf = buf.split().freeze();
+ self.buffer = Buffer::Frozen(buf.clone());
+ buf
+ }
+ Buffer::Frozen(buf) => buf.clone(),
};
let fut = async move { kv.set(&path, &buf).await };
@@ -454,25 +449,43 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
}
}
}
+
+ fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+ if self.future.is_some() {
+ self.future = None;
+ return Poll::Ready(Err(Error::new(
+ ErrorKind::Unexpected,
+ "there is a future on going, it's maybe a bug to go into this
case",
+ )));
+ }
+
+ self.buffer = Buffer::Active(BytesMut::new());
+ Poll::Ready(Ok(()))
+ }
}
impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
- let size = bs.chunk().len();
-
- let mut buf = self.buf.take().unwrap_or_else(||
Vec::with_capacity(size));
- buf.extend_from_slice(bs.chunk());
-
- self.buf = Some(buf);
-
- Ok(size)
+ match &mut self.buffer {
+ Buffer::Active(buf) => {
+ buf.extend_from_slice(bs.chunk());
+ Ok(bs.chunk().len())
+ }
+ Buffer::Frozen(_) => unreachable!("KvWriter should not be frozen
during poll_write"),
+ }
}
fn close(&mut self) -> Result<()> {
- if let Some(buf) = self.buf.as_deref() {
- self.kv.blocking_set(&self.path, buf)?;
- }
+ let buf = match &mut self.buffer {
+ Buffer::Active(buf) => {
+ let buf = buf.split().freeze();
+ self.buffer = Buffer::Frozen(buf.clone());
+ buf
+ }
+ Buffer::Frozen(buf) => buf.clone(),
+ };
+ self.kv.blocking_set(&self.path, &buf)?;
Ok(())
}
}
diff --git a/core/src/raw/adapters/typed_kv/backend.rs
b/core/src/raw/adapters/typed_kv/backend.rs
index 4a2ae17ab..e04822cdd 100644
--- a/core/src/raw/adapters/typed_kv/backend.rs
+++ b/core/src/raw/adapters/typed_kv/backend.rs
@@ -368,6 +368,7 @@ pub struct KvWriter<S> {
op: OpWrite,
buf: Option<Vec<u8>>,
+ value: Option<Value>,
future: Option<BoxFuture<'static, Result<()>>>,
}
@@ -383,6 +384,7 @@ impl<S> KvWriter<S> {
path,
op,
buf: None,
+ value: None,
future: None,
}
}
@@ -412,7 +414,6 @@ impl<S> KvWriter<S> {
#[async_trait]
impl<S: Adapter> oio::Write for KvWriter<S> {
- // TODO: we need to support append in the future.
fn poll_write(&mut self, _: &mut Context<'_>, bs: &dyn oio::WriteBuf) ->
Poll<Result<usize>> {
if self.future.is_some() {
self.future = None;
@@ -432,31 +433,25 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
Poll::Ready(Ok(size))
}
- fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
- if self.future.is_some() {
- self.future = None;
- return Poll::Ready(Err(Error::new(
- ErrorKind::Unexpected,
- "there is a future on going, it's maybe a bug to go into this
case",
- )));
- }
-
- self.buf = None;
- Poll::Ready(Ok(()))
- }
-
fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
loop {
match self.future.as_mut() {
Some(fut) => {
- ready!(fut.poll_unpin(cx))?;
+ let res = ready!(fut.poll_unpin(cx));
self.future = None;
- return Poll::Ready(Ok(()));
+ return Poll::Ready(res);
}
None => {
let kv = self.kv.clone();
let path = self.path.clone();
- let value = self.build();
+ let value = match &self.value {
+ Some(value) => value.clone(),
+ None => {
+ let value = self.build();
+ self.value = Some(value.clone());
+ value
+ }
+ };
let fut = async move { kv.set(&path, value).await };
self.future = Some(Box::pin(fut));
@@ -464,6 +459,19 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
}
}
}
+
+ fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+ if self.future.is_some() {
+ self.future = None;
+ return Poll::Ready(Err(Error::new(
+ ErrorKind::Unexpected,
+ "there is a future on going, it's maybe a bug to go into this
case",
+ )));
+ }
+
+ self.buf = None;
+ Poll::Ready(Ok(()))
+ }
}
impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
@@ -480,7 +488,14 @@ impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
fn close(&mut self) -> Result<()> {
let kv = self.kv.clone();
- let value = self.build();
+ let value = match &self.value {
+ Some(value) => value.clone(),
+ None => {
+ let value = self.build();
+ self.value = Some(value.clone());
+ value
+ }
+ };
kv.blocking_set(&self.path, value)?;
Ok(())
diff --git a/core/src/raw/oio/buf/chunked_bytes.rs
b/core/src/raw/oio/buf/chunked_bytes.rs
index 810c0dfda..73e94b44c 100644
--- a/core/src/raw/oio/buf/chunked_bytes.rs
+++ b/core/src/raw/oio/buf/chunked_bytes.rs
@@ -15,13 +15,16 @@
// specific language governing permissions and limitations
// under the License.
-use bytes::{Bytes, BytesMut};
-use futures::Stream;
use std::cmp::min;
use std::collections::VecDeque;
use std::io::IoSlice;
use std::pin::Pin;
-use std::task::{Context, Poll};
+use std::task::Context;
+use std::task::Poll;
+
+use bytes::Bytes;
+use bytes::BytesMut;
+use futures::Stream;
use crate::raw::*;
use crate::*;
@@ -356,8 +359,11 @@ impl Stream for ChunkedBytes {
mod tests {
use log::debug;
use pretty_assertions::assert_eq;
- use rand::{thread_rng, Rng, RngCore};
- use sha2::{Digest, Sha256};
+ use rand::thread_rng;
+ use rand::Rng;
+ use rand::RngCore;
+ use sha2::Digest;
+ use sha2::Sha256;
use super::*;
use crate::raw::oio::WriteBuf;
diff --git a/core/src/services/etcd/backend.rs
b/core/src/services/etcd/backend.rs
index e5a0334be..ef318fca9 100644
--- a/core/src/services/etcd/backend.rs
+++ b/core/src/services/etcd/backend.rs
@@ -20,6 +20,8 @@ use std::fmt::Debug;
use std::fmt::Formatter;
use async_trait::async_trait;
+use bb8::PooledConnection;
+use bb8::RunError;
use etcd_client::Certificate;
use etcd_client::Client;
use etcd_client::ConnectOptions;
@@ -230,11 +232,37 @@ impl EtcdBuilder {
/// Backend for etcd services.
pub type EtcdBackend = kv::Backend<Adapter>;
+#[derive(Clone)]
+pub struct Manager {
+ endpoints: Vec<String>,
+ options: ConnectOptions,
+}
+
+#[async_trait]
+impl bb8::ManageConnection for Manager {
+ type Connection = Client;
+ type Error = Error;
+
+ async fn connect(&self) -> std::result::Result<Self::Connection,
Self::Error> {
+ Ok(Client::connect(self.endpoints.clone(),
Some(self.options.clone())).await?)
+ }
+
+ async fn is_valid(&self, conn: &mut Self::Connection) ->
std::result::Result<(), Self::Error> {
+ let _ = conn.status().await?;
+ Ok(())
+ }
+
+ /// Always allow reuse conn.
+ fn has_broken(&self, _: &mut Self::Connection) -> bool {
+ false
+ }
+}
+
#[derive(Clone)]
pub struct Adapter {
endpoints: Vec<String>,
- client: OnceCell<Client>,
options: ConnectOptions,
+ client: OnceCell<bb8::Pool<Manager>>,
}
// implement `Debug` manually, or password may be leaked.
@@ -249,14 +277,26 @@ impl Debug for Adapter {
}
impl Adapter {
- async fn conn(&self) -> Result<Client> {
- Ok(self
+ async fn conn(&self) -> Result<PooledConnection<'static, Manager>> {
+ let client = self
.client
.get_or_try_init(|| async {
- Client::connect(self.endpoints.clone(),
Some(self.options.clone())).await
+ bb8::Pool::builder()
+ .max_size(64)
+ .build(Manager {
+ endpoints: self.endpoints.clone(),
+ options: self.options.clone(),
+ })
+ .await
})
- .await?
- .clone())
+ .await?;
+
+ client.get_owned().await.map_err(|err| match err {
+ RunError::User(err) => err,
+ RunError::TimedOut => {
+ Error::new(ErrorKind::Unexpected, "connection request:
timeout").set_temporary()
+ }
+ })
}
}