This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch partitions_refactor
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit aff103e3373dc1f127aa71468b9820476ac4cf25
Author: numinex <[email protected]>
AuthorDate: Tue Mar 10 10:01:03 2026 +0100

    fixes
---
 core/partitions/src/iggy_partition.rs  |  51 +++++++-
 core/partitions/src/iggy_partitions.rs |  54 +++------
 core/partitions/src/journal.rs         | 211 +++++++++++++--------------------
 core/partitions/src/lib.rs             |  35 +++++-
 core/simulator/src/deps.rs             |   2 +-
 5 files changed, 177 insertions(+), 176 deletions(-)

diff --git a/core/partitions/src/iggy_partition.rs 
b/core/partitions/src/iggy_partition.rs
index 49c820230..bf309a7c8 100644
--- a/core/partitions/src/iggy_partition.rs
+++ b/core/partitions/src/iggy_partition.rs
@@ -15,12 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::journal::{Noop, PartitionJournal};
+use crate::journal::{PartitionJournal2Impl, PartitionJournalMemStorage};
 use crate::log::SegmentedLog;
-use crate::{AppendResult, Partition};
+use crate::{AppendResult, Partition, decode_send_messages_batch};
 use iggy_common::{
     ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, IggyError, 
IggyMessagesBatchMut,
     IggyTimestamp, PartitionStats,
+    header::{Operation, PrepareHeader},
+    message::Message,
 };
 use journal::Journal as _;
 use std::sync::Arc;
@@ -30,7 +32,8 @@ use tokio::sync::Mutex as TokioMutex;
 // This struct aliases in terms of the code contained the `LocalPartition from 
