This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch fix/producer-shard-last_flush
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit afb44b27fdda2442f00aba1cba8e8454d1c26fd1
Author: Frank Bell <[email protected]>
AuthorDate: Mon Dec 8 15:02:43 2025 +0000

    test(sdk): add integration test for producer last_flush fix
---
 core/integration/tests/sdk/producer/background.rs | 88 +++++++++++++++++++++++
 1 file changed, 88 insertions(+)

diff --git a/core/integration/tests/sdk/producer/background.rs 
b/core/integration/tests/sdk/producer/background.rs
index 54745e186..d8d7818ae 100644
--- a/core/integration/tests/sdk/producer/background.rs
+++ b/core/integration/tests/sdk/producer/background.rs
@@ -405,3 +405,91 @@ async fn background_many_parallel_producers() {
     assert_eq!(polled_messages.messages.len() as u32, 10);
     cleanup(&client).await;
 }
+
+/// Verifies that linger_time is respected after idle periods with no messages.
+///
+/// This tests the fix for a bug where `last_flush` was not updated when the
+/// timeout fired with an empty buffer. Without the fix, the deadline would
+/// always be in the past after the first idle timeout, causing immediate
+/// flushes instead of waiting for the full linger_time.
+#[tokio::test]
+#[parallel]
+async fn background_linger_time_respected_after_idle() {
+    let mut test_server = TestServer::default();
+    test_server.start();
+
+    let tcp_client_config = TcpClientConfig {
+        server_address: test_server.get_raw_tcp_addr().unwrap(),
+        ..TcpClientConfig::default()
+    };
+    let client = 
ClientWrapper::Tcp(TcpClient::create(Arc::new(tcp_client_config)).unwrap());
+    let client = IggyClient::create(client, None, None);
+
+    client.connect().await.unwrap();
+    assert!(client.ping().await.is_ok(), "Failed to ping server");
+
+    login_root(&client).await;
+    init_system(&client).await;
+
+    let cfg = BackgroundConfig::builder()
+        .linger_time(IggyDuration::from(200_000))
+        .build();
+
+    let producer = client
+        .producer(STREAM_NAME, TOPIC_NAME)
+        .unwrap()
+        .partitioning(Partitioning::partition_id(PARTITION_ID))
+        .background(cfg)
+        .build();
+
+    // Wait for multiple idle timeouts to fire (with empty buffer).
+    // With the bug: last_flush would never be updated, causing deadline to be 
stale.
+    sleep(Duration::from_millis(500)).await;
+
+    let msg = IggyMessage::builder()
+        .id(1)
+        .payload(Bytes::from_static(b"test"))
+        .build()
+        .unwrap();
+    producer.send(vec![msg]).await.unwrap();
+
+    // Wait less than linger_time - message should NOT be flushed yet
+    sleep(Duration::from_millis(100)).await;
+
+    let consumer = Consumer::default();
+    let polled_messages = client
+        .poll_messages(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            Some(PARTITION_ID),
+            &consumer,
+            &PollingStrategy::offset(0),
+            1,
+            false,
+        )
+        .await
+        .unwrap();
+
+    assert_eq!(polled_messages.messages.len() as u32, 0);
+
+    // Wait for remaining linger_time + buffer
+    sleep(Duration::from_millis(150)).await;
+
+    let polled_messages = client
+        .poll_messages(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            Some(PARTITION_ID),
+            &consumer,
+            &PollingStrategy::offset(0),
+            1,
+            false,
+        )
+        .await
+        .unwrap();
+
+    assert_eq!(polled_messages.messages.len() as u32, 1);
+
+    producer.shutdown().await;
+    cleanup(&client).await;
+}

Reply via email to