numinnex commented on code in PR #2426:
URL: https://github.com/apache/iggy/pull/2426#discussion_r2578293194
##########
core/common/src/types/consensus/header.rs:
##########
@@ -156,6 +156,7 @@ pub struct RequestHeader {
pub replica: u8,
pub reserved_frame: [u8; 12],
+ pub client: u128,
Review Comment:
There are two client fields — do we need this one as well?
##########
core/consensus/src/impls.rs:
##########
@@ -16,44 +16,444 @@
// under the License.
use crate::{Consensus, Project};
+use bit_set::BitSet;
use iggy_common::header::{Command2, PrepareHeader, PrepareOkHeader,
RequestHeader};
use iggy_common::message::Message;
use message_bus::IggyMessageBus;
-use std::cell::Cell;
+use std::cell::{Cell, RefCell};
+use std::collections::VecDeque;
-pub struct VsrConsensus {
+pub trait Sequencer {
Review Comment:
Rename `op` to `sequence`, and introduce an associated type `Sequence`
instead of hard-coding `u64`.
##########
core/consensus/src/impls.rs:
##########
@@ -16,44 +16,444 @@
// under the License.
use crate::{Consensus, Project};
+use bit_set::BitSet;
use iggy_common::header::{Command2, PrepareHeader, PrepareOkHeader,
RequestHeader};
use iggy_common::message::Message;
use message_bus::IggyMessageBus;
-use std::cell::Cell;
+use std::cell::{Cell, RefCell};
+use std::collections::VecDeque;
-pub struct VsrConsensus {
+pub trait Sequencer {
+ /// Get the current op number
+ fn current_op(&self) -> u64;
+
+ /// Allocate the next op number.
+ /// TODO Should this return a Future<Output = u64>? for async case?
+ fn next_op(&self) -> u64;
+
+ /// Update the current op number.
+ fn set_op(&self, op: u64);
+}
+
+pub struct LocalSequencer {
op: Cell<u64>,
}
-impl VsrConsensus {
- pub fn advance_commit_number(&self) {}
+impl LocalSequencer {
+ pub fn new(initial_op: u64) -> Self {
+ Self {
+ op: Cell::new(initial_op),
+ }
+ }
+}
- pub fn update_op(&self, op: u64) {
- self.op.set(op);
+impl Sequencer for LocalSequencer {
+ fn current_op(&self) -> u64 {
+ self.op.get()
}
- pub fn op(&self) -> u64 {
+ fn next_op(&self) -> u64 {
+ self.op.set(self.op.get() + 1);
self.op.get()
}
+
+ fn set_op(&self, op: u64) {
+ self.op.set(op);
+ }
+}
+
+/// TODO The below numbers need to be added a consensus config
+/// TODO understand how to configure these numbers.
+/// Maximum number of prepares that can be in-flight in the pipeline.
+pub const PIPELINE_PREPARE_QUEUE_MAX: usize = 8;
+/// Maximum number of requests waiting to be prepared.
+pub const PIPELINE_REQUEST_QUEUE_MAX: usize = 8;
+
+/// Maximum number of replicas in a cluster.
+pub const REPLICAS_MAX: usize = 32;
+
+pub struct PipelineEntry {
+ pub message: Message<PrepareHeader>,
+ /// Bitmap of replicas that have acknowledged this prepare.
+ pub ok_from_replicas: BitSet<u32>,
+ /// Whether we've received a quorum of prepare_ok messages.
+ pub ok_quorum_received: bool,
+}
+
+impl PipelineEntry {
+ pub fn new(message: Message<PrepareHeader>) -> Self {
+ Self {
+ message,
+ ok_from_replicas: BitSet::with_capacity(REPLICAS_MAX),
+ ok_quorum_received: false,
+ }
+ }
+
+ /// Record a prepare_ok from the given replica.
+ /// Returns the new count of acknowledgments.
+ pub fn add_ack(&mut self, replica: u8) -> usize {
+ self.ok_from_replicas.insert(replica as usize);
+ self.ok_from_replicas.len()
+ }
+
+ /// Check if we have an ack from the given replica.
+ pub fn has_ack(&self, replica: u8) -> bool {
+ self.ok_from_replicas.contains(replica as usize)
+ }
+
+ /// Get the number of acks received.
+ pub fn ack_count(&self) -> usize {
+ self.ok_from_replicas.len()
+ }
+}
+
+/// A request message waiting to be prepared.
+pub struct RequestEntry {
+ pub message: Message<RequestHeader>,
+ /// Timestamp when the request was received (for ordering/timeout).
+ pub received_at: i64, //TODO figure the correct way to do this
+}
+
+impl RequestEntry {
+ pub fn new(message: Message<RequestHeader>) -> Self {
+ Self {
+ message,
+ received_at: 0, //TODO figure the correct way to do this
+ }
+ }
+}
+
+pub struct Pipeline {
+ /// Messages being prepared (uncommitted and being replicated).
+ prepare_queue: VecDeque<PipelineEntry>,
+ /// Messages accepted from clients but not yet preparing.
+ request_queue: VecDeque<RequestEntry>,
Review Comment:
I think we won't need the `Request` queue, since hitting the pipeline limit
will act as backpressure for us. Remove this field.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]