This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new b4ee05ef4 fix(layers/throttle): await limiter before throttled writes (#6671)
b4ee05ef4 is described below

commit b4ee05ef4c442508f7a0e39d2b072a6160f0772b
Author: TennyZhuang <[email protected]>
AuthorDate: Wed Oct 15 19:41:39 2025 +0800

    fix(layers/throttle): await limiter before throttled writes (#6671)
---
 core/src/layers/throttle.rs | 46 ++++++++++++++++++++++-----------------------
 1 file changed, 22 insertions(+), 24 deletions(-)

diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs
index 3e1779123..1394aaaaf 100644
--- a/core/src/layers/throttle.rs
+++ b/core/src/layers/throttle.rs
@@ -20,7 +20,6 @@ use std::sync::Arc;
 
 use governor::Quota;
 use governor::RateLimiter;
-use governor::clock::Clock;
 use governor::clock::DefaultClock;
 use governor::middleware::NoOpMiddleware;
 use governor::state::InMemoryState;
@@ -170,30 +169,29 @@ impl<R: oio::Read> oio::Read for ThrottleWrapper<R> {
 
 impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
     async fn write(&mut self, bs: Buffer) -> Result<()> {
-        let buf_length = NonZeroU32::new(bs.len() as u32).unwrap();
-
-        loop {
-            match self.limiter.check_n(buf_length) {
-                Ok(res) => match res {
-                    Ok(_) => return self.inner.write(bs).await,
-                    // the query is valid but the Decider can not accommodate them.
-                    Err(not_until) => {
-                        let _ = not_until.wait_time_from(DefaultClock::default().now());
-                        // TODO: Should lock the limiter and wait for the wait_time, or should let other small requests go first?
-
-                        // FIXME: we should sleep here.
-                        // tokio::time::sleep(wait_time).await;
-                    }
-                },
-                // the query was invalid as the rate limit parameters can "never" accommodate the number of cells queried for.
-                Err(_) => {
-                    return Err(Error::new(
-                        ErrorKind::RateLimited,
-                        "InsufficientCapacity due to burst size being smaller than the request size",
-                    ));
-                }
-            }
+        let len = bs.len();
+        if len == 0 {
+            return self.inner.write(bs).await;
         }
+
+        if len > u32::MAX as usize {
+            return Err(Error::new(
+                ErrorKind::RateLimited,
+                "request size exceeds throttle quota capacity",
+            ));
+        }
+
+        let buf_length =
+            NonZeroU32::new(len as u32).expect("len is non-zero so NonZeroU32 must exist");
+
+        self.limiter.until_n_ready(buf_length).await.map_err(|_| {
+            Error::new(
+                ErrorKind::RateLimited,
+                "burst size is smaller than the request size",
+            )
+        })?;
+
+        self.inner.write(bs).await
     }
 
     async fn abort(&mut self) -> Result<()> {

Reply via email to