This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/io_uring_tpc by this push:
     new a822c449 feat(io_uring): fix encryptor (#2211)
a822c449 is described below

commit a822c449a1dd3ee6864be2703c32ab3681206a6f
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Sat Sep 27 11:18:58 2025 +0200

    feat(io_uring): fix encryptor (#2211)
---
 core/server/src/shard/system/messages.rs | 55 ++++++++++++++++++++++++++++----
 1 file changed, 49 insertions(+), 6 deletions(-)

diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs
index 6acf72d1..a64602d0 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -36,8 +36,7 @@ use crate::streaming::{partitions, streams, topics};
 use error_set::ErrContext;
 
 use iggy_common::{
-    BytesSerializable, Consumer, IGGY_MESSAGE_HEADER_SIZE, Identifier, 
IggyError, IggyTimestamp,
-    Partitioning, PartitioningKind, PollingKind, PollingStrategy,
+    BytesSerializable, Consumer, EncryptorKind, Identifier, IggyError, 
IggyTimestamp, Partitioning, PartitioningKind, PollingKind, PollingStrategy, 
IGGY_MESSAGE_HEADER_SIZE
 };
 use tracing::{error, trace};
 
@@ -332,10 +331,8 @@ impl IggyShard {
             },
         }?;
 
-        let batch = if let Some(_encryptor) = &self.encryptor {
-            //TODO: Bring back decryptor
-            todo!();
-            //self.decrypt_messages(batch, encryptor.as_ref()).await?
+        let batch = if let Some(encryptor) = &self.encryptor {
+            self.decrypt_messages(batch, encryptor).await?
         } else {
             batch
         };
@@ -355,6 +352,52 @@ impl IggyShard {
         todo!();
     }
 
+
+    /// Decrypts the payload of every message in `batches` using `encryptor`.
+    ///
+    /// Each input batch is rebuilt into a new batch. For every message the
+    /// header is re-serialized with `payload_length` set to the size of the
+    /// decrypted payload, followed by the decrypted payload and the original
+    /// user headers (when present). One index entry carrying the cumulative
+    /// end position is recorded per rebuilt message.
+    ///
+    /// A message whose payload cannot be decrypted is logged and left out of
+    /// the rebuilt batch. The count passed to the rebuilt batch covers only the
+    /// messages that were actually written, so it stays in step with the
+    /// indexes and the message buffer.
+    ///
+    /// # Errors
+    ///
+    /// Currently always returns `Ok`: decryption failures are logged and
+    /// skipped rather than propagated.
+    async fn decrypt_messages(
+        &self,
+        batches: IggyMessagesBatchSet,
+        encryptor: &EncryptorKind,
+    ) -> Result<IggyMessagesBatchSet, IggyError> {
+        let mut decrypted_batches = Vec::with_capacity(batches.containers_count());
+        for batch in batches.iter() {
+            let mut indexes = IggyIndexesMut::with_capacity(batch.count() as usize, 0);
+            let mut decrypted_messages = PooledBuffer::with_capacity(batch.size() as usize);
+            let mut position = 0;
+            // Messages that fail to decrypt are skipped, so count what is
+            // written instead of reusing the count of the source batch.
+            let mut decrypted_count = 0;
+
+            for message in batch.iter() {
+                let payload = match encryptor.decrypt(message.payload()) {
+                    Ok(payload) => payload,
+                    Err(error) => {
+                        error!("Cannot decrypt the message. Error: {}", error);
+                        continue;
+                    }
+                };
+
+                // Update the header with the decrypted payload length
+                let mut header = message.header().to_header();
+                header.payload_length = payload.len() as u32;
+
+                decrypted_messages.extend_from_slice(&header.to_bytes());
+                decrypted_messages.extend_from_slice(&payload);
+                if let Some(user_headers) = message.user_headers() {
+                    decrypted_messages.extend_from_slice(user_headers);
+                }
+
+                // The index stores the end position of the message just written.
+                position += IGGY_MESSAGE_HEADER_SIZE
+                    + payload.len()
+                    + message.header().user_headers_length();
+                indexes.insert(0, position as u32, 0);
+                decrypted_count += 1;
+            }
+
+            decrypted_batches.push(IggyMessagesBatchMut::from_indexes_and_messages(
+                decrypted_count,
+                indexes,
+                decrypted_messages,
+            ));
+        }
+
+        Ok(IggyMessagesBatchSet::from_vec(decrypted_batches))
+    }
+
     pub fn maybe_encrypt_messages(
         &self,
         batch: IggyMessagesBatchMut,

Reply via email to