This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch zero-copy-no-batching in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 745899dbbe53a8a162d79e17a77160aece496754 Author: Grzegorz Koszyk <[email protected]> AuthorDate: Sun Mar 16 18:15:25 2025 +0100 feat(bench): improve benchmark by parallelizing actors (#1645) This PR improves our benchmark suite, by parallelizing benchmark actors using `JoinSet` rather than `select` --- Cargo.lock | 3 +- bench/Cargo.toml | 3 +- bench/src/actors/consumer.rs | 2 +- bench/src/actors/producer.rs | 2 +- bench/src/actors/producing_consumer.rs | 2 +- bench/src/benchmarks/benchmark.rs | 13 +++--- bench/src/benchmarks/consumer_benchmark.rs | 18 +++++--- bench/src/benchmarks/consumer_group_benchmark.rs | 19 +++++--- .../benchmarks/producer_and_consumer_benchmark.rs | 21 +++++---- .../producer_and_consumer_group_benchmark.rs | 23 +++++----- bench/src/benchmarks/producer_benchmark.rs | 17 +++++--- .../src/benchmarks/producing_consumer_benchmark.rs | 18 +++++--- .../producing_consumer_group_benchmark.rs | 16 ++++--- bench/src/runner.rs | 12 ++--- sdk/src/models/messaging/message_view.rs | 4 -- server/src/streaming/partitions/messages.rs | 3 +- server/src/streaming/segments/segment.rs | 51 ++++++++++------------ 17 files changed, 119 insertions(+), 108 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6cee216b..78bbc4a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -607,7 +607,7 @@ dependencies = [ [[package]] name = "bench" -version = "0.2.3" +version = "0.2.4" dependencies = [ "async-trait", "atomic-time", @@ -616,7 +616,6 @@ dependencies = [ "chrono", "clap", "figlet-rs", - "futures", "hostname", "human-repr", "iggy", diff --git a/bench/Cargo.toml b/bench/Cargo.toml index ff919c28..ea88d38e 100644 --- a/bench/Cargo.toml +++ b/bench/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bench" -version = "0.2.3" +version = "0.2.4" edition = "2021" license = "Apache-2.0" # Due to dependency to integration, which has a dependency to server, setting @@ -14,7 +14,6 @@ charming = "0.4.0" chrono = "0.4.39" clap = { version = "4.5.30", features = ["derive"] } figlet-rs = "0.1.5" -futures = "0.3.31" hostname = "0.4.0" human-repr = "1.1.0" iggy = { path = "../sdk" } diff --git a/bench/src/actors/consumer.rs b/bench/src/actors/consumer.rs index 0f5860fb..e919067a 100644 --- a/bench/src/actors/consumer.rs +++ b/bench/src/actors/consumer.rs @@ -75,7 +75,7 @@ impl Consumer { } } - pub async fn run(&self) -> Result<BenchmarkIndividualMetrics, IggyError> { + pub async fn run(self) -> Result<BenchmarkIndividualMetrics, IggyError> { let topic_id: u32 = 1; let default_partition_id: u32 = 1; let message_batches = self.message_batches as u64; diff --git a/bench/src/actors/producer.rs b/bench/src/actors/producer.rs index 4cca4713..affbd457 100644 --- a/bench/src/actors/producer.rs +++ b/bench/src/actors/producer.rs @@ -71,7 +71,7 @@ impl Producer { } } - pub async fn run(&self) -> Result<BenchmarkIndividualMetrics, IggyError> { + pub async fn run(self) -> Result<BenchmarkIndividualMetrics, IggyError> { let topic_id: u32 = 1; let default_partition_id: u32 = 1; let partitions_count = self.partitions_count; diff --git a/bench/src/actors/producing_consumer.rs b/bench/src/actors/producing_consumer.rs index e0cd55a8..0517c2a3 100644 --- a/bench/src/actors/producing_consumer.rs +++ b/bench/src/actors/producing_consumer.rs @@ -82,7 +82,7 @@ impl ProducingConsumer { } } - pub async fn run(&self) -> Result<BenchmarkIndividualMetrics, IggyError> { + pub async fn run(self) -> Result<BenchmarkIndividualMetrics, IggyError> { let topic_id: u32 = 1; let default_partition_id: u32 = 1; let message_batches = self.message_batches; diff --git a/bench/src/benchmarks/benchmark.rs b/bench/src/benchmarks/benchmark.rs index be87d3c8..cc2f8fd4 100644 --- a/bench/src/benchmarks/benchmark.rs +++ b/bench/src/benchmarks/benchmark.rs @@ -1,7 +1,6 @@ use crate::args::kind::BenchmarkKindCommand; use crate::{args::common::IggyBenchArgs, utils::client_factory::create_client_factory}; use async_trait::async_trait; -use futures::Future; use iggy::client::{StreamClient, TopicClient}; use iggy::clients::client::IggyClient; use iggy::compression::compression_algorithm::CompressionAlgorithm; @@ -11,7 +10,8 @@ use iggy::utils::topic_size::MaxTopicSize; use iggy_bench_report::benchmark_kind::BenchmarkKind; use iggy_bench_report::individual_metrics::BenchmarkIndividualMetrics; use integration::test_server::{login_root, ClientFactory}; -use std::{pin::Pin, sync::Arc}; +use std::sync::Arc; +use tokio::task::JoinSet; use tracing::info; use super::consumer_benchmark::ConsumerBenchmark; @@ -22,11 +22,6 @@ use super::producer_benchmark::ProducerBenchmark; use super::producing_consumer_benchmark::EndToEndProducingConsumerBenchmark; use super::producing_consumer_group_benchmark::EndToEndProducingConsumerGroupBenchmark; -pub type BenchmarkFutures = Result< - Vec<Pin<Box<dyn Future<Output = Result<BenchmarkIndividualMetrics, IggyError>> + Send>>>, - IggyError, ->; - impl From<IggyBenchArgs> for Box<dyn Benchmarkable> { fn from(args: IggyBenchArgs) -> Self { let client_factory = create_client_factory(&args); @@ -63,7 +58,9 @@ impl From<IggyBenchArgs> for Box<dyn Benchmarkable> { #[async_trait] pub trait Benchmarkable { - async fn run(&mut self) -> BenchmarkFutures; + async fn run( + &mut self, + ) -> Result<JoinSet<Result<BenchmarkIndividualMetrics, IggyError>>, IggyError>; fn kind(&self) -> BenchmarkKind; fn args(&self) -> &IggyBenchArgs; fn client_factory(&self) -> &Arc<dyn ClientFactory>; diff --git a/bench/src/benchmarks/consumer_benchmark.rs b/bench/src/benchmarks/consumer_benchmark.rs index 33c92521..c5fffdd6 100644 --- a/bench/src/benchmarks/consumer_benchmark.rs +++ b/bench/src/benchmarks/consumer_benchmark.rs @@ -1,13 +1,16 @@ use crate::actors::consumer::Consumer; use crate::args::common::IggyBenchArgs; -use crate::benchmarks::benchmark::{BenchmarkFutures, Benchmarkable}; +use crate::benchmarks::benchmark::Benchmarkable; use crate::rate_limiter::RateLimiter; use async_trait::async_trait; +use iggy::error::IggyError; use iggy::messages::PollingKind; use iggy_bench_report::benchmark_kind::BenchmarkKind; +use iggy_bench_report::individual_metrics::BenchmarkIndividualMetrics; use integration::test_server::ClientFactory; use std::sync::atomic::AtomicI64; use std::sync::Arc; +use tokio::task::JoinSet; use tracing::info; pub struct ConsumerBenchmark { @@ -26,14 +29,16 @@ impl ConsumerBenchmark { #[async_trait] impl Benchmarkable for ConsumerBenchmark { - async fn run(&mut self) -> BenchmarkFutures { + async fn run( + &mut self, + ) -> Result<JoinSet<Result<BenchmarkIndividualMetrics, IggyError>>, IggyError> { self.check_streams().await?; let consumers_count = self.args.consumers(); info!("Creating {} consumer(s)...", consumers_count); let messages_per_batch = self.args.messages_per_batch(); let message_batches = self.args.message_batches(); - let mut futures: BenchmarkFutures = Ok(Vec::with_capacity(consumers_count as usize)); + let mut set = JoinSet::new(); for consumer_id in 1..=consumers_count { let args = self.args.clone(); let client_factory = self.client_factory.clone(); @@ -76,12 +81,11 @@ impl Benchmarkable for ConsumerBenchmark { args.rate_limit() .map(|rl| RateLimiter::new(rl.as_bytes_u64())), ); - - let future = Box::pin(async move { consumer.run().await }); - futures.as_mut().unwrap().push(future); + set.spawn(consumer.run()); } + info!("Created {} consumer(s).", consumers_count); - futures + Ok(set) } fn kind(&self) -> BenchmarkKind { diff --git a/bench/src/benchmarks/consumer_group_benchmark.rs b/bench/src/benchmarks/consumer_group_benchmark.rs index 697fd9b6..42195f56 100644 --- a/bench/src/benchmarks/consumer_group_benchmark.rs +++ b/bench/src/benchmarks/consumer_group_benchmark.rs @@ -1,4 +1,4 @@ -use super::benchmark::{BenchmarkFutures, Benchmarkable}; +use super::benchmark::Benchmarkable; use crate::{ actors::consumer::Consumer, args::common::IggyBenchArgs, @@ -10,9 +10,12 @@ use iggy::{ client::ConsumerGroupClient, clients::client::IggyClient, error::IggyError, messages::PollingKind, }; -use iggy_bench_report::benchmark_kind::BenchmarkKind; +use iggy_bench_report::{ + benchmark_kind::BenchmarkKind, individual_metrics::BenchmarkIndividualMetrics, +}; use integration::test_server::{login_root, ClientFactory}; use std::sync::{atomic::AtomicI64, Arc}; +use tokio::task::JoinSet; use tracing::{error, info}; pub struct ConsumerGroupBenchmark { @@ -69,7 +72,9 @@ impl ConsumerGroupBenchmark { #[async_trait] impl Benchmarkable for ConsumerGroupBenchmark { - async fn run(&mut self) -> BenchmarkFutures { + async fn run( + &mut self, + ) -> Result<JoinSet<Result<BenchmarkIndividualMetrics, IggyError>>, IggyError> { self.check_streams().await?; let consumer_groups_count = self.args.number_of_consumer_groups(); self.init_consumer_groups(consumer_groups_count) @@ -81,11 +86,11 @@ impl Benchmarkable for ConsumerGroupBenchmark { let consumers = self.args.consumers(); let messages_per_batch = self.args.messages_per_batch(); let warmup_time = self.args.warmup_time(); - let mut futures: BenchmarkFutures = Ok(Vec::with_capacity((consumers) as usize)); let polling_kind = PollingKind::Next; let message_batches = self.args.message_batches(); let total_message_batches = Arc::new(AtomicI64::new((message_batches * consumers) as i64)); + let mut set = JoinSet::new(); for consumer_id in 1..=consumers { let consumer_group_id = start_consumer_group_id + 1 + (consumer_id % consumer_groups_count); @@ -110,14 +115,14 @@ impl Benchmarkable for ConsumerGroupBenchmark { .rate_limit() .map(|rl| RateLimiter::new(rl.as_bytes_u64())), ); - let future = Box::pin(async move { consumer.run().await }); - futures.as_mut().unwrap().push(future); + set.spawn(consumer.run()); } + info!( "Starting consumer group benchmark with {} messages", self.total_messages() ); - futures + Ok(set) } fn kind(&self) -> BenchmarkKind { diff --git a/bench/src/benchmarks/producer_and_consumer_benchmark.rs b/bench/src/benchmarks/producer_and_consumer_benchmark.rs index 1c4a3fd8..6bfe6bee 100644 --- a/bench/src/benchmarks/producer_and_consumer_benchmark.rs +++ b/bench/src/benchmarks/producer_and_consumer_benchmark.rs @@ -1,14 +1,17 @@ use crate::actors::consumer::Consumer; use crate::actors::producer::Producer; use crate::args::common::IggyBenchArgs; -use crate::benchmarks::benchmark::{BenchmarkFutures, Benchmarkable}; +use crate::benchmarks::benchmark::Benchmarkable; use crate::rate_limiter::RateLimiter; use async_trait::async_trait; +use iggy::error::IggyError; use iggy::messages::PollingKind; use iggy_bench_report::benchmark_kind::BenchmarkKind; +use iggy_bench_report::individual_metrics::BenchmarkIndividualMetrics; use integration::test_server::ClientFactory; use std::sync::atomic::AtomicI64; use std::sync::Arc; +use tokio::task::JoinSet; use tracing::info; pub struct ProducerAndConsumerBenchmark { @@ -27,7 +30,9 @@ impl ProducerAndConsumerBenchmark { #[async_trait] impl Benchmarkable for ProducerAndConsumerBenchmark { - async fn run(&mut self) -> BenchmarkFutures { + async fn run( + &mut self, + ) -> Result<JoinSet<Result<BenchmarkIndividualMetrics, IggyError>>, IggyError> { self.init_streams().await.expect("Failed to init streams!"); let start_stream_id = self.args.start_stream_id(); let producers = self.args.producers(); @@ -38,8 +43,7 @@ impl Benchmarkable for ProducerAndConsumerBenchmark { let partitions_count = self.args.number_of_partitions(); let warmup_time = self.args.warmup_time(); - let mut futures: BenchmarkFutures = - Ok(Vec::with_capacity((producers + consumers) as usize)); + let mut set = JoinSet::new(); for producer_id in 1..=producers { let stream_id = start_stream_id + producer_id; let producer = Producer::new( @@ -59,8 +63,7 @@ impl Benchmarkable for ProducerAndConsumerBenchmark { .map(|rl| RateLimiter::new(rl.as_bytes_u64())), false, // TODO: put timestamp into first message, it should be an argument to iggy-bench ); - let future = Box::pin(async move { producer.run().await }); - futures.as_mut().unwrap().push(future); + set.spawn(producer.run()); } let polling_kind = PollingKind::Offset; @@ -85,14 +88,14 @@ impl Benchmarkable for ProducerAndConsumerBenchmark { .rate_limit() .map(|rl| RateLimiter::new(rl.as_bytes_u64())), ); - let future = Box::pin(async move { consumer.run().await }); - futures.as_mut().unwrap().push(future); + set.spawn(consumer.run()); } + info!( "Starting to send and poll {} messages", self.total_messages() ); - futures + Ok(set) } fn kind(&self) -> BenchmarkKind { diff --git a/bench/src/benchmarks/producer_and_consumer_group_benchmark.rs b/bench/src/benchmarks/producer_and_consumer_group_benchmark.rs index 00f46eca..7fee789b 100644 --- a/bench/src/benchmarks/producer_and_consumer_group_benchmark.rs +++ b/bench/src/benchmarks/producer_and_consumer_group_benchmark.rs @@ -1,4 +1,4 @@ -use super::benchmark::{BenchmarkFutures, Benchmarkable}; +use super::benchmark::Benchmarkable; use crate::{ actors::{consumer::Consumer, producer::Producer}, args::common::IggyBenchArgs, @@ -8,9 +8,12 @@ use crate::{ use async_trait::async_trait; use iggy::messages::PollingKind; use iggy::{client::ConsumerGroupClient, clients::client::IggyClient, error::IggyError}; -use iggy_bench_report::benchmark_kind::BenchmarkKind; +use iggy_bench_report::{ + benchmark_kind::BenchmarkKind, individual_metrics::BenchmarkIndividualMetrics, +}; use integration::test_server::{login_root, ClientFactory}; use std::sync::{atomic::AtomicI64, Arc}; +use tokio::task::JoinSet; use tracing::{error, info}; pub struct ProducerAndConsumerGroupBenchmark { @@ -67,7 +70,9 @@ impl ProducerAndConsumerGroupBenchmark { #[async_trait] impl Benchmarkable for ProducerAndConsumerGroupBenchmark { - async fn run(&mut self) -> BenchmarkFutures { + async fn run( + &mut self, + ) -> Result<JoinSet<Result<BenchmarkIndividualMetrics, IggyError>>, IggyError> { self.init_streams().await.expect("Failed to init streams!"); let consumer_groups_count = self.args.number_of_consumer_groups(); self.init_consumer_groups(consumer_groups_count) @@ -96,9 +101,7 @@ impl Benchmarkable for ProducerAndConsumerGroupBenchmark { message_batches * producers, ); - let mut futures: BenchmarkFutures = - Ok(Vec::with_capacity((producers + consumers) as usize)); - + let mut set = JoinSet::new(); for producer_id in 1..=producers { info!("Executing the benchmark on producer #{}...", producer_id); let stream_id = self.args.start_stream_id() + 1 + (producer_id % streams_number); @@ -120,8 +123,7 @@ impl Benchmarkable for ProducerAndConsumerGroupBenchmark { .map(|rl| RateLimiter::new(rl.as_bytes_u64())), false, // TODO: Put latency into payload of first message, it should be an argument to iggy-bench ); - let future = Box::pin(async move { producer.run().await }); - futures.as_mut().unwrap().push(future); + set.spawn(producer.run()); } info!("Created {} producer(s).", producers); @@ -148,15 +150,14 @@ impl Benchmarkable for ProducerAndConsumerGroupBenchmark { .rate_limit() .map(|rl| RateLimiter::new(rl.as_bytes_u64())), ); - let future = Box::pin(async move { consumer.run().await }); - futures.as_mut().unwrap().push(future); + set.spawn(consumer.run()); } info!( "Starting to send and poll {} messages", self.total_messages() ); - futures + Ok(set) } fn kind(&self) -> BenchmarkKind { diff --git a/bench/src/benchmarks/producer_benchmark.rs b/bench/src/benchmarks/producer_benchmark.rs index 0e9379e0..7453508b 100644 --- a/bench/src/benchmarks/producer_benchmark.rs +++ b/bench/src/benchmarks/producer_benchmark.rs @@ -1,11 +1,14 @@ use crate::actors::producer::Producer; use crate::args::common::IggyBenchArgs; -use crate::benchmarks::benchmark::{BenchmarkFutures, Benchmarkable}; +use crate::benchmarks::benchmark::Benchmarkable; use crate::rate_limiter::RateLimiter; use async_trait::async_trait; +use iggy::error::IggyError; use iggy_bench_report::benchmark_kind::BenchmarkKind; +use iggy_bench_report::individual_metrics::BenchmarkIndividualMetrics; use integration::test_server::ClientFactory; use std::sync::Arc; +use tokio::task::JoinSet; use tracing::info; pub struct ProducerBenchmark { @@ -24,7 +27,9 @@ impl ProducerBenchmark { #[async_trait] impl Benchmarkable for ProducerBenchmark { - async fn run(&mut self) -> BenchmarkFutures { + async fn run( + &mut self, + ) -> Result<JoinSet<Result<BenchmarkIndividualMetrics, IggyError>>, IggyError> { self.init_streams().await.expect("Failed to init streams!"); let producers_count = self.args.producers(); info!("Creating {} producer(s)...", producers_count); @@ -35,7 +40,7 @@ impl Benchmarkable for ProducerBenchmark { let partitions_count = self.args.number_of_partitions(); let warmup_time = self.args.warmup_time(); - let mut futures: BenchmarkFutures = Ok(Vec::with_capacity(producers_count as usize)); + let mut set = JoinSet::new(); for producer_id in 1..=producers_count { let args = self.args.clone(); let client_factory = self.client_factory.clone(); @@ -58,11 +63,11 @@ impl Benchmarkable for ProducerBenchmark { .map(|rl| RateLimiter::new(rl.as_bytes_u64())), false, // TODO: Put timestamp in payload of first message, it should be an argument to iggy-bench ); - let future = Box::pin(async move { producer.run().await }); - futures.as_mut().unwrap().push(future); + set.spawn(producer.run()); } + info!("Created {} producer(s).", producers_count); - futures + Ok(set) } fn kind(&self) -> BenchmarkKind { diff --git a/bench/src/benchmarks/producing_consumer_benchmark.rs b/bench/src/benchmarks/producing_consumer_benchmark.rs index 8909636f..4692280b 100644 --- a/bench/src/benchmarks/producing_consumer_benchmark.rs +++ b/bench/src/benchmarks/producing_consumer_benchmark.rs @@ -1,13 +1,16 @@ use crate::actors::producing_consumer::ProducingConsumer; use crate::args::common::IggyBenchArgs; -use crate::benchmarks::benchmark::{BenchmarkFutures, Benchmarkable}; +use crate::benchmarks::benchmark::Benchmarkable; use crate::rate_limiter::RateLimiter; use async_trait::async_trait; +use iggy::error::IggyError; use iggy::messages::PollingKind; use iggy_bench_report::benchmark_kind::BenchmarkKind; +use iggy_bench_report::individual_metrics::BenchmarkIndividualMetrics; use integration::test_server::ClientFactory; use std::sync::atomic::AtomicI64; use std::sync::Arc; +use tokio::task::JoinSet; use tracing::info; pub struct EndToEndProducingConsumerBenchmark { @@ -26,7 +29,9 @@ impl EndToEndProducingConsumerBenchmark { #[async_trait] impl Benchmarkable for EndToEndProducingConsumerBenchmark { - async fn run(&mut self) -> BenchmarkFutures { + async fn run( + &mut self, + ) -> Result<JoinSet<Result<BenchmarkIndividualMetrics, IggyError>>, IggyError> { self.init_streams().await.expect("Failed to init streams!"); let actors_count = self.args.producers(); info!("Creating {} producing consumer(s)...", actors_count); @@ -36,7 +41,8 @@ impl Benchmarkable for EndToEndProducingConsumerBenchmark { let message_size = self.args.message_size(); let warmup_time = self.args.warmup_time(); let polling_kind = PollingKind::Offset; - let mut futures: BenchmarkFutures = Ok(Vec::with_capacity(actors_count as usize)); + + let mut set = JoinSet::new(); for actor_id in 1..=actors_count { let args = self.args.clone(); let client_factory = self.client_factory.clone(); @@ -65,11 +71,11 @@ impl Benchmarkable for EndToEndProducingConsumerBenchmark { polling_kind, false, ); - let future = Box::pin(async move { actor.run().await }); - futures.as_mut().unwrap().push(future); + set.spawn(actor.run()); } + info!("Created {} producing consumer(s).", actors_count); - futures + Ok(set) } fn kind(&self) -> BenchmarkKind { diff --git a/bench/src/benchmarks/producing_consumer_group_benchmark.rs b/bench/src/benchmarks/producing_consumer_group_benchmark.rs index 81b8f52b..b5d57c7e 100644 --- a/bench/src/benchmarks/producing_consumer_group_benchmark.rs +++ b/bench/src/benchmarks/producing_consumer_group_benchmark.rs @@ -1,6 +1,6 @@ use crate::actors::producing_consumer::ProducingConsumer; use crate::args::common::IggyBenchArgs; -use crate::benchmarks::benchmark::{BenchmarkFutures, Benchmarkable}; +use crate::benchmarks::benchmark::Benchmarkable; use crate::benchmarks::{CONSUMER_GROUP_BASE_ID, CONSUMER_GROUP_NAME_PREFIX}; use crate::rate_limiter::RateLimiter; use async_trait::async_trait; @@ -9,9 +9,11 @@ use iggy::clients::client::IggyClient; use iggy::error::IggyError; use iggy::messages::PollingKind; use iggy_bench_report::benchmark_kind::BenchmarkKind; +use iggy_bench_report::individual_metrics::BenchmarkIndividualMetrics; use integration::test_server::{login_root, ClientFactory}; use std::sync::atomic::AtomicI64; use std::sync::Arc; +use tokio::task::JoinSet; use tracing::{error, info}; pub struct EndToEndProducingConsumerGroupBenchmark { @@ -68,7 +70,9 @@ impl EndToEndProducingConsumerGroupBenchmark { #[async_trait] impl Benchmarkable for EndToEndProducingConsumerGroupBenchmark { - async fn run(&mut self) -> BenchmarkFutures { + async fn run( + &mut self, + ) -> Result<JoinSet<Result<BenchmarkIndividualMetrics, IggyError>>, IggyError> { self.init_streams().await.expect("Failed to init streams!"); let consumer_groups_count = self.args.number_of_consumer_groups(); self.init_consumer_groups(consumer_groups_count) @@ -87,11 +91,11 @@ impl Benchmarkable for EndToEndProducingConsumerGroupBenchmark { let partitions_count = self.args.number_of_partitions(); let warmup_time = self.args.warmup_time(); let polling_kind = PollingKind::Next; - let mut futures: BenchmarkFutures = Ok(Vec::with_capacity(actors_count as usize)); let consumer_groups_count = self.args.number_of_consumer_groups(); let total_message_batches = Arc::new(AtomicI64::new((message_batches * actors_count) as i64)); + let mut set = JoinSet::new(); for actor_id in 1..=actors_count { let args = self.args.clone(); let client_factory = self.client_factory.clone(); @@ -122,11 +126,11 @@ impl Benchmarkable for EndToEndProducingConsumerGroupBenchmark { polling_kind, false, ); - let future = Box::pin(async move { actor.run().await }); - futures.as_mut().unwrap().push(future); + set.spawn(actor.run()); } + info!("Created {actors_count} producing consumer(s) which would be part of {consumer_groups_count} consumer groups"); - futures + Ok(set) } fn kind(&self) -> BenchmarkKind { diff --git a/bench/src/runner.rs b/bench/src/runner.rs index 71cbfb68..60732720 100644 --- a/bench/src/runner.rs +++ b/bench/src/runner.rs @@ -5,7 +5,6 @@ use crate::plot::{plot_chart, ChartType}; use crate::utils::collect_server_logs_and_save_to_file; use crate::utils::cpu_name::append_cpu_name_lowercase; use crate::utils::server_starter::start_server_if_needed; -use futures::future::select_all; use iggy::error::IggyError; use iggy_bench_report::hardware::BenchmarkHardware; use iggy_bench_report::params::BenchmarkParams; @@ -42,18 +41,15 @@ impl BenchmarkRunner { let mut individual_metrics = Vec::new(); - while !join_handles.is_empty() { - let (result, _index, remaining) = select_all(join_handles).await; - join_handles = remaining; - - match result { - Ok(r) => individual_metrics.push(r), + while let Some(individual_metric) = join_handles.join_next().await { + let individual_metric = individual_metric.expect("Failed to join actor!"); + match individual_metric { + Ok(individual_metric) => individual_metrics.push(individual_metric), Err(e) => return Err(e), } } info!("All actors joined!"); - let hardware = BenchmarkHardware::get_system_info_with_identifier(benchmark.args().identifier()); let params = BenchmarkParams::from(benchmark.args()); diff --git a/sdk/src/models/messaging/message_view.rs b/sdk/src/models/messaging/message_view.rs index ba51d1f4..050c86a2 100644 --- a/sdk/src/models/messaging/message_view.rs +++ b/sdk/src/models/messaging/message_view.rs @@ -1,10 +1,6 @@ use super::message_header::*; use super::message_header_view::IggyMessageHeaderView; use crate::error::IggyError; -use crate::models::messaging::header::HeaderKey; -use crate::models::messaging::header::HeaderValue; -use crate::prelude::BytesSerializable; -use std::collections::HashMap; use std::iter::Iterator; /// A immutable view of a message. diff --git a/server/src/streaming/partitions/messages.rs b/server/src/streaming/partitions/messages.rs index 5b307ee4..90ee2013 100644 --- a/server/src/streaming/partitions/messages.rs +++ b/server/src/streaming/partitions/messages.rs @@ -113,8 +113,7 @@ impl Partition { consumer_offset.offset, self.partition_id ); - //TODO: Fix me - //return Ok(Vec::new()); + return Ok(IggyBatch::empty()); } let offset = consumer_offset.offset + 1; diff --git a/server/src/streaming/segments/segment.rs b/server/src/streaming/segments/segment.rs index 2e22a841..181ff907 100644 --- a/server/src/streaming/segments/segment.rs +++ b/server/src/streaming/segments/segment.rs @@ -235,33 +235,30 @@ impl Segment { } pub async fn is_expired(&self, now: IggyTimestamp) -> bool { - //TODO: Fix me - return false; - /* - if !self.is_closed { - return false; - } - - match self.message_expiry { - IggyExpiry::NeverExpire => false, - IggyExpiry::ServerDefault => false, - IggyExpiry::ExpireDuration(expiry) => { - let last_messages = self.get_messages_by_offset(self.current_offset, 1).await; - if last_messages.is_err() { - return false; - } - - let last_messages = last_messages.unwrap(); - if last_messages.is_empty() { - return false; - } - - let last_message = &last_messages[0]; - let last_message_timestamp = last_message.timestamp; - last_message_timestamp + expiry.as_micros() <= now.as_micros() - } - } - */ + false + // if !self.is_closed { + // return false; + // } + + // match self.message_expiry { + // IggyExpiry::NeverExpire => false, + // IggyExpiry::ServerDefault => false, + // IggyExpiry::ExpireDuration(expiry) => { + // let last_messages = self.get_messages_by_offset(self.end_offset, 1).await; + // if last_messages.is_err() { + // return false; + // } + + // let last_messages = last_messages.unwrap(); + // if last_messages.is_empty() { + // return false; + // } + + // let last_message = last_messages.iter().last().unwrap(); + // let last_message_timestamp = last_message.timestamp; + // last_message_timestamp + expiry.as_micros() <= now.as_micros() + // } + // } } pub async fn shutdown_reading(&mut self) {
