This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 593a78909 refactor(services/sqlite): Polish sqlite via adding connection pool (#3249)
593a78909 is described below
commit 593a78909fbef297710811ca2cbafb0e3e95ded4
Author: Xuanwo <[email protected]>
AuthorDate: Tue Oct 10 00:21:48 2023 +0800
refactor(services/sqlite): Polish sqlite via adding connection pool (#3249)
---
.env.example | 6 +++
Cargo.lock | 12 +++++
core/Cargo.toml | 11 ++--
core/src/raw/mod.rs | 3 ++
core/src/raw/tokio_util.rs | 23 ++++++++
core/src/services/sqlite/backend.rs | 102 +++++++++++++++++++++---------------
core/tests/behavior/main.rs | 2 +-
7 files changed, 111 insertions(+), 48 deletions(-)
diff --git a/.env.example b/.env.example
index 2a5b03448..08b3c255e 100644
--- a/.env.example
+++ b/.env.example
@@ -164,3 +164,9 @@ OPENDAL_GDRIVE_ACCESS_TOKEN=<access_token>
OPENDAL_GDRIVE_REFRESH_TOKEN=<refresh_token>
OPENDAL_GDRIVE_CLIENT_ID=<client_id>
OPENDAL_GDRIVE_CLIENT_SECRET=<client_secret>
+# sqlite
+OPENDAL_SQLITE_TEST=on
+OPENDAL_SQLITE_CONNECTION_STRING=file:///tmp/opendal/test.db
+OPENDAL_SQLITE_TABLE=data
+OPENDAL_SQLITE_KEY_FIELD=key
+OPENDAL_SQLITE_VALUE_FIELD=data
diff --git a/Cargo.lock b/Cargo.lock
index b936172a3..601e740cd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4047,6 +4047,7 @@ dependencies = [
"prometheus-client",
"prost",
"quick-xml 0.30.0",
+ "r2d2",
"rand 0.8.5",
"redb",
"redis",
@@ -5252,6 +5253,17 @@ dependencies = [
"proc-macro2",
]
+[[package]]
+name = "r2d2"
+version = "0.8.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93"
+dependencies = [
+ "log",
+ "parking_lot 0.12.1",
+ "scheduled-thread-pool",
+]
+
[[package]]
name = "radium"
version = "0.7.0"
diff --git a/core/Cargo.toml b/core/Cargo.toml
index f1c568c51..5b1e7b5b5 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -141,6 +141,7 @@ services-memcached = ["dep:bb8"]
services-memory = []
services-mini-moka = ["dep:mini-moka"]
services-moka = ["dep:moka"]
+services-mysql = ["dep:mysql_async"]
services-obs = [
"dep:reqsign",
"reqsign?/services-huaweicloud",
@@ -166,6 +167,7 @@ services-s3 = [
]
services-sftp = ["dep:openssh", "dep:openssh-sftp-client", "dep:dirs"]
services-sled = ["dep:sled"]
+services-sqlite = ["dep:rusqlite", "dep:r2d2"]
services-supabase = []
services-tikv = ["tikv-client"]
services-vercel-artifacts = []
@@ -176,8 +178,6 @@ services-wasabi = [
]
services-webdav = []
services-webhdfs = []
-services-mysql = ["dep:mysql_async"]
-services-sqlite = ["dep:rusqlite"]
[lib]
bench = false
@@ -206,6 +206,7 @@ await-tree = { version = "0.1.1", optional = true }
backon = "0.4.1"
base64 = "0.21"
bb8 = { version = "0.8", optional = true }
+bb8-postgres = { version = "0.8.1", optional = true }
bytes = "1.4"
cacache = { version = "11.6", default-features = false, features = [
"tokio-runtime",
@@ -235,6 +236,7 @@ metrics = { version = "0.20", optional = true }
mini-moka = { version = "0.10", optional = true }
minitrace = { version = "0.5", optional = true }
moka = { version = "0.10", optional = true, features = ["future"] }
+mysql_async = { version = "0.32.2", optional = true }
once_cell = "1"
openssh = { version = "0.9.9", optional = true }
openssh-sftp-client = { version = "0.13.9", optional = true, features = [
@@ -250,6 +252,7 @@ prometheus = { version = "0.13", features = ["process"],
optional = true }
prometheus-client = { version = "0.21.2", optional = true }
prost = { version = "0.11", optional = true }
quick-xml = { version = "0.30", features = ["serialize", "overlapped-lists"] }
+r2d2 = { version = "0.8", optional = true }
rand = { version = "0.8", optional = true }
redb = { version = "1.1.0", optional = true }
redis = { version = "0.23.1", features = [
@@ -262,6 +265,7 @@ reqwest = { version = "0.11.18", features = [
"stream",
], default-features = false }
rocksdb = { version = "0.21.0", default-features = false, optional = true }
+rusqlite = { version = "0.29.0", optional = true, features = ["bundled"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
sha2 = { version = "0.10", optional = true }
@@ -275,9 +279,6 @@ tokio = "1.27"
tokio-postgres = { version = "0.7.8", optional = true }
tracing = { version = "0.1", optional = true }
uuid = { version = "1", features = ["serde", "v4"] }
-mysql_async = { version = "0.32.2", optional = true }
-bb8-postgres = { version = "0.8.1", optional = true }
-rusqlite = { version = "0.29.0", optional = true, features = ["bundled"] }
[dev-dependencies]
criterion = { version = "0.4", features = ["async", "async_tokio"] }
diff --git a/core/src/raw/mod.rs b/core/src/raw/mod.rs
index 34638b9ff..313c5d04f 100644
--- a/core/src/raw/mod.rs
+++ b/core/src/raw/mod.rs
@@ -56,6 +56,9 @@ pub use serde_util::*;
mod chrono_util;
pub use chrono_util::*;
+mod tokio_util;
+pub use tokio_util::*;
+
// Expose as a pub mod to avoid confusing.
pub mod adapters;
pub mod oio;
diff --git a/core/src/raw/tokio_util.rs b/core/src/raw/tokio_util.rs
new file mode 100644
index 000000000..e68be2cf2
--- /dev/null
+++ b/core/src/raw/tokio_util.rs
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::*;
+
+/// Parse tokio error into opendal::Error.
+pub fn new_task_join_error(e: tokio::task::JoinError) -> Error {
+ Error::new(ErrorKind::Unexpected, "tokio task join failed").set_source(e)
+}
diff --git a/core/src/services/sqlite/backend.rs b/core/src/services/sqlite/backend.rs
index 9aac1c44d..24dbc1674 100644
--- a/core/src/services/sqlite/backend.rs
+++ b/core/src/services/sqlite/backend.rs
@@ -155,8 +155,13 @@ impl Builder for SqliteBuilder {
.unwrap_or_else(|| "/".to_string())
.as_str(),
);
+ let mgr = SqliteConnectionManager { connection_string };
+ let pool = r2d2::Pool::new(mgr).map_err(|err| {
+ Error::new(ErrorKind::Unexpected, "sqlite pool init
failed").set_source(err)
+ })?;
+
Ok(SqliteBackend::new(Adapter {
- connection_string,
+ pool,
table,
key_field,
value_field,
@@ -165,11 +170,33 @@ impl Builder for SqliteBuilder {
}
}
+struct SqliteConnectionManager {
+ connection_string: String,
+}
+
+impl r2d2::ManageConnection for SqliteConnectionManager {
+ type Connection = Connection;
+ type Error = Error;
+
+ fn connect(&self) -> Result<Connection> {
+ Connection::open(&self.connection_string)
+ .map_err(|err| Error::new(ErrorKind::Unexpected, "sqlite open
error").set_source(err))
+ }
+
+ fn is_valid(&self, conn: &mut Connection) -> Result<()> {
+ conn.execute_batch("").map_err(parse_rusqlite_error)
+ }
+
+ fn has_broken(&self, _: &mut Connection) -> bool {
+ false
+ }
+}
+
pub type SqliteBackend = kv::Backend<Adapter>;
#[derive(Clone)]
pub struct Adapter {
- connection_string: String,
+ pool: r2d2::Pool<SqliteConnectionManager>,
table: String,
key_field: String,
@@ -179,7 +206,6 @@ pub struct Adapter {
impl Debug for Adapter {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let mut ds = f.debug_struct("SqliteAdapter");
- ds.field("connection_string", &self.connection_string);
ds.field("table", &self.table);
ds.field("key_field", &self.key_field);
ds.field("value_field", &self.value_field);
@@ -198,93 +224,85 @@ impl kv::Adapter for Adapter {
write: true,
create_dir: true,
delete: true,
+ blocking: true,
..Default::default()
},
)
}
async fn get(&self, path: &str) -> Result<Option<Vec<u8>>> {
- let cloned_path = path.to_string();
- let cloned_self = self.clone();
+ let this = self.clone();
+ let path = path.to_string();
- task::spawn_blocking(move ||
cloned_self.blocking_get(cloned_path.as_str()))
+ task::spawn_blocking(move || this.blocking_get(&path))
.await
- .map_err(Error::from)
- .and_then(|inner_result| inner_result)
+ .map_err(new_task_join_error)?
}
fn blocking_get(&self, path: &str) -> Result<Option<Vec<u8>>> {
+ let conn = self.pool.get().map_err(parse_r2d2_error)?;
+
let query = format!(
"SELECT {} FROM {} WHERE `{}` = $1 LIMIT 1",
self.value_field, self.table, self.key_field
);
- let conn =
Connection::open(self.connection_string.clone()).map_err(Error::from)?;
- let mut statement = conn.prepare(&query).map_err(Error::from)?;
+ let mut statement =
conn.prepare(&query).map_err(parse_rusqlite_error)?;
let result = statement.query_row([path], |row| row.get(0));
match result {
Ok(v) => Ok(Some(v)),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
- Err(err) => Err(Error::from(err)),
+ Err(err) => Err(parse_rusqlite_error(err)),
}
}
async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
- let cloned_path = path.to_string();
- let cloned_value = value.to_vec();
- let cloned_self = self.clone();
+ let this = self.clone();
+ let path = path.to_string();
+ // FIXME: can we avoid this copy?
+ let value = value.to_vec();
- task::spawn_blocking(move ||
cloned_self.blocking_set(cloned_path.as_str(), &cloned_value))
+ task::spawn_blocking(move || this.blocking_set(&path, &value))
.await
- .map_err(Error::from)
- .and_then(|inner_result| inner_result)
+ .map_err(new_task_join_error)?
}
fn blocking_set(&self, path: &str, value: &[u8]) -> Result<()> {
+ let conn = self.pool.get().map_err(parse_r2d2_error)?;
+
let query = format!(
"INSERT OR REPLACE INTO `{}` (`{}`, `{}`) VALUES ($1, $2)",
self.table, self.key_field, self.value_field
);
- let conn =
Connection::open(self.connection_string.clone()).map_err(Error::from)?;
- let mut statement = conn.prepare(&query).map_err(Error::from)?;
+ let mut statement =
conn.prepare(&query).map_err(parse_rusqlite_error)?;
statement
.execute(params![path, value])
- .map_err(Error::from)?;
+ .map_err(parse_rusqlite_error)?;
Ok(())
}
async fn delete(&self, path: &str) -> Result<()> {
- let cloned_path = path.to_string();
- let cloned_self = self.clone();
+ let this = self.clone();
+ let path = path.to_string();
- task::spawn_blocking(move ||
cloned_self.blocking_delete(cloned_path.as_str()))
+ task::spawn_blocking(move || this.blocking_delete(&path))
.await
- .map_err(Error::from)
- .and_then(|inner_result| inner_result)
+ .map_err(new_task_join_error)?
}
fn blocking_delete(&self, path: &str) -> Result<()> {
- let conn =
Connection::open(self.connection_string.clone()).map_err(|err| {
- Error::new(ErrorKind::Unexpected, "Sqlite open
error").set_source(err)
- })?;
+ let conn = self.pool.get().map_err(parse_r2d2_error)?;
+
let query = format!("DELETE FROM {} WHERE `{}` = $1", self.table,
self.key_field);
- let mut statement = conn.prepare(&query).map_err(Error::from)?;
- statement.execute([path]).map_err(Error::from)?;
+ let mut statement =
conn.prepare(&query).map_err(parse_rusqlite_error)?;
+ statement.execute([path]).map_err(parse_rusqlite_error)?;
Ok(())
}
}
-impl From<rusqlite::Error> for Error {
- fn from(value: rusqlite::Error) -> Error {
- Error::new(ErrorKind::Unexpected, "unhandled error from
sqlite").set_source(value)
- }
+fn parse_rusqlite_error(err: rusqlite::Error) -> Error {
+ Error::new(ErrorKind::Unexpected, "unhandled error from
sqlite").set_source(err)
}
-impl From<task::JoinError> for Error {
- fn from(value: task::JoinError) -> Error {
- Error::new(
- ErrorKind::Unexpected,
- "unhandled error from sqlite when spawning task",
- )
- .set_source(value)
- }
+fn parse_r2d2_error(err: r2d2::Error) -> Error {
+ Error::new(ErrorKind::Unexpected, "unhandled error from
r2d2").set_source(err)
}
diff --git a/core/tests/behavior/main.rs b/core/tests/behavior/main.rs
index 695eefe2c..f6a5e2df4 100644
--- a/core/tests/behavior/main.rs
+++ b/core/tests/behavior/main.rs
@@ -104,7 +104,7 @@ fn main() -> anyhow::Result<()> {
tests.extend(behavior_test::<services::Atomicserver>());
#[cfg(feature = "services-azblob")]
tests.extend(behavior_test::<services::Azblob>());
- #[cfg(feature = "services-Azdls")]
+ #[cfg(feature = "services-azdls")]
tests.extend(behavior_test::<services::Azdls>());
#[cfg(feature = "services-cacache")]
tests.extend(behavior_test::<services::Cacache>());