This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch bench-telemetry in repository https://gitbox.apache.org/repos/asf/iggy.git
commit ad53d9a19be5611379272b281a39797d08d47bdf Author: Hubert Gruszecki <h.grusze...@gmail.com> AuthorDate: Fri Aug 8 15:24:56 2025 +0200 feat(bench): implement otel telemetry --- Cargo.lock | 3 + Cargo.toml | 5 + core/bench/Cargo.toml | 3 + .../src/actors/consumer/benchmark_consumer.rs | 16 ++ .../actors/consumer/typed_benchmark_consumer.rs | 4 + .../src/actors/producer/benchmark_producer.rs | 15 + .../actors/producer/typed_benchmark_producer.rs | 4 + .../benchmark_producing_consumer.rs | 26 ++ .../typed_banchmark_producing_consumer.rs | 4 + core/bench/src/args/common.rs | 35 +++ core/bench/src/args/output.rs | 24 ++ .../src/benchmarks/balanced_consumer_group.rs | 28 +- core/bench/src/benchmarks/balanced_producer.rs | 23 +- .../balanced_producer_and_consumer_group.rs | 48 +++- core/bench/src/benchmarks/benchmark.rs | 50 ++-- core/bench/src/benchmarks/common.rs | 28 +- .../benchmarks/end_to_end_producing_consumer.rs | 23 +- .../end_to_end_producing_consumer_group.rs | 26 +- core/bench/src/benchmarks/pinned_consumer.rs | 22 +- core/bench/src/benchmarks/pinned_producer.rs | 26 +- .../src/benchmarks/pinned_producer_and_consumer.rs | 43 ++- core/bench/src/main.rs | 1 + core/bench/src/runner.rs | 28 +- core/bench/src/telemetry/buffer.rs | 227 ++++++++++++++++ core/bench/src/telemetry/mod.rs | 238 ++++++++++++++++ core/bench/src/telemetry/setup.rs | 302 +++++++++++++++++++++ core/server/Cargo.toml | 10 +- 27 files changed, 1218 insertions(+), 44 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c4bea03c..9c7ecffd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3835,6 +3835,9 @@ dependencies = [ "iggy", "integration", "nonzero_lit", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry_sdk", "rand 0.9.2", "rayon", "serde", diff --git a/Cargo.toml b/Cargo.toml index 239e4b03..beba7743 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,6 +98,11 @@ humantime = "2.2.0" keyring = { version = "3.6.3", features = ["sync-secret-service", "vendored"] } nonzero_lit = "0.1.2" 
once_cell = "1.21.3" +opentelemetry = "0.30.0" +opentelemetry-appender-tracing = "0.30.1" +opentelemetry-otlp = "0.30.0" +opentelemetry-semantic-conventions = "0.30.0" +opentelemetry_sdk = "0.30.0" passterm = "=2.0.1" quinn = "0.11.8" postcard = { version = "1.1.3", features = ["alloc"] } diff --git a/core/bench/Cargo.toml b/core/bench/Cargo.toml index 40937ced..75359706 100644 --- a/core/bench/Cargo.toml +++ b/core/bench/Cargo.toml @@ -43,6 +43,9 @@ human-repr = { workspace = true } iggy = { workspace = true } integration = { workspace = true } nonzero_lit = { workspace = true } +opentelemetry = { version = "0.30.0", features = ["metrics"] } +opentelemetry-otlp = { version = "0.30.0", features = ["grpc-tonic", "metrics"] } +opentelemetry_sdk = { version = "0.30.0", features = ["rt-tokio", "metrics"] } rand = { workspace = true } rayon = "1.10.0" serde = { workspace = true } diff --git a/core/bench/src/actors/consumer/benchmark_consumer.rs b/core/bench/src/actors/consumer/benchmark_consumer.rs index 6181fc49..7b2337f3 100644 --- a/core/bench/src/actors/consumer/benchmark_consumer.rs +++ b/core/bench/src/actors/consumer/benchmark_consumer.rs @@ -20,6 +20,7 @@ use crate::actors::consumer::client::BenchmarkConsumerClient; use crate::actors::consumer::client::interface::BenchmarkConsumerConfig; use crate::analytics::metrics::individual::from_records; use crate::analytics::record::BenchmarkRecord; +use crate::telemetry::MetricsHandle; use crate::utils::finish_condition::BenchmarkFinishCondition; use crate::utils::rate_limiter::BenchmarkRateLimiter; use bench_report::actor_kind::ActorKind; @@ -41,9 +42,11 @@ pub struct BenchmarkConsumer<C: BenchmarkConsumerClient> { pub moving_average_window: u32, pub limit_bytes_per_second: Option<IggyByteSize>, pub config: BenchmarkConsumerConfig, + pub telemetry: Option<MetricsHandle>, } impl<C: BenchmarkConsumerClient> BenchmarkConsumer<C> { + #[allow(clippy::too_many_arguments)] pub const fn new( client: C, benchmark_kind: 
BenchmarkKind, @@ -52,6 +55,7 @@ impl<C: BenchmarkConsumerClient> BenchmarkConsumer<C> { moving_average_window: u32, limit_bytes_per_second: Option<IggyByteSize>, config: BenchmarkConsumerConfig, + telemetry: Option<MetricsHandle>, ) -> Self { Self { client, @@ -61,6 +65,7 @@ impl<C: BenchmarkConsumerClient> BenchmarkConsumer<C> { moving_average_window, limit_bytes_per_second, config, + telemetry, } } @@ -110,6 +115,17 @@ impl<C: BenchmarkConsumerClient> BenchmarkConsumer<C> { total_bytes: bytes_processed, }); + // Record metrics to OpenTelemetry if enabled + if let Some(ref telemetry) = self.telemetry { + #[allow(clippy::cast_precision_loss)] + let latency_us = batch.latency.as_micros() as f64; + telemetry.record_batch_received( + u64::from(batch.messages), + batch.total_bytes, + latency_us, + ); + } + if let Some(rate_limiter) = &rate_limiter { rate_limiter .wait_until_necessary(batch.user_data_bytes) diff --git a/core/bench/src/actors/consumer/typed_benchmark_consumer.rs b/core/bench/src/actors/consumer/typed_benchmark_consumer.rs index fca492ea..7422b602 100644 --- a/core/bench/src/actors/consumer/typed_benchmark_consumer.rs +++ b/core/bench/src/actors/consumer/typed_benchmark_consumer.rs @@ -26,6 +26,7 @@ use crate::{ low_level::LowLevelConsumerClient, }, }, + telemetry::MetricsHandle, utils::finish_condition::BenchmarkFinishCondition, }; use bench_report::{ @@ -57,6 +58,7 @@ impl TypedBenchmarkConsumer { polling_kind: PollingKind, limit_bytes_per_second: Option<IggyByteSize>, origin_timestamp_latency_calculation: bool, + telemetry: Option<MetricsHandle>, ) -> Self { let config = BenchmarkConsumerConfig { consumer_id, @@ -77,6 +79,7 @@ impl TypedBenchmarkConsumer { moving_average_window, limit_bytes_per_second, config, + telemetry, )) } else { Self::Low(BenchmarkConsumer::new( @@ -87,6 +90,7 @@ impl TypedBenchmarkConsumer { moving_average_window, limit_bytes_per_second, config, + telemetry, )) } } diff --git 
a/core/bench/src/actors/producer/benchmark_producer.rs b/core/bench/src/actors/producer/benchmark_producer.rs index 9c9f2a55..df7ed853 100644 --- a/core/bench/src/actors/producer/benchmark_producer.rs +++ b/core/bench/src/actors/producer/benchmark_producer.rs @@ -19,6 +19,7 @@ use crate::{ actors::producer::client::{BenchmarkProducerClient, interface::BenchmarkProducerConfig}, analytics::{metrics::individual::from_records, record::BenchmarkRecord}, + telemetry::MetricsHandle, utils::{ batch_generator::BenchmarkBatchGenerator, finish_condition::BenchmarkFinishCondition, rate_limiter::BenchmarkRateLimiter, @@ -42,6 +43,7 @@ pub struct BenchmarkProducer<P: BenchmarkProducerClient> { pub moving_average_window: u32, pub limit_bytes_per_second: Option<IggyByteSize>, pub config: BenchmarkProducerConfig, + pub telemetry: Option<MetricsHandle>, } impl<P: BenchmarkProducerClient> BenchmarkProducer<P> { @@ -54,6 +56,7 @@ impl<P: BenchmarkProducerClient> BenchmarkProducer<P> { moving_average_window: u32, limit_bytes_per_second: Option<IggyByteSize>, config: BenchmarkProducerConfig, + telemetry: Option<MetricsHandle>, ) -> Self { Self { client, @@ -63,6 +66,7 @@ impl<P: BenchmarkProducerClient> BenchmarkProducer<P> { moving_average_window, limit_bytes_per_second, config, + telemetry, } } @@ -126,6 +130,17 @@ impl<P: BenchmarkProducerClient> BenchmarkProducer<P> { total_bytes: total_bytes_processed, }); + // Record metrics to OpenTelemetry if enabled + if let Some(ref telemetry) = self.telemetry { + #[allow(clippy::cast_precision_loss)] + let latency_us = batch.latency.as_micros() as f64; + telemetry.record_batch_sent( + u64::from(batch.messages), + batch.total_bytes, + latency_us, + ); + } + if let Some(rate_limiter) = &rate_limiter { rate_limiter .wait_until_necessary(batch.user_data_bytes) diff --git a/core/bench/src/actors/producer/typed_benchmark_producer.rs b/core/bench/src/actors/producer/typed_benchmark_producer.rs index ee2de483..5c53a6cf 100644 --- 
a/core/bench/src/actors/producer/typed_benchmark_producer.rs +++ b/core/bench/src/actors/producer/typed_benchmark_producer.rs @@ -24,6 +24,7 @@ use crate::{ low_level::LowLevelProducerClient, }, }, + telemetry::MetricsHandle, utils::finish_condition::BenchmarkFinishCondition, }; use bench_report::{ @@ -55,6 +56,7 @@ impl TypedBenchmarkProducer { sampling_time: IggyDuration, moving_average_window: u32, limit_bytes_per_second: Option<IggyByteSize>, + telemetry: Option<MetricsHandle>, ) -> Self { let config = BenchmarkProducerConfig { producer_id, @@ -75,6 +77,7 @@ impl TypedBenchmarkProducer { moving_average_window, limit_bytes_per_second, config, + telemetry, )) } else { let client = LowLevelProducerClient::new(client_factory, config.clone()); @@ -86,6 +89,7 @@ impl TypedBenchmarkProducer { moving_average_window, limit_bytes_per_second, config, + telemetry, )) } } diff --git a/core/bench/src/actors/producing_consumer/benchmark_producing_consumer.rs b/core/bench/src/actors/producing_consumer/benchmark_producing_consumer.rs index 0f5d323a..baa2a92f 100644 --- a/core/bench/src/actors/producing_consumer/benchmark_producing_consumer.rs +++ b/core/bench/src/actors/producing_consumer/benchmark_producing_consumer.rs @@ -24,6 +24,7 @@ use crate::{ producer::client::{BenchmarkProducerClient, interface::BenchmarkProducerConfig}, }, analytics::{metrics::individual::from_records, record::BenchmarkRecord}, + telemetry::MetricsHandle, utils::{ batch_generator::BenchmarkBatchGenerator, finish_condition::BenchmarkFinishCondition, rate_limiter::BenchmarkRateLimiter, @@ -53,6 +54,7 @@ where pub limit_bytes_per_second: Option<IggyByteSize>, pub producer_config: BenchmarkProducerConfig, pub consumer_config: BenchmarkConsumerConfig, + pub telemetry: Option<MetricsHandle>, } impl<P, C> BenchmarkProducingConsumer<P, C> @@ -72,6 +74,7 @@ where limit_bytes_per_second: Option<IggyByteSize>, producer_config: BenchmarkProducerConfig, consumer_config: BenchmarkConsumerConfig, + telemetry: 
Option<MetricsHandle>, ) -> Self { Self { producer, @@ -84,6 +87,7 @@ where limit_bytes_per_second, producer_config, consumer_config, + telemetry, } } #[allow(clippy::too_many_lines)] @@ -155,6 +159,17 @@ where sent_batches += 1; awaiting_reply = is_consumer; + // Record producer metrics to OpenTelemetry if enabled + if let Some(ref telemetry) = self.telemetry { + #[allow(clippy::cast_precision_loss)] + let latency_us = batch.latency.as_micros() as f64; + telemetry.record_batch_sent( + u64::from(batch.messages), + batch.total_bytes, + latency_us, + ); + } + if self .send_finish_condition .account_and_check(batch.user_data_bytes) @@ -193,6 +208,17 @@ where total_bytes: sent_total_bytes + recv_total_bytes, }); + // Record consumer metrics to OpenTelemetry if enabled + if let Some(ref telemetry) = self.telemetry { + #[allow(clippy::cast_precision_loss)] + let latency_us = batch.latency.as_micros() as f64; + telemetry.record_batch_received( + u64::from(batch.messages), + batch.total_bytes, + latency_us, + ); + } + if let Some(limiter) = &rate_limiter { limiter.wait_until_necessary(rl_value).await; rl_value = 0; diff --git a/core/bench/src/actors/producing_consumer/typed_banchmark_producing_consumer.rs b/core/bench/src/actors/producing_consumer/typed_banchmark_producing_consumer.rs index 26e3e0fb..aec81bc3 100644 --- a/core/bench/src/actors/producing_consumer/typed_banchmark_producing_consumer.rs +++ b/core/bench/src/actors/producing_consumer/typed_banchmark_producing_consumer.rs @@ -30,6 +30,7 @@ use crate::{ }, producing_consumer::BenchmarkProducingConsumer, }, + telemetry::MetricsHandle, utils::finish_condition::BenchmarkFinishCondition, }; use bench_report::{ @@ -64,6 +65,7 @@ impl TypedBenchmarkProducingConsumer { limit_bytes_per_second: Option<IggyByteSize>, polling_kind: PollingKind, origin_timestamp_latency_calculation: bool, + telemetry: Option<MetricsHandle>, ) -> Self { let producer_config = BenchmarkProducerConfig { producer_id: actor_id, @@ -99,6 +101,7 @@ 
impl TypedBenchmarkProducingConsumer { limit_bytes_per_second, producer_config, consumer_config, + telemetry, )) } else { let producer = @@ -115,6 +118,7 @@ impl TypedBenchmarkProducingConsumer { limit_bytes_per_second, producer_config, consumer_config, + telemetry, )) } } diff --git a/core/bench/src/args/common.rs b/core/bench/src/args/common.rs index 7d475aa0..0f158eca 100644 --- a/core/bench/src/args/common.rs +++ b/core/bench/src/args/common.rs @@ -16,6 +16,8 @@ * under the License. */ +use crate::telemetry::TelemetryConfig; + use super::kind::BenchmarkKindCommand; use super::output::BenchmarkOutputCommand; use super::props::{BenchmarkKindProps, BenchmarkTransportProps}; @@ -340,6 +342,39 @@ impl IggyBenchArgs { }) } + pub fn otel_config(&self) -> Option<TelemetryConfig> { + self.benchmark_kind + .inner() + .transport_command() + .output_command() + .as_ref() + .and_then(|cmd| match cmd { + BenchmarkOutputCommand::Output(args) => { + if args.otel_enabled { + Some(TelemetryConfig { + enabled: true, + endpoint: args.otel_endpoint.clone(), + export_interval: std::time::Duration::from_secs( + args.otel_export_interval_secs, + ), + export_timeout: std::time::Duration::from_secs( + args.otel_export_timeout_secs, + ), + buffer_size: args.otel_buffer_size, + buffer_flush_interval: std::time::Duration::from_millis( + args.otel_buffer_flush_ms, + ), + service_name: "iggy-bench".to_string(), + service_version: env!("CARGO_PKG_VERSION").to_string(), + benchmark_id: uuid::Uuid::new_v4().to_string(), + }) + } else { + None + } + } + }) + } + pub fn max_topic_size(&self) -> Option<IggyByteSize> { self.benchmark_kind.inner().max_topic_size() } diff --git a/core/bench/src/args/output.rs b/core/bench/src/args/output.rs index 82e73c4f..d0dd3676 100644 --- a/core/bench/src/args/output.rs +++ b/core/bench/src/args/output.rs @@ -53,4 +53,28 @@ pub struct BenchmarkOutputArgs { /// Open generated charts in browser after benchmark is finished #[arg(long, short = 'c', default_value_t = 
false)] pub open_charts: bool, + + /// Enable OpenTelemetry metrics export + #[arg(long, default_value_t = false)] + pub otel_enabled: bool, + + /// OpenTelemetry collector endpoint (gRPC) + #[arg(long, default_value = "http://localhost:4317")] + pub otel_endpoint: String, + + /// OpenTelemetry metrics export interval in seconds + #[arg(long, default_value_t = 1)] + pub otel_export_interval_secs: u64, + + /// OpenTelemetry metrics export timeout in seconds + #[arg(long, default_value_t = 10)] + pub otel_export_timeout_secs: u64, + + /// Metrics buffer size (number of events) + #[arg(long, default_value_t = 10000)] + pub otel_buffer_size: usize, + + /// Metrics buffer flush interval in milliseconds + #[arg(long, default_value_t = 100)] + pub otel_buffer_flush_ms: u64, } diff --git a/core/bench/src/benchmarks/balanced_consumer_group.rs b/core/bench/src/benchmarks/balanced_consumer_group.rs index ac60fc6f..9329b54c 100644 --- a/core/bench/src/benchmarks/balanced_consumer_group.rs +++ b/core/bench/src/benchmarks/balanced_consumer_group.rs @@ -20,6 +20,7 @@ use super::benchmark::Benchmarkable; use crate::{ args::common::IggyBenchArgs, benchmarks::common::{build_consumer_futures, init_consumer_groups}, + telemetry::{MetricsHandle, TelemetryContext}, }; use async_trait::async_trait; use bench_report::{benchmark_kind::BenchmarkKind, individual_metrics::BenchmarkIndividualMetrics}; @@ -32,13 +33,36 @@ use tracing::info; pub struct BalancedConsumerGroupBenchmark { args: Arc<IggyBenchArgs>, client_factory: Arc<dyn ClientFactory>, + telemetry_handles: Vec<MetricsHandle>, } impl BalancedConsumerGroupBenchmark { - pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn ClientFactory>) -> Self { + pub fn new( + args: Arc<IggyBenchArgs>, + client_factory: Arc<dyn ClientFactory>, + telemetry: Option<&TelemetryContext>, + ) -> Self { + let telemetry_handles = telemetry.map_or_else(Vec::new, |ctx| { + let consumers = args.consumers(); + let cg_count = 
args.number_of_consumer_groups(); + let start_stream_id = args.start_stream_id(); + + (1..=consumers) + .map(|consumer_id| { + let stream_id = if cg_count > 0 { + start_stream_id + 1 + (consumer_id % cg_count) + } else { + start_stream_id + consumer_id + }; + ctx.create_handle("consumer", consumer_id, stream_id) + }) + .collect() + }); + Self { args, client_factory, + telemetry_handles, } } } @@ -55,7 +79,7 @@ impl Benchmarkable for BalancedConsumerGroupBenchmark { init_consumer_groups(cf, &args).await?; - let consumer_futures = build_consumer_futures(cf, &args); + let consumer_futures = build_consumer_futures(cf, &args, &self.telemetry_handles); for fut in consumer_futures { tasks.spawn(fut); } diff --git a/core/bench/src/benchmarks/balanced_producer.rs b/core/bench/src/benchmarks/balanced_producer.rs index 6e81ab5e..3353eadf 100644 --- a/core/bench/src/benchmarks/balanced_producer.rs +++ b/core/bench/src/benchmarks/balanced_producer.rs @@ -19,6 +19,7 @@ use super::benchmark::Benchmarkable; use crate::args::common::IggyBenchArgs; use crate::benchmarks::common::build_producer_futures; +use crate::telemetry::{MetricsHandle, TelemetryContext}; use async_trait::async_trait; use bench_report::benchmark_kind::BenchmarkKind; use bench_report::individual_metrics::BenchmarkIndividualMetrics; @@ -31,13 +32,31 @@ use tracing::info; pub struct BalancedProducerBenchmark { args: Arc<IggyBenchArgs>, client_factory: Arc<dyn ClientFactory>, + telemetry_handles: Vec<MetricsHandle>, } impl BalancedProducerBenchmark { - pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn ClientFactory>) -> Self { + pub fn new( + args: Arc<IggyBenchArgs>, + client_factory: Arc<dyn ClientFactory>, + telemetry: Option<&TelemetryContext>, + ) -> Self { + let telemetry_handles = telemetry.map_or_else(Vec::new, |ctx| { + let producers = args.producers(); + let streams = args.streams(); + let start_stream_id = args.start_stream_id(); + (1..=producers) + .map(|producer_id| { + let stream_id = 
start_stream_id + 1 + (producer_id % streams); + ctx.create_handle("producer", producer_id, stream_id) + }) + .collect() + }); + Self { args, client_factory, + telemetry_handles, } } } @@ -50,7 +69,7 @@ impl Benchmarkable for BalancedProducerBenchmark { self.init_streams().await?; let cf = &self.client_factory; let args = self.args.clone(); - let producer_futures = build_producer_futures(cf, &args); + let producer_futures = build_producer_futures(cf, &args, &self.telemetry_handles); let mut tasks = JoinSet::new(); for fut in producer_futures { diff --git a/core/bench/src/benchmarks/balanced_producer_and_consumer_group.rs b/core/bench/src/benchmarks/balanced_producer_and_consumer_group.rs index 0eab5db8..d44569c1 100644 --- a/core/bench/src/benchmarks/balanced_producer_and_consumer_group.rs +++ b/core/bench/src/benchmarks/balanced_producer_and_consumer_group.rs @@ -20,6 +20,7 @@ use super::benchmark::Benchmarkable; use crate::{ args::common::IggyBenchArgs, benchmarks::common::{build_consumer_futures, build_producer_futures, init_consumer_groups}, + telemetry::{MetricsHandle, TelemetryContext}, }; use async_trait::async_trait; use bench_report::{benchmark_kind::BenchmarkKind, individual_metrics::BenchmarkIndividualMetrics}; @@ -32,13 +33,54 @@ use tracing::info; pub struct BalancedProducerAndConsumerGroupBenchmark { args: Arc<IggyBenchArgs>, client_factory: Arc<dyn ClientFactory>, + producer_telemetry_handles: Vec<MetricsHandle>, + // TODO: Use this field when consumer telemetry is implemented + #[allow(dead_code)] + consumer_telemetry_handles: Vec<MetricsHandle>, } impl BalancedProducerAndConsumerGroupBenchmark { - pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn ClientFactory>) -> Self { + pub fn new( + args: Arc<IggyBenchArgs>, + client_factory: Arc<dyn ClientFactory>, + telemetry: Option<&TelemetryContext>, + ) -> Self { + let (producer_telemetry_handles, consumer_telemetry_handles) = telemetry.map_or_else( + || (Vec::new(), Vec::new()), + |ctx| { + 
let producers = args.producers(); + let consumers = args.consumers(); + let streams = args.streams(); + let cg_count = args.number_of_consumer_groups(); + let start_stream_id = args.start_stream_id(); + + let producer_handles = (1..=producers) + .map(|producer_id| { + let stream_id = start_stream_id + 1 + (producer_id % streams); + ctx.create_handle("producer", producer_id, stream_id) + }) + .collect(); + + let consumer_handles = (1..=consumers) + .map(|consumer_id| { + let stream_id = if cg_count > 0 { + start_stream_id + 1 + (consumer_id % cg_count) + } else { + start_stream_id + consumer_id + }; + ctx.create_handle("consumer", consumer_id, stream_id) + }) + .collect(); + + (producer_handles, consumer_handles) + }, + ); + Self { args, client_factory, + producer_telemetry_handles, + consumer_telemetry_handles, } } } @@ -55,8 +97,8 @@ impl Benchmarkable for BalancedProducerAndConsumerGroupBenchmark { init_consumer_groups(cf, &args).await?; - let producer_futures = build_producer_futures(cf, &args); - let consumer_futures = build_consumer_futures(cf, &args); + let producer_futures = build_producer_futures(cf, &args, &self.producer_telemetry_handles); + let consumer_futures = build_consumer_futures(cf, &args, &self.consumer_telemetry_handles); for fut in producer_futures { tasks.spawn(fut); diff --git a/core/bench/src/benchmarks/benchmark.rs b/core/bench/src/benchmarks/benchmark.rs index 9b96a1b8..25a63aac 100644 --- a/core/bench/src/benchmarks/benchmark.rs +++ b/core/bench/src/benchmarks/benchmark.rs @@ -17,6 +17,7 @@ */ use crate::args::kind::BenchmarkKindCommand; +use crate::telemetry::TelemetryContext; use crate::{args::common::IggyBenchArgs, utils::client_factory::create_client_factory}; use async_trait::async_trait; use bench_report::benchmark_kind::BenchmarkKind; @@ -37,43 +38,58 @@ use super::pinned_consumer::PinnedConsumerBenchmark; use super::pinned_producer::PinnedProducerBenchmark; use 
super::pinned_producer_and_consumer::PinnedProducerAndConsumerBenchmark; -impl From<IggyBenchArgs> for Box<dyn Benchmarkable> { - fn from(args: IggyBenchArgs) -> Self { +impl From<(IggyBenchArgs, Option<&TelemetryContext>)> for Box<dyn Benchmarkable> { + fn from( + (args, telemetry): (IggyBenchArgs, Option<&crate::telemetry::TelemetryContext>), + ) -> Self { let client_factory = create_client_factory(&args); match args.benchmark_kind { - BenchmarkKindCommand::PinnedProducer(_) => { - Box::new(PinnedProducerBenchmark::new(Arc::new(args), client_factory)) - } + BenchmarkKindCommand::PinnedProducer(_) => Box::new(PinnedProducerBenchmark::new( + Arc::new(args), + client_factory, + telemetry, + )), - BenchmarkKindCommand::PinnedConsumer(_) => { - Box::new(PinnedConsumerBenchmark::new(Arc::new(args), client_factory)) - } + BenchmarkKindCommand::PinnedConsumer(_) => Box::new(PinnedConsumerBenchmark::new( + Arc::new(args), + client_factory, + telemetry, + )), BenchmarkKindCommand::PinnedProducerAndConsumer(_) => Box::new( - PinnedProducerAndConsumerBenchmark::new(Arc::new(args), client_factory), + PinnedProducerAndConsumerBenchmark::new(Arc::new(args), client_factory, telemetry), ), BenchmarkKindCommand::BalancedProducer(_) => Box::new(BalancedProducerBenchmark::new( Arc::new(args), client_factory, + telemetry, )), BenchmarkKindCommand::BalancedConsumerGroup(_) => Box::new( - BalancedConsumerGroupBenchmark::new(Arc::new(args), client_factory), + BalancedConsumerGroupBenchmark::new(Arc::new(args), client_factory, telemetry), ), - BenchmarkKindCommand::BalancedProducerAndConsumerGroup(_) => Box::new( - BalancedProducerAndConsumerGroupBenchmark::new(Arc::new(args), client_factory), - ), + BenchmarkKindCommand::BalancedProducerAndConsumerGroup(_) => { + Box::new(BalancedProducerAndConsumerGroupBenchmark::new( + Arc::new(args), + client_factory, + telemetry, + )) + } BenchmarkKindCommand::EndToEndProducingConsumer(_) => Box::new( - 
EndToEndProducingConsumerBenchmark::new(Arc::new(args), client_factory), + EndToEndProducingConsumerBenchmark::new(Arc::new(args), client_factory, telemetry), ), - BenchmarkKindCommand::EndToEndProducingConsumerGroup(_) => Box::new( - EndToEndProducingConsumerGroupBenchmark::new(Arc::new(args), client_factory), - ), + BenchmarkKindCommand::EndToEndProducingConsumerGroup(_) => { + Box::new(EndToEndProducingConsumerGroupBenchmark::new( + Arc::new(args), + client_factory, + telemetry, + )) + } BenchmarkKindCommand::Examples => { unreachable!("Examples should be handled before this point") } diff --git a/core/bench/src/benchmarks/common.rs b/core/bench/src/benchmarks/common.rs index d5436b56..d44d46cb 100644 --- a/core/bench/src/benchmarks/common.rs +++ b/core/bench/src/benchmarks/common.rs @@ -114,6 +114,7 @@ pub async fn init_consumer_groups( pub fn build_producer_futures( client_factory: &Arc<dyn ClientFactory>, args: &IggyBenchArgs, + telemetry_handles: &[crate::telemetry::MetricsHandle], ) -> Vec<impl Future<Output = Result<BenchmarkIndividualMetrics, IggyError>> + Send + use<>> { let streams = args.streams(); let partitions = args.number_of_partitions(); @@ -143,6 +144,10 @@ pub fn build_producer_futures( let stream_id = start_stream_id + 1 + (producer_id % streams); + let metrics_handle = telemetry_handles + .get((producer_id as usize).saturating_sub(1)) + .cloned(); + async move { let producer = TypedBenchmarkProducer::new( use_high_level_api, @@ -158,6 +163,7 @@ pub fn build_producer_futures( sampling_time, moving_average_window, rate_limit, + metrics_handle, ); producer.run().await } @@ -168,6 +174,7 @@ pub fn build_producer_futures( pub fn build_consumer_futures( client_factory: &Arc<dyn ClientFactory>, args: &IggyBenchArgs, + telemetry_handles: &[crate::telemetry::MetricsHandle], ) -> Vec<impl Future<Output = Result<BenchmarkIndividualMetrics, IggyError>> + Send + use<>> { let start_stream_id = args.start_stream_id(); let cg_count = 
args.number_of_consumer_groups(); @@ -215,6 +222,10 @@ pub fn build_consumer_futures( None }; + let metrics_handle = telemetry_handles + .get((consumer_id as usize).saturating_sub(1)) + .cloned(); + async move { let consumer = TypedBenchmarkConsumer::new( use_high_level_api, @@ -231,6 +242,7 @@ pub fn build_consumer_futures( polling_kind, rate_limit, origin_timestamp_latency_calculation, + metrics_handle, ); consumer.run().await } @@ -242,7 +254,8 @@ pub fn build_consumer_futures( pub fn build_producing_consumers_futures( client_factory: Arc<dyn ClientFactory>, args: Arc<IggyBenchArgs>, -) -> Vec<impl Future<Output = Result<BenchmarkIndividualMetrics, IggyError>> + Send> { + telemetry_handles: &[crate::telemetry::MetricsHandle], +) -> Vec<impl Future<Output = Result<BenchmarkIndividualMetrics, IggyError>> + Send + use<>> { let producing_consumers = args.producers(); let streams = args.streams(); let partitions = args.number_of_partitions(); @@ -277,6 +290,10 @@ pub fn build_producing_consumers_futures( let use_high_level_api = args.high_level_api(); let rate_limit = rate_limit_per_actor(args.rate_limit(), producing_consumers); + let metrics_handle = telemetry_handles + .get((actor_id as usize).saturating_sub(1)) + .cloned(); + async move { info!( "Executing producing consumer #{}, stream_id={}", @@ -300,6 +317,7 @@ pub fn build_producing_consumers_futures( rate_limit, polling_kind, origin_timestamp_latency_calculation, + metrics_handle, ); actor.run().await } @@ -311,7 +329,8 @@ pub fn build_producing_consumers_futures( pub fn build_producing_consumer_groups_futures( client_factory: Arc<dyn ClientFactory>, args: Arc<IggyBenchArgs>, -) -> Vec<impl Future<Output = Result<BenchmarkIndividualMetrics, IggyError>> + Send> { + telemetry_handles: &[crate::telemetry::MetricsHandle], +) -> Vec<impl Future<Output = Result<BenchmarkIndividualMetrics, IggyError>> + Send + use<>> { let producers = args.producers(); let consumers = args.consumers(); let total_actors = 
producers.max(consumers); @@ -371,6 +390,10 @@ pub fn build_producing_consumer_groups_futures( _ => unreachable!(), }; + let metrics_handle = telemetry_handles + .get((actor_id as usize).saturating_sub(1)) + .cloned(); + async move { let actor_type = match (should_produce, should_consume) { (true, true) => "(producer+consumer)", @@ -408,6 +431,7 @@ pub fn build_producing_consumer_groups_futures( rate_limit, polling_kind, origin_timestamp_latency_calculation, + metrics_handle, ); actor.run().await diff --git a/core/bench/src/benchmarks/end_to_end_producing_consumer.rs b/core/bench/src/benchmarks/end_to_end_producing_consumer.rs index 2818fea6..e3f83e3e 100644 --- a/core/bench/src/benchmarks/end_to_end_producing_consumer.rs +++ b/core/bench/src/benchmarks/end_to_end_producing_consumer.rs @@ -18,6 +18,7 @@ use crate::args::common::IggyBenchArgs; use crate::benchmarks::common::build_producing_consumers_futures; +use crate::telemetry::{MetricsHandle, TelemetryContext}; use async_trait::async_trait; use bench_report::benchmark_kind::BenchmarkKind; use bench_report::individual_metrics::BenchmarkIndividualMetrics; @@ -32,13 +33,31 @@ use super::benchmark::Benchmarkable; pub struct EndToEndProducingConsumerBenchmark { args: Arc<IggyBenchArgs>, client_factory: Arc<dyn ClientFactory>, + telemetry_handles: Vec<MetricsHandle>, } impl EndToEndProducingConsumerBenchmark { - pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn ClientFactory>) -> Self { + pub fn new( + args: Arc<IggyBenchArgs>, + client_factory: Arc<dyn ClientFactory>, + telemetry: Option<&TelemetryContext>, + ) -> Self { + let telemetry_handles = telemetry.map_or_else(Vec::new, |ctx| { + let producing_consumers = args.producers(); + let streams = args.streams(); + let start_stream_id = args.start_stream_id(); + (1..=producing_consumers) + .map(|actor_id| { + let stream_id = start_stream_id + 1 + (actor_id % streams); + ctx.create_handle("producing_consumer", actor_id, stream_id) + }) + .collect() + }); + 
Self { args, client_factory, + telemetry_handles, } } } @@ -53,7 +72,7 @@ impl Benchmarkable for EndToEndProducingConsumerBenchmark { let args = self.args.clone(); let mut tasks = JoinSet::new(); - let futures = build_producing_consumers_futures(cf, args); + let futures = build_producing_consumers_futures(cf, args, &self.telemetry_handles); for fut in futures { tasks.spawn(fut); } diff --git a/core/bench/src/benchmarks/end_to_end_producing_consumer_group.rs b/core/bench/src/benchmarks/end_to_end_producing_consumer_group.rs index 2adda4d5..0ae0b78f 100644 --- a/core/bench/src/benchmarks/end_to_end_producing_consumer_group.rs +++ b/core/bench/src/benchmarks/end_to_end_producing_consumer_group.rs @@ -18,6 +18,7 @@ use crate::args::common::IggyBenchArgs; use crate::benchmarks::common::{build_producing_consumer_groups_futures, init_consumer_groups}; +use crate::telemetry::{MetricsHandle, TelemetryContext}; use async_trait::async_trait; use bench_report::benchmark_kind::BenchmarkKind; use bench_report::individual_metrics::BenchmarkIndividualMetrics; @@ -32,13 +33,34 @@ use super::benchmark::Benchmarkable; pub struct EndToEndProducingConsumerGroupBenchmark { args: Arc<IggyBenchArgs>, client_factory: Arc<dyn ClientFactory>, + telemetry_handles: Vec<MetricsHandle>, } impl EndToEndProducingConsumerGroupBenchmark { - pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn ClientFactory>) -> Self { + pub fn new( + args: Arc<IggyBenchArgs>, + client_factory: Arc<dyn ClientFactory>, + telemetry: Option<&TelemetryContext>, + ) -> Self { + let telemetry_handles = telemetry.map_or_else(Vec::new, |ctx| { + let producers = args.producers(); + let consumers = args.consumers(); + let total_actors = producers.max(consumers); + let cg_count = args.number_of_consumer_groups(); + let start_stream_id = args.start_stream_id(); + + (1..=total_actors) + .map(|actor_id| { + let stream_id = start_stream_id + 1 + (actor_id % cg_count); + ctx.create_handle("producing_consumer_group", 
actor_id, stream_id) + }) + .collect() + }); + Self { args, client_factory, + telemetry_handles, } } } @@ -55,7 +77,7 @@ impl Benchmarkable for EndToEndProducingConsumerGroupBenchmark { init_consumer_groups(&cf, &args).await?; - let futures = build_producing_consumer_groups_futures(cf, args); + let futures = build_producing_consumer_groups_futures(cf, args, &self.telemetry_handles); for fut in futures { tasks.spawn(fut); } diff --git a/core/bench/src/benchmarks/pinned_consumer.rs b/core/bench/src/benchmarks/pinned_consumer.rs index 870df77a..18716274 100644 --- a/core/bench/src/benchmarks/pinned_consumer.rs +++ b/core/bench/src/benchmarks/pinned_consumer.rs @@ -19,6 +19,7 @@ use crate::args::common::IggyBenchArgs; use crate::benchmarks::benchmark::Benchmarkable; use crate::benchmarks::common::build_consumer_futures; +use crate::telemetry::{MetricsHandle, TelemetryContext}; use async_trait::async_trait; use bench_report::benchmark_kind::BenchmarkKind; use bench_report::individual_metrics::BenchmarkIndividualMetrics; @@ -31,13 +32,30 @@ use tracing::info; pub struct PinnedConsumerBenchmark { args: Arc<IggyBenchArgs>, client_factory: Arc<dyn ClientFactory>, + telemetry_handles: Vec<MetricsHandle>, } impl PinnedConsumerBenchmark { - pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn ClientFactory>) -> Self { + pub fn new( + args: Arc<IggyBenchArgs>, + client_factory: Arc<dyn ClientFactory>, + telemetry: Option<&TelemetryContext>, + ) -> Self { + let telemetry_handles = telemetry.map_or_else(Vec::new, |ctx| { + let consumers = args.consumers(); + let start_stream_id = args.start_stream_id(); + (1..=consumers) + .map(|consumer_id| { + let stream_id = start_stream_id + consumer_id; + ctx.create_handle("consumer", consumer_id, stream_id) + }) + .collect() + }); + Self { args, client_factory, + telemetry_handles, } } } @@ -52,7 +70,7 @@ impl Benchmarkable for PinnedConsumerBenchmark { let args = self.args.clone(); let mut tasks: JoinSet<_> = JoinSet::new(); - let 
futures = build_consumer_futures(cf, &args); + let futures = build_consumer_futures(cf, &args, &self.telemetry_handles); for fut in futures { tasks.spawn(fut); } diff --git a/core/bench/src/benchmarks/pinned_producer.rs b/core/bench/src/benchmarks/pinned_producer.rs index 3514e24d..5cfd1ca8 100644 --- a/core/bench/src/benchmarks/pinned_producer.rs +++ b/core/bench/src/benchmarks/pinned_producer.rs @@ -16,9 +16,10 @@ * under the License. */ -use crate::args::common::IggyBenchArgs; use crate::benchmarks::benchmark::Benchmarkable; use crate::benchmarks::common::build_producer_futures; +use crate::telemetry::TelemetryContext; +use crate::{args::common::IggyBenchArgs, telemetry::MetricsHandle}; use async_trait::async_trait; use bench_report::benchmark_kind::BenchmarkKind; use bench_report::individual_metrics::BenchmarkIndividualMetrics; @@ -31,13 +32,31 @@ use tracing::info; pub struct PinnedProducerBenchmark { args: Arc<IggyBenchArgs>, client_factory: Arc<dyn ClientFactory>, + telemetry_handles: Vec<MetricsHandle>, } impl PinnedProducerBenchmark { - pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn ClientFactory>) -> Self { + pub fn new( + args: Arc<IggyBenchArgs>, + client_factory: Arc<dyn ClientFactory>, + telemetry: Option<&TelemetryContext>, + ) -> Self { + let telemetry_handles = telemetry.map_or_else(Vec::new, |ctx| { + let producers = args.producers(); + let streams = args.streams(); + let start_stream_id = args.start_stream_id(); + (1..=producers) + .map(|producer_id| { + let stream_id = start_stream_id + 1 + (producer_id % streams); + ctx.create_handle("producer", producer_id, stream_id) + }) + .collect() + }); + Self { args, client_factory, + telemetry_handles, } } } @@ -52,7 +71,8 @@ impl Benchmarkable for PinnedProducerBenchmark { let args = self.args.clone(); let mut tasks = JoinSet::new(); - let producer_futures = build_producer_futures(client_factory, &args); + let producer_futures = + build_producer_futures(client_factory, &args, 
&self.telemetry_handles); for fut in producer_futures { tasks.spawn(fut); diff --git a/core/bench/src/benchmarks/pinned_producer_and_consumer.rs b/core/bench/src/benchmarks/pinned_producer_and_consumer.rs index cde6632c..7f795102 100644 --- a/core/bench/src/benchmarks/pinned_producer_and_consumer.rs +++ b/core/bench/src/benchmarks/pinned_producer_and_consumer.rs @@ -18,6 +18,7 @@ use crate::args::common::IggyBenchArgs; use crate::benchmarks::benchmark::Benchmarkable; use crate::benchmarks::common::{build_consumer_futures, build_producer_futures}; +use crate::telemetry::{MetricsHandle, TelemetryContext}; use async_trait::async_trait; use bench_report::benchmark_kind::BenchmarkKind; use bench_report::individual_metrics::BenchmarkIndividualMetrics; @@ -30,13 +31,49 @@ use tracing::info; pub struct PinnedProducerAndConsumerBenchmark { args: Arc<IggyBenchArgs>, client_factory: Arc<dyn ClientFactory>, + producer_telemetry_handles: Vec<MetricsHandle>, + // TODO: Use this field when consumer telemetry is implemented + #[allow(dead_code)] + consumer_telemetry_handles: Vec<MetricsHandle>, } impl PinnedProducerAndConsumerBenchmark { - pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn ClientFactory>) -> Self { + pub fn new( + args: Arc<IggyBenchArgs>, + client_factory: Arc<dyn ClientFactory>, + telemetry: Option<&TelemetryContext>, + ) -> Self { + let (producer_telemetry_handles, consumer_telemetry_handles) = telemetry.map_or_else( + || (Vec::new(), Vec::new()), + |ctx| { + let producers = args.producers(); + let consumers = args.consumers(); + let streams = args.streams(); + let start_stream_id = args.start_stream_id(); + + let producer_handles = (1..=producers) + .map(|producer_id| { + let stream_id = start_stream_id + 1 + (producer_id % streams); + ctx.create_handle("producer", producer_id, stream_id) + }) + .collect(); + + let consumer_handles = (1..=consumers) + .map(|consumer_id| { + let stream_id = start_stream_id + consumer_id; + 
ctx.create_handle("consumer", consumer_id, stream_id) + }) + .collect(); + + (producer_handles, consumer_handles) + }, + ); + Self { args, client_factory, + producer_telemetry_handles, + consumer_telemetry_handles, } } } @@ -51,8 +88,8 @@ impl Benchmarkable for PinnedProducerAndConsumerBenchmark { let args = self.args.clone(); let mut tasks = JoinSet::new(); - let producer_futures = build_producer_futures(cf, &args); - let consumer_futures = build_consumer_futures(cf, &args); + let producer_futures = build_producer_futures(cf, &args, &self.producer_telemetry_handles); + let consumer_futures = build_consumer_futures(cf, &args, &self.consumer_telemetry_handles); for fut in producer_futures { tasks.spawn(fut); diff --git a/core/bench/src/main.rs b/core/bench/src/main.rs index 77d7d66b..289d199a 100644 --- a/core/bench/src/main.rs +++ b/core/bench/src/main.rs @@ -22,6 +22,7 @@ mod args; mod benchmarks; mod plot; mod runner; +mod telemetry; mod utils; use crate::{args::common::IggyBenchArgs, runner::BenchmarkRunner}; diff --git a/core/bench/src/runner.rs b/core/bench/src/runner.rs index ab9553b3..1885e5cc 100644 --- a/core/bench/src/runner.rs +++ b/core/bench/src/runner.rs @@ -20,6 +20,7 @@ use crate::analytics::report_builder::BenchmarkReportBuilder; use crate::args::common::IggyBenchArgs; use crate::benchmarks::benchmark::Benchmarkable; use crate::plot::{ChartType, plot_chart}; +use crate::telemetry::setup::TelemetryContext; use crate::utils::cpu_name::append_cpu_name_lowercase; use crate::utils::server_starter::start_server_if_needed; use crate::utils::{collect_server_logs_and_save_to_file, params_from_args_and_metrics}; @@ -34,6 +35,7 @@ use tracing::{error, info}; pub struct BenchmarkRunner { pub args: Option<IggyBenchArgs>, pub test_server: Option<TestServer>, + pub telemetry: Option<TelemetryContext>, } impl BenchmarkRunner { @@ -41,6 +43,7 @@ impl BenchmarkRunner { Self { args: Some(args), test_server: None, + telemetry: None, } } @@ -48,13 +51,32 @@ impl 
BenchmarkRunner { pub async fn run(mut self) -> Result<(), IggyError> { let args = self.args.take().unwrap(); let should_open_charts = args.open_charts(); + + // Initialize OpenTelemetry if configured + if let Some(otel_config) = args.otel_config() { + match TelemetryContext::init(otel_config).await { + Ok(ctx) => { + self.telemetry = ctx; + if self.telemetry.is_some() { + info!("OpenTelemetry metrics streaming enabled and connected"); + } + } + Err(e) => { + error!("Failed to initialize OpenTelemetry: {}", e); + error!("Benchmark cannot proceed without OpenTelemetry connection"); + error!("Please ensure the OpenTelemetry collector is running and accessible"); + return Err(IggyError::CannotEstablishConnection); + } + } + } + self.test_server = start_server_if_needed(&args).await; let transport = args.transport(); let server_addr = args.server_address(); info!("Starting to benchmark: {transport} with server: {server_addr}",); - let mut benchmark: Box<dyn Benchmarkable> = args.into(); + let mut benchmark: Box<dyn Benchmarkable> = (args, self.telemetry.as_ref()).into(); benchmark.print_info(); let mut join_handles = benchmark.run().await?; @@ -131,6 +153,10 @@ impl BenchmarkRunner { })?; } + if let Some(telemetry) = self.telemetry.take() { + telemetry.shutdown().await; + } + Ok(()) } } diff --git a/core/bench/src/telemetry/buffer.rs b/core/bench/src/telemetry/buffer.rs new file mode 100644 index 00000000..d9be5280 --- /dev/null +++ b/core/bench/src/telemetry/buffer.rs @@ -0,0 +1,227 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use opentelemetry::KeyValue;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
+use std::time::{Duration, Instant};
+use tokio::sync::mpsc;
+use tracing::warn;
+
+/// Kind of measurement carried by a [`MetricEvent`].
+#[derive(Clone, Debug)]
+pub enum EventType {
+    BatchSent {
+        messages: u64,
+        bytes: u64,
+        latency_us: f64,
+    },
+    BatchReceived {
+        messages: u64,
+        bytes: u64,
+        latency_us: f64,
+    },
+    Throughput {
+        messages_per_sec: f64,
+        mb_per_sec: f64,
+    },
+}
+
+/// One measurement queued by an actor, tagged with that actor's labels.
+#[derive(Clone, Debug)]
+pub struct MetricEvent {
+    pub timestamp: Instant,
+    pub event_type: EventType,
+    pub labels: Vec<KeyValue>,
+}
+
+/// Non-blocking hand-off of metric events from benchmark actors to a [`MetricsProcessor`].
+/// Backed by an *unbounded* channel: `push` never blocks and never drops for lack of space.
+pub struct MetricsBuffer {
+    sender: mpsc::UnboundedSender<MetricEvent>,
+    dropped_count: Arc<AtomicU64>,
+    buffer_size: Arc<AtomicUsize>,
+}
+
+impl MetricsBuffer {
+    pub fn new(batch_size: usize, flush_interval: Duration) -> (Self, MetricsProcessor) {
+        let (sender, receiver) = mpsc::unbounded_channel();
+
+        let buffer = Self {
+            sender,
+            dropped_count: Arc::new(AtomicU64::new(0)),
+            buffer_size: Arc::new(AtomicUsize::new(0)),
+        };
+
+        let processor = MetricsProcessor {
+            receiver,
+            batch_size,
+            flush_interval,
+            batch: Vec::with_capacity(batch_size),
+            last_flush: Instant::now(),
+            buffer_size: buffer.buffer_size.clone(),
+        };
+
+        (buffer, processor)
+    }
+
+    /// Push a metric event to the buffer (non-blocking)
+    pub fn push(&self, event: MetricEvent) {
+        // Count before sending: the channel orders this increment before the processor's
+        // decrement, so the in-flight gauge can never transiently wrap below zero.
+        self.buffer_size.fetch_add(1, Ordering::Relaxed);
+        // An unbounded send never blocks or fails for capacity; it fails only once the
+        // receiver (the processor task) has been dropped.
+        if self.sender.send(event).is_err() {
+            // Processor is gone: undo the count and record the event as dropped.
+            self.buffer_size.fetch_sub(1, Ordering::Relaxed);
+            self.dropped_count.fetch_add(1, Ordering::Relaxed);
+        }
+    }
+
+    pub fn dropped_events(&self) -> u64 {
+        self.dropped_count.load(Ordering::Relaxed)
+    }
+
+    pub fn buffer_size(&self) -> usize {
+        self.buffer_size.load(Ordering::Relaxed)
+    }
+}
+
+/// Batches events received from a [`MetricsBuffer`] and records them on the OTel instruments.
+pub struct MetricsProcessor {
+    receiver: mpsc::UnboundedReceiver<MetricEvent>,
+    batch_size: usize,
+    flush_interval: Duration,
+    batch: Vec<MetricEvent>,
+    last_flush: Instant,
+    buffer_size: Arc<AtomicUsize>,
+}
+
+impl MetricsProcessor {
+    /// Runs until all senders drop; flushes on a full batch or after `flush_interval`.
+    pub async fn run(mut self, metrics: Arc<super::BenchMetrics>) {
+        loop {
+            // Sample the elapsed time once so the subtraction below cannot underflow
+            let since_flush = self.last_flush.elapsed();
+
+            // Wait for the next event at most until the next time-based flush is due
+            let timeout = if since_flush >= self.flush_interval {
+                Duration::from_millis(1)
+            } else {
+                self.flush_interval - since_flush
+            };
+
+            match tokio::time::timeout(timeout, self.receiver.recv()).await {
+                Ok(Some(event)) => {
+                    self.batch.push(event);
+                    // Decrement buffer size since we've received the event
+                    self.buffer_size.fetch_sub(1, Ordering::Relaxed);
+                    // Flush on a full batch, or on time: under steady traffic `recv` never times out
+                    let flush_due = self.last_flush.elapsed() >= self.flush_interval;
+                    if flush_due || self.batch.len() >= self.batch_size {
+                        self.flush_batch(&metrics);
+                    }
+                }
+                Ok(None) => {
+                    // Channel closed, flush remaining and exit
+                    if !self.batch.is_empty() {
+                        self.flush_batch(&metrics);
+                    }
+                    break;
+                }
+                Err(_) => {
+                    // The flush interval has elapsed. Flush even an empty batch so that
+                    // `last_flush` is reset; otherwise an idle processor would keep
+                    // re-arming 1ms timeouts.
+                    self.flush_batch(&metrics);
+                }
+            }
+        }
+    }
+
+    fn flush_batch(&mut self, metrics: &super::BenchMetrics) {
+        // Record every queued event on its instruments; `drain` keeps the Vec's capacity
+        for event in self.batch.drain(..) {
+            match event.event_type {
+                EventType::BatchSent {
+                    messages,
+                    bytes,
+                    latency_us,
+                } => {
+                    metrics.messages_sent.add(messages, &event.labels);
+                    metrics.bytes_sent.add(bytes, &event.labels);
+                    metrics.batches_sent.add(1, &event.labels);
+                    metrics.send_latency.record(latency_us, &event.labels);
+                    metrics.batch_latency.record(latency_us, &event.labels);
+                }
+                EventType::BatchReceived {
+                    messages,
+                    bytes,
+                    latency_us,
+                } => {
+                    metrics.messages_received.add(messages, &event.labels);
+                    metrics.bytes_received.add(bytes, &event.labels);
+                    metrics.batches_received.add(1, &event.labels);
+                    metrics.receive_latency.record(latency_us, &event.labels);
+                }
+                EventType::Throughput {
+                    messages_per_sec,
+                    mb_per_sec,
+                } => {
+                    metrics
+                        .throughput_messages_per_sec
+                        .record(messages_per_sec, &event.labels);
+                    metrics
+                        .throughput_mb_per_sec
+                        .record(mb_per_sec, &event.labels);
+                }
+            }
+        }
+
+        self.last_flush = Instant::now();
+    }
+}
+
+/// Logs a warning every `interval` once any event has been dropped (the count is cumulative).
+pub struct BufferStatsReporter {
+    buffer: Arc<MetricsBuffer>,
+    interval: Duration,
+}
+
+impl BufferStatsReporter {
+    pub const fn new(buffer: Arc<MetricsBuffer>, interval: Duration) -> Self {
+        Self { buffer, interval }
+    }
+
+    pub async fn run(self) {
+        let mut interval = tokio::time::interval(self.interval);
+        loop {
+            interval.tick().await;
+
+            let dropped = self.buffer.dropped_events();
+            if dropped > 0 {
+                warn!(
+                    "Metrics buffer dropped {} events (buffer size: {})",
+                    dropped,
+                    self.buffer.buffer_size()
+                );
+            }
+        }
+    }
+}
diff --git a/core/bench/src/telemetry/mod.rs b/core/bench/src/telemetry/mod.rs
new file mode 100644
index 00000000..a64499cf
--- /dev/null
+++ b/core/bench/src/telemetry/mod.rs
@@ -0,0 +1,238 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+pub mod buffer;
+pub mod setup;
+
+pub use setup::{TelemetryConfig, TelemetryContext};
+
+use opentelemetry::KeyValue;
+use opentelemetry::metrics::{Counter, Gauge, Histogram, Meter};
+use std::sync::Arc;
+use std::time::Instant;
+
+/// All OpenTelemetry instruments used by the benchmark (units are UCUM codes, e.g. `us`, `By`).
+pub struct BenchMetrics {
+    // Counters
+    pub messages_sent: Counter<u64>,
+    pub messages_received: Counter<u64>,
+    pub bytes_sent: Counter<u64>,
+    pub bytes_received: Counter<u64>,
+    pub batches_sent: Counter<u64>,
+    pub batches_received: Counter<u64>,
+
+    // Histograms for latency distribution
+    pub send_latency: Histogram<f64>,
+    pub receive_latency: Histogram<f64>,
+    pub batch_latency: Histogram<f64>,
+    pub end_to_end_latency: Histogram<f64>,
+
+    // Gauges for current state
+    pub active_producers: Gauge<u64>,
+    pub active_consumers: Gauge<u64>,
+    pub throughput_messages_per_sec: Gauge<f64>,
+    pub throughput_mb_per_sec: Gauge<f64>,
+    pub current_rate_limit_mb_per_sec: Gauge<f64>,
+
+    // Message size histograms
+    pub message_size: Histogram<u64>,
+    pub batch_size: Histogram<u64>,
+}
+
+impl BenchMetrics {
+    pub fn new(meter: &Meter) -> Self {
+        Self {
+            // Counters
+            messages_sent: meter
+                .u64_counter("iggy.bench.messages.sent")
+                .with_description("Total number of messages sent")
+                .with_unit("{message}")
+                .build(),
+
+            messages_received: meter
+                .u64_counter("iggy.bench.messages.received")
+                .with_description("Total number of messages received")
+                .with_unit("{message}")
+                .build(),
+
+            bytes_sent: meter
+                .u64_counter("iggy.bench.bytes.sent")
+                .with_description("Total bytes sent")
+                .with_unit("By")
+                .build(),
+
+            bytes_received: meter
+                .u64_counter("iggy.bench.bytes.received")
+                .with_description("Total bytes received")
+                .with_unit("By")
+                .build(),
+
+            batches_sent: meter
+                .u64_counter("iggy.bench.batches.sent")
+                .with_description("Total number of batches sent")
+                .with_unit("{batch}")
+                .build(),
+
+            batches_received: meter
+                .u64_counter("iggy.bench.batches.received")
+                .with_description("Total number of batches received")
+                .with_unit("{batch}")
+                .build(),
+
+            // Histograms
+            send_latency: meter
+                .f64_histogram("iggy.bench.latency.send")
+                .with_description("Send operation latency")
+                .with_unit("us")
+                .build(),
+
+            receive_latency: meter
+                .f64_histogram("iggy.bench.latency.receive")
+                .with_description("Receive operation latency")
+                .with_unit("us")
+                .build(),
+
+            batch_latency: meter
+                .f64_histogram("iggy.bench.latency.batch")
+                .with_description("Batch processing latency")
+                .with_unit("us")
+                .build(),
+
+            end_to_end_latency: meter
+                .f64_histogram("iggy.bench.latency.e2e")
+                .with_description("End-to-end message latency")
+                .with_unit("us")
+                .build(),
+
+            // Gauges
+            active_producers: meter
+                .u64_gauge("iggy.bench.actors.producers")
+                .with_description("Number of active producers")
+                .with_unit("{actor}")
+                .build(),
+
+            active_consumers: meter
+                .u64_gauge("iggy.bench.actors.consumers")
+                .with_description("Number of active consumers")
+                .with_unit("{actor}")
+                .build(),
+
+            throughput_messages_per_sec: meter
+                .f64_gauge("iggy.bench.throughput.messages")
+                .with_description("Current throughput in messages per second")
+                .with_unit("{message}/s")
+                .build(),
+
+            throughput_mb_per_sec: meter
+                .f64_gauge("iggy.bench.throughput.bytes")
+                .with_description("Current throughput in MB per second")
+                .with_unit("MBy/s")
+                .build(),
+
+            current_rate_limit_mb_per_sec: meter
+                .f64_gauge("iggy.bench.rate_limit")
+                .with_description("Current rate limit in MB per second")
+                .with_unit("MBy/s")
+                .build(),
+
+            // Size histograms
+            message_size: meter
+                .u64_histogram("iggy.bench.size.message")
+                .with_description("Distribution of message sizes")
+                .with_unit("By")
+                .build(),
+
+            batch_size: meter
+                .u64_histogram("iggy.bench.size.batch")
+                .with_description("Distribution of batch sizes")
+                .with_unit("{message}")
+                .build(),
+        }
+    }
+}
+
+/// Cloneable per-actor handle; each `record_*` call queues an event tagged with its labels.
+#[derive(Clone)]
+pub struct MetricsHandle {
+    // TODO: Will be used for direct metric recording in future optimizations
+    #[allow(dead_code)]
+    metrics: Arc<BenchMetrics>,
+    buffer: Arc<buffer::MetricsBuffer>,
+    actor_labels: Vec<KeyValue>,
+}
+
+impl MetricsHandle {
+    pub fn new(
+        metrics: Arc<BenchMetrics>,
+        buffer: Arc<buffer::MetricsBuffer>,
+        actor_type: &str,
+        actor_id: u32,
+        stream_id: u32,
+    ) -> Self {
+        let actor_labels = vec![
+            KeyValue::new("actor_type", actor_type.to_string()),
+            KeyValue::new("actor_id", i64::from(actor_id)),
+            KeyValue::new("stream_id", i64::from(stream_id)),
+        ];
+
+        Self {
+            metrics,
+            buffer,
+            actor_labels,
+        }
+    }
+
+    /// Queues a batch-send event for the background processor (`latency_us` in microseconds).
+    pub fn record_batch_sent(&self, messages: u64, bytes: u64, latency_us: f64) {
+        // Buffer the raw data for async processing
+        self.buffer.push(buffer::MetricEvent {
+            timestamp: Instant::now(),
+            event_type: buffer::EventType::BatchSent {
+                messages,
+                bytes,
+                latency_us,
+            },
+            labels: self.actor_labels.clone(),
+        });
+    }
+
+    /// Queues a batch-receive event for the background processor (`latency_us` in microseconds).
+    pub fn record_batch_received(&self, messages: u64, bytes: u64, latency_us: f64) {
+        self.buffer.push(buffer::MetricEvent {
+            timestamp: Instant::now(),
+            event_type: buffer::EventType::BatchReceived {
+                messages,
+                bytes,
+                latency_us,
+            },
+            labels: self.actor_labels.clone(),
+        });
+    }
+
+    /// Queues a throughput sample (messages per second and MB per second).
+    pub fn record_throughput(&self, messages_per_sec: f64, mb_per_sec: f64) {
+        self.buffer.push(buffer::MetricEvent {
+            timestamp: Instant::now(),
+            event_type: buffer::EventType::Throughput {
+                messages_per_sec,
+                mb_per_sec,
+            },
+            labels: self.actor_labels.clone(),
+        });
+    }
+}
diff --git a/core/bench/src/telemetry/setup.rs b/core/bench/src/telemetry/setup.rs
new file mode 100644
index 00000000..deaee061
--- /dev/null
+++ b/core/bench/src/telemetry/setup.rs
@@ -0,0 +1,302 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::{BenchMetrics, MetricsHandle, buffer};
+use opentelemetry::{KeyValue, global};
+use opentelemetry_otlp::WithExportConfig;
+use opentelemetry_sdk::{
+    Resource,
+    metrics::{PeriodicReader, SdkMeterProvider},
+};
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::sync::oneshot;
+use tracing::{error, info, warn};
+
+/// Settings for streaming benchmark metrics to an OTLP collector over gRPC.
+pub struct TelemetryConfig {
+    pub enabled: bool,
+    pub endpoint: String,
+    pub export_interval: Duration,
+    pub export_timeout: Duration,
+    /// Events per processor batch (flush threshold), not a channel capacity.
+    pub buffer_size: usize,
+    pub buffer_flush_interval: Duration,
+    pub service_name: String,
+    pub service_version: String,
+    pub benchmark_id: String,
+}
+
+impl Default for TelemetryConfig {
+    fn default() -> Self {
+        Self {
+            enabled: false,
+            endpoint: "http://localhost:4317".to_string(),
+            export_interval: Duration::from_secs(1),
+            export_timeout: Duration::from_secs(10),
+            buffer_size: 10000,
+            buffer_flush_interval: Duration::from_millis(100),
+            service_name: "iggy-bench".to_string(),
+            service_version: env!("CARGO_PKG_VERSION").to_string(),
+            benchmark_id: String::new(), // Will be set when creating config
+        }
+    }
+}
+
+/// Running telemetry pipeline: instruments, event buffer and its two background tasks.
+pub struct TelemetryContext {
+    pub metrics: Arc<BenchMetrics>,
+    pub buffer: Arc<buffer::MetricsBuffer>,
+    processor_handle: Option<tokio::task::JoinHandle<()>>,
+    stats_handle: Option<tokio::task::JoinHandle<()>>,
+    meter_provider: Option<SdkMeterProvider>,
+    shutdown_tx: Option<oneshot::Sender<()>>,
+}
+
+impl TelemetryContext {
+    /// Checks the collector is reachable, then starts the export pipeline (`Ok(None)` if disabled).
+    /// Fails when the collector cannot be reached after 10 connection attempts.
+    #[allow(clippy::cognitive_complexity)]
+    pub async fn init(config: TelemetryConfig) -> Result<Option<Self>, Box<dyn std::error::Error>> {
+        if !config.enabled {
+            info!("OpenTelemetry metrics disabled");
+            return Ok(None);
+        }
+
+        info!(
+            "Initializing OpenTelemetry metrics with endpoint: {}",
+            config.endpoint
+        );
+
+        // 10 attempts, 1s apart; each TCP connect attempt itself times out after 2s
+        Self::verify_connection_with_retry(&config.endpoint, 10, Duration::from_secs(1)).await?;
+
+        // Create resource with benchmark metadata
+        let resource = Resource::builder()
+            .with_attribute(KeyValue::new("service.name", config.service_name.clone()))
+            .with_attribute(KeyValue::new(
+                "service.version",
+                config.service_version.clone(),
+            ))
+            .with_attribute(KeyValue::new("benchmark.id", config.benchmark_id.clone()))
+            .with_attribute(KeyValue::new("telemetry.sdk.name", "opentelemetry"))
+            .with_attribute(KeyValue::new("telemetry.sdk.language", "rust"))
+            .build();
+
+        // Configure OTLP exporter
+        let exporter = opentelemetry_otlp::MetricExporter::builder()
+            .with_tonic()
+            .with_endpoint(config.endpoint.clone())
+            .with_timeout(config.export_timeout)
+            .build()?;
+
+        // Create periodic reader for metrics export
+        let reader = PeriodicReader::builder(exporter)
+            .with_interval(config.export_interval)
+            .build();
+
+        let meter_provider = SdkMeterProvider::builder()
+            .with_reader(reader)
+            .with_resource(resource)
+            .build();
+
+        // Set as global provider
+        global::set_meter_provider(meter_provider.clone());
+
+        // Get meter and create metrics
+        let meter = global::meter("iggy-bench");
+        let metrics = Arc::new(BenchMetrics::new(&meter));
+
+        // Create metrics buffer
+        let (buffer, processor) =
+            buffer::MetricsBuffer::new(config.buffer_size, config.buffer_flush_interval);
+        let buffer = Arc::new(buffer);
+
+        let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
+
+        // Start the processor; it drains and exits once every `Arc<MetricsBuffer>` is dropped
+        let metrics_clone = metrics.clone();
+        let processor_handle = tokio::spawn(async move {
+            processor.run(metrics_clone).await;
+        });
+
+        // Start buffer stats reporter with graceful shutdown
+        let buffer_clone = buffer.clone();
+        let stats_handle = tokio::spawn(async move {
+            let reporter = buffer::BufferStatsReporter::new(buffer_clone, Duration::from_secs(10));
+            tokio::select! {
+                _ = reporter.run() => {},
+                _ = &mut shutdown_rx => {
+                    info!("Stats reporter received shutdown signal");
+                }
+            }
+        });
+
+        info!("OpenTelemetry metrics initialized successfully");
+
+        Ok(Some(Self {
+            metrics,
+            buffer,
+            processor_handle: Some(processor_handle),
+            stats_handle: Some(stats_handle),
+            meter_provider: Some(meter_provider),
+            shutdown_tx: Some(shutdown_tx),
+        }))
+    }
+
+    /// Creates a handle whose events are labelled with `actor_type`, `actor_id` and `stream_id`.
+    pub fn create_handle(&self, actor_type: &str, actor_id: u32, stream_id: u32) -> MetricsHandle {
+        MetricsHandle::new(
+            self.metrics.clone(),
+            self.buffer.clone(),
+            actor_type,
+            actor_id,
+            stream_id,
+        )
+    }
+
+    /// Stops background tasks, lets the processor drain (max 5s), then shuts down the provider.
+    #[allow(clippy::cognitive_complexity)]
+    pub async fn shutdown(mut self) {
+        info!("Shutting down OpenTelemetry metrics");
+
+        // Signal shutdown to background tasks
+        if let Some(tx) = self.shutdown_tx.take() {
+            let _ = tx.send(());
+        }
+
+        // Release our buffer reference. The channel (and thus the processor) only closes once
+        // every other `Arc<MetricsBuffer>`, e.g. in live `MetricsHandle`s, is dropped as well.
+        drop(self.buffer);
+
+        // Wait for the processor to drain remaining events; abort it if that takes too long
+        if let Some(mut handle) = self.processor_handle.take() {
+            match tokio::time::timeout(Duration::from_secs(5), &mut handle).await {
+                Ok(Ok(())) => {
+                    info!("Metrics processor shut down cleanly");
+                }
+                Ok(Err(e)) => {
+                    error!("Metrics processor task failed: {}", e);
+                }
+                Err(_) => {
+                    warn!("Metrics processor shutdown timed out after 5 seconds");
+                    handle.abort();
+                }
+            }
+        }
+
+        // Wait for stats reporter to finish
+        if let Some(handle) = self.stats_handle.take() {
+            // Stats reporter should exit quickly due to shutdown signal
+            match tokio::time::timeout(Duration::from_secs(1), handle).await {
+                Ok(Ok(())) => {
+                    info!("Stats reporter shut down cleanly");
+                }
+                Ok(Err(e)) if !e.is_cancelled() => {
+                    error!("Stats reporter task failed: {}", e);
+                }
+                _ => {
+                    // Task was cancelled or timed out, which is fine
+                }
+            }
+        }
+
+        // Flush and shut down the meter provider so pending data points are exported
+        if let Some(provider) = self.meter_provider.take() {
+            // Force a final flush before shutdown
+            if let Err(e) = provider.force_flush() {
+                error!("Error flushing meter provider: {}", e);
+            }
+
+            if let Err(e) = provider.shutdown() {
+                error!("Error shutting down meter provider: {}", e);
+            }
+
+            // Note: OpenTelemetry cannot unset the global provider; shutting it down
+            // ensures it won't accept new metrics.
+        }
+
+        info!("OpenTelemetry metrics shutdown complete");
+    }
+
+    /// Tries to reach `endpoint` up to `max_attempts` times, sleeping `retry_delay` in between.
+    async fn verify_connection_with_retry(
+        endpoint: &str,
+        max_attempts: u32,
+        retry_delay: Duration,
+    ) -> Result<(), Box<dyn std::error::Error>> {
+        info!("Verifying connection to OTLP endpoint: {}", endpoint);
+
+        for attempt in 1..=max_attempts {
+            match Self::test_otlp_connection(endpoint).await {
+                Ok(()) => {
+                    info!("Successfully connected to OTLP endpoint");
+                    return Ok(());
+                }
+                Err(e) if attempt < max_attempts => {
+                    warn!(
+                        "Failed to connect to OTLP endpoint (attempt {}/{}): {}",
+                        attempt, max_attempts, e
+                    );
+                    tokio::time::sleep(retry_delay).await;
+                }
+                Err(e) => {
+                    error!(
+                        "Failed to connect to OTLP endpoint after {} attempts: {}",
+                        max_attempts, e
+                    );
+                    return Err(format!(
+                        "Cannot connect to OpenTelemetry collector at {}. \
+                         Please ensure the collector is running and accessible.",
+                        endpoint
+                    )
+                    .into());
+                }
+            }
+        }
+        Err(format!("No connection attempts were made to OTLP endpoint {endpoint}").into())
+    }
+
+    /// Test connection to OTLP endpoint using TCP connection test
+    async fn test_otlp_connection(endpoint: &str) -> Result<(), Box<dyn std::error::Error>> {
+        use tokio::net::TcpStream;
+        use tokio::time::timeout;
+
+        // Strip any scheme and path so only "host[:port]" is left,
+        // e.g. "http://collector:4317/" -> "collector:4317".
+        let endpoint = endpoint.rsplit("://").next().unwrap_or(endpoint);
+        let endpoint = endpoint.split('/').next().unwrap_or(endpoint);
+
+        // Default OTLP gRPC port is 4317
+        let addr = if endpoint.contains(':') {
+            endpoint.to_string()
+        } else {
+            format!("{endpoint}:4317")
+        };
+
+        // Try to establish TCP connection to verify the endpoint is reachable
+        match timeout(Duration::from_secs(2), TcpStream::connect(&addr)).await {
+            Ok(Ok(_)) => {
+                // Connection successful - endpoint is reachable
+                Ok(())
+            }
+            Ok(Err(e)) => Err(format!("Cannot connect to OTLP endpoint at {addr}: {e}").into()),
+            Err(_) => Err(format!("Connection to OTLP endpoint at {addr} timed out").into()),
+        }
+    }
+}
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 3a0cf94e..3213c29b 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -65,9 +65,9 @@ mimalloc = { workspace = true, optional = true }
 moka = { version = "0.12.10", features = ["future"] }
 nix = { version = "0.30", features = ["fs"] }
 once_cell = "1.21.3"
-opentelemetry = { version = "0.30.0", features = ["trace", "logs"] }
-opentelemetry-appender-tracing = { version = "0.30.1", features = ["log"] }
-opentelemetry-otlp = { version = "0.30.0", features = [
+opentelemetry = { workspace = true, features = ["trace", "logs"] }
+opentelemetry-appender-tracing = { workspace = true, features = ["log"] }
+opentelemetry-otlp = { workspace = true, features = [
     "logs",
     "trace",
     "grpc-tonic",
@@ -76,8 +76,8 @@ opentelemetry-otlp = { version = "0.30.0", features = [
     "reqwest-client",
     "tokio",
 ] }
-opentelemetry-semantic-conventions = "0.30.0"
-opentelemetry_sdk =
{ version = "0.30.0", features = [ +opentelemetry-semantic-conventions = { workspace = true } +opentelemetry_sdk = { workspace = true, features = [ "rt-tokio", "logs", "trace",