This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 18588f149 refactor(journal): refactor partition journal utilize 
storage trait (#2909)
18588f149 is described below

commit 18588f1493a86e7f1dc3b79eadc9edd0b6557a81
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Mon Mar 16 22:17:47 2026 +0100

    refactor(journal): refactor partition journal utilize storage trait (#2909)
    
    Refactor the `PartitionJournal` to use the `Storage` trait as backing
    storage, rather than storing data inline.
---
 core/journal/src/lib.rs                |   5 +-
 core/partitions/src/iggy_partition.rs  |  51 ++++-
 core/partitions/src/iggy_partitions.rs |  54 ++---
 core/partitions/src/journal.rs         | 394 +++++++++++++++++++++++++++------
 core/partitions/src/lib.rs             |  35 ++-
 core/simulator/src/deps.rs             |   6 +-
 6 files changed, 430 insertions(+), 115 deletions(-)

diff --git a/core/journal/src/lib.rs b/core/journal/src/lib.rs
index f1b4081dd..55a6b14fb 100644
--- a/core/journal/src/lib.rs
+++ b/core/journal/src/lib.rs
@@ -39,7 +39,10 @@ pub trait Storage {
     type Buffer;
 
     fn write(&self, buf: Self::Buffer) -> impl Future<Output = usize>;
-    fn read(&self, offset: usize, buffer: Self::Buffer) -> impl Future<Output 
= Self::Buffer>;
+    // TODO: Get rid of the `len` usize, we need to do changes in `Simulator` 
in order to support that.
+    // Maybe we should go back to passing in the `Buffer` again, but I am not 
sure how to handle it in the `Partitions Journal`, since we use in-memory impl
+    // which extracts the buffer out of the `Vec<Message>` and we don't need 
to allocate a new buffer.
+    fn read(&self, offset: usize, len: usize) -> impl Future<Output = 
Self::Buffer>;
 }
 
 pub trait JournalHandle {
diff --git a/core/partitions/src/iggy_partition.rs 
b/core/partitions/src/iggy_partition.rs
index 49c820230..65fb470ae 100644
--- a/core/partitions/src/iggy_partition.rs
+++ b/core/partitions/src/iggy_partition.rs
@@ -15,12 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::journal::{Noop, PartitionJournal};
+use crate::journal::{PartitionJournal, PartitionJournalMemStorage};
 use crate::log::SegmentedLog;
-use crate::{AppendResult, Partition};
+use crate::{AppendResult, Partition, decode_send_messages_batch};
 use iggy_common::{
     ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, IggyError, 
IggyMessagesBatchMut,
     IggyTimestamp, PartitionStats,
+    header::{Operation, PrepareHeader},
+    message::Message,
 };
 use journal::Journal as _;
 use std::sync::Arc;
@@ -30,7 +32,7 @@ use tokio::sync::Mutex as TokioMutex;
 // This struct aliases in terms of the code contained the `LocalPartition from 
`core/server/src/streaming/partitions/local_partition.rs`.
 #[derive(Debug)]
 pub struct IggyPartition {
-    pub log: SegmentedLog<PartitionJournal, Noop>,
+    pub log: SegmentedLog<PartitionJournal<PartitionJournalMemStorage>, 
PartitionJournalMemStorage>,
     /// Committed offset — advanced only after quorum ack.
     pub offset: Arc<AtomicU64>,
     /// Dirty offset — advanced on every prepare (before commit).
@@ -46,6 +48,36 @@ pub struct IggyPartition {
 }
 
 impl IggyPartition {
+    fn prepare_message_from_batch(
+        mut header: PrepareHeader,
+        batch: &IggyMessagesBatchMut,
+    ) -> Message<PrepareHeader> {
+        let indexes = batch.indexes();
+        let count = batch.count();
+        let body_len = 4 + indexes.len() + batch.len();
+        let total_size = std::mem::size_of::<PrepareHeader>() + body_len;
+        header.size = u32::try_from(total_size)
+            .expect("prepare_message_from_batch: batch size exceeds u32::MAX");
+
+        let message = 
Message::<PrepareHeader>::new(total_size).transmute_header(|_old, new| {
+            *new = header;
+        });
+
+        let mut bytes = message
+            .into_inner()
+            .try_into_mut()
+            .expect("prepare_message_from_batch: expected unique bytes 
buffer");
+        let header_size = std::mem::size_of::<PrepareHeader>();
+        bytes[header_size..header_size + 
4].copy_from_slice(&count.to_le_bytes());
+        let mut position = header_size + 4;
+        bytes[position..position + indexes.len()].copy_from_slice(indexes);
+        position += indexes.len();
+        bytes[position..position + batch.len()].copy_from_slice(batch);
+
+        Message::<PrepareHeader>::from_bytes(bytes.freeze())
+            .expect("prepare_message_from_batch: invalid prepared message 
bytes")
+    }
+
     pub fn new(stats: Arc<PartitionStats>) -> Self {
         Self {
             log: SegmentedLog::default(),
@@ -65,8 +97,16 @@ impl IggyPartition {
 impl Partition for IggyPartition {
     async fn append_messages(
         &mut self,
-        mut batch: IggyMessagesBatchMut,
+        message: Message<PrepareHeader>,
     ) -> Result<AppendResult, IggyError> {
+        let header = *message.header();
+        if header.operation != Operation::SendMessages {
+            return Err(IggyError::CannotAppendMessage);
+        }
+
+        let mut batch = decode_send_messages_batch(message.body_bytes())
+            .ok_or(IggyError::CannotAppendMessage)?;
+
         if batch.count() == 0 {
             return Ok(AppendResult::new(0, 0, 0));
         }
@@ -116,7 +156,8 @@ impl Partition for IggyPartition {
             journal.info.end_timestamp = ts;
         }
 
-        journal.inner.append(batch).await;
+        let message = Self::prepare_message_from_batch(header, &batch);
+        journal.inner.append(message).await;
 
         Ok(AppendResult::new(
             dirty_offset,
diff --git a/core/partitions/src/iggy_partitions.rs 
b/core/partitions/src/iggy_partitions.rs
index c3b1e0bc6..931e804a7 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -30,8 +30,7 @@ use consensus::{
 };
 use iggy_common::header::Command2;
 use iggy_common::{
-    INDEX_SIZE, IggyByteSize, IggyIndexesMut, IggyMessagesBatchMut, 
PartitionStats, PooledBuffer,
-    Segment, SegmentStorage,
+    IggyByteSize, PartitionStats, Segment, SegmentStorage,
     header::{
         ConsensusHeader, GenericHeader, Operation, PrepareHeader, 
PrepareOkHeader, RequestHeader,
     },
@@ -355,13 +354,13 @@ where
     }
 
     async fn on_replicate(&self, message: <VsrConsensus<B> as 
Consensus>::Message<PrepareHeader>) {
-        let header = message.header();
+        let header = *message.header();
         let namespace = IggyNamespace::from_raw(header.namespace);
         let consensus = self
             .consensus()
             .expect("on_replicate: consensus not initialized");
 
-        let current_op = match replicate_preflight(consensus, header) {
+        let current_op = match replicate_preflight(consensus, &header) {
             Ok(current_op) => current_op,
             Err(reason) => {
                 warn!(
@@ -372,7 +371,7 @@ where
             }
         };
 
-        let is_old_prepare = fence_old_prepare_by_commit(consensus, header);
+        let is_old_prepare = fence_old_prepare_by_commit(consensus, &header);
         if is_old_prepare {
             warn!("received old prepare, not replicating");
         } else {
@@ -387,9 +386,9 @@ where
         // TODO: Figure out the flow of the partition operations.
         // In metadata layer we assume that when an `on_request` or 
`on_replicate` is called, it's called from correct shard.
         // I think we need to do the same here, which means that the code from 
below is unfallable, the partition should always exist by now!
-        self.apply_replicated_operation(&namespace, &message).await;
+        self.apply_replicated_operation(&namespace, message).await;
 
-        self.send_prepare_ok(header).await;
+        self.send_prepare_ok(&header).await;
 
         if consensus.is_follower() {
             self.commit_journal(namespace);
@@ -539,37 +538,21 @@ where
             .register_namespace(ns);
     }
 
-    // TODO: Move this elsewhere, also do not reallocate, we do reallocationg 
now becauise we use PooledBuffer for the batch body
-    // but `Bytes` for `Message` payload.
-    fn batch_from_body(body: &[u8]) -> IggyMessagesBatchMut {
-        assert!(body.len() >= 4, "prepare body too small for batch header");
-        let count = u32::from_le_bytes(body[0..4].try_into().unwrap());
-        let indexes_len = count as usize * INDEX_SIZE;
-        let indexes_end = 4 + indexes_len;
-        assert!(
-            body.len() >= indexes_end,
-            "prepare body too small for {count} indexes",
-        );
-
-        let indexes = 
IggyIndexesMut::from_bytes(PooledBuffer::from(&body[4..indexes_end]), 0);
-        let messages = PooledBuffer::from(&body[indexes_end..]);
-        IggyMessagesBatchMut::from_indexes_and_messages(indexes, messages)
-    }
-
     async fn apply_replicated_operation(
         &self,
         namespace: &IggyNamespace,
-        message: &Message<PrepareHeader>,
+        message: Message<PrepareHeader>,
     ) {
         let consensus = self
             .consensus()
             .expect("apply_replicated_operation: consensus not initialized");
-        let header = message.header();
+        let header = *message.header();
 
+        // TODO: We have to distinguish between a `message` received by a 
leader and one received by a follower.
+        // In the follower path, we have to skip the `prepare_for_persistence` 
path, just append to journal.
         match header.operation {
             Operation::SendMessages => {
-                let body = message.body_bytes();
-                self.append_send_messages_to_journal(namespace, body.as_ref())
+                self.append_send_messages_to_journal(namespace, message)
                     .await;
                 debug!(
                     replica = consensus.replica(),
@@ -598,12 +581,7 @@ where
         }
     }
 
-    async fn append_send_messages_to_journal(&self, namespace: &IggyNamespace, 
body: &[u8]) {
-        let batch = Self::batch_from_body(body);
-        self.append_messages_to_journal(namespace, batch).await;
-    }
-
-    /// Append a batch to a partition's journal with offset assignment.
+    /// Append a prepare message to a partition's journal with offset 
assignment.
     ///
     /// Updates `segment.current_position` (logical position for indexing) but
     /// not `segment.end_offset` or `segment.end_timestamp` (committed state).
@@ -611,15 +589,15 @@ where
     ///
     /// Uses `dirty_offset` for offset assignment so that multiple prepares
     /// can be pipelined before any commit.
-    async fn append_messages_to_journal(
+    async fn append_send_messages_to_journal(
         &self,
         namespace: &IggyNamespace,
-        batch: IggyMessagesBatchMut,
+        message: Message<PrepareHeader>,
     ) {
         let partition = self
             .get_mut_by_ns(namespace)
-            .expect("append_messages_to_journal: partition not found for 
namespace");
-        let _ = partition.append_messages(batch).await;
+            .expect("append_send_messages_to_journal: partition not found for 
namespace");
+        let _ = partition.append_messages(message).await;
     }
 
     /// Replicate a prepare message to the next replica in the chain.
diff --git a/core/partitions/src/journal.rs b/core/partitions/src/journal.rs
index ab281e940..d3b779fee 100644
--- a/core/partitions/src/journal.rs
+++ b/core/partitions/src/journal.rs
@@ -15,29 +15,26 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use iggy_common::{IggyMessagesBatchMut, IggyMessagesBatchSet};
+use bytes::Bytes;
+use iggy_common::{
+    IggyMessagesBatchMut, IggyMessagesBatchSet,
+    header::{Operation, PrepareHeader},
+    message::Message,
+};
 use journal::{Journal, Storage};
-use std::cell::UnsafeCell;
+use std::{
+    cell::UnsafeCell,
+    collections::{BTreeMap, HashMap},
+};
 
-// TODO: Fix that, we need to figure out how to store the 
`IggyMessagesBatchSet`.
-/// No-op storage backend for the in-memory partition journal.
-#[derive(Debug)]
-pub struct Noop;
-
-impl Storage for Noop {
-    type Buffer = ();
-
-    async fn write(&self, _buf: ()) -> usize {
-        0
-    }
-
-    async fn read(&self, _offset: usize, _buffer: ()) {}
-}
+const ZERO_LEN: usize = 0;
 
 /// Lookup key for querying messages from the journal.
 #[derive(Debug, Clone, Copy)]
 pub enum MessageLookup {
+    #[allow(dead_code)]
     Offset { offset: u64, count: u32 },
+    #[allow(dead_code)]
     Timestamp { timestamp: u64, count: u32 },
 }
 
@@ -49,89 +46,354 @@ impl std::ops::Deref for MessageLookup {
     }
 }
 
-/// In-memory journal that accumulates message batches as an 
`IggyMessagesBatchSet`.
-///
-/// This is a pure storage layer — it holds batches and supports lookups via
-/// `MessageLookup`. All tracking metadata (offsets, timestamps, counts) lives
-/// outside the journal in the `SegmentedLog`'s `JournalInfo`.
-///
-/// Uses `UnsafeCell` for interior mutability, matching the single-threaded
-/// per-shard execution model.
-pub struct PartitionJournal {
-    batch_set: UnsafeCell<IggyMessagesBatchSet>,
+#[allow(dead_code)]
+pub trait QueryableJournal<S>: Journal<S>
+where
+    S: Storage,
+{
+    type Query;
+
+    fn get(&self, query: &Self::Query) -> impl Future<Output = 
Option<IggyMessagesBatchSet>>;
 }
 
-impl PartitionJournal {
-    pub fn new() -> Self {
+#[derive(Debug, Default)]
+pub struct PartitionJournalMemStorage {
+    entries: UnsafeCell<Vec<Bytes>>,
+    /// Maps byte offset (as if disk-backed) to index in entries Vec
+    offset_to_index: UnsafeCell<HashMap<usize, usize>>,
+    /// Current write position (cumulative byte offset)
+    current_offset: UnsafeCell<usize>,
+}
+
+impl Storage for PartitionJournalMemStorage {
+    type Buffer = Bytes;
+
+    async fn write(&self, buf: Self::Buffer) -> usize {
+        let len = buf.len();
+        let entries = unsafe { &mut *self.entries.get() };
+        let offset_to_index = unsafe { &mut *self.offset_to_index.get() };
+        let current_offset = unsafe { &mut *self.current_offset.get() };
+
+        let index = entries.len();
+        offset_to_index.insert(*current_offset, index);
+        entries.push(buf);
+
+        let write_offset = *current_offset;
+        *current_offset += len;
+
+        write_offset
+    }
+
+    async fn read(&self, offset: usize, _len: usize) -> Self::Buffer {
+        let offset_to_index = unsafe { &*self.offset_to_index.get() };
+        let Some(&index) = offset_to_index.get(&offset) else {
+            return Bytes::new();
+        };
+
+        let entries = unsafe { &*self.entries.get() };
+        entries.get(index).cloned().unwrap_or_default()
+    }
+}
+
+pub struct PartitionJournal<S>
+where
+    S: Storage<Buffer = Bytes>,
+{
+    /// Maps op -> storage byte offset (for all entries)
+    op_to_storage_offset: UnsafeCell<BTreeMap<u64, usize>>,
+    /// Maps message offset -> op (for queryable entries)
+    offset_to_op: UnsafeCell<BTreeMap<u64, u64>>,
+    /// Maps timestamp -> op (for queryable entries)
+    timestamp_to_op: UnsafeCell<BTreeMap<u64, u64>>,
+    headers: UnsafeCell<Vec<PrepareHeader>>,
+    inner: UnsafeCell<JournalInner<S>>,
+}
+
+impl<S> Default for PartitionJournal<S>
+where
+    S: Storage<Buffer = Bytes> + Default,
+{
+    fn default() -> Self {
         Self {
-            batch_set: UnsafeCell::new(IggyMessagesBatchSet::empty()),
+            op_to_storage_offset: UnsafeCell::new(BTreeMap::new()),
+            offset_to_op: UnsafeCell::new(BTreeMap::new()),
+            timestamp_to_op: UnsafeCell::new(BTreeMap::new()),
+            headers: UnsafeCell::new(Vec::new()),
+            inner: UnsafeCell::new(JournalInner {
+                storage: S::default(),
+            }),
         }
     }
+}
 
-    /// Drain all accumulated batches, returning the batch set.
+impl<S> std::fmt::Debug for PartitionJournal<S>
+where
+    S: Storage<Buffer = Bytes>,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("PartitionJournal2Impl").finish()
+    }
+}
+
+struct JournalInner<S>
+where
+    S: Storage<Buffer = Bytes>,
+{
+    storage: S,
+}
+
+impl PartitionJournalMemStorage {
+    fn drain(&self) -> Vec<Bytes> {
+        let entries = unsafe { &mut *self.entries.get() };
+        let offset_to_index = unsafe { &mut *self.offset_to_index.get() };
+        let current_offset = unsafe { &mut *self.current_offset.get() };
+
+        offset_to_index.clear();
+        *current_offset = 0;
+
+        std::mem::take(entries)
+    }
+
+    fn is_empty(&self) -> bool {
+        let entries = unsafe { &*self.entries.get() };
+        entries.is_empty()
+    }
+}
+
+impl PartitionJournal<PartitionJournalMemStorage> {
+    /// Drain all accumulated batches, matching the legacy `PartitionJournal` 
API.
     pub fn commit(&self) -> IggyMessagesBatchSet {
-        let batch_set = unsafe { &mut *self.batch_set.get() };
-        std::mem::take(batch_set)
+        let entries = {
+            let inner = unsafe { &*self.inner.get() };
+            inner.storage.drain()
+        };
+
+        let mut messages = Vec::with_capacity(entries.len());
+        for bytes in entries {
+            if let Ok(message) = Message::from_bytes(bytes) {
+                messages.push(message);
+            }
+        }
+
+        let headers = unsafe { &mut *self.headers.get() };
+        headers.clear();
+        let op_to_storage_offset = unsafe { &mut 
*self.op_to_storage_offset.get() };
+        op_to_storage_offset.clear();
+        let offset_to_op = unsafe { &mut *self.offset_to_op.get() };
+        offset_to_op.clear();
+        let timestamp_to_op = unsafe { &mut *self.timestamp_to_op.get() };
+        timestamp_to_op.clear();
+
+        Self::messages_to_batch_set(&messages)
     }
 
     pub fn is_empty(&self) -> bool {
-        let batch_set = unsafe { &*self.batch_set.get() };
-        batch_set.is_empty()
+        let inner = unsafe { &*self.inner.get() };
+        inner.storage.is_empty()
     }
 }
 
-impl Default for PartitionJournal {
-    fn default() -> Self {
-        Self::new()
+impl<S> PartitionJournal<S>
+where
+    S: Storage<Buffer = Bytes>,
+{
+    fn message_to_batch(message: &Message<PrepareHeader>) -> 
Option<IggyMessagesBatchMut> {
+        if message.header().operation != Operation::SendMessages {
+            return None;
+        }
+
+        crate::decode_send_messages_batch(message.body_bytes())
     }
-}
 
-impl std::fmt::Debug for PartitionJournal {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("PartitionJournal").finish()
+    fn messages_to_batch_set(messages: &[Message<PrepareHeader>]) -> 
IggyMessagesBatchSet {
+        let mut batch_set = IggyMessagesBatchSet::empty();
+
+        for message in messages {
+            if let Some(batch) = Self::message_to_batch(message) {
+                batch_set.add_batch(batch);
+            }
+        }
+
+        batch_set
+    }
+
+    #[allow(dead_code)]
+    fn candidate_start_op(&self, query: &MessageLookup) -> Option<u64> {
+        match query {
+            MessageLookup::Offset { offset, .. } => {
+                let offset_to_op = unsafe { &*self.offset_to_op.get() };
+                offset_to_op
+                    .range(..=*offset)
+                    .next_back()
+                    .or_else(|| offset_to_op.range(*offset..).next())
+                    .map(|(_, op)| *op)
+            }
+            MessageLookup::Timestamp { timestamp, .. } => {
+                let timestamp_to_op = unsafe { &*self.timestamp_to_op.get() };
+                timestamp_to_op
+                    .range(..=*timestamp)
+                    .next_back()
+                    .or_else(|| timestamp_to_op.range(*timestamp..).next())
+                    .map(|(_, op)| *op)
+            }
+        }
+    }
+
+    async fn message_by_op(&self, op: u64) -> Option<Message<PrepareHeader>> {
+        let storage_offset = {
+            let op_to_storage_offset = unsafe { 
&*self.op_to_storage_offset.get() };
+            *op_to_storage_offset.get(&op)?
+        };
+
+        let bytes = {
+            let inner = unsafe { &*self.inner.get() };
+            inner.storage.read(storage_offset, ZERO_LEN).await
+        };
+
+        if bytes.is_empty() {
+            return None;
+        }
+
+        Some(
+            Message::from_bytes(bytes)
+                .expect("partition.journal.storage.read: invalid bytes for 
message"),
+        )
+    }
+
+    #[allow(dead_code)]
+    async fn load_messages_from_storage(
+        &self,
+        start_op: u64,
+        count: u32,
+    ) -> Vec<Message<PrepareHeader>> {
+        if count == 0 {
+            return Vec::new();
+        }
+
+        // Get (op, storage_offset) pairs directly from the mapping
+        // BTreeMap is already sorted by op
+        let op_offsets: Vec<(u64, usize)> = {
+            let op_to_storage_offset = unsafe { 
&*self.op_to_storage_offset.get() };
+            op_to_storage_offset
+                .range(start_op..)
+                .map(|(op, offset)| (*op, *offset))
+                .collect()
+        };
+
+        let mut messages = Vec::new();
+        let mut loaded_messages = 0u32;
+
+        for (_, storage_offset) in op_offsets {
+            if loaded_messages >= count {
+                break;
+            }
+
+            let bytes = {
+                let inner = unsafe { &*self.inner.get() };
+                inner.storage.read(storage_offset, ZERO_LEN).await
+            };
+
+            if bytes.is_empty() {
+                continue;
+            }
+
+            let message = Message::from_bytes(bytes)
+                .expect("partition.journal.storage.read: invalid bytes for 
message");
+
+            if let Some(batch) = Self::message_to_batch(&message) {
+                loaded_messages = 
loaded_messages.saturating_add(batch.count());
+                messages.push(message);
+            }
+        }
+
+        messages
     }
 }
 
-impl Journal<Noop> for PartitionJournal {
-    type Header = MessageLookup;
-    type Entry = IggyMessagesBatchMut;
-    type HeaderRef<'a> = MessageLookup;
+impl<S> Journal<S> for PartitionJournal<S>
+where
+    S: Storage<Buffer = Bytes>,
+{
+    type Header = PrepareHeader;
+    type Entry = Message<Self::Header>;
+    #[rustfmt::skip] // Scuffed formatter.
+    type HeaderRef<'a> = &'a Self::Header where S: 'a;
 
-    fn header(&self, _idx: usize) -> Option<Self::HeaderRef<'_>> {
-        unreachable!("fn header: header lookup not supported for partition 
journal.");
+    fn header(&self, idx: usize) -> Option<Self::HeaderRef<'_>> {
+        let headers = unsafe { &mut *self.headers.get() };
+        headers.get(idx)
     }
 
-    fn previous_header(&self, _header: &Self::Header) -> 
Option<Self::HeaderRef<'_>> {
-        unreachable!("fn previous_header: header lookup not supported for 
partition journal.");
+    fn previous_header(&self, header: &Self::Header) -> 
Option<Self::HeaderRef<'_>> {
+        if header.op == 0 {
+            return None;
+        }
+
+        let prev_op = header.op - 1;
+        let headers = unsafe { &*self.headers.get() };
+        headers.iter().find(|candidate| candidate.op == prev_op)
     }
 
     async fn append(&self, entry: Self::Entry) {
-        let batch_set = unsafe { &mut *self.batch_set.get() };
-        batch_set.add_batch(entry);
+        let first_offset_and_timestamp = Self::message_to_batch(&entry)
+            .and_then(|batch| Some((batch.first_offset()?, 
batch.first_timestamp()?)));
+
+        let header = *entry.header();
+        let op = header.op;
+
+        {
+            let headers = unsafe { &mut *self.headers.get() };
+            headers.push(header);
+        };
+
+        let bytes = entry.into_inner();
+        let storage_offset = {
+            let inner = unsafe { &*self.inner.get() };
+            inner.storage.write(bytes).await
+        };
+
+        {
+            let op_to_storage_offset = unsafe { &mut 
*self.op_to_storage_offset.get() };
+            op_to_storage_offset.insert(op, storage_offset);
+        }
+
+        if let Some((offset, timestamp)) = first_offset_and_timestamp {
+            let offset_to_op = unsafe { &mut *self.offset_to_op.get() };
+            offset_to_op.insert(offset, op);
+
+            let timestamp_to_op = unsafe { &mut *self.timestamp_to_op.get() };
+            timestamp_to_op.insert(timestamp, op);
+        }
     }
 
     async fn entry(&self, header: &Self::Header) -> Option<Self::Entry> {
-        // Entry lookups go through SegmentedLog which uses JournalInfo
-        // to construct MessageLookup headers. The actual query is done
-        // via get() below, not through the Journal trait.
-        let _ = header;
-        unreachable!("fn entry: use SegmentedLog::get() instead for partition 
journal lookups.");
+        self.message_by_op(header.op).await
     }
 }
 
-impl PartitionJournal {
-    /// Query messages by offset or timestamp with count.
-    ///
-    /// This is called by `SegmentedLog` using `MessageLookup` headers
-    /// constructed from `JournalInfo`.
-    pub fn get(&self, header: &MessageLookup) -> Option<IggyMessagesBatchSet> {
-        let batch_set = unsafe { &*self.batch_set.get() };
-        let result = match header {
-            MessageLookup::Offset { offset, count } => 
batch_set.get_by_offset(*offset, *count),
+impl<S> QueryableJournal<S> for PartitionJournal<S>
+where
+    S: Storage<Buffer = Bytes>,
+{
+    type Query = MessageLookup;
+
+    async fn get(&self, query: &Self::Query) -> Option<IggyMessagesBatchSet> {
+        let query = *query;
+        let start_op = self.candidate_start_op(&query)?;
+        let count = match query {
+            MessageLookup::Offset { count, .. } | MessageLookup::Timestamp { 
count, .. } => count,
+        };
+
+        let messages = self.load_messages_from_storage(start_op, count).await;
+
+        let batch_set = Self::messages_to_batch_set(&messages);
+        let result = match query {
+            MessageLookup::Offset { offset, count } => 
batch_set.get_by_offset(offset, count),
             MessageLookup::Timestamp { timestamp, count } => {
-                batch_set.get_by_timestamp(*timestamp, *count)
+                batch_set.get_by_timestamp(timestamp, count)
             }
         };
+
         if result.is_empty() {
             None
         } else {
diff --git a/core/partitions/src/lib.rs b/core/partitions/src/lib.rs
index d997d72b7..1f18e32ed 100644
--- a/core/partitions/src/lib.rs
+++ b/core/partitions/src/lib.rs
@@ -23,7 +23,11 @@ mod journal;
 mod log;
 mod types;
 
-use iggy_common::{IggyError, IggyMessagesBatchMut, IggyMessagesBatchSet};
+use bytes::{Bytes, BytesMut};
+use iggy_common::{
+    INDEX_SIZE, IggyError, IggyIndexesMut, IggyMessagesBatchMut, 
IggyMessagesBatchSet,
+    PooledBuffer, header::PrepareHeader, message::Message,
+};
 pub use iggy_partition::IggyPartition;
 pub use iggy_partitions::IggyPartitions;
 pub use types::{
@@ -31,6 +35,33 @@ pub use types::{
     SendMessagesResult,
 };
 
+pub(crate) fn decode_send_messages_batch(body: Bytes) -> 
Option<IggyMessagesBatchMut> {
+    // TODO: This is very bad; IGGY-114 fixes this.
+    let mut body = body
+        .try_into_mut()
+        .unwrap_or_else(|body| BytesMut::from(body.as_ref()));
+
+    if body.len() < 4 {
+        return None;
+    }
+
+    let count_bytes = body.split_to(4);
+    let count = u32::from_le_bytes(count_bytes.as_ref().try_into().ok()?);
+    let indexes_len = (count as usize).checked_mul(INDEX_SIZE)?;
+
+    if body.len() < indexes_len {
+        return None;
+    }
+
+    let indexes_bytes = body.split_to(indexes_len);
+    let indexes = 
IggyIndexesMut::from_bytes(PooledBuffer::from(indexes_bytes), 0);
+    let messages = PooledBuffer::from(body);
+
+    Some(IggyMessagesBatchMut::from_indexes_and_messages(
+        indexes, messages,
+    ))
+}
+
 /// Partition-level data plane operations.
 ///
 /// `send_messages` MUST only append to the partition journal (prepare phase),
@@ -38,7 +69,7 @@ pub use types::{
 pub trait Partition {
     fn append_messages(
         &mut self,
-        batch: IggyMessagesBatchMut,
+        message: Message<PrepareHeader>,
     ) -> impl Future<Output = Result<AppendResult, IggyError>>;
 
     fn poll_messages(
diff --git a/core/simulator/src/deps.rs b/core/simulator/src/deps.rs
index 12e78bb05..9f4d64b79 100644
--- a/core/simulator/src/deps.rs
+++ b/core/simulator/src/deps.rs
@@ -45,7 +45,8 @@ impl Storage for MemStorage {
         len
     }
 
-    async fn read(&self, offset: usize, mut buffer: Self::Buffer) -> 
Self::Buffer {
+    async fn read(&self, offset: usize, len: usize) -> Self::Buffer {
+        let mut buffer = vec![0; len];
         let data = self.data.borrow();
         let end = offset + buffer.len();
         if offset < data.len() && end <= data.len() {
@@ -106,8 +107,7 @@ impl<S: Storage<Buffer = Vec<u8>>> Journal<S> for 
SimJournal<S> {
         let header = headers.get(&header.op)?;
         let offset = *offsets.get(&header.op)?;
 
-        let buffer = vec![0; header.size as usize];
-        let buffer = self.storage.read(offset, buffer).await;
+        let buffer = self.storage.read(offset, header.size as usize).await;
         let message =
             Message::from_bytes(Bytes::from(buffer)).expect("simulator: bytes 
should be valid");
         Some(message)

Reply via email to