This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch polish-sqlite
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit b4f5b9362a02cc30adb01dadb66f0a391e562dc3
Author: Xuanwo <[email protected]>
AuthorDate: Mon Oct 9 23:24:59 2023 +0800

    refactor(services/sqlite): Polish sqlite via adding connection pool
    
    Signed-off-by: Xuanwo <[email protected]>
---
 .env.example                        |   6 +++
 .typos.toml                         |   2 +-
 Cargo.lock                          |  12 +++++
 bindings/cpp/Cargo.toml             |   6 +--
 bindings/dotnet/Cargo.toml          |   1 -
 bindings/haskell/Cargo.toml         |   2 +-
 bindings/java/Cargo.toml            |  32 +++++------
 core/Cargo.toml                     |  11 ++--
 core/src/raw/mod.rs                 |   3 ++
 core/src/raw/tokio_util.rs          |  23 ++++++++
 core/src/services/sqlite/backend.rs | 102 +++++++++++++++++++++---------------
 core/tests/behavior/main.rs         |   2 +-
 integrations/dav-server/Cargo.toml  |   5 +-
 13 files changed, 134 insertions(+), 73 deletions(-)

diff --git a/.env.example b/.env.example
index 2a5b03448..08b3c255e 100644
--- a/.env.example
+++ b/.env.example
@@ -164,3 +164,9 @@ OPENDAL_GDRIVE_ACCESS_TOKEN=<access_token>
 OPENDAL_GDRIVE_REFRESH_TOKEN=<refresh_token>
 OPENDAL_GDRIVE_CLIENT_ID=<client_id>
 OPENDAL_GDRIVE_CLIENT_SECRET=<client_secret>
+# sqlite
+OPENDAL_SQLITE_TEST=on
+OPENDAL_SQLITE_CONNECTION_STRING=file:///tmp/opendal/test.db
+OPENDAL_SQLITE_TABLE=data
+OPENDAL_SQLITE_KEY_FIELD=key
+OPENDAL_SQLITE_VALUE_FIELD=data
diff --git a/.typos.toml b/.typos.toml
index 5b2bca99b..df205aecc 100644
--- a/.typos.toml
+++ b/.typos.toml
@@ -18,8 +18,8 @@
 [default.extend-words]
 # Random strings.
 "Dum" = "Dum"
-"ba" = "ba"
 "Hel" = "Hel"
+"ba" = "ba"
 "hellow" = "hellow"
 # Showed up in examples.
 "thw" = "thw"
