This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch create_resource_option_id_removal in repository https://gitbox.apache.org/repos/asf/iggy.git
commit d981ac214d53cad76800d0b7845392a58b99d7d7 Author: numminex <[email protected]> AuthorDate: Sun Sep 21 16:30:35 2025 +0200 extra fixes --- core/ai/mcp/src/service/mod.rs | 9 +-- core/ai/mcp/src/service/requests.rs | 8 --- core/bench/src/benchmarks/benchmark.rs | 3 +- core/bench/src/benchmarks/common.rs | 5 +- .../create_consumer_group.rs | 1 - .../src/cli/binary_streams/create_stream.rs | 2 +- core/cli/src/main.rs | 4 +- .../scenarios/consumer_group_join_scenario.rs | 44 ++++++++------ ...h_multiple_clients_polling_messages_scenario.rs | 8 +-- ...with_single_client_polling_messages_scenario.rs | 1 - core/integration/tests/server/scenarios/mod.rs | 4 +- core/tools/src/data-seeder/seeder.rs | 65 ++++++++++---------- examples/rust/src/getting-started/producer/main.rs | 70 +++++++++++++++------- examples/rust/src/multi-tenant/producer/main.rs | 2 +- examples/rust/src/shared/system.rs | 3 +- 15 files changed, 122 insertions(+), 107 deletions(-) diff --git a/core/ai/mcp/src/service/mod.rs b/core/ai/mcp/src/service/mod.rs index 24d28c0c..166e839d 100644 --- a/core/ai/mcp/src/service/mod.rs +++ b/core/ai/mcp/src/service/mod.rs @@ -79,10 +79,10 @@ impl IggyService { #[tool(description = "Create stream")] pub async fn create_stream( &self, - Parameters(CreateStream { name, stream_id }): Parameters<CreateStream>, + Parameters(CreateStream { name }): Parameters<CreateStream>, ) -> Result<CallToolResult, ErrorData> { self.permissions.ensure_create()?; - request(self.client.create_stream(&name, stream_id).await) + request(self.client.create_stream(&name).await) } #[tool(description = "Update stream")] @@ -146,7 +146,6 @@ impl IggyService { partitions_count, compression_algorithm, replication_factor, - topic_id, message_expiry, max_size, }): Parameters<CreateTopic>, @@ -167,7 +166,6 @@ impl IggyService { partitions_count, compression_algorithm, replication_factor, - topic_id, message_expiry, max_size, ) @@ -501,13 +499,12 @@ impl IggyService { stream_id, topic_id, name, - group_id, }): Parameters<CreateConsumerGroup>, ) -> Result<CallToolResult, ErrorData> { self.permissions.ensure_create()?; request( self.client - .create_consumer_group(&id(&stream_id)?, &id(&topic_id)?, &name, group_id) + .create_consumer_group(&id(&stream_id)?, &id(&topic_id)?, &name) .await, ) } diff --git a/core/ai/mcp/src/service/requests.rs b/core/ai/mcp/src/service/requests.rs index 1a8aea5c..f4ee7039 100644 --- a/core/ai/mcp/src/service/requests.rs +++ b/core/ai/mcp/src/service/requests.rs @@ -32,8 +32,6 @@ pub struct GetStream { pub struct CreateStream { #[schemars(description = "stream name (required, must be unique)")] pub name: String, - #[schemars(description = "stream identifier (numeric, optional)")] - pub stream_id: Option<u32>, } #[derive(Debug, Deserialize, JsonSchema)] @@ -88,9 +86,6 @@ pub struct CreateTopic { #[schemars(description = "replication factor (optional, must be greater than 0)")] pub replication_factor: Option<u8>, - #[schemars(description = "topic identifier (numeric, optional)")] - pub topic_id: Option<u32>, - #[schemars(description = "message expiry (optional)")] pub message_expiry: Option<String>, @@ -282,9 +277,6 @@ pub struct CreateConsumerGroup { #[schemars(description = "consumer group name (required, must be unique)")] pub name: String, - - #[schemars(description = "consumer group identifier (optional, number)")] - pub group_id: Option<u32>, } #[derive(Debug, Deserialize, JsonSchema)] diff --git a/core/bench/src/benchmarks/benchmark.rs b/core/bench/src/benchmarks/benchmark.rs index 77179033..37e65b78 100644 --- a/core/bench/src/benchmarks/benchmark.rs +++ b/core/bench/src/benchmarks/benchmark.rs @@ -106,7 +106,7 @@ pub trait Benchmarkable: Send { let stream_id: Identifier = stream_name.as_str().try_into()?; if streams.iter().all(|s| s.name != stream_name) { info!("Creating the test stream '{}'", stream_name); - client.create_stream(&stream_name, None).await?; + client.create_stream(&stream_name).await?; let topic_name = "topic-1".to_string(); let max_topic_size = self .args() @@ -125,7 +125,6 @@ pub trait Benchmarkable: Send { partitions_count, CompressionAlgorithm::default(), None, - None, IggyExpiry::NeverExpire, max_topic_size, ) diff --git a/core/bench/src/benchmarks/common.rs b/core/bench/src/benchmarks/common.rs index d9d19991..6b588646 100644 --- a/core/bench/src/benchmarks/common.rs +++ b/core/bench/src/benchmarks/common.rs @@ -84,15 +84,14 @@ pub async fn init_consumer_groups( let topic_id: Identifier = "topic-1".try_into()?; let consumer_group_name = format!("{CONSUMER_GROUP_NAME_PREFIX}-{consumer_group_id}"); info!( - "Creating test consumer group: name={}, id={}, stream={}, topic={}", - consumer_group_name, consumer_group_id, stream_name, topic_id + "Creating test consumer group: name={}, stream={}, topic={}", + consumer_group_name, stream_name, topic_id ); match client .create_consumer_group( &stream_id, &topic_id, &consumer_group_name, - Some(consumer_group_id), ) .await { diff --git a/core/binary_protocol/src/cli/binary_consumer_groups/create_consumer_group.rs b/core/binary_protocol/src/cli/binary_consumer_groups/create_consumer_group.rs index ed90c5b9..34e2106a 100644 --- a/core/binary_protocol/src/cli/binary_consumer_groups/create_consumer_group.rs +++ b/core/binary_protocol/src/cli/binary_consumer_groups/create_consumer_group.rs @@ -33,7 +33,6 @@ impl CreateConsumerGroupCmd { stream_id: Identifier, topic_id: Identifier, name: String, - _group_id: Option<u32>, ) -> Self { Self { create_consumer_group: CreateConsumerGroup { diff --git a/core/binary_protocol/src/cli/binary_streams/create_stream.rs b/core/binary_protocol/src/cli/binary_streams/create_stream.rs index ed7472c1..9fe750f8 100644 --- a/core/binary_protocol/src/cli/binary_streams/create_stream.rs +++ b/core/binary_protocol/src/cli/binary_streams/create_stream.rs @@ -28,7 +28,7 @@ pub struct CreateStreamCmd { } impl CreateStreamCmd { - pub fn new(_stream_id: Option<u32>, name: String) -> Self { + pub fn new(name: String) -> Self { Self { create_stream: CreateStream { name }, } diff --git a/core/cli/src/main.rs b/core/cli/src/main.rs index d21536bb..8451b0d6 100644 --- a/core/cli/src/main.rs +++ b/core/cli/src/main.rs @@ -107,7 +107,7 @@ fn get_command( match command { Command::Stream(command) => match command { StreamAction::Create(args) => { - Box::new(CreateStreamCmd::new(args.stream_id, args.name.clone())) + Box::new(CreateStreamCmd::new(args.name.clone())) } StreamAction::Delete(args) => Box::new(DeleteStreamCmd::new(args.stream_id.clone())), StreamAction::Update(args) => Box::new(UpdateStreamCmd::new( @@ -121,7 +121,6 @@ fn get_command( Command::Topic(command) => match command { TopicAction::Create(args) => Box::new(CreateTopicCmd::new( args.stream_id.clone(), - args.topic_id, args.partitions_count, args.compression_algorithm, args.name.clone(), @@ -252,7 +251,6 @@ fn get_command( create_args.stream_id.clone(), create_args.topic_id.clone(), create_args.name.clone(), - create_args.group_id, )), ConsumerGroupAction::Delete(delete_args) => Box::new(DeleteConsumerGroupCmd::new( delete_args.stream_id.clone(), diff --git a/core/integration/tests/server/scenarios/consumer_group_join_scenario.rs b/core/integration/tests/server/scenarios/consumer_group_join_scenario.rs index 1b297228..bd2a08f2 100644 --- a/core/integration/tests/server/scenarios/consumer_group_join_scenario.rs +++ b/core/integration/tests/server/scenarios/consumer_group_join_scenario.rs @@ -42,37 +42,41 @@ pub async fn run(client_factory: &dyn ClientFactory) { login_root(&system_client).await; // 1. Create the stream - system_client - .create_stream(STREAM_NAME, Some(STREAM_ID)) + let stream = system_client + .create_stream(STREAM_NAME) .await .unwrap(); + let stream_id = stream.id; + // 2. Create the topic - system_client + let topic = system_client .create_topic( &Identifier::named(STREAM_NAME).unwrap(), TOPIC_NAME, PARTITIONS_COUNT, CompressionAlgorithm::default(), None, - Some(TOPIC_ID), IggyExpiry::NeverExpire, MaxTopicSize::ServerDefault, ) .await .unwrap(); + let topic_id = topic.id; + // 3. Create the consumer group - system_client + let consumer_group = system_client .create_consumer_group( &Identifier::named(STREAM_NAME).unwrap(), &Identifier::named(TOPIC_NAME).unwrap(), CONSUMER_GROUP_NAME, - Some(CONSUMER_GROUP_ID), ) .await .unwrap(); + let consumer_group_id = consumer_group.id; + // 4. Create the users for all clients create_user(&system_client, USERNAME_1).await; create_user(&system_client, USERNAME_2).await; @@ -87,10 +91,10 @@ pub async fn run(client_factory: &dyn ClientFactory) { join_consumer_group(&client1).await; // 5. Get client1 info and validate that it contains the single consumer group - let client1_info = get_me_and_validate_consumer_groups(&client1).await; + let client1_info = get_me_and_validate_consumer_groups(&client1, stream_id, topic_id, consumer_group_id).await; // 6. Validate that the consumer group has 1 member and this member has all partitions assigned - let consumer_group = get_consumer_group_and_validate_members(&system_client, 1).await; + let consumer_group = get_consumer_group_and_validate_members(&system_client, 1, consumer_group_id).await; let member = &consumer_group.members[0]; assert_eq!(member.id, client1_info.client_id); assert_eq!(member.partitions_count, PARTITIONS_COUNT); @@ -100,10 +104,10 @@ pub async fn run(client_factory: &dyn ClientFactory) { join_consumer_group(&client2).await; // 8. Validate that client 2 contains the single consumer group - get_me_and_validate_consumer_groups(&client2).await; + get_me_and_validate_consumer_groups(&client2, stream_id, topic_id, consumer_group_id).await; // 9. Validate that the consumer group has 2 members and partitions are distributed between them - let consumer_group = get_consumer_group_and_validate_members(&system_client, 2).await; + let consumer_group = get_consumer_group_and_validate_members(&system_client, 2, consumer_group_id).await; let member1 = &consumer_group.members[0]; let member2 = &consumer_group.members[1]; assert!(member1.partitions_count >= 1 && member1.partitions_count < PARTITIONS_COUNT); @@ -117,10 +121,10 @@ pub async fn run(client_factory: &dyn ClientFactory) { join_consumer_group(&client3).await; // 11. Validate that client 3 contains the single consumer group - get_me_and_validate_consumer_groups(&client3).await; + get_me_and_validate_consumer_groups(&client3, stream_id, topic_id, consumer_group_id).await; // 12. Validate that the consumer group has 3 members and partitions are equally distributed between them - let consumer_group = get_consumer_group_and_validate_members(&system_client, 3).await; + let consumer_group = get_consumer_group_and_validate_members(&system_client, 3, consumer_group_id).await; let member1 = &consumer_group.members[0]; let member2 = &consumer_group.members[1]; let member3 = &consumer_group.members[2]; @@ -135,7 +139,12 @@ pub async fn run(client_factory: &dyn ClientFactory) { assert_clean_system(&system_client).await; } -async fn get_me_and_validate_consumer_groups(client: &IggyClient) -> ClientInfoDetails { +async fn get_me_and_validate_consumer_groups( + client: &IggyClient, + stream_id: u32, + topic_id: u32, + consumer_group_id: u32 +) -> ClientInfoDetails { let client_info = client.get_me().await.unwrap(); assert!(client_info.client_id > 0); @@ -143,9 +152,9 @@ async fn get_me_and_validate_consumer_groups(client: &IggyClient) -> ClientInfoD assert_eq!(client_info.consumer_groups.len(), 1); let consumer_group = &client_info.consumer_groups[0]; - assert_eq!(consumer_group.stream_id, STREAM_ID); - assert_eq!(consumer_group.topic_id, TOPIC_ID); - assert_eq!(consumer_group.group_id, CONSUMER_GROUP_ID); + assert_eq!(consumer_group.stream_id, stream_id); + assert_eq!(consumer_group.topic_id, topic_id); + assert_eq!(consumer_group.group_id, consumer_group_id); client_info } @@ -153,6 +162,7 @@ async fn get_me_and_validate_consumer_groups(client: &IggyClient) -> ClientInfoD async fn get_consumer_group_and_validate_members( client: &IggyClient, members_count: u32, + consumer_group_id: u32, ) -> ConsumerGroupDetails { let consumer_group = client .get_consumer_group( @@ -164,7 +174,7 @@ async fn get_consumer_group_and_validate_members( .unwrap() .expect("Failed to get consumer group"); - assert_eq!(consumer_group.id, CONSUMER_GROUP_ID); + assert_eq!(consumer_group.id, consumer_group_id); assert_eq!(consumer_group.name, CONSUMER_GROUP_NAME); assert_eq!(consumer_group.partitions_count, PARTITIONS_COUNT); assert_eq!(consumer_group.members_count, members_count); diff --git a/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs b/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs index 1d3bc543..f1e42c15 100644 --- a/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs +++ b/core/integration/tests/server/scenarios/consumer_group_with_multiple_clients_polling_messages_scenario.rs @@ -51,7 +51,7 @@ async fn init_system( ) { // 1. Create the stream system_client - .create_stream(STREAM_NAME, Some(STREAM_ID)) + .create_stream(STREAM_NAME) .await .unwrap(); @@ -63,7 +63,6 @@ async fn init_system( PARTITIONS_COUNT, CompressionAlgorithm::default(), None, - Some(TOPIC_ID), IggyExpiry::NeverExpire, MaxTopicSize::ServerDefault, ) @@ -76,7 +75,6 @@ async fn init_system( &Identifier::named(STREAM_NAME).unwrap(), &Identifier::named(TOPIC_NAME).unwrap(), CONSUMER_GROUP_NAME, - Some(CONSUMER_GROUP_ID), ) .await .unwrap(); @@ -136,7 +134,7 @@ async fn execute_using_messages_key_key( } async fn poll_messages(client: &IggyClient) -> u32 { - let consumer = Consumer::group(Identifier::numeric(CONSUMER_GROUP_ID).unwrap()); + let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap()); let mut total_read_messages_count = 0; for _ in 1..=PARTITIONS_COUNT * MESSAGES_COUNT { let polled_messages = client @@ -202,7 +200,7 @@ async fn execute_using_none_key( } async fn validate_message_polling(client: &IggyClient, consumer_group: &ConsumerGroupDetails) { - let consumer = Consumer::group(Identifier::numeric(CONSUMER_GROUP_ID).unwrap()); + let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap()); let client_info = client.get_me().await.unwrap(); let consumer_group_member = consumer_group .members diff --git a/core/integration/tests/server/scenarios/consumer_group_with_single_client_polling_messages_scenario.rs b/core/integration/tests/server/scenarios/consumer_group_with_single_client_polling_messages_scenario.rs index 4933a7dc..ff3b0aff 100644 --- a/core/integration/tests/server/scenarios/consumer_group_with_single_client_polling_messages_scenario.rs +++ b/core/integration/tests/server/scenarios/consumer_group_with_single_client_polling_messages_scenario.rs @@ -78,7 +78,6 @@ async fn init_system(client: &IggyClient) { let consumer_group_info = get_consumer_group(client).await; let client_info = client.get_me().await.unwrap(); - assert_eq!(consumer_group_info.id, CONSUMER_GROUP_ID); assert_eq!(consumer_group_info.members_count, 1); assert_eq!(consumer_group_info.members.len(), 1); diff --git a/core/integration/tests/server/scenarios/mod.rs b/core/integration/tests/server/scenarios/mod.rs index 50029a8d..c63cc212 100644 --- a/core/integration/tests/server/scenarios/mod.rs +++ b/core/integration/tests/server/scenarios/mod.rs @@ -66,7 +66,7 @@ async fn join_consumer_group(client: &IggyClient) { client .join_consumer_group( &Identifier::named(STREAM_NAME).unwrap(), - &Identifier::numeric(TOPIC_ID).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), &Identifier::named(CONSUMER_GROUP_NAME).unwrap(), ) .await @@ -77,7 +77,7 @@ async fn leave_consumer_group(client: &IggyClient) { client .leave_consumer_group( &Identifier::named(STREAM_NAME).unwrap(), - &Identifier::numeric(TOPIC_ID).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), &Identifier::named(CONSUMER_GROUP_NAME).unwrap(), ) .await diff --git a/core/tools/src/data-seeder/seeder.rs b/core/tools/src/data-seeder/seeder.rs index 8691b534..ec536317 100644 --- a/core/tools/src/data-seeder/seeder.rs +++ b/core/tools/src/data-seeder/seeder.rs @@ -21,36 +21,41 @@ use rand::Rng; use std::collections::HashMap; use std::str::FromStr; -const PROD_STREAM_ID: u32 = 1; -const TEST_STREAM_ID: u32 = 2; -const DEV_STREAM_ID: u32 = 3; +const PROD_STREAM_NAME: &str = "prod"; +const TEST_STREAM_NAME: &str = "test"; +const DEV_STREAM_NAME: &str = "dev"; pub async fn seed(client: &IggyClient) -> Result<(), IggyError> { - create_streams(client).await?; - create_topics(client).await?; - send_messages(client).await?; + let streams = create_streams(client).await?; + create_topics(client, &streams).await?; + send_messages(client, &streams).await?; Ok(()) } -async fn create_streams(client: &IggyClient) -> Result<(), IggyError> { - client.create_stream("prod", Some(PROD_STREAM_ID)).await?; - client.create_stream("test", Some(TEST_STREAM_ID)).await?; - client.create_stream("dev", Some(DEV_STREAM_ID)).await?; - Ok(()) +async fn create_streams(client: &IggyClient) -> Result<Vec<(String, u32)>, IggyError> { + let mut streams = Vec::new(); + + let prod_stream = client.create_stream(PROD_STREAM_NAME).await?; + streams.push((PROD_STREAM_NAME.to_string(), prod_stream.id)); + + let test_stream = client.create_stream(TEST_STREAM_NAME).await?; + streams.push((TEST_STREAM_NAME.to_string(), test_stream.id)); + + let dev_stream = client.create_stream(DEV_STREAM_NAME).await?; + streams.push((DEV_STREAM_NAME.to_string(), dev_stream.id)); + + Ok(streams) } -async fn create_topics(client: &IggyClient) -> Result<(), IggyError> { - let streams = [PROD_STREAM_ID, TEST_STREAM_ID, DEV_STREAM_ID]; - for stream_id in streams { - let stream_id = stream_id.try_into()?; +async fn create_topics(client: &IggyClient, streams: &[(String, u32)]) -> Result<(), IggyError> { + for (stream_name, _stream_id) in streams { client .create_topic( - &stream_id, + &Identifier::named(stream_name).unwrap(), "orders", 1, Default::default(), None, - None, IggyExpiry::NeverExpire, MaxTopicSize::ServerDefault, ) @@ -58,12 +63,11 @@ async fn create_topics(client: &IggyClient) -> Result<(), IggyError> { client .create_topic( - &stream_id, + &Identifier::named(stream_name).unwrap(), "users", 2, Default::default(), None, - None, IggyExpiry::NeverExpire, MaxTopicSize::ServerDefault, ) @@ -71,12 +75,11 @@ async fn create_topics(client: &IggyClient) -> Result<(), IggyError> { client .create_topic( - &stream_id, + &Identifier::named(stream_name).unwrap(), "notifications", 3, Default::default(), None, - None, IggyExpiry::NeverExpire, MaxTopicSize::ServerDefault, ) @@ -84,12 +87,11 @@ async fn create_topics(client: &IggyClient) -> Result<(), IggyError> { client .create_topic( - &stream_id, + &Identifier::named(stream_name).unwrap(), "payments", 2, Default::default(), None, - None, IggyExpiry::NeverExpire, MaxTopicSize::ServerDefault, ) @@ -97,12 +99,11 @@ async fn create_topics(client: &IggyClient) -> Result<(), IggyError> { client .create_topic( - &stream_id, + &Identifier::named(stream_name).unwrap(), "deliveries", 1, Default::default(), None, - None, IggyExpiry::NeverExpire, MaxTopicSize::ServerDefault, ) @@ -111,9 +112,8 @@ async fn create_topics(client: &IggyClient) -> Result<(), IggyError> { Ok(()) } -async fn send_messages(client: &IggyClient) -> Result<(), IggyError> { +async fn send_messages(client: &IggyClient, streams: &[(String, u32)]) -> Result<(), IggyError> { let mut rng = rand::rng(); - let streams = [PROD_STREAM_ID, TEST_STREAM_ID, DEV_STREAM_ID]; let partitioning = Partitioning::balanced(); let message_batches_range = 100..=1000; @@ -122,18 +122,19 @@ async fn send_messages(client: &IggyClient) -> Result<(), IggyError> { let mut total_messages_sent = 0; let mut total_batches_sent = 0; - for (stream_idx, stream_id) in streams.iter().enumerate() { - let stream_id_identifier = (*stream_id).try_into()?; + for (stream_idx, (stream_name, stream_id)) in streams.iter().enumerate() { + let stream_id_identifier = Identifier::named(stream_name).unwrap(); let topics = client.get_topics(&stream_id_identifier).await?; tracing::info!( - "Processing stream {} ({}/{})", + "Processing stream {} ({}) ({}/{})", + stream_name, stream_id, stream_idx + 1, streams.len() ); for (topic_idx, topic) in topics.iter().enumerate() { - let topic_id = topic.id.try_into()?; + let topic_id_identifier = Identifier::named(&topic.name).unwrap(); let message_batches = rng.random_range(message_batches_range.clone()); tracing::info!( @@ -183,7 +184,7 @@ async fn send_messages(client: &IggyClient) -> Result<(), IggyError> { client .send_messages( &stream_id_identifier, - &topic_id, + &topic_id_identifier, &partitioning, &mut messages, ) diff --git a/examples/rust/src/getting-started/producer/main.rs b/examples/rust/src/getting-started/producer/main.rs index 60e8a1f4..6114bc74 100644 --- a/examples/rust/src/getting-started/producer/main.rs +++ b/examples/rust/src/getting-started/producer/main.rs @@ -25,9 +25,9 @@ use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::{EnvFilter, Registry}; -const STREAM_ID: u32 = 1; -const TOPIC_ID: u32 = 1; -const PARTITION_ID: u32 = 1; +const STREAM_NAME: &str = "sample-stream"; +const TOPIC_NAME: &str = "sample-topic"; +const PARTITION_ID: u32 = 0; const BATCHES_LIMIT: u32 = 5; #[tokio::main] @@ -50,41 +50,65 @@ async fn main() -> Result<(), Box<dyn Error>> { client .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) .await?; - init_system(&client).await; - produce_messages(&client).await + let (stream_id, topic_id) = init_system(&client).await; + produce_messages(&client, stream_id, topic_id).await } -async fn init_system(client: &IggyClient) { - match client.create_stream("sample-stream", Some(STREAM_ID)).await { - Ok(_) => info!("Stream was created."), - Err(_) => warn!("Stream already exists and will not be created again."), - } +async fn init_system(client: &IggyClient) -> (u32, u32) { + let stream = match client.create_stream(STREAM_NAME).await { + Ok(stream) => { + info!("Stream was created."); + stream + } + Err(_) => { + warn!("Stream already exists and will not be created again."); + client.get_stream(&Identifier::named(STREAM_NAME).unwrap()) + .await + .unwrap() + .expect("Failed to get stream") + } + }; - match client + let topic = match client .create_topic( - &STREAM_ID.try_into().unwrap(), - "sample-topic", + &Identifier::named(STREAM_NAME).unwrap(), + TOPIC_NAME, 1, CompressionAlgorithm::default(), None, - Some(TOPIC_ID), IggyExpiry::NeverExpire, MaxTopicSize::ServerDefault, ) .await { - Ok(_) => info!("Topic was created."), - Err(_) => warn!("Topic already exists and will not be created again."), - } + Ok(topic) => { + info!("Topic was created."); + topic + } + Err(_) => { + warn!("Topic already exists and will not be created again."); + client.get_topic( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + ) + .await + .unwrap() + .expect("Failed to get topic") + } + }; + + (stream.id, topic.id) } -async fn produce_messages(client: &dyn Client) -> Result<(), Box<dyn Error>> { +async fn produce_messages(client: &dyn Client, stream_id: u32, topic_id: u32) -> Result<(), Box<dyn Error>> { let duration = IggyDuration::from_str("500ms")?; let mut interval = tokio::time::interval(duration.get_duration()); info!( - "Messages will be sent to stream: {}, topic: {}, partition: {} with interval {}.", - STREAM_ID, - TOPIC_ID, + "Messages will be sent to stream: {} ({}), topic: {} ({}), partition: {} with interval {}.", + STREAM_NAME, + stream_id, + TOPIC_NAME, + topic_id, PARTITION_ID, duration.as_human_time_string() ); @@ -109,8 +133,8 @@ async fn produce_messages(client: &dyn Client) -> Result<(), Box<dyn Error>> { } client .send_messages( - &STREAM_ID.try_into().unwrap(), - &TOPIC_ID.try_into().unwrap(), + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), &partitioning, &mut messages, ) diff --git a/examples/rust/src/multi-tenant/producer/main.rs b/examples/rust/src/multi-tenant/producer/main.rs index 2a9682de..ec46086c 100644 --- a/examples/rust/src/multi-tenant/producer/main.rs +++ b/examples/rust/src/multi-tenant/producer/main.rs @@ -327,7 +327,7 @@ async fn create_stream_and_user( username: &str, client: &IggyClient, ) -> Result<(), IggyError> { - let stream = client.create_stream(stream_name, None).await?; + let stream = client.create_stream(stream_name).await?; info!("Created stream: {stream_name} with ID: {}", stream.id); let mut streams_permissions = AHashMap::new(); streams_permissions.insert( diff --git a/examples/rust/src/shared/system.rs b/examples/rust/src/shared/system.rs index 30aa382b..8c53722d 100644 --- a/examples/rust/src/shared/system.rs +++ b/examples/rust/src/shared/system.rs @@ -82,7 +82,7 @@ pub async fn init_by_producer(args: &Args, client: &dyn Client) -> Result<(), Ig } info!("Stream does not exist, creating..."); - client.create_stream(&args.stream_id, None).await?; + client.create_stream(&args.stream_id).await?; client .create_topic( &stream_id, @@ -90,7 +90,6 @@ pub async fn init_by_producer(args: &Args, client: &dyn Client) -> Result<(), Ig args.partitions_count, CompressionAlgorithm::from_code(args.compression_algorithm)?, None, - None, IggyExpiry::NeverExpire, MaxTopicSize::ServerDefault, )
