This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch bench-telemetry
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit ad53d9a19be5611379272b281a39797d08d47bdf
Author: Hubert Gruszecki <h.grusze...@gmail.com>
AuthorDate: Fri Aug 8 15:24:56 2025 +0200

    feat(bench): implement otel telemetry
---
 Cargo.lock                                         |   3 +
 Cargo.toml                                         |   5 +
 core/bench/Cargo.toml                              |   3 +
 .../src/actors/consumer/benchmark_consumer.rs      |  16 ++
 .../actors/consumer/typed_benchmark_consumer.rs    |   4 +
 .../src/actors/producer/benchmark_producer.rs      |  15 +
 .../actors/producer/typed_benchmark_producer.rs    |   4 +
 .../benchmark_producing_consumer.rs                |  26 ++
 .../typed_banchmark_producing_consumer.rs          |   4 +
 core/bench/src/args/common.rs                      |  35 +++
 core/bench/src/args/output.rs                      |  24 ++
 .../src/benchmarks/balanced_consumer_group.rs      |  28 +-
 core/bench/src/benchmarks/balanced_producer.rs     |  23 +-
 .../balanced_producer_and_consumer_group.rs        |  48 +++-
 core/bench/src/benchmarks/benchmark.rs             |  50 ++--
 core/bench/src/benchmarks/common.rs                |  28 +-
 .../benchmarks/end_to_end_producing_consumer.rs    |  23 +-
 .../end_to_end_producing_consumer_group.rs         |  26 +-
 core/bench/src/benchmarks/pinned_consumer.rs       |  22 +-
 core/bench/src/benchmarks/pinned_producer.rs       |  26 +-
 .../src/benchmarks/pinned_producer_and_consumer.rs |  43 ++-
 core/bench/src/main.rs                             |   1 +
 core/bench/src/runner.rs                           |  28 +-
 core/bench/src/telemetry/buffer.rs                 | 227 ++++++++++++++++
 core/bench/src/telemetry/mod.rs                    | 238 ++++++++++++++++
 core/bench/src/telemetry/setup.rs                  | 302 +++++++++++++++++++++
 core/server/Cargo.toml                             |  10 +-
 27 files changed, 1218 insertions(+), 44 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index c4bea03c..9c7ecffd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3835,6 +3835,9 @@ dependencies = [
  "iggy",
  "integration",
  "nonzero_lit",
+ "opentelemetry",
+ "opentelemetry-otlp",
+ "opentelemetry_sdk",
  "rand 0.9.2",
  "rayon",
  "serde",
diff --git a/Cargo.toml b/Cargo.toml
index 239e4b03..beba7743 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -98,6 +98,11 @@ humantime = "2.2.0"
 keyring = { version = "3.6.3", features = ["sync-secret-service", "vendored"] }
 nonzero_lit = "0.1.2"
 once_cell = "1.21.3"
+opentelemetry = "0.30.0"
+opentelemetry-appender-tracing = "0.30.1"
+opentelemetry-otlp = "0.30.0"
+opentelemetry-semantic-conventions = "0.30.0"
+opentelemetry_sdk = "0.30.0"
 passterm = "=2.0.1"
 quinn = "0.11.8"
 postcard = { version = "1.1.3", features = ["alloc"] }
diff --git a/core/bench/Cargo.toml b/core/bench/Cargo.toml
index 40937ced..75359706 100644
--- a/core/bench/Cargo.toml
+++ b/core/bench/Cargo.toml
@@ -43,6 +43,9 @@ human-repr = { workspace = true }
 iggy = { workspace = true }
 integration = { workspace = true }
 nonzero_lit = { workspace = true }
+opentelemetry = { version = "0.30.0", features = ["metrics"] }
+opentelemetry-otlp = { version = "0.30.0", features = ["grpc-tonic", 
"metrics"] }
+opentelemetry_sdk = { version = "0.30.0", features = ["rt-tokio", "metrics"] }
 rand = { workspace = true }
 rayon = "1.10.0"
 serde = { workspace = true }
diff --git a/core/bench/src/actors/consumer/benchmark_consumer.rs 
b/core/bench/src/actors/consumer/benchmark_consumer.rs
index 6181fc49..7b2337f3 100644
--- a/core/bench/src/actors/consumer/benchmark_consumer.rs
+++ b/core/bench/src/actors/consumer/benchmark_consumer.rs
@@ -20,6 +20,7 @@ use crate::actors::consumer::client::BenchmarkConsumerClient;
 use crate::actors::consumer::client::interface::BenchmarkConsumerConfig;
 use crate::analytics::metrics::individual::from_records;
 use crate::analytics::record::BenchmarkRecord;
+use crate::telemetry::MetricsHandle;
 use crate::utils::finish_condition::BenchmarkFinishCondition;
 use crate::utils::rate_limiter::BenchmarkRateLimiter;
 use bench_report::actor_kind::ActorKind;
@@ -41,9 +42,11 @@ pub struct BenchmarkConsumer<C: BenchmarkConsumerClient> {
     pub moving_average_window: u32,
     pub limit_bytes_per_second: Option<IggyByteSize>,
     pub config: BenchmarkConsumerConfig,
+    pub telemetry: Option<MetricsHandle>,
 }
 
 impl<C: BenchmarkConsumerClient> BenchmarkConsumer<C> {
+    #[allow(clippy::too_many_arguments)]
     pub const fn new(
         client: C,
         benchmark_kind: BenchmarkKind,
@@ -52,6 +55,7 @@ impl<C: BenchmarkConsumerClient> BenchmarkConsumer<C> {
         moving_average_window: u32,
         limit_bytes_per_second: Option<IggyByteSize>,
         config: BenchmarkConsumerConfig,
+        telemetry: Option<MetricsHandle>,
     ) -> Self {
         Self {
             client,
@@ -61,6 +65,7 @@ impl<C: BenchmarkConsumerClient> BenchmarkConsumer<C> {
             moving_average_window,
             limit_bytes_per_second,
             config,
+            telemetry,
         }
     }
 
@@ -110,6 +115,17 @@ impl<C: BenchmarkConsumerClient> BenchmarkConsumer<C> {
                 total_bytes: bytes_processed,
             });
 
+            // Record metrics to OpenTelemetry if enabled
+            if let Some(ref telemetry) = self.telemetry {
+                #[allow(clippy::cast_precision_loss)]
+                let latency_us = batch.latency.as_micros() as f64;
+                telemetry.record_batch_received(
+                    u64::from(batch.messages),
+                    batch.total_bytes,
+                    latency_us,
+                );
+            }
+
             if let Some(rate_limiter) = &rate_limiter {
                 rate_limiter
                     .wait_until_necessary(batch.user_data_bytes)
diff --git a/core/bench/src/actors/consumer/typed_benchmark_consumer.rs 
b/core/bench/src/actors/consumer/typed_benchmark_consumer.rs
index fca492ea..7422b602 100644
--- a/core/bench/src/actors/consumer/typed_benchmark_consumer.rs
+++ b/core/bench/src/actors/consumer/typed_benchmark_consumer.rs
@@ -26,6 +26,7 @@ use crate::{
             low_level::LowLevelConsumerClient,
         },
     },
+    telemetry::MetricsHandle,
     utils::finish_condition::BenchmarkFinishCondition,
 };
 use bench_report::{
@@ -57,6 +58,7 @@ impl TypedBenchmarkConsumer {
         polling_kind: PollingKind,
         limit_bytes_per_second: Option<IggyByteSize>,
         origin_timestamp_latency_calculation: bool,
+        telemetry: Option<MetricsHandle>,
     ) -> Self {
         let config = BenchmarkConsumerConfig {
             consumer_id,
@@ -77,6 +79,7 @@ impl TypedBenchmarkConsumer {
                 moving_average_window,
                 limit_bytes_per_second,
                 config,
+                telemetry,
             ))
         } else {
             Self::Low(BenchmarkConsumer::new(
@@ -87,6 +90,7 @@ impl TypedBenchmarkConsumer {
                 moving_average_window,
                 limit_bytes_per_second,
                 config,
+                telemetry,
             ))
         }
     }
diff --git a/core/bench/src/actors/producer/benchmark_producer.rs 
b/core/bench/src/actors/producer/benchmark_producer.rs
index 9c9f2a55..df7ed853 100644
--- a/core/bench/src/actors/producer/benchmark_producer.rs
+++ b/core/bench/src/actors/producer/benchmark_producer.rs
@@ -19,6 +19,7 @@
 use crate::{
     actors::producer::client::{BenchmarkProducerClient, 
interface::BenchmarkProducerConfig},
     analytics::{metrics::individual::from_records, record::BenchmarkRecord},
+    telemetry::MetricsHandle,
     utils::{
         batch_generator::BenchmarkBatchGenerator, 
finish_condition::BenchmarkFinishCondition,
         rate_limiter::BenchmarkRateLimiter,
@@ -42,6 +43,7 @@ pub struct BenchmarkProducer<P: BenchmarkProducerClient> {
     pub moving_average_window: u32,
     pub limit_bytes_per_second: Option<IggyByteSize>,
     pub config: BenchmarkProducerConfig,
+    pub telemetry: Option<MetricsHandle>,
 }
 
 impl<P: BenchmarkProducerClient> BenchmarkProducer<P> {
@@ -54,6 +56,7 @@ impl<P: BenchmarkProducerClient> BenchmarkProducer<P> {
         moving_average_window: u32,
         limit_bytes_per_second: Option<IggyByteSize>,
         config: BenchmarkProducerConfig,
+        telemetry: Option<MetricsHandle>,
     ) -> Self {
         Self {
             client,
@@ -63,6 +66,7 @@ impl<P: BenchmarkProducerClient> BenchmarkProducer<P> {
             moving_average_window,
             limit_bytes_per_second,
             config,
+            telemetry,
         }
     }
 
@@ -126,6 +130,17 @@ impl<P: BenchmarkProducerClient> BenchmarkProducer<P> {
                 total_bytes: total_bytes_processed,
             });
 
+            // Record metrics to OpenTelemetry if enabled
+            if let Some(ref telemetry) = self.telemetry {
+                #[allow(clippy::cast_precision_loss)]
+                let latency_us = batch.latency.as_micros() as f64;
+                telemetry.record_batch_sent(
+                    u64::from(batch.messages),
+                    batch.total_bytes,
+                    latency_us,
+                );
+            }
+
             if let Some(rate_limiter) = &rate_limiter {
                 rate_limiter
                     .wait_until_necessary(batch.user_data_bytes)
diff --git a/core/bench/src/actors/producer/typed_benchmark_producer.rs 
b/core/bench/src/actors/producer/typed_benchmark_producer.rs
index ee2de483..5c53a6cf 100644
--- a/core/bench/src/actors/producer/typed_benchmark_producer.rs
+++ b/core/bench/src/actors/producer/typed_benchmark_producer.rs
@@ -24,6 +24,7 @@ use crate::{
             low_level::LowLevelProducerClient,
         },
     },
+    telemetry::MetricsHandle,
     utils::finish_condition::BenchmarkFinishCondition,
 };
 use bench_report::{
@@ -55,6 +56,7 @@ impl TypedBenchmarkProducer {
         sampling_time: IggyDuration,
         moving_average_window: u32,
         limit_bytes_per_second: Option<IggyByteSize>,
+        telemetry: Option<MetricsHandle>,
     ) -> Self {
         let config = BenchmarkProducerConfig {
             producer_id,
@@ -75,6 +77,7 @@ impl TypedBenchmarkProducer {
                 moving_average_window,
                 limit_bytes_per_second,
                 config,
+                telemetry,
             ))
         } else {
             let client = LowLevelProducerClient::new(client_factory, 
config.clone());
@@ -86,6 +89,7 @@ impl TypedBenchmarkProducer {
                 moving_average_window,
                 limit_bytes_per_second,
                 config,
+                telemetry,
             ))
         }
     }
diff --git 
a/core/bench/src/actors/producing_consumer/benchmark_producing_consumer.rs 
b/core/bench/src/actors/producing_consumer/benchmark_producing_consumer.rs
index 0f5d323a..baa2a92f 100644
--- a/core/bench/src/actors/producing_consumer/benchmark_producing_consumer.rs
+++ b/core/bench/src/actors/producing_consumer/benchmark_producing_consumer.rs
@@ -24,6 +24,7 @@ use crate::{
         producer::client::{BenchmarkProducerClient, 
interface::BenchmarkProducerConfig},
     },
     analytics::{metrics::individual::from_records, record::BenchmarkRecord},
+    telemetry::MetricsHandle,
     utils::{
         batch_generator::BenchmarkBatchGenerator, 
finish_condition::BenchmarkFinishCondition,
         rate_limiter::BenchmarkRateLimiter,
@@ -53,6 +54,7 @@ where
     pub limit_bytes_per_second: Option<IggyByteSize>,
     pub producer_config: BenchmarkProducerConfig,
     pub consumer_config: BenchmarkConsumerConfig,
+    pub telemetry: Option<MetricsHandle>,
 }
 
 impl<P, C> BenchmarkProducingConsumer<P, C>
@@ -72,6 +74,7 @@ where
         limit_bytes_per_second: Option<IggyByteSize>,
         producer_config: BenchmarkProducerConfig,
         consumer_config: BenchmarkConsumerConfig,
+        telemetry: Option<MetricsHandle>,
     ) -> Self {
         Self {
             producer,
@@ -84,6 +87,7 @@ where
             limit_bytes_per_second,
             producer_config,
             consumer_config,
+            telemetry,
         }
     }
     #[allow(clippy::too_many_lines)]
@@ -155,6 +159,17 @@ where
                     sent_batches += 1;
                     awaiting_reply = is_consumer;
 
+                    // Record producer metrics to OpenTelemetry if enabled
+                    if let Some(ref telemetry) = self.telemetry {
+                        #[allow(clippy::cast_precision_loss)]
+                        let latency_us = batch.latency.as_micros() as f64;
+                        telemetry.record_batch_sent(
+                            u64::from(batch.messages),
+                            batch.total_bytes,
+                            latency_us,
+                        );
+                    }
+
                     if self
                         .send_finish_condition
                         .account_and_check(batch.user_data_bytes)
@@ -193,6 +208,17 @@ where
                         total_bytes: sent_total_bytes + recv_total_bytes,
                     });
 
+                    // Record consumer metrics to OpenTelemetry if enabled
+                    if let Some(ref telemetry) = self.telemetry {
+                        #[allow(clippy::cast_precision_loss)]
+                        let latency_us = batch.latency.as_micros() as f64;
+                        telemetry.record_batch_received(
+                            u64::from(batch.messages),
+                            batch.total_bytes,
+                            latency_us,
+                        );
+                    }
+
                     if let Some(limiter) = &rate_limiter {
                         limiter.wait_until_necessary(rl_value).await;
                         rl_value = 0;
diff --git 
a/core/bench/src/actors/producing_consumer/typed_banchmark_producing_consumer.rs
 
b/core/bench/src/actors/producing_consumer/typed_banchmark_producing_consumer.rs
index 26e3e0fb..aec81bc3 100644
--- 
a/core/bench/src/actors/producing_consumer/typed_banchmark_producing_consumer.rs
+++ 
b/core/bench/src/actors/producing_consumer/typed_banchmark_producing_consumer.rs
@@ -30,6 +30,7 @@ use crate::{
         },
         producing_consumer::BenchmarkProducingConsumer,
     },
+    telemetry::MetricsHandle,
     utils::finish_condition::BenchmarkFinishCondition,
 };
 use bench_report::{
@@ -64,6 +65,7 @@ impl TypedBenchmarkProducingConsumer {
         limit_bytes_per_second: Option<IggyByteSize>,
         polling_kind: PollingKind,
         origin_timestamp_latency_calculation: bool,
+        telemetry: Option<MetricsHandle>,
     ) -> Self {
         let producer_config = BenchmarkProducerConfig {
             producer_id: actor_id,
@@ -99,6 +101,7 @@ impl TypedBenchmarkProducingConsumer {
                 limit_bytes_per_second,
                 producer_config,
                 consumer_config,
+                telemetry,
             ))
         } else {
             let producer =
@@ -115,6 +118,7 @@ impl TypedBenchmarkProducingConsumer {
                 limit_bytes_per_second,
                 producer_config,
                 consumer_config,
+                telemetry,
             ))
         }
     }
