This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new d0cb3d44a test(server): add API tests for message retrieval (#2537)
d0cb3d44a is described below
commit d0cb3d44a8c38606ff1ff073e9ba92cb52f765b2
Author: xin <[email protected]>
AuthorDate: Tue Jan 20 20:39:12 2026 +0900
test(server): add API tests for message retrieval (#2537)
---
core/integration/tests/server/message_retrieval.rs | 152 +++++++
core/integration/tests/server/mod.rs | 1 +
.../server/scenarios/get_messages_by_offset_api.rs | 424 +++++++++++++++++++
.../scenarios/get_messages_by_timestamp_api.rs | 449 +++++++++++++++++++++
core/integration/tests/server/scenarios/mod.rs | 2 +
5 files changed, 1028 insertions(+)
diff --git a/core/integration/tests/server/message_retrieval.rs
b/core/integration/tests/server/message_retrieval.rs
new file mode 100644
index 000000000..7061d27a0
--- /dev/null
+++ b/core/integration/tests/server/message_retrieval.rs
@@ -0,0 +1,152 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::server::scenarios::{get_messages_by_offset_api,
get_messages_by_timestamp_api};
+use integration::{
+ tcp_client::TcpClientFactory,
+ test_server::{IpAddrKind, TestServer},
+};
+use serial_test::parallel;
+use std::collections::HashMap;
+use test_case::test_matrix;
+
+fn segment_size_512b() -> &'static str {
+ "512B"
+}
+
+fn segment_size_1kb() -> &'static str {
+ "1KiB"
+}
+
+fn segment_size_10mb() -> &'static str {
+ "10MiB"
+}
+
+fn cache_none() -> &'static str {
+ "none"
+}
+
+fn cache_all() -> &'static str {
+ "all"
+}
+
+fn cache_open_segment() -> &'static str {
+ "open_segment"
+}
+
+fn msgs_req_32() -> u32 {
+ 32
+}
+
+fn msgs_req_64() -> u32 {
+ 64
+}
+
+fn msgs_req_1024() -> u32 {
+ 1024
+}
+
+fn msgs_req_9984() -> u32 {
+ 9984
+}
+
+#[test_matrix(
+ [segment_size_512b(), segment_size_1kb(), segment_size_10mb()],
+ [cache_none(), cache_all(), cache_open_segment()],
+ [msgs_req_32(), msgs_req_64(), msgs_req_1024(), msgs_req_9984()]
+)]
+#[tokio::test]
+#[parallel]
+async fn test_get_messages_by_offset_api(
+ segment_size: &str,
+ cache_indexes: &str,
+ messages_required_to_save: u32,
+) {
+ let mut extra_envs = HashMap::new();
+ extra_envs.insert(
+ "IGGY_SYSTEM_SEGMENT_SIZE".to_string(),
+ segment_size.to_string(),
+ );
+ extra_envs.insert(
+ "IGGY_SYSTEM_SEGMENT_CACHE_INDEXES".to_string(),
+ cache_indexes.to_string(),
+ );
+ extra_envs.insert(
+ "IGGY_SYSTEM_PARTITION_MESSAGES_REQUIRED_TO_SAVE".to_string(),
+ messages_required_to_save.to_string(),
+ );
+ extra_envs.insert(
+ "IGGY_TCP_SOCKET_OVERRIDE_DEFAULTS".to_string(),
+ "true".to_string(),
+ );
+ extra_envs.insert("IGGY_TCP_SOCKET_NODELAY".to_string(),
"true".to_string());
+
+ let mut test_server = TestServer::new(Some(extra_envs), true, None,
IpAddrKind::V4);
+ test_server.start();
+
+ let server_addr = test_server.get_raw_tcp_addr().unwrap();
+ let client_factory = TcpClientFactory {
+ server_addr,
+ ..Default::default()
+ };
+
+ get_messages_by_offset_api::run(&client_factory).await;
+}
+
+#[test_matrix(
+ [segment_size_512b(), segment_size_1kb(), segment_size_10mb()],
+ [cache_none(), cache_all(), cache_open_segment()],
+ [msgs_req_32(), msgs_req_64(), msgs_req_1024(), msgs_req_9984()]
+)]
+#[tokio::test]
+#[parallel]
+async fn test_get_messages_by_timestamp_api(
+ segment_size: &str,
+ cache_indexes: &str,
+ messages_required_to_save: u32,
+) {
+ let mut extra_envs = HashMap::new();
+ extra_envs.insert(
+ "IGGY_SYSTEM_SEGMENT_SIZE".to_string(),
+ segment_size.to_string(),
+ );
+ extra_envs.insert(
+ "IGGY_SYSTEM_SEGMENT_CACHE_INDEXES".to_string(),
+ cache_indexes.to_string(),
+ );
+ extra_envs.insert(
+ "IGGY_SYSTEM_PARTITION_MESSAGES_REQUIRED_TO_SAVE".to_string(),
+ messages_required_to_save.to_string(),
+ );
+ extra_envs.insert(
+ "IGGY_TCP_SOCKET_OVERRIDE_DEFAULTS".to_string(),
+ "true".to_string(),
+ );
+ extra_envs.insert("IGGY_TCP_SOCKET_NODELAY".to_string(),
"true".to_string());
+
+ let mut test_server = TestServer::new(Some(extra_envs), true, None,
IpAddrKind::V4);
+ test_server.start();
+
+ let server_addr = test_server.get_raw_tcp_addr().unwrap();
+ let client_factory = TcpClientFactory {
+ server_addr,
+ ..Default::default()
+ };
+
+ get_messages_by_timestamp_api::run(&client_factory).await;
+}
diff --git a/core/integration/tests/server/mod.rs
b/core/integration/tests/server/mod.rs
index a94ed6935..1a9285640 100644
--- a/core/integration/tests/server/mod.rs
+++ b/core/integration/tests/server/mod.rs
@@ -19,6 +19,7 @@
mod cg;
mod concurrent_addition;
mod general;
+mod message_retrieval;
mod scenarios;
mod specific;
diff --git
a/core/integration/tests/server/scenarios/get_messages_by_offset_api.rs
b/core/integration/tests/server/scenarios/get_messages_by_offset_api.rs
new file mode 100644
index 000000000..568fd847d
--- /dev/null
+++ b/core/integration/tests/server/scenarios/get_messages_by_offset_api.rs
@@ -0,0 +1,424 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::PARTITION_ID;
+use bytes::BytesMut;
+use iggy::prelude::*;
+use integration::test_server::{ClientFactory, login_root};
+use std::collections::HashMap;
+use std::str::FromStr;
+
+fn small_batches() -> Vec<u32> {
+ vec![3, 4, 5, 6, 7]
+}
+
+fn medium_batches() -> Vec<u32> {
+ vec![10, 20, 30, 40]
+}
+
+fn large_batches() -> Vec<u32> {
+ vec![100, 200, 300, 400]
+}
+
+fn very_large_batches() -> Vec<u32> {
+ vec![500, 1000, 1500, 2000]
+}
+
+fn all_batch_patterns() -> Vec<(&'static str, Vec<u32>)> {
+ vec![
+ ("small", small_batches()),
+ ("medium", medium_batches()),
+ ("large", large_batches()),
+ ("very_large", very_large_batches()),
+ ]
+}
+
+fn all_message_sizes() -> Vec<u64> {
+ vec![50, 1000, 20000]
+}
+
+pub async fn run(client_factory: &dyn ClientFactory) {
+ let client = client_factory.create_client().await;
+ let client = IggyClient::create(client, None, None);
+ login_root(&client).await;
+
+ for msg_size in all_message_sizes() {
+ for (pattern_name, batch_pattern) in all_batch_patterns() {
+ run_offset_test(&client, msg_size, &batch_pattern,
pattern_name).await;
+ }
+ }
+}
+
+async fn run_offset_test(
+ client: &IggyClient,
+ message_size: u64,
+ batch_lengths: &[u32],
+ pattern_name: &str,
+) {
+ let stream_name = format!("test-stream-{}-{}", message_size, pattern_name);
+ let topic_name = format!("test-topic-{}-{}", message_size, pattern_name);
+
+ init_system(client, &stream_name, &topic_name).await;
+
+ let total_messages_count: u32 = batch_lengths.iter().sum();
+
+ let batch_offsets = send_messages_in_batches(
+ client,
+ &stream_name,
+ &topic_name,
+ message_size,
+ batch_lengths,
+ )
+ .await;
+
+ verify_all_messages_from_start(client, &stream_name, &topic_name,
total_messages_count).await;
+ verify_messages_from_middle(
+ client,
+ &stream_name,
+ &topic_name,
+ &batch_offsets,
+ batch_lengths,
+ total_messages_count,
+ )
+ .await;
+ verify_no_messages_beyond_end(client, &stream_name, &topic_name,
&batch_offsets).await;
+ verify_small_subset_from_start(client, &stream_name, &topic_name,
total_messages_count).await;
+ verify_messages_spanning_batches(client, &stream_name, &topic_name,
&batch_offsets).await;
+ verify_message_content(
+ client,
+ &stream_name,
+ &topic_name,
+ message_size,
+ &batch_offsets,
+ )
+ .await;
+
+ cleanup(client, &stream_name).await;
+}
+
+async fn init_system(client: &IggyClient, stream_name: &str, topic_name: &str)
{
+ client.create_stream(stream_name).await.unwrap();
+ client
+ .create_topic(
+ &Identifier::named(stream_name).unwrap(),
+ topic_name,
+ 1,
+ CompressionAlgorithm::default(),
+ None,
+ IggyExpiry::NeverExpire,
+ MaxTopicSize::ServerDefault,
+ )
+ .await
+ .unwrap();
+}
+
+async fn send_messages_in_batches(
+ client: &IggyClient,
+ stream_name: &str,
+ topic_name: &str,
+ message_size: u64,
+ batch_lengths: &[u32],
+) -> Vec<u64> {
+ let mut batch_offsets = Vec::with_capacity(batch_lengths.len());
+ let mut current_offset = 0u64;
+ let mut message_id = 1u32;
+
+ for &batch_len in batch_lengths {
+ let mut messages_to_send = Vec::with_capacity(batch_len as usize);
+
+ for _ in 0..batch_len {
+ let msg = create_single_message(message_id, message_size);
+ messages_to_send.push(msg);
+ message_id += 1;
+ }
+
+ client
+ .send_messages(
+ &Identifier::named(stream_name).unwrap(),
+ &Identifier::named(topic_name).unwrap(),
+ &Partitioning::partition_id(PARTITION_ID),
+ &mut messages_to_send,
+ )
+ .await
+ .unwrap();
+
+ current_offset += batch_len as u64;
+ batch_offsets.push(current_offset - 1);
+ }
+
+ batch_offsets
+}
+
+fn create_single_message(id: u32, message_size: u64) -> IggyMessage {
+ let beginning_of_payload = format!("message {id}");
+ let mut payload = BytesMut::new();
+ payload.extend_from_slice(beginning_of_payload.as_bytes());
+ payload.resize(message_size as usize, 0xD);
+ let payload = payload.freeze();
+
+ let mut headers = HashMap::new();
+ headers.insert(
+ HeaderKey::new("key_1").unwrap(),
+ HeaderValue::from_str("Value 1").unwrap(),
+ );
+ headers.insert(
+ HeaderKey::new("key 2").unwrap(),
+ HeaderValue::from_bool(true).unwrap(),
+ );
+ headers.insert(
+ HeaderKey::new("key-3").unwrap(),
+ HeaderValue::from_uint64(123456).unwrap(),
+ );
+
+ IggyMessage::builder()
+ .id(id as u128)
+ .payload(payload)
+ .user_headers(headers)
+ .build()
+ .expect("Failed to create message")
+}
+
+async fn verify_all_messages_from_start(
+ client: &IggyClient,
+ stream_name: &str,
+ topic_name: &str,
+ total_messages: u32,
+) {
+ let polled = client
+ .poll_messages(
+ &Identifier::named(stream_name).unwrap(),
+ &Identifier::named(topic_name).unwrap(),
+ Some(PARTITION_ID),
+ &Consumer::default(),
+ &PollingStrategy::offset(0),
+ total_messages,
+ false,
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(
+ polled.messages.len() as u32,
+ total_messages,
+ "Expected {} messages from start, got {}",
+ total_messages,
+ polled.messages.len()
+ );
+}
+
+async fn verify_messages_from_middle(
+ client: &IggyClient,
+ stream_name: &str,
+ topic_name: &str,
+ batch_offsets: &[u64],
+ batch_lengths: &[u32],
+ total_messages: u32,
+) {
+ if batch_offsets.len() >= 3 {
+ let middle_offset = batch_offsets[2];
+ let prior_batches_sum: u32 = batch_lengths[..3].iter().sum();
+ let remaining_messages = total_messages - prior_batches_sum;
+
+ let polled = client
+ .poll_messages(
+ &Identifier::named(stream_name).unwrap(),
+ &Identifier::named(topic_name).unwrap(),
+ Some(PARTITION_ID),
+ &Consumer::default(),
+ &PollingStrategy::offset(middle_offset + 1),
+ remaining_messages,
+ false,
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(
+ polled.messages.len() as u32,
+ remaining_messages,
+ "Expected {} messages from middle, got {}",
+ remaining_messages,
+ polled.messages.len()
+ );
+ }
+}
+
+async fn verify_no_messages_beyond_end(
+ client: &IggyClient,
+ stream_name: &str,
+ topic_name: &str,
+ batch_offsets: &[u64],
+) {
+ if let Some(&final_offset) = batch_offsets.last() {
+ let polled = client
+ .poll_messages(
+ &Identifier::named(stream_name).unwrap(),
+ &Identifier::named(topic_name).unwrap(),
+ Some(PARTITION_ID),
+ &Consumer::default(),
+ &PollingStrategy::offset(final_offset + 1),
+ 1,
+ false,
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(
+ polled.messages.len(),
+ 0,
+ "Expected no messages beyond end, got {}",
+ polled.messages.len()
+ );
+ }
+}
+
+async fn verify_small_subset_from_start(
+ client: &IggyClient,
+ stream_name: &str,
+ topic_name: &str,
+ total_messages: u32,
+) {
+ let subset_size = std::cmp::min(3, total_messages);
+
+ let polled = client
+ .poll_messages(
+ &Identifier::named(stream_name).unwrap(),
+ &Identifier::named(topic_name).unwrap(),
+ Some(PARTITION_ID),
+ &Consumer::default(),
+ &PollingStrategy::offset(0),
+ subset_size,
+ false,
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(
+ polled.messages.len() as u32,
+ subset_size,
+ "Expected {} messages in subset, got {}",
+ subset_size,
+ polled.messages.len()
+ );
+}
+
+async fn verify_messages_spanning_batches(
+ client: &IggyClient,
+ stream_name: &str,
+ topic_name: &str,
+ batch_offsets: &[u64],
+) {
+ if batch_offsets.len() >= 4 {
+ let span_offset = batch_offsets[1] + 1;
+ let span_size = 8u32;
+
+ let polled = client
+ .poll_messages(
+ &Identifier::named(stream_name).unwrap(),
+ &Identifier::named(topic_name).unwrap(),
+ Some(PARTITION_ID),
+ &Consumer::default(),
+ &PollingStrategy::offset(span_offset),
+ span_size,
+ false,
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(
+ polled.messages.len() as u32,
+ span_size,
+ "Expected {} messages spanning batches, got {}",
+ span_size,
+ polled.messages.len()
+ );
+ }
+}
+
+async fn verify_message_content(
+ client: &IggyClient,
+ stream_name: &str,
+ topic_name: &str,
+ message_size: u64,
+ batch_offsets: &[u64],
+) {
+ if batch_offsets.len() >= 4 {
+ let span_offset = batch_offsets[1] + 1;
+ let span_size = 8u32;
+
+ let polled = client
+ .poll_messages(
+ &Identifier::named(stream_name).unwrap(),
+ &Identifier::named(topic_name).unwrap(),
+ Some(PARTITION_ID),
+ &Consumer::default(),
+ &PollingStrategy::offset(span_offset),
+ span_size,
+ false,
+ )
+ .await
+ .unwrap();
+
+ for (i, msg) in polled.messages.iter().enumerate() {
+ let expected_offset = span_offset + i as u64;
+ assert!(
+ msg.header.offset >= expected_offset,
+ "Message offset {} should be >= {}",
+ msg.header.offset,
+ expected_offset
+ );
+
+ let message_id = (msg.header.offset + 1) as u32;
+ let expected_payload_prefix = format!("message {message_id}");
+
+ let payload_str = String::from_utf8_lossy(&msg.payload);
+ assert!(
+ payload_str.starts_with(&expected_payload_prefix),
+ "Payload at offset {} should start with '{}', got '{}'",
+ msg.header.offset,
+ expected_payload_prefix,
+ &payload_str[..std::cmp::min(payload_str.len(), 20)]
+ );
+
+ assert_eq!(
+ msg.payload.len(),
+ message_size as usize,
+ "Payload size mismatch at offset {}",
+ msg.header.offset
+ );
+
+ assert_eq!(
+ msg.header.id, message_id as u128,
+ "Message ID mismatch at offset {}",
+ msg.header.offset
+ );
+
+ if let Some(headers) = &msg.user_headers {
+ let headers_map =
+ HashMap::<HeaderKey,
HeaderValue>::from_bytes(headers.clone()).unwrap();
+ assert_eq!(headers_map.len(), 3, "Expected 3 headers");
+ }
+ }
+ }
+}
+
+async fn cleanup(client: &IggyClient, stream_name: &str) {
+ client
+ .delete_stream(&Identifier::named(stream_name).unwrap())
+ .await
+ .unwrap();
+}
diff --git
a/core/integration/tests/server/scenarios/get_messages_by_timestamp_api.rs
b/core/integration/tests/server/scenarios/get_messages_by_timestamp_api.rs
new file mode 100644
index 000000000..542ca17cc
--- /dev/null
+++ b/core/integration/tests/server/scenarios/get_messages_by_timestamp_api.rs
@@ -0,0 +1,449 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::PARTITION_ID;
+use bytes::BytesMut;
+use iggy::prelude::*;
+use integration::test_server::{ClientFactory, login_root};
+use std::collections::HashMap;
+use std::str::FromStr;
+use tokio::time::{Duration, sleep};
+
+fn small_batches() -> Vec<u32> {
+ vec![3, 4, 5, 6, 7]
+}
+
+fn medium_batches() -> Vec<u32> {
+ vec![10, 20, 30, 40]
+}
+
+fn large_batches() -> Vec<u32> {
+ vec![100, 200, 300, 400]
+}
+
+fn very_large_batches() -> Vec<u32> {
+ vec![500, 1000, 1500, 2000]
+}
+
+fn all_batch_patterns() -> Vec<(&'static str, Vec<u32>)> {
+ vec![
+ ("small", small_batches()),
+ ("medium", medium_batches()),
+ ("large", large_batches()),
+ ("very_large", very_large_batches()),
+ ]
+}
+
+fn all_message_sizes() -> Vec<u64> {
+ vec![50, 1000, 20000]
+}
+
+pub async fn run(client_factory: &dyn ClientFactory) {
+ let client = client_factory.create_client().await;
+ let client = IggyClient::create(client, None, None);
+ login_root(&client).await;
+
+ for msg_size in all_message_sizes() {
+ for (pattern_name, batch_pattern) in all_batch_patterns() {
+ run_timestamp_test(&client, msg_size, &batch_pattern,
pattern_name).await;
+ }
+ }
+}
+
+async fn run_timestamp_test(
+ client: &IggyClient,
+ message_size: u64,
+ batch_lengths: &[u32],
+ pattern_name: &str,
+) {
+ let stream_name = format!("test-stream-ts-{}-{}", message_size,
pattern_name);
+ let topic_name = format!("test-topic-ts-{}-{}", message_size,
pattern_name);
+
+ init_system(client, &stream_name, &topic_name).await;
+
+ let total_messages_count: u32 = batch_lengths.iter().sum();
+
+ let (initial_timestamp, batch_timestamps) =
send_messages_in_batches_with_timestamps(
+ client,
+ &stream_name,
+ &topic_name,
+ message_size,
+ batch_lengths,
+ )
+ .await;
+
+ verify_all_messages_from_start_timestamp(
+ client,
+ &stream_name,
+ &topic_name,
+ initial_timestamp,
+ total_messages_count,
+ )
+ .await;
+ verify_messages_from_middle_timestamp(
+ client,
+ &stream_name,
+ &topic_name,
+ &batch_timestamps,
+ batch_lengths,
+ total_messages_count,
+ )
+ .await;
+ verify_no_messages_with_future_timestamp(client, &stream_name,
&topic_name).await;
+ verify_small_subset_from_start_timestamp(
+ client,
+ &stream_name,
+ &topic_name,
+ initial_timestamp,
+ total_messages_count,
+ )
+ .await;
+ verify_messages_spanning_batches_by_timestamp(
+ client,
+ &stream_name,
+ &topic_name,
+ &batch_timestamps,
+ )
+ .await;
+ verify_message_content_by_timestamp(
+ client,
+ &stream_name,
+ &topic_name,
+ message_size,
+ &batch_timestamps,
+ )
+ .await;
+
+ cleanup(client, &stream_name).await;
+}
+
+async fn init_system(client: &IggyClient, stream_name: &str, topic_name: &str)
{
+ client.create_stream(stream_name).await.unwrap();
+ client
+ .create_topic(
+ &Identifier::named(stream_name).unwrap(),
+ topic_name,
+ 1,
+ CompressionAlgorithm::default(),
+ None,
+ IggyExpiry::NeverExpire,
+ MaxTopicSize::ServerDefault,
+ )
+ .await
+ .unwrap();
+}
+
+async fn send_messages_in_batches_with_timestamps(
+ client: &IggyClient,
+ stream_name: &str,
+ topic_name: &str,
+ message_size: u64,
+ batch_lengths: &[u32],
+) -> (u64, Vec<u64>) {
+ let mut batch_timestamps = Vec::with_capacity(batch_lengths.len());
+ let mut message_id = 1u32;
+
+ let initial_timestamp = IggyTimestamp::now().as_micros();
+
+ for &batch_len in batch_lengths {
+ sleep(Duration::from_millis(2)).await;
+
+ let mut messages_to_send = Vec::with_capacity(batch_len as usize);
+
+ for _ in 0..batch_len {
+ let msg = create_single_message(message_id, message_size);
+ messages_to_send.push(msg);
+ message_id += 1;
+ }
+
+ client
+ .send_messages(
+ &Identifier::named(stream_name).unwrap(),
+ &Identifier::named(topic_name).unwrap(),
+ &Partitioning::partition_id(PARTITION_ID),
+ &mut messages_to_send,
+ )
+ .await
+ .unwrap();
+
+ batch_timestamps.push(IggyTimestamp::now().as_micros());
+
+ sleep(Duration::from_millis(2)).await;
+ }
+
+ (initial_timestamp, batch_timestamps)
+}
+
+fn create_single_message(id: u32, message_size: u64) -> IggyMessage {
+ let beginning_of_payload = format!("message {id}");
+ let mut payload = BytesMut::new();
+ payload.extend_from_slice(beginning_of_payload.as_bytes());
+ payload.resize(message_size as usize, 0xD);
+ let payload = payload.freeze();
+
+ let mut headers = HashMap::new();
+ headers.insert(
+ HeaderKey::new("key_1").unwrap(),
+ HeaderValue::from_str("Value 1").unwrap(),
+ );
+ headers.insert(
+ HeaderKey::new("key 2").unwrap(),
+ HeaderValue::from_bool(true).unwrap(),
+ );
+ headers.insert(
+ HeaderKey::new("key-3").unwrap(),
+ HeaderValue::from_uint64(123456).unwrap(),
+ );
+
+ IggyMessage::builder()
+ .id(id as u128)
+ .payload(payload)
+ .user_headers(headers)
+ .build()
+ .expect("Failed to create message")
+}
+
+async fn verify_all_messages_from_start_timestamp(
+ client: &IggyClient,
+ stream_name: &str,
+ topic_name: &str,
+ initial_timestamp: u64,
+ total_messages: u32,
+) {
+ let polled = client
+ .poll_messages(
+ &Identifier::named(stream_name).unwrap(),
+ &Identifier::named(topic_name).unwrap(),
+ Some(PARTITION_ID),
+ &Consumer::default(),
+
&PollingStrategy::timestamp(IggyTimestamp::from(initial_timestamp)),
+ total_messages,
+ false,
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(
+ polled.messages.len() as u32,
+ total_messages,
+ "Expected {} messages from initial timestamp, got {}",
+ total_messages,
+ polled.messages.len()
+ );
+}
+
+async fn verify_messages_from_middle_timestamp(
+ client: &IggyClient,
+ stream_name: &str,
+ topic_name: &str,
+ batch_timestamps: &[u64],
+ batch_lengths: &[u32],
+ total_messages: u32,
+) {
+ if batch_timestamps.len() >= 3 {
+ let middle_timestamp = batch_timestamps[2] + 1000;
+ let prior_batches_sum: u32 = batch_lengths[..3].iter().sum();
+ let remaining_messages = total_messages - prior_batches_sum;
+
+ let polled = client
+ .poll_messages(
+ &Identifier::named(stream_name).unwrap(),
+ &Identifier::named(topic_name).unwrap(),
+ Some(PARTITION_ID),
+ &Consumer::default(),
+
&PollingStrategy::timestamp(IggyTimestamp::from(middle_timestamp)),
+ remaining_messages,
+ false,
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(
+ polled.messages.len() as u32,
+ remaining_messages,
+ "Expected {} messages from middle timestamp, got {}",
+ remaining_messages,
+ polled.messages.len()
+ );
+ }
+}
+
+async fn verify_no_messages_with_future_timestamp(
+ client: &IggyClient,
+ stream_name: &str,
+ topic_name: &str,
+) {
+ let future_timestamp = IggyTimestamp::now().as_micros() + 3_600_000_000;
+
+ let polled = client
+ .poll_messages(
+ &Identifier::named(stream_name).unwrap(),
+ &Identifier::named(topic_name).unwrap(),
+ Some(PARTITION_ID),
+ &Consumer::default(),
+ &PollingStrategy::timestamp(IggyTimestamp::from(future_timestamp)),
+ 10,
+ false,
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(
+ polled.messages.len(),
+ 0,
+ "Expected no messages with future timestamp, got {}",
+ polled.messages.len()
+ );
+}
+
+async fn verify_small_subset_from_start_timestamp(
+ client: &IggyClient,
+ stream_name: &str,
+ topic_name: &str,
+ initial_timestamp: u64,
+ total_messages: u32,
+) {
+ let subset_size = std::cmp::min(3, total_messages);
+
+ let polled = client
+ .poll_messages(
+ &Identifier::named(stream_name).unwrap(),
+ &Identifier::named(topic_name).unwrap(),
+ Some(PARTITION_ID),
+ &Consumer::default(),
+
&PollingStrategy::timestamp(IggyTimestamp::from(initial_timestamp)),
+ subset_size,
+ false,
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(
+ polled.messages.len() as u32,
+ subset_size,
+ "Expected {} messages in subset from initial timestamp, got {}",
+ subset_size,
+ polled.messages.len()
+ );
+}
+
+async fn verify_messages_spanning_batches_by_timestamp(
+ client: &IggyClient,
+ stream_name: &str,
+ topic_name: &str,
+ batch_timestamps: &[u64],
+) {
+ if batch_timestamps.len() >= 4 {
+ let span_timestamp = batch_timestamps[1] + 1000;
+ let span_size = 8u32;
+
+ let polled = client
+ .poll_messages(
+ &Identifier::named(stream_name).unwrap(),
+ &Identifier::named(topic_name).unwrap(),
+ Some(PARTITION_ID),
+ &Consumer::default(),
+
&PollingStrategy::timestamp(IggyTimestamp::from(span_timestamp)),
+ span_size,
+ false,
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(
+ polled.messages.len() as u32,
+ span_size,
+ "Expected {} messages spanning multiple batches, got {}",
+ span_size,
+ polled.messages.len()
+ );
+ }
+}
+
+async fn verify_message_content_by_timestamp(
+ client: &IggyClient,
+ stream_name: &str,
+ topic_name: &str,
+ message_size: u64,
+ batch_timestamps: &[u64],
+) {
+ if batch_timestamps.len() >= 4 {
+ let span_timestamp = batch_timestamps[1] + 1000;
+ let span_size = 8u32;
+
+ let polled = client
+ .poll_messages(
+ &Identifier::named(stream_name).unwrap(),
+ &Identifier::named(topic_name).unwrap(),
+ Some(PARTITION_ID),
+ &Consumer::default(),
+
&PollingStrategy::timestamp(IggyTimestamp::from(span_timestamp)),
+ span_size,
+ false,
+ )
+ .await
+ .unwrap();
+
+ for msg in polled.messages.iter() {
+ assert!(
+ msg.header.timestamp >= span_timestamp,
+ "Message timestamp {} should be >= span timestamp {}",
+ msg.header.timestamp,
+ span_timestamp
+ );
+
+ let message_id = (msg.header.offset + 1) as u32;
+ let expected_payload_prefix = format!("message {message_id}");
+
+ let payload_str = String::from_utf8_lossy(&msg.payload);
+ assert!(
+ payload_str.starts_with(&expected_payload_prefix),
+ "Payload at offset {} should start with '{}', got '{}'",
+ msg.header.offset,
+ expected_payload_prefix,
+ &payload_str[..std::cmp::min(payload_str.len(), 20)]
+ );
+
+ assert_eq!(
+ msg.payload.len(),
+ message_size as usize,
+ "Payload size mismatch at offset {}",
+ msg.header.offset
+ );
+
+ assert_eq!(
+ msg.header.id, message_id as u128,
+ "Message ID mismatch at offset {}",
+ msg.header.offset
+ );
+
+ if let Some(headers) = &msg.user_headers {
+ let headers_map =
+ HashMap::<HeaderKey,
HeaderValue>::from_bytes(headers.clone()).unwrap();
+ assert_eq!(headers_map.len(), 3, "Expected 3 headers");
+ }
+ }
+ }
+}
+
+async fn cleanup(client: &IggyClient, stream_name: &str) {
+ client
+ .delete_stream(&Identifier::named(stream_name).unwrap())
+ .await
+ .unwrap();
+}
diff --git a/core/integration/tests/server/scenarios/mod.rs
b/core/integration/tests/server/scenarios/mod.rs
index a61636931..e61594a91 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -28,6 +28,8 @@ pub mod consumer_timestamp_polling_scenario;
pub mod create_message_payload;
pub mod delete_segments_scenario;
pub mod encryption_scenario;
+pub mod get_messages_by_offset_api;
+pub mod get_messages_by_timestamp_api;
pub mod message_headers_scenario;
pub mod message_size_scenario;
pub mod permissions_scenario;