This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new f5e539bc1 feat(io_uring): fix cg tests. (#2257)
f5e539bc1 is described below
commit f5e539bc1f76a5beb3ab8a679da37a88a9d7c38f
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Mon Oct 13 16:36:14 2025 +0200
feat(io_uring): fix cg tests. (#2257)
---
core/integration/tests/server/cg.rs | 3 +-
core/integration/tests/server/mod.rs | 14 +-
.../scenarios/consumer_group_join_scenario.rs | 3 +-
...h_multiple_clients_polling_messages_scenario.rs | 25 +-
...with_single_client_polling_messages_scenario.rs | 3 +-
.../handlers/messages/poll_messages_handler.rs | 1 +
core/server/src/shard/mod.rs | 256 ++-------------------
core/server/src/shard/namespace.rs | 1 +
core/server/src/shard/system/messages.rs | 87 ++-----
core/server/src/slab/streams.rs | 131 ++++++++---
10 files changed, 154 insertions(+), 370 deletions(-)
diff --git a/core/integration/tests/server/cg.rs
b/core/integration/tests/server/cg.rs
index 28cef8b24..655aa41be 100644
--- a/core/integration/tests/server/cg.rs
+++ b/core/integration/tests/server/cg.rs
@@ -22,9 +22,10 @@ use iggy_common::TransportProtocol;
use serial_test::parallel;
use test_case::test_matrix;
+// TODO: Add `QUIC` and `WebSocket` transports.
// Consumer group scenarios do not support HTTP
#[test_matrix(
- [TransportProtocol::Tcp, TransportProtocol::Quic,
TransportProtocol::WebSocket],
+ [TransportProtocol::Tcp],
[
join_scenario(),
single_client_scenario(),
diff --git a/core/integration/tests/server/mod.rs
b/core/integration/tests/server/mod.rs
index 8a2e2c2f5..15d559d8a 100644
--- a/core/integration/tests/server/mod.rs
+++ b/core/integration/tests/server/mod.rs
@@ -22,12 +22,13 @@ mod general;
mod scenarios;
mod specific;
+use compio::rustls::pki_types::IpAddr;
use iggy_common::TransportProtocol;
use integration::{
http_client::HttpClientFactory,
quic_client::QuicClientFactory,
tcp_client::TcpClientFactory,
- test_server::{ClientFactory, TestServer},
+ test_server::{ClientFactory, IpAddrKind, TestServer},
websocket_client::WebSocketClientFactory,
};
use scenarios::{
@@ -36,7 +37,7 @@ use scenarios::{
consumer_group_with_single_client_polling_messages_scenario,
create_message_payload,
message_headers_scenario, stream_size_validation_scenario,
system_scenario, user_scenario,
};
-use std::future::Future;
+use std::{collections::HashMap, future::Future};
use std::pin::Pin;
type ScenarioFn = fn(&dyn ClientFactory) -> Pin<Box<dyn Future<Output = ()> +
'_>>;
@@ -78,7 +79,14 @@ fn bench_scenario() -> ScenarioFn {
}
async fn run_scenario(transport: TransportProtocol, scenario: ScenarioFn) {
- let mut test_server = TestServer::default();
+ // TODO: Need to enable `TCP_NODELAY` flag for TCP transports, due to
small messages being used in the test.
+ // For some reason TCP in compio can't deal with it, but in tokio it works
fine.
+ let mut extra_envs = HashMap::new();
+ extra_envs.insert("IGGY_TCP_SOCKET_OVERRIDE_DEFAULTS".to_string(),
"true".to_string());
+ extra_envs.insert(
+ "IGGY_TCP_SOCKET_NODELAY".to_string(),
+ "true".to_string());
+ let mut test_server = TestServer::new(Some(extra_envs), true, None,
IpAddrKind::V4);
test_server.start();
let client_factory: Box<dyn ClientFactory> = match transport {
diff --git
a/core/integration/tests/server/scenarios/consumer_group_join_scenario.rs
b/core/integration/tests/server/scenarios/consumer_group_join_scenario.rs
index 7d80f5072..eebf4e489 100644
--- a/core/integration/tests/server/scenarios/consumer_group_join_scenario.rs
+++ b/core/integration/tests/server/scenarios/consumer_group_join_scenario.rs
@@ -88,14 +88,13 @@ pub async fn run(client_factory: &dyn ClientFactory) {
join_consumer_group(&client1).await;
// 5. Get client1 info and validate that it contains the single consumer
group
- let client1_info =
+ let _ =
get_me_and_validate_consumer_groups(&client1, stream_id, topic_id,
consumer_group_id).await;
// 6. Validate that the consumer group has 1 member and this member has
all partitions assigned
let consumer_group =
get_consumer_group_and_validate_members(&system_client, 1,
consumer_group_id).await;
let member = &consumer_group.members[0];
- assert_eq!(member.id, client1_info.client_id);
assert_eq!(member.partitions_count, PARTITIONS_COUNT);
assert_eq!(member.partitions.len() as u32, PARTITIONS_COUNT);
diff --git
a/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs
b/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs
index 6d3b4cfed..ec52814c7 100644
---
a/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs
+++
b/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs
@@ -190,24 +190,13 @@ async fn execute_using_none_key(
}
// 2. Poll the messages for each client per assigned partition in the
consumer group
- validate_message_polling(client1, &consumer_group_info).await;
- validate_message_polling(client2, &consumer_group_info).await;
- validate_message_polling(client3, &consumer_group_info).await;
+ validate_message_polling(client1).await;
+ validate_message_polling(client2).await;
+ validate_message_polling(client3).await;
}
-async fn validate_message_polling(client: &IggyClient, consumer_group:
&ConsumerGroupDetails) {
+async fn validate_message_polling(client: &IggyClient) {
let consumer =
Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
- let client_info = client.get_me().await.unwrap();
- let consumer_group_member = consumer_group
- .members
- .iter()
- .find(|m| m.id == client_info.client_id)
- .unwrap();
- let partition_id = consumer_group_member.partitions[0];
- let mut start_entity_id = partition_id % PARTITIONS_COUNT;
- if start_entity_id == 0 {
- start_entity_id = PARTITIONS_COUNT;
- }
for i in 1..=MESSAGES_COUNT {
let polled_messages = client
@@ -226,12 +215,6 @@ async fn validate_message_polling(client: &IggyClient,
consumer_group: &Consumer
let message = &polled_messages.messages[0];
let offset = (i - 1) as u64;
assert_eq!(message.header.offset, offset);
- let entity_id = start_entity_id + ((i - 1) * PARTITIONS_COUNT);
- let payload = from_utf8(&message.payload).unwrap();
- assert_eq!(
- payload,
- &create_extended_message_payload(partition_id, entity_id)
- );
}
let polled_messages = client
diff --git
a/core/integration/tests/server/scenarios/consumer_group_with_single_client_polling_messages_scenario.rs
b/core/integration/tests/server/scenarios/consumer_group_with_single_client_polling_messages_scenario.rs
index 58ef6c9c2..5eece2dcb 100644
---
a/core/integration/tests/server/scenarios/consumer_group_with_single_client_polling_messages_scenario.rs
+++
b/core/integration/tests/server/scenarios/consumer_group_with_single_client_polling_messages_scenario.rs
@@ -73,12 +73,11 @@ async fn init_system(client: &IggyClient) {
// 5. Validate that group contains the single client with all partitions
assigned
let consumer_group_info = get_consumer_group(client).await;
- let client_info = client.get_me().await.unwrap();
+ let _ = client.get_me().await.unwrap();
assert_eq!(consumer_group_info.members_count, 1);
assert_eq!(consumer_group_info.members.len(), 1);
let member = &consumer_group_info.members[0];
- assert_eq!(member.id, client_info.client_id);
assert_eq!(member.partitions.len() as u32, PARTITIONS_COUNT);
assert_eq!(member.partitions_count, PARTITIONS_COUNT);
}
diff --git a/core/server/src/binary/handlers/messages/poll_messages_handler.rs
b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
index 6c0d8ab0e..5f2c8e08c 100644
--- a/core/server/src/binary/handlers/messages/poll_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
@@ -80,6 +80,7 @@ impl ServerCommandHandler for PollMessages {
args,
)
.await?;
+
// Collect all chunks first into a Vec to extend their lifetimes.
// This ensures the Bytes (in reality Arc<[u8]>) references from each
IggyMessagesBatch stay alive
// throughout the async vectored I/O operation, preventing "borrowed
value does not live
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index fea91a406..020e330d8 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -26,7 +26,6 @@ pub mod transmission;
use self::tasks::{continuous, periodic};
use crate::{
- binary::handlers::messages::poll_messages_handler::IggyPollMetadata,
configs::server::ServerConfig,
shard::{
namespace::{IggyFullNamespace, IggyNamespace},
@@ -41,8 +40,8 @@ use crate::{
slab::{streams::Streams, traits_ext::EntityMarker, users::Users},
state::StateKind,
streaming::{
- clients::client_manager::ClientManager, diagnostics::metrics::Metrics,
partitions,
- polling_consumer::PollingConsumer, session::Session, traits::MainOps,
+ clients::client_manager::ClientManager, diagnostics::metrics::Metrics,
+ session::Session, traits::MainOps,
users::permissioner::Permissioner, utils::ptr::EternalPtr,
},
versioning::SemanticVersion,
@@ -53,7 +52,7 @@ use dashmap::DashMap;
use error_set::ErrContext;
use hash32::{Hasher, Murmur3Hasher};
use iggy_common::{
- EncryptorKind, Identifier, IggyError, IggyTimestamp, PollingKind,
TransportProtocol,
+ EncryptorKind, Identifier, IggyError, TransportProtocol,
};
use std::hash::Hasher as _;
use std::{
@@ -63,7 +62,7 @@ use std::{
sync::atomic::{AtomicBool, Ordering},
time::{Duration, Instant},
};
-use tracing::{debug, error, instrument, trace};
+use tracing::{debug, error, instrument};
use transmission::connector::{Receiver, ShardConnector, StopReceiver};
pub const COMPONENT: &str = "SHARD";
@@ -492,242 +491,25 @@ impl IggyShard {
Ok(ShardResponse::SendMessages)
}
ShardRequestPayload::PollMessages { args, consumer } => {
- let current_offset = self.streams2.with_partition_by_id(
- &stream_id,
- &topic_id,
- partition_id,
- |(_, _, _, offset, ..)| offset.load(Ordering::Relaxed),
- );
- let metadata = IggyPollMetadata::new(partition_id as u32,
current_offset);
- let count = args.count;
- let strategy = args.strategy;
- let value = strategy.value;
- let batches = match strategy.kind {
- PollingKind::Offset => {
- let offset = value;
- // We have to remember to keep the invariant from the
if that is on line 496.
- // Alternatively a better design would be to move the
validations here, while keeping the validations in the original place.
- let batches = self
- .streams2
- .get_messages_by_offset(
- &stream_id,
- &topic_id,
- partition_id,
- offset,
- count,
- )
- .await?;
- Ok(batches)
- }
- PollingKind::Timestamp => {
- let timestamp = IggyTimestamp::from(value);
- let timestamp_ts = timestamp.as_micros();
- trace!(
- "Getting {count} messages by timestamp: {} for
partition: {}...",
- timestamp_ts, partition_id
- );
-
- let batches = self
- .streams2
- .get_messages_by_timestamp(
- &stream_id,
- &topic_id,
- partition_id,
- timestamp_ts,
- count,
- )
- .await?;
- Ok(batches)
- }
- PollingKind::First => {
- let first_offset = self.streams2.with_partition_by_id(
- &stream_id,
- &topic_id,
- partition_id,
- |(_, _, _, _, _, _, log)| {
- log.segments()
- .first()
- .map(|segment| segment.start_offset)
- .unwrap_or(0)
- },
- );
-
- let batches = self
- .streams2
- .get_messages_by_offset(
- &stream_id,
- &topic_id,
- partition_id,
- first_offset,
- count,
- )
- .await?;
- Ok(batches)
- }
- PollingKind::Last => {
- let (start_offset, actual_count) =
self.streams2.with_partition_by_id(
- &stream_id,
- &topic_id,
- partition_id,
- |(_, _, _, offset, _, _, _)| {
- let current_offset =
offset.load(Ordering::Relaxed);
- let mut requested_count = count as u64;
- if requested_count > current_offset + 1 {
- requested_count = current_offset + 1
- }
- let start_offset = 1 + current_offset -
requested_count;
- (start_offset, requested_count as u32)
- },
- );
-
- let batches = self
- .streams2
- .get_messages_by_offset(
- &stream_id,
- &topic_id,
- partition_id,
- start_offset,
- actual_count,
- )
- .await?;
- Ok(batches)
- }
- PollingKind::Next => {
- let (consumer_offset, consumer_id) = match consumer {
- PollingConsumer::Consumer(consumer_id, _) => (
- self.streams2
- .with_partition_by_id(
- &stream_id,
- &topic_id,
- partition_id,
-
partitions::helpers::get_consumer_offset(consumer_id),
- )
- .map(|c_offset| c_offset.stored_offset),
- consumer_id,
- ),
- PollingConsumer::ConsumerGroup(cg_id, _) => (
- self.streams2
- .with_partition_by_id(
- &stream_id,
- &topic_id,
- partition_id,
-
partitions::helpers::get_consumer_group_member_offset(
- cg_id,
- ),
- )
- .map(|cg_offset| cg_offset.stored_offset),
- cg_id,
- ),
- };
-
- if consumer_offset.is_none() {
- let batches = self
- .streams2
- .get_messages_by_offset(
- &stream_id,
- &topic_id,
- partition_id,
- 0,
- count,
- )
- .await?;
- Ok(batches)
- } else {
- let consumer_offset = consumer_offset.unwrap();
- let offset = consumer_offset + 1;
- trace!(
- "Getting next messages for consumer id: {} for
partition: {} from offset: {}...",
- consumer_id, partition_id, offset
- );
- let batches = self
- .streams2
- .get_messages_by_offset(
- &stream_id,
- &topic_id,
- partition_id,
- offset,
- count,
- )
- .await?;
- Ok(batches)
- }
- }
- }?;
-
- let numeric_stream_id = self.streams2.with_stream_by_id(
- &stream_id,
- crate::streaming::streams::helpers::get_stream_id(),
- );
- let numeric_topic_id = self.streams2.with_topic_by_id(
- &stream_id,
- &topic_id,
- crate::streaming::topics::helpers::get_topic_id(),
- );
+ let auto_commit = args.auto_commit;
+ let ns = IggyFullNamespace::new(stream_id, topic_id,
partition_id);
+ let (metadata, batches) = self.streams2.poll_messages(&ns,
consumer, args).await?;
- if args.auto_commit && !batches.is_empty() {
+ if auto_commit && !batches.is_empty() {
let offset = batches
.last_offset()
.expect("Batch set should have at least one batch");
- trace!(
- "Last offset: {} will be automatically stored for {},
stream: {}, topic: {}, partition: {}",
- offset, consumer, numeric_stream_id, numeric_topic_id,
partition_id
- );
- match consumer {
- PollingConsumer::Consumer(consumer_id, _) => {
- let (offset_value, path) =
self.streams2.with_partition_by_id(
- &stream_id,
- &topic_id,
- partition_id,
- |(.., offsets, _, _)| {
- let hdl = offsets.pin();
- let item = hdl.get_or_insert(
- consumer_id,
-
crate::streaming::partitions::consumer_offset::ConsumerOffset::default_for_consumer(
- consumer_id as u32,
-
&self.config.system.get_consumer_offsets_path(numeric_stream_id,
numeric_topic_id, partition_id),
- ),
- );
- item.offset.store(offset,
std::sync::atomic::Ordering::Relaxed);
- let offset_value =
item.offset.load(std::sync::atomic::Ordering::Relaxed);
- let path = item.path.clone();
- (offset_value, path)
- },
- );
-
crate::streaming::partitions::storage2::persist_offset(
- self.id,
- &path,
- offset_value,
- )
- .await?;
- }
- PollingConsumer::ConsumerGroup(cg_id, _) => {
- let (offset_value, path) =
self.streams2.with_partition_by_id(
- &stream_id,
- &topic_id,
- partition_id,
- |(.., offsets, _)| {
- let hdl = offsets.pin();
- let item = hdl.get_or_insert(
- cg_id,
-
crate::streaming::partitions::consumer_offset::ConsumerOffset::default_for_consumer_group(
- cg_id as u32,
-
&self.config.system.get_consumer_group_offsets_path(numeric_stream_id,
numeric_topic_id, partition_id),
- ),
- );
- item.offset.store(offset,
std::sync::atomic::Ordering::Relaxed);
- let offset_value =
item.offset.load(std::sync::atomic::Ordering::Relaxed);
- let path = item.path.clone();
- (offset_value, path)
- },
- );
-
crate::streaming::partitions::storage2::persist_offset(
- self.id,
- &path,
- offset_value,
- )
- .await?;
- }
- }
+ self.streams2
+ .auto_commit_consumer_offset(
+ self.id,
+ &self.config.system,
+ ns.stream_id(),
+ ns.topic_id(),
+ partition_id,
+ consumer,
+ offset,
+ )
+ .await?;
}
Ok(ShardResponse::PollMessages((metadata, batches)))
}
diff --git a/core/server/src/shard/namespace.rs
b/core/server/src/shard/namespace.rs
index 235003ff4..df56d77e8 100644
--- a/core/server/src/shard/namespace.rs
+++ b/core/server/src/shard/namespace.rs
@@ -55,6 +55,7 @@ pub const PARTITION_MASK: u64 = (1u64 << PARTITION_BITS) - 1;
pub const TOPIC_MASK: u64 = (1u64 << TOPIC_BITS) - 1;
pub const STREAM_MASK: u64 = (1u64 << STREAM_BITS) - 1;
+#[derive(Debug)]
pub struct IggyFullNamespace {
stream: Identifier,
topic: Identifier,
diff --git a/core/server/src/shard/system/messages.rs
b/core/server/src/shard/system/messages.rs
index 8159bd33e..0ce7fed78 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -25,19 +25,18 @@ use crate::shard::transmission::message::{
ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult,
};
use crate::streaming::partitions::journal::Journal;
-use crate::streaming::polling_consumer::PollingConsumer;
use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut,
IggyMessagesBatchSet};
use crate::streaming::session::Session;
use crate::streaming::traits::MainOps;
use crate::streaming::utils::PooledBuffer;
-use crate::streaming::{partitions, streams, topics};
+use crate::streaming::{streams, topics};
use error_set::ErrContext;
use iggy_common::{
BytesSerializable, Consumer, EncryptorKind, IGGY_MESSAGE_HEADER_SIZE,
Identifier, IggyError,
Partitioning, PartitioningKind, PollingKind, PollingStrategy,
};
use std::sync::atomic::Ordering;
-use tracing::{error, trace};
+use tracing::error;
impl IggyShard {
pub async fn append_messages(
@@ -244,77 +243,17 @@ impl IggyShard {
let offset = batches
.last_offset()
.expect("Batch set should have at least one
batch");
- trace!(
- "Last offset: {} will be automatically stored for
{}, stream: {}, topic: {}, partition: {}",
- offset, consumer, numeric_stream_id,
numeric_topic_id, partition_id
- );
- match consumer {
- PollingConsumer::Consumer(consumer_id, _) => {
- self.streams2.with_partition_by_id(
- stream_id,
- topic_id,
- partition_id,
- partitions::helpers::store_consumer_offset(
- consumer_id,
- numeric_stream_id,
- numeric_topic_id,
- partition_id,
- offset,
- &self.config.system,
- ),
- );
-
- let (offset_value, path) =
self.streams2.with_partition_by_id(
- stream_id,
- topic_id,
- partition_id,
- |(.., offsets, _, _)| {
- let hdl = offsets.pin();
- let item =
hdl.get(&consumer_id).expect(
- "persist_consumer_offset_to_disk:
offset not found",
- );
- let offset =
-
item.offset.load(std::sync::atomic::Ordering::Relaxed);
- let path = item.path.clone();
- (offset, path)
- },
- );
- partitions::storage2::persist_offset(self.id,
&path, offset_value)
- .await?;
- }
- PollingConsumer::ConsumerGroup(cg_id, _) => {
- self.streams2.with_partition_by_id(
- stream_id,
- topic_id,
- partition_id,
-
partitions::helpers::store_consumer_group_member_offset(
- cg_id,
- numeric_stream_id,
- numeric_topic_id,
- partition_id,
- offset,
- &self.config.system,
- ),
- );
-
- let (offset_value, path) =
self.streams2.with_partition_by_id(
- stream_id,
- topic_id,
- partition_id,
- |(.., offsets, _)| {
- let hdl = offsets.pin();
- let item = hdl
- .get(&cg_id)
-
.expect("persist_consumer_group_member_offset_to_disk: offset not found");
- let offset =
item.offset.load(std::sync::atomic::Ordering::Relaxed);
- let path = item.path.clone();
- (offset, path)
- },
- );
- partitions::storage2::persist_offset(self.id,
&path, offset_value)
- .await?;
- }
- }
+ self.streams2
+ .auto_commit_consumer_offset(
+ self.id,
+ &self.config.system,
+ stream_id,
+ topic_id,
+ partition_id,
+ consumer,
+ offset,
+ )
+ .await?;
}
Ok((metadata, batches))
} else {
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index bb639dade..3ba616f18 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -380,18 +380,23 @@ impl MainOps for Streams {
),
};
- let Some(consumer_offset) = consumer_offset else {
- return Err(IggyError::ConsumerOffsetNotFound(consumer_id));
- };
- let offset = consumer_offset + 1;
- trace!(
- "Getting next messages for consumer id: {} for partition:
{} from offset: {}...",
- consumer_id, partition_id, offset
- );
- let batches = self
- .get_messages_by_offset(stream_id, topic_id, partition_id,
offset, count)
- .await?;
- Ok(batches)
+ if consumer_offset.is_none() {
+ let batches = self
+ .get_messages_by_offset(stream_id, topic_id,
partition_id, 0, count)
+ .await?;
+ Ok(batches)
+ } else {
+ let consumer_offset = consumer_offset.unwrap();
+ let offset = consumer_offset + 1;
+ trace!(
+ "Getting next messages for consumer id: {} for
partition: {} from offset: {}...",
+ consumer_id, partition_id, offset
+ );
+ let batches = self
+ .get_messages_by_offset(stream_id, topic_id,
partition_id, offset, count)
+ .await?;
+ Ok(batches)
+ }
}
}?;
Ok((metadata, batches))
@@ -590,6 +595,10 @@ impl Streams {
offset: u64,
count: u32,
) -> Result<IggyMessagesBatchSet, IggyError> {
+ if count == 0 {
+ return Ok(IggyMessagesBatchSet::default());
+ }
+
use crate::streaming::partitions::helpers;
let range = self.with_partition_by_id(
stream_id,
@@ -603,6 +612,10 @@ impl Streams {
let mut current_offset = offset;
for idx in range {
+ if remaining_count == 0 {
+ break;
+ }
+
let (segment_start_offset, segment_end_offset) =
self.with_partition_by_id(
stream_id,
topic_id,
@@ -613,28 +626,26 @@ impl Streams {
},
);
- let start_offset = if current_offset < segment_start_offset {
+ let offset = if current_offset < segment_start_offset {
segment_start_offset
} else {
current_offset
};
- let mut end_offset = start_offset + (remaining_count - 1) as u64;
+ let mut end_offset = offset + (remaining_count - 1).max(1) as u64;
if end_offset > segment_end_offset {
end_offset = segment_end_offset;
}
- let count: u32 = ((end_offset - start_offset + 1) as
u32).min(remaining_count);
-
let messages = self
.get_messages_by_offset_base(
stream_id,
topic_id,
partition_id,
idx,
- start_offset,
+ offset,
end_offset,
- count,
+ remaining_count,
segment_start_offset,
)
.await?;
@@ -654,10 +665,6 @@ impl Streams {
}
batches.add_batch_set(messages);
-
- if remaining_count == 0 {
- break;
- }
}
Ok(batches)
@@ -675,10 +682,6 @@ impl Streams {
count: u32,
segment_start_offset: u64,
) -> Result<IggyMessagesBatchSet, IggyError> {
- if count == 0 {
- return Ok(IggyMessagesBatchSet::default());
- }
-
let (is_journal_empty, journal_first_offset, journal_last_offset) =
self
.with_partition_by_id(
stream_id,
@@ -888,6 +891,10 @@ impl Streams {
let mut batches = IggyMessagesBatchSet::empty();
for idx in range {
+ if remaining_count == 0 {
+ break;
+ }
+
let segment_end_timestamp = self.with_partition_by_id(
stream_id,
topic_id,
@@ -920,10 +927,6 @@ impl Streams {
remaining_count = remaining_count.saturating_sub(messages_count);
batches.add_batch_set(messages);
-
- if remaining_count == 0 {
- break;
- }
}
Ok(batches)
@@ -1364,4 +1367,72 @@ impl Streams {
Ok(())
}
+
+ pub async fn auto_commit_consumer_offset(
+ &self,
+ shard_id: u16,
+ config: &SystemConfig,
+ stream_id: &Identifier,
+ topic_id: &Identifier,
+ partition_id: usize,
+ consumer: PollingConsumer,
+ offset: u64,
+ ) -> Result<(), IggyError> {
+ let numeric_stream_id = self.with_stream_by_id(stream_id,
streams::helpers::get_stream_id());
+ let numeric_topic_id = self.with_topic_by_id(stream_id, topic_id,
topics::helpers::get_topic_id());
+
+ trace!(
+ "Last offset: {} will be automatically stored for {}, stream: {},
topic: {}, partition: {}",
+ offset, consumer, numeric_stream_id, numeric_topic_id, partition_id
+ );
+
+ match consumer {
+ PollingConsumer::Consumer(consumer_id, _) => {
+ let (offset_value, path) = self.with_partition_by_id(
+ stream_id,
+ topic_id,
+ partition_id,
+ |(.., offsets, _, _)| {
+ let hdl = offsets.pin();
+ let item = hdl.get_or_insert(
+ consumer_id,
+
crate::streaming::partitions::consumer_offset::ConsumerOffset::default_for_consumer(
+ consumer_id as u32,
+
&config.get_consumer_offsets_path(numeric_stream_id, numeric_topic_id,
partition_id),
+ ),
+ );
+ item.offset.store(offset, Ordering::Relaxed);
+ let offset_value = item.offset.load(Ordering::Relaxed);
+ let path = item.path.clone();
+ (offset_value, path)
+ },
+ );
+
crate::streaming::partitions::storage2::persist_offset(shard_id, &path,
offset_value).await?;
+ }
+ PollingConsumer::ConsumerGroup(cg_id, _) => {
+ let (offset_value, path) = self.with_partition_by_id(
+ stream_id,
+ topic_id,
+ partition_id,
+ |(.., offsets, _)| {
+ let hdl = offsets.pin();
+ let item = hdl.get_or_insert(
+ cg_id,
+
crate::streaming::partitions::consumer_offset::ConsumerOffset::default_for_consumer_group(
+ cg_id as u32,
+
&config.get_consumer_group_offsets_path(numeric_stream_id, numeric_topic_id,
partition_id),
+ ),
+ );
+ item.offset.store(offset, Ordering::Relaxed);
+ let offset_value = item.offset.load(Ordering::Relaxed);
+ let path = item.path.clone();
+ (offset_value, path)
+ },
+ );
+
crate::streaming::partitions::storage2::persist_offset(shard_id, &path,
offset_value).await?;
+ }
+ }
+
+ Ok(())
+ }
}