numinnex commented on code in PR #2909:
URL: https://github.com/apache/iggy/pull/2909#discussion_r2923092140


##########
core/partitions/src/journal.rs:
##########
@@ -49,89 +46,313 @@ impl std::ops::Deref for MessageLookup {
     }
 }
 
-/// In-memory journal that accumulates message batches as an 
`IggyMessagesBatchSet`.
-///
-/// This is a pure storage layer — it holds batches and supports lookups via
-/// `MessageLookup`. All tracking metadata (offsets, timestamps, counts) lives
-/// outside the journal in the `SegmentedLog`'s `JournalInfo`.
-///
-/// Uses `UnsafeCell` for interior mutability, matching the single-threaded
-/// per-shard execution model.
-pub struct PartitionJournal {
-    batch_set: UnsafeCell<IggyMessagesBatchSet>,
+#[allow(dead_code)]
+pub trait QueryableJournal<S>: Journal<S>
+where
+    S: Storage,
+{
+    type Query;
+
+    fn get(&self, query: &Self::Query) -> impl Future<Output = 
Option<IggyMessagesBatchSet>>;
+}
+
+#[derive(Debug, Default)]
+pub struct PartitionJournalMemStorage {
+    entries: UnsafeCell<Vec<Bytes>>,
+    op_to_index: UnsafeCell<HashMap<u64, usize>>,
+}
+
+impl Storage for PartitionJournalMemStorage {
+    type Buffer = Bytes;
+
+    async fn write(&self, buf: Self::Buffer) -> usize {
+        let op = Message::<PrepareHeader>::from_bytes(buf.clone())
+            .ok()
+            .map(|message| message.header().op);
+
+        let entries = unsafe { &mut *self.entries.get() };
+        let index = entries.len();
+        entries.push(buf.clone());
+
+        if let Some(op) = op {
+            let op_to_index = unsafe { &mut *self.op_to_index.get() };
+            op_to_index.insert(op, index);
+        }
+
+        buf.len()
+    }
+
+    async fn read(&self, offset: usize, _len: usize) -> Self::Buffer {
+        let op = offset as u64;
+        let Some(index) = ({
+            let op_to_index = unsafe { &*self.op_to_index.get() };
+            op_to_index.get(&op).copied()
+        }) else {
+            return Bytes::new();
+        };
+
+        let entries = unsafe { &*self.entries.get() };
+        entries.get(index).cloned().unwrap_or_default()
+    }
+}
+
+pub struct PartitionJournal<S>
+where
+    S: Storage<Buffer = Bytes>,
+{
+    message_offset_to_op: UnsafeCell<BTreeMap<u64, u64>>,
+    timestamp_to_op: UnsafeCell<BTreeMap<u64, u64>>,
+    headers: UnsafeCell<Vec<PrepareHeader>>,
+    inner: UnsafeCell<JournalInner<S>>,
 }
 
-impl PartitionJournal {
-    pub fn new() -> Self {
+impl<S> Default for PartitionJournal<S>
+where
+    S: Storage<Buffer = Bytes> + Default,
+{
+    fn default() -> Self {
         Self {
-            batch_set: UnsafeCell::new(IggyMessagesBatchSet::empty()),
+            message_offset_to_op: UnsafeCell::new(BTreeMap::new()),
+            timestamp_to_op: UnsafeCell::new(BTreeMap::new()),
+            headers: UnsafeCell::new(Vec::new()),
+            inner: UnsafeCell::new(JournalInner {
+                storage: S::default(),
+            }),
         }
     }
+}
+
+impl<S> std::fmt::Debug for PartitionJournal<S>
+where
+    S: Storage<Buffer = Bytes>,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("PartitionJournal2Impl").finish()
+    }
+}
+
+struct JournalInner<S>
+where
+    S: Storage<Buffer = Bytes>,
+{
+    storage: S,
+}
+
+impl PartitionJournalMemStorage {
+    fn drain(&self) -> Vec<Bytes> {
+        let entries = unsafe { &mut *self.entries.get() };
+        let drained = std::mem::take(entries);
+        let op_to_index = unsafe { &mut *self.op_to_index.get() };
+        op_to_index.clear();
+        drained
+    }
 
-    /// Drain all accumulated batches, returning the batch set.
+    fn is_empty(&self) -> bool {
+        let entries = unsafe { &*self.entries.get() };
+        entries.is_empty()
+    }
+}
+
+impl PartitionJournal<PartitionJournalMemStorage> {
+    /// Drain all accumulated batches, matching the legacy `PartitionJournal` 
API.
     pub fn commit(&self) -> IggyMessagesBatchSet {
-        let batch_set = unsafe { &mut *self.batch_set.get() };
-        std::mem::take(batch_set)
+        let entries = {
+            let inner = unsafe { &*self.inner.get() };
+            inner.storage.drain()
+        };
+
+        let mut messages = Vec::with_capacity(entries.len());
+        for bytes in entries {
+            if let Ok(message) = Message::from_bytes(bytes) {
+                messages.push(message);
+            }
+        }
+
+        let headers = unsafe { &mut *self.headers.get() };
+        headers.clear();
+        let offsets = unsafe { &mut *self.message_offset_to_op.get() };
+        offsets.clear();
+        let timestamps = unsafe { &mut *self.timestamp_to_op.get() };
+        timestamps.clear();
+
+        Self::messages_to_batch_set(&messages)
     }
 
     pub fn is_empty(&self) -> bool {
-        let batch_set = unsafe { &*self.batch_set.get() };
-        batch_set.is_empty()
+        let inner = unsafe { &*self.inner.get() };
+        inner.storage.is_empty()
     }
 }
 
-impl Default for PartitionJournal {
-    fn default() -> Self {
-        Self::new()
+impl<S> PartitionJournal<S>
+where
+    S: Storage<Buffer = Bytes>,
+{
+    fn message_to_batch(message: &Message<PrepareHeader>) -> 
Option<IggyMessagesBatchMut> {
+        if message.header().operation != Operation::SendMessages {
+            return None;
+        }
+
+        crate::decode_send_messages_batch(message.body_bytes())
     }
-}
 
-impl std::fmt::Debug for PartitionJournal {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("PartitionJournal").finish()
+    fn messages_to_batch_set(messages: &[Message<PrepareHeader>]) -> 
IggyMessagesBatchSet {
+        let mut batch_set = IggyMessagesBatchSet::empty();
+
+        for message in messages {
+            if let Some(batch) = Self::message_to_batch(message) {
+                batch_set.add_batch(batch);
+            }
+        }
+
+        batch_set
+    }
+
+    #[allow(dead_code)]
+    fn candidate_start_op(&self, query: &MessageLookup) -> Option<u64> {
+        match query {
+            MessageLookup::Offset { offset, .. } => {
+                let offsets = unsafe { &*self.message_offset_to_op.get() };
+                offsets
+                    .range(..=*offset)
+                    .next_back()
+                    .or_else(|| offsets.range(*offset..).next())
+                    .map(|(_, op)| *op)
+            }
+            MessageLookup::Timestamp { timestamp, .. } => {
+                let timestamps = unsafe { &*self.timestamp_to_op.get() };
+                timestamps
+                    .range(..=*timestamp)
+                    .next_back()
+                    .or_else(|| timestamps.range(*timestamp..).next())
+                    .map(|(_, op)| *op)
+            }
+        }
+    }
+
+    async fn message_by_op(&self, op: u64) -> Option<Message<PrepareHeader>> {
+        let offset = usize::try_from(op).ok()?;
+        let bytes = {
+            let inner = unsafe { &*self.inner.get() };
+            inner.storage.read(offset, ZERO_LEN).await
+        };
+
+        Some(
+            Message::from_bytes(bytes)
+                .expect("partition.journal.storage.read: invalid bytes for 
message"),
+        )
+    }

Review Comment:
   Good catch — we shouldn't expect `op` to increase monotonically. As you've
mentioned, op numbers are shared between partition messages and partition metadata
operations, so the op numbers seen by this journal could be scattered (non-contiguous).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to