diff --git a/core/bench/src/args/common.rs b/core/bench/src/args/common.rs
index 7d475aa0..0f158eca 100644
--- a/core/bench/src/args/common.rs
+++ b/core/bench/src/args/common.rs
@@ -16,6 +16,8 @@
  * under the License.
  */
 
+use crate::telemetry::TelemetryConfig;
+
 use super::kind::BenchmarkKindCommand;
 use super::output::BenchmarkOutputCommand;
 use super::props::{BenchmarkKindProps, BenchmarkTransportProps};
@@ -340,6 +342,39 @@ impl IggyBenchArgs {
             })
     }
 
+    pub fn otel_config(&self) -> Option<TelemetryConfig> {
+        self.benchmark_kind
+            .inner()
+            .transport_command()
+            .output_command()
+            .as_ref()
+            .and_then(|cmd| match cmd {
+                BenchmarkOutputCommand::Output(args) => {
+                    if args.otel_enabled {
+                        Some(TelemetryConfig {
+                            enabled: true,
+                            endpoint: args.otel_endpoint.clone(),
+                            export_interval: std::time::Duration::from_secs(
+                                args.otel_export_interval_secs,
+                            ),
+                            export_timeout: std::time::Duration::from_secs(
+                                args.otel_export_timeout_secs,
+                            ),
+                            buffer_size: args.otel_buffer_size,
+                            buffer_flush_interval: 
std::time::Duration::from_millis(
+                                args.otel_buffer_flush_ms,
+                            ),
+                            service_name: "iggy-bench".to_string(),
+                            service_version: 
env!("CARGO_PKG_VERSION").to_string(),
+                            benchmark_id: uuid::Uuid::new_v4().to_string(),
+                        })
+                    } else {
+                        None
+                    }
+                }
+            })
+    }
+
     pub fn max_topic_size(&self) -> Option<IggyByteSize> {
         self.benchmark_kind.inner().max_topic_size()
     }
diff --git a/core/bench/src/args/output.rs b/core/bench/src/args/output.rs
index 82e73c4f..d0dd3676 100644
--- a/core/bench/src/args/output.rs
+++ b/core/bench/src/args/output.rs
@@ -53,4 +53,28 @@ pub struct BenchmarkOutputArgs {
     /// Open generated charts in browser after benchmark is finished
     #[arg(long, short = 'c', default_value_t = false)]
     pub open_charts: bool,
+
+    /// Enable OpenTelemetry metrics export
+    #[arg(long, default_value_t = false)]
+    pub otel_enabled: bool,
+
+    /// OpenTelemetry collector endpoint (gRPC)
+    #[arg(long, default_value = "http://localhost:4317")]
+    pub otel_endpoint: String,
+
+    /// OpenTelemetry metrics export interval in seconds
+    #[arg(long, default_value_t = 1)]
+    pub otel_export_interval_secs: u64,
+
+    /// OpenTelemetry metrics export timeout in seconds
+    #[arg(long, default_value_t = 10)]
+    pub otel_export_timeout_secs: u64,
+
+    /// Metrics buffer size (number of events)
+    #[arg(long, default_value_t = 10000)]
+    pub otel_buffer_size: usize,
+
+    /// Metrics buffer flush interval in milliseconds
+    #[arg(long, default_value_t = 100)]
+    pub otel_buffer_flush_ms: u64,
 }
