This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch get-by-offset-timestamp-improvement
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit b0eaa90ff4cd97f549ddf597685d77a4f6dafebd
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Tue Jan 20 15:16:44 2026 +0100

    test(server): remove streaming tests superseded by API-level coverage
    
    The streaming-layer tests (get_by_offset, get_by_timestamp)
    are redundant now that get_messages_by_offset_api.rs and
    get_messages_by_timestamp_api.rs cover the same scenarios
    through the full client stack.
    
    Additionally, the snapshot test has been migrated to an
    API-level scenario.
---
 core/integration/tests/mod.rs                      |   1 -
 core/integration/tests/server/cg.rs                |  10 +-
 core/integration/tests/server/general.rs           |   3 +-
 core/integration/tests/server/message_retrieval.rs |  10 +-
 core/integration/tests/server/mod.rs               |   7 +-
 core/integration/tests/server/scenarios/mod.rs     |   5 +-
 ...essages_by_offset_api.rs => offset_scenario.rs} |  43 +++
 .../scenarios/snapshot_scenario.rs}                |  44 +--
 ...s_by_timestamp_api.rs => timestamp_scenario.rs} |   0
 core/integration/tests/streaming/common/mod.rs     |  19 -
 .../tests/streaming/common/test_setup.rs           |  52 ---
 core/integration/tests/streaming/get_by_offset.rs  | 396 ---------------------
 .../tests/streaming/get_by_timestamp.rs            | 372 -------------------
 core/integration/tests/streaming/mod.rs            | 132 -------
 14 files changed, 84 insertions(+), 1010 deletions(-)

diff --git a/core/integration/tests/mod.rs b/core/integration/tests/mod.rs
index fb2156d8e..b451f4499 100644
--- a/core/integration/tests/mod.rs
+++ b/core/integration/tests/mod.rs
@@ -33,7 +33,6 @@ mod mcp;
 mod sdk;
 mod server;
 mod state;
-mod streaming;
 
 lazy_static! {
     static ref TESTS_FAILED: AtomicBool = AtomicBool::new(false);
diff --git a/core/integration/tests/server/cg.rs b/core/integration/tests/server/cg.rs
index 736024c25..bf12c871e 100644
--- a/core/integration/tests/server/cg.rs
+++ b/core/integration/tests/server/cg.rs
@@ -23,10 +23,18 @@ use iggy_common::TransportProtocol;
 use serial_test::parallel;
 use test_case::test_matrix;
 
+fn tcp() -> TransportProtocol {
+    TransportProtocol::Tcp
+}
+
+fn websocket() -> TransportProtocol {
+    TransportProtocol::WebSocket
+}
+
 // TODO: Add `QUIC`.
 // Consumer group scenarios do not support HTTP
 #[test_matrix(
-    [TransportProtocol::Tcp, TransportProtocol::WebSocket],
+    [tcp(), websocket()],
     [
         join_scenario(),
         single_client_scenario(),
diff --git a/core/integration/tests/server/general.rs b/core/integration/tests/server/general.rs
index 50277cc3c..ec88d6ffc 100644
--- a/core/integration/tests/server/general.rs
+++ b/core/integration/tests/server/general.rs
@@ -18,7 +18,7 @@
 use crate::server::{
     ScenarioFn, authentication_scenario, bench_scenario, consumer_timestamp_polling_scenario,
     create_message_payload_scenario, message_headers_scenario, permissions_scenario, run_scenario,
-    stream_size_validation_scenario, system_scenario, user_scenario,
+    snapshot_scenario, stream_size_validation_scenario, system_scenario, user_scenario,
 };
 use iggy_common::TransportProtocol;
 use serial_test::parallel;
@@ -36,6 +36,7 @@ use test_case::test_matrix;
         stream_size_validation_scenario(),
         bench_scenario(),
         consumer_timestamp_polling_scenario(),
+        snapshot_scenario(),
     ]
 )]
 #[tokio::test]
diff --git a/core/integration/tests/server/message_retrieval.rs b/core/integration/tests/server/message_retrieval.rs
index 7061d27a0..24a4ff0d0 100644
--- a/core/integration/tests/server/message_retrieval.rs
+++ b/core/integration/tests/server/message_retrieval.rs
@@ -16,7 +16,7 @@
  * under the License.
  */
 
-use crate::server::scenarios::{get_messages_by_offset_api, get_messages_by_timestamp_api};
+use crate::server::scenarios::{offset_scenario, timestamp_scenario};
 use integration::{
     tcp_client::TcpClientFactory,
     test_server::{IpAddrKind, TestServer},
@@ -72,7 +72,7 @@ fn msgs_req_9984() -> u32 {
 )]
 #[tokio::test]
 #[parallel]
-async fn test_get_messages_by_offset_api(
+async fn get_by_offset_scenario(
     segment_size: &str,
     cache_indexes: &str,
     messages_required_to_save: u32,
@@ -105,7 +105,7 @@ async fn test_get_messages_by_offset_api(
         ..Default::default()
     };
 
-    get_messages_by_offset_api::run(&client_factory).await;
+    offset_scenario::run(&client_factory).await;
 }
 
 #[test_matrix(
@@ -115,7 +115,7 @@ async fn test_get_messages_by_offset_api(
 )]
 #[tokio::test]
 #[parallel]
