This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch move_batch_to_common in repository https://gitbox.apache.org/repos/asf/iggy.git
commit e27920014a6db963713b3c1638041485f5a525cc Author: numinex <[email protected]> AuthorDate: Mon Feb 9 12:16:14 2026 +0100 v3 --- .../handlers/messages/poll_messages_handler.rs | 2 +- .../deduplication/message_deduplicator.rs | 127 --------------------- core/server/src/streaming/segments/types/mod.rs | 2 +- core/server/src/streaming/utils/random_id.rs | 28 ----- 4 files changed, 2 insertions(+), 157 deletions(-) diff --git a/core/server/src/binary/handlers/messages/poll_messages_handler.rs b/core/server/src/binary/handlers/messages/poll_messages_handler.rs index dfa696d65..d95e71fb5 100644 --- a/core/server/src/binary/handlers/messages/poll_messages_handler.rs +++ b/core/server/src/binary/handlers/messages/poll_messages_handler.rs @@ -24,7 +24,7 @@ use crate::shard::IggyShard; use crate::shard::system::messages::PollingArgs; use crate::streaming::session::Session; use iggy_common::SenderKind; -use iggy_common::{IggyError, IggyPollMetadata, PollMessages, PooledBuffer}; +use iggy_common::{IggyError, PollMessages, PooledBuffer}; use std::rc::Rc; use tracing::{debug, trace}; diff --git a/core/server/src/streaming/deduplication/message_deduplicator.rs b/core/server/src/streaming/deduplication/message_deduplicator.rs deleted file mode 100644 index d6c4abb6d..000000000 --- a/core/server/src/streaming/deduplication/message_deduplicator.rs +++ /dev/null @@ -1,127 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -use iggy_common::IggyDuration; -use moka::future::{Cache, CacheBuilder}; - -#[derive(Debug)] -pub struct MessageDeduplicator { - ttl: Option<IggyDuration>, - max_entries: Option<u64>, - cache: Cache<u128, bool>, -} - -/// Create deep copy of the `MessageDeduplicator` instance. -/// Regular `Clone` cheap as it only creates thread-safe reference counted -/// pointers to the shared internal data structures. -impl Clone for MessageDeduplicator { - fn clone(&self) -> Self { - let builder = Cache::builder(); - let builder = Self::setup_cache_builder(builder, self.max_entries, self.ttl); - let cache = builder.build(); - - Self { - ttl: self.ttl, - max_entries: self.max_entries, - cache, - } - } -} - -impl MessageDeduplicator { - fn setup_cache_builder( - mut builder: CacheBuilder<u128, bool, Cache<u128, bool>>, - max_entries: Option<u64>, - ttl: Option<IggyDuration>, - ) -> CacheBuilder<u128, bool, Cache<u128, bool>> { - if let Some(max_entries) = max_entries { - builder = builder.max_capacity(max_entries); - } - if let Some(ttl) = ttl { - builder = builder.time_to_live(ttl.get_duration()); - } - builder - } - - /// Creates a new message deduplicator with the given max entries and time to live for each ID. - pub fn new(max_entries: Option<u64>, ttl: Option<IggyDuration>) -> Self { - let builder = Cache::builder(); - let builder = Self::setup_cache_builder(builder, max_entries, ttl); - let cache = builder.build(); - - Self { - ttl, - max_entries, - cache, - } - } - - /// Checks if the given ID exists. - pub fn exists(&self, id: u128) -> bool { - self.cache.contains_key(&id) - } - - /// Inserts the given ID. - pub async fn insert(&self, id: u128) { - self.cache.insert(id, true).await - } - - /// Tries to insert the given ID, returns false if it already exists. - pub async fn try_insert(&self, id: u128) -> bool { - if self.exists(id) { - false - } else { - self.insert(id).await; - true - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use compio::time::sleep; - - #[compio::test] - async fn message_deduplicator_should_insert_only_unique_identifiers() { - let max_entries = 1000; - let ttl = "1s".parse::<IggyDuration>().unwrap(); - let deduplicator = MessageDeduplicator::new(Some(max_entries), Some(ttl)); - for i in 0..max_entries { - let id = i as u128; - assert!(deduplicator.try_insert(id).await); - assert!(deduplicator.exists(id)); - assert!(!deduplicator.try_insert(id).await); - } - } - - #[compio::test] - async fn message_deduplicator_should_evict_identifiers_after_given_time_to_live() { - let max_entries = 3; - let ttl = "100ms".parse::<IggyDuration>().unwrap(); - let deduplicator = MessageDeduplicator::new(Some(max_entries), Some(ttl)); - for i in 0..max_entries { - let id = i as u128; - assert!(deduplicator.try_insert(id).await); - assert!(deduplicator.exists(id)); - sleep(2 * ttl.get_duration()).await; - assert!(!deduplicator.exists(id)); - assert!(deduplicator.try_insert(id).await); - } - } -} diff --git a/core/server/src/streaming/segments/types/mod.rs b/core/server/src/streaming/segments/types/mod.rs index 17113c1e9..27fe76e00 100644 --- a/core/server/src/streaming/segments/types/mod.rs +++ b/core/server/src/streaming/segments/types/mod.rs @@ -17,6 +17,6 @@ */ pub use iggy_common::{ - IggyMessageHeaderViewMut, IggyMessageViewMut, IggyMessageViewMutIterator, IggyMessagesBatchMut, + IggyMessageHeaderViewMut, IggyMessageViewMut, IggyMessagesBatchMut, IggyMessagesBatchSet, }; diff --git a/core/server/src/streaming/utils/random_id.rs b/core/server/src/streaming/utils/random_id.rs deleted file mode 100644 index 4dbfe5d49..000000000 --- a/core/server/src/streaming/utils/random_id.rs +++ /dev/null @@ -1,28 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -use ulid::Ulid; -use uuid::Uuid; - -pub fn get_uuid() -> u128 { - Uuid::new_v4().to_u128_le() -} - -pub fn get_ulid() -> Ulid { - Ulid::new() -}
