This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new d0cb3d44a test(server): add API tests for message retrieval (#2537)
d0cb3d44a is described below

commit d0cb3d44a8c38606ff1ff073e9ba92cb52f765b2
Author: xin <[email protected]>
AuthorDate: Tue Jan 20 20:39:12 2026 +0900

    test(server): add API tests for message retrieval (#2537)
---
 core/integration/tests/server/message_retrieval.rs | 152 +++++++
 core/integration/tests/server/mod.rs               |   1 +
 .../server/scenarios/get_messages_by_offset_api.rs | 424 +++++++++++++++++++
 .../scenarios/get_messages_by_timestamp_api.rs     | 449 +++++++++++++++++++++
 core/integration/tests/server/scenarios/mod.rs     |   2 +
 5 files changed, 1028 insertions(+)

diff --git a/core/integration/tests/server/message_retrieval.rs 
b/core/integration/tests/server/message_retrieval.rs
new file mode 100644
index 000000000..7061d27a0
--- /dev/null
+++ b/core/integration/tests/server/message_retrieval.rs
@@ -0,0 +1,152 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::server::scenarios::{get_messages_by_offset_api, 
get_messages_by_timestamp_api};
+use integration::{
+    tcp_client::TcpClientFactory,
+    test_server::{IpAddrKind, TestServer},
+};
+use serial_test::parallel;
+use std::collections::HashMap;
+use test_case::test_matrix;
+
/// Smallest segment size in the matrix: forces frequent segment rollovers.
fn segment_size_512b() -> &'static str { "512B" }

/// Mid-size segment configuration.
fn segment_size_1kb() -> &'static str { "1KiB" }

/// Large segment configuration: all test data fits in one segment.
fn segment_size_10mb() -> &'static str { "10MiB" }

/// Index-cache mode: no index caching.
fn cache_none() -> &'static str { "none" }

/// Index-cache mode: cache indexes for every segment.
fn cache_all() -> &'static str { "all" }

/// Index-cache mode: cache only the open (active) segment's indexes.
fn cache_open_segment() -> &'static str { "open_segment" }

/// `messages_required_to_save` thresholds exercised by the matrix.
fn msgs_req_32() -> u32 { 32 }

fn msgs_req_64() -> u32 { 64 }

fn msgs_req_1024() -> u32 { 1024 }

fn msgs_req_9984() -> u32 { 9984 }
+
+#[test_matrix(
+    [segment_size_512b(), segment_size_1kb(), segment_size_10mb()],
+    [cache_none(), cache_all(), cache_open_segment()],
+    [msgs_req_32(), msgs_req_64(), msgs_req_1024(), msgs_req_9984()]
+)]
+#[tokio::test]
+#[parallel]
+async fn test_get_messages_by_offset_api(
+    segment_size: &str,
+    cache_indexes: &str,
+    messages_required_to_save: u32,
+) {
+    let mut extra_envs = HashMap::new();
+    extra_envs.insert(
+        "IGGY_SYSTEM_SEGMENT_SIZE".to_string(),
+        segment_size.to_string(),
+    );
+    extra_envs.insert(
+        "IGGY_SYSTEM_SEGMENT_CACHE_INDEXES".to_string(),
+        cache_indexes.to_string(),
+    );
+    extra_envs.insert(
+        "IGGY_SYSTEM_PARTITION_MESSAGES_REQUIRED_TO_SAVE".to_string(),
+        messages_required_to_save.to_string(),
+    );
+    extra_envs.insert(
+        "IGGY_TCP_SOCKET_OVERRIDE_DEFAULTS".to_string(),
+        "true".to_string(),
+    );
+    extra_envs.insert("IGGY_TCP_SOCKET_NODELAY".to_string(), 
"true".to_string());
+
+    let mut test_server = TestServer::new(Some(extra_envs), true, None, 
IpAddrKind::V4);
+    test_server.start();
+
+    let server_addr = test_server.get_raw_tcp_addr().unwrap();
+    let client_factory = TcpClientFactory {
+        server_addr,
+        ..Default::default()
+    };
+
+    get_messages_by_offset_api::run(&client_factory).await;
+}
+
+#[test_matrix(
+    [segment_size_512b(), segment_size_1kb(), segment_size_10mb()],
+    [cache_none(), cache_all(), cache_open_segment()],
+    [msgs_req_32(), msgs_req_64(), msgs_req_1024(), msgs_req_9984()]
+)]
+#[tokio::test]
+#[parallel]
+async fn test_get_messages_by_timestamp_api(
+    segment_size: &str,
+    cache_indexes: &str,
+    messages_required_to_save: u32,
+) {
+    let mut extra_envs = HashMap::new();
+    extra_envs.insert(
+        "IGGY_SYSTEM_SEGMENT_SIZE".to_string(),
+        segment_size.to_string(),
+    );
+    extra_envs.insert(
+        "IGGY_SYSTEM_SEGMENT_CACHE_INDEXES".to_string(),
+        cache_indexes.to_string(),
+    );
+    extra_envs.insert(
+        "IGGY_SYSTEM_PARTITION_MESSAGES_REQUIRED_TO_SAVE".to_string(),
+        messages_required_to_save.to_string(),
+    );
+    extra_envs.insert(
+        "IGGY_TCP_SOCKET_OVERRIDE_DEFAULTS".to_string(),
+        "true".to_string(),
+    );
+    extra_envs.insert("IGGY_TCP_SOCKET_NODELAY".to_string(), 
"true".to_string());
+
+    let mut test_server = TestServer::new(Some(extra_envs), true, None, 
IpAddrKind::V4);
+    test_server.start();
+
+    let server_addr = test_server.get_raw_tcp_addr().unwrap();
+    let client_factory = TcpClientFactory {
+        server_addr,
+        ..Default::default()
+    };
+
+    get_messages_by_timestamp_api::run(&client_factory).await;
+}
diff --git a/core/integration/tests/server/mod.rs 
b/core/integration/tests/server/mod.rs
index a94ed6935..1a9285640 100644
--- a/core/integration/tests/server/mod.rs
+++ b/core/integration/tests/server/mod.rs
@@ -19,6 +19,7 @@
 mod cg;
 mod concurrent_addition;
 mod general;
