This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new a822c449 feat(io_uring): fix encryptor (#2211)
a822c449 is described below
commit a822c449a1dd3ee6864be2703c32ab3681206a6f
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Sat Sep 27 11:18:58 2025 +0200
feat(io_uring): fix encryptor (#2211)
---
core/server/src/shard/system/messages.rs | 55 ++++++++++++++++++++++++++++----
1 file changed, 49 insertions(+), 6 deletions(-)
diff --git a/core/server/src/shard/system/messages.rs
b/core/server/src/shard/system/messages.rs
index 6acf72d1..a64602d0 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -36,8 +36,7 @@ use crate::streaming::{partitions, streams, topics};
use error_set::ErrContext;
use iggy_common::{
- BytesSerializable, Consumer, IGGY_MESSAGE_HEADER_SIZE, Identifier,
IggyError, IggyTimestamp,
- Partitioning, PartitioningKind, PollingKind, PollingStrategy,
+ BytesSerializable, Consumer, EncryptorKind, Identifier, IggyError,
IggyTimestamp, Partitioning, PartitioningKind, PollingKind, PollingStrategy,
IGGY_MESSAGE_HEADER_SIZE
};
use tracing::{error, trace};
@@ -332,10 +331,8 @@ impl IggyShard {
},
}?;
- let batch = if let Some(_encryptor) = &self.encryptor {
- //TODO: Bring back decryptor
- todo!();
- //self.decrypt_messages(batch, encryptor.as_ref()).await?
+ let batch = if let Some(encryptor) = &self.encryptor {
+ self.decrypt_messages(batch, encryptor).await?
} else {
batch
};
@@ -355,6 +352,52 @@ impl IggyShard {
todo!();
}
+
+ async fn decrypt_messages(
+ &self,
+ batches: IggyMessagesBatchSet,
+ encryptor: &EncryptorKind,
+ ) -> Result<IggyMessagesBatchSet, IggyError> {
+ let mut decrypted_batches =
Vec::with_capacity(batches.containers_count());
+ for batch in batches.iter() {
+ let count = batch.count();
+
+ let mut indexes = IggyIndexesMut::with_capacity(batch.count() as
usize, 0);
+ let mut decrypted_messages =
PooledBuffer::with_capacity(batch.size() as usize);
+ let mut position = 0;
+
+ for message in batch.iter() {
+ let payload = encryptor.decrypt(message.payload());
+ match payload {
+ Ok(payload) => {
+ // Update the header with the decrypted payload length
+ let mut header = message.header().to_header();
+ header.payload_length = payload.len() as u32;
+
+
decrypted_messages.extend_from_slice(&header.to_bytes());
+ decrypted_messages.extend_from_slice(&payload);
+ if let Some(user_headers) = message.user_headers() {
+ decrypted_messages.extend_from_slice(user_headers);
+ }
+ position += IGGY_MESSAGE_HEADER_SIZE
+ + payload.len()
+ + message.header().user_headers_length();
+ indexes.insert(0, position as u32, 0);
+ }
+ Err(error) => {
+ error!("Cannot decrypt the message. Error: {}", error);
+ continue;
+ }
+ }
+ }
+ let decrypted_batch =
+ IggyMessagesBatchMut::from_indexes_and_messages(count,
indexes, decrypted_messages);
+ decrypted_batches.push(decrypted_batch);
+ }
+
+ Ok(IggyMessagesBatchSet::from_vec(decrypted_batches))
+ }
+
pub fn maybe_encrypt_messages(
&self,
batch: IggyMessagesBatchMut,