This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new cd967b308 feat(services/redb): change blocking_x in async_x call to
tokio::task::blocking_spawn (#3276)
cd967b308 is described below
commit cd967b30817939440da220fe691fb16d1ba4f84c
Author: yangxiaowei <[email protected]>
AuthorDate: Sat Oct 14 01:27:13 2023 +0900
feat(services/redb): change blocking_x in async_x call to
tokio::task::blocking_spawn (#3276)
Co-authored-by: yangxw <[email protected]>
---
core/src/services/redb/backend.rs | 27 ++++++++++++++++++++++++---
1 file changed, 24 insertions(+), 3 deletions(-)
diff --git a/core/src/services/redb/backend.rs b/core/src/services/redb/backend.rs
index 2d721800e..ab3c0f601 100644
--- a/core/src/services/redb/backend.rs
+++ b/core/src/services/redb/backend.rs
@@ -22,8 +22,10 @@ use std::sync::Arc;
use async_trait::async_trait;
use redb::ReadableTable;
+use tokio::task;
use crate::raw::adapters::kv;
+use crate::raw::*;
use crate::Builder;
use crate::Error;
use crate::ErrorKind;
@@ -132,7 +134,13 @@ impl kv::Adapter for Adapter {
}
async fn get(&self, path: &str) -> Result<Option<Vec<u8>>> {
- self.blocking_get(path)
+ let cloned_self = self.clone();
+ let cloned_path = path.to_string();
+
+ task::spawn_blocking(move || cloned_self.blocking_get(cloned_path.as_str()))
+ .await
+ .map_err(new_task_join_error)
+ .and_then(|inner_result| inner_result)
}
fn blocking_get(&self, path: &str) -> Result<Option<Vec<u8>>> {
@@ -154,7 +162,14 @@ impl kv::Adapter for Adapter {
}
async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
- self.blocking_set(path, value)
+ let cloned_self = self.clone();
+ let cloned_path = path.to_string();
+ let cloned_value = value.to_vec();
+
+ task::spawn_blocking(move || cloned_self.blocking_set(cloned_path.as_str(), &cloned_value))
+ .await
+ .map_err(new_task_join_error)
+ .and_then(|inner_result| inner_result)
}
fn blocking_set(&self, path: &str, value: &[u8]) -> Result<()> {
@@ -176,7 +191,13 @@ impl kv::Adapter for Adapter {
}
async fn delete(&self, path: &str) -> Result<()> {
- self.blocking_delete(path)
+ let cloned_self = self.clone();
+ let cloned_path = path.to_string();
+
+ task::spawn_blocking(move || cloned_self.blocking_delete(cloned_path.as_str()))
+ .await
+ .map_err(new_task_join_error)
+ .and_then(|inner_result| inner_result)
}
fn blocking_delete(&self, path: &str) -> Result<()> {