krishvishal commented on code in PR #3223:
URL: https://github.com/apache/iggy/pull/3223#discussion_r3228526421
##########
core/consensus/src/client_table.rs:
##########
@@ -15,207 +15,160 @@
// specific language governing permissions and limitations
// under the License.
+use iggy_binary_protocol::consensus::MESSAGE_ALIGN;
+use iggy_binary_protocol::consensus::iobuf::Frozen;
use iggy_binary_protocol::{Message, ReplyHeader};
-use std::cell::RefCell;
use std::collections::HashMap;
-use std::future::Future;
-use std::rc::Rc;
-use std::task::Waker;
-
-/// Identifies a specific request from a specific client.
-/// Used as the key for the pending-commit waiter map.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
-pub struct ClientRequest {
- pub client_id: u128,
- pub request: u64,
-}
+use std::mem::size_of;
+use tracing::trace;
-/// Inner state shared between `Notify` clones via `Rc`.
-#[derive(Debug)]
-struct NotifyInner {
- waker: RefCell<Option<Waker>>,
- notified: std::cell::Cell<bool>,
-}
-
-/// Lightweight, single-threaded async notification primitive.
-///
-/// ## Usage
+/// Refcounted wrapper around a committed reply.
///
-/// ```ignore
-/// let notify = Notify::new();
-/// let waiter = notify.clone();
+/// Bytes are deterministic across replicas: `build_reply_message` reads
+/// only from the prepare header, so a backup-promoted primary replays
+/// the exact bytes the original primary produced.
///
-/// // Producer side (in commit_reply):
-/// notify.notify();
-///
-/// // Consumer side (caller awaiting the commit):
-/// waiter.notified().await;
-/// ```
+/// Immutable by construction: [`Frozen`] has no mutable accessor.
#[derive(Debug, Clone)]
-pub struct Notify {
- inner: Rc<NotifyInner>,
+pub struct CachedReply {
+ bytes: Frozen<MESSAGE_ALIGN>,
}
-impl Notify {
- /// Create a new `Notify` in the un-notified state.
+impl CachedReply {
+ /// Reply header view.
+ ///
+ /// # Panics
+ /// Unreachable: prefix validated by [`Message::try_from`] at construction;
+ /// `Frozen` has no mutable accessor.
#[must_use]
- pub fn new() -> Self {
- Self {
- inner: Rc::new(NotifyInner {
- waker: RefCell::new(None),
- notified: std::cell::Cell::new(false),
- }),
- }
+ pub fn header(&self) -> &ReplyHeader {
+
bytemuck::checked::try_from_bytes(&self.bytes.as_slice()[..size_of::<ReplyHeader>()])
+ .expect("cached reply bytes contain a valid ReplyHeader (validated
at storage time)")
}
- /// Wake the waiter, if any. If `notified()` is polled later, it will
- /// resolve immediately.
- pub fn notify(&self) {
- self.inner.notified.set(true);
- if let Some(waker) = self.inner.waker.borrow_mut().take() {
- waker.wake();
- }
- }
-
- /// Returns a future that resolves when [`notify()`](Self::notify) is called.
+ /// Consume into wire-shareable [`Frozen`] buffer.
///
- /// If `notify()` was already called before this future is polled, it
- /// resolves immediately (permit is consumed).
- #[allow(clippy::future_not_send)]
- pub fn notified(&self) -> impl Future<Output = ()> + '_ {
- std::future::poll_fn(move |cx| {
- if self.inner.notified.get() {
- self.inner.notified.set(false);
- std::task::Poll::Ready(())
- } else {
- *self.inner.waker.borrow_mut() = Some(cx.waker().clone());
- std::task::Poll::Pending
- }
- })
+ /// `MessageBus::send_to_client` takes `Frozen<MESSAGE_ALIGN>` directly.
+ /// To retain the cached entry, `.clone()` (Arc bump) first.
+ #[must_use]
+ pub fn into_wire_bytes(self) -> Frozen<MESSAGE_ALIGN> {
+ self.bytes
}
}
-impl Default for Notify {
- fn default() -> Self {
- Self::new()
+impl CachedReply {
+ /// Freeze owned buffer in place; no alloc. Subsequent `Clone`s are Arc bumps.
+ ///
+ /// `pub(crate)` so [`Self::header`]'s validity invariant cannot be
+ /// bypassed by an unvalidated buffer from outside the crate.
+ pub(crate) fn from_message(msg: Message<ReplyHeader>) -> Self {
+ Self {
+ bytes: msg.into_generic().into_frozen(),
+ }
}
}
-/// Per-client entry in the clients table (VR paper Section 4, Figure 2).
+/// Reserved request number for [`Operation::Register`](iggy_binary_protocol::Operation::Register).
+/// Real requests start at 1 (header validation enforces `request > 0`).
+pub const REGISTER_REQUEST_ID: u64 = 0;
+
+/// Per-client entry (VR paper §4, Fig. 2): session + latest committed reply.
///
-/// Stores the session number and the reply for the client's latest committed
-/// request. The session number is assigned once at registration (from the
-/// commit op number) and never changes for the lifetime of the entry.
+/// `session` is assigned at registration and fixed for the entry's lifetime.
#[derive(Debug)]
pub struct ClientEntry {
- /// Session number assigned at registration time (= commit op number of the
- /// register operation). Monotonically increasing across registrations.
- /// Used to distinguish between successive registrations from different
- /// client processes, a new register always gets a higher session.
+ /// Session number = commit op of the register. Monotonic across
+ /// registrations; new register always gets a higher session.
pub session: u64,
- /// The cached reply for the client's latest committed request (header + body).
- pub reply: Message<ReplyHeader>,
+ /// Cached reply for client's latest committed request.
+ pub reply: CachedReply,
}
/// Result of checking a request against the client table.
+///
+/// In-progress dedup is the caller's job: preflights consult
+/// `pipeline.has_message_from_client(client_id)`. `ClientTable` only sees
+/// committed state.
#[derive(Debug)]
pub enum RequestStatus {
- /// Request not seen before, proceed with consensus.
+ /// Not seen; proceed with consensus.
New,
- /// Exact request already committed, re-send cached reply.
- Duplicate(Message<ReplyHeader>),
- /// Request is in the pipeline awaiting commit, drop (client should wait).
- InProgress,
- /// Request number is older than the client's latest committed request.
- /// Already handled in a prior commit cycle, drop silently.
+ /// Exact request already committed; re-send cached reply.
+ Duplicate(CachedReply),
Review Comment:
1. This PR splits the planes precisely to avoid that: ClientTable is
metadata-only, while the partition plane (Send/Poll) is at-least-once with no
reply cache. Only metadata ops (Create*/Delete*/Register/ACL) land in
ClientTable, which is capped at CLIENTS_TABLE_MAX=8192 with oldest-committed
eviction; each entry is sized as a header plus a small body.
2. The VSR paper's client-table design mandates at-most-once semantics.
Returning an error on a duplicate breaks at-most-once: the client reads the
error as a failure, retries with a new request_id, and the server creates a
second stream.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]