This is an automated email from the ASF dual-hosted git repository.

jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new fb9f447d feat: add exponential backoff when retry (#1486)
fb9f447d is described below

commit fb9f447dee6d6026ce3472146e56b71cfafe247b
Author: MianChen <[email protected]>
AuthorDate: Tue Feb 27 15:27:38 2024 +0800

    feat: add exponential backoff when retry (#1486)
    
    ## Rationale
    
    Replace the fixed retry interval with an exponential backoff (with random jitter) between retry attempts.
    
    ## Detailed Changes
    
    Follow
    [this](https://github.com/apache/arrow-rs/blob/dfb642809e93c2c1b8343692f4e4b3080000f988/object_store/src/client/backoff.rs#L26)
    backoff implementation.
    
    ## Test Plan
    
    New unit test (`test_backoff`).
    
    ---------
    
    Co-authored-by: zealchen <[email protected]>
---
 Cargo.lock                                    |  15 ++--
 Cargo.toml                                    |   2 +-
 src/analytic_engine/src/sst/file.rs           |   8 +-
 src/components/future_ext/Cargo.toml          |   1 +
 src/components/future_ext/src/lib.rs          |   2 +-
 src/components/future_ext/src/retry.rs        | 123 ++++++++++++++++++++++++--
 src/components/skiplist/benches/bench.rs      |   8 +-
 src/components/table_kv/src/obkv/tests.rs     |   2 +-
 src/components/table_kv/src/tests.rs          |   2 +-
 src/meta_client/src/load_balance.rs           |   2 +-
 src/server/src/grpc/meta_event_service/mod.rs |   8 +-
 11 files changed, 147 insertions(+), 26 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index f43a9036..36c989c6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -121,7 +121,7 @@ dependencies = [
  "pin-project-lite",
  "prometheus 0.12.0",
  "prost 0.11.8",
- "rand 0.7.3",
+ "rand 0.8.5",
  "remote_engine_client",
  "router",
  "runtime",
@@ -899,7 +899,7 @@ dependencies = [
  "parquet",
  "parquet_ext",
  "pprof",
- "rand 0.7.3",
+ "rand 0.8.5",
  "runtime",
  "serde",
  "size_ext",
@@ -1532,7 +1532,7 @@ dependencies = [
  "macros",
  "paste 1.0.12",
  "prost 0.11.8",
- "rand 0.7.3",
+ "rand 0.8.5",
  "seahash",
  "serde",
  "serde_json",
@@ -2692,6 +2692,7 @@ dependencies = [
  "futures 0.3.28",
  "lazy_static",
  "prometheus 0.12.0",
+ "rand 0.8.5",
  "runtime",
  "tokio",
 ]
@@ -4534,7 +4535,7 @@ dependencies = [
  "prometheus 0.12.0",
  "prometheus-static-metric",
  "prost 0.11.8",
- "rand 0.7.3",
+ "rand 0.8.5",
  "runtime",
  "serde",
  "serde_json",
@@ -6599,7 +6600,7 @@ dependencies = [
  "arena",
  "bytes",
  "criterion",
- "rand 0.7.3",
+ "rand 0.8.5",
  "yatp",
 ]
 
@@ -7009,7 +7010,7 @@ dependencies = [
  "logger",
  "macros",
  "prost 0.11.8",
- "rand 0.7.3",
+ "rand 0.8.5",
  "regex",
  "runtime",
  "serde",
@@ -7029,7 +7030,7 @@ dependencies = [
  "macros",
  "obkv-table-client-rs",
  "prometheus 0.12.0",
- "rand 0.7.3",
+ "rand 0.8.5",
  "serde",
  "snafu 0.6.10",
  "time_ext",
diff --git a/Cargo.toml b/Cargo.toml
index b41694b3..5bb84523 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -154,7 +154,7 @@ prost = "0.11"
 proxy = { path = "src/proxy" }
 query_engine = { path = "src/query_engine" }
 query_frontend = { path = "src/query_frontend" }
-rand = "0.7"
+rand = "0.8.5"
 regex = "1"
 remote_engine_client = { path = "src/remote_engine_client" }
 reqwest = { version = "0.11", default-features = false, features = [
diff --git a/src/analytic_engine/src/sst/file.rs b/src/analytic_engine/src/sst/file.rs
index f2aa9ec6..39cdc7c7 100644
--- a/src/analytic_engine/src/sst/file.rs
+++ b/src/analytic_engine/src/sst/file.rs
@@ -34,7 +34,7 @@ use common_types::{
     time::{TimeRange, Timestamp},
     SequenceNumber,
 };
-use future_ext::{retry_async, RetryConfig};
+use future_ext::{retry_async, BackoffConfig, RetryConfig};
 use logger::{error, info, trace, warn};
 use macros::define_result;
 use metric_ext::Meter;
@@ -540,7 +540,11 @@ pub struct FilePurger {
 impl FilePurger {
     const RETRY_CONFIG: RetryConfig = RetryConfig {
         max_retries: 3,
-        interval: Duration::from_millis(500),
+        backoff: BackoffConfig {
+            init_backoff: Duration::from_millis(500),
+            max_backoff: Duration::from_secs(5),
+            base: 3.,
+        },
     };
 
     pub fn start(runtime: &Runtime, store: ObjectStoreRef) -> Self {
diff --git a/src/components/future_ext/Cargo.toml b/src/components/future_ext/Cargo.toml
index f9be39c1..1bc72d52 100644
--- a/src/components/future_ext/Cargo.toml
+++ b/src/components/future_ext/Cargo.toml
@@ -34,5 +34,6 @@ workspace = true
 futures = { workspace = true }
 lazy_static = { workspace = true }
 prometheus = { workspace = true }
+rand = { workspace = true }
 runtime = { workspace = true }
 tokio = { workspace = true, features = ["time"] }
diff --git a/src/components/future_ext/src/lib.rs b/src/components/future_ext/src/lib.rs
index 083908ed..ab8d71a2 100644
--- a/src/components/future_ext/src/lib.rs
+++ b/src/components/future_ext/src/lib.rs
@@ -21,4 +21,4 @@ mod cancel;
 mod retry;
 
 pub use cancel::CancellationSafeFuture;
-pub use retry::{retry_async, RetryConfig};
+pub use retry::{retry_async, BackoffConfig, RetryConfig};
diff --git a/src/components/future_ext/src/retry.rs b/src/components/future_ext/src/retry.rs
index 43df7e50..ea646b63 100644
--- a/src/components/future_ext/src/retry.rs
+++ b/src/components/future_ext/src/retry.rs
@@ -20,35 +20,101 @@
 use std::time::Duration;
 
 use futures::Future;
+use rand::prelude::*;
 
-// TODO: add backoff
-// https://github.com/apache/arrow-rs/blob/dfb642809e93c2c1b8343692f4e4b3080000f988/object_store/src/client/backoff.rs#L26
 pub struct RetryConfig {
     pub max_retries: usize,
-    pub interval: Duration,
+    pub backoff: BackoffConfig,
 }
 
 impl Default for RetryConfig {
     fn default() -> Self {
         Self {
             max_retries: 3,
-            interval: Duration::from_millis(500),
+            backoff: Default::default(),
+        }
+    }
+}
+
+// This backoff implementation is ported from
+// https://github.com/apache/arrow-rs/blob/dfb642809e93c2c1b8343692f4e4b3080000f988/object_store/src/client/backoff.rs#L26
+pub struct BackoffConfig {
+    /// The initial backoff duration
+    pub init_backoff: Duration,
+    /// The maximum backoff duration
+    pub max_backoff: Duration,
+    /// The base of the exponential to use
+    pub base: f64,
+}
+
+impl Default for BackoffConfig {
+    fn default() -> Self {
+        Self {
+            init_backoff: Duration::from_millis(100),
+            max_backoff: Duration::from_secs(15),
+            base: 2.,
         }
     }
 }
 
+pub struct Backoff {
+    init_backoff: f64,
+    next_backoff_secs: f64,
+    max_backoff_secs: f64,
+    base: f64,
+    rng: Option<Box<dyn RngCore + Sync + Send>>,
+}
+
+impl Backoff {
+    /// Create a new [`Backoff`] from the provided [`BackoffConfig`]
+    pub fn new(config: &BackoffConfig) -> Self {
+        Self::new_with_rng(config, None)
+    }
+
+    /// Creates a new `Backoff` with the optional `rng`
+    ///
+    /// Used [`rand::thread_rng()`] if no rng provided
+    pub fn new_with_rng(
+        config: &BackoffConfig,
+        rng: Option<Box<dyn RngCore + Sync + Send>>,
+    ) -> Self {
+        let init_backoff = config.init_backoff.as_secs_f64();
+        Self {
+            init_backoff,
+            next_backoff_secs: init_backoff,
+            max_backoff_secs: config.max_backoff.as_secs_f64(),
+            base: config.base,
+            rng,
+        }
+    }
+
+    /// Returns the next backoff duration to wait for
+    pub fn next(&mut self) -> Duration {
+        let range = self.init_backoff..(self.next_backoff_secs * self.base);
+
+        let rand_backoff = match self.rng.as_mut() {
+            Some(rng) => rng.gen_range(range),
+            None => thread_rng().gen_range(range),
+        };
+
+        let next_backoff = self.max_backoff_secs.min(rand_backoff);
+        Duration::from_secs_f64(std::mem::replace(&mut self.next_backoff_secs, 
next_backoff))
+    }
+}
+
 pub async fn retry_async<F, Fut, T, E>(f: F, config: &RetryConfig) -> 
Fut::Output
 where
     F: Fn() -> Fut,
     Fut: Future<Output = Result<T, E>>,
 {
+    let mut backoff = Backoff::new(&config.backoff);
     for _ in 0..config.max_retries {
         let result = f().await;
 
         if result.is_ok() {
             return result;
         }
-        tokio::time::sleep(config.interval).await;
+        tokio::time::sleep(backoff.next()).await;
     }
 
     f().await
@@ -58,13 +124,15 @@ where
 mod tests {
     use std::sync::atomic::{AtomicU8, Ordering};
 
+    use rand::rngs::mock::StepRng;
+
     use super::*;
 
     #[tokio::test]
     async fn test_retry_async() {
         let config = RetryConfig {
             max_retries: 3,
-            interval: Duration::from_millis(5),
+            backoff: Default::default(),
         };
 
         // always fails
@@ -109,4 +177,47 @@ mod tests {
             assert_eq!(3, runs.load(Ordering::Relaxed));
         }
     }
+
+    #[test]
+    fn test_backoff() {
+        let init_backoff_secs = 1.0;
+        let max_backoff_secs = 500.0;
+        let base = 3.0;
+
+        let config = BackoffConfig {
+            init_backoff: Duration::from_secs_f64(init_backoff_secs),
+            max_backoff: Duration::from_secs_f64(max_backoff_secs),
+            base,
+        };
+
+        let assert_fuzzy_eq = |a: f64, b: f64| assert!((b - a).abs() < 0.0001, 
"{a} != {b}");
+
+        // Create a static rng that takes the minimum of the range
+        let rng = Box::new(StepRng::new(0, 0));
+        let mut backoff = Backoff::new_with_rng(&config, Some(rng));
+
+        for _ in 0..20 {
+            assert_eq!(backoff.next().as_secs_f64(), init_backoff_secs);
+        }
+
+        // Create a static rng that takes the maximum of the range
+        let rng = Box::new(StepRng::new(u64::MAX, 0));
+        let mut backoff = Backoff::new_with_rng(&config, Some(rng));
+
+        for i in 0..20 {
+            let value = (base.powi(i) * 
init_backoff_secs).min(max_backoff_secs);
+            assert_fuzzy_eq(backoff.next().as_secs_f64(), value);
+        }
+
+        // Create a static rng that takes the mid point of the range
+        let rng = Box::new(StepRng::new(u64::MAX / 2, 0));
+        let mut backoff = Backoff::new_with_rng(&config, Some(rng));
+
+        let mut value = init_backoff_secs;
+        for _ in 0..20 {
+            assert_fuzzy_eq(backoff.next().as_secs_f64(), value);
+            value =
+                (init_backoff_secs + (value * base - init_backoff_secs) / 
2.).min(max_backoff_secs);
+        }
+    }
 }
diff --git a/src/components/skiplist/benches/bench.rs b/src/components/skiplist/benches/bench.rs
index 66c51156..b048f737 100644
--- a/src/components/skiplist/benches/bench.rs
+++ b/src/components/skiplist/benches/bench.rs
@@ -76,13 +76,13 @@ fn bench_read_write_skiplist_frac(b: &mut Bencher<'_>, frac: &usize) {
         let mut rng = rand::thread_rng();
         while !s.load(Ordering::SeqCst) {
             let key = random_key(&mut rng);
-            let case = (key, frac > rng.gen_range(0, 11));
+            let case = (key, frac > rng.gen_range(0..11));
             skiplist_round(&l, &case, &v);
         }
     });
     let mut rng = rand::thread_rng();
     b.iter_batched_ref(
-        || (random_key(&mut rng), frac > rng.gen_range(0, 11)),
+        || (random_key(&mut rng), frac > rng.gen_range(0..11)),
         |case| skiplist_round(&list, case, &value),
         BatchSize::SmallInput,
     );
@@ -127,7 +127,7 @@ fn bench_read_write_map_frac(b: &mut Bencher<'_>, frac: &usize) {
     let handle = thread::spawn(move || {
         let mut rng = rand::thread_rng();
         while !thread_stop.load(Ordering::SeqCst) {
-            let f = rng.gen_range(0, 11);
+            let f = rng.gen_range(0..11);
             let case = (random_key(&mut rng), f < frac);
             map_round(&map_in_thread, &case, &v);
         }
@@ -135,7 +135,7 @@ fn bench_read_write_map_frac(b: &mut Bencher<'_>, frac: &usize) {
     let mut rng = rand::thread_rng();
     b.iter_batched_ref(
         || {
-            let f = rng.gen_range(0, 11);
+            let f = rng.gen_range(0..11);
             (random_key(&mut rng), f < frac)
         },
         |case| map_round(&map, case, &value),
diff --git a/src/components/table_kv/src/obkv/tests.rs b/src/components/table_kv/src/obkv/tests.rs
index 8f21d0f8..96b88bf9 100644
--- a/src/components/table_kv/src/obkv/tests.rs
+++ b/src/components/table_kv/src/obkv/tests.rs
@@ -147,7 +147,7 @@ impl Drop for ObkvTester {
 
 fn random_table_name(prefix: &str) -> String {
     let mut rng = thread_rng();
-    let v: u32 = rng.gen_range(0, MAX_TABLE_ID);
+    let v: u32 = rng.gen_range(0..MAX_TABLE_ID);
 
     format!("{prefix}_{v}")
 }
diff --git a/src/components/table_kv/src/tests.rs b/src/components/table_kv/src/tests.rs
index 1f95d695..a2e5b69c 100644
--- a/src/components/table_kv/src/tests.rs
+++ b/src/components/table_kv/src/tests.rs
@@ -161,7 +161,7 @@ fn new_memory_tester() -> TableKvTester<MemoryImpl> {
 
 fn random_table_name(prefix: &str) -> String {
     let mut rng = thread_rng();
-    let v: u32 = rng.gen_range(0, MAX_TABLE_ID);
+    let v: u32 = rng.gen_range(0..MAX_TABLE_ID);
 
     format!("{prefix}_{v}")
 }
diff --git a/src/meta_client/src/load_balance.rs b/src/meta_client/src/load_balance.rs
index 5c2a92ac..79ca96b2 100644
--- a/src/meta_client/src/load_balance.rs
+++ b/src/meta_client/src/load_balance.rs
@@ -46,7 +46,7 @@ impl LoadBalancer for RandomLoadBalancer {
             return Ok(&addresses[0]);
         }
         let mut rng = rand::thread_rng();
-        let idx = rng.gen_range(0, len);
+        let idx = rng.gen_range(0..len);
 
         Ok(&addresses[idx])
     }
diff --git a/src/server/src/grpc/meta_event_service/mod.rs b/src/server/src/grpc/meta_event_service/mod.rs
index de5a6af0..75fe48ea 100644
--- a/src/server/src/grpc/meta_event_service/mod.rs
+++ b/src/server/src/grpc/meta_event_service/mod.rs
@@ -37,7 +37,7 @@ use common_types::{
     schema::SchemaEncoder,
     table::{ShardId, ShardVersion},
 };
-use future_ext::RetryConfig;
+use future_ext::{BackoffConfig, RetryConfig};
 use generic_error::BoxError;
 use horaedbproto::meta_event::{
     meta_event_service_server::MetaEventService, ChangeShardRoleRequest, 
ChangeShardRoleResponse,
@@ -101,7 +101,11 @@ macro_rules! extract_updated_table_info {
 // TODO: configure retry
 const RETRY: RetryConfig = RetryConfig {
     max_retries: 10,
-    interval: Duration::from_secs(5),
+    backoff: BackoffConfig {
+        init_backoff: Duration::from_secs(1),
+        max_backoff: Duration::from_secs(5),
+        base: 2.0,
+    },
 };
 
 /// Builder for [MetaServiceImpl].


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to