atharvalade commented on code in PR #2909:
URL: https://github.com/apache/iggy/pull/2909#discussion_r2912416661
##########
core/partitions/src/journal.rs:
##########
@@ -49,89 +46,313 @@ impl std::ops::Deref for MessageLookup {
}
}
-/// In-memory journal that accumulates message batches as an
`IggyMessagesBatchSet`.
-///
-/// This is a pure storage layer — it holds batches and supports lookups via
-/// `MessageLookup`. All tracking metadata (offsets, timestamps, counts) lives
-/// outside the journal in the `SegmentedLog`'s `JournalInfo`.
-///
-/// Uses `UnsafeCell` for interior mutability, matching the single-threaded
-/// per-shard execution model.
-pub struct PartitionJournal {
- batch_set: UnsafeCell<IggyMessagesBatchSet>,
+#[allow(dead_code)]
+pub trait QueryableJournal<S>: Journal<S>
+where
+ S: Storage,
+{
+ type Query;
+
+ fn get(&self, query: &Self::Query) -> impl Future<Output =
Option<IggyMessagesBatchSet>>;
+}
+
+#[derive(Debug, Default)]
+pub struct PartitionJournalMemStorage {
+ entries: UnsafeCell<Vec<Bytes>>,
+ op_to_index: UnsafeCell<HashMap<u64, usize>>,
+}
+
+impl Storage for PartitionJournalMemStorage {
+ type Buffer = Bytes;
+
+ async fn write(&self, buf: Self::Buffer) -> usize {
+ let op = Message::<PrepareHeader>::from_bytes(buf.clone())
+ .ok()
+ .map(|message| message.header().op);
+
+ let entries = unsafe { &mut *self.entries.get() };
+ let index = entries.len();
+ entries.push(buf.clone());
+
+ if let Some(op) = op {
+ let op_to_index = unsafe { &mut *self.op_to_index.get() };
+ op_to_index.insert(op, index);
+ }
+
+ buf.len()
+ }
+
+ async fn read(&self, offset: usize, _len: usize) -> Self::Buffer {
+ let op = offset as u64;
+ let Some(index) = ({
+ let op_to_index = unsafe { &*self.op_to_index.get() };
+ op_to_index.get(&op).copied()
+ }) else {
+ return Bytes::new();
+ };
+
+ let entries = unsafe { &*self.entries.get() };
+ entries.get(index).cloned().unwrap_or_default()
+ }
+}
+
+pub struct PartitionJournal<S>
+where
+ S: Storage<Buffer = Bytes>,
+{
+ message_offset_to_op: UnsafeCell<BTreeMap<u64, u64>>,
+ timestamp_to_op: UnsafeCell<BTreeMap<u64, u64>>,
+ headers: UnsafeCell<Vec<PrepareHeader>>,
+ inner: UnsafeCell<JournalInner<S>>,
}
-impl PartitionJournal {
- pub fn new() -> Self {
+impl<S> Default for PartitionJournal<S>
+where
+ S: Storage<Buffer = Bytes> + Default,
+{
+ fn default() -> Self {
Self {
- batch_set: UnsafeCell::new(IggyMessagesBatchSet::empty()),
+ message_offset_to_op: UnsafeCell::new(BTreeMap::new()),
+ timestamp_to_op: UnsafeCell::new(BTreeMap::new()),
+ headers: UnsafeCell::new(Vec::new()),
+ inner: UnsafeCell::new(JournalInner {
+ storage: S::default(),
+ }),
}
}
+}
+
+impl<S> std::fmt::Debug for PartitionJournal<S>
+where
+ S: Storage<Buffer = Bytes>,
+{
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("PartitionJournal2Impl").finish()
+ }
+}
+
+struct JournalInner<S>
+where
+ S: Storage<Buffer = Bytes>,
+{
+ storage: S,
+}
+
+impl PartitionJournalMemStorage {
+ fn drain(&self) -> Vec<Bytes> {
+ let entries = unsafe { &mut *self.entries.get() };
+ let drained = std::mem::take(entries);
+ let op_to_index = unsafe { &mut *self.op_to_index.get() };
+ op_to_index.clear();
+ drained
+ }
- /// Drain all accumulated batches, returning the batch set.
+ fn is_empty(&self) -> bool {
+ let entries = unsafe { &*self.entries.get() };
+ entries.is_empty()
+ }
+}
+
+impl PartitionJournal<PartitionJournalMemStorage> {
+ /// Drain all accumulated batches, matching the legacy `PartitionJournal`
API.
pub fn commit(&self) -> IggyMessagesBatchSet {
- let batch_set = unsafe { &mut *self.batch_set.get() };
- std::mem::take(batch_set)
+ let entries = {
+ let inner = unsafe { &*self.inner.get() };
+ inner.storage.drain()
+ };
+
+ let mut messages = Vec::with_capacity(entries.len());
+ for bytes in entries {
+ if let Ok(message) = Message::from_bytes(bytes) {
+ messages.push(message);
+ }
+ }
+
+ let headers = unsafe { &mut *self.headers.get() };
+ headers.clear();
+ let offsets = unsafe { &mut *self.message_offset_to_op.get() };
+ offsets.clear();
+ let timestamps = unsafe { &mut *self.timestamp_to_op.get() };
+ timestamps.clear();
+
+ Self::messages_to_batch_set(&messages)
Review Comment:
`commit()` drains storage destructively before parsing. If any
`Message::from_bytes` call returns `Err`, that entry is silently dropped and
its data is lost. The old code had no parsing step on this path, so this
failure mode is new, introduced by this refactor.
##########
core/partitions/src/journal.rs:
##########
@@ -49,89 +46,313 @@ impl std::ops::Deref for MessageLookup {
}
}
-/// In-memory journal that accumulates message batches as an
`IggyMessagesBatchSet`.
-///
-/// This is a pure storage layer — it holds batches and supports lookups via
-/// `MessageLookup`. All tracking metadata (offsets, timestamps, counts) lives
-/// outside the journal in the `SegmentedLog`'s `JournalInfo`.
-///
-/// Uses `UnsafeCell` for interior mutability, matching the single-threaded
-/// per-shard execution model.
-pub struct PartitionJournal {
- batch_set: UnsafeCell<IggyMessagesBatchSet>,
+#[allow(dead_code)]
+pub trait QueryableJournal<S>: Journal<S>
+where
+ S: Storage,
+{
+ type Query;
+
+ fn get(&self, query: &Self::Query) -> impl Future<Output =
Option<IggyMessagesBatchSet>>;
+}
+
+#[derive(Debug, Default)]
+pub struct PartitionJournalMemStorage {
+ entries: UnsafeCell<Vec<Bytes>>,
+ op_to_index: UnsafeCell<HashMap<u64, usize>>,
+}
+
+impl Storage for PartitionJournalMemStorage {
+ type Buffer = Bytes;
+
+ async fn write(&self, buf: Self::Buffer) -> usize {
+ let op = Message::<PrepareHeader>::from_bytes(buf.clone())
+ .ok()
+ .map(|message| message.header().op);
+
+ let entries = unsafe { &mut *self.entries.get() };
+ let index = entries.len();
+ entries.push(buf.clone());
+
+ if let Some(op) = op {
+ let op_to_index = unsafe { &mut *self.op_to_index.get() };
+ op_to_index.insert(op, index);
+ }
+
+ buf.len()
+ }
+
+ async fn read(&self, offset: usize, _len: usize) -> Self::Buffer {
+ let op = offset as u64;
+ let Some(index) = ({
+ let op_to_index = unsafe { &*self.op_to_index.get() };
+ op_to_index.get(&op).copied()
+ }) else {
+ return Bytes::new();
+ };
+
+ let entries = unsafe { &*self.entries.get() };
+ entries.get(index).cloned().unwrap_or_default()
+ }
+}
+
+pub struct PartitionJournal<S>
+where
+ S: Storage<Buffer = Bytes>,
+{
+ message_offset_to_op: UnsafeCell<BTreeMap<u64, u64>>,
+ timestamp_to_op: UnsafeCell<BTreeMap<u64, u64>>,
+ headers: UnsafeCell<Vec<PrepareHeader>>,
+ inner: UnsafeCell<JournalInner<S>>,
}
-impl PartitionJournal {
- pub fn new() -> Self {
+impl<S> Default for PartitionJournal<S>
+where
+ S: Storage<Buffer = Bytes> + Default,
+{
+ fn default() -> Self {
Self {
- batch_set: UnsafeCell::new(IggyMessagesBatchSet::empty()),
+ message_offset_to_op: UnsafeCell::new(BTreeMap::new()),
+ timestamp_to_op: UnsafeCell::new(BTreeMap::new()),
+ headers: UnsafeCell::new(Vec::new()),
+ inner: UnsafeCell::new(JournalInner {
+ storage: S::default(),
+ }),
}
}
+}
+
+impl<S> std::fmt::Debug for PartitionJournal<S>
+where
+ S: Storage<Buffer = Bytes>,
+{
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("PartitionJournal2Impl").finish()
+ }
+}
+
+struct JournalInner<S>
+where
+ S: Storage<Buffer = Bytes>,
+{
+ storage: S,
+}
+
+impl PartitionJournalMemStorage {
+ fn drain(&self) -> Vec<Bytes> {
+ let entries = unsafe { &mut *self.entries.get() };
+ let drained = std::mem::take(entries);
+ let op_to_index = unsafe { &mut *self.op_to_index.get() };
+ op_to_index.clear();
+ drained
+ }
- /// Drain all accumulated batches, returning the batch set.
+ fn is_empty(&self) -> bool {
+ let entries = unsafe { &*self.entries.get() };
+ entries.is_empty()
+ }
+}
+
+impl PartitionJournal<PartitionJournalMemStorage> {
+ /// Drain all accumulated batches, matching the legacy `PartitionJournal`
API.
pub fn commit(&self) -> IggyMessagesBatchSet {
- let batch_set = unsafe { &mut *self.batch_set.get() };
- std::mem::take(batch_set)
+ let entries = {
+ let inner = unsafe { &*self.inner.get() };
+ inner.storage.drain()
+ };
+
+ let mut messages = Vec::with_capacity(entries.len());
+ for bytes in entries {
+ if let Ok(message) = Message::from_bytes(bytes) {
+ messages.push(message);
+ }
+ }
+
+ let headers = unsafe { &mut *self.headers.get() };
+ headers.clear();
+ let offsets = unsafe { &mut *self.message_offset_to_op.get() };
+ offsets.clear();
+ let timestamps = unsafe { &mut *self.timestamp_to_op.get() };
+ timestamps.clear();
+
+ Self::messages_to_batch_set(&messages)
}
pub fn is_empty(&self) -> bool {
- let batch_set = unsafe { &*self.batch_set.get() };
- batch_set.is_empty()
+ let inner = unsafe { &*self.inner.get() };
+ inner.storage.is_empty()
}
}
-impl Default for PartitionJournal {
- fn default() -> Self {
- Self::new()
+impl<S> PartitionJournal<S>
+where
+ S: Storage<Buffer = Bytes>,
+{
+ fn message_to_batch(message: &Message<PrepareHeader>) ->
Option<IggyMessagesBatchMut> {
+ if message.header().operation != Operation::SendMessages {
+ return None;
+ }
+
+ crate::decode_send_messages_batch(message.body_bytes())
}
-}
-impl std::fmt::Debug for PartitionJournal {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- f.debug_struct("PartitionJournal").finish()
+ fn messages_to_batch_set(messages: &[Message<PrepareHeader>]) ->
IggyMessagesBatchSet {
+ let mut batch_set = IggyMessagesBatchSet::empty();
+
+ for message in messages {
+ if let Some(batch) = Self::message_to_batch(message) {
+ batch_set.add_batch(batch);
+ }
+ }
+
+ batch_set
+ }
+
+ #[allow(dead_code)]
+ fn candidate_start_op(&self, query: &MessageLookup) -> Option<u64> {
+ match query {
+ MessageLookup::Offset { offset, .. } => {
+ let offsets = unsafe { &*self.message_offset_to_op.get() };
+ offsets
+ .range(..=*offset)
+ .next_back()
+ .or_else(|| offsets.range(*offset..).next())
+ .map(|(_, op)| *op)
+ }
+ MessageLookup::Timestamp { timestamp, .. } => {
+ let timestamps = unsafe { &*self.timestamp_to_op.get() };
+ timestamps
+ .range(..=*timestamp)
+ .next_back()
+ .or_else(|| timestamps.range(*timestamp..).next())
+ .map(|(_, op)| *op)
+ }
+ }
+ }
+
+ async fn message_by_op(&self, op: u64) -> Option<Message<PrepareHeader>> {
+ let offset = usize::try_from(op).ok()?;
+ let bytes = {
+ let inner = unsafe { &*self.inner.get() };
+ inner.storage.read(offset, ZERO_LEN).await
+ };
+
+ Some(
+ Message::from_bytes(bytes)
+ .expect("partition.journal.storage.read: invalid bytes for
message"),
+ )
+ }
Review Comment:
`message_by_op` returns `Option` but panics for any op that isn't in
storage.
`PartitionJournalMemStorage::read` returns `Bytes::new()` for unknown ops,
`Message::from_bytes` on empty bytes returns `Err`, and the `.expect()` panics.
This is directly reachable from `load_messages_from_storage`, which walks ops
with `op += 1`; since consensus ops are globally scoped, per-partition gaps
are expected and will trigger this panic.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]