This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch move_batch_to_common
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 3322547b40aec247c26fd56901d7a36f3ff11bc3
Author: numinex <[email protected]>
AuthorDate: Mon Feb 9 12:13:01 2026 +0100

    v2
---
 Cargo.lock                                         |   5 +-
 core/common/Cargo.toml                             |   4 +
 .../src/deduplication/message_deduplicator.rs      | 126 +++++++++++++++++++++
 .../streaming => common/src}/deduplication/mod.rs  |   4 +-
 core/common/src/lib.rs                             |   3 +
 .../src/types/message}/indexes_mut.rs              |   4 +-
 .../src/types/message}/message_header_view_mut.rs  |   2 +-
 .../src/types/message}/message_view_mut.rs         |   6 +-
 .../src/types/message}/messages_batch_mut.rs       |  10 +-
 .../src/types/message}/messages_batch_set.rs       |  12 +-
 core/common/src/types/message/mod.rs               |  12 ++
 .../src/types/message/poll_metadata.rs}            |  19 +++-
 core/common/src/utils/mod.rs                       |   1 +
 .../utils/mod.rs => common/src/utils/random_id.rs} |  15 ++-
 core/server/Cargo.toml                             |   1 -
 .../handlers/messages/poll_messages_handler.rs     |  17 +--
 core/server/src/http/http_shard_wrapper.rs         |   2 +-
 core/server/src/shard/system/messages.rs           |   2 +-
 core/server/src/shard/transmission/frame.rs        |   3 +-
 core/server/src/streaming/deduplication/mod.rs     |   2 +-
 core/server/src/streaming/partitions/helpers.rs    |   2 +-
 .../src/streaming/partitions/local_partition.rs    |   2 +-
 core/server/src/streaming/partitions/ops.rs        |   2 +-
 core/server/src/streaming/segments/indexes/mod.rs  |   3 +-
 core/server/src/streaming/segments/types/mod.rs    |  13 +--
 core/server/src/streaming/utils/mod.rs             |   2 +-
 26 files changed, 205 insertions(+), 69 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 13a4dd133..906117550 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4653,6 +4653,8 @@ dependencies = [
  "err_trail",
  "human-repr",
  "humantime",
+ "lending-iterator",
+ "moka",
  "nix 0.31.1",
  "once_cell",
  "rcgen",
@@ -4668,6 +4670,8 @@ dependencies = [
  "tracing",
  "tungstenite",
  "twox-hash",
+ "ulid",
+ "uuid",
 ]
 
 [[package]]
@@ -8284,7 +8288,6 @@ dependencies = [
  "iggy_common",
  "jsonwebtoken",
  "left-right",
- "lending-iterator",
  "metadata",
  "mimalloc",
  "mime_guess",
diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml
index 89adb6a99..43d8133e4 100644
--- a/core/common/Cargo.toml
+++ b/core/common/Cargo.toml
@@ -48,6 +48,8 @@ derive_more = { workspace = true }
 err_trail = { workspace = true }
 human-repr = { workspace = true }
 humantime = { workspace = true }
+lending-iterator = { workspace = true }
+moka = { workspace = true }
 once_cell = { workspace = true }
 rcgen = { workspace = true }
 ring = { workspace = true }
@@ -61,6 +63,8 @@ tokio = { workspace = true }
 tracing = { workspace = true }
 tungstenite = { workspace = true }
 twox-hash = { workspace = true }
+ulid = { workspace = true }
+uuid = { workspace = true }
 
 [target.'cfg(unix)'.dependencies]
 nix = { workspace = true }
diff --git a/core/common/src/deduplication/message_deduplicator.rs 
b/core/common/src/deduplication/message_deduplicator.rs
new file mode 100644
index 000000000..3a98ad925
--- /dev/null
+++ b/core/common/src/deduplication/message_deduplicator.rs
@@ -0,0 +1,126 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::IggyDuration;
+use moka::future::{Cache, CacheBuilder};
+
+#[derive(Debug)]
+pub struct MessageDeduplicator {
+    ttl: Option<IggyDuration>,
+    max_entries: Option<u64>,
+    cache: Cache<u128, bool>,
+}
+
+/// Cloning yields an independent, empty `MessageDeduplicator` configured with
+/// the same `max_entries` and `ttl`; previously cached IDs are not copied.
+/// Cloning the inner moka `Cache` directly would share its entries instead.
+impl Clone for MessageDeduplicator {
+    fn clone(&self) -> Self {
+        let builder = Cache::builder();
+        let builder = Self::setup_cache_builder(builder, self.max_entries, self.ttl);
+        let cache = builder.build();
+
+        Self {
+            ttl: self.ttl,
+            max_entries: self.max_entries,
+            cache,
+        }
+    }
+}
+
+impl MessageDeduplicator {
+    fn setup_cache_builder(
+        mut builder: CacheBuilder<u128, bool, Cache<u128, bool>>,
+        max_entries: Option<u64>,
+        ttl: Option<IggyDuration>,
+    ) -> CacheBuilder<u128, bool, Cache<u128, bool>> {
+        if let Some(max_entries) = max_entries {
+            builder = builder.max_capacity(max_entries);
+        }
+        if let Some(ttl) = ttl {
+            builder = builder.time_to_live(ttl.get_duration());
+        }
+        builder
+    }
+
+    /// Creates a new message deduplicator with the given max entries and time to live for each ID.
+    pub fn new(max_entries: Option<u64>, ttl: Option<IggyDuration>) -> Self {
+        let builder = Cache::builder();
+        let builder = Self::setup_cache_builder(builder, max_entries, ttl);
+        let cache = builder.build();
+
+        Self {
+            ttl,
+            max_entries,
+            cache,
+        }
+    }
+
+    /// Checks if the given ID exists.
+    pub fn exists(&self, id: u128) -> bool {
+        self.cache.contains_key(&id)
+    }
+
+    /// Inserts the given ID.
+    pub async fn insert(&self, id: u128) {
+        self.cache.insert(id, true).await
+    }
+
+    /// Tries to insert the given ID, returns false if it already exists.
+    ///
+    /// The lookup and the insert happen as one atomic cache operation, so
+    /// concurrent callers racing on the same ID cannot both observe `true`
+    /// (a separate `exists` + `insert` could interleave across the `.await`).
+    pub async fn try_insert(&self, id: u128) -> bool {
+        // `is_fresh()` is true only when this call inserted the entry.
+        self.cache.entry(id).or_insert(true).await.is_fresh()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[tokio::test]
+    async fn message_deduplicator_should_insert_only_unique_identifiers() {
+        let max_entries = 1000;
+        let ttl = "1s".parse::<IggyDuration>().unwrap();
+        let deduplicator = MessageDeduplicator::new(Some(max_entries), Some(ttl));
+        for i in 0..max_entries {
+            let id = i as u128;
+            assert!(deduplicator.try_insert(id).await);
+            assert!(deduplicator.exists(id));
+            assert!(!deduplicator.try_insert(id).await);
+        }
+    }
+
+    #[tokio::test]
+    async fn message_deduplicator_should_evict_identifiers_after_given_time_to_live() {
+        let max_entries = 3;
+        let ttl = "100ms".parse::<IggyDuration>().unwrap();
+        let deduplicator = MessageDeduplicator::new(Some(max_entries), Some(ttl));
+        for i in 0..max_entries {
+            let id = i as u128;
+            assert!(deduplicator.try_insert(id).await);
+            assert!(deduplicator.exists(id));
+            tokio::time::sleep(2 * ttl.get_duration()).await;
+            assert!(!deduplicator.exists(id));
+            assert!(deduplicator.try_insert(id).await);
+        }
+    }
+}
diff --git a/core/server/src/streaming/deduplication/mod.rs 
b/core/common/src/deduplication/mod.rs
similarity index 91%
copy from core/server/src/streaming/deduplication/mod.rs
copy to core/common/src/deduplication/mod.rs
index b92e484bb..db946af75 100644
--- a/core/server/src/streaming/deduplication/mod.rs
+++ b/core/common/src/deduplication/mod.rs
@@ -16,4 +16,6 @@
  * under the License.
  */
 
-pub mod message_deduplicator;
+mod message_deduplicator;
+
+pub use message_deduplicator::MessageDeduplicator;
diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs
index 2ae5bd845..ee61e7747 100644
--- a/core/common/src/lib.rs
+++ b/core/common/src/lib.rs
@@ -20,6 +20,7 @@
 mod alloc;
 mod certificates;
 mod commands;
+mod deduplication;
 mod error;
 mod sender;
 pub mod sharding;
@@ -34,6 +35,7 @@ pub mod locking;
 pub use alloc::buffer::PooledBuffer;
 pub use alloc::memory_pool::{MEMORY_POOL, MemoryPool, MemoryPoolConfigOther, 
memory_pool};
 pub use certificates::generate_self_signed_certificate;
+pub use deduplication::MessageDeduplicator;
 pub use commands::consumer_groups::*;
 pub use commands::consumer_offsets::*;
 pub use commands::messages::*;
@@ -103,6 +105,7 @@ pub use utils::duration::{IggyDuration, SEC_IN_MICRO};
 pub use utils::expiry::IggyExpiry;
 pub use utils::hash::*;
 pub use utils::personal_access_token_expiry::PersonalAccessTokenExpiry;
+pub use utils::random_id;
 pub use utils::text;
 pub use utils::timestamp::*;
 pub use utils::topic_size::MaxTopicSize;
diff --git a/core/server/src/streaming/segments/indexes/indexes_mut.rs 
b/core/common/src/types/message/indexes_mut.rs
similarity index 99%
rename from core/server/src/streaming/segments/indexes/indexes_mut.rs
rename to core/common/src/types/message/indexes_mut.rs
index 159cd265e..51673bd52 100644
--- a/core/server/src/streaming/segments/indexes/indexes_mut.rs
+++ b/core/common/src/types/message/indexes_mut.rs
@@ -16,8 +16,8 @@
  * under the License.
  */
 
-use iggy_common::PooledBuffer;
-use iggy_common::{INDEX_SIZE, IggyIndexView, IggyIndexes};
+use crate::PooledBuffer;
+use crate::{INDEX_SIZE, IggyIndexView, IggyIndexes};
 use std::fmt;
 use std::ops::{Deref, Index as StdIndex};
 
diff --git 
a/core/server/src/streaming/segments/types/message_header_view_mut.rs 
b/core/common/src/types/message/message_header_view_mut.rs
similarity index 99%
rename from core/server/src/streaming/segments/types/message_header_view_mut.rs
rename to core/common/src/types/message/message_header_view_mut.rs
index 9caf8ffd1..ffdc137f7 100644
--- a/core/server/src/streaming/segments/types/message_header_view_mut.rs
+++ b/core/common/src/types/message/message_header_view_mut.rs
@@ -15,7 +15,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-use iggy_common::{
+use crate::{
     IGGY_MESSAGE_CHECKSUM_OFFSET_RANGE, 
IGGY_MESSAGE_HEADERS_LENGTH_OFFSET_RANGE,
     IGGY_MESSAGE_ID_OFFSET_RANGE, IGGY_MESSAGE_OFFSET_OFFSET_RANGE,
     IGGY_MESSAGE_ORIGIN_TIMESTAMP_OFFSET_RANGE, 
IGGY_MESSAGE_PAYLOAD_LENGTH_OFFSET_RANGE,
diff --git a/core/server/src/streaming/segments/types/message_view_mut.rs 
b/core/common/src/types/message/message_view_mut.rs
similarity index 95%
rename from core/server/src/streaming/segments/types/message_view_mut.rs
rename to core/common/src/types/message/message_view_mut.rs
index b1081d6bd..a8b4e9088 100644
--- a/core/server/src/streaming/segments/types/message_view_mut.rs
+++ b/core/common/src/types/message/message_view_mut.rs
@@ -16,8 +16,10 @@
  * under the License.
  */
 
-use super::IggyMessageHeaderViewMut;
-use iggy_common::{IGGY_MESSAGE_HEADER_SIZE, IggyMessageHeaderView, 
calculate_checksum};
+use crate::{
+    IGGY_MESSAGE_HEADER_SIZE, IggyMessageHeaderView, calculate_checksum,
+    types::message::message_header_view_mut::IggyMessageHeaderViewMut,
+};
 use lending_iterator::prelude::*;
 
 /// A mutable view of a message for in-place modifications
diff --git a/core/server/src/streaming/segments/types/messages_batch_mut.rs 
b/core/common/src/types/message/messages_batch_mut.rs
similarity index 99%
rename from core/server/src/streaming/segments/types/messages_batch_mut.rs
rename to core/common/src/types/message/messages_batch_mut.rs
index 33e2c7e75..41449b18a 100644
--- a/core/server/src/streaming/segments/types/messages_batch_mut.rs
+++ b/core/common/src/types/message/messages_batch_mut.rs
@@ -16,17 +16,15 @@
  * under the License.
  */
 
+use super::indexes_mut::IggyIndexesMut;
 use super::message_view_mut::IggyMessageViewMutIterator;
-use crate::streaming::deduplication::message_deduplicator::MessageDeduplicator;
-use crate::streaming::segments::indexes::IggyIndexesMut;
-use crate::streaming::utils::random_id;
-use bytes::{BufMut, BytesMut};
-use iggy_common::PooledBuffer;
-use iggy_common::{
+use crate::{MessageDeduplicator, PooledBuffer, random_id};
+use crate::{
     BytesSerializable, IGGY_MESSAGE_HEADER_SIZE, INDEX_SIZE, IggyByteSize, 
IggyError,
     IggyIndexView, IggyMessage, IggyMessageView, IggyMessageViewIterator, 
IggyMessagesBatch,
     IggyTimestamp, MAX_PAYLOAD_SIZE, MAX_USER_HEADERS_SIZE, Sizeable, 
Validatable,
 };
+use bytes::{BufMut, BytesMut};
 use lending_iterator::prelude::*;
 use std::ops::{Deref, Index};
 use std::sync::Arc;
diff --git a/core/server/src/streaming/segments/types/messages_batch_set.rs 
b/core/common/src/types/message/messages_batch_set.rs
similarity index 97%
rename from core/server/src/streaming/segments/types/messages_batch_set.rs
rename to core/common/src/types/message/messages_batch_set.rs
index 6437f3630..8dd47043e 100644
--- a/core/server/src/streaming/segments/types/messages_batch_set.rs
+++ b/core/common/src/types/message/messages_batch_set.rs
@@ -16,18 +16,14 @@
  * under the License.
  */
 
-use crate::binary::handlers::messages::poll_messages_handler::IggyPollMetadata;
-use crate::streaming::segments::IggyIndexesMut;
-use bytes::Bytes;
-use iggy_common::{
-    IggyByteSize, IggyMessage, IggyMessageView, IggyMessagesBatch, 
PolledMessages, PooledBuffer,
-    Sizeable,
+use crate::{
+    IggyByteSize, IggyIndexesMut, IggyMessage, IggyMessageView, 
IggyMessagesBatch,
+    IggyMessagesBatchMut, IggyPollMetadata, PolledMessages, PooledBuffer, 
Sizeable,
 };
+use bytes::Bytes;
 use std::ops::Index;
 use tracing::trace;
 
-use super::IggyMessagesBatchMut;
-
 /// A container for multiple IggyMessagesBatch objects
 #[derive(Debug, Default)]
 pub struct IggyMessagesBatchSet {
diff --git a/core/common/src/types/message/mod.rs 
b/core/common/src/types/message/mod.rs
index caa29640b..23fe79e9a 100644
--- a/core/common/src/types/message/mod.rs
+++ b/core/common/src/types/message/mod.rs
@@ -20,12 +20,18 @@ mod iggy_message;
 mod index;
 mod index_view;
 mod indexes;
+mod indexes_mut;
 mod message_header;
 mod message_header_view;
+mod message_header_view_mut;
 mod message_view;
+mod message_view_mut;
 mod messages_batch;
+mod messages_batch_mut;
+mod messages_batch_set;
 pub mod partitioning;
 pub mod partitioning_kind;
+mod poll_metadata;
 pub mod polled_messages;
 pub mod polling_kind;
 pub mod polling_strategy;
@@ -40,6 +46,7 @@ pub use iggy_message::{IggyMessage, MAX_PAYLOAD_SIZE, 
MAX_USER_HEADERS_SIZE};
 pub use index::IggyIndex;
 pub use index_view::IggyIndexView;
 pub use indexes::IggyIndexes;
+pub use indexes_mut::IggyIndexesMut;
 pub use message_header::{
     IGGY_MESSAGE_CHECKSUM_OFFSET_RANGE, IGGY_MESSAGE_HEADER_RANGE, 
IGGY_MESSAGE_HEADER_SIZE,
     IGGY_MESSAGE_HEADERS_LENGTH_OFFSET_RANGE, IGGY_MESSAGE_ID_OFFSET_RANGE,
@@ -48,9 +55,14 @@ pub use message_header::{
     IggyMessageHeader,
 };
 pub use message_header_view::IggyMessageHeaderView;
+pub use message_header_view_mut::IggyMessageHeaderViewMut;
 pub use message_view::{IggyMessageView, IggyMessageViewIterator};
+pub use message_view_mut::{IggyMessageViewMut, IggyMessageViewMutIterator};
 pub use messages_batch::IggyMessagesBatch;
+pub use messages_batch_mut::IggyMessagesBatchMut;
+pub use messages_batch_set::IggyMessagesBatchSet;
 pub use partitioning::Partitioning;
+pub use poll_metadata::IggyPollMetadata;
 pub use partitioning_kind::PartitioningKind;
 pub use polled_messages::PolledMessages;
 pub use polling_kind::PollingKind;
diff --git a/core/server/src/streaming/segments/indexes/mod.rs 
b/core/common/src/types/message/poll_metadata.rs
similarity index 73%
copy from core/server/src/streaming/segments/indexes/mod.rs
copy to core/common/src/types/message/poll_metadata.rs
index 6644d3071..8f0f631b4 100644
--- a/core/server/src/streaming/segments/indexes/mod.rs
+++ b/core/common/src/types/message/poll_metadata.rs
@@ -16,10 +16,17 @@
  * under the License.
  */
 
-mod index_reader;
-mod index_writer;
-mod indexes_mut;
+#[derive(Debug, Clone, Copy)]
+pub struct IggyPollMetadata {
+    pub partition_id: u32,
+    pub current_offset: u64,
+}
 
-pub use index_reader::IndexReader;
-pub use index_writer::IndexWriter;
-pub use indexes_mut::IggyIndexesMut;
+impl IggyPollMetadata {
+    pub fn new(partition_id: u32, current_offset: u64) -> Self {
+        Self {
+            partition_id,
+            current_offset,
+        }
+    }
+}
diff --git a/core/common/src/utils/mod.rs b/core/common/src/utils/mod.rs
index 34e080b6c..00793cad1 100644
--- a/core/common/src/utils/mod.rs
+++ b/core/common/src/utils/mod.rs
@@ -24,6 +24,7 @@ pub(crate) mod duration;
 pub(crate) mod expiry;
 pub(crate) mod hash;
 pub(crate) mod personal_access_token_expiry;
+pub mod random_id;
 pub mod text;
 pub(crate) mod timestamp;
 pub(crate) mod topic_size;
diff --git a/core/server/src/streaming/utils/mod.rs 
b/core/common/src/utils/random_id.rs
similarity index 85%
copy from core/server/src/streaming/utils/mod.rs
copy to core/common/src/utils/random_id.rs
index 00e9f65f3..4dbfe5d49 100644
--- a/core/server/src/streaming/utils/mod.rs
+++ b/core/common/src/utils/random_id.rs
@@ -16,8 +16,13 @@
  * under the License.
  */
 
-pub mod address;
-pub mod crypto;
-pub mod file;
-pub mod ptr;
-pub mod random_id;
+use ulid::Ulid;
+use uuid::Uuid;
+
+pub fn get_uuid() -> u128 {
+    Uuid::new_v4().to_u128_le()
+}
+
+pub fn get_ulid() -> Ulid {
+    Ulid::new()
+}
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index e6365772d..f07b1823e 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -73,7 +73,6 @@ human-repr = { workspace = true }
 iggy_common = { workspace = true }
 jsonwebtoken = { workspace = true }
 left-right = { workspace = true }
-lending-iterator = { workspace = true }
 metadata = { workspace = true }
 mimalloc = { workspace = true, optional = true }
 mime_guess = { workspace = true, optional = true }
diff --git a/core/server/src/binary/handlers/messages/poll_messages_handler.rs 
b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
index e37bbe284..dfa696d65 100644
--- a/core/server/src/binary/handlers/messages/poll_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
@@ -24,25 +24,10 @@ use crate::shard::IggyShard;
 use crate::shard::system::messages::PollingArgs;
 use crate::streaming::session::Session;
 use iggy_common::SenderKind;
-use iggy_common::{IggyError, PollMessages, PooledBuffer};
+use iggy_common::{IggyError, IggyPollMetadata, PollMessages, PooledBuffer};
 use std::rc::Rc;
 use tracing::{debug, trace};
 
-#[derive(Debug)]
-pub struct IggyPollMetadata {
-    pub partition_id: u32,
-    pub current_offset: u64,
-}
-
-impl IggyPollMetadata {
-    pub fn new(partition_id: u32, current_offset: u64) -> Self {
-        Self {
-            partition_id,
-            current_offset,
-        }
-    }
-}
-
 impl ServerCommandHandler for PollMessages {
     fn code(&self) -> u32 {
         iggy_common::POLL_MESSAGES_CODE
diff --git a/core/server/src/http/http_shard_wrapper.rs 
b/core/server/src/http/http_shard_wrapper.rs
index b1f5bc754..043900138 100644
--- a/core/server/src/http/http_shard_wrapper.rs
+++ b/core/server/src/http/http_shard_wrapper.rs
@@ -23,7 +23,7 @@ use iggy_common::{
 };
 use send_wrapper::SendWrapper;
 
-use crate::binary::handlers::messages::poll_messages_handler::IggyPollMetadata;
+use iggy_common::IggyPollMetadata;
 use crate::shard::system::messages::PollingArgs;
 use crate::state::command::EntryCommand;
 use crate::streaming::segments::{IggyMessagesBatchMut, IggyMessagesBatchSet};
diff --git a/core/server/src/shard/system/messages.rs 
b/core/server/src/shard/system/messages.rs
index 008495743..f94a2c862 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -17,7 +17,7 @@
  */
 
 use super::COMPONENT;
-use crate::binary::handlers::messages::poll_messages_handler::IggyPollMetadata;
+use iggy_common::IggyPollMetadata;
 use crate::shard::IggyShard;
 use crate::shard::transmission::frame::ShardResponse;
 use crate::shard::transmission::message::{
diff --git a/core/server/src/shard/transmission/frame.rs 
b/core/server/src/shard/transmission/frame.rs
index 055ca4222..bdeb55501 100644
--- a/core/server/src/shard/transmission/frame.rs
+++ b/core/server/src/shard/transmission/frame.rs
@@ -16,10 +16,9 @@
  * under the License.
  */
 use async_channel::Sender;
-use iggy_common::{IggyError, Stats};
+use iggy_common::{IggyError, IggyPollMetadata, Stats};
 
 use crate::{
-    binary::handlers::messages::poll_messages_handler::IggyPollMetadata,
     shard::transmission::message::ShardMessage,
     streaming::{segments::IggyMessagesBatchSet, users::user::User},
 };
diff --git a/core/server/src/streaming/deduplication/mod.rs 
b/core/server/src/streaming/deduplication/mod.rs
index b92e484bb..fee04f314 100644
--- a/core/server/src/streaming/deduplication/mod.rs
+++ b/core/server/src/streaming/deduplication/mod.rs
@@ -16,4 +16,4 @@
  * under the License.
  */
 
-pub mod message_deduplicator;
+pub use iggy_common::MessageDeduplicator;
diff --git a/core/server/src/streaming/partitions/helpers.rs 
b/core/server/src/streaming/partitions/helpers.rs
index a7a0bc29b..e03080cf2 100644
--- a/core/server/src/streaming/partitions/helpers.rs
+++ b/core/server/src/streaming/partitions/helpers.rs
@@ -17,7 +17,7 @@
 
 use crate::{
     configs::system::SystemConfig,
-    streaming::deduplication::message_deduplicator::MessageDeduplicator,
+    streaming::deduplication::MessageDeduplicator,
 };
 
 pub fn create_message_deduplicator(config: &SystemConfig) -> 
Option<MessageDeduplicator> {
diff --git a/core/server/src/streaming/partitions/local_partition.rs 
b/core/server/src/streaming/partitions/local_partition.rs
index 066db698d..decbd799e 100644
--- a/core/server/src/streaming/partitions/local_partition.rs
+++ b/core/server/src/streaming/partitions/local_partition.rs
@@ -25,7 +25,7 @@ use super::{
     journal::MemoryMessageJournal, log::SegmentedLog,
 };
 use crate::streaming::{
-    deduplication::message_deduplicator::MessageDeduplicator, 
stats::PartitionStats,
+    deduplication::MessageDeduplicator, stats::PartitionStats,
 };
 use iggy_common::IggyTimestamp;
 use std::sync::{Arc, atomic::AtomicU64};
diff --git a/core/server/src/streaming/partitions/ops.rs 
b/core/server/src/streaming/partitions/ops.rs
index a7163331c..5c52fe4c1 100644
--- a/core/server/src/streaming/partitions/ops.rs
+++ b/core/server/src/streaming/partitions/ops.rs
@@ -23,7 +23,7 @@
 
 use super::journal::Journal;
 use super::local_partitions::LocalPartitions;
-use crate::binary::handlers::messages::poll_messages_handler::IggyPollMetadata;
+use iggy_common::IggyPollMetadata;
 use crate::shard::system::messages::PollingArgs;
 use crate::streaming::polling_consumer::PollingConsumer;
 use crate::streaming::segments::IggyMessagesBatchSet;
diff --git a/core/server/src/streaming/segments/indexes/mod.rs 
b/core/server/src/streaming/segments/indexes/mod.rs
index 6644d3071..b9054a752 100644
--- a/core/server/src/streaming/segments/indexes/mod.rs
+++ b/core/server/src/streaming/segments/indexes/mod.rs
@@ -18,8 +18,7 @@
 
 mod index_reader;
 mod index_writer;
-mod indexes_mut;
 
+pub use iggy_common::IggyIndexesMut;
 pub use index_reader::IndexReader;
 pub use index_writer::IndexWriter;
-pub use indexes_mut::IggyIndexesMut;
diff --git a/core/server/src/streaming/segments/types/mod.rs 
b/core/server/src/streaming/segments/types/mod.rs
index 9f5266eed..17113c1e9 100644
--- a/core/server/src/streaming/segments/types/mod.rs
+++ b/core/server/src/streaming/segments/types/mod.rs
@@ -16,12 +16,7 @@
  * under the License.
  */
 
-mod message_header_view_mut;
-mod message_view_mut;
-mod messages_batch_mut;
-mod messages_batch_set;
-
-pub use message_header_view_mut::IggyMessageHeaderViewMut;
-pub use message_view_mut::IggyMessageViewMut;
-pub use messages_batch_mut::IggyMessagesBatchMut;
-pub use messages_batch_set::IggyMessagesBatchSet;
+pub use iggy_common::{
+    IggyMessageHeaderViewMut, IggyMessageViewMut, IggyMessageViewMutIterator, 
IggyMessagesBatchMut,
+    IggyMessagesBatchSet,
+};
diff --git a/core/server/src/streaming/utils/mod.rs 
b/core/server/src/streaming/utils/mod.rs
index 00e9f65f3..9e8cdea1e 100644
--- a/core/server/src/streaming/utils/mod.rs
+++ b/core/server/src/streaming/utils/mod.rs
@@ -20,4 +20,4 @@ pub mod address;
 pub mod crypto;
 pub mod file;
 pub mod ptr;
-pub mod random_id;
+pub use iggy_common::random_id;

Reply via email to