This is an automated email from the ASF dual-hosted git repository. tison pushed a commit to branch use-mea-sync in repository https://gitbox.apache.org/repos/asf/opendal.git
commit 60aa0808e5e922b2d456d893ccc8a4bb084512c9 Author: tison <[email protected]> AuthorDate: Tue Nov 25 15:42:29 2025 +0800 for notify Signed-off-by: tison <[email protected]> --- bindings/cpp/Cargo.toml | 2 +- bindings/python/Cargo.toml | 2 +- core/Cargo.lock | 4 ++-- core/Cargo.toml | 2 +- integrations/object_store/src/store.rs | 22 +++++++++++----------- 5 files changed, 16 insertions(+), 16 deletions(-) diff --git a/bindings/cpp/Cargo.toml b/bindings/cpp/Cargo.toml index d727c4ffd..41c443734 100644 --- a/bindings/cpp/Cargo.toml +++ b/bindings/cpp/Cargo.toml @@ -34,7 +34,7 @@ anyhow = { version = "1.0.100" } cxx = { version = "1.0.186" } cxx-async = { version = "0.1.3", optional = true } futures = { version = "0.3.31" } -mea = { version = "0.5.0" } +mea = { version = "0.5.1" } # this crate won't be published, we always use the local version opendal = { version = ">=0", path = "../../core", features = ["blocking"] } tokio = { version = "1.27", features = ["fs", "macros", "rt-multi-thread"] } diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml index 1c0889429..7d204be47 100644 --- a/bindings/python/Cargo.toml +++ b/bindings/python/Cargo.toml @@ -201,7 +201,7 @@ bytes = "1.5.0" dict_derive = "0.6.0" futures = "0.3.28" jiff = { version = "0.2.15" } -mea = { version = "0.5.0" } +mea = { version = "0.5.1" } # this crate won't be published, we always use the local version opendal = { version = ">=0", path = "../../core", features = [ "blocking", diff --git a/core/Cargo.lock b/core/Cargo.lock index 7498081a6..7ab3d1485 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -4772,9 +4772,9 @@ dependencies = [ [[package]] name = "mea" -version = "0.5.0" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f3e2d273c5e8d75098efdc751378787e9197e242abe344b9326a117242c3f9d" +checksum = "c84bb668065d5f9eca80c5d072bb3357fa7e09fb6dbfed2f08c8f99f1749d302" dependencies = [ "slab", ] diff --git a/core/Cargo.toml b/core/Cargo.toml index f95075b88..16224b93b 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -252,7 +252,7 @@ http-body = "1" jiff = { version = "0.2.15", features = ["serde"] } log = "0.4" md-5 = "0.10" -mea = { version = "0.5.0" } +mea = { version = "0.5.1" } percent-encoding = "2" quick-xml = { version = "0.38", features = ["serialize", "overlapped-lists"] } reqwest = { version = "0.12.24", features = [ diff --git a/integrations/object_store/src/store.rs b/integrations/object_store/src/store.rs index 175421673..08a377c18 100644 --- a/integrations/object_store/src/store.rs +++ b/integrations/object_store/src/store.rs @@ -28,6 +28,7 @@ use futures::StreamExt; use futures::TryStreamExt; use futures::stream::BoxStream; use mea::mutex::Mutex; +use mea::oneshot; use object_store::ListResult; use object_store::MultipartUpload; use object_store::ObjectMeta; @@ -46,7 +47,6 @@ use opendal::options::CopyOptions; use opendal::raw::percent_decode_path; use opendal::{Operator, OperatorInfo}; use std::collections::HashMap; -use tokio::sync::Notify; /// OpendalStore implements ObjectStore trait by using opendal. /// @@ -609,15 +609,18 @@ impl ObjectStore for OpendalStore { struct OpendalMultipartUpload { writer: Arc<Mutex<Writer>>, location: Path, - next_notify: Option<Arc<Notify>>, + next_notify: oneshot::Receiver<()>, } impl OpendalMultipartUpload { fn new(writer: Writer, location: Path) -> Self { + // an immediately dropped sender for the first part to write without waiting + let (_, rx) = oneshot::channel(); + Self { writer: Arc::new(Mutex::new(writer)), location, - next_notify: None, + next_notify: rx, } } } @@ -629,16 +632,13 @@ impl MultipartUpload for OpendalMultipartUpload { let location = self.location.clone(); // Generate next notify which will be notified after the current part is written. - let next_notify = Arc::new(Notify::new()); + let (tx, rx) = oneshot::channel(); // Fetch the notify for current part to wait for it to be written. - let current_notify = self.next_notify.replace(next_notify.clone()); + let last_rx = std::mem::replace(&mut self.next_notify, rx); async move { - // current_notify == None means that it's the first part, we don't need to wait. - if let Some(notify) = current_notify { - // Wait for the previous part to be written - notify.notified().await; - } + // Wait for the previous part to be written + let _ = last_rx.await; let mut writer = writer.lock().await; let result = writer @@ -647,7 +647,7 @@ impl MultipartUpload for OpendalMultipartUpload { .map_err(|err| format_object_store_error(err, location.as_ref())); // Notify the next part to be written - next_notify.notify_one(); + drop(tx); result }
