This is an automated email from the ASF dual-hosted git repository. koushiro pushed a commit to branch refactor-d1 in repository https://gitbox.apache.org/repos/asf/opendal.git
commit f48dd80a04ab701e197b8f458c964740a1da24cf Author: koushiro <[email protected]> AuthorDate: Wed Oct 22 14:58:39 2025 +0800 refactor: migrate d1 service from adapter::kv to impl Access directly --- core/src/services/d1/backend.rs | 210 +++++++++++----------------- core/src/services/d1/config.rs | 4 +- core/src/services/d1/core.rs | 134 ++++++++++++++++++ core/src/services/d1/{mod.rs => deleter.rs} | 29 +++- core/src/services/d1/docs.md | 5 +- core/src/services/d1/error.rs | 3 +- core/src/services/d1/mod.rs | 3 + core/src/services/d1/model.rs | 3 +- core/src/services/d1/writer.rs | 59 ++++++++ 9 files changed, 308 insertions(+), 142 deletions(-) diff --git a/core/src/services/d1/backend.rs b/core/src/services/d1/backend.rs index dd195a1f4..224ef057c 100644 --- a/core/src/services/d1/backend.rs +++ b/core/src/services/d1/backend.rs @@ -15,20 +15,13 @@ // specific language governing permissions and limitations // under the License. -use std::fmt::Debug; -use std::fmt::Formatter; - -use http::Request; -use http::StatusCode; -use http::header; -use serde_json::Value; - -use super::error::parse_error; -use super::model::D1Response; -use crate::ErrorKind; -use crate::raw::adapters::kv; +use std::sync::Arc; + +use super::config::D1Config; +use super::core::*; +use super::deleter::D1Deleter; +use super::writer::D1Writer; use crate::raw::*; -use crate::services::D1Config; use crate::*; #[doc = include_str!("docs.md")] @@ -39,14 +32,6 @@ pub struct D1Builder { pub(super) http_client: Option<HttpClient>, } -impl Debug for D1Builder { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("D1Builder") - .field("config", &self.config) - .finish() - } -} - impl D1Builder { /// Set api token for the cloudflare d1 service. /// @@ -179,7 +164,7 @@ impl Builder for D1Builder { .unwrap_or_else(|| "/".to_string()) .as_str(), ); - Ok(D1Backend::new(Adapter { + Ok(D1Backend::new(D1Core { authorization, account_id, database_id, @@ -192,129 +177,98 @@ impl Builder for D1Builder { } } -pub type D1Backend = kv::Backend<Adapter>; - -#[derive(Clone)] -pub struct Adapter { - authorization: Option<String>, - account_id: String, - database_id: String, - - client: HttpClient, - table: String, - key_field: String, - value_field: String, -} - -impl Debug for Adapter { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - let mut ds = f.debug_struct("D1Adapter"); - ds.field("table", &self.table); - ds.field("key_field", &self.key_field); - ds.field("value_field", &self.value_field); - ds.finish() - } +/// Backend for D1 services. +#[derive(Clone, Debug)] +pub struct D1Backend { + core: Arc<D1Core>, + root: String, + info: Arc<AccessorInfo>, } -impl Adapter { - fn create_d1_query_request(&self, sql: &str, params: Vec<Value>) -> Result<Request<Buffer>> { - let p = format!( - "/accounts/{}/d1/database/{}/query", - self.account_id, self.database_id - ); - let url: String = format!( - "{}{}", - "https://api.cloudflare.com/client/v4", - percent_encode_path(&p) - ); +impl D1Backend { + pub fn new(core: D1Core) -> Self { + let info = AccessorInfo::default(); + info.set_scheme(Scheme::D1.into_static()); + info.set_name(&core.table); + info.set_root("/"); + info.set_native_capability(Capability { + read: true, + stat: true, + write: true, + write_can_empty: true, + // Cloudflare D1 supports 1MB as max in write_total. + // refer to https://developers.cloudflare.com/d1/platform/limits/ + write_total_max_size: Some(1000 * 1000), + delete: true, + shared: true, + ..Default::default() + }); - let mut req = Request::post(&url); - if let Some(auth) = &self.authorization { - req = req.header(header::AUTHORIZATION, auth); + Self { + core: Arc::new(core), + root: "/".to_string(), + info: Arc::new(info), } - req = req.header(header::CONTENT_TYPE, "application/json"); - - let json = serde_json::json!({ - "sql": sql, - "params": params, - }); + } - let body = serde_json::to_vec(&json).map_err(new_json_serialize_error)?; - req.body(Buffer::from(body)) - .map_err(new_request_build_error) + fn with_normalized_root(mut self, root: String) -> Self { + self.info.set_root(&root); + self.root = root; + self } } -impl kv::Adapter for Adapter { - type Scanner = (); - - fn info(&self) -> kv::Info { - kv::Info::new( - Scheme::D1, - &self.table, - Capability { - read: true, - write: true, - // Cloudflare D1 supports 1MB as max in write_total. - // refer to https://developers.cloudflare.com/d1/platform/limits/ - write_total_max_size: Some(1000 * 1000), - shared: true, - ..Default::default() - }, - ) +impl Access for D1Backend { + type Reader = Buffer; + type Writer = D1Writer; + type Lister = (); + type Deleter = oio::OneShotDeleter<D1Deleter>; + + fn info(&self) -> Arc<AccessorInfo> { + self.info.clone() } - async fn get(&self, path: &str) -> Result<Option<Buffer>> { - let query = format!( - "SELECT {} FROM {} WHERE {} = ? LIMIT 1", - self.value_field, self.table, self.key_field - ); - let req = self.create_d1_query_request(&query, vec![path.into()])?; - - let resp = self.client.send(req).await?; - let status = resp.status(); - match status { - StatusCode::OK | StatusCode::PARTIAL_CONTENT => { - let body = resp.into_body(); - let bs = body.to_bytes(); - let d1_response = D1Response::parse(&bs)?; - Ok(d1_response.get_result(&self.value_field)) + async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> { + let p = build_abs_path(&self.root, path); + + if p == build_abs_path(&self.root, "") { + Ok(RpStat::new(Metadata::new(EntryMode::DIR))) + } else { + let bs = self.core.get(&p).await?; + match bs { + Some(bs) => Ok(RpStat::new( + Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64), + )), + None => Err(Error::new(ErrorKind::NotFound, "kv not found in d1")), } - _ => Err(parse_error(resp)), } } - async fn set(&self, path: &str, value: Buffer) -> Result<()> { - let table = &self.table; - let key_field = &self.key_field; - let value_field = &self.value_field; - let query = format!( - "INSERT INTO {table} ({key_field}, {value_field}) \ - VALUES (?, ?) \ - ON CONFLICT ({key_field}) \ - DO UPDATE SET {value_field} = EXCLUDED.{value_field}", - ); - - let params = vec![path.into(), value.to_vec().into()]; - let req = self.create_d1_query_request(&query, params)?; + async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { + let p = build_abs_path(&self.root, path); + let bs = match self.core.get(&p).await? { + Some(bs) => bs, + None => { + return Err(Error::new(ErrorKind::NotFound, "kv not found in d1")); + } + }; + Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize()))) + } - let resp = self.client.send(req).await?; - let status = resp.status(); - match status { - StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(()), - _ => Err(parse_error(resp)), - } + async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> { + let p = build_abs_path(&self.root, path); + Ok((RpWrite::new(), D1Writer::new(self.core.clone(), p))) } - async fn delete(&self, path: &str) -> Result<()> { - let query = format!("DELETE FROM {} WHERE {} = ?", self.table, self.key_field); - let req = self.create_d1_query_request(&query, vec![path.into()])?; + async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> { + Ok(( + RpDelete::default(), + oio::OneShotDeleter::new(D1Deleter::new(self.core.clone(), self.root.clone())), + )) + } - let resp = self.client.send(req).await?; - let status = resp.status(); - match status { - StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(()), - _ => Err(parse_error(resp)), - } + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> { + let _ = build_abs_path(&self.root, path); + Ok((RpList::default(), ())) } } diff --git a/core/src/services/d1/config.rs b/core/src/services/d1/config.rs index 168371f2b..e20b9c076 100644 --- a/core/src/services/d1/config.rs +++ b/core/src/services/d1/config.rs @@ -18,10 +18,11 @@ use std::fmt::Debug; use std::fmt::Formatter; -use super::backend::D1Builder; use serde::Deserialize; use serde::Serialize; +use super::backend::D1Builder; + /// Config for [Cloudflare D1](https://developers.cloudflare.com/d1) backend support. #[derive(Default, Serialize, Deserialize, Clone, PartialEq, Eq)] #[serde(default)] @@ -57,6 +58,7 @@ impl Debug for D1Config { impl crate::Configurator for D1Config { type Builder = D1Builder; + fn from_uri(uri: &crate::types::OperatorUri) -> crate::Result<Self> { let account_id = uri.name().ok_or_else(|| { crate::Error::new( diff --git a/core/src/services/d1/core.rs b/core/src/services/d1/core.rs new file mode 100644 index 000000000..da179dbf3 --- /dev/null +++ b/core/src/services/d1/core.rs @@ -0,0 +1,134 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::Debug; +use std::fmt::Formatter; + +use http::Request; +use http::StatusCode; +use http::header; +use serde_json::Value; + +use super::error::parse_error; +use super::model::*; +use crate::raw::*; +use crate::*; + +#[derive(Clone)] +pub struct D1Core { + pub authorization: Option<String>, + pub account_id: String, + pub database_id: String, + + pub client: HttpClient, + pub table: String, + pub key_field: String, + pub value_field: String, +} + +impl Debug for D1Core { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut ds = f.debug_struct("D1Core"); + ds.field("table", &self.table); + ds.field("key_field", &self.key_field); + ds.field("value_field", &self.value_field); + ds.finish() + } +} + +impl D1Core { + fn create_d1_query_request(&self, sql: &str, params: Vec<Value>) -> Result<Request<Buffer>> { + let p = format!( + "/accounts/{}/d1/database/{}/query", + self.account_id, self.database_id + ); + let url: String = format!( + "{}{}", + "https://api.cloudflare.com/client/v4", + percent_encode_path(&p) + ); + + let mut req = Request::post(&url); + if let Some(auth) = &self.authorization { + req = req.header(header::AUTHORIZATION, auth); + } + req = req.header(header::CONTENT_TYPE, "application/json"); + + let json = serde_json::json!({ + "sql": sql, + "params": params, + }); + + let body = serde_json::to_vec(&json).map_err(new_json_serialize_error)?; + req.body(Buffer::from(body)) + .map_err(new_request_build_error) + } + + pub async fn get(&self, path: &str) -> Result<Option<Buffer>> { + let query = format!( + "SELECT {} FROM {} WHERE {} = ? LIMIT 1", + self.value_field, self.table, self.key_field + ); + let req = self.create_d1_query_request(&query, vec![path.into()])?; + + let resp = self.client.send(req).await?; + let status = resp.status(); + match status { + StatusCode::OK | StatusCode::PARTIAL_CONTENT => { + let body = resp.into_body(); + let bs = body.to_bytes(); + let d1_response = D1Response::parse(&bs)?; + Ok(d1_response.get_result(&self.value_field)) + } + _ => Err(parse_error(resp)), + } + } + + pub async fn set(&self, path: &str, value: Buffer) -> Result<()> { + let table = &self.table; + let key_field = &self.key_field; + let value_field = &self.value_field; + let query = format!( + "INSERT INTO {table} ({key_field}, {value_field}) \ + VALUES (?, ?) \ + ON CONFLICT ({key_field}) \ + DO UPDATE SET {value_field} = EXCLUDED.{value_field}", + ); + + let params = vec![path.into(), value.to_vec().into()]; + let req = self.create_d1_query_request(&query, params)?; + + let resp = self.client.send(req).await?; + let status = resp.status(); + match status { + StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(()), + _ => Err(parse_error(resp)), + } + } + + pub async fn delete(&self, path: &str) -> Result<()> { + let query = format!("DELETE FROM {} WHERE {} = ?", self.table, self.key_field); + let req = self.create_d1_query_request(&query, vec![path.into()])?; + + let resp = self.client.send(req).await?; + let status = resp.status(); + match status { + StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(()), + _ => Err(parse_error(resp)), + } + } +} diff --git a/core/src/services/d1/mod.rs b/core/src/services/d1/deleter.rs similarity index 61% copy from core/src/services/d1/mod.rs copy to core/src/services/d1/deleter.rs index 9c3c6ea4d..1ada4007e 100644 --- a/core/src/services/d1/mod.rs +++ b/core/src/services/d1/deleter.rs @@ -15,11 +15,28 @@ // specific language governing permissions and limitations // under the License. -mod error; -mod model; +use std::sync::Arc; -mod backend; -pub use backend::D1Builder as D1; +use super::core::*; +use crate::raw::oio; +use crate::raw::*; +use crate::*; -mod config; -pub use config::D1Config; +pub struct D1Deleter { + core: Arc<D1Core>, + root: String, +} + +impl D1Deleter { + pub fn new(core: Arc<D1Core>, root: String) -> Self { + Self { core, root } + } +} + +impl oio::OneShotDelete for D1Deleter { + async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> { + let p = build_abs_path(&self.root, &path); + self.core.delete(&p).await?; + Ok(()) + } +} diff --git a/core/src/services/d1/docs.md b/core/src/services/d1/docs.md index 798cc834b..290201a30 100644 --- a/core/src/services/d1/docs.md +++ b/core/src/services/d1/docs.md @@ -2,16 +2,15 @@ This service can be used to: +- [ ] create_dir - [x] stat - [x] read - [x] write -- [x] create_dir - [x] delete - [ ] copy - [ ] rename -- [ ] ~~list~~ +- [ ] list - [ ] ~~presign~~ -- [ ] blocking ## Configuration diff --git a/core/src/services/d1/error.rs b/core/src/services/d1/error.rs index 37f6b9556..002a823c5 100644 --- a/core/src/services/d1/error.rs +++ b/core/src/services/d1/error.rs @@ -20,8 +20,7 @@ use http::Response; use http::StatusCode; use serde_json::de; -use super::model::D1Error; -use super::model::D1Response; +use super::model::*; use crate::raw::*; use crate::*; diff --git a/core/src/services/d1/mod.rs b/core/src/services/d1/mod.rs index 9c3c6ea4d..15e6240ae 100644 --- a/core/src/services/d1/mod.rs +++ b/core/src/services/d1/mod.rs @@ -15,8 +15,11 @@ // specific language governing permissions and limitations // under the License. +mod core; +mod deleter; mod error; mod model; +mod writer; mod backend; pub use backend::D1Builder as D1; diff --git a/core/src/services/d1/model.rs b/core/src/services/d1/model.rs index 4e8f6eb59..d0daa9262 100644 --- a/core/src/services/d1/model.rs +++ b/core/src/services/d1/model.rs @@ -23,8 +23,7 @@ use serde::Serialize; use serde_json::Map; use serde_json::Value; -use crate::Buffer; -use crate::Error; +use crate::*; /// response data from d1 #[derive(Deserialize, Debug)] diff --git a/core/src/services/d1/writer.rs b/core/src/services/d1/writer.rs new file mode 100644 index 000000000..97751ff13 --- /dev/null +++ b/core/src/services/d1/writer.rs @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use super::core::*; +use crate::raw::oio; +use crate::*; + +pub struct D1Writer { + core: Arc<D1Core>, + path: String, + buffer: oio::QueueBuf, +} + +impl D1Writer { + pub fn new(core: Arc<D1Core>, path: String) -> Self { + Self { + core, + path, + buffer: oio::QueueBuf::new(), + } + } +} + +impl oio::Write for D1Writer { + async fn write(&mut self, bs: Buffer) -> Result<()> { + self.buffer.push(bs); + Ok(()) + } + + async fn close(&mut self) -> Result<Metadata> { + let buf = self.buffer.clone().collect(); + let length = buf.len() as u64; + self.core.set(&self.path, buf).await?; + + let meta = Metadata::new(EntryMode::from_path(&self.path)).with_content_length(length); + Ok(meta) + } + + async fn abort(&mut self) -> Result<()> { + self.buffer.clear(); + Ok(()) + } +}
