This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch partitions
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 1bf09e1353e3a93e1071750299797ad0d884055e
Author: numinex <[email protected]>
AuthorDate: Fri Feb 13 13:25:56 2026 +0100

    temp v2
---
 Cargo.lock                                |   1 +
 core/common/src/types/consensus/header.rs |   1 +
 core/partitions/src/iggy_partitions.rs    | 145 ++++++++++++++++++++++++++++--
 core/partitions/src/types.rs              |  40 ++++++++-
 core/simulator/Cargo.toml                 |   1 +
 core/simulator/src/client.rs              |  74 ++++++++++++++-
 core/simulator/src/deps.rs                |   4 +-
 core/simulator/src/lib.rs                 |  31 +++----
 core/simulator/src/main.rs                |  32 ++++++-
 core/simulator/src/replica.rs             |  40 +++++++--
 rust_out                                  | Bin 0 -> 3893768 bytes
 11 files changed, 333 insertions(+), 36 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 5a7fdcd97..502883e00 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8454,6 +8454,7 @@ dependencies = [
  "journal",
  "message_bus",
  "metadata",
+ "partitions",
 ]
 
 [[package]]
diff --git a/core/common/src/types/consensus/header.rs b/core/common/src/types/consensus/header.rs
index f7f64969d..bab1812c6 100644
--- a/core/common/src/types/consensus/header.rs
+++ b/core/common/src/types/consensus/header.rs
@@ -156,6 +156,7 @@ impl ConsensusHeader for GenericHeader {
     }
 }
 
