This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 7906e2318 refactor: migrate tikv service from adapter::kv to impl
Access directly (#6713)
7906e2318 is described below
commit 7906e23186001a3dcb795c8cfee9ac9af16985b6
Author: Qinxuan Chen <[email protected]>
AuthorDate: Tue Oct 21 15:17:40 2025 +0800
refactor: migrate tikv service from adapter::kv to impl Access directly
(#6713)
* refactor: migrate tikv service from adapter::kv to Access directly
* rename module name: delete => deleter
* update docs.md
* adjust some imports
---
core/benches/vs_fs/Cargo.toml | 2 +-
core/src/services/tikv/backend.rs | 173 +++++++++++---------------
core/src/services/tikv/config.rs | 4 +-
core/src/services/tikv/core.rs | 112 +++++++++++++++++
core/src/services/tikv/{mod.rs => deleter.rs} | 28 ++++-
core/src/services/tikv/docs.md | 9 +-
core/src/services/tikv/mod.rs | 4 +
core/src/services/tikv/writer.rs | 59 +++++++++
8 files changed, 281 insertions(+), 110 deletions(-)
diff --git a/core/benches/vs_fs/Cargo.toml b/core/benches/vs_fs/Cargo.toml
index 098c24663..a8b28e973 100644
--- a/core/benches/vs_fs/Cargo.toml
+++ b/core/benches/vs_fs/Cargo.toml
@@ -26,7 +26,7 @@ version = "0.0.0"
[dependencies]
criterion = { version = "0.7", features = ["async", "async_tokio"] }
-opendal = { path = "../..", features = ["tests"] }
+opendal = { path = "../..", features = ["blocking", "tests"] }
rand = "0.8"
tokio = { version = "1", features = ["full"] }
uuid = { version = "1", features = ["v4"] }
diff --git a/core/src/services/tikv/backend.rs
b/core/src/services/tikv/backend.rs
index e15be8ff3..1d5060b66 100644
--- a/core/src/services/tikv/backend.rs
+++ b/core/src/services/tikv/backend.rs
@@ -17,19 +17,15 @@
use std::fmt::Debug;
use std::fmt::Formatter;
+use std::sync::Arc;
-use tikv_client::Config;
-use tikv_client::RawClient;
use tokio::sync::OnceCell;
-use crate::Builder;
-use crate::Capability;
-use crate::Error;
-use crate::ErrorKind;
-use crate::Scheme;
-use crate::raw::Access;
-use crate::raw::adapters::kv;
-use crate::services::TikvConfig;
+use super::config::TikvConfig;
+use super::core::*;
+use super::deleter::TikvDeleter;
+use super::writer::TikvWriter;
+use crate::raw::*;
use crate::*;
/// TiKV backend builder
@@ -112,7 +108,7 @@ impl Builder for TikvBuilder {
)?;
}
- Ok(TikvBackend::new(Adapter {
+ Ok(TikvBackend::new(TikvCore {
client: OnceCell::new(),
endpoints,
insecure: self.config.insecure,
@@ -124,107 +120,86 @@ impl Builder for TikvBuilder {
}
/// Backend for TiKV service
-pub type TikvBackend = kv::Backend<Adapter>;
-
-#[derive(Clone)]
-pub struct Adapter {
- client: OnceCell<RawClient>,
- endpoints: Vec<String>,
- insecure: bool,
- ca_path: Option<String>,
- cert_path: Option<String>,
- key_path: Option<String>,
+#[derive(Clone, Debug)]
+pub struct TikvBackend {
+ core: Arc<TikvCore>,
+ root: String,
+ info: Arc<AccessorInfo>,
}
-impl Debug for Adapter {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- let mut ds = f.debug_struct("Adapter");
-
- ds.field("endpoints", &self.endpoints);
- ds.finish()
- }
-}
-
-impl Adapter {
- async fn get_connection(&self) -> Result<RawClient> {
- if let Some(client) = self.client.get() {
- return Ok(client.clone());
+impl TikvBackend {
+ fn new(core: TikvCore) -> Self {
+ let info = AccessorInfo::default();
+ info.set_scheme(Scheme::Tikv.into_static());
+ info.set_name("TiKV");
+ info.set_root("/");
+ info.set_native_capability(Capability {
+ read: true,
+ stat: true,
+ write: true,
+ write_can_empty: true,
+ delete: true,
+ shared: true,
+ ..Default::default()
+ });
+
+ Self {
+ core: Arc::new(core),
+ root: "/".to_string(),
+ info: Arc::new(info),
}
- let client = if self.insecure {
- RawClient::new(self.endpoints.clone())
- .await
- .map_err(parse_tikv_config_error)?
- } else if self.ca_path.is_some() && self.key_path.is_some() &&
self.cert_path.is_some() {
- let (ca_path, key_path, cert_path) = (
- self.ca_path.clone().unwrap(),
- self.key_path.clone().unwrap(),
- self.cert_path.clone().unwrap(),
- );
- let config = Config::default().with_security(ca_path, cert_path,
key_path);
- RawClient::new_with_config(self.endpoints.clone(), config)
- .await
- .map_err(parse_tikv_config_error)?
- } else {
- return Err(
- Error::new(ErrorKind::ConfigInvalid, "invalid configuration")
- .with_context("service", Scheme::Tikv)
- .with_context("endpoints", format!("{:?}",
self.endpoints)),
- );
- };
- self.client.set(client.clone()).ok();
- Ok(client)
}
}
-impl kv::Adapter for Adapter {
- type Scanner = ();
-
- fn info(&self) -> kv::Info {
- kv::Info::new(
- Scheme::Tikv,
- "TiKV",
- Capability {
- read: true,
- write: true,
- shared: true,
- ..Default::default()
- },
- )
+impl Access for TikvBackend {
+ type Reader = Buffer;
+ type Writer = TikvWriter;
+ type Lister = ();
+ type Deleter = oio::OneShotDeleter<TikvDeleter>;
+
+ fn info(&self) -> Arc<AccessorInfo> {
+ self.info.clone()
}
- async fn get(&self, path: &str) -> Result<Option<Buffer>> {
- let result = self
- .get_connection()
- .await?
- .get(path.to_owned())
- .await
- .map_err(parse_tikv_error)?;
- Ok(result.map(Buffer::from))
+ async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
+ let p = build_abs_path(&self.root, path);
+
+ if p == build_abs_path(&self.root, "") {
+ Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
+ } else {
+ let bs = self.core.get(&p).await?;
+ match bs {
+ Some(bs) => Ok(RpStat::new(
+
Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
+ )),
+ None => Err(Error::new(ErrorKind::NotFound, "kv not found in
tikv")),
+ }
+ }
}
- async fn set(&self, path: &str, value: Buffer) -> Result<()> {
- self.get_connection()
- .await?
- .put(path.to_owned(), value.to_vec())
- .await
- .map_err(parse_tikv_error)
+ async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead,
Self::Reader)> {
+ let p = build_abs_path(&self.root, path);
+ let bs = match self.core.get(&p).await? {
+ Some(bs) => bs,
+ None => return Err(Error::new(ErrorKind::NotFound, "kv not found
in tikv")),
+ };
+ Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
}
- async fn delete(&self, path: &str) -> Result<()> {
- self.get_connection()
- .await?
- .delete(path.to_owned())
- .await
- .map_err(parse_tikv_error)
+ async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
+ let p = build_abs_path(&self.root, path);
+ Ok((RpWrite::new(), TikvWriter::new(self.core.clone(), p)))
}
-}
-fn parse_tikv_error(e: tikv_client::Error) -> Error {
- Error::new(ErrorKind::Unexpected, "error from tikv").set_source(e)
-}
+ async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
+ Ok((
+ RpDelete::default(),
+ oio::OneShotDeleter::new(TikvDeleter::new(self.core.clone(),
self.root.clone())),
+ ))
+ }
-fn parse_tikv_config_error(e: tikv_client::Error) -> Error {
- Error::new(ErrorKind::ConfigInvalid, "invalid configuration")
- .with_context("service", Scheme::Tikv)
- .set_source(e)
+ async fn list(&self, path: &str, _: OpList) -> Result<(RpList,
Self::Lister)> {
+ let _ = build_abs_path(&self.root, path);
+ Ok((RpList::default(), ()))
+ }
}
diff --git a/core/src/services/tikv/config.rs b/core/src/services/tikv/config.rs
index 88a17cd4f..c0ee1a1f0 100644
--- a/core/src/services/tikv/config.rs
+++ b/core/src/services/tikv/config.rs
@@ -18,10 +18,11 @@
use std::fmt::Debug;
use std::fmt::Formatter;
-use super::backend::TikvBuilder;
use serde::Deserialize;
use serde::Serialize;
+use super::backend::TikvBuilder;
+
/// Config for Tikv services support.
#[derive(Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
@@ -54,6 +55,7 @@ impl Debug for TikvConfig {
impl crate::Configurator for TikvConfig {
type Builder = TikvBuilder;
+
fn from_uri(uri: &crate::types::OperatorUri) -> crate::Result<Self> {
let map = uri.options().clone();
diff --git a/core/src/services/tikv/core.rs b/core/src/services/tikv/core.rs
new file mode 100644
index 000000000..80a62a240
--- /dev/null
+++ b/core/src/services/tikv/core.rs
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+
+use tikv_client::Config;
+use tikv_client::RawClient;
+use tokio::sync::OnceCell;
+
+use crate::*;
+
+/// TikvCore holds the configuration and client for interacting with TiKV.
+#[derive(Clone)]
+pub struct TikvCore {
+ pub client: OnceCell<RawClient>,
+ pub endpoints: Vec<String>,
+ pub insecure: bool,
+ pub ca_path: Option<String>,
+ pub cert_path: Option<String>,
+ pub key_path: Option<String>,
+}
+
+impl Debug for TikvCore {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("TikvCore")
+ .field("endpoints", &self.endpoints)
+ .field("insecure", &self.insecure)
+ .finish()
+ }
+}
+
+impl TikvCore {
+ async fn get_connection(&self) -> Result<RawClient> {
+ if let Some(client) = self.client.get() {
+ return Ok(client.clone());
+ }
+ let client = if self.insecure {
+ RawClient::new(self.endpoints.clone())
+ .await
+ .map_err(parse_tikv_config_error)?
+ } else if self.ca_path.is_some() && self.key_path.is_some() &&
self.cert_path.is_some() {
+ let (ca_path, key_path, cert_path) = (
+ self.ca_path.clone().unwrap(),
+ self.key_path.clone().unwrap(),
+ self.cert_path.clone().unwrap(),
+ );
+ let config = Config::default().with_security(ca_path, cert_path,
key_path);
+ RawClient::new_with_config(self.endpoints.clone(), config)
+ .await
+ .map_err(parse_tikv_config_error)?
+ } else {
+ return Err(
+ Error::new(ErrorKind::ConfigInvalid, "invalid configuration")
+ .with_context("service", Scheme::Tikv)
+ .with_context("endpoints", format!("{:?}",
self.endpoints)),
+ );
+ };
+ self.client.set(client.clone()).ok();
+ Ok(client)
+ }
+
+ pub async fn get(&self, path: &str) -> Result<Option<Buffer>> {
+ let result = self
+ .get_connection()
+ .await?
+ .get(path.to_owned())
+ .await
+ .map_err(parse_tikv_error)?;
+ Ok(result.map(Buffer::from))
+ }
+
+ pub async fn set(&self, path: &str, value: Buffer) -> Result<()> {
+ self.get_connection()
+ .await?
+ .put(path.to_owned(), value.to_vec())
+ .await
+ .map_err(parse_tikv_error)
+ }
+
+ pub async fn delete(&self, path: &str) -> Result<()> {
+ self.get_connection()
+ .await?
+ .delete(path.to_owned())
+ .await
+ .map_err(parse_tikv_error)
+ }
+}
+
+fn parse_tikv_error(e: tikv_client::Error) -> Error {
+ Error::new(ErrorKind::Unexpected, "error from tikv").set_source(e)
+}
+
+fn parse_tikv_config_error(e: tikv_client::Error) -> Error {
+ Error::new(ErrorKind::ConfigInvalid, "invalid configuration")
+ .with_context("service", Scheme::Tikv)
+ .set_source(e)
+}
diff --git a/core/src/services/tikv/mod.rs b/core/src/services/tikv/deleter.rs
similarity index 61%
copy from core/src/services/tikv/mod.rs
copy to core/src/services/tikv/deleter.rs
index f521407fb..96903bbae 100644
--- a/core/src/services/tikv/mod.rs
+++ b/core/src/services/tikv/deleter.rs
@@ -15,8 +15,28 @@
// specific language governing permissions and limitations
// under the License.
-mod backend;
-pub use backend::TikvBuilder as Tikv;
+use std::sync::Arc;
-mod config;
-pub use config::TikvConfig;
+use super::core::*;
+use crate::raw::oio;
+use crate::raw::*;
+use crate::*;
+
+pub struct TikvDeleter {
+ core: Arc<TikvCore>,
+ root: String,
+}
+
+impl TikvDeleter {
+ pub fn new(core: Arc<TikvCore>, root: String) -> Self {
+ Self { core, root }
+ }
+}
+
+impl oio::OneShotDelete for TikvDeleter {
+ async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
+ let p = build_abs_path(&self.root, &path);
+ self.core.delete(&p).await?;
+ Ok(())
+ }
+}
diff --git a/core/src/services/tikv/docs.md b/core/src/services/tikv/docs.md
index abccf800d..ac15f71e0 100644
--- a/core/src/services/tikv/docs.md
+++ b/core/src/services/tikv/docs.md
@@ -2,16 +2,15 @@
This service can be used to:
+- [ ] create_dir
- [x] stat
- [x] read
- [x] write
-- [x] create_dir
- [x] delete
-- [x] copy
-- [x] rename
-- [ ] ~~list~~
+- [ ] copy
+- [ ] rename
+- [ ] list
- [ ] ~~presign~~
-- [ ] ~~blocking~~
## Configuration
diff --git a/core/src/services/tikv/mod.rs b/core/src/services/tikv/mod.rs
index f521407fb..8844a85d1 100644
--- a/core/src/services/tikv/mod.rs
+++ b/core/src/services/tikv/mod.rs
@@ -16,6 +16,10 @@
// under the License.
mod backend;
+mod core;
+mod deleter;
+mod writer;
+
pub use backend::TikvBuilder as Tikv;
mod config;
diff --git a/core/src/services/tikv/writer.rs b/core/src/services/tikv/writer.rs
new file mode 100644
index 000000000..375b3f0d5
--- /dev/null
+++ b/core/src/services/tikv/writer.rs
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use super::core::*;
+use crate::raw::oio;
+use crate::*;
+
+pub struct TikvWriter {
+ core: Arc<TikvCore>,
+ path: String,
+ buffer: oio::QueueBuf,
+}
+
+impl TikvWriter {
+ pub fn new(core: Arc<TikvCore>, path: String) -> Self {
+ Self {
+ core,
+ path,
+ buffer: oio::QueueBuf::new(),
+ }
+ }
+}
+
+impl oio::Write for TikvWriter {
+ async fn write(&mut self, bs: Buffer) -> Result<()> {
+ self.buffer.push(bs);
+ Ok(())
+ }
+
+ async fn close(&mut self) -> Result<Metadata> {
+ let buf = self.buffer.clone().collect();
+ let length = buf.len() as u64;
+ self.core.set(&self.path, buf).await?;
+
+ let meta =
Metadata::new(EntryMode::from_path(&self.path)).with_content_length(length);
+ Ok(meta)
+ }
+
+ async fn abort(&mut self) -> Result<()> {
+ self.buffer.clear();
+ Ok(())
+ }
+}