This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch zero-copy-no-batching in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 5a7cde2e3162f89978afc6f0892ab71bfc76151e Author: Hubert Gruszecki <[email protected]> AuthorDate: Sun Mar 23 20:41:21 2025 +0100 fixes in sdk --- sdk/src/cli/message/poll_messages.rs | 105 ++++----- sdk/src/clients/consumer.rs | 235 ++++++++++---------- sdk/src/consumer_ext/consumer_message_ext.rs | 3 +- sdk/src/consumer_ext/mod.rs | 7 +- sdk/src/messages/send_messages.rs | 244 ++++++++------------- sdk/src/models/messaging/indexes.rs | 88 +------- sdk/src/models/messaging/message.rs | 37 +++- server/src/streaming/partitions/messages.rs | 69 +++--- server/src/streaming/partitions/storage.rs | 5 +- .../streaming/segments/messages/messages_writer.rs | 5 - .../src/streaming/segments/messages_accumulator.rs | 2 - server/src/streaming/segments/reading_messages.rs | 16 -- server/src/streaming/segments/segment.rs | 13 +- .../streaming/segments/types/message_view_mut.rs | 6 +- .../streaming/segments/types/messages_batch_set.rs | 17 -- server/src/streaming/segments/writing_messages.rs | 3 +- server/src/streaming/topics/messages.rs | 7 +- server/src/tcp/connection_handler.rs | 6 - 18 files changed, 348 insertions(+), 520 deletions(-) diff --git a/sdk/src/cli/message/poll_messages.rs b/sdk/src/cli/message/poll_messages.rs index 67196408..b153f4e8 100644 --- a/sdk/src/cli/message/poll_messages.rs +++ b/sdk/src/cli/message/poll_messages.rs @@ -5,8 +5,7 @@ use crate::consumer::Consumer; use crate::identifier::Identifier; use crate::messages::{PollMessages, PollingStrategy}; use crate::models::messaging::HeaderKind; -use crate::prelude::{HeaderKey, IggyMessage}; -use crate::utils::sizeable::Sizeable; +use crate::prelude::{HeaderKey, HeaderValue, IggyMessage}; use crate::utils::timestamp::IggyTimestamp; use crate::utils::{byte_size::IggyByteSize, duration::IggyDuration}; use anyhow::Context; @@ -63,22 +62,27 @@ impl PollMessagesCmd { fn create_message_header_keys( &self, - polled_messages: &Vec<IggyMessage>, + polled_messages: &[IggyMessage], ) -> HashSet<(HeaderKey, 
HeaderKind)> { - todo!(); - // if !self.show_headers { - // return HashSet::new(); - // } - // polled_messages - // .iter() - // .flat_map(|m| match m.user_headers.as_ref() { - // Some(h) => HashMap::from_bytes(h.clone()) - // .iter() - // .map(|(k, v)| (k.clone(), v.kind)) - // .collect::<Vec<_>>(), - // None => vec![], - // }) - // .collect::<HashSet<_>>() + if !self.show_headers { + return HashSet::new(); + } + polled_messages + .iter() + .flat_map(|m| match m.user_headers.as_ref() { + Some(h) => match HashMap::<HeaderKey, HeaderValue>::from_bytes(h.clone()) { + Ok(headers) => headers + .iter() + .map(|(k, v)| (k.clone(), v.kind)) + .collect::<Vec<_>>(), + Err(e) => { + tracing::error!("Failed to parse user headers, error: {e}"); + vec![] + } + }, + None => vec![], + }) + .collect::<HashSet<_>>() } fn create_table_header(header_key_set: &HashSet<(HeaderKey, HeaderKind)>) -> Row { @@ -101,42 +105,41 @@ impl PollMessagesCmd { } fn create_table_content( - polled_messages: &Vec<IggyMessage>, + polled_messages: &[IggyMessage], message_header_keys: &HashSet<(HeaderKey, HeaderKind)>, ) -> Vec<Row> { - todo!(); - // polled_messages - // .iter() - // .map(|message| { - // let mut row = vec![ - // format!("{}", message.header.offset), - // IggyTimestamp::from(message.header.timestamp) - // .to_local_string("%Y-%m-%d %H:%M:%S%.6f"), - // format!("{}", message.header.id), - // format!("{}", message.payload.len()), - // String::from_utf8_lossy(&message.payload).to_string(), - // ]; - - // let values = message_header_keys - // .iter() - // .map(|(key, kind)| { - // let user_headers = HashMap::from_bytes(message.user_headers); - // message - // .user_headers - // .as_ref() - // .map(|h| { - // h.get(key) - // .filter(|v| v.kind == *kind) - // .map(|v| v.value_only_to_string()) - // .unwrap_or_default() - // }) - // .unwrap_or_default() - // }) - // .collect::<Vec<_>>(); - // row.extend(values); - // Row::from(row) - // }) - // .collect::<_>() + polled_messages + .iter() + 
.map(|message| { + let mut row = vec![ + format!("{}", message.header.offset), + IggyTimestamp::from(message.header.timestamp) + .to_local_string("%Y-%m-%d %H:%M:%S%.6f"), + format!("{}", message.header.id), + format!("{}", message.payload.len()), + String::from_utf8_lossy(&message.payload).to_string(), + ]; + + let values = message_header_keys + .iter() + .map(|(key, kind)| { + message + .user_headers_map() + .expect("Failed to parse user headers") + .as_ref() + .map(|h| { + h.get(key) + .filter(|v| v.kind == *kind) + .map(|v| v.value_only_to_string()) + .unwrap_or_default() + }) + .unwrap_or_default() + }) + .collect::<Vec<_>>(); + row.extend(values); + Row::from(row) + }) + .collect::<_>() } } diff --git a/sdk/src/clients/consumer.rs b/sdk/src/clients/consumer.rs index 2e50e86e..9a621030 100644 --- a/sdk/src/clients/consumer.rs +++ b/sdk/src/clients/consumer.rs @@ -5,8 +5,7 @@ use crate::error::IggyError; use crate::identifier::{IdKind, Identifier}; use crate::locking::{IggySharedMut, IggySharedMutFn}; use crate::messages::PollingStrategy; -use crate::prelude::{IggyMessage, PolledMessages}; -use crate::utils::byte_size::IggyByteSize; +use crate::prelude::{IggyMessage, PolledMessages, PollingKind}; use crate::utils::crypto::EncryptorKind; use crate::utils::duration::IggyDuration; use crate::utils::timestamp::IggyTimestamp; @@ -25,10 +24,8 @@ use tokio::time; use tokio::time::sleep; use tracing::{error, info, trace, warn}; -//const EMPTY_MESSAGES: Vec<PolledMessage> = Vec::new(); - const ORDERING: std::sync::atomic::Ordering = std::sync::atomic::Ordering::SeqCst; -type PollMessagesFuture = Pin<Box<dyn Future<Output = Result<Vec<IggyMessage>, IggyError>>>>; +type PollMessagesFuture = Pin<Box<dyn Future<Output = Result<PolledMessages, IggyError>>>>; /// The auto-commit configuration for storing the offset on the server. 
#[derive(Debug, PartialEq, Copy, Clone)] @@ -102,7 +99,7 @@ pub struct IggyConsumer { last_consumed_offsets: Arc<DashMap<u32, AtomicU64>>, current_offsets: Arc<DashMap<u32, AtomicU64>>, poll_future: Option<PollMessagesFuture>, - buffered_messages: VecDeque<Vec<IggyMessage>>, + buffered_messages: VecDeque<IggyMessage>, encryptor: Option<Arc<EncryptorKind>>, store_offset_sender: flume::Sender<(u32, u64)>, store_offset_after_each_message: bool, @@ -611,10 +608,7 @@ impl IggyConsumer { auto_commit_after_polling, ) .await; - polled_messages - //TODO: Fix me - /* if let Ok(mut polled_messages) = polled_messages { if polled_messages.messages.is_empty() { return Ok(polled_messages); @@ -635,13 +629,9 @@ impl IggyConsumer { if !allow_replay && has_consumed_offset { polled_messages .messages - .retain(|message| message.offset > consumed_offset); + .retain(|message| message.header.offset > consumed_offset); if polled_messages.messages.is_empty() { - return Ok(PolledMessages { - messages: EMPTY_MESSAGES, - current_offset: polled_messages.current_offset, - partition_id, - }); + return Ok(PolledMessages::empty()); } } @@ -693,9 +683,10 @@ impl IggyConsumer { } return Ok(PolledMessages { - messages: EMPTY_MESSAGES, + messages: vec![], current_offset: polled_messages.current_offset, partition_id, + count: 0, }); } @@ -712,7 +703,6 @@ impl IggyConsumer { sleep(retry_interval.get_duration()).await; } Err(error) - */ } } @@ -791,16 +781,16 @@ impl IggyConsumer { } } -pub struct ReceivedBatch { - pub messages: Vec<IggyMessage>, +pub struct ReceivedMessage { + pub message: IggyMessage, pub current_offset: u64, pub partition_id: u32, } -impl ReceivedBatch { - pub fn new(messages: Vec<IggyMessage>, current_offset: u64, partition_id: u32) -> Self { +impl ReceivedMessage { + pub fn new(message: IggyMessage, current_offset: u64, partition_id: u32) -> Self { Self { - messages, + message, current_offset, partition_id, } @@ -808,134 +798,135 @@ impl ReceivedBatch { } impl Stream for 
IggyConsumer { - type Item = Result<ReceivedBatch, IggyError>; + type Item = Result<ReceivedMessage, IggyError>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let partition_id = self.current_partition_id.load(ORDERING); if let Some(message) = self.buffered_messages.pop_front() { - //TODO: Fix me - /* + { + if let Some(last_consumed_offset_entry) = + self.last_consumed_offsets.get(&partition_id) { - if let Some(last_consumed_offset_entry) = - self.last_consumed_offsets.get(&partition_id) - { - last_consumed_offset_entry.store(message.offset, ORDERING); - } else { - self.last_consumed_offsets - .insert(partition_id, AtomicU64::new(message.offset)); - } - - if (self.store_after_every_nth_message > 0 - && message.offset % self.store_after_every_nth_message == 0) - || self.store_offset_after_each_message - { - self.send_store_offset(partition_id, message.offset); - } + last_consumed_offset_entry.store(message.header.offset, ORDERING); + } else { + self.last_consumed_offsets + .insert(partition_id, AtomicU64::new(message.header.offset)); } - if self.buffered_messages.is_empty() { - if self.polling_strategy.kind == PollingKind::Offset { - self.polling_strategy = PollingStrategy::offset(message.offset + 1); - } - - if self.store_offset_after_all_messages { - self.send_store_offset(partition_id, message.offset); - } + if (self.store_after_every_nth_message > 0 + && message.header.offset % self.store_after_every_nth_message == 0) + || self.store_offset_after_each_message + { + self.send_store_offset(partition_id, message.header.offset); } + } - let current_offset; - if let Some(current_offset_entry) = self.current_offsets.get(&partition_id) { - current_offset = current_offset_entry.load(ORDERING); - } else { - current_offset = 0; + if self.buffered_messages.is_empty() { + if self.polling_strategy.kind == PollingKind::Offset { + self.polling_strategy = PollingStrategy::offset(message.header.offset + 1); } - return 
Poll::Ready(Some(Ok(ReceivedMessage::new( - message, - current_offset, - partition_id, - )))); + if self.store_offset_after_all_messages { + self.send_store_offset(partition_id, message.header.offset); + } } - if self.poll_future.is_none() { - let future = self.create_poll_messages_future(); - self.poll_future = Some(Box::pin(future)); + let current_offset; + if let Some(current_offset_entry) = self.current_offsets.get(&partition_id) { + current_offset = current_offset_entry.load(ORDERING); + } else { + current_offset = 0; } - while let Some(future) = self.poll_future.as_mut() { - match future.poll_unpin(cx) { - Poll::Ready(Ok(mut polled_messages)) => { - let partition_id = polled_messages.partition_id; - self.current_partition_id.store(partition_id, ORDERING); - if polled_messages.messages.is_empty() { - self.poll_future = Some(Box::pin(self.create_poll_messages_future())); - } else { - if let Some(ref encryptor) = self.encryptor { - for message in &mut polled_messages.messages { - let payload = encryptor.decrypt(&message.payload); - if payload.is_err() { - self.poll_future = None; - error!("Failed to decrypt the message payload at offset: {}, partition ID: {}", message.offset, partition_id); - let error = payload.unwrap_err(); - return Poll::Ready(Some(Err(error))); - } - - let payload = payload.unwrap(); - message.payload = Bytes::from(payload); - message.length = IggyByteSize::from(message.payload.len() as u64); + return Poll::Ready(Some(Ok(ReceivedMessage::new( + message, + current_offset, + partition_id, + )))); + } + + if self.poll_future.is_none() { + let future = self.create_poll_messages_future(); + self.poll_future = Some(Box::pin(future)); + } + + while let Some(future) = self.poll_future.as_mut() { + match future.poll_unpin(cx) { + Poll::Ready(Ok(mut polled_messages)) => { + let partition_id = polled_messages.partition_id; + self.current_partition_id.store(partition_id, ORDERING); + if polled_messages.messages.is_empty() { + self.poll_future = 
Some(Box::pin(self.create_poll_messages_future())); + } else { + if let Some(ref encryptor) = self.encryptor { + for message in &mut polled_messages.messages { + let payload = encryptor.decrypt(&message.payload); + if payload.is_err() { + self.poll_future = None; + error!("Failed to decrypt the message payload at offset: {}, partition ID: {}", message.header.offset, partition_id); + let error = payload.unwrap_err(); + return Poll::Ready(Some(Err(error))); } - } - if let Some(current_offset_entry) = self.current_offsets.get(&partition_id) - { - current_offset_entry.store(polled_messages.current_offset, ORDERING); - } else { - self.current_offsets.insert( - partition_id, - AtomicU64::new(polled_messages.current_offset), - ); + let payload = payload.unwrap(); + message.payload = Bytes::from(payload); + message.header.payload_length = message.payload.len() as u32; } + } - let message = polled_messages.messages.remove(0); - self.buffered_messages.extend(polled_messages.messages); + if let Some(current_offset_entry) = self.current_offsets.get(&partition_id) + { + current_offset_entry.store(polled_messages.current_offset, ORDERING); + } else { + self.current_offsets.insert( + partition_id, + AtomicU64::new(polled_messages.current_offset), + ); + } - if self.polling_strategy.kind == PollingKind::Offset { - self.polling_strategy = PollingStrategy::offset(message.offset + 1); - } + let message = polled_messages.messages.remove(0); + self.buffered_messages.extend(polled_messages.messages); - if let Some(last_consumed_offset_entry) = - self.last_consumed_offsets.get(&partition_id) - { - last_consumed_offset_entry.store(message.offset, ORDERING); - } else { - self.last_consumed_offsets - .insert(partition_id, AtomicU64::new(message.offset)); - } + if self.polling_strategy.kind == PollingKind::Offset { + self.polling_strategy = + PollingStrategy::offset(message.header.offset + 1); + } - if (self.store_after_every_nth_message > 0 - && message.offset % 
self.store_after_every_nth_message == 0) - || self.store_offset_after_each_message - || (self.store_offset_after_all_messages - && self.buffered_messages.is_empty()) - { - self.send_store_offset(polled_messages.partition_id, message.offset); - } + if let Some(last_consumed_offset_entry) = + self.last_consumed_offsets.get(&partition_id) + { + last_consumed_offset_entry.store(message.header.offset, ORDERING); + } else { + self.last_consumed_offsets + .insert(partition_id, AtomicU64::new(message.header.offset)); + } - self.poll_future = None; - return Poll::Ready(Some(Ok(ReceivedMessage::new( - message, - polled_messages.current_offset, + if (self.store_after_every_nth_message > 0 + && message.header.offset % self.store_after_every_nth_message == 0) + || self.store_offset_after_each_message + || (self.store_offset_after_all_messages + && self.buffered_messages.is_empty()) + { + self.send_store_offset( polled_messages.partition_id, - )))); + message.header.offset, + ); } - } - Poll::Ready(Err(err)) => { + self.poll_future = None; - return Poll::Ready(Some(Err(err))); + return Poll::Ready(Some(Ok(ReceivedMessage::new( + message, + polled_messages.current_offset, + polled_messages.partition_id, + )))); } - Poll::Pending => return Poll::Pending, } - */ + Poll::Ready(Err(err)) => { + self.poll_future = None; + return Poll::Ready(Some(Err(err))); + } + Poll::Pending => return Poll::Pending, + } } Poll::Pending diff --git a/sdk/src/consumer_ext/consumer_message_ext.rs b/sdk/src/consumer_ext/consumer_message_ext.rs index 6c093fa3..cab9e540 100644 --- a/sdk/src/consumer_ext/consumer_message_ext.rs +++ b/sdk/src/consumer_ext/consumer_message_ext.rs @@ -70,8 +70,7 @@ impl IggyConsumerMessageExt for IggyConsumer { Some(Ok(received_message)) => { let partition_id = received_message.partition_id; let current_offset = received_message.current_offset; - //TODO(hubcio): fix me, this used to be message.offset - let message_offset = received_message.messages[0].header.offset; + let 
message_offset = received_message.message.header.offset; if let Err(err) = message_consumer.consume(received_message).await { error!("Error while handling message at offset: {message_offset}/{current_offset}, partition: {partition_id} for consumer: {name} on topic: {topic} and stream: {stream} due to error: {err}", name = self.name(), topic = self.topic(), stream = self.stream()); diff --git a/sdk/src/consumer_ext/mod.rs b/sdk/src/consumer_ext/mod.rs index 52869755..ca6a9548 100644 --- a/sdk/src/consumer_ext/mod.rs +++ b/sdk/src/consumer_ext/mod.rs @@ -1,8 +1,7 @@ mod consumer_message_ext; mod consumer_message_trait; -use crate::clients::consumer::ReceivedBatch; -use crate::error::IggyError; +use crate::{clients::consumer::ReceivedMessage, error::IggyError}; pub use consumer_message_trait::IggyConsumerMessageExt; /// Trait for message consumer @@ -18,7 +17,7 @@ pub trait LocalMessageConsumer { /// # Errors /// /// * `IggyError` - If the message consumer fails to consume the message - async fn consume(&self, message: ReceivedBatch) -> Result<(), IggyError>; + async fn consume(&self, message: ReceivedMessage) -> Result<(), IggyError>; } // Default implementation for `&T` @@ -33,7 +32,7 @@ impl<T: MessageConsumer + Send + Sync> MessageConsumer for &T { /// # Errors /// /// * `IggyError` - If the message consumer fails to consume the message - async fn consume(&self, message: ReceivedBatch) -> Result<(), IggyError> { + async fn consume(&self, message: ReceivedMessage) -> Result<(), IggyError> { (**self).consume(message).await } } diff --git a/sdk/src/messages/send_messages.rs b/sdk/src/messages/send_messages.rs index 2ab0ad0e..6c4b92da 100644 --- a/sdk/src/messages/send_messages.rs +++ b/sdk/src/messages/send_messages.rs @@ -169,75 +169,10 @@ impl Validatable<IggyError> for SendMessages { impl BytesSerializable for SendMessages { fn to_bytes(&self) -> Bytes { panic!("should not be used") - - // let stream_id_bytes = self.stream_id.to_bytes(); - // let topic_id_bytes = 
self.topic_id.to_bytes(); - // let partitioning_bytes = self.partitioning.to_bytes(); - - // let metadata_len = stream_id_bytes.len() - // + topic_id_bytes.len() - // + partitioning_bytes.len() - // + std::mem::size_of::<u32>(); - - // let total_len = metadata_len + self.messages.len(); - - // let mut bytes = BytesMut::with_capacity(total_len); - - // bytes.put_slice(&stream_id_bytes); - // bytes.put_slice(&topic_id_bytes); - // bytes.put_slice(&partitioning_bytes); - // bytes.put_u32_le(self.messages_count); - // bytes.put_slice(&self.messages); - - // bytes.freeze() } fn from_bytes(bytes: Bytes) -> Result<SendMessages, IggyError> { panic!("should not be used") - // if bytes.is_empty() { - // return Err(IggyError::InvalidCommand); - // } - - // let mut position = 0; - // let stream_id = Identifier::from_bytes(bytes.clone())?; - // position += stream_id.get_size_bytes().as_bytes_usize(); - - // if bytes.len() <= position { - // return Err(IggyError::InvalidCommand); - // } - - // let topic_id = Identifier::from_bytes(bytes.slice(position..))?; - // position += topic_id.get_size_bytes().as_bytes_usize(); - - // if bytes.len() <= position { - // return Err(IggyError::InvalidCommand); - // } - - // let partitioning = Partitioning::from_bytes(bytes.slice(position..))?; - // position += partitioning.get_size_bytes().as_bytes_usize(); - - // if bytes.len() < position + 4 { - // return Err(IggyError::InvalidCommand); - // } - - // let messages_count = u32::from_le_bytes( - // bytes - // .slice(position..position + 4) - // .as_ref() - // .try_into() - // .unwrap(), - // ); - // position += 4; - - // let messages = bytes.slice(position..); - - // Ok(SendMessages { - // stream_id, - // topic_id, - // partitioning, - // messages_count, - // messages, - // }) } } @@ -255,97 +190,98 @@ impl Display for SendMessages { #[cfg(test)] mod tests { + use std::str::FromStr; + use super::*; - // //TODO: Fix me, fix those tests. 
- // #[test] - // fn should_be_serialized_as_bytes() { - // let message_1 = Message::from_str("hello 1").unwrap(); - // let message_2 = Message::new(Some(2), "hello 2".into(), None); - // let message_3 = Message::new(Some(3), "hello 3".into(), None); - // let messages = vec![message_1, message_2, message_3]; - // let command = SendMessages { - // stream_id: Identifier::numeric(1).unwrap(), - // topic_id: Identifier::numeric(2).unwrap(), - // partitioning: Partitioning::partition_id(4), - // messages, - // }; - - // let bytes = command.to_bytes(); - - // let mut position = 0; - // let stream_id = Identifier::from_bytes(bytes.clone()).unwrap(); - // position += stream_id.get_size_bytes().as_bytes_usize(); - // let topic_id = Identifier::from_bytes(bytes.slice(position..)).unwrap(); - // position += topic_id.get_size_bytes().as_bytes_usize(); - // let key = Partitioning::from_bytes(bytes.slice(position..)).unwrap(); - // position += key.get_size_bytes().as_bytes_usize(); - // let messages = bytes.slice(position..); - // let command_messages = command - // .messages - // .iter() - // .fold(BytesMut::new(), |mut bytes_mut, message| { - // bytes_mut.put(message.to_bytes()); - // bytes_mut - // }) - // .freeze(); - - // assert!(!bytes.is_empty()); - // assert_eq!(stream_id, command.stream_id); - // assert_eq!(topic_id, command.topic_id); - // assert_eq!(key, command.partitioning); - // assert_eq!(messages, command_messages); - // } - - // #[test] - // fn should_be_deserialized_from_bytes() { - // let stream_id = Identifier::numeric(1).unwrap(); - // let topic_id = Identifier::numeric(2).unwrap(); - // let key = Partitioning::partition_id(4); - - // let message_1 = Message::from_str("hello 1").unwrap(); - // let message_2 = Message::new(Some(2), "hello 2".into(), None); - // let message_3 = Message::new(Some(3), "hello 3".into(), None); - // let messages = [ - // message_1.to_bytes(), - // message_2.to_bytes(), - // message_3.to_bytes(), - // ] - // .concat(); - - // let 
key_bytes = key.to_bytes(); - // let stream_id_bytes = stream_id.to_bytes(); - // let topic_id_bytes = topic_id.to_bytes(); - // let current_position = stream_id_bytes.len() + topic_id_bytes.len() + key_bytes.len(); - // let mut bytes = BytesMut::with_capacity(current_position); - // bytes.put_slice(&stream_id_bytes); - // bytes.put_slice(&topic_id_bytes); - // bytes.put_slice(&key_bytes); - // bytes.put_slice(&messages); - // let bytes = bytes.freeze(); - // let command = SendMessages::from_bytes(bytes.clone()); - // assert!(command.is_ok()); - - // let messages_payloads = bytes.slice(current_position..); - // let mut position = 0; - // let mut messages = Vec::new(); - // while position < messages_payloads.len() { - // let message = Message::from_bytes(messages_payloads.slice(position..)).unwrap(); - // position += message.get_size_bytes().as_bytes_usize(); - // messages.push(message); - // } - - // let command = command.unwrap(); - // assert_eq!(command.stream_id, stream_id); - // assert_eq!(command.topic_id, topic_id); - // assert_eq!(command.partitioning, key); - // for (index, message) in command.messages.iter().enumerate() { - // let command_message = &command.messages[index]; - // assert_eq!(command_message.id, message.id); - // assert_eq!(command_message.length, message.length); - // assert_eq!(command_message.payload, message.payload); - // } - // } + #[test] + fn should_be_serialized_as_bytes() { + let message_1 = IggyMessage::from_str("hello 1").unwrap(); + let message_2 = IggyMessage::new(Some(2), "hello 2".into(), None); + let message_3 = IggyMessage::new(Some(3), "hello 3".into(), None); + let messages = vec![message_1, message_2, message_3]; + let command = SendMessages { + stream_id: Identifier::numeric(1).unwrap(), + topic_id: Identifier::numeric(2).unwrap(), + partitioning: Partitioning::partition_id(4), + messages, + }; + + let bytes = command.to_bytes(); + + let mut position = 0; + let stream_id = Identifier::from_bytes(bytes.clone()).unwrap(); 
+ position += stream_id.get_size_bytes().as_bytes_usize(); + let topic_id = Identifier::from_bytes(bytes.slice(position..)).unwrap(); + position += topic_id.get_size_bytes().as_bytes_usize(); + let key = Partitioning::from_bytes(bytes.slice(position..)).unwrap(); + position += key.get_size_bytes().as_bytes_usize(); + let messages = bytes.slice(position..); + let command_messages = command + .messages + .iter() + .fold(BytesMut::new(), |mut bytes_mut, message| { + bytes_mut.put(message.to_bytes()); + bytes_mut + }) + .freeze(); + + assert!(!bytes.is_empty()); + assert_eq!(stream_id, command.stream_id); + assert_eq!(topic_id, command.topic_id); + assert_eq!(key, command.partitioning); + assert_eq!(messages, command_messages); + } + + #[test] + fn should_be_deserialized_from_bytes() { + let stream_id = Identifier::numeric(1).unwrap(); + let topic_id = Identifier::numeric(2).unwrap(); + let key = Partitioning::partition_id(4); + + let message_1 = IggyMessage::from_str("hello 1").unwrap(); + let message_2 = IggyMessage::new(Some(2), "hello 2".into(), None); + let message_3 = IggyMessage::new(Some(3), "hello 3".into(), None); + let messages = [ + message_1.to_bytes(), + message_2.to_bytes(), + message_3.to_bytes(), + ] + .concat(); + + let key_bytes = key.to_bytes(); + let stream_id_bytes = stream_id.to_bytes(); + let topic_id_bytes = topic_id.to_bytes(); + let current_position = stream_id_bytes.len() + topic_id_bytes.len() + key_bytes.len(); + let mut bytes = BytesMut::with_capacity(current_position); + bytes.put_slice(&stream_id_bytes); + bytes.put_slice(&topic_id_bytes); + bytes.put_slice(&key_bytes); + bytes.put_slice(&messages); + let bytes = bytes.freeze(); + let command = SendMessages::from_bytes(bytes.clone()); + assert!(command.is_ok()); + + let messages_payloads = bytes.slice(current_position..); + let mut position = 0; + let mut messages = Vec::new(); + while position < messages_payloads.len() { + let message = 
IggyMessage::from_bytes(messages_payloads.slice(position..)).unwrap(); + position += message.get_size_bytes().as_bytes_usize(); + messages.push(message); + } + + let command = command.unwrap(); + assert_eq!(command.stream_id, stream_id); + assert_eq!(command.topic_id, topic_id); + assert_eq!(command.partitioning, key); + for (index, message) in command.messages.iter().enumerate() { + let command_message = &command.messages[index]; + assert_eq!(command_message.id, message.id); + assert_eq!(command_message.length, message.length); + assert_eq!(command_message.payload, message.payload); + } + } #[test] fn key_of_type_balanced_should_have_empty_value() { diff --git a/sdk/src/models/messaging/indexes.rs b/sdk/src/models/messaging/indexes.rs index 420c1b78..56c63055 100644 --- a/sdk/src/models/messaging/indexes.rs +++ b/sdk/src/models/messaging/indexes.rs @@ -1,12 +1,11 @@ use super::{index_view::IggyIndexView, INDEX_SIZE}; use bytes::Bytes; use serde::{Deserialize, Serialize}; -use std::fmt; use std::ops::{Deref, Index as StdIndex}; /// A container for binary-encoded index data. /// Optimized for efficient storage and I/O operations. 
-#[derive(Default, Clone, Serialize, Deserialize, PartialEq)] +#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct IggyIndexes { base_position: u32, buffer: Bytes, @@ -89,26 +88,6 @@ impl IggyIndexes { Some(IggyIndexes::new(slice, base_position)) } - /// Gets a first index - pub fn first(&self) -> Option<IggyIndexView> { - if self.count() == 0 { - return None; - } - - Some(IggyIndexView::new(&self.buffer[0..INDEX_SIZE])) - } - - /// Gets a last index - pub fn last(&self) -> Option<IggyIndexView> { - if self.count() == 0 { - return None; - } - - Some(IggyIndexView::new( - &self.buffer[(self.count() - 1) as usize * INDEX_SIZE..], - )) - } - /// Finds an index by timestamp using binary search /// If an exact match isn't found, returns the index with the nearest timestamp /// that is greater than or equal to the requested timestamp @@ -162,43 +141,12 @@ impl IggyIndexes { self.base_position } - /// Helper method to get the first index offset - pub fn first_offset(&self) -> u32 { - let offset = self.get(0).map(|idx| idx.offset()).unwrap_or(0); - offset - } - - /// Helper method to get the first index position - pub fn first_position(&self) -> u32 { - let position = self.get(0).map(|idx| idx.position()).unwrap_or(0); - position - } - - /// Helper method to get the first timestamp - pub fn first_timestamp(&self) -> u64 { - self.get(0).map(|idx| idx.timestamp()).unwrap_or(0) - } - - /// Helper method to get the last index offset - pub fn last_offset(&self) -> u32 { - self.get(self.count() - 1) - .map(|idx| idx.offset()) - .unwrap_or(0) - } - /// Helper method to get the last index position pub fn last_position(&self) -> u32 { self.get(self.count() - 1) .map(|idx| idx.position()) .unwrap_or(0) } - - /// Helper method to get the last timestamp - pub fn last_timestamp(&self) -> u64 { - self.get(self.count() - 1) - .map(|idx| idx.timestamp()) - .unwrap_or(0) - } } impl StdIndex<usize> for IggyIndexes { @@ -218,37 +166,3 @@ impl Deref for IggyIndexes 
{ &self.buffer } } - -impl fmt::Debug for IggyIndexes { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let count = self.count(); - - if count == 0 { - return write!( - f, - "IggyIndexes {{ count: 0, base_position: {}, indexes: [] }}", - self.base_position - ); - } - - writeln!(f, "IggyIndexes {{")?; - writeln!(f, " count: {},", count)?; - writeln!(f, " base_position: {},", self.base_position)?; - writeln!(f, " indexes: [")?; - - for i in 0..count { - if let Some(index) = self.get(i) { - writeln!( - f, - " {{ offset: {}, position: {}, timestamp: {} }},", - index.offset(), - index.position(), - index.timestamp() - )?; - } - } - - writeln!(f, " ]")?; - write!(f, "}}") - } -} diff --git a/sdk/src/models/messaging/message.rs b/sdk/src/models/messaging/message.rs index c18d9c6e..ce12c14e 100644 --- a/sdk/src/models/messaging/message.rs +++ b/sdk/src/models/messaging/message.rs @@ -37,11 +37,46 @@ impl IggyMessage { Self::builder().with_id(id).with_payload(payload).build() } + /// Create a message with ID, payload and user headers + pub fn with_id_and_headers( + id: u128, + payload: Bytes, + headers: HashMap<HeaderKey, HeaderValue>, + ) -> Self { + Self::builder() + .with_id(id) + .with_payload(payload) + .with_user_headers_map(headers) + .build() + } + /// Start a builder for more complex configuration pub fn builder() -> IggyMessageBuilder { IggyMessageBuilder::new() } + /// Return instantiated user headers map + pub fn user_headers_map(&self) -> Result<Option<HashMap<HeaderKey, HeaderValue>>, IggyError> { + if let Some(headers) = &self.user_headers { + let headers_bytes = Bytes::copy_from_slice(headers); + + match HashMap::<HeaderKey, HeaderValue>::from_bytes(headers_bytes) { + Ok(h) => Ok(Some(h)), + Err(e) => { + tracing::error!( + "Error parsing headers: {}, header_length={}", + e, + self.header.user_headers_length + ); + + Ok(None) + } + } + } else { + Ok(None) + } + } + /// Convert Bytes to messages pub(crate) fn from_raw_bytes(buffer: Bytes, 
count: u32) -> Result<Vec<IggyMessage>, IggyError> { let mut messages = Vec::with_capacity(count as usize); @@ -209,7 +244,7 @@ impl IggyMessageBuilder { self } - pub fn with_user_header_kv(mut self, key: HeaderKey, value: HeaderValue) -> Self { + pub fn with_user_key_value_header(mut self, key: HeaderKey, value: HeaderValue) -> Self { let headers = self.headers.get_or_insert_with(HashMap::new); headers.insert(key, value); self diff --git a/server/src/streaming/partitions/messages.rs b/server/src/streaming/partitions/messages.rs index 4dc3ef4c..f1c89d04 100644 --- a/server/src/streaming/partitions/messages.rs +++ b/server/src/streaming/partitions/messages.rs @@ -339,67 +339,54 @@ impl Partition { #[cfg(test)] mod tests { - // TODO: Fix me - /* - use iggy::utils::byte_size::IggyByteSize; - use iggy::utils::expiry::IggyExpiry; - use iggy::utils::sizeable::Sizeable; - use std::sync::atomic::{AtomicU32, AtomicU64}; - use tempfile::TempDir; - use super::*; use crate::configs::system::{MessageDeduplicationConfig, SystemConfig}; - use crate::streaming::partitions::create_messages; use crate::streaming::persistence::persister::{FileWithSyncPersister, PersisterKind}; use crate::streaming::storage::SystemStorage; + use bytes::Bytes; + use std::sync::atomic::{AtomicU32, AtomicU64}; + use std::sync::Arc; + use tempfile::TempDir; #[tokio::test] async fn given_disabled_message_deduplication_all_messages_should_be_appended() { let (mut partition, _tempdir) = create_partition(false).await; let messages = create_messages(); let messages_count = messages.len() as u32; - let appendable_batch_info = AppendableBatchInfo { - batch_size: messages - .iter() - .map(|m| m.get_size_bytes()) - .sum::<IggyByteSize>(), - partition_id: partition.partition_id, - }; - partition - .append_messages(appendable_batch_info, messages, None) - .await - .unwrap(); + let messages_size = messages + .iter() + .map(|m| m.get_size_bytes().as_bytes_u32()) + .sum(); + let batch = 
IggyMessagesBatchMut::from_messages(&messages, messages_size); + + partition.append_messages(batch, None).await.unwrap(); let loaded_messages = partition .get_messages_by_offset(0, messages_count) .await .unwrap(); - assert_eq!(loaded_messages.len(), messages_count as usize); + assert_eq!(loaded_messages.count(), messages_count); } #[tokio::test] async fn given_enabled_message_deduplication_only_messages_with_unique_id_should_be_appended() { let (mut partition, _tempdir) = create_partition(true).await; let messages = create_messages(); + let messages_size = messages + .iter() + .map(|m| m.get_size_bytes().as_bytes_u32()) + .sum(); + let batch = IggyMessagesBatchMut::from_messages(&messages, messages_size); let messages_count = messages.len() as u32; let unique_messages_count = 3; - let appendable_batch_info = AppendableBatchInfo { - batch_size: messages - .iter() - .map(|m| m.get_size_bytes()) - .sum::<IggyByteSize>(), - partition_id: partition.partition_id, - }; - partition - .append_messages(appendable_batch_info, messages, None) - .await - .unwrap(); + + partition.append_messages(batch, None).await.unwrap(); let loaded_messages = partition .get_messages_by_offset(0, messages_count) .await .unwrap(); - assert_eq!(loaded_messages.len(), unique_messages_count); + assert_eq!(loaded_messages.count(), unique_messages_count); } async fn create_partition(deduplication_enabled: bool) -> (Partition, TempDir) { @@ -441,5 +428,19 @@ mod tests { temp_dir, ) } - */ + + fn create_messages() -> Vec<IggyMessage> { + vec![ + create_message(1, "message 1"), + create_message(2, "message 2"), + create_message(3, "message 3"), + create_message(2, "message 3.2"), + create_message(1, "message 1.2"), + create_message(3, "message 3.3"), + ] + } + + fn create_message(id: u128, payload: &str) -> IggyMessage { + IggyMessage::with_id(id, Bytes::from(payload.to_string())) + } } diff --git a/server/src/streaming/partitions/storage.rs b/server/src/streaming/partitions/storage.rs index 
b74fe6b7..80c89b43 100644 --- a/server/src/streaming/partitions/storage.rs +++ b/server/src/streaming/partitions/storage.rs @@ -99,9 +99,7 @@ impl PartitionStorage for FilePartitionStorage { let index_path_exists = tokio::fs::try_exists(&index_path).await.unwrap(); let time_index_path_exists = tokio::fs::try_exists(&time_index_path).await.unwrap(); - // Rebuild indexes in 2 cases: - // 1. Index cache is enabled and index at path does not exists. - // 2. Index cache is enabled and time index at path exists. + // Rebuild indexes if the index cache is enabled and either the index does not exist or a legacy time index exists. if index_cache_enabled && (!index_path_exists || time_index_path_exists) { warn!( "Index at path {} does not exist, rebuilding it based on {}...", @@ -127,7 +125,6 @@ impl PartitionStorage for FilePartitionStorage { ); } - // Remove legacy time index if it exists. if time_index_path_exists { tokio::fs::remove_file(&time_index_path).await.unwrap(); } diff --git a/server/src/streaming/segments/messages/messages_writer.rs b/server/src/streaming/segments/messages/messages_writer.rs index 9d0cdd44..f95dd9ce 100644 --- a/server/src/streaming/segments/messages/messages_writer.rs +++ b/server/src/streaming/segments/messages/messages_writer.rs @@ -90,11 +90,6 @@ impl MessagesWriter { "Saving batch set of size {messages_size} bytes ({containers_count} containers, {messages_count} messages) to messages file: {}", self.file_path ); - - for container in batch_set.iter() { - tracing::error!("Container size: {}", container.size()); - } - match confirmation { Confirmation::Wait => { if let Some(ref mut file) = self.file { diff --git a/server/src/streaming/segments/messages_accumulator.rs b/server/src/streaming/segments/messages_accumulator.rs index 1d11a1a7..33e91503 100644 --- a/server/src/streaming/segments/messages_accumulator.rs +++ b/server/src/streaming/segments/messages_accumulator.rs @@ -172,8 +172,6 @@ impl MessagesAccumulator { for batch in self.batches.iter() {
segment_indexes.concatenate(batch.indexes_slice()); } - - tracing::error!("hubcio after update indexes: {}", segment_indexes.count()); } /// Consumes the accumulator and returns the contained message batches. diff --git a/server/src/streaming/segments/reading_messages.rs b/server/src/streaming/segments/reading_messages.rs index fdf4deaa..8cdc94c5 100644 --- a/server/src/streaming/segments/reading_messages.rs +++ b/server/src/streaming/segments/reading_messages.rs @@ -42,8 +42,6 @@ impl Segment { let accumulator_first_timestamp = self.accumulator.first_timestamp(); let accumulator_last_timestamp = self.accumulator.last_timestamp(); - tracing::error!("hubcio accumulator first timestamp: {accumulator_first_timestamp}, last timestamp: {accumulator_last_timestamp}"); - // Case 1: Requested timestamp is higher than any available timestamp if timestamp > accumulator_last_timestamp { return Ok(IggyMessagesBatchSet::empty()); @@ -123,19 +121,11 @@ impl Segment { // Case 1: All messages are in accumulator buffer if offset >= accumulator_first_msg_offset && end_offset <= accumulator_last_msg_offset { - tracing::error!( - "hubcio segment has {} messages, getting all from accumulator", - self.get_messages_count() - ); return Ok(self.accumulator.get_messages_by_offset(offset, count)); } // Case 2: All messages are on disk if end_offset < accumulator_first_msg_offset { - tracing::error!( - "hubcio segment has {} messages, getting all from disk", - self.get_messages_count() - ); return self.load_messages_from_disk_by_offset(offset, count).await; } @@ -169,12 +159,6 @@ impl Segment { // Calculate how many more messages we need from the accumulator let remaining_count = count - combined_batch_set.count(); - tracing::error!( - "hubcio segment has {} messages, remaining count: {}", - self.get_messages_count(), - remaining_count - ); - if remaining_count > 0 { let accumulator_start_offset = std::cmp::max(offset, accumulator_first_msg_offset); diff --git 
a/server/src/streaming/segments/segment.rs b/server/src/streaming/segments/segment.rs index de1b85c7..246b5fe1 100644 --- a/server/src/streaming/segments/segment.rs +++ b/server/src/streaming/segments/segment.rs @@ -211,13 +211,16 @@ impl Segment { } pub async fn initialize_reading(&mut self) -> Result<(), IggyError> { - let log_reader = + if !self.config.segment.cache_indexes { + let index_reader = + IndexReader::new(&self.index_path, self.indexes_size.clone()).await?; + self.index_reader = Some(index_reader); + } + + let messages_reader = MessagesReader::new(&self.messages_path, self.messages_size.clone()).await?; - // TODO(hubcio): there is no need to store open fd for reader if we have index cache enabled - let index_reader = IndexReader::new(&self.index_path, self.indexes_size.clone()).await?; + self.messages_reader = Some(messages_reader); - self.messages_reader = Some(log_reader); - self.index_reader = Some(index_reader); Ok(()) } diff --git a/server/src/streaming/segments/types/message_view_mut.rs b/server/src/streaming/segments/types/message_view_mut.rs index ca283dec..b3521cf6 100644 --- a/server/src/streaming/segments/types/message_view_mut.rs +++ b/server/src/streaming/segments/types/message_view_mut.rs @@ -49,9 +49,9 @@ impl<'a> IggyMessageViewMut<'a> { /// Convenience to update the checksum field in the header pub fn update_checksum(&mut self) { - let start = 8; // Skip checksum field for checksum calculation - let size = self.size() - 8; - let data = &self.buffer[start..start + size]; + let checksum_field_size = size_of::<u64>(); // Skip checksum field for checksum calculation + let size = self.size() - checksum_field_size; + let data = &self.buffer[checksum_field_size..checksum_field_size + size]; let checksum = gxhash64(data, 0); diff --git a/server/src/streaming/segments/types/messages_batch_set.rs b/server/src/streaming/segments/types/messages_batch_set.rs index 34b9840c..af52db56 100644 --- 
a/server/src/streaming/segments/types/messages_batch_set.rs +++ b/server/src/streaming/segments/types/messages_batch_set.rs @@ -132,12 +132,6 @@ impl IggyMessagesBatchSet { let mut remaining_count = count; for container in self.iter() { - tracing::error!( - "BATCH_SET container has {} messages, first offset {}, last offset {}", - container.count(), - container.first_offset(), - container.last_offset() - ); if remaining_count == 0 { break; } @@ -149,13 +143,8 @@ impl IggyMessagesBatchSet { if let Some(sliced) = container.slice_by_offset(start_offset, remaining_count) { if sliced.count() > 0 { - tracing::error!( - "BATCH_SET will get {} messages from container", - sliced.count() - ); remaining_count -= sliced.count(); result.add_batch(sliced); - tracing::error!("BATCH_SET remaining count {}", remaining_count); } } } @@ -182,12 +171,6 @@ impl IggyMessagesBatchSet { let first_timestamp = container.first_timestamp(); if first_timestamp < timestamp { - tracing::error!( - "BATCH_SET container has {} messages, first timestamp {}, requested timestamp {}", - container.count(), - first_timestamp, - timestamp - ); continue; } diff --git a/server/src/streaming/segments/writing_messages.rs b/server/src/streaming/segments/writing_messages.rs index 4d41b2ee..d33dabd3 100644 --- a/server/src/streaming/segments/writing_messages.rs +++ b/server/src/streaming/segments/writing_messages.rs @@ -1,4 +1,4 @@ -use super::{IggyMessagesBatchMut, IggyMessagesBatchSet}; +use super::IggyMessagesBatchMut; use crate::streaming::segments::segment::Segment; use error_set::ErrContext; use iggy::confirmation::Confirmation; @@ -102,7 +102,6 @@ impl Segment { self.indexes.mark_saved(); if !self.config.segment.cache_indexes { - tracing::error!("Clearing indexes cache"); self.indexes.clear(); } diff --git a/server/src/streaming/topics/messages.rs b/server/src/streaming/topics/messages.rs index 3048bad2..15b3f932 100644 --- a/server/src/streaming/topics/messages.rs +++ 
b/server/src/streaming/topics/messages.rs @@ -9,15 +9,12 @@ use error_set::ErrContext; use iggy::error::IggyError; use iggy::locking::IggySharedMutFn; use iggy::messages::{PartitioningKind, PollingKind}; -use iggy::prelude::{Partitioning, PolledMessages}; -use iggy::utils::byte_size::IggyByteSize; +use iggy::prelude::Partitioning; use iggy::utils::expiry::IggyExpiry; -use iggy::utils::sizeable::Sizeable; use iggy::utils::timestamp::IggyTimestamp; use iggy::{confirmation::Confirmation, prelude::PollingStrategy}; use std::sync::atomic::Ordering; -use std::sync::Arc; -use tracing::{info, trace, warn}; +use tracing::trace; impl Topic { pub fn get_messages_count(&self) -> u64 { diff --git a/server/src/tcp/connection_handler.rs b/server/src/tcp/connection_handler.rs index 1dd74e10..2d99b9be 100644 --- a/server/src/tcp/connection_handler.rs +++ b/server/src/tcp/connection_handler.rs @@ -4,13 +4,7 @@ use crate::server_error::ConnectionError; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use crate::tcp::connection_handler::command::ServerCommand; -use crate::tcp::COMPONENT; -use bytes::{BufMut, BytesMut}; -use error_set::ErrContext; -use iggy::bytes_serializable::BytesSerializable; -use iggy::command::SEND_MESSAGES_CODE; use iggy::error::IggyError; -use iggy::identifier::Identifier; use std::io::ErrorKind; use std::sync::Arc; use tracing::{debug, error, info};
