This is an automated email from the ASF dual-hosted git repository.
numinnex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 78f447e68 feat(shard): add v2 consumer-offset commands with AckLevel (#3151)
78f447e68 is described below
commit 78f447e682c0d55b0efc41bb1edf3f1174355d86
Author: Krishna Vishal <[email protected]>
AuthorDate: Wed May 6 13:45:48 2026 +0530
feat(shard): add v2 consumer-offset commands with AckLevel (#3151)
- Introduce `AckLevel { NoAck, Quorum }` and v2 wire commands
`StoreConsumerOffset2` / `DeleteConsumerOffset2` that carry an explicit
ack byte; v1 commands and wire format are untouched.
- Unify the server's two offset-write paths so `PollMessages`
auto-commit and explicit stores share a single shard entry point, with
`AckLevel` selecting the policy.
---
core/binary_protocol/src/codes.rs | 4 +
core/binary_protocol/src/consensus/operation.rs | 11 +-
core/binary_protocol/src/dispatch.rs | 88 ++++----
core/binary_protocol/src/lib.rs | 1 +
core/binary_protocol/src/primitives/ack_level.rs | 108 ++++++++++
core/binary_protocol/src/primitives/mod.rs | 1 +
.../consumer_offsets/delete_consumer_offset_2.rs | 199 ++++++++++++++++++
.../src/requests/consumer_offsets/mod.rs | 4 +
.../consumer_offsets/store_consumer_offset_2.rs | 211 +++++++++++++++++++
.../consumer_offsets/delete_consumer_offset_2.rs} | 11 +-
.../src/responses/consumer_offsets/mod.rs | 4 +
.../consumer_offsets/store_consumer_offset_2.rs} | 11 +-
core/consensus/src/client_table.rs | 2 +-
core/consensus/src/observability.rs | 2 +
core/consensus/src/plane_helpers.rs | 58 +++++-
.../server/scenarios/authentication_scenario.rs | 10 +
core/partitions/src/iggy_partition.rs | 231 +++++++++++++++++----
core/simulator/src/client.rs | 37 +++-
18 files changed, 899 insertions(+), 94 deletions(-)
diff --git a/core/binary_protocol/src/codes.rs
b/core/binary_protocol/src/codes.rs
index f663c85d0..ae33a9814 100644
--- a/core/binary_protocol/src/codes.rs
+++ b/core/binary_protocol/src/codes.rs
@@ -57,6 +57,8 @@ pub const FLUSH_UNSAVED_BUFFER_CODE: u32 = 102;
pub const GET_CONSUMER_OFFSET_CODE: u32 = 120;
pub const STORE_CONSUMER_OFFSET_CODE: u32 = 121;
pub const DELETE_CONSUMER_OFFSET_CODE: u32 = 122;
+pub const STORE_CONSUMER_OFFSET_2_CODE: u32 = 123;
+pub const DELETE_CONSUMER_OFFSET_2_CODE: u32 = 124;
// -- Streams --
pub const GET_STREAM_CODE: u32 = 200;
@@ -133,6 +135,8 @@ mod tests {
GET_CONSUMER_OFFSET_CODE,
STORE_CONSUMER_OFFSET_CODE,
DELETE_CONSUMER_OFFSET_CODE,
+ STORE_CONSUMER_OFFSET_2_CODE,
+ DELETE_CONSUMER_OFFSET_2_CODE,
GET_STREAM_CODE,
GET_STREAMS_CODE,
CREATE_STREAM_CODE,
diff --git a/core/binary_protocol/src/consensus/operation.rs
b/core/binary_protocol/src/consensus/operation.rs
index f0a08a29d..64a43a3ca 100644
--- a/core/binary_protocol/src/consensus/operation.rs
+++ b/core/binary_protocol/src/consensus/operation.rs
@@ -65,6 +65,9 @@ pub enum Operation {
SendMessages = 160,
StoreConsumerOffset = 161,
DeleteConsumerOffset = 162,
+ // 163 is reserved for the planned DeleteSegments move (see TODO above).
+ StoreConsumerOffset2 = 164,
+ DeleteConsumerOffset2 = 165,
}
impl Operation {
@@ -165,7 +168,9 @@ impl Operation {
| Self::DeletePersonalAccessToken
| Self::SendMessages
| Self::StoreConsumerOffset
- | Self::DeleteConsumerOffset => match crate::dispatch::lookup_by_operation(*self) {
+ | Self::DeleteConsumerOffset
+ | Self::StoreConsumerOffset2
+ | Self::DeleteConsumerOffset2 => match crate::dispatch::lookup_by_operation(*self) {
Some(meta) => Some(meta.code),
None => None,
},
@@ -214,6 +219,8 @@ mod tests {
Operation::SendMessages,
Operation::StoreConsumerOffset,
Operation::DeleteConsumerOffset,
+ Operation::StoreConsumerOffset2,
+ Operation::DeleteConsumerOffset2,
];
for op in ops {
let code = op
@@ -270,5 +277,7 @@ mod tests {
assert!(!Operation::SendMessages.is_metadata());
assert!(Operation::DeleteSegments.is_partition());
assert!(Operation::DeleteConsumerOffset.is_partition());
+ assert!(Operation::StoreConsumerOffset2.is_partition());
+ assert!(Operation::DeleteConsumerOffset2.is_partition());
}
}
diff --git a/core/binary_protocol/src/dispatch.rs
b/core/binary_protocol/src/dispatch.rs
index fb5e33683..bb83280be 100644
--- a/core/binary_protocol/src/dispatch.rs
+++ b/core/binary_protocol/src/dispatch.rs
@@ -126,6 +126,16 @@ pub const COMMAND_TABLE: &[CommandMeta] = &[
"consumer_offset.delete",
Operation::DeleteConsumerOffset,
),
+ CommandMeta::replicated(
+ STORE_CONSUMER_OFFSET_2_CODE,
+ "consumer_offset.store.v2",
+ Operation::StoreConsumerOffset2,
+ ),
+ CommandMeta::replicated(
+ DELETE_CONSUMER_OFFSET_2_CODE,
+ "consumer_offset.delete.v2",
+ Operation::DeleteConsumerOffset2,
+ ),
// Streams
CommandMeta::non_replicated(GET_STREAM_CODE, "stream.get"),
CommandMeta::non_replicated(GET_STREAMS_CODE, "stream.list"),
@@ -212,28 +222,30 @@ pub const fn lookup_command(code: u32) -> Option<&'static CommandMeta> {
GET_CONSUMER_OFFSET_CODE => 24,
STORE_CONSUMER_OFFSET_CODE => 25,
DELETE_CONSUMER_OFFSET_CODE => 26,
- GET_STREAM_CODE => 27,
- GET_STREAMS_CODE => 28,
- CREATE_STREAM_CODE => 29,
- DELETE_STREAM_CODE => 30,
- UPDATE_STREAM_CODE => 31,
- PURGE_STREAM_CODE => 32,
- GET_TOPIC_CODE => 33,
- GET_TOPICS_CODE => 34,
- CREATE_TOPIC_CODE => 35,
- DELETE_TOPIC_CODE => 36,
- UPDATE_TOPIC_CODE => 37,
- PURGE_TOPIC_CODE => 38,
- CREATE_PARTITIONS_CODE => 39,
- DELETE_PARTITIONS_CODE => 40,
- DELETE_SEGMENTS_CODE => 41,
- GET_CONSUMER_GROUP_CODE => 42,
- GET_CONSUMER_GROUPS_CODE => 43,
- CREATE_CONSUMER_GROUP_CODE => 44,
- DELETE_CONSUMER_GROUP_CODE => 45,
- JOIN_CONSUMER_GROUP_CODE => 46,
- LEAVE_CONSUMER_GROUP_CODE => 47,
- LOGIN_REGISTER_WITH_PAT_CODE => 48,
+ STORE_CONSUMER_OFFSET_2_CODE => 27,
+ DELETE_CONSUMER_OFFSET_2_CODE => 28,
+ GET_STREAM_CODE => 29,
+ GET_STREAMS_CODE => 30,
+ CREATE_STREAM_CODE => 31,
+ DELETE_STREAM_CODE => 32,
+ UPDATE_STREAM_CODE => 33,
+ PURGE_STREAM_CODE => 34,
+ GET_TOPIC_CODE => 35,
+ GET_TOPICS_CODE => 36,
+ CREATE_TOPIC_CODE => 37,
+ DELETE_TOPIC_CODE => 38,
+ UPDATE_TOPIC_CODE => 39,
+ PURGE_TOPIC_CODE => 40,
+ CREATE_PARTITIONS_CODE => 41,
+ DELETE_PARTITIONS_CODE => 42,
+ DELETE_SEGMENTS_CODE => 43,
+ GET_CONSUMER_GROUP_CODE => 44,
+ GET_CONSUMER_GROUPS_CODE => 45,
+ CREATE_CONSUMER_GROUP_CODE => 46,
+ DELETE_CONSUMER_GROUP_CODE => 47,
+ JOIN_CONSUMER_GROUP_CODE => 48,
+ LEAVE_CONSUMER_GROUP_CODE => 49,
+ LOGIN_REGISTER_WITH_PAT_CODE => 50,
_ => return None,
};
Some(&COMMAND_TABLE[idx])
@@ -247,19 +259,19 @@ pub const fn lookup_command(code: u32) -> Option<&'static CommandMeta> {
pub const fn lookup_by_operation(op: Operation) -> Option<&'static CommandMeta> {
// Indices must match the order of entries in COMMAND_TABLE above.
let idx = match op {
- Operation::CreateStream => 29,
- Operation::UpdateStream => 31,
- Operation::DeleteStream => 30,
- Operation::PurgeStream => 32,
- Operation::CreateTopic => 35,
- Operation::UpdateTopic => 37,
- Operation::DeleteTopic => 36,
- Operation::PurgeTopic => 38,
- Operation::CreatePartitions => 39,
- Operation::DeletePartitions => 40,
- Operation::DeleteSegments => 41,
- Operation::CreateConsumerGroup => 44,
- Operation::DeleteConsumerGroup => 45,
+ Operation::CreateStream => 31,
+ Operation::UpdateStream => 33,
+ Operation::DeleteStream => 32,
+ Operation::PurgeStream => 34,
+ Operation::CreateTopic => 37,
+ Operation::UpdateTopic => 39,
+ Operation::DeleteTopic => 38,
+ Operation::PurgeTopic => 40,
+ Operation::CreatePartitions => 41,
+ Operation::DeletePartitions => 42,
+ Operation::DeleteSegments => 43,
+ Operation::CreateConsumerGroup => 46,
+ Operation::DeleteConsumerGroup => 47,
Operation::CreateUser => 9,
Operation::UpdateUser => 11,
Operation::DeleteUser => 10,
@@ -270,6 +282,8 @@ pub const fn lookup_by_operation(op: Operation) -> Option<&'static CommandMeta>
Operation::SendMessages => 22,
Operation::StoreConsumerOffset => 25,
Operation::DeleteConsumerOffset => 26,
+ Operation::StoreConsumerOffset2 => 27,
+ Operation::DeleteConsumerOffset2 => 28,
Operation::CreateTopicWithAssignments
| Operation::CreatePartitionsWithAssignments
| Operation::Reserved
@@ -313,6 +327,8 @@ mod tests {
GET_CONSUMER_OFFSET_CODE,
STORE_CONSUMER_OFFSET_CODE,
DELETE_CONSUMER_OFFSET_CODE,
+ STORE_CONSUMER_OFFSET_2_CODE,
+ DELETE_CONSUMER_OFFSET_2_CODE,
GET_STREAM_CODE,
GET_STREAMS_CODE,
CREATE_STREAM_CODE,
@@ -394,6 +410,8 @@ mod tests {
Operation::SendMessages,
Operation::StoreConsumerOffset,
Operation::DeleteConsumerOffset,
+ Operation::StoreConsumerOffset2,
+ Operation::DeleteConsumerOffset2,
];
for op in replicated_ops {
let meta = lookup_by_operation(op)
diff --git a/core/binary_protocol/src/lib.rs b/core/binary_protocol/src/lib.rs
index 8d4544c2b..3ef0a8513 100644
--- a/core/binary_protocol/src/lib.rs
+++ b/core/binary_protocol/src/lib.rs
@@ -80,6 +80,7 @@ pub use framing::{RequestFrame, RequestFrame2, ResponseFrame,
ResponseFrame2, ST
pub use message_view::{
WireMessageIterator, WireMessageIteratorMut, WireMessageView,
WireMessageViewMut,
};
+pub use primitives::ack_level::AckLevel;
pub use primitives::consumer::WireConsumer;
pub use primitives::identifier::{MAX_WIRE_NAME_LENGTH, WireIdentifier,
WireName};
pub use primitives::partition_assignment::CreatedPartitionAssignment;
diff --git a/core/binary_protocol/src/primitives/ack_level.rs
b/core/binary_protocol/src/primitives/ack_level.rs
new file mode 100644
index 000000000..a2ddb2389
--- /dev/null
+++ b/core/binary_protocol/src/primitives/ack_level.rs
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+
+/// Acknowledgement policy for consumer-offset write commands.
+///
+/// Wire format: single `u8` discriminant.
+/// - `NoAck(0)`: leader-local write only; respond as soon as the in-memory
+/// and on-disk state have been updated. Matches the fast path used by
+/// `PollMessages` auto-commit.
+/// - `Quorum(1)`: submit through the partition VSR consensus pipeline and
+/// respond only after the write has been committed by a quorum of replicas.
+/// This is the default for explicit client writes.
+#[repr(u8)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
+pub enum AckLevel {
+ NoAck = 0,
+ #[default]
+ Quorum = 1,
+}
+
+impl AckLevel {
+ /// Decode an `AckLevel` from its wire discriminant.
+ ///
+ /// # Errors
+ /// Returns `WireError::UnknownDiscriminant` for unrecognised values.
+ pub const fn from_code(code: u8) -> Result<Self, WireError> {
+ match code {
+ 0 => Ok(Self::NoAck),
+ 1 => Ok(Self::Quorum),
+ other => Err(WireError::UnknownDiscriminant {
+ type_name: "AckLevel",
+ value: other,
+ offset: 0,
+ }),
+ }
+ }
+
+ /// Encode this `AckLevel` as its wire discriminant.
+ #[must_use]
+ pub const fn as_u8(self) -> u8 {
+ self as u8
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn roundtrip_no_ack() {
+ let level = AckLevel::NoAck;
+ let decoded = AckLevel::from_code(level.as_u8()).unwrap();
+ assert_eq!(decoded, level);
+ }
+
+ #[test]
+ fn roundtrip_quorum() {
+ let level = AckLevel::Quorum;
+ let decoded = AckLevel::from_code(level.as_u8()).unwrap();
+ assert_eq!(decoded, level);
+ }
+
+ #[test]
+ fn default_is_quorum() {
+ assert_eq!(AckLevel::default(), AckLevel::Quorum);
+ }
+
+ #[test]
+ fn discriminant_values() {
+ assert_eq!(AckLevel::NoAck.as_u8(), 0);
+ assert_eq!(AckLevel::Quorum.as_u8(), 1);
+ }
+
+ #[test]
+ fn unknown_discriminant_rejected() {
+ for code in 2u8..=u8::MAX {
+ let err = AckLevel::from_code(code).unwrap_err();
+ match err {
+ WireError::UnknownDiscriminant {
+ type_name,
+ value,
+ offset,
+ } => {
+ assert_eq!(type_name, "AckLevel");
+ assert_eq!(value, code);
+ assert_eq!(offset, 0);
+ }
+ other => panic!("unexpected error variant: {other:?}"),
+ }
+ }
+ }
+}
diff --git a/core/binary_protocol/src/primitives/mod.rs
b/core/binary_protocol/src/primitives/mod.rs
index 064f0fcfb..f3c7ee4e3 100644
--- a/core/binary_protocol/src/primitives/mod.rs
+++ b/core/binary_protocol/src/primitives/mod.rs
@@ -17,6 +17,7 @@
//! Shared wire primitives reused across request and response types.
+pub mod ack_level;
pub mod consumer;
pub mod identifier;
pub mod partition_assignment;
diff --git
a/core/binary_protocol/src/requests/consumer_offsets/delete_consumer_offset_2.rs
b/core/binary_protocol/src/requests/consumer_offsets/delete_consumer_offset_2.rs
new file mode 100644
index 000000000..ccfa104ad
--- /dev/null
+++
b/core/binary_protocol/src/requests/consumer_offsets/delete_consumer_offset_2.rs
@@ -0,0 +1,199 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::WireIdentifier;
+use crate::codec::{WireDecode, WireEncode, read_u8, read_u32_le};
+use crate::primitives::ack_level::AckLevel;
+use crate::primitives::consumer::WireConsumer;
+use bytes::{BufMut, BytesMut};
+
+/// `DeleteConsumerOffset` v2 request.
+///
+/// Adds an `ack` byte: `NoAck` = leader-local fast path, `Quorum` = VSR
+/// pipeline.
+///
+/// Wire format:
+/// ```text
+/// [consumer][stream_id][topic_id][partition_flag:1][partition_id:4 LE][ack:1]
+/// ```
+///
+/// `partition_id` encoding: a u8 flag (1=Some, 0=None) followed by 4 bytes
+/// for the u32 value (0 when None).
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct DeleteConsumerOffset2Request {
+ pub consumer: WireConsumer,
+ pub stream_id: WireIdentifier,
+ pub topic_id: WireIdentifier,
+ pub partition_id: Option<u32>,
+ pub ack: AckLevel,
+}
+
+impl WireEncode for DeleteConsumerOffset2Request {
+ fn encoded_size(&self) -> usize {
+ self.consumer.encoded_size()
+ + self.stream_id.encoded_size()
+ + self.topic_id.encoded_size()
+ + 1
+ + 4
+ + 1
+ }
+
+ fn encode(&self, buf: &mut BytesMut) {
+ self.consumer.encode(buf);
+ self.stream_id.encode(buf);
+ self.topic_id.encode(buf);
+ if let Some(pid) = self.partition_id {
+ buf.put_u8(1);
+ buf.put_u32_le(pid);
+ } else {
+ buf.put_u8(0);
+ buf.put_u32_le(0);
+ }
+ buf.put_u8(self.ack.as_u8());
+ }
+}
+
+impl WireDecode for DeleteConsumerOffset2Request {
+ fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+ let mut pos = 0;
+ let (consumer, n) = WireConsumer::decode(&buf[pos..])?;
+ pos += n;
+ let (stream_id, n) = WireIdentifier::decode(&buf[pos..])?;
+ pos += n;
+ let (topic_id, n) = WireIdentifier::decode(&buf[pos..])?;
+ pos += n;
+ let partition_flag = read_u8(buf, pos)?;
+ pos += 1;
+ let partition_raw = read_u32_le(buf, pos)?;
+ pos += 4;
+ let partition_id = if partition_flag == 1 {
+ Some(partition_raw)
+ } else {
+ None
+ };
+ let ack_code = read_u8(buf, pos)?;
+ pos += 1;
+ let ack = AckLevel::from_code(ack_code)?;
+ Ok((
+ Self {
+ consumer,
+ stream_id,
+ topic_id,
+ partition_id,
+ ack,
+ },
+ pos,
+ ))
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn roundtrip_with_partition_quorum() {
+ let req = DeleteConsumerOffset2Request {
+ consumer: WireConsumer::consumer(WireIdentifier::numeric(1)),
+ stream_id: WireIdentifier::numeric(10),
+ topic_id: WireIdentifier::numeric(20),
+ partition_id: Some(5),
+ ack: AckLevel::Quorum,
+ };
+ let bytes = req.to_bytes();
+ let (decoded, consumed) = DeleteConsumerOffset2Request::decode(&bytes).unwrap();
+ assert_eq!(consumed, bytes.len());
+ assert_eq!(decoded, req);
+ }
+
+ #[test]
+ fn roundtrip_without_partition_no_ack() {
+ let req = DeleteConsumerOffset2Request {
+ consumer: WireConsumer::consumer_group(WireIdentifier::numeric(3)),
+ stream_id: WireIdentifier::numeric(1),
+ topic_id: WireIdentifier::numeric(1),
+ partition_id: None,
+ ack: AckLevel::NoAck,
+ };
+ let bytes = req.to_bytes();
+ let (decoded, consumed) = DeleteConsumerOffset2Request::decode(&bytes).unwrap();
+ assert_eq!(consumed, bytes.len());
+ assert_eq!(decoded, req);
+ }
+
+ #[test]
+ fn roundtrip_named_identifiers() {
+ let req = DeleteConsumerOffset2Request {
+ consumer: WireConsumer::consumer(WireIdentifier::named("my-consumer").unwrap()),
+ stream_id: WireIdentifier::named("stream-1").unwrap(),
+ topic_id: WireIdentifier::named("topic-1").unwrap(),
+ partition_id: Some(0),
+ ack: AckLevel::Quorum,
+ };
+ let bytes = req.to_bytes();
+ let (decoded, consumed) = DeleteConsumerOffset2Request::decode(&bytes).unwrap();
+ assert_eq!(consumed, bytes.len());
+ assert_eq!(decoded, req);
+ }
+
+ #[test]
+ fn ack_byte_is_last() {
+ let req = DeleteConsumerOffset2Request {
+ consumer: WireConsumer::consumer(WireIdentifier::numeric(1)),
+ stream_id: WireIdentifier::numeric(1),
+ topic_id: WireIdentifier::numeric(1),
+ partition_id: Some(0),
+ ack: AckLevel::NoAck,
+ };
+ let bytes = req.to_bytes();
+ assert_eq!(*bytes.last().unwrap(), AckLevel::NoAck.as_u8());
+ }
+
+ #[test]
+ fn unknown_ack_rejected() {
+ let req = DeleteConsumerOffset2Request {
+ consumer: WireConsumer::consumer(WireIdentifier::numeric(1)),
+ stream_id: WireIdentifier::numeric(1),
+ topic_id: WireIdentifier::numeric(1),
+ partition_id: Some(0),
+ ack: AckLevel::Quorum,
+ };
+ let mut bytes = req.to_bytes().to_vec();
+ let last = bytes.len() - 1;
+ bytes[last] = 0xFF;
+ assert!(DeleteConsumerOffset2Request::decode(&bytes).is_err());
+ }
+
+ #[test]
+ fn truncated_returns_error() {
+ let req = DeleteConsumerOffset2Request {
+ consumer: WireConsumer::consumer(WireIdentifier::numeric(1)),
+ stream_id: WireIdentifier::numeric(1),
+ topic_id: WireIdentifier::numeric(1),
+ partition_id: Some(1),
+ ack: AckLevel::Quorum,
+ };
+ let bytes = req.to_bytes();
+ for i in 0..bytes.len() {
+ assert!(
+ DeleteConsumerOffset2Request::decode(&bytes[..i]).is_err(),
+ "expected error for truncation at byte {i}"
+ );
+ }
+ }
+}
diff --git a/core/binary_protocol/src/requests/consumer_offsets/mod.rs
b/core/binary_protocol/src/requests/consumer_offsets/mod.rs
index bde4d02b5..f8d3cee59 100644
--- a/core/binary_protocol/src/requests/consumer_offsets/mod.rs
+++ b/core/binary_protocol/src/requests/consumer_offsets/mod.rs
@@ -16,9 +16,13 @@
// under the License.
pub mod delete_consumer_offset;
+pub mod delete_consumer_offset_2;
pub mod get_consumer_offset;
pub mod store_consumer_offset;
+pub mod store_consumer_offset_2;
pub use delete_consumer_offset::DeleteConsumerOffsetRequest;
+pub use delete_consumer_offset_2::DeleteConsumerOffset2Request;
pub use get_consumer_offset::GetConsumerOffsetRequest;
pub use store_consumer_offset::StoreConsumerOffsetRequest;
+pub use store_consumer_offset_2::StoreConsumerOffset2Request;
diff --git
a/core/binary_protocol/src/requests/consumer_offsets/store_consumer_offset_2.rs
b/core/binary_protocol/src/requests/consumer_offsets/store_consumer_offset_2.rs
new file mode 100644
index 000000000..37e89fb1c
--- /dev/null
+++
b/core/binary_protocol/src/requests/consumer_offsets/store_consumer_offset_2.rs
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::WireIdentifier;
+use crate::codec::{WireDecode, WireEncode, read_u8, read_u32_le, read_u64_le};
+use crate::primitives::ack_level::AckLevel;
+use crate::primitives::consumer::WireConsumer;
+use bytes::{BufMut, BytesMut};
+
+/// `StoreConsumerOffset` v2 request.
+///
+/// Adds an `ack` byte: `NoAck` = leader-local fast path, `Quorum` = VSR
+/// pipeline.
+///
+/// Wire format:
+/// ```text
+/// [consumer][stream_id][topic_id][partition_flag:1][partition_id:4 LE][offset:8 LE][ack:1]
+/// ```
+///
+/// `partition_id` encoding: a u8 flag (1=Some, 0=None) followed by 4 bytes
+/// for the u32 value (0 when None).
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct StoreConsumerOffset2Request {
+ pub consumer: WireConsumer,
+ pub stream_id: WireIdentifier,
+ pub topic_id: WireIdentifier,
+ pub partition_id: Option<u32>,
+ pub offset: u64,
+ pub ack: AckLevel,
+}
+
+impl WireEncode for StoreConsumerOffset2Request {
+ fn encoded_size(&self) -> usize {
+ self.consumer.encoded_size()
+ + self.stream_id.encoded_size()
+ + self.topic_id.encoded_size()
+ + 1
+ + 4
+ + 8
+ + 1
+ }
+
+ fn encode(&self, buf: &mut BytesMut) {
+ self.consumer.encode(buf);
+ self.stream_id.encode(buf);
+ self.topic_id.encode(buf);
+ if let Some(pid) = self.partition_id {
+ buf.put_u8(1);
+ buf.put_u32_le(pid);
+ } else {
+ buf.put_u8(0);
+ buf.put_u32_le(0);
+ }
+ buf.put_u64_le(self.offset);
+ buf.put_u8(self.ack.as_u8());
+ }
+}
+
+impl WireDecode for StoreConsumerOffset2Request {
+ fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+ let mut pos = 0;
+ let (consumer, n) = WireConsumer::decode(&buf[pos..])?;
+ pos += n;
+ let (stream_id, n) = WireIdentifier::decode(&buf[pos..])?;
+ pos += n;
+ let (topic_id, n) = WireIdentifier::decode(&buf[pos..])?;
+ pos += n;
+ let partition_flag = read_u8(buf, pos)?;
+ pos += 1;
+ let partition_raw = read_u32_le(buf, pos)?;
+ pos += 4;
+ let partition_id = if partition_flag == 1 {
+ Some(partition_raw)
+ } else {
+ None
+ };
+ let offset = read_u64_le(buf, pos)?;
+ pos += 8;
+ let ack_code = read_u8(buf, pos)?;
+ pos += 1;
+ let ack = AckLevel::from_code(ack_code)?;
+ Ok((
+ Self {
+ consumer,
+ stream_id,
+ topic_id,
+ partition_id,
+ offset,
+ ack,
+ },
+ pos,
+ ))
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn roundtrip_with_partition_quorum() {
+ let req = StoreConsumerOffset2Request {
+ consumer: WireConsumer::consumer(WireIdentifier::numeric(1)),
+ stream_id: WireIdentifier::numeric(10),
+ topic_id: WireIdentifier::numeric(20),
+ partition_id: Some(5),
+ offset: 12345,
+ ack: AckLevel::Quorum,
+ };
+ let bytes = req.to_bytes();
+ let (decoded, consumed) = StoreConsumerOffset2Request::decode(&bytes).unwrap();
+ assert_eq!(consumed, bytes.len());
+ assert_eq!(decoded, req);
+ }
+
+ #[test]
+ fn roundtrip_without_partition_no_ack() {
+ let req = StoreConsumerOffset2Request {
+ consumer: WireConsumer::consumer_group(WireIdentifier::numeric(3)),
+ stream_id: WireIdentifier::numeric(1),
+ topic_id: WireIdentifier::numeric(1),
+ partition_id: None,
+ offset: u64::MAX,
+ ack: AckLevel::NoAck,
+ };
+ let bytes = req.to_bytes();
+ let (decoded, consumed) = StoreConsumerOffset2Request::decode(&bytes).unwrap();
+ assert_eq!(consumed, bytes.len());
+ assert_eq!(decoded, req);
+ }
+
+ #[test]
+ fn roundtrip_named_identifiers() {
+ let req = StoreConsumerOffset2Request {
+ consumer: WireConsumer::consumer(WireIdentifier::named("my-consumer").unwrap()),
+ stream_id: WireIdentifier::named("stream-1").unwrap(),
+ topic_id: WireIdentifier::named("topic-1").unwrap(),
+ partition_id: Some(0),
+ offset: 0,
+ ack: AckLevel::Quorum,
+ };
+ let bytes = req.to_bytes();
+ let (decoded, consumed) = StoreConsumerOffset2Request::decode(&bytes).unwrap();
+ assert_eq!(consumed, bytes.len());
+ assert_eq!(decoded, req);
+ }
+
+ #[test]
+ fn ack_byte_is_last() {
+ let req = StoreConsumerOffset2Request {
+ consumer: WireConsumer::consumer(WireIdentifier::numeric(1)),
+ stream_id: WireIdentifier::numeric(1),
+ topic_id: WireIdentifier::numeric(1),
+ partition_id: Some(0),
+ offset: 0,
+ ack: AckLevel::NoAck,
+ };
+ let bytes = req.to_bytes();
+ assert_eq!(*bytes.last().unwrap(), AckLevel::NoAck.as_u8());
+ }
+
+ #[test]
+ fn unknown_ack_rejected() {
+ let req = StoreConsumerOffset2Request {
+ consumer: WireConsumer::consumer(WireIdentifier::numeric(1)),
+ stream_id: WireIdentifier::numeric(1),
+ topic_id: WireIdentifier::numeric(1),
+ partition_id: Some(0),
+ offset: 0,
+ ack: AckLevel::Quorum,
+ };
+ let mut bytes = req.to_bytes().to_vec();
+ let last = bytes.len() - 1;
+ bytes[last] = 0xFF;
+ assert!(StoreConsumerOffset2Request::decode(&bytes).is_err());
+ }
+
+ #[test]
+ fn truncated_returns_error() {
+ let req = StoreConsumerOffset2Request {
+ consumer: WireConsumer::consumer(WireIdentifier::numeric(1)),
+ stream_id: WireIdentifier::numeric(1),
+ topic_id: WireIdentifier::numeric(1),
+ partition_id: Some(1),
+ offset: 100,
+ ack: AckLevel::Quorum,
+ };
+ let bytes = req.to_bytes();
+ for i in 0..bytes.len() {
+ assert!(
+ StoreConsumerOffset2Request::decode(&bytes[..i]).is_err(),
+ "expected error for truncation at byte {i}"
+ );
+ }
+ }
+}
diff --git a/core/binary_protocol/src/primitives/mod.rs
b/core/binary_protocol/src/responses/consumer_offsets/delete_consumer_offset_2.rs
similarity index 77%
copy from core/binary_protocol/src/primitives/mod.rs
copy to
core/binary_protocol/src/responses/consumer_offsets/delete_consumer_offset_2.rs
index 064f0fcfb..a145aab18 100644
--- a/core/binary_protocol/src/primitives/mod.rs
+++
b/core/binary_protocol/src/responses/consumer_offsets/delete_consumer_offset_2.rs
@@ -15,12 +15,5 @@
// specific language governing permissions and limitations
// under the License.
-//! Shared wire primitives reused across request and response types.
-
-pub mod consumer;
-pub mod identifier;
-pub mod partition_assignment;
-pub mod partitioning;
-pub mod permissions;
-pub mod polling_strategy;
-pub mod user_headers;
+/// `DeleteConsumerOffset2` response is empty.
+pub type DeleteConsumerOffset2Response = super::EmptyResponse;
diff --git a/core/binary_protocol/src/responses/consumer_offsets/mod.rs
b/core/binary_protocol/src/responses/consumer_offsets/mod.rs
index 915648c18..6dbaf454d 100644
--- a/core/binary_protocol/src/responses/consumer_offsets/mod.rs
+++ b/core/binary_protocol/src/responses/consumer_offsets/mod.rs
@@ -16,10 +16,14 @@
// under the License.
mod delete_consumer_offset;
+mod delete_consumer_offset_2;
pub mod get_consumer_offset;
mod store_consumer_offset;
+mod store_consumer_offset_2;
pub use super::EmptyResponse;
pub use delete_consumer_offset::DeleteConsumerOffsetResponse;
+pub use delete_consumer_offset_2::DeleteConsumerOffset2Response;
pub use get_consumer_offset::ConsumerOffsetResponse;
pub use store_consumer_offset::StoreConsumerOffsetResponse;
+pub use store_consumer_offset_2::StoreConsumerOffset2Response;
diff --git a/core/binary_protocol/src/primitives/mod.rs
b/core/binary_protocol/src/responses/consumer_offsets/store_consumer_offset_2.rs
similarity index 77%
copy from core/binary_protocol/src/primitives/mod.rs
copy to
core/binary_protocol/src/responses/consumer_offsets/store_consumer_offset_2.rs
index 064f0fcfb..0f76f603b 100644
--- a/core/binary_protocol/src/primitives/mod.rs
+++
b/core/binary_protocol/src/responses/consumer_offsets/store_consumer_offset_2.rs
@@ -15,12 +15,5 @@
// specific language governing permissions and limitations
// under the License.
-//! Shared wire primitives reused across request and response types.
-
-pub mod consumer;
-pub mod identifier;
-pub mod partition_assignment;
-pub mod partitioning;
-pub mod permissions;
-pub mod polling_strategy;
-pub mod user_headers;
+/// `StoreConsumerOffset2` response is empty.
+pub type StoreConsumerOffset2Response = super::EmptyResponse;
diff --git a/core/consensus/src/client_table.rs
b/core/consensus/src/client_table.rs
index 54a888034..7ea5db56c 100644
--- a/core/consensus/src/client_table.rs
+++ b/core/consensus/src/client_table.rs
@@ -430,8 +430,8 @@ impl ClientTable {
if let Some(entry) = slot {
let commit = entry.reply.header().commit;
let should_evict = match evictee {
- None => true,
Some((_, min_commit)) => commit < min_commit,
+ None => true,
};
if should_evict {
evictee = Some((idx, commit));
diff --git a/core/consensus/src/observability.rs
b/core/consensus/src/observability.rs
index bb042af10..2ac1d0044 100644
--- a/core/consensus/src/observability.rs
+++ b/core/consensus/src/observability.rs
@@ -644,6 +644,8 @@ pub const fn operation_as_str(operation: Operation) -> &'static str {
Operation::SendMessages => "send_messages",
Operation::StoreConsumerOffset => "store_consumer_offset",
Operation::DeleteConsumerOffset => "delete_consumer_offset",
+ Operation::StoreConsumerOffset2 => "store_consumer_offset_2",
+ Operation::DeleteConsumerOffset2 => "delete_consumer_offset_2",
}
}
diff --git a/core/consensus/src/plane_helpers.rs
b/core/consensus/src/plane_helpers.rs
index 4078d9eba..e4da88084 100644
--- a/core/consensus/src/plane_helpers.rs
+++ b/core/consensus/src/plane_helpers.rs
@@ -21,7 +21,9 @@ use crate::{
Status, VsrConsensus,
};
use iggy_binary_protocol::consensus::iobuf::Owned;
-use iggy_binary_protocol::{Command2, Message, PrepareHeader, PrepareOkHeader, ReplyHeader};
+use iggy_binary_protocol::{
+ Command2, Message, PrepareHeader, PrepareOkHeader, ReplyHeader, RequestHeader,
+};
use message_bus::{MessageBus, SendError};
use std::ops::AsyncFnOnce;
@@ -392,6 +394,60 @@ where
.expect("reply buffer must contain a valid reply message")
}
+/// Reply for fast paths that skip the VSR pipeline (e.g. `AckLevel::NoAck`).
+///
+/// Stamps `op` and `commit` with `commit_max` — monotonic, so
+/// `ClientTable::commit_reply` regression checks always pass.
+///
+/// # Panics
+/// If the constructed message buffer is not valid.
+#[allow(clippy::needless_pass_by_value, clippy::cast_possible_truncation)]
+pub fn build_reply_from_request<B, P>(
+ consensus: &VsrConsensus<B, P>,
+ request_header: &RequestHeader,
+ body: bytes::Bytes,
+) -> Message<ReplyHeader>
+where
+ B: MessageBus,
+ P: Pipeline<Entry = PipelineEntry>,
+{
+ let header_size = std::mem::size_of::<ReplyHeader>();
+ let total_size = header_size + body.len();
+ let mut buffer = bytes::BytesMut::zeroed(total_size);
+
+ let commit = consensus.commit_max();
+ let header = bytemuck::checked::try_from_bytes_mut::<ReplyHeader>(&mut buffer[..header_size])
+ .expect("zeroed bytes are valid");
+ *header = ReplyHeader {
+ checksum: 0,
+ checksum_body: 0,
+ cluster: consensus.cluster(),
+ size: total_size as u32,
+ view: consensus.view(),
+ release: 0,
+ command: Command2::Reply,
+ replica: consensus.replica(),
+ reserved_frame: [0; 66],
+ request_checksum: request_header.request_checksum,
+ context: 0,
+ client: request_header.client,
+ op: commit,
+ commit,
+ timestamp: request_header.timestamp,
+ request: request_header.request,
+ operation: request_header.operation,
+ namespace: request_header.namespace,
+ ..Default::default()
+ };
+
+ if !body.is_empty() {
+ buffer[header_size..].copy_from_slice(&body);
+ }
+
+ Message::try_from(Owned::<4096>::copy_from_slice(buffer.as_ref()))
+ .expect("reply buffer must contain a valid reply message")
+}
+
/// Verify hash chain would not break if we add this header.
///
/// # Panics
diff --git a/core/integration/tests/server/scenarios/authentication_scenario.rs
b/core/integration/tests/server/scenarios/authentication_scenario.rs
index ab5245e3c..24a0f2c44 100644
--- a/core/integration/tests/server/scenarios/authentication_scenario.rs
+++ b/core/integration/tests/server/scenarios/authentication_scenario.rs
@@ -139,6 +139,16 @@ async fn test_all_commands_require_auth(client:
&IggyClient) {
) {
continue;
}
+ // v2 consumer-offset ops are registered in the dispatch table for the
+ // consensus/simulator pathway but are not wired into the legacy binary
+ // server's dispatch. They'll move into server-ng alongside the rest of
+ // the v2 surface; re-enable these codes here once that lands.
+ if matches!(
+ code,
+ STORE_CONSUMER_OFFSET_2_CODE | DELETE_CONSUMER_OFFSET_2_CODE
+ ) {
+ continue;
+ }
// ================================================================
// REQUIRES AUTH
diff --git a/core/partitions/src/iggy_partition.rs
b/core/partitions/src/iggy_partition.rs
index 37e99679e..e85717cfd 100644
--- a/core/partitions/src/iggy_partition.rs
+++ b/core/partitions/src/iggy_partition.rs
@@ -31,13 +31,13 @@ use crate::{
use consensus::{
CommitLogEvent, Consensus, PartitionDiagEvent, Pipeline, PipelineEntry,
PlaneKind, Project,
ReplicaLogContext, RequestLogEvent, Sequencer, SimEventKind, VsrConsensus,
ack_preflight,
- ack_quorum_reached, build_reply_message, drain_committable_prefix,
+ ack_quorum_reached, build_reply_from_request, build_reply_message,
drain_committable_prefix,
emit_namespace_progress_event, emit_partition_diag, emit_sim_event,
fence_old_prepare_by_commit, replicate_preflight,
replicate_to_next_in_chain,
request_preflight, send_prepare_ok as send_prepare_ok_common,
};
use iggy_binary_protocol::consensus::iobuf::Frozen;
-use iggy_binary_protocol::{Message, Operation, PrepareHeader};
+use iggy_binary_protocol::{AckLevel, Message, Operation, PrepareHeader};
use iggy_binary_protocol::{PrepareOkHeader, RequestHeader};
use iggy_common::{
ConsumerGroupId, ConsumerGroupOffsets, ConsumerKind, ConsumerOffset,
ConsumerOffsets,
@@ -83,6 +83,19 @@ where
observed_view: u32,
}
+/// Post-preflight dispatch in `on_request`: replicate via VSR or take the
+/// `NoAck` leader-local fast path. `RequestHeader` is boxed to avoid the
+/// 277-byte inline variant tripping clippy's `large_enum_variant`.
+enum Disposition {
+ Replicate(Message<PrepareHeader>),
+ NoAck {
+ request_header: Box<RequestHeader>,
+ kind: ConsumerKind,
+ consumer_id: u32,
+ offset: Option<u64>,
+ },
+}
+
#[derive(Debug, Clone, Copy, PartialEq)]
struct PendingConsumerOffsetCommit {
kind: ConsumerKind,
@@ -362,6 +375,74 @@ where
Ok(())
}
+ /// `AckLevel::NoAck` fast path: persist, apply, cache + send reply, no
+ /// replication. Single-replica durability.
+ #[allow(clippy::future_not_send)]
+ async fn apply_consumer_offset_no_ack(
+ &self,
+ request_header: Box<RequestHeader>,
+ kind: ConsumerKind,
+ consumer_id: u32,
+ offset: Option<u64>,
+ ) {
+ let pending = offset.map_or_else(
+ || PendingConsumerOffsetCommit::delete(kind, consumer_id),
+ |value| PendingConsumerOffsetCommit::upsert(kind, consumer_id,
value),
+ );
+
+ if let Err(error) = self.persist_consumer_offset_commit(pending).await
{
+ emit_partition_diag(
+ tracing::Level::WARN,
+ &PartitionDiagEvent::new(self.diag_ctx(), "no_ack offset
persist failed")
+ .with_operation(request_header.operation)
+ .with_error(error.to_string()),
+ );
+ return;
+ }
+ if let Err(error) = self.apply_consumer_offset_commit(pending) {
+ emit_partition_diag(
+ tracing::Level::WARN,
+ &PartitionDiagEvent::new(self.diag_ctx(), "no_ack offset apply
failed")
+ .with_operation(request_header.operation)
+ .with_error(error.to_string()),
+ );
+ return;
+ }
+
+ let reply = build_reply_from_request(&self.consensus, &request_header,
bytes::Bytes::new());
+ let session = self
+ .consensus
+ .client_table()
+ .borrow()
+ .get_session(request_header.client)
+ .unwrap_or_else(|| {
+ panic!(
+ "apply_consumer_offset_no_ack: client {} not registered",
+ request_header.client
+ )
+ });
+ self.consensus.client_table().borrow_mut().commit_reply(
+ request_header.client,
+ session,
+ reply.clone(),
+ );
+
+ let reply_buffers = reply.into_generic().into_frozen();
+ if let Err(error) = self
+ .consensus
+ .message_bus()
+ .send_to_client(request_header.client, reply_buffers)
+ .await
+ {
+ emit_partition_diag(
+ tracing::Level::WARN,
+ &PartitionDiagEvent::new(self.diag_ctx(), "no_ack reply send
failed")
+ .with_operation(request_header.operation)
+ .with_error(error.to_string()),
+ );
+ }
+ }
+
fn persisted_offset_path(&self, kind: ConsumerKind, consumer_id: u32) ->
Option<String> {
match kind {
ConsumerKind::Consumer => self
@@ -607,7 +688,7 @@ where
/// # Panics
/// Panics if called when this partition's consensus instance is not the
/// primary, is not in normal status, or is currently syncing.
- #[allow(clippy::future_not_send)]
+ #[allow(clippy::future_not_send, clippy::too_many_lines)]
pub async fn on_request(&mut self, message: Message<RequestHeader>) {
self.clear_pending_consumer_offset_commits_if_view_changed();
let namespace = IggyNamespace::from_raw(message.header().namespace);
@@ -635,7 +716,7 @@ where
}
}
- let prepare = {
+ let disposition = {
let consensus = self.consensus();
emit_sim_event(
SimEventKind::ClientRequestReceived,
@@ -667,25 +748,51 @@ where
message
};
- if message.header().operation == Operation::DeleteConsumerOffset {
- match
Self::parse_consumer_offset_request(message.header().operation, &message)
- .and_then(|(kind, consumer_id, _)| {
- self.ensure_consumer_offset_exists(kind, consumer_id)
- }) {
- Ok(()) => {}
- Err(error) => {
- emit_partition_diag(
- tracing::Level::WARN,
- &PartitionDiagEvent::new(
- ReplicaLogContext::from_consensus(consensus,
PlaneKind::Partitions),
- "rejecting delete_consumer_offset for missing
offset",
- )
- .with_operation(Operation::DeleteConsumerOffset)
- .with_error(error.to_string()),
- );
- return;
+ // Parse once for both the delete-existence check and AckLevel
dispatch.
+ let consumer_offset = match message.header().operation {
+ Operation::StoreConsumerOffset
+ | Operation::StoreConsumerOffset2
+ | Operation::DeleteConsumerOffset
+ | Operation::DeleteConsumerOffset2 => {
+ match
Self::parse_consumer_offset_request(message.header().operation, &message)
+ {
+ Ok(parsed) => Some(parsed),
+ Err(error) => {
+ emit_partition_diag(
+ tracing::Level::WARN,
+ &PartitionDiagEvent::new(
+ ReplicaLogContext::from_consensus(
+ consensus,
+ PlaneKind::Partitions,
+ ),
+ "failed to parse consumer offset request",
+ )
+ .with_operation(message.header().operation)
+ .with_error(error.to_string()),
+ );
+ return;
+ }
}
}
+ _ => None,
+ };
+
+ if matches!(
+ message.header().operation,
+ Operation::DeleteConsumerOffset |
Operation::DeleteConsumerOffset2
+ ) && let Some((kind, consumer_id, _, _)) = consumer_offset
+ && let Err(error) = self.ensure_consumer_offset_exists(kind,
consumer_id)
+ {
+ emit_partition_diag(
+ tracing::Level::WARN,
+ &PartitionDiagEvent::new(
+ ReplicaLogContext::from_consensus(consensus,
PlaneKind::Partitions),
+ "rejecting delete_consumer_offset for missing offset",
+ )
+ .with_operation(message.header().operation)
+ .with_error(error.to_string()),
+ );
+ return;
}
if request_preflight(consensus, client_id, session, request)
@@ -699,12 +806,39 @@ where
assert!(consensus.is_normal(), "on_request: status must be
normal");
assert!(!consensus.is_syncing(), "on_request: must not be
syncing");
- let prepare = message.project(consensus);
- consensus.verify_pipeline();
- consensus.pipeline_message(PlaneKind::Partitions, &prepare);
- prepare
+ // NoAck v2 -> fast path. Quorum + v1 -> VSR pipeline.
+ if let Some((kind, consumer_id, offset, AckLevel::NoAck)) =
consumer_offset
+ && matches!(
+ message.header().operation,
+ Operation::StoreConsumerOffset2 |
Operation::DeleteConsumerOffset2,
+ )
+ {
+ Disposition::NoAck {
+ request_header: Box::new(*message.header()),
+ kind,
+ consumer_id,
+ offset,
+ }
+ } else {
+ let prepare = message.project(consensus);
+ consensus.verify_pipeline();
+ consensus.pipeline_message(PlaneKind::Partitions, &prepare);
+ Disposition::Replicate(prepare)
+ }
};
- self.on_replicate(prepare).await;
+
+ match disposition {
+ Disposition::Replicate(prepare) =>
self.on_replicate(prepare).await,
+ Disposition::NoAck {
+ request_header,
+ kind,
+ consumer_id,
+ offset,
+ } => {
+ self.apply_consumer_offset_no_ack(request_header, kind,
consumer_id, offset)
+ .await;
+ }
+ }
}
#[allow(clippy::future_not_send, clippy::too_many_lines)]
@@ -972,8 +1106,12 @@ where
);
Ok(())
}
- Operation::StoreConsumerOffset | Operation::DeleteConsumerOffset
=> {
- let (kind, consumer_id, offset) =
+ Operation::StoreConsumerOffset
+ | Operation::DeleteConsumerOffset
+ | Operation::StoreConsumerOffset2
+ | Operation::DeleteConsumerOffset2 => {
+ // Replicated path is Quorum-only by construction; ack ignored.
+ let (kind, consumer_id, offset, _ack) =
Self::parse_staged_consumer_offset_commit(header.operation, &message)?;
let write_lock = self.write_lock.clone();
let _guard = write_lock.lock().await;
@@ -995,7 +1133,7 @@ where
.map_err(|_| IggyError::CannotAppendMessage)?;
match header.operation {
- Operation::StoreConsumerOffset => {
+ Operation::StoreConsumerOffset |
Operation::StoreConsumerOffset2 => {
self.stage_consumer_offset_upsert(
header.op,
kind,
@@ -1003,7 +1141,7 @@ where
offset.expect("store_consumer_offset must include
offset"),
);
}
- Operation::DeleteConsumerOffset => {
+ Operation::DeleteConsumerOffset |
Operation::DeleteConsumerOffset2 => {
self.stage_consumer_offset_delete(header.op, kind,
consumer_id)?;
}
_ => unreachable!(),
@@ -1331,7 +1469,10 @@ where
}
!*failed_commit
}
- Operation::StoreConsumerOffset | Operation::DeleteConsumerOffset
=> {
+ Operation::StoreConsumerOffset
+ | Operation::DeleteConsumerOffset
+ | Operation::StoreConsumerOffset2
+ | Operation::DeleteConsumerOffset2 => {
self.commit_consumer_offset_entry(prepare_header,
failed_commit)
.await
}
@@ -1372,7 +1513,7 @@ where
fn parse_consumer_offset_request(
operation: Operation,
message: &Message<RequestHeader>,
- ) -> Result<(ConsumerKind, u32, Option<u64>), IggyError> {
+ ) -> Result<(ConsumerKind, u32, Option<u64>, AckLevel), IggyError> {
let total_size =
usize::try_from(message.header().size).map_err(|_|
IggyError::InvalidCommand)?;
let body = message
@@ -1385,7 +1526,7 @@ where
fn parse_staged_consumer_offset_commit(
operation: Operation,
message: &Message<PrepareHeader>,
- ) -> Result<(ConsumerKind, u32, Option<u64>), IggyError> {
+ ) -> Result<(ConsumerKind, u32, Option<u64>, AckLevel), IggyError> {
let total_size =
usize::try_from(message.header().size).map_err(|_|
IggyError::InvalidCommand)?;
let body = message
@@ -1398,7 +1539,7 @@ where
fn parse_consumer_offset_payload(
operation: Operation,
body: &[u8],
- ) -> Result<(ConsumerKind, u32, Option<u64>), IggyError> {
+ ) -> Result<(ConsumerKind, u32, Option<u64>, AckLevel), IggyError> {
let consumer_kind = *body.first().ok_or(IggyError::InvalidCommand)?;
let consumer_id = body
.get(1..5)
@@ -1409,8 +1550,10 @@ where
.map_err(|_| IggyError::InvalidCommand)
})?;
let kind = ConsumerKind::from_code(consumer_kind)?;
+        // v1 commands implicitly use Quorum. For v2, the trailing ack byte is
+        // validated; unknown discriminants are rejected so malformed wire bytes fail fast.
match operation {
- Operation::StoreConsumerOffset => {
+ Operation::StoreConsumerOffset | Operation::StoreConsumerOffset2
=> {
let offset =
body.get(5..13)
.ok_or(IggyError::InvalidCommand)
@@ -1419,9 +1562,23 @@ where
.map(u64::from_le_bytes)
.map_err(|_| IggyError::InvalidCommand)
})?;
- Ok((kind, consumer_id, Some(offset)))
+ let ack = if matches!(operation,
Operation::StoreConsumerOffset2) {
+ let ack_byte =
*body.get(13).ok_or(IggyError::InvalidCommand)?;
+ AckLevel::from_code(ack_byte).map_err(|_|
IggyError::InvalidCommand)?
+ } else {
+ AckLevel::Quorum
+ };
+ Ok((kind, consumer_id, Some(offset), ack))
+ }
+ Operation::DeleteConsumerOffset | Operation::DeleteConsumerOffset2
=> {
+ let ack = if matches!(operation,
Operation::DeleteConsumerOffset2) {
+ let ack_byte =
*body.get(5).ok_or(IggyError::InvalidCommand)?;
+ AckLevel::from_code(ack_byte).map_err(|_|
IggyError::InvalidCommand)?
+ } else {
+ AckLevel::Quorum
+ };
+ Ok((kind, consumer_id, None, ack))
}
- Operation::DeleteConsumerOffset => Ok((kind, consumer_id, None)),
_ => Err(IggyError::InvalidCommand),
}
}
diff --git a/core/simulator/src/client.rs b/core/simulator/src/client.rs
index 96a20fa92..0c2c5b588 100644
--- a/core/simulator/src/client.rs
+++ b/core/simulator/src/client.rs
@@ -19,7 +19,7 @@ use bytes::Bytes;
use iggy_binary_protocol::consensus::iobuf::Owned;
use iggy_binary_protocol::requests::streams::{CreateStreamRequest,
DeleteStreamRequest};
use iggy_binary_protocol::{
- Message, Operation, RequestHeader, WireEncode, WireIdentifier, WireName,
+ AckLevel, Message, Operation, RequestHeader, WireEncode, WireIdentifier,
WireName,
};
use iggy_common::send_messages2::{
IggyMessage2, IggyMessage2Header, IggyMessages2, SendMessages2Owned,
@@ -181,6 +181,41 @@ impl SimClient {
self.build_request_with_namespace(Operation::DeleteConsumerOffset,
&payload, namespace)
}
+ /// v2 of `store_consumer_offset` with an `AckLevel` byte. `NoAck` takes
+ /// the primary's fast path (no replication); `Quorum` goes through VSR.
+ pub fn store_consumer_offset_v2(
+ &self,
+ namespace: IggyNamespace,
+ consumer_kind: u8,
+ consumer_id: u32,
+ offset: u64,
+ ack: AckLevel,
+ ) -> Message<RequestHeader> {
+ let mut payload = Vec::with_capacity(14);
+ payload.push(consumer_kind);
+ payload.extend_from_slice(&consumer_id.to_le_bytes());
+ payload.extend_from_slice(&offset.to_le_bytes());
+ payload.push(ack.as_u8());
+
+ self.build_request_with_namespace(Operation::StoreConsumerOffset2,
&payload, namespace)
+ }
+
+ /// v2 of `delete_consumer_offset` carrying an explicit `AckLevel` byte.
+ pub fn delete_consumer_offset_v2(
+ &self,
+ namespace: IggyNamespace,
+ consumer_kind: u8,
+ consumer_id: u32,
+ ack: AckLevel,
+ ) -> Message<RequestHeader> {
+ let mut payload = Vec::with_capacity(6);
+ payload.push(consumer_kind);
+ payload.extend_from_slice(&consumer_id.to_le_bytes());
+ payload.push(ack.as_u8());
+
+ self.build_request_with_namespace(Operation::DeleteConsumerOffset2,
&payload, namespace)
+ }
+
#[allow(clippy::cast_possible_truncation)]
fn build_request_with_namespace(
&self,