This is an automated email from the ASF dual-hosted git repository.

numinnex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 78f447e68 feat(shard): add v2 consumer-offset commands with AckLevel 
(#3151)
78f447e68 is described below

commit 78f447e682c0d55b0efc41bb1edf3f1174355d86
Author: Krishna Vishal <[email protected]>
AuthorDate: Wed May 6 13:45:48 2026 +0530

    feat(shard): add v2 consumer-offset commands with AckLevel (#3151)
    
    - Introduce `AckLevel { NoAck, Quorum }` and v2 wire commands
    `StoreConsumerOffset2` / `DeleteConsumerOffset2` that carry an explicit
    ack byte; v1 commands and wire format are untouched.
    - Unify the server's two offset-write paths so `PollMessages`
    auto-commit and explicit stores share a single shard entry point, with
    `AckLevel` selecting the policy.
---
 core/binary_protocol/src/codes.rs                  |   4 +
 core/binary_protocol/src/consensus/operation.rs    |  11 +-
 core/binary_protocol/src/dispatch.rs               |  88 ++++----
 core/binary_protocol/src/lib.rs                    |   1 +
 core/binary_protocol/src/primitives/ack_level.rs   | 108 ++++++++++
 core/binary_protocol/src/primitives/mod.rs         |   1 +
 .../consumer_offsets/delete_consumer_offset_2.rs   | 199 ++++++++++++++++++
 .../src/requests/consumer_offsets/mod.rs           |   4 +
 .../consumer_offsets/store_consumer_offset_2.rs    | 211 +++++++++++++++++++
 .../consumer_offsets/delete_consumer_offset_2.rs}  |  11 +-
 .../src/responses/consumer_offsets/mod.rs          |   4 +
 .../consumer_offsets/store_consumer_offset_2.rs}   |  11 +-
 core/consensus/src/client_table.rs                 |   2 +-
 core/consensus/src/observability.rs                |   2 +
 core/consensus/src/plane_helpers.rs                |  58 +++++-
 .../server/scenarios/authentication_scenario.rs    |  10 +
 core/partitions/src/iggy_partition.rs              | 231 +++++++++++++++++----
 core/simulator/src/client.rs                       |  37 +++-
 18 files changed, 899 insertions(+), 94 deletions(-)

diff --git a/core/binary_protocol/src/codes.rs 
b/core/binary_protocol/src/codes.rs
index f663c85d0..ae33a9814 100644
--- a/core/binary_protocol/src/codes.rs
+++ b/core/binary_protocol/src/codes.rs
@@ -57,6 +57,8 @@ pub const FLUSH_UNSAVED_BUFFER_CODE: u32 = 102;
 pub const GET_CONSUMER_OFFSET_CODE: u32 = 120;
 pub const STORE_CONSUMER_OFFSET_CODE: u32 = 121;
 pub const DELETE_CONSUMER_OFFSET_CODE: u32 = 122;
+pub const STORE_CONSUMER_OFFSET_2_CODE: u32 = 123;
+pub const DELETE_CONSUMER_OFFSET_2_CODE: u32 = 124;
 
 // -- Streams --
 pub const GET_STREAM_CODE: u32 = 200;
