This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch partitions_refactor
in repository https://gitbox.apache.org/repos/asf/iggy.git
commit f17ae7ca489a56f0861b6f622c2de95d07f92bbc
Author: numinex <[email protected]>
AuthorDate: Mon Mar 9 13:09:04 2026 +0100

    temp
---
 core/partitions/PARTITION_JOURNAL2_STORAGE_PLAN.md | 141 ++++++++++++++++
 core/partitions/src/journal.rs                     | 182 +++++++++++++++++++--
 2 files changed, 312 insertions(+), 11 deletions(-)

diff --git a/core/partitions/PARTITION_JOURNAL2_STORAGE_PLAN.md b/core/partitions/PARTITION_JOURNAL2_STORAGE_PLAN.md
new file mode 100644
index 000000000..ec42f4d7d
--- /dev/null
+++ b/core/partitions/PARTITION_JOURNAL2_STORAGE_PLAN.md
@@ -0,0 +1,141 @@
+# PartitionJournal2 Storage Proposal
+
+## Objective
+Use `journal::Storage` as the actual backing store for serialized prepare entries (`Bytes`) and decode to `Message<PrepareHeader>` on read.
+
+## Current Problem
+- `PartitionJournal2Impl` currently stores entries directly in:
+  - `UnsafeCell<Vec<Message<PrepareHeader>>>`
+- `Noop` storage is unused for real data.
+- The old `Buffer = Entry` idea is too rigid for this path.
+- `Storage::read(&self, offset, buffer)` still requires a fallback buffer argument.
+
+## Design Direction
+
+### 1. Use a serialized buffer (`Bytes`) in storage
+For `PartitionJournal2`, enforce:
+
+```rust
+S: Storage<Buffer = bytes::Bytes>
+```
+
+The journal entry remains:
+
+```rust
+type Entry = Message<PrepareHeader>;
+```
+
+Conversion boundary:
+- write path: `Message<PrepareHeader> -> Bytes` (serialize/store)
+- read path: `Bytes -> Message<PrepareHeader>` via `Message::from_bytes`
+
+### 2. Replace `Noop` with in-memory prepare-message storage
+Introduce a dedicated storage:
+
+```rust
+pub struct InMemoryPrepareStorage {
+    entries: UnsafeCell<Vec<bytes::Bytes>>,
+}
+```
+
+Behavior:
+- `write(bytes)` appends serialized message bytes to `entries`.
+- `read(offset, ...)` treats `offset` as `op_number` and returns that op's entry.
+
+This keeps storage raw and simple, while typed decoding happens at the journal boundary.
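The write/read behavior described above can be sketched in isolation. This is a dependency-free sketch, not the real `journal::Storage` impl: `Vec<u8>` stands in for `bytes::Bytes`, the methods are inherent rather than trait methods, and the `read` signature returning `Option` anticipates the trait adjustment discussed under "Storage Trait Consideration".

```rust
use std::cell::UnsafeCell;

// Sketch of the proposed in-memory prepare storage. `Vec<u8>` stands in
// for `bytes::Bytes`; these are inherent methods, not `journal::Storage`
// trait impls.
pub struct InMemoryPrepareStorage {
    entries: UnsafeCell<Vec<Vec<u8>>>,
}

impl InMemoryPrepareStorage {
    pub fn new() -> Self {
        Self { entries: UnsafeCell::new(Vec::new()) }
    }

    // Append serialized prepare bytes; the vec index doubles as the op number.
    pub fn write(&self, bytes: Vec<u8>) -> usize {
        // Interior mutability through `&self` is sound here only under the
        // single-threaded per-shard execution model described in this doc.
        let entries = unsafe { &mut *self.entries.get() };
        entries.push(bytes);
        entries.len() - 1
    }

    // `offset` is interpreted as an op number, not a byte offset.
    pub fn read(&self, offset: usize) -> Option<&[u8]> {
        let entries = unsafe { &*self.entries.get() };
        entries.get(offset).map(|bytes| bytes.as_slice())
    }
}
```

The storage stays byte-oriented; decoding to `Message<PrepareHeader>` happens at the journal boundary, not here.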
+
+### 2.1 `offset` semantics for this journal: `offset == op_number`
+For `PartitionJournal2`, define:
+- `Storage::read(offset, ...)` where `offset` is the VSR `PrepareHeader.op` (op number), not a byte offset.
+- The journal append path stores entries in op order, so op lookup is O(1) via an index map (or a direct vec index if ops are contiguous).
+
+Implementation detail:
+- Maintain `op_to_index: HashMap<u64, usize>` (or rely on contiguous `op` if that is guaranteed).
+- On `append(entry)`, cache `op_to_index.insert(entry.header().op, vec_index)`.
+- On `entry(header)`, call the storage read with `header.op as usize` or the map-resolved index.
+
+### 3. Make `PartitionJournal2Impl` storage-backed
+Refactor `PartitionJournal2Impl` to own a storage instance:
+
+```rust
+pub struct PartitionJournal2Impl<S: Storage<Buffer = bytes::Bytes>> {
+    storage: S,
+    // metadata/indexes only
+    headers: UnsafeCell<Vec<PrepareHeader>>,
+    message_offset_to_op: UnsafeCell<HashMap<u64, usize>>,
+    timestamp_to_op: UnsafeCell<HashMap<u64, usize>>,
+}
+```
+
+Responsibilities split:
+- `storage`: source of truth for serialized entry bytes
+- `headers`: lightweight header cache to satisfy `header()` / `previous_header()` reference semantics
+- maps: query acceleration for `get`
+
+### 4. Journal method behavior with storage
+- `append(entry)`:
+  - decode send-messages info for the maps
+  - serialize: `let bytes = entry.into_inner()` (or equivalent)
+  - `storage.write(bytes)`
+  - push `*entry.header()` into `headers`
+  - cache `op_number -> storage position`
+- `entry(header)`:
+  - resolve by `op_number` (from `header.op`)
+  - fetch bytes from storage via read
+  - decode immediately: `Message::<PrepareHeader>::from_bytes(bytes)`
+- `header(idx)` and `previous_header(header)`:
+  - use the `headers` vector (no full entry decode needed)
+
+### 5. `get` path stays batch-conversion based
+`get` still performs:
+- collect candidate bytes from storage
+- decode into `Message<PrepareHeader>`
+- convert `Vec<Message<PrepareHeader>> -> IggyMessagesBatchSet`
+- apply the `MessageLookup` filter (`get_by_offset` / `get_by_timestamp`)
+
+The existing conversion helpers remain valid.
+
+## Storage Trait Consideration
+
+The current trait:
+
+```rust
+fn read(&self, offset: usize, buffer: Self::Buffer) -> impl Future<Output = Self::Buffer>;
+```
+
+For typed object lookups this is awkward. Recommended adjustment:
+
+```rust
+fn read(&self, offset: usize) -> impl Future<Output = Option<Self::Buffer>>;
+```
+
+Why:
+- avoids constructing a fake fallback buffer
+- maps naturally to indexed storage
+- still works for byte storage by returning `Option<Bytes>`
+
+If a trait-wide change is too large right now, keep the current signature temporarily and use the incoming `buffer` only as a fallback for out-of-range reads.
+
+For this phase, no trait change is required: just interpret the existing `offset` argument as `op_number` in the `PartitionJournal2` storage.
+
+## Proposed File-Level Changes
+- `core/partitions/src/journal.rs`
+  - add `InMemoryPrepareStorage`
+  - make `PartitionJournal2Impl` generic over storage (or concrete to `InMemoryPrepareStorage`)
+  - remove the direct `inner.set: Vec<Message<PrepareHeader>>` as the primary store
+  - keep a lightweight header metadata cache
+  - keep/adjust the current lookup maps
+- `core/partitions/src/iggy_partition.rs` (only if/when wiring `PartitionJournal2` into the partition log)
+  - replace `Noop` for this path with `InMemoryPrepareStorage`
+
+## Migration Sequence (Low Risk)
+1. Add `InMemoryPrepareStorage` without removing existing fields.
+2. Mirror writes to both the old `inner.set` and storage.
+3. Switch reads (`entry`, `get`) to the storage-backed path.
+4. Remove the old `inner.set` once parity is confirmed.
+5. Optionally evolve the `Storage::read` signature in a separate PR.
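Steps 2–4 of the sequence above can be sketched as a mirrored-write shim. All names here are hypothetical, and `Vec<Vec<u8>>` stands in for both the legacy `inner.set` and the new byte storage; `op` indexing assumes contiguous op numbers.

```rust
// Hypothetical migration shim for the mirror phase: every append goes to
// both the legacy vec and the new storage; reads are served from storage
// and checked against the legacy path until parity is confirmed.
struct MirroredJournal {
    legacy: Vec<Vec<u8>>,  // stands in for the old `inner.set`
    storage: Vec<Vec<u8>>, // stands in for `InMemoryPrepareStorage`
}

impl MirroredJournal {
    fn new() -> Self {
        Self { legacy: Vec::new(), storage: Vec::new() }
    }

    // Step 2: mirror writes to both stores.
    fn append(&mut self, entry: Vec<u8>) {
        self.legacy.push(entry.clone());
        self.storage.push(entry);
    }

    // Step 3: serve reads from storage, asserting parity with the old path.
    fn entry(&self, op: usize) -> Option<&Vec<u8>> {
        let from_storage = self.storage.get(op);
        debug_assert_eq!(from_storage, self.legacy.get(op));
        from_storage
    }
}
```

Once `entry`/`get` have run against storage with no parity failures, the `legacy` field can be dropped (step 4), leaving storage as the single source of truth.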
+
+## Expected Outcome
+- `Storage` is no longer a no-op for `PartitionJournal2`.
+- The storage buffer is raw serialized data (`Bytes`), and decoding to `Message<PrepareHeader>` happens at the read boundary.
+- The in-memory backend stays simple (`UnsafeCell<Vec<Bytes>>`) and aligned with the proposed flow.
diff --git a/core/partitions/src/journal.rs b/core/partitions/src/journal.rs
index ab281e940..5f964b46b 100644
--- a/core/partitions/src/journal.rs
+++ b/core/partitions/src/journal.rs
@@ -15,9 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use iggy_common::{IggyMessagesBatchMut, IggyMessagesBatchSet};
+use bytes::BytesMut;
+use iggy_common::{
+    INDEX_SIZE, IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet, PooledBuffer,
+    header::{Operation, PrepareHeader},
+    message::Message,
+};
 use journal::{Journal, Storage};
-use std::cell::UnsafeCell;
+use std::{cell::UnsafeCell, collections::HashMap};
 
 // TODO: Fix that, we need to figure out how to store the `IggyMessagesBatchSet`.
 /// No-op storage backend for the in-memory partition journal.
@@ -31,7 +36,7 @@ impl Storage for Noop {
         0
     }
 
-    async fn read(&self, _offset: usize, _buffer: ()) {}
+    async fn read(&self, _offset: usize, _buffer: ()) -> () { () }
 }
 
 /// Lookup key for querying messages from the journal.
@@ -49,14 +54,7 @@ impl std::ops::Deref for MessageLookup {
     }
 }
 
-/// In-memory journal that accumulates message batches as an `IggyMessagesBatchSet`.
-///
-/// This is a pure storage layer — it holds batches and supports lookups via
-/// `MessageLookup`. All tracking metadata (offsets, timestamps, counts) lives
-/// outside the journal in the `SegmentedLog`'s `JournalInfo`.
-///
-/// Uses `UnsafeCell` for interior mutability, matching the single-threaded
-/// per-shard execution model.
+// [LEGACY]
 pub struct PartitionJournal {
     batch_set: UnsafeCell<IggyMessagesBatchSet>,
 }
@@ -92,6 +90,168 @@ impl std::fmt::Debug for PartitionJournal {
     }
 }
 
+pub trait PartitionJournal2<S>: Journal<S>
+where
+    S: Storage,
+{
+    type Query;
+
+    fn get(&self, query: &Self::Query) -> impl Future<Output = Option<IggyMessagesBatchSet>>;
+}
+
+pub struct PartitionJournal2Impl {
+    message_offset_to_op: UnsafeCell<HashMap<u64, usize>>,
+    timestamp_to_op: UnsafeCell<HashMap<u64, usize>>,
+    inner: UnsafeCell<JournalInner>,
+}
+
+struct JournalInner {
+    set: Vec<Message<PrepareHeader>>,
+}
+
+impl Default for PartitionJournal2Impl {
+    fn default() -> Self {
+        Self {
+            message_offset_to_op: UnsafeCell::new(HashMap::new()),
+            timestamp_to_op: UnsafeCell::new(HashMap::new()),
+            inner: UnsafeCell::new(JournalInner { set: Vec::new() }),
+        }
+    }
+}
+
+impl PartitionJournal2Impl {
+    fn decode_send_messages_batch(body: bytes::Bytes) -> Option<IggyMessagesBatchMut> {
+        // TODO: This is bad,
+        let mut body = body
+            .try_into_mut()
+            .unwrap_or_else(|body| BytesMut::from(body.as_ref()));
+
+        if body.len() < 4 {
+            return None;
+        }
+
+        let count_bytes = body.split_to(4);
+        let count = u32::from_le_bytes(count_bytes.as_ref().try_into().ok()?);
+        let indexes_len = (count as usize).checked_mul(INDEX_SIZE)?;
+
+        if body.len() < indexes_len {
+            return None;
+        }
+
+        let indexes_bytes = body.split_to(indexes_len);
+        let indexes = IggyIndexesMut::from_bytes(PooledBuffer::from(indexes_bytes), 0);
+        let messages = PooledBuffer::from(body);
+
+        Some(IggyMessagesBatchMut::from_indexes_and_messages(
+            indexes, messages,
+        ))
+    }
+
+    fn message_to_batch(message: &Message<PrepareHeader>) -> Option<IggyMessagesBatchMut> {
+        if message.header().operation != Operation::SendMessages {
+            return None;
+        }
+
+        Self::decode_send_messages_batch(message.body_bytes())
+    }
+
+    fn messages_to_batch_set<'a>(
+        messages: impl Iterator<Item = &'a Message<PrepareHeader>>,
+    ) -> IggyMessagesBatchSet {
+        let mut batch_set = IggyMessagesBatchSet::empty();
+
+        for message in messages {
+            if let Some(batch) = Self::message_to_batch(message) {
+                batch_set.add_batch(batch);
+            }
+        }
+
+        batch_set
+    }
+
+    fn candidate_start_op(&self, query: &MessageLookup) -> usize {
+        match query {
+            MessageLookup::Offset { offset, .. } => {
+                let offsets = unsafe { &*self.message_offset_to_op.get() };
+                offsets.get(offset).copied().unwrap_or_default()
+            }
+            MessageLookup::Timestamp { timestamp, .. } => {
+                let timestamps = unsafe { &*self.timestamp_to_op.get() };
+                timestamps.get(timestamp).copied().unwrap_or_default()
+            }
+        }
+    }
+
+    fn messages_from_op(&self, start_op: usize) -> impl Iterator<Item = &Message<PrepareHeader>> {
+        let inner = unsafe { &*self.inner.get() };
+        inner.set.iter().skip(start_op)
+    }
+}
+
+impl Journal<Noop> for PartitionJournal2Impl {
+    type Header = PrepareHeader;
+    type Entry = Message<Self::Header>;
+    type HeaderRef<'a> = &'a Self::Header;
+
+    fn header(&self, idx: usize) -> Option<Self::HeaderRef<'_>> {
+        // TODO: Fixes
+        let inner = unsafe { &*self.inner.get() };
+        inner.set.get(idx).map(|msg| msg.header())
+    }
+
+    fn previous_header(&self, header: &Self::Header) -> Option<Self::HeaderRef<'_>> {
+        // TODO: Fixes
+        let prev_idx = header.op.saturating_sub(1) as usize;
+        let inner = unsafe { &*self.inner.get() };
+        inner.set.get(prev_idx).map(|msg| msg.header())
+    }
+
+    async fn append(&self, entry: Self::Entry) {
+        let first_offset_and_timestamp = Self::message_to_batch(&entry)
+            .and_then(|batch| Some((batch.first_offset()?, batch.first_timestamp()?)));
+
+        let inner = unsafe { &mut *self.inner.get() };
+        let op = inner.set.len();
+        inner.set.push(entry);
+
+        if let Some((offset, timestamp)) = first_offset_and_timestamp {
+            let offsets = unsafe { &mut *self.message_offset_to_op.get() };
+            offsets.insert(offset, op);

+            let timestamps = unsafe { &mut *self.timestamp_to_op.get() };
+            timestamps.insert(timestamp, op);
+        }
+    }
+
+    async fn entry(&self, header: &Self::Header) -> Option<Self::Entry> {
+        let op = header.op as usize;
+        let inner = unsafe { &*self.inner.get() };
+        inner.set.get(op).cloned()
+    }
+}
+
+impl PartitionJournal2<Noop> for PartitionJournal2Impl {
+    type Query = MessageLookup;
+
+    async fn get(&self, query: &Self::Query) -> Option<IggyMessagesBatchSet> {
+        let query = *query;
+        let start_op = self.candidate_start_op(&query);
+        let messages = self.messages_from_op(start_op);
+        let batch_set = Self::messages_to_batch_set(messages);
+
+        let result = match query {
+            MessageLookup::Offset { offset, count } => batch_set.get_by_offset(offset, count),
+            MessageLookup::Timestamp { timestamp, count } => {
+                batch_set.get_by_timestamp(timestamp, count)
+            }
+        };
+
+        if result.is_empty() {
+            None
+        } else {
+            Some(result)
+        }
+    }
+}
+
 impl Journal<Noop> for PartitionJournal {
     type Header = MessageLookup;
     type Entry = IggyMessagesBatchMut;
