This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch use-mea-sync
in repository https://gitbox.apache.org/repos/asf/opendal.git

commit 11f6b217dd657f5fbb5f7d3a3c5eb7f81b79bb5b
Author: tison <[email protected]>
AuthorDate: Tue Nov 25 14:30:09 2025 +0800

    chore: move from tokio::sync to mea primitives
    
    Signed-off-by: tison <[email protected]>
---
 bindings/cpp/Cargo.toml                         |  1 +
 bindings/cpp/src/async.rs                       |  2 +-
 bindings/python/Cargo.toml                      |  1 +
 bindings/python/src/file.rs                     |  2 +-
 bindings/python/src/lister.rs                   |  5 ++-
 core/Cargo.lock                                 | 10 +++++
 core/Cargo.toml                                 |  3 +-
 core/src/layers/concurrent_limit.rs             | 49 +++++--------------------
 core/src/raw/oio/write/multipart_write.rs       |  2 +-
 core/src/raw/path_cache.rs                      |  4 +-
 core/src/services/aliyun_drive/backend.rs       |  2 +-
 core/src/services/aliyun_drive/core.rs          |  4 +-
 core/src/services/b2/backend.rs                 |  2 +-
 core/src/services/b2/core.rs                    |  2 +-
 core/src/services/dropbox/builder.rs            |  2 +-
 core/src/services/dropbox/core.rs               |  2 +-
 core/src/services/gdrive/builder.rs             |  2 +-
 core/src/services/gdrive/core.rs                |  2 +-
 core/src/services/koofr/backend.rs              |  2 +-
 core/src/services/koofr/core.rs                 |  2 +-
 core/src/services/onedrive/builder.rs           |  2 +-
 core/src/services/onedrive/core.rs              |  2 +-
 core/src/services/seafile/backend.rs            |  2 +-
 core/src/services/seafile/core.rs               |  2 +-
 core/src/types/context/write.rs                 |  2 +-
 integrations/object_store/Cargo.toml            |  1 +
 integrations/object_store/src/service/writer.rs |  8 +---
 integrations/object_store/src/store.rs          |  3 +-
 28 files changed, 52 insertions(+), 71 deletions(-)

diff --git a/bindings/cpp/Cargo.toml b/bindings/cpp/Cargo.toml
index 169234739..d727c4ffd 100644
--- a/bindings/cpp/Cargo.toml
+++ b/bindings/cpp/Cargo.toml
@@ -34,6 +34,7 @@ anyhow = { version = "1.0.100" }
 cxx = { version = "1.0.186" }
 cxx-async = { version = "0.1.3", optional = true }
 futures = { version = "0.3.31" }
+mea = { version = "0.5.0" }
 # this crate won't be published, we always use the local version
 opendal = { version = ">=0", path = "../../core", features = ["blocking"] }
 tokio = { version = "1.27", features = ["fs", "macros", "rt-multi-thread"] }
diff --git a/bindings/cpp/src/async.rs b/bindings/cpp/src/async.rs
index be461ea51..eb2a89224 100644
--- a/bindings/cpp/src/async.rs
+++ b/bindings/cpp/src/async.rs
@@ -17,6 +17,7 @@
 
 use anyhow::Result;
 use cxx_async::CxxAsyncException;
+use mea::mutex::Mutex;
 use opendal as od;
 use std::collections::HashMap;
 use std::future::Future;
@@ -24,7 +25,6 @@ use std::ops::Deref;
 use std::pin::Pin;
 use std::str::FromStr;
 use std::sync::{Arc, OnceLock};
