This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 378d7868c feat(services/sled): change blocking_x in async_x call to
tokio::task::blocking_spawn (#3280)
378d7868c is described below
commit 378d7868c3d2ece3c23b15f0beb1b90d0007c8f7
Author: yangxiaowei <[email protected]>
AuthorDate: Sat Oct 14 19:03:10 2023 +0900
feat(services/sled): change blocking_x in async_x call to
tokio::task::blocking_spawn (#3280)
---
core/src/services/sled/backend.rs | 31 +++++++++++++++++++++++++++----
1 file changed, 27 insertions(+), 4 deletions(-)
diff --git a/core/src/services/sled/backend.rs
b/core/src/services/sled/backend.rs
index 41e8c1fd7..df9eca583 100644
--- a/core/src/services/sled/backend.rs
+++ b/core/src/services/sled/backend.rs
@@ -21,8 +21,10 @@ use std::fmt::Formatter;
use std::str;
use async_trait::async_trait;
+use tokio::task;
use crate::raw::adapters::kv;
+use crate::raw::*;
use crate::Builder;
use crate::Error;
use crate::ErrorKind;
@@ -145,7 +147,12 @@ impl kv::Adapter for Adapter {
}
async fn get(&self, path: &str) -> Result<Option<Vec<u8>>> {
- self.blocking_get(path)
+ let cloned_self = self.clone();
+ let cloned_path = path.to_string();
+
+ task::spawn_blocking(move ||
cloned_self.blocking_get(cloned_path.as_str()))
+ .await
+ .map_err(new_task_join_error)?
}
fn blocking_get(&self, path: &str) -> Result<Option<Vec<u8>>> {
@@ -157,7 +164,13 @@ impl kv::Adapter for Adapter {
}
async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
- self.blocking_set(path, value)
+ let cloned_self = self.clone();
+ let cloned_path = path.to_string();
+ let cloned_value = value.to_vec();
+
+ task::spawn_blocking(move ||
cloned_self.blocking_set(cloned_path.as_str(), &cloned_value))
+ .await
+ .map_err(new_task_join_error)?
}
fn blocking_set(&self, path: &str, value: &[u8]) -> Result<()> {
@@ -167,7 +180,12 @@ impl kv::Adapter for Adapter {
}
async fn delete(&self, path: &str) -> Result<()> {
- self.blocking_delete(path)
+ let cloned_self = self.clone();
+ let cloned_path = path.to_string();
+
+ task::spawn_blocking(move ||
cloned_self.blocking_delete(cloned_path.as_str()))
+ .await
+ .map_err(new_task_join_error)?
}
fn blocking_delete(&self, path: &str) -> Result<()> {
@@ -177,7 +195,12 @@ impl kv::Adapter for Adapter {
}
async fn scan(&self, path: &str) -> Result<Vec<String>> {
- self.blocking_scan(path)
+ let cloned_self = self.clone();
+ let cloned_path = path.to_string();
+
+ task::spawn_blocking(move ||
cloned_self.blocking_scan(cloned_path.as_str()))
+ .await
+ .map_err(new_task_join_error)?
}
fn blocking_scan(&self, path: &str) -> Result<Vec<String>> {