numinnex commented on code in PR #2909:
URL: https://github.com/apache/iggy/pull/2909#discussion_r2923060386


##########
core/partitions/src/journal.rs:
##########
@@ -49,89 +46,313 @@ impl std::ops::Deref for MessageLookup {
     }
 }
 
-/// In-memory journal that accumulates message batches as an 
`IggyMessagesBatchSet`.
-///
-/// This is a pure storage layer — it holds batches and supports lookups via
-/// `MessageLookup`. All tracking metadata (offsets, timestamps, counts) lives
-/// outside the journal in the `SegmentedLog`'s `JournalInfo`.
-///
-/// Uses `UnsafeCell` for interior mutability, matching the single-threaded
-/// per-shard execution model.
-pub struct PartitionJournal {
-    batch_set: UnsafeCell<IggyMessagesBatchSet>,
+#[allow(dead_code)]
+pub trait QueryableJournal<S>: Journal<S>
+where
+    S: Storage,
+{
+    type Query;
+
+    fn get(&self, query: &Self::Query) -> impl Future<Output = 
Option<IggyMessagesBatchSet>>;
+}
+
+#[derive(Debug, Default)]
+pub struct PartitionJournalMemStorage {
+    entries: UnsafeCell<Vec<Bytes>>,
+    op_to_index: UnsafeCell<HashMap<u64, usize>>,
+}
+
+impl Storage for PartitionJournalMemStorage {
+    type Buffer = Bytes;
+
+    async fn write(&self, buf: Self::Buffer) -> usize {
+        let op = Message::<PrepareHeader>::from_bytes(buf.clone())
+            .ok()
+            .map(|message| message.header().op);
+
+        let entries = unsafe { &mut *self.entries.get() };
+        let index = entries.len();
+        entries.push(buf.clone());
+
+        if let Some(op) = op {
+            let op_to_index = unsafe { &mut *self.op_to_index.get() };
+            op_to_index.insert(op, index);
+        }
+
+        buf.len()
+    }
+
+    async fn read(&self, offset: usize, _len: usize) -> Self::Buffer {
+        let op = offset as u64;
+        let Some(index) = ({
+            let op_to_index = unsafe { &*self.op_to_index.get() };
+            op_to_index.get(&op).copied()
+        }) else {
+            return Bytes::new();
+        };
+
+        let entries = unsafe { &*self.entries.get() };
+        entries.get(index).cloned().unwrap_or_default()
+    }
+}
+
+pub struct PartitionJournal<S>
+where
+    S: Storage<Buffer = Bytes>,
+{
+    message_offset_to_op: UnsafeCell<BTreeMap<u64, u64>>,
+    timestamp_to_op: UnsafeCell<BTreeMap<u64, u64>>,
+    headers: UnsafeCell<Vec<PrepareHeader>>,
+    inner: UnsafeCell<JournalInner<S>>,
 }
 
-impl PartitionJournal {
-    pub fn new() -> Self {
+impl<S> Default for PartitionJournal<S>
+where
+    S: Storage<Buffer = Bytes> + Default,
+{
+    fn default() -> Self {
         Self {
-            batch_set: UnsafeCell::new(IggyMessagesBatchSet::empty()),
+            message_offset_to_op: UnsafeCell::new(BTreeMap::new()),
+            timestamp_to_op: UnsafeCell::new(BTreeMap::new()),
+            headers: UnsafeCell::new(Vec::new()),
+            inner: UnsafeCell::new(JournalInner {
+                storage: S::default(),
+            }),
         }
     }
+}
+
+impl<S> std::fmt::Debug for PartitionJournal<S>
+where
+    S: Storage<Buffer = Bytes>,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("PartitionJournal2Impl").finish()
+    }
+}
+
+struct JournalInner<S>
+where
+    S: Storage<Buffer = Bytes>,
+{
+    storage: S,
+}
+
+impl PartitionJournalMemStorage {
+    fn drain(&self) -> Vec<Bytes> {
+        let entries = unsafe { &mut *self.entries.get() };
+        let drained = std::mem::take(entries);
+        let op_to_index = unsafe { &mut *self.op_to_index.get() };
+        op_to_index.clear();
+        drained
+    }
 
-    /// Drain all accumulated batches, returning the batch set.
+    fn is_empty(&self) -> bool {
+        let entries = unsafe { &*self.entries.get() };
+        entries.is_empty()
+    }
+}
+
+impl PartitionJournal<PartitionJournalMemStorage> {
+    /// Drain all accumulated batches, matching the legacy `PartitionJournal` 
API.
     pub fn commit(&self) -> IggyMessagesBatchSet {
-        let batch_set = unsafe { &mut *self.batch_set.get() };
-        std::mem::take(batch_set)
+        let entries = {
+            let inner = unsafe { &*self.inner.get() };
+            inner.storage.drain()
+        };
+
+        let mut messages = Vec::with_capacity(entries.len());
+        for bytes in entries {
+            if let Ok(message) = Message::from_bytes(bytes) {
+                messages.push(message);
+            }
+        }
+
+        let headers = unsafe { &mut *self.headers.get() };
+        headers.clear();
+        let offsets = unsafe { &mut *self.message_offset_to_op.get() };
+        offsets.clear();
+        let timestamps = unsafe { &mut *self.timestamp_to_op.get() };
+        timestamps.clear();
+
+        Self::messages_to_batch_set(&messages)

Review Comment:
   The goal of this API is to drain entries from the journal without 
validating them gracefully (we panic if validation fails), because all of the 
validation happens before any entry enters the journal.  
   
   I think the point is valid: if validation fails, we should make sure that 
we don't leave the journal in an inconsistent state. However, the way we will 
achieve that is by crashing the server. A commit is essentially a read from 
the journal, so if we fail to parse a batch read from it, then something 
really bad has happened.



##########
core/partitions/src/journal.rs:
##########
@@ -49,89 +46,313 @@ impl std::ops::Deref for MessageLookup {
     }
 }
 
-/// In-memory journal that accumulates message batches as an 
`IggyMessagesBatchSet`.
-///
-/// This is a pure storage layer — it holds batches and supports lookups via
-/// `MessageLookup`. All tracking metadata (offsets, timestamps, counts) lives
-/// outside the journal in the `SegmentedLog`'s `JournalInfo`.
-///
-/// Uses `UnsafeCell` for interior mutability, matching the single-threaded
-/// per-shard execution model.
-pub struct PartitionJournal {
-    batch_set: UnsafeCell<IggyMessagesBatchSet>,
+#[allow(dead_code)]
+pub trait QueryableJournal<S>: Journal<S>
+where
+    S: Storage,
+{
+    type Query;
+
+    fn get(&self, query: &Self::Query) -> impl Future<Output = 
Option<IggyMessagesBatchSet>>;
+}
+
+#[derive(Debug, Default)]
+pub struct PartitionJournalMemStorage {
+    entries: UnsafeCell<Vec<Bytes>>,

Review Comment:
   Yeah, I agree. The only reason the `op_to_index` mapping even exists there 
is that we have to handle the `commit` path, which resets the storage 
`offset`, so our op_number != storage_offset. I will move that translation 
into `PartitionJournal` rather than keeping it in the storage. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to