This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 18588f149 refactor(journal): refactor partition journal to utilize storage trait (#2909)
18588f149 is described below
commit 18588f1493a86e7f1dc3b79eadc9edd0b6557a81
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Mon Mar 16 22:17:47 2026 +0100
refactor(journal): refactor partition journal to utilize storage trait (#2909)
Refactor the `PartitionJournal` to use the `Storage` trait as backing
storage, rather than storing data inline.
---
core/journal/src/lib.rs | 5 +-
core/partitions/src/iggy_partition.rs | 51 ++++-
core/partitions/src/iggy_partitions.rs | 54 ++---
core/partitions/src/journal.rs | 394 +++++++++++++++++++++++++++------
core/partitions/src/lib.rs | 35 ++-
core/simulator/src/deps.rs | 6 +-
6 files changed, 430 insertions(+), 115 deletions(-)
diff --git a/core/journal/src/lib.rs b/core/journal/src/lib.rs
index f1b4081dd..55a6b14fb 100644
--- a/core/journal/src/lib.rs
+++ b/core/journal/src/lib.rs
@@ -39,7 +39,10 @@ pub trait Storage {
type Buffer;
fn write(&self, buf: Self::Buffer) -> impl Future<Output = usize>;
- fn read(&self, offset: usize, buffer: Self::Buffer) -> impl Future<Output
= Self::Buffer>;
+ // TODO: Get rid of the `len` usize, we need to do changes in `Simulator`
in order to support that.
+ // Maybe we should go back to passing in the `Buffer` again, but I am not
sure how to handle it in the `Partitions Journal`, since we use in-memory impl
+ // which extracts the buffer out of the `Vec<Message>` and we don't need
to allocate a new buffer.
+ fn read(&self, offset: usize, len: usize) -> impl Future<Output =
Self::Buffer>;
}
pub trait JournalHandle {
diff --git a/core/partitions/src/iggy_partition.rs
b/core/partitions/src/iggy_partition.rs
index 49c820230..65fb470ae 100644
--- a/core/partitions/src/iggy_partition.rs
+++ b/core/partitions/src/iggy_partition.rs
@@ -15,12 +15,14 @@
// specific language governing permissions and limitations
// under the License.
-use crate::journal::{Noop, PartitionJournal};
+use crate::journal::{PartitionJournal, PartitionJournalMemStorage};
use crate::log::SegmentedLog;
-use crate::{AppendResult, Partition};
+use crate::{AppendResult, Partition, decode_send_messages_batch};
use iggy_common::{
ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, IggyError,
IggyMessagesBatchMut,
IggyTimestamp, PartitionStats,
+ header::{Operation, PrepareHeader},
+ message::Message,
};
use journal::Journal as _;
use std::sync::Arc;
@@ -30,7 +32,7 @@ use tokio::sync::Mutex as TokioMutex;
// This struct aliases in terms of the code contained the `LocalPartition from
`core/server/src/streaming/partitions/local_partition.rs`.
#[derive(Debug)]
pub struct IggyPartition {
- pub log: SegmentedLog<PartitionJournal, Noop>,
+ pub log: SegmentedLog<PartitionJournal<PartitionJournalMemStorage>,
PartitionJournalMemStorage>,
/// Committed offset — advanced only after quorum ack.
pub offset: Arc<AtomicU64>,
/// Dirty offset — advanced on every prepare (before commit).
@@ -46,6 +48,36 @@ pub struct IggyPartition {
}
impl IggyPartition {
+ fn prepare_message_from_batch(
+ mut header: PrepareHeader,
+ batch: &IggyMessagesBatchMut,
+ ) -> Message<PrepareHeader> {
+ let indexes = batch.indexes();
+ let count = batch.count();
+ let body_len = 4 + indexes.len() + batch.len();
+ let total_size = std::mem::size_of::<PrepareHeader>() + body_len;
+ header.size = u32::try_from(total_size)
+ .expect("prepare_message_from_batch: batch size exceeds u32::MAX");
+
+ let message =
Message::<PrepareHeader>::new(total_size).transmute_header(|_old, new| {
+ *new = header;
+ });
+
+ let mut bytes = message
+ .into_inner()
+ .try_into_mut()
+ .expect("prepare_message_from_batch: expected unique bytes
buffer");
+ let header_size = std::mem::size_of::<PrepareHeader>();
+ bytes[header_size..header_size +
4].copy_from_slice(&count.to_le_bytes());
+ let mut position = header_size + 4;
+ bytes[position..position + indexes.len()].copy_from_slice(indexes);
+ position += indexes.len();
+ bytes[position..position + batch.len()].copy_from_slice(batch);
+
+ Message::<PrepareHeader>::from_bytes(bytes.freeze())
+ .expect("prepare_message_from_batch: invalid prepared message
bytes")
+ }
+
pub fn new(stats: Arc<PartitionStats>) -> Self {
Self {
log: SegmentedLog::default(),
@@ -65,8 +97,16 @@ impl IggyPartition {
impl Partition for IggyPartition {
async fn append_messages(
&mut self,
- mut batch: IggyMessagesBatchMut,
+ message: Message<PrepareHeader>,
) -> Result<AppendResult, IggyError> {
+ let header = *message.header();
+ if header.operation != Operation::SendMessages {
+ return Err(IggyError::CannotAppendMessage);
+ }
+
+ let mut batch = decode_send_messages_batch(message.body_bytes())
+ .ok_or(IggyError::CannotAppendMessage)?;
+
if batch.count() == 0 {
return Ok(AppendResult::new(0, 0, 0));
}
@@ -116,7 +156,8 @@ impl Partition for IggyPartition {
journal.info.end_timestamp = ts;
}
- journal.inner.append(batch).await;
+ let message = Self::prepare_message_from_batch(header, &batch);
+ journal.inner.append(message).await;
Ok(AppendResult::new(
dirty_offset,
diff --git a/core/partitions/src/iggy_partitions.rs
b/core/partitions/src/iggy_partitions.rs
index c3b1e0bc6..931e804a7 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -30,8 +30,7 @@ use consensus::{
};
use iggy_common::header::Command2;
use iggy_common::{
- INDEX_SIZE, IggyByteSize, IggyIndexesMut, IggyMessagesBatchMut,
PartitionStats, PooledBuffer,
- Segment, SegmentStorage,
+ IggyByteSize, PartitionStats, Segment, SegmentStorage,
header::{
ConsensusHeader, GenericHeader, Operation, PrepareHeader,
PrepareOkHeader, RequestHeader,
},
@@ -355,13 +354,13 @@ where
}
async fn on_replicate(&self, message: <VsrConsensus<B> as
Consensus>::Message<PrepareHeader>) {
- let header = message.header();
+ let header = *message.header();
let namespace = IggyNamespace::from_raw(header.namespace);
let consensus = self
.consensus()
.expect("on_replicate: consensus not initialized");
- let current_op = match replicate_preflight(consensus, header) {
+ let current_op = match replicate_preflight(consensus, &header) {
Ok(current_op) => current_op,
Err(reason) => {
warn!(
@@ -372,7 +371,7 @@ where
}
};
- let is_old_prepare = fence_old_prepare_by_commit(consensus, header);
+ let is_old_prepare = fence_old_prepare_by_commit(consensus, &header);
if is_old_prepare {
warn!("received old prepare, not replicating");
} else {
@@ -387,9 +386,9 @@ where
// TODO: Figure out the flow of the partition operations.
// In metadata layer we assume that when an `on_request` or
`on_replicate` is called, it's called from correct shard.
// I think we need to do the same here, which means that the code from
below is unfallable, the partition should always exist by now!
- self.apply_replicated_operation(&namespace, &message).await;
+ self.apply_replicated_operation(&namespace, message).await;
- self.send_prepare_ok(header).await;
+ self.send_prepare_ok(&header).await;
if consensus.is_follower() {
self.commit_journal(namespace);
@@ -539,37 +538,21 @@ where
.register_namespace(ns);
}
- // TODO: Move this elsewhere, also do not reallocate, we do reallocationg
now becauise we use PooledBuffer for the batch body
- // but `Bytes` for `Message` payload.
- fn batch_from_body(body: &[u8]) -> IggyMessagesBatchMut {
- assert!(body.len() >= 4, "prepare body too small for batch header");
- let count = u32::from_le_bytes(body[0..4].try_into().unwrap());
- let indexes_len = count as usize * INDEX_SIZE;
- let indexes_end = 4 + indexes_len;
- assert!(
- body.len() >= indexes_end,
- "prepare body too small for {count} indexes",
- );
-
- let indexes =
IggyIndexesMut::from_bytes(PooledBuffer::from(&body[4..indexes_end]), 0);
- let messages = PooledBuffer::from(&body[indexes_end..]);
- IggyMessagesBatchMut::from_indexes_and_messages(indexes, messages)
- }
-
async fn apply_replicated_operation(
&self,
namespace: &IggyNamespace,
- message: &Message<PrepareHeader>,
+ message: Message<PrepareHeader>,
) {
let consensus = self
.consensus()
.expect("apply_replicated_operation: consensus not initialized");
- let header = message.header();
+ let header = *message.header();
+ // TODO: WE have to distinguish between an `message` recv by leader
and follower.
+ // In the follower path, we have to skip the `prepare_for_persistence`
path, just append to journal.
match header.operation {
Operation::SendMessages => {
- let body = message.body_bytes();
- self.append_send_messages_to_journal(namespace, body.as_ref())
+ self.append_send_messages_to_journal(namespace, message)
.await;
debug!(
replica = consensus.replica(),
@@ -598,12 +581,7 @@ where
}
}
- async fn append_send_messages_to_journal(&self, namespace: &IggyNamespace,
body: &[u8]) {
- let batch = Self::batch_from_body(body);
- self.append_messages_to_journal(namespace, batch).await;
- }
-
- /// Append a batch to a partition's journal with offset assignment.
+ /// Append a prepare message to a partition's journal with offset
assignment.
///
/// Updates `segment.current_position` (logical position for indexing) but
/// not `segment.end_offset` or `segment.end_timestamp` (committed state).
@@ -611,15 +589,15 @@ where
///
/// Uses `dirty_offset` for offset assignment so that multiple prepares
/// can be pipelined before any commit.
- async fn append_messages_to_journal(
+ async fn append_send_messages_to_journal(
&self,
namespace: &IggyNamespace,
- batch: IggyMessagesBatchMut,
+ message: Message<PrepareHeader>,
) {
let partition = self
.get_mut_by_ns(namespace)
- .expect("append_messages_to_journal: partition not found for
namespace");
- let _ = partition.append_messages(batch).await;
+ .expect("append_send_messages_to_journal: partition not found for
namespace");
+ let _ = partition.append_messages(message).await;
}
/// Replicate a prepare message to the next replica in the chain.
diff --git a/core/partitions/src/journal.rs b/core/partitions/src/journal.rs
index ab281e940..d3b779fee 100644
--- a/core/partitions/src/journal.rs
+++ b/core/partitions/src/journal.rs
@@ -15,29 +15,26 @@
// specific language governing permissions and limitations
// under the License.
-use iggy_common::{IggyMessagesBatchMut, IggyMessagesBatchSet};
+use bytes::Bytes;
+use iggy_common::{
+ IggyMessagesBatchMut, IggyMessagesBatchSet,
+ header::{Operation, PrepareHeader},
+ message::Message,
+};
use journal::{Journal, Storage};
-use std::cell::UnsafeCell;
+use std::{
+ cell::UnsafeCell,
+ collections::{BTreeMap, HashMap},
+};
-// TODO: Fix that, we need to figure out how to store the
`IggyMessagesBatchSet`.
-/// No-op storage backend for the in-memory partition journal.
-#[derive(Debug)]
-pub struct Noop;
-
-impl Storage for Noop {
- type Buffer = ();
-
- async fn write(&self, _buf: ()) -> usize {
- 0
- }
-
- async fn read(&self, _offset: usize, _buffer: ()) {}
-}
+const ZERO_LEN: usize = 0;
/// Lookup key for querying messages from the journal.
#[derive(Debug, Clone, Copy)]
pub enum MessageLookup {
+ #[allow(dead_code)]
Offset { offset: u64, count: u32 },
+ #[allow(dead_code)]
Timestamp { timestamp: u64, count: u32 },
}
@@ -49,89 +46,354 @@ impl std::ops::Deref for MessageLookup {
}
}
-/// In-memory journal that accumulates message batches as an
`IggyMessagesBatchSet`.
-///
-/// This is a pure storage layer — it holds batches and supports lookups via
-/// `MessageLookup`. All tracking metadata (offsets, timestamps, counts) lives
-/// outside the journal in the `SegmentedLog`'s `JournalInfo`.
-///
-/// Uses `UnsafeCell` for interior mutability, matching the single-threaded
-/// per-shard execution model.
-pub struct PartitionJournal {
- batch_set: UnsafeCell<IggyMessagesBatchSet>,
+#[allow(dead_code)]
+pub trait QueryableJournal<S>: Journal<S>
+where
+ S: Storage,
+{
+ type Query;
+
+ fn get(&self, query: &Self::Query) -> impl Future<Output =
Option<IggyMessagesBatchSet>>;
}
-impl PartitionJournal {
- pub fn new() -> Self {
+#[derive(Debug, Default)]
+pub struct PartitionJournalMemStorage {
+ entries: UnsafeCell<Vec<Bytes>>,
+ /// Maps byte offset (as if disk-backed) to index in entries Vec
+ offset_to_index: UnsafeCell<HashMap<usize, usize>>,
+ /// Current write position (cumulative byte offset)
+ current_offset: UnsafeCell<usize>,
+}
+
+impl Storage for PartitionJournalMemStorage {
+ type Buffer = Bytes;
+
+ async fn write(&self, buf: Self::Buffer) -> usize {
+ let len = buf.len();
+ let entries = unsafe { &mut *self.entries.get() };
+ let offset_to_index = unsafe { &mut *self.offset_to_index.get() };
+ let current_offset = unsafe { &mut *self.current_offset.get() };
+
+ let index = entries.len();
+ offset_to_index.insert(*current_offset, index);
+ entries.push(buf);
+
+ let write_offset = *current_offset;
+ *current_offset += len;
+
+ write_offset
+ }
+
+ async fn read(&self, offset: usize, _len: usize) -> Self::Buffer {
+ let offset_to_index = unsafe { &*self.offset_to_index.get() };
+ let Some(&index) = offset_to_index.get(&offset) else {
+ return Bytes::new();
+ };
+
+ let entries = unsafe { &*self.entries.get() };
+ entries.get(index).cloned().unwrap_or_default()
+ }
+}
+
+pub struct PartitionJournal<S>
+where
+ S: Storage<Buffer = Bytes>,
+{
+ /// Maps op -> storage byte offset (for all entries)
+ op_to_storage_offset: UnsafeCell<BTreeMap<u64, usize>>,
+ /// Maps message offset -> op (for queryable entries)
+ offset_to_op: UnsafeCell<BTreeMap<u64, u64>>,
+ /// Maps timestamp -> op (for queryable entries)
+ timestamp_to_op: UnsafeCell<BTreeMap<u64, u64>>,
+ headers: UnsafeCell<Vec<PrepareHeader>>,
+ inner: UnsafeCell<JournalInner<S>>,
+}
+
+impl<S> Default for PartitionJournal<S>
+where
+ S: Storage<Buffer = Bytes> + Default,
+{
+ fn default() -> Self {
Self {
- batch_set: UnsafeCell::new(IggyMessagesBatchSet::empty()),
+ op_to_storage_offset: UnsafeCell::new(BTreeMap::new()),
+ offset_to_op: UnsafeCell::new(BTreeMap::new()),
+ timestamp_to_op: UnsafeCell::new(BTreeMap::new()),
+ headers: UnsafeCell::new(Vec::new()),
+ inner: UnsafeCell::new(JournalInner {
+ storage: S::default(),
+ }),
}
}
+}
- /// Drain all accumulated batches, returning the batch set.
+impl<S> std::fmt::Debug for PartitionJournal<S>
+where
+ S: Storage<Buffer = Bytes>,
+{
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("PartitionJournal2Impl").finish()
+ }
+}
+
+struct JournalInner<S>
+where
+ S: Storage<Buffer = Bytes>,
+{
+ storage: S,
+}
+
+impl PartitionJournalMemStorage {
+ fn drain(&self) -> Vec<Bytes> {
+ let entries = unsafe { &mut *self.entries.get() };
+ let offset_to_index = unsafe { &mut *self.offset_to_index.get() };
+ let current_offset = unsafe { &mut *self.current_offset.get() };
+
+ offset_to_index.clear();
+ *current_offset = 0;
+
+ std::mem::take(entries)
+ }
+
+ fn is_empty(&self) -> bool {
+ let entries = unsafe { &*self.entries.get() };
+ entries.is_empty()
+ }
+}
+
+impl PartitionJournal<PartitionJournalMemStorage> {
+ /// Drain all accumulated batches, matching the legacy `PartitionJournal`
API.
pub fn commit(&self) -> IggyMessagesBatchSet {
- let batch_set = unsafe { &mut *self.batch_set.get() };
- std::mem::take(batch_set)
+ let entries = {
+ let inner = unsafe { &*self.inner.get() };
+ inner.storage.drain()
+ };
+
+ let mut messages = Vec::with_capacity(entries.len());
+ for bytes in entries {
+ if let Ok(message) = Message::from_bytes(bytes) {
+ messages.push(message);
+ }
+ }
+
+ let headers = unsafe { &mut *self.headers.get() };
+ headers.clear();
+ let op_to_storage_offset = unsafe { &mut
*self.op_to_storage_offset.get() };
+ op_to_storage_offset.clear();
+ let offset_to_op = unsafe { &mut *self.offset_to_op.get() };
+ offset_to_op.clear();
+ let timestamp_to_op = unsafe { &mut *self.timestamp_to_op.get() };
+ timestamp_to_op.clear();
+
+ Self::messages_to_batch_set(&messages)
}
pub fn is_empty(&self) -> bool {
- let batch_set = unsafe { &*self.batch_set.get() };
- batch_set.is_empty()
+ let inner = unsafe { &*self.inner.get() };
+ inner.storage.is_empty()
}
}
-impl Default for PartitionJournal {
- fn default() -> Self {
- Self::new()
+impl<S> PartitionJournal<S>
+where
+ S: Storage<Buffer = Bytes>,
+{
+ fn message_to_batch(message: &Message<PrepareHeader>) ->
Option<IggyMessagesBatchMut> {
+ if message.header().operation != Operation::SendMessages {
+ return None;
+ }
+
+ crate::decode_send_messages_batch(message.body_bytes())
}
-}
-impl std::fmt::Debug for PartitionJournal {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- f.debug_struct("PartitionJournal").finish()
+ fn messages_to_batch_set(messages: &[Message<PrepareHeader>]) ->
IggyMessagesBatchSet {
+ let mut batch_set = IggyMessagesBatchSet::empty();
+
+ for message in messages {
+ if let Some(batch) = Self::message_to_batch(message) {
+ batch_set.add_batch(batch);
+ }
+ }
+
+ batch_set
+ }
+
+ #[allow(dead_code)]
+ fn candidate_start_op(&self, query: &MessageLookup) -> Option<u64> {
+ match query {
+ MessageLookup::Offset { offset, .. } => {
+ let offset_to_op = unsafe { &*self.offset_to_op.get() };
+ offset_to_op
+ .range(..=*offset)
+ .next_back()
+ .or_else(|| offset_to_op.range(*offset..).next())
+ .map(|(_, op)| *op)
+ }
+ MessageLookup::Timestamp { timestamp, .. } => {
+ let timestamp_to_op = unsafe { &*self.timestamp_to_op.get() };
+ timestamp_to_op
+ .range(..=*timestamp)
+ .next_back()
+ .or_else(|| timestamp_to_op.range(*timestamp..).next())
+ .map(|(_, op)| *op)
+ }
+ }
+ }
+
+ async fn message_by_op(&self, op: u64) -> Option<Message<PrepareHeader>> {
+ let storage_offset = {
+ let op_to_storage_offset = unsafe {
&*self.op_to_storage_offset.get() };
+ *op_to_storage_offset.get(&op)?
+ };
+
+ let bytes = {
+ let inner = unsafe { &*self.inner.get() };
+ inner.storage.read(storage_offset, ZERO_LEN).await
+ };
+
+ if bytes.is_empty() {
+ return None;
+ }
+
+ Some(
+ Message::from_bytes(bytes)
+ .expect("partition.journal.storage.read: invalid bytes for
message"),
+ )
+ }
+
+ #[allow(dead_code)]
+ async fn load_messages_from_storage(
+ &self,
+ start_op: u64,
+ count: u32,
+ ) -> Vec<Message<PrepareHeader>> {
+ if count == 0 {
+ return Vec::new();
+ }
+
+ // Get (op, storage_offset) pairs directly from the mapping
+ // BTreeMap is already sorted by op
+ let op_offsets: Vec<(u64, usize)> = {
+ let op_to_storage_offset = unsafe {
&*self.op_to_storage_offset.get() };
+ op_to_storage_offset
+ .range(start_op..)
+ .map(|(op, offset)| (*op, *offset))
+ .collect()
+ };
+
+ let mut messages = Vec::new();
+ let mut loaded_messages = 0u32;
+
+ for (_, storage_offset) in op_offsets {
+ if loaded_messages >= count {
+ break;
+ }
+
+ let bytes = {
+ let inner = unsafe { &*self.inner.get() };
+ inner.storage.read(storage_offset, ZERO_LEN).await
+ };
+
+ if bytes.is_empty() {
+ continue;
+ }
+
+ let message = Message::from_bytes(bytes)
+ .expect("partition.journal.storage.read: invalid bytes for
message");
+
+ if let Some(batch) = Self::message_to_batch(&message) {
+ loaded_messages =
loaded_messages.saturating_add(batch.count());
+ messages.push(message);
+ }
+ }
+
+ messages
}
}
-impl Journal<Noop> for PartitionJournal {
- type Header = MessageLookup;
- type Entry = IggyMessagesBatchMut;
- type HeaderRef<'a> = MessageLookup;
+impl<S> Journal<S> for PartitionJournal<S>
+where
+ S: Storage<Buffer = Bytes>,
+{
+ type Header = PrepareHeader;
+ type Entry = Message<Self::Header>;
+ #[rustfmt::skip] // Scuffed formatter.
+ type HeaderRef<'a> = &'a Self::Header where S: 'a;
- fn header(&self, _idx: usize) -> Option<Self::HeaderRef<'_>> {
- unreachable!("fn header: header lookup not supported for partition
journal.");
+ fn header(&self, idx: usize) -> Option<Self::HeaderRef<'_>> {
+ let headers = unsafe { &mut *self.headers.get() };
+ headers.get(idx)
}
- fn previous_header(&self, _header: &Self::Header) ->
Option<Self::HeaderRef<'_>> {
- unreachable!("fn previous_header: header lookup not supported for
partition journal.");
+ fn previous_header(&self, header: &Self::Header) ->
Option<Self::HeaderRef<'_>> {
+ if header.op == 0 {
+ return None;
+ }
+
+ let prev_op = header.op - 1;
+ let headers = unsafe { &*self.headers.get() };
+ headers.iter().find(|candidate| candidate.op == prev_op)
}
async fn append(&self, entry: Self::Entry) {
- let batch_set = unsafe { &mut *self.batch_set.get() };
- batch_set.add_batch(entry);
+ let first_offset_and_timestamp = Self::message_to_batch(&entry)
+ .and_then(|batch| Some((batch.first_offset()?,
batch.first_timestamp()?)));
+
+ let header = *entry.header();
+ let op = header.op;
+
+ {
+ let headers = unsafe { &mut *self.headers.get() };
+ headers.push(header);
+ };
+
+ let bytes = entry.into_inner();
+ let storage_offset = {
+ let inner = unsafe { &*self.inner.get() };
+ inner.storage.write(bytes).await
+ };
+
+ {
+ let op_to_storage_offset = unsafe { &mut
*self.op_to_storage_offset.get() };
+ op_to_storage_offset.insert(op, storage_offset);
+ }
+
+ if let Some((offset, timestamp)) = first_offset_and_timestamp {
+ let offset_to_op = unsafe { &mut *self.offset_to_op.get() };
+ offset_to_op.insert(offset, op);
+
+ let timestamp_to_op = unsafe { &mut *self.timestamp_to_op.get() };
+ timestamp_to_op.insert(timestamp, op);
+ }
}
async fn entry(&self, header: &Self::Header) -> Option<Self::Entry> {
- // Entry lookups go through SegmentedLog which uses JournalInfo
- // to construct MessageLookup headers. The actual query is done
- // via get() below, not through the Journal trait.
- let _ = header;
- unreachable!("fn entry: use SegmentedLog::get() instead for partition
journal lookups.");
+ self.message_by_op(header.op).await
}
}
-impl PartitionJournal {
- /// Query messages by offset or timestamp with count.
- ///
- /// This is called by `SegmentedLog` using `MessageLookup` headers
- /// constructed from `JournalInfo`.
- pub fn get(&self, header: &MessageLookup) -> Option<IggyMessagesBatchSet> {
- let batch_set = unsafe { &*self.batch_set.get() };
- let result = match header {
- MessageLookup::Offset { offset, count } =>
batch_set.get_by_offset(*offset, *count),
+impl<S> QueryableJournal<S> for PartitionJournal<S>
+where
+ S: Storage<Buffer = Bytes>,
+{
+ type Query = MessageLookup;
+
+ async fn get(&self, query: &Self::Query) -> Option<IggyMessagesBatchSet> {
+ let query = *query;
+ let start_op = self.candidate_start_op(&query)?;
+ let count = match query {
+ MessageLookup::Offset { count, .. } | MessageLookup::Timestamp {
count, .. } => count,
+ };
+
+ let messages = self.load_messages_from_storage(start_op, count).await;
+
+ let batch_set = Self::messages_to_batch_set(&messages);
+ let result = match query {
+ MessageLookup::Offset { offset, count } =>
batch_set.get_by_offset(offset, count),
MessageLookup::Timestamp { timestamp, count } => {
- batch_set.get_by_timestamp(*timestamp, *count)
+ batch_set.get_by_timestamp(timestamp, count)
}
};
+
if result.is_empty() {
None
} else {
diff --git a/core/partitions/src/lib.rs b/core/partitions/src/lib.rs
index d997d72b7..1f18e32ed 100644
--- a/core/partitions/src/lib.rs
+++ b/core/partitions/src/lib.rs
@@ -23,7 +23,11 @@ mod journal;
mod log;
mod types;
-use iggy_common::{IggyError, IggyMessagesBatchMut, IggyMessagesBatchSet};
+use bytes::{Bytes, BytesMut};
+use iggy_common::{
+ INDEX_SIZE, IggyError, IggyIndexesMut, IggyMessagesBatchMut,
IggyMessagesBatchSet,
+ PooledBuffer, header::PrepareHeader, message::Message,
+};
pub use iggy_partition::IggyPartition;
pub use iggy_partitions::IggyPartitions;
pub use types::{
@@ -31,6 +35,33 @@ pub use types::{
SendMessagesResult,
};
+pub(crate) fn decode_send_messages_batch(body: Bytes) ->
Option<IggyMessagesBatchMut> {
+ // TODO: This very is bad, IGGY-114 Fixes this.
+ let mut body = body
+ .try_into_mut()
+ .unwrap_or_else(|body| BytesMut::from(body.as_ref()));
+
+ if body.len() < 4 {
+ return None;
+ }
+
+ let count_bytes = body.split_to(4);
+ let count = u32::from_le_bytes(count_bytes.as_ref().try_into().ok()?);
+ let indexes_len = (count as usize).checked_mul(INDEX_SIZE)?;
+
+ if body.len() < indexes_len {
+ return None;
+ }
+
+ let indexes_bytes = body.split_to(indexes_len);
+ let indexes =
IggyIndexesMut::from_bytes(PooledBuffer::from(indexes_bytes), 0);
+ let messages = PooledBuffer::from(body);
+
+ Some(IggyMessagesBatchMut::from_indexes_and_messages(
+ indexes, messages,
+ ))
+}
+
/// Partition-level data plane operations.
///
/// `send_messages` MUST only append to the partition journal (prepare phase),
@@ -38,7 +69,7 @@ pub use types::{
pub trait Partition {
fn append_messages(
&mut self,
- batch: IggyMessagesBatchMut,
+ message: Message<PrepareHeader>,
) -> impl Future<Output = Result<AppendResult, IggyError>>;
fn poll_messages(
diff --git a/core/simulator/src/deps.rs b/core/simulator/src/deps.rs
index 12e78bb05..9f4d64b79 100644
--- a/core/simulator/src/deps.rs
+++ b/core/simulator/src/deps.rs
@@ -45,7 +45,8 @@ impl Storage for MemStorage {
len
}
- async fn read(&self, offset: usize, mut buffer: Self::Buffer) ->
Self::Buffer {
+ async fn read(&self, offset: usize, len: usize) -> Self::Buffer {
+ let mut buffer = vec![0; len];
let data = self.data.borrow();
let end = offset + buffer.len();
if offset < data.len() && end <= data.len() {
@@ -106,8 +107,7 @@ impl<S: Storage<Buffer = Vec<u8>>> Journal<S> for
SimJournal<S> {
let header = headers.get(&header.op)?;
let offset = *offsets.get(&header.op)?;
- let buffer = vec![0; header.size as usize];
- let buffer = self.storage.read(offset, buffer).await;
+ let buffer = self.storage.read(offset, header.size as usize).await;
let message =
Message::from_bytes(Bytes::from(buffer)).expect("simulator: bytes
should be valid");
Some(message)