This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch create_resource_option_id_removal
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit d981ac214d53cad76800d0b7845392a58b99d7d7
Author: numminex <[email protected]>
AuthorDate: Sun Sep 21 16:30:35 2025 +0200

    extra fixes
---
 core/ai/mcp/src/service/mod.rs                     |  9 +--
 core/ai/mcp/src/service/requests.rs                |  8 ---
 core/bench/src/benchmarks/benchmark.rs             |  3 +-
 core/bench/src/benchmarks/common.rs                |  5 +-
 .../create_consumer_group.rs                       |  1 -
 .../src/cli/binary_streams/create_stream.rs        |  2 +-
 core/cli/src/main.rs                               |  4 +-
 .../scenarios/consumer_group_join_scenario.rs      | 44 ++++++++------
 ...h_multiple_clients_polling_messages_scenario.rs |  8 +--
 ...with_single_client_polling_messages_scenario.rs |  1 -
 core/integration/tests/server/scenarios/mod.rs     |  4 +-
 core/tools/src/data-seeder/seeder.rs               | 65 ++++++++++----------
 examples/rust/src/getting-started/producer/main.rs | 70 +++++++++++++++-------
 examples/rust/src/multi-tenant/producer/main.rs    |  2 +-
 examples/rust/src/shared/system.rs                 |  3 +-
 15 files changed, 122 insertions(+), 107 deletions(-)

diff --git a/core/ai/mcp/src/service/mod.rs b/core/ai/mcp/src/service/mod.rs
index 24d28c0c..166e839d 100644
--- a/core/ai/mcp/src/service/mod.rs
+++ b/core/ai/mcp/src/service/mod.rs
@@ -79,10 +79,10 @@ impl IggyService {
     #[tool(description = "Create stream")]
     pub async fn create_stream(
         &self,
-        Parameters(CreateStream { name, stream_id }): Parameters<CreateStream>,
+        Parameters(CreateStream { name }): Parameters<CreateStream>,
     ) -> Result<CallToolResult, ErrorData> {
         self.permissions.ensure_create()?;
-        request(self.client.create_stream(&name, stream_id).await)
+        request(self.client.create_stream(&name).await)
     }
 
     #[tool(description = "Update stream")]
@@ -146,7 +146,6 @@ impl IggyService {
             partitions_count,
             compression_algorithm,
             replication_factor,
-            topic_id,
             message_expiry,
             max_size,
         }): Parameters<CreateTopic>,
@@ -167,7 +166,6 @@ impl IggyService {
                     partitions_count,
                     compression_algorithm,
                     replication_factor,
-                    topic_id,
                     message_expiry,
                     max_size,
                 )
@@ -501,13 +499,12 @@ impl IggyService {
             stream_id,
             topic_id,
             name,
-            group_id,
         }): Parameters<CreateConsumerGroup>,
     ) -> Result<CallToolResult, ErrorData> {
         self.permissions.ensure_create()?;
         request(
             self.client
-                .create_consumer_group(&id(&stream_id)?, &id(&topic_id)?, &name, group_id)
+                .create_consumer_group(&id(&stream_id)?, &id(&topic_id)?, &name)
                 .await,
         )
     }