+
 #[repr(C)]
 #[derive(Debug, Clone, Copy)]
 pub struct RequestHeader {
diff --git a/core/partitions/src/iggy_partitions.rs b/core/partitions/src/iggy_partitions.rs
index bd2881e05..998e30dd9 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -22,8 +22,8 @@ use crate::IggyPartition;
 use crate::Partitions;
 use consensus::{Consensus, Project, Sequencer, Status, VsrConsensus};
 use iggy_common::{
-    IggyByteSize, IggyIndexesMut, IggyMessagesBatchMut, INDEX_SIZE, 
PooledBuffer, Segment,
-    SegmentStorage,
+    IggyByteSize, IggyIndexesMut, IggyMessagesBatchMut, INDEX_SIZE, 
PartitionStats, PooledBuffer,
+    Segment, SegmentStorage,
     header::{Command2, GenericHeader, Operation, PrepareHeader, 
PrepareOkHeader, ReplyHeader},
     message::Message,
     sharding::{IggyNamespace, LocalIdx, ShardId},
@@ -33,6 +33,7 @@ use message_bus::MessageBus;
 use std::cell::UnsafeCell;
 use std::collections::HashMap;
 use std::sync::atomic::Ordering;
+use std::sync::Arc;
 use tracing::{debug, warn};
 
 /// Per-shard collection of all partitions.
@@ -228,6 +229,99 @@ impl<C> IggyPartitions<C> {
         &mut self.partitions_mut()[idx]
     }
 
+    /// Initialize a new partition with in-memory storage (for testing/simulation).
+    ///
+    /// This is a simplified version that doesn't create file-backed storage.
+    /// Use `init_partition()` for production use with real files.
+    ///
+    /// TODO: Make the log generic over its storage backend to support both
+    /// in-memory (for testing) and file-backed (for production) storage without
+    /// needing separate initialization methods.
+    pub fn init_partition_in_memory(&mut self, namespace: IggyNamespace) -> 
LocalIdx {
+        // Check if already initialized
+        if let Some(idx) = self.local_idx(&namespace) {
+            return idx;
+        }
+
+        // Create initial segment with default (in-memory) storage
+        let start_offset = 0;
+        let segment = Segment::new(start_offset, self.config.segment_size);
+        let storage = SegmentStorage::default();
+
+        // Create partition with initialized log
+        let stats = Arc::new(PartitionStats::default());
+        let mut partition = IggyPartition::new(stats.clone());
+        partition.log.add_persisted_segment(segment, storage);
+        partition.offset.store(start_offset, Ordering::Relaxed);
+        partition.dirty_offset.store(start_offset, Ordering::Relaxed);
+        partition.should_increment_offset = false;
+        partition.stats.increment_segments_count(1);
+
+        // Insert and return local index
+        self.insert(namespace, partition)
+    }
+
+    /// Initialize a new partition with file-backed storage.
+    ///
+    /// This is the data plane initialization - creates the partition structure,
+    /// initial segment, and storage. Skips the control plane metadata broadcasting.
+    ///
+    /// Corresponds to the "INITIATE PARTITION" phase in the server's flow:
+    /// 1. Control plane: create PartitionMeta (SKIPPED in this method)
+    /// 2. Control plane: broadcast to shards (SKIPPED in this method)
+    /// 3. Data plane: INITIATE PARTITION (THIS METHOD)
+    ///
+    /// Idempotent - returns existing LocalIdx if partition already exists.
+    pub async fn init_partition(&mut self, namespace: IggyNamespace) -> 
LocalIdx {
+        // Check if already initialized
+        if let Some(idx) = self.local_idx(&namespace) {
+            return idx;
+        }
+
+        // Create initial segment with storage
+        let start_offset = 0;
+        let segment = Segment::new(start_offset, self.config.segment_size);
+
+        // TODO: Waiting for issue to move server config to shared module.
+        // Once complete, paths will come from proper base_path/streams_path/etc config fields.
+        let messages_path = self.config.get_messages_path(
+            namespace.stream_id(),
+            namespace.topic_id(),
+            namespace.partition_id(),
+            start_offset,
+        );
+        let index_path = self.config.get_index_path(
+            namespace.stream_id(),
+            namespace.topic_id(),
+            namespace.partition_id(),
+            start_offset,
+        );
+
+        let storage = SegmentStorage::new(
+            &messages_path,
+            &index_path,
+            0, // messages_size (new segment)
+            0, // indexes_size (new segment)
+            self.config.enforce_fsync,
+            self.config.enforce_fsync,
+            false, // file_exists (new segment)
+        )
+        .await
+        .expect("Failed to create segment storage");
+
+        // Create partition with initialized log
+        let stats = Arc::new(PartitionStats::default());
+        let mut partition = IggyPartition::new(stats.clone());
+        partition.log.add_persisted_segment(segment, storage);
+        partition.offset.store(start_offset, Ordering::Relaxed);
+        partition.dirty_offset.store(start_offset, Ordering::Relaxed);
+        partition.should_increment_offset = false;
+        partition.stats.increment_segments_count(1);
+
+        // Insert and return local index
+        self.insert(namespace, partition)
+    }
+
 }
 
 impl<B> Partitions<VsrConsensus<B>> for IggyPartitions<VsrConsensus<B>>
@@ -480,8 +574,9 @@ where
 
     /// Append a batch to a partition's journal with offset assignment.
     ///
-    /// Only writes to the journal — segment metadata (timestamps, end_offset,
-    /// current_position) is updated later when the journal is flushed to disk.
+    /// Updates `segment.current_position` (logical position for indexing) but
+    /// not `segment.end_offset` or `segment.end_timestamp` (committed state).
+    /// Those are updated during commit.
     ///
     /// Uses `dirty_offset` for offset assignment so that multiple prepares
     /// can be pipelined before any commit.
@@ -529,6 +624,12 @@ where
                 .store(last_dirty_offset, Ordering::Relaxed);
         }
 
+        // Update segment.current_position for next prepare_for_persistence call.
+        // This is the logical position (includes unflushed journal data).
+        // segment.size is only updated after actual persist (in persist_frozen_batches_to_disk).
+        let segment_index = partition.log.segments().len() - 1;
+        partition.log.segments_mut()[segment_index].current_position += 
batch_messages_size;
+
         // Update journal tracking metadata.
         let journal = partition.log.journal_mut();
         journal.info.messages_count += batch_messages_count;
@@ -648,6 +749,7 @@ where
         }
 
         // 1. Update segment metadata from journal state.
+        // Note: segment.current_position is already updated in append_batch (prepare phase).
         let segment_index = partition.log.segments().len() - 1;
         let segment = &mut partition.log.segments_mut()[segment_index];
 
