This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 340a7f435 fix(sdk): fix high-level consumer polling strategies (#2495)
340a7f435 is described below
commit 340a7f43543897400891c3688eac699d954a8e7e
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Sun Dec 21 11:10:53 2025 +0100
fix(sdk): fix high-level consumer polling strategies (#2495)
Fix the high-level consumer so that it properly transitions its polling
strategy after the initial poll.
Previously, only the Offset strategy triggered this update, causing the
Timestamp/First/Last strategies to poll the same messages in a busy loop.
Additionally, fixed the background producer so that linger_time is respected
after idle periods (code ported from #2456), added a shutdown signal for the
background task that periodically stores consumed offsets, and removed
duplicated code in wait_before_polling.
---------
Co-authored-by: Hubert Gruszecki <[email protected]>
---
Cargo.lock | 2 +-
DEPENDENCIES.md | 2 +-
core/integration/tests/sdk/producer/background.rs | 88 +++++++++
core/integration/tests/server/general.rs | 6 +-
core/integration/tests/server/mod.rs | 9 +-
.../consumer_timestamp_polling_scenario.rs | 217 +++++++++++++++++++++
core/integration/tests/server/scenarios/mod.rs | 1 +
core/sdk/Cargo.toml | 2 +-
core/sdk/src/clients/consumer.rs | 39 ++--
core/sdk/src/clients/producer_sharding.rs | 2 +-
10 files changed, 348 insertions(+), 20 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 085a4802e..c69a46130 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4442,7 +4442,7 @@ dependencies = [
[[package]]
name = "iggy"
-version = "0.8.1-edge.1"
+version = "0.8.1-edge.2"
dependencies = [
"async-broadcast",
"async-dropper",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index e967141dd..8b06b77ac 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -390,7 +390,7 @@ icu_provider: 2.1.1, "Unicode-3.0",
ident_case: 1.0.1, "Apache-2.0 OR MIT",
idna: 1.1.0, "Apache-2.0 OR MIT",
idna_adapter: 1.2.1, "Apache-2.0 OR MIT",
-iggy: 0.8.1-edge.1, "Apache-2.0",
+iggy: 0.8.1-edge.2, "Apache-2.0",
iggy-bench: 0.3.1-edge.1, "Apache-2.0",
iggy-bench-dashboard-server: 0.5.1-edge.1, "Apache-2.0",
iggy-cli: 0.10.1-edge.1, "Apache-2.0",
diff --git a/core/integration/tests/sdk/producer/background.rs
b/core/integration/tests/sdk/producer/background.rs
index 54745e186..d8d7818ae 100644
--- a/core/integration/tests/sdk/producer/background.rs
+++ b/core/integration/tests/sdk/producer/background.rs
@@ -405,3 +405,91 @@ async fn background_many_parallel_producers() {
assert_eq!(polled_messages.messages.len() as u32, 10);
cleanup(&client).await;
}
+
+/// Verifies that linger_time is respected after idle periods with no messages.
+///
+/// This tests the fix for a bug where `last_flush` was not updated when the
+/// timeout fired with an empty buffer. Without the fix, the deadline would
+/// always be in the past after the first idle timeout, causing immediate
+/// flushes instead of waiting for the full linger_time.
+#[tokio::test]
+#[parallel]
+async fn background_linger_time_respected_after_idle() {
+ let mut test_server = TestServer::default();
+ test_server.start();
+
+ let tcp_client_config = TcpClientConfig {
+ server_address: test_server.get_raw_tcp_addr().unwrap(),
+ ..TcpClientConfig::default()
+ };
+ let client =
ClientWrapper::Tcp(TcpClient::create(Arc::new(tcp_client_config)).unwrap());
+ let client = IggyClient::create(client, None, None);
+
+ client.connect().await.unwrap();
+ assert!(client.ping().await.is_ok(), "Failed to ping server");
+
+ login_root(&client).await;
+ init_system(&client).await;
+
+ let cfg = BackgroundConfig::builder()
+ .linger_time(IggyDuration::from(200_000))
+ .build();
+
+ let producer = client
+ .producer(STREAM_NAME, TOPIC_NAME)
+ .unwrap()
+ .partitioning(Partitioning::partition_id(PARTITION_ID))
+ .background(cfg)
+ .build();
+
+ // Wait for multiple idle timeouts to fire (with empty buffer).
+ // With the bug: last_flush would never be updated, causing deadline to be
stale.
+ sleep(Duration::from_millis(500)).await;
+
+ let msg = IggyMessage::builder()
+ .id(1)
+ .payload(Bytes::from_static(b"test"))
+ .build()
+ .unwrap();
+ producer.send(vec![msg]).await.unwrap();
+
+ // Wait less than linger_time - message should NOT be flushed yet
+ sleep(Duration::from_millis(100)).await;
+
+ let consumer = Consumer::default();
+ let polled_messages = client
+ .poll_messages(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ &Identifier::named(TOPIC_NAME).unwrap(),
+ Some(PARTITION_ID),
+ &consumer,
+ &PollingStrategy::offset(0),
+ 1,
+ false,
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(polled_messages.messages.len() as u32, 0);
+
+ // Wait for remaining linger_time + buffer
+ sleep(Duration::from_millis(150)).await;
+
+ let polled_messages = client
+ .poll_messages(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ &Identifier::named(TOPIC_NAME).unwrap(),
+ Some(PARTITION_ID),
+ &consumer,
+ &PollingStrategy::offset(0),
+ 1,
+ false,
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(polled_messages.messages.len() as u32, 1);
+
+ producer.shutdown().await;
+ cleanup(&client).await;
+}
diff --git a/core/integration/tests/server/general.rs
b/core/integration/tests/server/general.rs
index 37e47d9b6..9aa0ef04c 100644
--- a/core/integration/tests/server/general.rs
+++ b/core/integration/tests/server/general.rs
@@ -16,8 +16,9 @@
// under the License.
use crate::server::{
- ScenarioFn, bench_scenario, create_message_payload_scenario,
message_headers_scenario,
- run_scenario, stream_size_validation_scenario, system_scenario,
user_scenario,
+ ScenarioFn, bench_scenario, consumer_timestamp_polling_scenario,
+ create_message_payload_scenario, message_headers_scenario, run_scenario,
+ stream_size_validation_scenario, system_scenario, user_scenario,
};
use iggy_common::TransportProtocol;
use serial_test::parallel;
@@ -32,6 +33,7 @@ use test_case::test_matrix;
create_message_payload_scenario(),
stream_size_validation_scenario(),
bench_scenario(),
+ consumer_timestamp_polling_scenario(),
]
)]
#[tokio::test]
diff --git a/core/integration/tests/server/mod.rs
b/core/integration/tests/server/mod.rs
index a8473c502..d4d4ad293 100644
--- a/core/integration/tests/server/mod.rs
+++ b/core/integration/tests/server/mod.rs
@@ -34,8 +34,9 @@ use scenarios::{
bench_scenario, consumer_group_auto_commit_reconnection_scenario,
consumer_group_join_scenario,
consumer_group_offset_cleanup_scenario,
consumer_group_with_multiple_clients_polling_messages_scenario,
- consumer_group_with_single_client_polling_messages_scenario,
create_message_payload,
- message_headers_scenario, stream_size_validation_scenario,
system_scenario, user_scenario,
+ consumer_group_with_single_client_polling_messages_scenario,
+ consumer_timestamp_polling_scenario, create_message_payload,
message_headers_scenario,
+ stream_size_validation_scenario, system_scenario, user_scenario,
};
use std::pin::Pin;
use std::{collections::HashMap, future::Future};
@@ -90,6 +91,10 @@ fn bench_scenario() -> ScenarioFn {
|factory| Box::pin(bench_scenario::run(factory))
}
+fn consumer_timestamp_polling_scenario() -> ScenarioFn {
+ |factory| Box::pin(consumer_timestamp_polling_scenario::run(factory))
+}
+
async fn run_scenario(transport: TransportProtocol, scenario: ScenarioFn) {
// TODO: Need to enable `TCP_NODELAY` flag for TCP transports, due to
small messages being used in the test.
// For some reason TCP in compio can't deal with it, but in tokio it works
fine.
diff --git
a/core/integration/tests/server/scenarios/consumer_timestamp_polling_scenario.rs
b/core/integration/tests/server/scenarios/consumer_timestamp_polling_scenario.rs
new file mode 100644
index 000000000..4beb9d2cf
--- /dev/null
+++
b/core/integration/tests/server/scenarios/consumer_timestamp_polling_scenario.rs
@@ -0,0 +1,217 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::server::scenarios::{PARTITION_ID, STREAM_NAME, TOPIC_NAME,
create_client};
+use futures::StreamExt;
+use iggy::prelude::*;
+use integration::test_server::{ClientFactory, login_root};
+use std::str::FromStr;
+use tokio::time::{Duration, timeout};
+
+const MESSAGES_COUNT: u32 = 2000;
+const BATCH_LENGTH: u32 = 100;
+const CONSUME_TIMEOUT: Duration = Duration::from_secs(10);
+
+pub async fn run(client_factory: &dyn ClientFactory) {
+ let client = create_client(client_factory).await;
+ login_root(&client).await;
+
+ test_offset_strategy(&client).await;
+ test_timestamp_strategy(&client).await;
+ test_first_strategy(&client).await;
+ test_last_strategy(&client).await;
+}
+
+async fn test_offset_strategy(client: &IggyClient) {
+ init_system(client).await;
+ produce_messages(client).await;
+
+ let received =
+ consume_with_strategy(client, "offset-consumer",
PollingStrategy::offset(0)).await;
+ assert_received_messages(&received, "Offset");
+
+ cleanup(client).await;
+}
+
+async fn test_timestamp_strategy(client: &IggyClient) {
+ init_system(client).await;
+ let start_timestamp = IggyTimestamp::now();
+ produce_messages(client).await;
+
+ let received = consume_with_strategy(
+ client,
+ "timestamp-consumer",
+ PollingStrategy::timestamp(start_timestamp),
+ )
+ .await;
+ assert_received_messages(&received, "Timestamp");
+
+ cleanup(client).await;
+}
+
+async fn test_first_strategy(client: &IggyClient) {
+ init_system(client).await;
+ produce_messages(client).await;
+
+ let received = consume_with_strategy(client, "first-consumer",
PollingStrategy::first()).await;
+ assert_received_messages(&received, "First");
+
+ cleanup(client).await;
+}
+
+async fn test_last_strategy(client: &IggyClient) {
+ init_system(client).await;
+ produce_messages(client).await;
+
+ // Last strategy with batch_length=100 returns the last 100 messages
+ let received = consume_with_strategy(client, "last-consumer",
PollingStrategy::last()).await;
+
+ assert_eq!(
+ received.len(),
+ BATCH_LENGTH as usize,
+ "Last strategy: expected {} messages, received {}",
+ BATCH_LENGTH,
+ received.len()
+ );
+
+ // Verify messages are from the last batch (offsets 1900-1999 ->
message_1901 to message_2000)
+ let start_message_num = MESSAGES_COUNT - BATCH_LENGTH + 1;
+ for (i, msg) in received.iter().enumerate() {
+ let expected_payload = format!("message_{}", start_message_num + i as
u32);
+ let actual_payload =
+ String::from_utf8(msg.payload.to_vec()).expect("Payload should be
valid UTF-8");
+ assert_eq!(
+ actual_payload, expected_payload,
+ "Last strategy: message {} payload mismatch",
+ i
+ );
+ }
+
+ cleanup(client).await;
+}
+
+fn assert_received_messages(received: &[IggyMessage], strategy_name: &str) {
+ assert_eq!(
+ received.len(),
+ MESSAGES_COUNT as usize,
+ "{} strategy: expected {} messages, received {}",
+ strategy_name,
+ MESSAGES_COUNT,
+ received.len()
+ );
+
+ for (i, msg) in received.iter().enumerate() {
+ let expected_payload = format!("message_{}", i + 1);
+ let actual_payload =
+ String::from_utf8(msg.payload.to_vec()).expect("Payload should be
valid UTF-8");
+ assert_eq!(
+ actual_payload, expected_payload,
+ "{} strategy: message {} payload mismatch",
+ strategy_name, i
+ );
+ }
+}
+
+async fn init_system(client: &IggyClient) {
+ client.create_stream(STREAM_NAME).await.unwrap();
+
+ client
+ .create_topic(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ TOPIC_NAME,
+ 1,
+ CompressionAlgorithm::default(),
+ None,
+ IggyExpiry::NeverExpire,
+ MaxTopicSize::ServerDefault,
+ )
+ .await
+ .unwrap();
+}
+
+async fn produce_messages(client: &IggyClient) {
+ let mut messages = Vec::with_capacity(MESSAGES_COUNT as usize);
+ for i in 1..=MESSAGES_COUNT {
+ let payload = format!("message_{}", i);
+ let message = IggyMessage::from_str(&payload).unwrap();
+ messages.push(message);
+ }
+
+ client
+ .send_messages(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ &Identifier::named(TOPIC_NAME).unwrap(),
+ &Partitioning::partition_id(PARTITION_ID),
+ &mut messages,
+ )
+ .await
+ .unwrap();
+}
+
+async fn consume_with_strategy(
+ client: &IggyClient,
+ consumer_name: &str,
+ strategy: PollingStrategy,
+) -> Vec<IggyMessage> {
+ let expected_count = if strategy.kind == PollingKind::Last {
+ BATCH_LENGTH
+ } else {
+ MESSAGES_COUNT
+ };
+
+ let mut consumer = client
+ .consumer(consumer_name, STREAM_NAME, TOPIC_NAME, PARTITION_ID)
+ .unwrap()
+ .auto_commit(AutoCommit::IntervalOrWhen(
+ IggyDuration::from_str("2ms").unwrap(),
+ AutoCommitWhen::ConsumingAllMessages,
+ ))
+ .polling_strategy(strategy)
+ .poll_interval(IggyDuration::from_str("2ms").unwrap())
+ .batch_length(BATCH_LENGTH)
+ .build();
+
+ consumer.init().await.unwrap();
+
+ let mut received_messages = Vec::with_capacity(expected_count as usize);
+ let mut consumed_count = 0u32;
+
+ while consumed_count < expected_count {
+ match timeout(CONSUME_TIMEOUT, consumer.next()).await {
+ Ok(Some(Ok(received))) => {
+ received_messages.push(received.message);
+ consumed_count += 1;
+ }
+ Ok(Some(Err(e))) => panic!("Error consuming message: {}", e),
+ Ok(None) => break,
+ Err(_) => panic!(
+ "Timeout after {:?} waiting for message {}/{} with {:?}
strategy",
+ CONSUME_TIMEOUT, consumed_count, expected_count, strategy.kind
+ ),
+ }
+ }
+
+ received_messages
+}
+
+async fn cleanup(client: &IggyClient) {
+ client
+ .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+ .await
+ .unwrap();
+}
diff --git a/core/integration/tests/server/scenarios/mod.rs
b/core/integration/tests/server/scenarios/mod.rs
index c8eb5a142..a940c5155 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -23,6 +23,7 @@ pub mod consumer_group_join_scenario;
pub mod consumer_group_offset_cleanup_scenario;
pub mod consumer_group_with_multiple_clients_polling_messages_scenario;
pub mod consumer_group_with_single_client_polling_messages_scenario;
+pub mod consumer_timestamp_polling_scenario;
pub mod create_message_payload;
pub mod delete_segments_scenario;
pub mod encryption_scenario;
diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml
index 81e8ef617..3f65fcaff 100644
--- a/core/sdk/Cargo.toml
+++ b/core/sdk/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "iggy"
-version = "0.8.1-edge.1"
+version = "0.8.1-edge.2"
description = "Iggy is the persistent message streaming platform written in
Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing
millions of messages per second."
edition = "2024"
license = "Apache-2.0"
diff --git a/core/sdk/src/clients/consumer.rs b/core/sdk/src/clients/consumer.rs
index 0760d3f8a..3c2a23666 100644
--- a/core/sdk/src/clients/consumer.rs
+++ b/core/sdk/src/clients/consumer.rs
@@ -41,7 +41,7 @@ use tokio::time::sleep;
use tracing::{error, info, trace, warn};
const ORDERING: std::sync::atomic::Ordering =
std::sync::atomic::Ordering::SeqCst;
-type PollMessagesFuture = Pin<Box<dyn Future<Output = Result<PolledMessages,
IggyError>>>>;
+type PollMessagesFuture = Pin<Box<dyn Future<Output = Result<PolledMessages,
IggyError>> + Send>>;
/// The auto-commit configuration for storing the offset on the server.
#[derive(Debug, PartialEq, Copy, Clone)]
@@ -90,11 +90,18 @@ pub enum AutoCommitAfter {
ConsumingEveryNthMessage(u32),
}
-unsafe impl Send for IggyConsumer {}
+// SAFETY: IggyConsumer is Sync because:
+// 1. The only non-Sync field is `poll_future: Option<PollMessagesFuture>`
+// 2. `poll_future` is only accessed through `poll_next()` which requires
`Pin<&mut Self>`
+// (exclusive mutable access), so concurrent access to `poll_future` is
impossible
+// 3. All other fields are inherently Sync (Arc<AtomicX>, Arc<DashMap>, etc.)
or
+// only accessed through `&mut self` methods
+// 4. All `&self` methods only access Sync-safe fields
unsafe impl Sync for IggyConsumer {}
pub struct IggyConsumer {
initialized: bool,
+ shutdown: Arc<AtomicBool>,
can_poll: Arc<AtomicBool>,
client: IggyRwLock<ClientWrapper>,
consumer_name: String,
@@ -153,6 +160,7 @@ impl IggyConsumer {
let (store_offset_sender, _) = flume::unbounded();
Self {
initialized: false,
+ shutdown: Arc::new(AtomicBool::new(false)),
is_consumer_group: consumer.kind == ConsumerKind::ConsumerGroup,
joined_consumer_group: Arc::new(AtomicBool::new(false)),
can_poll: Arc::new(AtomicBool::new(true)),
@@ -463,9 +471,14 @@ impl IggyConsumer {
let topic_id = self.topic_id.clone();
let last_consumed_offsets = self.last_consumed_offsets.clone();
let last_stored_offsets = self.last_stored_offsets.clone();
+ let shutdown = self.shutdown.clone();
tokio::spawn(async move {
loop {
sleep(interval.get_duration()).await;
+ if shutdown.load(ORDERING) {
+ trace!("Shutdown signal received, stopping background
offset storage");
+ break;
+ }
for entry in last_consumed_offsets.iter() {
let partition_id = *entry.key();
let consumed_offset = entry.load(ORDERING);
@@ -784,14 +797,6 @@ impl IggyConsumer {
return;
}
- if now < last_sent_at {
- warn!(
- "Returned monotonic time went backwards, now < last_sent_at:
({now} < {last_sent_at})"
- );
- sleep(Duration::from_micros(interval)).await;
- return;
- }
-
let elapsed = now - last_sent_at;
if elapsed >= interval {
trace!("No need to wait before polling messages. {now} -
{last_sent_at} = {elapsed}");
@@ -923,7 +928,7 @@ impl Stream for IggyConsumer {
}
if self.buffered_messages.is_empty() {
- if self.polling_strategy.kind == PollingKind::Offset {
+ if self.polling_strategy.kind != PollingKind::Next {
self.polling_strategy =
PollingStrategy::offset(message.header.offset + 1);
}
@@ -990,7 +995,7 @@ impl Stream for IggyConsumer {
let message = polled_messages.messages.remove(0);
self.buffered_messages.extend(polled_messages.messages);
- if self.polling_strategy.kind == PollingKind::Offset {
+ if self.polling_strategy.kind != PollingKind::Next {
self.polling_strategy =
PollingStrategy::offset(message.header.offset
+ 1);
}
@@ -1035,3 +1040,13 @@ impl Stream for IggyConsumer {
Poll::Pending
}
}
+
+impl Drop for IggyConsumer {
+ fn drop(&mut self) {
+ self.shutdown.store(true, ORDERING);
+ trace!(
+ "Consumer {} has been dropped, shutdown signal sent",
+ self.consumer_name
+ );
+ }
+}
diff --git a/core/sdk/src/clients/producer_sharding.rs
b/core/sdk/src/clients/producer_sharding.rs
index 49c2e17a7..8bdca2213 100644
--- a/core/sdk/src/clients/producer_sharding.rs
+++ b/core/sdk/src/clients/producer_sharding.rs
@@ -164,8 +164,8 @@ impl Shard {
_ = tokio::time::sleep_until(deadline) => {
if !buffer.is_empty() {
Self::flush_buffer(&core, &mut buffer, &mut
buffer_bytes, &err_sender).await;
- last_flush = tokio::time::Instant::now();
}
+ last_flush = tokio::time::Instant::now();
}
_ = stop_rx.recv() => {
closed_clone.store(true, Ordering::Release);