This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch fix/producer-shard-last_flush in repository https://gitbox.apache.org/repos/asf/iggy.git
commit afb44b27fdda2442f00aba1cba8e8454d1c26fd1 Author: Frank Bell <[email protected]> AuthorDate: Mon Dec 8 15:02:43 2025 +0000 test(sdk): add integration test for producer last_flush fix --- core/integration/tests/sdk/producer/background.rs | 88 +++++++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/core/integration/tests/sdk/producer/background.rs b/core/integration/tests/sdk/producer/background.rs index 54745e186..d8d7818ae 100644 --- a/core/integration/tests/sdk/producer/background.rs +++ b/core/integration/tests/sdk/producer/background.rs @@ -405,3 +405,91 @@ async fn background_many_parallel_producers() { assert_eq!(polled_messages.messages.len() as u32, 10); cleanup(&client).await; } + +/// Verifies that linger_time is respected after idle periods with no messages. +/// +/// This tests the fix for a bug where `last_flush` was not updated when the +/// timeout fired with an empty buffer. Without the fix, the deadline would +/// always be in the past after the first idle timeout, causing immediate +/// flushes instead of waiting for the full linger_time. +#[tokio::test] +#[parallel] +async fn background_linger_time_respected_after_idle() { + let mut test_server = TestServer::default(); + test_server.start(); + + let tcp_client_config = TcpClientConfig { + server_address: test_server.get_raw_tcp_addr().unwrap(), + ..TcpClientConfig::default() + }; + let client = ClientWrapper::Tcp(TcpClient::create(Arc::new(tcp_client_config)).unwrap()); + let client = IggyClient::create(client, None, None); + + client.connect().await.unwrap(); + assert!(client.ping().await.is_ok(), "Failed to ping server"); + + login_root(&client).await; + init_system(&client).await; + + let cfg = BackgroundConfig::builder() + .linger_time(IggyDuration::from(200_000)) + .build(); + + let producer = client + .producer(STREAM_NAME, TOPIC_NAME) + .unwrap() + .partitioning(Partitioning::partition_id(PARTITION_ID)) + .background(cfg) + .build(); + + // Wait for multiple idle timeouts to fire (with empty buffer). + // With the bug: last_flush would never be updated, causing deadline to be stale. + sleep(Duration::from_millis(500)).await; + + let msg = IggyMessage::builder() + .id(1) + .payload(Bytes::from_static(b"test")) + .build() + .unwrap(); + producer.send(vec![msg]).await.unwrap(); + + // Wait less than linger_time - message should NOT be flushed yet + sleep(Duration::from_millis(100)).await; + + let consumer = Consumer::default(); + let polled_messages = client + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + Some(PARTITION_ID), + &consumer, + &PollingStrategy::offset(0), + 1, + false, + ) + .await + .unwrap(); + + assert_eq!(polled_messages.messages.len() as u32, 0); + + // Wait for remaining linger_time + buffer + sleep(Duration::from_millis(150)).await; + + let polled_messages = client + .poll_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + Some(PARTITION_ID), + &consumer, + &PollingStrategy::offset(0), + 1, + false, + ) + .await + .unwrap(); + + assert_eq!(polled_messages.messages.len() as u32, 1); + + producer.shutdown().await; + cleanup(&client).await; +}
