This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch rebase_master in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 63c0b2f4cdf78b2ea7470a7bece901f66683008e Author: Grzegorz Koszyk <[email protected]> AuthorDate: Mon Jun 30 20:26:35 2025 +0200 refactor(io_uring): refactor the send/poll method flow (#1944) --- .../handlers/messages/poll_messages_handler.rs | 95 ++------- .../handlers/messages/send_messages_handler.rs | 89 ++------ core/server/src/shard/mod.rs | 29 +-- core/server/src/shard/system/messages.rs | 236 +++++++++++++-------- core/server/src/shard/transmission/message.rs | 14 +- core/server/src/streaming/streams/messages.rs | 90 +++++++- 6 files changed, 291 insertions(+), 262 deletions(-) diff --git a/core/server/src/binary/handlers/messages/poll_messages_handler.rs b/core/server/src/binary/handlers/messages/poll_messages_handler.rs index 3abfb6a1..dfc4a89f 100644 --- a/core/server/src/binary/handlers/messages/poll_messages_handler.rs +++ b/core/server/src/binary/handlers/messages/poll_messages_handler.rs @@ -20,11 +20,11 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa use crate::binary::handlers::messages::COMPONENT; use crate::binary::handlers::utils::receive_and_validate; use crate::binary::sender::SenderKind; +use crate::shard::IggyShard; use crate::shard::namespace::IggyNamespace; use crate::shard::system::messages::PollingArgs; use crate::shard::transmission::frame::ShardResponse; use crate::shard::transmission::message::{ShardMessage, ShardRequest, ShardRequestPayload}; -use crate::shard::{IggyShard, ShardRequestResult}; use crate::streaming::segments::IggyMessagesBatchSet; use crate::streaming::session::Session; use crate::to_iovec; @@ -63,93 +63,30 @@ impl ServerCommandHandler for PollMessages { shard: &Rc<IggyShard>, ) -> Result<(), IggyError> { debug!("session: {session}, command: {self}"); - - let stream = shard - .get_stream(&self.stream_id) - .with_error_context(|error| { - format!( - "{COMPONENT} (error: {error}) - stream not found for stream ID: {}", - self.stream_id - ) - })?; - let topic = stream.get_topic( - &self.topic_id, - ).with_error_context(|error| format!( - "{COMPONENT} (error: {error}) - topic not found for stream ID: {}, topic_id: {}", - self.stream_id, self.topic_id - ))?; - let PollMessages { consumer, partition_id, strategy, count, auto_commit, - .. + stream_id, + topic_id, } = self; let args = PollingArgs::new(strategy, count, auto_commit); - shard.permissioner - .borrow() - .poll_messages(session.get_user_id(), topic.stream_id, topic.topic_id) - .with_error_context(|error| format!( - "{COMPONENT} (error: {error}) - permission denied to poll messages for user {} on stream ID: {}, topic ID: {}", - session.get_user_id(), - topic.stream_id, - topic.topic_id - ))?; - - if !topic.has_partitions() { - return Err(IggyError::NoPartitions(topic.topic_id, topic.stream_id)); - } - - // There might be no partition assigned, if it's the consumer group member without any partitions. - let Some((consumer, partition_id)) = topic - .resolve_consumer_with_partition_id(&consumer, session.client_id, partition_id, true) - .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to resolve consumer with partition id, consumer: {consumer}, client ID: {}, partition ID: {:?}", session.client_id, partition_id))? else { - todo!("Send early response"); - //return Ok((IggyPollMetadata::new(0, 0), IggyMessagesBatchSet::empty())); - }; - - let namespace = IggyNamespace::new(stream.stream_id, topic.topic_id, partition_id); - let payload = ShardRequestPayload::PollMessages { - consumer: consumer.clone(), - args, - count, - }; - let request = ShardRequest::new(stream.stream_id, topic.topic_id, partition_id, payload); - let message = ShardMessage::Request(request); - let (metadata, batch) = match shard.send_request_to_shard(&namespace, message).await { - ShardRequestResult::SameShard(message) => match message { - ShardMessage::Request(request) => match request.payload { - ShardRequestPayload::PollMessages { - consumer, - args, - count, - } => { - topic - .get_messages(consumer, partition_id, args.strategy, count) - .await? - } - _ => unreachable!( - "Expected a SendMessages request inside of SendMessages handler, impossible state" - ), - }, - _ => unreachable!( - "Expected a request message inside of an command handler, impossible state" - ), - }, - ShardRequestResult::Result(result) => match result? { - ShardResponse::PollMessages(response) => response, - ShardResponse::ErrorResponse(err) => { - return Err(err); - } - _ => unreachable!( - "Expected a PollMessages response inside of PollMessages handler, impossible state" - ), - }, - }; - + let user_id = session.get_user_id(); + let client_id = session.client_id; + let (metadata, batch) = shard + .poll_messages( + client_id, + user_id, + &stream_id, + &topic_id, + consumer, + partition_id, + args, + ) + .await?; // Collect all chunks first into a Vec to extend their lifetimes. // This ensures the Bytes (in reality Arc<[u8]>) references from each IggyMessagesBatch stay alive // throughout the async vectored I/O operation, preventing "borrowed value does not live diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs b/core/server/src/binary/handlers/messages/send_messages_handler.rs index f4497bf6..47df8270 100644 --- a/core/server/src/binary/handlers/messages/send_messages_handler.rs +++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs @@ -19,22 +19,20 @@ use super::COMPONENT; use crate::binary::command::{BinaryServerCommand, ServerCommandHandler}; use crate::binary::sender::SenderKind; +use crate::shard::IggyShard; use crate::shard::namespace::IggyNamespace; use crate::shard::transmission::frame::ShardResponse; use crate::shard::transmission::message::{ShardMessage, ShardRequest, ShardRequestPayload}; -use crate::shard::{IggyShard, ShardRequestResult}; use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut}; use crate::streaming::session::Session; use crate::streaming::utils::PooledBuffer; use anyhow::Result; use bytes::BytesMut; -use error_set::ErrContext; use iggy_common::Identifier; use iggy_common::Sizeable; use iggy_common::{INDEX_SIZE, IdKind}; use iggy_common::{IggyError, Partitioning, SendMessages, Validatable}; use std::rc::Rc; -use std::sync::Arc; use tracing::instrument; impl ServerCommandHandler for SendMessages { @@ -112,80 +110,17 @@ impl ServerCommandHandler for SendMessages { ); batch.validate()?; - let stream = shard - .get_stream(&self.stream_id) - .with_error_context(|error| { - format!( - "Failed to get stream with ID: {} (error: {})", - self.stream_id, error - ) - })?; - let topic = stream - .get_topic(&self.topic_id) - .with_error_context(|error| { - format!( - "Failed to get topic with ID: {} (error: {})", - self.topic_id, error - ) - })?; - let stream_id = stream.stream_id; - let topic_id = topic.topic_id; - let partition_id = topic.calculate_partition_id(&self.partitioning)?; - - // Validate permissions for given user on stream and topic. - shard.permissioner.borrow().append_messages( - session.get_user_id(), - stream_id, - topic_id - ).with_error_context(|error| format!( - "{COMPONENT} (error: {error}) - permission denied to append messages for user {} on stream ID: {}, topic ID: {}", - session.get_user_id(), - stream_id, - topic_id - ))?; - let messages_count = batch.count(); - shard.metrics.increment_messages(messages_count as u64); - - // Encrypt messages if encryptor is enabled in configuration. - let batch = shard.maybe_encrypt_messages(batch)?; - - let namespace = IggyNamespace::new(stream.stream_id, topic.topic_id, partition_id); - let payload = ShardRequestPayload::SendMessages { batch }; - let request = ShardRequest::new(stream.stream_id, topic.topic_id, partition_id, payload); - let message = ShardMessage::Request(request); - // Egh... I don't like those nested match statements, - // Technically there is only two `request` types that will ever be dispatched - // to different shards, thus we could get away with generic structs - // maybe todo ? - // how to make this code reusable, in a sense that we will have exactly the same code inside of - // `PollMessages` handler, but with different request.... - match shard.send_request_to_shard(&namespace, message).await { - ShardRequestResult::SameShard(message) => match message { - ShardMessage::Request(request) => { - let partition_id = request.partition_id; - match request.payload { - ShardRequestPayload::SendMessages { batch } => { - topic.append_messages(partition_id, batch).await? - } - _ => unreachable!( - "Expected a SendMessages request inside of SendMessages handler, impossible state" - ), - } - } - _ => unreachable!( - "Expected a request message inside of an command handler, impossible state" - ), - }, - ShardRequestResult::Result(result) => match result? { - ShardResponse::SendMessages => (), - ShardResponse::ErrorResponse(err) => { - return Err(err); - } - _ => unreachable!( - "Expected a SendMessages response inside of SendMessages handler, impossible state" - ), - }, - }; + let user_id = session.get_user_id(); + shard + .append_messages( + user_id, + &self.stream_id, + &self.topic_id, + &self.partitioning, + batch, + ) + .await?; + sender.send_empty_ok_response().await?; Ok(()) } diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index 48bf183a..6c3718f8 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -54,7 +54,7 @@ use crate::{ transmission::{ event::ShardEvent, frame::{ShardFrame, ShardResponse}, - message::{ShardMessage, ShardRequest, ShardRequestPayload}, + message::{ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult}, }, }, state::{ @@ -120,11 +120,6 @@ impl ShardInfo { } } -pub enum ShardRequestResult<T, E> { - SameShard(ShardMessage), - Result(Result<T, E>), -} - pub struct IggyShard { pub id: u16, shards: Vec<Shard>, @@ -498,13 +493,9 @@ impl IggyShard { topic.append_messages(partition_id, batch).await?; Ok(ShardResponse::SendMessages) } - ShardRequestPayload::PollMessages { - args, - consumer, - count, - } => { + ShardRequestPayload::PollMessages { args, consumer } => { let (metadata, batch) = topic - .get_messages(consumer, partition_id, args.strategy, count) + .get_messages(consumer, partition_id, args.strategy, args.count) .await?; Ok(ShardResponse::PollMessages((metadata, batch))) } @@ -551,14 +542,14 @@ impl IggyShard { } } - pub async fn send_request_to_shard( + pub async fn send_request_to_shard_or_recoil( &self, namespace: &IggyNamespace, message: ShardMessage, - ) -> ShardRequestResult<ShardResponse, IggyError> { + ) -> Result<ShardSendRequestResult, IggyError> { if let Some(shard) = self.find_shard(namespace) { if shard.id == self.id { - return ShardRequestResult::SameShard(message); + return Ok(ShardSendRequestResult::Recoil(message)); } let response = match shard.send_request(message).await { @@ -568,16 +559,16 @@ impl IggyShard { "{COMPONENT} - failed to send request to shard with ID: {}, error: {err}", shard.id ); - return ShardRequestResult::Result(Err(err)); + return Err(err); } }; - ShardRequestResult::Result(Ok(response)) + Ok(ShardSendRequestResult::Response(response)) } else { - ShardRequestResult::Result(Err(IggyError::ShardNotFound( + Err(IggyError::ShardNotFound( namespace.stream_id, namespace.topic_id, namespace.partition_id, - ))) + )) } } diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs index 36ae234b..b27b034c 100644 --- a/core/server/src/shard/system/messages.rs +++ b/core/server/src/shard/system/messages.rs @@ -19,83 +19,192 @@ use super::COMPONENT; use crate::binary::handlers::messages::poll_messages_handler::IggyPollMetadata; use crate::shard::IggyShard; +use crate::shard::namespace::IggyNamespace; +use crate::shard::transmission::frame::ShardResponse; +use crate::shard::transmission::message::{ + ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult, +}; +use crate::streaming::partitions::partition; use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet}; use crate::streaming::session::Session; use crate::streaming::utils::PooledBuffer; use async_zip::tokio::read::stream; use error_set::ErrContext; +use futures::stream_select; use iggy_common::{ BytesSerializable, Consumer, EncryptorKind, IGGY_MESSAGE_HEADER_SIZE, Identifier, IggyError, - Partitioning, PollingStrategy, + Partitioning, PollingStrategy, UserId, }; use tracing::{error, trace}; impl IggyShard { + pub async fn append_messages( + &self, + user_id: u32, + stream_id: &Identifier, + topic_id: &Identifier, + partitioning: &Partitioning, + batch: IggyMessagesBatchMut, + ) -> Result<(), IggyError> { + let stream = self.get_stream(stream_id).with_error_context(|error| { + format!( + "Failed to get stream with ID: {} (error: {})", + stream_id, error + ) + })?; + let stream_id = stream.stream_id; + let numeric_topic_id = stream.get_topic(topic_id).map(|topic| topic.topic_id).with_error_context(|error| { + format!( + "Failed to get topic with ID: {} (error: {})", + topic_id, error + ) + })?; + // TODO: We should look into refactoring those permissioners, so they can accept `Identifier` instead of numeric IDs. + // Validate permissions for given user on stream and topic. + self.permissioner.borrow().append_messages( + user_id, + stream.stream_id, + numeric_topic_id + ).with_error_context(|error| format!( + "{COMPONENT} (error: {error}) - permission denied to append messages for user {} on stream ID: {}, topic ID: {}", + user_id, + stream.stream_id, + numeric_topic_id + ))?; + // Encrypt messages if encryptor is enabled in configuration. + let batch = self.maybe_encrypt_messages(batch)?; + let messages_count = batch.count(); + stream + .append_messages(topic_id, partitioning, async |topic, partition_id| { + let namespace = IggyNamespace::new(stream_id, topic.topic_id, partition_id); + let payload = ShardRequestPayload::SendMessages { batch }; + let request = ShardRequest::new(stream_id, numeric_topic_id, partition_id, payload); + let message = ShardMessage::Request(request); + + match self + .send_request_to_shard_or_recoil(&namespace, message) + .await? + { + ShardSendRequestResult::Recoil(message) => { + if let ShardMessage::Request( ShardRequest { partition_id, payload, .. } ) = message + && let ShardRequestPayload::SendMessages { batch } = payload + { + topic.append_messages(partition_id, batch).await.with_error_context(|error| { + format!("{COMPONENT}: Failed to append messages to stream_id: {stream_id}, topic_id: {topic_id}, partition_id: {partition_id}, error: {error})") + }) + } else { + unreachable!( + "Expected a SendMessages request inside of SendMessages handler, impossible state" + ); + } + } + ShardSendRequestResult::Response(response) => { + match response { + ShardResponse::SendMessages => { Ok(()) } + ShardResponse::ErrorResponse(err) => { + Err(err) + } + _ => unreachable!( + "Expected a SendMessages response inside of SendMessages handler, impossible state" + ), + } + + } + } + }) + .await?; + + self.metrics.increment_messages(messages_count as u64); + Ok(()) + } + pub async fn poll_messages( &self, - session: &Session, - consumer: &Consumer, + client_id: u32, + user_id: u32, stream_id: &Identifier, topic_id: &Identifier, - partition_id: Option<u32>, + consumer: Consumer, + maybe_partition_id: Option<u32>, args: PollingArgs, ) -> Result<(IggyPollMetadata, IggyMessagesBatchSet), IggyError> { - self.ensure_authenticated(session)?; - if args.count == 0 { - return Err(IggyError::InvalidMessagesCount); - } - let stream = self.get_stream(stream_id).with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - stream not found for stream ID: {stream_id}") + format!( + "{COMPONENT} (error: {error}) - stream not found for stream ID: {}", + stream_id + ) })?; let stream_id = stream.stream_id; - let topic = self.find_topic(session, &stream, topic_id).with_error_context(|error| format!("{COMPONENT} (error: {error}) - topic not found for stream ID: {stream_id}, topic_id: {topic_id}"))?; + let numeric_topic_id = stream.get_topic(topic_id).map(|topic| topic.topic_id).with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - topic not found for stream ID: {}, topic_id: {}", + stream_id, topic_id + ) + })?; + self.permissioner .borrow() - .poll_messages(session.get_user_id(), topic.stream_id, topic.topic_id) + .poll_messages(user_id, stream_id, numeric_topic_id) .with_error_context(|error| format!( "{COMPONENT} (error: {error}) - permission denied to poll messages for user {} on stream ID: {}, topic ID: {}", - session.get_user_id(), - topic.stream_id, - topic.topic_id + user_id, + stream_id, + numeric_topic_id ))?; - if !topic.has_partitions() { - return Err(IggyError::NoPartitions(topic.topic_id, topic.stream_id)); - } - // There might be no partition assigned, if it's the consumer group member without any partitions. - let Some((polling_consumer, partition_id)) = topic - .resolve_consumer_with_partition_id(consumer, session.client_id, partition_id, true) - .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to resolve consumer with partition id, consumer: {consumer}, client ID: {}, partition ID: {:?}", session.client_id, partition_id))? else { - return Ok((IggyPollMetadata::new(0, 0), IggyMessagesBatchSet::empty())); - }; + //return Ok((IggyPollMetadata::new(0, 0), IggyMessagesBatchSet::empty())); - let (metadata, batch_set) = topic - .get_messages(polling_consumer, partition_id, args.strategy, args.count) - .await?; + let (metadata, batch) = stream.poll_messages(topic_id, client_id, consumer, maybe_partition_id, args.auto_commit, async |topic, consumer, partition_id| { + let namespace = IggyNamespace::new(stream.stream_id, topic.topic_id, partition_id); + let payload = ShardRequestPayload::PollMessages { + consumer, + args, + }; + let request = ShardRequest::new(stream.stream_id, topic.topic_id, partition_id, payload); + let message = ShardMessage::Request(request); - if args.auto_commit && !batch_set.is_empty() { - let offset = batch_set - .last_offset() - .expect("Batch set should have at least one batch"); - trace!( - "Last offset: {} will be automatically stored for {}, stream: {}, topic: {}, partition: {}", - offset, consumer, stream_id, topic_id, partition_id - ); - topic - .store_consumer_offset_internal(polling_consumer, offset, partition_id) - .await - .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to store consumer offset internal, polling consumer: {}, offset: {}, partition ID: {}", polling_consumer, offset, partition_id))?; - } + match self + .send_request_to_shard_or_recoil(&namespace, message) + .await? + { + ShardSendRequestResult::Recoil(message) => { + if let ShardMessage::Request( ShardRequest { partition_id, payload, .. } ) = message + && let ShardRequestPayload::PollMessages { consumer, args } = payload + { + topic.get_messages(consumer, partition_id, args.strategy, args.count).await.with_error_context(|error| { + format!("{COMPONENT}: Failed to get messages for stream_id: {stream_id}, topic_id: {topic_id}, partition_id: {partition_id}, error: {error})") + }) + } else { + unreachable!( + "Expected a PollMessages request inside of PollMessages handler, impossible state" + ); + } + } + ShardSendRequestResult::Response(response) => { + match response { + ShardResponse::PollMessages(result) => { Ok(result) } + ShardResponse::ErrorResponse(err) => { + Err(err) + } + _ => unreachable!( + "Expected a SendMessages response inside of SendMessages handler, impossible state" + ), + } - let batch_set = if let Some(encryptor) = &self.encryptor { - self.decrypt_messages(batch_set, encryptor).await? + } + } + }).await?; + + let batch = if let Some(_encryptor) = &self.encryptor { + //TODO: Bring back decryptor + todo!(); + //self.decrypt_messages(batch, encryptor.as_ref()).await? } else { - batch_set + batch }; - Ok((metadata, batch_set)) + Ok((metadata, batch)) } pub async fn flush_unsaved_buffer( @@ -126,45 +235,6 @@ impl IggyShard { Ok(()) } - pub async fn decrypt_messages( - &self, - batches: IggyMessagesBatchSet, - encryptor: &EncryptorKind, - ) -> Result<IggyMessagesBatchSet, IggyError> { - let mut decrypted_batches = Vec::with_capacity(batches.containers_count()); - for batch in batches.iter() { - let count = batch.count(); - - let mut indexes = IggyIndexesMut::with_capacity(batch.count() as usize, 0); - let mut decrypted_messages = PooledBuffer::with_capacity(batch.size() as usize); - let mut position = 0; - - for message in batch.iter() { - let payload = encryptor.decrypt(message.payload()); - match payload { - Ok(payload) => { - message.header().write_to_buffer(&mut decrypted_messages); - decrypted_messages.extend_from_slice(&payload); - if let Some(user_headers) = message.user_headers() { - decrypted_messages.extend_from_slice(user_headers); - } - indexes.insert(0, position as u32, 0); - position += message.size(); - } - Err(error) => { - error!("Cannot decrypt the message. Error: {}", error); - continue; - } - } - } - let decrypted_batch = - IggyMessagesBatchMut::from_indexes_and_messages(count, indexes, decrypted_messages); - decrypted_batches.push(decrypted_batch); - } - - Ok(IggyMessagesBatchSet::from_vec(decrypted_batches)) - } - pub fn maybe_encrypt_messages( &self, batch: IggyMessagesBatchMut, diff --git a/core/server/src/shard/transmission/message.rs b/core/server/src/shard/transmission/message.rs index aa196fde..fe369bad 100644 --- a/core/server/src/shard/transmission/message.rs +++ b/core/server/src/shard/transmission/message.rs @@ -20,10 +20,19 @@ use iggy_common::PollingStrategy; * under the License. */ use crate::{ - shard::{system::messages::PollingArgs, transmission::event::ShardEvent}, + shard::{ + system::messages::PollingArgs, + transmission::{event::ShardEvent, frame::ShardResponse}, + }, streaming::{polling_consumer::PollingConsumer, segments::IggyMessagesBatchMut}, }; +pub enum ShardSendRequestResult { + // TODO: In the future we can add other variants, for example backpressure from the destination shard, + Recoil(ShardMessage), + Response(ShardResponse), +} + #[derive(Debug)] pub enum ShardMessage { Request(ShardRequest), @@ -60,9 +69,8 @@ pub enum ShardRequestPayload { batch: IggyMessagesBatchMut, }, PollMessages { - args: PollingArgs, consumer: PollingConsumer, - count: u32, + args: PollingArgs, }, } diff --git a/core/server/src/streaming/streams/messages.rs b/core/server/src/streaming/streams/messages.rs index 855bda66..bb6ac41e 100644 --- a/core/server/src/streaming/streams/messages.rs +++ b/core/server/src/streaming/streams/messages.rs @@ -16,11 +16,99 @@ * under the License. */ -use crate::streaming::streams::stream::Stream; +use error_set::ErrContext; +use iggy_common::{Consumer, Identifier, IggyError, Partitioning}; +use tracing::trace; + +use super::COMPONENT; +use crate::{ + binary::handlers::messages::poll_messages_handler::IggyPollMetadata, + shard::{ + namespace::IggyNamespace, + transmission::message::{ShardMessage, ShardRequest, ShardRequestPayload}, + }, + streaming::{ + partitions::partition, + polling_consumer::PollingConsumer, + segments::{IggyMessagesBatchMut, IggyMessagesBatchSet}, + streams::stream::Stream, + topics::topic::Topic, + }, +}; use std::sync::atomic::Ordering; impl Stream { pub fn get_messages_count(&self) -> u64 { self.messages_count.load(Ordering::SeqCst) } + + // TODO: Create a types module, where we will put types of those closures. + pub async fn append_messages( + &self, + topic_id: &Identifier, + partitioning: &Partitioning, + append_messages: impl AsyncFnOnce(&Topic, u32) -> Result<(), IggyError>, + ) -> Result<(), IggyError> { + // It's quite empty there, but in the future if we would like to introduce consumer group subscription + // based on topic name wildcards, we would have to lift those to stream level, + // thus it would be perfect place to perform those calculations here. + let topic = self.get_topic(topic_id).with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - failed to get topic with ID: {} for stream ID: {}", + topic_id, self.stream_id + ) + })?; + + let partition_id = topic.calculate_partition_id(partitioning)?; + append_messages(topic, partition_id).await + } + + pub async fn poll_messages( + &self, + topic_id: &Identifier, + client_id: u32, + consumer: Consumer, + maybe_partition_id: Option<u32>, + auto_commit: bool, + poll_messages: impl AsyncFnOnce( + &Topic, + PollingConsumer, + u32, + ) + -> Result<(IggyPollMetadata, IggyMessagesBatchSet), IggyError>, + ) -> Result<(IggyPollMetadata, IggyMessagesBatchSet), IggyError> { + // Same as in `append_messages` method. + let topic = self.get_topic(topic_id).with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - failed to get topic with ID: {} for stream ID: {}", + topic_id, self.stream_id + ) + })?; + if !topic.has_partitions() { + return Err(IggyError::NoPartitions(topic.topic_id, self.stream_id)); + } + + let Some((consumer, partition_id)) = topic + .resolve_consumer_with_partition_id(&consumer, client_id, maybe_partition_id, true) + .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to resolve consumer with partition id, consumer: {consumer}, client ID: {}, partition ID: {:?}", client_id, maybe_partition_id))? else { + todo!("Send early response"); + }; + + let (metadata, batch) = poll_messages(topic, consumer, partition_id).await?; + + if auto_commit && !batch.is_empty() { + let offset = batch + .last_offset() + .expect("Batch set should have at least one batch"); + trace!( + "Last offset: {} will be automatically stored for {}, stream: {}, topic: {}, partition: {}", + offset, consumer, self.stream_id, topic_id, partition_id + ); + topic + .store_consumer_offset_internal(consumer, offset, partition_id) + .await + .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to store consumer offset internal, polling consumer: {consumer}, offset: {offset}, partition ID: {partition_id}"))?; + } + Ok((metadata, batch)) + } }
