This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new eb4b13622 feat(service/d1): Support d1 for opendal (#3248)
eb4b13622 is described below

commit eb4b13622d0b420e1c9a0bbfebe7df131b0f49fc
Author: taobo <[email protected]>
AuthorDate: Fri Oct 13 03:51:29 2023 -0500

    feat(service/d1): Support d1 for opendal (#3248)
    
    * feat(service/d1): Support d1 for opendal
    
    * refactor: optimize code
    
    * fix: delete extra code
    
    * fix: optimize code
    
    * refactor: errors and get method
    
    * fix: let clippy http
    
    * fix: let clippy happy
---
 .env.example                    |   8 +
 core/Cargo.toml                 |   1 +
 core/src/services/d1/backend.rs | 331 ++++++++++++++++++++++++++++++++++++++++
 core/src/services/d1/docs.md    |  61 ++++++++
 core/src/services/d1/error.rs   |  83 ++++++++++
 core/src/services/d1/mod.rs     |  21 +++
 core/src/services/d1/model.rs   | 141 +++++++++++++++++
 core/src/services/mod.rs        |   5 +
 core/src/types/scheme.rs        |   4 +
 core/tests/behavior/main.rs     |   2 +
 10 files changed, 657 insertions(+)

diff --git a/.env.example b/.env.example
index 79211cb97..b42f03bdd 100644
--- a/.env.example
+++ b/.env.example
@@ -178,3 +178,11 @@ OPENDAL_SQLITE_CONNECTION_STRING=file:///tmp/opendal/test.db
 OPENDAL_SQLITE_TABLE=data
 OPENDAL_SQLITE_KEY_FIELD=key
 OPENDAL_SQLITE_VALUE_FIELD=data
+# d1
+OPENDAL_D1_TEST=false
+OPENDAL_D1_TOKEN=<token>
+OPENDAL_D1_ACCOUNT_ID=<account_id>
+OPENDAL_D1_DATABASE_ID=<database_id>
+OPENDAL_D1_TABLE=<table>
+OPENDAL_D1_KEY_FIELD=<key_field>
+OPENDAL_D1_VALUE_FIELD=<value_field>
diff --git a/core/Cargo.toml b/core/Cargo.toml
index c5ca1b20c..5e20e949e 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -120,6 +120,7 @@ services-cos = [
   "reqsign?/services-tencent",
   "reqsign?/reqwest_request",
 ]
+services-d1 = []
 services-dashmap = ["dep:dashmap"]
 services-dropbox = []
 services-etcd = ["dep:etcd-client", "dep:bb8"]
diff --git a/core/src/services/d1/backend.rs b/core/src/services/d1/backend.rs
new file mode 100644
index 000000000..faf0c472e
--- /dev/null
+++ b/core/src/services/d1/backend.rs
@@ -0,0 +1,331 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::fmt::Formatter;
+
+use async_trait::async_trait;
+use http::header;
+use http::Request;
+use http::StatusCode;
+use serde_json::Value;
+
+use crate::raw::adapters::kv;
+use crate::raw::*;
+use crate::ErrorKind;
+use crate::*;
+
+use super::error::parse_error;
+use super::model::D1Response;
+
+#[doc = include_str!("docs.md")]
+#[derive(Default)]
+pub struct D1Builder {
+    /// Cloudflare API token; when set, `build` turns it into a bearer authorization value.
+    token: Option<String>,
+    /// Cloudflare account identifier; `build` fails with `ConfigInvalid` when it is missing.
+    account_id: Option<String>,
+    /// D1 database identifier; `build` fails with `ConfigInvalid` when it is missing.
+    database_id: Option<String>,
+
+    /// Pre-built HTTP client; `build` takes it when present, otherwise creates a new one.
+    http_client: Option<HttpClient>,
+    /// Working directory for all operations; `build` falls back to "/" when unset.
+    root: Option<String>,
+
+    /// Table to read/write; `build` fails with `ConfigInvalid` when it is missing.
+    table: Option<String>,
+    /// Name of the key column; `build` falls back to `key` when unset.
+    key_field: Option<String>,
+    /// Name of the value column; `build` falls back to `value` when unset.
+    value_field: Option<String>,
+}
+
+impl Debug for D1Builder {
+    /// Formats the builder for diagnostics.
+    ///
+    /// Only `root`, `table`, `key_field` and `value_field` are printed; the
+    /// token and the account/database identifiers are left out.
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("D1Builder")
+            .field("root", &self.root)
+            .field("table", &self.table)
+            .field("key_field", &self.key_field)
+            .field("value_field", &self.value_field)
+            .finish()
+    }
+}
+
+impl D1Builder {
+    /// Set api token for the cloudflare d1 service.
+    ///
+    /// create a api token from 
[here](https://dash.cloudflare.com/profile/api-tokens)
+    pub fn token(&mut self, token: &str) -> &mut Self {
+        if !token.is_empty() {
+            self.token = Some(token.to_string());
+        }
+        self
+    }
+
+    /// Set the account identifier for the cloudflare d1 service.
+    ///
+    /// get the account identifier from Workers & Pages -> Overview -> Account 
ID
+    /// If not specified, it will return an error when building.
+    pub fn account_id(&mut self, account_id: &str) -> &mut Self {
+        if !account_id.is_empty() {
+            self.account_id = Some(account_id.to_string());
+        }
+        self
+    }
+
+    /// Set the database identifier for the cloudflare d1 service.
+    ///
+    /// get the database identifier from Workers & Pages -> D1 -> [Your 
Database] -> Database ID
+    /// If not specified, it will return an error when building.
+    pub fn database_id(&mut self, database_id: &str) -> &mut Self {
+        if !database_id.is_empty() {
+            self.database_id = Some(database_id.to_string());
+        }
+        self
+    }
+
+    /// set the working directory, all operations will be performed under it.
+    ///
+    /// default: "/"
+    pub fn root(&mut self, root: &str) -> &mut Self {
+        if !root.is_empty() {
+            self.root = Some(root.to_owned());
+        }
+        self
+    }
+
+    /// Set the table name of the d1 service to read/write.
+    ///
+    /// If not specified, it will return an error when building.
+    pub fn table(&mut self, table: &str) -> &mut Self {
+        if !table.is_empty() {
+            self.table = Some(table.to_owned());
+        }
+        self
+    }
+
+    /// Set the key field name of the d1 service to read/write.
+    ///
+    /// Default to `key` if not specified.
+    pub fn key_field(&mut self, key_field: &str) -> &mut Self {
+        if !key_field.is_empty() {
+            self.key_field = Some(key_field.to_string());
+        }
+        self
+    }
+
+    /// Set the value field name of the d1 service to read/write.
+    ///
+    /// Default to `value` if not specified.
+    pub fn value_field(&mut self, value_field: &str) -> &mut Self {
+        if !value_field.is_empty() {
+            self.value_field = Some(value_field.to_string());
+        }
+        self
+    }
+}
+
+impl Builder for D1Builder {
+    const SCHEME: Scheme = Scheme::D1;
+    type Accessor = D1Backend;
+
+    /// Creates a builder from string options.
+    ///
+    /// Recognised keys are `token`, `account_id`, `database_id`, `root`,
+    /// `table`, `key_field` and `value_field`; each is forwarded to the setter
+    /// of the same name, so empty values are ignored.
+    fn from_map(map: HashMap<String, String>) -> Self {
+        let mut builder = D1Builder::default();
+        map.get("token").map(|v| builder.token(v));
+        map.get("account_id").map(|v| builder.account_id(v));
+        map.get("database_id").map(|v| builder.database_id(v));
+
+        map.get("root").map(|v| builder.root(v));
+        map.get("table").map(|v| builder.table(v));
+        map.get("key_field").map(|v| builder.key_field(v));
+        map.get("value_field").map(|v| builder.value_field(v));
+        builder
+    }
+
+    /// Validates the configuration and assembles the backend.
+    ///
+    /// # Errors
+    ///
+    /// Returns `ErrorKind::ConfigInvalid` when `account_id`, `database_id` or
+    /// `table` is missing, and propagates failures from formatting the bearer
+    /// token or from creating the default HTTP client.
+    fn build(&mut self) -> Result<Self::Accessor> {
+        // The token is optional: without it no authorization value is stored.
+        let mut authorization = None;
+        if let Some(token) = &self.token {
+            authorization = Some(format_authorization_by_bearer(token)?)
+        }
+
+        let Some(account_id) = self.account_id.clone() else {
+            return Err(Error::new(
+                ErrorKind::ConfigInvalid,
+                "account_id is required",
+            ));
+        };
+
+        let Some(database_id) = self.database_id.clone() else {
+            return Err(Error::new(
+                ErrorKind::ConfigInvalid,
+                "database_id is required",
+            ));
+        };
+
+        // `take()` moves a pre-configured client out of the builder.
+        let client = if let Some(client) = self.http_client.take() {
+            client
+        } else {
+            HttpClient::new().map_err(|err| {
+                err.with_operation("Builder::build")
+                    .with_context("service", Scheme::D1)
+            })?
+        };
+
+        let Some(table) = self.table.clone() else {
+            return Err(Error::new(ErrorKind::ConfigInvalid, "table is required"));
+        };
+
+        let key_field = self.key_field.clone().unwrap_or_else(|| "key".to_string());
+
+        let value_field = self
+            .value_field
+            .clone()
+            .unwrap_or_else(|| "value".to_string());
+
+        let root = normalize_root(
+            self.root
+                .clone()
+                .unwrap_or_else(|| "/".to_string())
+                .as_str(),
+        );
+        Ok(D1Backend::new(Adapter {
+            authorization,
+            account_id,
+            database_id,
+            client,
+            table,
+            key_field,
+            value_field,
+        })
+        .with_root(&root))
+    }
+}
+
+/// D1 backend: the generic key-value backend parameterised with the D1 [`Adapter`].
+pub type D1Backend = kv::Backend<Adapter>;
+
+/// Key-value adapter that sends SQL statements to a D1 database over HTTP.
+#[derive(Clone)]
+pub struct Adapter {
+    /// Value sent in the `Authorization` header when present.
+    authorization: Option<String>,
+    /// Account identifier, used in the query URL path.
+    account_id: String,
+    /// Database identifier, used in the query URL path.
+    database_id: String,
+
+    /// HTTP client used to send the query requests.
+    client: HttpClient,
+    /// Table name interpolated into the generated SQL.
+    table: String,
+    /// Key column name interpolated into the generated SQL.
+    key_field: String,
+    /// Value column name interpolated into the generated SQL.
+    value_field: String,
+}
+
+impl Debug for Adapter {
+    /// Formats the adapter as `D1Adapter`, printing only the table and column
+    /// names; the authorization value and identifiers are left out.
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("D1Adapter")
+            .field("table", &self.table)
+            .field("key_field", &self.key_field)
+            .field("value_field", &self.value_field)
+            .finish()
+    }
+}
+
+impl Adapter {
+    /// Builds the POST request that runs `sql` with the bound `params` against
+    /// this adapter's database.
+    ///
+    /// The request targets `/accounts/{account_id}/d1/database/{database_id}/query`
+    /// under the Cloudflare v4 API base URL and carries a JSON body of the form
+    /// `{"sql": ..., "params": [...]}`. The `Authorization` header is only added
+    /// when an authorization value is configured.
+    ///
+    /// # Errors
+    ///
+    /// Fails when the JSON body cannot be serialized or the request cannot be built.
+    fn create_d1_query_request(&self, sql: &str, params: Vec<Value>) -> Result<Request<AsyncBody>> {
+        let p = format!(
+            "/accounts/{}/d1/database/{}/query",
+            self.account_id, self.database_id
+        );
+        let url: String = format!(
+            "{}{}",
+            "https://api.cloudflare.com/client/v4",
+            percent_encode_path(&p)
+        );
+
+        let mut req = Request::post(&url);
+        if let Some(auth) = &self.authorization {
+            req = req.header(header::AUTHORIZATION, auth);
+        }
+        req = req.header(header::CONTENT_TYPE, "application/json");
+
+        let json = serde_json::json!({
+            "sql": sql,
+            "params": params,
+        });
+
+        let body = serde_json::to_vec(&json).map_err(new_json_serialize_error)?;
+        req.body(AsyncBody::Bytes(body.into()))
+            .map_err(new_request_build_error)
+    }
+}
+
+#[async_trait]
+impl kv::Adapter for Adapter {
+    /// Describes this adapter: scheme `D1`, named after the table, with read
+    /// and write enabled.
+    fn metadata(&self) -> kv::Metadata {
+        kv::Metadata::new(
+            Scheme::D1,
+            &self.table,
+            Capability {
+                read: true,
+                write: true,
+                ..Default::default()
+            },
+        )
+    }
+
+    /// Looks up the value stored under `path`.
+    ///
+    /// Table and column names are interpolated into the statement as
+    /// configured; only the key is passed as a bound parameter. Returns
+    /// whatever `D1Response::get_result` extracts for the value column.
+    async fn get(&self, path: &str) -> Result<Option<Vec<u8>>> {
+        let query = format!(
+            "SELECT {} FROM {} WHERE {} = ? LIMIT 1",
+            self.value_field, self.table, self.key_field
+        );
+        let req = self.create_d1_query_request(&query, vec![path.into()])?;
+
+        let resp = self.client.send(req).await?;
+        let status = resp.status();
+        match status {
+            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
+                let body = resp.into_body().bytes().await?;
+                let d1_response = D1Response::parse(&body)?;
+                Ok(d1_response.get_result(&self.value_field))
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    /// Stores `value` under `path`, replacing the value of an existing row
+    /// with the same key.
+    ///
+    /// Table and column names are interpolated into the statement as
+    /// configured; the key and value are passed as bound parameters.
+    async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
+        let table = &self.table;
+        let key_field = &self.key_field;
+        let value_field = &self.value_field;
+        let query = format!(
+            "INSERT INTO {table} ({key_field}, {value_field}) \
+                VALUES (?, ?) \
+                ON CONFLICT ({key_field}) \
+                    DO UPDATE SET {value_field} = EXCLUDED.{value_field}",
+        );
+
+        let params = vec![path.into(), value.into()];
+        let req = self.create_d1_query_request(&query, params)?;
+
+        let resp = self.client.send(req).await?;
+        let status = resp.status();
+        match status {
+            StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(()),
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    /// Deletes the row whose key equals `path`.
+    ///
+    /// Table and column names are interpolated into the statement as
+    /// configured; only the key is passed as a bound parameter.
+    async fn delete(&self, path: &str) -> Result<()> {
+        let query = format!("DELETE FROM {} WHERE {} = ?", self.table, self.key_field);
+        let req = self.create_d1_query_request(&query, vec![path.into()])?;
+
+        let resp = self.client.send(req).await?;
+        let status = resp.status();
+        match status {
+            StatusCode::OK | StatusCode::PARTIAL_CONTENT => Ok(()),
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+}
diff --git a/core/src/services/d1/docs.md b/core/src/services/d1/docs.md
new file mode 100644
index 000000000..17e3c7123
--- /dev/null
+++ b/core/src/services/d1/docs.md
@@ -0,0 +1,61 @@
+## Capabilities
+
+This service can be used to:
+
+- [x] stat
+- [x] read
+- [x] write
+- [x] create_dir
+- [x] delete
+- [ ] copy
+- [ ] rename
+- [ ] ~~list~~
+- [ ] scan
+- [ ] ~~presign~~
+- [ ] blocking
+
+## Configuration
+
+- `root`: Set the working directory of `OpenDAL`
+- `token`: Set the token of cloudflare api
+- `account_id`: Set the account identifier of d1
+- `database_id`: Set the database identifier of d1
+- `endpoint`: Set the endpoint of d1 service
+- `table`: Set the table name of the d1 service to read/write
+- `key_field`: Set the key field of d1
+- `value_field`: Set the value field of d1
+
+## Example
+
+### Via Builder
+
+```rust
+use anyhow::Result;
+use opendal::services::D1;
+use opendal::Operator;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let mut builder = D1::default();
+    builder
+        .token("token")
+        .account_id("account_id")
+        .database_id("database_id")
+        .table("table")
+        .key_field("key_field")
+        .value_field("value_field");
+
+    let op = Operator::new(builder)?.finish();
+    let source_path = "ALFKI";
+    // set value to d1 "opendal test value" as Vec<u8>
+    let value = "opendal test value".as_bytes();
+    // write value to d1, the key is source_path
+    op.write(source_path, value).await?;
+    // read value from d1, the key is source_path
+    let v = op.read(source_path).await?;
+    assert_eq!(v, value);
+    // delete value from d1, the key is source_path
+    op.delete(source_path).await?;
+    Ok(())
+}
+```
diff --git a/core/src/services/d1/error.rs b/core/src/services/d1/error.rs
new file mode 100644
index 000000000..2a2f75de4
--- /dev/null
+++ b/core/src/services/d1/error.rs
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::Buf;
+use http::Response;
+use http::StatusCode;
+
+use crate::raw::*;
+use crate::Error;
+use crate::ErrorKind;
+use crate::Result;
+
+use serde_json::de;
+
+use super::model::D1Error;
+use super::model::D1Response;
+
+/// Parse error response into Error.
+///
+/// The error kind and retry flag are first derived from the HTTP status. If
+/// the body deserializes as a `D1Response`, the code of its first error may
+/// override both (see `parse_d1_error_code`) and the debug rendering of the
+/// response becomes the message; otherwise the raw body text is used.
+///
+/// # Errors
+///
+/// Returns `Err` only when reading the response body fails.
+pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
+    let (parts, body) = resp.into_parts();
+    let bs = body.bytes().await?;
+
+    let (mut kind, mut retryable) = match parts.status {
+        StatusCode::NOT_FOUND => (ErrorKind::NotFound, false),
+        // Some services (like owncloud) return 403 while file locked.
+        // NOTE(review): the owncloud rationale does not obviously apply to D1 —
+        // confirm that 403 should be treated as retryable here.
+        StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, true),
+        // Allowing retry for resource locked.
+        StatusCode::LOCKED => (ErrorKind::Unexpected, true),
+        StatusCode::INTERNAL_SERVER_ERROR
+        | StatusCode::BAD_GATEWAY
+        | StatusCode::SERVICE_UNAVAILABLE
+        | StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true),
+        _ => (ErrorKind::Unexpected, false),
+    };
+
+    // Prefer the structured D1 payload; fall back to the raw body text.
+    let (message, d1_err) = de::from_reader::<_, D1Response>(bs.clone().reader())
+        .map(|d1_err| (format!("{d1_err:?}"), Some(d1_err)))
+        .unwrap_or_else(|_| (String::from_utf8_lossy(&bs).into_owned(), None));
+
+    if let Some(d1_err) = d1_err {
+        (kind, retryable) = parse_d1_error_code(d1_err.errors).unwrap_or((kind, retryable));
+    }
+
+    let mut err = Error::new(kind, &message);
+
+    err = with_error_response_context(err, parts);
+
+    if retryable {
+        err = err.set_temporary();
+    }
+
+    Ok(err)
+}
+
+pub fn parse_d1_error_code(errors: Vec<D1Error>) -> Option<(ErrorKind, bool)> {
+    if errors.is_empty() {
+        return None;
+    }
+
+    match errors[0].code {
+        // The request is malformed: failed to decode id.
+        7400 => Some((ErrorKind::Unexpected, false)),
+        // no such column: Xxxx.
+        7500 => Some((ErrorKind::NotFound, false)),
+        // Authentication error.
+        10000 => Some((ErrorKind::PermissionDenied, false)),
+        _ => None,
+    }
+}
diff --git a/core/src/services/d1/mod.rs b/core/src/services/d1/mod.rs
new file mode 100644
index 000000000..9163a135c
--- /dev/null
+++ b/core/src/services/d1/mod.rs
@@ -0,0 +1,21 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod backend;
+mod error;
+mod model;
+pub use backend::D1Builder as D1;
diff --git a/core/src/services/d1/model.rs b/core/src/services/d1/model.rs
new file mode 100644
index 000000000..c086af0d5
--- /dev/null
+++ b/core/src/services/d1/model.rs
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::Error;
+use bytes::Bytes;
+use serde::{Deserialize, Serialize};
+use serde_json::{Map, Value};
+use std::fmt::Debug;
+
+/// Response data from d1.
+///
+/// All four fields have to be present in the JSON for deserialization to succeed.
+#[derive(Deserialize, Debug)]
+pub struct D1Response {
+    /// Result entries; `get_result` reads the first row of the first entry.
+    pub result: Vec<D1Result>,
+    /// Overall success flag; `parse` rejects responses where this is `false`.
+    pub success: bool,
+    /// Reported errors; each carries a message and a numeric code.
+    pub errors: Vec<D1Error>,
+    /// Reported messages; each carries a message and a numeric code.
+    pub messages: Vec<D1Message>,
+}
+
+impl D1Response {
+    /// Deserializes a D1 response body and checks its `success` flag.
+    ///
+    /// # Errors
+    ///
+    /// Returns an `Unexpected` error when `bs` is not a valid `D1Response`
+    /// JSON document, or when the response reports `success: false` (the raw
+    /// body is then used as the error message).
+    pub fn parse(bs: &Bytes) -> Result<D1Response, Error> {
+        let response: D1Response = serde_json::from_slice(bs).map_err(|e| {
+            Error::new(
+                crate::ErrorKind::Unexpected,
+                &format!("failed to parse error response: {}", e),
+            )
+        })?;
+
+        if !response.success {
+            return Err(Error::new(
+                crate::ErrorKind::Unexpected,
+                &String::from_utf8_lossy(bs),
+            ));
+        }
+        Ok(response)
+    }
+
+    /// Extracts column `key` of the first row of the first result as bytes.
+    ///
+    /// Returns `None` when there is no row, the column is absent, or the
+    /// column is not a JSON array. Array elements that are not integers in
+    /// `0..=255` are skipped, so malformed data never panics and is never
+    /// silently wrapped into a different byte.
+    pub fn get_result(&self, key: &str) -> Option<Vec<u8>> {
+        let row = self.result.first()?.results.first()?;
+
+        match row.get(key) {
+            Some(Value::Array(items)) => Some(
+                items
+                    .iter()
+                    // `as_u64` is `None` for non-numbers, negatives and fractions.
+                    .filter_map(Value::as_u64)
+                    .filter_map(|n| u8::try_from(n).ok())
+                    .collect(),
+            ),
+            _ => None,
+        }
+    }
+}
+
+/// One entry of the `result` array in a D1 response.
+#[derive(Deserialize, Debug)]
+pub struct D1Result {
+    /// Metadata object attached to this entry.
+    pub meta: Meta,
+    /// Returned rows, each a JSON object keyed by name.
+    pub results: Vec<Map<String, Value>>,
+    /// Success flag of this entry.
+    pub success: bool,
+}
+
+/// Metadata object deserialized from the `meta` field of a D1 result entry.
+///
+/// Every field is required for deserialization to succeed.
+// NOTE(review): the integer fields are `i32`; a value outside the `i32` range
+// (for example a large `size_after`) would make deserialization of the whole
+// response fail — confirm the ranges the D1 API can return.
+#[derive(Deserialize, Debug)]
+pub struct Meta {
+    pub served_by: String,
+    pub duration: f64,
+    pub changes: i32,
+    pub last_row_id: i32,
+    pub changed_db: bool,
+    pub size_after: i32,
+    pub rows_read: i32,
+    pub rows_written: i32,
+}
+
+/// One entry of the `errors` array in a D1 response.
+#[derive(Clone, Deserialize, Debug, Serialize)]
+pub struct D1Error {
+    /// Error text as reported in the response.
+    pub message: String,
+    /// Numeric error code as reported in the response.
+    pub code: i32,
+}
+
+/// One entry of the `messages` array in a D1 response.
+#[derive(Deserialize, Debug)]
+pub struct D1Message {
+    /// Message text as reported in the response.
+    pub message: String,
+    /// Numeric code as reported in the response.
+    pub code: i32,
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    /// A successful query response deserializes into the expected structure.
+    #[test]
+    fn test_deserialize_get_object_json_response() {
+        let data = r#"
+        {
+            "result": [
+                {
+                    "results": [
+                        {
+                            "CustomerId": "4",
+                            "CompanyName": "Around the Horn",
+                            "ContactName": "Thomas Hardy"
+                        }
+                    ],
+                    "success": true,
+                    "meta": {
+                        "served_by": "v3-prod",
+                        "duration": 0.2147,
+                        "changes": 0,
+                        "last_row_id": 0,
+                        "changed_db": false,
+                        "size_after": 2162688,
+                        "rows_read": 3,
+                        "rows_written": 2
+                    }
+                }
+            ],
+            "success": true,
+            "errors": [],
+            "messages": []
+        }"#;
+        let response: D1Response = serde_json::from_str(data).unwrap();
+        assert!(response.success);
+        assert!(response.errors.is_empty());
+        assert!(response.messages.is_empty());
+        assert_eq!(response.result.len(), 1);
+        assert!(response.result[0].success);
+
+        let row = &response.result[0].results[0];
+        assert_eq!(row.get("CustomerId").and_then(Value::as_str), Some("4"));
+        assert_eq!(
+            row.get("CompanyName").and_then(Value::as_str),
+            Some("Around the Horn")
+        );
+        assert_eq!(
+            row.get("ContactName").and_then(Value::as_str),
+            Some("Thomas Hardy")
+        );
+    }
+
+    /// `get_result` decodes a JSON byte array and yields `None` for columns
+    /// that are missing or not arrays.
+    #[test]
+    fn test_get_result_decodes_byte_array() {
+        let data = r#"
+        {
+            "result": [
+                {
+                    "results": [{"key": "a", "value": [111, 107]}],
+                    "success": true,
+                    "meta": {
+                        "served_by": "v3-prod",
+                        "duration": 0.2147,
+                        "changes": 0,
+                        "last_row_id": 0,
+                        "changed_db": false,
+                        "size_after": 2162688,
+                        "rows_read": 1,
+                        "rows_written": 0
+                    }
+                }
+            ],
+            "success": true,
+            "errors": [],
+            "messages": []
+        }"#;
+        let response = D1Response::parse(&Bytes::from(data)).unwrap();
+        assert_eq!(response.get_result("value"), Some(b"ok".to_vec()));
+        assert_eq!(response.get_result("key"), None);
+        assert_eq!(response.get_result("missing"), None);
+    }
+}
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index 5293aaa44..93da8b917 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -228,3 +228,8 @@ pub use self::mysql::Mysql;
 mod sqlite;
 #[cfg(feature = "services-sqlite")]
 pub use self::sqlite::Sqlite;
+
+#[cfg(feature = "services-d1")]
+mod d1;
+#[cfg(feature = "services-d1")]
+pub use self::d1::D1;
diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs
index ec3298b7b..fad9d7f6e 100644
--- a/core/src/types/scheme.rs
+++ b/core/src/types/scheme.rs
@@ -41,6 +41,8 @@ pub enum Scheme {
     Cacache,
     /// [cos][crate::services::Cos]: Tencent Cloud Object Storage services.
     Cos,
+    /// [d1][crate::services::D1]: D1 services
+    D1,
     /// [dashmap][crate::services::Dashmap]: dashmap backend support.
     Dashmap,
     /// [etcd][crate::services::Etcd]: Etcd Services
@@ -159,6 +161,7 @@ impl FromStr for Scheme {
             "azdls" | "azdfs" | "abfs" => Ok(Scheme::Azdls),
             "cacache" => Ok(Scheme::Cacache),
             "cos" => Ok(Scheme::Cos),
+            "d1" => Ok(Scheme::D1),
             "dashmap" => Ok(Scheme::Dashmap),
             "dropbox" => Ok(Scheme::Dropbox),
             "etcd" => Ok(Scheme::Etcd),
@@ -208,6 +211,7 @@ impl From<Scheme> for &'static str {
             Scheme::Azdls => "Azdls",
             Scheme::Cacache => "cacache",
             Scheme::Cos => "cos",
+            Scheme::D1 => "d1",
             Scheme::Dashmap => "dashmap",
             Scheme::Etcd => "etcd",
             Scheme::Fs => "fs",
diff --git a/core/tests/behavior/main.rs b/core/tests/behavior/main.rs
index de38efd14..cc0bf667f 100644
--- a/core/tests/behavior/main.rs
+++ b/core/tests/behavior/main.rs
@@ -185,6 +185,8 @@ fn main() -> anyhow::Result<()> {
     tests.extend(behavior_test::<services::Mysql>());
     #[cfg(feature = "services-sqlite")]
     tests.extend(behavior_test::<services::Sqlite>());
+    #[cfg(feature = "services-d1")]
+    tests.extend(behavior_test::<services::D1>());
 
     // Don't init logging while building operator which may break cargo
     // nextest output

Reply via email to