This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 340a7f435 fix(sdk): fix high-level consumer polling strategies (#2495)
340a7f435 is described below

commit 340a7f43543897400891c3688eac699d954a8e7e
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Sun Dec 21 11:10:53 2025 +0100

    fix(sdk): fix high-level consumer polling strategies (#2495)
    
    Fix the high-level consumer to properly transition polling strategy after the initial poll.
    Previously, only Offset strategy triggered the update, causing Timestamp/First/Last
    strategies to poll the same messages in a busy loop.
    
    Besides that, this fixes the background producer (code ported from #2456), adds a
    shutdown signal for the task responsible for storing offsets in the background, and
    removes duplicated code in wait_before_polling.
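
    A minimal sketch of the corrected transition logic (the `next_strategy`
    helper below is hypothetical, written only to illustrate the guard change
    in core/sdk/src/clients/consumer.rs; see the hunk in the diff):

        // Decide the polling strategy to use after a message at `offset`
        // has been received from a poll.
        fn next_strategy(current: PollingStrategy, offset: u64) -> PollingStrategy {
            // The guard used to be `current.kind == PollingKind::Offset`, so
            // Timestamp/First/Last never advanced and re-polled the same
            // messages in a busy loop. Now every strategy except Next
            // transitions to Offset after the first message.
            if current.kind != PollingKind::Next {
                PollingStrategy::offset(offset + 1)
            } else {
                current
            }
        }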
    
    ---------
    
    Co-authored-by: Hubert Gruszecki <[email protected]>
---
 Cargo.lock                                         |   2 +-
 DEPENDENCIES.md                                    |   2 +-
 core/integration/tests/sdk/producer/background.rs  |  88 +++++++++
 core/integration/tests/server/general.rs           |   6 +-
 core/integration/tests/server/mod.rs               |   9 +-
 .../consumer_timestamp_polling_scenario.rs         | 217 +++++++++++++++++++++
 core/integration/tests/server/scenarios/mod.rs     |   1 +
 core/sdk/Cargo.toml                                |   2 +-
 core/sdk/src/clients/consumer.rs                   |  39 ++--
 core/sdk/src/clients/producer_sharding.rs          |   2 +-
 10 files changed, 348 insertions(+), 20 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 085a4802e..c69a46130 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4442,7 +4442,7 @@ dependencies = [
 
 [[package]]
 name = "iggy"
-version = "0.8.1-edge.1"
+version = "0.8.1-edge.2"
 dependencies = [
  "async-broadcast",
  "async-dropper",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index e967141dd..8b06b77ac 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -390,7 +390,7 @@ icu_provider: 2.1.1, "Unicode-3.0",
 ident_case: 1.0.1, "Apache-2.0 OR MIT",
 idna: 1.1.0, "Apache-2.0 OR MIT",
 idna_adapter: 1.2.1, "Apache-2.0 OR MIT",
-iggy: 0.8.1-edge.1, "Apache-2.0",
+iggy: 0.8.1-edge.2, "Apache-2.0",
 iggy-bench: 0.3.1-edge.1, "Apache-2.0",
 iggy-bench-dashboard-server: 0.5.1-edge.1, "Apache-2.0",
 iggy-cli: 0.10.1-edge.1, "Apache-2.0",
diff --git a/core/integration/tests/sdk/producer/background.rs b/core/integration/tests/sdk/producer/background.rs
index 54745e186..d8d7818ae 100644
--- a/core/integration/tests/sdk/producer/background.rs
+++ b/core/integration/tests/sdk/producer/background.rs
@@ -405,3 +405,91 @@ async fn background_many_parallel_producers() {
     assert_eq!(polled_messages.messages.len() as u32, 10);
     cleanup(&client).await;
 }
+
+/// Verifies that linger_time is respected after idle periods with no messages.
+///
+/// This tests the fix for a bug where `last_flush` was not updated when the
+/// timeout fired with an empty buffer. Without the fix, the deadline would
+/// always be in the past after the first idle timeout, causing immediate
+/// flushes instead of waiting for the full linger_time.
+#[tokio::test]
+#[parallel]
+async fn background_linger_time_respected_after_idle() {
+    let mut test_server = TestServer::default();
+    test_server.start();
+
+    let tcp_client_config = TcpClientConfig {
+        server_address: test_server.get_raw_tcp_addr().unwrap(),
+        ..TcpClientConfig::default()
+    };
+    let client = ClientWrapper::Tcp(TcpClient::create(Arc::new(tcp_client_config)).unwrap());
+    let client = IggyClient::create(client, None, None);
+
+    client.connect().await.unwrap();
+    assert!(client.ping().await.is_ok(), "Failed to ping server");
+
+    login_root(&client).await;
+    init_system(&client).await;
+
+    let cfg = BackgroundConfig::builder()
+        .linger_time(IggyDuration::from(200_000))
+        .build();
+
+    let producer = client
+        .producer(STREAM_NAME, TOPIC_NAME)
+        .unwrap()
+        .partitioning(Partitioning::partition_id(PARTITION_ID))
+        .background(cfg)
+        .build();
+
+    // Wait for multiple idle timeouts to fire (with empty buffer).
+    // With the bug: last_flush would never be updated, causing deadline to be stale.
+    sleep(Duration::from_millis(500)).await;
+
+    let msg = IggyMessage::builder()
+        .id(1)
+        .payload(Bytes::from_static(b"test"))
+        .build()
+        .unwrap();
+    producer.send(vec![msg]).await.unwrap();
+
+    // Wait less than linger_time - message should NOT be flushed yet
+    sleep(Duration::from_millis(100)).await;
+
+    let consumer = Consumer::default();
+    let polled_messages = client
+        .poll_messages(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            Some(PARTITION_ID),
+            &consumer,
+            &PollingStrategy::offset(0),
+            1,
+            false,
+        )
+        .await
+        .unwrap();
+
+    assert_eq!(polled_messages.messages.len() as u32, 0);
+
+    // Wait for remaining linger_time + buffer
+    sleep(Duration::from_millis(150)).await;
+
+    let polled_messages = client
+        .poll_messages(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            Some(PARTITION_ID),
+            &consumer,
+            &PollingStrategy::offset(0),
+            1,
+            false,
+        )
+        .await
+        .unwrap();
+
+    assert_eq!(polled_messages.messages.len() as u32, 1);
+
+    producer.shutdown().await;
+    cleanup(&client).await;
+}
diff --git a/core/integration/tests/server/general.rs b/core/integration/tests/server/general.rs
index 37e47d9b6..9aa0ef04c 100644
--- a/core/integration/tests/server/general.rs
+++ b/core/integration/tests/server/general.rs
@@ -16,8 +16,9 @@
 // under the License.
 
 use crate::server::{
-    ScenarioFn, bench_scenario, create_message_payload_scenario, message_headers_scenario,
-    run_scenario, stream_size_validation_scenario, system_scenario, user_scenario,
+    ScenarioFn, bench_scenario, consumer_timestamp_polling_scenario,
+    create_message_payload_scenario, message_headers_scenario, run_scenario,
+    stream_size_validation_scenario, system_scenario, user_scenario,
 };
 use iggy_common::TransportProtocol;
 use serial_test::parallel;
@@ -32,6 +33,7 @@ use test_case::test_matrix;
         create_message_payload_scenario(),
         stream_size_validation_scenario(),
         bench_scenario(),
+        consumer_timestamp_polling_scenario(),
     ]
 )]
 #[tokio::test]
diff --git a/core/integration/tests/server/mod.rs b/core/integration/tests/server/mod.rs
index a8473c502..d4d4ad293 100644
--- a/core/integration/tests/server/mod.rs
+++ b/core/integration/tests/server/mod.rs
@@ -34,8 +34,9 @@ use scenarios::{
    bench_scenario, consumer_group_auto_commit_reconnection_scenario, consumer_group_join_scenario,
     consumer_group_offset_cleanup_scenario,
     consumer_group_with_multiple_clients_polling_messages_scenario,
-    consumer_group_with_single_client_polling_messages_scenario, create_message_payload,
-    message_headers_scenario, stream_size_validation_scenario, system_scenario, user_scenario,
+    consumer_group_with_single_client_polling_messages_scenario,
+    consumer_timestamp_polling_scenario, create_message_payload, message_headers_scenario,
+    stream_size_validation_scenario, system_scenario, user_scenario,
 };
 use std::pin::Pin;
 use std::{collections::HashMap, future::Future};
@@ -90,6 +91,10 @@ fn bench_scenario() -> ScenarioFn {
     |factory| Box::pin(bench_scenario::run(factory))
 }
 
+fn consumer_timestamp_polling_scenario() -> ScenarioFn {
+    |factory| Box::pin(consumer_timestamp_polling_scenario::run(factory))
+}
+
 async fn run_scenario(transport: TransportProtocol, scenario: ScenarioFn) {
    // TODO: Need to enable `TCP_NODELAY` flag for TCP transports, due to small messages being used in the test.
    // For some reason TCP in compio can't deal with it, but in tokio it works fine.
diff --git a/core/integration/tests/server/scenarios/consumer_timestamp_polling_scenario.rs b/core/integration/tests/server/scenarios/consumer_timestamp_polling_scenario.rs
new file mode 100644
index 000000000..4beb9d2cf
--- /dev/null
+++ b/core/integration/tests/server/scenarios/consumer_timestamp_polling_scenario.rs
@@ -0,0 +1,217 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::server::scenarios::{PARTITION_ID, STREAM_NAME, TOPIC_NAME, create_client};
+use futures::StreamExt;
+use iggy::prelude::*;
+use integration::test_server::{ClientFactory, login_root};
+use std::str::FromStr;
+use tokio::time::{Duration, timeout};
+
+const MESSAGES_COUNT: u32 = 2000;
+const BATCH_LENGTH: u32 = 100;
+const CONSUME_TIMEOUT: Duration = Duration::from_secs(10);
+
+pub async fn run(client_factory: &dyn ClientFactory) {
+    let client = create_client(client_factory).await;
+    login_root(&client).await;
+
+    test_offset_strategy(&client).await;
+    test_timestamp_strategy(&client).await;
+    test_first_strategy(&client).await;
+    test_last_strategy(&client).await;
+}
+
+async fn test_offset_strategy(client: &IggyClient) {
+    init_system(client).await;
+    produce_messages(client).await;
+
+    let received =
+        consume_with_strategy(client, "offset-consumer", PollingStrategy::offset(0)).await;
+    assert_received_messages(&received, "Offset");
+
+    cleanup(client).await;
+}
+
+async fn test_timestamp_strategy(client: &IggyClient) {
+    init_system(client).await;
+    let start_timestamp = IggyTimestamp::now();
+    produce_messages(client).await;
+
+    let received = consume_with_strategy(
+        client,
+        "timestamp-consumer",
+        PollingStrategy::timestamp(start_timestamp),
+    )
+    .await;
+    assert_received_messages(&received, "Timestamp");
+
+    cleanup(client).await;
+}
+
+async fn test_first_strategy(client: &IggyClient) {
+    init_system(client).await;
+    produce_messages(client).await;
+
+    let received = consume_with_strategy(client, "first-consumer", PollingStrategy::first()).await;
+    assert_received_messages(&received, "First");
+
+    cleanup(client).await;
+}
+
+async fn test_last_strategy(client: &IggyClient) {
+    init_system(client).await;
+    produce_messages(client).await;
+
+    // Last strategy with batch_length=100 returns the last 100 messages
+    let received = consume_with_strategy(client, "last-consumer", PollingStrategy::last()).await;
+
+    assert_eq!(
+        received.len(),
+        BATCH_LENGTH as usize,
+        "Last strategy: expected {} messages, received {}",
+        BATCH_LENGTH,
+        received.len()
+    );
+
+    // Verify messages are from the last batch (offsets 1900-1999 -> message_1901 to message_2000)
+    let start_message_num = MESSAGES_COUNT - BATCH_LENGTH + 1;
+    for (i, msg) in received.iter().enumerate() {
+        let expected_payload = format!("message_{}", start_message_num + i as u32);
+        let actual_payload =
+            String::from_utf8(msg.payload.to_vec()).expect("Payload should be valid UTF-8");
+        assert_eq!(
+            actual_payload, expected_payload,
+            "Last strategy: message {} payload mismatch",
+            i
+        );
+    }
+
+    cleanup(client).await;
+}
+
+fn assert_received_messages(received: &[IggyMessage], strategy_name: &str) {
+    assert_eq!(
+        received.len(),
+        MESSAGES_COUNT as usize,
+        "{} strategy: expected {} messages, received {}",
+        strategy_name,
+        MESSAGES_COUNT,
+        received.len()
+    );
+
+    for (i, msg) in received.iter().enumerate() {
+        let expected_payload = format!("message_{}", i + 1);
+        let actual_payload =
+            String::from_utf8(msg.payload.to_vec()).expect("Payload should be valid UTF-8");
+        assert_eq!(
+            actual_payload, expected_payload,
+            "{} strategy: message {} payload mismatch",
+            strategy_name, i
+        );
+    }
+}
+
+async fn init_system(client: &IggyClient) {
+    client.create_stream(STREAM_NAME).await.unwrap();
+
+    client
+        .create_topic(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            TOPIC_NAME,
+            1,
+            CompressionAlgorithm::default(),
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+}
+
+async fn produce_messages(client: &IggyClient) {
+    let mut messages = Vec::with_capacity(MESSAGES_COUNT as usize);
+    for i in 1..=MESSAGES_COUNT {
+        let payload = format!("message_{}", i);
+        let message = IggyMessage::from_str(&payload).unwrap();
+        messages.push(message);
+    }
+
+    client
+        .send_messages(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            &Identifier::named(TOPIC_NAME).unwrap(),
+            &Partitioning::partition_id(PARTITION_ID),
+            &mut messages,
+        )
+        .await
+        .unwrap();
+}
+
+async fn consume_with_strategy(
+    client: &IggyClient,
+    consumer_name: &str,
+    strategy: PollingStrategy,
+) -> Vec<IggyMessage> {
+    let expected_count = if strategy.kind == PollingKind::Last {
+        BATCH_LENGTH
+    } else {
+        MESSAGES_COUNT
+    };
+
+    let mut consumer = client
+        .consumer(consumer_name, STREAM_NAME, TOPIC_NAME, PARTITION_ID)
+        .unwrap()
+        .auto_commit(AutoCommit::IntervalOrWhen(
+            IggyDuration::from_str("2ms").unwrap(),
+            AutoCommitWhen::ConsumingAllMessages,
+        ))
+        .polling_strategy(strategy)
+        .poll_interval(IggyDuration::from_str("2ms").unwrap())
+        .batch_length(BATCH_LENGTH)
+        .build();
+
+    consumer.init().await.unwrap();
+
+    let mut received_messages = Vec::with_capacity(expected_count as usize);
+    let mut consumed_count = 0u32;
+
+    while consumed_count < expected_count {
+        match timeout(CONSUME_TIMEOUT, consumer.next()).await {
+            Ok(Some(Ok(received))) => {
+                received_messages.push(received.message);
+                consumed_count += 1;
+            }
+            Ok(Some(Err(e))) => panic!("Error consuming message: {}", e),
+            Ok(None) => break,
+            Err(_) => panic!(
+                "Timeout after {:?} waiting for message {}/{} with {:?} 
strategy",
+                CONSUME_TIMEOUT, consumed_count, expected_count, strategy.kind
+            ),
+        }
+    }
+
+    received_messages
+}
+
+async fn cleanup(client: &IggyClient) {
+    client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await
+        .unwrap();
+}
diff --git a/core/integration/tests/server/scenarios/mod.rs b/core/integration/tests/server/scenarios/mod.rs
index c8eb5a142..a940c5155 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -23,6 +23,7 @@ pub mod consumer_group_join_scenario;
 pub mod consumer_group_offset_cleanup_scenario;
 pub mod consumer_group_with_multiple_clients_polling_messages_scenario;
 pub mod consumer_group_with_single_client_polling_messages_scenario;
+pub mod consumer_timestamp_polling_scenario;
 pub mod create_message_payload;
 pub mod delete_segments_scenario;
 pub mod encryption_scenario;
diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml
index 81e8ef617..3f65fcaff 100644
--- a/core/sdk/Cargo.toml
+++ b/core/sdk/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "iggy"
-version = "0.8.1-edge.1"
+version = "0.8.1-edge.2"
 description = "Iggy is the persistent message streaming platform written in 
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing 
millions of messages per second."
 edition = "2024"
 license = "Apache-2.0"
diff --git a/core/sdk/src/clients/consumer.rs b/core/sdk/src/clients/consumer.rs
index 0760d3f8a..3c2a23666 100644
--- a/core/sdk/src/clients/consumer.rs
+++ b/core/sdk/src/clients/consumer.rs
@@ -41,7 +41,7 @@ use tokio::time::sleep;
 use tracing::{error, info, trace, warn};
 
 const ORDERING: std::sync::atomic::Ordering = std::sync::atomic::Ordering::SeqCst;
-type PollMessagesFuture = Pin<Box<dyn Future<Output = Result<PolledMessages, IggyError>>>>;
+type PollMessagesFuture = Pin<Box<dyn Future<Output = Result<PolledMessages, IggyError>> + Send>>;
 
 /// The auto-commit configuration for storing the offset on the server.
 #[derive(Debug, PartialEq, Copy, Clone)]
@@ -90,11 +90,18 @@ pub enum AutoCommitAfter {
     ConsumingEveryNthMessage(u32),
 }
 
-unsafe impl Send for IggyConsumer {}
+// SAFETY: IggyConsumer is Sync because:
+// 1. The only non-Sync field is `poll_future: Option<PollMessagesFuture>`
+// 2. `poll_future` is only accessed through `poll_next()` which requires `Pin<&mut Self>`
+//    (exclusive mutable access), so concurrent access to `poll_future` is impossible
+// 3. All other fields are inherently Sync (Arc<AtomicX>, Arc<DashMap>, etc.) or
+//    only accessed through `&mut self` methods
+// 4. All `&self` methods only access Sync-safe fields
 unsafe impl Sync for IggyConsumer {}
 
 pub struct IggyConsumer {
     initialized: bool,
+    shutdown: Arc<AtomicBool>,
     can_poll: Arc<AtomicBool>,
     client: IggyRwLock<ClientWrapper>,
     consumer_name: String,
@@ -153,6 +160,7 @@ impl IggyConsumer {
         let (store_offset_sender, _) = flume::unbounded();
         Self {
             initialized: false,
+            shutdown: Arc::new(AtomicBool::new(false)),
             is_consumer_group: consumer.kind == ConsumerKind::ConsumerGroup,
             joined_consumer_group: Arc::new(AtomicBool::new(false)),
             can_poll: Arc::new(AtomicBool::new(true)),
@@ -463,9 +471,14 @@ impl IggyConsumer {
         let topic_id = self.topic_id.clone();
         let last_consumed_offsets = self.last_consumed_offsets.clone();
         let last_stored_offsets = self.last_stored_offsets.clone();
+        let shutdown = self.shutdown.clone();
         tokio::spawn(async move {
             loop {
                 sleep(interval.get_duration()).await;
+                if shutdown.load(ORDERING) {
+                    trace!("Shutdown signal received, stopping background 
offset storage");
+                    break;
+                }
                 for entry in last_consumed_offsets.iter() {
                     let partition_id = *entry.key();
                     let consumed_offset = entry.load(ORDERING);
@@ -784,14 +797,6 @@ impl IggyConsumer {
             return;
         }
 
-        if now < last_sent_at {
-            warn!(
-                "Returned monotonic time went backwards, now < last_sent_at: 
({now} < {last_sent_at})"
-            );
-            sleep(Duration::from_micros(interval)).await;
-            return;
-        }
-
         let elapsed = now - last_sent_at;
         if elapsed >= interval {
             trace!("No need to wait before polling messages. {now} - 
{last_sent_at} = {elapsed}");
@@ -923,7 +928,7 @@ impl Stream for IggyConsumer {
             }
 
             if self.buffered_messages.is_empty() {
-                if self.polling_strategy.kind == PollingKind::Offset {
+                if self.polling_strategy.kind != PollingKind::Next {
+                    self.polling_strategy = PollingStrategy::offset(message.header.offset + 1);
                 }
 
@@ -990,7 +995,7 @@ impl Stream for IggyConsumer {
                         let message = polled_messages.messages.remove(0);
                         self.buffered_messages.extend(polled_messages.messages);
 
-                        if self.polling_strategy.kind == PollingKind::Offset {
+                        if self.polling_strategy.kind != PollingKind::Next {
                             self.polling_strategy =
+                                PollingStrategy::offset(message.header.offset + 1);
                         }
@@ -1035,3 +1040,13 @@ impl Stream for IggyConsumer {
         Poll::Pending
     }
 }
+
+impl Drop for IggyConsumer {
+    fn drop(&mut self) {
+        self.shutdown.store(true, ORDERING);
+        trace!(
+            "Consumer {} has been dropped, shutdown signal sent",
+            self.consumer_name
+        );
+    }
+}
diff --git a/core/sdk/src/clients/producer_sharding.rs b/core/sdk/src/clients/producer_sharding.rs
index 49c2e17a7..8bdca2213 100644
--- a/core/sdk/src/clients/producer_sharding.rs
+++ b/core/sdk/src/clients/producer_sharding.rs
@@ -164,8 +164,8 @@ impl Shard {
                     _ = tokio::time::sleep_until(deadline) => {
                         if !buffer.is_empty() {
                             Self::flush_buffer(&core, &mut buffer, &mut buffer_bytes, &err_sender).await;
-                            last_flush = tokio::time::Instant::now();
                         }
+                        last_flush = tokio::time::Instant::now();
                     }
                     _ = stop_rx.recv() => {
                         closed_clone.store(true, Ordering::Release);