-use tokio::sync::Mutex;
 
 #[cxx::bridge(namespace = opendal::ffi::async_op)]
 mod ffi {
diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml
index f585ab5fe..1c0889429 100644
--- a/bindings/python/Cargo.toml
+++ b/bindings/python/Cargo.toml
@@ -201,6 +201,7 @@ bytes = "1.5.0"
 dict_derive = "0.6.0"
 futures = "0.3.28"
 jiff = { version = "0.2.15" }
+mea = { version = "0.5.0" }
 # this crate won't be published, we always use the local version
 opendal = { version = ">=0", path = "../../core", features = [
   "blocking",
diff --git a/bindings/python/src/file.rs b/bindings/python/src/file.rs
index 6e1d6a861..da744eb39 100644
--- a/bindings/python/src/file.rs
+++ b/bindings/python/src/file.rs
@@ -26,13 +26,13 @@ use std::sync::Arc;
 use futures::AsyncReadExt;
 use futures::AsyncSeekExt;
 use futures::AsyncWriteExt;
+use mea::mutex::Mutex;
 use pyo3::IntoPyObjectExt;
 use pyo3::buffer::PyBuffer;
 use pyo3::exceptions::PyIOError;
 use pyo3::exceptions::PyValueError;
 use pyo3::prelude::*;
 use pyo3_async_runtimes::tokio::future_into_py;
-use tokio::sync::Mutex;
 
 use crate::*;
 
diff --git a/bindings/python/src/lister.rs b/bindings/python/src/lister.rs
index dfa5193cf..04b77ec58 100644
--- a/bindings/python/src/lister.rs
+++ b/bindings/python/src/lister.rs
@@ -18,10 +18,11 @@
 use std::sync::Arc;
 
 use futures::TryStreamExt;
+use mea::mutex::Mutex;
+use pyo3::IntoPyObjectExt;
 use pyo3::exceptions::PyStopAsyncIteration;
-use pyo3::{IntoPyObjectExt, prelude::*};
+use pyo3::prelude::*;
 use pyo3_async_runtimes::tokio::future_into_py;
-use tokio::sync::Mutex;
 
 use crate::*;
 
diff --git a/core/Cargo.lock b/core/Cargo.lock
index 76d6c1277..7498081a6 100644
--- a/core/Cargo.lock
+++ b/core/Cargo.lock
@@ -4770,6 +4770,15 @@ dependencies = [
  "digest",
 ]
 
+[[package]]
+name = "mea"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1f3e2d273c5e8d75098efdc751378787e9197e242abe344b9326a117242c3f9d"
+dependencies = [
+ "slab",
+]
+
 [[package]]
 name = "memchr"
 version = "2.7.6"
@@ -5372,6 +5381,7 @@ dependencies = [
  "libtest-mimic",
  "log",
  "md-5",
+ "mea",
  "metrics",
  "mime_guess",
  "mini-moka",
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 5ad6f706d..f95075b88 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -252,6 +252,7 @@ http-body = "1"
 jiff = { version = "0.2.15", features = ["serde"] }
 log = "0.4"
 md-5 = "0.10"
+mea = { version = "0.5.0" }
 percent-encoding = "2"
 quick-xml = { version = "0.38", features = ["serialize", "overlapped-lists"] }
 reqwest = { version = "0.12.24", features = [
@@ -259,7 +260,7 @@ reqwest = { version = "0.12.24", features = [
 ], default-features = false }
 serde = { version = "1", features = ["derive"] }
 serde_json = "1"
-tokio = { version = "1.48", features = ["sync", "io-util"] }
+tokio = { version = "1.48", features = ["io-util"] }
 url = "2.5"
 uuid = { version = "1", features = ["serde", "v4"] }
 
diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs
index d51f4de45..0f42007ba 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -23,8 +23,8 @@ use std::task::Poll;
 
 use futures::Stream;
 use futures::StreamExt;
-use tokio::sync::OwnedSemaphorePermit;
-use tokio::sync::Semaphore;
+use mea::semaphore::OwnedSemaphorePermit;
+use mea::semaphore::Semaphore;
 
 use crate::raw::*;
 use crate::*;
@@ -139,10 +139,7 @@ impl HttpFetch for ConcurrentLimitHttpFetcher {
             return self.inner.fetch(req).await;
         };
 
-        let permit = semaphore
-            .acquire_owned()
-            .await
-            .expect("semaphore must be valid");
+        let permit = semaphore.acquire_owned(1).await;
 
         let resp = self.inner.fetch(req).await?;
         let (parts, body) = resp.into_parts();
@@ -191,22 +188,13 @@ impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> {
     }
 
     async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
-        let _permit = self
-            .semaphore
-            .acquire()
-            .await
-            .expect("semaphore must be valid");
+        let _permit = self.semaphore.acquire(1).await;
 
         self.inner.create_dir(path, args).await
     }
 
     async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
-        let permit = self
-            .semaphore
-            .clone()
-            .acquire_owned()
-            .await
-            .expect("semaphore must be valid");
+        let permit = self.semaphore.clone().acquire_owned(1).await;
 
         self.inner
             .read(path, args)
@@ -215,12 +203,7 @@ impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        let permit = self
-            .semaphore
-            .clone()
-            .acquire_owned()
-            .await
-            .expect("semaphore must be valid");
+        let permit = self.semaphore.clone().acquire_owned(1).await;
 
         self.inner
             .write(path, args)
@@ -229,22 +212,13 @@ impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> {
     }
 
     async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
-        let _permit = self
-            .semaphore
-            .acquire()
-            .await
-            .expect("semaphore must be valid");
+        let _permit = self.semaphore.acquire(1).await;
 
         self.inner.stat(path, args).await
     }
 
     async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
-        let permit = self
-            .semaphore
-            .clone()
-            .acquire_owned()
-            .await
-            .expect("semaphore must be valid");
+        let permit = self.semaphore.clone().acquire_owned(1).await;
 
         self.inner
             .delete()
@@ -253,12 +227,7 @@ impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> {
     }
 
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
-        let permit = self
-            .semaphore
-            .clone()
-            .acquire_owned()
-            .await
-            .expect("semaphore must be valid");
+        let permit = self.semaphore.clone().acquire_owned(1).await;
 
         self.inner
             .list(path, args)
diff --git a/core/src/raw/oio/write/multipart_write.rs b/core/src/raw/oio/write/multipart_write.rs
index 03bb8d270..403774826 100644
--- a/core/src/raw/oio/write/multipart_write.rs
+++ b/core/src/raw/oio/write/multipart_write.rs
@@ -309,11 +309,11 @@ where
 mod tests {
     use std::time::Duration;
 
+    use mea::mutex::Mutex;
     use pretty_assertions::assert_eq;
     use rand::Rng;
     use rand::RngCore;
     use rand::thread_rng;
-    use tokio::sync::Mutex;
     use tokio::time::sleep;
     use tokio::time::timeout;
 
diff --git a/core/src/raw/path_cache.rs b/core/src/raw/path_cache.rs
index ef8f51253..1d044f38e 100644
--- a/core/src/raw/path_cache.rs
+++ b/core/src/raw/path_cache.rs
@@ -18,9 +18,9 @@
 use std::collections::VecDeque;
 
 use futures::Future;
+use mea::mutex::Mutex;
+use mea::mutex::MutexGuard;
 use moka::sync::Cache;
-use tokio::sync::Mutex;
-use tokio::sync::MutexGuard;
 
 use crate::raw::*;
 use crate::*;
diff --git a/core/src/services/aliyun_drive/backend.rs b/core/src/services/aliyun_drive/backend.rs
index a88927650..27fdb6da7 100644
--- a/core/src/services/aliyun_drive/backend.rs
+++ b/core/src/services/aliyun_drive/backend.rs
@@ -22,7 +22,7 @@ use bytes::Buf;
 use http::Response;
 use http::StatusCode;
 use log::debug;
-use tokio::sync::Mutex;
+use mea::mutex::Mutex;
 
 use super::ALIYUN_DRIVE_SCHEME;
 use super::config::AliyunDriveConfig;
diff --git a/core/src/services/aliyun_drive/core.rs b/core/src/services/aliyun_drive/core.rs
index eb20a7867..05cee528b 100644
--- a/core/src/services/aliyun_drive/core.rs
+++ b/core/src/services/aliyun_drive/core.rs
@@ -22,11 +22,11 @@ use bytes::Buf;
 use http::Method;
 use http::Request;
 use http::Response;
+use http::header;
 use http::header::HeaderValue;
-use http::header::{self};
+use mea::mutex::Mutex;
 use serde::Deserialize;
 use serde::Serialize;
-use tokio::sync::Mutex;
 
 use super::error::parse_error;
 use crate::raw::*;
diff --git a/core/src/services/b2/backend.rs b/core/src/services/b2/backend.rs
index b4a7bfc0b..1f78f0ddc 100644
--- a/core/src/services/b2/backend.rs
+++ b/core/src/services/b2/backend.rs
@@ -22,7 +22,7 @@ use http::Request;
 use http::Response;
 use http::StatusCode;
 use log::debug;
-use tokio::sync::RwLock;
+use mea::rwlock::RwLock;
 
 use super::B2_SCHEME;
 use super::config::B2Config;
diff --git a/core/src/services/b2/core.rs b/core/src/services/b2/core.rs
index 636200a38..b88a20598 100644
--- a/core/src/services/b2/core.rs
+++ b/core/src/services/b2/core.rs
@@ -24,9 +24,9 @@ use http::Request;
 use http::Response;
 use http::StatusCode;
 use http::header;
+use mea::rwlock::RwLock;
 use serde::Deserialize;
 use serde::Serialize;
-use tokio::sync::RwLock;
 
 use self::constants::X_BZ_CONTENT_SHA1;
 use self::constants::X_BZ_FILE_NAME;
diff --git a/core/src/services/dropbox/builder.rs b/core/src/services/dropbox/builder.rs
index e9e0ec71f..bcda4694e 100644
--- a/core/src/services/dropbox/builder.rs
+++ b/core/src/services/dropbox/builder.rs
@@ -18,7 +18,7 @@
 use std::fmt::Debug;
 use std::sync::Arc;
 
-use tokio::sync::Mutex;
+use mea::mutex::Mutex;
 
 use super::DROPBOX_SCHEME;
 use super::backend::DropboxBackend;
diff --git a/core/src/services/dropbox/core.rs b/core/src/services/dropbox/core.rs
index 30e61ce35..396c73fe2 100644
--- a/core/src/services/dropbox/core.rs
+++ b/core/src/services/dropbox/core.rs
@@ -27,9 +27,9 @@ use http::StatusCode;
 use http::header;
 use http::header::CONTENT_LENGTH;
 use http::header::CONTENT_TYPE;
+use mea::mutex::Mutex;
 use serde::Deserialize;
 use serde::Serialize;
-use tokio::sync::Mutex;
 
 use super::error::parse_error;
 use crate::raw::*;
diff --git a/core/src/services/gdrive/builder.rs b/core/src/services/gdrive/builder.rs
index df1273beb..5abe7e7ed 100644
--- a/core/src/services/gdrive/builder.rs
+++ b/core/src/services/gdrive/builder.rs
@@ -19,7 +19,7 @@ use std::fmt::Debug;
 use std::sync::Arc;
 
 use log::debug;
-use tokio::sync::Mutex;
+use mea::mutex::Mutex;
 
 use super::GDRIVE_SCHEME;
 use super::backend::GdriveBackend;
diff --git a/core/src/services/gdrive/core.rs b/core/src/services/gdrive/core.rs
index 7e4f00db3..0b89c14b3 100644
--- a/core/src/services/gdrive/core.rs
+++ b/core/src/services/gdrive/core.rs
@@ -25,9 +25,9 @@ use http::Request;
 use http::Response;
 use http::StatusCode;
 use http::header;
+use mea::mutex::Mutex;
 use serde::Deserialize;
 use serde_json::json;
-use tokio::sync::Mutex;
 
 use super::error::parse_error;
 use crate::raw::*;
diff --git a/core/src/services/koofr/backend.rs b/core/src/services/koofr/backend.rs
index a887b0b02..e129deded 100644
--- a/core/src/services/koofr/backend.rs
+++ b/core/src/services/koofr/backend.rs
@@ -22,7 +22,7 @@ use bytes::Buf;
 use http::Response;
 use http::StatusCode;
 use log::debug;
-use tokio::sync::Mutex;
+use mea::mutex::Mutex;
 use tokio::sync::OnceCell;
 
 use super::KOOFR_SCHEME;
diff --git a/core/src/services/koofr/core.rs b/core/src/services/koofr/core.rs
index c9747501a..a12926947 100644
--- a/core/src/services/koofr/core.rs
+++ b/core/src/services/koofr/core.rs
@@ -26,9 +26,9 @@ use http::Response;
 use http::StatusCode;
 use http::header;
 use http::request;
+use mea::mutex::Mutex;
 use serde::Deserialize;
 use serde_json::json;
-use tokio::sync::Mutex;
 use tokio::sync::OnceCell;
 
 use super::error::parse_error;
diff --git a/core/src/services/onedrive/builder.rs b/core/src/services/onedrive/builder.rs
index 659fa376d..a058d27f1 100644
--- a/core/src/services/onedrive/builder.rs
+++ b/core/src/services/onedrive/builder.rs
@@ -19,9 +19,9 @@ use std::fmt::Debug;
 use std::sync::Arc;
 
 use log::debug;
+use mea::mutex::Mutex;
 use services::onedrive::core::OneDriveCore;
 use services::onedrive::core::OneDriveSigner;
-use tokio::sync::Mutex;
 
 use super::ONEDRIVE_SCHEME;
 use super::backend::OnedriveBackend;
diff --git a/core/src/services/onedrive/core.rs b/core/src/services/onedrive/core.rs
index 4ebd93014..1867fbb65 100644
--- a/core/src/services/onedrive/core.rs
+++ b/core/src/services/onedrive/core.rs
@@ -25,7 +25,7 @@ use http::Request;
 use http::Response;
 use http::StatusCode;
 use http::header;
-use tokio::sync::Mutex;
+use mea::mutex::Mutex;
 
 use super::error::parse_error;
 use super::graph_model::*;
diff --git a/core/src/services/seafile/backend.rs b/core/src/services/seafile/backend.rs
index 43114a8e8..d0d11de3f 100644
--- a/core/src/services/seafile/backend.rs
+++ b/core/src/services/seafile/backend.rs
@@ -21,7 +21,7 @@ use std::sync::Arc;
 use http::Response;
 use http::StatusCode;
 use log::debug;
-use tokio::sync::RwLock;
+use mea::rwlock::RwLock;
 
 use super::SEAFILE_SCHEME;
 use super::config::SeafileConfig;
diff --git a/core/src/services/seafile/core.rs b/core/src/services/seafile/core.rs
index 3a3c60e62..e26a1cf70 100644
--- a/core/src/services/seafile/core.rs
+++ b/core/src/services/seafile/core.rs
@@ -24,8 +24,8 @@ use http::Request;
 use http::Response;
 use http::StatusCode;
 use http::header;
+use mea::rwlock::RwLock;
 use serde::Deserialize;
-use tokio::sync::RwLock;
 
 use super::error::parse_error;
 use crate::raw::*;
diff --git a/core/src/types/context/write.rs b/core/src/types/context/write.rs
index b16799f38..cc6c7720c 100644
--- a/core/src/types/context/write.rs
+++ b/core/src/types/context/write.rs
@@ -210,13 +210,13 @@ mod tests {
     use bytes::BufMut;
     use bytes::Bytes;
     use log::debug;
+    use mea::mutex::Mutex;
     use pretty_assertions::assert_eq;
     use rand::Rng;
     use rand::RngCore;
     use rand::thread_rng;
     use sha2::Digest;
     use sha2::Sha256;
-    use tokio::sync::Mutex;
 
     use super::*;
     use crate::raw::oio::Write;
diff --git a/integrations/object_store/Cargo.toml b/integrations/object_store/Cargo.toml
index 534f17310..00dbf5bcc 100644
--- a/integrations/object_store/Cargo.toml
+++ b/integrations/object_store/Cargo.toml
@@ -41,6 +41,7 @@ async-trait = "0.1"
 bytes = "1"
 chrono = { version = "0.4.42", features = ["std", "clock"] }
 futures = "0.3"
+mea = { version = "0.5.0" }
 object_store = "0.12.3"
 opendal = { version = "0.55.0", path = "../../core", default-features = false }
 pin-project = "1.1"
diff --git a/integrations/object_store/src/service/writer.rs b/integrations/object_store/src/service/writer.rs
index 5dc9ec809..9b6c07dc6 100644
--- a/integrations/object_store/src/service/writer.rs
+++ b/integrations/object_store/src/service/writer.rs
@@ -23,10 +23,10 @@ use object_store::PutPayload;
 use object_store::path::Path as ObjectStorePath;
 use object_store::{Attribute, AttributeValue};
 
+use mea::mutex::Mutex;
 use opendal::raw::oio::MultipartPart;
 use opendal::raw::*;
 use opendal::*;
-use tokio::sync::Mutex;
 
 use super::core::{format_put_multipart_options, format_put_result, parse_op_write};
 use super::error::parse_error;
@@ -160,11 +160,7 @@ impl oio::MultipartWrite for ObjectStoreWriter {
         Ok(multipart_part)
     }
 
-    async fn complete_part(
-        &self,
-        _upload_id: &str,
-        parts: &[oio::MultipartPart],
-    ) -> Result<Metadata> {
+    async fn complete_part(&self, _upload_id: &str, parts: &[MultipartPart]) -> Result<Metadata> {
         // Validate that we have parts to complete
         if parts.is_empty() {
             return Err(Error::new(
diff --git a/integrations/object_store/src/store.rs b/integrations/object_store/src/store.rs
index 177ce72d5..175421673 100644
--- a/integrations/object_store/src/store.rs
+++ b/integrations/object_store/src/store.rs
@@ -27,6 +27,7 @@ use futures::FutureExt;
 use futures::StreamExt;
 use futures::TryStreamExt;
 use futures::stream::BoxStream;
+use mea::mutex::Mutex;
 use object_store::ListResult;
 use object_store::MultipartUpload;
 use object_store::ObjectMeta;
@@ -45,7 +46,7 @@ use opendal::options::CopyOptions;
 use opendal::raw::percent_decode_path;
 use opendal::{Operator, OperatorInfo};
 use std::collections::HashMap;
-use tokio::sync::{Mutex, Notify};
+use tokio::sync::Notify;
 
 /// OpendalStore implements ObjectStore trait by using opendal.
 ///

Reply via email to