+mod message_retrieval;
 mod scenarios;
 mod specific;
 
diff --git 
a/core/integration/tests/server/scenarios/get_messages_by_offset_api.rs 
b/core/integration/tests/server/scenarios/get_messages_by_offset_api.rs
new file mode 100644
index 000000000..568fd847d
--- /dev/null
+++ b/core/integration/tests/server/scenarios/get_messages_by_offset_api.rs
@@ -0,0 +1,424 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::PARTITION_ID;
+use bytes::BytesMut;
+use iggy::prelude::*;
+use integration::test_server::{ClientFactory, login_root};
+use std::collections::HashMap;
+use std::str::FromStr;
+
/// Batch layouts: each entry is the number of messages in one send batch.
fn small_batches() -> Vec<u32> {
    [3, 4, 5, 6, 7].to_vec()
}

fn medium_batches() -> Vec<u32> {
    [10, 20, 30, 40].to_vec()
}

fn large_batches() -> Vec<u32> {
    [100, 200, 300, 400].to_vec()
}

fn very_large_batches() -> Vec<u32> {
    [500, 1000, 1500, 2000].to_vec()
}

/// Every named batch pattern, paired with the label used in stream/topic names.
fn all_batch_patterns() -> Vec<(&'static str, Vec<u32>)> {
    vec![
        ("small", small_batches()),
        ("medium", medium_batches()),
        ("large", large_batches()),
        ("very_large", very_large_batches()),
    ]
}

