This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new cf17729bd feat(service/libsql): support libsql (#3233)
cf17729bd is described below
commit cf17729bd723c1010c960726ac77009453b55dea
Author: G-XD <[email protected]>
AuthorDate: Wed Oct 11 14:17:51 2023 +0800
feat(service/libsql): support libsql (#3233)
* feat(service/libsql): support libsql
* ci(service/libsql): add ci action
* feat(service/libsql): replace libsql-client with basic http request
* ci(service/libsql): add test of auth
* doc(service/libsql): update
---
.env.example | 8 +
.github/workflows/service_test_libsql.yml | 132 ++++++++++
Cargo.lock | 97 ++++----
core/Cargo.toml | 2 +
core/src/services/libsql/backend.rs | 400 ++++++++++++++++++++++++++++++
core/src/services/libsql/docs.md | 50 ++++
core/src/services/libsql/error.rs | 60 +++++
core/src/services/libsql/mod.rs | 21 ++
core/src/services/mod.rs | 5 +
core/src/types/scheme.rs | 4 +
core/tests/behavior/main.rs | 2 +
fixtures/libsql/docker-compose-auth.yml | 28 +++
fixtures/libsql/docker-compose.yml | 27 ++
13 files changed, 794 insertions(+), 42 deletions(-)
diff --git a/.env.example b/.env.example
index 08b3c255e..79211cb97 100644
--- a/.env.example
+++ b/.env.example
@@ -164,6 +164,14 @@ OPENDAL_GDRIVE_ACCESS_TOKEN=<access_token>
OPENDAL_GDRIVE_REFRESH_TOKEN=<refresh_token>
OPENDAL_GDRIVE_CLIENT_ID=<client_id>
OPENDAL_GDRIVE_CLIENT_SECRET=<client_secret>
+# libsql
+OPENDAL_LIBSQL_TEST=false
+OPENDAL_LIBSQL_ROOT=/tmp/opendal/
+OPENDAL_LIBSQL_CONNECTION_STRING=https://example.com/db
+OPENDAL_LIBSQL_AUTH_TOKEN=<secret>
+OPENDAL_LIBSQL_TABLE=t_opendal
+OPENDAL_LIBSQL_KEY_FIELD=key
+OPENDAL_LIBSQL_VALUE_FIELD=val
# sqlite
OPENDAL_SQLITE_TEST=on
OPENDAL_SQLITE_CONNECTION_STRING=file:///tmp/opendal/test.db
diff --git a/.github/workflows/service_test_libsql.yml
b/.github/workflows/service_test_libsql.yml
new file mode 100644
index 000000000..68ac2124c
--- /dev/null
+++ b/.github/workflows/service_test_libsql.yml
@@ -0,0 +1,132 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: Service Test Libsql
+
+on:
+ push:
+ branches:
+ - main
+ pull_request:
+ branches:
+ - main
+ paths:
+ - "core/src/**"
+ - "core/tests/**"
+ - "!core/src/docs/**"
+ - "!core/src/services/**"
+ - "core/src/services/libsql/**"
+ - ".github/workflows/service_test_libsql.yml"
+
+concurrency:
+ group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }}
+ cancel-in-progress: true
+
+jobs:
+ libsql:
+ runs-on: ubuntu-latest
+
+ steps:
+ - uses: actions/checkout@v4
+ - name: Setup libsql Server
+ shell: bash
+ working-directory: fixtures/libsql
+ run: docker-compose -f docker-compose.yml up -d
+
+ - name: Setup Rust toolchain
+ uses: ./.github/actions/setup
+ with:
+ need-nextest: true
+
+ - name: Create table
+ shell: bash
+ working-directory: core
+ run: |
+ curl --location '127.0.0.1:8080/v2/pipeline' \
+ --header 'Content-Type: application/json' \
+ --data '{
+ "baton": null,
+ "requests": [
+ {
+ "type": "execute",
+ "stmt": {
+ "sql": "CREATE TABLE IF NOT EXISTS `data` (`key`
TEXT PRIMARY KEY NOT NULL CHECK(length(key) <= 255),`data` BLOB);",
+ "args": [],
+ "want_rows": true
+ }
+ }
+ ]
+ }'
+
+ - name: Test
+ shell: bash
+ working-directory: core
+ run: cargo nextest run libsql --features services-libsql
+ env:
+ OPENDAL_LIBSQL_TEST: on
+ OPENDAL_LIBSQL_CONNECTION_STRING: http://127.0.0.1:8080
+ OPENDAL_LIBSQL_TABLE: data
+ OPENDAL_LIBSQL_KEY_FIELD: key
+ OPENDAL_LIBSQL_VALUE_FIELD: data
+
+ libsql-auth:
+ runs-on: ubuntu-latest
+
+ steps:
+ - uses: actions/checkout@v4
+ - name: Setup libsql-auth Server
+ shell: bash
+ working-directory: fixtures/libsql
+ run: docker-compose -f docker-compose-auth.yml up -d
+
+ - name: Setup Rust toolchain
+ uses: ./.github/actions/setup
+ with:
+ need-nextest: true
+
+ - name: Create table
+ shell: bash
+ working-directory: core
+ run: |
+ curl --location '127.0.0.1:8080/v2/pipeline' \
+ --header 'Content-Type: application/json' \
+ --header 'Authorization: Bearer
eyJhbGciOiJFZERTQSIsInR5cCI6IkpXVCJ9.eyJleHAiOjc5ODg0ODM4Mjd9.MatB2aLnPFusagqH2RMoVExP37o2GFLmaJbmd52OdLtAehRNeqeJZPrefP1t2GBFidApUTLlaBRL6poKq_s3CQ'
\
+ --data '{
+ "baton": null,
+ "requests": [
+ {
+ "type": "execute",
+ "stmt": {
+ "sql": "CREATE TABLE IF NOT EXISTS `data` (`key`
TEXT PRIMARY KEY NOT NULL CHECK(length(key) <= 255),`data` BLOB);",
+ "args": [],
+ "want_rows": true
+ }
+ }
+ ]
+ }'
+
+ - name: Test
+ shell: bash
+ working-directory: core
+ run: cargo nextest run libsql --features services-libsql
+ env:
+ OPENDAL_LIBSQL_TEST: on
+ OPENDAL_LIBSQL_CONNECTION_STRING: http://127.0.0.1:8080
+ OPENDAL_LIBSQL_AUTH_TOKEN:
eyJhbGciOiJFZERTQSIsInR5cCI6IkpXVCJ9.eyJleHAiOjc5ODg0ODM4Mjd9.MatB2aLnPFusagqH2RMoVExP37o2GFLmaJbmd52OdLtAehRNeqeJZPrefP1t2GBFidApUTLlaBRL6poKq_s3CQ
+ OPENDAL_LIBSQL_TABLE: data
+ OPENDAL_LIBSQL_KEY_FIELD: key
+ OPENDAL_LIBSQL_VALUE_FIELD: data
diff --git a/Cargo.lock b/Cargo.lock
index 601e740cd..d5f269a7b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -212,7 +212,7 @@ checksum =
"840d2e9edec91ac974365978efc6f00781ff497e706a12306fff29ae92f8ad46"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.23",
+ "syn 2.0.32",
]
[[package]]
@@ -316,7 +316,7 @@ checksum =
"0e97ce7de6cf12de5d7226c73f5ba9811622f4db3a5b91b55c53e987e5f91cba"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.23",
+ "syn 2.0.32",
]
[[package]]
@@ -364,7 +364,7 @@ checksum =
"16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.23",
+ "syn 2.0.32",
]
[[package]]
@@ -394,7 +394,7 @@ checksum =
"7b2d0f03b3640e3a630367e40c468cb7f309529c708ed1d88597047b0e7c6ef7"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.23",
+ "syn 2.0.32",
]
[[package]]
@@ -649,7 +649,7 @@ dependencies = [
"regex",
"rustc-hash",
"shlex",
- "syn 2.0.23",
+ "syn 2.0.32",
"which",
]
@@ -1058,7 +1058,7 @@ dependencies = [
"heck",
"proc-macro2",
"quote",
- "syn 2.0.23",
+ "syn 2.0.32",
]
[[package]]
@@ -1383,7 +1383,7 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "eed5fff0d93c7559121e9c72bf9c242295869396255071ff2cb1617147b608c5"
dependencies = [
"quote",
- "syn 2.0.23",
+ "syn 2.0.32",
]
[[package]]
@@ -1416,7 +1416,7 @@ dependencies = [
"proc-macro2",
"quote",
"scratch",
- "syn 2.0.23",
+ "syn 2.0.32",
]
[[package]]
@@ -1433,7 +1433,7 @@ checksum =
"2fa16a70dd58129e4dfffdff535fb1bce66673f7bbeec4a5a1765a504e1ccd84"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.23",
+ "syn 2.0.32",
]
[[package]]
@@ -1481,7 +1481,7 @@ dependencies = [
"proc-macro2",
"quote",
"strsim",
- "syn 2.0.23",
+ "syn 2.0.32",
]
[[package]]
@@ -1503,7 +1503,7 @@ checksum =
"836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5"
dependencies = [
"darling_core 0.20.3",
"quote",
- "syn 2.0.23",
+ "syn 2.0.32",
]
[[package]]
@@ -1628,7 +1628,7 @@ checksum =
"53e0efad4403bfc52dc201159c4b842a246a14b98c64b55dfd0f2d89729dfeb8"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.23",
+ "syn 2.0.32",
]
[[package]]
@@ -2051,7 +2051,7 @@ checksum =
"83c8d52fe8b46ab822b4decdcc0d6d85aeedfc98f0d52ba2bd4aec4a97807516"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.23",
+ "syn 2.0.32",
"try_map",
]
@@ -2089,7 +2089,7 @@ checksum =
"b0fa992f1656e1707946bbba340ad244f0814009ef8c0118eb7b658395f19a2e"
dependencies = [
"frunk_proc_macro_helpers",
"quote",
- "syn 2.0.23",
+ "syn 2.0.32",
]
[[package]]
@@ -2101,7 +2101,7 @@ dependencies = [
"frunk_core",
"proc-macro2",
"quote",
- "syn 2.0.23",
+ "syn 2.0.32",
]
[[package]]
@@ -2113,7 +2113,7 @@ dependencies = [
"frunk_core",
"frunk_proc_macro_helpers",
"quote",
- "syn 2.0.23",
+ "syn 2.0.32",
]
[[package]]
@@ -2203,7 +2203,7 @@ checksum =
"89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.23",
+ "syn 2.0.32",
]
[[package]]
@@ -2504,6 +2504,18 @@ dependencies = [
"winapi",
]
+[[package]]
+name = "hrana-client-proto"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f16b4e41e289da3fd60e64f245246a97e78fab7b3788c6d8147b3ae7d9f5e533"
+dependencies = [
+ "anyhow",
+ "base64 0.21.2",
+ "serde",
+ "serde_json",
+]
+
[[package]]
name = "htmlescape"
version = "0.3.1"
@@ -2912,7 +2924,7 @@ dependencies = [
"proc-macro2",
"quote",
"regex",
- "syn 2.0.23",
+ "syn 2.0.32",
]
[[package]]
@@ -3361,7 +3373,7 @@ checksum =
"4901771e1d44ddb37964565c654a3223ba41a594d02b8da471cc4464912b5cfa"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.23",
+ "syn 2.0.32",
]
[[package]]
@@ -3543,7 +3555,7 @@ dependencies = [
"proc-macro-error",
"proc-macro2",
"quote",
- "syn 2.0.23",
+ "syn 2.0.32",
"termcolor",
"thiserror",
]
@@ -4019,6 +4031,7 @@ dependencies = [
"futures",
"governor",
"hdrs",
+ "hrana-client-proto",
"http",
"hyper",
"lazy-regex 3.0.1",
@@ -4309,7 +4322,7 @@ checksum =
"a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.23",
+ "syn 2.0.32",
]
[[package]]
@@ -4701,7 +4714,7 @@ checksum =
"4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.23",
+ "syn 2.0.32",
]
[[package]]
@@ -4897,7 +4910,7 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "9825a04601d60621feed79c4e6b56d65db77cdca55cef43b46b0de1096d1c282"
dependencies = [
"proc-macro2",
- "syn 2.0.23",
+ "syn 2.0.32",
]
[[package]]
@@ -5019,7 +5032,7 @@ checksum =
"440f724eba9f6996b75d63681b0a92b06947f1457076d503a4d2e2c8f56442b8"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.23",
+ "syn 2.0.32",
]
[[package]]
@@ -6003,9 +6016,9 @@ dependencies = [
[[package]]
name = "serde"
-version = "1.0.166"
+version = "1.0.188"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d01b7404f9d441d3ad40e6a636a7782c377d2abdbe4fa2440e2edcc2f4f10db8"
+checksum = "cf9e0fcba69a370eed61bcf2b728575f726b50b55cba78064753d708ddc7549e"
dependencies = [
"serde_derive",
]
@@ -6021,13 +6034,13 @@ dependencies = [
[[package]]
name = "serde_derive"
-version = "1.0.166"
+version = "1.0.188"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5dd83d6dde2b6b2d466e14d9d1acce8816dedee94f735eac6395808b3483c6d6"
+checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.23",
+ "syn 2.0.32",
]
[[package]]
@@ -6420,9 +6433,9 @@ dependencies = [
[[package]]
name = "syn"
-version = "2.0.23"
+version = "2.0.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "59fb7d6d8281a51045d62b8eb3a7d1ce347b76f312af50cd3dc0af39c87c1737"
+checksum = "239814284fd6f1a4ffe4ca893952cdd93c224b6a1571c9a9eadd670295c0c9e2"
dependencies = [
"proc-macro2",
"quote",
@@ -6495,22 +6508,22 @@ checksum =
"aac81b6fd6beb5884b0cf3321b8117e6e5d47ecb6fc89f414cfdcca8b2fe2dd8"
[[package]]
name = "thiserror"
-version = "1.0.40"
+version = "1.0.49"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "978c9a314bd8dc99be594bc3c175faaa9794be04a5a5e153caba6915336cebac"
+checksum = "1177e8c6d7ede7afde3585fd2513e611227efd6481bd78d2e82ba1ce16557ed4"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
-version = "1.0.40"
+version = "1.0.49"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f"
+checksum = "10712f02019e9288794769fba95cd6847df9874d49d871d062172f9dd41bc4cc"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.23",
+ "syn 2.0.32",
]
[[package]]
@@ -6705,7 +6718,7 @@ checksum =
"630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.23",
+ "syn 2.0.32",
]
[[package]]
@@ -6918,9 +6931,9 @@ dependencies = [
[[package]]
name = "tower-http"
-version = "0.4.1"
+version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a8bd22a874a2d0b70452d5597b12c537331d49060824a95f49f108994f94aa4c"
+checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140"
dependencies = [
"bitflags 2.3.3",
"bytes",
@@ -6968,7 +6981,7 @@ checksum =
"5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.23",
+ "syn 2.0.32",
]
[[package]]
@@ -7353,7 +7366,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
- "syn 2.0.23",
+ "syn 2.0.32",
"wasm-bindgen-shared",
]
@@ -7387,7 +7400,7 @@ checksum =
"54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b"
dependencies = [
"proc-macro2",
"quote",
- "syn 2.0.23",
+ "syn 2.0.32",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 5b1e7b5b5..c5ca1b20c 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -137,6 +137,7 @@ services-hdfs = ["dep:hdrs"]
services-http = []
services-ipfs = ["dep:prost"]
services-ipmfs = []
+services-libsql = ["dep:hrana-client-proto"]
services-memcached = ["dep:bb8"]
services-memory = []
services-mini-moka = ["dep:mini-moka"]
@@ -228,6 +229,7 @@ governor = { version = "0.5", optional = true, features =
["std"] }
hdrs = { version = "0.3.0", optional = true, features = ["async_file"] }
http = "0.2.9"
hyper = "0.14"
+hrana-client-proto = { version = "0.2.1", optional = true }
lazy-regex = { version = "3.0.1", optional = true }
log = "0.4"
madsim = { version = "0.2.21", optional = true }
diff --git a/core/src/services/libsql/backend.rs
b/core/src/services/libsql/backend.rs
new file mode 100644
index 000000000..db1291485
--- /dev/null
+++ b/core/src/services/libsql/backend.rs
@@ -0,0 +1,400 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::str;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use hrana_client_proto::pipeline::{
+ ClientMsg, Response, ServerMsg, StreamExecuteReq, StreamExecuteResult,
StreamRequest,
+ StreamResponse, StreamResponseError, StreamResponseOk,
+};
+use hrana_client_proto::Error as PipelineError;
+use hrana_client_proto::{Stmt, StmtResult, Value};
+use http::{Request, Uri};
+
+use crate::raw::adapters::kv;
+use crate::raw::*;
+use crate::*;
+
+use super::error::parse_error;
+
+#[doc = include_str!("docs.md")]
+#[derive(Default)]
+pub struct LibsqlBuilder {
+ connection_string: Option<String>,
+ auth_token: Option<String>,
+
+ table: Option<String>,
+ key_field: Option<String>,
+ value_field: Option<String>,
+ root: Option<String>,
+}
+
+impl Debug for LibsqlBuilder {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ let mut ds = f.debug_struct("LibsqlBuilder");
+ ds.field("connection_string", &self.connection_string)
+ .field("table", &self.table)
+ .field("key_field", &self.key_field)
+ .field("value_field", &self.value_field)
+ .field("root", &self.root);
+
+ if self.auth_token.is_some() {
+ ds.field("auth_token", &"<redacted>");
+ }
+ ds.finish()
+ }
+}
+
+impl LibsqlBuilder {
+ /// Set the connection_string of the libsql service.
+ ///
+ /// This connection string is used to connect to the libsql service. There
are url based formats:
+ ///
+ /// ## Url
+ ///
+ /// This format resembles the url format of the libsql client.
+ ///
+ /// for a remote database connection:
+ ///
+ /// - `http://example.com/db`
+ /// - `https://example.com/db`
+ /// - `libsql://example.com/db`
+ pub fn connection_string(&mut self, v: &str) -> &mut Self {
+ if !v.is_empty() {
+ self.connection_string = Some(v.to_string());
+ }
+ self
+ }
+
+ /// set the authentication token for libsql service.
+ ///
+ /// default: no authentication token
+ pub fn auth_token(&mut self, auth_token: &str) -> &mut Self {
+ if !auth_token.is_empty() {
+ self.auth_token = Some(auth_token.to_owned());
+ }
+ self
+ }
+
+ /// set the working directory, all operations will be performed under it.
+ ///
+ /// default: "/"
+ pub fn root(&mut self, root: &str) -> &mut Self {
+ if !root.is_empty() {
+ self.root = Some(root.to_string());
+ }
+ self
+ }
+
+ /// Set the table name of the libsql service to read/write.
+ pub fn table(&mut self, table: &str) -> &mut Self {
+ if !table.is_empty() {
+ self.table = Some(table.to_string());
+ }
+ self
+ }
+
+ /// Set the key field name of the libsql service to read/write.
+ ///
+ /// Default to `key` if not specified.
+ pub fn key_field(&mut self, key_field: &str) -> &mut Self {
+ if !key_field.is_empty() {
+ self.key_field = Some(key_field.to_string());
+ }
+ self
+ }
+
+ /// Set the value field name of the libsql service to read/write.
+ ///
+ /// Default to `value` if not specified.
+ pub fn value_field(&mut self, value_field: &str) -> &mut Self {
+ if !value_field.is_empty() {
+ self.value_field = Some(value_field.to_string());
+ }
+ self
+ }
+}
+
+impl Builder for LibsqlBuilder {
+ const SCHEME: Scheme = Scheme::Libsql;
+ type Accessor = LibsqlBackend;
+
+ fn from_map(map: HashMap<String, String>) -> Self {
+ let mut builder = LibsqlBuilder::default();
+ map.get("connection_string")
+ .map(|v| builder.connection_string(v));
+ map.get("auth_token").map(|v| builder.auth_token(v));
+ map.get("table").map(|v| builder.table(v));
+ map.get("key_field").map(|v| builder.key_field(v));
+ map.get("value_field").map(|v| builder.value_field(v));
+ map.get("root").map(|v| builder.root(v));
+ builder
+ }
+
+ fn build(&mut self) -> Result<Self::Accessor> {
+ let conn = self.get_connection_string()?;
+
+ let table = match self.table.clone() {
+ Some(v) => v,
+ None => {
+ return Err(Error::new(ErrorKind::ConfigInvalid, "table is
empty")
+ .with_context("service", Scheme::Libsql))
+ }
+ };
+ let key_field = match self.key_field.clone() {
+ Some(v) => v,
+ None => "key".to_string(),
+ };
+ let value_field = match self.value_field.clone() {
+ Some(v) => v,
+ None => "value".to_string(),
+ };
+ let root = normalize_root(
+ self.root
+ .clone()
+ .unwrap_or_else(|| "/".to_string())
+ .as_str(),
+ );
+
+ let client = HttpClient::new().map_err(|err| {
+ err.with_operation("Builder::build")
+ .with_context("service", Scheme::Libsql)
+ })?;
+
+ Ok(LibsqlBackend::new(Adapter {
+ client,
+ connection_string: conn,
+ auth_token: self.auth_token.clone(),
+ table,
+ key_field,
+ value_field,
+ })
+ .with_root(&root))
+ }
+}
+
+impl LibsqlBuilder {
+ fn get_connection_string(&self) -> Result<String> {
+ let connection_string = self
+ .connection_string
+ .clone()
+ .ok_or_else(|| Error::new(ErrorKind::ConfigInvalid,
"connection_string is empty"))?;
+
+ let ep_url = connection_string
+ .replace("libsql://", "https://")
+ .parse::<Uri>()
+ .map_err(|e| {
+ Error::new(ErrorKind::ConfigInvalid, "connection_string is
invalid")
+ .with_context("service", Scheme::Libsql)
+ .with_context("connection_string", connection_string)
+ .set_source(e)
+ })?;
+
+ match ep_url.scheme_str() {
+ None => Ok(format!("https://{ep_url}/")),
+ Some("http") | Some("https") => Ok(ep_url.to_string()),
+ Some(s) => Err(
+ Error::new(ErrorKind::ConfigInvalid, "invalid or unsupported
scheme")
+ .with_context("service", Scheme::Libsql)
+ .with_context("scheme", s),
+ ),
+ }
+ }
+}
+
+/// Backend for libsql service
+pub type LibsqlBackend = kv::Backend<Adapter>;
+
+#[derive(Clone)]
+pub struct Adapter {
+ client: HttpClient,
+ connection_string: String,
+ auth_token: Option<String>,
+
+ table: String,
+ key_field: String,
+ value_field: String,
+}
+
+impl Debug for Adapter {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ let mut ds = f.debug_struct("LibsqlAdapter");
+ ds.field("connection_string", &self.connection_string)
+ .field("table", &self.table)
+ .field("key_field", &self.key_field)
+ .field("value_field", &self.value_field);
+
+ if self.auth_token.is_some() {
+ ds.field("auth_token", &"<redacted>");
+ }
+
+ ds.finish()
+ }
+}
+
+impl Adapter {
+ async fn execute(&self, sql: String, args: Vec<Value>) ->
Result<ServerMsg> {
+ let url = format!("{}v2/pipeline", self.connection_string);
+
+ let mut req = Request::post(&url);
+
+ if let Some(auth_token) = self.auth_token.clone() {
+ req = req.header("Authorization", format!("Bearer {}",
auth_token));
+ }
+
+ let msg = ClientMsg {
+ baton: None,
+ requests: vec![StreamRequest::Execute(StreamExecuteReq {
+ stmt: Stmt {
+ sql,
+ args,
+ named_args: vec![],
+ want_rows: true,
+ },
+ })],
+ };
+ let body = serde_json::to_string(&msg).map_err(|err| {
+ Error::new(ErrorKind::Unexpected, "failed to serialize request")
+ .with_context("service", Scheme::Libsql)
+ .set_source(err)
+ })?;
+
+ let req = req
+ .body(AsyncBody::Bytes(Bytes::from(body)))
+ .map_err(new_request_build_error)?;
+
+ let resp = self.client.send(req).await?;
+
+ if resp.status() != http::StatusCode::OK {
+ return Err(parse_error(resp).await?);
+ }
+
+ let bs = resp.into_body().bytes().await?;
+
+ let resp: ServerMsg = serde_json::from_slice(&bs).map_err(|e| {
+ Error::new(ErrorKind::Unexpected, "deserialize json from
response").set_source(e)
+ })?;
+
+ if resp.results.is_empty() {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ "Unexpected empty response from server",
+ ));
+ }
+
+ if resp.results.len() > 1 {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ "Unexpected multiple response from server",
+ ));
+ }
+
+ Ok(resp)
+ }
+}
+
+#[async_trait]
+impl kv::Adapter for Adapter {
+ fn metadata(&self) -> kv::Metadata {
+ kv::Metadata::new(
+ Scheme::Libsql,
+ &self.table,
+ Capability {
+ read: true,
+ write: true,
+ create_dir: true,
+ delete: true,
+ ..Default::default()
+ },
+ )
+ }
+
+ async fn get(&self, path: &str) -> Result<Option<Vec<u8>>> {
+ let query = format!(
+ "SELECT {} FROM {} WHERE `{}` = ? LIMIT 1",
+ self.value_field, self.table, self.key_field
+ );
+ let mut resp = self.execute(query, vec![Value::from(path)]).await?;
+
+ match resp.results.swap_remove(0) {
+ Response::Ok(StreamResponseOk {
+ response:
+ StreamResponse::Execute(StreamExecuteResult {
+ result: StmtResult { cols: _, rows, .. },
+ }),
+ }) => {
+ if rows.is_empty() || rows[0].is_empty() {
+ return Ok(None);
+ } else {
+ let val = &rows[0][0];
+ match val {
+ Value::Null => Ok(None),
+ Value::Blob { value } => Ok(Some(value.to_owned())),
+ _ => Err(Error::new(ErrorKind::Unexpected, "invalid
value type")),
+ }
+ }
+ }
+ Response::Ok(_) => Err(Error::new(
+ ErrorKind::Unexpected,
+ "Unexpected response from server",
+ )),
+ Response::Error(StreamResponseError {
+ error: PipelineError { message },
+ }) => Err(Error::new(
+ ErrorKind::Unexpected,
+ format!("get failed: {}", message).as_str(),
+ )),
+ }
+ }
+
+ async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
+ let query = format!(
+ "INSERT OR REPLACE INTO `{}` (`{}`, `{}`) VALUES (?, ?)",
+ self.table, self.key_field, self.value_field
+ );
+ let mut resp = self
+ .execute(query, vec![Value::from(path),
Value::from(value.to_vec())])
+ .await?;
+ match resp.results.swap_remove(0) {
+ Response::Ok(_) => Ok(()),
+ Response::Error(StreamResponseError {
+ error: PipelineError { message },
+ }) => Err(Error::new(
+ ErrorKind::Unexpected,
+ format!("set failed: {}", message).as_str(),
+ )),
+ }
+ }
+
+ async fn delete(&self, path: &str) -> Result<()> {
+ let query = format!("DELETE FROM {} WHERE `{}` = ?", self.table,
self.key_field);
+ let mut resp = self.execute(query, vec![Value::from(path)]).await?;
+ match resp.results.swap_remove(0) {
+ Response::Ok(_) => Ok(()),
+ Response::Error(StreamResponseError {
+ error: PipelineError { message },
+ }) => Err(Error::new(
+ ErrorKind::Unexpected,
+ format!("delete failed: {}", message).as_str(),
+ )),
+ }
+ }
+}
diff --git a/core/src/services/libsql/docs.md b/core/src/services/libsql/docs.md
new file mode 100644
index 000000000..00d0408c1
--- /dev/null
+++ b/core/src/services/libsql/docs.md
@@ -0,0 +1,50 @@
+## Capabilities
+
+This service can be used to:
+
+- [x] stat
+- [x] read
+- [x] write
+- [x] create_dir
+- [x] delete
+- [ ] copy
+- [ ] rename
+- [ ] ~~list~~
+- [ ] scan
+- [ ] ~~presign~~
+- [ ] blocking
+
+## Configuration
+
+- `root`: Set the working directory of `OpenDAL`
+- `connection_string`: Set the connection string for libsql server
+- `auth_token`: Set the authentication token for libsql server
+- `table`: Set the table of libsql
+- `key_field`: Set the key field of libsql
+- `value_field`: Set the value field of libsql
+
+## Example
+
+### Via Builder
+
+```rust
+use anyhow::Result;
+use opendal::services::Libsql;
+use opendal::Operator;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ let mut builder = Libsql::default();
+ builder.root("/");
+ builder.connection_string("https://example.com/db");
+ builder.auth_token("secret");
+ builder.table("your_table");
+ // key field type in the table should be compatible with Rust's &str like
text
+ builder.key_field("key");
+ // value field type in the table should be compatible with Rust's Vec<u8>
like bytea
+ builder.value_field("value");
+
+ let op = Operator::new(builder)?.finish();
+ Ok(())
+}
+```
diff --git a/core/src/services/libsql/error.rs
b/core/src/services/libsql/error.rs
new file mode 100644
index 000000000..a2a80a02e
--- /dev/null
+++ b/core/src/services/libsql/error.rs
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use http::Response;
+use http::StatusCode;
+
+use crate::raw::*;
+use crate::Error;
+use crate::ErrorKind;
+use crate::Result;
+
+/// Parse error response into Error.
+pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
+ let (parts, body) = resp.into_parts();
+ let bs = body.bytes().await?;
+
+ let (kind, retryable) = match parts.status {
+ StatusCode::NOT_FOUND => (ErrorKind::NotFound, false),
+ StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false),
+ StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED => {
+ (ErrorKind::ConditionNotMatch, false)
+ }
+ StatusCode::INTERNAL_SERVER_ERROR
+ | StatusCode::BAD_GATEWAY
+ | StatusCode::SERVICE_UNAVAILABLE
+ | StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true),
+ _ => (ErrorKind::Unexpected, false),
+ };
+
+ let mut message = String::from_utf8_lossy(&bs).into_owned();
+
+ // If there is no body here, fill with http response code.
+ if message.is_empty() {
+ message = format!("Error response code: {}", parts.status);
+ }
+
+ let mut err = Error::new(kind, &message);
+
+ err = with_error_response_context(err, parts);
+
+ if retryable {
+ err = err.set_temporary();
+ }
+
+ Ok(err)
+}
diff --git a/core/src/services/libsql/mod.rs b/core/src/services/libsql/mod.rs
new file mode 100644
index 000000000..1b0d623c4
--- /dev/null
+++ b/core/src/services/libsql/mod.rs
@@ -0,0 +1,21 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod backend;
+pub use backend::LibsqlBuilder as Libsql;
+
+mod error;
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index 4e00c19fc..5293aaa44 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -84,6 +84,11 @@ mod ipmfs;
#[cfg(feature = "services-ipmfs")]
pub use ipmfs::Ipmfs;
+#[cfg(feature = "services-libsql")]
+mod libsql;
+#[cfg(feature = "services-libsql")]
+pub use libsql::Libsql;
+
#[cfg(feature = "services-memcached")]
mod memcached;
#[cfg(feature = "services-memcached")]
diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs
index 84480bf68..ec3298b7b 100644
--- a/core/src/types/scheme.rs
+++ b/core/src/types/scheme.rs
@@ -88,6 +88,8 @@ pub enum Scheme {
Redis,
/// [postgresql][crate::services::Postgresql]: Postgresql services
Postgresql,
+ /// [libsql][crate::services::Libsql]: Libsql services
+ Libsql,
/// [mysql][crate::services::Mysql]: Mysql services
Mysql,
/// [sqlite][crate::services::Sqlite]: Sqlite services
@@ -169,6 +171,7 @@ impl FromStr for Scheme {
"ftp" | "ftps" => Ok(Scheme::Ftp),
"ipfs" | "ipns" => Ok(Scheme::Ipfs),
"ipmfs" => Ok(Scheme::Ipmfs),
+ "libsql" => Ok(Scheme::Libsql),
"memcached" => Ok(Scheme::Memcached),
"memory" => Ok(Scheme::Memory),
"mysql" => Ok(Scheme::Mysql),
@@ -216,6 +219,7 @@ impl From<Scheme> for &'static str {
Scheme::Ftp => "ftp",
Scheme::Ipfs => "ipfs",
Scheme::Ipmfs => "ipmfs",
+ Scheme::Libsql => "libsql",
Scheme::Memcached => "memcached",
Scheme::Memory => "memory",
Scheme::MiniMoka => "mini_moka",
diff --git a/core/tests/behavior/main.rs b/core/tests/behavior/main.rs
index f6a5e2df4..12f24e25f 100644
--- a/core/tests/behavior/main.rs
+++ b/core/tests/behavior/main.rs
@@ -132,6 +132,8 @@ fn main() -> anyhow::Result<()> {
tests.extend(behavior_test::<services::Ipfs>());
#[cfg(feature = "services-ipmfs")]
tests.extend(behavior_test::<services::Ipmfs>());
+ #[cfg(feature = "services-libsql")]
+ tests.extend(behavior_test::<services::Libsql>());
#[cfg(feature = "services-memcached")]
tests.extend(behavior_test::<services::Memcached>());
#[cfg(feature = "services-memory")]
diff --git a/fixtures/libsql/docker-compose-auth.yml
b/fixtures/libsql/docker-compose-auth.yml
new file mode 100644
index 000000000..d14214a0a
--- /dev/null
+++ b/fixtures/libsql/docker-compose-auth.yml
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+version: '3.8'
+
+services:
+ libsql:
+ image: ghcr.io/libsql/sqld:v0.21.9
+ ports:
+ - '8080:8080'
+ environment:
+ - 'SQLD_MAX_RESPONSE_SIZE=20971520'
+ - 'SQLD_MAX_TOTAL_RESPONSE_SIZE=209715200'
+ - 'SQLD_AUTH_JWT_KEY=zaMv-aFGmB7PXkjM4IrMdF6B5zCYEiEGXW3RgMjNAtc'
diff --git a/fixtures/libsql/docker-compose.yml
b/fixtures/libsql/docker-compose.yml
new file mode 100644
index 000000000..77304afcc
--- /dev/null
+++ b/fixtures/libsql/docker-compose.yml
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+version: '3.8'
+
+services:
+ libsql:
+ image: ghcr.io/libsql/sqld:v0.21.9
+ ports:
+ - '8080:8080'
+ environment:
+ - 'SQLD_MAX_RESPONSE_SIZE=20971520'
+ - 'SQLD_MAX_TOTAL_RESPONSE_SIZE=209715200'