-async fn test_get_messages_by_timestamp_api(
+async fn get_by_timestamp_scenario(
     segment_size: &str,
     cache_indexes: &str,
     messages_required_to_save: u32,
@@ -148,5 +148,5 @@ async fn test_get_messages_by_timestamp_api(
         ..Default::default()
     };
 
-    get_messages_by_timestamp_api::run(&client_factory).await;
+    timestamp_scenario::run(&client_factory).await;
 }
diff --git a/core/integration/tests/server/mod.rs b/core/integration/tests/server/mod.rs
index 1a9285640..65ee5d2c4 100644
--- a/core/integration/tests/server/mod.rs
+++ b/core/integration/tests/server/mod.rs
@@ -37,7 +37,8 @@ use scenarios::{
     consumer_group_with_multiple_clients_polling_messages_scenario,
     consumer_group_with_single_client_polling_messages_scenario,
     consumer_timestamp_polling_scenario, create_message_payload, message_headers_scenario,
-    permissions_scenario, stream_size_validation_scenario, system_scenario, user_scenario,
+    permissions_scenario, snapshot_scenario, stream_size_validation_scenario, system_scenario,
+    user_scenario,
 };
 use std::pin::Pin;
 use std::{collections::HashMap, future::Future};
@@ -100,6 +101,10 @@ fn permissions_scenario() -> ScenarioFn {
     |factory| Box::pin(permissions_scenario::run(factory))
 }
 
+fn snapshot_scenario() -> ScenarioFn {
+    |factory| Box::pin(snapshot_scenario::run(factory))
+}
+
 fn consumer_timestamp_polling_scenario() -> ScenarioFn {
     |factory| Box::pin(consumer_timestamp_polling_scenario::run(factory))
 }
diff --git a/core/integration/tests/server/scenarios/mod.rs b/core/integration/tests/server/scenarios/mod.rs
index 58cc8a3b2..3253493c4 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -29,18 +29,19 @@ pub mod create_message_payload;
 pub mod cross_protocol_pat_scenario;
 pub mod delete_segments_scenario;
 pub mod encryption_scenario;
-pub mod get_messages_by_offset_api;
-pub mod get_messages_by_timestamp_api;
 pub mod message_headers_scenario;
 pub mod message_size_scenario;
+pub mod offset_scenario;
 pub mod permissions_scenario;
 pub mod read_during_persistence_scenario;
 pub mod segment_rotation_race_scenario;
 pub mod single_message_per_batch_scenario;
+pub mod snapshot_scenario;
 pub mod stale_client_consumer_group_scenario;
 pub mod stream_size_validation_scenario;
 pub mod system_scenario;
 pub mod tcp_tls_scenario;
+pub mod timestamp_scenario;
 pub mod user_scenario;
 pub mod websocket_tls_scenario;
 
diff --git a/core/integration/tests/server/scenarios/get_messages_by_offset_api.rs b/core/integration/tests/server/scenarios/offset_scenario.rs
similarity index 90%
rename from core/integration/tests/server/scenarios/get_messages_by_offset_api.rs
rename to core/integration/tests/server/scenarios/offset_scenario.rs
index 568fd847d..cc519bd63 100644
--- a/core/integration/tests/server/scenarios/get_messages_by_offset_api.rs
+++ b/core/integration/tests/server/scenarios/offset_scenario.rs
@@ -107,6 +107,7 @@ async fn run_offset_test(
         &batch_offsets,
     )
     .await;
+    verify_sequential_chunk_reads(client, &stream_name, &topic_name, total_messages_count).await;
 
     cleanup(client, &stream_name).await;
 }
@@ -416,6 +417,48 @@ async fn verify_message_content(
     }
 }
 