`core/server/src/streaming/partitions/local_partition.rs`.
 #[derive(Debug)]
 pub struct IggyPartition {
-    pub log: SegmentedLog<PartitionJournal, Noop>,
+    pub log:
+        SegmentedLog<PartitionJournal2Impl<PartitionJournalMemStorage>, 
PartitionJournalMemStorage>,
     /// Committed offset — advanced only after quorum ack.
     pub offset: Arc<AtomicU64>,
     /// Dirty offset — advanced on every prepare (before commit).
@@ -46,6 +49,35 @@ pub struct IggyPartition {
 }
 
 impl IggyPartition {
+    fn prepare_message_from_batch(
+        mut header: PrepareHeader,
+        batch: &IggyMessagesBatchMut,
+    ) -> Message<PrepareHeader> {
+        let indexes = batch.indexes();
+        let count = batch.count();
+        let body_len = 4 + indexes.len() + batch.len();
+        let total_size = std::mem::size_of::<PrepareHeader>() + body_len;
+        header.size = total_size as u32;
+
+        let message = 
Message::<PrepareHeader>::new(total_size).transmute_header(|_old, new| {
+            *new = header;
+        });
+
+        let mut bytes = message
+            .into_inner()
+            .try_into_mut()
+            .expect("prepare_message_from_batch: expected unique bytes 
buffer");
+        let header_size = std::mem::size_of::<PrepareHeader>();
+        bytes[header_size..header_size + 
4].copy_from_slice(&count.to_le_bytes());
+        let mut position = header_size + 4;
+        bytes[position..position + indexes.len()].copy_from_slice(indexes);
+        position += indexes.len();
+        bytes[position..position + batch.len()].copy_from_slice(batch);
+
+        Message::<PrepareHeader>::from_bytes(bytes.freeze())
+            .expect("prepare_message_from_batch: invalid prepared message 
bytes")
+    }
+
     pub fn new(stats: Arc<PartitionStats>) -> Self {
         Self {
             log: SegmentedLog::default(),
@@ -65,8 +97,16 @@ impl IggyPartition {
 impl Partition for IggyPartition {
     async fn append_messages(
         &mut self,
-        mut batch: IggyMessagesBatchMut,
+        message: Message<PrepareHeader>,
     ) -> Result<AppendResult, IggyError> {
+        let header = *message.header();
+        if header.operation != Operation::SendMessages {
+            return Err(IggyError::CannotAppendMessage);
+        }
+
+        let mut batch = decode_send_messages_batch(message.body_bytes())
+            .ok_or(IggyError::CannotAppendMessage)?;
+
         if batch.count() == 0 {
             return Ok(AppendResult::new(0, 0, 0));
         }
@@ -116,7 +156,8 @@ impl Partition for IggyPartition {
             journal.info.end_timestamp = ts;
         }
 
-        journal.inner.append(batch).await;
+        let message = Self::prepare_message_from_batch(header, &batch);
+        journal.inner.append(message).await;
 
         Ok(AppendResult::new(
             dirty_offset,
diff --git a/core/partitions/src/iggy_partitions.rs 
b/core/partitions/src/iggy_partitions.rs
index c3b1e0bc6..fda970d44 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -30,8 +30,7 @@ use consensus::{
 };
 use iggy_common::header::Command2;
 use iggy_common::{
-    INDEX_SIZE, IggyByteSize, IggyIndexesMut, IggyMessagesBatchMut, 
PartitionStats, PooledBuffer,
-    Segment, SegmentStorage,
+    IggyByteSize, PartitionStats, Segment, SegmentStorage,
     header::{
         ConsensusHeader, GenericHeader, Operation, PrepareHeader, 
PrepareOkHeader, RequestHeader,
     },
@@ -355,13 +354,13 @@ where
     }
 
     async fn on_replicate(&self, message: <VsrConsensus<B> as 
Consensus>::Message<PrepareHeader>) {
-        let header = message.header();
+        let header = *message.header();
         let namespace = IggyNamespace::from_raw(header.namespace);
         let consensus = self
             .consensus()
             .expect("on_replicate: consensus not initialized");
 
-        let current_op = match replicate_preflight(consensus, header) {
+        let current_op = match replicate_preflight(consensus, &header) {
             Ok(current_op) => current_op,
             Err(reason) => {
                 warn!(
@@ -372,7 +371,7 @@ where
             }
         };
 
-        let is_old_prepare = fence_old_prepare_by_commit(consensus, header);
+        let is_old_prepare = fence_old_prepare_by_commit(consensus, &header);
         if is_old_prepare {
             warn!("received old prepare, not replicating");
         } else {
@@ -387,9 +386,9 @@ where
         // TODO: Figure out the flow of the partition operations.
         // In metadata layer we assume that when an `on_request` or 
`on_replicate` is called, it's called from correct shard.
         // I think we need to do the same here, which means that the code from 
below is unfallable, the partition should always exist by now!
-        self.apply_replicated_operation(&namespace, &message).await;
+        self.apply_replicated_operation(&namespace, message).await;
 
-        self.send_prepare_ok(header).await;
+        self.send_prepare_ok(&header).await;
 
         if consensus.is_follower() {
             self.commit_journal(namespace);
@@ -539,37 +538,21 @@ where
             .register_namespace(ns);
     }
 
-    // TODO: Move this elsewhere, also do not reallocate, we do reallocationg 
now becauise we use PooledBuffer for the batch body
-    // but `Bytes` for `Message` payload.
-    fn batch_from_body(body: &[u8]) -> IggyMessagesBatchMut {
-        assert!(body.len() >= 4, "prepare body too small for batch header");
-        let count = u32::from_le_bytes(body[0..4].try_into().unwrap());
-        let indexes_len = count as usize * INDEX_SIZE;
-        let indexes_end = 4 + indexes_len;
-        assert!(
-            body.len() >= indexes_end,
-            "prepare body too small for {count} indexes",
-        );
-
-        let indexes = 
IggyIndexesMut::from_bytes(PooledBuffer::from(&body[4..indexes_end]), 0);
-        let messages = PooledBuffer::from(&body[indexes_end..]);
-        IggyMessagesBatchMut::from_indexes_and_messages(indexes, messages)
-    }
-
     async fn apply_replicated_operation(
         &self,
         namespace: &IggyNamespace,
-        message: &Message<PrepareHeader>,
+        message: Message<PrepareHeader>,
     ) {
         let consensus = self
             .consensus()
             .expect("apply_replicated_operation: consensus not initialized");
-        let header = message.header();
+        let header = *message.header();
 
+        // TODO: We have to distinguish between a `message` recv by leader 
and follower.
+        // In the follower path, we have to skip the `prepare_for_persistence` 
path, just append to journal.
         match header.operation {
             Operation::SendMessages => {
-                let body = message.body_bytes();
-                self.append_send_messages_to_journal(namespace, body.as_ref())
+                self.append_send_messages_to_journal(namespace, message)
                     .await;
                 debug!(
                     replica = consensus.replica(),
@@ -598,12 +581,7 @@ where
         }
     }
 
-    async fn append_send_messages_to_journal(&self, namespace: &IggyNamespace, 
body: &[u8]) {
-        let batch = Self::batch_from_body(body);
-        self.append_messages_to_journal(namespace, batch).await;
-    }
-
-    /// Append a batch to a partition's journal with offset assignment.
+    /// Append a prepare message to a partition's journal with offset 
assignment.
     ///
     /// Updates `segment.current_position` (logical position for indexing) but
     /// not `segment.end_offset` or `segment.end_timestamp` (committed state).
@@ -611,15 +589,15 @@ where
     ///
     /// Uses `dirty_offset` for offset assignment so that multiple prepares
     /// can be pipelined before any commit.
-    async fn append_messages_to_journal(
+    async fn append_send_messages_to_journal(
         &self,
         namespace: &IggyNamespace,
-        batch: IggyMessagesBatchMut,
+        message: Message<PrepareHeader>,
     ) {
         let partition = self
             .get_mut_by_ns(namespace)
-            .expect("append_messages_to_journal: partition not found for 
namespace");
-        let _ = partition.append_messages(batch).await;
+            .expect("append_send_messages_to_journal: partition not found for 
namespace");
+        let _ = partition.append_messages(message).await;
     }
 
     /// Replicate a prepare message to the next replica in the chain.
diff --git a/core/partitions/src/journal.rs b/core/partitions/src/journal.rs
index 93d3733fc..2139290d3 100644
--- a/core/partitions/src/journal.rs
+++ b/core/partitions/src/journal.rs
@@ -15,9 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use bytes::{Bytes, BytesMut};
+use bytes::Bytes;
 use iggy_common::{
-    INDEX_SIZE, IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet, 
PooledBuffer,
+    IggyMessagesBatchMut, IggyMessagesBatchSet,
     header::{Operation, PrepareHeader},
     message::Message,
 };
@@ -29,21 +29,6 @@ use std::{
 
 const ZERO_LEN: usize = 0;
 
-// TODO: Fix that, we need to figure out how to store the 
`IggyMessagesBatchSet`.
-/// No-op storage backend for the in-memory partition journal.
-#[derive(Debug)]
-pub struct Noop;
-
-impl Storage for Noop {
-    type Buffer = ();
-
-    async fn write(&self, _buf: ()) -> usize {
-        0
-    }
-
-    async fn read(&self, _offset: usize, _len: usize) -> () {}
-}
-
 /// Lookup key for querying messages from the journal.
 #[derive(Debug, Clone, Copy)]
 pub enum MessageLookup {
@@ -59,42 +44,7 @@ impl std::ops::Deref for MessageLookup {
     }
 }
 
-// [LEGACY]
-pub struct PartitionJournal {
-    batch_set: UnsafeCell<IggyMessagesBatchSet>,
-}
-
-impl PartitionJournal {
-    pub fn new() -> Self {
-        Self {
-            batch_set: UnsafeCell::new(IggyMessagesBatchSet::empty()),
-        }
-    }
-
-    /// Drain all accumulated batches, returning the batch set.
-    pub fn commit(&self) -> IggyMessagesBatchSet {
-        let batch_set = unsafe { &mut *self.batch_set.get() };
-        std::mem::take(batch_set)
-    }
-
-    pub fn is_empty(&self) -> bool {
-        let batch_set = unsafe { &*self.batch_set.get() };
-        batch_set.is_empty()
-    }
-}
-
-impl Default for PartitionJournal {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-impl std::fmt::Debug for PartitionJournal {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("PartitionJournal").finish()
-    }
-}
-
+#[allow(dead_code)]
 pub trait PartitionJournal2<S>: Journal<S>
 where
     S: Storage,
@@ -104,7 +54,7 @@ where
     fn get(&self, query: &Self::Query) -> impl Future<Output = 
Option<IggyMessagesBatchSet>>;
 }
 
-#[derive(Default)]
+#[derive(Debug, Default)]
 pub struct PartitionJournalMemStorage {
     entries: UnsafeCell<Vec<Bytes>>,
     op_to_index: UnsafeCell<HashMap<u64, usize>>,
@@ -154,56 +104,100 @@ where
     inner: UnsafeCell<JournalInner<S>>,
 }
 
-struct JournalInner<S>
+impl<S> Default for PartitionJournal2Impl<S>
+where
+    S: Storage<Buffer = Bytes> + Default,
+{
+    fn default() -> Self {
+        Self {
+            message_offset_to_op: UnsafeCell::new(BTreeMap::new()),
+            timestamp_to_op: UnsafeCell::new(BTreeMap::new()),
+            headers: UnsafeCell::new(Vec::new()),
+            inner: UnsafeCell::new(JournalInner {
+                storage: S::default(),
+            }),
+        }
+    }
+}
+
+impl<S> std::fmt::Debug for PartitionJournal2Impl<S>
 where
     S: Storage<Buffer = Bytes>,
 {
-    storage: S,
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("PartitionJournal2Impl").finish()
+    }
 }
 
-impl<S> PartitionJournal2Impl<S>
+struct JournalInner<S>
 where
     S: Storage<Buffer = Bytes>,
 {
-    fn decode_send_messages_batch(body: bytes::Bytes) -> 
Option<IggyMessagesBatchMut> {
-        // TODO: This is bad,
-        let mut body = body
-            .try_into_mut()
-            .unwrap_or_else(|body| BytesMut::from(body.as_ref()));
+    storage: S,
+}
 
-        if body.len() < 4 {
-            return None;
-        }
+impl PartitionJournalMemStorage {
+    fn drain(&self) -> Vec<Bytes> {
+        let entries = unsafe { &mut *self.entries.get() };
+        let drained = std::mem::take(entries);
+        let op_to_index = unsafe { &mut *self.op_to_index.get() };
+        op_to_index.clear();
+        drained
+    }
 
-        let count_bytes = body.split_to(4);
-        let count = u32::from_le_bytes(count_bytes.as_ref().try_into().ok()?);
-        let indexes_len = (count as usize).checked_mul(INDEX_SIZE)?;
+    fn is_empty(&self) -> bool {
+        let entries = unsafe { &*self.entries.get() };
+        entries.is_empty()
+    }
+}
 
-        if body.len() < indexes_len {
-            return None;
+impl PartitionJournal2Impl<PartitionJournalMemStorage> {
+    /// Drain all accumulated batches, matching the legacy PartitionJournal 
API.
+    pub fn commit(&self) -> IggyMessagesBatchSet {
+        let entries = {
+            let inner = unsafe { &*self.inner.get() };
+            inner.storage.drain()
+        };
+
+        let mut messages = Vec::with_capacity(entries.len());
+        for bytes in entries {
+            if let Ok(message) = Message::from_bytes(bytes) {
+                messages.push(message);
+            }
         }
 
-        let indexes_bytes = body.split_to(indexes_len);
-        let indexes = 
IggyIndexesMut::from_bytes(PooledBuffer::from(indexes_bytes), 0);
-        let messages = PooledBuffer::from(body);
+        let headers = unsafe { &mut *self.headers.get() };
+        headers.clear();
+        let offsets = unsafe { &mut *self.message_offset_to_op.get() };
+        offsets.clear();
+        let timestamps = unsafe { &mut *self.timestamp_to_op.get() };
+        timestamps.clear();
+
+        Self::messages_to_batch_set(&messages)
+    }
 
-        Some(IggyMessagesBatchMut::from_indexes_and_messages(
-            indexes, messages,
-        ))
+    pub fn is_empty(&self) -> bool {
+        let inner = unsafe { &*self.inner.get() };
+        inner.storage.is_empty()
     }
+}
 
+impl<S> PartitionJournal2Impl<S>
+where
+    S: Storage<Buffer = Bytes>,
+{
     fn message_to_batch(message: &Message<PrepareHeader>) -> 
Option<IggyMessagesBatchMut> {
         if message.header().operation != Operation::SendMessages {
             return None;
         }
 
-        Self::decode_send_messages_batch(message.body_bytes())
+        crate::decode_send_messages_batch(message.body_bytes())
     }
 
-    fn messages_to_batch_set(messages: Vec<Message<PrepareHeader>>) -> 
IggyMessagesBatchSet {
+    fn messages_to_batch_set(messages: &[Message<PrepareHeader>]) -> 
IggyMessagesBatchSet {
         let mut batch_set = IggyMessagesBatchSet::empty();
 
-        for message in &messages {
+        for message in messages {
             if let Some(batch) = Self::message_to_batch(message) {
                 batch_set.add_batch(batch);
             }
@@ -212,6 +206,7 @@ where
         batch_set
     }
 
+    #[allow(dead_code)]
     fn candidate_start_op(&self, query: &MessageLookup) -> Option<u64> {
         match query {
             MessageLookup::Offset { offset, .. } => {
@@ -240,9 +235,13 @@ where
             inner.storage.read(offset, ZERO_LEN).await
         };
 
-        Some(Message::from_bytes(bytes).expect("invalid message bytes read 
from storage"))
+        Some(
+            Message::from_bytes(bytes)
+                .expect("partition.journal.storage.read: invalid bytes for 
message"),
+        )
     }
 
+    #[allow(dead_code)]
     async fn load_messages_from_storage(
         &self,
         start_op: u64,
@@ -294,7 +293,7 @@ where
 
         let prev_op = header.op - 1;
         let headers = unsafe { &*self.headers.get() };
-        headers.get(prev_op as usize)
+        headers.iter().find(|candidate| candidate.op == prev_op)
     }
 
     async fn append(&self, entry: Self::Entry) {
@@ -344,7 +343,7 @@ where
 
         let messages = self.load_messages_from_storage(start_op, count).await;
 
-        let batch_set = Self::messages_to_batch_set(messages);
+        let batch_set = Self::messages_to_batch_set(&messages);
         let result = match query {
             MessageLookup::Offset { offset, count } => 
batch_set.get_by_offset(offset, count),
             MessageLookup::Timestamp { timestamp, count } => {
@@ -359,51 +358,3 @@ where
         }
     }
 }
-
-impl Journal<Noop> for PartitionJournal {
-    type Header = MessageLookup;
-    type Entry = IggyMessagesBatchMut;
-    type HeaderRef<'a> = MessageLookup;
-
-    fn header(&self, _idx: usize) -> Option<Self::HeaderRef<'_>> {
-        unreachable!("fn header: header lookup not supported for partition 
journal.");
-    }
-
-    fn previous_header(&self, _header: &Self::Header) -> 
Option<Self::HeaderRef<'_>> {
-        unreachable!("fn previous_header: header lookup not supported for 
partition journal.");
-    }
-
-    async fn append(&self, entry: Self::Entry) {
-        let batch_set = unsafe { &mut *self.batch_set.get() };
-        batch_set.add_batch(entry);
-    }
-
-    async fn entry(&self, header: &Self::Header) -> Option<Self::Entry> {
-        // Entry lookups go through SegmentedLog which uses JournalInfo
-        // to construct MessageLookup headers. The actual query is done
-        // via get() below, not through the Journal trait.
-        let _ = header;
-        unreachable!("fn entry: use SegmentedLog::get() instead for partition 
journal lookups.");
-    }
-}
-
-impl PartitionJournal {
-    /// Query messages by offset or timestamp with count.
-    ///
-    /// This is called by `SegmentedLog` using `MessageLookup` headers
-    /// constructed from `JournalInfo`.
-    pub fn get(&self, header: &MessageLookup) -> Option<IggyMessagesBatchSet> {
-        let batch_set = unsafe { &*self.batch_set.get() };
-        let result = match header {
-            MessageLookup::Offset { offset, count } => 
batch_set.get_by_offset(*offset, *count),
-            MessageLookup::Timestamp { timestamp, count } => {
-                batch_set.get_by_timestamp(*timestamp, *count)
-            }
-        };
-        if result.is_empty() {
-            None
-        } else {
-            Some(result)
-        }
-    }
-}
diff --git a/core/partitions/src/lib.rs b/core/partitions/src/lib.rs
index d997d72b7..1f18e32ed 100644
--- a/core/partitions/src/lib.rs
+++ b/core/partitions/src/lib.rs
@@ -23,7 +23,11 @@ mod journal;
 mod log;
 mod types;
 
-use iggy_common::{IggyError, IggyMessagesBatchMut, IggyMessagesBatchSet};
+use bytes::{Bytes, BytesMut};
+use iggy_common::{
+    INDEX_SIZE, IggyError, IggyIndexesMut, IggyMessagesBatchMut, 
IggyMessagesBatchSet,
+    PooledBuffer, header::PrepareHeader, message::Message,
+};
 pub use iggy_partition::IggyPartition;
 pub use iggy_partitions::IggyPartitions;
 pub use types::{
@@ -31,6 +35,33 @@ pub use types::{
     SendMessagesResult,
 };
 
+pub(crate) fn decode_send_messages_batch(body: Bytes) -> 
Option<IggyMessagesBatchMut> {
+    // TODO: This is very bad, IGGY-114 fixes this.
+    let mut body = body
+        .try_into_mut()
+        .unwrap_or_else(|body| BytesMut::from(body.as_ref()));
+
+    if body.len() < 4 {
+        return None;
+    }
+
+    let count_bytes = body.split_to(4);
+    let count = u32::from_le_bytes(count_bytes.as_ref().try_into().ok()?);
+    let indexes_len = (count as usize).checked_mul(INDEX_SIZE)?;
+
+    if body.len() < indexes_len {
+        return None;
+    }
+
+    let indexes_bytes = body.split_to(indexes_len);
+    let indexes = 
IggyIndexesMut::from_bytes(PooledBuffer::from(indexes_bytes), 0);
+    let messages = PooledBuffer::from(body);
+
+    Some(IggyMessagesBatchMut::from_indexes_and_messages(
+        indexes, messages,
+    ))
+}
+
 /// Partition-level data plane operations.
 ///
 /// `send_messages` MUST only append to the partition journal (prepare phase),
@@ -38,7 +69,7 @@ pub use types::{
 pub trait Partition {
     fn append_messages(
         &mut self,
-        batch: IggyMessagesBatchMut,
+        message: Message<PrepareHeader>,
     ) -> impl Future<Output = Result<AppendResult, IggyError>>;
 
     fn poll_messages(
diff --git a/core/simulator/src/deps.rs b/core/simulator/src/deps.rs
index 316a54fe7..9f4d64b79 100644
--- a/core/simulator/src/deps.rs
+++ b/core/simulator/src/deps.rs
@@ -46,7 +46,7 @@ impl Storage for MemStorage {
     }
 
     async fn read(&self, offset: usize, len: usize) -> Self::Buffer {
-        let buffer = vec![0; len];
+        let mut buffer = vec![0; len];
         let data = self.data.borrow();
         let end = offset + buffer.len();
         if offset < data.len() && end <= data.len() {

Reply via email to