This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch global-metadata-leftright
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/global-metadata-leftright by
this push:
new caed7ffe8 remove unused functions
caed7ffe8 is described below
commit caed7ffe86cc9ed085498e9ee684c73b5d8341e8
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Jan 23 17:46:16 2026 +0100
remove unused functions
---
core/server/src/configs/system.rs | 8 ----
core/server/src/shard/communication.rs | 17 --------
core/server/src/shard/system/consumer_groups.rs | 45 ----------------------
core/server/src/shard/system/consumer_offsets.rs | 13 -------
core/server/src/shard/system/segments.rs | 14 +------
core/server/src/shard/system/topics.rs | 32 ---------------
core/server/src/shard/system/utils.rs | 5 ---
.../src/streaming/segments/indexes/indexes_mut.rs | 5 ---
.../streaming/segments/messages/messages_reader.rs | 38 ------------------
core/server/src/streaming/segments/segment.rs | 4 --
.../segments/types/message_header_view_mut.rs | 15 --------
11 files changed, 1 insertion(+), 195 deletions(-)
diff --git a/core/server/src/configs/system.rs b/core/server/src/configs/system.rs
index e3e39f317..cc45ba59d 100644
--- a/core/server/src/configs/system.rs
+++ b/core/server/src/configs/system.rs
@@ -176,14 +176,6 @@ impl SystemConfig {
format!("{}/{}", self.get_system_path(), self.backup.path)
}
- pub fn get_compatibility_backup_path(&self) -> String {
- format!(
- "{}/{}",
- self.get_backup_path(),
- self.backup.compatibility.path
- )
- }
-
pub fn get_runtime_path(&self) -> String {
format!("{}/{}", self.get_system_path(), self.runtime.path)
}
diff --git a/core/server/src/shard/communication.rs b/core/server/src/shard/communication.rs
index 497e61013..10a7fe59a 100644
--- a/core/server/src/shard/communication.rs
+++ b/core/server/src/shard/communication.rs
@@ -145,10 +145,6 @@ impl IggyShard {
})
}
- pub fn find_shard_table_record(&self, namespace: &IggyNamespace) -> Option<PartitionLocation> {
- self.shards_table.get(namespace).map(|entry| *entry)
- }
-
pub fn remove_shard_table_record(&self, namespace: &IggyNamespace) -> PartitionLocation {
self.shards_table
.remove(namespace)
@@ -156,19 +152,6 @@ impl IggyShard {
.expect("remove_shard_table_record: namespace not found")
}
- pub fn remove_shard_table_records(
- &self,
- namespaces: &[IggyNamespace],
- ) -> Vec<(IggyNamespace, PartitionLocation)> {
- namespaces
- .iter()
- .map(|ns| {
- let (ns, location) = self.shards_table.remove(ns).unwrap();
- (ns, location)
- })
- .collect()
- }
-
pub fn insert_shard_table_record(&self, ns: IggyNamespace, location: PartitionLocation) {
self.shards_table.insert(ns, location);
}
diff --git a/core/server/src/shard/system/consumer_groups.rs b/core/server/src/shard/system/consumer_groups.rs
index ebb0d9b20..967eb541a 100644
--- a/core/server/src/shard/system/consumer_groups.rs
+++ b/core/server/src/shard/system/consumer_groups.rs
@@ -19,8 +19,6 @@
use super::COMPONENT;
use crate::metadata::ConsumerGroupMeta;
use crate::shard::IggyShard;
-use bytes::BufMut;
-use bytes::BytesMut;
use err_trail::ErrContext;
use iggy_common::Identifier;
use iggy_common::IggyError;
@@ -71,17 +69,6 @@ impl IggyShard {
Ok(cg)
}
- pub fn delete_consumer_group_bypass_auth(
- &self,
- stream_id: &Identifier,
- topic_id: &Identifier,
- group_id: &Identifier,
- ) -> Result<ConsumerGroupMeta, IggyError> {
- let (stream, topic, group) =
- self.resolve_consumer_group_id(stream_id, topic_id, group_id)?;
- Ok(self.delete_consumer_group_base(stream, topic, group))
- }
-
fn delete_consumer_group_base(
&self,
stream: usize,
@@ -177,36 +164,4 @@ impl IggyShard {
Ok(())
}
-
- pub fn get_consumer_group_from_metadata(
- &self,
- stream_id: usize,
- topic_id: usize,
- group_id: usize,
- ) -> bytes::Bytes {
- let Some(cg_meta) = self
- .metadata
- .get_consumer_group(stream_id, topic_id, group_id)
- else {
- return bytes::Bytes::new();
- };
-
- let mut bytes = BytesMut::new();
-
- bytes.put_u32_le(cg_meta.id as u32);
- bytes.put_u32_le(cg_meta.partitions.len() as u32);
- bytes.put_u32_le(cg_meta.members.len() as u32);
- bytes.put_u8(cg_meta.name.len() as u8);
- bytes.put_slice(cg_meta.name.as_bytes());
-
- for (_, member) in cg_meta.members.iter() {
- bytes.put_u32_le(member.id as u32);
- bytes.put_u32_le(member.partitions.len() as u32);
- for &partition_id in &member.partitions {
- bytes.put_u32_le(partition_id as u32);
- }
- }
-
- bytes.freeze()
- }
}
diff --git a/core/server/src/shard/system/consumer_offsets.rs b/core/server/src/shard/system/consumer_offsets.rs
index 1a8d2f7c3..961011bf8 100644
--- a/core/server/src/shard/system/consumer_offsets.rs
+++ b/core/server/src/shard/system/consumer_offsets.rs
@@ -320,17 +320,4 @@ impl IggyShard {
pub async fn delete_consumer_offset_from_disk(&self, path: &str) -> Result<(), IggyError> {
crate::streaming::partitions::storage::delete_persisted_offset(path).await
}
-
- pub fn store_consumer_offset_bypass_auth(
- &self,
- stream_id: &Identifier,
- topic_id: &Identifier,
- polling_consumer: &PollingConsumer,
- partition_id: usize,
- offset: u64,
- ) -> Result<(), IggyError> {
- let (stream, topic) = self.resolve_topic_id(stream_id, topic_id)?;
- self.store_consumer_offset_base(stream, topic, polling_consumer, partition_id, offset);
- Ok(())
- }
}
diff --git a/core/server/src/shard/system/segments.rs b/core/server/src/shard/system/segments.rs
index d8fd982c9..fb4a68496 100644
--- a/core/server/src/shard/system/segments.rs
+++ b/core/server/src/shard/system/segments.rs
@@ -17,22 +17,10 @@
*/
use crate::shard::IggyShard;
use crate::streaming::segments::Segment;
+use iggy_common::IggyError;
use iggy_common::sharding::IggyNamespace;
-use iggy_common::{Identifier, IggyError};
impl IggyShard {
- pub async fn delete_segments_bypass_auth(
- &self,
- stream_id: &Identifier,
- topic_id: &Identifier,
- partition_id: usize,
- segments_count: u32,
- ) -> Result<(), IggyError> {
- let (stream, topic) = self.resolve_topic_id(stream_id, topic_id)?;
- self.delete_segments_base(stream, topic, partition_id, segments_count)
- .await
- }
-
pub(crate) async fn delete_segments_base(
&self,
stream: usize,
diff --git a/core/server/src/shard/system/topics.rs b/core/server/src/shard/system/topics.rs
index 060b4e0b4..8cdde46f3 100644
--- a/core/server/src/shard/system/topics.rs
+++ b/core/server/src/shard/system/topics.rs
@@ -128,29 +128,6 @@ impl IggyShard {
)
}
- #[allow(clippy::too_many_arguments)]
- pub fn update_topic_bypass_auth(
- &self,
- stream_id: &Identifier,
- topic_id: &Identifier,
- name: String,
- message_expiry: IggyExpiry,
- compression_algorithm: CompressionAlgorithm,
- max_topic_size: MaxTopicSize,
- replication_factor: Option<u8>,
- ) -> Result<(), IggyError> {
- let (stream, topic) = self.resolve_topic_id(stream_id, topic_id)?;
- self.update_topic_base(
- stream,
- topic,
- name,
- message_expiry,
- compression_algorithm,
- max_topic_size,
- replication_factor.unwrap_or(1),
- )
- }
-
#[allow(clippy::too_many_arguments)]
fn update_topic_base(
&self,
@@ -229,15 +206,6 @@ impl IggyShard {
Ok(topic_info)
}
- pub fn delete_topic_bypass_auth(
- &self,
- stream_id: &Identifier,
- topic_id: &Identifier,
- ) -> Result<DeletedTopicInfo, IggyError> {
- let (stream, topic) = self.resolve_topic_id(stream_id, topic_id)?;
- Ok(self.delete_topic_base(stream, topic))
- }
-
fn delete_topic_base(&self, stream: usize, topic: usize) -> DeletedTopicInfo {
let (topic_name, partition_ids) = self.metadata.with_metadata(|m| {
let stream_meta = m.streams.get(stream).expect("Stream metadata must exist");
diff --git a/core/server/src/shard/system/utils.rs b/core/server/src/shard/system/utils.rs
index 6c387bf20..4919e3619 100644
--- a/core/server/src/shard/system/utils.rs
+++ b/core/server/src/shard/system/utils.rs
@@ -75,11 +75,6 @@ impl IggyShard {
Ok((stream, topic, group))
}
- pub fn ensure_stream_exists(&self, stream_id: &Identifier) -> Result<(), IggyError> {
- self.resolve_stream_id(stream_id)?;
- Ok(())
- }
-
pub fn ensure_topic_exists(
&self,
stream_id: &Identifier,
diff --git a/core/server/src/streaming/segments/indexes/indexes_mut.rs b/core/server/src/streaming/segments/indexes/indexes_mut.rs
index 197bed43b..159cd265e 100644
--- a/core/server/src/streaming/segments/indexes/indexes_mut.rs
+++ b/core/server/src/streaming/segments/indexes/indexes_mut.rs
@@ -226,11 +226,6 @@ impl IggyIndexesMut {
self.buffer.clear();
}
- /// Gets the number of unsaved indexes
- pub fn unsaved_count(&self) -> u32 {
- self.count().saturating_sub(self.saved_count)
- }
-
/// Gets the unsaved part of the index buffer
pub fn unsaved_slice(&self) -> PooledBuffer {
let start_pos = self.saved_count as usize * INDEX_SIZE;
diff --git a/core/server/src/streaming/segments/messages/messages_reader.rs b/core/server/src/streaming/segments/messages/messages_reader.rs
index 601ae9c30..ec2f9b856 100644
--- a/core/server/src/streaming/segments/messages/messages_reader.rs
+++ b/core/server/src/streaming/segments/messages/messages_reader.rs
@@ -88,44 +88,6 @@ impl MessagesReader {
self.file_path.clone()
}
- /// Loads and returns all message IDs from the messages file.
- /// Note that this function does not use the pool, as the messages are not cached.
- /// This is expected - this method is called at startup and we want to preserve
- /// memory pool usage.
- pub async fn load_all_message_ids_from_disk(
- &self,
- indexes: IggyIndexesMut,
- messages_count: u32,
- ) -> Result<Vec<u128>, IggyError> {
- let file_size = self.file_size();
- if file_size == 0 {
- return Ok(vec![]);
- }
-
- let messages_bytes = match self.read_at(0, file_size, false).await {
- Ok(buf) => buf,
- Err(e) if e.kind() == ErrorKind::UnexpectedEof => {
- return Ok(vec![]);
- }
- Err(e) => {
- error!(
- "Error reading {messages_count} messages at position 0 in file {} of size {}: {e}",
- self.file_path, file_size
- );
- return Err(IggyError::CannotReadMessage);
- }
- };
-
- let messages = IggyMessagesBatchMut::from_indexes_and_messages(indexes, messages_bytes);
- let mut ids = Vec::with_capacity(messages_count as usize);
-
- for message in messages.iter() {
- ids.push(message.header().id());
- }
-
- Ok(ids)
- }
-
/// Loads and returns a batch of messages from the messages file.
pub async fn load_messages_from_disk(
&self,
diff --git a/core/server/src/streaming/segments/segment.rs b/core/server/src/streaming/segments/segment.rs
index aa72ac37d..fe653b44f 100644
--- a/core/server/src/streaming/segments/segment.rs
+++ b/core/server/src/streaming/segments/segment.rs
@@ -76,10 +76,6 @@ impl Segment {
self.is_expired(IggyTimestamp::now())
}
- pub fn is_sealed(&self) -> bool {
- self.sealed
- }
-
pub fn is_expired(&self, now: IggyTimestamp) -> bool {
if !self.sealed {
return false;
diff --git a/core/server/src/streaming/segments/types/message_header_view_mut.rs b/core/server/src/streaming/segments/types/message_header_view_mut.rs
index c63b32a95..9caf8ffd1 100644
--- a/core/server/src/streaming/segments/types/message_header_view_mut.rs
+++ b/core/server/src/streaming/segments/types/message_header_view_mut.rs
@@ -89,19 +89,4 @@ impl<'a> IggyMessageHeaderViewMut<'a> {
let bytes = value.to_le_bytes();
self.data[IGGY_MESSAGE_TIMESTAMP_OFFSET_RANGE].copy_from_slice(&bytes);
}
-
- pub fn set_origin_timestamp(&mut self, value: u64) {
- let bytes = value.to_le_bytes();
- self.data[IGGY_MESSAGE_ORIGIN_TIMESTAMP_OFFSET_RANGE].copy_from_slice(&bytes);
- }
-
- pub fn set_headers_length(&mut self, value: u32) {
- let bytes = value.to_le_bytes();
- self.data[IGGY_MESSAGE_HEADERS_LENGTH_OFFSET_RANGE].copy_from_slice(&bytes);
- }
-
- pub fn set_payload_length(&mut self, value: u32) {
- let bytes = value.to_le_bytes();
- self.data[IGGY_MESSAGE_PAYLOAD_LENGTH_OFFSET_RANGE].copy_from_slice(&bytes);
- }
}