This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch move_batch_to_common
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit e27920014a6db963713b3c1638041485f5a525cc
Author: numinex <[email protected]>
AuthorDate: Mon Feb 9 12:16:14 2026 +0100

    v3
---
 .../handlers/messages/poll_messages_handler.rs     |   2 +-
 .../deduplication/message_deduplicator.rs          | 127 ---------------------
 core/server/src/streaming/segments/types/mod.rs    |   2 +-
 core/server/src/streaming/utils/random_id.rs       |  28 -----
 4 files changed, 2 insertions(+), 157 deletions(-)

diff --git a/core/server/src/binary/handlers/messages/poll_messages_handler.rs b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
index dfa696d65..d95e71fb5 100644
--- a/core/server/src/binary/handlers/messages/poll_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
@@ -24,7 +24,7 @@ use crate::shard::IggyShard;
 use crate::shard::system::messages::PollingArgs;
 use crate::streaming::session::Session;
 use iggy_common::SenderKind;
-use iggy_common::{IggyError, IggyPollMetadata, PollMessages, PooledBuffer};
+use iggy_common::{IggyError, PollMessages, PooledBuffer};
 use std::rc::Rc;
 use tracing::{debug, trace};
 
diff --git a/core/server/src/streaming/deduplication/message_deduplicator.rs b/core/server/src/streaming/deduplication/message_deduplicator.rs
deleted file mode 100644
index d6c4abb6d..000000000
--- a/core/server/src/streaming/deduplication/message_deduplicator.rs
+++ /dev/null
@@ -1,127 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use iggy_common::IggyDuration;
-use moka::future::{Cache, CacheBuilder};
-
-#[derive(Debug)]
-pub struct MessageDeduplicator {
-    ttl: Option<IggyDuration>,
-    max_entries: Option<u64>,
-    cache: Cache<u128, bool>,
-}
-
-/// Create deep copy of the `MessageDeduplicator` instance.
-/// Regular `Clone` cheap as it only creates thread-safe reference counted
-/// pointers to the shared internal data structures.
-impl Clone for MessageDeduplicator {
-    fn clone(&self) -> Self {
-        let builder = Cache::builder();
-        let builder = Self::setup_cache_builder(builder, self.max_entries, self.ttl);
-        let cache = builder.build();
-
-        Self {
-            ttl: self.ttl,
-            max_entries: self.max_entries,
-            cache,
-        }
-    }
-}
-
-impl MessageDeduplicator {
-    fn setup_cache_builder(
-        mut builder: CacheBuilder<u128, bool, Cache<u128, bool>>,
-        max_entries: Option<u64>,
-        ttl: Option<IggyDuration>,
-    ) -> CacheBuilder<u128, bool, Cache<u128, bool>> {
-        if let Some(max_entries) = max_entries {
-            builder = builder.max_capacity(max_entries);
-        }
-        if let Some(ttl) = ttl {
-            builder = builder.time_to_live(ttl.get_duration());
-        }
-        builder
-    }
-
-    /// Creates a new message deduplicator with the given max entries and time to live for each ID.
-    pub fn new(max_entries: Option<u64>, ttl: Option<IggyDuration>) -> Self {
-        let builder = Cache::builder();
-        let builder = Self::setup_cache_builder(builder, max_entries, ttl);
-        let cache = builder.build();
-
-        Self {
-            ttl,
-            max_entries,
-            cache,
-        }
-    }
-
-    /// Checks if the given ID exists.
-    pub fn exists(&self, id: u128) -> bool {
-        self.cache.contains_key(&id)
-    }
-
-    /// Inserts the given ID.
-    pub async fn insert(&self, id: u128) {
-        self.cache.insert(id, true).await
-    }
-
-    /// Tries to insert the given ID, returns false if it already exists.
-    pub async fn try_insert(&self, id: u128) -> bool {
-        if self.exists(id) {
-            false
-        } else {
-            self.insert(id).await;
-            true
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use compio::time::sleep;
-
-    #[compio::test]
-    async fn message_deduplicator_should_insert_only_unique_identifiers() {
-        let max_entries = 1000;
-        let ttl = "1s".parse::<IggyDuration>().unwrap();
-        let deduplicator = MessageDeduplicator::new(Some(max_entries), Some(ttl));
-        for i in 0..max_entries {
-            let id = i as u128;
-            assert!(deduplicator.try_insert(id).await);
-            assert!(deduplicator.exists(id));
-            assert!(!deduplicator.try_insert(id).await);
-        }
-    }
-
-    #[compio::test]
-    async fn message_deduplicator_should_evict_identifiers_after_given_time_to_live() {
-        let max_entries = 3;
-        let ttl = "100ms".parse::<IggyDuration>().unwrap();
-        let deduplicator = MessageDeduplicator::new(Some(max_entries), Some(ttl));
-        for i in 0..max_entries {
-            let id = i as u128;
-            assert!(deduplicator.try_insert(id).await);
-            assert!(deduplicator.exists(id));
-            sleep(2 * ttl.get_duration()).await;
-            assert!(!deduplicator.exists(id));
-            assert!(deduplicator.try_insert(id).await);
-        }
-    }
-}
diff --git a/core/server/src/streaming/segments/types/mod.rs b/core/server/src/streaming/segments/types/mod.rs
index 17113c1e9..27fe76e00 100644
--- a/core/server/src/streaming/segments/types/mod.rs
+++ b/core/server/src/streaming/segments/types/mod.rs
@@ -17,6 +17,6 @@
  */
 
 pub use iggy_common::{
-    IggyMessageHeaderViewMut, IggyMessageViewMut, IggyMessageViewMutIterator, IggyMessagesBatchMut,
+    IggyMessageHeaderViewMut, IggyMessageViewMut, IggyMessagesBatchMut,
     IggyMessagesBatchSet,
 };
diff --git a/core/server/src/streaming/utils/random_id.rs b/core/server/src/streaming/utils/random_id.rs
deleted file mode 100644
index 4dbfe5d49..000000000
--- a/core/server/src/streaming/utils/random_id.rs
+++ /dev/null
@@ -1,28 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use ulid::Ulid;
-use uuid::Uuid;
-
-pub fn get_uuid() -> u128 {
-    Uuid::new_v4().to_u128_le()
-}
-
-pub fn get_ulid() -> Ulid {
-    Ulid::new()
-}

Reply via email to