@@ -656,7 +758,6 @@ where
         }
         segment.end_timestamp = journal_info.end_timestamp;
         segment.end_offset = journal_info.current_offset;
-        segment.current_position += journal_info.size.as_bytes_u64() as u32;
 
         // 2. Update stats.
         partition
@@ -812,11 +913,39 @@ where
 
         let segment = Segment::new(start_offset, self.config.segment_size);
 
-        // TODO: Create actual storage with file paths from config.
-        // For now create an empty storage placeholder.
-        let storage = SegmentStorage::default();
+        // TODO: Waiting for issue to move server config to shared module.
+        // Once complete, paths will come from proper base_path/streams_path/etc config fields.
+        let messages_path = self.config.get_messages_path(
+            namespace.stream_id(),
+            namespace.topic_id(),
+            namespace.partition_id(),
+            start_offset,
+        );
+        let index_path = self.config.get_index_path(
+            namespace.stream_id(),
+            namespace.topic_id(),
+            namespace.partition_id(),
+            start_offset,
+        );
+
+        let storage = SegmentStorage::new(
+            &messages_path,
+            &index_path,
+            0, // messages_size (new segment)
+            0, // indexes_size (new segment)
+            self.config.enforce_fsync,
+            self.config.enforce_fsync,
+            false, // file_exists (new segment)
+        )
+        .await
+        .expect("Failed to create segment storage");
 
         // Clear old segment's indexes.
+        // TODO: Waiting for issue to move server config to shared module.
+        // Once complete, conditionally clear based on cache_indexes config:
+        // if !matches!(self.config.cache_indexes, CacheIndexesConfig::All) {
+        //     partition.log.indexes_mut()[old_segment_index] = None;
+        // }
         partition.log.indexes_mut()[old_segment_index] = None;
 
         // Close writers for the sealed segment.
