This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 4c173fccb fix(bench): use correct topic name and partitioning in high
level benchmarks (#2553)
4c173fccb is described below
commit 4c173fccbb5cccae35a7b7e7940863cb888fc2d6
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Jan 12 08:36:16 2026 +0100
fix(bench): use correct topic name and partitioning in high level
benchmarks (#2553)
---
.../bench/src/actors/producer/client/high_level.rs | 46 ++++++++++++++--------
1 file changed, 30 insertions(+), 16 deletions(-)
diff --git a/core/bench/src/actors/producer/client/high_level.rs
b/core/bench/src/actors/producer/client/high_level.rs
index 5dd37f679..7544d2340 100644
--- a/core/bench/src/actors/producer/client/high_level.rs
+++ b/core/bench/src/actors/producer/client/high_level.rs
@@ -16,12 +16,15 @@
* under the License.
*/
-use crate::actors::{
- ApiLabel, BatchMetrics, BenchmarkInit,
- producer::client::{
- BenchmarkProducerClient,
- interface::{BenchmarkProducerConfig, ProducerClient},
+use crate::{
+ actors::{
+ ApiLabel, BatchMetrics, BenchmarkInit,
+ producer::client::{
+ BenchmarkProducerClient,
+ interface::{BenchmarkProducerConfig, ProducerClient},
+ },
},
+ utils::batch_generator::BenchmarkBatchGenerator,
};
use iggy::prelude::*;
use integration::test_server::{ClientFactory, login_root};
@@ -43,27 +46,32 @@ impl HighLevelProducerClient {
}
}
}
-#[allow(clippy::significant_drop_tightening)]
+
impl ProducerClient for HighLevelProducerClient {
async fn produce_batch(
&mut self,
- batch_generator: &mut
crate::utils::batch_generator::BenchmarkBatchGenerator,
+ batch_generator: &mut BenchmarkBatchGenerator,
) -> Result<Option<BatchMetrics>, IggyError> {
- let producer = self.producer.as_mut().expect("Producer not
initialized");
-
let batch = batch_generator.generate_owned_batch();
if batch.messages.is_empty() {
return Ok(None);
}
let message_count = u32::try_from(batch.messages.len()).unwrap();
+ let user_data_bytes = batch.user_data_bytes;
+ let total_bytes = batch.total_bytes;
+
let before_send = Instant::now();
- producer.send(batch.messages).await?;
+ self.producer
+ .as_mut()
+ .expect("Producer not initialized")
+ .send(batch.messages)
+ .await?;
let latency = before_send.elapsed();
Ok(Some(BatchMetrics {
messages: message_count,
- user_data_bytes: batch.user_data_bytes,
- total_bytes: batch.total_bytes,
+ user_data_bytes,
+ total_bytes,
latency,
}))
}
@@ -71,18 +79,25 @@ impl ProducerClient for HighLevelProducerClient {
impl BenchmarkInit for HighLevelProducerClient {
async fn setup(&mut self) -> Result<(), IggyError> {
- let topic_id: u32 = 1;
+ let topic_id_str = "topic-1";
+ let default_partition_id = 0u32;
let client = self.client_factory.create_client().await;
let client = IggyClient::create(client, None, None);
login_root(&client).await;
let stream_id_str = self.config.stream_id.clone();
- let topic_id_str = topic_id.to_string();
+
+ let partitioning = match self.config.partitions {
+ 0 => panic!("Partition count must be greater than 0"),
+ 1 => Partitioning::partition_id(default_partition_id),
+ _ => Partitioning::balanced(),
+ };
self.producer = Some(
client
- .producer(&stream_id_str, &topic_id_str)?
+ .producer(&stream_id_str, topic_id_str)?
+ .partitioning(partitioning)
.create_stream_if_not_exists()
.create_topic_if_not_exists(
self.config.partitions,
@@ -92,7 +107,6 @@ impl BenchmarkInit for HighLevelProducerClient {
)
.build(),
);
-
self.producer.as_mut().unwrap().init().await?;
Ok(())
}