This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch bench-expiry
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 704fb5cc962cac1285a0d9aa7d3bcf0c9bd69ac8
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Tue Feb 3 14:33:16 2026 +0100

    feat(bench): add --message-expiry CLI option for benchmark topics
    
    Benchmarks previously created topics with hardcoded NeverExpire policy,
    preventing testing of expiry-related scenarios.
    
    Add -e/--message-expiry flag to all producer benchmark kinds, allowing
    configurable message expiry (e.g., "1s", "5min", "1h"). Consumer-only
    benchmarks use the trait default since they don't create topics.
---
 core/bench/src/args/common.rs                                  |  6 +++++-
 core/bench/src/args/examples.rs                                |  1 +
 core/bench/src/args/kinds/balanced/consumer_group.rs           |  2 +-
 core/bench/src/args/kinds/balanced/producer.rs                 | 10 +++++++++-
 .../src/args/kinds/balanced/producer_and_consumer_group.rs     | 10 +++++++++-
 core/bench/src/args/kinds/end_to_end/producing_consumer.rs     | 10 +++++++++-
 .../src/args/kinds/end_to_end/producing_consumer_group.rs      | 10 +++++++++-
 core/bench/src/args/kinds/pinned/producer.rs                   | 10 +++++++++-
 core/bench/src/args/kinds/pinned/producer_and_consumer.rs      | 10 +++++++++-
 core/bench/src/args/props.rs                                   |  5 ++++-
 core/bench/src/benchmarks/benchmark.rs                         |  7 ++++---
 11 files changed, 69 insertions(+), 12 deletions(-)

diff --git a/core/bench/src/args/common.rs b/core/bench/src/args/common.rs
index ebc54e48b..be983c209 100644
--- a/core/bench/src/args/common.rs
+++ b/core/bench/src/args/common.rs
@@ -30,7 +30,7 @@ use bench_report::benchmark_kind::BenchmarkKind;
 use bench_report::numeric_parameter::BenchmarkNumericParameter;
 use clap::error::ErrorKind;
 use clap::{CommandFactory, Parser};
-use iggy::prelude::{IggyByteSize, IggyDuration, TransportProtocol};
+use iggy::prelude::{IggyByteSize, IggyDuration, IggyExpiry, TransportProtocol};
 use std::num::NonZeroU32;
 use std::str::FromStr;
 
@@ -291,6 +291,10 @@ impl IggyBenchArgs {
         self.benchmark_kind.inner().max_topic_size()
     }
 
+    pub fn message_expiry(&self) -> IggyExpiry {
+        self.benchmark_kind.inner().message_expiry()
+    }
+
     pub fn read_amplification(&self) -> Option<f32> {
         self.benchmark_kind.inner().read_amplification()
     }
diff --git a/core/bench/src/args/examples.rs b/core/bench/src/args/examples.rs
index 7a00929e5..ebd39ba91 100644
--- a/core/bench/src/args/examples.rs
+++ b/core/bench/src/args/examples.rs
@@ -68,6 +68,7 @@ const EXAMPLES: &str = r#"EXAMPLES:
     --producers (-c): Number of producers
     --consumers (-c): Number of consumers
     --max-topic-size (-T): Max topic size (e.g., "1GiB")
+    --message-expiry (-e): Message expiry time (e.g., "1s", "5min", "1h")
 
     Examples with detailed configuration:
 
