This is an automated email from the ASF dual-hosted git repository.

koushiro pushed a commit to branch refactor-d1
in repository https://gitbox.apache.org/repos/asf/opendal.git

commit f48dd80a04ab701e197b8f458c964740a1da24cf
Author: koushiro <[email protected]>
AuthorDate: Wed Oct 22 14:58:39 2025 +0800

    refactor: migrate d1 service from adapter::kv to impl Access directly
---
 core/src/services/d1/backend.rs             | 210 +++++++++++-----------------
 core/src/services/d1/config.rs              |   4 +-
 core/src/services/d1/core.rs                | 134 ++++++++++++++++++
 core/src/services/d1/{mod.rs => deleter.rs} |  29 +++-
 core/src/services/d1/docs.md                |   5 +-
 core/src/services/d1/error.rs               |   3 +-
 core/src/services/d1/mod.rs                 |   3 +
 core/src/services/d1/model.rs               |   3 +-
 core/src/services/d1/writer.rs              |  59 ++++++++
 9 files changed, 308 insertions(+), 142 deletions(-)

diff --git a/core/src/services/d1/backend.rs b/core/src/services/d1/backend.rs
index dd195a1f4..224ef057c 100644
--- a/core/src/services/d1/backend.rs
+++ b/core/src/services/d1/backend.rs
@@ -15,20 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::fmt::Debug;
-use std::fmt::Formatter;
-
-use http::Request;
-use http::StatusCode;
-use http::header;
-use serde_json::Value;
-
-use super::error::parse_error;
-use super::model::D1Response;
-use crate::ErrorKind;
-use crate::raw::adapters::kv;
+use std::sync::Arc;
+
+use super::config::D1Config;
+use super::core::*;
+use super::deleter::D1Deleter;
+use super::writer::D1Writer;
 use crate::raw::*;
-use crate::services::D1Config;
 use crate::*;
 
 #[doc = include_str!("docs.md")]
@@ -39,14 +32,6 @@ pub struct D1Builder {
     pub(super) http_client: Option<HttpClient>,
 }
 