diff --git a/core/ai/mcp/src/service/requests.rs b/core/ai/mcp/src/service/requests.rs
index 1a8aea5c..f4ee7039 100644
--- a/core/ai/mcp/src/service/requests.rs
+++ b/core/ai/mcp/src/service/requests.rs
@@ -32,8 +32,6 @@ pub struct GetStream {
 pub struct CreateStream {
     #[schemars(description = "stream name (required, must be unique)")]
     pub name: String,
-    #[schemars(description = "stream identifier (numeric, optional)")]
-    pub stream_id: Option<u32>,
 }
 
 #[derive(Debug, Deserialize, JsonSchema)]
@@ -88,9 +86,6 @@ pub struct CreateTopic {
     #[schemars(description = "replication factor (optional, must be greater than 0)")]
     pub replication_factor: Option<u8>,
 
-    #[schemars(description = "topic identifier (numeric, optional)")]
-    pub topic_id: Option<u32>,
-
     #[schemars(description = "message expiry (optional)")]
     pub message_expiry: Option<String>,
 
@@ -282,9 +277,6 @@ pub struct CreateConsumerGroup {
 
     #[schemars(description = "consumer group name (required, must be unique)")]
     pub name: String,
-
-    #[schemars(description = "consumer group identifier (optional, number)")]
-    pub group_id: Option<u32>,
 }
 
 #[derive(Debug, Deserialize, JsonSchema)]
diff --git a/core/bench/src/benchmarks/benchmark.rs b/core/bench/src/benchmarks/benchmark.rs
index 77179033..37e65b78 100644
--- a/core/bench/src/benchmarks/benchmark.rs
+++ b/core/bench/src/benchmarks/benchmark.rs
@@ -106,7 +106,7 @@ pub trait Benchmarkable: Send {
             let stream_id: Identifier = stream_name.as_str().try_into()?;
             if streams.iter().all(|s| s.name != stream_name) {
                 info!("Creating the test stream '{}'", stream_name);
-                client.create_stream(&stream_name, None).await?;
+                client.create_stream(&stream_name).await?;
                 let topic_name = "topic-1".to_string();
                 let max_topic_size = self
                     .args()
@@ -125,7 +125,6 @@ pub trait Benchmarkable: Send {
                         partitions_count,
                         CompressionAlgorithm::default(),
                         None,
-                        None,
                         IggyExpiry::NeverExpire,
                         max_topic_size,
                     )
diff --git a/core/bench/src/benchmarks/common.rs b/core/bench/src/benchmarks/common.rs
index d9d19991..6b588646 100644
--- a/core/bench/src/benchmarks/common.rs
+++ b/core/bench/src/benchmarks/common.rs
@@ -84,15 +84,14 @@ pub async fn init_consumer_groups(
         let topic_id: Identifier = "topic-1".try_into()?;
         let consumer_group_name = format!("{CONSUMER_GROUP_NAME_PREFIX}-{consumer_group_id}");
         info!(
-            "Creating test consumer group: name={}, id={}, stream={}, topic={}",
-            consumer_group_name, consumer_group_id, stream_name, topic_id
+            "Creating test consumer group: name={},  stream={}, topic={}",
+            consumer_group_name, stream_name, topic_id
         );
         match client
             .create_consumer_group(
                 &stream_id,
                 &topic_id,
                 &consumer_group_name,
-                Some(consumer_group_id),
             )
             .await
         {
diff --git a/core/binary_protocol/src/cli/binary_consumer_groups/create_consumer_group.rs b/core/binary_protocol/src/cli/binary_consumer_groups/create_consumer_group.rs
index ed90c5b9..34e2106a 100644
--- a/core/binary_protocol/src/cli/binary_consumer_groups/create_consumer_group.rs
+++ b/core/binary_protocol/src/cli/binary_consumer_groups/create_consumer_group.rs
@@ -33,7 +33,6 @@ impl CreateConsumerGroupCmd {
         stream_id: Identifier,
         topic_id: Identifier,
         name: String,
-        _group_id: Option<u32>,
     ) -> Self {
         Self {
             create_consumer_group: CreateConsumerGroup {
diff --git a/core/binary_protocol/src/cli/binary_streams/create_stream.rs b/core/binary_protocol/src/cli/binary_streams/create_stream.rs
index ed7472c1..9fe750f8 100644
--- a/core/binary_protocol/src/cli/binary_streams/create_stream.rs
+++ b/core/binary_protocol/src/cli/binary_streams/create_stream.rs
@@ -28,7 +28,7 @@ pub struct CreateStreamCmd {
 }
 
 impl CreateStreamCmd {
-    pub fn new(_stream_id: Option<u32>, name: String) -> Self {
+    pub fn new(name: String) -> Self {
         Self {
             create_stream: CreateStream { name },
         }
diff --git a/core/cli/src/main.rs b/core/cli/src/main.rs
index d21536bb..8451b0d6 100644
--- a/core/cli/src/main.rs
+++ b/core/cli/src/main.rs
@@ -107,7 +107,7 @@ fn get_command(
     match command {
         Command::Stream(command) => match command {
             StreamAction::Create(args) => {
-                Box::new(CreateStreamCmd::new(args.stream_id, args.name.clone()))
+                Box::new(CreateStreamCmd::new(args.name.clone()))
             }
             StreamAction::Delete(args) => Box::new(DeleteStreamCmd::new(args.stream_id.clone())),
             StreamAction::Update(args) => Box::new(UpdateStreamCmd::new(
@@ -121,7 +121,6 @@ fn get_command(
         Command::Topic(command) => match command {
             TopicAction::Create(args) => Box::new(CreateTopicCmd::new(
                 args.stream_id.clone(),
-                args.topic_id,
                 args.partitions_count,
                 args.compression_algorithm,
                 args.name.clone(),
@@ -252,7 +251,6 @@ fn get_command(
                 create_args.stream_id.clone(),
                 create_args.topic_id.clone(),
                 create_args.name.clone(),
-                create_args.group_id,
             )),
             ConsumerGroupAction::Delete(delete_args) => Box::new(DeleteConsumerGroupCmd::new(
                 delete_args.stream_id.clone(),
diff --git a/core/integration/tests/server/scenarios/consumer_group_join_scenario.rs b/core/integration/tests/server/scenarios/consumer_group_join_scenario.rs
index 1b297228..bd2a08f2 100644
--- a/core/integration/tests/server/scenarios/consumer_group_join_scenario.rs
+++ b/core/integration/tests/server/scenarios/consumer_group_join_scenario.rs
@@ -42,37 +42,41 @@ pub async fn run(client_factory: &dyn ClientFactory) {
     login_root(&system_client).await;
 
     // 1. Create the stream
-    system_client
-        .create_stream(STREAM_NAME, Some(STREAM_ID))
+    let stream = system_client
+        .create_stream(STREAM_NAME)
         .await
         .unwrap();
 
+    let stream_id = stream.id;
+
     // 2. Create the topic
-    system_client
+    let topic = system_client
         .create_topic(
             &Identifier::named(STREAM_NAME).unwrap(),
             TOPIC_NAME,
             PARTITIONS_COUNT,
             CompressionAlgorithm::default(),
             None,
-            Some(TOPIC_ID),
             IggyExpiry::NeverExpire,
             MaxTopicSize::ServerDefault,
         )
         .await
         .unwrap();
 
+    let topic_id = topic.id;
+
     // 3. Create the consumer group
-    system_client
+    let consumer_group = system_client
         .create_consumer_group(
             &Identifier::named(STREAM_NAME).unwrap(),
             &Identifier::named(TOPIC_NAME).unwrap(),
             CONSUMER_GROUP_NAME,
-            Some(CONSUMER_GROUP_ID),
         )
         .await
         .unwrap();
 
+    let consumer_group_id = consumer_group.id;
+
     // 4. Create the users for all clients
     create_user(&system_client, USERNAME_1).await;
     create_user(&system_client, USERNAME_2).await;
@@ -87,10 +91,10 @@ pub async fn run(client_factory: &dyn ClientFactory) {
     join_consumer_group(&client1).await;
 
     // 5. Get client1 info and validate that it contains the single consumer group
-    let client1_info = get_me_and_validate_consumer_groups(&client1).await;
+    let client1_info = get_me_and_validate_consumer_groups(&client1, stream_id, topic_id, consumer_group_id).await;
 
     // 6. Validate that the consumer group has 1 member and this member has all partitions assigned
-    let consumer_group = get_consumer_group_and_validate_members(&system_client, 1).await;
+    let consumer_group = get_consumer_group_and_validate_members(&system_client, 1, consumer_group_id).await;
     let member = &consumer_group.members[0];
     assert_eq!(member.id, client1_info.client_id);
     assert_eq!(member.partitions_count, PARTITIONS_COUNT);
@@ -100,10 +104,10 @@ pub async fn run(client_factory: &dyn ClientFactory) {
     join_consumer_group(&client2).await;
 
     // 8. Validate that client 2 contains the single consumer group
-    get_me_and_validate_consumer_groups(&client2).await;
+    get_me_and_validate_consumer_groups(&client2, stream_id, topic_id, consumer_group_id).await;
 
     // 9. Validate that the consumer group has 2 members and partitions are distributed between them
-    let consumer_group = get_consumer_group_and_validate_members(&system_client, 2).await;
+    let consumer_group = get_consumer_group_and_validate_members(&system_client, 2, consumer_group_id).await;
     let member1 = &consumer_group.members[0];
     let member2 = &consumer_group.members[1];
     assert!(member1.partitions_count >= 1 && member1.partitions_count < PARTITIONS_COUNT);
@@ -117,10 +121,10 @@ pub async fn run(client_factory: &dyn ClientFactory) {
     join_consumer_group(&client3).await;
 
     // 11. Validate that client 3 contains the single consumer group
-    get_me_and_validate_consumer_groups(&client3).await;
+    get_me_and_validate_consumer_groups(&client3, stream_id, topic_id, consumer_group_id).await;
 
     // 12. Validate that the consumer group has 3 members and partitions are equally distributed between them
-    let consumer_group = get_consumer_group_and_validate_members(&system_client, 3).await;
+    let consumer_group = get_consumer_group_and_validate_members(&system_client, 3, consumer_group_id).await;
     let member1 = &consumer_group.members[0];
     let member2 = &consumer_group.members[1];
     let member3 = &consumer_group.members[2];
@@ -135,7 +139,12 @@ pub async fn run(client_factory: &dyn ClientFactory) {
     assert_clean_system(&system_client).await;
 }
 
-async fn get_me_and_validate_consumer_groups(client: &IggyClient) -> ClientInfoDetails {
+async fn get_me_and_validate_consumer_groups(
+    client: &IggyClient, 
+    stream_id: u32, 
+    topic_id: u32, 
+    consumer_group_id: u32
+) -> ClientInfoDetails {
     let client_info = client.get_me().await.unwrap();
 
     assert!(client_info.client_id > 0);
@@ -143,9 +152,9 @@ async fn get_me_and_validate_consumer_groups(client: &IggyClient) -> ClientInfoD
     assert_eq!(client_info.consumer_groups.len(), 1);
 
     let consumer_group = &client_info.consumer_groups[0];
-    assert_eq!(consumer_group.stream_id, STREAM_ID);
-    assert_eq!(consumer_group.topic_id, TOPIC_ID);
-    assert_eq!(consumer_group.group_id, CONSUMER_GROUP_ID);
+    assert_eq!(consumer_group.stream_id, stream_id);
+    assert_eq!(consumer_group.topic_id, topic_id);
+    assert_eq!(consumer_group.group_id, consumer_group_id);
 
     client_info
 }
@@ -153,6 +162,7 @@ async fn get_me_and_validate_consumer_groups(client: &IggyClient) -> ClientInfoD
 async fn get_consumer_group_and_validate_members(
     client: &IggyClient,
     members_count: u32,
+    consumer_group_id: u32,
 ) -> ConsumerGroupDetails {
     let consumer_group = client
         .get_consumer_group(
@@ -164,7 +174,7 @@ async fn get_consumer_group_and_validate_members(
         .unwrap()
         .expect("Failed to get consumer group");
 
-    assert_eq!(consumer_group.id, CONSUMER_GROUP_ID);
+    assert_eq!(consumer_group.id, consumer_group_id);
     assert_eq!(consumer_group.name, CONSUMER_GROUP_NAME);
     assert_eq!(consumer_group.partitions_count, PARTITIONS_COUNT);
     assert_eq!(consumer_group.members_count, members_count);
diff --git a/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs b/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs
index 1d3bc543..f1e42c15 100644
--- a/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs
+++ b/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs
@@ -51,7 +51,7 @@ async fn init_system(
 ) {
     // 1. Create the stream
     system_client
-        .create_stream(STREAM_NAME, Some(STREAM_ID))
+        .create_stream(STREAM_NAME)
         .await
         .unwrap();
 
@@ -63,7 +63,6 @@ async fn init_system(
             PARTITIONS_COUNT,
             CompressionAlgorithm::default(),
             None,
-            Some(TOPIC_ID),
             IggyExpiry::NeverExpire,
             MaxTopicSize::ServerDefault,
         )
@@ -76,7 +75,6 @@ async fn init_system(
             &Identifier::named(STREAM_NAME).unwrap(),
             &Identifier::named(TOPIC_NAME).unwrap(),
             CONSUMER_GROUP_NAME,
-            Some(CONSUMER_GROUP_ID),
         )
         .await
         .unwrap();
@@ -136,7 +134,7 @@ async fn execute_using_messages_key_key(
 }
 
 async fn poll_messages(client: &IggyClient) -> u32 {
-    let consumer = Consumer::group(Identifier::numeric(CONSUMER_GROUP_ID).unwrap());
+    let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
     let mut total_read_messages_count = 0;
     for _ in 1..=PARTITIONS_COUNT * MESSAGES_COUNT {
         let polled_messages = client
@@ -202,7 +200,7 @@ async fn execute_using_none_key(
 }
 
 async fn validate_message_polling(client: &IggyClient, consumer_group: &ConsumerGroupDetails) {
-    let consumer = Consumer::group(Identifier::numeric(CONSUMER_GROUP_ID).unwrap());
+    let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
     let client_info = client.get_me().await.unwrap();
     let consumer_group_member = consumer_group
         .members
diff --git a/core/integration/tests/server/scenarios/consumer_group_with_single_client_polling_messages_scenario.rs b/core/integration/tests/server/scenarios/consumer_group_with_single_client_polling_messages_scenario.rs
index 4933a7dc..ff3b0aff 100644
--- a/core/integration/tests/server/scenarios/consumer_group_with_single_client_polling_messages_scenario.rs
+++ b/core/integration/tests/server/scenarios/consumer_group_with_single_client_polling_messages_scenario.rs
@@ -78,7 +78,6 @@ async fn init_system(client: &IggyClient) {
     let consumer_group_info = get_consumer_group(client).await;
 
     let client_info = client.get_me().await.unwrap();
-    assert_eq!(consumer_group_info.id, CONSUMER_GROUP_ID);
 
     assert_eq!(consumer_group_info.members_count, 1);
     assert_eq!(consumer_group_info.members.len(), 1);
diff --git a/core/integration/tests/server/scenarios/mod.rs b/core/integration/tests/server/scenarios/mod.rs
index 50029a8d..c63cc212 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -66,7 +66,7 @@ async fn join_consumer_group(client: &IggyClient) {
     client
         .join_consumer_group(
             &Identifier::named(STREAM_NAME).unwrap(),
-            &Identifier::numeric(TOPIC_ID).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
             &Identifier::named(CONSUMER_GROUP_NAME).unwrap(),
         )
         .await
@@ -77,7 +77,7 @@ async fn leave_consumer_group(client: &IggyClient) {
     client
         .leave_consumer_group(
             &Identifier::named(STREAM_NAME).unwrap(),
-            &Identifier::numeric(TOPIC_ID).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
             &Identifier::named(CONSUMER_GROUP_NAME).unwrap(),
         )
         .await
diff --git a/core/tools/src/data-seeder/seeder.rs b/core/tools/src/data-seeder/seeder.rs
index 8691b534..ec536317 100644
--- a/core/tools/src/data-seeder/seeder.rs
+++ b/core/tools/src/data-seeder/seeder.rs
@@ -21,36 +21,41 @@ use rand::Rng;
 use std::collections::HashMap;
 use std::str::FromStr;
 
-const PROD_STREAM_ID: u32 = 1;
-const TEST_STREAM_ID: u32 = 2;
-const DEV_STREAM_ID: u32 = 3;
+const PROD_STREAM_NAME: &str = "prod";
+const TEST_STREAM_NAME: &str = "test";
+const DEV_STREAM_NAME: &str = "dev";
 
 pub async fn seed(client: &IggyClient) -> Result<(), IggyError> {
-    create_streams(client).await?;
-    create_topics(client).await?;
-    send_messages(client).await?;
+    let streams = create_streams(client).await?;
+    create_topics(client, &streams).await?;
+    send_messages(client, &streams).await?;
     Ok(())
 }
 
-async fn create_streams(client: &IggyClient) -> Result<(), IggyError> {
-    client.create_stream("prod", Some(PROD_STREAM_ID)).await?;
-    client.create_stream("test", Some(TEST_STREAM_ID)).await?;
-    client.create_stream("dev", Some(DEV_STREAM_ID)).await?;
-    Ok(())
+async fn create_streams(client: &IggyClient) -> Result<Vec<(String, u32)>, IggyError> {
+    let mut streams = Vec::new();
+    
+    let prod_stream = client.create_stream(PROD_STREAM_NAME).await?;
+    streams.push((PROD_STREAM_NAME.to_string(), prod_stream.id));
+    
+    let test_stream = client.create_stream(TEST_STREAM_NAME).await?;
+    streams.push((TEST_STREAM_NAME.to_string(), test_stream.id));
+    
+    let dev_stream = client.create_stream(DEV_STREAM_NAME).await?;
+    streams.push((DEV_STREAM_NAME.to_string(), dev_stream.id));
+    
+    Ok(streams)
 }
 
-async fn create_topics(client: &IggyClient) -> Result<(), IggyError> {
-    let streams = [PROD_STREAM_ID, TEST_STREAM_ID, DEV_STREAM_ID];
-    for stream_id in streams {
-        let stream_id = stream_id.try_into()?;
+async fn create_topics(client: &IggyClient, streams: &[(String, u32)]) -> Result<(), IggyError> {
+    for (stream_name, _stream_id) in streams {
         client
             .create_topic(
-                &stream_id,
+                &Identifier::named(stream_name).unwrap(),
                 "orders",
                 1,
                 Default::default(),
                 None,
-                None,
                 IggyExpiry::NeverExpire,
                 MaxTopicSize::ServerDefault,
             )
@@ -58,12 +63,11 @@ async fn create_topics(client: &IggyClient) -> Result<(), IggyError> {
 
         client
             .create_topic(
-                &stream_id,
+                &Identifier::named(stream_name).unwrap(),
                 "users",
                 2,
                 Default::default(),
                 None,
-                None,
                 IggyExpiry::NeverExpire,
                 MaxTopicSize::ServerDefault,
             )
@@ -71,12 +75,11 @@ async fn create_topics(client: &IggyClient) -> Result<(), IggyError> {
 
         client
             .create_topic(
-                &stream_id,
+                &Identifier::named(stream_name).unwrap(),
                 "notifications",
                 3,
                 Default::default(),
                 None,
-                None,
                 IggyExpiry::NeverExpire,
                 MaxTopicSize::ServerDefault,
             )
@@ -84,12 +87,11 @@ async fn create_topics(client: &IggyClient) -> Result<(), IggyError> {
 
         client
             .create_topic(
-                &stream_id,
+                &Identifier::named(stream_name).unwrap(),
                 "payments",
                 2,
                 Default::default(),
                 None,
-                None,
                 IggyExpiry::NeverExpire,
                 MaxTopicSize::ServerDefault,
             )
@@ -97,12 +99,11 @@ async fn create_topics(client: &IggyClient) -> Result<(), IggyError> {
 
         client
             .create_topic(
-                &stream_id,
+                &Identifier::named(stream_name).unwrap(),
                 "deliveries",
                 1,
                 Default::default(),
                 None,
-                None,
                 IggyExpiry::NeverExpire,
                 MaxTopicSize::ServerDefault,
             )
@@ -111,9 +112,8 @@ async fn create_topics(client: &IggyClient) -> Result<(), IggyError> {
     Ok(())
 }
 
-async fn send_messages(client: &IggyClient) -> Result<(), IggyError> {
+async fn send_messages(client: &IggyClient, streams: &[(String, u32)]) -> Result<(), IggyError> {
     let mut rng = rand::rng();
-    let streams = [PROD_STREAM_ID, TEST_STREAM_ID, DEV_STREAM_ID];
     let partitioning = Partitioning::balanced();
 
     let message_batches_range = 100..=1000;
@@ -122,18 +122,19 @@ async fn send_messages(client: &IggyClient) -> Result<(), IggyError> {
     let mut total_messages_sent = 0;
     let mut total_batches_sent = 0;
 
-    for (stream_idx, stream_id) in streams.iter().enumerate() {
-        let stream_id_identifier = (*stream_id).try_into()?;
+    for (stream_idx, (stream_name, stream_id)) in streams.iter().enumerate() {
+        let stream_id_identifier = Identifier::named(stream_name).unwrap();
         let topics = client.get_topics(&stream_id_identifier).await?;
         tracing::info!(
-            "Processing stream {} ({}/{})",
+            "Processing stream {} ({}) ({}/{})",
+            stream_name,
             stream_id,
             stream_idx + 1,
             streams.len()
         );
 
         for (topic_idx, topic) in topics.iter().enumerate() {
-            let topic_id = topic.id.try_into()?;
+            let topic_id_identifier = Identifier::named(&topic.name).unwrap();
 
             let message_batches = rng.random_range(message_batches_range.clone());
             tracing::info!(
@@ -183,7 +184,7 @@ async fn send_messages(client: &IggyClient) -> Result<(), IggyError> {
                 client
                     .send_messages(
                         &stream_id_identifier,
-                        &topic_id,
+                        &topic_id_identifier,
                         &partitioning,
                         &mut messages,
                     )
diff --git a/examples/rust/src/getting-started/producer/main.rs b/examples/rust/src/getting-started/producer/main.rs
index 60e8a1f4..6114bc74 100644
--- a/examples/rust/src/getting-started/producer/main.rs
+++ b/examples/rust/src/getting-started/producer/main.rs
@@ -25,9 +25,9 @@ use tracing_subscriber::layer::SubscriberExt;
 use tracing_subscriber::util::SubscriberInitExt;
 use tracing_subscriber::{EnvFilter, Registry};
 
-const STREAM_ID: u32 = 1;
-const TOPIC_ID: u32 = 1;
-const PARTITION_ID: u32 = 1;
+const STREAM_NAME: &str = "sample-stream";
+const TOPIC_NAME: &str = "sample-topic";
+const PARTITION_ID: u32 = 0;
 const BATCHES_LIMIT: u32 = 5;
 
 #[tokio::main]
@@ -50,41 +50,65 @@ async fn main() -> Result<(), Box<dyn Error>> {
     client
         .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
         .await?;
-    init_system(&client).await;
-    produce_messages(&client).await
+    let (stream_id, topic_id) = init_system(&client).await;
+    produce_messages(&client, stream_id, topic_id).await
 }
 
-async fn init_system(client: &IggyClient) {
-    match client.create_stream("sample-stream", Some(STREAM_ID)).await {
-        Ok(_) => info!("Stream was created."),
-        Err(_) => warn!("Stream already exists and will not be created again."),
-    }
+async fn init_system(client: &IggyClient) -> (u32, u32) {
+    let stream = match client.create_stream(STREAM_NAME).await {
+        Ok(stream) => {
+            info!("Stream was created.");
+            stream
+        }
+        Err(_) => {
+            warn!("Stream already exists and will not be created again.");
+            client.get_stream(&Identifier::named(STREAM_NAME).unwrap())
+                .await
+                .unwrap()
+                .expect("Failed to get stream")
+        }
+    };
 
-    match client
+    let topic = match client
         .create_topic(
-            &STREAM_ID.try_into().unwrap(),
-            "sample-topic",
+            &Identifier::named(STREAM_NAME).unwrap(),
+            TOPIC_NAME,
             1,
             CompressionAlgorithm::default(),
             None,
-            Some(TOPIC_ID),
             IggyExpiry::NeverExpire,
             MaxTopicSize::ServerDefault,
         )
         .await
     {
-        Ok(_) => info!("Topic was created."),
-        Err(_) => warn!("Topic already exists and will not be created again."),
-    }
+        Ok(topic) => {
+            info!("Topic was created.");
+            topic
+        }
+        Err(_) => {
+            warn!("Topic already exists and will not be created again.");
+            client.get_topic(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+            )
+            .await
+            .unwrap()
+            .expect("Failed to get topic")
+        }
+    };
+
+    (stream.id, topic.id)
 }
 
-async fn produce_messages(client: &dyn Client) -> Result<(), Box<dyn Error>> {
+async fn produce_messages(client: &dyn Client, stream_id: u32, topic_id: u32) -> Result<(), Box<dyn Error>> {
     let duration = IggyDuration::from_str("500ms")?;
     let mut interval = tokio::time::interval(duration.get_duration());
     info!(
-        "Messages will be sent to stream: {}, topic: {}, partition: {} with interval {}.",
-        STREAM_ID,
-        TOPIC_ID,
+        "Messages will be sent to stream: {} ({}), topic: {} ({}), partition: {} with interval {}.",
+        STREAM_NAME,
+        stream_id,
+        TOPIC_NAME,
+        topic_id,
         PARTITION_ID,
         duration.as_human_time_string()
     );
@@ -109,8 +133,8 @@ async fn produce_messages(client: &dyn Client) -> Result<(), Box<dyn Error>> {
         }
         client
             .send_messages(
-                &STREAM_ID.try_into().unwrap(),
-                &TOPIC_ID.try_into().unwrap(),
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
                 &partitioning,
                 &mut messages,
             )
diff --git a/examples/rust/src/multi-tenant/producer/main.rs b/examples/rust/src/multi-tenant/producer/main.rs
index 2a9682de..ec46086c 100644
--- a/examples/rust/src/multi-tenant/producer/main.rs
+++ b/examples/rust/src/multi-tenant/producer/main.rs
@@ -327,7 +327,7 @@ async fn create_stream_and_user(
     username: &str,
     client: &IggyClient,
 ) -> Result<(), IggyError> {
-    let stream = client.create_stream(stream_name, None).await?;
+    let stream = client.create_stream(stream_name).await?;
     info!("Created stream: {stream_name} with ID: {}", stream.id);
     let mut streams_permissions = AHashMap::new();
     streams_permissions.insert(
diff --git a/examples/rust/src/shared/system.rs b/examples/rust/src/shared/system.rs
index 30aa382b..8c53722d 100644
--- a/examples/rust/src/shared/system.rs
+++ b/examples/rust/src/shared/system.rs
@@ -82,7 +82,7 @@ pub async fn init_by_producer(args: &Args, client: &dyn Client) -> Result<(), Ig
     }
 
     info!("Stream does not exist, creating...");
-    client.create_stream(&args.stream_id, None).await?;
+    client.create_stream(&args.stream_id).await?;
     client
         .create_topic(
             &stream_id,
@@ -90,7 +90,6 @@ pub async fn init_by_producer(args: &Args, client: &dyn Client) -> Result<(), Ig
             args.partitions_count,
             CompressionAlgorithm::from_code(args.compression_algorithm)?,
             None,
-            None,
             IggyExpiry::NeverExpire,
             MaxTopicSize::ServerDefault,
         )

Reply via email to