This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c173fccb fix(bench): use correct topic name and partitioning in high level benchmarks (#2553)
4c173fccb is described below

commit 4c173fccbb5cccae35a7b7e7940863cb888fc2d6
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Jan 12 08:36:16 2026 +0100

    fix(bench): use correct topic name and partitioning in high level benchmarks (#2553)
---
 .../bench/src/actors/producer/client/high_level.rs | 46 ++++++++++++++--------
 1 file changed, 30 insertions(+), 16 deletions(-)

diff --git a/core/bench/src/actors/producer/client/high_level.rs b/core/bench/src/actors/producer/client/high_level.rs
index 5dd37f679..7544d2340 100644
--- a/core/bench/src/actors/producer/client/high_level.rs
+++ b/core/bench/src/actors/producer/client/high_level.rs
@@ -16,12 +16,15 @@
  * under the License.
  */
 
-use crate::actors::{
-    ApiLabel, BatchMetrics, BenchmarkInit,
-    producer::client::{
-        BenchmarkProducerClient,
-        interface::{BenchmarkProducerConfig, ProducerClient},
+use crate::{
+    actors::{
+        ApiLabel, BatchMetrics, BenchmarkInit,
+        producer::client::{
+            BenchmarkProducerClient,
+            interface::{BenchmarkProducerConfig, ProducerClient},
+        },
     },
+    utils::batch_generator::BenchmarkBatchGenerator,
 };
 use iggy::prelude::*;
 use integration::test_server::{ClientFactory, login_root};
@@ -43,27 +46,32 @@ impl HighLevelProducerClient {
         }
     }
 }
-#[allow(clippy::significant_drop_tightening)]
+
 impl ProducerClient for HighLevelProducerClient {
     async fn produce_batch(
         &mut self,
-        batch_generator: &mut crate::utils::batch_generator::BenchmarkBatchGenerator,
+        batch_generator: &mut BenchmarkBatchGenerator,
     ) -> Result<Option<BatchMetrics>, IggyError> {
-        let producer = self.producer.as_mut().expect("Producer not initialized");
-
         let batch = batch_generator.generate_owned_batch();
         if batch.messages.is_empty() {
             return Ok(None);
         }
         let message_count = u32::try_from(batch.messages.len()).unwrap();
+        let user_data_bytes = batch.user_data_bytes;
+        let total_bytes = batch.total_bytes;
+
         let before_send = Instant::now();
-        producer.send(batch.messages).await?;
+        self.producer
+            .as_mut()
+            .expect("Producer not initialized")
+            .send(batch.messages)
+            .await?;
         let latency = before_send.elapsed();
 
         Ok(Some(BatchMetrics {
             messages: message_count,
-            user_data_bytes: batch.user_data_bytes,
-            total_bytes: batch.total_bytes,
+            user_data_bytes,
+            total_bytes,
             latency,
         }))
     }
@@ -71,18 +79,25 @@ impl ProducerClient for HighLevelProducerClient {
 
 impl BenchmarkInit for HighLevelProducerClient {
     async fn setup(&mut self) -> Result<(), IggyError> {
-        let topic_id: u32 = 1;
+        let topic_id_str = "topic-1";
+        let default_partition_id = 0u32;
 
         let client = self.client_factory.create_client().await;
         let client = IggyClient::create(client, None, None);
         login_root(&client).await;
 
         let stream_id_str = self.config.stream_id.clone();
-        let topic_id_str = topic_id.to_string();
+
+        let partitioning = match self.config.partitions {
+            0 => panic!("Partition count must be greater than 0"),
+            1 => Partitioning::partition_id(default_partition_id),
+            _ => Partitioning::balanced(),
+        };
 
         self.producer = Some(
             client
-                .producer(&stream_id_str, &topic_id_str)?
+                .producer(&stream_id_str, topic_id_str)?
+                .partitioning(partitioning)
                 .create_stream_if_not_exists()
                 .create_topic_if_not_exists(
                     self.config.partitions,
@@ -92,7 +107,6 @@ impl BenchmarkInit for HighLevelProducerClient {
                 )
                 .build(),
         );
-
         self.producer.as_mut().unwrap().init().await?;
         Ok(())
     }

Reply via email to