diff --git a/core/bench/src/benchmarks/balanced_consumer_group.rs 
b/core/bench/src/benchmarks/balanced_consumer_group.rs
index ac60fc6f..9329b54c 100644
--- a/core/bench/src/benchmarks/balanced_consumer_group.rs
+++ b/core/bench/src/benchmarks/balanced_consumer_group.rs
@@ -20,6 +20,7 @@ use super::benchmark::Benchmarkable;
 use crate::{
     args::common::IggyBenchArgs,
     benchmarks::common::{build_consumer_futures, init_consumer_groups},
+    telemetry::{MetricsHandle, TelemetryContext},
 };
 use async_trait::async_trait;
 use bench_report::{benchmark_kind::BenchmarkKind, 
individual_metrics::BenchmarkIndividualMetrics};
@@ -32,13 +33,36 @@ use tracing::info;
 pub struct BalancedConsumerGroupBenchmark {
     args: Arc<IggyBenchArgs>,
     client_factory: Arc<dyn ClientFactory>,
+    telemetry_handles: Vec<MetricsHandle>,
 }
 
 impl BalancedConsumerGroupBenchmark {
-    pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn 
ClientFactory>) -> Self {
+    pub fn new(
+        args: Arc<IggyBenchArgs>,
+        client_factory: Arc<dyn ClientFactory>,
+        telemetry: Option<&TelemetryContext>,
+    ) -> Self {
+        let telemetry_handles = telemetry.map_or_else(Vec::new, |ctx| {
+            let consumers = args.consumers();
+            let cg_count = args.number_of_consumer_groups();
+            let start_stream_id = args.start_stream_id();
+
+            (1..=consumers)
+                .map(|consumer_id| {
+                    let stream_id = if cg_count > 0 {
+                        start_stream_id + 1 + (consumer_id % cg_count)
+                    } else {
+                        start_stream_id + consumer_id
+                    };
+                    ctx.create_handle("consumer", consumer_id, stream_id)
+                })
+                .collect()
+        });
+
         Self {
             args,
             client_factory,
+            telemetry_handles,
         }
     }
 }