@@ -133,6 +135,8 @@ mod tests {
         GET_CONSUMER_OFFSET_CODE,
         STORE_CONSUMER_OFFSET_CODE,
         DELETE_CONSUMER_OFFSET_CODE,
+        STORE_CONSUMER_OFFSET_2_CODE,
+        DELETE_CONSUMER_OFFSET_2_CODE,
         GET_STREAM_CODE,
         GET_STREAMS_CODE,
         CREATE_STREAM_CODE,
diff --git a/core/binary_protocol/src/consensus/operation.rs 
b/core/binary_protocol/src/consensus/operation.rs
index f0a08a29d..64a43a3ca 100644
--- a/core/binary_protocol/src/consensus/operation.rs
+++ b/core/binary_protocol/src/consensus/operation.rs
@@ -65,6 +65,9 @@ pub enum Operation {
     SendMessages = 160,
     StoreConsumerOffset = 161,
     DeleteConsumerOffset = 162,
+    // 163 is reserved for the planned DeleteSegments move (see TODO above).
+    StoreConsumerOffset2 = 164,
+    DeleteConsumerOffset2 = 165,
 }
 
 impl Operation {
@@ -165,7 +168,9 @@ impl Operation {
             | Self::DeletePersonalAccessToken
             | Self::SendMessages
             | Self::StoreConsumerOffset
-            | Self::DeleteConsumerOffset => match 
crate::dispatch::lookup_by_operation(*self) {
+            | Self::DeleteConsumerOffset
+            | Self::StoreConsumerOffset2
+            | Self::DeleteConsumerOffset2 => match 
crate::dispatch::lookup_by_operation(*self) {
                 Some(meta) => Some(meta.code),
                 None => None,
             },
@@ -214,6 +219,8 @@ mod tests {
             Operation::SendMessages,
             Operation::StoreConsumerOffset,
             Operation::DeleteConsumerOffset,
+            Operation::StoreConsumerOffset2,
+            Operation::DeleteConsumerOffset2,
         ];
         for op in ops {
             let code = op
@@ -270,5 +277,7 @@ mod tests {
         assert!(!Operation::SendMessages.is_metadata());
         assert!(Operation::DeleteSegments.is_partition());
         assert!(Operation::DeleteConsumerOffset.is_partition());
+        assert!(Operation::StoreConsumerOffset2.is_partition());
+        assert!(Operation::DeleteConsumerOffset2.is_partition());
     }
 }
diff --git a/core/binary_protocol/src/dispatch.rs 
b/core/binary_protocol/src/dispatch.rs
index fb5e33683..bb83280be 100644
--- a/core/binary_protocol/src/dispatch.rs
+++ b/core/binary_protocol/src/dispatch.rs
@@ -126,6 +126,16 @@ pub const COMMAND_TABLE: &[CommandMeta] = &[
         "consumer_offset.delete",
         Operation::DeleteConsumerOffset,
     ),
+    CommandMeta::replicated(
+        STORE_CONSUMER_OFFSET_2_CODE,
+        "consumer_offset.store.v2",
+        Operation::StoreConsumerOffset2,
+    ),
+    CommandMeta::replicated(
+        DELETE_CONSUMER_OFFSET_2_CODE,
+        "consumer_offset.delete.v2",
+        Operation::DeleteConsumerOffset2,
+    ),
     // Streams
     CommandMeta::non_replicated(GET_STREAM_CODE, "stream.get"),
     CommandMeta::non_replicated(GET_STREAMS_CODE, "stream.list"),
@@ -212,28 +222,30 @@ pub const fn lookup_command(code: u32) -> Option<&'static 
CommandMeta> {
         GET_CONSUMER_OFFSET_CODE => 24,
         STORE_CONSUMER_OFFSET_CODE => 25,
         DELETE_CONSUMER_OFFSET_CODE => 26,
-        GET_STREAM_CODE => 27,
-        GET_STREAMS_CODE => 28,
-        CREATE_STREAM_CODE => 29,
-        DELETE_STREAM_CODE => 30,
-        UPDATE_STREAM_CODE => 31,
-        PURGE_STREAM_CODE => 32,
-        GET_TOPIC_CODE => 33,
-        GET_TOPICS_CODE => 34,
-        CREATE_TOPIC_CODE => 35,
-        DELETE_TOPIC_CODE => 36,
-        UPDATE_TOPIC_CODE => 37,
-        PURGE_TOPIC_CODE => 38,
-        CREATE_PARTITIONS_CODE => 39,
-        DELETE_PARTITIONS_CODE => 40,
-        DELETE_SEGMENTS_CODE => 41,
-        GET_CONSUMER_GROUP_CODE => 42,
-        GET_CONSUMER_GROUPS_CODE => 43,
-        CREATE_CONSUMER_GROUP_CODE => 44,
-        DELETE_CONSUMER_GROUP_CODE => 45,
-        JOIN_CONSUMER_GROUP_CODE => 46,
-        LEAVE_CONSUMER_GROUP_CODE => 47,
-        LOGIN_REGISTER_WITH_PAT_CODE => 48,
+        STORE_CONSUMER_OFFSET_2_CODE => 27,
+        DELETE_CONSUMER_OFFSET_2_CODE => 28,
+        GET_STREAM_CODE => 29,
+        GET_STREAMS_CODE => 30,
+        CREATE_STREAM_CODE => 31,
+        DELETE_STREAM_CODE => 32,
+        UPDATE_STREAM_CODE => 33,
+        PURGE_STREAM_CODE => 34,
+        GET_TOPIC_CODE => 35,
+        GET_TOPICS_CODE => 36,
+        CREATE_TOPIC_CODE => 37,
+        DELETE_TOPIC_CODE => 38,
+        UPDATE_TOPIC_CODE => 39,
+        PURGE_TOPIC_CODE => 40,
+        CREATE_PARTITIONS_CODE => 41,
+        DELETE_PARTITIONS_CODE => 42,
+        DELETE_SEGMENTS_CODE => 43,
+        GET_CONSUMER_GROUP_CODE => 44,
+        GET_CONSUMER_GROUPS_CODE => 45,
+        CREATE_CONSUMER_GROUP_CODE => 46,
+        DELETE_CONSUMER_GROUP_CODE => 47,
+        JOIN_CONSUMER_GROUP_CODE => 48,
+        LEAVE_CONSUMER_GROUP_CODE => 49,
+        LOGIN_REGISTER_WITH_PAT_CODE => 50,
         _ => return None,
     };
     Some(&COMMAND_TABLE[idx])
@@ -247,19 +259,19 @@ pub const fn lookup_command(code: u32) -> Option<&'static 
CommandMeta> {
 pub const fn lookup_by_operation(op: Operation) -> Option<&'static 
CommandMeta> {
     // Indices must match the order of entries in COMMAND_TABLE above.
     let idx = match op {
-        Operation::CreateStream => 29,
-        Operation::UpdateStream => 31,
-        Operation::DeleteStream => 30,
-        Operation::PurgeStream => 32,
-        Operation::CreateTopic => 35,
-        Operation::UpdateTopic => 37,
-        Operation::DeleteTopic => 36,
-        Operation::PurgeTopic => 38,
-        Operation::CreatePartitions => 39,
-        Operation::DeletePartitions => 40,
-        Operation::DeleteSegments => 41,
-        Operation::CreateConsumerGroup => 44,
-        Operation::DeleteConsumerGroup => 45,
+        Operation::CreateStream => 31,
+        Operation::UpdateStream => 33,
+        Operation::DeleteStream => 32,
+        Operation::PurgeStream => 34,
+        Operation::CreateTopic => 37,
+        Operation::UpdateTopic => 39,
+        Operation::DeleteTopic => 38,
+        Operation::PurgeTopic => 40,
+        Operation::CreatePartitions => 41,
+        Operation::DeletePartitions => 42,
+        Operation::DeleteSegments => 43,
+        Operation::CreateConsumerGroup => 46,
+        Operation::DeleteConsumerGroup => 47,
         Operation::CreateUser => 9,
         Operation::UpdateUser => 11,
         Operation::DeleteUser => 10,
@@ -270,6 +282,8 @@ pub const fn lookup_by_operation(op: Operation) -> 
Option<&'static CommandMeta>
         Operation::SendMessages => 22,
         Operation::StoreConsumerOffset => 25,
         Operation::DeleteConsumerOffset => 26,
+        Operation::StoreConsumerOffset2 => 27,
+        Operation::DeleteConsumerOffset2 => 28,
         Operation::CreateTopicWithAssignments
         | Operation::CreatePartitionsWithAssignments
         | Operation::Reserved
@@ -313,6 +327,8 @@ mod tests {
             GET_CONSUMER_OFFSET_CODE,
             STORE_CONSUMER_OFFSET_CODE,
             DELETE_CONSUMER_OFFSET_CODE,
+            STORE_CONSUMER_OFFSET_2_CODE,
+            DELETE_CONSUMER_OFFSET_2_CODE,
             GET_STREAM_CODE,
             GET_STREAMS_CODE,
             CREATE_STREAM_CODE,
@@ -394,6 +410,8 @@ mod tests {
             Operation::SendMessages,
             Operation::StoreConsumerOffset,
             Operation::DeleteConsumerOffset,
+            Operation::StoreConsumerOffset2,
+            Operation::DeleteConsumerOffset2,
         ];
         for op in replicated_ops {
             let meta = lookup_by_operation(op)
diff --git a/core/binary_protocol/src/lib.rs b/core/binary_protocol/src/lib.rs
index 8d4544c2b..3ef0a8513 100644
--- a/core/binary_protocol/src/lib.rs
+++ b/core/binary_protocol/src/lib.rs
@@ -80,6 +80,7 @@ pub use framing::{RequestFrame, RequestFrame2, ResponseFrame, 
ResponseFrame2, ST
 pub use message_view::{
     WireMessageIterator, WireMessageIteratorMut, WireMessageView, 
WireMessageViewMut,
 };
+pub use primitives::ack_level::AckLevel;
 pub use primitives::consumer::WireConsumer;
 pub use primitives::identifier::{MAX_WIRE_NAME_LENGTH, WireIdentifier, 
WireName};
 pub use primitives::partition_assignment::CreatedPartitionAssignment;
diff --git a/core/binary_protocol/src/primitives/ack_level.rs 
b/core/binary_protocol/src/primitives/ack_level.rs
new file mode 100644
index 000000000..a2ddb2389
--- /dev/null
+++ b/core/binary_protocol/src/primitives/ack_level.rs
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+
+/// Acknowledgement policy for consumer-offset write commands.
+///
+/// Wire format: single `u8` discriminant.
+/// - `NoAck(0)`:  leader-local write only; respond as soon as the in-memory
+///   and on-disk state have been updated. Matches the fast path used by
+///   `PollMessages` auto-commit.
+/// - `Quorum(1)`: submit through the partition VSR consensus pipeline and
+///   respond only after the write has been committed by a quorum of replicas.
+///   This is the default for explicit client writes.
+#[repr(u8)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
+pub enum AckLevel {
+    NoAck = 0,
+    #[default]
+    Quorum = 1,
+}
+
+impl AckLevel {
+    /// Decode an `AckLevel` from its wire discriminant.
+    ///
+    /// # Errors
+    /// Returns `WireError::UnknownDiscriminant` for unrecognised values.
+    pub const fn from_code(code: u8) -> Result<Self, WireError> {
+        match code {
+            0 => Ok(Self::NoAck),
+            1 => Ok(Self::Quorum),
+            other => Err(WireError::UnknownDiscriminant {
+                type_name: "AckLevel",
+                value: other,
+                offset: 0,
+            }),
+        }
+    }
+
+    /// Encode this `AckLevel` as its wire discriminant.
+    #[must_use]
+    pub const fn as_u8(self) -> u8 {
+        self as u8
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn roundtrip_no_ack() {
+        let level = AckLevel::NoAck;
+        let decoded = AckLevel::from_code(level.as_u8()).unwrap();
+        assert_eq!(decoded, level);
+    }
+
+    #[test]
+    fn roundtrip_quorum() {
+        let level = AckLevel::Quorum;
+        let decoded = AckLevel::from_code(level.as_u8()).unwrap();
+        assert_eq!(decoded, level);
+    }
+
+    #[test]
+    fn default_is_quorum() {
+        assert_eq!(AckLevel::default(), AckLevel::Quorum);
+    }
+
+    #[test]
+    fn discriminant_values() {
+        assert_eq!(AckLevel::NoAck.as_u8(), 0);
+        assert_eq!(AckLevel::Quorum.as_u8(), 1);
+    }
+
+    #[test]
+    fn unknown_discriminant_rejected() {
+        for code in 2u8..=u8::MAX {
+            let err = AckLevel::from_code(code).unwrap_err();
+            match err {
+                WireError::UnknownDiscriminant {
+                    type_name,
+                    value,
+                    offset,
+                } => {
+                    assert_eq!(type_name, "AckLevel");
+                    assert_eq!(value, code);
+                    assert_eq!(offset, 0);
+                }
+                other => panic!("unexpected error variant: {other:?}"),
+            }
+        }
+    }
+}
diff --git a/core/binary_protocol/src/primitives/mod.rs 
b/core/binary_protocol/src/primitives/mod.rs
index 064f0fcfb..f3c7ee4e3 100644
--- a/core/binary_protocol/src/primitives/mod.rs
+++ b/core/binary_protocol/src/primitives/mod.rs
@@ -17,6 +17,7 @@
 
 //! Shared wire primitives reused across request and response types.
 
+pub mod ack_level;
 pub mod consumer;
 pub mod identifier;
 pub mod partition_assignment;
diff --git 
a/core/binary_protocol/src/requests/consumer_offsets/delete_consumer_offset_2.rs
 
b/core/binary_protocol/src/requests/consumer_offsets/delete_consumer_offset_2.rs
new file mode 100644
index 000000000..ccfa104ad
--- /dev/null
+++ 
b/core/binary_protocol/src/requests/consumer_offsets/delete_consumer_offset_2.rs
@@ -0,0 +1,199 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::WireIdentifier;
+use crate::codec::{WireDecode, WireEncode, read_u8, read_u32_le};
+use crate::primitives::ack_level::AckLevel;
+use crate::primitives::consumer::WireConsumer;
+use bytes::{BufMut, BytesMut};
+
+/// `DeleteConsumerOffset` v2 request.
+///
+/// Adds an `ack` byte: `NoAck` = leader-local fast path, `Quorum` = VSR
+/// pipeline.
+///
+/// Wire format:
+/// ```text
+/// [consumer][stream_id][topic_id][partition_flag:1][partition_id:4 LE][ack:1]
+/// ```
+///
+/// `partition_id` encoding: a u8 flag (1=Some, 0=None) followed by 4 bytes
+/// for the u32 value (0 when None).
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct DeleteConsumerOffset2Request {
+    pub consumer: WireConsumer,
+    pub stream_id: WireIdentifier,
+    pub topic_id: WireIdentifier,
+    pub partition_id: Option<u32>,
+    pub ack: AckLevel,
+}
+
+impl WireEncode for DeleteConsumerOffset2Request {
+    fn encoded_size(&self) -> usize {
+        self.consumer.encoded_size()
+            + self.stream_id.encoded_size()
+            + self.topic_id.encoded_size()
+            + 1
+            + 4
+            + 1
+    }
+
+    fn encode(&self, buf: &mut BytesMut) {
+        self.consumer.encode(buf);
+        self.stream_id.encode(buf);
+        self.topic_id.encode(buf);
+        if let Some(pid) = self.partition_id {
+            buf.put_u8(1);
+            buf.put_u32_le(pid);
+        } else {
+            buf.put_u8(0);
+            buf.put_u32_le(0);
+        }
+        buf.put_u8(self.ack.as_u8());
+    }
+}
+
+impl WireDecode for DeleteConsumerOffset2Request {
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        let mut pos = 0;
+        let (consumer, n) = WireConsumer::decode(&buf[pos..])?;
+        pos += n;
+        let (stream_id, n) = WireIdentifier::decode(&buf[pos..])?;
+        pos += n;
+        let (topic_id, n) = WireIdentifier::decode(&buf[pos..])?;
+        pos += n;
+        let partition_flag = read_u8(buf, pos)?;
+        pos += 1;
+        let partition_raw = read_u32_le(buf, pos)?;
+        pos += 4;
+        let partition_id = if partition_flag == 1 {
+            Some(partition_raw)
+        } else {
+            None
+        };
+        let ack_code = read_u8(buf, pos)?;
+        pos += 1;
+        let ack = AckLevel::from_code(ack_code)?;
+        Ok((
+            Self {
+                consumer,
+                stream_id,
+                topic_id,
+                partition_id,
+                ack,
+            },
+            pos,
+        ))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn roundtrip_with_partition_quorum() {
+        let req = DeleteConsumerOffset2Request {
+            consumer: WireConsumer::consumer(WireIdentifier::numeric(1)),
+            stream_id: WireIdentifier::numeric(10),
+            topic_id: WireIdentifier::numeric(20),
+            partition_id: Some(5),
+            ack: AckLevel::Quorum,
+        };
+        let bytes = req.to_bytes();
+        let (decoded, consumed) = 
DeleteConsumerOffset2Request::decode(&bytes).unwrap();
+        assert_eq!(consumed, bytes.len());
+        assert_eq!(decoded, req);
+    }
+
+    #[test]
+    fn roundtrip_without_partition_no_ack() {
+        let req = DeleteConsumerOffset2Request {
+            consumer: WireConsumer::consumer_group(WireIdentifier::numeric(3)),
+            stream_id: WireIdentifier::numeric(1),
+            topic_id: WireIdentifier::numeric(1),
+            partition_id: None,
+            ack: AckLevel::NoAck,
+        };
+        let bytes = req.to_bytes();
+        let (decoded, consumed) = 
DeleteConsumerOffset2Request::decode(&bytes).unwrap();
+        assert_eq!(consumed, bytes.len());
+        assert_eq!(decoded, req);
+    }
+
+    #[test]
+    fn roundtrip_named_identifiers() {
+        let req = DeleteConsumerOffset2Request {
+            consumer: 
WireConsumer::consumer(WireIdentifier::named("my-consumer").unwrap()),
+            stream_id: WireIdentifier::named("stream-1").unwrap(),
+            topic_id: WireIdentifier::named("topic-1").unwrap(),
+            partition_id: Some(0),
+            ack: AckLevel::Quorum,
+        };
+        let bytes = req.to_bytes();
+        let (decoded, consumed) = 
DeleteConsumerOffset2Request::decode(&bytes).unwrap();
+        assert_eq!(consumed, bytes.len());
+        assert_eq!(decoded, req);
+    }
+
+    #[test]
+    fn ack_byte_is_last() {
+        let req = DeleteConsumerOffset2Request {
+            consumer: WireConsumer::consumer(WireIdentifier::numeric(1)),
+            stream_id: WireIdentifier::numeric(1),
+            topic_id: WireIdentifier::numeric(1),
+            partition_id: Some(0),
+            ack: AckLevel::NoAck,
+        };
+        let bytes = req.to_bytes();
+        assert_eq!(*bytes.last().unwrap(), AckLevel::NoAck.as_u8());
+    }
+
+    #[test]
+    fn unknown_ack_rejected() {
+        let req = DeleteConsumerOffset2Request {
+            consumer: WireConsumer::consumer(WireIdentifier::numeric(1)),
+            stream_id: WireIdentifier::numeric(1),
+            topic_id: WireIdentifier::numeric(1),
+            partition_id: Some(0),
+            ack: AckLevel::Quorum,
+        };
+        let mut bytes = req.to_bytes().to_vec();
+        let last = bytes.len() - 1;
+        bytes[last] = 0xFF;
+        assert!(DeleteConsumerOffset2Request::decode(&bytes).is_err());
+    }
+
+    #[test]
+    fn truncated_returns_error() {
+        let req = DeleteConsumerOffset2Request {
+            consumer: WireConsumer::consumer(WireIdentifier::numeric(1)),
+            stream_id: WireIdentifier::numeric(1),
+            topic_id: WireIdentifier::numeric(1),
+            partition_id: Some(1),
+            ack: AckLevel::Quorum,
+        };
+        let bytes = req.to_bytes();
+        for i in 0..bytes.len() {
+            assert!(
+                DeleteConsumerOffset2Request::decode(&bytes[..i]).is_err(),
+                "expected error for truncation at byte {i}"
+            );
+        }
+    }
+}
diff --git a/core/binary_protocol/src/requests/consumer_offsets/mod.rs 
b/core/binary_protocol/src/requests/consumer_offsets/mod.rs
index bde4d02b5..f8d3cee59 100644
--- a/core/binary_protocol/src/requests/consumer_offsets/mod.rs
+++ b/core/binary_protocol/src/requests/consumer_offsets/mod.rs
@@ -16,9 +16,13 @@
 // under the License.
 
 pub mod delete_consumer_offset;
+pub mod delete_consumer_offset_2;
 pub mod get_consumer_offset;
 pub mod store_consumer_offset;
+pub mod store_consumer_offset_2;
 
 pub use delete_consumer_offset::DeleteConsumerOffsetRequest;
+pub use delete_consumer_offset_2::DeleteConsumerOffset2Request;
 pub use get_consumer_offset::GetConsumerOffsetRequest;
 pub use store_consumer_offset::StoreConsumerOffsetRequest;
+pub use store_consumer_offset_2::StoreConsumerOffset2Request;
diff --git 
a/core/binary_protocol/src/requests/consumer_offsets/store_consumer_offset_2.rs 
b/core/binary_protocol/src/requests/consumer_offsets/store_consumer_offset_2.rs
new file mode 100644
index 000000000..37e89fb1c
--- /dev/null
+++ 
b/core/binary_protocol/src/requests/consumer_offsets/store_consumer_offset_2.rs
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::WireIdentifier;
+use crate::codec::{WireDecode, WireEncode, read_u8, read_u32_le, read_u64_le};
+use crate::primitives::ack_level::AckLevel;
+use crate::primitives::consumer::WireConsumer;
+use bytes::{BufMut, BytesMut};
+
+/// `StoreConsumerOffset` v2 request.
+///
+/// Adds an `ack` byte: `NoAck` = leader-local fast path, `Quorum` = VSR
+/// pipeline.
+///
+/// Wire format:
+/// ```text
+/// [consumer][stream_id][topic_id][partition_flag:1][partition_id:4 
LE][offset:8 LE][ack:1]
+/// ```
+///
+/// `partition_id` encoding: a u8 flag (1=Some, 0=None) followed by 4 bytes
+/// for the u32 value (0 when None).
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct StoreConsumerOffset2Request {
+    pub consumer: WireConsumer,
+    pub stream_id: WireIdentifier,
+    pub topic_id: WireIdentifier,
+    pub partition_id: Option<u32>,
+    pub offset: u64,
+    pub ack: AckLevel,
+}
+
+impl WireEncode for StoreConsumerOffset2Request {
+    fn encoded_size(&self) -> usize {
+        self.consumer.encoded_size()
+            + self.stream_id.encoded_size()
+            + self.topic_id.encoded_size()
+            + 1
+            + 4
+            + 8
+            + 1
+    }
+
+    fn encode(&self, buf: &mut BytesMut) {
+        self.consumer.encode(buf);
+        self.stream_id.encode(buf);
+        self.topic_id.encode(buf);
+        if let Some(pid) = self.partition_id {
+            buf.put_u8(1);
+            buf.put_u32_le(pid);
+        } else {
+            buf.put_u8(0);
+            buf.put_u32_le(0);
+        }
+        buf.put_u64_le(self.offset);
+        buf.put_u8(self.ack.as_u8());
+    }
+}
+
+impl WireDecode for StoreConsumerOffset2Request {
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        let mut pos = 0;
+        let (consumer, n) = WireConsumer::decode(&buf[pos..])?;
+        pos += n;
+        let (stream_id, n) = WireIdentifier::decode(&buf[pos..])?;
+        pos += n;
+        let (topic_id, n) = WireIdentifier::decode(&buf[pos..])?;
+        pos += n;
+        let partition_flag = read_u8(buf, pos)?;
+        pos += 1;
+        let partition_raw = read_u32_le(buf, pos)?;
+        pos += 4;
+        let partition_id = if partition_flag == 1 {
+            Some(partition_raw)
+        } else {
+            None
+        };
+        let offset = read_u64_le(buf, pos)?;
+        pos += 8;
+        let ack_code = read_u8(buf, pos)?;
+        pos += 1;
+        let ack = AckLevel::from_code(ack_code)?;
+        Ok((
+            Self {
+                consumer,
+                stream_id,
+                topic_id,
+                partition_id,
+                offset,
+                ack,
+            },
+            pos,
+        ))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn roundtrip_with_partition_quorum() {
+        let req = StoreConsumerOffset2Request {
+            consumer: WireConsumer::consumer(WireIdentifier::numeric(1)),
+            stream_id: WireIdentifier::numeric(10),
+            topic_id: WireIdentifier::numeric(20),
+            partition_id: Some(5),
+            offset: 12345,
+            ack: AckLevel::Quorum,
+        };
+        let bytes = req.to_bytes();
+        let (decoded, consumed) = 
StoreConsumerOffset2Request::decode(&bytes).unwrap();
+        assert_eq!(consumed, bytes.len());
+        assert_eq!(decoded, req);
+    }
+
+    #[test]
+    fn roundtrip_without_partition_no_ack() {
+        let req = StoreConsumerOffset2Request {
+            consumer: WireConsumer::consumer_group(WireIdentifier::numeric(3)),
+            stream_id: WireIdentifier::numeric(1),
+            topic_id: WireIdentifier::numeric(1),
+            partition_id: None,
+            offset: u64::MAX,
+            ack: AckLevel::NoAck,
+        };
+        let bytes = req.to_bytes();
+        let (decoded, consumed) = 
StoreConsumerOffset2Request::decode(&bytes).unwrap();
+        assert_eq!(consumed, bytes.len());
+        assert_eq!(decoded, req);
+    }
+
+    #[test]
+    fn roundtrip_named_identifiers() {
+        let req = StoreConsumerOffset2Request {
+            consumer: 
WireConsumer::consumer(WireIdentifier::named("my-consumer").unwrap()),
+            stream_id: WireIdentifier::named("stream-1").unwrap(),
+            topic_id: WireIdentifier::named("topic-1").unwrap(),
+            partition_id: Some(0),
+            offset: 0,
+            ack: AckLevel::Quorum,
+        };
+        let bytes = req.to_bytes();
+        let (decoded, consumed) = 
StoreConsumerOffset2Request::decode(&bytes).unwrap();
+        assert_eq!(consumed, bytes.len());
+        assert_eq!(decoded, req);
+    }
+
+    #[test]
+    fn ack_byte_is_last() {
+        let req = StoreConsumerOffset2Request {
+            consumer: WireConsumer::consumer(WireIdentifier::numeric(1)),
+            stream_id: WireIdentifier::numeric(1),
+            topic_id: WireIdentifier::numeric(1),
+            partition_id: Some(0),
+            offset: 0,
+            ack: AckLevel::NoAck,
+        };
+        let bytes = req.to_bytes();
+        assert_eq!(*bytes.last().unwrap(), AckLevel::NoAck.as_u8());
+    }
+
+    #[test]
+    fn unknown_ack_rejected() {
+        let req = StoreConsumerOffset2Request {
+            consumer: WireConsumer::consumer(WireIdentifier::numeric(1)),
+            stream_id: WireIdentifier::numeric(1),
+            topic_id: WireIdentifier::numeric(1),
+            partition_id: Some(0),
+            offset: 0,
+            ack: AckLevel::Quorum,
+        };
+        let mut bytes = req.to_bytes().to_vec();
+        let last = bytes.len() - 1;
+        bytes[last] = 0xFF;
+        assert!(StoreConsumerOffset2Request::decode(&bytes).is_err());
+    }
+
+    #[test]
+    fn truncated_returns_error() {
+        let req = StoreConsumerOffset2Request {
+            consumer: WireConsumer::consumer(WireIdentifier::numeric(1)),
+            stream_id: WireIdentifier::numeric(1),
+            topic_id: WireIdentifier::numeric(1),
+            partition_id: Some(1),
+            offset: 100,
+            ack: AckLevel::Quorum,
+        };
+        let bytes = req.to_bytes();
+        for i in 0..bytes.len() {
+            assert!(
+                StoreConsumerOffset2Request::decode(&bytes[..i]).is_err(),
+                "expected error for truncation at byte {i}"
+            );
+        }
+    }
+}
diff --git a/core/binary_protocol/src/primitives/mod.rs 
b/core/binary_protocol/src/responses/consumer_offsets/delete_consumer_offset_2.rs
similarity index 77%
copy from core/binary_protocol/src/primitives/mod.rs
copy to 
core/binary_protocol/src/responses/consumer_offsets/delete_consumer_offset_2.rs
index 064f0fcfb..a145aab18 100644
--- a/core/binary_protocol/src/primitives/mod.rs
+++ 
b/core/binary_protocol/src/responses/consumer_offsets/delete_consumer_offset_2.rs
@@ -15,12 +15,5 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Shared wire primitives reused across request and response types.
-
-pub mod consumer;
-pub mod identifier;
-pub mod partition_assignment;
-pub mod partitioning;
-pub mod permissions;
-pub mod polling_strategy;
-pub mod user_headers;
+/// `DeleteConsumerOffset2` response is empty.
+pub type DeleteConsumerOffset2Response = super::EmptyResponse;
diff --git a/core/binary_protocol/src/responses/consumer_offsets/mod.rs 
b/core/binary_protocol/src/responses/consumer_offsets/mod.rs
index 915648c18..6dbaf454d 100644
--- a/core/binary_protocol/src/responses/consumer_offsets/mod.rs
+++ b/core/binary_protocol/src/responses/consumer_offsets/mod.rs
@@ -16,10 +16,14 @@
 // under the License.
 
 mod delete_consumer_offset;
+mod delete_consumer_offset_2;
 pub mod get_consumer_offset;
 mod store_consumer_offset;
+mod store_consumer_offset_2;
 
 pub use super::EmptyResponse;
 pub use delete_consumer_offset::DeleteConsumerOffsetResponse;
+pub use delete_consumer_offset_2::DeleteConsumerOffset2Response;
 pub use get_consumer_offset::ConsumerOffsetResponse;
 pub use store_consumer_offset::StoreConsumerOffsetResponse;
+pub use store_consumer_offset_2::StoreConsumerOffset2Response;
diff --git a/core/binary_protocol/src/primitives/mod.rs 
b/core/binary_protocol/src/responses/consumer_offsets/store_consumer_offset_2.rs
similarity index 77%
copy from core/binary_protocol/src/primitives/mod.rs
copy to 
core/binary_protocol/src/responses/consumer_offsets/store_consumer_offset_2.rs
index 064f0fcfb..0f76f603b 100644
--- a/core/binary_protocol/src/primitives/mod.rs
+++ 
b/core/binary_protocol/src/responses/consumer_offsets/store_consumer_offset_2.rs
@@ -15,12 +15,5 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Shared wire primitives reused across request and response types.
-
-pub mod consumer;
-pub mod identifier;
-pub mod partition_assignment;
-pub mod partitioning;
-pub mod permissions;
-pub mod polling_strategy;
-pub mod user_headers;
+/// `StoreConsumerOffset2` response is empty.
+pub type StoreConsumerOffset2Response = super::EmptyResponse;
diff --git a/core/consensus/src/client_table.rs 
b/core/consensus/src/client_table.rs
index 54a888034..7ea5db56c 100644
--- a/core/consensus/src/client_table.rs
+++ b/core/consensus/src/client_table.rs
@@ -430,8 +430,8 @@ impl ClientTable {
             if let Some(entry) = slot {
                 let commit = entry.reply.header().commit;
                 let should_evict = match evictee {
-                    None => true,
                     Some((_, min_commit)) => commit < min_commit,
+                    None => true,
                 };
                 if should_evict {
                     evictee = Some((idx, commit));
diff --git a/core/consensus/src/observability.rs 
b/core/consensus/src/observability.rs
index bb042af10..2ac1d0044 100644
--- a/core/consensus/src/observability.rs
+++ b/core/consensus/src/observability.rs
@@ -644,6 +644,8 @@ pub const fn operation_as_str(operation: Operation) -> 
&'static str {
         Operation::SendMessages => "send_messages",
         Operation::StoreConsumerOffset => "store_consumer_offset",
         Operation::DeleteConsumerOffset => "delete_consumer_offset",
+        Operation::StoreConsumerOffset2 => "store_consumer_offset_2",
+        Operation::DeleteConsumerOffset2 => "delete_consumer_offset_2",
     }
 }
 
diff --git a/core/consensus/src/plane_helpers.rs 
b/core/consensus/src/plane_helpers.rs
index 4078d9eba..e4da88084 100644
--- a/core/consensus/src/plane_helpers.rs
+++ b/core/consensus/src/plane_helpers.rs
@@ -21,7 +21,9 @@ use crate::{
     Status, VsrConsensus,
 };
 use iggy_binary_protocol::consensus::iobuf::Owned;
-use iggy_binary_protocol::{Command2, Message, PrepareHeader, PrepareOkHeader, 
ReplyHeader};
+use iggy_binary_protocol::{
+    Command2, Message, PrepareHeader, PrepareOkHeader, ReplyHeader, 
RequestHeader,
+};
 use message_bus::{MessageBus, SendError};
 use std::ops::AsyncFnOnce;
 
@@ -392,6 +394,60 @@ where
         .expect("reply buffer must contain a valid reply message")
 }
 
+/// Reply for fast paths that skip the VSR pipeline (e.g. `AckLevel::NoAck`).
+///
+/// Stamps `op` and `commit` with `commit_max` — monotonic, so
+/// `ClientTable::commit_reply` regression checks always pass.
+///
+/// # Panics
+/// If the constructed message buffer is not valid.
+#[allow(clippy::needless_pass_by_value, clippy::cast_possible_truncation)]
+pub fn build_reply_from_request<B, P>(
+    consensus: &VsrConsensus<B, P>,
+    request_header: &RequestHeader,
+    body: bytes::Bytes,
+) -> Message<ReplyHeader>
+where
+    B: MessageBus,
+    P: Pipeline<Entry = PipelineEntry>,
+{
+    let header_size = std::mem::size_of::<ReplyHeader>();
+    let total_size = header_size + body.len();
+    let mut buffer = bytes::BytesMut::zeroed(total_size);
+
+    let commit = consensus.commit_max();
+    let header = bytemuck::checked::try_from_bytes_mut::<ReplyHeader>(&mut 
buffer[..header_size])
+        .expect("zeroed bytes are valid");
+    *header = ReplyHeader {
+        checksum: 0,
+        checksum_body: 0,
+        cluster: consensus.cluster(),
+        size: total_size as u32,
+        view: consensus.view(),
+        release: 0,
+        command: Command2::Reply,
+        replica: consensus.replica(),
+        reserved_frame: [0; 66],
+        request_checksum: request_header.request_checksum,
+        context: 0,
+        client: request_header.client,
+        op: commit,
+        commit,
+        timestamp: request_header.timestamp,
+        request: request_header.request,
+        operation: request_header.operation,
+        namespace: request_header.namespace,
+        ..Default::default()
+    };
+
+    if !body.is_empty() {
+        buffer[header_size..].copy_from_slice(&body);
+    }
+
+    Message::try_from(Owned::<4096>::copy_from_slice(buffer.as_ref()))
+        .expect("reply buffer must contain a valid reply message")
+}
+
 /// Verify hash chain would not break if we add this header.
 ///
 /// # Panics
diff --git a/core/integration/tests/server/scenarios/authentication_scenario.rs 
b/core/integration/tests/server/scenarios/authentication_scenario.rs
index ab5245e3c..24a0f2c44 100644
--- a/core/integration/tests/server/scenarios/authentication_scenario.rs
+++ b/core/integration/tests/server/scenarios/authentication_scenario.rs
@@ -139,6 +139,16 @@ async fn test_all_commands_require_auth(client: 
&IggyClient) {
         ) {
             continue;
         }
+        // v2 consumer-offset ops are registered in the dispatch table for the
+        // consensus/simulator pathway but are not wired into the legacy binary
+        // server's dispatch. They'll move into server-ng alongside the rest of
+        // the v2 surface; re-enable these codes here once that lands.
+        if matches!(
+            code,
+            STORE_CONSUMER_OFFSET_2_CODE | DELETE_CONSUMER_OFFSET_2_CODE
+        ) {
+            continue;
+        }
 
         // ================================================================
         // REQUIRES AUTH
diff --git a/core/partitions/src/iggy_partition.rs 
b/core/partitions/src/iggy_partition.rs
index 37e99679e..e85717cfd 100644
--- a/core/partitions/src/iggy_partition.rs
+++ b/core/partitions/src/iggy_partition.rs
@@ -31,13 +31,13 @@ use crate::{
 use consensus::{
     CommitLogEvent, Consensus, PartitionDiagEvent, Pipeline, PipelineEntry, 
PlaneKind, Project,
     ReplicaLogContext, RequestLogEvent, Sequencer, SimEventKind, VsrConsensus, 
ack_preflight,
-    ack_quorum_reached, build_reply_message, drain_committable_prefix,
+    ack_quorum_reached, build_reply_from_request, build_reply_message, 
drain_committable_prefix,
     emit_namespace_progress_event, emit_partition_diag, emit_sim_event,
     fence_old_prepare_by_commit, replicate_preflight, 
replicate_to_next_in_chain,
     request_preflight, send_prepare_ok as send_prepare_ok_common,
 };
 use iggy_binary_protocol::consensus::iobuf::Frozen;
-use iggy_binary_protocol::{Message, Operation, PrepareHeader};
+use iggy_binary_protocol::{AckLevel, Message, Operation, PrepareHeader};
 use iggy_binary_protocol::{PrepareOkHeader, RequestHeader};
 use iggy_common::{
     ConsumerGroupId, ConsumerGroupOffsets, ConsumerKind, ConsumerOffset, 
ConsumerOffsets,
@@ -83,6 +83,19 @@ where
     observed_view: u32,
 }
 
+/// Post-preflight dispatch in `on_request`: replicate via VSR or take the
+/// `NoAck` leader-local fast path. `RequestHeader` is boxed to avoid the
+/// 277-byte inline variant tripping clippy's `large_enum_variant`.
+enum Disposition {
+    Replicate(Message<PrepareHeader>),
+    NoAck {
+        request_header: Box<RequestHeader>,
+        kind: ConsumerKind,
+        consumer_id: u32,
+        offset: Option<u64>,
+    },
+}
+
 #[derive(Debug, Clone, Copy, PartialEq)]
 struct PendingConsumerOffsetCommit {
     kind: ConsumerKind,
@@ -362,6 +375,74 @@ where
         Ok(())
     }
 
+    /// `AckLevel::NoAck` fast path: persist, apply, cache + send reply, no
+    /// replication. Single-replica durability.
+    #[allow(clippy::future_not_send)]
+    async fn apply_consumer_offset_no_ack(
+        &self,
+        request_header: Box<RequestHeader>,
+        kind: ConsumerKind,
+        consumer_id: u32,
+        offset: Option<u64>,
+    ) {
+        let pending = offset.map_or_else(
+            || PendingConsumerOffsetCommit::delete(kind, consumer_id),
+            |value| PendingConsumerOffsetCommit::upsert(kind, consumer_id, 
value),
+        );
+
+        if let Err(error) = self.persist_consumer_offset_commit(pending).await 
{
+            emit_partition_diag(
+                tracing::Level::WARN,
+                &PartitionDiagEvent::new(self.diag_ctx(), "no_ack offset 
persist failed")
+                    .with_operation(request_header.operation)
+                    .with_error(error.to_string()),
+            );
+            return;
+        }
+        if let Err(error) = self.apply_consumer_offset_commit(pending) {
+            emit_partition_diag(
+                tracing::Level::WARN,
+                &PartitionDiagEvent::new(self.diag_ctx(), "no_ack offset apply 
failed")
+                    .with_operation(request_header.operation)
+                    .with_error(error.to_string()),
+            );
+            return;
+        }
+
+        let reply = build_reply_from_request(&self.consensus, &request_header, 
bytes::Bytes::new());
+        let session = self
+            .consensus
+            .client_table()
+            .borrow()
+            .get_session(request_header.client)
+            .unwrap_or_else(|| {
+                panic!(
+                    "apply_consumer_offset_no_ack: client {} not registered",
+                    request_header.client
+                )
+            });
+        self.consensus.client_table().borrow_mut().commit_reply(
+            request_header.client,
+            session,
+            reply.clone(),
+        );
+
+        let reply_buffers = reply.into_generic().into_frozen();
+        if let Err(error) = self
+            .consensus
+            .message_bus()
+            .send_to_client(request_header.client, reply_buffers)
+            .await
+        {
+            emit_partition_diag(
+                tracing::Level::WARN,
+                &PartitionDiagEvent::new(self.diag_ctx(), "no_ack reply send 
failed")
+                    .with_operation(request_header.operation)
+                    .with_error(error.to_string()),
+            );
+        }
+    }
+
     fn persisted_offset_path(&self, kind: ConsumerKind, consumer_id: u32) -> 
Option<String> {
         match kind {
             ConsumerKind::Consumer => self
@@ -607,7 +688,7 @@ where
     /// # Panics
     /// Panics if called when this partition's consensus instance is not the
     /// primary, is not in normal status, or is currently syncing.
-    #[allow(clippy::future_not_send)]
+    #[allow(clippy::future_not_send, clippy::too_many_lines)]
     pub async fn on_request(&mut self, message: Message<RequestHeader>) {
         self.clear_pending_consumer_offset_commits_if_view_changed();
         let namespace = IggyNamespace::from_raw(message.header().namespace);
@@ -635,7 +716,7 @@ where
             }
         }
 
-        let prepare = {
+        let disposition = {
             let consensus = self.consensus();
             emit_sim_event(
                 SimEventKind::ClientRequestReceived,
@@ -667,25 +748,51 @@ where
                 message
             };
 
-            if message.header().operation == Operation::DeleteConsumerOffset {
-                match 
Self::parse_consumer_offset_request(message.header().operation, &message)
-                    .and_then(|(kind, consumer_id, _)| {
-                        self.ensure_consumer_offset_exists(kind, consumer_id)
-                    }) {
-                    Ok(()) => {}
-                    Err(error) => {
-                        emit_partition_diag(
-                            tracing::Level::WARN,
-                            &PartitionDiagEvent::new(
-                                ReplicaLogContext::from_consensus(consensus, 
PlaneKind::Partitions),
-                                "rejecting delete_consumer_offset for missing 
offset",
-                            )
-                            .with_operation(Operation::DeleteConsumerOffset)
-                            .with_error(error.to_string()),
-                        );
-                        return;
+            // Parse once for both the delete-existence check and AckLevel 
dispatch.
+            let consumer_offset = match message.header().operation {
+                Operation::StoreConsumerOffset
+                | Operation::StoreConsumerOffset2
+                | Operation::DeleteConsumerOffset
+                | Operation::DeleteConsumerOffset2 => {
+                    match 
Self::parse_consumer_offset_request(message.header().operation, &message)
+                    {
+                        Ok(parsed) => Some(parsed),
+                        Err(error) => {
+                            emit_partition_diag(
+                                tracing::Level::WARN,
+                                &PartitionDiagEvent::new(
+                                    ReplicaLogContext::from_consensus(
+                                        consensus,
+                                        PlaneKind::Partitions,
+                                    ),
+                                    "failed to parse consumer offset request",
+                                )
+                                .with_operation(message.header().operation)
+                                .with_error(error.to_string()),
+                            );
+                            return;
+                        }
                     }
                 }
+                _ => None,
+            };
+
+            if matches!(
+                message.header().operation,
+                Operation::DeleteConsumerOffset | 
Operation::DeleteConsumerOffset2
+            ) && let Some((kind, consumer_id, _, _)) = consumer_offset
+                && let Err(error) = self.ensure_consumer_offset_exists(kind, 
consumer_id)
+            {
+                emit_partition_diag(
+                    tracing::Level::WARN,
+                    &PartitionDiagEvent::new(
+                        ReplicaLogContext::from_consensus(consensus, 
PlaneKind::Partitions),
+                        "rejecting delete_consumer_offset for missing offset",
+                    )
+                    .with_operation(message.header().operation)
+                    .with_error(error.to_string()),
+                );
+                return;
             }
 
             if request_preflight(consensus, client_id, session, request)
@@ -699,12 +806,39 @@ where
             assert!(consensus.is_normal(), "on_request: status must be 
normal");
             assert!(!consensus.is_syncing(), "on_request: must not be 
syncing");
 
-            let prepare = message.project(consensus);
-            consensus.verify_pipeline();
-            consensus.pipeline_message(PlaneKind::Partitions, &prepare);
-            prepare
+            // NoAck v2 -> fast path. Quorum + v1 -> VSR pipeline.
+            if let Some((kind, consumer_id, offset, AckLevel::NoAck)) = 
consumer_offset
+                && matches!(
+                    message.header().operation,
+                    Operation::StoreConsumerOffset2 | 
Operation::DeleteConsumerOffset2,
+                )
+            {
+                Disposition::NoAck {
+                    request_header: Box::new(*message.header()),
+                    kind,
+                    consumer_id,
+                    offset,
+                }
+            } else {
+                let prepare = message.project(consensus);
+                consensus.verify_pipeline();
+                consensus.pipeline_message(PlaneKind::Partitions, &prepare);
+                Disposition::Replicate(prepare)
+            }
         };
-        self.on_replicate(prepare).await;
+
+        match disposition {
+            Disposition::Replicate(prepare) => 
self.on_replicate(prepare).await,
+            Disposition::NoAck {
+                request_header,
+                kind,
+                consumer_id,
+                offset,
+            } => {
+                self.apply_consumer_offset_no_ack(request_header, kind, 
consumer_id, offset)
+                    .await;
+            }
+        }
     }
 
     #[allow(clippy::future_not_send, clippy::too_many_lines)]
@@ -972,8 +1106,12 @@ where
                 );
                 Ok(())
             }
-            Operation::StoreConsumerOffset | Operation::DeleteConsumerOffset 
=> {
-                let (kind, consumer_id, offset) =
+            Operation::StoreConsumerOffset
+            | Operation::DeleteConsumerOffset
+            | Operation::StoreConsumerOffset2
+            | Operation::DeleteConsumerOffset2 => {
+                // Replicated path is Quorum-only by construction; ack ignored.
+                let (kind, consumer_id, offset, _ack) =
                     
Self::parse_staged_consumer_offset_commit(header.operation, &message)?;
                 let write_lock = self.write_lock.clone();
                 let _guard = write_lock.lock().await;
@@ -995,7 +1133,7 @@ where
                     .map_err(|_| IggyError::CannotAppendMessage)?;
 
                 match header.operation {
-                    Operation::StoreConsumerOffset => {
+                    Operation::StoreConsumerOffset | 
Operation::StoreConsumerOffset2 => {
                         self.stage_consumer_offset_upsert(
                             header.op,
                             kind,
@@ -1003,7 +1141,7 @@ where
                             offset.expect("store_consumer_offset must include 
offset"),
                         );
                     }
-                    Operation::DeleteConsumerOffset => {
+                    Operation::DeleteConsumerOffset | 
Operation::DeleteConsumerOffset2 => {
                         self.stage_consumer_offset_delete(header.op, kind, 
consumer_id)?;
                     }
                     _ => unreachable!(),
@@ -1331,7 +1469,10 @@ where
                 }
                 !*failed_commit
             }
-            Operation::StoreConsumerOffset | Operation::DeleteConsumerOffset 
=> {
+            Operation::StoreConsumerOffset
+            | Operation::DeleteConsumerOffset
+            | Operation::StoreConsumerOffset2
+            | Operation::DeleteConsumerOffset2 => {
                 self.commit_consumer_offset_entry(prepare_header, 
failed_commit)
                     .await
             }
@@ -1372,7 +1513,7 @@ where
     fn parse_consumer_offset_request(
         operation: Operation,
         message: &Message<RequestHeader>,
-    ) -> Result<(ConsumerKind, u32, Option<u64>), IggyError> {
+    ) -> Result<(ConsumerKind, u32, Option<u64>, AckLevel), IggyError> {
         let total_size =
             usize::try_from(message.header().size).map_err(|_| 
IggyError::InvalidCommand)?;
         let body = message
@@ -1385,7 +1526,7 @@ where
     fn parse_staged_consumer_offset_commit(
         operation: Operation,
         message: &Message<PrepareHeader>,
-    ) -> Result<(ConsumerKind, u32, Option<u64>), IggyError> {
+    ) -> Result<(ConsumerKind, u32, Option<u64>, AckLevel), IggyError> {
         let total_size =
             usize::try_from(message.header().size).map_err(|_| 
IggyError::InvalidCommand)?;
         let body = message
@@ -1398,7 +1539,7 @@ where
     fn parse_consumer_offset_payload(
         operation: Operation,
         body: &[u8],
-    ) -> Result<(ConsumerKind, u32, Option<u64>), IggyError> {
+    ) -> Result<(ConsumerKind, u32, Option<u64>, AckLevel), IggyError> {
         let consumer_kind = *body.first().ok_or(IggyError::InvalidCommand)?;
         let consumer_id = body
             .get(1..5)
@@ -1409,8 +1550,10 @@ where
                     .map_err(|_| IggyError::InvalidCommand)
             })?;
         let kind = ConsumerKind::from_code(consumer_kind)?;
+        // v1 implicitly Quorum. v2 trailing ack byte validated; unknown
+        // discriminants rejected so malformed wire bytes fail fast.
         match operation {
-            Operation::StoreConsumerOffset => {
+            Operation::StoreConsumerOffset | Operation::StoreConsumerOffset2 
=> {
                 let offset =
                     body.get(5..13)
                         .ok_or(IggyError::InvalidCommand)
@@ -1419,9 +1562,23 @@ where
                                 .map(u64::from_le_bytes)
                                 .map_err(|_| IggyError::InvalidCommand)
                         })?;
-                Ok((kind, consumer_id, Some(offset)))
+                let ack = if matches!(operation, 
Operation::StoreConsumerOffset2) {
+                    let ack_byte = 
*body.get(13).ok_or(IggyError::InvalidCommand)?;
+                    AckLevel::from_code(ack_byte).map_err(|_| 
IggyError::InvalidCommand)?
+                } else {
+                    AckLevel::Quorum
+                };
+                Ok((kind, consumer_id, Some(offset), ack))
+            }
+            Operation::DeleteConsumerOffset | Operation::DeleteConsumerOffset2 
=> {
+                let ack = if matches!(operation, 
Operation::DeleteConsumerOffset2) {
+                    let ack_byte = 
*body.get(5).ok_or(IggyError::InvalidCommand)?;
+                    AckLevel::from_code(ack_byte).map_err(|_| 
IggyError::InvalidCommand)?
+                } else {
+                    AckLevel::Quorum
+                };
+                Ok((kind, consumer_id, None, ack))
             }
-            Operation::DeleteConsumerOffset => Ok((kind, consumer_id, None)),
             _ => Err(IggyError::InvalidCommand),
         }
     }
diff --git a/core/simulator/src/client.rs b/core/simulator/src/client.rs
index 96a20fa92..0c2c5b588 100644
--- a/core/simulator/src/client.rs
+++ b/core/simulator/src/client.rs
@@ -19,7 +19,7 @@ use bytes::Bytes;
 use iggy_binary_protocol::consensus::iobuf::Owned;
 use iggy_binary_protocol::requests::streams::{CreateStreamRequest, 
DeleteStreamRequest};
 use iggy_binary_protocol::{
-    Message, Operation, RequestHeader, WireEncode, WireIdentifier, WireName,
+    AckLevel, Message, Operation, RequestHeader, WireEncode, WireIdentifier, 
WireName,
 };
 use iggy_common::send_messages2::{
     IggyMessage2, IggyMessage2Header, IggyMessages2, SendMessages2Owned,
@@ -181,6 +181,41 @@ impl SimClient {
         self.build_request_with_namespace(Operation::DeleteConsumerOffset, 
&payload, namespace)
     }
 
+    /// v2 of `store_consumer_offset` with an `AckLevel` byte. `NoAck` takes
+    /// the primary's fast path (no replication); `Quorum` goes through VSR.
+    pub fn store_consumer_offset_v2(
+        &self,
+        namespace: IggyNamespace,
+        consumer_kind: u8,
+        consumer_id: u32,
+        offset: u64,
+        ack: AckLevel,
+    ) -> Message<RequestHeader> {
+        let mut payload = Vec::with_capacity(14);
+        payload.push(consumer_kind);
+        payload.extend_from_slice(&consumer_id.to_le_bytes());
+        payload.extend_from_slice(&offset.to_le_bytes());
+        payload.push(ack.as_u8());
+
+        self.build_request_with_namespace(Operation::StoreConsumerOffset2, 
&payload, namespace)
+    }
+
+    /// v2 of `delete_consumer_offset` carrying an explicit `AckLevel` byte.
+    pub fn delete_consumer_offset_v2(
+        &self,
+        namespace: IggyNamespace,
+        consumer_kind: u8,
+        consumer_id: u32,
+        ack: AckLevel,
+    ) -> Message<RequestHeader> {
+        let mut payload = Vec::with_capacity(6);
+        payload.push(consumer_kind);
+        payload.extend_from_slice(&consumer_id.to_le_bytes());
+        payload.push(ack.as_u8());
+
+        self.build_request_with_namespace(Operation::DeleteConsumerOffset2, 
&payload, namespace)
+    }
+
     #[allow(clippy::cast_possible_truncation)]
     fn build_request_with_namespace(
         &self,

Reply via email to