diff --git a/core/bench/src/args/kinds/balanced/consumer_group.rs 
b/core/bench/src/args/kinds/balanced/consumer_group.rs
index a3c0fe2cc..ceb1d5ad8 100644
--- a/core/bench/src/args/kinds/balanced/consumer_group.rs
+++ b/core/bench/src/args/kinds/balanced/consumer_group.rs
@@ -82,7 +82,7 @@ impl BenchmarkKindProps for BalancedConsumerGroupArgs {
             cmd.error(
                 ErrorKind::ArgumentConflict,
                 format!(
-                    "In balanced consumer group, consumer groups number 
({cg_number}) must be less than the number of streams ({streams})"
+                    "In balanced consumer group, consumer groups number 
({cg_number}) must be greater than or equal to the number of streams 
({streams})"
                 ),
             )
             .exit();
diff --git a/core/bench/src/args/kinds/balanced/producer.rs 
b/core/bench/src/args/kinds/balanced/producer.rs
index 02a222288..3190fc9fd 100644
--- a/core/bench/src/args/kinds/balanced/producer.rs
+++ b/core/bench/src/args/kinds/balanced/producer.rs
@@ -26,7 +26,7 @@ use crate::args::{
     transport::BenchmarkTransportCommand,
 };
 use clap::{CommandFactory, Parser, error::ErrorKind};
-use iggy::prelude::IggyByteSize;
+use iggy::prelude::{IggyByteSize, IggyExpiry};
 use std::num::NonZeroU32;
 
 /// N producers sending to N separated stream-topic with single partition (one 
stream per one producer)
@@ -50,6 +50,10 @@ pub struct BalancedProducerArgs {
     /// Max topic size in human readable format, e.g. "1GiB", "2MB", "1GiB". 
If not provided then the server default will be used.
     #[arg(long, short = 'T')]
     pub max_topic_size: Option<IggyByteSize>,
+
+    /// Message expiry time in human readable format, e.g. "1s", "5min", "1h". 
If not provided, messages never expire.
+    #[arg(long, short = 'e')]
+    pub message_expiry: Option<IggyExpiry>,
 }
 
 impl BenchmarkKindProps for BalancedProducerArgs {
@@ -81,6 +85,10 @@ impl BenchmarkKindProps for BalancedProducerArgs {
         self.max_topic_size
     }
 
+    fn message_expiry(&self) -> IggyExpiry {
+        self.message_expiry.unwrap_or(IggyExpiry::NeverExpire)
+    }
+
     fn validate(&self) {
         let partitions = self.partitions();
         let mut cmd = IggyBenchArgs::command();
diff --git a/core/bench/src/args/kinds/balanced/producer_and_consumer_group.rs 
b/core/bench/src/args/kinds/balanced/producer_and_consumer_group.rs
index a18c869e8..cedf3fff6 100644
--- a/core/bench/src/args/kinds/balanced/producer_and_consumer_group.rs
+++ b/core/bench/src/args/kinds/balanced/producer_and_consumer_group.rs
@@ -27,7 +27,7 @@ use crate::args::{
     transport::BenchmarkTransportCommand,
 };
 use clap::{CommandFactory, Parser, error::ErrorKind};
-use iggy::prelude::IggyByteSize;
+use iggy::prelude::{IggyByteSize, IggyExpiry};
 use std::num::NonZeroU32;
 
 /// Polling benchmark with consumer group
@@ -60,6 +60,10 @@ pub struct BalancedProducerAndConsumerGroupArgs {
     #[arg(long, short = 'T')]
     pub max_topic_size: Option<IggyByteSize>,
 
+    /// Message expiry time in human readable format, e.g. "1s", "5min", "1h". 
If not provided, messages never expire.
+    #[arg(long, short = 'e')]
+    pub message_expiry: Option<IggyExpiry>,
+
     /// Consumer rate limit multiplier relative to producer rate.
     /// When measuring E2E latency, consumers may need higher throughput to 
prevent queue buildup.
     /// Default is 1.05 (5% higher than producer rate). Set to 1.0 to disable.
@@ -124,4 +128,8 @@ impl BenchmarkKindProps for 
BalancedProducerAndConsumerGroupArgs {
     fn max_topic_size(&self) -> Option<IggyByteSize> {
         self.max_topic_size
     }
+
+    fn message_expiry(&self) -> IggyExpiry {
+        self.message_expiry.unwrap_or(IggyExpiry::NeverExpire)
+    }
 }
diff --git a/core/bench/src/args/kinds/end_to_end/producing_consumer.rs 
b/core/bench/src/args/kinds/end_to_end/producing_consumer.rs
index 57c0908d6..d4b20645e 100644
--- a/core/bench/src/args/kinds/end_to_end/producing_consumer.rs
+++ b/core/bench/src/args/kinds/end_to_end/producing_consumer.rs
@@ -23,7 +23,7 @@ use crate::args::{
     transport::BenchmarkTransportCommand,
 };
 use clap::{CommandFactory, Parser, error::ErrorKind};
-use iggy::prelude::IggyByteSize;
+use iggy::prelude::{IggyByteSize, IggyExpiry};
 use std::num::NonZeroU32;
 
 #[derive(Parser, Debug, Clone)]
@@ -42,6 +42,10 @@ pub struct EndToEndProducingConsumerArgs {
     /// Max topic size in human readable format, e.g. "1GiB", "2MB", "1GiB". 
If not provided then the server default will be used.
     #[arg(long, short = 'T')]
     pub max_topic_size: Option<IggyByteSize>,
+
+    /// Message expiry time in human readable format, e.g. "1s", "5min", "1h". 
If not provided, messages never expire.
+    #[arg(long, short = 'e')]
+    pub message_expiry: Option<IggyExpiry>,
 }
 
 impl BenchmarkKindProps for EndToEndProducingConsumerArgs {
@@ -73,6 +77,10 @@ impl BenchmarkKindProps for EndToEndProducingConsumerArgs {
         self.max_topic_size
     }
 
+    fn message_expiry(&self) -> IggyExpiry {
+        self.message_expiry.unwrap_or(IggyExpiry::NeverExpire)
+    }
+
     fn validate(&self) {
         let mut cmd = IggyBenchArgs::command();
         let streams = self.streams();
diff --git a/core/bench/src/args/kinds/end_to_end/producing_consumer_group.rs 
b/core/bench/src/args/kinds/end_to_end/producing_consumer_group.rs
index 88cb58c2c..adcfd7e60 100644
--- a/core/bench/src/args/kinds/end_to_end/producing_consumer_group.rs
+++ b/core/bench/src/args/kinds/end_to_end/producing_consumer_group.rs
@@ -27,7 +27,7 @@ use crate::args::{
     transport::BenchmarkTransportCommand,
 };
 use clap::{CommandFactory, Parser, error::ErrorKind};
-use iggy::prelude::IggyByteSize;
+use iggy::prelude::{IggyByteSize, IggyExpiry};
 use std::num::NonZeroU32;
 
 #[derive(Parser, Debug, Clone)]
@@ -58,6 +58,10 @@ pub struct EndToEndProducingConsumerGroupArgs {
     /// Max topic size in human readable format, e.g. "1GiB", "2MB", "1GiB". 
If not provided then topic size will be unlimited.
     #[arg(long, short = 'T')]
     pub max_topic_size: Option<IggyByteSize>,
+
+    /// Message expiry time in human readable format, e.g. "1s", "5min", "1h". 
If not provided, messages never expire.
+    #[arg(long, short = 'e')]
+    pub message_expiry: Option<IggyExpiry>,
 }
 
 impl BenchmarkKindProps for EndToEndProducingConsumerGroupArgs {
@@ -89,6 +93,10 @@ impl BenchmarkKindProps for 
EndToEndProducingConsumerGroupArgs {
         self.max_topic_size
     }
 
+    fn message_expiry(&self) -> IggyExpiry {
+        self.message_expiry.unwrap_or(IggyExpiry::NeverExpire)
+    }
+
     fn validate(&self) {
         let mut cmd = IggyBenchArgs::command();
         let streams = self.streams();
diff --git a/core/bench/src/args/kinds/pinned/producer.rs 
b/core/bench/src/args/kinds/pinned/producer.rs
index 138988fb6..e338745fa 100644
--- a/core/bench/src/args/kinds/pinned/producer.rs
+++ b/core/bench/src/args/kinds/pinned/producer.rs
@@ -21,7 +21,7 @@ use crate::args::{
     transport::BenchmarkTransportCommand,
 };
 use clap::{CommandFactory, Parser, error::ErrorKind};
-use iggy::prelude::IggyByteSize;
+use iggy::prelude::{IggyByteSize, IggyExpiry};
 use std::num::NonZeroU32;
 
 #[derive(Parser, Debug, Clone)]
@@ -41,6 +41,10 @@ pub struct PinnedProducerArgs {
     /// Max topic size in human readable format, e.g. "1GiB", "2MiB", "1GiB". 
If not provided then the server default will be used.
     #[arg(long, short = 'T')]
     pub max_topic_size: Option<IggyByteSize>,
+
+    /// Message expiry time in human readable format, e.g. "1s", "5min", "1h". 
If not provided, messages never expire.
+    #[arg(long, short = 'e')]
+    pub message_expiry: Option<IggyExpiry>,
 }
 
 impl BenchmarkKindProps for PinnedProducerArgs {
@@ -72,6 +76,10 @@ impl BenchmarkKindProps for PinnedProducerArgs {
         self.max_topic_size
     }
 
+    fn message_expiry(&self) -> IggyExpiry {
+        self.message_expiry.unwrap_or(IggyExpiry::NeverExpire)
+    }
+
     fn validate(&self) {
         let mut cmd = IggyBenchArgs::command();
         let streams = self.streams();
diff --git a/core/bench/src/args/kinds/pinned/producer_and_consumer.rs 
b/core/bench/src/args/kinds/pinned/producer_and_consumer.rs
index 6777e253c..3e3eb7d47 100644
--- a/core/bench/src/args/kinds/pinned/producer_and_consumer.rs
+++ b/core/bench/src/args/kinds/pinned/producer_and_consumer.rs
@@ -26,7 +26,7 @@ use crate::args::{
     transport::BenchmarkTransportCommand,
 };
 use clap::{CommandFactory, Parser, error::ErrorKind};
-use iggy::prelude::IggyByteSize;
+use iggy::prelude::{IggyByteSize, IggyExpiry};
 use std::num::NonZeroU32;
 
 #[derive(Parser, Debug, Clone)]
@@ -55,6 +55,10 @@ pub struct PinnedProducerAndConsumerArgs {
     #[arg(long, short = 'T')]
     pub max_topic_size: Option<IggyByteSize>,
 
+    /// Message expiry time in human readable format, e.g. "1s", "5min", "1h". 
If not provided, messages never expire.
+    #[arg(long, short = 'e')]
+    pub message_expiry: Option<IggyExpiry>,
+
     /// Consumer rate limit multiplier relative to producer rate.
     /// When measuring E2E latency, consumers may need higher throughput to 
prevent queue buildup.
     /// Default is 1.05 (5% higher than producer rate). Set to 1.0 to disable.
@@ -91,6 +95,10 @@ impl BenchmarkKindProps for PinnedProducerAndConsumerArgs {
         self.max_topic_size
     }
 
+    fn message_expiry(&self) -> IggyExpiry {
+        self.message_expiry.unwrap_or(IggyExpiry::NeverExpire)
+    }
+
     fn read_amplification(&self) -> Option<f32> {
         Some(self.read_amplification)
     }
diff --git a/core/bench/src/args/props.rs b/core/bench/src/args/props.rs
index 2230163e8..aaff93db1 100644
--- a/core/bench/src/args/props.rs
+++ b/core/bench/src/args/props.rs
@@ -17,7 +17,7 @@
  */
 
 use super::{output::BenchmarkOutputCommand, 
transport::BenchmarkTransportCommand};
-use iggy::prelude::{IggyByteSize, TransportProtocol};
+use iggy::prelude::{IggyByteSize, IggyExpiry, TransportProtocol};
 
 pub trait BenchmarkKindProps {
     fn streams(&self) -> u32;
@@ -27,6 +27,9 @@ pub trait BenchmarkKindProps {
     fn producers(&self) -> u32;
     fn transport_command(&self) -> &BenchmarkTransportCommand;
     fn max_topic_size(&self) -> Option<IggyByteSize>;
+    fn message_expiry(&self) -> IggyExpiry {
+        IggyExpiry::NeverExpire
+    }
     fn validate(&self);
 
     /// Consumer rate limit multiplier relative to producer rate.
diff --git a/core/bench/src/benchmarks/benchmark.rs 
b/core/bench/src/benchmarks/benchmark.rs
index 6a334b4f0..594a00101 100644
--- a/core/bench/src/benchmarks/benchmark.rs
+++ b/core/bench/src/benchmarks/benchmark.rs
@@ -112,10 +112,11 @@ pub trait Benchmarkable: Send {
                     .args()
                     .max_topic_size()
                     .map_or(MaxTopicSize::Unlimited, MaxTopicSize::Custom);
+                let message_expiry = self.args().message_expiry();
 
                 info!(
-                    "Creating the test topic '{}' for stream '{}' with max 
topic size: {:?}",
-                    topic_name, stream_name, max_topic_size
+                    "Creating the test topic '{}' for stream '{}' with max 
topic size: {:?}, message expiry: {}",
+                    topic_name, stream_name, max_topic_size, message_expiry
                 );
 
                 client
@@ -125,7 +126,7 @@ pub trait Benchmarkable: Send {
                         partitions_count,
                         CompressionAlgorithm::default(),
                         None,
-                        IggyExpiry::NeverExpire,
+                        message_expiry,
                         max_topic_size,
                     )
                     .await?;

Reply via email to