This is an automated email from the ASF dual-hosted git repository. tison pushed a commit to branch use-mea-sync in repository https://gitbox.apache.org/repos/asf/opendal.git
commit 11f6b217dd657f5fbb5f7d3a3c5eb7f81b79bb5b Author: tison <[email protected]> AuthorDate: Tue Nov 25 14:30:09 2025 +0800 chore: move from tokio::sync to mea primitives Signed-off-by: tison <[email protected]> --- bindings/cpp/Cargo.toml | 1 + bindings/cpp/src/async.rs | 2 +- bindings/python/Cargo.toml | 1 + bindings/python/src/file.rs | 2 +- bindings/python/src/lister.rs | 5 ++- core/Cargo.lock | 10 +++++ core/Cargo.toml | 3 +- core/src/layers/concurrent_limit.rs | 49 +++++-------------------- core/src/raw/oio/write/multipart_write.rs | 2 +- core/src/raw/path_cache.rs | 4 +- core/src/services/aliyun_drive/backend.rs | 2 +- core/src/services/aliyun_drive/core.rs | 4 +- core/src/services/b2/backend.rs | 2 +- core/src/services/b2/core.rs | 2 +- core/src/services/dropbox/builder.rs | 2 +- core/src/services/dropbox/core.rs | 2 +- core/src/services/gdrive/builder.rs | 2 +- core/src/services/gdrive/core.rs | 2 +- core/src/services/koofr/backend.rs | 2 +- core/src/services/koofr/core.rs | 2 +- core/src/services/onedrive/builder.rs | 2 +- core/src/services/onedrive/core.rs | 2 +- core/src/services/seafile/backend.rs | 2 +- core/src/services/seafile/core.rs | 2 +- core/src/types/context/write.rs | 2 +- integrations/object_store/Cargo.toml | 1 + integrations/object_store/src/service/writer.rs | 8 +--- integrations/object_store/src/store.rs | 3 +- 28 files changed, 52 insertions(+), 71 deletions(-) diff --git a/bindings/cpp/Cargo.toml b/bindings/cpp/Cargo.toml index 169234739..d727c4ffd 100644 --- a/bindings/cpp/Cargo.toml +++ b/bindings/cpp/Cargo.toml @@ -34,6 +34,7 @@ anyhow = { version = "1.0.100" } cxx = { version = "1.0.186" } cxx-async = { version = "0.1.3", optional = true } futures = { version = "0.3.31" } +mea = { version = "0.5.0" } # this crate won't be published, we always use the local version opendal = { version = ">=0", path = "../../core", features = ["blocking"] } tokio = { version = "1.27", features = ["fs", "macros", "rt-multi-thread"] } diff --git 
a/bindings/cpp/src/async.rs b/bindings/cpp/src/async.rs index be461ea51..eb2a89224 100644 --- a/bindings/cpp/src/async.rs +++ b/bindings/cpp/src/async.rs @@ -17,6 +17,7 @@ use anyhow::Result; use cxx_async::CxxAsyncException; +use mea::mutex::Mutex; use opendal as od; use std::collections::HashMap; use std::future::Future; @@ -24,7 +25,6 @@ use std::ops::Deref; use std::pin::Pin; use std::str::FromStr; use std::sync::{Arc, OnceLock}; -use tokio::sync::Mutex; #[cxx::bridge(namespace = opendal::ffi::async_op)] mod ffi { diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml index f585ab5fe..1c0889429 100644 --- a/bindings/python/Cargo.toml +++ b/bindings/python/Cargo.toml @@ -201,6 +201,7 @@ bytes = "1.5.0" dict_derive = "0.6.0" futures = "0.3.28" jiff = { version = "0.2.15" } +mea = { version = "0.5.0" } # this crate won't be published, we always use the local version opendal = { version = ">=0", path = "../../core", features = [ "blocking", diff --git a/bindings/python/src/file.rs b/bindings/python/src/file.rs index 6e1d6a861..da744eb39 100644 --- a/bindings/python/src/file.rs +++ b/bindings/python/src/file.rs @@ -26,13 +26,13 @@ use std::sync::Arc; use futures::AsyncReadExt; use futures::AsyncSeekExt; use futures::AsyncWriteExt; +use mea::mutex::Mutex; use pyo3::IntoPyObjectExt; use pyo3::buffer::PyBuffer; use pyo3::exceptions::PyIOError; use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use pyo3_async_runtimes::tokio::future_into_py; -use tokio::sync::Mutex; use crate::*; diff --git a/bindings/python/src/lister.rs b/bindings/python/src/lister.rs index dfa5193cf..04b77ec58 100644 --- a/bindings/python/src/lister.rs +++ b/bindings/python/src/lister.rs @@ -18,10 +18,11 @@ use std::sync::Arc; use futures::TryStreamExt; +use mea::mutex::Mutex; +use pyo3::IntoPyObjectExt; use pyo3::exceptions::PyStopAsyncIteration; -use pyo3::{IntoPyObjectExt, prelude::*}; +use pyo3::prelude::*; use pyo3_async_runtimes::tokio::future_into_py; -use 
tokio::sync::Mutex; use crate::*; diff --git a/core/Cargo.lock b/core/Cargo.lock index 76d6c1277..7498081a6 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -4770,6 +4770,15 @@ dependencies = [ "digest", ] +[[package]] +name = "mea" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3e2d273c5e8d75098efdc751378787e9197e242abe344b9326a117242c3f9d" +dependencies = [ + "slab", +] + [[package]] name = "memchr" version = "2.7.6" @@ -5372,6 +5381,7 @@ dependencies = [ "libtest-mimic", "log", "md-5", + "mea", "metrics", "mime_guess", "mini-moka", diff --git a/core/Cargo.toml b/core/Cargo.toml index 5ad6f706d..f95075b88 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -252,6 +252,7 @@ http-body = "1" jiff = { version = "0.2.15", features = ["serde"] } log = "0.4" md-5 = "0.10" +mea = { version = "0.5.0" } percent-encoding = "2" quick-xml = { version = "0.38", features = ["serialize", "overlapped-lists"] } reqwest = { version = "0.12.24", features = [ @@ -259,7 +260,7 @@ reqwest = { version = "0.12.24", features = [ ], default-features = false } serde = { version = "1", features = ["derive"] } serde_json = "1" -tokio = { version = "1.48", features = ["sync", "io-util"] } +tokio = { version = "1.48", features = ["io-util"] } url = "2.5" uuid = { version = "1", features = ["serde", "v4"] } diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs index d51f4de45..0f42007ba 100644 --- a/core/src/layers/concurrent_limit.rs +++ b/core/src/layers/concurrent_limit.rs @@ -23,8 +23,8 @@ use std::task::Poll; use futures::Stream; use futures::StreamExt; -use tokio::sync::OwnedSemaphorePermit; -use tokio::sync::Semaphore; +use mea::semaphore::OwnedSemaphorePermit; +use mea::semaphore::Semaphore; use crate::raw::*; use crate::*; @@ -139,10 +139,7 @@ impl HttpFetch for ConcurrentLimitHttpFetcher { return self.inner.fetch(req).await; }; - let permit = semaphore - .acquire_owned() - .await - 
.expect("semaphore must be valid"); + let permit = semaphore.acquire_owned(1).await; let resp = self.inner.fetch(req).await?; let (parts, body) = resp.into_parts(); @@ -191,22 +188,13 @@ impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> { } async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> { - let _permit = self - .semaphore - .acquire() - .await - .expect("semaphore must be valid"); + let _permit = self.semaphore.acquire(1).await; self.inner.create_dir(path, args).await } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - let permit = self - .semaphore - .clone() - .acquire_owned() - .await - .expect("semaphore must be valid"); + let permit = self.semaphore.clone().acquire_owned(1).await; self.inner .read(path, args) @@ -215,12 +203,7 @@ impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - let permit = self - .semaphore - .clone() - .acquire_owned() - .await - .expect("semaphore must be valid"); + let permit = self.semaphore.clone().acquire_owned(1).await; self.inner .write(path, args) @@ -229,22 +212,13 @@ impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> { } async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> { - let _permit = self - .semaphore - .acquire() - .await - .expect("semaphore must be valid"); + let _permit = self.semaphore.acquire(1).await; self.inner.stat(path, args).await } async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> { - let permit = self - .semaphore - .clone() - .acquire_owned() - .await - .expect("semaphore must be valid"); + let permit = self.semaphore.clone().acquire_owned(1).await; self.inner .delete() @@ -253,12 +227,7 @@ impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> { } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { - let permit = self - .semaphore - .clone() - .acquire_owned() - 
.await - .expect("semaphore must be valid"); + let permit = self.semaphore.clone().acquire_owned(1).await; self.inner .list(path, args) diff --git a/core/src/raw/oio/write/multipart_write.rs b/core/src/raw/oio/write/multipart_write.rs index 03bb8d270..403774826 100644 --- a/core/src/raw/oio/write/multipart_write.rs +++ b/core/src/raw/oio/write/multipart_write.rs @@ -309,11 +309,11 @@ where mod tests { use std::time::Duration; + use mea::mutex::Mutex; use pretty_assertions::assert_eq; use rand::Rng; use rand::RngCore; use rand::thread_rng; - use tokio::sync::Mutex; use tokio::time::sleep; use tokio::time::timeout; diff --git a/core/src/raw/path_cache.rs b/core/src/raw/path_cache.rs index ef8f51253..1d044f38e 100644 --- a/core/src/raw/path_cache.rs +++ b/core/src/raw/path_cache.rs @@ -18,9 +18,9 @@ use std::collections::VecDeque; use futures::Future; +use mea::mutex::Mutex; +use mea::mutex::MutexGuard; use moka::sync::Cache; -use tokio::sync::Mutex; -use tokio::sync::MutexGuard; use crate::raw::*; use crate::*; diff --git a/core/src/services/aliyun_drive/backend.rs b/core/src/services/aliyun_drive/backend.rs index a88927650..27fdb6da7 100644 --- a/core/src/services/aliyun_drive/backend.rs +++ b/core/src/services/aliyun_drive/backend.rs @@ -22,7 +22,7 @@ use bytes::Buf; use http::Response; use http::StatusCode; use log::debug; -use tokio::sync::Mutex; +use mea::mutex::Mutex; use super::ALIYUN_DRIVE_SCHEME; use super::config::AliyunDriveConfig; diff --git a/core/src/services/aliyun_drive/core.rs b/core/src/services/aliyun_drive/core.rs index eb20a7867..05cee528b 100644 --- a/core/src/services/aliyun_drive/core.rs +++ b/core/src/services/aliyun_drive/core.rs @@ -22,11 +22,11 @@ use bytes::Buf; use http::Method; use http::Request; use http::Response; +use http::header; use http::header::HeaderValue; -use http::header::{self}; +use mea::mutex::Mutex; use serde::Deserialize; use serde::Serialize; -use tokio::sync::Mutex; use super::error::parse_error; use crate::raw::*; 
diff --git a/core/src/services/b2/backend.rs b/core/src/services/b2/backend.rs index b4a7bfc0b..1f78f0ddc 100644 --- a/core/src/services/b2/backend.rs +++ b/core/src/services/b2/backend.rs @@ -22,7 +22,7 @@ use http::Request; use http::Response; use http::StatusCode; use log::debug; -use tokio::sync::RwLock; +use mea::rwlock::RwLock; use super::B2_SCHEME; use super::config::B2Config; diff --git a/core/src/services/b2/core.rs b/core/src/services/b2/core.rs index 636200a38..b88a20598 100644 --- a/core/src/services/b2/core.rs +++ b/core/src/services/b2/core.rs @@ -24,9 +24,9 @@ use http::Request; use http::Response; use http::StatusCode; use http::header; +use mea::rwlock::RwLock; use serde::Deserialize; use serde::Serialize; -use tokio::sync::RwLock; use self::constants::X_BZ_CONTENT_SHA1; use self::constants::X_BZ_FILE_NAME; diff --git a/core/src/services/dropbox/builder.rs b/core/src/services/dropbox/builder.rs index e9e0ec71f..bcda4694e 100644 --- a/core/src/services/dropbox/builder.rs +++ b/core/src/services/dropbox/builder.rs @@ -18,7 +18,7 @@ use std::fmt::Debug; use std::sync::Arc; -use tokio::sync::Mutex; +use mea::mutex::Mutex; use super::DROPBOX_SCHEME; use super::backend::DropboxBackend; diff --git a/core/src/services/dropbox/core.rs b/core/src/services/dropbox/core.rs index 30e61ce35..396c73fe2 100644 --- a/core/src/services/dropbox/core.rs +++ b/core/src/services/dropbox/core.rs @@ -27,9 +27,9 @@ use http::StatusCode; use http::header; use http::header::CONTENT_LENGTH; use http::header::CONTENT_TYPE; +use mea::mutex::Mutex; use serde::Deserialize; use serde::Serialize; -use tokio::sync::Mutex; use super::error::parse_error; use crate::raw::*; diff --git a/core/src/services/gdrive/builder.rs b/core/src/services/gdrive/builder.rs index df1273beb..5abe7e7ed 100644 --- a/core/src/services/gdrive/builder.rs +++ b/core/src/services/gdrive/builder.rs @@ -19,7 +19,7 @@ use std::fmt::Debug; use std::sync::Arc; use log::debug; -use tokio::sync::Mutex; +use 
mea::mutex::Mutex; use super::GDRIVE_SCHEME; use super::backend::GdriveBackend; diff --git a/core/src/services/gdrive/core.rs b/core/src/services/gdrive/core.rs index 7e4f00db3..0b89c14b3 100644 --- a/core/src/services/gdrive/core.rs +++ b/core/src/services/gdrive/core.rs @@ -25,9 +25,9 @@ use http::Request; use http::Response; use http::StatusCode; use http::header; +use mea::mutex::Mutex; use serde::Deserialize; use serde_json::json; -use tokio::sync::Mutex; use super::error::parse_error; use crate::raw::*; diff --git a/core/src/services/koofr/backend.rs b/core/src/services/koofr/backend.rs index a887b0b02..e129deded 100644 --- a/core/src/services/koofr/backend.rs +++ b/core/src/services/koofr/backend.rs @@ -22,7 +22,7 @@ use bytes::Buf; use http::Response; use http::StatusCode; use log::debug; -use tokio::sync::Mutex; +use mea::mutex::Mutex; use tokio::sync::OnceCell; use super::KOOFR_SCHEME; diff --git a/core/src/services/koofr/core.rs b/core/src/services/koofr/core.rs index c9747501a..a12926947 100644 --- a/core/src/services/koofr/core.rs +++ b/core/src/services/koofr/core.rs @@ -26,9 +26,9 @@ use http::Response; use http::StatusCode; use http::header; use http::request; +use mea::mutex::Mutex; use serde::Deserialize; use serde_json::json; -use tokio::sync::Mutex; use tokio::sync::OnceCell; use super::error::parse_error; diff --git a/core/src/services/onedrive/builder.rs b/core/src/services/onedrive/builder.rs index 659fa376d..a058d27f1 100644 --- a/core/src/services/onedrive/builder.rs +++ b/core/src/services/onedrive/builder.rs @@ -19,9 +19,9 @@ use std::fmt::Debug; use std::sync::Arc; use log::debug; +use mea::mutex::Mutex; use services::onedrive::core::OneDriveCore; use services::onedrive::core::OneDriveSigner; -use tokio::sync::Mutex; use super::ONEDRIVE_SCHEME; use super::backend::OnedriveBackend; diff --git a/core/src/services/onedrive/core.rs b/core/src/services/onedrive/core.rs index 4ebd93014..1867fbb65 100644 --- a/core/src/services/onedrive/core.rs 
+++ b/core/src/services/onedrive/core.rs @@ -25,7 +25,7 @@ use http::Request; use http::Response; use http::StatusCode; use http::header; -use tokio::sync::Mutex; +use mea::mutex::Mutex; use super::error::parse_error; use super::graph_model::*; diff --git a/core/src/services/seafile/backend.rs b/core/src/services/seafile/backend.rs index 43114a8e8..d0d11de3f 100644 --- a/core/src/services/seafile/backend.rs +++ b/core/src/services/seafile/backend.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use http::Response; use http::StatusCode; use log::debug; -use tokio::sync::RwLock; +use mea::rwlock::RwLock; use super::SEAFILE_SCHEME; use super::config::SeafileConfig; diff --git a/core/src/services/seafile/core.rs b/core/src/services/seafile/core.rs index 3a3c60e62..e26a1cf70 100644 --- a/core/src/services/seafile/core.rs +++ b/core/src/services/seafile/core.rs @@ -24,8 +24,8 @@ use http::Request; use http::Response; use http::StatusCode; use http::header; +use mea::rwlock::RwLock; use serde::Deserialize; -use tokio::sync::RwLock; use super::error::parse_error; use crate::raw::*; diff --git a/core/src/types/context/write.rs b/core/src/types/context/write.rs index b16799f38..cc6c7720c 100644 --- a/core/src/types/context/write.rs +++ b/core/src/types/context/write.rs @@ -210,13 +210,13 @@ mod tests { use bytes::BufMut; use bytes::Bytes; use log::debug; + use mea::mutex::Mutex; use pretty_assertions::assert_eq; use rand::Rng; use rand::RngCore; use rand::thread_rng; use sha2::Digest; use sha2::Sha256; - use tokio::sync::Mutex; use super::*; use crate::raw::oio::Write; diff --git a/integrations/object_store/Cargo.toml b/integrations/object_store/Cargo.toml index 534f17310..00dbf5bcc 100644 --- a/integrations/object_store/Cargo.toml +++ b/integrations/object_store/Cargo.toml @@ -41,6 +41,7 @@ async-trait = "0.1" bytes = "1" chrono = { version = "0.4.42", features = ["std", "clock"] } futures = "0.3" +mea = { version = "0.5.0" } object_store = "0.12.3" opendal = { version = "0.55.0", 
path = "../../core", default-features = false } pin-project = "1.1" diff --git a/integrations/object_store/src/service/writer.rs b/integrations/object_store/src/service/writer.rs index 5dc9ec809..9b6c07dc6 100644 --- a/integrations/object_store/src/service/writer.rs +++ b/integrations/object_store/src/service/writer.rs @@ -23,10 +23,10 @@ use object_store::PutPayload; use object_store::path::Path as ObjectStorePath; use object_store::{Attribute, AttributeValue}; +use mea::mutex::Mutex; use opendal::raw::oio::MultipartPart; use opendal::raw::*; use opendal::*; -use tokio::sync::Mutex; use super::core::{format_put_multipart_options, format_put_result, parse_op_write}; use super::error::parse_error; @@ -160,11 +160,7 @@ impl oio::MultipartWrite for ObjectStoreWriter { Ok(multipart_part) } - async fn complete_part( - &self, - _upload_id: &str, - parts: &[oio::MultipartPart], - ) -> Result<Metadata> { + async fn complete_part(&self, _upload_id: &str, parts: &[MultipartPart]) -> Result<Metadata> { // Validate that we have parts to complete if parts.is_empty() { return Err(Error::new( diff --git a/integrations/object_store/src/store.rs b/integrations/object_store/src/store.rs index 177ce72d5..175421673 100644 --- a/integrations/object_store/src/store.rs +++ b/integrations/object_store/src/store.rs @@ -27,6 +27,7 @@ use futures::FutureExt; use futures::StreamExt; use futures::TryStreamExt; use futures::stream::BoxStream; +use mea::mutex::Mutex; use object_store::ListResult; use object_store::MultipartUpload; use object_store::ObjectMeta; @@ -45,7 +46,7 @@ use opendal::options::CopyOptions; use opendal::raw::percent_decode_path; use opendal::{Operator, OperatorInfo}; use std::collections::HashMap; -use tokio::sync::{Mutex, Notify}; +use tokio::sync::Notify; /// OpendalStore implements ObjectStore trait by using opendal. ///
