This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch polish-sqlite in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit b4f5b9362a02cc30adb01dadb66f0a391e562dc3 Author: Xuanwo <[email protected]> AuthorDate: Mon Oct 9 23:24:59 2023 +0800 refactor(services/sqlite): Polish sqlite via adding connection pool Signed-off-by: Xuanwo <[email protected]> --- .env.example | 6 +++ .typos.toml | 2 +- Cargo.lock | 12 +++++ bindings/cpp/Cargo.toml | 6 +-- bindings/dotnet/Cargo.toml | 1 - bindings/haskell/Cargo.toml | 2 +- bindings/java/Cargo.toml | 32 +++++------ core/Cargo.toml | 11 ++-- core/src/raw/mod.rs | 3 ++ core/src/raw/tokio_util.rs | 23 ++++++++ core/src/services/sqlite/backend.rs | 102 +++++++++++++++++++++--------------- core/tests/behavior/main.rs | 2 +- integrations/dav-server/Cargo.toml | 5 +- 13 files changed, 134 insertions(+), 73 deletions(-) diff --git a/.env.example b/.env.example index 2a5b03448..08b3c255e 100644 --- a/.env.example +++ b/.env.example @@ -164,3 +164,9 @@ OPENDAL_GDRIVE_ACCESS_TOKEN=<access_token> OPENDAL_GDRIVE_REFRESH_TOKEN=<refresh_token> OPENDAL_GDRIVE_CLIENT_ID=<client_id> OPENDAL_GDRIVE_CLIENT_SECRET=<client_secret> +# sqlite +OPENDAL_SQLITE_TEST=on +OPENDAL_SQLITE_CONNECTION_STRING=file:///tmp/opendal/test.db +OPENDAL_SQLITE_TABLE=data +OPENDAL_SQLITE_KEY_FIELD=key +OPENDAL_SQLITE_VALUE_FIELD=data diff --git a/.typos.toml b/.typos.toml index 5b2bca99b..df205aecc 100644 --- a/.typos.toml +++ b/.typos.toml @@ -18,8 +18,8 @@ [default.extend-words] # Random strings. "Dum" = "Dum" -"ba" = "ba" "Hel" = "Hel" +"ba" = "ba" "hellow" = "hellow" # Showed up in examples. "thw" = "thw" diff --git a/Cargo.lock b/Cargo.lock index b936172a3..601e740cd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4047,6 +4047,7 @@ dependencies = [ "prometheus-client", "prost", "quick-xml 0.30.0", + "r2d2", "rand 0.8.5", "redb", "redis", @@ -5252,6 +5253,17 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r2d2" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93" +dependencies = [ + "log", + "parking_lot 0.12.1", + "scheduled-thread-pool", +] + [[package]] name = "radium" version = "0.7.0" diff --git a/bindings/cpp/Cargo.toml b/bindings/cpp/Cargo.toml index 058de3a20..40e90680e 100644 --- a/bindings/cpp/Cargo.toml +++ b/bindings/cpp/Cargo.toml @@ -24,17 +24,17 @@ edition.workspace = true homepage.workspace = true license.workspace = true repository.workspace = true -version.workspace = true rust-version.workspace = true +version.workspace = true [lib] crate-type = ["staticlib"] [dependencies] -opendal.workspace = true -cxx = "1.0" anyhow = "1.0" chrono = "0.4" +cxx = "1.0" +opendal.workspace = true [build-dependencies] cxx-build = "1.0" diff --git a/bindings/dotnet/Cargo.toml b/bindings/dotnet/Cargo.toml index 11a6a3c5a..e6a320bc4 100644 --- a/bindings/dotnet/Cargo.toml +++ b/bindings/dotnet/Cargo.toml @@ -27,7 +27,6 @@ license.workspace = true repository.workspace = true rust-version.workspace = true - [lib] crate-type = ["cdylib"] doc = false diff --git a/bindings/haskell/Cargo.toml b/bindings/haskell/Cargo.toml index 637ee3d8e..5c1021f07 100644 --- a/bindings/haskell/Cargo.toml +++ b/bindings/haskell/Cargo.toml @@ -24,8 +24,8 @@ edition.workspace = true homepage.workspace = true license.workspace = true repository.workspace = true -version.workspace = true rust-version.workspace = true +version.workspace = true [lib] crate-type = ["cdylib"] diff --git a/bindings/java/Cargo.toml b/bindings/java/Cargo.toml index 09364f268..66deeeb17 100644 --- a/bindings/java/Cargo.toml +++ b/bindings/java/Cargo.toml @@ -82,20 +82,20 @@ services-all = [ ] # Default services provided by opendal. -services-azblob = [ "opendal/services-azblob" ] -services-azdls = [ "opendal/services-azdls" ] -services-cos = [ "opendal/services-cos" ] -services-fs = [ "opendal/services-fs" ] -services-gcs = [ "opendal/services-gcs" ] -services-ghac = [ "opendal/services-ghac" ] -services-http = [ "opendal/services-http" ] -services-ipmfs = [ "opendal/services-ipmfs" ] -services-memory = [ "opendal/services-memory" ] -services-obs = [ "opendal/services-obs" ] -services-oss = [ "opendal/services-oss" ] -services-s3 = [ "opendal/services-s3" ] -services-webdav = [ "opendal/services-webdav" ] -services-webhdfs = [ "opendal/services-webhdfs" ] +services-azblob = ["opendal/services-azblob"] +services-azdls = ["opendal/services-azdls"] +services-cos = ["opendal/services-cos"] +services-fs = ["opendal/services-fs"] +services-gcs = ["opendal/services-gcs"] +services-ghac = ["opendal/services-ghac"] +services-http = ["opendal/services-http"] +services-ipmfs = ["opendal/services-ipmfs"] +services-memory = ["opendal/services-memory"] +services-obs = ["opendal/services-obs"] +services-oss = ["opendal/services-oss"] +services-s3 = ["opendal/services-s3"] +services-webdav = ["opendal/services-webdav"] +services-webhdfs = ["opendal/services-webhdfs"] # Optional services provided by opendal. services-cacache = ["opendal/services-cacache"] @@ -129,14 +129,14 @@ anyhow = "1.0.71" jni = "0.21.1" num_cpus = "1.15.0" once_cell = "1.17.1" -tokio = { version = "1.28.1", features = ["full"] } opendal = { workspace = true } +tokio = { version = "1.28.1", features = ["full"] } # This is not optimal. See also the Cargo issue: # https://github.com/rust-lang/cargo/issues/1197#issuecomment-1641086954 [target.'cfg(unix)'.dependencies.opendal] -workspace = true features = [ # Depend on "openssh" which depends on "tokio-pipe" that is unavailable on Windows. "services-sftp", ] +workspace = true diff --git a/core/Cargo.toml b/core/Cargo.toml index f1c568c51..5b1e7b5b5 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -141,6 +141,7 @@ services-memcached = ["dep:bb8"] services-memory = [] services-mini-moka = ["dep:mini-moka"] services-moka = ["dep:moka"] +services-mysql = ["dep:mysql_async"] services-obs = [ "dep:reqsign", "reqsign?/services-huaweicloud", @@ -166,6 +167,7 @@ services-s3 = [ ] services-sftp = ["dep:openssh", "dep:openssh-sftp-client", "dep:dirs"] services-sled = ["dep:sled"] +services-sqlite = ["dep:rusqlite", "dep:r2d2"] services-supabase = [] services-tikv = ["tikv-client"] services-vercel-artifacts = [] @@ -176,8 +178,6 @@ services-wasabi = [ ] services-webdav = [] services-webhdfs = [] -services-mysql = ["dep:mysql_async"] -services-sqlite = ["dep:rusqlite"] [lib] bench = false @@ -206,6 +206,7 @@ await-tree = { version = "0.1.1", optional = true } backon = "0.4.1" base64 = "0.21" bb8 = { version = "0.8", optional = true } +bb8-postgres = { version = "0.8.1", optional = true } bytes = "1.4" cacache = { version = "11.6", default-features = false, features = [ "tokio-runtime", @@ -235,6 +236,7 @@ metrics = { version = "0.20", optional = true } mini-moka = { version = "0.10", optional = true } minitrace = { version = "0.5", optional = true } moka = { version = "0.10", optional = true, features = ["future"] } +mysql_async = { version = "0.32.2", optional = true } once_cell = "1" openssh = { version = "0.9.9", optional = true } openssh-sftp-client = { version = "0.13.9", optional = true, features = [ @@ -250,6 +252,7 @@ prometheus = { version = "0.13", features = ["process"], optional = true } prometheus-client = { version = "0.21.2", optional = true } prost = { version = "0.11", optional = true } quick-xml = { version = "0.30", features = ["serialize", "overlapped-lists"] } +r2d2 = { version = "0.8", optional = true } rand = { version = "0.8", optional = true } redb = { version = "1.1.0", optional = true } redis = { version = "0.23.1", features = [ @@ -262,6 +265,7 @@ reqwest = { version = "0.11.18", features = [ "stream", ], default-features = false } rocksdb = { version = "0.21.0", default-features = false, optional = true } +rusqlite = { version = "0.29.0", optional = true, features = ["bundled"] } serde = { version = "1", features = ["derive"] } serde_json = "1" sha2 = { version = "0.10", optional = true } @@ -275,9 +279,6 @@ tokio = "1.27" tokio-postgres = { version = "0.7.8", optional = true } tracing = { version = "0.1", optional = true } uuid = { version = "1", features = ["serde", "v4"] } -mysql_async = { version = "0.32.2", optional = true } -bb8-postgres = { version = "0.8.1", optional = true } -rusqlite = { version = "0.29.0", optional = true, features = ["bundled"] } [dev-dependencies] criterion = { version = "0.4", features = ["async", "async_tokio"] } diff --git a/core/src/raw/mod.rs b/core/src/raw/mod.rs index 34638b9ff..313c5d04f 100644 --- a/core/src/raw/mod.rs +++ b/core/src/raw/mod.rs @@ -56,6 +56,9 @@ pub use serde_util::*; mod chrono_util; pub use chrono_util::*; +mod tokio_util; +pub use tokio_util::*; + // Expose as a pub mod to avoid confusing. pub mod adapters; pub mod oio; diff --git a/core/src/raw/tokio_util.rs b/core/src/raw/tokio_util.rs new file mode 100644 index 000000000..e68be2cf2 --- /dev/null +++ b/core/src/raw/tokio_util.rs @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::*; + +/// Parse tokio error into opendal::Error. +pub fn new_task_join_error(e: tokio::task::JoinError) -> Error { + Error::new(ErrorKind::Unexpected, "tokio task join failed").set_source(e) +} diff --git a/core/src/services/sqlite/backend.rs b/core/src/services/sqlite/backend.rs index 9aac1c44d..24dbc1674 100644 --- a/core/src/services/sqlite/backend.rs +++ b/core/src/services/sqlite/backend.rs @@ -155,8 +155,13 @@ impl Builder for SqliteBuilder { .unwrap_or_else(|| "/".to_string()) .as_str(), ); + let mgr = SqliteConnectionManager { connection_string }; + let pool = r2d2::Pool::new(mgr).map_err(|err| { + Error::new(ErrorKind::Unexpected, "sqlite pool init failed").set_source(err) + })?; + Ok(SqliteBackend::new(Adapter { - connection_string, + pool, table, key_field, value_field, @@ -165,11 +170,33 @@ impl Builder for SqliteBuilder { } } +struct SqliteConnectionManager { + connection_string: String, +} + +impl r2d2::ManageConnection for SqliteConnectionManager { + type Connection = Connection; + type Error = Error; + + fn connect(&self) -> Result<Connection> { + Connection::open(&self.connection_string) + .map_err(|err| Error::new(ErrorKind::Unexpected, "sqlite open error").set_source(err)) + } + + fn is_valid(&self, conn: &mut Connection) -> Result<()> { + conn.execute_batch("").map_err(parse_rusqlite_error) + } + + fn has_broken(&self, _: &mut Connection) -> bool { + false + } +} + pub type SqliteBackend = kv::Backend<Adapter>; #[derive(Clone)] pub struct Adapter { - connection_string: String, + pool: r2d2::Pool<SqliteConnectionManager>, table: String, key_field: String, @@ -179,7 +206,6 @@ pub struct Adapter { impl Debug for Adapter { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let mut ds = f.debug_struct("SqliteAdapter"); - ds.field("connection_string", &self.connection_string); ds.field("table", &self.table); ds.field("key_field", &self.key_field); ds.field("value_field", &self.value_field); @@ -198,93 +224,85 @@ impl kv::Adapter for Adapter { write: true, create_dir: true, delete: true, + blocking: true, ..Default::default() }, ) } async fn get(&self, path: &str) -> Result<Option<Vec<u8>>> { - let cloned_path = path.to_string(); - let cloned_self = self.clone(); + let this = self.clone(); + let path = path.to_string(); - task::spawn_blocking(move || cloned_self.blocking_get(cloned_path.as_str())) + task::spawn_blocking(move || this.blocking_get(&path)) .await - .map_err(Error::from) - .and_then(|inner_result| inner_result) + .map_err(new_task_join_error)? } fn blocking_get(&self, path: &str) -> Result<Option<Vec<u8>>> { + let conn = self.pool.get().map_err(parse_r2d2_error)?; + let query = format!( "SELECT {} FROM {} WHERE `{}` = $1 LIMIT 1", self.value_field, self.table, self.key_field ); - let conn = Connection::open(self.connection_string.clone()).map_err(Error::from)?; - let mut statement = conn.prepare(&query).map_err(Error::from)?; + let mut statement = conn.prepare(&query).map_err(parse_rusqlite_error)?; let result = statement.query_row([path], |row| row.get(0)); match result { Ok(v) => Ok(Some(v)), Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None), - Err(err) => Err(Error::from(err)), + Err(err) => Err(parse_rusqlite_error(err)), } } async fn set(&self, path: &str, value: &[u8]) -> Result<()> { - let cloned_path = path.to_string(); - let cloned_value = value.to_vec(); - let cloned_self = self.clone(); + let this = self.clone(); + let path = path.to_string(); + // FIXME: can we avoid this copy? + let value = value.to_vec(); - task::spawn_blocking(move || cloned_self.blocking_set(cloned_path.as_str(), &cloned_value)) + task::spawn_blocking(move || this.blocking_set(&path, &value)) .await - .map_err(Error::from) - .and_then(|inner_result| inner_result) + .map_err(new_task_join_error)? } fn blocking_set(&self, path: &str, value: &[u8]) -> Result<()> { + let conn = self.pool.get().map_err(parse_r2d2_error)?; + let query = format!( "INSERT OR REPLACE INTO `{}` (`{}`, `{}`) VALUES ($1, $2)", self.table, self.key_field, self.value_field ); - let conn = Connection::open(self.connection_string.clone()).map_err(Error::from)?; - let mut statement = conn.prepare(&query).map_err(Error::from)?; + let mut statement = conn.prepare(&query).map_err(parse_rusqlite_error)?; statement .execute(params![path, value]) - .map_err(Error::from)?; + .map_err(parse_rusqlite_error)?; Ok(()) } async fn delete(&self, path: &str) -> Result<()> { - let cloned_path = path.to_string(); - let cloned_self = self.clone(); + let this = self.clone(); + let path = path.to_string(); - task::spawn_blocking(move || cloned_self.blocking_delete(cloned_path.as_str())) + task::spawn_blocking(move || this.blocking_delete(&path)) .await - .map_err(Error::from) - .and_then(|inner_result| inner_result) + .map_err(new_task_join_error)? } fn blocking_delete(&self, path: &str) -> Result<()> { - let conn = Connection::open(self.connection_string.clone()).map_err(|err| { - Error::new(ErrorKind::Unexpected, "Sqlite open error").set_source(err) - })?; + let conn = self.pool.get().map_err(parse_r2d2_error)?; + let query = format!("DELETE FROM {} WHERE `{}` = $1", self.table, self.key_field); - let mut statement = conn.prepare(&query).map_err(Error::from)?; - statement.execute([path]).map_err(Error::from)?; + let mut statement = conn.prepare(&query).map_err(parse_rusqlite_error)?; + statement.execute([path]).map_err(parse_rusqlite_error)?; Ok(()) } } -impl From<rusqlite::Error> for Error { - fn from(value: rusqlite::Error) -> Error { - Error::new(ErrorKind::Unexpected, "unhandled error from sqlite").set_source(value) - } +fn parse_rusqlite_error(err: rusqlite::Error) -> Error { + Error::new(ErrorKind::Unexpected, "unhandled error from sqlite").set_source(err) } -impl From<task::JoinError> for Error { - fn from(value: task::JoinError) -> Error { - Error::new( - ErrorKind::Unexpected, - "unhandled error from sqlite when spawning task", - ) - .set_source(value) - } +fn parse_r2d2_error(err: r2d2::Error) -> Error { + Error::new(ErrorKind::Unexpected, "unhandled error from r2d2").set_source(err) } diff --git a/core/tests/behavior/main.rs b/core/tests/behavior/main.rs index 695eefe2c..f6a5e2df4 100644 --- a/core/tests/behavior/main.rs +++ b/core/tests/behavior/main.rs @@ -104,7 +104,7 @@ fn main() -> anyhow::Result<()> { tests.extend(behavior_test::<services::Atomicserver>()); #[cfg(feature = "services-azblob")] tests.extend(behavior_test::<services::Azblob>()); - #[cfg(feature = "services-Azdls")] + #[cfg(feature = "services-azdls")] tests.extend(behavior_test::<services::Azdls>()); #[cfg(feature = "services-cacache")] tests.extend(behavior_test::<services::Cacache>()); diff --git a/integrations/dav-server/Cargo.toml b/integrations/dav-server/Cargo.toml index aa82c51a1..5bfdb0fe1 100644 --- a/integrations/dav-server/Cargo.toml +++ b/integrations/dav-server/Cargo.toml @@ -29,10 +29,10 @@ version.workspace = true [dependencies] anyhow = "1" -chrono = "0.4.28" -dirs = "5.0.0" bytes = { version = "1.4.0" } +chrono = "0.4.28" dav-server = { version = "0.5.5" } +dirs = "5.0.0" futures = "0.3" futures-util = { version = "0.3.16" } opendal.workspace = true @@ -44,4 +44,3 @@ tokio = { version = "1.27", features = [ "rt-multi-thread", "io-std", ] } -
