This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch rebase_master
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 63c0b2f4cdf78b2ea7470a7bece901f66683008e
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Mon Jun 30 20:26:35 2025 +0200

    refactor(io_uring): refactor the send/poll method flow (#1944)
---
 .../handlers/messages/poll_messages_handler.rs     |  95 ++-------
 .../handlers/messages/send_messages_handler.rs     |  89 ++------
 core/server/src/shard/mod.rs                       |  29 +--
 core/server/src/shard/system/messages.rs           | 236 +++++++++++++--------
 core/server/src/shard/transmission/message.rs      |  14 +-
 core/server/src/streaming/streams/messages.rs      |  90 +++++++-
 6 files changed, 291 insertions(+), 262 deletions(-)

diff --git a/core/server/src/binary/handlers/messages/poll_messages_handler.rs 
b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
index 3abfb6a1..dfc4a89f 100644
--- a/core/server/src/binary/handlers/messages/poll_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
@@ -20,11 +20,11 @@ use crate::binary::command::{BinaryServerCommand, 
ServerCommand, ServerCommandHa
 use crate::binary::handlers::messages::COMPONENT;
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::sender::SenderKind;
+use crate::shard::IggyShard;
 use crate::shard::namespace::IggyNamespace;
 use crate::shard::system::messages::PollingArgs;
 use crate::shard::transmission::frame::ShardResponse;
 use crate::shard::transmission::message::{ShardMessage, ShardRequest, 
ShardRequestPayload};
-use crate::shard::{IggyShard, ShardRequestResult};
 use crate::streaming::segments::IggyMessagesBatchSet;
 use crate::streaming::session::Session;
 use crate::to_iovec;
@@ -63,93 +63,30 @@ impl ServerCommandHandler for PollMessages {
         shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
         debug!("session: {session}, command: {self}");
-
-        let stream = shard
-            .get_stream(&self.stream_id)
-            .with_error_context(|error| {
-                format!(
-                    "{COMPONENT} (error: {error}) - stream not found for 
stream ID: {}",
-                    self.stream_id
-                )
-            })?;
-        let topic = stream.get_topic(
-            &self.topic_id,
-        ).with_error_context(|error| format!(
-            "{COMPONENT} (error: {error}) - topic not found for stream ID: {}, 
topic_id: {}",
-            self.stream_id, self.topic_id
-        ))?;
-
         let PollMessages {
             consumer,
             partition_id,
             strategy,
             count,
             auto_commit,
-            ..
+            stream_id,
+            topic_id,
         } = self;
         let args = PollingArgs::new(strategy, count, auto_commit);
 
-        shard.permissioner
-            .borrow()
-            .poll_messages(session.get_user_id(), topic.stream_id, 
topic.topic_id)
-            .with_error_context(|error| format!(
-                "{COMPONENT} (error: {error}) - permission denied to poll 
messages for user {} on stream ID: {}, topic ID: {}",
-                session.get_user_id(),
-                topic.stream_id,
-                topic.topic_id
-            ))?;
-
-        if !topic.has_partitions() {
-            return Err(IggyError::NoPartitions(topic.topic_id, 
topic.stream_id));
-        }
-
-        // There might be no partition assigned, if it's the consumer group 
member without any partitions.
-        let Some((consumer, partition_id)) = topic
-            .resolve_consumer_with_partition_id(&consumer, session.client_id, 
partition_id, true)
-            .with_error_context(|error| format!("{COMPONENT} (error: {error}) 
- failed to resolve consumer with partition id, consumer: {consumer}, client 
ID: {}, partition ID: {:?}", session.client_id, partition_id))? else {
-                todo!("Send early response");
-            //return Ok((IggyPollMetadata::new(0, 0), 
IggyMessagesBatchSet::empty()));
-        };
-
-        let namespace = IggyNamespace::new(stream.stream_id, topic.topic_id, 
partition_id);
-        let payload = ShardRequestPayload::PollMessages {
-            consumer: consumer.clone(),
-            args,
-            count,
-        };
-        let request = ShardRequest::new(stream.stream_id, topic.topic_id, 
partition_id, payload);
-        let message = ShardMessage::Request(request);
-        let (metadata, batch) = match shard.send_request_to_shard(&namespace, 
message).await {
-            ShardRequestResult::SameShard(message) => match message {
-                ShardMessage::Request(request) => match request.payload {
-                    ShardRequestPayload::PollMessages {
-                        consumer,
-                        args,
-                        count,
-                    } => {
-                        topic
-                            .get_messages(consumer, partition_id, 
args.strategy, count)
-                            .await?
-                    }
-                    _ => unreachable!(
-                        "Expected a SendMessages request inside of 
SendMessages handler, impossible state"
-                    ),
-                },
-                _ => unreachable!(
-                    "Expected a request message inside of an command handler, 
impossible state"
-                ),
-            },
-            ShardRequestResult::Result(result) => match result? {
-                ShardResponse::PollMessages(response) => response,
-                ShardResponse::ErrorResponse(err) => {
-                    return Err(err);
-                }
-                _ => unreachable!(
-                    "Expected a PollMessages response inside of PollMessages 
handler, impossible state"
-                ),
-            },
-        };
-
+        let user_id = session.get_user_id();
+        let client_id = session.client_id;
+        let (metadata, batch) = shard
+            .poll_messages(
+                client_id,
+                user_id,
+                &stream_id,
+                &topic_id,
+                consumer,
+                partition_id,
+                args,
+            )
+            .await?;
         // Collect all chunks first into a Vec to extend their lifetimes.
         // This ensures the Bytes (in reality Arc<[u8]>) references from each 
IggyMessagesBatch stay alive
         // throughout the async vectored I/O operation, preventing "borrowed 
value does not live
diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs 
b/core/server/src/binary/handlers/messages/send_messages_handler.rs
index f4497bf6..47df8270 100644
--- a/core/server/src/binary/handlers/messages/send_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs
@@ -19,22 +19,20 @@
 use super::COMPONENT;
 use crate::binary::command::{BinaryServerCommand, ServerCommandHandler};
 use crate::binary::sender::SenderKind;
+use crate::shard::IggyShard;
 use crate::shard::namespace::IggyNamespace;
 use crate::shard::transmission::frame::ShardResponse;
 use crate::shard::transmission::message::{ShardMessage, ShardRequest, 
ShardRequestPayload};
-use crate::shard::{IggyShard, ShardRequestResult};
 use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut};
 use crate::streaming::session::Session;
 use crate::streaming::utils::PooledBuffer;
 use anyhow::Result;
 use bytes::BytesMut;
-use error_set::ErrContext;
 use iggy_common::Identifier;
 use iggy_common::Sizeable;
 use iggy_common::{INDEX_SIZE, IdKind};
 use iggy_common::{IggyError, Partitioning, SendMessages, Validatable};
 use std::rc::Rc;
-use std::sync::Arc;
 use tracing::instrument;
 
 impl ServerCommandHandler for SendMessages {
@@ -112,80 +110,17 @@ impl ServerCommandHandler for SendMessages {
         );
         batch.validate()?;
 
-        let stream = shard
-            .get_stream(&self.stream_id)
-            .with_error_context(|error| {
-                format!(
-                    "Failed to get stream with ID: {} (error: {})",
-                    self.stream_id, error
-                )
-            })?;
-        let topic = stream
-            .get_topic(&self.topic_id)
-            .with_error_context(|error| {
-                format!(
-                    "Failed to get topic with ID: {} (error: {})",
-                    self.topic_id, error
-                )
-            })?;
-        let stream_id = stream.stream_id;
-        let topic_id = topic.topic_id;
-        let partition_id = topic.calculate_partition_id(&self.partitioning)?;
-
-        // Validate permissions for given user on stream and topic.
-        shard.permissioner.borrow().append_messages(
-            session.get_user_id(),
-            stream_id,
-            topic_id
-        ).with_error_context(|error| format!(
-            "{COMPONENT} (error: {error}) - permission denied to append 
messages for user {} on stream ID: {}, topic ID: {}",
-            session.get_user_id(),
-            stream_id,
-            topic_id
-        ))?;
-        let messages_count = batch.count();
-        shard.metrics.increment_messages(messages_count as u64);
-
-        // Encrypt messages if encryptor is enabled in configuration.
-        let batch = shard.maybe_encrypt_messages(batch)?;
-
-        let namespace = IggyNamespace::new(stream.stream_id, topic.topic_id, 
partition_id);
-        let payload = ShardRequestPayload::SendMessages { batch };
-        let request = ShardRequest::new(stream.stream_id, topic.topic_id, 
partition_id, payload);
-        let message = ShardMessage::Request(request);
-        // Egh... I don't like those nested match statements,
-        // Technically there is only two `request` types that will ever be 
dispatched
-        // to different shards, thus we could get away with generic structs
-        // maybe todo ?
-        // how to make this code reusable, in a sense that we will have 
exactly the same code inside of
-        // `PollMessages` handler, but with different request....
-        match shard.send_request_to_shard(&namespace, message).await {
-            ShardRequestResult::SameShard(message) => match message {
-                ShardMessage::Request(request) => {
-                    let partition_id = request.partition_id;
-                    match request.payload {
-                        ShardRequestPayload::SendMessages { batch } => {
-                            topic.append_messages(partition_id, batch).await?
-                        }
-                        _ => unreachable!(
-                            "Expected a SendMessages request inside of 
SendMessages handler, impossible state"
-                        ),
-                    }
-                }
-                _ => unreachable!(
-                    "Expected a request message inside of an command handler, 
impossible state"
-                ),
-            },
-            ShardRequestResult::Result(result) => match result? {
-                ShardResponse::SendMessages => (),
-                ShardResponse::ErrorResponse(err) => {
-                    return Err(err);
-                }
-                _ => unreachable!(
-                    "Expected a SendMessages response inside of SendMessages 
handler, impossible state"
-                ),
-            },
-        };
+        let user_id = session.get_user_id();
+        shard
+            .append_messages(
+                user_id,
+                &self.stream_id,
+                &self.topic_id,
+                &self.partitioning,
+                batch,
+            )
+            .await?;
+
         sender.send_empty_ok_response().await?;
         Ok(())
     }
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 48bf183a..6c3718f8 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -54,7 +54,7 @@ use crate::{
         transmission::{
             event::ShardEvent,
             frame::{ShardFrame, ShardResponse},
-            message::{ShardMessage, ShardRequest, ShardRequestPayload},
+            message::{ShardMessage, ShardRequest, ShardRequestPayload, 
ShardSendRequestResult},
         },
     },
     state::{
@@ -120,11 +120,6 @@ impl ShardInfo {
     }
 }
 
-pub enum ShardRequestResult<T, E> {
-    SameShard(ShardMessage),
-    Result(Result<T, E>),
-}
-
 pub struct IggyShard {
     pub id: u16,
     shards: Vec<Shard>,
@@ -498,13 +493,9 @@ impl IggyShard {
                 topic.append_messages(partition_id, batch).await?;
                 Ok(ShardResponse::SendMessages)
             }
-            ShardRequestPayload::PollMessages {
-                args,
-                consumer,
-                count,
-            } => {
+            ShardRequestPayload::PollMessages { args, consumer } => {
                 let (metadata, batch) = topic
-                    .get_messages(consumer, partition_id, args.strategy, count)
+                    .get_messages(consumer, partition_id, args.strategy, 
args.count)
                     .await?;
                 Ok(ShardResponse::PollMessages((metadata, batch)))
             }
@@ -551,14 +542,14 @@ impl IggyShard {
         }
     }
 
-    pub async fn send_request_to_shard(
+    pub async fn send_request_to_shard_or_recoil(
         &self,
         namespace: &IggyNamespace,
         message: ShardMessage,
-    ) -> ShardRequestResult<ShardResponse, IggyError> {
+    ) -> Result<ShardSendRequestResult, IggyError> {
         if let Some(shard) = self.find_shard(namespace) {
             if shard.id == self.id {
-                return ShardRequestResult::SameShard(message);
+                return Ok(ShardSendRequestResult::Recoil(message));
             }
 
             let response = match shard.send_request(message).await {
@@ -568,16 +559,16 @@ impl IggyShard {
                         "{COMPONENT} - failed to send request to shard with 
ID: {}, error: {err}",
                         shard.id
                     );
-                    return ShardRequestResult::Result(Err(err));
+                    return Err(err);
                 }
             };
-            ShardRequestResult::Result(Ok(response))
+            Ok(ShardSendRequestResult::Response(response))
         } else {
-            ShardRequestResult::Result(Err(IggyError::ShardNotFound(
+            Err(IggyError::ShardNotFound(
                 namespace.stream_id,
                 namespace.topic_id,
                 namespace.partition_id,
-            )))
+            ))
         }
     }
 
diff --git a/core/server/src/shard/system/messages.rs 
b/core/server/src/shard/system/messages.rs
index 36ae234b..b27b034c 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -19,83 +19,192 @@
 use super::COMPONENT;
 use crate::binary::handlers::messages::poll_messages_handler::IggyPollMetadata;
 use crate::shard::IggyShard;
+use crate::shard::namespace::IggyNamespace;
+use crate::shard::transmission::frame::ShardResponse;
+use crate::shard::transmission::message::{
+    ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult,
+};
+use crate::streaming::partitions::partition;
 use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut, 
IggyMessagesBatchSet};
 use crate::streaming::session::Session;
 use crate::streaming::utils::PooledBuffer;
 use async_zip::tokio::read::stream;
 use error_set::ErrContext;
+use futures::stream_select;
 use iggy_common::{
     BytesSerializable, Consumer, EncryptorKind, IGGY_MESSAGE_HEADER_SIZE, 
Identifier, IggyError,
-    Partitioning, PollingStrategy,
+    Partitioning, PollingStrategy, UserId,
 };
 use tracing::{error, trace};
 
 impl IggyShard {
+    pub async fn append_messages(
+        &self,
+        user_id: u32,
+        stream_id: &Identifier,
+        topic_id: &Identifier,
+        partitioning: &Partitioning,
+        batch: IggyMessagesBatchMut,
+    ) -> Result<(), IggyError> {
+        let stream = self.get_stream(stream_id).with_error_context(|error| {
+            format!(
+                "Failed to get stream with ID: {} (error: {})",
+                stream_id, error
+            )
+        })?;
+        let stream_id = stream.stream_id;
+        let numeric_topic_id = stream.get_topic(topic_id).map(|topic| 
topic.topic_id).with_error_context(|error| {
+            format!(
+                "Failed to get topic with ID: {} (error: {})",
+                topic_id, error
+            )
+        })?;
+        // TODO: We should look into refactoring those permissioners, so they 
can accept `Identifier` instead of numeric IDs.
+        // Validate permissions for given user on stream and topic.
+        self.permissioner.borrow().append_messages(
+            user_id,
+            stream.stream_id,
+            numeric_topic_id
+        ).with_error_context(|error| format!(
+            "{COMPONENT} (error: {error}) - permission denied to append 
messages for user {} on stream ID: {}, topic ID: {}",
+            user_id,
+            stream.stream_id,
+            numeric_topic_id
+        ))?;
+        // Encrypt messages if encryptor is enabled in configuration.
+        let batch = self.maybe_encrypt_messages(batch)?;
+        let messages_count = batch.count();
+        stream
+            .append_messages(topic_id, partitioning, async |topic, 
partition_id| {
+                let namespace = IggyNamespace::new(stream_id, topic.topic_id, 
partition_id);
+                let payload = ShardRequestPayload::SendMessages { batch };
+                let request = ShardRequest::new(stream_id, numeric_topic_id, 
partition_id, payload);
+                let message = ShardMessage::Request(request);
+
+                match self
+                    .send_request_to_shard_or_recoil(&namespace, message)
+                    .await?
+                {
+                    ShardSendRequestResult::Recoil(message) => {
+                        if let ShardMessage::Request( ShardRequest { 
partition_id, payload, .. } ) = message
+                            && let ShardRequestPayload::SendMessages { batch } 
= payload
+                        {
+                            topic.append_messages(partition_id, 
batch).await.with_error_context(|error| {
+                                format!("{COMPONENT}: Failed to append 
messages to stream_id: {stream_id}, topic_id: {topic_id}, partition_id: 
{partition_id}, error: {error})")
+                            })
+                        } else {
+                            unreachable!(
+                                "Expected a SendMessages request inside of 
SendMessages handler, impossible state"
+                            );
+                        }
+                    }
+                    ShardSendRequestResult::Response(response) => {
+                        match response {
+                            ShardResponse::SendMessages =>  { Ok(()) }
+                            ShardResponse::ErrorResponse(err) => {
+                                Err(err)
+                            }
+                            _ => unreachable!(
+                                "Expected a SendMessages response inside of 
SendMessages handler, impossible state"
+                            ),
+                        }
+
+                    }
+                }
+            })
+            .await?;
+
+        self.metrics.increment_messages(messages_count as u64);
+        Ok(())
+    }
+
     pub async fn poll_messages(
         &self,
-        session: &Session,
-        consumer: &Consumer,
+        client_id: u32,
+        user_id: u32,
         stream_id: &Identifier,
         topic_id: &Identifier,
-        partition_id: Option<u32>,
+        consumer: Consumer,
+        maybe_partition_id: Option<u32>,
         args: PollingArgs,
     ) -> Result<(IggyPollMetadata, IggyMessagesBatchSet), IggyError> {
-        self.ensure_authenticated(session)?;
-        if args.count == 0 {
-            return Err(IggyError::InvalidMessagesCount);
-        }
-
         let stream = self.get_stream(stream_id).with_error_context(|error| {
-            format!("{COMPONENT} (error: {error}) - stream not found for 
stream ID: {stream_id}")
+            format!(
+                "{COMPONENT} (error: {error}) - stream not found for stream 
ID: {}",
+                stream_id
+            )
         })?;
         let stream_id = stream.stream_id;
-        let topic = self.find_topic(session, &stream, 
topic_id).with_error_context(|error| format!("{COMPONENT} (error: {error}) - 
topic not found for stream ID: {stream_id}, topic_id: {topic_id}"))?;
+        let numeric_topic_id = stream.get_topic(topic_id).map(|topic| 
topic.topic_id).with_error_context(|error| {
+            format!(
+                "{COMPONENT} (error: {error}) - topic not found for stream ID: 
{}, topic_id: {}",
+                stream_id, topic_id
+            )
+        })?;
+
         self.permissioner
             .borrow()
-            .poll_messages(session.get_user_id(), topic.stream_id, 
topic.topic_id)
+            .poll_messages(user_id, stream_id, numeric_topic_id)
             .with_error_context(|error| format!(
                 "{COMPONENT} (error: {error}) - permission denied to poll 
messages for user {} on stream ID: {}, topic ID: {}",
-                session.get_user_id(),
-                topic.stream_id,
-                topic.topic_id
+                user_id,
+                stream_id,
+                numeric_topic_id
             ))?;
 
-        if !topic.has_partitions() {
-            return Err(IggyError::NoPartitions(topic.topic_id, 
topic.stream_id));
-        }
-
         // There might be no partition assigned, if it's the consumer group 
member without any partitions.
-        let Some((polling_consumer, partition_id)) = topic
-            .resolve_consumer_with_partition_id(consumer, session.client_id, 
partition_id, true)
-            .with_error_context(|error| format!("{COMPONENT} (error: {error}) 
- failed to resolve consumer with partition id, consumer: {consumer}, client 
ID: {}, partition ID: {:?}", session.client_id, partition_id))? else {
-            return Ok((IggyPollMetadata::new(0, 0), 
IggyMessagesBatchSet::empty()));
-        };
+        //return Ok((IggyPollMetadata::new(0, 0), 
IggyMessagesBatchSet::empty()));
 
-        let (metadata, batch_set) = topic
-            .get_messages(polling_consumer, partition_id, args.strategy, 
args.count)
-            .await?;
+        let (metadata, batch) = stream.poll_messages(topic_id, client_id, 
consumer, maybe_partition_id, args.auto_commit, async |topic, consumer, 
partition_id|  {
+            let namespace = IggyNamespace::new(stream.stream_id, 
topic.topic_id, partition_id);
+            let payload = ShardRequestPayload::PollMessages {
+                consumer,
+                args,
+            };
+            let request = ShardRequest::new(stream.stream_id, topic.topic_id, 
partition_id, payload);
+            let message = ShardMessage::Request(request);
 
-        if args.auto_commit && !batch_set.is_empty() {
-            let offset = batch_set
-                .last_offset()
-                .expect("Batch set should have at least one batch");
-            trace!(
-                "Last offset: {} will be automatically stored for {}, stream: 
{}, topic: {}, partition: {}",
-                offset, consumer, stream_id, topic_id, partition_id
-            );
-            topic
-                .store_consumer_offset_internal(polling_consumer, offset, 
partition_id)
-                .await
-                .with_error_context(|error| format!("{COMPONENT} (error: 
{error}) - failed to store consumer offset internal, polling consumer: {}, 
offset: {}, partition ID: {}", polling_consumer, offset, partition_id))?;
-        }
+            match self
+                    .send_request_to_shard_or_recoil(&namespace, message)
+                    .await?
+                {
+                    ShardSendRequestResult::Recoil(message) => {
+                        if let ShardMessage::Request( ShardRequest { 
partition_id, payload, .. } ) = message
+                            && let ShardRequestPayload::PollMessages { 
consumer, args } = payload
+                        {
+                            topic.get_messages(consumer, partition_id, 
args.strategy, args.count).await.with_error_context(|error| {
+                                format!("{COMPONENT}: Failed to get messages 
for stream_id: {stream_id}, topic_id: {topic_id}, partition_id: {partition_id}, 
error: {error})")
+                            })
+                        } else {
+                            unreachable!(
+                                "Expected a PollMessages request inside of 
PollMessages handler, impossible state"
+                            );
+                        }
+                    }
+                    ShardSendRequestResult::Response(response) => {
+                        match response {
+                            ShardResponse::PollMessages(result) =>  { 
Ok(result) }
+                            ShardResponse::ErrorResponse(err) => {
+                                Err(err)
+                            }
+                            _ => unreachable!(
+                                "Expected a SendMessages response inside of 
SendMessages handler, impossible state"
+                            ),
+                        }
 
-        let batch_set = if let Some(encryptor) = &self.encryptor {
-            self.decrypt_messages(batch_set, encryptor).await?
+                    }
+                }
+        }).await?;
+
+        let batch = if let Some(_encryptor) = &self.encryptor {
+            //TODO: Bring back decryptor
+            todo!();
+            //self.decrypt_messages(batch, encryptor.as_ref()).await?
         } else {
-            batch_set
+            batch
         };
 
-        Ok((metadata, batch_set))
+        Ok((metadata, batch))
     }
 
     pub async fn flush_unsaved_buffer(
@@ -126,45 +235,6 @@ impl IggyShard {
         Ok(())
     }
 
-    pub async fn decrypt_messages(
-        &self,
-        batches: IggyMessagesBatchSet,
-        encryptor: &EncryptorKind,
-    ) -> Result<IggyMessagesBatchSet, IggyError> {
-        let mut decrypted_batches = 
Vec::with_capacity(batches.containers_count());
-        for batch in batches.iter() {
-            let count = batch.count();
-
-            let mut indexes = IggyIndexesMut::with_capacity(batch.count() as 
usize, 0);
-            let mut decrypted_messages = 
PooledBuffer::with_capacity(batch.size() as usize);
-            let mut position = 0;
-
-            for message in batch.iter() {
-                let payload = encryptor.decrypt(message.payload());
-                match payload {
-                    Ok(payload) => {
-                        message.header().write_to_buffer(&mut 
decrypted_messages);
-                        decrypted_messages.extend_from_slice(&payload);
-                        if let Some(user_headers) = message.user_headers() {
-                            decrypted_messages.extend_from_slice(user_headers);
-                        }
-                        indexes.insert(0, position as u32, 0);
-                        position += message.size();
-                    }
-                    Err(error) => {
-                        error!("Cannot decrypt the message. Error: {}", error);
-                        continue;
-                    }
-                }
-            }
-            let decrypted_batch =
-                IggyMessagesBatchMut::from_indexes_and_messages(count, 
indexes, decrypted_messages);
-            decrypted_batches.push(decrypted_batch);
-        }
-
-        Ok(IggyMessagesBatchSet::from_vec(decrypted_batches))
-    }
-
     pub fn maybe_encrypt_messages(
         &self,
         batch: IggyMessagesBatchMut,
diff --git a/core/server/src/shard/transmission/message.rs 
b/core/server/src/shard/transmission/message.rs
index aa196fde..fe369bad 100644
--- a/core/server/src/shard/transmission/message.rs
+++ b/core/server/src/shard/transmission/message.rs
@@ -20,10 +20,19 @@ use iggy_common::PollingStrategy;
  * under the License.
  */
 use crate::{
-    shard::{system::messages::PollingArgs, transmission::event::ShardEvent},
+    shard::{
+        system::messages::PollingArgs,
+        transmission::{event::ShardEvent, frame::ShardResponse},
+    },
     streaming::{polling_consumer::PollingConsumer, 
segments::IggyMessagesBatchMut},
 };
 
+pub enum ShardSendRequestResult {
+    // TODO: In the future we can add other variants, for example backpressure 
from the destination shard,
+    Recoil(ShardMessage),
+    Response(ShardResponse),
+}
+
 #[derive(Debug)]
 pub enum ShardMessage {
     Request(ShardRequest),
@@ -60,9 +69,8 @@ pub enum ShardRequestPayload {
         batch: IggyMessagesBatchMut,
     },
     PollMessages {
-        args: PollingArgs,
         consumer: PollingConsumer,
-        count: u32,
+        args: PollingArgs,
     },
 }
 
diff --git a/core/server/src/streaming/streams/messages.rs 
b/core/server/src/streaming/streams/messages.rs
index 855bda66..bb6ac41e 100644
--- a/core/server/src/streaming/streams/messages.rs
+++ b/core/server/src/streaming/streams/messages.rs
@@ -16,11 +16,99 @@
  * under the License.
  */
 
-use crate::streaming::streams::stream::Stream;
+use error_set::ErrContext;
+use iggy_common::{Consumer, Identifier, IggyError, Partitioning};
+use tracing::trace;
+
+use super::COMPONENT;
+use crate::{
+    binary::handlers::messages::poll_messages_handler::IggyPollMetadata,
+    shard::{
+        namespace::IggyNamespace,
+        transmission::message::{ShardMessage, ShardRequest, 
ShardRequestPayload},
+    },
+    streaming::{
+        partitions::partition,
+        polling_consumer::PollingConsumer,
+        segments::{IggyMessagesBatchMut, IggyMessagesBatchSet},
+        streams::stream::Stream,
+        topics::topic::Topic,
+    },
+};
 use std::sync::atomic::Ordering;
 
 impl Stream {
     pub fn get_messages_count(&self) -> u64 {
         self.messages_count.load(Ordering::SeqCst)
     }
+
+    // TODO: Create a types module, where we will put types of those closures.
+    pub async fn append_messages(
+        &self,
+        topic_id: &Identifier,
+        partitioning: &Partitioning,
+        append_messages: impl AsyncFnOnce(&Topic, u32) -> Result<(), 
IggyError>,
+    ) -> Result<(), IggyError> {
+        // It's quite empty there, but in the future if we would like to 
introduce consumer group subscription
+        // based on topic name wildcards, we would have to lift those to 
stream level,
+        // thus it would be perfect place to perform those calculations here.
+        let topic = self.get_topic(topic_id).with_error_context(|error| {
+            format!(
+                "{COMPONENT} (error: {error}) - failed to get topic with ID: 
{} for stream ID: {}",
+                topic_id, self.stream_id
+            )
+        })?;
+
+        let partition_id = topic.calculate_partition_id(partitioning)?;
+        append_messages(topic, partition_id).await
+    }
+
+    pub async fn poll_messages(
+        &self,
+        topic_id: &Identifier,
+        client_id: u32,
+        consumer: Consumer,
+        maybe_partition_id: Option<u32>,
+        auto_commit: bool,
+        poll_messages: impl AsyncFnOnce(
+            &Topic,
+            PollingConsumer,
+            u32,
+        )
+            -> Result<(IggyPollMetadata, IggyMessagesBatchSet), IggyError>,
+    ) -> Result<(IggyPollMetadata, IggyMessagesBatchSet), IggyError> {
+        // Same as in `append_messages` method.
+        let topic = self.get_topic(topic_id).with_error_context(|error| {
+            format!(
+                "{COMPONENT} (error: {error}) - failed to get topic with ID: 
{} for stream ID: {}",
+                topic_id, self.stream_id
+            )
+        })?;
+        if !topic.has_partitions() {
+            return Err(IggyError::NoPartitions(topic.topic_id, 
self.stream_id));
+        }
+
+        let Some((consumer, partition_id)) = topic
+                .resolve_consumer_with_partition_id(&consumer, client_id, 
maybe_partition_id, true)
+                .with_error_context(|error| format!("{COMPONENT} (error: 
{error}) - failed to resolve consumer with partition id, consumer: {consumer}, 
client ID: {}, partition ID: {:?}", client_id, maybe_partition_id))? else {
+                todo!("Send early response");
+                };
+
+        let (metadata, batch) = poll_messages(topic, consumer, 
partition_id).await?;
+
+        if auto_commit && !batch.is_empty() {
+            let offset = batch
+                .last_offset()
+                .expect("Batch set should have at least one batch");
+            trace!(
+                "Last offset: {} will be automatically stored for {}, stream: 
{}, topic: {}, partition: {}",
+                offset, consumer, self.stream_id, topic_id, partition_id
+            );
+            topic
+                .store_consumer_offset_internal(consumer, offset, partition_id)
+                .await
+                .with_error_context(|error| format!("{COMPONENT} (error: 
{error}) - failed to store consumer offset internal, polling consumer: 
{consumer}, offset: {offset}, partition ID: {partition_id}"))?;
+        }
+        Ok((metadata, batch))
+    }
 }

Reply via email to