This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new eb4b13622 feat(service/d1): Support d1 for opendal (#3248)
eb4b13622 is described below
commit eb4b13622d0b420e1c9a0bbfebe7df131b0f49fc
Author: taobo <[email protected]>
AuthorDate: Fri Oct 13 03:51:29 2023 -0500
feat(service/d1): Support d1 for opendal (#3248)
* feat(service/d1): Support d1 for opendal
* refactor: optimize code
* fix: delete extra code
* fix: optimize code
* refactor: errors and get method
* fix: let clippy http
* fix: let clippy happy
---
.env.example | 8 +
core/Cargo.toml | 1 +
core/src/services/d1/backend.rs | 331 ++++++++++++++++++++++++++++++++++++++++
core/src/services/d1/docs.md | 61 ++++++++
core/src/services/d1/error.rs | 83 ++++++++++
core/src/services/d1/mod.rs | 21 +++
core/src/services/d1/model.rs | 141 +++++++++++++++++
core/src/services/mod.rs | 5 +
core/src/types/scheme.rs | 4 +
core/tests/behavior/main.rs | 2 +
10 files changed, 657 insertions(+)
diff --git a/.env.example b/.env.example
index 79211cb97..b42f03bdd 100644
--- a/.env.example
+++ b/.env.example
@@ -178,3 +178,11 @@
OPENDAL_SQLITE_CONNECTION_STRING=file:///tmp/opendal/test.db
OPENDAL_SQLITE_TABLE=data
OPENDAL_SQLITE_KEY_FIELD=key
OPENDAL_SQLITE_VALUE_FIELD=data
+# d1
+OPENDAL_D1_TEST=false
+OPENDAL_D1_TOKEN=<token>
+OPENDAL_D1_ACCOUNT_ID=<account_id>
+OPENDAL_D1_DATABASE_ID=<database_id>
+OPENDAL_D1_TABLE=<table>
+OPENDAL_D1_KEY_FIELD=<key_field>
+OPENDAL_D1_VALUE_FIELD=<value_field>
diff --git a/core/Cargo.toml b/core/Cargo.toml
index c5ca1b20c..5e20e949e 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -120,6 +120,7 @@ services-cos = [
"reqsign?/services-tencent",
"reqsign?/reqwest_request",
]
+services-d1 = []
services-dashmap = ["dep:dashmap"]
services-dropbox = []
services-etcd = ["dep:etcd-client", "dep:bb8"]
diff --git a/core/src/services/d1/backend.rs b/core/src/services/d1/backend.rs
new file mode 100644
index 000000000..faf0c472e
--- /dev/null
+++ b/core/src/services/d1/backend.rs
@@ -0,0 +1,331 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::fmt::Formatter;
+
+use async_trait::async_trait;
+use http::header;
+use http::Request;
+use http::StatusCode;
+use serde_json::Value;
+
+use crate::raw::adapters::kv;
+use crate::raw::*;
+use crate::ErrorKind;
+use crate::*;
+
+use super::error::parse_error;
+use super::model::D1Response;
+
+#[doc = include_str!("docs.md")]
+#[derive(Default)]
+pub struct D1Builder {
+ token: Option<String>,
+ account_id: Option<String>,
+ database_id: Option<String>,
+
+ http_client: Option<HttpClient>,
+ root: Option<String>,
+
+ table: Option<String>,
+ key_field: Option<String>,
+ value_field: Option<String>,
+}
+
+impl Debug for D1Builder {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ let mut ds = f.debug_struct("D1Builder");
+ ds.field("root", &self.root);
+ ds.field("table", &self.table);
+ ds.field("key_field", &self.key_field);
+ ds.field("value_field", &self.value_field);
+ ds.finish()
+ }
+}
+
+impl D1Builder {
+ /// Set api token for the cloudflare d1 service.
+ ///
+ /// create a api token from
[here](https://dash.cloudflare.com/profile/api-tokens)
+ pub fn token(&mut self, token: &str) -> &mut Self {
+ if !token.is_empty() {
+ self.token = Some(token.to_string());
+ }
+ self
+ }
+
+ /// Set the account identifier for the cloudflare d1 service.
+ ///
+ /// get the account identifier from Workers & Pages -> Overview -> Account
ID
+ /// If not specified, it will return an error when building.
+ pub fn account_id(&mut self, account_id: &str) -> &mut Self {
+ if !account_id.is_empty() {
+ self.account_id = Some(account_id.to_string());
+ }
+ self
+ }
+
+ /// Set the database identifier for the cloudflare d1 service.
+ ///
+ /// get the database identifier from Workers & Pages -> D1 -> [Your
Database] -> Database ID
+ /// If not specified, it will return an error when building.
+ pub fn database_id(&mut self, database_id: &str) -> &mut Self {
+ if !database_id.is_empty() {
+ self.database_id = Some(database_id.to_string());
+ }
+ self
+ }
+
+ /// set the working directory, all operations will be performed under it.
+ ///
+ /// default: "/"
+ pub fn root(&mut self, root: &str) -> &mut Self {
+ if !root.is_empty() {
+ self.root = Some(root.to_owned());
+ }
+ self
+ }
+
+ /// Set the table name of the d1 service to read/write.
+ ///
+ /// If not specified, it will return an error when building.
+ pub fn table(&mut self, table: &str) -> &mut Self {
+ if !table.is_empty() {
+ self.table = Some(table.to_owned());
+ }
+ self
+ }
+
+ /// Set the key field name of the d1 service to read/write.
+ ///
+ /// Default to `key` if not specified.
+ pub fn key_field(&mut self, key_field: &str) -> &mut Self {
+ if !key_field.is_empty() {
+ self.key_field = Some(key_field.to_string());
+ }
+ self
+ }
+
+ /// Set the value field name of the d1 service to read/write.
+ ///
+ /// Default to `value` if not specified.
+ pub fn value_field(&mut self, value_field: &str) -> &mut Self {
+ if !value_field.is_empty() {
+ self.value_field = Some(value_field.to_string());
+ }
+ self
+ }
+}
+
+impl Builder for D1Builder {
+ const SCHEME: Scheme = Scheme::D1;
+ type Accessor = D1Backend;
+
+ fn from_map(map: HashMap<String, String>) -> Self {
+ let mut builder = D1Builder::default();
+ map.get("token").map(|v| builder.token(v));
+ map.get("account_id").map(|v| builder.account_id(v));
+ map.get("database_id").map(|v| builder.database_id(v));
+
+ map.get("root").map(|v| builder.root(v));
+ map.get("table").map(|v| builder.table(v));
+ map.get("key_field").map(|v| builder.key_field(v));
+ map.get("value_field").map(|v| builder.value_field(v));
+ builder
+ }
+
+ fn build(&mut self) -> Result<Self::Accessor> {
+ let mut authorization = None;
+ if let Some(token) = &self.token {
+ authorization = Some(format_authorization_by_bearer(token)?)
+ }
+
+ let Some(account_id) = self.account_id.clone() else {
+ return Err(Error::new(
+ ErrorKind::ConfigInvalid,
+ "account_id is required",
+ ));
+ };
+
+ let Some(database_id) = self.database_id.clone() else {
+ return Err(Error::new(
+ ErrorKind::ConfigInvalid,
+ "database_id is required",
+ ));
+ };
+
+ let client = if let Some(client) = self.http_client.take() {
+ client
+ } else {
+ HttpClient::new().map_err(|err| {
+ err.with_operation("Builder::build")
+ .with_context("service", Scheme::D1)
+ })?
+ };
+
+ let Some(table) = self.table.clone() else {
+ return Err(Error::new(ErrorKind::ConfigInvalid, "table is
required"));
+ };
+
+ let key_field = self.key_field.clone().unwrap_or_else(||
"key".to_string());
+
+ let value_field = self
+ .value_field
+ .clone()
+ .unwrap_or_else(|| "value".to_string());
+
+ let root = normalize_root(
+ self.root
+ .clone()
+ .unwrap_or_else(|| "/".to_string())
+ .as_str(),
+ );
+ Ok(D1Backend::new(Adapter {
+ authorization,
+ account_id,
+ database_id,
+ client,
+ table,
+ key_field,
+ value_field,
+ })
+ .with_root(&root))
+ }
+}
+
+pub type D1Backend = kv::Backend<Adapter>;
+
+#[derive(Clone)]
+pub struct Adapter {
+ authorization: Option<String>,
+ account_id: String,
+ database_id: String,
+
+ client: HttpClient,
+ table: String,
+ key_field: String,
+ value_field: String,
+}
+
+impl Debug for Adapter {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ let mut ds = f.debug_struct("D1Adapter");
+ ds.field("table", &self.table);
+ ds.field("key_field", &self.key_field);
+ ds.field("value_field", &self.value_field);
+ ds.finish()
+ }
+}
+
+impl Adapter {
+ fn create_d1_query_request(&self, sql: &str, params: Vec<Value>) ->
Result<Request<AsyncBody>> {
+ let p = format!(
+ "/accounts/{}/d1/database/{}/query",
+ self.account_id, self.database_id
+ );
+ let url: String = format!(
+ "{}{}",
+ "https://api.cloudflare.com/client/v4",
+ percent_encode_path(&p)
+ );
+
+ let mut req = Request::post(&url);
+ if let Some(auth) = &self.authorization {
+ req = req.header(header::AUTHORIZATION, auth);
+ }
+ req = req.header(header::CONTENT_TYPE, "application/json");
+
+ let json = serde_json::json!({
+ "sql": sql,
+ "params": params,
+ });
+
+ let body =
serde_json::to_vec(&json).map_err(new_json_serialize_error)?;
+ req.body(AsyncBody::Bytes(body.into()))
+ .map_err(new_request_build_error)
+ }
+}
+
+#[async_trait]
+impl kv::Adapter for Adapter {
+ fn metadata(&self) -> kv::Metadata {
+ kv::Metadata::new(
+ Scheme::D1,
+ &self.table,
+ Capability {
+ read: true,
+ write: true,
+ ..Default::default()
+ },
+ )
+ }
+
+ async fn get(&self, path: &str) -> Result<Option<Vec<u8>>> {
+ let query = format!(
+ "SELECT {} FROM {} WHERE {} = ? LIMIT 1",
+ self.value_field, self.table, self.key_field
+ );
+ let req = self.create_d1_query_request(&query, vec![path.into()])?;
+
+ let resp = self.client.send(req).await?;
+ let status = resp.status();
+ match status {
+ StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
+ let body = resp.into_body().bytes().await?;
+ let d1_response = D1Response::parse(&body)?;
+ Ok(d1_response.get_result(&self.value_field))
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
+ let table = &self.table;
+ let key_field = &self.key_field;
+ let value_field = &self.value_field;
+ let query = format!(
+ "INSERT INTO {table} ({key_field}, {value_field}) \
+ VALUES (?, ?) \
+ ON CONFLICT ({key_field}) \
+ DO UPDATE SET {value_field} = EXCLUDED.{value_field}",
+ );
+
+ let params = vec![path.into(), value.into()];
+ let req = self.create_d1_query_request(&query, params)?;
+
+ let resp = self.client.send(req).await?;
+ let status = resp.status();
+ match status {
+ StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(()),
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ async fn delete(&self, path: &str) -> Result<()> {
+ let query = format!("DELETE FROM {} WHERE {} = ?", self.table,
self.key_field);
+ let req = self.create_d1_query_request(&query, vec![path.into()])?;
+
+ let resp = self.client.send(req).await?;
+ let status = resp.status();
+ match status {
+ StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(()),
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+}
diff --git a/core/src/services/d1/docs.md b/core/src/services/d1/docs.md
new file mode 100644
index 000000000..17e3c7123
--- /dev/null
+++ b/core/src/services/d1/docs.md
@@ -0,0 +1,61 @@
+## Capabilities
+
+This service can be used to:
+
+- [x] stat
+- [x] read
+- [x] write
+- [x] create_dir
+- [x] delete
+- [ ] copy
+- [ ] rename
+- [ ] ~~list~~
+- [ ] scan
+- [ ] ~~presign~~
+- [ ] blocking
+
+## Configuration
+
+- `root`: Set the working directory of `OpenDAL`
+- `token`: Set the token of cloudflare api
+- `account_identifier`: Set the account identifier of d1
+- `database_identifier`: Set the database identifier of d1
+- `endpoint`: Set the endpoint of d1 service
+- `table`: Set the table name of the d1 service to read/write
+- `key_field`: Set the key field of d1
+- `value_field`: Set the value field of d1
+
+## Example
+
+### Via Builder
+
+```rust
+use anyhow::Result;
+use opendal::services::D1;
+use opendal::Operator;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ let mut builder = D1::default();
+ builder
+ .token("token")
+ .account_id("account_id")
+ .database_id("database_id")
+ .table("table")
+ .key_field("key_field")
+ .value_field("value_field");
+
+ let op = Operator::new(builder)?.finish();
+ let source_path = "ALFKI";
+ // set value to d1 "opendal test value" as Vec<u8>
+ let value = "opendal test value".as_bytes();
+ // write value to d1, the key is source_path
+ op.write(source_path, value).await?;
+ // read value from d1, the key is source_path
+ let v = op.read(source_path).await?;
+ assert_eq!(v, value);
+ // delete value from d1, the key is source_path
+ op.delete(source_path).await?;
+ Ok(())
+}
+```
diff --git a/core/src/services/d1/error.rs b/core/src/services/d1/error.rs
new file mode 100644
index 000000000..2a2f75de4
--- /dev/null
+++ b/core/src/services/d1/error.rs
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::Buf;
+use http::Response;
+use http::StatusCode;
+
+use crate::raw::*;
+use crate::Error;
+use crate::ErrorKind;
+use crate::Result;
+
+use serde_json::de;
+
+use super::model::D1Error;
+use super::model::D1Response;
+
+/// Parse error response into Error.
+pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
+ let (parts, body) = resp.into_parts();
+ let bs = body.bytes().await?;
+
+ let (mut kind, mut retryable) = match parts.status {
+ StatusCode::NOT_FOUND => (ErrorKind::NotFound, false),
+ // Some services (like owncloud) return 403 while file locked.
+ StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, true),
+ // Allowing retry for resource locked.
+ StatusCode::LOCKED => (ErrorKind::Unexpected, true),
+ StatusCode::INTERNAL_SERVER_ERROR
+ | StatusCode::BAD_GATEWAY
+ | StatusCode::SERVICE_UNAVAILABLE
+ | StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true),
+ _ => (ErrorKind::Unexpected, false),
+ };
+
+ let (message, d1_err) = de::from_reader::<_,
D1Response>(bs.clone().reader())
+ .map(|d1_err| (format!("{d1_err:?}"), Some(d1_err)))
+ .unwrap_or_else(|_| (String::from_utf8_lossy(&bs).into_owned(), None));
+
+ if let Some(d1_err) = d1_err {
+ (kind, retryable) =
parse_d1_error_code(d1_err.errors).unwrap_or((kind, retryable));
+ }
+
+ let mut err = Error::new(kind, &message);
+
+ err = with_error_response_context(err, parts);
+
+ if retryable {
+ err = err.set_temporary();
+ }
+
+ Ok(err)
+}
+
+pub fn parse_d1_error_code(errors: Vec<D1Error>) -> Option<(ErrorKind, bool)> {
+ if errors.is_empty() {
+ return None;
+ }
+
+ match errors[0].code {
+ // The request is malformed: failed to decode id.
+ 7400 => Some((ErrorKind::Unexpected, false)),
+ // no such column: Xxxx.
+ 7500 => Some((ErrorKind::NotFound, false)),
+ // Authentication error.
+ 10000 => Some((ErrorKind::PermissionDenied, false)),
+ _ => None,
+ }
+}
diff --git a/core/src/services/d1/mod.rs b/core/src/services/d1/mod.rs
new file mode 100644
index 000000000..9163a135c
--- /dev/null
+++ b/core/src/services/d1/mod.rs
@@ -0,0 +1,21 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod backend;
+mod error;
+mod model;
+pub use backend::D1Builder as D1;
diff --git a/core/src/services/d1/model.rs b/core/src/services/d1/model.rs
new file mode 100644
index 000000000..c086af0d5
--- /dev/null
+++ b/core/src/services/d1/model.rs
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::Error;
+use bytes::Bytes;
+use serde::{Deserialize, Serialize};
+use serde_json::{Map, Value};
+use std::fmt::Debug;
+
+/// response data from d1
+#[derive(Deserialize, Debug)]
+pub struct D1Response {
+ pub result: Vec<D1Result>,
+ pub success: bool,
+ pub errors: Vec<D1Error>,
+ pub messages: Vec<D1Message>,
+}
+
+impl D1Response {
+ pub fn parse(bs: &Bytes) -> Result<D1Response, Error> {
+ let response: D1Response = serde_json::from_slice(bs).map_err(|e| {
+ Error::new(
+ crate::ErrorKind::Unexpected,
+ &format!("failed to parse error response: {}", e),
+ )
+ })?;
+
+ if !response.success {
+ return Err(Error::new(
+ crate::ErrorKind::Unexpected,
+ &String::from_utf8_lossy(bs),
+ ));
+ }
+ Ok(response)
+ }
+
+ pub fn get_result(&self, key: &str) -> Option<Vec<u8>> {
+ if self.result.is_empty() || self.result[0].results.is_empty() {
+ return None;
+ }
+ let result = &self.result[0].results[0];
+ let value = result.get(key);
+
+ match value {
+ Some(Value::Array(s)) => {
+ let mut v = Vec::new();
+ for i in s {
+ if let Value::Number(n) = i {
+ v.push(n.as_u64().unwrap() as u8);
+ }
+ }
+ Some(v)
+ }
+ _ => None,
+ }
+ }
+}
+
+#[derive(Deserialize, Debug)]
+pub struct D1Result {
+ pub meta: Meta,
+ pub results: Vec<Map<String, Value>>,
+ pub success: bool,
+}
+
+#[derive(Deserialize, Debug)]
+pub struct Meta {
+ pub served_by: String,
+ pub duration: f64,
+ pub changes: i32,
+ pub last_row_id: i32,
+ pub changed_db: bool,
+ pub size_after: i32,
+ pub rows_read: i32,
+ pub rows_written: i32,
+}
+
+#[derive(Clone, Deserialize, Debug, Serialize)]
+pub struct D1Error {
+ pub message: String,
+ pub code: i32,
+}
+
+#[derive(Deserialize, Debug)]
+pub struct D1Message {
+ pub message: String,
+ pub code: i32,
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+
+ #[test]
+ fn test_deserialize_get_object_json_response() {
+ let data = r#"
+ {
+ "result": [
+ {
+ "results": [
+ {
+ "CustomerId": "4",
+ "CompanyName": "Around the Horn",
+ "ContactName": "Thomas Hardy"
+ }
+ ],
+ "success": true,
+ "meta": {
+ "served_by": "v3-prod",
+ "duration": 0.2147,
+ "changes": 0,
+ "last_row_id": 0,
+ "changed_db": false,
+ "size_after": 2162688,
+ "rows_read": 3,
+ "rows_written": 2
+ }
+ }
+ ],
+ "success": true,
+ "errors": [],
+ "messages": []
+ }"#;
+ let response: D1Response = serde_json::from_str(data).unwrap();
+ println!("{:?}", response.result[0].results[0]);
+ }
+}
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index 5293aaa44..93da8b917 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -228,3 +228,8 @@ pub use self::mysql::Mysql;
mod sqlite;
#[cfg(feature = "services-sqlite")]
pub use self::sqlite::Sqlite;
+
+#[cfg(feature = "services-d1")]
+mod d1;
+#[cfg(feature = "services-d1")]
+pub use self::d1::D1;
diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs
index ec3298b7b..fad9d7f6e 100644
--- a/core/src/types/scheme.rs
+++ b/core/src/types/scheme.rs
@@ -41,6 +41,8 @@ pub enum Scheme {
Cacache,
/// [cos][crate::services::Cos]: Tencent Cloud Object Storage services.
Cos,
+ /// [d1][crate::services::D1]: D1 services
+ D1,
/// [dashmap][crate::services::Dashmap]: dashmap backend support.
Dashmap,
/// [etcd][crate::services::Etcd]: Etcd Services
@@ -159,6 +161,7 @@ impl FromStr for Scheme {
"azdls" | "azdfs" | "abfs" => Ok(Scheme::Azdls),
"cacache" => Ok(Scheme::Cacache),
"cos" => Ok(Scheme::Cos),
+ "d1" => Ok(Scheme::D1),
"dashmap" => Ok(Scheme::Dashmap),
"dropbox" => Ok(Scheme::Dropbox),
"etcd" => Ok(Scheme::Etcd),
@@ -208,6 +211,7 @@ impl From<Scheme> for &'static str {
Scheme::Azdls => "Azdls",
Scheme::Cacache => "cacache",
Scheme::Cos => "cos",
+ Scheme::D1 => "d1",
Scheme::Dashmap => "dashmap",
Scheme::Etcd => "etcd",
Scheme::Fs => "fs",
diff --git a/core/tests/behavior/main.rs b/core/tests/behavior/main.rs
index de38efd14..cc0bf667f 100644
--- a/core/tests/behavior/main.rs
+++ b/core/tests/behavior/main.rs
@@ -185,6 +185,8 @@ fn main() -> anyhow::Result<()> {
tests.extend(behavior_test::<services::Mysql>());
#[cfg(feature = "services-sqlite")]
tests.extend(behavior_test::<services::Sqlite>());
+ #[cfg(feature = "services-d1")]
+ tests.extend(behavior_test::<services::D1>());
// Don't init logging while building operator which may break cargo
// nextest output