+async fn verify_sequential_chunk_reads(
+    client: &IggyClient,
+    stream_name: &str,
+    topic_name: &str,
+    total_messages: u32,
+) {
+    let chunk_size = 500u32;
+    let mut verified_count = 0u32;
+
+    for chunk_start in (0..total_messages).step_by(chunk_size as usize) {
+        let read_size = std::cmp::min(chunk_size, total_messages - chunk_start);
+
+        let polled = client
+            .poll_messages(
+                &Identifier::named(stream_name).unwrap(),
+                &Identifier::named(topic_name).unwrap(),
+                Some(PARTITION_ID),
+                &Consumer::default(),
+                &PollingStrategy::offset(chunk_start as u64),
+                read_size,
+                false,
+            )
+            .await
+            .unwrap();
+
+        assert_eq!(
+            polled.messages.len() as u32,
+            read_size,
+            "Failed to read chunk at offset {} with size {}",
+            chunk_start,
+            read_size
+        );
+
+        verified_count += polled.messages.len() as u32;
+    }
+
+    assert_eq!(
+        verified_count, total_messages,
+        "Sequential chunk reads didn't cover all messages"
+    );
+}
+
 async fn cleanup(client: &IggyClient, stream_name: &str) {
     client
         .delete_stream(&Identifier::named(stream_name).unwrap())
diff --git a/core/integration/tests/streaming/snapshot.rs b/core/integration/tests/server/scenarios/snapshot_scenario.rs
similarity index 56%
rename from core/integration/tests/streaming/snapshot.rs
rename to core/integration/tests/server/scenarios/snapshot_scenario.rs
index 76c9fa971..0780ca6c3 100644
--- a/core/integration/tests/streaming/snapshot.rs
+++ b/core/integration/tests/server/scenarios/snapshot_scenario.rs
@@ -16,40 +16,28 @@
  * under the License.
  */
 
-//TODO: Fix me use shard instead of system
-/*
-use crate::streaming::common::test_setup::TestSetup;
-use iggy::prelude::{SnapshotCompression, SystemSnapshotType};
-use server::configs::cluster::ClusterConfig;
-use server::configs::server::{DataMaintenanceConfig, PersonalAccessTokenConfig};
-use server::streaming::session::Session;
+use iggy::prelude::*;
+use integration::test_server::{ClientFactory, assert_clean_system};
 use std::io::{Cursor, Read};
-use std::net::{Ipv4Addr, SocketAddr};
 use zip::ZipArchive;
 
+pub async fn run(client_factory: &dyn ClientFactory) {
+    let client = client_factory.create_client().await;
+    let client = IggyClient::create(client, None, None);
 
-#[tokio::test]
-async fn should_create_snapshot_file() {
-    let setup = TestSetup::init().await;
-    let mut system = System::new(
-        setup.config.clone(),
-        ClusterConfig::default(),
-        DataMaintenanceConfig::default(),
-        PersonalAccessTokenConfig::default(),
-    );
-
-    system.init().await.unwrap();
-
-    let session = Session::new(1, 1, SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 1234));
+    client
+        .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
+        .await
+        .unwrap();
 
-    let snapshot = system
-        .get_snapshot(
-            &session,
+    let snapshot = client
+        .snapshot(
             SnapshotCompression::Deflated,
-            &vec![SystemSnapshotType::Test],
+            vec![SystemSnapshotType::Test],
         )
         .await
         .unwrap();
+
     assert!(!snapshot.0.is_empty());
 
     let cursor = Cursor::new(snapshot.0);
@@ -57,7 +45,7 @@ async fn should_create_snapshot_file() {
     let mut test_file = zip.by_name("test.txt").unwrap();
     let mut test_content = String::new();
     test_file.read_to_string(&mut test_content).unwrap();
-    assert_eq!(test_content, "test\n");
-}
+    assert_eq!(test_content.trim(), "test");
 
-*/
+    assert_clean_system(&client).await;
+}
diff --git a/core/integration/tests/server/scenarios/get_messages_by_timestamp_api.rs b/core/integration/tests/server/scenarios/timestamp_scenario.rs
similarity index 100%
rename from core/integration/tests/server/scenarios/get_messages_by_timestamp_api.rs
rename to core/integration/tests/server/scenarios/timestamp_scenario.rs
diff --git a/core/integration/tests/streaming/common/mod.rs b/core/integration/tests/streaming/common/mod.rs
deleted file mode 100644
index 6bab3f356..000000000
--- a/core/integration/tests/streaming/common/mod.rs
+++ /dev/null
@@ -1,19 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-pub mod test_setup;
diff --git a/core/integration/tests/streaming/common/test_setup.rs b/core/integration/tests/streaming/common/test_setup.rs
deleted file mode 100644
index f85aa73ea..000000000
--- a/core/integration/tests/streaming/common/test_setup.rs
+++ /dev/null
@@ -1,52 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use compio::fs;
-use iggy_common::MemoryPool;
-use server::bootstrap::create_directories;
-use server::configs::system::SystemConfig;
-use std::sync::Arc;
-use uuid::Uuid;
-
-pub struct TestSetup {
-    pub config: Arc<SystemConfig>,
-}
-
-impl TestSetup {
-    pub async fn init() -> TestSetup {
-        Self::init_with_config(SystemConfig::default()).await
-    }
-
-    pub async fn init_with_config(mut config: SystemConfig) -> TestSetup {
-        config.path = format!("local_data_{}", Uuid::now_v7().to_u128_le());
-        config.partition.enforce_fsync = true;
-        config.state.enforce_fsync = true;
-
-        let config = Arc::new(config);
-        fs::create_dir(config.get_system_path()).await.unwrap();
-        create_directories(&config).await.unwrap();
-        MemoryPool::init_pool(&config.memory_pool.into_other());
-        TestSetup { config }
-    }
-}
-
-impl Drop for TestSetup {
-    fn drop(&mut self) {
-        std::fs::remove_dir_all(self.config.get_system_path()).unwrap();
-    }
-}
diff --git a/core/integration/tests/streaming/get_by_offset.rs b/core/integration/tests/streaming/get_by_offset.rs
deleted file mode 100644
index ca5438529..000000000
--- a/core/integration/tests/streaming/get_by_offset.rs
+++ /dev/null
@@ -1,396 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-use super::bootstrap_test_environment;
-use crate::streaming::common::test_setup::TestSetup;
-use bytes::BytesMut;
-use iggy::prelude::*;
-use server::configs::cache_indexes::CacheIndexesConfig;
-use server::configs::system::{PartitionConfig, SegmentConfig, SystemConfig};
-use server::shard::namespace::IggyFullNamespace;
-use server::shard::system::messages::PollingArgs;
-use server::streaming::polling_consumer::PollingConsumer;
-use server::streaming::segments::IggyMessagesBatchMut;
-use server::streaming::traits::MainOps;
-use std::collections::HashMap;
-use std::str::FromStr;
-use test_case::test_matrix;
-
-/*
- * Below helper functions are here only to make test function name more readable.
- */
-
-fn msg_size(size: u64) -> IggyByteSize {
-    IggyByteSize::from_str(&format!("{size}B")).unwrap()
-}
-
-fn segment_size(size: u64) -> IggyByteSize {
-    IggyByteSize::from_str(&format!("{size}B")).unwrap()
-}
-
-fn msgs_req_to_save(count: u32) -> u32 {
-    count
-}
-
-fn index_cache_all() -> CacheIndexesConfig {
-    CacheIndexesConfig::All
-}
-
-fn index_cache_none() -> CacheIndexesConfig {
-    CacheIndexesConfig::None
-}
-
-fn index_cache_open_segment() -> CacheIndexesConfig {
-    CacheIndexesConfig::OpenSegment
-}
-
-fn small_batches() -> Vec<u32> {
-    vec![3, 4, 5, 6, 7]
-}
-
-fn medium_batches() -> Vec<u32> {
-    vec![10, 20, 30, 40]
-}
-
-fn large_batches() -> Vec<u32> {
-    vec![100, 200, 300, 400]
-}
-
-fn very_large_batches() -> Vec<u32> {
-    vec![500, 1000, 1500, 1000]
-}
-
-#[test_matrix(
-    [msg_size(50), msg_size(1000), msg_size(20000)],
-    [small_batches(), medium_batches(), large_batches(), very_large_batches()],
-    [msgs_req_to_save(1), msgs_req_to_save(24), msgs_req_to_save(1000), msgs_req_to_save(10000)],
-    [segment_size(10), segment_size(200), segment_size(10000000)],
-    [index_cache_none(), index_cache_all(), index_cache_open_segment()])]
-#[compio::test]
-async fn test_get_messages_by_offset(
-    message_size: IggyByteSize,
-    batch_lengths: Vec<u32>,
-    messages_required_to_save: u32,
-    segment_size: IggyByteSize,
-    cache_indexes: CacheIndexesConfig,
-) {
-    println!(
-        "Running test with message_size: {message_size}, batches: 
{batch_lengths:?}, messages_required_to_save: {messages_required_to_save}, 
segment_size: {segment_size}, cache_indexes: {cache_indexes}"
-    );
-
-    let shard_id = 0;
-    let setup = TestSetup::init().await;
-
-    let total_messages_count = batch_lengths.iter().sum();
-
-    let config = SystemConfig {
-        path: setup.config.path.to_string(),
-        partition: PartitionConfig {
-            messages_required_to_save,
-            enforce_fsync: true,
-            ..Default::default()
-        },
-        segment: SegmentConfig {
-            cache_indexes,
-            size: segment_size,
-            ..Default::default()
-        },
-        ..Default::default()
-    };
-
-    // Use the bootstrap method to create streams with proper slab structure
-    let bootstrap_result = bootstrap_test_environment(shard_id as u16, &config)
-        .await
-        .unwrap();
-    let streams = bootstrap_result.streams;
-    let stream_identifier = bootstrap_result.stream_id;
-    let topic_identifier = bootstrap_result.topic_id;
-    let partition_id = bootstrap_result.partition_id;
-    let task_registry = bootstrap_result.task_registry;
-
-    // Create namespace for MainOps calls
-    let namespace = IggyFullNamespace::new(
-        stream_identifier.clone(),
-        topic_identifier.clone(),
-        partition_id,
-    );
-
-    let mut all_messages = Vec::with_capacity(total_messages_count as usize);
-
-    // Generate all messages as defined in the test matrix
-    for i in 1..=total_messages_count {
-        let id = i as u128;
-        let beginning_of_payload = format!("message {i}");
-        let mut payload = BytesMut::new();
-        payload.extend_from_slice(beginning_of_payload.as_bytes());
-        payload.resize(message_size.as_bytes_usize(), 0xD);
-        let payload = payload.freeze();
-
-        let mut headers = HashMap::new();
-        headers.insert(
-            HeaderKey::new("key_1").unwrap(),
-            HeaderValue::from_str("Value 1").unwrap(),
-        );
-        headers.insert(
-            HeaderKey::new("key 2").unwrap(),
-            HeaderValue::from_bool(true).unwrap(),
-        );
-        headers.insert(
-            HeaderKey::new("key-3").unwrap(),
-            HeaderValue::from_uint64(123456).unwrap(),
-        );
-
-        let message = IggyMessage::builder()
-            .id(id)
-            .payload(payload)
-            .user_headers(headers)
-            .build()
-            .expect("Failed to create message with valid payload and headers");
-        all_messages.push(message);
-    }
-
-    // Keep track of offsets after each batch
-    let mut batch_offsets = Vec::with_capacity(batch_lengths.len());
-    let mut current_pos = 0;
-
-    // Append all batches as defined in the test matrix
-    for (batch_idx, &batch_len) in batch_lengths.iter().enumerate() {
-        // If we've generated too many messages, skip the rest
-        if current_pos + batch_len as usize > all_messages.len() {
-            break;
-        }
-
-        println!(
-            "Appending batch {}/{} with {} messages",
-            batch_idx + 1,
-            batch_lengths.len(),
-            batch_len
-        );
-
-        let batch_end_pos = current_pos + batch_len as usize;
-        let messages_slice_to_append = &all_messages[current_pos..batch_end_pos];
-
-        let messages_size = messages_slice_to_append
-            .iter()
-            .map(|m| m.get_size_bytes().as_bytes_u64() as u32)
-            .sum();
-
-        let batch = IggyMessagesBatchMut::from_messages(messages_slice_to_append, messages_size);
-        assert_eq!(batch.count(), batch_len);
-        streams
-            .append_messages(&config, &task_registry, &namespace, batch)
-            .await
-            .unwrap();
-
-        // Get current offset after appending
-        let current_offset = streams.with_partition_by_id(
-            &stream_identifier,
-            &topic_identifier,
-            partition_id,
-            |(_, _, _, offset, ..)| offset.load(std::sync::atomic::Ordering::Relaxed),
-        );
-        batch_offsets.push(current_offset);
-        current_pos += batch_len as usize;
-    }
-
-    // Use the exact total messages count from the test matrix
-    let total_sent_messages = total_messages_count;
-
-    // Create a single consumer to reuse throughout the test
-    let consumer =
-        PollingConsumer::consumer(&Identifier::numeric(1).unwrap(), partition_id as usize);
-
-    // Test 1: All messages from start
-    let args = PollingArgs::new(PollingStrategy::offset(0), total_sent_messages, false);
-    let (_, all_loaded_messages) = streams
-        .poll_messages(&namespace, consumer, args)
-        .await
-        .unwrap();
-    assert_eq!(
-        all_loaded_messages.count(),
-        total_sent_messages,
-        "Expected {} messages from start, but got {}",
-        total_sent_messages,
-        all_loaded_messages.count()
-    );
-
-    // Test 2: Get messages from middle (after 3rd batch)
-    if batch_offsets.len() >= 3 {
-        let middle_offset = batch_offsets[2];
-        let prior_batches_sum: u32 = batch_lengths[..3].iter().sum();
-        let remaining_messages = total_sent_messages - prior_batches_sum;
-
-        let args = PollingArgs::new(
-            PollingStrategy::offset(middle_offset + 1),
-            remaining_messages,
-            false,
-        );
-        let (_, middle_messages) = streams
-            .poll_messages(&namespace, consumer, args)
-            .await
-            .unwrap();
-
-        assert_eq!(
-            middle_messages.count(),
-            remaining_messages,
-            "Expected {} messages from middle offset, but got {}",
-            remaining_messages,
-            middle_messages.count()
-        );
-    }
-
-    // Test 3: No messages beyond final offset
-    if !batch_offsets.is_empty() {
-        let final_offset = *batch_offsets.last().unwrap();
-        let args = PollingArgs::new(PollingStrategy::offset(final_offset + 1), 1, false);
-        let (_, no_messages) = streams
-            .poll_messages(&namespace, consumer, args)
-            .await
-            .unwrap();
-        assert_eq!(
-            no_messages.count(),
-            0,
-            "Expected no messages beyond final offset, but got {}",
-            no_messages.count()
-        );
-    }
-
-    // Test 4: Small subset from start
-    let subset_size = std::cmp::min(3, total_sent_messages);
-    let args = PollingArgs::new(PollingStrategy::offset(0), subset_size, false);
-    let (_, subset_messages) = streams
-        .poll_messages(&namespace, consumer, args)
-        .await
-        .unwrap();
-    assert_eq!(
-        subset_messages.count(),
-        subset_size,
-        "Expected {} messages in subset from start, but got {}",
-        subset_size,
-        subset_messages.count()
-    );
-
-    // Test 5: Messages spanning multiple batches
-    if batch_offsets.len() >= 4 {
-        let span_offset = batch_offsets[1] + 1; // Start from middle of 2nd batch
-        let span_size = 8; // Should span across 2nd, 3rd, and into 4th batch
-        let args = PollingArgs::new(PollingStrategy::offset(span_offset), span_size, false);
-        let (_, batches) = streams
-            .poll_messages(&namespace, consumer, args)
-            .await
-            .unwrap();
-        assert_eq!(
-            batches.count(),
-            span_size,
-            "Expected {} messages spanning multiple batches, but got {}",
-            span_size,
-            batches.count()
-        );
-
-        // Test 6: Validate message content and ordering for all messages
-        let mut i = 0;
-
-        for batch in batches.iter() {
-            for msg in batch.iter() {
-                let expected_offset = span_offset + i as u64;
-                assert!(
-                    msg.header().offset() >= expected_offset,
-                    "Message offset {} at position {} should be >= expected 
offset {}",
-                    msg.header().offset(),
-                    i,
-                    expected_offset
-                );
-
-                let original_offset = msg.header().offset() as usize;
-                if original_offset < all_messages.len() {
-                    let original_message = &all_messages[original_offset];
-
-                    let loaded_id = msg.header().id();
-                    let original_id = original_message.header.id;
-                    assert_eq!(
-                        loaded_id,
-                        original_id,
-                        "Message ID mismatch at offset {}",
-                        msg.header().offset(),
-                    );
-
-                    let loaded_payload = msg.payload();
-                    let original_payload = &original_message.payload;
-                    assert_eq!(
-                        loaded_payload,
-                        original_payload,
-                        "Payload mismatch at offset {}",
-                        msg.header().offset(),
-                    );
-
-                    let loaded_headers = msg.user_headers_map().unwrap().unwrap();
-                    let original_headers =
-                        HashMap::from_bytes(original_message.user_headers.clone().unwrap())
-                            .unwrap();
-                    assert_eq!(
-                        loaded_headers,
-                        original_headers,
-                        "Headers mismatch at offset {}",
-                        msg.header().offset(),
-                    );
-                }
-                i += 1;
-            }
-        }
-    }
-
-    // Add sequential read test for all batch sizes
-    println!("Verifying sequential reads, expecting {total_sent_messages} 
messages");
-
-    let chunk_size = 500;
-    let mut verified_count = 0;
-
-    for chunk_start in (0..total_sent_messages).step_by(chunk_size as usize) {
-        let read_size = std::cmp::min(chunk_size, total_sent_messages - chunk_start);
-
-        let args = PollingArgs::new(
-            PollingStrategy::offset(chunk_start as u64),
-            read_size,
-            false,
-        );
-        let (_, chunk) = streams
-            .poll_messages(&namespace, consumer, args)
-            .await
-            .unwrap();
-
-        assert_eq!(
-            chunk.count(),
-            read_size,
-            "Failed to read chunk at offset {chunk_start} with size 
{read_size}"
-        );
-
-        verified_count += chunk.count();
-
-        println!(
-            "Read chunk at offset {chunk_start} with size {read_size}, 
verified count: {verified_count}"
-        );
-    }
-
-    assert_eq!(
-        verified_count, total_sent_messages,
-        "Sequential chunk reads didn't cover all messages"
-    );
-}
diff --git a/core/integration/tests/streaming/get_by_timestamp.rs b/core/integration/tests/streaming/get_by_timestamp.rs
deleted file mode 100644
index 90afc5967..000000000
--- a/core/integration/tests/streaming/get_by_timestamp.rs
+++ /dev/null
@@ -1,372 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use super::bootstrap_test_environment;
-use crate::streaming::common::test_setup::TestSetup;
-use bytes::BytesMut;
-use iggy::prelude::*;
-use server::configs::cache_indexes::CacheIndexesConfig;
-use server::configs::system::{PartitionConfig, SegmentConfig, SystemConfig};
-use server::shard::namespace::IggyFullNamespace;
-use server::shard::system::messages::PollingArgs;
-use server::streaming::polling_consumer::PollingConsumer;
-use server::streaming::segments::IggyMessagesBatchMut;
-use server::streaming::traits::MainOps;
-use std::collections::HashMap;
-use std::str::FromStr;
-use std::thread::sleep;
-use test_case::test_matrix;
-
-/*
- * Below helper functions are here only to make test function name more readable.
- */
-
-fn msg_size(size: u64) -> IggyByteSize {
-    IggyByteSize::from_str(&format!("{size}B")).unwrap()
-}
-
-fn segment_size(size: u64) -> IggyByteSize {
-    IggyByteSize::from_str(&format!("{size}B")).unwrap()
-}
-
-fn msgs_req_to_save(count: u32) -> u32 {
-    count
-}
-
-fn index_cache_all() -> CacheIndexesConfig {
-    CacheIndexesConfig::All
-}
-
-fn index_cache_none() -> CacheIndexesConfig {
-    CacheIndexesConfig::None
-}
-
-fn index_cache_open_segment() -> CacheIndexesConfig {
-    CacheIndexesConfig::OpenSegment
-}
-
-fn small_batches() -> Vec<u32> {
-    vec![3, 4, 5, 6, 7]
-}
-
-fn medium_batches() -> Vec<u32> {
-    vec![10, 20, 30, 40]
-}
-
-fn large_batches() -> Vec<u32> {
-    vec![100, 200, 300, 400]
-}
-
-fn very_large_batches() -> Vec<u32> {
-    vec![500, 1000, 1500, 1000]
-}
-
-#[test_matrix(
-    [msg_size(50), msg_size(1000), msg_size(20000)],
-    [small_batches(), medium_batches(), large_batches(), very_large_batches()],
-    [msgs_req_to_save(1), msgs_req_to_save(24), msgs_req_to_save(1000), msgs_req_to_save(10000)],
-    [segment_size(10), segment_size(200), segment_size(10000000)],
-    [index_cache_none(), index_cache_all(), index_cache_open_segment()])]
-#[compio::test]
-async fn test_get_messages_by_timestamp(
-    message_size: IggyByteSize,
-    batch_lengths: Vec<u32>,
-    messages_required_to_save: u32,
-    segment_size: IggyByteSize,
-    cache_indexes: CacheIndexesConfig,
-) {
-    println!(
-        "Running test with message_size: {message_size}, batches: 
{batch_lengths:?}, messages_required_to_save: {messages_required_to_save}, 
segment_size: {segment_size}, cache_indexes: {cache_indexes}"
-    );
-
-    let shard_id = 0;
-    let setup = TestSetup::init().await;
-
-    let total_messages_count = batch_lengths.iter().sum();
-
-    let config = SystemConfig {
-        path: setup.config.path.to_string(),
-        partition: PartitionConfig {
-            messages_required_to_save,
-            enforce_fsync: true,
-            ..Default::default()
-        },
-        segment: SegmentConfig {
-            cache_indexes,
-            size: segment_size,
-            ..Default::default()
-        },
-        ..Default::default()
-    };
-
-    // Use the bootstrap method to create streams with proper slab structure
-    let bootstrap_result = bootstrap_test_environment(shard_id as u16, &config)
-        .await
-        .unwrap();
-    let streams = bootstrap_result.streams;
-    let stream_identifier = bootstrap_result.stream_id;
-    let topic_identifier = bootstrap_result.topic_id;
-    let partition_id = bootstrap_result.partition_id;
-    let task_registry = bootstrap_result.task_registry;
-
-    // Create namespace for MainOps calls
-    let namespace = IggyFullNamespace::new(
-        stream_identifier.clone(),
-        topic_identifier.clone(),
-        partition_id,
-    );
-
-    let mut all_messages = Vec::with_capacity(total_messages_count as usize);
-
-    // Generate all messages as defined in the test matrix
-    for i in 1..=total_messages_count {
-        let id = i as u128;
-        let beginning_of_payload = format!("message {i}");
-        let mut payload = BytesMut::new();
-        payload.extend_from_slice(beginning_of_payload.as_bytes());
-        payload.resize(message_size.as_bytes_usize(), 0xD);
-        let payload = payload.freeze();
-
-        let mut headers = HashMap::new();
-        headers.insert(
-            HeaderKey::new("key_1").unwrap(),
-            HeaderValue::from_str("Value 1").unwrap(),
-        );
-        headers.insert(
-            HeaderKey::new("key 2").unwrap(),
-            HeaderValue::from_bool(true).unwrap(),
-        );
-        headers.insert(
-            HeaderKey::new("key-3").unwrap(),
-            HeaderValue::from_uint64(123456).unwrap(),
-        );
-
-        let message = IggyMessage::builder()
-            .id(id)
-            .payload(payload)
-            .user_headers(headers)
-            .build()
-            .expect("Failed to create message with valid payload and headers");
-
-        all_messages.push(message);
-    }
-
-    // Timestamp tracking for messages
-    let initial_timestamp = IggyTimestamp::now();
-    let mut batch_timestamps = Vec::with_capacity(batch_lengths.len());
-    let mut current_pos = 0;
-
-    // Append all batches as defined in the test matrix with separate timestamps
-    for (batch_idx, &batch_len) in batch_lengths.iter().enumerate() {
-        // Add a small delay between batches to ensure distinct timestamps
-        sleep(std::time::Duration::from_millis(2));
-
-        // If we've generated too many messages, skip the rest
-        if current_pos + batch_len as usize > all_messages.len() {
-            break;
-        }
-
-        println!(
-            "Appending batch {}/{} with {} messages",
-            batch_idx + 1,
-            batch_lengths.len(),
-            batch_len
-        );
-
-        let batch_end_pos = current_pos + batch_len as usize;
-        let messages_slice_to_append = &all_messages[current_pos..batch_end_pos];
-
-        let messages_size = messages_slice_to_append
-            .iter()
-            .map(|m| m.get_size_bytes().as_bytes_u64() as u32)
-            .sum();
-
-        let batch = IggyMessagesBatchMut::from_messages(messages_slice_to_append, messages_size);
-        assert_eq!(batch.count(), batch_len);
-        streams
-            .append_messages(&config, &task_registry, &namespace, batch)
-            .await
-            .unwrap();
-
-        // Capture the timestamp of this batch
-        batch_timestamps.push(IggyTimestamp::now());
-        current_pos += batch_len as usize;
-
-        // Add a small delay between batches to ensure distinct timestamps
-        sleep(std::time::Duration::from_millis(2));
-    }
-
-    let final_timestamp = IggyTimestamp::now();
-
-    // Use the exact total messages count from the test matrix
-    let total_sent_messages = total_messages_count;
-
-    // Create a single consumer to reuse throughout the test
-    let consumer =
-        PollingConsumer::consumer(&Identifier::numeric(1).unwrap(), partition_id as usize);
-
-    // Test 1: All messages from initial timestamp
-    let args = PollingArgs::new(
-        PollingStrategy::timestamp(initial_timestamp),
-        total_sent_messages,
-        false,
-    );
-    let (_, all_loaded_messages) = streams
-        .poll_messages(&namespace, consumer, args)
-        .await
-        .unwrap();
-    assert_eq!(
-        all_loaded_messages.count(),
-        total_sent_messages,
-        "Expected {} messages from initial timestamp, but got {}",
-        total_sent_messages,
-        all_loaded_messages.count()
-    );
-
-    // Test 2: Get messages from middle timestamp (after 3rd batch)
-    if batch_timestamps.len() >= 3 {
-        // Use a timestamp that's just before the 3rd batch's timestamp to ensure we get messages
-        // from that batch onwards
-        let middle_timestamp = IggyTimestamp::from(batch_timestamps[2].as_micros() + 1000);
-
-        // Calculate how many messages should be in batches after the 3rd
-        let prior_batches_sum: u32 = batch_lengths[..3].iter().sum();
-        let remaining_messages = total_sent_messages - prior_batches_sum;
-
-        let args = PollingArgs::new(
-            PollingStrategy::timestamp(middle_timestamp),
-            remaining_messages,
-            false,
-        );
-        let (_, middle_messages) = streams
-            .poll_messages(&namespace, consumer, args)
-            .await
-            .unwrap();
-
-        assert_eq!(
-            middle_messages.count(),
-            remaining_messages,
-            "Expected {} messages from middle timestamp, but got {}",
-            remaining_messages,
-            middle_messages.count()
-        );
-    }
-
-    // Test 3: No messages after final timestamp
-    let args = PollingArgs::new(PollingStrategy::timestamp(final_timestamp), 1, false);
-    let (_, no_messages) = streams
-        .poll_messages(&namespace, consumer, args)
-        .await
-        .unwrap();
-    assert_eq!(
-        no_messages.count(),
-        0,
-        "Expected no messages after final timestamp, but got {}",
-        no_messages.count()
-    );
-
-    // Test 4: Small subset from initial timestamp
-    let subset_size = std::cmp::min(3, total_sent_messages);
-    let args = PollingArgs::new(
-        PollingStrategy::timestamp(initial_timestamp),
-        subset_size,
-        false,
-    );
-    let (_, subset_messages) = streams
-        .poll_messages(&namespace, consumer, args)
-        .await
-        .unwrap();
-    assert_eq!(
-        subset_messages.count(),
-        subset_size,
-        "Expected {} messages in subset from initial timestamp, but got {}",
-        subset_size,
-        subset_messages.count()
-    );
-
-    // Test 5: Messages spanning multiple batches by timestamp
-    if batch_timestamps.len() >= 4 {
-        // Use a timestamp that's just before the 2nd batch's timestamp
-        let span_timestamp = IggyTimestamp::from(batch_timestamps[1].as_micros() + 1000);
-        let span_size = 8; // Should span across multiple batches
-
-        let args = PollingArgs::new(PollingStrategy::timestamp(span_timestamp), span_size, false);
-        let (_, spanning_messages) = streams
-            .poll_messages(&namespace, consumer, args)
-            .await
-            .unwrap();
-
-        assert_eq!(
-            spanning_messages.count(),
-            span_size,
-            "Expected {} messages spanning multiple batches, but got {}",
-            span_size,
-            spanning_messages.count()
-        );
-
-        // Verify that all messages have timestamps >= our reference timestamp
-        let span_timestamp_micros = span_timestamp.as_micros();
-
-        // Test 6: Validate message content and ordering
-        for batch in spanning_messages.iter() {
-            for msg in batch.iter() {
-                let msg_timestamp = msg.header().timestamp();
-                assert!(
-                    msg_timestamp >= span_timestamp_micros,
-                    "Message timestamp {msg_timestamp} should be >= span 
timestamp {span_timestamp_micros}"
-                );
-
-                // Verify message content
-                let loaded_id = msg.header().id();
-                let original_offset = msg.header().offset() as usize;
-
-                if original_offset < all_messages.len() {
-                    let original_message = &all_messages[original_offset];
-                    let original_id = original_message.header.id;
-
-                    assert_eq!(
-                        loaded_id,
-                        original_id,
-                        "Message ID mismatch at offset {}",
-                        msg.header().offset(),
-                    );
-
-                    let loaded_payload = msg.payload();
-                    let original_payload = &original_message.payload;
-                    assert_eq!(
-                        loaded_payload,
-                        original_payload,
-                        "Payload mismatch at offset {}",
-                        msg.header().offset(),
-                    );
-
-                    let loaded_headers = msg.user_headers_map().unwrap().unwrap();
-                    let original_headers =
-                        HashMap::from_bytes(original_message.user_headers.clone().unwrap())
-                            .unwrap();
-                    assert_eq!(
-                        loaded_headers,
-                        original_headers,
-                        "Headers mismatch at offset {}",
-                        msg.header().offset(),
-                    );
-                }
-            }
-        }
-    }
-}
diff --git a/core/integration/tests/streaming/mod.rs b/core/integration/tests/streaming/mod.rs
deleted file mode 100644
index 6ad773647..000000000
--- a/core/integration/tests/streaming/mod.rs
+++ /dev/null
@@ -1,132 +0,0 @@
-/* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use iggy_common::{CompressionAlgorithm, Identifier, IggyError, IggyExpiry, MaxTopicSize};
-use server::{
-    configs::system::SystemConfig,
-    shard::{task_registry::TaskRegistry, transmission::connector::ShardConnector},
-    slab::{streams::Streams, traits_ext::EntityMarker},
-    streaming::{
-        self,
-        partitions::{partition, storage::create_partition_file_hierarchy},
-        segments::{Segment, storage::create_segment_storage},
-        streams::{storage::create_stream_file_hierarchy, stream},
-        topics::{storage::create_topic_file_hierarchy, topic},
-    },
-};
-use std::rc::Rc;
-
-mod common;
-mod get_by_offset;
-mod get_by_timestamp;
-mod snapshot;
-
-struct BootstrapResult {
-    streams: Streams,
-    stream_id: Identifier,
-    topic_id: Identifier,
-    partition_id: usize,
-    task_registry: Rc<TaskRegistry>,
-}
-
-async fn bootstrap_test_environment(
-    shard_id: u16,
-    config: &SystemConfig,
-) -> Result<BootstrapResult, IggyError> {
-    let stream_name = "stream-1".to_owned();
-    let topic_name = "topic-1".to_owned();
-    let topic_expiry = IggyExpiry::NeverExpire;
-    let topic_size = MaxTopicSize::Unlimited;
-    let partitions_count = 1;
-
-    let streams = Streams::default();
-    // Create stream together with its dirs
-    let stream = stream::create_and_insert_stream_mem(&streams, stream_name);
-    create_stream_file_hierarchy(stream.id(), config).await?;
-    // Create topic together with its dirs
-    let stream_id = Identifier::numeric(stream.id() as u32).unwrap();
-    let parent_stats = streams.with_stream_by_id(&stream_id, |(_, stats)| stats.clone());
-    let message_expiry = config.resolve_message_expiry(topic_expiry);
-    let max_topic_size = config.resolve_max_topic_size(topic_size)?;
-
-    let topic = topic::create_and_insert_topics_mem(
-        &streams,
-        &stream_id,
-        topic_name,
-        1,
-        message_expiry,
-        CompressionAlgorithm::default(),
-        max_topic_size,
-        parent_stats,
-    );
-    create_topic_file_hierarchy(stream.id(), topic.id(), config).await?;
-    // Create partition together with its dirs
-    let topic_id = Identifier::numeric(topic.id() as u32).unwrap();
-    let parent_stats = streams.with_topic_by_id(
-        &stream_id,
-        &topic_id,
-        streaming::topics::helpers::get_stats(),
-    );
-    let partitions = partition::create_and_insert_partitions_mem(
-        &streams,
-        &stream_id,
-        &topic_id,
-        parent_stats,
-        partitions_count,
-        config,
-    );
-    for partition in partitions {
-        create_partition_file_hierarchy(stream.id(), topic.id(), partition.id(), config).await?;
-
-        // Open the log
-        let start_offset = 0;
-        let segment = Segment::new(
-            start_offset,
-            config.segment.size,
-            config.segment.message_expiry,
-        );
-        let messages_size = 0;
-        let indexes_size = 0;
-        let storage = create_segment_storage(
-            config,
-            stream.id(),
-            topic.id(),
-            partition.id(),
-            messages_size,
-            indexes_size,
-            start_offset,
-        )
-        .await?;
-
-        streams.with_partition_by_id_mut(&stream_id, &topic_id, partition.id(), |(.., log)| {
-            log.add_persisted_segment(segment, storage);
-        });
-    }
-
-    // Create a test task registry with dummy stop sender from ShardConnector
-    let connector: ShardConnector<()> = ShardConnector::new(shard_id);
-    let task_registry = Rc::new(TaskRegistry::new(shard_id, vec![connector.stop_sender]));
-
-    Ok(BootstrapResult {
-        streams,
-        stream_id,
-        topic_id,
-        partition_id: 0,
-        task_registry,
-    })
-}