This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new e58bee328 fix(server): handle entity ID gaps on bootstrap after
deletions (#2548)
e58bee328 is described below
commit e58bee328fdd3b66cc87115d3733941a25c13fa0
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Thu Jan 8 12:36:58 2026 +0100
fix(server): handle entity ID gaps on bootstrap after deletions (#2548)
The server panicked on restart if streams, topics, partitions, or
consumer groups had been deleted, because the deletions left gaps in
the ID sequences.
The issue: `slab.insert()` auto-assigns sequential IDs (0, 1, 2...),
but after deletions the state file contains non-sequential IDs (e.g.,
{0, 2} after deleting ID 1). On restart, bootstrap called `insert()`
which returned sequential IDs, causing assertion failures when the
assigned ID didn't match the expected ID from state.
The fix adds `from_entries()` constructors to Streams, Topics,
Partitions, and ConsumerGroups. Each one collects `(id, entity)` pairs
through `Slab::from_iter()`, which places every entity at its original
key regardless of gaps.
---
.../data_integrity/verify_after_server_restart.rs | 210 ++++++++++++++++++++-
core/server/src/bootstrap.rs | 203 +++++++++-----------
core/server/src/shard/system/consumer_offsets.rs | 10 +-
core/server/src/slab/consumer_groups.rs | 27 +++
core/server/src/slab/partitions.rs | 38 ++++
core/server/src/slab/streams.rs | 25 +++
core/server/src/slab/topics.rs | 28 +++
core/server/src/streaming/streams/stream.rs | 4 +
core/server/src/streaming/topics/topic.rs | 8 +
9 files changed, 434 insertions(+), 119 deletions(-)
diff --git
a/core/integration/tests/data_integrity/verify_after_server_restart.rs
b/core/integration/tests/data_integrity/verify_after_server_restart.rs
index 3a6059ee1..f632abd92 100644
--- a/core/integration/tests/data_integrity/verify_after_server_restart.rs
+++ b/core/integration/tests/data_integrity/verify_after_server_restart.rs
@@ -16,9 +16,7 @@
* under the License.
*/
-use iggy::clients::client::IggyClient;
-use iggy::prelude::{ConsumerGroupClient, Identifier, IggyByteSize,
MessageClient, SystemClient};
-use iggy_common::TransportProtocol;
+use iggy::prelude::*;
use integration::bench_utils::run_bench_and_wait_for_finish;
use integration::{
tcp_client::TcpClientFactory,
@@ -319,3 +317,209 @@ async fn
should_fill_data_and_verify_after_restart(cache_setting: &'static str)
// 16. Manual cleanup
std::fs::remove_dir_all(local_data_path).unwrap();
}
+
+/// Test that verifies server correctly handles ID gaps after deletions at all
levels:
+/// streams, topics, partitions, and consumer groups.
+///
+/// Creates a hierarchy with gaps from deletions:
+/// - Streams: 0, 2 (1 deleted)
+/// - Topics per stream: 0, 2 (1 deleted)
+/// - Partitions per topic: 0, 2 (1 deleted)
+/// - Consumer groups per topic: 0, 2 (1 deleted)
+#[tokio::test]
+#[parallel]
+async fn should_handle_resource_deletion_and_restart() {
+ let env_vars = HashMap::from([(
+ SYSTEM_PATH_ENV_VAR.to_owned(),
+ TestServer::get_random_path(),
+ )]);
+
+ let mut test_server = TestServer::new(Some(env_vars.clone()), false, None,
IpAddrKind::V4);
+ test_server.start();
+ let server_addr = test_server.get_raw_tcp_addr().unwrap();
+ let local_data_path = test_server.get_local_data_path().to_owned();
+
+ let client = IggyClient::create(
+ TcpClientFactory {
+ server_addr: server_addr.clone(),
+ ..Default::default()
+ }
+ .create_client()
+ .await,
+ None,
+ None,
+ );
+ login_root(&client).await;
+
+ // Create 3 streams
+ let stream_0 = client.create_stream("stream-0").await.unwrap();
+ let stream_1 = client.create_stream("stream-1").await.unwrap();
+ let stream_2 = client.create_stream("stream-2").await.unwrap();
+
+ assert_eq!(stream_0.id, 0);
+ assert_eq!(stream_1.id, 1);
+ assert_eq!(stream_2.id, 2);
+
+ // For streams 0 and 2, create topics, partitions, and consumer groups
with gaps
+ for stream_id in [0u32, 2u32] {
+ let stream_ident = Identifier::numeric(stream_id).unwrap();
+
+ // Create 3 topics per stream (each with 3 partitions initially)
+ for topic_idx in 0..3 {
+ let topic = client
+ .create_topic(
+ &stream_ident,
+ &format!("topic-{}", topic_idx),
+ 3,
+ CompressionAlgorithm::None,
+ None,
+ IggyExpiry::NeverExpire,
+ MaxTopicSize::Unlimited,
+ )
+ .await
+ .unwrap();
+ assert_eq!(topic.id, topic_idx);
+
+ let topic_ident = Identifier::numeric(topic_idx).unwrap();
+
+ // Create 3 consumer groups per topic
+ for cg_idx in 0..3 {
+ client
+ .create_consumer_group(&stream_ident, &topic_ident,
&format!("cg-{}", cg_idx))
+ .await
+ .unwrap();
+ }
+ }
+
+ // Delete middle topic (topic 1)
+ client
+ .delete_topic(&stream_ident, &Identifier::numeric(1).unwrap())
+ .await
+ .unwrap();
+
+ // For remaining topics (0 and 2), delete middle partition then middle
consumer group
+ // This order tests that the server handles partition deletion before
consumer group deletion
+ for topic_id in [0u32, 2u32] {
+ let topic_ident = Identifier::numeric(topic_id).unwrap();
+
+ client
+ .delete_partitions(&stream_ident, &topic_ident, 1)
+ .await
+ .unwrap();
+
+ client
+ .delete_consumer_group(
+ &stream_ident,
+ &topic_ident,
+ &Identifier::numeric(1).unwrap(),
+ )
+ .await
+ .unwrap();
+ }
+ }
+
+ // Delete middle stream (stream 1)
+ client
+ .delete_stream(&Identifier::numeric(1).unwrap())
+ .await
+ .unwrap();
+
+ // Verify state before restart
+ let streams = client.get_streams().await.unwrap();
+ assert_eq!(streams.len(), 2);
+ let stream_ids: Vec<u32> = streams.iter().map(|s| s.id).collect();
+ assert!(stream_ids.contains(&0) && stream_ids.contains(&2));
+
+ drop(client);
+ test_server.stop();
+ drop(test_server);
+
+ // Restart server
+ let mut test_server = TestServer::new(Some(env_vars.clone()), false, None,
IpAddrKind::V4);
+ test_server.start();
+ let server_addr = test_server.get_raw_tcp_addr().unwrap();
+
+ let client = IggyClient::create(
+ TcpClientFactory {
+ server_addr,
+ ..Default::default()
+ }
+ .create_client()
+ .await,
+ None,
+ None,
+ );
+ login_root(&client).await;
+
+ // Verify streams after restart
+ let streams = client.get_streams().await.unwrap();
+ assert_eq!(streams.len(), 2, "Expected 2 streams after restart");
+ let stream_ids: Vec<u32> = streams.iter().map(|s| s.id).collect();
+ assert!(
+ stream_ids.contains(&0) && stream_ids.contains(&2),
+ "Expected streams 0 and 2, got: {:?}",
+ stream_ids
+ );
+
+ // Verify topics, partitions, and consumer groups for each stream
+ for stream_id in [0u32, 2u32] {
+ let stream_ident = Identifier::numeric(stream_id).unwrap();
+ let stream = client.get_stream(&stream_ident).await.unwrap().unwrap();
+
+ assert_eq!(
+ stream.topics_count, 2,
+ "Stream {} should have 2 topics",
+ stream_id
+ );
+
+ // Verify topics have correct IDs (0 and 2, not 0 and 1)
+ let topics = client.get_topics(&stream_ident).await.unwrap();
+ let topic_ids: Vec<u32> = topics.iter().map(|t| t.id).collect();
+ assert!(
+ topic_ids.contains(&0) && topic_ids.contains(&2),
+ "Stream {} should have topics 0 and 2, got: {:?}",
+ stream_id,
+ topic_ids
+ );
+
+ for topic_id in [0u32, 2u32] {
+ let topic_ident = Identifier::numeric(topic_id).unwrap();
+ let topic = client
+ .get_topic(&stream_ident, &topic_ident)
+ .await
+ .unwrap()
+ .unwrap();
+
+ assert_eq!(
+ topic.partitions_count, 2,
+ "Topic {} in stream {} should have 2 partitions",
+ topic_id, stream_id
+ );
+
+ // Verify consumer groups have correct IDs (0 and 2)
+ let cgs = client
+ .get_consumer_groups(&stream_ident, &topic_ident)
+ .await
+ .unwrap();
+ assert_eq!(
+ cgs.len(),
+ 2,
+ "Topic {} in stream {} should have 2 consumer groups",
+ topic_id,
+ stream_id
+ );
+ let cg_ids: Vec<u32> = cgs.iter().map(|c| c.id).collect();
+ assert!(
+ cg_ids.contains(&0) && cg_ids.contains(&2),
+ "Topic {} in stream {} should have consumer groups 0 and 2,
got: {:?}",
+ topic_id,
+ stream_id,
+ cg_ids
+ );
+ }
+ }
+
+ drop(client);
+ test_server.stop();
+ std::fs::remove_dir_all(local_data_path).unwrap();
+}
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 67c832231..361c224e5 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -36,13 +36,10 @@ use crate::{
},
},
slab::{
- streams::Streams,
- traits_ext::{
- EntityComponentSystem, EntityComponentSystemMutCell, Insert,
InsertCell, IntoComponents,
- },
- users::Users,
+ consumer_groups::ConsumerGroups, partitions::Partitions,
streams::Streams, topics::Topics,
+ traits_ext::IntoComponents, users::Users,
},
- state::system::{StreamState, TopicState, UserState},
+ state::system::{StreamState, UserState},
streaming::{
partitions::{
consumer_offset::ConsumerOffset,
@@ -82,144 +79,120 @@ pub async fn load_streams(
state: impl IntoIterator<Item = StreamState>,
config: &SystemConfig,
) -> Result<Streams, IggyError> {
- let streams = Streams::default();
- for StreamState {
- name,
- created_at,
- id,
- topics,
- } in state
- {
+ let state: Vec<StreamState> = state.into_iter().collect();
+ let mut stream_entries = Vec::with_capacity(state.len());
+
+ for stream_state in state {
+ let stream_id = stream_state.id as usize;
info!(
"Loading stream with ID: {}, name: {} from state...",
- id, name
- );
- let stream_id = id;
- let stats = Arc::new(StreamStats::default());
- let stream = stream::Stream::new(name.clone(), stats.clone(),
created_at);
- let new_id = streams.insert(stream);
- assert_eq!(
- new_id, stream_id as usize,
- "load_streams: id mismatch when inserting stream, mismatch for
stream with ID: {}, name: {}",
- stream_id, name
- );
- info!(
- "Loaded stream with ID: {}, name: {} from state...",
- id, name
+ stream_state.id, stream_state.name
);
- let topics = topics.into_values();
- for TopicState {
- id,
- name,
- created_at,
- compression_algorithm,
- message_expiry,
- max_topic_size,
- replication_factor,
- consumer_groups,
- partitions,
- } in topics
- {
+ let stream_stats = Arc::new(StreamStats::default());
+ let mut topic_entries = Vec::new();
+
+ for topic_state in stream_state.topics.into_values() {
+ let topic_id = topic_state.id as usize;
info!(
"Loading topic with ID: {}, name: {} from state...",
- id, name
+ topic_state.id, topic_state.name
);
- let topic_id = id;
- let parent_stats = stats.clone();
- let stats = Arc::new(TopicStats::new(parent_stats));
- let topic_id = streams.with_components_by_id_mut(stream_id as
usize, |(mut root, ..)| {
- let topic = topic::Topic::new(
- name.clone(),
- stats.clone(),
- created_at,
- replication_factor.unwrap_or(1),
- message_expiry,
- compression_algorithm,
- max_topic_size,
- );
- let new_id = root.topics_mut().insert(topic);
- assert_eq!(
- new_id, topic_id as usize,
- "load_streams: topic id mismatch when inserting topic,
mismatch for topic with ID: {}, name: {}",
- topic_id, &name
- );
- new_id
- });
- info!("Loaded topic with ID: {}, name: {} from state...", id,
name);
- let parent_stats = stats.clone();
- let cgs = consumer_groups.into_values();
- let partitions = partitions.into_values();
+ let topic_stats = Arc::new(TopicStats::new(stream_stats.clone()));
- // Load each partition asynchronously and insert immediately
- for partition_state in partitions {
+ // Build partitions
+ let mut partition_entries = Vec::new();
+ for partition_state in topic_state.partitions.into_values() {
+ let partition_id = partition_state.id as usize;
info!(
"Loading partition with ID: {}, for topic with ID: {} from
state...",
- partition_state.id, topic_id
+ partition_id, topic_id
);
- let partition_id = partition_state.id;
let partition = load_partition(
config,
- stream_id as usize,
+ stream_id,
topic_id,
partition_state,
- parent_stats.clone(),
+ topic_stats.clone(),
)
.await?;
- streams.with_components_by_id(stream_id as usize, |(root, ..)|
{
- root.topics()
- .with_components_by_id_mut(topic_id, |(mut root, ..)| {
- let new_id =
root.partitions_mut().insert(partition);
- assert_eq!(
- new_id, partition_id as usize,
- "load_streams: partition id mismatch when
inserting partition, mismatch for partition with ID: {}, for topic with ID: {},
for stream with ID: {}",
- partition_id, topic_id, stream_id
- );
- });
- });
+ partition_entries.push((partition_id, partition));
info!(
"Loaded partition with ID: {}, for topic with ID: {} from
state...",
partition_id, topic_id
);
}
- let partition_ids = streams.with_components_by_id(stream_id as
usize, |(root, ..)| {
- root.topics().with_components_by_id(topic_id, |(root, ..)| {
- root.partitions().with_components(|components| {
- let (root, ..) = components.into_components();
- root.iter().map(|(_, root)|
root.id()).collect::<Vec<_>>()
- })
+
+ // Build consumer groups
+ let partition_ids: Vec<_> = partition_entries.iter().map(|(id, _)|
*id).collect();
+ let cg_entries: Vec<_> = topic_state
+ .consumer_groups
+ .into_values()
+ .map(|cg_state| {
+ info!(
+ "Loading consumer group with ID: {}, name: {} for
topic with ID: {} from state...",
+ cg_state.id, cg_state.name, topic_id
+ );
+ let cg = consumer_group::ConsumerGroup::new(
+ cg_state.name.clone(),
+ Default::default(),
+ partition_ids.clone(),
+ );
+ info!(
+ "Loaded consumer group with ID: {}, name: {} for topic
with ID: {} from state...",
+ cg_state.id, cg_state.name, topic_id
+ );
+ (cg_state.id as usize, cg)
})
- });
+ .collect();
+
+ // Build topic with pre-built partitions and consumer groups
+ let mut topic = topic::Topic::new(
+ topic_state.name.clone(),
+ topic_stats,
+ topic_state.created_at,
+ topic_state.replication_factor.unwrap_or(1),
+ topic_state.message_expiry,
+ topic_state.compression_algorithm,
+ topic_state.max_topic_size,
+ );
- for cg_state in cgs {
- info!(
- "Loading consumer group with ID: {}, name: {} for topic
with ID: {} from state...",
- cg_state.id, cg_state.name, topic_id
- );
- streams.with_components_by_id(stream_id as usize, |(root, ..)|
{
- root.topics()
- .with_components_by_id_mut(topic_id, |(mut root, ..)| {
- let id = cg_state.id;
- let cg =
consumer_group::ConsumerGroup::new(cg_state.name.clone(), Default::default(),
partition_ids.clone());
- let new_id = root.consumer_groups_mut().insert(cg);
- assert_eq!(
- new_id, id as usize,
- "load_streams: consumer group id mismatch when
inserting consumer group, mismatch for consumer group with ID: {}, name: {} for
topic with ID: {}, for stream with ID: {}",
- id, cg_state.name, topic_id, stream_id
- );
- });
- });
- info!(
- "Loaded consumer group with ID: {}, name: {} for topic
with ID: {} from state...",
- cg_state.id, cg_state.name, topic_id
- );
- }
+ // Decompose, set nested containers, recompose
+ let (mut root, auxilary, stats) = topic.into_components();
+ root.set_partitions(Partitions::from_entries(partition_entries));
+ root.set_consumer_groups(ConsumerGroups::from_entries(cg_entries));
+ topic = topic::Topic::new_with_components(root, auxilary, stats);
+
+ topic_entries.push((topic_id, topic));
+ info!(
+ "Loaded topic with ID: {}, name: {} from state...",
+ topic_state.id, topic_state.name
+ );
}
+
+ // Build stream with pre-built topics
+ let mut stream = stream::Stream::new(
+ stream_state.name.clone(),
+ stream_stats,
+ stream_state.created_at,
+ );
+
+ // Decompose, set nested containers, recompose
+ let (mut root, stats) = stream.into_components();
+ root.set_topics(Topics::from_entries(topic_entries));
+ stream = stream::Stream::new_with_components(root, stats);
+
+ stream_entries.push((stream_id, stream));
+ info!(
+ "Loaded stream with ID: {}, name: {} from state...",
+ stream_state.id, stream_state.name
+ );
}
- Ok(streams)
+
+ Ok(Streams::from_entries(stream_entries))
}
pub fn load_users(state: impl IntoIterator<Item = UserState>) -> Users {
diff --git a/core/server/src/shard/system/consumer_offsets.rs
b/core/server/src/shard/system/consumer_offsets.rs
index 74cdb82f1..1777675a1 100644
--- a/core/server/src/shard/system/consumer_offsets.rs
+++ b/core/server/src/shard/system/consumer_offsets.rs
@@ -200,7 +200,15 @@ impl IggyShard {
partition_ids: &[usize],
) -> Result<(), IggyError> {
for &partition_id in partition_ids {
- // Skip if offset does not exist.
+ // Skip if partition was deleted
+ let partition_exists = self
+ .streams
+ .with_partitions(stream_id, topic_id, |p|
p.exists(partition_id));
+ if !partition_exists {
+ continue;
+ }
+
+ // Skip if offset does not exist
let has_offset = self
.streams
.with_partition_by_id(
diff --git a/core/server/src/slab/consumer_groups.rs
b/core/server/src/slab/consumer_groups.rs
index cf5af3b19..618ecb156 100644
--- a/core/server/src/slab/consumer_groups.rs
+++ b/core/server/src/slab/consumer_groups.rs
@@ -171,3 +171,30 @@ impl Default for ConsumerGroups {
}
}
}
+
+impl ConsumerGroups {
+ /// Construct from pre-built entries with specific IDs.
+ pub fn from_entries(
+ entries: impl IntoIterator<Item = (usize,
consumer_group::ConsumerGroup)>,
+ ) -> Self {
+ let entries: Vec<_> = entries.into_iter().collect();
+
+ let mut index = AHashMap::with_capacity(entries.len());
+ let mut root_entries = Vec::with_capacity(entries.len());
+ let mut members_entries = Vec::with_capacity(entries.len());
+
+ for (id, cg) in entries {
+ let (mut root, members) = cg.into_components();
+ root.update_id(id);
+ index.insert(root.key().clone(), id);
+ root_entries.push((id, root));
+ members_entries.push((id, members));
+ }
+
+ Self {
+ index,
+ root: root_entries.into_iter().collect(),
+ members: members_entries.into_iter().collect(),
+ }
+ }
+}
diff --git a/core/server/src/slab/partitions.rs
b/core/server/src/slab/partitions.rs
index 0ad5abbf0..214ffc6b1 100644
--- a/core/server/src/slab/partitions.rs
+++ b/core/server/src/slab/partitions.rs
@@ -208,6 +208,44 @@ impl Default for Partitions {
}
}
+impl Partitions {
+ /// Construct from pre-built entries with specific IDs.
+ pub fn from_entries(entries: impl IntoIterator<Item = (usize,
partition::Partition)>) -> Self {
+ let entries: Vec<_> = entries.into_iter().collect();
+
+ let mut root_entries = Vec::with_capacity(entries.len());
+ let mut stats_entries = Vec::with_capacity(entries.len());
+ let mut dedup_entries = Vec::with_capacity(entries.len());
+ let mut offset_entries = Vec::with_capacity(entries.len());
+ let mut consumer_offset_entries = Vec::with_capacity(entries.len());
+ let mut consumer_group_offset_entries =
Vec::with_capacity(entries.len());
+ let mut log_entries = Vec::with_capacity(entries.len());
+
+ for (id, partition) in entries {
+ let (mut root, stats, dedup, offset, consumer_offset,
consumer_group_offset, log) =
+ partition.into_components();
+ root.update_id(id);
+ root_entries.push((id, root));
+ stats_entries.push((id, stats));
+ dedup_entries.push((id, dedup));
+ offset_entries.push((id, offset));
+ consumer_offset_entries.push((id, consumer_offset));
+ consumer_group_offset_entries.push((id, consumer_group_offset));
+ log_entries.push((id, log));
+ }
+
+ Self {
+ root: root_entries.into_iter().collect(),
+ stats: stats_entries.into_iter().collect(),
+ message_deduplicator: dedup_entries.into_iter().collect(),
+ offset: offset_entries.into_iter().collect(),
+ consumer_offset: consumer_offset_entries.into_iter().collect(),
+ consumer_group_offset:
consumer_group_offset_entries.into_iter().collect(),
+ log: log_entries.into_iter().collect(),
+ }
+ }
+}
+
impl Partitions {
pub fn len(&self) -> usize {
self.root.len()
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index eebfe9a1d..66e5c7cbb 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -86,6 +86,31 @@ impl Default for Streams {
}
}
+impl Streams {
+ /// Construct from pre-built entries with specific IDs.
+ pub fn from_entries(entries: impl IntoIterator<Item = (usize,
stream::Stream)>) -> Self {
+ let entries: Vec<_> = entries.into_iter().collect();
+
+ let mut index = AHashMap::with_capacity(entries.len());
+ let mut root_entries = Vec::with_capacity(entries.len());
+ let mut stats_entries = Vec::with_capacity(entries.len());
+
+ for (id, stream) in entries {
+ let (mut root, stats) = stream.into_components();
+ root.update_id(id);
+ index.insert(root.key().clone(), id);
+ root_entries.push((id, root));
+ stats_entries.push((id, stats));
+ }
+
+ Self {
+ index: RefCell::new(index),
+ root: RefCell::new(root_entries.into_iter().collect()),
+ stats: RefCell::new(stats_entries.into_iter().collect()),
+ }
+ }
+}
+
impl<'a> From<&'a Streams> for stream::StreamRef<'a> {
fn from(value: &'a Streams) -> Self {
let root = value.root.borrow();
diff --git a/core/server/src/slab/topics.rs b/core/server/src/slab/topics.rs
index 7bfed97e7..5270dbd84 100644
--- a/core/server/src/slab/topics.rs
+++ b/core/server/src/slab/topics.rs
@@ -137,6 +137,34 @@ impl Default for Topics {
}
}
+impl Topics {
+ /// Construct from pre-built entries with specific IDs.
+ pub fn from_entries(entries: impl IntoIterator<Item = (usize,
topic::Topic)>) -> Self {
+ let entries: Vec<_> = entries.into_iter().collect();
+
+ let mut index = AHashMap::with_capacity(entries.len());
+ let mut root_entries = Vec::with_capacity(entries.len());
+ let mut auxilary_entries = Vec::with_capacity(entries.len());
+ let mut stats_entries = Vec::with_capacity(entries.len());
+
+ for (id, topic) in entries {
+ let (mut root, auxilary, stats) = topic.into_components();
+ root.update_id(id);
+ index.insert(root.key().clone(), id);
+ root_entries.push((id, root));
+ auxilary_entries.push((id, auxilary));
+ stats_entries.push((id, stats));
+ }
+
+ Self {
+ index: RefCell::new(index),
+ root: RefCell::new(root_entries.into_iter().collect()),
+ auxilaries: RefCell::new(auxilary_entries.into_iter().collect()),
+ stats: RefCell::new(stats_entries.into_iter().collect()),
+ }
+ }
+}
+
impl EntityComponentSystem<InteriorMutability> for Topics {
type Idx = ContainerId;
type Entity = topic::Topic;
diff --git a/core/server/src/streaming/streams/stream.rs
b/core/server/src/streaming/streams/stream.rs
index c596a11f8..f2e4c18fe 100644
--- a/core/server/src/streaming/streams/stream.rs
+++ b/core/server/src/streaming/streams/stream.rs
@@ -90,6 +90,10 @@ impl StreamRoot {
&mut self.topics
}
+ pub fn set_topics(&mut self, topics: Topics) {
+ self.topics = topics;
+ }
+
pub fn created_at(&self) -> IggyTimestamp {
self.created_at
}
diff --git a/core/server/src/streaming/topics/topic.rs
b/core/server/src/streaming/topics/topic.rs
index 348dc3c8a..c87958f68 100644
--- a/core/server/src/streaming/topics/topic.rs
+++ b/core/server/src/streaming/topics/topic.rs
@@ -328,6 +328,14 @@ impl TopicRoot {
&mut self.consumer_groups
}
+ pub fn set_partitions(&mut self, partitions: Partitions) {
+ self.partitions = partitions;
+ }
+
+ pub fn set_consumer_groups(&mut self, consumer_groups: ConsumerGroups) {
+ self.consumer_groups = consumer_groups;
+ }
+
pub fn created_at(&self) -> IggyTimestamp {
self.created_at
}