This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 378d7868c feat(services/sled): change blocking_x in async_x call to 
tokio::task::blocking_spawn (#3280)
378d7868c is described below

commit 378d7868c3d2ece3c23b15f0beb1b90d0007c8f7
Author: yangxiaowei <[email protected]>
AuthorDate: Sat Oct 14 19:03:10 2023 +0900

    feat(services/sled): change blocking_x in async_x call to 
tokio::task::blocking_spawn (#3280)
---
 core/src/services/sled/backend.rs | 31 +++++++++++++++++++++++++++----
 1 file changed, 27 insertions(+), 4 deletions(-)

diff --git a/core/src/services/sled/backend.rs 
b/core/src/services/sled/backend.rs
index 41e8c1fd7..df9eca583 100644
--- a/core/src/services/sled/backend.rs
+++ b/core/src/services/sled/backend.rs
@@ -21,8 +21,10 @@ use std::fmt::Formatter;
 use std::str;
 
 use async_trait::async_trait;
+use tokio::task;
 
 use crate::raw::adapters::kv;
+use crate::raw::*;
 use crate::Builder;
 use crate::Error;
 use crate::ErrorKind;
@@ -145,7 +147,12 @@ impl kv::Adapter for Adapter {
     }
 
     async fn get(&self, path: &str) -> Result<Option<Vec<u8>>> {
-        self.blocking_get(path)
+        let cloned_self = self.clone();
+        let cloned_path = path.to_string();
+
+        task::spawn_blocking(move || 
cloned_self.blocking_get(cloned_path.as_str()))
+            .await
+            .map_err(new_task_join_error)?
     }
 
     fn blocking_get(&self, path: &str) -> Result<Option<Vec<u8>>> {
@@ -157,7 +164,13 @@ impl kv::Adapter for Adapter {
     }
 
     async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
-        self.blocking_set(path, value)
+        let cloned_self = self.clone();
+        let cloned_path = path.to_string();
+        let cloned_value = value.to_vec();
+
+        task::spawn_blocking(move || 
cloned_self.blocking_set(cloned_path.as_str(), &cloned_value))
+            .await
+            .map_err(new_task_join_error)?
     }
 
     fn blocking_set(&self, path: &str, value: &[u8]) -> Result<()> {
@@ -167,7 +180,12 @@ impl kv::Adapter for Adapter {
     }
 
     async fn delete(&self, path: &str) -> Result<()> {
-        self.blocking_delete(path)
+        let cloned_self = self.clone();
+        let cloned_path = path.to_string();
+
+        task::spawn_blocking(move || 
cloned_self.blocking_delete(cloned_path.as_str()))
+            .await
+            .map_err(new_task_join_error)?
     }
 
     fn blocking_delete(&self, path: &str) -> Result<()> {
@@ -177,7 +195,12 @@ impl kv::Adapter for Adapter {
     }
 
     async fn scan(&self, path: &str) -> Result<Vec<String>> {
-        self.blocking_scan(path)
+        let cloned_self = self.clone();
+        let cloned_path = path.to_string();
+
+        task::spawn_blocking(move || 
cloned_self.blocking_scan(cloned_path.as_str()))
+            .await
+            .map_err(new_task_join_error)?
     }
 
     fn blocking_scan(&self, path: &str) -> Result<Vec<String>> {

Reply via email to