This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new ac54d28f3 refactor: migrate surrealdb service from adapter::kv to impl
Access directly (#6723)
ac54d28f3 is described below
commit ac54d28f361e7072150b80bb99646d9b48635a54
Author: Qinxuan Chen <[email protected]>
AuthorDate: Wed Oct 22 17:28:27 2025 +0800
refactor: migrate surrealdb service from adapter::kv to impl Access
directly (#6723)
---
core/src/services/surrealdb/backend.rs | 220 ++++++++-------------
core/src/services/surrealdb/config.rs | 4 +-
core/src/services/surrealdb/core.rs | 160 +++++++++++++++
core/src/services/surrealdb/{mod.rs => deleter.rs} | 28 ++-
core/src/services/surrealdb/docs.md | 5 +-
core/src/services/surrealdb/mod.rs | 4 +
core/src/services/surrealdb/writer.rs | 59 ++++++
7 files changed, 331 insertions(+), 149 deletions(-)
diff --git a/core/src/services/surrealdb/backend.rs
b/core/src/services/surrealdb/backend.rs
index c80a48c2b..d06cc0f21 100644
--- a/core/src/services/surrealdb/backend.rs
+++ b/core/src/services/surrealdb/backend.rs
@@ -19,15 +19,13 @@ use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;
-use surrealdb::Surreal;
-use surrealdb::engine::any::Any;
-use surrealdb::opt::auth::Database;
use tokio::sync::OnceCell;
-use crate::raw::Access;
-use crate::raw::adapters::kv;
-use crate::raw::normalize_root;
-use crate::services::SurrealdbConfig;
+use super::config::SurrealdbConfig;
+use super::core::*;
+use super::deleter::SurrealdbDeleter;
+use super::writer::SurrealdbWriter;
+use crate::raw::*;
use crate::*;
#[doc = include_str!("docs.md")]
@@ -192,7 +190,7 @@ impl Builder for SurrealdbBuilder {
.as_str(),
);
- Ok(SurrealdbBackend::new(Adapter {
+ Ok(SurrealdbBackend::new(SurrealdbCore {
db: OnceCell::new(),
connection_string,
username,
@@ -208,154 +206,94 @@ impl Builder for SurrealdbBuilder {
}
/// Backend for Surrealdb service
-pub type SurrealdbBackend = kv::Backend<Adapter>;
-
-#[derive(Clone)]
-pub struct Adapter {
- db: OnceCell<Arc<Surreal<Any>>>,
- connection_string: String,
-
- username: String,
- password: String,
- namespace: String,
- database: String,
-
- table: String,
- key_field: String,
- value_field: String,
+#[derive(Clone, Debug)]
+pub struct SurrealdbBackend {
+ core: Arc<SurrealdbCore>,
+ root: String,
+ info: Arc<AccessorInfo>,
}
-impl Debug for Adapter {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- f.debug_struct("Adapter")
- .field("connection_string", &self.connection_string)
- .field("username", &self.username)
- .field("password", &"<redacted>")
- .field("namespace", &self.namespace)
- .field("database", &self.database)
- .field("table", &self.table)
- .field("key_field", &self.key_field)
- .field("value_field", &self.value_field)
- .finish()
+impl SurrealdbBackend {
+ pub fn new(core: SurrealdbCore) -> Self {
+ let info = AccessorInfo::default();
+ info.set_scheme(Scheme::Surrealdb.into_static());
+ info.set_name(&core.table);
+ info.set_root("/");
+ info.set_native_capability(Capability {
+ read: true,
+ stat: true,
+ write: true,
+ write_can_empty: true,
+ delete: true,
+ shared: true,
+ ..Default::default()
+ });
+
+ Self {
+ core: Arc::new(core),
+ root: "/".to_string(),
+ info: Arc::new(info),
+ }
}
-}
-impl Adapter {
- async fn get_connection(&self) -> crate::Result<&Surreal<Any>> {
- self.db
- .get_or_try_init(|| async {
- let namespace = self.namespace.as_str();
- let database = self.database.as_str();
-
- let db: Surreal<Any> = Surreal::init();
- db.connect(self.connection_string.clone())
- .await
- .map_err(parse_surrealdb_error)?;
-
- if !self.username.is_empty() && !self.password.is_empty() {
- db.signin(Database {
- namespace,
- database,
- username: self.username.as_str(),
- password: self.password.as_str(),
- })
- .await
- .map_err(parse_surrealdb_error)?;
- }
- db.use_ns(namespace)
- .use_db(database)
- .await
- .map_err(parse_surrealdb_error)?;
-
- Ok(Arc::new(db))
- })
- .await
- .map(|v| v.as_ref())
+ fn with_normalized_root(mut self, root: String) -> Self {
+ self.info.set_root(&root);
+ self.root = root;
+ self
}
}
-impl kv::Adapter for Adapter {
- type Scanner = ();
-
- fn info(&self) -> kv::Info {
- kv::Info::new(
- Scheme::Surrealdb,
- &self.table,
- Capability {
- read: true,
- write: true,
- shared: true,
- ..Default::default()
- },
- )
+impl Access for SurrealdbBackend {
+ type Reader = Buffer;
+ type Writer = SurrealdbWriter;
+ type Lister = ();
+ type Deleter = oio::OneShotDeleter<SurrealdbDeleter>;
+
+ fn info(&self) -> Arc<AccessorInfo> {
+ self.info.clone()
}
- async fn get(&self, path: &str) -> crate::Result<Option<Buffer>> {
- let query: String = if self.key_field == "id" {
- "SELECT type::field($value_field) FROM type::thing($table,
$path)".to_string()
- } else {
- format!(
- "SELECT type::field($value_field) FROM type::table($table)
WHERE {} = $path LIMIT 1",
- self.key_field
- )
- };
+ async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
+ let p = build_abs_path(&self.root, path);
- let mut result = self
- .get_connection()
- .await?
- .query(query)
- .bind(("namespace", "opendal"))
- .bind(("path", path.to_string()))
- .bind(("table", self.table.to_string()))
- .bind(("value_field", self.value_field.to_string()))
- .await
- .map_err(parse_surrealdb_error)?;
-
- let value: Option<Vec<u8>> = result
- .take((0, self.value_field.as_str()))
- .map_err(parse_surrealdb_error)?;
-
- Ok(value.map(Buffer::from))
+ if p == build_abs_path(&self.root, "") {
+ Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
+ } else {
+ let bs = self.core.get(&p).await?;
+ match bs {
+ Some(bs) => Ok(RpStat::new(
+
Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
+ )),
+ None => Err(Error::new(ErrorKind::NotFound, "kv not found in
surrealdb")),
+ }
+ }
}
- async fn set(&self, path: &str, value: Buffer) -> crate::Result<()> {
- let query = format!(
- "INSERT INTO {} ({}, {}) \
- VALUES ($path, $value) \
- ON DUPLICATE KEY UPDATE {} = $value",
- self.table, self.key_field, self.value_field, self.value_field
- );
- self.get_connection()
- .await?
- .query(query)
- .bind(("path", path.to_string()))
- .bind(("value", value.to_vec()))
- .await
- .map_err(parse_surrealdb_error)?;
- Ok(())
+ async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead,
Self::Reader)> {
+ let p = build_abs_path(&self.root, path);
+ let bs = match self.core.get(&p).await? {
+ Some(bs) => bs,
+ None => {
+ return Err(Error::new(ErrorKind::NotFound, "kv not found in
surrealdb"));
+ }
+ };
+ Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
}
- async fn delete(&self, path: &str) -> crate::Result<()> {
- let query: String = if self.key_field == "id" {
- "DELETE FROM type::thing($table, $path)".to_string()
- } else {
- format!(
- "DELETE FROM type::table($table) WHERE {} = $path",
- self.key_field
- )
- };
+ async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
+ let p = build_abs_path(&self.root, path);
+ Ok((RpWrite::new(), SurrealdbWriter::new(self.core.clone(), p)))
+ }
- self.get_connection()
- .await?
- .query(query.as_str())
- .bind(("path", path.to_string()))
- .bind(("table", self.table.to_string()))
- .await
- .map_err(parse_surrealdb_error)?;
- Ok(())
+ async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
+ Ok((
+ RpDelete::default(),
+ oio::OneShotDeleter::new(SurrealdbDeleter::new(self.core.clone(),
self.root.clone())),
+ ))
}
-}
-fn parse_surrealdb_error(err: surrealdb::Error) -> Error {
- Error::new(ErrorKind::Unexpected, "unhandled error from
surrealdb").set_source(err)
+ async fn list(&self, path: &str, _: OpList) -> Result<(RpList,
Self::Lister)> {
+ let _ = build_abs_path(&self.root, path);
+ Ok((RpList::default(), ()))
+ }
}
diff --git a/core/src/services/surrealdb/config.rs
b/core/src/services/surrealdb/config.rs
index 76b102d3e..a09711653 100644
--- a/core/src/services/surrealdb/config.rs
+++ b/core/src/services/surrealdb/config.rs
@@ -18,10 +18,11 @@
use std::fmt::Debug;
use std::fmt::Formatter;
-use super::backend::SurrealdbBuilder;
use serde::Deserialize;
use serde::Serialize;
+use super::backend::SurrealdbBuilder;
+
/// Config for Surrealdb services support.
#[derive(Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
@@ -66,6 +67,7 @@ impl Debug for SurrealdbConfig {
impl crate::Configurator for SurrealdbConfig {
type Builder = SurrealdbBuilder;
+
fn from_uri(uri: &crate::types::OperatorUri) -> crate::Result<Self> {
let mut map = uri.options().clone();
diff --git a/core/src/services/surrealdb/core.rs
b/core/src/services/surrealdb/core.rs
new file mode 100644
index 000000000..9172ed951
--- /dev/null
+++ b/core/src/services/surrealdb/core.rs
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::sync::Arc;
+
+use surrealdb::Surreal;
+use surrealdb::engine::any::Any;
+use surrealdb::opt::auth::Database;
+use tokio::sync::OnceCell;
+
+use crate::*;
+
+#[derive(Clone)]
+pub struct SurrealdbCore {
+ pub db: OnceCell<Arc<Surreal<Any>>>,
+ pub connection_string: String,
+
+ pub username: String,
+ pub password: String,
+ pub namespace: String,
+ pub database: String,
+
+ pub table: String,
+ pub key_field: String,
+ pub value_field: String,
+}
+
+impl Debug for SurrealdbCore {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("SurrealdbCore")
+ .field("connection_string", &self.connection_string)
+ .field("username", &self.username)
+ .field("password", &"<redacted>")
+ .field("namespace", &self.namespace)
+ .field("database", &self.database)
+ .field("table", &self.table)
+ .field("key_field", &self.key_field)
+ .field("value_field", &self.value_field)
+ .finish()
+ }
+}
+
+impl SurrealdbCore {
+ async fn get_connection(&self) -> Result<&Surreal<Any>> {
+ self.db
+ .get_or_try_init(|| async {
+ let namespace = self.namespace.as_str();
+ let database = self.database.as_str();
+
+ let db: Surreal<Any> = Surreal::init();
+ db.connect(self.connection_string.clone())
+ .await
+ .map_err(parse_surrealdb_error)?;
+
+ if !self.username.is_empty() && !self.password.is_empty() {
+ db.signin(Database {
+ namespace,
+ database,
+ username: self.username.as_str(),
+ password: self.password.as_str(),
+ })
+ .await
+ .map_err(parse_surrealdb_error)?;
+ }
+ db.use_ns(namespace)
+ .use_db(database)
+ .await
+ .map_err(parse_surrealdb_error)?;
+
+ Ok(Arc::new(db))
+ })
+ .await
+ .map(|v| v.as_ref())
+ }
+
+ pub async fn get(&self, path: &str) -> Result<Option<Buffer>> {
+ let query: String = if self.key_field == "id" {
+ "SELECT type::field($value_field) FROM type::thing($table,
$path)".to_string()
+ } else {
+ format!(
+ "SELECT type::field($value_field) FROM type::table($table)
WHERE {} = $path LIMIT 1",
+ self.key_field
+ )
+ };
+
+ let mut result = self
+ .get_connection()
+ .await?
+ .query(query)
+ .bind(("namespace", "opendal"))
+ .bind(("path", path.to_string()))
+ .bind(("table", self.table.to_string()))
+ .bind(("value_field", self.value_field.to_string()))
+ .await
+ .map_err(parse_surrealdb_error)?;
+
+ let value: Option<Vec<u8>> = result
+ .take((0, self.value_field.as_str()))
+ .map_err(parse_surrealdb_error)?;
+
+ Ok(value.map(Buffer::from))
+ }
+
+ pub async fn set(&self, path: &str, value: Buffer) -> Result<()> {
+ let query = format!(
+ "INSERT INTO {} ({}, {}) \
+ VALUES ($path, $value) \
+ ON DUPLICATE KEY UPDATE {} = $value",
+ self.table, self.key_field, self.value_field, self.value_field
+ );
+ self.get_connection()
+ .await?
+ .query(query)
+ .bind(("path", path.to_string()))
+ .bind(("value", value.to_vec()))
+ .await
+ .map_err(parse_surrealdb_error)?;
+ Ok(())
+ }
+
+ pub async fn delete(&self, path: &str) -> Result<()> {
+ let query: String = if self.key_field == "id" {
+ "DELETE FROM type::thing($table, $path)".to_string()
+ } else {
+ format!(
+ "DELETE FROM type::table($table) WHERE {} = $path",
+ self.key_field
+ )
+ };
+
+ self.get_connection()
+ .await?
+ .query(query.as_str())
+ .bind(("path", path.to_string()))
+ .bind(("table", self.table.to_string()))
+ .await
+ .map_err(parse_surrealdb_error)?;
+ Ok(())
+ }
+}
+
+fn parse_surrealdb_error(err: surrealdb::Error) -> Error {
+ Error::new(ErrorKind::Unexpected, "unhandled error from
surrealdb").set_source(err)
+}
diff --git a/core/src/services/surrealdb/mod.rs
b/core/src/services/surrealdb/deleter.rs
similarity index 60%
copy from core/src/services/surrealdb/mod.rs
copy to core/src/services/surrealdb/deleter.rs
index a427aac3b..8dfc62253 100644
--- a/core/src/services/surrealdb/mod.rs
+++ b/core/src/services/surrealdb/deleter.rs
@@ -15,8 +15,28 @@
// specific language governing permissions and limitations
// under the License.
-mod backend;
-pub use backend::SurrealdbBuilder as Surrealdb;
+use std::sync::Arc;
-mod config;
-pub use config::SurrealdbConfig;
+use super::core::*;
+use crate::raw::oio;
+use crate::raw::*;
+use crate::*;
+
+pub struct SurrealdbDeleter {
+ core: Arc<SurrealdbCore>,
+ root: String,
+}
+
+impl SurrealdbDeleter {
+ pub fn new(core: Arc<SurrealdbCore>, root: String) -> Self {
+ Self { core, root }
+ }
+}
+
+impl oio::OneShotDelete for SurrealdbDeleter {
+ async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
+ let p = build_abs_path(&self.root, &path);
+ self.core.delete(&p).await?;
+ Ok(())
+ }
+}
diff --git a/core/src/services/surrealdb/docs.md
b/core/src/services/surrealdb/docs.md
index 52e04df12..862736ad1 100644
--- a/core/src/services/surrealdb/docs.md
+++ b/core/src/services/surrealdb/docs.md
@@ -2,16 +2,15 @@
This service can be used to:
+- [ ] create_dir
- [x] stat
- [x] read
- [x] write
-- [ ] create_dir
- [x] delete
- [ ] copy
- [ ] rename
-- [ ] ~~list~~
+- [ ] list
- [ ] ~~presign~~
-- [ ] blocking
## Configuration
diff --git a/core/src/services/surrealdb/mod.rs
b/core/src/services/surrealdb/mod.rs
index a427aac3b..c54f14740 100644
--- a/core/src/services/surrealdb/mod.rs
+++ b/core/src/services/surrealdb/mod.rs
@@ -16,6 +16,10 @@
// under the License.
mod backend;
+mod core;
+mod deleter;
+mod writer;
+
pub use backend::SurrealdbBuilder as Surrealdb;
mod config;
diff --git a/core/src/services/surrealdb/writer.rs
b/core/src/services/surrealdb/writer.rs
new file mode 100644
index 000000000..95c0efc9a
--- /dev/null
+++ b/core/src/services/surrealdb/writer.rs
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use super::core::*;
+use crate::raw::oio;
+use crate::*;
+
+pub struct SurrealdbWriter {
+ core: Arc<SurrealdbCore>,
+ path: String,
+ buffer: oio::QueueBuf,
+}
+
+impl SurrealdbWriter {
+ pub fn new(core: Arc<SurrealdbCore>, path: String) -> Self {
+ Self {
+ core,
+ path,
+ buffer: oio::QueueBuf::new(),
+ }
+ }
+}
+
+impl oio::Write for SurrealdbWriter {
+ async fn write(&mut self, bs: Buffer) -> Result<()> {
+ self.buffer.push(bs);
+ Ok(())
+ }
+
+ async fn close(&mut self) -> Result<Metadata> {
+ let buf = self.buffer.clone().collect();
+ let length = buf.len() as u64;
+ self.core.set(&self.path, buf).await?;
+
+ let meta =
Metadata::new(EntryMode::from_path(&self.path)).with_content_length(length);
+ Ok(meta)
+ }
+
+ async fn abort(&mut self) -> Result<()> {
+ self.buffer.clear();
+ Ok(())
+ }
+}