This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch use-mea-sync
in repository https://gitbox.apache.org/repos/asf/opendal.git

commit 60aa0808e5e922b2d456d893ccc8a4bb084512c9
Author: tison <[email protected]>
AuthorDate: Tue Nov 25 15:42:29 2025 +0800

    for notify
    
    Signed-off-by: tison <[email protected]>
---
 bindings/cpp/Cargo.toml                |  2 +-
 bindings/python/Cargo.toml             |  2 +-
 core/Cargo.lock                        |  4 ++--
 core/Cargo.toml                        |  2 +-
 integrations/object_store/src/store.rs | 22 +++++++++++-----------
 5 files changed, 16 insertions(+), 16 deletions(-)

diff --git a/bindings/cpp/Cargo.toml b/bindings/cpp/Cargo.toml
index d727c4ffd..41c443734 100644
--- a/bindings/cpp/Cargo.toml
+++ b/bindings/cpp/Cargo.toml
@@ -34,7 +34,7 @@ anyhow = { version = "1.0.100" }
 cxx = { version = "1.0.186" }
 cxx-async = { version = "0.1.3", optional = true }
 futures = { version = "0.3.31" }
-mea = { version = "0.5.0" }
+mea = { version = "0.5.1" }
 # this crate won't be published, we always use the local version
 opendal = { version = ">=0", path = "../../core", features = ["blocking"] }
 tokio = { version = "1.27", features = ["fs", "macros", "rt-multi-thread"] }
diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml
index 1c0889429..7d204be47 100644
--- a/bindings/python/Cargo.toml
+++ b/bindings/python/Cargo.toml
@@ -201,7 +201,7 @@ bytes = "1.5.0"
 dict_derive = "0.6.0"
 futures = "0.3.28"
 jiff = { version = "0.2.15" }
-mea = { version = "0.5.0" }
+mea = { version = "0.5.1" }
 # this crate won't be published, we always use the local version
 opendal = { version = ">=0", path = "../../core", features = [
   "blocking",
diff --git a/core/Cargo.lock b/core/Cargo.lock
index 7498081a6..7ab3d1485 100644
--- a/core/Cargo.lock
+++ b/core/Cargo.lock
@@ -4772,9 +4772,9 @@ dependencies = [
 
 [[package]]
 name = "mea"
-version = "0.5.0"
+version = "0.5.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "1f3e2d273c5e8d75098efdc751378787e9197e242abe344b9326a117242c3f9d"
+checksum = "c84bb668065d5f9eca80c5d072bb3357fa7e09fb6dbfed2f08c8f99f1749d302"
 dependencies = [
  "slab",
 ]
diff --git a/core/Cargo.toml b/core/Cargo.toml
index f95075b88..16224b93b 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -252,7 +252,7 @@ http-body = "1"
 jiff = { version = "0.2.15", features = ["serde"] }
 log = "0.4"
 md-5 = "0.10"
-mea = { version = "0.5.0" }
+mea = { version = "0.5.1" }
 percent-encoding = "2"
 quick-xml = { version = "0.38", features = ["serialize", "overlapped-lists"] }
 reqwest = { version = "0.12.24", features = [
diff --git a/integrations/object_store/src/store.rs 
b/integrations/object_store/src/store.rs
index 175421673..08a377c18 100644
--- a/integrations/object_store/src/store.rs
+++ b/integrations/object_store/src/store.rs
@@ -28,6 +28,7 @@ use futures::StreamExt;
 use futures::TryStreamExt;
 use futures::stream::BoxStream;
 use mea::mutex::Mutex;
+use mea::oneshot;
 use object_store::ListResult;
 use object_store::MultipartUpload;
 use object_store::ObjectMeta;
@@ -46,7 +47,6 @@ use opendal::options::CopyOptions;
 use opendal::raw::percent_decode_path;
 use opendal::{Operator, OperatorInfo};
 use std::collections::HashMap;
-use tokio::sync::Notify;
 
 /// OpendalStore implements ObjectStore trait by using opendal.
 ///
@@ -609,15 +609,18 @@ impl ObjectStore for OpendalStore {
 struct OpendalMultipartUpload {
     writer: Arc<Mutex<Writer>>,
     location: Path,
-    next_notify: Option<Arc<Notify>>,
+    next_notify: oneshot::Receiver<()>,
 }
 
 impl OpendalMultipartUpload {
     fn new(writer: Writer, location: Path) -> Self {
+        // an immediately dropped sender for the first part to write without 
waiting
+        let (_, rx) = oneshot::channel();
+
         Self {
             writer: Arc::new(Mutex::new(writer)),
             location,
-            next_notify: None,
+            next_notify: rx,
         }
     }
 }
@@ -629,16 +632,13 @@ impl MultipartUpload for OpendalMultipartUpload {
         let location = self.location.clone();
 
         // Generate next notify which will be notified after the current part 
is written.
-        let next_notify = Arc::new(Notify::new());
+        let (tx, rx) = oneshot::channel();
         // Fetch the notify for current part to wait for it to be written.
-        let current_notify = self.next_notify.replace(next_notify.clone());
+        let last_rx = std::mem::replace(&mut self.next_notify, rx);
 
         async move {
-            // current_notify == None means that it's the first part, we don't 
need to wait.
-            if let Some(notify) = current_notify {
-                // Wait for the previous part to be written
-                notify.notified().await;
-            }
+            // Wait for the previous part to be written
+            let _ = last_rx.await;
 
             let mut writer = writer.lock().await;
             let result = writer
@@ -647,7 +647,7 @@ impl MultipartUpload for OpendalMultipartUpload {
                 .map_err(|err| format_object_store_error(err, 
location.as_ref()));
 
             // Notify the next part to be written
-            next_notify.notify_one();
+            drop(tx);
 
             result
         }

Reply via email to