diff --git a/Cargo.lock b/Cargo.lock
index b936172a3..601e740cd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4047,6 +4047,7 @@ dependencies = [
  "prometheus-client",
  "prost",
  "quick-xml 0.30.0",
+ "r2d2",
  "rand 0.8.5",
  "redb",
  "redis",
@@ -5252,6 +5253,17 @@ dependencies = [
  "proc-macro2",
 ]
 
+[[package]]
+name = "r2d2"
+version = "0.8.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93"
+dependencies = [
+ "log",
+ "parking_lot 0.12.1",
+ "scheduled-thread-pool",
+]
+
 [[package]]
 name = "radium"
 version = "0.7.0"
diff --git a/bindings/cpp/Cargo.toml b/bindings/cpp/Cargo.toml
index 058de3a20..40e90680e 100644
--- a/bindings/cpp/Cargo.toml
+++ b/bindings/cpp/Cargo.toml
@@ -24,17 +24,17 @@ edition.workspace = true
 homepage.workspace = true
 license.workspace = true
 repository.workspace = true
-version.workspace = true
 rust-version.workspace = true
+version.workspace = true
 
 [lib]
 crate-type = ["staticlib"]
 
 [dependencies]
-opendal.workspace = true
-cxx = "1.0"
 anyhow = "1.0"
 chrono = "0.4"
+cxx = "1.0"
+opendal.workspace = true
 
 [build-dependencies]
 cxx-build = "1.0"
diff --git a/bindings/dotnet/Cargo.toml b/bindings/dotnet/Cargo.toml
index 11a6a3c5a..e6a320bc4 100644
--- a/bindings/dotnet/Cargo.toml
+++ b/bindings/dotnet/Cargo.toml
@@ -27,7 +27,6 @@ license.workspace = true
 repository.workspace = true
 rust-version.workspace = true
 
-
 [lib]
 crate-type = ["cdylib"]
 doc = false
diff --git a/bindings/haskell/Cargo.toml b/bindings/haskell/Cargo.toml
index 637ee3d8e..5c1021f07 100644
--- a/bindings/haskell/Cargo.toml
+++ b/bindings/haskell/Cargo.toml
@@ -24,8 +24,8 @@ edition.workspace = true
 homepage.workspace = true
 license.workspace = true
 repository.workspace = true
-version.workspace = true
 rust-version.workspace = true
+version.workspace = true
 
 [lib]
 crate-type = ["cdylib"]
diff --git a/bindings/java/Cargo.toml b/bindings/java/Cargo.toml
index 09364f268..66deeeb17 100644
--- a/bindings/java/Cargo.toml
+++ b/bindings/java/Cargo.toml
@@ -82,20 +82,20 @@ services-all = [
 ]
 
 # Default services provided by opendal.
-services-azblob = [ "opendal/services-azblob" ]
-services-azdls = [ "opendal/services-azdls" ]
-services-cos = [ "opendal/services-cos" ]
-services-fs = [ "opendal/services-fs" ]
-services-gcs = [ "opendal/services-gcs" ]
-services-ghac = [ "opendal/services-ghac" ]
-services-http = [ "opendal/services-http" ]
-services-ipmfs = [ "opendal/services-ipmfs" ]
-services-memory = [ "opendal/services-memory" ]
-services-obs = [ "opendal/services-obs" ]
-services-oss = [ "opendal/services-oss" ]
-services-s3 = [ "opendal/services-s3" ]
-services-webdav = [ "opendal/services-webdav" ]
-services-webhdfs = [ "opendal/services-webhdfs" ]
+services-azblob = ["opendal/services-azblob"]
+services-azdls = ["opendal/services-azdls"]
+services-cos = ["opendal/services-cos"]
+services-fs = ["opendal/services-fs"]
+services-gcs = ["opendal/services-gcs"]
+services-ghac = ["opendal/services-ghac"]
+services-http = ["opendal/services-http"]
+services-ipmfs = ["opendal/services-ipmfs"]
+services-memory = ["opendal/services-memory"]
+services-obs = ["opendal/services-obs"]
+services-oss = ["opendal/services-oss"]
+services-s3 = ["opendal/services-s3"]
+services-webdav = ["opendal/services-webdav"]
+services-webhdfs = ["opendal/services-webhdfs"]
 
 # Optional services provided by opendal.
 services-cacache = ["opendal/services-cacache"]
@@ -129,14 +129,14 @@ anyhow = "1.0.71"
 jni = "0.21.1"
 num_cpus = "1.15.0"
 once_cell = "1.17.1"
-tokio = { version = "1.28.1", features = ["full"] }
 opendal = { workspace = true }
+tokio = { version = "1.28.1", features = ["full"] }
 
 # This is not optimal. See also the Cargo issue:
 # https://github.com/rust-lang/cargo/issues/1197#issuecomment-1641086954
 [target.'cfg(unix)'.dependencies.opendal]
-workspace = true
 features = [
   # Depend on "openssh" which depends on "tokio-pipe" that is unavailable on Windows.
   "services-sftp",
 ]
+workspace = true
diff --git a/core/Cargo.toml b/core/Cargo.toml
index f1c568c51..5b1e7b5b5 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -141,6 +141,7 @@ services-memcached = ["dep:bb8"]
 services-memory = []
 services-mini-moka = ["dep:mini-moka"]
 services-moka = ["dep:moka"]
+services-mysql = ["dep:mysql_async"]
 services-obs = [
   "dep:reqsign",
   "reqsign?/services-huaweicloud",
@@ -166,6 +167,7 @@ services-s3 = [
 ]
 services-sftp = ["dep:openssh", "dep:openssh-sftp-client", "dep:dirs"]
 services-sled = ["dep:sled"]
+services-sqlite = ["dep:rusqlite", "dep:r2d2"]
 services-supabase = []
 services-tikv = ["tikv-client"]
 services-vercel-artifacts = []
@@ -176,8 +178,6 @@ services-wasabi = [
 ]
 services-webdav = []
 services-webhdfs = []
-services-mysql = ["dep:mysql_async"]
-services-sqlite = ["dep:rusqlite"]
 
 [lib]
 bench = false
@@ -206,6 +206,7 @@ await-tree = { version = "0.1.1", optional = true }
 backon = "0.4.1"
 base64 = "0.21"
 bb8 = { version = "0.8", optional = true }
+bb8-postgres = { version = "0.8.1", optional = true }
 bytes = "1.4"
 cacache = { version = "11.6", default-features = false, features = [
   "tokio-runtime",
@@ -235,6 +236,7 @@ metrics = { version = "0.20", optional = true }
 mini-moka = { version = "0.10", optional = true }
 minitrace = { version = "0.5", optional = true }
 moka = { version = "0.10", optional = true, features = ["future"] }
+mysql_async = { version = "0.32.2", optional = true }
 once_cell = "1"
 openssh = { version = "0.9.9", optional = true }
 openssh-sftp-client = { version = "0.13.9", optional = true, features = [
@@ -250,6 +252,7 @@ prometheus = { version = "0.13", features = ["process"], optional = true }
 prometheus-client = { version = "0.21.2", optional = true }
 prost = { version = "0.11", optional = true }
 quick-xml = { version = "0.30", features = ["serialize", "overlapped-lists"] }
+r2d2 = { version = "0.8", optional = true }
 rand = { version = "0.8", optional = true }
 redb = { version = "1.1.0", optional = true }
 redis = { version = "0.23.1", features = [
@@ -262,6 +265,7 @@ reqwest = { version = "0.11.18", features = [
   "stream",
 ], default-features = false }
 rocksdb = { version = "0.21.0", default-features = false, optional = true }
+rusqlite = { version = "0.29.0", optional = true, features = ["bundled"] }
 serde = { version = "1", features = ["derive"] }
 serde_json = "1"
 sha2 = { version = "0.10", optional = true }
@@ -275,9 +279,6 @@ tokio = "1.27"
 tokio-postgres = { version = "0.7.8", optional = true }
 tracing = { version = "0.1", optional = true }
 uuid = { version = "1", features = ["serde", "v4"] }
-mysql_async = { version = "0.32.2", optional = true }
-bb8-postgres = { version = "0.8.1", optional = true }
-rusqlite = { version = "0.29.0", optional = true, features = ["bundled"] }
 
 [dev-dependencies]
 criterion = { version = "0.4", features = ["async", "async_tokio"] }
diff --git a/core/src/raw/mod.rs b/core/src/raw/mod.rs
index 34638b9ff..313c5d04f 100644
--- a/core/src/raw/mod.rs
+++ b/core/src/raw/mod.rs
@@ -56,6 +56,9 @@ pub use serde_util::*;
 mod chrono_util;
 pub use chrono_util::*;
 
+mod tokio_util;
+pub use tokio_util::*;
+
 // Expose as a pub mod to avoid confusing.
 pub mod adapters;
 pub mod oio;
diff --git a/core/src/raw/tokio_util.rs b/core/src/raw/tokio_util.rs
new file mode 100644
index 000000000..e68be2cf2
--- /dev/null
+++ b/core/src/raw/tokio_util.rs
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::*;
+
+/// Parse tokio error into opendal::Error.
+pub fn new_task_join_error(e: tokio::task::JoinError) -> Error {
+    Error::new(ErrorKind::Unexpected, "tokio task join failed").set_source(e)
+}
diff --git a/core/src/services/sqlite/backend.rs b/core/src/services/sqlite/backend.rs
index 9aac1c44d..24dbc1674 100644
--- a/core/src/services/sqlite/backend.rs
+++ b/core/src/services/sqlite/backend.rs
@@ -155,8 +155,13 @@ impl Builder for SqliteBuilder {
                 .unwrap_or_else(|| "/".to_string())
                 .as_str(),
         );
+        let mgr = SqliteConnectionManager { connection_string };
+        let pool = r2d2::Pool::new(mgr).map_err(|err| {
+            Error::new(ErrorKind::Unexpected, "sqlite pool init failed").set_source(err)
+        })?;
+
         Ok(SqliteBackend::new(Adapter {
-            connection_string,
+            pool,
             table,
             key_field,
             value_field,
@@ -165,11 +170,33 @@ impl Builder for SqliteBuilder {
     }
 }
 
+struct SqliteConnectionManager {
+    connection_string: String,
+}
+
+impl r2d2::ManageConnection for SqliteConnectionManager {
+    type Connection = Connection;
+    type Error = Error;
+
+    fn connect(&self) -> Result<Connection> {
+        Connection::open(&self.connection_string)
+            .map_err(|err| Error::new(ErrorKind::Unexpected, "sqlite open error").set_source(err))
+    }
+
+    fn is_valid(&self, conn: &mut Connection) -> Result<()> {
+        conn.execute_batch("").map_err(parse_rusqlite_error)
+    }
+
+    fn has_broken(&self, _: &mut Connection) -> bool {
+        false
+    }
+}
+
 pub type SqliteBackend = kv::Backend<Adapter>;
 
 #[derive(Clone)]
 pub struct Adapter {
-    connection_string: String,
+    pool: r2d2::Pool<SqliteConnectionManager>,
 
     table: String,
     key_field: String,
@@ -179,7 +206,6 @@ pub struct Adapter {
 impl Debug for Adapter {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         let mut ds = f.debug_struct("SqliteAdapter");
-        ds.field("connection_string", &self.connection_string);
         ds.field("table", &self.table);
         ds.field("key_field", &self.key_field);
         ds.field("value_field", &self.value_field);
@@ -198,93 +224,85 @@ impl kv::Adapter for Adapter {
                 write: true,
                 create_dir: true,
                 delete: true,
+                blocking: true,
                 ..Default::default()
             },
         )
     }
 
     async fn get(&self, path: &str) -> Result<Option<Vec<u8>>> {
-        let cloned_path = path.to_string();
-        let cloned_self = self.clone();
+        let this = self.clone();
+        let path = path.to_string();
 
-        task::spawn_blocking(move || cloned_self.blocking_get(cloned_path.as_str()))
+        task::spawn_blocking(move || this.blocking_get(&path))
             .await
-            .map_err(Error::from)
-            .and_then(|inner_result| inner_result)
+            .map_err(new_task_join_error)?
     }
 
     fn blocking_get(&self, path: &str) -> Result<Option<Vec<u8>>> {
+        let conn = self.pool.get().map_err(parse_r2d2_error)?;
+
         let query = format!(
             "SELECT {} FROM {} WHERE `{}` = $1 LIMIT 1",
             self.value_field, self.table, self.key_field
         );
-        let conn = Connection::open(self.connection_string.clone()).map_err(Error::from)?;
-        let mut statement = conn.prepare(&query).map_err(Error::from)?;
+        let mut statement = conn.prepare(&query).map_err(parse_rusqlite_error)?;
         let result = statement.query_row([path], |row| row.get(0));
         match result {
             Ok(v) => Ok(Some(v)),
             Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
-            Err(err) => Err(Error::from(err)),
+            Err(err) => Err(parse_rusqlite_error(err)),
         }
     }
 
     async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
-        let cloned_path = path.to_string();
-        let cloned_value = value.to_vec();
-        let cloned_self = self.clone();
+        let this = self.clone();
+        let path = path.to_string();
+        // FIXME: can we avoid this copy?
+        let value = value.to_vec();
 
-        task::spawn_blocking(move || cloned_self.blocking_set(cloned_path.as_str(), &cloned_value))
+        task::spawn_blocking(move || this.blocking_set(&path, &value))
             .await
-            .map_err(Error::from)
-            .and_then(|inner_result| inner_result)
+            .map_err(new_task_join_error)?
     }
 
     fn blocking_set(&self, path: &str, value: &[u8]) -> Result<()> {
+        let conn = self.pool.get().map_err(parse_r2d2_error)?;
+
         let query = format!(
             "INSERT OR REPLACE INTO `{}` (`{}`, `{}`) VALUES ($1, $2)",
             self.table, self.key_field, self.value_field
         );
-        let conn = Connection::open(self.connection_string.clone()).map_err(Error::from)?;
-        let mut statement = conn.prepare(&query).map_err(Error::from)?;
+        let mut statement = conn.prepare(&query).map_err(parse_rusqlite_error)?;
         statement
             .execute(params![path, value])
-            .map_err(Error::from)?;
+            .map_err(parse_rusqlite_error)?;
         Ok(())
     }
 
     async fn delete(&self, path: &str) -> Result<()> {
-        let cloned_path = path.to_string();
-        let cloned_self = self.clone();
+        let this = self.clone();
+        let path = path.to_string();
 
-        task::spawn_blocking(move || cloned_self.blocking_delete(cloned_path.as_str()))
+        task::spawn_blocking(move || this.blocking_delete(&path))
             .await
-            .map_err(Error::from)
-            .and_then(|inner_result| inner_result)
+            .map_err(new_task_join_error)?
     }
 
     fn blocking_delete(&self, path: &str) -> Result<()> {
-        let conn = Connection::open(self.connection_string.clone()).map_err(|err| {
-            Error::new(ErrorKind::Unexpected, "Sqlite open error").set_source(err)
-        })?;
+        let conn = self.pool.get().map_err(parse_r2d2_error)?;
+
         let query = format!("DELETE FROM {} WHERE `{}` = $1", self.table, self.key_field);
-        let mut statement = conn.prepare(&query).map_err(Error::from)?;
-        statement.execute([path]).map_err(Error::from)?;
+        let mut statement = conn.prepare(&query).map_err(parse_rusqlite_error)?;
+        statement.execute([path]).map_err(parse_rusqlite_error)?;
         Ok(())
     }
 }
 
-impl From<rusqlite::Error> for Error {
-    fn from(value: rusqlite::Error) -> Error {
-        Error::new(ErrorKind::Unexpected, "unhandled error from sqlite").set_source(value)
-    }
+fn parse_rusqlite_error(err: rusqlite::Error) -> Error {
+    Error::new(ErrorKind::Unexpected, "unhandled error from sqlite").set_source(err)
 }
 
-impl From<task::JoinError> for Error {
-    fn from(value: task::JoinError) -> Error {
-        Error::new(
-            ErrorKind::Unexpected,
-            "unhandled error from sqlite when spawning task",
-        )
-        .set_source(value)
-    }
+fn parse_r2d2_error(err: r2d2::Error) -> Error {
+    Error::new(ErrorKind::Unexpected, "unhandled error from r2d2").set_source(err)
 }
diff --git a/core/tests/behavior/main.rs b/core/tests/behavior/main.rs
index 695eefe2c..f6a5e2df4 100644
--- a/core/tests/behavior/main.rs
+++ b/core/tests/behavior/main.rs
@@ -104,7 +104,7 @@ fn main() -> anyhow::Result<()> {
     tests.extend(behavior_test::<services::Atomicserver>());
     #[cfg(feature = "services-azblob")]
     tests.extend(behavior_test::<services::Azblob>());
-    #[cfg(feature = "services-Azdls")]
+    #[cfg(feature = "services-azdls")]
     tests.extend(behavior_test::<services::Azdls>());
     #[cfg(feature = "services-cacache")]
     tests.extend(behavior_test::<services::Cacache>());
diff --git a/integrations/dav-server/Cargo.toml b/integrations/dav-server/Cargo.toml
index aa82c51a1..5bfdb0fe1 100644
--- a/integrations/dav-server/Cargo.toml
+++ b/integrations/dav-server/Cargo.toml
@@ -29,10 +29,10 @@ version.workspace = true
 
 [dependencies]
 anyhow = "1"
-chrono = "0.4.28"
-dirs = "5.0.0"
 bytes = { version = "1.4.0" }
+chrono = "0.4.28"
 dav-server = { version = "0.5.5" }
+dirs = "5.0.0"
 futures = "0.3"
 futures-util = { version = "0.3.16" }
 opendal.workspace = true
@@ -44,4 +44,3 @@ tokio = { version = "1.27", features = [
   "rt-multi-thread",
   "io-std",
 ] }
-

Reply via email to