This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new ad46d9912 feat(cluster): Add deterministic tick-based timeout
mechanism for VSR (#2450)
ad46d9912 is described below
commit ad46d991268811ab45231c71fb887595df957130
Author: Krishna Vishal <[email protected]>
AuthorDate: Tue Dec 9 00:24:35 2025 +0530
feat(cluster): Add deterministic tick-based timeout mechanism for VSR
(#2450)
This PR adds a `timeout` mechanism to be used by the consensus.
`Timeout` is purely numerical and synchronous: it only counts
down to zero.
`TimeoutManager` will be used by `VsrConsensus`; it handles the
ticking of all timeouts belonging to a replica.
This tick-based timeout system works in two phases: phase 1
decrements all `Timeout`s, and phase 2 checks each `Timeout` and,
if it has fired, handles it by calling the respective handler.
We use the `xoshiro256+` RNG to generate fast random numbers, similar to
TB.
---
Cargo.lock | 11 ++
Cargo.toml | 1 +
DEPENDENCIES.md | 1 +
core/consensus/Cargo.toml | 2 +
core/consensus/src/lib.rs | 2 +
core/consensus/src/vsr_timeout.rs | 298 ++++++++++++++++++++++++++++++++++++++
6 files changed, 315 insertions(+)
diff --git a/Cargo.lock b/Cargo.lock
index cc9d1f831..0bc5e6a4e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1937,6 +1937,8 @@ dependencies = [
"bit-set",
"iggy_common",
"message_bus",
+ "rand 0.9.2",
+ "rand_xoshiro",
]
[[package]]
@@ -7370,6 +7372,15 @@ dependencies = [
"getrandom 0.3.4",
]
+[[package]]
+name = "rand_xoshiro"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f703f4665700daf5512dcca5f43afa6af89f09db47fb56be587f80636bda2d41"
+dependencies = [
+ "rand_core 0.9.3",
+]
+
[[package]]
name = "raw-cpuid"
version = "11.6.0"
diff --git a/Cargo.toml b/Cargo.toml
index a851ec3b4..503724038 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -152,6 +152,7 @@ postcard = { version = "1.1.3", features = ["alloc"] }
predicates = "3.1.3"
quinn = "0.11.9"
rand = "0.9.2"
+rand_xoshiro = "0.7.0"
regex = "1.12.2"
reqwest = { version = "0.12.24", default-features = false, features = [
"json",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index 6be507bd2..704dd5464 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -649,6 +649,7 @@ rand_chacha: 0.3.1, "Apache-2.0 OR MIT",
rand_chacha: 0.9.0, "Apache-2.0 OR MIT",
rand_core: 0.6.4, "Apache-2.0 OR MIT",
rand_core: 0.9.3, "Apache-2.0 OR MIT",
+rand_xoshiro: 0.7.0, "Apache-2.0 OR MIT",
raw-cpuid: 11.6.0, "MIT",
rayon: 1.11.0, "Apache-2.0 OR MIT",
rayon-core: 1.13.0, "Apache-2.0 OR MIT",
diff --git a/core/consensus/Cargo.toml b/core/consensus/Cargo.toml
index d84f8b411..2060b4bd2 100644
--- a/core/consensus/Cargo.toml
+++ b/core/consensus/Cargo.toml
@@ -31,3 +31,5 @@ readme = "../../../README.md"
bit-set = { workspace = true }
iggy_common = { path = "../common" }
message_bus = { path = "../message_bus" }
+rand = { workspace = true }
+rand_xoshiro = { workspace = true }
diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs
index 400fcfa3e..1db81e9eb 100644
--- a/core/consensus/src/lib.rs
+++ b/core/consensus/src/lib.rs
@@ -41,3 +41,5 @@ pub trait Consensus {
mod impls;
pub use impls::*;
+
+mod vsr_timeout;
diff --git a/core/consensus/src/vsr_timeout.rs
b/core/consensus/src/vsr_timeout.rs
new file mode 100644
index 000000000..d572bb65e
--- /dev/null
+++ b/core/consensus/src/vsr_timeout.rs
@@ -0,0 +1,298 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A deterministic, tick-based timeout mechanism for VSR consensus.
+//!
+//! This module provides deterministic timeouts driven by logical ticks rather than
+//! wall-clock time, enabling:
+//! - Deterministic testing
+//! - Predictable behavior in consensus
+//! - Per-replica PRNG seeding for jitter
+//!
+//! Two-phase tick model:
+//!
+//! 1. Phase 1 (tick): All timeouts are advanced by one tick
+//! 2. Phase 2 (check & handle): Check which timeouts fired and invoke handlers
+
+use rand::Rng;
+use rand_xoshiro::Xoshiro256Plus;
+use rand_xoshiro::rand_core::SeedableRng;
+
+/// A deterministic, tick-based timeout.
+///
+/// Timeouts count down from an initial duration (`after`) and fire when
+/// reaching zero. They support exponential backoff with jitter for retries.
+///
+/// The countdown only advances while `ticking` is set. Once it reaches zero,
+/// further ticks keep it at zero, so [`Timeout::fired`] stays `true` until the
+/// countdown is reloaded (`start`, `reset`, `backoff`) or the timeout is stopped.
+#[derive(Debug, Clone)]
+// Nothing outside this module consumes timeouts yet; silence dead-code
+// warnings until the consensus wires them in.
+#[allow(unused)]
+pub struct Timeout {
+    /// Caller-supplied identifier; `TimeoutManager` passes the replica id.
+    pub id: u128,
+    /// Base duration in ticks, loaded by `start` and `reset`.
+    after: u64,
+    /// Ticks left before the timeout fires.
+    ticks_remaining: u64,
+    /// Whether `tick` counts this timeout down.
+    pub ticking: bool,
+    /// Number of `backoff` calls since the last `start`, `reset` or `stop`.
+    pub attempts: u32,
+}
+
+impl Timeout {
+    /// Largest exponent applied to `after` by `backoff`, so the backoff base
+    /// is at most `after * 16`.
+    const MAX_BACKOFF_SHIFT: u32 = 4;
+
+    /// Creates a stopped timeout that counts down `after` ticks once started.
+    pub fn new(id: u128, after: u64) -> Self {
+        Self {
+            id,
+            after,
+            ticks_remaining: 0,
+            ticking: false,
+            attempts: 0,
+        }
+    }
+
+    /// Arms the timeout: loads the full duration, enables ticking and clears
+    /// the backoff attempt counter.
+    pub fn start(&mut self) {
+        self.ticks_remaining = self.after;
+        self.ticking = true;
+        self.attempts = 0;
+    }
+
+    /// Disarms the timeout so it neither ticks nor fires, and clears its state.
+    pub fn stop(&mut self) {
+        self.ticking = false;
+        self.ticks_remaining = 0;
+        self.attempts = 0;
+    }
+
+    /// Reloads the full duration and clears the attempt counter.
+    ///
+    /// Does not touch `ticking`: resetting a stopped timeout leaves it stopped.
+    pub fn reset(&mut self) {
+        self.ticks_remaining = self.after;
+        self.attempts = 0;
+    }
+
+    /// Advances the countdown by one tick if the timeout is running.
+    pub fn tick(&mut self) {
+        if self.ticking {
+            self.ticks_remaining = self.ticks_remaining.saturating_sub(1);
+        }
+    }
+
+    /// Returns `true` when the timeout is running and has counted down to zero.
+    pub fn fired(&self) -> bool {
+        self.ticking && self.ticks_remaining == 0
+    }
+
+    /// Reloads the countdown with exponential backoff plus random jitter.
+    ///
+    /// Each call increments `attempts`. The base delay is
+    /// `after * 2^min(attempts, 4)`, and a uniformly random jitter in
+    /// `[0, base]` drawn from `prng` is added, so the new countdown lies in
+    /// `[base, 2 * base]`. Does not touch `ticking`.
+    pub fn backoff(&mut self, prng: &mut Xoshiro256Plus) {
+        // Saturate rather than wrap: wrapping would reset the exponent to zero
+        // and collapse the delay back to `after` after u32::MAX retries.
+        self.attempts = self.attempts.saturating_add(1);
+        // The capped exponent bounds the multiplier at 1 << MAX_BACKOFF_SHIFT,
+        // so no separate clamp on the product is needed.
+        let shift = self.attempts.min(Self::MAX_BACKOFF_SHIFT);
+        let base = self.after.saturating_mul(1u64 << shift);
+        let jitter = prng.random_range(0..=base);
+        self.ticks_remaining = base.saturating_add(jitter);
+    }
+}
+
+/// Timeout types in VSR.
+///
+/// Each variant selects one of the timeouts owned by `TimeoutManager`.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+// Not constructed outside tests until the consensus drives the manager;
+// silence dead-code warnings meanwhile.
+#[allow(unused)]
+pub enum TimeoutKind {
+    /// Ping timeout.
+    Ping,
+    /// Prepare timeout.
+    Prepare,
+    /// Commit message timeout.
+    CommitMessage,
+    /// Normal heartbeat timeout.
+    NormalHeartbeat,
+    /// Start-view-change message timeout.
+    StartViewChangeMessage,
+    /// Do-view-change message timeout.
+    DoViewChangeMessage,
+    /// Request-start-view message timeout.
+    RequestStartViewMessage,
+}
+
+/// Manager for all VSR timeouts of a replica.
+///
+/// Owns one [`Timeout`] per [`TimeoutKind`] plus the PRNG that supplies
+/// backoff jitter.
+// Not yet used by `VsrConsensus`; silence dead-code warnings until it is.
+#[allow(unused)]
+pub struct TimeoutManager {
+    /// Jitter source shared by every timeout's backoff.
+    prng: Xoshiro256Plus,
+    /// Selected by `TimeoutKind::Ping`.
+    ping: Timeout,
+    /// Selected by `TimeoutKind::Prepare`.
+    prepare: Timeout,
+    /// Selected by `TimeoutKind::CommitMessage`.
+    commit_message: Timeout,
+    /// Selected by `TimeoutKind::NormalHeartbeat`.
+    normal_heartbeat: Timeout,
+    /// Selected by `TimeoutKind::StartViewChangeMessage`.
+    start_view_change_message: Timeout,
+    /// Selected by `TimeoutKind::DoViewChangeMessage`.
+    do_view_change_message: Timeout,
+    /// Selected by `TimeoutKind::RequestStartViewMessage`.
+    request_start_view_message: Timeout,
+}
+
+// Several accessors are only exercised by tests until `VsrConsensus` starts
+// driving the manager; silence dead-code warnings meanwhile.
+#[allow(unused)]
+impl TimeoutManager {
+    /// Duration of one logical tick in milliseconds. The `*_TICKS` constants
+    /// below are expressed in units of this tick.
+    pub const TICK_MS: u64 = 10;
+
+    // Timeout durations in ticks. Values are taken from TB.
+    const PING_TICKS: u64 = 100;
+    const PREPARE_TICKS: u64 = 25;
+    const COMMIT_MESSAGE_TICKS: u64 = 50;
+    const NORMAL_HEARTBEAT_TICKS: u64 = 500;
+    const START_VIEW_CHANGE_MESSAGE_TICKS: u64 = 50;
+    const DO_VIEW_CHANGE_MESSAGE_TICKS: u64 = 50;
+    const REQUEST_START_VIEW_MESSAGE_TICKS: u64 = 100;
+
+    /// Creates a manager whose timeouts are all stopped. The jitter PRNG is
+    /// seeded deterministically from `replica_id`.
+    pub fn new(replica_id: u128) -> Self {
+        Self {
+            ping: Timeout::new(replica_id, Self::PING_TICKS),
+            prepare: Timeout::new(replica_id, Self::PREPARE_TICKS),
+            commit_message: Timeout::new(replica_id, Self::COMMIT_MESSAGE_TICKS),
+            normal_heartbeat: Timeout::new(replica_id, Self::NORMAL_HEARTBEAT_TICKS),
+            start_view_change_message: Timeout::new(
+                replica_id,
+                Self::START_VIEW_CHANGE_MESSAGE_TICKS,
+            ),
+            do_view_change_message: Timeout::new(replica_id, Self::DO_VIEW_CHANGE_MESSAGE_TICKS),
+            request_start_view_message: Timeout::new(
+                replica_id,
+                Self::REQUEST_START_VIEW_MESSAGE_TICKS,
+            ),
+            prng: Xoshiro256Plus::seed_from_u64(Self::prng_seed(replica_id)),
+        }
+    }
+
+    /// Folds a 128-bit replica id into a 64-bit PRNG seed.
+    ///
+    /// XOR-ing both halves gives the same seed as plain truncation for ids
+    /// below 2^64, while still letting the upper bits contribute. Without it,
+    /// ids differing only in their high half would share a jitter stream.
+    fn prng_seed(replica_id: u128) -> u64 {
+        // Both casts intentionally keep only 64 bits of their operand.
+        let low = replica_id as u64;
+        let high = (replica_id >> 64) as u64;
+        low ^ high
+    }
+
+    /// Tick all timeouts.
+    ///
+    /// This is the first phase of the two-phase tick-based timeout mechanism.
+    /// The second phase is checking which timeouts have fired and calling the
+    /// appropriate handlers.
+    pub fn tick(&mut self) {
+        self.ping.tick();
+        self.prepare.tick();
+        self.commit_message.tick();
+        self.normal_heartbeat.tick();
+        self.start_view_change_message.tick();
+        self.do_view_change_message.tick();
+        self.request_start_view_message.tick();
+    }
+
+    /// Returns whether the timeout selected by `kind` has fired.
+    pub fn fired(&self, kind: TimeoutKind) -> bool {
+        self.get(kind).fired()
+    }
+
+    /// Returns the timeout selected by `kind`.
+    pub fn get(&self, kind: TimeoutKind) -> &Timeout {
+        match kind {
+            TimeoutKind::Ping => &self.ping,
+            TimeoutKind::Prepare => &self.prepare,
+            TimeoutKind::CommitMessage => &self.commit_message,
+            TimeoutKind::NormalHeartbeat => &self.normal_heartbeat,
+            TimeoutKind::StartViewChangeMessage => &self.start_view_change_message,
+            TimeoutKind::DoViewChangeMessage => &self.do_view_change_message,
+            TimeoutKind::RequestStartViewMessage => &self.request_start_view_message,
+        }
+    }
+
+    /// Returns the timeout selected by `kind`, mutably.
+    pub fn get_mut(&mut self, kind: TimeoutKind) -> &mut Timeout {
+        self.timeout_and_prng_mut(kind).0
+    }
+
+    /// Borrows the timeout selected by `kind` together with the PRNG.
+    ///
+    /// The two are disjoint fields, so one `match` can hand out both mutable
+    /// borrows. This lets `get_mut` and `backoff` share a single dispatch.
+    fn timeout_and_prng_mut(
+        &mut self,
+        kind: TimeoutKind,
+    ) -> (&mut Timeout, &mut Xoshiro256Plus) {
+        let timeout = match kind {
+            TimeoutKind::Ping => &mut self.ping,
+            TimeoutKind::Prepare => &mut self.prepare,
+            TimeoutKind::CommitMessage => &mut self.commit_message,
+            TimeoutKind::NormalHeartbeat => &mut self.normal_heartbeat,
+            TimeoutKind::StartViewChangeMessage => &mut self.start_view_change_message,
+            TimeoutKind::DoViewChangeMessage => &mut self.do_view_change_message,
+            TimeoutKind::RequestStartViewMessage => &mut self.request_start_view_message,
+        };
+        (timeout, &mut self.prng)
+    }
+
+    /// Starts (arms) the timeout selected by `kind`.
+    pub fn start(&mut self, kind: TimeoutKind) {
+        self.get_mut(kind).start();
+    }
+
+    /// Stops (disarms) the timeout selected by `kind`.
+    pub fn stop(&mut self, kind: TimeoutKind) {
+        self.get_mut(kind).stop();
+    }
+
+    /// Reloads the full duration of the timeout selected by `kind`.
+    pub fn reset(&mut self, kind: TimeoutKind) {
+        self.get_mut(kind).reset();
+    }
+
+    /// Applies exponential backoff with jitter to the timeout selected by
+    /// `kind`, drawing the jitter from this replica's PRNG.
+    pub fn backoff(&mut self, kind: TimeoutKind) {
+        let (timeout, prng) = self.timeout_and_prng_mut(kind);
+        timeout.backoff(prng);
+    }
+
+    /// Returns whether the timeout selected by `kind` is currently running.
+    pub fn is_ticking(&self, kind: TimeoutKind) -> bool {
+        self.get(kind).ticking
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_timeout_lifecycle() {
+        let mut timeout = Timeout::new(0, 10);
+
+        assert!(!timeout.ticking);
+        assert!(!timeout.fired());
+
+        timeout.start();
+        assert!(timeout.ticking);
+
+        for _ in 0..9 {
+            timeout.tick();
+            assert!(!timeout.fired());
+        }
+
+        timeout.tick();
+        assert!(timeout.fired());
+
+        // The countdown saturates at zero, so a fired timeout stays fired.
+        timeout.tick();
+        assert!(timeout.fired());
+    }
+
+    #[test]
+    fn test_timeout_reset() {
+        let mut timeout = Timeout::new(0, 10);
+        timeout.start();
+
+        for _ in 0..5 {
+            timeout.tick();
+        }
+
+        timeout.reset();
+        assert!(timeout.ticking);
+
+        for _ in 0..9 {
+            timeout.tick();
+            assert!(!timeout.fired());
+        }
+
+        timeout.tick();
+        assert!(timeout.fired());
+    }
+
+    #[test]
+    fn test_timeout_stop() {
+        let mut timeout = Timeout::new(0, 10);
+        timeout.start();
+
+        for _ in 0..5 {
+            timeout.tick();
+        }
+
+        timeout.stop();
+        assert!(!timeout.ticking);
+
+        for _ in 0..10 {
+            timeout.tick();
+        }
+        assert!(!timeout.fired());
+    }
+
+    #[test]
+    fn test_backoff_increases() {
+        let mut timeout = Timeout::new(0, 10);
+        let mut prng = Xoshiro256Plus::seed_from_u64(42);
+
+        timeout.start();
+        let initial = timeout.ticks_remaining;
+
+        timeout.backoff(&mut prng);
+        // After backoff, ticks_remaining should be larger than initial with some jitter:
+        // the first backoff doubles the base to 20 ticks and adds jitter in [0, 20].
+        assert_eq!(timeout.attempts, 1);
+        assert!(timeout.ticks_remaining >= initial);
+        assert!((20..=40).contains(&timeout.ticks_remaining));
+    }
+
+    #[test]
+    fn test_backoff_is_capped() {
+        let mut timeout = Timeout::new(0, 10);
+        let mut prng = Xoshiro256Plus::seed_from_u64(42);
+        timeout.start();
+
+        for attempt in 1..=10u32 {
+            timeout.backoff(&mut prng);
+            // Base delay is 10 * 2^min(attempt, 4); jitter adds up to one more base.
+            let base = 10u64 << attempt.min(4);
+            assert!((base..=2 * base).contains(&timeout.ticks_remaining));
+        }
+    }
+
+    #[test]
+    fn test_manager_ticks_only_started_timeouts() {
+        let mut manager = TimeoutManager::new(1);
+        manager.start(TimeoutKind::Prepare);
+        assert!(manager.is_ticking(TimeoutKind::Prepare));
+        assert!(!manager.is_ticking(TimeoutKind::Ping));
+
+        for _ in 1..TimeoutManager::PREPARE_TICKS {
+            manager.tick();
+            assert!(!manager.fired(TimeoutKind::Prepare));
+        }
+
+        manager.tick();
+        assert!(manager.fired(TimeoutKind::Prepare));
+        assert!(!manager.fired(TimeoutKind::Ping));
+    }
+
+    #[test]
+    fn test_manager_backoff_is_deterministic_per_replica() {
+        let mut a = TimeoutManager::new(3);
+        let mut b = TimeoutManager::new(3);
+        for manager in [&mut a, &mut b] {
+            manager.start(TimeoutKind::Ping);
+            manager.backoff(TimeoutKind::Ping);
+        }
+
+        assert_eq!(
+            a.get(TimeoutKind::Ping).ticks_remaining,
+            b.get(TimeoutKind::Ping).ticks_remaining,
+        );
+        assert_eq!(a.get(TimeoutKind::Ping).attempts, 1);
+    }
+}