diff --git a/core/partitions/src/types.rs b/core/partitions/src/types.rs
index 3496c28f7..886cb715e 100644
--- a/core/partitions/src/types.rs
+++ b/core/partitions/src/types.rs
@@ -161,7 +161,7 @@ impl Default for PartitionOffsets {
 ///
 /// Mirrors the relevant fields from the server's `PartitionConfig` and
 /// `SegmentConfig` (`core/server/src/configs/system.rs`).
-#[derive(Debug, Clone, Copy)]
+#[derive(Debug, Clone)]
 pub struct PartitionsConfig {
     /// Flush journal to disk when it accumulates this many messages.
     pub messages_required_to_save: u32,
@@ -172,3 +172,41 @@ pub struct PartitionsConfig {
     /// Maximum size of a single segment before rotation.
     pub segment_size: IggyByteSize,
 }
+
+impl PartitionsConfig {
+    /// Constructs the file path for segment messages.
+    ///
+    /// TODO: This is a stub waiting for completion of issue to move server config
+    /// to shared module. Real implementation should use:
+    /// `{base_path}/{streams_path}/{stream_id}/{topics_path}/{topic_id}/{partitions_path}/{partition_id}/{start_offset:0>20}.log`
+    pub fn get_messages_path(
+        &self,
+        stream_id: usize,
+        topic_id: usize,
+        partition_id: usize,
+        start_offset: u64,
+    ) -> String {
+        format!(
+            "/tmp/iggy_stub/streams/{}/topics/{}/partitions/{}/{:0>20}.log",
+            stream_id, topic_id, partition_id, start_offset
+        )
+    }
+
+    /// Constructs the file path for segment indexes.
+    ///
+    /// TODO: This is a stub waiting for completion of issue to move server config
+    /// to shared module. Real implementation should use:
+    /// `{base_path}/{streams_path}/{stream_id}/{topics_path}/{topic_id}/{partitions_path}/{partition_id}/{start_offset:0>20}.index`
+    pub fn get_index_path(
+        &self,
+        stream_id: usize,
+        topic_id: usize,
+        partition_id: usize,
+        start_offset: u64,
+    ) -> String {
+        format!(
+            "/tmp/iggy_stub/streams/{}/topics/{}/partitions/{}/{:0>20}.index",
+            stream_id, topic_id, partition_id, start_offset
+        )
+    }
+}
diff --git a/core/simulator/Cargo.toml b/core/simulator/Cargo.toml
index e75769201..b8e25866d 100644
--- a/core/simulator/Cargo.toml
+++ b/core/simulator/Cargo.toml
@@ -29,3 +29,4 @@ iggy_common = { path = "../common" }
 journal = { path = "../journal" }
 message_bus = { path = "../message_bus" }
 metadata = { path = "../metadata" }
+partitions = { path = "../partitions" }
diff --git a/core/simulator/src/client.rs b/core/simulator/src/client.rs
index b2f57c152..a8e5ccbb7 100644
--- a/core/simulator/src/client.rs
+++ b/core/simulator/src/client.rs
@@ -16,11 +16,12 @@
 // under the License.
 
 use iggy_common::{
-    BytesSerializable, Identifier,
+    BytesSerializable, INDEX_SIZE, Identifier,
     create_stream::CreateStream,
     delete_stream::DeleteStream,
     header::{Operation, RequestHeader},
     message::Message,
+    sharding::IggyNamespace,
 };
 use std::cell::Cell;
 
@@ -62,6 +63,77 @@ impl SimClient {
         self.build_request(Operation::DeleteStream, payload)
     }
 
+    pub fn send_messages(
+        &self,
+        namespace: IggyNamespace,
+        messages: Vec<&[u8]>,
+    ) -> Message<RequestHeader> {
+        // Build batch: count | indexes | messages
+        let count = messages.len() as u32;
+        let mut indexes = Vec::with_capacity(count as usize * INDEX_SIZE);
+        let mut messages_buf = Vec::new();
+
+        let mut current_position = 0u32;
+        for msg in &messages {
+            // Write index: position (u32) + length (u32)
+            indexes.extend_from_slice(&current_position.to_le_bytes());
+            indexes.extend_from_slice(&(msg.len() as u32).to_le_bytes());
+
+            // Append message
+            messages_buf.extend_from_slice(msg);
+            current_position += msg.len() as u32;
+        }
+
+        // Build payload: count | indexes | messages
+        let mut payload = Vec::with_capacity(4 + indexes.len() + 
messages_buf.len());
+        payload.extend_from_slice(&count.to_le_bytes());
+        payload.extend_from_slice(&indexes);
+        payload.extend_from_slice(&messages_buf);
+
+        self.build_request_with_namespace(Operation::SendMessages, 
bytes::Bytes::from(payload), namespace)
+    }
+
+    fn build_request_with_namespace(
+        &self,
+        operation: Operation,
+        payload: bytes::Bytes,
+        namespace: IggyNamespace,
+    ) -> Message<RequestHeader> {
+        use bytes::Bytes;
+
+        let header_size = std::mem::size_of::<RequestHeader>();
+        let total_size = header_size + payload.len();
+
+        let header = RequestHeader {
+            command: iggy_common::header::Command2::Request,
+            operation,
+            size: total_size as u32,
+            cluster: 0,
+            checksum: 0,
+            checksum_body: 0,
+            epoch: 0,
+            view: 0,
+            release: 0,
+            protocol: 0,
+            replica: 0,
+            reserved_frame: [0; 12],
+            client: self.client_id,
+            request_checksum: 0,
+            timestamp: 0,
+            request: self.next_request_number(),
+            namespace: namespace.inner(),
+            ..Default::default()
+        };
+
+        let header_bytes = bytemuck::bytes_of(&header);
+        let mut buffer = Vec::with_capacity(total_size);
+        buffer.extend_from_slice(header_bytes);
+        buffer.extend_from_slice(&payload);
+
+        Message::<RequestHeader>::from_bytes(Bytes::from(buffer))
+            .expect("failed to build request message")
+    }
+
     fn build_request(&self, operation: Operation, payload: bytes::Bytes) -> 
Message<RequestHeader> {
         use bytes::Bytes;
 
diff --git a/core/simulator/src/deps.rs b/core/simulator/src/deps.rs
index b7d829e8f..6a798b4aa 100644
--- a/core/simulator/src/deps.rs
+++ b/core/simulator/src/deps.rs
@@ -155,5 +155,5 @@ pub type SimMetadata = IggyMetadata<
     SimMuxStateMachine,
 >;
 
-#[derive(Debug, Default)]
-pub struct ReplicaPartitions {}
+/// Type alias for simulator partitions
+pub type ReplicaPartitions = 
partitions::IggyPartitions<VsrConsensus<SharedMemBus>>;
diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs
index d1f64fc09..cacfe5adf 100644
--- a/core/simulator/src/lib.rs
+++ b/core/simulator/src/lib.rs
@@ -25,16 +25,23 @@ use iggy_common::header::{GenericHeader, ReplyHeader};
 use iggy_common::message::{Message, MessageBag};
 use message_bus::MessageBus;
 use metadata::Metadata;
+use partitions::Partitions;
 use replica::Replica;
 use std::sync::Arc;
 
-#[derive(Debug)]
 pub struct Simulator {
     pub replicas: Vec<Replica>,
     pub message_bus: Arc<MemBus>,
 }
 
 impl Simulator {
+    /// Initialize a partition on all replicas (in-memory for simulation)
+    pub fn init_partition(&mut self, namespace: 
iggy_common::sharding::IggyNamespace) {
+        for replica in &mut self.replicas {
+            replica.partitions.init_partition_in_memory(namespace);
+        }
+    }
+
     pub fn new(replica_count: usize, clients: impl Iterator<Item = u128>) -> 
Self {
         let mut message_bus = MemBus::new();
         for client in clients {
@@ -119,7 +126,7 @@ impl Simulator {
         if operation < 200 {
             self.dispatch_to_metadata_on_replica(replica, message).await;
         } else {
-            self.dispatch_to_partition_on_replica(replica, message);
+            self.dispatch_to_partition_on_replica(replica, message).await;
         }
     }
 
@@ -137,28 +144,16 @@ impl Simulator {
         }
     }
 
-    fn dispatch_to_partition_on_replica(&self, replica: &Replica, message: 
MessageBag) {
+    async fn dispatch_to_partition_on_replica(&self, replica: &Replica, 
message: MessageBag) {
         match message {
             MessageBag::Request(request) => {
-                todo!(
-                    "dispatch request to partition replica {}: operation={:?}",
-                    replica.id,
-                    request.header().operation
-                );
+                replica.partitions.on_request(request).await;
             }
             MessageBag::Prepare(prepare) => {
-                todo!(
-                    "dispatch prepare to partition replica {}: operation={:?}",
-                    replica.id,
-                    prepare.header().operation
-                );
+                replica.partitions.on_replicate(prepare).await;
             }
             MessageBag::PrepareOk(prepare_ok) => {
-                todo!(
-                    "dispatch prepare_ok to partition replica {}: op={}",
-                    replica.id,
-                    prepare_ok.header().op
-                );
+                replica.partitions.on_ack(prepare_ok).await;
             }
         }
     }
diff --git a/core/simulator/src/main.rs b/core/simulator/src/main.rs
index 995c717a3..8d519cdda 100644
--- a/core/simulator/src/main.rs
+++ b/core/simulator/src/main.rs
@@ -17,6 +17,7 @@
 
 use iggy_common::header::ReplyHeader;
 use iggy_common::message::Message;
+use iggy_common::sharding::IggyNamespace;
 use message_bus::MessageBus;
 use simulator::{Simulator, client::SimClient};
 use std::collections::VecDeque;
@@ -41,9 +42,16 @@ impl Responses {
 fn main() {
     let client_id: u128 = 1;
     let leader: u8 = 0;
-    let sim = Simulator::new(3, std::iter::once(client_id));
+    let mut sim = Simulator::new(3, std::iter::once(client_id));
     let bus = sim.message_bus.clone();
 
+    // Hardcoded partition for testing: stream_id=1, topic_id=1, partition_id=0
+    let test_namespace = IggyNamespace::new(1, 1, 0);
+
+    // Initialize partition on all replicas
+    println!("[sim] Initializing test partition: {:?}", test_namespace);
+    sim.init_partition(test_namespace);
+
     // Responses queue
     let responses = Arc::new(Mutex::new(Responses::default()));
     let responses_clone = responses.clone();
@@ -54,6 +62,28 @@ fn main() {
         futures::executor::block_on(async {
             let client = SimClient::new(client_id);
 
+            // Send some test messages to the partition
+            println!("[client] Sending messages to partition");
+            let test_messages = vec![
+                b"Hello, partition!".as_slice(),
+                b"Message 2".as_slice(),
+                b"Message 3".as_slice(),
+            ];
+
+            let send_msg = client.send_messages(test_namespace, test_messages);
+            bus.send_to_replica(leader, send_msg.into_generic())
+                .await
+                .expect("failed to send messages");
+
+            loop {
+                if let Some(reply) = responses_clone.lock().unwrap().pop() {
+                    println!("[client] Got send_messages reply: {:?}", 
reply.header());
+                    break;
+                }
+                std::thread::sleep(std::time::Duration::from_millis(1));
+            }
+
+            // Send metadata operations
             let create_msg = client.create_stream("test-stream");
             bus.send_to_replica(leader, create_msg.into_generic())
                 .await
diff --git a/core/simulator/src/replica.rs b/core/simulator/src/replica.rs
index 74c26b72f..a0d0fb54c 100644
--- a/core/simulator/src/replica.rs
+++ b/core/simulator/src/replica.rs
@@ -20,13 +20,15 @@ use crate::deps::{
     MemStorage, ReplicaPartitions, SimJournal, SimMetadata, 
SimMuxStateMachine, SimSnapshot,
 };
 use consensus::VsrConsensus;
+use iggy_common::IggyByteSize;
+use iggy_common::sharding::ShardId;
 use metadata::stm::consumer_group::{ConsumerGroups, ConsumerGroupsInner};
 use metadata::stm::stream::{Streams, StreamsInner};
 use metadata::stm::user::{Users, UsersInner};
 use metadata::{IggyMetadata, variadic};
+use partitions::PartitionsConfig;
 use std::sync::Arc;
 
-#[derive(Debug)]
 pub struct Replica {
     pub id: u8,
     pub name: String,
@@ -43,24 +45,52 @@ impl Replica {
         let mux = SimMuxStateMachine::new(variadic!(users, streams, 
consumer_groups));
 
         let cluster_id: u128 = 1; // TODO: Make configurable
-        let consensus = VsrConsensus::new(
+        let metadata_consensus = VsrConsensus::new(
             cluster_id,
             id,
             replica_count,
             SharedMemBus(Arc::clone(&bus)),
         );
-        consensus.init();
+        metadata_consensus.init();
+
+        // Create separate consensus instance for partitions
+        let partitions_consensus = VsrConsensus::new(
+            cluster_id,
+            id,
+            replica_count,
+            SharedMemBus(Arc::clone(&bus)),
+        );
+        partitions_consensus.init();
+
+        // Configure partitions
+        let partitions_config = PartitionsConfig {
+            messages_required_to_save: 1000,
+            size_of_messages_required_to_save: IggyByteSize::from(4 * 1024 * 
1024),
+            enforce_fsync: false, // Disable fsync for simulation
+            segment_size: IggyByteSize::from(1024 * 1024 * 1024), // 1GB 
segments
+        };
+
+        // Only replica 0 gets consensus (primary shard for now)
+        let partitions = if id == 0 {
+            ReplicaPartitions::new(
+                ShardId::new(id as u16),
+                partitions_config,
+                Some(partitions_consensus),
+            )
+        } else {
+            ReplicaPartitions::new(ShardId::new(id as u16), partitions_config, 
None)
+        };
 
         Self {
             id,
             name,
             metadata: IggyMetadata {
-                consensus: Some(consensus),
+                consensus: Some(metadata_consensus),
                 journal: Some(SimJournal::<MemStorage>::default()),
                 snapshot: Some(SimSnapshot::default()),
                 mux_stm: mux,
             },
-            partitions: ReplicaPartitions::default(),
+            partitions,
             bus,
         }
     }
diff --git a/rust_out b/rust_out
new file mode 100755
index 000000000..421fd8e25
Binary files /dev/null and b/rust_out differ

Reply via email to