This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch move_batch_to_common in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 3322547b40aec247c26fd56901d7a36f3ff11bc3 Author: numinex <[email protected]> AuthorDate: Mon Feb 9 12:13:01 2026 +0100 v2 --- Cargo.lock | 5 +- core/common/Cargo.toml | 4 + .../src/deduplication/message_deduplicator.rs | 126 +++++++++++++++++++++ .../streaming => common/src}/deduplication/mod.rs | 4 +- core/common/src/lib.rs | 3 + .../src/types/message}/indexes_mut.rs | 4 +- .../src/types/message}/message_header_view_mut.rs | 2 +- .../src/types/message}/message_view_mut.rs | 6 +- .../src/types/message}/messages_batch_mut.rs | 10 +- .../src/types/message}/messages_batch_set.rs | 12 +- core/common/src/types/message/mod.rs | 12 ++ .../src/types/message/poll_metadata.rs} | 19 +++- core/common/src/utils/mod.rs | 1 + .../utils/mod.rs => common/src/utils/random_id.rs} | 15 ++- core/server/Cargo.toml | 1 - .../handlers/messages/poll_messages_handler.rs | 17 +-- core/server/src/http/http_shard_wrapper.rs | 2 +- core/server/src/shard/system/messages.rs | 2 +- core/server/src/shard/transmission/frame.rs | 3 +- core/server/src/streaming/deduplication/mod.rs | 2 +- core/server/src/streaming/partitions/helpers.rs | 2 +- .../src/streaming/partitions/local_partition.rs | 2 +- core/server/src/streaming/partitions/ops.rs | 2 +- core/server/src/streaming/segments/indexes/mod.rs | 3 +- core/server/src/streaming/segments/types/mod.rs | 13 +-- core/server/src/streaming/utils/mod.rs | 2 +- 26 files changed, 205 insertions(+), 69 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 13a4dd133..906117550 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4653,6 +4653,8 @@ dependencies = [ "err_trail", "human-repr", "humantime", + "lending-iterator", + "moka", "nix 0.31.1", "once_cell", "rcgen", @@ -4668,6 +4670,8 @@ dependencies = [ "tracing", "tungstenite", "twox-hash", + "ulid", + "uuid", ] [[package]] @@ -8284,7 +8288,6 @@ dependencies = [ "iggy_common", "jsonwebtoken", "left-right", - "lending-iterator", "metadata", "mimalloc", "mime_guess", diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml index 89adb6a99..43d8133e4 100644 --- a/core/common/Cargo.toml +++ b/core/common/Cargo.toml @@ -48,6 +48,8 @@ derive_more = { workspace = true } err_trail = { workspace = true } human-repr = { workspace = true } humantime = { workspace = true } +lending-iterator = { workspace = true } +moka = { workspace = true } once_cell = { workspace = true } rcgen = { workspace = true } ring = { workspace = true } @@ -61,6 +63,8 @@ tokio = { workspace = true } tracing = { workspace = true } tungstenite = { workspace = true } twox-hash = { workspace = true } +ulid = { workspace = true } +uuid = { workspace = true } [target.'cfg(unix)'.dependencies] nix = { workspace = true } diff --git a/core/common/src/deduplication/message_deduplicator.rs b/core/common/src/deduplication/message_deduplicator.rs new file mode 100644 index 000000000..3a98ad925 --- /dev/null +++ b/core/common/src/deduplication/message_deduplicator.rs @@ -0,0 +1,126 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::IggyDuration; +use moka::future::{Cache, CacheBuilder}; + +#[derive(Debug)] +pub struct MessageDeduplicator { + ttl: Option<IggyDuration>, + max_entries: Option<u64>, + cache: Cache<u128, bool>, +} + +/// Create deep copy of the `MessageDeduplicator` instance. +/// Regular `Clone` cheap as it only creates thread-safe reference counted +/// pointers to the shared internal data structures. +impl Clone for MessageDeduplicator { + fn clone(&self) -> Self { + let builder = Cache::builder(); + let builder = Self::setup_cache_builder(builder, self.max_entries, self.ttl); + let cache = builder.build(); + + Self { + ttl: self.ttl, + max_entries: self.max_entries, + cache, + } + } +} + +impl MessageDeduplicator { + fn setup_cache_builder( + mut builder: CacheBuilder<u128, bool, Cache<u128, bool>>, + max_entries: Option<u64>, + ttl: Option<IggyDuration>, + ) -> CacheBuilder<u128, bool, Cache<u128, bool>> { + if let Some(max_entries) = max_entries { + builder = builder.max_capacity(max_entries); + } + if let Some(ttl) = ttl { + builder = builder.time_to_live(ttl.get_duration()); + } + builder + } + + /// Creates a new message deduplicator with the given max entries and time to live for each ID. + pub fn new(max_entries: Option<u64>, ttl: Option<IggyDuration>) -> Self { + let builder = Cache::builder(); + let builder = Self::setup_cache_builder(builder, max_entries, ttl); + let cache = builder.build(); + + Self { + ttl, + max_entries, + cache, + } + } + + /// Checks if the given ID exists. + pub fn exists(&self, id: u128) -> bool { + self.cache.contains_key(&id) + } + + /// Inserts the given ID. + pub async fn insert(&self, id: u128) { + self.cache.insert(id, true).await + } + + /// Tries to insert the given ID, returns false if it already exists. + pub async fn try_insert(&self, id: u128) -> bool { + if self.exists(id) { + false + } else { + self.insert(id).await; + true + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn message_deduplicator_should_insert_only_unique_identifiers() { + let max_entries = 1000; + let ttl = "1s".parse::<IggyDuration>().unwrap(); + let deduplicator = MessageDeduplicator::new(Some(max_entries), Some(ttl)); + for i in 0..max_entries { + let id = i as u128; + assert!(deduplicator.try_insert(id).await); + assert!(deduplicator.exists(id)); + assert!(!deduplicator.try_insert(id).await); + } + } + + #[tokio::test] + async fn message_deduplicator_should_evict_identifiers_after_given_time_to_live() { + let max_entries = 3; + let ttl = "100ms".parse::<IggyDuration>().unwrap(); + let deduplicator = MessageDeduplicator::new(Some(max_entries), Some(ttl)); + for i in 0..max_entries { + let id = i as u128; + assert!(deduplicator.try_insert(id).await); + assert!(deduplicator.exists(id)); + tokio::time::sleep(2 * ttl.get_duration()).await; + assert!(!deduplicator.exists(id)); + assert!(deduplicator.try_insert(id).await); + } + } +} diff --git a/core/server/src/streaming/deduplication/mod.rs b/core/common/src/deduplication/mod.rs similarity index 91% copy from core/server/src/streaming/deduplication/mod.rs copy to core/common/src/deduplication/mod.rs index b92e484bb..db946af75 100644 --- a/core/server/src/streaming/deduplication/mod.rs +++ b/core/common/src/deduplication/mod.rs @@ -16,4 +16,6 @@ * under the License. */ -pub mod message_deduplicator; +mod message_deduplicator; + +pub use message_deduplicator::MessageDeduplicator; diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs index 2ae5bd845..ee61e7747 100644 --- a/core/common/src/lib.rs +++ b/core/common/src/lib.rs @@ -20,6 +20,7 @@ mod alloc; mod certificates; mod commands; +mod deduplication; mod error; mod sender; pub mod sharding; @@ -34,6 +35,7 @@ pub mod locking; pub use alloc::buffer::PooledBuffer; pub use alloc::memory_pool::{MEMORY_POOL, MemoryPool, MemoryPoolConfigOther, memory_pool}; pub use certificates::generate_self_signed_certificate; +pub use deduplication::MessageDeduplicator; pub use commands::consumer_groups::*; pub use commands::consumer_offsets::*; pub use commands::messages::*; @@ -103,6 +105,7 @@ pub use utils::duration::{IggyDuration, SEC_IN_MICRO}; pub use utils::expiry::IggyExpiry; pub use utils::hash::*; pub use utils::personal_access_token_expiry::PersonalAccessTokenExpiry; +pub use utils::random_id; pub use utils::text; pub use utils::timestamp::*; pub use utils::topic_size::MaxTopicSize; diff --git a/core/server/src/streaming/segments/indexes/indexes_mut.rs b/core/common/src/types/message/indexes_mut.rs similarity index 99% rename from core/server/src/streaming/segments/indexes/indexes_mut.rs rename to core/common/src/types/message/indexes_mut.rs index 159cd265e..51673bd52 100644 --- a/core/server/src/streaming/segments/indexes/indexes_mut.rs +++ b/core/common/src/types/message/indexes_mut.rs @@ -16,8 +16,8 @@ * under the License. */ -use iggy_common::PooledBuffer; -use iggy_common::{INDEX_SIZE, IggyIndexView, IggyIndexes}; +use crate::PooledBuffer; +use crate::{INDEX_SIZE, IggyIndexView, IggyIndexes}; use std::fmt; use std::ops::{Deref, Index as StdIndex}; diff --git a/core/server/src/streaming/segments/types/message_header_view_mut.rs b/core/common/src/types/message/message_header_view_mut.rs similarity index 99% rename from core/server/src/streaming/segments/types/message_header_view_mut.rs rename to core/common/src/types/message/message_header_view_mut.rs index 9caf8ffd1..ffdc137f7 100644 --- a/core/server/src/streaming/segments/types/message_header_view_mut.rs +++ b/core/common/src/types/message/message_header_view_mut.rs @@ -15,7 +15,7 @@ * specific language governing permissions and limitations * under the License. */ -use iggy_common::{ +use crate::{ IGGY_MESSAGE_CHECKSUM_OFFSET_RANGE, IGGY_MESSAGE_HEADERS_LENGTH_OFFSET_RANGE, IGGY_MESSAGE_ID_OFFSET_RANGE, IGGY_MESSAGE_OFFSET_OFFSET_RANGE, IGGY_MESSAGE_ORIGIN_TIMESTAMP_OFFSET_RANGE, IGGY_MESSAGE_PAYLOAD_LENGTH_OFFSET_RANGE, diff --git a/core/server/src/streaming/segments/types/message_view_mut.rs b/core/common/src/types/message/message_view_mut.rs similarity index 95% rename from core/server/src/streaming/segments/types/message_view_mut.rs rename to core/common/src/types/message/message_view_mut.rs index b1081d6bd..a8b4e9088 100644 --- a/core/server/src/streaming/segments/types/message_view_mut.rs +++ b/core/common/src/types/message/message_view_mut.rs @@ -16,8 +16,10 @@ * under the License. */ -use super::IggyMessageHeaderViewMut; -use iggy_common::{IGGY_MESSAGE_HEADER_SIZE, IggyMessageHeaderView, calculate_checksum}; +use crate::{ + IGGY_MESSAGE_HEADER_SIZE, IggyMessageHeaderView, calculate_checksum, + types::message::message_header_view_mut::IggyMessageHeaderViewMut, +}; use lending_iterator::prelude::*; /// A mutable view of a message for in-place modifications diff --git a/core/server/src/streaming/segments/types/messages_batch_mut.rs b/core/common/src/types/message/messages_batch_mut.rs similarity index 99% rename from core/server/src/streaming/segments/types/messages_batch_mut.rs rename to core/common/src/types/message/messages_batch_mut.rs index 33e2c7e75..41449b18a 100644 --- a/core/server/src/streaming/segments/types/messages_batch_mut.rs +++ b/core/common/src/types/message/messages_batch_mut.rs @@ -16,17 +16,15 @@ * under the License. */ +use super::indexes_mut::IggyIndexesMut; use super::message_view_mut::IggyMessageViewMutIterator; -use crate::streaming::deduplication::message_deduplicator::MessageDeduplicator; -use crate::streaming::segments::indexes::IggyIndexesMut; -use crate::streaming::utils::random_id; -use bytes::{BufMut, BytesMut}; -use iggy_common::PooledBuffer; -use iggy_common::{ +use crate::{MessageDeduplicator, PooledBuffer, random_id}; +use crate::{ BytesSerializable, IGGY_MESSAGE_HEADER_SIZE, INDEX_SIZE, IggyByteSize, IggyError, IggyIndexView, IggyMessage, IggyMessageView, IggyMessageViewIterator, IggyMessagesBatch, IggyTimestamp, MAX_PAYLOAD_SIZE, MAX_USER_HEADERS_SIZE, Sizeable, Validatable, }; +use bytes::{BufMut, BytesMut}; use lending_iterator::prelude::*; use std::ops::{Deref, Index}; use std::sync::Arc; diff --git a/core/server/src/streaming/segments/types/messages_batch_set.rs b/core/common/src/types/message/messages_batch_set.rs similarity index 97% rename from core/server/src/streaming/segments/types/messages_batch_set.rs rename to core/common/src/types/message/messages_batch_set.rs index 6437f3630..8dd47043e 100644 --- a/core/server/src/streaming/segments/types/messages_batch_set.rs +++ b/core/common/src/types/message/messages_batch_set.rs @@ -16,18 +16,14 @@ * under the License. */ -use crate::binary::handlers::messages::poll_messages_handler::IggyPollMetadata; -use crate::streaming::segments::IggyIndexesMut; -use bytes::Bytes; -use iggy_common::{ - IggyByteSize, IggyMessage, IggyMessageView, IggyMessagesBatch, PolledMessages, PooledBuffer, - Sizeable, +use crate::{ + IggyByteSize, IggyIndexesMut, IggyMessage, IggyMessageView, IggyMessagesBatch, + IggyMessagesBatchMut, IggyPollMetadata, PolledMessages, PooledBuffer, Sizeable, }; +use bytes::Bytes; use std::ops::Index; use tracing::trace; -use super::IggyMessagesBatchMut; - /// A container for multiple IggyMessagesBatch objects #[derive(Debug, Default)] pub struct IggyMessagesBatchSet { diff --git a/core/common/src/types/message/mod.rs b/core/common/src/types/message/mod.rs index caa29640b..23fe79e9a 100644 --- a/core/common/src/types/message/mod.rs +++ b/core/common/src/types/message/mod.rs @@ -20,12 +20,18 @@ mod iggy_message; mod index; mod index_view; mod indexes; +mod indexes_mut; mod message_header; mod message_header_view; +mod message_header_view_mut; mod message_view; +mod message_view_mut; mod messages_batch; +mod messages_batch_mut; +mod messages_batch_set; pub mod partitioning; pub mod partitioning_kind; +mod poll_metadata; pub mod polled_messages; pub mod polling_kind; pub mod polling_strategy; @@ -40,6 +46,7 @@ pub use iggy_message::{IggyMessage, MAX_PAYLOAD_SIZE, MAX_USER_HEADERS_SIZE}; pub use index::IggyIndex; pub use index_view::IggyIndexView; pub use indexes::IggyIndexes; +pub use indexes_mut::IggyIndexesMut; pub use message_header::{ IGGY_MESSAGE_CHECKSUM_OFFSET_RANGE, IGGY_MESSAGE_HEADER_RANGE, IGGY_MESSAGE_HEADER_SIZE, IGGY_MESSAGE_HEADERS_LENGTH_OFFSET_RANGE, IGGY_MESSAGE_ID_OFFSET_RANGE, @@ -48,9 +55,14 @@ pub use message_header::{ IggyMessageHeader, }; pub use message_header_view::IggyMessageHeaderView; +pub use message_header_view_mut::IggyMessageHeaderViewMut; pub use message_view::{IggyMessageView, IggyMessageViewIterator}; +pub use message_view_mut::{IggyMessageViewMut, IggyMessageViewMutIterator}; pub use messages_batch::IggyMessagesBatch; +pub use messages_batch_mut::IggyMessagesBatchMut; +pub use messages_batch_set::IggyMessagesBatchSet; pub use partitioning::Partitioning; +pub use poll_metadata::IggyPollMetadata; pub use partitioning_kind::PartitioningKind; pub use polled_messages::PolledMessages; pub use polling_kind::PollingKind; diff --git a/core/server/src/streaming/segments/indexes/mod.rs b/core/common/src/types/message/poll_metadata.rs similarity index 73% copy from core/server/src/streaming/segments/indexes/mod.rs copy to core/common/src/types/message/poll_metadata.rs index 6644d3071..8f0f631b4 100644 --- a/core/server/src/streaming/segments/indexes/mod.rs +++ b/core/common/src/types/message/poll_metadata.rs @@ -16,10 +16,17 @@ * under the License. */ -mod index_reader; -mod index_writer; -mod indexes_mut; +#[derive(Debug, Clone, Copy)] +pub struct IggyPollMetadata { + pub partition_id: u32, + pub current_offset: u64, +} -pub use index_reader::IndexReader; -pub use index_writer::IndexWriter; -pub use indexes_mut::IggyIndexesMut; +impl IggyPollMetadata { + pub fn new(partition_id: u32, current_offset: u64) -> Self { + Self { + partition_id, + current_offset, + } + } +} diff --git a/core/common/src/utils/mod.rs b/core/common/src/utils/mod.rs index 34e080b6c..00793cad1 100644 --- a/core/common/src/utils/mod.rs +++ b/core/common/src/utils/mod.rs @@ -24,6 +24,7 @@ pub(crate) mod duration; pub(crate) mod expiry; pub(crate) mod hash; pub(crate) mod personal_access_token_expiry; +pub mod random_id; pub mod text; pub(crate) mod timestamp; pub(crate) mod topic_size; diff --git a/core/server/src/streaming/utils/mod.rs b/core/common/src/utils/random_id.rs similarity index 85% copy from core/server/src/streaming/utils/mod.rs copy to core/common/src/utils/random_id.rs index 00e9f65f3..4dbfe5d49 100644 --- a/core/server/src/streaming/utils/mod.rs +++ b/core/common/src/utils/random_id.rs @@ -16,8 +16,13 @@ * under the License. */ -pub mod address; -pub mod crypto; -pub mod file; -pub mod ptr; -pub mod random_id; +use ulid::Ulid; +use uuid::Uuid; + +pub fn get_uuid() -> u128 { + Uuid::new_v4().to_u128_le() +} + +pub fn get_ulid() -> Ulid { + Ulid::new() +} diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index e6365772d..f07b1823e 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -73,7 +73,6 @@ human-repr = { workspace = true } iggy_common = { workspace = true } jsonwebtoken = { workspace = true } left-right = { workspace = true } -lending-iterator = { workspace = true } metadata = { workspace = true } mimalloc = { workspace = true, optional = true } mime_guess = { workspace = true, optional = true } diff --git a/core/server/src/binary/handlers/messages/poll_messages_handler.rs b/core/server/src/binary/handlers/messages/poll_messages_handler.rs index e37bbe284..dfa696d65 100644 --- a/core/server/src/binary/handlers/messages/poll_messages_handler.rs +++ b/core/server/src/binary/handlers/messages/poll_messages_handler.rs @@ -24,25 +24,10 @@ use crate::shard::IggyShard; use crate::shard::system::messages::PollingArgs; use crate::streaming::session::Session; use iggy_common::SenderKind; -use iggy_common::{IggyError, PollMessages, PooledBuffer}; +use iggy_common::{IggyError, IggyPollMetadata, PollMessages, PooledBuffer}; use std::rc::Rc; use tracing::{debug, trace}; -#[derive(Debug)] -pub struct IggyPollMetadata { - pub partition_id: u32, - pub current_offset: u64, -} - -impl IggyPollMetadata { - pub fn new(partition_id: u32, current_offset: u64) -> Self { - Self { - partition_id, - current_offset, - } - } -} - impl ServerCommandHandler for PollMessages { fn code(&self) -> u32 { iggy_common::POLL_MESSAGES_CODE diff --git a/core/server/src/http/http_shard_wrapper.rs b/core/server/src/http/http_shard_wrapper.rs index b1f5bc754..043900138 100644 --- a/core/server/src/http/http_shard_wrapper.rs +++ b/core/server/src/http/http_shard_wrapper.rs @@ -23,7 +23,7 @@ use iggy_common::{ }; use send_wrapper::SendWrapper; -use crate::binary::handlers::messages::poll_messages_handler::IggyPollMetadata; +use iggy_common::IggyPollMetadata; use crate::shard::system::messages::PollingArgs; use crate::state::command::EntryCommand; use crate::streaming::segments::{IggyMessagesBatchMut, IggyMessagesBatchSet}; diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs index 008495743..f94a2c862 100644 --- a/core/server/src/shard/system/messages.rs +++ b/core/server/src/shard/system/messages.rs @@ -17,7 +17,7 @@ */ use super::COMPONENT; -use crate::binary::handlers::messages::poll_messages_handler::IggyPollMetadata; +use iggy_common::IggyPollMetadata; use crate::shard::IggyShard; use crate::shard::transmission::frame::ShardResponse; use crate::shard::transmission::message::{ diff --git a/core/server/src/shard/transmission/frame.rs b/core/server/src/shard/transmission/frame.rs index 055ca4222..bdeb55501 100644 --- a/core/server/src/shard/transmission/frame.rs +++ b/core/server/src/shard/transmission/frame.rs @@ -16,10 +16,9 @@ * under the License. */ use async_channel::Sender; -use iggy_common::{IggyError, Stats}; +use iggy_common::{IggyError, IggyPollMetadata, Stats}; use crate::{ - binary::handlers::messages::poll_messages_handler::IggyPollMetadata, shard::transmission::message::ShardMessage, streaming::{segments::IggyMessagesBatchSet, users::user::User}, }; diff --git a/core/server/src/streaming/deduplication/mod.rs b/core/server/src/streaming/deduplication/mod.rs index b92e484bb..fee04f314 100644 --- a/core/server/src/streaming/deduplication/mod.rs +++ b/core/server/src/streaming/deduplication/mod.rs @@ -16,4 +16,4 @@ * under the License. */ -pub mod message_deduplicator; +pub use iggy_common::MessageDeduplicator; diff --git a/core/server/src/streaming/partitions/helpers.rs b/core/server/src/streaming/partitions/helpers.rs index a7a0bc29b..e03080cf2 100644 --- a/core/server/src/streaming/partitions/helpers.rs +++ b/core/server/src/streaming/partitions/helpers.rs @@ -17,7 +17,7 @@ use crate::{ configs::system::SystemConfig, - streaming::deduplication::message_deduplicator::MessageDeduplicator, + streaming::deduplication::MessageDeduplicator, }; pub fn create_message_deduplicator(config: &SystemConfig) -> Option<MessageDeduplicator> { diff --git a/core/server/src/streaming/partitions/local_partition.rs b/core/server/src/streaming/partitions/local_partition.rs index 066db698d..decbd799e 100644 --- a/core/server/src/streaming/partitions/local_partition.rs +++ b/core/server/src/streaming/partitions/local_partition.rs @@ -25,7 +25,7 @@ use super::{ journal::MemoryMessageJournal, log::SegmentedLog, }; use crate::streaming::{ - deduplication::message_deduplicator::MessageDeduplicator, stats::PartitionStats, + deduplication::MessageDeduplicator, stats::PartitionStats, }; use iggy_common::IggyTimestamp; use std::sync::{Arc, atomic::AtomicU64}; diff --git a/core/server/src/streaming/partitions/ops.rs b/core/server/src/streaming/partitions/ops.rs index a7163331c..5c52fe4c1 100644 --- a/core/server/src/streaming/partitions/ops.rs +++ b/core/server/src/streaming/partitions/ops.rs @@ -23,7 +23,7 @@ use super::journal::Journal; use super::local_partitions::LocalPartitions; -use crate::binary::handlers::messages::poll_messages_handler::IggyPollMetadata; +use iggy_common::IggyPollMetadata; use crate::shard::system::messages::PollingArgs; use crate::streaming::polling_consumer::PollingConsumer; use crate::streaming::segments::IggyMessagesBatchSet; diff --git a/core/server/src/streaming/segments/indexes/mod.rs b/core/server/src/streaming/segments/indexes/mod.rs index 6644d3071..b9054a752 100644 --- a/core/server/src/streaming/segments/indexes/mod.rs +++ b/core/server/src/streaming/segments/indexes/mod.rs @@ -18,8 +18,7 @@ mod index_reader; mod index_writer; -mod indexes_mut; +pub use iggy_common::IggyIndexesMut; pub use index_reader::IndexReader; pub use index_writer::IndexWriter; -pub use indexes_mut::IggyIndexesMut; diff --git a/core/server/src/streaming/segments/types/mod.rs b/core/server/src/streaming/segments/types/mod.rs index 9f5266eed..17113c1e9 100644 --- a/core/server/src/streaming/segments/types/mod.rs +++ b/core/server/src/streaming/segments/types/mod.rs @@ -16,12 +16,7 @@ * under the License. */ -mod message_header_view_mut; -mod message_view_mut; -mod messages_batch_mut; -mod messages_batch_set; - -pub use message_header_view_mut::IggyMessageHeaderViewMut; -pub use message_view_mut::IggyMessageViewMut; -pub use messages_batch_mut::IggyMessagesBatchMut; -pub use messages_batch_set::IggyMessagesBatchSet; +pub use iggy_common::{ + IggyMessageHeaderViewMut, IggyMessageViewMut, IggyMessageViewMutIterator, IggyMessagesBatchMut, + IggyMessagesBatchSet, +}; diff --git a/core/server/src/streaming/utils/mod.rs b/core/server/src/streaming/utils/mod.rs index 00e9f65f3..9e8cdea1e 100644 --- a/core/server/src/streaming/utils/mod.rs +++ b/core/server/src/streaming/utils/mod.rs @@ -20,4 +20,4 @@ pub mod address; pub mod crypto; pub mod file; pub mod ptr; -pub mod random_id; +pub use iggy_common::random_id;
