This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new ac54d28f3 refactor: migrate surrealdb service from adapter::kv to impl 
Access directly (#6723)
ac54d28f3 is described below

commit ac54d28f361e7072150b80bb99646d9b48635a54
Author: Qinxuan Chen <[email protected]>
AuthorDate: Wed Oct 22 17:28:27 2025 +0800

    refactor: migrate surrealdb service from adapter::kv to impl Access 
directly (#6723)
---
 core/src/services/surrealdb/backend.rs             | 220 ++++++++-------------
 core/src/services/surrealdb/config.rs              |   4 +-
 core/src/services/surrealdb/core.rs                | 160 +++++++++++++++
 core/src/services/surrealdb/{mod.rs => deleter.rs} |  28 ++-
 core/src/services/surrealdb/docs.md                |   5 +-
 core/src/services/surrealdb/mod.rs                 |   4 +
 core/src/services/surrealdb/writer.rs              |  59 ++++++
 7 files changed, 331 insertions(+), 149 deletions(-)

diff --git a/core/src/services/surrealdb/backend.rs 
b/core/src/services/surrealdb/backend.rs
index c80a48c2b..d06cc0f21 100644
--- a/core/src/services/surrealdb/backend.rs
+++ b/core/src/services/surrealdb/backend.rs
@@ -19,15 +19,13 @@ use std::fmt::Debug;
 use std::fmt::Formatter;
 use std::sync::Arc;
 
-use surrealdb::Surreal;
-use surrealdb::engine::any::Any;
-use surrealdb::opt::auth::Database;
 use tokio::sync::OnceCell;
 
-use crate::raw::Access;
-use crate::raw::adapters::kv;
-use crate::raw::normalize_root;
-use crate::services::SurrealdbConfig;
+use super::config::SurrealdbConfig;
+use super::core::*;
+use super::deleter::SurrealdbDeleter;
+use super::writer::SurrealdbWriter;
+use crate::raw::*;
 use crate::*;
 
 #[doc = include_str!("docs.md")]
@@ -192,7 +190,7 @@ impl Builder for SurrealdbBuilder {
                 .as_str(),
         );
 
-        Ok(SurrealdbBackend::new(Adapter {
+        Ok(SurrealdbBackend::new(SurrealdbCore {
             db: OnceCell::new(),
             connection_string,
             username,
@@ -208,154 +206,94 @@ impl Builder for SurrealdbBuilder {
 }
 
 /// Backend for Surrealdb service
-pub type SurrealdbBackend = kv::Backend<Adapter>;
-
-#[derive(Clone)]
-pub struct Adapter {
-    db: OnceCell<Arc<Surreal<Any>>>,
-    connection_string: String,
-
-    username: String,
-    password: String,
-    namespace: String,
-    database: String,
-
-    table: String,
-    key_field: String,
-    value_field: String,
+#[derive(Clone, Debug)]
+pub struct SurrealdbBackend {
+    core: Arc<SurrealdbCore>,
+    root: String,
+    info: Arc<AccessorInfo>,
 }
 
-impl Debug for Adapter {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("Adapter")
-            .field("connection_string", &self.connection_string)
-            .field("username", &self.username)
-            .field("password", &"<redacted>")
-            .field("namespace", &self.namespace)
-            .field("database", &self.database)
-            .field("table", &self.table)
-            .field("key_field", &self.key_field)
-            .field("value_field", &self.value_field)
-            .finish()
+impl SurrealdbBackend {
+    pub fn new(core: SurrealdbCore) -> Self {
+        let info = AccessorInfo::default();
+        info.set_scheme(Scheme::Surrealdb.into_static());
+        info.set_name(&core.table);
+        info.set_root("/");
+        info.set_native_capability(Capability {
+            read: true,
+            stat: true,
+            write: true,
+            write_can_empty: true,
+            delete: true,
+            shared: true,
+            ..Default::default()
+        });
+
+        Self {
+            core: Arc::new(core),
+            root: "/".to_string(),
+            info: Arc::new(info),
+        }
     }
-}
 
-impl Adapter {
-    async fn get_connection(&self) -> crate::Result<&Surreal<Any>> {
-        self.db
-            .get_or_try_init(|| async {
-                let namespace = self.namespace.as_str();
-                let database = self.database.as_str();
-
-                let db: Surreal<Any> = Surreal::init();
-                db.connect(self.connection_string.clone())
-                    .await
-                    .map_err(parse_surrealdb_error)?;
-
-                if !self.username.is_empty() && !self.password.is_empty() {
-                    db.signin(Database {
-                        namespace,
-                        database,
-                        username: self.username.as_str(),
-                        password: self.password.as_str(),
-                    })
-                    .await
-                    .map_err(parse_surrealdb_error)?;
-                }
-                db.use_ns(namespace)
-                    .use_db(database)
-                    .await
-                    .map_err(parse_surrealdb_error)?;
-
-                Ok(Arc::new(db))
-            })
-            .await
-            .map(|v| v.as_ref())
+    fn with_normalized_root(mut self, root: String) -> Self {
+        self.info.set_root(&root);
+        self.root = root;
+        self
     }
 }
 
-impl kv::Adapter for Adapter {
-    type Scanner = ();
-
-    fn info(&self) -> kv::Info {
-        kv::Info::new(
-            Scheme::Surrealdb,
-            &self.table,
-            Capability {
-                read: true,
-                write: true,
-                shared: true,
-                ..Default::default()
-            },
-        )
+impl Access for SurrealdbBackend {
+    type Reader = Buffer;
+    type Writer = SurrealdbWriter;
+    type Lister = ();
+    type Deleter = oio::OneShotDeleter<SurrealdbDeleter>;
+
+    fn info(&self) -> Arc<AccessorInfo> {
+        self.info.clone()
     }
 
-    async fn get(&self, path: &str) -> crate::Result<Option<Buffer>> {
-        let query: String = if self.key_field == "id" {
-            "SELECT type::field($value_field) FROM type::thing($table, 
$path)".to_string()
-        } else {
-            format!(
-                "SELECT type::field($value_field) FROM type::table($table) 
WHERE {} = $path LIMIT 1",
-                self.key_field
-            )
-        };
+    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
+        let p = build_abs_path(&self.root, path);
 
-        let mut result = self
-            .get_connection()
-            .await?
-            .query(query)
-            .bind(("namespace", "opendal"))
-            .bind(("path", path.to_string()))
-            .bind(("table", self.table.to_string()))
-            .bind(("value_field", self.value_field.to_string()))
-            .await
-            .map_err(parse_surrealdb_error)?;
-
-        let value: Option<Vec<u8>> = result
-            .take((0, self.value_field.as_str()))
-            .map_err(parse_surrealdb_error)?;
-
-        Ok(value.map(Buffer::from))
+        if p == build_abs_path(&self.root, "") {
+            Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
+        } else {
+            let bs = self.core.get(&p).await?;
+            match bs {
+                Some(bs) => Ok(RpStat::new(
+                    
Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
+                )),
+                None => Err(Error::new(ErrorKind::NotFound, "kv not found in 
surrealdb")),
+            }
+        }
     }
 
-    async fn set(&self, path: &str, value: Buffer) -> crate::Result<()> {
-        let query = format!(
-            "INSERT INTO {} ({}, {}) \
-            VALUES ($path, $value) \
-            ON DUPLICATE KEY UPDATE {} = $value",
-            self.table, self.key_field, self.value_field, self.value_field
-        );
-        self.get_connection()
-            .await?
-            .query(query)
-            .bind(("path", path.to_string()))
-            .bind(("value", value.to_vec()))
-            .await
-            .map_err(parse_surrealdb_error)?;
-        Ok(())
+    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
+        let p = build_abs_path(&self.root, path);
+        let bs = match self.core.get(&p).await? {
+            Some(bs) => bs,
+            None => {
+                return Err(Error::new(ErrorKind::NotFound, "kv not found in 
surrealdb"));
+            }
+        };
+        Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
     }
 
-    async fn delete(&self, path: &str) -> crate::Result<()> {
-        let query: String = if self.key_field == "id" {
-            "DELETE FROM type::thing($table, $path)".to_string()
-        } else {
-            format!(
-                "DELETE FROM type::table($table) WHERE {} = $path",
-                self.key_field
-            )
-        };
+    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
+        let p = build_abs_path(&self.root, path);
+        Ok((RpWrite::new(), SurrealdbWriter::new(self.core.clone(), p)))
+    }
 
-        self.get_connection()
-            .await?
-            .query(query.as_str())
-            .bind(("path", path.to_string()))
-            .bind(("table", self.table.to_string()))
-            .await
-            .map_err(parse_surrealdb_error)?;
-        Ok(())
+    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
+        Ok((
+            RpDelete::default(),
+            oio::OneShotDeleter::new(SurrealdbDeleter::new(self.core.clone(), 
self.root.clone())),
+        ))
     }
-}
 
-fn parse_surrealdb_error(err: surrealdb::Error) -> Error {
-    Error::new(ErrorKind::Unexpected, "unhandled error from 
surrealdb").set_source(err)
+    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, 
Self::Lister)> {
+        let _ = build_abs_path(&self.root, path);
+        Ok((RpList::default(), ()))
+    }
 }
diff --git a/core/src/services/surrealdb/config.rs 
b/core/src/services/surrealdb/config.rs
index 76b102d3e..a09711653 100644
--- a/core/src/services/surrealdb/config.rs
+++ b/core/src/services/surrealdb/config.rs
@@ -18,10 +18,11 @@
 use std::fmt::Debug;
 use std::fmt::Formatter;
 
-use super::backend::SurrealdbBuilder;
 use serde::Deserialize;
 use serde::Serialize;
 
+use super::backend::SurrealdbBuilder;
+
 /// Config for Surrealdb services support.
 #[derive(Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
 #[serde(default)]
@@ -66,6 +67,7 @@ impl Debug for SurrealdbConfig {
 
 impl crate::Configurator for SurrealdbConfig {
     type Builder = SurrealdbBuilder;
+
     fn from_uri(uri: &crate::types::OperatorUri) -> crate::Result<Self> {
         let mut map = uri.options().clone();
 
diff --git a/core/src/services/surrealdb/core.rs 
b/core/src/services/surrealdb/core.rs
new file mode 100644
index 000000000..9172ed951
--- /dev/null
+++ b/core/src/services/surrealdb/core.rs
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::sync::Arc;
+
+use surrealdb::Surreal;
+use surrealdb::engine::any::Any;
+use surrealdb::opt::auth::Database;
+use tokio::sync::OnceCell;
+
+use crate::*;
+
+#[derive(Clone)]
+pub struct SurrealdbCore {
+    pub db: OnceCell<Arc<Surreal<Any>>>,
+    pub connection_string: String,
+
+    pub username: String,
+    pub password: String,
+    pub namespace: String,
+    pub database: String,
+
+    pub table: String,
+    pub key_field: String,
+    pub value_field: String,
+}
+
+impl Debug for SurrealdbCore {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("SurrealdbCore")
+            .field("connection_string", &self.connection_string)
+            .field("username", &self.username)
+            .field("password", &"<redacted>")
+            .field("namespace", &self.namespace)
+            .field("database", &self.database)
+            .field("table", &self.table)
+            .field("key_field", &self.key_field)
+            .field("value_field", &self.value_field)
+            .finish()
+    }
+}
+
+impl SurrealdbCore {
+    async fn get_connection(&self) -> Result<&Surreal<Any>> {
+        self.db
+            .get_or_try_init(|| async {
+                let namespace = self.namespace.as_str();
+                let database = self.database.as_str();
+
+                let db: Surreal<Any> = Surreal::init();
+                db.connect(self.connection_string.clone())
+                    .await
+                    .map_err(parse_surrealdb_error)?;
+
+                if !self.username.is_empty() && !self.password.is_empty() {
+                    db.signin(Database {
+                        namespace,
+                        database,
+                        username: self.username.as_str(),
+                        password: self.password.as_str(),
+                    })
+                    .await
+                    .map_err(parse_surrealdb_error)?;
+                }
+                db.use_ns(namespace)
+                    .use_db(database)
+                    .await
+                    .map_err(parse_surrealdb_error)?;
+
+                Ok(Arc::new(db))
+            })
+            .await
+            .map(|v| v.as_ref())
+    }
+
+    pub async fn get(&self, path: &str) -> Result<Option<Buffer>> {
+        let query: String = if self.key_field == "id" {
+            "SELECT type::field($value_field) FROM type::thing($table, 
$path)".to_string()
+        } else {
+            format!(
+                "SELECT type::field($value_field) FROM type::table($table) 
WHERE {} = $path LIMIT 1",
+                self.key_field
+            )
+        };
+
+        let mut result = self
+            .get_connection()
+            .await?
+            .query(query)
+            .bind(("namespace", "opendal"))
+            .bind(("path", path.to_string()))
+            .bind(("table", self.table.to_string()))
+            .bind(("value_field", self.value_field.to_string()))
+            .await
+            .map_err(parse_surrealdb_error)?;
+
+        let value: Option<Vec<u8>> = result
+            .take((0, self.value_field.as_str()))
+            .map_err(parse_surrealdb_error)?;
+
+        Ok(value.map(Buffer::from))
+    }
+
+    pub async fn set(&self, path: &str, value: Buffer) -> Result<()> {
+        let query = format!(
+            "INSERT INTO {} ({}, {}) \
+            VALUES ($path, $value) \
+            ON DUPLICATE KEY UPDATE {} = $value",
+            self.table, self.key_field, self.value_field, self.value_field
+        );
+        self.get_connection()
+            .await?
+            .query(query)
+            .bind(("path", path.to_string()))
+            .bind(("value", value.to_vec()))
+            .await
+            .map_err(parse_surrealdb_error)?;
+        Ok(())
+    }
+
+    pub async fn delete(&self, path: &str) -> Result<()> {
+        let query: String = if self.key_field == "id" {
+            "DELETE FROM type::thing($table, $path)".to_string()
+        } else {
+            format!(
+                "DELETE FROM type::table($table) WHERE {} = $path",
+                self.key_field
+            )
+        };
+
+        self.get_connection()
+            .await?
+            .query(query.as_str())
+            .bind(("path", path.to_string()))
+            .bind(("table", self.table.to_string()))
+            .await
+            .map_err(parse_surrealdb_error)?;
+        Ok(())
+    }
+}
+
+fn parse_surrealdb_error(err: surrealdb::Error) -> Error {
+    Error::new(ErrorKind::Unexpected, "unhandled error from 
surrealdb").set_source(err)
+}
diff --git a/core/src/services/surrealdb/mod.rs 
b/core/src/services/surrealdb/deleter.rs
similarity index 60%
copy from core/src/services/surrealdb/mod.rs
copy to core/src/services/surrealdb/deleter.rs
index a427aac3b..8dfc62253 100644
--- a/core/src/services/surrealdb/mod.rs
+++ b/core/src/services/surrealdb/deleter.rs
@@ -15,8 +15,28 @@
 // specific language governing permissions and limitations
 // under the License.
 
-mod backend;
-pub use backend::SurrealdbBuilder as Surrealdb;
+use std::sync::Arc;
 
-mod config;
-pub use config::SurrealdbConfig;
+use super::core::*;
+use crate::raw::oio;
+use crate::raw::*;
+use crate::*;
+
+pub struct SurrealdbDeleter {
+    core: Arc<SurrealdbCore>,
+    root: String,
+}
+
+impl SurrealdbDeleter {
+    pub fn new(core: Arc<SurrealdbCore>, root: String) -> Self {
+        Self { core, root }
+    }
+}
+
+impl oio::OneShotDelete for SurrealdbDeleter {
+    async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
+        let p = build_abs_path(&self.root, &path);
+        self.core.delete(&p).await?;
+        Ok(())
+    }
+}
diff --git a/core/src/services/surrealdb/docs.md 
b/core/src/services/surrealdb/docs.md
index 52e04df12..862736ad1 100644
--- a/core/src/services/surrealdb/docs.md
+++ b/core/src/services/surrealdb/docs.md
@@ -2,16 +2,15 @@
 
 This service can be used to:
 
+- [ ] create_dir
 - [x] stat
 - [x] read
 - [x] write
-- [ ] create_dir
 - [x] delete
 - [ ] copy
 - [ ] rename
-- [ ] ~~list~~
+- [ ] list
 - [ ] ~~presign~~
-- [ ] blocking
 
 ## Configuration
 
diff --git a/core/src/services/surrealdb/mod.rs 
b/core/src/services/surrealdb/mod.rs
index a427aac3b..c54f14740 100644
--- a/core/src/services/surrealdb/mod.rs
+++ b/core/src/services/surrealdb/mod.rs
@@ -16,6 +16,10 @@
 // under the License.
 
 mod backend;
+mod core;
+mod deleter;
+mod writer;
+
 pub use backend::SurrealdbBuilder as Surrealdb;
 
 mod config;
diff --git a/core/src/services/surrealdb/writer.rs 
b/core/src/services/surrealdb/writer.rs
new file mode 100644
index 000000000..95c0efc9a
--- /dev/null
+++ b/core/src/services/surrealdb/writer.rs
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use super::core::*;
+use crate::raw::oio;
+use crate::*;
+
+pub struct SurrealdbWriter {
+    core: Arc<SurrealdbCore>,
+    path: String,
+    buffer: oio::QueueBuf,
+}
+
+impl SurrealdbWriter {
+    pub fn new(core: Arc<SurrealdbCore>, path: String) -> Self {
+        Self {
+            core,
+            path,
+            buffer: oio::QueueBuf::new(),
+        }
+    }
+}
+
+impl oio::Write for SurrealdbWriter {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
+        self.buffer.push(bs);
+        Ok(())
+    }
+
+    async fn close(&mut self) -> Result<Metadata> {
+        let buf = self.buffer.clone().collect();
+        let length = buf.len() as u64;
+        self.core.set(&self.path, buf).await?;
+
+        let meta = 
Metadata::new(EntryMode::from_path(&self.path)).with_content_length(length);
+        Ok(meta)
+    }
+
+    async fn abort(&mut self) -> Result<()> {
+        self.buffer.clear();
+        Ok(())
+    }
+}

Reply via email to