-impl Debug for D1Builder {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("D1Builder")
-            .field("config", &self.config)
-            .finish()
-    }
-}
-
 impl D1Builder {
     /// Set api token for the cloudflare d1 service.
     ///
@@ -179,7 +164,7 @@ impl Builder for D1Builder {
                 .unwrap_or_else(|| "/".to_string())
                 .as_str(),
         );
-        Ok(D1Backend::new(Adapter {
+        Ok(D1Backend::new(D1Core {
             authorization,
             account_id,
             database_id,
@@ -192,129 +177,98 @@ impl Builder for D1Builder {
     }
 }
 
-pub type D1Backend = kv::Backend<Adapter>;
-
-#[derive(Clone)]
-pub struct Adapter {
-    authorization: Option<String>,
-    account_id: String,
-    database_id: String,
-
-    client: HttpClient,
-    table: String,
-    key_field: String,
-    value_field: String,
-}
-
-impl Debug for Adapter {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        let mut ds = f.debug_struct("D1Adapter");
-        ds.field("table", &self.table);
-        ds.field("key_field", &self.key_field);
-        ds.field("value_field", &self.value_field);
-        ds.finish()
-    }
+/// Backend for D1 services.
+#[derive(Clone, Debug)]
+pub struct D1Backend {
+    core: Arc<D1Core>,
+    root: String,
+    info: Arc<AccessorInfo>,
 }
 
-impl Adapter {
-    fn create_d1_query_request(&self, sql: &str, params: Vec<Value>) -> 
Result<Request<Buffer>> {
-        let p = format!(
-            "/accounts/{}/d1/database/{}/query",
-            self.account_id, self.database_id
-        );
-        let url: String = format!(
-            "{}{}",
-            "https://api.cloudflare.com/client/v4";,
-            percent_encode_path(&p)
-        );
+impl D1Backend {
+    pub fn new(core: D1Core) -> Self {
+        let info = AccessorInfo::default();
+        info.set_scheme(Scheme::D1.into_static());
+        info.set_name(&core.table);
+        info.set_root("/");
+        info.set_native_capability(Capability {
+            read: true,
+            stat: true,
+            write: true,
+            write_can_empty: true,
+            // Cloudflare D1 supports 1MB as max in write_total.
+            // refer to https://developers.cloudflare.com/d1/platform/limits/
+            write_total_max_size: Some(1000 * 1000),
+            delete: true,
+            shared: true,
+            ..Default::default()
+        });
 
-        let mut req = Request::post(&url);
-        if let Some(auth) = &self.authorization {
-            req = req.header(header::AUTHORIZATION, auth);
+        Self {
+            core: Arc::new(core),
+            root: "/".to_string(),
+            info: Arc::new(info),
         }
-        req = req.header(header::CONTENT_TYPE, "application/json");
-
-        let json = serde_json::json!({
-            "sql": sql,
-            "params": params,
-        });
+    }
 
-        let body = 
serde_json::to_vec(&json).map_err(new_json_serialize_error)?;
-        req.body(Buffer::from(body))
-            .map_err(new_request_build_error)
+    fn with_normalized_root(mut self, root: String) -> Self {
+        self.info.set_root(&root);
+        self.root = root;
+        self
     }
 }
 
-impl kv::Adapter for Adapter {
-    type Scanner = ();
-
-    fn info(&self) -> kv::Info {
-        kv::Info::new(
-            Scheme::D1,
-            &self.table,
-            Capability {
-                read: true,
-                write: true,
-                // Cloudflare D1 supports 1MB as max in write_total.
-                // refer to 
https://developers.cloudflare.com/d1/platform/limits/
-                write_total_max_size: Some(1000 * 1000),
-                shared: true,
-                ..Default::default()
-            },
-        )
+impl Access for D1Backend {
+    type Reader = Buffer;
+    type Writer = D1Writer;
+    type Lister = ();
+    type Deleter = oio::OneShotDeleter<D1Deleter>;
+
+    fn info(&self) -> Arc<AccessorInfo> {
+        self.info.clone()
     }
 
-    async fn get(&self, path: &str) -> Result<Option<Buffer>> {
-        let query = format!(
-            "SELECT {} FROM {} WHERE {} = ? LIMIT 1",
-            self.value_field, self.table, self.key_field
-        );
-        let req = self.create_d1_query_request(&query, vec![path.into()])?;
-
-        let resp = self.client.send(req).await?;
-        let status = resp.status();
-        match status {
-            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
-                let body = resp.into_body();
-                let bs = body.to_bytes();
-                let d1_response = D1Response::parse(&bs)?;
-                Ok(d1_response.get_result(&self.value_field))
+    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
+        let p = build_abs_path(&self.root, path);
+
+        if p == build_abs_path(&self.root, "") {
+            Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
+        } else {
+            let bs = self.core.get(&p).await?;
+            match bs {
+                Some(bs) => Ok(RpStat::new(
+                    
Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
+                )),
+                None => Err(Error::new(ErrorKind::NotFound, "kv not found in 
d1")),
             }
-            _ => Err(parse_error(resp)),
         }
     }
 
-    async fn set(&self, path: &str, value: Buffer) -> Result<()> {
-        let table = &self.table;
-        let key_field = &self.key_field;
-        let value_field = &self.value_field;
-        let query = format!(
-            "INSERT INTO {table} ({key_field}, {value_field}) \
-                VALUES (?, ?) \
-                ON CONFLICT ({key_field}) \
-                    DO UPDATE SET {value_field} = EXCLUDED.{value_field}",
-        );
-
-        let params = vec![path.into(), value.to_vec().into()];
-        let req = self.create_d1_query_request(&query, params)?;
+    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
+        let p = build_abs_path(&self.root, path);
+        let bs = match self.core.get(&p).await? {
+            Some(bs) => bs,
+            None => {
+                return Err(Error::new(ErrorKind::NotFound, "kv not found in 
d1"));
+            }
+        };
+        Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
+    }
 
-        let resp = self.client.send(req).await?;
-        let status = resp.status();
-        match status {
-            StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(()),
-            _ => Err(parse_error(resp)),
-        }
+    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
+        let p = build_abs_path(&self.root, path);
+        Ok((RpWrite::new(), D1Writer::new(self.core.clone(), p)))
     }
 
-    async fn delete(&self, path: &str) -> Result<()> {
-        let query = format!("DELETE FROM {} WHERE {} = ?", self.table, 
self.key_field);
-        let req = self.create_d1_query_request(&query, vec![path.into()])?;
+    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
+        Ok((
+            RpDelete::default(),
+            oio::OneShotDeleter::new(D1Deleter::new(self.core.clone(), 
self.root.clone())),
+        ))
+    }
 
-        let resp = self.client.send(req).await?;
-        let status = resp.status();
-        match status {
-            StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(()),
-            _ => Err(parse_error(resp)),
-        }
+    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, 
Self::Lister)> {
+        let _ = build_abs_path(&self.root, path);
+        Ok((RpList::default(), ()))
     }
 }
diff --git a/core/src/services/d1/config.rs b/core/src/services/d1/config.rs
index 168371f2b..e20b9c076 100644
--- a/core/src/services/d1/config.rs
+++ b/core/src/services/d1/config.rs
@@ -18,10 +18,11 @@
 use std::fmt::Debug;
 use std::fmt::Formatter;
 
-use super::backend::D1Builder;
 use serde::Deserialize;
 use serde::Serialize;
 
+use super::backend::D1Builder;
+
 /// Config for [Cloudflare D1](https://developers.cloudflare.com/d1) backend 
support.
 #[derive(Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
 #[serde(default)]
@@ -57,6 +58,7 @@ impl Debug for D1Config {
 
 impl crate::Configurator for D1Config {
     type Builder = D1Builder;
+
     fn from_uri(uri: &crate::types::OperatorUri) -> crate::Result<Self> {
         let account_id = uri.name().ok_or_else(|| {
             crate::Error::new(
diff --git a/core/src/services/d1/core.rs b/core/src/services/d1/core.rs
new file mode 100644
index 000000000..da179dbf3
--- /dev/null
+++ b/core/src/services/d1/core.rs
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+
+use http::Request;
+use http::StatusCode;
+use http::header;
+use serde_json::Value;
+
+use super::error::parse_error;
+use super::model::*;
+use crate::raw::*;
+use crate::*;
+
+#[derive(Clone)]
+pub struct D1Core {
+    pub authorization: Option<String>,
+    pub account_id: String,
+    pub database_id: String,
+
+    pub client: HttpClient,
+    pub table: String,
+    pub key_field: String,
+    pub value_field: String,
+}
+
+impl Debug for D1Core {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut ds = f.debug_struct("D1Core");
+        ds.field("table", &self.table);
+        ds.field("key_field", &self.key_field);
+        ds.field("value_field", &self.value_field);
+        ds.finish()
+    }
+}
+
+impl D1Core {
+    fn create_d1_query_request(&self, sql: &str, params: Vec<Value>) -> 
Result<Request<Buffer>> {
+        let p = format!(
+            "/accounts/{}/d1/database/{}/query",
+            self.account_id, self.database_id
+        );
+        let url: String = format!(
+            "{}{}",
+            "https://api.cloudflare.com/client/v4";,
+            percent_encode_path(&p)
+        );
+
+        let mut req = Request::post(&url);
+        if let Some(auth) = &self.authorization {
+            req = req.header(header::AUTHORIZATION, auth);
+        }
+        req = req.header(header::CONTENT_TYPE, "application/json");
+
+        let json = serde_json::json!({
+            "sql": sql,
+            "params": params,
+        });
+
+        let body = 
serde_json::to_vec(&json).map_err(new_json_serialize_error)?;
+        req.body(Buffer::from(body))
+            .map_err(new_request_build_error)
+    }
+
+    pub async fn get(&self, path: &str) -> Result<Option<Buffer>> {
+        let query = format!(
+            "SELECT {} FROM {} WHERE {} = ? LIMIT 1",
+            self.value_field, self.table, self.key_field
+        );
+        let req = self.create_d1_query_request(&query, vec![path.into()])?;
+
+        let resp = self.client.send(req).await?;
+        let status = resp.status();
+        match status {
+            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
+                let body = resp.into_body();
+                let bs = body.to_bytes();
+                let d1_response = D1Response::parse(&bs)?;
+                Ok(d1_response.get_result(&self.value_field))
+            }
+            _ => Err(parse_error(resp)),
+        }
+    }
+
+    pub async fn set(&self, path: &str, value: Buffer) -> Result<()> {
+        let table = &self.table;
+        let key_field = &self.key_field;
+        let value_field = &self.value_field;
+        let query = format!(
+            "INSERT INTO {table} ({key_field}, {value_field}) \
+                VALUES (?, ?) \
+                ON CONFLICT ({key_field}) \
+                    DO UPDATE SET {value_field} = EXCLUDED.{value_field}",
+        );
+
+        let params = vec![path.into(), value.to_vec().into()];
+        let req = self.create_d1_query_request(&query, params)?;
+
+        let resp = self.client.send(req).await?;
+        let status = resp.status();
+        match status {
+            StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(()),
+            _ => Err(parse_error(resp)),
+        }
+    }
+
+    pub async fn delete(&self, path: &str) -> Result<()> {
+        let query = format!("DELETE FROM {} WHERE {} = ?", self.table, 
self.key_field);
+        let req = self.create_d1_query_request(&query, vec![path.into()])?;
+
+        let resp = self.client.send(req).await?;
+        let status = resp.status();
+        match status {
+            StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(()),
+            _ => Err(parse_error(resp)),
+        }
+    }
+}
diff --git a/core/src/services/d1/mod.rs b/core/src/services/d1/deleter.rs
similarity index 61%
copy from core/src/services/d1/mod.rs
copy to core/src/services/d1/deleter.rs
index 9c3c6ea4d..1ada4007e 100644
--- a/core/src/services/d1/mod.rs
+++ b/core/src/services/d1/deleter.rs
@@ -15,11 +15,28 @@
 // specific language governing permissions and limitations
 // under the License.
 
-mod error;
-mod model;
+use std::sync::Arc;
 
-mod backend;
-pub use backend::D1Builder as D1;
+use super::core::*;
+use crate::raw::oio;
+use crate::raw::*;
+use crate::*;
 
-mod config;
-pub use config::D1Config;
+pub struct D1Deleter {
+    core: Arc<D1Core>,
+    root: String,
+}
+
+impl D1Deleter {
+    pub fn new(core: Arc<D1Core>, root: String) -> Self {
+        Self { core, root }
+    }
+}
+
+impl oio::OneShotDelete for D1Deleter {
+    async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
+        let p = build_abs_path(&self.root, &path);
+        self.core.delete(&p).await?;
+        Ok(())
+    }
+}
diff --git a/core/src/services/d1/docs.md b/core/src/services/d1/docs.md
index 798cc834b..290201a30 100644
--- a/core/src/services/d1/docs.md
+++ b/core/src/services/d1/docs.md
@@ -2,16 +2,15 @@
 
 This service can be used to:
 
+- [ ] create_dir
 - [x] stat
 - [x] read
 - [x] write
-- [x] create_dir
 - [x] delete
 - [ ] copy
 - [ ] rename
-- [ ] ~~list~~
+- [ ] list
 - [ ] ~~presign~~
-- [ ] blocking
 
 ## Configuration
 
diff --git a/core/src/services/d1/error.rs b/core/src/services/d1/error.rs
index 37f6b9556..002a823c5 100644
--- a/core/src/services/d1/error.rs
+++ b/core/src/services/d1/error.rs
@@ -20,8 +20,7 @@ use http::Response;
 use http::StatusCode;
 use serde_json::de;
 
-use super::model::D1Error;
-use super::model::D1Response;
+use super::model::*;
 use crate::raw::*;
 use crate::*;
 
diff --git a/core/src/services/d1/mod.rs b/core/src/services/d1/mod.rs
index 9c3c6ea4d..15e6240ae 100644
--- a/core/src/services/d1/mod.rs
+++ b/core/src/services/d1/mod.rs
@@ -15,8 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+mod core;
+mod deleter;
 mod error;
 mod model;
+mod writer;
 
 mod backend;
 pub use backend::D1Builder as D1;
diff --git a/core/src/services/d1/model.rs b/core/src/services/d1/model.rs
index 4e8f6eb59..d0daa9262 100644
--- a/core/src/services/d1/model.rs
+++ b/core/src/services/d1/model.rs
@@ -23,8 +23,7 @@ use serde::Serialize;
 use serde_json::Map;
 use serde_json::Value;
 
-use crate::Buffer;
-use crate::Error;
+use crate::*;
 
 /// response data from d1
 #[derive(Deserialize, Debug)]
diff --git a/core/src/services/d1/writer.rs b/core/src/services/d1/writer.rs
new file mode 100644
index 000000000..97751ff13
--- /dev/null
+++ b/core/src/services/d1/writer.rs
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use super::core::*;
+use crate::raw::oio;
+use crate::*;
+
+pub struct D1Writer {
+    core: Arc<D1Core>,
+    path: String,
+    buffer: oio::QueueBuf,
+}
+
+impl D1Writer {
+    pub fn new(core: Arc<D1Core>, path: String) -> Self {
+        Self {
+            core,
+            path,
+            buffer: oio::QueueBuf::new(),
+        }
+    }
+}
+
+impl oio::Write for D1Writer {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
+        self.buffer.push(bs);
+        Ok(())
+    }
+
+    async fn close(&mut self) -> Result<Metadata> {
+        let buf = self.buffer.clone().collect();
+        let length = buf.len() as u64;
+        self.core.set(&self.path, buf).await?;
+
+        let meta = 
Metadata::new(EntryMode::from_path(&self.path)).with_content_length(length);
+        Ok(meta)
+    }
+
+    async fn abort(&mut self) -> Result<()> {
+        self.buffer.clear();
+        Ok(())
+    }
+}

Reply via email to