@@ -55,7 +79,7 @@ impl Benchmarkable for BalancedConsumerGroupBenchmark {
 
         init_consumer_groups(cf, &args).await?;
 
-        let consumer_futures = build_consumer_futures(cf, &args);
+        let consumer_futures = build_consumer_futures(cf, &args, 
&self.telemetry_handles);
         for fut in consumer_futures {
             tasks.spawn(fut);
         }
diff --git a/core/bench/src/benchmarks/balanced_producer.rs 
b/core/bench/src/benchmarks/balanced_producer.rs
index 6e81ab5e..3353eadf 100644
--- a/core/bench/src/benchmarks/balanced_producer.rs
+++ b/core/bench/src/benchmarks/balanced_producer.rs
@@ -19,6 +19,7 @@
 use super::benchmark::Benchmarkable;
 use crate::args::common::IggyBenchArgs;
 use crate::benchmarks::common::build_producer_futures;
+use crate::telemetry::{MetricsHandle, TelemetryContext};
 use async_trait::async_trait;
 use bench_report::benchmark_kind::BenchmarkKind;
 use bench_report::individual_metrics::BenchmarkIndividualMetrics;
@@ -31,13 +32,31 @@ use tracing::info;
 pub struct BalancedProducerBenchmark {
     args: Arc<IggyBenchArgs>,
     client_factory: Arc<dyn ClientFactory>,
+    telemetry_handles: Vec<MetricsHandle>,
 }
 
 impl BalancedProducerBenchmark {
-    pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn 
ClientFactory>) -> Self {
+    pub fn new(
+        args: Arc<IggyBenchArgs>,
+        client_factory: Arc<dyn ClientFactory>,
+        telemetry: Option<&TelemetryContext>,
+    ) -> Self {
+        let telemetry_handles = telemetry.map_or_else(Vec::new, |ctx| {
+            let producers = args.producers();
+            let streams = args.streams();
+            let start_stream_id = args.start_stream_id();
+            (1..=producers)
+                .map(|producer_id| {
+                    let stream_id = start_stream_id + 1 + (producer_id % 
streams);
+                    ctx.create_handle("producer", producer_id, stream_id)
+                })
+                .collect()
+        });
+
         Self {
             args,
             client_factory,
+            telemetry_handles,
         }
     }
 }
@@ -50,7 +69,7 @@ impl Benchmarkable for BalancedProducerBenchmark {
         self.init_streams().await?;
         let cf = &self.client_factory;
         let args = self.args.clone();
-        let producer_futures = build_producer_futures(cf, &args);
+        let producer_futures = build_producer_futures(cf, &args, 
&self.telemetry_handles);
         let mut tasks = JoinSet::new();
 
         for fut in producer_futures {
diff --git a/core/bench/src/benchmarks/balanced_producer_and_consumer_group.rs 
b/core/bench/src/benchmarks/balanced_producer_and_consumer_group.rs
index 0eab5db8..d44569c1 100644
--- a/core/bench/src/benchmarks/balanced_producer_and_consumer_group.rs
+++ b/core/bench/src/benchmarks/balanced_producer_and_consumer_group.rs
@@ -20,6 +20,7 @@ use super::benchmark::Benchmarkable;
 use crate::{
     args::common::IggyBenchArgs,
     benchmarks::common::{build_consumer_futures, build_producer_futures, 
init_consumer_groups},
+    telemetry::{MetricsHandle, TelemetryContext},
 };
 use async_trait::async_trait;
 use bench_report::{benchmark_kind::BenchmarkKind, 
individual_metrics::BenchmarkIndividualMetrics};
@@ -32,13 +33,54 @@ use tracing::info;
 pub struct BalancedProducerAndConsumerGroupBenchmark {
     args: Arc<IggyBenchArgs>,
     client_factory: Arc<dyn ClientFactory>,
+    producer_telemetry_handles: Vec<MetricsHandle>,
+    // TODO: Use this field when consumer telemetry is implemented
+    #[allow(dead_code)]
+    consumer_telemetry_handles: Vec<MetricsHandle>,
 }
 
 impl BalancedProducerAndConsumerGroupBenchmark {
-    pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn 
ClientFactory>) -> Self {
+    pub fn new(
+        args: Arc<IggyBenchArgs>,
+        client_factory: Arc<dyn ClientFactory>,
+        telemetry: Option<&TelemetryContext>,
+    ) -> Self {
+        let (producer_telemetry_handles, consumer_telemetry_handles) = 
telemetry.map_or_else(
+            || (Vec::new(), Vec::new()),
+            |ctx| {
+                let producers = args.producers();
+                let consumers = args.consumers();
+                let streams = args.streams();
+                let cg_count = args.number_of_consumer_groups();
+                let start_stream_id = args.start_stream_id();
+
+                let producer_handles = (1..=producers)
+                    .map(|producer_id| {
+                        let stream_id = start_stream_id + 1 + (producer_id % 
streams);
+                        ctx.create_handle("producer", producer_id, stream_id)
+                    })
+                    .collect();
+
+                let consumer_handles = (1..=consumers)
+                    .map(|consumer_id| {
+                        let stream_id = if cg_count > 0 {
+                            start_stream_id + 1 + (consumer_id % cg_count)
+                        } else {
+                            start_stream_id + consumer_id
+                        };
+                        ctx.create_handle("consumer", consumer_id, stream_id)
+                    })
+                    .collect();
+
+                (producer_handles, consumer_handles)
+            },
+        );
+
         Self {
             args,
             client_factory,
+            producer_telemetry_handles,
+            consumer_telemetry_handles,
         }
     }
 }
@@ -55,8 +97,8 @@ impl Benchmarkable for 
BalancedProducerAndConsumerGroupBenchmark {
 
         init_consumer_groups(cf, &args).await?;
 
-        let producer_futures = build_producer_futures(cf, &args);
-        let consumer_futures = build_consumer_futures(cf, &args);
+        let producer_futures = build_producer_futures(cf, &args, 
&self.producer_telemetry_handles);
+        let consumer_futures = build_consumer_futures(cf, &args, 
&self.consumer_telemetry_handles);
 
         for fut in producer_futures {
             tasks.spawn(fut);
diff --git a/core/bench/src/benchmarks/benchmark.rs 
b/core/bench/src/benchmarks/benchmark.rs
index 9b96a1b8..25a63aac 100644
--- a/core/bench/src/benchmarks/benchmark.rs
+++ b/core/bench/src/benchmarks/benchmark.rs
@@ -17,6 +17,7 @@
  */
 
 use crate::args::kind::BenchmarkKindCommand;
+use crate::telemetry::TelemetryContext;
 use crate::{args::common::IggyBenchArgs, 
utils::client_factory::create_client_factory};
 use async_trait::async_trait;
 use bench_report::benchmark_kind::BenchmarkKind;
@@ -37,43 +38,58 @@ use super::pinned_consumer::PinnedConsumerBenchmark;
 use super::pinned_producer::PinnedProducerBenchmark;
 use super::pinned_producer_and_consumer::PinnedProducerAndConsumerBenchmark;
 
-impl From<IggyBenchArgs> for Box<dyn Benchmarkable> {
-    fn from(args: IggyBenchArgs) -> Self {
+impl From<(IggyBenchArgs, Option<&TelemetryContext>)> for Box<dyn 
Benchmarkable> {
+    fn from(
+        (args, telemetry): (IggyBenchArgs, 
Option<&crate::telemetry::TelemetryContext>),
+    ) -> Self {
         let client_factory = create_client_factory(&args);
 
         match args.benchmark_kind {
-            BenchmarkKindCommand::PinnedProducer(_) => {
-                Box::new(PinnedProducerBenchmark::new(Arc::new(args), 
client_factory))
-            }
+            BenchmarkKindCommand::PinnedProducer(_) => 
Box::new(PinnedProducerBenchmark::new(
+                Arc::new(args),
+                client_factory,
+                telemetry,
+            )),
 
-            BenchmarkKindCommand::PinnedConsumer(_) => {
-                Box::new(PinnedConsumerBenchmark::new(Arc::new(args), 
client_factory))
-            }
+            BenchmarkKindCommand::PinnedConsumer(_) => 
Box::new(PinnedConsumerBenchmark::new(
+                Arc::new(args),
+                client_factory,
+                telemetry,
+            )),
 
             BenchmarkKindCommand::PinnedProducerAndConsumer(_) => Box::new(
-                PinnedProducerAndConsumerBenchmark::new(Arc::new(args), 
client_factory),
+                PinnedProducerAndConsumerBenchmark::new(Arc::new(args), 
client_factory, telemetry),
             ),
 
             BenchmarkKindCommand::BalancedProducer(_) => 
Box::new(BalancedProducerBenchmark::new(
                 Arc::new(args),
                 client_factory,
+                telemetry,
             )),
 
             BenchmarkKindCommand::BalancedConsumerGroup(_) => Box::new(
-                BalancedConsumerGroupBenchmark::new(Arc::new(args), 
client_factory),
+                BalancedConsumerGroupBenchmark::new(Arc::new(args), 
client_factory, telemetry),
             ),
 
-            BenchmarkKindCommand::BalancedProducerAndConsumerGroup(_) => 
Box::new(
-                BalancedProducerAndConsumerGroupBenchmark::new(Arc::new(args), 
client_factory),
-            ),
+            BenchmarkKindCommand::BalancedProducerAndConsumerGroup(_) => {
+                Box::new(BalancedProducerAndConsumerGroupBenchmark::new(
+                    Arc::new(args),
+                    client_factory,
+                    telemetry,
+                ))
+            }
 
             BenchmarkKindCommand::EndToEndProducingConsumer(_) => Box::new(
-                EndToEndProducingConsumerBenchmark::new(Arc::new(args), 
client_factory),
+                EndToEndProducingConsumerBenchmark::new(Arc::new(args), 
client_factory, telemetry),
             ),
 
-            BenchmarkKindCommand::EndToEndProducingConsumerGroup(_) => 
Box::new(
-                EndToEndProducingConsumerGroupBenchmark::new(Arc::new(args), 
client_factory),
-            ),
+            BenchmarkKindCommand::EndToEndProducingConsumerGroup(_) => {
+                Box::new(EndToEndProducingConsumerGroupBenchmark::new(
+                    Arc::new(args),
+                    client_factory,
+                    telemetry,
+                ))
+            }
             BenchmarkKindCommand::Examples => {
                 unreachable!("Examples should be handled before this point")
             }
diff --git a/core/bench/src/benchmarks/common.rs 
b/core/bench/src/benchmarks/common.rs
index d5436b56..d44d46cb 100644
--- a/core/bench/src/benchmarks/common.rs
+++ b/core/bench/src/benchmarks/common.rs
@@ -114,6 +114,7 @@ pub async fn init_consumer_groups(
 pub fn build_producer_futures(
     client_factory: &Arc<dyn ClientFactory>,
     args: &IggyBenchArgs,
+    telemetry_handles: &[crate::telemetry::MetricsHandle],
 ) -> Vec<impl Future<Output = Result<BenchmarkIndividualMetrics, IggyError>> + 
Send + use<>> {
     let streams = args.streams();
     let partitions = args.number_of_partitions();
@@ -143,6 +144,10 @@ pub fn build_producer_futures(
 
             let stream_id = start_stream_id + 1 + (producer_id % streams);
 
+            let metrics_handle = telemetry_handles
+                .get((producer_id as usize).saturating_sub(1))
+                .cloned();
+
             async move {
                 let producer = TypedBenchmarkProducer::new(
                     use_high_level_api,
@@ -158,6 +163,7 @@ pub fn build_producer_futures(
                     sampling_time,
                     moving_average_window,
                     rate_limit,
+                    metrics_handle,
                 );
                 producer.run().await
             }
@@ -168,6 +174,7 @@ pub fn build_producer_futures(
 pub fn build_consumer_futures(
     client_factory: &Arc<dyn ClientFactory>,
     args: &IggyBenchArgs,
+    telemetry_handles: &[crate::telemetry::MetricsHandle],
 ) -> Vec<impl Future<Output = Result<BenchmarkIndividualMetrics, IggyError>> + 
Send + use<>> {
     let start_stream_id = args.start_stream_id();
     let cg_count = args.number_of_consumer_groups();
@@ -215,6 +222,10 @@ pub fn build_consumer_futures(
                 None
             };
 
+            let metrics_handle = telemetry_handles
+                .get((consumer_id as usize).saturating_sub(1))
+                .cloned();
+
             async move {
                 let consumer = TypedBenchmarkConsumer::new(
                     use_high_level_api,
@@ -231,6 +242,7 @@ pub fn build_consumer_futures(
                     polling_kind,
                     rate_limit,
                     origin_timestamp_latency_calculation,
+                    metrics_handle,
                 );
                 consumer.run().await
             }
@@ -242,7 +254,8 @@ pub fn build_consumer_futures(
 pub fn build_producing_consumers_futures(
     client_factory: Arc<dyn ClientFactory>,
     args: Arc<IggyBenchArgs>,
-) -> Vec<impl Future<Output = Result<BenchmarkIndividualMetrics, IggyError>> + 
Send> {
+    telemetry_handles: &[crate::telemetry::MetricsHandle],
+) -> Vec<impl Future<Output = Result<BenchmarkIndividualMetrics, IggyError>> + 
Send + use<>> {
     let producing_consumers = args.producers();
     let streams = args.streams();
     let partitions = args.number_of_partitions();
@@ -277,6 +290,10 @@ pub fn build_producing_consumers_futures(
             let use_high_level_api = args.high_level_api();
             let rate_limit = rate_limit_per_actor(args.rate_limit(), 
producing_consumers);
 
+            let metrics_handle = telemetry_handles
+                .get((actor_id as usize).saturating_sub(1))
+                .cloned();
+
             async move {
                 info!(
                     "Executing producing consumer #{}, stream_id={}",
@@ -300,6 +317,7 @@ pub fn build_producing_consumers_futures(
                     rate_limit,
                     polling_kind,
                     origin_timestamp_latency_calculation,
+                    metrics_handle,
                 );
                 actor.run().await
             }
@@ -311,7 +329,8 @@ pub fn build_producing_consumers_futures(
 pub fn build_producing_consumer_groups_futures(
     client_factory: Arc<dyn ClientFactory>,
     args: Arc<IggyBenchArgs>,
-) -> Vec<impl Future<Output = Result<BenchmarkIndividualMetrics, IggyError>> + 
Send> {
+    telemetry_handles: &[crate::telemetry::MetricsHandle],
+) -> Vec<impl Future<Output = Result<BenchmarkIndividualMetrics, IggyError>> + 
Send + use<>> {
     let producers = args.producers();
     let consumers = args.consumers();
     let total_actors = producers.max(consumers);
@@ -371,6 +390,10 @@ pub fn build_producing_consumer_groups_futures(
                 _ => unreachable!(),
             };
 
+            let metrics_handle = telemetry_handles
+                .get((actor_id as usize).saturating_sub(1))
+                .cloned();
+
             async move {
                 let actor_type = match (should_produce, should_consume) {
                     (true, true) => "(producer+consumer)",
@@ -408,6 +431,7 @@ pub fn build_producing_consumer_groups_futures(
                     rate_limit,
                     polling_kind,
                     origin_timestamp_latency_calculation,
+                    metrics_handle,
                 );
 
                 actor.run().await
diff --git a/core/bench/src/benchmarks/end_to_end_producing_consumer.rs 
b/core/bench/src/benchmarks/end_to_end_producing_consumer.rs
index 2818fea6..e3f83e3e 100644
--- a/core/bench/src/benchmarks/end_to_end_producing_consumer.rs
+++ b/core/bench/src/benchmarks/end_to_end_producing_consumer.rs
@@ -18,6 +18,7 @@
 
 use crate::args::common::IggyBenchArgs;
 use crate::benchmarks::common::build_producing_consumers_futures;
+use crate::telemetry::{MetricsHandle, TelemetryContext};
 use async_trait::async_trait;
 use bench_report::benchmark_kind::BenchmarkKind;
 use bench_report::individual_metrics::BenchmarkIndividualMetrics;
@@ -32,13 +33,31 @@ use super::benchmark::Benchmarkable;
 pub struct EndToEndProducingConsumerBenchmark {
     args: Arc<IggyBenchArgs>,
     client_factory: Arc<dyn ClientFactory>,
+    telemetry_handles: Vec<MetricsHandle>,
 }
 
 impl EndToEndProducingConsumerBenchmark {
-    pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn 
ClientFactory>) -> Self {
+    pub fn new(
+        args: Arc<IggyBenchArgs>,
+        client_factory: Arc<dyn ClientFactory>,
+        telemetry: Option<&TelemetryContext>,
+    ) -> Self {
+        let telemetry_handles = telemetry.map_or_else(Vec::new, |ctx| {
+            let producing_consumers = args.producers();
+            let streams = args.streams();
+            let start_stream_id = args.start_stream_id();
+            (1..=producing_consumers)
+                .map(|actor_id| {
+                    let stream_id = start_stream_id + 1 + (actor_id % streams);
+                    ctx.create_handle("producing_consumer", actor_id, 
stream_id)
+                })
+                .collect()
+        });
+
         Self {
             args,
             client_factory,
+            telemetry_handles,
         }
     }
 }
@@ -53,7 +72,7 @@ impl Benchmarkable for EndToEndProducingConsumerBenchmark {
         let args = self.args.clone();
         let mut tasks = JoinSet::new();
 
-        let futures = build_producing_consumers_futures(cf, args);
+        let futures = build_producing_consumers_futures(cf, args, 
&self.telemetry_handles);
         for fut in futures {
             tasks.spawn(fut);
         }
diff --git a/core/bench/src/benchmarks/end_to_end_producing_consumer_group.rs 
b/core/bench/src/benchmarks/end_to_end_producing_consumer_group.rs
index 2adda4d5..0ae0b78f 100644
--- a/core/bench/src/benchmarks/end_to_end_producing_consumer_group.rs
+++ b/core/bench/src/benchmarks/end_to_end_producing_consumer_group.rs
@@ -18,6 +18,7 @@
 
 use crate::args::common::IggyBenchArgs;
 use crate::benchmarks::common::{build_producing_consumer_groups_futures, 
init_consumer_groups};
+use crate::telemetry::{MetricsHandle, TelemetryContext};
 use async_trait::async_trait;
 use bench_report::benchmark_kind::BenchmarkKind;
 use bench_report::individual_metrics::BenchmarkIndividualMetrics;
@@ -32,13 +33,34 @@ use super::benchmark::Benchmarkable;
 pub struct EndToEndProducingConsumerGroupBenchmark {
     args: Arc<IggyBenchArgs>,
     client_factory: Arc<dyn ClientFactory>,
+    telemetry_handles: Vec<MetricsHandle>,
 }
 
 impl EndToEndProducingConsumerGroupBenchmark {
-    pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn 
ClientFactory>) -> Self {
+    pub fn new(
+        args: Arc<IggyBenchArgs>,
+        client_factory: Arc<dyn ClientFactory>,
+        telemetry: Option<&TelemetryContext>,
+    ) -> Self {
+        let telemetry_handles = telemetry.map_or_else(Vec::new, |ctx| {
+            let producers = args.producers();
+            let consumers = args.consumers();
+            let total_actors = producers.max(consumers);
+            let cg_count = args.number_of_consumer_groups();
+            let start_stream_id = args.start_stream_id();
+
+            (1..=total_actors)
+                .map(|actor_id| {
+                    let stream_id = start_stream_id + 1 + (actor_id % 
cg_count);
+                    ctx.create_handle("producing_consumer_group", actor_id, 
stream_id)
+                })
+                .collect()
+        });
+
         Self {
             args,
             client_factory,
+            telemetry_handles,
         }
     }
 }
@@ -55,7 +77,7 @@ impl Benchmarkable for 
EndToEndProducingConsumerGroupBenchmark {
 
         init_consumer_groups(&cf, &args).await?;
 
-        let futures = build_producing_consumer_groups_futures(cf, args);
+        let futures = build_producing_consumer_groups_futures(cf, args, 
&self.telemetry_handles);
         for fut in futures {
             tasks.spawn(fut);
         }
diff --git a/core/bench/src/benchmarks/pinned_consumer.rs 
b/core/bench/src/benchmarks/pinned_consumer.rs
index 870df77a..18716274 100644
--- a/core/bench/src/benchmarks/pinned_consumer.rs
+++ b/core/bench/src/benchmarks/pinned_consumer.rs
@@ -19,6 +19,7 @@
 use crate::args::common::IggyBenchArgs;
 use crate::benchmarks::benchmark::Benchmarkable;
 use crate::benchmarks::common::build_consumer_futures;
+use crate::telemetry::{MetricsHandle, TelemetryContext};
 use async_trait::async_trait;
 use bench_report::benchmark_kind::BenchmarkKind;
 use bench_report::individual_metrics::BenchmarkIndividualMetrics;
@@ -31,13 +32,30 @@ use tracing::info;
 pub struct PinnedConsumerBenchmark {
     args: Arc<IggyBenchArgs>,
     client_factory: Arc<dyn ClientFactory>,
+    telemetry_handles: Vec<MetricsHandle>,
 }
 
 impl PinnedConsumerBenchmark {
-    pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn 
ClientFactory>) -> Self {
+    pub fn new(
+        args: Arc<IggyBenchArgs>,
+        client_factory: Arc<dyn ClientFactory>,
+        telemetry: Option<&TelemetryContext>,
+    ) -> Self {
+        let telemetry_handles = telemetry.map_or_else(Vec::new, |ctx| {
+            let consumers = args.consumers();
+            let start_stream_id = args.start_stream_id();
+            (1..=consumers)
+                .map(|consumer_id| {
+                    let stream_id = start_stream_id + consumer_id;
+                    ctx.create_handle("consumer", consumer_id, stream_id)
+                })
+                .collect()
+        });
+
         Self {
             args,
             client_factory,
+            telemetry_handles,
         }
     }
 }
@@ -52,7 +70,7 @@ impl Benchmarkable for PinnedConsumerBenchmark {
         let args = self.args.clone();
         let mut tasks: JoinSet<_> = JoinSet::new();
 
-        let futures = build_consumer_futures(cf, &args);
+        let futures = build_consumer_futures(cf, &args, 
&self.telemetry_handles);
         for fut in futures {
             tasks.spawn(fut);
         }
diff --git a/core/bench/src/benchmarks/pinned_producer.rs 
b/core/bench/src/benchmarks/pinned_producer.rs
index 3514e24d..5cfd1ca8 100644
--- a/core/bench/src/benchmarks/pinned_producer.rs
+++ b/core/bench/src/benchmarks/pinned_producer.rs
@@ -16,9 +16,10 @@
  * under the License.
  */
 
-use crate::args::common::IggyBenchArgs;
 use crate::benchmarks::benchmark::Benchmarkable;
 use crate::benchmarks::common::build_producer_futures;
+use crate::telemetry::TelemetryContext;
+use crate::{args::common::IggyBenchArgs, telemetry::MetricsHandle};
 use async_trait::async_trait;
 use bench_report::benchmark_kind::BenchmarkKind;
 use bench_report::individual_metrics::BenchmarkIndividualMetrics;
@@ -31,13 +32,31 @@ use tracing::info;
 pub struct PinnedProducerBenchmark {
     args: Arc<IggyBenchArgs>,
     client_factory: Arc<dyn ClientFactory>,
+    telemetry_handles: Vec<MetricsHandle>,
 }
 
 impl PinnedProducerBenchmark {
-    pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn 
ClientFactory>) -> Self {
+    pub fn new(
+        args: Arc<IggyBenchArgs>,
+        client_factory: Arc<dyn ClientFactory>,
+        telemetry: Option<&TelemetryContext>,
+    ) -> Self {
+        let telemetry_handles = telemetry.map_or_else(Vec::new, |ctx| {
+            let producers = args.producers();
+            let streams = args.streams();
+            let start_stream_id = args.start_stream_id();
+            (1..=producers)
+                .map(|producer_id| {
+                    let stream_id = start_stream_id + 1 + (producer_id % 
streams);
+                    ctx.create_handle("producer", producer_id, stream_id)
+                })
+                .collect()
+        });
+
         Self {
             args,
             client_factory,
+            telemetry_handles,
         }
     }
 }
@@ -52,7 +71,8 @@ impl Benchmarkable for PinnedProducerBenchmark {
         let args = self.args.clone();
         let mut tasks = JoinSet::new();
 
-        let producer_futures = build_producer_futures(client_factory, &args);
+        let producer_futures =
+            build_producer_futures(client_factory, &args, 
&self.telemetry_handles);
 
         for fut in producer_futures {
             tasks.spawn(fut);
diff --git a/core/bench/src/benchmarks/pinned_producer_and_consumer.rs 
b/core/bench/src/benchmarks/pinned_producer_and_consumer.rs
index cde6632c..7f795102 100644
--- a/core/bench/src/benchmarks/pinned_producer_and_consumer.rs
+++ b/core/bench/src/benchmarks/pinned_producer_and_consumer.rs
@@ -18,6 +18,7 @@
 use crate::args::common::IggyBenchArgs;
 use crate::benchmarks::benchmark::Benchmarkable;
 use crate::benchmarks::common::{build_consumer_futures, 
build_producer_futures};
+use crate::telemetry::{MetricsHandle, TelemetryContext};
 use async_trait::async_trait;
 use bench_report::benchmark_kind::BenchmarkKind;
 use bench_report::individual_metrics::BenchmarkIndividualMetrics;
@@ -30,13 +31,49 @@ use tracing::info;
 pub struct PinnedProducerAndConsumerBenchmark {
     args: Arc<IggyBenchArgs>,
     client_factory: Arc<dyn ClientFactory>,
+    producer_telemetry_handles: Vec<MetricsHandle>,
+    // NOTE(review): run() already passes this to build_consumer_futures, so the allow below looks stale
+    #[allow(dead_code)]
+    consumer_telemetry_handles: Vec<MetricsHandle>,
 }
 
 impl PinnedProducerAndConsumerBenchmark {
-    pub fn new(args: Arc<IggyBenchArgs>, client_factory: Arc<dyn 
ClientFactory>) -> Self {
+    pub fn new(
+        args: Arc<IggyBenchArgs>,
+        client_factory: Arc<dyn ClientFactory>,
+        telemetry: Option<&TelemetryContext>,
+    ) -> Self {
+        let (producer_telemetry_handles, consumer_telemetry_handles) = 
telemetry.map_or_else(
+            || (Vec::new(), Vec::new()),
+            |ctx| {
+                let producers = args.producers();
+                let consumers = args.consumers();
+                let streams = args.streams();
+                let start_stream_id = args.start_stream_id();
+
+                let producer_handles = (1..=producers)
+                    .map(|producer_id| {
+                        let stream_id = start_stream_id + 1 + (producer_id % 
streams);
+                        ctx.create_handle("producer", producer_id, stream_id)
+                    })
+                    .collect();
+
+                let consumer_handles = (1..=consumers)
+                    .map(|consumer_id| {
+                        let stream_id = start_stream_id + consumer_id;
+                        ctx.create_handle("consumer", consumer_id, stream_id)
+                    })
+                    .collect();
+
+                (producer_handles, consumer_handles)
+            },
+        );
+
         Self {
             args,
             client_factory,
+            producer_telemetry_handles,
+            consumer_telemetry_handles,
         }
     }
 }
@@ -51,8 +88,8 @@ impl Benchmarkable for PinnedProducerAndConsumerBenchmark {
         let args = self.args.clone();
         let mut tasks = JoinSet::new();
 
-        let producer_futures = build_producer_futures(cf, &args);
-        let consumer_futures = build_consumer_futures(cf, &args);
+        let producer_futures = build_producer_futures(cf, &args, 
&self.producer_telemetry_handles);
+        let consumer_futures = build_consumer_futures(cf, &args, 
&self.consumer_telemetry_handles);
 
         for fut in producer_futures {
             tasks.spawn(fut);
diff --git a/core/bench/src/main.rs b/core/bench/src/main.rs
index 77d7d66b..289d199a 100644
--- a/core/bench/src/main.rs
+++ b/core/bench/src/main.rs
@@ -22,6 +22,7 @@ mod args;
 mod benchmarks;
 mod plot;
 mod runner;
+mod telemetry;
 mod utils;
 
 use crate::{args::common::IggyBenchArgs, runner::BenchmarkRunner};
diff --git a/core/bench/src/runner.rs b/core/bench/src/runner.rs
index ab9553b3..1885e5cc 100644
--- a/core/bench/src/runner.rs
+++ b/core/bench/src/runner.rs
@@ -20,6 +20,7 @@ use crate::analytics::report_builder::BenchmarkReportBuilder;
 use crate::args::common::IggyBenchArgs;
 use crate::benchmarks::benchmark::Benchmarkable;
 use crate::plot::{ChartType, plot_chart};
+use crate::telemetry::setup::TelemetryContext;
 use crate::utils::cpu_name::append_cpu_name_lowercase;
 use crate::utils::server_starter::start_server_if_needed;
 use crate::utils::{collect_server_logs_and_save_to_file, 
params_from_args_and_metrics};
@@ -34,6 +35,7 @@ use tracing::{error, info};
 pub struct BenchmarkRunner {
     pub args: Option<IggyBenchArgs>,
     pub test_server: Option<TestServer>,
+    pub telemetry: Option<TelemetryContext>,
 }
 
 impl BenchmarkRunner {
@@ -41,6 +43,7 @@ impl BenchmarkRunner {
         Self {
             args: Some(args),
             test_server: None,
+            telemetry: None,
         }
     }
 
@@ -48,13 +51,32 @@ impl BenchmarkRunner {
     pub async fn run(mut self) -> Result<(), IggyError> {
         let args = self.args.take().unwrap();
         let should_open_charts = args.open_charts();
+
+        // Initialize OpenTelemetry if configured
+        if let Some(otel_config) = args.otel_config() {
+            match TelemetryContext::init(otel_config).await {
+                Ok(ctx) => {
+                    self.telemetry = ctx;
+                    if self.telemetry.is_some() {
+                        info!("OpenTelemetry metrics streaming enabled and 
connected");
+                    }
+                }
+                Err(e) => {
+                    error!("Failed to initialize OpenTelemetry: {}", e);
+                    error!("Benchmark cannot proceed without OpenTelemetry 
connection");
+                    error!("Please ensure the OpenTelemetry collector is 
running and accessible");
+                    return Err(IggyError::CannotEstablishConnection);
+                }
+            }
+        }
+
         self.test_server = start_server_if_needed(&args).await;
 
         let transport = args.transport();
         let server_addr = args.server_address();
         info!("Starting to benchmark: {transport} with server: 
{server_addr}",);
 
-        let mut benchmark: Box<dyn Benchmarkable> = args.into();
+        let mut benchmark: Box<dyn Benchmarkable> = (args, 
self.telemetry.as_ref()).into();
         benchmark.print_info();
         let mut join_handles = benchmark.run().await?;
 
@@ -131,6 +153,10 @@ impl BenchmarkRunner {
             })?;
         }
 
+        if let Some(telemetry) = self.telemetry.take() {
+            telemetry.shutdown().await;
+        }
+
         Ok(())
     }
 }
diff --git a/core/bench/src/telemetry/buffer.rs 
b/core/bench/src/telemetry/buffer.rs
new file mode 100644
index 00000000..d9be5280
--- /dev/null
+++ b/core/bench/src/telemetry/buffer.rs
@@ -0,0 +1,227 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use opentelemetry::KeyValue;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
+use std::time::{Duration, Instant};
+use tokio::sync::mpsc;
+use tracing::warn;
+
+/// Event types for metrics
+#[derive(Clone, Debug)]
+pub enum EventType {
+    BatchSent {
+        messages: u64,
+        bytes: u64,
+        latency_us: f64,
+    },
+    BatchReceived {
+        messages: u64,
+        bytes: u64,
+        latency_us: f64,
+    },
+    Throughput {
+        messages_per_sec: f64,
+        mb_per_sec: f64,
+    },
+}
+
+/// Single metric event to be processed
+#[derive(Clone, Debug)]
+pub struct MetricEvent {
+    pub timestamp: Instant,
+    pub event_type: EventType,
+    pub labels: Vec<KeyValue>,
+}
+
+/// Lock-free metrics buffer using channels
+/// This avoids blocking the benchmark threads
+pub struct MetricsBuffer {
+    sender: mpsc::UnboundedSender<MetricEvent>,
+    dropped_count: Arc<AtomicU64>,
+    buffer_size: Arc<AtomicUsize>,
+}
+
+impl MetricsBuffer {
+    pub fn new(batch_size: usize, flush_interval: Duration) -> (Self, 
MetricsProcessor) {
+        let (sender, receiver) = mpsc::unbounded_channel();
+
+        let buffer = Self {
+            sender,
+            dropped_count: Arc::new(AtomicU64::new(0)),
+            buffer_size: Arc::new(AtomicUsize::new(0)),
+        };
+
+        let processor = MetricsProcessor {
+            receiver,
+            batch_size,
+            flush_interval,
+            batch: Vec::with_capacity(batch_size),
+            last_flush: Instant::now(),
+            buffer_size: buffer.buffer_size.clone(),
+        };
+
+        (buffer, processor)
+    }
+
+    /// Push a metric event to the buffer (non-blocking)
+    pub fn push(&self, event: MetricEvent) {
+        // Non-blocking: the channel is unbounded, so send() only fails when the receiver is gone
+        match self.sender.send(event) {
+            Ok(()) => {
+                self.buffer_size.fetch_add(1, Ordering::Relaxed);
+            }
+            Err(_) => {
+                // Receiver was dropped (processor shut down); count the lost event
+                self.dropped_count.fetch_add(1, Ordering::Relaxed);
+            }
+        }
+    }
+
+    pub fn dropped_events(&self) -> u64 {
+        self.dropped_count.load(Ordering::Relaxed)
+    }
+
+    pub fn buffer_size(&self) -> usize {
+        self.buffer_size.load(Ordering::Relaxed)
+    }
+}
+
+/// Processes buffered metrics asynchronously
+pub struct MetricsProcessor {
+    receiver: mpsc::UnboundedReceiver<MetricEvent>,
+    batch_size: usize,
+    flush_interval: Duration,
+    batch: Vec<MetricEvent>,
+    last_flush: Instant,
+    buffer_size: Arc<AtomicUsize>,
+}
+
+impl MetricsProcessor {
+    /// Process metrics in a background task
+    pub async fn run(mut self, metrics: Arc<super::BenchMetrics>) {
+        loop {
+            // Check if we should flush based on time
+            let should_flush_time = self.last_flush.elapsed() >= 
self.flush_interval;
+
+            // Try to receive events with a timeout
+            let timeout = if should_flush_time {
+                Duration::from_millis(1)
+            } else {
+                self.flush_interval - self.last_flush.elapsed()
+            };
+
+            match tokio::time::timeout(timeout, self.receiver.recv()).await {
+                Ok(Some(event)) => {
+                    self.batch.push(event);
+                    // Decrement buffer size since we've received the event
+                    self.buffer_size.fetch_sub(1, Ordering::Relaxed);
+
+                    // Flush if batch is full
+                    if self.batch.len() >= self.batch_size {
+                        self.flush_batch(&metrics);
+                    }
+                }
+                Ok(None) => {
+                    // Channel closed, flush remaining and exit
+                    if !self.batch.is_empty() {
+                        self.flush_batch(&metrics);
+                    }
+                    break;
+                }
+                Err(_) => {
+                    // Timeout - flush if we have data
+                    if !self.batch.is_empty() && should_flush_time {
+                        self.flush_batch(&metrics);
+                    }
+                }
+            }
+        }
+    }
+
+    fn flush_batch(&mut self, metrics: &super::BenchMetrics) {
+        // Process all events in the batch
+        for event in self.batch.drain(..) {
+            match event.event_type {
+                EventType::BatchSent {
+                    messages,
+                    bytes,
+                    latency_us,
+                } => {
+                    metrics.messages_sent.add(messages, &event.labels);
+                    metrics.bytes_sent.add(bytes, &event.labels);
+                    metrics.batches_sent.add(1, &event.labels);
+                    metrics.send_latency.record(latency_us, &event.labels);
+                    metrics.batch_latency.record(latency_us, &event.labels);
+                }
+                EventType::BatchReceived {
+                    messages,
+                    bytes,
+                    latency_us,
+                } => {
+                    metrics.messages_received.add(messages, &event.labels);
+                    metrics.bytes_received.add(bytes, &event.labels);
+                    metrics.batches_received.add(1, &event.labels);
+                    metrics.receive_latency.record(latency_us, &event.labels);
+                }
+                EventType::Throughput {
+                    messages_per_sec,
+                    mb_per_sec,
+                } => {
+                    metrics
+                        .throughput_messages_per_sec
+                        .record(messages_per_sec, &event.labels);
+                    metrics
+                        .throughput_mb_per_sec
+                        .record(mb_per_sec, &event.labels);
+                }
+            }
+        }
+
+        self.last_flush = Instant::now();
+    }
+}
+
+/// Periodic reporter for buffer stats
+pub struct BufferStatsReporter {
+    buffer: Arc<MetricsBuffer>,
+    interval: Duration,
+}
+
+impl BufferStatsReporter {
+    pub const fn new(buffer: Arc<MetricsBuffer>, interval: Duration) -> Self {
+        Self { buffer, interval }
+    }
+
+    pub async fn run(self) {
+        let mut interval = tokio::time::interval(self.interval);
+        loop {
+            interval.tick().await;
+
+            let dropped = self.buffer.dropped_events();
+            if dropped > 0 {
+                warn!(
+                    "Metrics buffer dropped {} events (buffer size: {})",
+                    dropped,
+                    self.buffer.buffer_size()
+                );
+            }
+        }
+    }
+}
diff --git a/core/bench/src/telemetry/mod.rs b/core/bench/src/telemetry/mod.rs
new file mode 100644
index 00000000..a64499cf
--- /dev/null
+++ b/core/bench/src/telemetry/mod.rs
@@ -0,0 +1,238 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+pub mod buffer;
+pub mod setup;
+
+pub use setup::{TelemetryConfig, TelemetryContext};
+
+use opentelemetry::KeyValue;
+use opentelemetry::metrics::{Counter, Gauge, Histogram, Meter};
+use std::sync::Arc;
+use std::time::Instant;
+
+/// Container for all OpenTelemetry metrics instruments
+pub struct BenchMetrics {
+    // Counters
+    pub messages_sent: Counter<u64>,
+    pub messages_received: Counter<u64>,
+    pub bytes_sent: Counter<u64>,
+    pub bytes_received: Counter<u64>,
+    pub batches_sent: Counter<u64>,
+    pub batches_received: Counter<u64>,
+
+    // Histograms for latency distribution
+    pub send_latency: Histogram<f64>,
+    pub receive_latency: Histogram<f64>,
+    pub batch_latency: Histogram<f64>,
+    pub end_to_end_latency: Histogram<f64>,
+
+    // Gauges for current state
+    pub active_producers: Gauge<u64>,
+    pub active_consumers: Gauge<u64>,
+    pub throughput_messages_per_sec: Gauge<f64>,
+    pub throughput_mb_per_sec: Gauge<f64>,
+    pub current_rate_limit_mb_per_sec: Gauge<f64>,
+
+    // Message size histograms
+    pub message_size: Histogram<u64>,
+    pub batch_size: Histogram<u64>,
+}
+
+impl BenchMetrics {
+    pub fn new(meter: &Meter) -> Self {
+        Self {
+            // Counters
+            messages_sent: meter
+                .u64_counter("iggy.bench.messages.sent")
+                .with_description("Total number of messages sent")
+                .with_unit("messages")
+                .build(),
+
+            messages_received: meter
+                .u64_counter("iggy.bench.messages.received")
+                .with_description("Total number of messages received")
+                .with_unit("messages")
+                .build(),
+
+            bytes_sent: meter
+                .u64_counter("iggy.bench.bytes.sent")
+                .with_description("Total bytes sent")
+                .with_unit("bytes")
+                .build(),
+
+            bytes_received: meter
+                .u64_counter("iggy.bench.bytes.received")
+                .with_description("Total bytes received")
+                .with_unit("bytes")
+                .build(),
+
+            batches_sent: meter
+                .u64_counter("iggy.bench.batches.sent")
+                .with_description("Total number of batches sent")
+                .with_unit("batches")
+                .build(),
+
+            batches_received: meter
+                .u64_counter("iggy.bench.batches.received")
+                .with_description("Total number of batches received")
+                .with_unit("batches")
+                .build(),
+
+            // Histograms
+            send_latency: meter
+                .f64_histogram("iggy.bench.latency.send")
+                .with_description("Send operation latency")
+                .with_unit("microseconds")
+                .build(),
+
+            receive_latency: meter
+                .f64_histogram("iggy.bench.latency.receive")
+                .with_description("Receive operation latency")
+                .with_unit("microseconds")
+                .build(),
+
+            batch_latency: meter
+                .f64_histogram("iggy.bench.latency.batch")
+                .with_description("Batch processing latency")
+                .with_unit("microseconds")
+                .build(),
+
+            end_to_end_latency: meter
+                .f64_histogram("iggy.bench.latency.e2e")
+                .with_description("End-to-end message latency")
+                .with_unit("microseconds")
+                .build(),
+
+            // Gauges
+            active_producers: meter
+                .u64_gauge("iggy.bench.actors.producers")
+                .with_description("Number of active producers")
+                .with_unit("actors")
+                .build(),
+
+            active_consumers: meter
+                .u64_gauge("iggy.bench.actors.consumers")
+                .with_description("Number of active consumers")
+                .with_unit("actors")
+                .build(),
+
+            throughput_messages_per_sec: meter
+                .f64_gauge("iggy.bench.throughput.messages")
+                .with_description("Current throughput in messages per second")
+                .with_unit("messages/sec")
+                .build(),
+
+            throughput_mb_per_sec: meter
+                .f64_gauge("iggy.bench.throughput.bytes")
+                .with_description("Current throughput in MB per second")
+                .with_unit("MB/sec")
+                .build(),
+
+            current_rate_limit_mb_per_sec: meter
+                .f64_gauge("iggy.bench.rate_limit")
+                .with_description("Current rate limit in MB per second")
+                .with_unit("MB/sec")
+                .build(),
+
+            // Size histograms
+            message_size: meter
+                .u64_histogram("iggy.bench.size.message")
+                .with_description("Distribution of message sizes")
+                .with_unit("bytes")
+                .build(),
+
+            batch_size: meter
+                .u64_histogram("iggy.bench.size.batch")
+                .with_description("Distribution of batch sizes")
+                .with_unit("messages")
+                .build(),
+        }
+    }
+}
+
+/// Lightweight metrics handle for actors to record metrics
+#[derive(Clone)]
+pub struct MetricsHandle {
+    // TODO: Will be used for direct metric recording in future optimizations
+    #[allow(dead_code)]
+    metrics: Arc<BenchMetrics>,
+    buffer: Arc<buffer::MetricsBuffer>,
+    actor_labels: Vec<KeyValue>,
+}
+
+impl MetricsHandle {
+    pub fn new(
+        metrics: Arc<BenchMetrics>,
+        buffer: Arc<buffer::MetricsBuffer>,
+        actor_type: &str,
+        actor_id: u32,
+        stream_id: u32,
+    ) -> Self {
+        let actor_labels = vec![
+            KeyValue::new("actor_type", actor_type.to_string()),
+            KeyValue::new("actor_id", i64::from(actor_id)),
+            KeyValue::new("stream_id", i64::from(stream_id)),
+        ];
+
+        Self {
+            metrics,
+            buffer,
+            actor_labels,
+        }
+    }
+
+    /// Record a batch send operation
+    pub fn record_batch_sent(&self, messages: u64, bytes: u64, latency_us: 
f64) {
+        // Buffer the raw data for async processing
+        self.buffer.push(buffer::MetricEvent {
+            timestamp: Instant::now(),
+            event_type: buffer::EventType::BatchSent {
+                messages,
+                bytes,
+                latency_us,
+            },
+            labels: self.actor_labels.clone(),
+        });
+    }
+
+    /// Record a batch receive operation
+    pub fn record_batch_received(&self, messages: u64, bytes: u64, latency_us: 
f64) {
+        self.buffer.push(buffer::MetricEvent {
+            timestamp: Instant::now(),
+            event_type: buffer::EventType::BatchReceived {
+                messages,
+                bytes,
+                latency_us,
+            },
+            labels: self.actor_labels.clone(),
+        });
+    }
+
+    /// Record throughput measurement
+    pub fn record_throughput(&self, messages_per_sec: f64, mb_per_sec: f64) {
+        self.buffer.push(buffer::MetricEvent {
+            timestamp: Instant::now(),
+            event_type: buffer::EventType::Throughput {
+                messages_per_sec,
+                mb_per_sec,
+            },
+            labels: self.actor_labels.clone(),
+        });
+    }
+}
diff --git a/core/bench/src/telemetry/setup.rs 
b/core/bench/src/telemetry/setup.rs
new file mode 100644
index 00000000..deaee061
--- /dev/null
+++ b/core/bench/src/telemetry/setup.rs
@@ -0,0 +1,302 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::{BenchMetrics, MetricsHandle, buffer};
+use opentelemetry::{KeyValue, global};
+use opentelemetry_otlp::WithExportConfig;
+use opentelemetry_sdk::{
+    Resource,
+    metrics::{PeriodicReader, SdkMeterProvider},
+};
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+use tokio::sync::oneshot;
+use tracing::{error, info, warn};
+
+pub struct TelemetryConfig {
+    pub enabled: bool,
+    pub endpoint: String,
+    pub export_interval: Duration,
+    pub export_timeout: Duration,
+    pub buffer_size: usize,
+    pub buffer_flush_interval: Duration,
+    pub service_name: String,
+    pub service_version: String,
+    pub benchmark_id: String,
+}
+
+impl Default for TelemetryConfig {
+    fn default() -> Self {
+        Self {
+            enabled: false,
+            endpoint: "http://localhost:4317".to_string(),
+            export_interval: Duration::from_secs(1),
+            export_timeout: Duration::from_secs(10),
+            buffer_size: 10000,
+            buffer_flush_interval: Duration::from_millis(100),
+            service_name: "iggy-bench".to_string(),
+            service_version: env!("CARGO_PKG_VERSION").to_string(),
+            benchmark_id: String::new(), // Will be set when creating config
+        }
+    }
+}
+
+pub struct TelemetryContext {
+    pub metrics: Arc<BenchMetrics>,
+    pub buffer: Arc<buffer::MetricsBuffer>,
+    processor_handle: Option<tokio::task::JoinHandle<()>>,
+    stats_handle: Option<tokio::task::JoinHandle<()>>,
+    meter_provider: Arc<Mutex<Option<SdkMeterProvider>>>,
+    shutdown_tx: Option<oneshot::Sender<()>>,
+}
+
+impl TelemetryContext {
+    // Async for API consistency with future async operations
+    #[allow(clippy::unused_async)]
+    #[allow(clippy::cognitive_complexity)]
+    pub async fn init(config: TelemetryConfig) -> Result<Option<Self>, Box<dyn 
std::error::Error>> {
+        if !config.enabled {
+            info!("OpenTelemetry metrics disabled");
+            return Ok(None);
+        }
+
+        info!(
+            "Initializing OpenTelemetry metrics with endpoint: {}",
+            config.endpoint
+        );
+
+        // Verify connection to OTLP endpoint with retries (up to 10 seconds)
+        Self::verify_connection_with_retry(&config.endpoint, 10, 
Duration::from_secs(1)).await?;
+
+        // Create resource with benchmark metadata
+        let resource = Resource::builder()
+            .with_attribute(KeyValue::new("service.name", 
config.service_name.clone()))
+            .with_attribute(KeyValue::new(
+                "service.version",
+                config.service_version.clone(),
+            ))
+            .with_attribute(KeyValue::new("benchmark.id", 
config.benchmark_id.clone()))
+            .with_attribute(KeyValue::new("telemetry.sdk.name", 
"opentelemetry"))
+            .with_attribute(KeyValue::new("telemetry.sdk.language", "rust"))
+            .build();
+
+        // Configure OTLP exporter
+        let exporter = opentelemetry_otlp::MetricExporter::builder()
+            .with_tonic()
+            .with_endpoint(config.endpoint.clone())
+            .with_timeout(config.export_timeout)
+            .build()?;
+
+        // Create periodic reader for metrics export
+        let reader = PeriodicReader::builder(exporter)
+            .with_interval(config.export_interval)
+            .build();
+
+        // Build meter provider
+        let meter_provider = SdkMeterProvider::builder()
+            .with_reader(reader)
+            .with_resource(resource)
+            .build();
+
+        // Set as global provider
+        global::set_meter_provider(meter_provider.clone());
+
+        // Get meter and create metrics
+        let meter = global::meter("iggy-bench");
+        let metrics = Arc::new(BenchMetrics::new(&meter));
+
+        // Create metrics buffer
+        let (buffer, processor) =
+            buffer::MetricsBuffer::new(config.buffer_size, 
config.buffer_flush_interval);
+        let buffer = Arc::new(buffer);
+
+        // Create shutdown channel
+        let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
+
+        // Start processor task with graceful shutdown
+        let metrics_clone = metrics.clone();
+        let processor_handle = tokio::spawn(async move {
+            processor.run(metrics_clone).await;
+        });
+
+        // Start buffer stats reporter with graceful shutdown
+        let buffer_clone = buffer.clone();
+        let stats_handle = tokio::spawn(async move {
+            let reporter = buffer::BufferStatsReporter::new(buffer_clone, 
Duration::from_secs(10));
+            tokio::select! {
+                _ = reporter.run() => {},
+                _ = &mut shutdown_rx => {
+                    info!("Stats reporter received shutdown signal");
+                }
+            }
+        });
+
+        info!("OpenTelemetry metrics initialized successfully");
+
+        Ok(Some(Self {
+            metrics,
+            buffer,
+            processor_handle: Some(processor_handle),
+            stats_handle: Some(stats_handle),
+            meter_provider: Arc::new(Mutex::new(Some(meter_provider))),
+            shutdown_tx: Some(shutdown_tx),
+        }))
+    }
+
+    pub fn create_handle(&self, actor_type: &str, actor_id: u32, stream_id: 
u32) -> MetricsHandle {
+        MetricsHandle::new(
+            self.metrics.clone(),
+            self.buffer.clone(),
+            actor_type,
+            actor_id,
+            stream_id,
+        )
+    }
+
+    #[allow(clippy::cognitive_complexity)]
+    pub async fn shutdown(mut self) {
+        info!("Shutting down OpenTelemetry metrics");
+
+        // Signal shutdown to background tasks
+        if let Some(tx) = self.shutdown_tx.take() {
+            let _ = tx.send(());
+        }
+
+        // First, close the buffer to signal no more events
+        // This is done by dropping the buffer which drops the sender
+        drop(self.buffer);
+
+        // Wait for processor to finish processing remaining events
+        if let Some(handle) = self.processor_handle.take() {
+            match tokio::time::timeout(Duration::from_secs(5), handle).await {
+                Ok(Ok(())) => {
+                    info!("Metrics processor shut down cleanly");
+                }
+                Ok(Err(e)) => {
+                    error!("Metrics processor task failed: {}", e);
+                }
+                Err(_) => {
+                    warn!("Metrics processor shutdown timed out after 5 
seconds");
+                }
+            }
+        }
+
+        // Wait for stats reporter to finish
+        if let Some(handle) = self.stats_handle.take() {
+            // Stats reporter should exit quickly due to shutdown signal
+            match tokio::time::timeout(Duration::from_secs(1), handle).await {
+                Ok(Ok(())) => {
+                    info!("Stats reporter shut down cleanly");
+                }
+                Ok(Err(e)) if !e.is_cancelled() => {
+                    error!("Stats reporter task failed: {}", e);
+                }
+                _ => {
+                    // Task was cancelled or timed out, which is fine
+                }
+            }
+        }
+
+        // Shutdown meter provider with mutex protection
+        let mut provider_guard = self.meter_provider.lock().unwrap();
+        if let Some(provider) = provider_guard.take() {
+            // Force a final flush before shutdown
+            if let Err(e) = provider.force_flush() {
+                error!("Error flushing meter provider: {}", e);
+            }
+
+            // Shutdown the provider
+            if let Err(e) = provider.shutdown() {
+                error!("Error shutting down meter provider: {}", e);
+            }
+
+            // Clear the global meter provider
+            // Note: OpenTelemetry doesn't provide a way to unset the global 
provider,
+            // but shutting it down ensures it won't accept new metrics
+        }
+        drop(provider_guard);
+
+        info!("OpenTelemetry metrics shutdown complete");
+    }
+
+    /// Verify that we can connect to the OTLP endpoint
+    async fn verify_connection_with_retry(
+        endpoint: &str,
+        max_attempts: u32,
+        retry_delay: Duration,
+    ) -> Result<(), Box<dyn std::error::Error>> {
+        info!("Verifying connection to OTLP endpoint: {}", endpoint);
+
+        for attempt in 1..=max_attempts {
+            match Self::test_otlp_connection(endpoint).await {
+                Ok(()) => {
+                    info!("Successfully connected to OTLP endpoint");
+                    return Ok(());
+                }
+                Err(e) if attempt < max_attempts => {
+                    warn!(
+                        "Failed to connect to OTLP endpoint (attempt {}/{}): 
{}",
+                        attempt, max_attempts, e
+                    );
+                    tokio::time::sleep(retry_delay).await;
+                }
+                Err(e) => {
+                    error!(
+                        "Failed to connect to OTLP endpoint after {} attempts: 
{}",
+                        max_attempts, e
+                    );
+                    return Err(format!(
+                        "Cannot connect to OpenTelemetry collector at {}. \
+                        Please ensure the collector is running and 
accessible.",
+                        endpoint
+                    )
+                    .into());
+                }
+            }
+        }
+        unreachable!()
+    }
+
+    /// Test connection to OTLP endpoint using TCP connection test
+    async fn test_otlp_connection(endpoint: &str) -> Result<(), Box<dyn 
std::error::Error>> {
+        use tokio::net::TcpStream;
+        use tokio::time::timeout;
+
+        // Parse the endpoint URL to extract host and port
+        let endpoint = endpoint
+            .trim_start_matches("http://")
+            .trim_start_matches("https://");
+
+        // Default OTLP gRPC port is 4317
+        let addr = if endpoint.contains(':') {
+            endpoint.to_string()
+        } else {
+            format!("{}:4317", endpoint)
+        };
+
+        // Try to establish TCP connection to verify the endpoint is reachable
+        match timeout(Duration::from_secs(2), TcpStream::connect(&addr)).await 
{
+            Ok(Ok(_)) => {
+                // Connection successful - endpoint is reachable
+                Ok(())
+            }
+            Ok(Err(e)) => Err(format!("Cannot connect to OTLP endpoint at {}: 
{}", addr, e).into()),
+            Err(_) => Err(format!("Connection to OTLP endpoint at {} timed 
out", addr).into()),
+        }
+    }
+}
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 3a0cf94e..3213c29b 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -65,9 +65,9 @@ mimalloc = { workspace = true, optional = true }
 moka = { version = "0.12.10", features = ["future"] }
 nix = { version = "0.30", features = ["fs"] }
 once_cell = "1.21.3"
-opentelemetry = { version = "0.30.0", features = ["trace", "logs"] }
-opentelemetry-appender-tracing = { version = "0.30.1", features = ["log"] }
-opentelemetry-otlp = { version = "0.30.0", features = [
+opentelemetry = { workspace = true, features = ["trace", "logs"] }
+opentelemetry-appender-tracing = { workspace = true, features = ["log"] }
+opentelemetry-otlp = { workspace = true, features = [
     "logs",
     "trace",
     "grpc-tonic",
@@ -76,8 +76,8 @@ opentelemetry-otlp = { version = "0.30.0", features = [
     "reqwest-client",
     "tokio",
 ] }
-opentelemetry-semantic-conventions = "0.30.0"
-opentelemetry_sdk = { version = "0.30.0", features = [
+opentelemetry-semantic-conventions = { workspace = true }
+opentelemetry_sdk = { workspace = true, features = [
     "rt-tokio",
     "logs",
     "trace",

Reply via email to