This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new 4f3a73f7 refactor(io_uring): refactor the send/poll method flow (#1944)
4f3a73f7 is described below
commit 4f3a73f79d06c80b02864836c1925636fbc3f7b2
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Mon Jun 30 20:26:35 2025 +0200
refactor(io_uring): refactor the send/poll method flow (#1944)
---
.../handlers/messages/poll_messages_handler.rs | 95 ++-------
.../handlers/messages/send_messages_handler.rs | 89 ++------
core/server/src/shard/mod.rs | 29 +--
core/server/src/shard/system/messages.rs | 236 +++++++++++++--------
core/server/src/shard/transmission/message.rs | 14 +-
core/server/src/streaming/streams/messages.rs | 90 +++++++-
6 files changed, 291 insertions(+), 262 deletions(-)
diff --git a/core/server/src/binary/handlers/messages/poll_messages_handler.rs
b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
index 3abfb6a1..dfc4a89f 100644
--- a/core/server/src/binary/handlers/messages/poll_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
@@ -20,11 +20,11 @@ use crate::binary::command::{BinaryServerCommand,
ServerCommand, ServerCommandHa
use crate::binary::handlers::messages::COMPONENT;
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::sender::SenderKind;
+use crate::shard::IggyShard;
use crate::shard::namespace::IggyNamespace;
use crate::shard::system::messages::PollingArgs;
use crate::shard::transmission::frame::ShardResponse;
use crate::shard::transmission::message::{ShardMessage, ShardRequest,
ShardRequestPayload};
-use crate::shard::{IggyShard, ShardRequestResult};
use crate::streaming::segments::IggyMessagesBatchSet;
use crate::streaming::session::Session;
use crate::to_iovec;
@@ -63,93 +63,30 @@ impl ServerCommandHandler for PollMessages {
shard: &Rc<IggyShard>,
) -> Result<(), IggyError> {
debug!("session: {session}, command: {self}");
-
- let stream = shard
- .get_stream(&self.stream_id)
- .with_error_context(|error| {
- format!(
- "{COMPONENT} (error: {error}) - stream not found for
stream ID: {}",
- self.stream_id
- )
- })?;
- let topic = stream.get_topic(
- &self.topic_id,
- ).with_error_context(|error| format!(
- "{COMPONENT} (error: {error}) - topic not found for stream ID: {},
topic_id: {}",
- self.stream_id, self.topic_id
- ))?;
-
let PollMessages {
consumer,
partition_id,
strategy,
count,
auto_commit,
- ..
+ stream_id,
+ topic_id,
} = self;
let args = PollingArgs::new(strategy, count, auto_commit);
- shard.permissioner
- .borrow()
- .poll_messages(session.get_user_id(), topic.stream_id,
topic.topic_id)
- .with_error_context(|error| format!(
- "{COMPONENT} (error: {error}) - permission denied to poll
messages for user {} on stream ID: {}, topic ID: {}",
- session.get_user_id(),
- topic.stream_id,
- topic.topic_id
- ))?;
-
- if !topic.has_partitions() {
- return Err(IggyError::NoPartitions(topic.topic_id,
topic.stream_id));
- }
-
- // There might be no partition assigned, if it's the consumer group
member without any partitions.
- let Some((consumer, partition_id)) = topic
- .resolve_consumer_with_partition_id(&consumer, session.client_id,
partition_id, true)
- .with_error_context(|error| format!("{COMPONENT} (error: {error})
- failed to resolve consumer with partition id, consumer: {consumer}, client
ID: {}, partition ID: {:?}", session.client_id, partition_id))? else {
- todo!("Send early response");
- //return Ok((IggyPollMetadata::new(0, 0),
IggyMessagesBatchSet::empty()));
- };
-
- let namespace = IggyNamespace::new(stream.stream_id, topic.topic_id,
partition_id);
- let payload = ShardRequestPayload::PollMessages {
- consumer: consumer.clone(),
- args,
- count,
- };
- let request = ShardRequest::new(stream.stream_id, topic.topic_id,
partition_id, payload);
- let message = ShardMessage::Request(request);
- let (metadata, batch) = match shard.send_request_to_shard(&namespace,
message).await {
- ShardRequestResult::SameShard(message) => match message {
- ShardMessage::Request(request) => match request.payload {
- ShardRequestPayload::PollMessages {
- consumer,
- args,
- count,
- } => {
- topic
- .get_messages(consumer, partition_id,
args.strategy, count)
- .await?
- }
- _ => unreachable!(
- "Expected a SendMessages request inside of
SendMessages handler, impossible state"
- ),
- },
- _ => unreachable!(
- "Expected a request message inside of an command handler,
impossible state"
- ),
- },
- ShardRequestResult::Result(result) => match result? {
- ShardResponse::PollMessages(response) => response,
- ShardResponse::ErrorResponse(err) => {
- return Err(err);
- }
- _ => unreachable!(
- "Expected a PollMessages response inside of PollMessages
handler, impossible state"
- ),
- },
- };
-
+ let user_id = session.get_user_id();
+ let client_id = session.client_id;
+ let (metadata, batch) = shard
+ .poll_messages(
+ client_id,
+ user_id,
+ &stream_id,
+ &topic_id,
+ consumer,
+ partition_id,
+ args,
+ )
+ .await?;
// Collect all chunks first into a Vec to extend their lifetimes.
// This ensures the Bytes (in reality Arc<[u8]>) references from each
IggyMessagesBatch stay alive
// throughout the async vectored I/O operation, preventing "borrowed
value does not live
diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs
b/core/server/src/binary/handlers/messages/send_messages_handler.rs
index f4497bf6..47df8270 100644
--- a/core/server/src/binary/handlers/messages/send_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs
@@ -19,22 +19,20 @@
use super::COMPONENT;
use crate::binary::command::{BinaryServerCommand, ServerCommandHandler};
use crate::binary::sender::SenderKind;
+use crate::shard::IggyShard;
use crate::shard::namespace::IggyNamespace;
use crate::shard::transmission::frame::ShardResponse;
use crate::shard::transmission::message::{ShardMessage, ShardRequest,
ShardRequestPayload};
-use crate::shard::{IggyShard, ShardRequestResult};
use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut};
use crate::streaming::session::Session;
use crate::streaming::utils::PooledBuffer;
use anyhow::Result;
use bytes::BytesMut;
-use error_set::ErrContext;
use iggy_common::Identifier;
use iggy_common::Sizeable;
use iggy_common::{INDEX_SIZE, IdKind};
use iggy_common::{IggyError, Partitioning, SendMessages, Validatable};
use std::rc::Rc;
-use std::sync::Arc;
use tracing::instrument;
impl ServerCommandHandler for SendMessages {
@@ -112,80 +110,17 @@ impl ServerCommandHandler for SendMessages {
);
batch.validate()?;
- let stream = shard
- .get_stream(&self.stream_id)
- .with_error_context(|error| {
- format!(
- "Failed to get stream with ID: {} (error: {})",
- self.stream_id, error
- )
- })?;
- let topic = stream
- .get_topic(&self.topic_id)
- .with_error_context(|error| {
- format!(
- "Failed to get topic with ID: {} (error: {})",
- self.topic_id, error
- )
- })?;
- let stream_id = stream.stream_id;
- let topic_id = topic.topic_id;
- let partition_id = topic.calculate_partition_id(&self.partitioning)?;
-
- // Validate permissions for given user on stream and topic.
- shard.permissioner.borrow().append_messages(
- session.get_user_id(),
- stream_id,
- topic_id
- ).with_error_context(|error| format!(
- "{COMPONENT} (error: {error}) - permission denied to append
messages for user {} on stream ID: {}, topic ID: {}",
- session.get_user_id(),
- stream_id,
- topic_id
- ))?;
- let messages_count = batch.count();
- shard.metrics.increment_messages(messages_count as u64);
-
- // Encrypt messages if encryptor is enabled in configuration.
- let batch = shard.maybe_encrypt_messages(batch)?;
-
- let namespace = IggyNamespace::new(stream.stream_id, topic.topic_id,
partition_id);
- let payload = ShardRequestPayload::SendMessages { batch };
- let request = ShardRequest::new(stream.stream_id, topic.topic_id,
partition_id, payload);
- let message = ShardMessage::Request(request);
- // Egh... I don't like those nested match statements,
- // Technically there is only two `request` types that will ever be
dispatched
- // to different shards, thus we could get away with generic structs
- // maybe todo ?
- // how to make this code reusable, in a sense that we will have
exactly the same code inside of
- // `PollMessages` handler, but with different request....
- match shard.send_request_to_shard(&namespace, message).await {
- ShardRequestResult::SameShard(message) => match message {
- ShardMessage::Request(request) => {
- let partition_id = request.partition_id;
- match request.payload {
- ShardRequestPayload::SendMessages { batch } => {
- topic.append_messages(partition_id, batch).await?
- }
- _ => unreachable!(
- "Expected a SendMessages request inside of
SendMessages handler, impossible state"
- ),
- }
- }
- _ => unreachable!(
- "Expected a request message inside of an command handler,
impossible state"
- ),
- },
- ShardRequestResult::Result(result) => match result? {
- ShardResponse::SendMessages => (),
- ShardResponse::ErrorResponse(err) => {
- return Err(err);
- }
- _ => unreachable!(
- "Expected a SendMessages response inside of SendMessages
handler, impossible state"
- ),
- },
- };
+ let user_id = session.get_user_id();
+ shard
+ .append_messages(
+ user_id,
+ &self.stream_id,
+ &self.topic_id,
+ &self.partitioning,
+ batch,
+ )
+ .await?;
+
sender.send_empty_ok_response().await?;
Ok(())
}
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 48bf183a..6c3718f8 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -54,7 +54,7 @@ use crate::{
transmission::{
event::ShardEvent,
frame::{ShardFrame, ShardResponse},
- message::{ShardMessage, ShardRequest, ShardRequestPayload},
+ message::{ShardMessage, ShardRequest, ShardRequestPayload,
ShardSendRequestResult},
},
},
state::{
@@ -120,11 +120,6 @@ impl ShardInfo {
}
}
-pub enum ShardRequestResult<T, E> {
- SameShard(ShardMessage),
- Result(Result<T, E>),
-}
-
pub struct IggyShard {
pub id: u16,
shards: Vec<Shard>,
@@ -498,13 +493,9 @@ impl IggyShard {
topic.append_messages(partition_id, batch).await?;
Ok(ShardResponse::SendMessages)
}
- ShardRequestPayload::PollMessages {
- args,
- consumer,
- count,
- } => {
+ ShardRequestPayload::PollMessages { args, consumer } => {
let (metadata, batch) = topic
- .get_messages(consumer, partition_id, args.strategy, count)
+ .get_messages(consumer, partition_id, args.strategy,
args.count)
.await?;
Ok(ShardResponse::PollMessages((metadata, batch)))
}
@@ -551,14 +542,14 @@ impl IggyShard {
}
}
- pub async fn send_request_to_shard(
+ pub async fn send_request_to_shard_or_recoil(
&self,
namespace: &IggyNamespace,
message: ShardMessage,
- ) -> ShardRequestResult<ShardResponse, IggyError> {
+ ) -> Result<ShardSendRequestResult, IggyError> {
if let Some(shard) = self.find_shard(namespace) {
if shard.id == self.id {
- return ShardRequestResult::SameShard(message);
+ return Ok(ShardSendRequestResult::Recoil(message));
}
let response = match shard.send_request(message).await {
@@ -568,16 +559,16 @@ impl IggyShard {
"{COMPONENT} - failed to send request to shard with
ID: {}, error: {err}",
shard.id
);
- return ShardRequestResult::Result(Err(err));
+ return Err(err);
}
};
- ShardRequestResult::Result(Ok(response))
+ Ok(ShardSendRequestResult::Response(response))
} else {
- ShardRequestResult::Result(Err(IggyError::ShardNotFound(
+ Err(IggyError::ShardNotFound(
namespace.stream_id,
namespace.topic_id,
namespace.partition_id,
- )))
+ ))
}
}
diff --git a/core/server/src/shard/system/messages.rs
b/core/server/src/shard/system/messages.rs
index 36ae234b..b27b034c 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -19,83 +19,192 @@
use super::COMPONENT;
use crate::binary::handlers::messages::poll_messages_handler::IggyPollMetadata;
use crate::shard::IggyShard;
+use crate::shard::namespace::IggyNamespace;
+use crate::shard::transmission::frame::ShardResponse;
+use crate::shard::transmission::message::{
+ ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult,
+};
+use crate::streaming::partitions::partition;
use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut,
IggyMessagesBatchSet};
use crate::streaming::session::Session;
use crate::streaming::utils::PooledBuffer;
use async_zip::tokio::read::stream;
use error_set::ErrContext;
+use futures::stream_select;
use iggy_common::{
BytesSerializable, Consumer, EncryptorKind, IGGY_MESSAGE_HEADER_SIZE,
Identifier, IggyError,
- Partitioning, PollingStrategy,
+ Partitioning, PollingStrategy, UserId,
};
use tracing::{error, trace};
impl IggyShard {
+ pub async fn append_messages(
+ &self,
+ user_id: u32,
+ stream_id: &Identifier,
+ topic_id: &Identifier,
+ partitioning: &Partitioning,
+ batch: IggyMessagesBatchMut,
+ ) -> Result<(), IggyError> {
+ let stream = self.get_stream(stream_id).with_error_context(|error| {
+ format!(
+ "Failed to get stream with ID: {} (error: {})",
+ stream_id, error
+ )
+ })?;
+ let stream_id = stream.stream_id;
+ let numeric_topic_id = stream.get_topic(topic_id).map(|topic|
topic.topic_id).with_error_context(|error| {
+ format!(
+ "Failed to get topic with ID: {} (error: {})",
+ topic_id, error
+ )
+ })?;
+ // TODO: We should look into refactoring those permissioners, so they
can accept `Identifier` instead of numeric IDs.
+ // Validate permissions for given user on stream and topic.
+ self.permissioner.borrow().append_messages(
+ user_id,
+ stream.stream_id,
+ numeric_topic_id
+ ).with_error_context(|error| format!(
+ "{COMPONENT} (error: {error}) - permission denied to append
messages for user {} on stream ID: {}, topic ID: {}",
+ user_id,
+ stream.stream_id,
+ numeric_topic_id
+ ))?;
+ // Encrypt messages if encryptor is enabled in configuration.
+ let batch = self.maybe_encrypt_messages(batch)?;
+ let messages_count = batch.count();
+ stream
+ .append_messages(topic_id, partitioning, async |topic,
partition_id| {
+ let namespace = IggyNamespace::new(stream_id, topic.topic_id,
partition_id);
+ let payload = ShardRequestPayload::SendMessages { batch };
+ let request = ShardRequest::new(stream_id, numeric_topic_id,
partition_id, payload);
+ let message = ShardMessage::Request(request);
+
+ match self
+ .send_request_to_shard_or_recoil(&namespace, message)
+ .await?
+ {
+ ShardSendRequestResult::Recoil(message) => {
+ if let ShardMessage::Request( ShardRequest {
partition_id, payload, .. } ) = message
+ && let ShardRequestPayload::SendMessages { batch }
= payload
+ {
+ topic.append_messages(partition_id,
batch).await.with_error_context(|error| {
+ format!("{COMPONENT}: Failed to append
messages to stream_id: {stream_id}, topic_id: {topic_id}, partition_id:
{partition_id}, error: {error})")
+ })
+ } else {
+ unreachable!(
+ "Expected a SendMessages request inside of
SendMessages handler, impossible state"
+ );
+ }
+ }
+ ShardSendRequestResult::Response(response) => {
+ match response {
+ ShardResponse::SendMessages => { Ok(()) }
+ ShardResponse::ErrorResponse(err) => {
+ Err(err)
+ }
+ _ => unreachable!(
+ "Expected a SendMessages response inside of
SendMessages handler, impossible state"
+ ),
+ }
+
+ }
+ }
+ })
+ .await?;
+
+ self.metrics.increment_messages(messages_count as u64);
+ Ok(())
+ }
+
pub async fn poll_messages(
&self,
- session: &Session,
- consumer: &Consumer,
+ client_id: u32,
+ user_id: u32,
stream_id: &Identifier,
topic_id: &Identifier,
- partition_id: Option<u32>,
+ consumer: Consumer,
+ maybe_partition_id: Option<u32>,
args: PollingArgs,
) -> Result<(IggyPollMetadata, IggyMessagesBatchSet), IggyError> {
- self.ensure_authenticated(session)?;
- if args.count == 0 {
- return Err(IggyError::InvalidMessagesCount);
- }
-
let stream = self.get_stream(stream_id).with_error_context(|error| {
- format!("{COMPONENT} (error: {error}) - stream not found for
stream ID: {stream_id}")
+ format!(
+ "{COMPONENT} (error: {error}) - stream not found for stream
ID: {}",
+ stream_id
+ )
})?;
let stream_id = stream.stream_id;
- let topic = self.find_topic(session, &stream,
topic_id).with_error_context(|error| format!("{COMPONENT} (error: {error}) -
topic not found for stream ID: {stream_id}, topic_id: {topic_id}"))?;
+ let numeric_topic_id = stream.get_topic(topic_id).map(|topic|
topic.topic_id).with_error_context(|error| {
+ format!(
+ "{COMPONENT} (error: {error}) - topic not found for stream ID:
{}, topic_id: {}",
+ stream_id, topic_id
+ )
+ })?;
+
self.permissioner
.borrow()
- .poll_messages(session.get_user_id(), topic.stream_id,
topic.topic_id)
+ .poll_messages(user_id, stream_id, numeric_topic_id)
.with_error_context(|error| format!(
"{COMPONENT} (error: {error}) - permission denied to poll
messages for user {} on stream ID: {}, topic ID: {}",
- session.get_user_id(),
- topic.stream_id,
- topic.topic_id
+ user_id,
+ stream_id,
+ numeric_topic_id
))?;
- if !topic.has_partitions() {
- return Err(IggyError::NoPartitions(topic.topic_id,
topic.stream_id));
- }
-
// There might be no partition assigned, if it's the consumer group
member without any partitions.
- let Some((polling_consumer, partition_id)) = topic
- .resolve_consumer_with_partition_id(consumer, session.client_id,
partition_id, true)
- .with_error_context(|error| format!("{COMPONENT} (error: {error})
- failed to resolve consumer with partition id, consumer: {consumer}, client
ID: {}, partition ID: {:?}", session.client_id, partition_id))? else {
- return Ok((IggyPollMetadata::new(0, 0),
IggyMessagesBatchSet::empty()));
- };
+ //return Ok((IggyPollMetadata::new(0, 0),
IggyMessagesBatchSet::empty()));
- let (metadata, batch_set) = topic
- .get_messages(polling_consumer, partition_id, args.strategy,
args.count)
- .await?;
+ let (metadata, batch) = stream.poll_messages(topic_id, client_id,
consumer, maybe_partition_id, args.auto_commit, async |topic, consumer,
partition_id| {
+ let namespace = IggyNamespace::new(stream.stream_id,
topic.topic_id, partition_id);
+ let payload = ShardRequestPayload::PollMessages {
+ consumer,
+ args,
+ };
+ let request = ShardRequest::new(stream.stream_id, topic.topic_id,
partition_id, payload);
+ let message = ShardMessage::Request(request);
- if args.auto_commit && !batch_set.is_empty() {
- let offset = batch_set
- .last_offset()
- .expect("Batch set should have at least one batch");
- trace!(
- "Last offset: {} will be automatically stored for {}, stream:
{}, topic: {}, partition: {}",
- offset, consumer, stream_id, topic_id, partition_id
- );
- topic
- .store_consumer_offset_internal(polling_consumer, offset,
partition_id)
- .await
- .with_error_context(|error| format!("{COMPONENT} (error:
{error}) - failed to store consumer offset internal, polling consumer: {},
offset: {}, partition ID: {}", polling_consumer, offset, partition_id))?;
- }
+ match self
+ .send_request_to_shard_or_recoil(&namespace, message)
+ .await?
+ {
+ ShardSendRequestResult::Recoil(message) => {
+ if let ShardMessage::Request( ShardRequest {
partition_id, payload, .. } ) = message
+ && let ShardRequestPayload::PollMessages {
consumer, args } = payload
+ {
+ topic.get_messages(consumer, partition_id,
args.strategy, args.count).await.with_error_context(|error| {
+ format!("{COMPONENT}: Failed to get messages
for stream_id: {stream_id}, topic_id: {topic_id}, partition_id: {partition_id},
error: {error})")
+ })
+ } else {
+ unreachable!(
+ "Expected a PollMessages request inside of
PollMessages handler, impossible state"
+ );
+ }
+ }
+ ShardSendRequestResult::Response(response) => {
+ match response {
+ ShardResponse::PollMessages(result) => {
Ok(result) }
+ ShardResponse::ErrorResponse(err) => {
+ Err(err)
+ }
+ _ => unreachable!(
+ "Expected a SendMessages response inside of
SendMessages handler, impossible state"
+ ),
+ }
- let batch_set = if let Some(encryptor) = &self.encryptor {
- self.decrypt_messages(batch_set, encryptor).await?
+ }
+ }
+ }).await?;
+
+ let batch = if let Some(_encryptor) = &self.encryptor {
+ //TODO: Bring back decryptor
+ todo!();
+ //self.decrypt_messages(batch, encryptor.as_ref()).await?
} else {
- batch_set
+ batch
};
- Ok((metadata, batch_set))
+ Ok((metadata, batch))
}
pub async fn flush_unsaved_buffer(
@@ -126,45 +235,6 @@ impl IggyShard {
Ok(())
}
- pub async fn decrypt_messages(
- &self,
- batches: IggyMessagesBatchSet,
- encryptor: &EncryptorKind,
- ) -> Result<IggyMessagesBatchSet, IggyError> {
- let mut decrypted_batches =
Vec::with_capacity(batches.containers_count());
- for batch in batches.iter() {
- let count = batch.count();
-
- let mut indexes = IggyIndexesMut::with_capacity(batch.count() as
usize, 0);
- let mut decrypted_messages =
PooledBuffer::with_capacity(batch.size() as usize);
- let mut position = 0;
-
- for message in batch.iter() {
- let payload = encryptor.decrypt(message.payload());
- match payload {
- Ok(payload) => {
- message.header().write_to_buffer(&mut
decrypted_messages);
- decrypted_messages.extend_from_slice(&payload);
- if let Some(user_headers) = message.user_headers() {
- decrypted_messages.extend_from_slice(user_headers);
- }
- indexes.insert(0, position as u32, 0);
- position += message.size();
- }
- Err(error) => {
- error!("Cannot decrypt the message. Error: {}", error);
- continue;
- }
- }
- }
- let decrypted_batch =
- IggyMessagesBatchMut::from_indexes_and_messages(count,
indexes, decrypted_messages);
- decrypted_batches.push(decrypted_batch);
- }
-
- Ok(IggyMessagesBatchSet::from_vec(decrypted_batches))
- }
-
pub fn maybe_encrypt_messages(
&self,
batch: IggyMessagesBatchMut,
diff --git a/core/server/src/shard/transmission/message.rs
b/core/server/src/shard/transmission/message.rs
index aa196fde..fe369bad 100644
--- a/core/server/src/shard/transmission/message.rs
+++ b/core/server/src/shard/transmission/message.rs
@@ -20,10 +20,19 @@ use iggy_common::PollingStrategy;
* under the License.
*/
use crate::{
- shard::{system::messages::PollingArgs, transmission::event::ShardEvent},
+ shard::{
+ system::messages::PollingArgs,
+ transmission::{event::ShardEvent, frame::ShardResponse},
+ },
streaming::{polling_consumer::PollingConsumer,
segments::IggyMessagesBatchMut},
};
+pub enum ShardSendRequestResult {
+ // TODO: In the future we can add other variants, for example backpressure
from the destination shard,
+ Recoil(ShardMessage),
+ Response(ShardResponse),
+}
+
#[derive(Debug)]
pub enum ShardMessage {
Request(ShardRequest),
@@ -60,9 +69,8 @@ pub enum ShardRequestPayload {
batch: IggyMessagesBatchMut,
},
PollMessages {
- args: PollingArgs,
consumer: PollingConsumer,
- count: u32,
+ args: PollingArgs,
},
}
diff --git a/core/server/src/streaming/streams/messages.rs
b/core/server/src/streaming/streams/messages.rs
index 855bda66..bb6ac41e 100644
--- a/core/server/src/streaming/streams/messages.rs
+++ b/core/server/src/streaming/streams/messages.rs
@@ -16,11 +16,99 @@
* under the License.
*/
-use crate::streaming::streams::stream::Stream;
+use error_set::ErrContext;
+use iggy_common::{Consumer, Identifier, IggyError, Partitioning};
+use tracing::trace;
+
+use super::COMPONENT;
+use crate::{
+ binary::handlers::messages::poll_messages_handler::IggyPollMetadata,
+ shard::{
+ namespace::IggyNamespace,
+ transmission::message::{ShardMessage, ShardRequest,
ShardRequestPayload},
+ },
+ streaming::{
+ partitions::partition,
+ polling_consumer::PollingConsumer,
+ segments::{IggyMessagesBatchMut, IggyMessagesBatchSet},
+ streams::stream::Stream,
+ topics::topic::Topic,
+ },
+};
use std::sync::atomic::Ordering;
impl Stream {
pub fn get_messages_count(&self) -> u64 {
self.messages_count.load(Ordering::SeqCst)
}
+
+ // TODO: Create a types module, where we will put types of those closures.
+ pub async fn append_messages(
+ &self,
+ topic_id: &Identifier,
+ partitioning: &Partitioning,
+ append_messages: impl AsyncFnOnce(&Topic, u32) -> Result<(),
IggyError>,
+ ) -> Result<(), IggyError> {
+ // It's quite empty there, but in the future if we would like to
introduce consumer group subscription
+ // based on topic name wildcards, we would have to lift those to
stream level,
+ // thus it would be perfect place to perform those calculations here.
+ let topic = self.get_topic(topic_id).with_error_context(|error| {
+ format!(
+ "{COMPONENT} (error: {error}) - failed to get topic with ID:
{} for stream ID: {}",
+ topic_id, self.stream_id
+ )
+ })?;
+
+ let partition_id = topic.calculate_partition_id(partitioning)?;
+ append_messages(topic, partition_id).await
+ }
+
+ pub async fn poll_messages(
+ &self,
+ topic_id: &Identifier,
+ client_id: u32,
+ consumer: Consumer,
+ maybe_partition_id: Option<u32>,
+ auto_commit: bool,
+ poll_messages: impl AsyncFnOnce(
+ &Topic,
+ PollingConsumer,
+ u32,
+ )
+ -> Result<(IggyPollMetadata, IggyMessagesBatchSet), IggyError>,
+ ) -> Result<(IggyPollMetadata, IggyMessagesBatchSet), IggyError> {
+ // Same as in `append_messages` method.
+ let topic = self.get_topic(topic_id).with_error_context(|error| {
+ format!(
+ "{COMPONENT} (error: {error}) - failed to get topic with ID:
{} for stream ID: {}",
+ topic_id, self.stream_id
+ )
+ })?;
+ if !topic.has_partitions() {
+ return Err(IggyError::NoPartitions(topic.topic_id,
self.stream_id));
+ }
+
+ let Some((consumer, partition_id)) = topic
+ .resolve_consumer_with_partition_id(&consumer, client_id,
maybe_partition_id, true)
+ .with_error_context(|error| format!("{COMPONENT} (error:
{error}) - failed to resolve consumer with partition id, consumer: {consumer},
client ID: {}, partition ID: {:?}", client_id, maybe_partition_id))? else {
+ todo!("Send early response");
+ };
+
+ let (metadata, batch) = poll_messages(topic, consumer,
partition_id).await?;
+
+ if auto_commit && !batch.is_empty() {
+ let offset = batch
+ .last_offset()
+ .expect("Batch set should have at least one batch");
+ trace!(
+ "Last offset: {} will be automatically stored for {}, stream:
{}, topic: {}, partition: {}",
+ offset, consumer, self.stream_id, topic_id, partition_id
+ );
+ topic
+ .store_consumer_offset_internal(consumer, offset, partition_id)
+ .await
+ .with_error_context(|error| format!("{COMPONENT} (error:
{error}) - failed to store consumer offset internal, polling consumer:
{consumer}, offset: {offset}, partition ID: {partition_id}"))?;
+ }
+ Ok((metadata, batch))
+ }
}