This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch zero-copy-no-batching
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 745899dbbe53a8a162d79e17a77160aece496754
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Sun Mar 16 18:15:25 2025 +0100

    feat(bench): improve benchmark by parallelizing actors (#1645)
    
    This PR improves our benchmark suite, by parallelizing benchmark actors
    using `JoinSet` rather than `select`
---
 Cargo.lock                                         |  3 +-
 bench/Cargo.toml                                   |  3 +-
 bench/src/actors/consumer.rs                       |  2 +-
 bench/src/actors/producer.rs                       |  2 +-
 bench/src/actors/producing_consumer.rs             |  2 +-
 bench/src/benchmarks/benchmark.rs                  | 13 +++---
 bench/src/benchmarks/consumer_benchmark.rs         | 18 +++++---
 bench/src/benchmarks/consumer_group_benchmark.rs   | 19 +++++---
 .../benchmarks/producer_and_consumer_benchmark.rs  | 21 +++++----
 .../producer_and_consumer_group_benchmark.rs       | 23 +++++-----
 bench/src/benchmarks/producer_benchmark.rs         | 17 +++++---
 .../src/benchmarks/producing_consumer_benchmark.rs | 18 +++++---
 .../producing_consumer_group_benchmark.rs          | 16 ++++---
 bench/src/runner.rs                                | 12 ++---
 sdk/src/models/messaging/message_view.rs           |  4 --
 server/src/streaming/partitions/messages.rs        |  3 +-
 server/src/streaming/segments/segment.rs           | 51 ++++++++++------------
 17 files changed, 119 insertions(+), 108 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 6cee216b..78bbc4a3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -607,7 +607,7 @@ dependencies = [
 
 [[package]]
 name = "bench"
-version = "0.2.3"
+version = "0.2.4"
 dependencies = [
  "async-trait",
  "atomic-time",
@@ -616,7 +616,6 @@ dependencies = [
  "chrono",
  "clap",
  "figlet-rs",
- "futures",
  "hostname",
  "human-repr",
  "iggy",
diff --git a/bench/Cargo.toml b/bench/Cargo.toml
index ff919c28..ea88d38e 100644
--- a/bench/Cargo.toml
+++ b/bench/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "bench"
-version = "0.2.3"
+version = "0.2.4"
 edition = "2021"
 license = "Apache-2.0"
 # Due to dependency to integration, which has a dependency to server, setting
@@ -14,7 +14,6 @@ charming = "0.4.0"
 chrono = "0.4.39"
 clap = { version = "4.5.30", features = ["derive"] }
 figlet-rs = "0.1.5"
-futures = "0.3.31"
 hostname = "0.4.0"
 human-repr = "1.1.0"
 iggy = { path = "../sdk" }
diff --git a/bench/src/actors/consumer.rs b/bench/src/actors/consumer.rs
index 0f5860fb..e919067a 100644
--- a/bench/src/actors/consumer.rs
+++ b/bench/src/actors/consumer.rs
@@ -75,7 +75,7 @@ impl Consumer {
         }
     }
 
-    pub async fn run(&self) -> Result<BenchmarkIndividualMetrics, IggyError> {
+    pub async fn run(self) -> Result<BenchmarkIndividualMetrics, IggyError> {
         let topic_id: u32 = 1;
         let default_partition_id: u32 = 1;
         let message_batches = self.message_batches as u64;
diff --git a/bench/src/actors/producer.rs b/bench/src/actors/producer.rs
index 4cca4713..affbd457 100644
--- a/bench/src/actors/producer.rs
+++ b/bench/src/actors/producer.rs
@@ -71,7 +71,7 @@ impl Producer {
         }
     }
 
-    pub async fn run(&self) -> Result<BenchmarkIndividualMetrics, IggyError> {
+    pub async fn run(self) -> Result<BenchmarkIndividualMetrics, IggyError> {
         let topic_id: u32 = 1;
         let default_partition_id: u32 = 1;
         let partitions_count = self.partitions_count;
diff --git a/bench/src/actors/producing_consumer.rs 
b/bench/src/actors/producing_consumer.rs
index e0cd55a8..0517c2a3 100644
--- a/bench/src/actors/producing_consumer.rs
+++ b/bench/src/actors/producing_consumer.rs
@@ -82,7 +82,7 @@ impl ProducingConsumer {
         }
     }
 
-    pub async fn run(&self) -> Result<BenchmarkIndividualMetrics, IggyError> {
+    pub async fn run(self) -> Result<BenchmarkIndividualMetrics, IggyError> {
         let topic_id: u32 = 1;
         let default_partition_id: u32 = 1;
         let message_batches = self.message_batches;
diff --git a/bench/src/benchmarks/benchmark.rs 
b/bench/src/benchmarks/benchmark.rs
index be87d3c8..cc2f8fd4 100644
--- a/bench/src/benchmarks/benchmark.rs
+++ b/bench/src/benchmarks/benchmark.rs
@@ -1,7 +1,6 @@
 use crate::args::kind::BenchmarkKindCommand;
 use crate::{args::common::IggyBenchArgs, 
utils::client_factory::create_client_factory};
 use async_trait::async_trait;
-use futures::Future;
 use iggy::client::{StreamClient, TopicClient};
 use iggy::clients::client::IggyClient;
 use iggy::compression::compression_algorithm::CompressionAlgorithm;
@@ -11,7 +10,8 @@ use iggy::utils::topic_size::MaxTopicSize;
 use iggy_bench_report::benchmark_kind::BenchmarkKind;
 use iggy_bench_report::individual_metrics::BenchmarkIndividualMetrics;
 use integration::test_server::{login_root, ClientFactory};
-use std::{pin::Pin, sync::Arc};
+use std::sync::Arc;
+use tokio::task::JoinSet;
 use tracing::info;
 
 use super::consumer_benchmark::ConsumerBenchmark;
@@ -22,11 +22,6 @@ use super::producer_benchmark::ProducerBenchmark;
 use super::producing_consumer_benchmark::EndToEndProducingConsumerBenchmark;
 use 
super::producing_consumer_group_benchmark::EndToEndProducingConsumerGroupBenchmark;
 
-pub type BenchmarkFutures = Result<
-    Vec<Pin<Box<dyn Future<Output = Result<BenchmarkIndividualMetrics, 
IggyError>> + Send>>>,
-    IggyError,
->;
-
 impl From<IggyBenchArgs> for Box<dyn Benchmarkable> {
     fn from(args: IggyBenchArgs) -> Self {
         let client_factory = create_client_factory(&args);
@@ -63,7 +58,9 @@ impl From<IggyBenchArgs> for Box<dyn Benchmarkable> {
 
 #[async_trait]
 pub trait Benchmarkable {
-    async fn run(&mut self) -> BenchmarkFutures;
+    async fn run(
+        &mut self,
+    ) -> Result<JoinSet<Result<BenchmarkIndividualMetrics, IggyError>>, 
IggyError>;
     fn kind(&self) -> BenchmarkKind;
     fn args(&self) -> &IggyBenchArgs;
     fn client_factory(&self) -> &Arc<dyn ClientFactory>;
diff --git a/bench/src/benchmarks/consumer_benchmark.rs 
b/bench/src/benchmarks/consumer_benchmark.rs
index 33c92521..c5fffdd6 100644
--- a/bench/src/benchmarks/consumer_benchmark.rs
+++ b/bench/src/benchmarks/consumer_benchmark.rs
@@ -1,13 +1,16 @@
 use crate::actors::consumer::Consumer;
 use crate::args::common::IggyBenchArgs;
-use crate::benchmarks::benchmark::{BenchmarkFutures, Benchmarkable};
+use crate::benchmarks::benchmark::Benchmarkable;
 use crate::rate_limiter::RateLimiter;
 use async_trait::async_trait;
+use iggy::error::IggyError;
 use iggy::messages::PollingKind;
 use iggy_bench_report::benchmark_kind::BenchmarkKind;
+use iggy_bench_report::individual_metrics::BenchmarkIndividualMetrics;
 use integration::test_server::ClientFactory;
 use std::sync::atomic::AtomicI64;
 use std::sync::Arc;
+use tokio::task::JoinSet;
 use tracing::info;
 
 pub struct ConsumerBenchmark {
@@ -26,14 +29,16 @@ impl ConsumerBenchmark {
 
 #[async_trait]
 impl Benchmarkable for ConsumerBenchmark {
-    async fn run(&mut self) -> BenchmarkFutures {
+    async fn run(
+        &mut self,
+    ) -> Result<JoinSet<Result<BenchmarkIndividualMetrics, IggyError>>, 
IggyError> {
         self.check_streams().await?;
         let consumers_count = self.args.consumers();
         info!("Creating {} consumer(s)...", consumers_count);
         let messages_per_batch = self.args.messages_per_batch();
         let message_batches = self.args.message_batches();
 
-        let mut futures: BenchmarkFutures = 
Ok(Vec::with_capacity(consumers_count as usize));
+        let mut set = JoinSet::new();
         for consumer_id in 1..=consumers_count {
             let args = self.args.clone();
             let client_factory = self.client_factory.clone();
@@ -76,12 +81,11 @@ impl Benchmarkable for ConsumerBenchmark {
                 args.rate_limit()
                     .map(|rl| RateLimiter::new(rl.as_bytes_u64())),
             );
-
-            let future = Box::pin(async move { consumer.run().await });
-            futures.as_mut().unwrap().push(future);
+            set.spawn(consumer.run());
         }
+
         info!("Created {} consumer(s).", consumers_count);
-        futures
+        Ok(set)
     }
 
     fn kind(&self) -> BenchmarkKind {
diff --git a/bench/src/benchmarks/consumer_group_benchmark.rs 
b/bench/src/benchmarks/consumer_group_benchmark.rs
index 697fd9b6..42195f56 100644
--- a/bench/src/benchmarks/consumer_group_benchmark.rs
+++ b/bench/src/benchmarks/consumer_group_benchmark.rs
@@ -1,4 +1,4 @@
-use super::benchmark::{BenchmarkFutures, Benchmarkable};
+use super::benchmark::Benchmarkable;
 use crate::{
     actors::consumer::Consumer,
     args::common::IggyBenchArgs,
@@ -10,9 +10,12 @@ use iggy::{
     client::ConsumerGroupClient, clients::client::IggyClient, error::IggyError,
     messages::PollingKind,
 };
-use iggy_bench_report::benchmark_kind::BenchmarkKind;
+use iggy_bench_report::{
+    benchmark_kind::BenchmarkKind, 
individual_metrics::BenchmarkIndividualMetrics,
+};
 use integration::test_server::{login_root, ClientFactory};
 use std::sync::{atomic::AtomicI64, Arc};
+use tokio::task::JoinSet;
 use tracing::{error, info};
 
 pub struct ConsumerGroupBenchmark {
@@ -69,7 +72,9 @@ impl ConsumerGroupBenchmark {
 
 #[async_trait]
 impl Benchmarkable for ConsumerGroupBenchmark {
-    async fn run(&mut self) -> BenchmarkFutures {
+    async fn run(
+        &mut self,
+    ) -> Result<JoinSet<Result<BenchmarkIndividualMetrics, IggyError>>, 
IggyError> {
         self.check_streams().await?;
         let consumer_groups_count = self.args.number_of_consumer_groups();
         self.init_consumer_groups(consumer_groups_count)
@@ -81,11 +86,11 @@ impl Benchmarkable for ConsumerGroupBenchmark {
         let consumers = self.args.consumers();
         let messages_per_batch = self.args.messages_per_batch();
         let warmup_time = self.args.warmup_time();
-        let mut futures: BenchmarkFutures = Ok(Vec::with_capacity((consumers) 
as usize));
         let polling_kind = PollingKind::Next;
         let message_batches = self.args.message_batches();
         let total_message_batches = Arc::new(AtomicI64::new((message_batches * 
consumers) as i64));
 
+        let mut set = JoinSet::new();
         for consumer_id in 1..=consumers {
             let consumer_group_id =
                 start_consumer_group_id + 1 + (consumer_id % 
consumer_groups_count);
@@ -110,14 +115,14 @@ impl Benchmarkable for ConsumerGroupBenchmark {
                     .rate_limit()
                     .map(|rl| RateLimiter::new(rl.as_bytes_u64())),
             );
-            let future = Box::pin(async move { consumer.run().await });
-            futures.as_mut().unwrap().push(future);
+            set.spawn(consumer.run());
         }
+
         info!(
             "Starting consumer group benchmark with {} messages",
             self.total_messages()
         );
-        futures
+        Ok(set)
     }
 
     fn kind(&self) -> BenchmarkKind {
diff --git a/bench/src/benchmarks/producer_and_consumer_benchmark.rs 
b/bench/src/benchmarks/producer_and_consumer_benchmark.rs
index 1c4a3fd8..6bfe6bee 100644
--- a/bench/src/benchmarks/producer_and_consumer_benchmark.rs
+++ b/bench/src/benchmarks/producer_and_consumer_benchmark.rs
@@ -1,14 +1,17 @@
 use crate::actors::consumer::Consumer;
 use crate::actors::producer::Producer;
 use crate::args::common::IggyBenchArgs;
-use crate::benchmarks::benchmark::{BenchmarkFutures, Benchmarkable};
+use crate::benchmarks::benchmark::Benchmarkable;
 use crate::rate_limiter::RateLimiter;
 use async_trait::async_trait;
+use iggy::error::IggyError;
 use iggy::messages::PollingKind;
 use iggy_bench_report::benchmark_kind::BenchmarkKind;
+use iggy_bench_report::individual_metrics::BenchmarkIndividualMetrics;
 use integration::test_server::ClientFactory;
 use std::sync::atomic::AtomicI64;
 use std::sync::Arc;
+use tokio::task::JoinSet;
 use tracing::info;
 
 pub struct ProducerAndConsumerBenchmark {
@@ -27,7 +30,9 @@ impl ProducerAndConsumerBenchmark {
 
 #[async_trait]
 impl Benchmarkable for ProducerAndConsumerBenchmark {
-    async fn run(&mut self) -> BenchmarkFutures {
+    async fn run(
+        &mut self,
+    ) -> Result<JoinSet<Result<BenchmarkIndividualMetrics, IggyError>>, 
IggyError> {
         self.init_streams().await.expect("Failed to init streams!");
         let start_stream_id = self.args.start_stream_id();
         let producers = self.args.producers();
@@ -38,8 +43,7 @@ impl Benchmarkable for ProducerAndConsumerBenchmark {
         let partitions_count = self.args.number_of_partitions();
         let warmup_time = self.args.warmup_time();
 
-        let mut futures: BenchmarkFutures =
-            Ok(Vec::with_capacity((producers + consumers) as usize));
+        let mut set = JoinSet::new();
         for producer_id in 1..=producers {
             let stream_id = start_stream_id + producer_id;
             let producer = Producer::new(
@@ -59,8 +63,7 @@ impl Benchmarkable for ProducerAndConsumerBenchmark {
                     .map(|rl| RateLimiter::new(rl.as_bytes_u64())),
                 false, // TODO: put timestamp into first message, it should be 
an argument to iggy-bench
             );
-            let future = Box::pin(async move { producer.run().await });
-            futures.as_mut().unwrap().push(future);
+            set.spawn(producer.run());
         }
 
         let polling_kind = PollingKind::Offset;
@@ -85,14 +88,14 @@ impl Benchmarkable for ProducerAndConsumerBenchmark {
                     .rate_limit()
                     .map(|rl| RateLimiter::new(rl.as_bytes_u64())),
             );
-            let future = Box::pin(async move { consumer.run().await });
-            futures.as_mut().unwrap().push(future);
+            set.spawn(consumer.run());
         }
+
         info!(
             "Starting to send and poll {} messages",
             self.total_messages()
         );
-        futures
+        Ok(set)
     }
 
     fn kind(&self) -> BenchmarkKind {
diff --git a/bench/src/benchmarks/producer_and_consumer_group_benchmark.rs 
b/bench/src/benchmarks/producer_and_consumer_group_benchmark.rs
index 00f46eca..7fee789b 100644
--- a/bench/src/benchmarks/producer_and_consumer_group_benchmark.rs
+++ b/bench/src/benchmarks/producer_and_consumer_group_benchmark.rs
@@ -1,4 +1,4 @@
-use super::benchmark::{BenchmarkFutures, Benchmarkable};
+use super::benchmark::Benchmarkable;
 use crate::{
     actors::{consumer::Consumer, producer::Producer},
     args::common::IggyBenchArgs,
@@ -8,9 +8,12 @@ use crate::{
 use async_trait::async_trait;
 use iggy::messages::PollingKind;
 use iggy::{client::ConsumerGroupClient, clients::client::IggyClient, 
error::IggyError};
-use iggy_bench_report::benchmark_kind::BenchmarkKind;
+use iggy_bench_report::{
+    benchmark_kind::BenchmarkKind, 
individual_metrics::BenchmarkIndividualMetrics,
+};
 use integration::test_server::{login_root, ClientFactory};
 use std::sync::{atomic::AtomicI64, Arc};
+use tokio::task::JoinSet;
 use tracing::{error, info};
 
 pub struct ProducerAndConsumerGroupBenchmark {
@@ -67,7 +70,9 @@ impl ProducerAndConsumerGroupBenchmark {
 
 #[async_trait]
 impl Benchmarkable for ProducerAndConsumerGroupBenchmark {
-    async fn run(&mut self) -> BenchmarkFutures {
+    async fn run(
+        &mut self,
+    ) -> Result<JoinSet<Result<BenchmarkIndividualMetrics, IggyError>>, 
IggyError> {
         self.init_streams().await.expect("Failed to init streams!");
         let consumer_groups_count = self.args.number_of_consumer_groups();
         self.init_consumer_groups(consumer_groups_count)
@@ -96,9 +101,7 @@ impl Benchmarkable for ProducerAndConsumerGroupBenchmark {
             message_batches * producers,
         );
 
-        let mut futures: BenchmarkFutures =
-            Ok(Vec::with_capacity((producers + consumers) as usize));
-
+        let mut set = JoinSet::new();
         for producer_id in 1..=producers {
             info!("Executing the benchmark on producer #{}...", producer_id);
             let stream_id = self.args.start_stream_id() + 1 + (producer_id % 
streams_number);
@@ -120,8 +123,7 @@ impl Benchmarkable for ProducerAndConsumerGroupBenchmark {
                     .map(|rl| RateLimiter::new(rl.as_bytes_u64())),
                 false, // TODO: Put latency into payload of first message, it 
should be an argument to iggy-bench
             );
-            let future = Box::pin(async move { producer.run().await });
-            futures.as_mut().unwrap().push(future);
+            set.spawn(producer.run());
         }
         info!("Created {} producer(s).", producers);
 
@@ -148,15 +150,14 @@ impl Benchmarkable for ProducerAndConsumerGroupBenchmark {
                     .rate_limit()
                     .map(|rl| RateLimiter::new(rl.as_bytes_u64())),
             );
-            let future = Box::pin(async move { consumer.run().await });
-            futures.as_mut().unwrap().push(future);
+            set.spawn(consumer.run());
         }
 
         info!(
             "Starting to send and poll {} messages",
             self.total_messages()
         );
-        futures
+        Ok(set)
     }
 
     fn kind(&self) -> BenchmarkKind {
diff --git a/bench/src/benchmarks/producer_benchmark.rs 
b/bench/src/benchmarks/producer_benchmark.rs
index 0e9379e0..7453508b 100644
--- a/bench/src/benchmarks/producer_benchmark.rs
+++ b/bench/src/benchmarks/producer_benchmark.rs
@@ -1,11 +1,14 @@
 use crate::actors::producer::Producer;
 use crate::args::common::IggyBenchArgs;
-use crate::benchmarks::benchmark::{BenchmarkFutures, Benchmarkable};
+use crate::benchmarks::benchmark::Benchmarkable;
 use crate::rate_limiter::RateLimiter;
 use async_trait::async_trait;
+use iggy::error::IggyError;
 use iggy_bench_report::benchmark_kind::BenchmarkKind;
+use iggy_bench_report::individual_metrics::BenchmarkIndividualMetrics;
 use integration::test_server::ClientFactory;
 use std::sync::Arc;
+use tokio::task::JoinSet;
 use tracing::info;
 
 pub struct ProducerBenchmark {
@@ -24,7 +27,9 @@ impl ProducerBenchmark {
 
 #[async_trait]
 impl Benchmarkable for ProducerBenchmark {
-    async fn run(&mut self) -> BenchmarkFutures {
+    async fn run(
+        &mut self,
+    ) -> Result<JoinSet<Result<BenchmarkIndividualMetrics, IggyError>>, 
IggyError> {
         self.init_streams().await.expect("Failed to init streams!");
         let producers_count = self.args.producers();
         info!("Creating {} producer(s)...", producers_count);
@@ -35,7 +40,7 @@ impl Benchmarkable for ProducerBenchmark {
         let partitions_count = self.args.number_of_partitions();
         let warmup_time = self.args.warmup_time();
 
-        let mut futures: BenchmarkFutures = 
Ok(Vec::with_capacity(producers_count as usize));
+        let mut set = JoinSet::new();
         for producer_id in 1..=producers_count {
             let args = self.args.clone();
             let client_factory = self.client_factory.clone();
@@ -58,11 +63,11 @@ impl Benchmarkable for ProducerBenchmark {
                     .map(|rl| RateLimiter::new(rl.as_bytes_u64())),
                 false, // TODO: Put timestamp in payload of first message, it 
should be an argument to iggy-bench
             );
-            let future = Box::pin(async move { producer.run().await });
-            futures.as_mut().unwrap().push(future);
+            set.spawn(producer.run());
         }
+
         info!("Created {} producer(s).", producers_count);
-        futures
+        Ok(set)
     }
 
     fn kind(&self) -> BenchmarkKind {
diff --git a/bench/src/benchmarks/producing_consumer_benchmark.rs 
b/bench/src/benchmarks/producing_consumer_benchmark.rs
index 8909636f..4692280b 100644
--- a/bench/src/benchmarks/producing_consumer_benchmark.rs
+++ b/bench/src/benchmarks/producing_consumer_benchmark.rs
@@ -1,13 +1,16 @@
 use crate::actors::producing_consumer::ProducingConsumer;
 use crate::args::common::IggyBenchArgs;
-use crate::benchmarks::benchmark::{BenchmarkFutures, Benchmarkable};
+use crate::benchmarks::benchmark::Benchmarkable;
 use crate::rate_limiter::RateLimiter;
 use async_trait::async_trait;
+use iggy::error::IggyError;
 use iggy::messages::PollingKind;
 use iggy_bench_report::benchmark_kind::BenchmarkKind;
+use iggy_bench_report::individual_metrics::BenchmarkIndividualMetrics;
 use integration::test_server::ClientFactory;
 use std::sync::atomic::AtomicI64;
 use std::sync::Arc;
+use tokio::task::JoinSet;
 use tracing::info;
 
 pub struct EndToEndProducingConsumerBenchmark {
@@ -26,7 +29,9 @@ impl EndToEndProducingConsumerBenchmark {
 
 #[async_trait]
 impl Benchmarkable for EndToEndProducingConsumerBenchmark {
-    async fn run(&mut self) -> BenchmarkFutures {
+    async fn run(
+        &mut self,
+    ) -> Result<JoinSet<Result<BenchmarkIndividualMetrics, IggyError>>, 
IggyError> {
         self.init_streams().await.expect("Failed to init streams!");
         let actors_count = self.args.producers();
         info!("Creating {} producing consumer(s)...", actors_count);
@@ -36,7 +41,8 @@ impl Benchmarkable for EndToEndProducingConsumerBenchmark {
         let message_size = self.args.message_size();
         let warmup_time = self.args.warmup_time();
         let polling_kind = PollingKind::Offset;
-        let mut futures: BenchmarkFutures = Ok(Vec::with_capacity(actors_count 
as usize));
+
+        let mut set = JoinSet::new();
         for actor_id in 1..=actors_count {
             let args = self.args.clone();
             let client_factory = self.client_factory.clone();
@@ -65,11 +71,11 @@ impl Benchmarkable for EndToEndProducingConsumerBenchmark {
                 polling_kind,
                 false,
             );
-            let future = Box::pin(async move { actor.run().await });
-            futures.as_mut().unwrap().push(future);
+            set.spawn(actor.run());
         }
+
         info!("Created {} producing consumer(s).", actors_count);
-        futures
+        Ok(set)
     }
 
     fn kind(&self) -> BenchmarkKind {
diff --git a/bench/src/benchmarks/producing_consumer_group_benchmark.rs 
b/bench/src/benchmarks/producing_consumer_group_benchmark.rs
index 81b8f52b..b5d57c7e 100644
--- a/bench/src/benchmarks/producing_consumer_group_benchmark.rs
+++ b/bench/src/benchmarks/producing_consumer_group_benchmark.rs
@@ -1,6 +1,6 @@
 use crate::actors::producing_consumer::ProducingConsumer;
 use crate::args::common::IggyBenchArgs;
-use crate::benchmarks::benchmark::{BenchmarkFutures, Benchmarkable};
+use crate::benchmarks::benchmark::Benchmarkable;
 use crate::benchmarks::{CONSUMER_GROUP_BASE_ID, CONSUMER_GROUP_NAME_PREFIX};
 use crate::rate_limiter::RateLimiter;
 use async_trait::async_trait;
@@ -9,9 +9,11 @@ use iggy::clients::client::IggyClient;
 use iggy::error::IggyError;
 use iggy::messages::PollingKind;
 use iggy_bench_report::benchmark_kind::BenchmarkKind;
+use iggy_bench_report::individual_metrics::BenchmarkIndividualMetrics;
 use integration::test_server::{login_root, ClientFactory};
 use std::sync::atomic::AtomicI64;
 use std::sync::Arc;
+use tokio::task::JoinSet;
 use tracing::{error, info};
 
 pub struct EndToEndProducingConsumerGroupBenchmark {
@@ -68,7 +70,9 @@ impl EndToEndProducingConsumerGroupBenchmark {
 
 #[async_trait]
 impl Benchmarkable for EndToEndProducingConsumerGroupBenchmark {
-    async fn run(&mut self) -> BenchmarkFutures {
+    async fn run(
+        &mut self,
+    ) -> Result<JoinSet<Result<BenchmarkIndividualMetrics, IggyError>>, 
IggyError> {
         self.init_streams().await.expect("Failed to init streams!");
         let consumer_groups_count = self.args.number_of_consumer_groups();
         self.init_consumer_groups(consumer_groups_count)
@@ -87,11 +91,11 @@ impl Benchmarkable for 
EndToEndProducingConsumerGroupBenchmark {
         let partitions_count = self.args.number_of_partitions();
         let warmup_time = self.args.warmup_time();
         let polling_kind = PollingKind::Next;
-        let mut futures: BenchmarkFutures = Ok(Vec::with_capacity(actors_count 
as usize));
         let consumer_groups_count = self.args.number_of_consumer_groups();
         let total_message_batches =
             Arc::new(AtomicI64::new((message_batches * actors_count) as i64));
 
+        let mut set = JoinSet::new();
         for actor_id in 1..=actors_count {
             let args = self.args.clone();
             let client_factory = self.client_factory.clone();
@@ -122,11 +126,11 @@ impl Benchmarkable for 
EndToEndProducingConsumerGroupBenchmark {
                 polling_kind,
                 false,
             );
-            let future = Box::pin(async move { actor.run().await });
-            futures.as_mut().unwrap().push(future);
+            set.spawn(actor.run());
         }
+
         info!("Created {actors_count} producing consumer(s) which would be 
part of {consumer_groups_count} consumer groups");
-        futures
+        Ok(set)
     }
 
     fn kind(&self) -> BenchmarkKind {
diff --git a/bench/src/runner.rs b/bench/src/runner.rs
index 71cbfb68..60732720 100644
--- a/bench/src/runner.rs
+++ b/bench/src/runner.rs
@@ -5,7 +5,6 @@ use crate::plot::{plot_chart, ChartType};
 use crate::utils::collect_server_logs_and_save_to_file;
 use crate::utils::cpu_name::append_cpu_name_lowercase;
 use crate::utils::server_starter::start_server_if_needed;
-use futures::future::select_all;
 use iggy::error::IggyError;
 use iggy_bench_report::hardware::BenchmarkHardware;
 use iggy_bench_report::params::BenchmarkParams;
@@ -42,18 +41,15 @@ impl BenchmarkRunner {
 
         let mut individual_metrics = Vec::new();
 
-        while !join_handles.is_empty() {
-            let (result, _index, remaining) = select_all(join_handles).await;
-            join_handles = remaining;
-
-            match result {
-                Ok(r) => individual_metrics.push(r),
+        while let Some(individual_metric) = join_handles.join_next().await {
+            let individual_metric = individual_metric.expect("Failed to join 
actor!");
+            match individual_metric {
+                Ok(individual_metric) => 
individual_metrics.push(individual_metric),
                 Err(e) => return Err(e),
             }
         }
 
         info!("All actors joined!");
-
         let hardware =
             
BenchmarkHardware::get_system_info_with_identifier(benchmark.args().identifier());
         let params = BenchmarkParams::from(benchmark.args());
diff --git a/sdk/src/models/messaging/message_view.rs 
b/sdk/src/models/messaging/message_view.rs
index ba51d1f4..050c86a2 100644
--- a/sdk/src/models/messaging/message_view.rs
+++ b/sdk/src/models/messaging/message_view.rs
@@ -1,10 +1,6 @@
 use super::message_header::*;
 use super::message_header_view::IggyMessageHeaderView;
 use crate::error::IggyError;
-use crate::models::messaging::header::HeaderKey;
-use crate::models::messaging::header::HeaderValue;
-use crate::prelude::BytesSerializable;
-use std::collections::HashMap;
 use std::iter::Iterator;
 
 /// A immutable view of a message.
diff --git a/server/src/streaming/partitions/messages.rs 
b/server/src/streaming/partitions/messages.rs
index 5b307ee4..90ee2013 100644
--- a/server/src/streaming/partitions/messages.rs
+++ b/server/src/streaming/partitions/messages.rs
@@ -113,8 +113,7 @@ impl Partition {
                 consumer_offset.offset,
                 self.partition_id
             );
-            //TODO: Fix me
-            //return Ok(Vec::new());
+            return Ok(IggyBatch::empty());
         }
 
         let offset = consumer_offset.offset + 1;
diff --git a/server/src/streaming/segments/segment.rs 
b/server/src/streaming/segments/segment.rs
index 2e22a841..181ff907 100644
--- a/server/src/streaming/segments/segment.rs
+++ b/server/src/streaming/segments/segment.rs
@@ -235,33 +235,30 @@ impl Segment {
     }
 
     pub async fn is_expired(&self, now: IggyTimestamp) -> bool {
-        //TODO: Fix me
-        return false;
-        /*
-        if !self.is_closed {
-            return false;
-        }
-
-        match self.message_expiry {
-            IggyExpiry::NeverExpire => false,
-            IggyExpiry::ServerDefault => false,
-            IggyExpiry::ExpireDuration(expiry) => {
-                let last_messages = 
self.get_messages_by_offset(self.current_offset, 1).await;
-                if last_messages.is_err() {
-                    return false;
-                }
-
-                let last_messages = last_messages.unwrap();
-                if last_messages.is_empty() {
-                    return false;
-                }
-
-                let last_message = &last_messages[0];
-                let last_message_timestamp = last_message.timestamp;
-                last_message_timestamp + expiry.as_micros() <= now.as_micros()
-            }
-        }
-        */
+        false
+        // if !self.is_closed {
+        //     return false;
+        // }
+
+        // match self.message_expiry {
+        //     IggyExpiry::NeverExpire => false,
+        //     IggyExpiry::ServerDefault => false,
+        //     IggyExpiry::ExpireDuration(expiry) => {
+        //         let last_messages = 
self.get_messages_by_offset(self.end_offset, 1).await;
+        //         if last_messages.is_err() {
+        //             return false;
+        //         }
+
+        //         let last_messages = last_messages.unwrap();
+        //         if last_messages.is_empty() {
+        //             return false;
+        //         }
+
+        //         let last_message = last_messages.iter().last().unwrap();
+        //         let last_message_timestamp = last_message.timestamp;
+        //         last_message_timestamp + expiry.as_micros() <= 
now.as_micros()
+        //     }
+        // }
     }
 
     pub async fn shutdown_reading(&mut self) {

Reply via email to