/// Message payload sizes (bytes) exercised by the scenario.
fn all_message_sizes() -> Vec<u64> {
    [50, 1000, 20000].to_vec()
}
+
/// Entry point for the offset-polling scenario: logs in as root and runs the
/// full matrix of message sizes x batch patterns against the given transport.
pub async fn run(client_factory: &dyn ClientFactory) {
    let client = client_factory.create_client().await;
    let client = IggyClient::create(client, None, None);
    login_root(&client).await;

    // Each (size, pattern) combination gets its own stream/topic, created and
    // torn down inside run_offset_test, so iterations are independent.
    for msg_size in all_message_sizes() {
        for (pattern_name, batch_pattern) in all_batch_patterns() {
            run_offset_test(&client, msg_size, &batch_pattern, pattern_name).await;
        }
    }
}
+
/// Runs the offset-polling scenario for one (message size, batch pattern)
/// combination against a dedicated stream/topic, then deletes the stream.
async fn run_offset_test(
    client: &IggyClient,
    message_size: u64,
    batch_lengths: &[u32],
    pattern_name: &str,
) {
    // Unique names per combination so concurrent cases cannot collide.
    let stream_name = format!("test-stream-{}-{}", message_size, pattern_name);
    let topic_name = format!("test-topic-{}-{}", message_size, pattern_name);

    init_system(client, &stream_name, &topic_name).await;

    let total_messages_count: u32 = batch_lengths.iter().sum();

    // batch_offsets holds the offset of the LAST message of each batch.
    let batch_offsets = send_messages_in_batches(
        client,
        &stream_name,
        &topic_name,
        message_size,
        batch_lengths,
    )
    .await;

    // Each verify_* call polls by offset and asserts on count and/or content.
    verify_all_messages_from_start(client, &stream_name, &topic_name, total_messages_count).await;
    verify_messages_from_middle(
        client,
        &stream_name,
        &topic_name,
        &batch_offsets,
        batch_lengths,
        total_messages_count,
    )
    .await;
    verify_no_messages_beyond_end(client, &stream_name, &topic_name, &batch_offsets).await;
    verify_small_subset_from_start(client, &stream_name, &topic_name, total_messages_count).await;
    verify_messages_spanning_batches(client, &stream_name, &topic_name, &batch_offsets).await;
    verify_message_content(
        client,
        &stream_name,
        &topic_name,
        message_size,
        &batch_offsets,
    )
    .await;

    cleanup(client, &stream_name).await;
}
+
+async fn init_system(client: &IggyClient, stream_name: &str, topic_name: &str) 
{
+    client.create_stream(stream_name).await.unwrap();
+    client
+        .create_topic(
+            &Identifier::named(stream_name).unwrap(),
+            topic_name,
+            1,
+            CompressionAlgorithm::default(),
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+}
+
+async fn send_messages_in_batches(
+    client: &IggyClient,
+    stream_name: &str,
+    topic_name: &str,
+    message_size: u64,
+    batch_lengths: &[u32],
+) -> Vec<u64> {
+    let mut batch_offsets = Vec::with_capacity(batch_lengths.len());
+    let mut current_offset = 0u64;
+    let mut message_id = 1u32;
+
+    for &batch_len in batch_lengths {
+        let mut messages_to_send = Vec::with_capacity(batch_len as usize);
+
+        for _ in 0..batch_len {
+            let msg = create_single_message(message_id, message_size);
+            messages_to_send.push(msg);
+            message_id += 1;
+        }
+
+        client
+            .send_messages(
+                &Identifier::named(stream_name).unwrap(),
+                &Identifier::named(topic_name).unwrap(),
+                &Partitioning::partition_id(PARTITION_ID),
+                &mut messages_to_send,
+            )
+            .await
+            .unwrap();
+
+        current_offset += batch_len as u64;
+        batch_offsets.push(current_offset - 1);
+    }
+
+    batch_offsets
+}
+
+fn create_single_message(id: u32, message_size: u64) -> IggyMessage {
+    let beginning_of_payload = format!("message {id}");
+    let mut payload = BytesMut::new();
+    payload.extend_from_slice(beginning_of_payload.as_bytes());
+    payload.resize(message_size as usize, 0xD);
+    let payload = payload.freeze();
+
+    let mut headers = HashMap::new();
+    headers.insert(
+        HeaderKey::new("key_1").unwrap(),
+        HeaderValue::from_str("Value 1").unwrap(),
+    );
+    headers.insert(
+        HeaderKey::new("key 2").unwrap(),
+        HeaderValue::from_bool(true).unwrap(),
+    );
+    headers.insert(
+        HeaderKey::new("key-3").unwrap(),
+        HeaderValue::from_uint64(123456).unwrap(),
+    );
+
+    IggyMessage::builder()
+        .id(id as u128)
+        .payload(payload)
+        .user_headers(headers)
+        .build()
+        .expect("Failed to create message")
+}
+
+async fn verify_all_messages_from_start(
+    client: &IggyClient,
+    stream_name: &str,
+    topic_name: &str,
+    total_messages: u32,
+) {
+    let polled = client
+        .poll_messages(
+            &Identifier::named(stream_name).unwrap(),
+            &Identifier::named(topic_name).unwrap(),
+            Some(PARTITION_ID),
+            &Consumer::default(),
+            &PollingStrategy::offset(0),
+            total_messages,
+            false,
+        )
+        .await
+        .unwrap();
+
+    assert_eq!(
+        polled.messages.len() as u32,
+        total_messages,
+        "Expected {} messages from start, got {}",
+        total_messages,
+        polled.messages.len()
+    );
+}
+
+async fn verify_messages_from_middle(
+    client: &IggyClient,
+    stream_name: &str,
+    topic_name: &str,
+    batch_offsets: &[u64],
+    batch_lengths: &[u32],
+    total_messages: u32,
+) {
+    if batch_offsets.len() >= 3 {
+        let middle_offset = batch_offsets[2];
+        let prior_batches_sum: u32 = batch_lengths[..3].iter().sum();
+        let remaining_messages = total_messages - prior_batches_sum;
+
+        let polled = client
+            .poll_messages(
+                &Identifier::named(stream_name).unwrap(),
+                &Identifier::named(topic_name).unwrap(),
+                Some(PARTITION_ID),
+                &Consumer::default(),
+                &PollingStrategy::offset(middle_offset + 1),
+                remaining_messages,
+                false,
+            )
+            .await
+            .unwrap();
+
+        assert_eq!(
+            polled.messages.len() as u32,
+            remaining_messages,
+            "Expected {} messages from middle, got {}",
+            remaining_messages,
+            polled.messages.len()
+        );
+    }
+}
+
+async fn verify_no_messages_beyond_end(
+    client: &IggyClient,
+    stream_name: &str,
+    topic_name: &str,
+    batch_offsets: &[u64],
+) {
+    if let Some(&final_offset) = batch_offsets.last() {
+        let polled = client
+            .poll_messages(
+                &Identifier::named(stream_name).unwrap(),
+                &Identifier::named(topic_name).unwrap(),
+                Some(PARTITION_ID),
+                &Consumer::default(),
+                &PollingStrategy::offset(final_offset + 1),
+                1,
+                false,
+            )
+            .await
+            .unwrap();
+
+        assert_eq!(
+            polled.messages.len(),
+            0,
+            "Expected no messages beyond end, got {}",
+            polled.messages.len()
+        );
+    }
+}
+
+async fn verify_small_subset_from_start(
+    client: &IggyClient,
+    stream_name: &str,
+    topic_name: &str,
+    total_messages: u32,
+) {
+    let subset_size = std::cmp::min(3, total_messages);
+
+    let polled = client
+        .poll_messages(
+            &Identifier::named(stream_name).unwrap(),
+            &Identifier::named(topic_name).unwrap(),
+            Some(PARTITION_ID),
+            &Consumer::default(),
+            &PollingStrategy::offset(0),
+            subset_size,
+            false,
+        )
+        .await
+        .unwrap();
+
+    assert_eq!(
+        polled.messages.len() as u32,
+        subset_size,
+        "Expected {} messages in subset, got {}",
+        subset_size,
+        polled.messages.len()
+    );
+}
+
+async fn verify_messages_spanning_batches(
+    client: &IggyClient,
+    stream_name: &str,
+    topic_name: &str,
+    batch_offsets: &[u64],
+) {
+    if batch_offsets.len() >= 4 {
+        let span_offset = batch_offsets[1] + 1;
+        let span_size = 8u32;
+
+        let polled = client
+            .poll_messages(
+                &Identifier::named(stream_name).unwrap(),
+                &Identifier::named(topic_name).unwrap(),
+                Some(PARTITION_ID),
+                &Consumer::default(),
+                &PollingStrategy::offset(span_offset),
+                span_size,
+                false,
+            )
+            .await
+            .unwrap();
+
+        assert_eq!(
+            polled.messages.len() as u32,
+            span_size,
+            "Expected {} messages spanning batches, got {}",
+            span_size,
+            polled.messages.len()
+        );
+    }
+}
+
/// Polls a window spanning batch boundaries and checks each message's offset,
/// payload prefix and size, id, and user-header count.
async fn verify_message_content(
    client: &IggyClient,
    stream_name: &str,
    topic_name: &str,
    message_size: u64,
    batch_offsets: &[u64],
) {
    // Needs at least 4 batches so an 8-message window starting right after
    // batch 2 crosses a batch boundary.
    if batch_offsets.len() >= 4 {
        let span_offset = batch_offsets[1] + 1;
        let span_size = 8u32;

        let polled = client
            .poll_messages(
                &Identifier::named(stream_name).unwrap(),
                &Identifier::named(topic_name).unwrap(),
                Some(PARTITION_ID),
                &Consumer::default(),
                &PollingStrategy::offset(span_offset),
                span_size,
                false,
            )
            .await
            .unwrap();

        for (i, msg) in polled.messages.iter().enumerate() {
            // Check is >= rather than ==, so non-contiguous offsets from the
            // server are tolerated.
            let expected_offset = span_offset + i as u64;
            assert!(
                msg.header.offset >= expected_offset,
                "Message offset {} should be >= {}",
                msg.header.offset,
                expected_offset
            );

            // Messages were created with id == offset + 1 (ids are 1-based).
            let message_id = (msg.header.offset + 1) as u32;
            let expected_payload_prefix = format!("message {message_id}");

            let payload_str = String::from_utf8_lossy(&msg.payload);
            assert!(
                payload_str.starts_with(&expected_payload_prefix),
                "Payload at offset {} should start with '{}', got '{}'",
                msg.header.offset,
                expected_payload_prefix,
                &payload_str[..std::cmp::min(payload_str.len(), 20)]
            );

            assert_eq!(
                msg.payload.len(),
                message_size as usize,
                "Payload size mismatch at offset {}",
                msg.header.offset
            );

            assert_eq!(
                msg.header.id, message_id as u128,
                "Message ID mismatch at offset {}",
                msg.header.offset
            );

            // Headers are optional on the wire; when present, expect the three
            // set at send time.
            if let Some(headers) = &msg.user_headers {
                let headers_map =
                    HashMap::<HeaderKey, HeaderValue>::from_bytes(headers.clone()).unwrap();
                assert_eq!(headers_map.len(), 3, "Expected 3 headers");
            }
        }
    }
}
+
+async fn cleanup(client: &IggyClient, stream_name: &str) {
+    client
+        .delete_stream(&Identifier::named(stream_name).unwrap())
+        .await
+        .unwrap();
+}
diff --git 
a/core/integration/tests/server/scenarios/get_messages_by_timestamp_api.rs 
b/core/integration/tests/server/scenarios/get_messages_by_timestamp_api.rs
new file mode 100644
index 000000000..542ca17cc
--- /dev/null
+++ b/core/integration/tests/server/scenarios/get_messages_by_timestamp_api.rs
@@ -0,0 +1,449 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::PARTITION_ID;
+use bytes::BytesMut;
+use iggy::prelude::*;
+use integration::test_server::{ClientFactory, login_root};
+use std::collections::HashMap;
+use std::str::FromStr;
+use tokio::time::{Duration, sleep};
+
/// Batch layouts: each entry is the number of messages in one send batch.
fn small_batches() -> Vec<u32> {
    [3, 4, 5, 6, 7].to_vec()
}

fn medium_batches() -> Vec<u32> {
    [10, 20, 30, 40].to_vec()
}

fn large_batches() -> Vec<u32> {
    [100, 200, 300, 400].to_vec()
}

fn very_large_batches() -> Vec<u32> {
    [500, 1000, 1500, 2000].to_vec()
}

/// Every named batch pattern, paired with the label used in stream/topic names.
fn all_batch_patterns() -> Vec<(&'static str, Vec<u32>)> {
    vec![
        ("small", small_batches()),
        ("medium", medium_batches()),
        ("large", large_batches()),
        ("very_large", very_large_batches()),
    ]
}

/// Message payload sizes (bytes) exercised by the scenario.
fn all_message_sizes() -> Vec<u64> {
    [50, 1000, 20000].to_vec()
}
+
/// Entry point for the timestamp-polling scenario: logs in as root and runs
/// the full matrix of message sizes x batch patterns on the given transport.
pub async fn run(client_factory: &dyn ClientFactory) {
    let client = client_factory.create_client().await;
    let client = IggyClient::create(client, None, None);
    login_root(&client).await;

    // Each (size, pattern) pair gets its own stream/topic inside
    // run_timestamp_test, so iterations are independent.
    for msg_size in all_message_sizes() {
        for (pattern_name, batch_pattern) in all_batch_patterns() {
            run_timestamp_test(&client, msg_size, &batch_pattern, pattern_name).await;
        }
    }
}
+
/// Runs the timestamp-polling scenario for one (message size, batch pattern)
/// combination against a dedicated stream/topic, then deletes the stream.
async fn run_timestamp_test(
    client: &IggyClient,
    message_size: u64,
    batch_lengths: &[u32],
    pattern_name: &str,
) {
    // Unique names per combination so concurrent cases cannot collide.
    let stream_name = format!("test-stream-ts-{}-{}", message_size, pattern_name);
    let topic_name = format!("test-topic-ts-{}-{}", message_size, pattern_name);

    init_system(client, &stream_name, &topic_name).await;

    let total_messages_count: u32 = batch_lengths.iter().sum();

    // initial_timestamp is taken before any send; batch_timestamps holds one
    // client-side timestamp per batch, captured right after the batch was sent.
    let (initial_timestamp, batch_timestamps) = send_messages_in_batches_with_timestamps(
        client,
        &stream_name,
        &topic_name,
        message_size,
        batch_lengths,
    )
    .await;

    // Each verify_* call polls by timestamp and asserts on count and/or content.
    verify_all_messages_from_start_timestamp(
        client,
        &stream_name,
        &topic_name,
        initial_timestamp,
        total_messages_count,
    )
    .await;
    verify_messages_from_middle_timestamp(
        client,
        &stream_name,
        &topic_name,
        &batch_timestamps,
        batch_lengths,
        total_messages_count,
    )
    .await;
    verify_no_messages_with_future_timestamp(client, &stream_name, &topic_name).await;
    verify_small_subset_from_start_timestamp(
        client,
        &stream_name,
        &topic_name,
        initial_timestamp,
        total_messages_count,
    )
    .await;
    verify_messages_spanning_batches_by_timestamp(
        client,
        &stream_name,
        &topic_name,
        &batch_timestamps,
    )
    .await;
    verify_message_content_by_timestamp(
        client,
        &stream_name,
        &topic_name,
        message_size,
        &batch_timestamps,
    )
    .await;

    cleanup(client, &stream_name).await;
}
+
+async fn init_system(client: &IggyClient, stream_name: &str, topic_name: &str) 
{
+    client.create_stream(stream_name).await.unwrap();
+    client
+        .create_topic(
+            &Identifier::named(stream_name).unwrap(),
+            topic_name,
+            1,
+            CompressionAlgorithm::default(),
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+}
+
+async fn send_messages_in_batches_with_timestamps(
+    client: &IggyClient,
+    stream_name: &str,
+    topic_name: &str,
+    message_size: u64,
+    batch_lengths: &[u32],
+) -> (u64, Vec<u64>) {
+    let mut batch_timestamps = Vec::with_capacity(batch_lengths.len());
+    let mut message_id = 1u32;
+
+    let initial_timestamp = IggyTimestamp::now().as_micros();
+
+    for &batch_len in batch_lengths {
+        sleep(Duration::from_millis(2)).await;
+
+        let mut messages_to_send = Vec::with_capacity(batch_len as usize);
+
+        for _ in 0..batch_len {
+            let msg = create_single_message(message_id, message_size);
+            messages_to_send.push(msg);
+            message_id += 1;
+        }
+
+        client
+            .send_messages(
+                &Identifier::named(stream_name).unwrap(),
+                &Identifier::named(topic_name).unwrap(),
+                &Partitioning::partition_id(PARTITION_ID),
+                &mut messages_to_send,
+            )
+            .await
+            .unwrap();
+
+        batch_timestamps.push(IggyTimestamp::now().as_micros());
+
+        sleep(Duration::from_millis(2)).await;
+    }
+
+    (initial_timestamp, batch_timestamps)
+}
+
+fn create_single_message(id: u32, message_size: u64) -> IggyMessage {
+    let beginning_of_payload = format!("message {id}");
+    let mut payload = BytesMut::new();
+    payload.extend_from_slice(beginning_of_payload.as_bytes());
+    payload.resize(message_size as usize, 0xD);
+    let payload = payload.freeze();
+
+    let mut headers = HashMap::new();
+    headers.insert(
+        HeaderKey::new("key_1").unwrap(),
+        HeaderValue::from_str("Value 1").unwrap(),
+    );
+    headers.insert(
+        HeaderKey::new("key 2").unwrap(),
+        HeaderValue::from_bool(true).unwrap(),
+    );
+    headers.insert(
+        HeaderKey::new("key-3").unwrap(),
+        HeaderValue::from_uint64(123456).unwrap(),
+    );
+
+    IggyMessage::builder()
+        .id(id as u128)
+        .payload(payload)
+        .user_headers(headers)
+        .build()
+        .expect("Failed to create message")
+}
+
+async fn verify_all_messages_from_start_timestamp(
+    client: &IggyClient,
+    stream_name: &str,
+    topic_name: &str,
+    initial_timestamp: u64,
+    total_messages: u32,
+) {
+    let polled = client
+        .poll_messages(
+            &Identifier::named(stream_name).unwrap(),
+            &Identifier::named(topic_name).unwrap(),
+            Some(PARTITION_ID),
+            &Consumer::default(),
+            
&PollingStrategy::timestamp(IggyTimestamp::from(initial_timestamp)),
+            total_messages,
+            false,
+        )
+        .await
+        .unwrap();
+
+    assert_eq!(
+        polled.messages.len() as u32,
+        total_messages,
+        "Expected {} messages from initial timestamp, got {}",
+        total_messages,
+        polled.messages.len()
+    );
+}
+
+async fn verify_messages_from_middle_timestamp(
+    client: &IggyClient,
+    stream_name: &str,
+    topic_name: &str,
+    batch_timestamps: &[u64],
+    batch_lengths: &[u32],
+    total_messages: u32,
+) {
+    if batch_timestamps.len() >= 3 {
+        let middle_timestamp = batch_timestamps[2] + 1000;
+        let prior_batches_sum: u32 = batch_lengths[..3].iter().sum();
+        let remaining_messages = total_messages - prior_batches_sum;
+
+        let polled = client
+            .poll_messages(
+                &Identifier::named(stream_name).unwrap(),
+                &Identifier::named(topic_name).unwrap(),
+                Some(PARTITION_ID),
+                &Consumer::default(),
+                
&PollingStrategy::timestamp(IggyTimestamp::from(middle_timestamp)),
+                remaining_messages,
+                false,
+            )
+            .await
+            .unwrap();
+
+        assert_eq!(
+            polled.messages.len() as u32,
+            remaining_messages,
+            "Expected {} messages from middle timestamp, got {}",
+            remaining_messages,
+            polled.messages.len()
+        );
+    }
+}
+
+async fn verify_no_messages_with_future_timestamp(
+    client: &IggyClient,
+    stream_name: &str,
+    topic_name: &str,
+) {
+    let future_timestamp = IggyTimestamp::now().as_micros() + 3_600_000_000;
+
+    let polled = client
+        .poll_messages(
+            &Identifier::named(stream_name).unwrap(),
+            &Identifier::named(topic_name).unwrap(),
+            Some(PARTITION_ID),
+            &Consumer::default(),
+            &PollingStrategy::timestamp(IggyTimestamp::from(future_timestamp)),
+            10,
+            false,
+        )
+        .await
+        .unwrap();
+
+    assert_eq!(
+        polled.messages.len(),
+        0,
+        "Expected no messages with future timestamp, got {}",
+        polled.messages.len()
+    );
+}
+
+async fn verify_small_subset_from_start_timestamp(
+    client: &IggyClient,
+    stream_name: &str,
+    topic_name: &str,
+    initial_timestamp: u64,
+    total_messages: u32,
+) {
+    let subset_size = std::cmp::min(3, total_messages);
+
+    let polled = client
+        .poll_messages(
+            &Identifier::named(stream_name).unwrap(),
+            &Identifier::named(topic_name).unwrap(),
+            Some(PARTITION_ID),
+            &Consumer::default(),
+            
&PollingStrategy::timestamp(IggyTimestamp::from(initial_timestamp)),
+            subset_size,
+            false,
+        )
+        .await
+        .unwrap();
+
+    assert_eq!(
+        polled.messages.len() as u32,
+        subset_size,
+        "Expected {} messages in subset from initial timestamp, got {}",
+        subset_size,
+        polled.messages.len()
+    );
+}
+
+async fn verify_messages_spanning_batches_by_timestamp(
+    client: &IggyClient,
+    stream_name: &str,
+    topic_name: &str,
+    batch_timestamps: &[u64],
+) {
+    if batch_timestamps.len() >= 4 {
+        let span_timestamp = batch_timestamps[1] + 1000;
+        let span_size = 8u32;
+
+        let polled = client
+            .poll_messages(
+                &Identifier::named(stream_name).unwrap(),
+                &Identifier::named(topic_name).unwrap(),
+                Some(PARTITION_ID),
+                &Consumer::default(),
+                
&PollingStrategy::timestamp(IggyTimestamp::from(span_timestamp)),
+                span_size,
+                false,
+            )
+            .await
+            .unwrap();
+
+        assert_eq!(
+            polled.messages.len() as u32,
+            span_size,
+            "Expected {} messages spanning multiple batches, got {}",
+            span_size,
+            polled.messages.len()
+        );
+    }
+}
+
/// Polls a timestamp window spanning batch boundaries and checks each
/// message's timestamp, payload prefix and size, id, and user-header count.
async fn verify_message_content_by_timestamp(
    client: &IggyClient,
    stream_name: &str,
    topic_name: &str,
    message_size: u64,
    batch_timestamps: &[u64],
) {
    // Needs at least 4 batches so an 8-message window starting after batch 2
    // crosses a batch boundary.
    if batch_timestamps.len() >= 4 {
        // +1000 us places the cutoff between batch 2 and batch 3.
        let span_timestamp = batch_timestamps[1] + 1000;
        let span_size = 8u32;

        let polled = client
            .poll_messages(
                &Identifier::named(stream_name).unwrap(),
                &Identifier::named(topic_name).unwrap(),
                Some(PARTITION_ID),
                &Consumer::default(),
                &PollingStrategy::timestamp(IggyTimestamp::from(span_timestamp)),
                span_size,
                false,
            )
            .await
            .unwrap();

        for msg in polled.messages.iter() {
            // Server-assigned timestamps must not precede the poll cutoff.
            assert!(
                msg.header.timestamp >= span_timestamp,
                "Message timestamp {} should be >= span timestamp {}",
                msg.header.timestamp,
                span_timestamp
            );

            // Messages were created with id == offset + 1 (ids are 1-based).
            let message_id = (msg.header.offset + 1) as u32;
            let expected_payload_prefix = format!("message {message_id}");

            let payload_str = String::from_utf8_lossy(&msg.payload);
            assert!(
                payload_str.starts_with(&expected_payload_prefix),
                "Payload at offset {} should start with '{}', got '{}'",
                msg.header.offset,
                expected_payload_prefix,
                &payload_str[..std::cmp::min(payload_str.len(), 20)]
            );

            assert_eq!(
                msg.payload.len(),
                message_size as usize,
                "Payload size mismatch at offset {}",
                msg.header.offset
            );

            assert_eq!(
                msg.header.id, message_id as u128,
                "Message ID mismatch at offset {}",
                msg.header.offset
            );

            // Headers are optional on the wire; when present, expect the three
            // set at send time.
            if let Some(headers) = &msg.user_headers {
                let headers_map =
                    HashMap::<HeaderKey, HeaderValue>::from_bytes(headers.clone()).unwrap();
                assert_eq!(headers_map.len(), 3, "Expected 3 headers");
            }
        }
    }
}
+
+async fn cleanup(client: &IggyClient, stream_name: &str) {
+    client
+        .delete_stream(&Identifier::named(stream_name).unwrap())
+        .await
+        .unwrap();
+}
diff --git a/core/integration/tests/server/scenarios/mod.rs 
b/core/integration/tests/server/scenarios/mod.rs
index a61636931..e61594a91 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -28,6 +28,8 @@ pub mod consumer_timestamp_polling_scenario;
 pub mod create_message_payload;
 pub mod delete_segments_scenario;
 pub mod encryption_scenario;
+pub mod get_messages_by_offset_api;
+pub mod get_messages_by_timestamp_api;
 pub mod message_headers_scenario;
 pub mod message_size_scenario;
 pub mod permissions_scenario;

Reply via email to