This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch io_uring_tpc_fixes_7
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 4e09f89f793aead1c709b8900718386a226eb66d
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Tue Oct 14 12:38:09 2025 +0200

    fixes
---
 core/bench/src/actors/producer/client/low_level.rs |   2 +-
 .../consumer_offsets/delete_consumer_offset.rs     |   2 +-
 .../consumer_offsets/get_consumer_offset.rs        |   2 +-
 .../consumer_offsets/store_consumer_offset.rs      |   2 +-
 core/common/src/commands/messages/poll_messages.rs |   2 +-
 core/integration/src/test_server.rs                |  48 +--
 .../data_integrity/verify_after_server_restart.rs  | 326 ++++++++++++++-------
 core/integration/tests/mcp/mod.rs                  |  20 +-
 .../tests/server/scenarios/encryption_scenario.rs  |   8 +-
 .../tests/server/scenarios/tcp_tls_scenario.rs     |   2 +-
 core/server/server.http                            |   2 +-
 core/server/src/bootstrap.rs                       |  22 +-
 core/server/src/quic/quic_server.rs                |   3 +
 core/server/src/shard/mod.rs                       |   6 +-
 core/server/src/shard/system/messages.rs           |  19 +-
 core/server/src/slab/streams.rs                    |   9 +-
 core/server/src/tcp/tcp_listener.rs                |   3 +
 core/server/src/tcp/tcp_tls_listener.rs            |   3 +
 18 files changed, 319 insertions(+), 162 deletions(-)

diff --git a/core/bench/src/actors/producer/client/low_level.rs 
b/core/bench/src/actors/producer/client/low_level.rs
index 86d022648..fc2f483ed 100644
--- a/core/bench/src/actors/producer/client/low_level.rs
+++ b/core/bench/src/actors/producer/client/low_level.rs
@@ -49,7 +49,7 @@ impl LowLevelProducerClient {
             client: None,
             stream_id: Identifier::default(),
             topic_id: Identifier::default(),
-            partitioning: Partitioning::partition_id(1),
+            partitioning: Partitioning::partition_id(0),
         }
     }
 }
diff --git 
a/core/common/src/commands/consumer_offsets/delete_consumer_offset.rs 
b/core/common/src/commands/consumer_offsets/delete_consumer_offset.rs
index aab6c096d..8cfeac24d 100644
--- a/core/common/src/commands/consumer_offsets/delete_consumer_offset.rs
+++ b/core/common/src/commands/consumer_offsets/delete_consumer_offset.rs
@@ -54,7 +54,7 @@ impl Default for DeleteConsumerOffset {
             consumer: Consumer::default(),
             stream_id: Identifier::default(),
             topic_id: Identifier::default(),
-            partition_id: Some(1),
+            partition_id: Some(0),
         }
     }
 }
diff --git a/core/common/src/commands/consumer_offsets/get_consumer_offset.rs 
b/core/common/src/commands/consumer_offsets/get_consumer_offset.rs
index c2a3dda8e..a98ac6c4d 100644
--- a/core/common/src/commands/consumer_offsets/get_consumer_offset.rs
+++ b/core/common/src/commands/consumer_offsets/get_consumer_offset.rs
@@ -67,7 +67,7 @@ impl Command for GetConsumerOffset {
 }
 
 fn default_partition_id() -> Option<u32> {
-    Some(1)
+    Some(0)
 }
 
 impl Validatable<IggyError> for GetConsumerOffset {
diff --git a/core/common/src/commands/consumer_offsets/store_consumer_offset.rs 
b/core/common/src/commands/consumer_offsets/store_consumer_offset.rs
index ad3c4eae6..9500138d1 100644
--- a/core/common/src/commands/consumer_offsets/store_consumer_offset.rs
+++ b/core/common/src/commands/consumer_offsets/store_consumer_offset.rs
@@ -57,7 +57,7 @@ impl Default for StoreConsumerOffset {
             consumer: Consumer::default(),
             stream_id: Identifier::default(),
             topic_id: Identifier::default(),
-            partition_id: Some(1),
+            partition_id: Some(0),
             offset: 0,
         }
     }
diff --git a/core/common/src/commands/messages/poll_messages.rs 
b/core/common/src/commands/messages/poll_messages.rs
index 3f4045bbd..e51b03ef6 100644
--- a/core/common/src/commands/messages/poll_messages.rs
+++ b/core/common/src/commands/messages/poll_messages.rs
@@ -24,7 +24,7 @@ use bytes::{BufMut, Bytes, BytesMut};
 use serde::{Deserialize, Serialize};
 use std::fmt::Display;
 
-pub const DEFAULT_PARTITION_ID: u32 = 1;
+pub const DEFAULT_PARTITION_ID: u32 = 0;
 pub const DEFAULT_NUMBER_OF_MESSAGES_TO_POLL: u32 = 10;
 
 /// `PollMessages` command is used to poll messages from a topic in a stream.
diff --git a/core/integration/src/test_server.rs 
b/core/integration/src/test_server.rs
index 7914c4c2e..160094d26 100644
--- a/core/integration/src/test_server.rs
+++ b/core/integration/src/test_server.rs
@@ -408,33 +408,41 @@ impl TestServer {
         });
 
         if let Some(config) = config {
-            let quic_addr: SocketAddr = config.quic.address.parse().unwrap();
-            if quic_addr.port() == 0 {
-                panic!("Quic address port is 0!");
+            // Only validate and add enabled protocols
+            if config.quic.enabled {
+                let quic_addr: SocketAddr = 
config.quic.address.parse().unwrap();
+                if quic_addr.port() == 0 {
+                    panic!("Quic address port is 0!");
+                }
+                self.server_addrs
+                    .push(ServerProtocolAddr::QuicUdp(quic_addr));
             }
 
-            let tcp_addr: SocketAddr = config.tcp.address.parse().unwrap();
-            if tcp_addr.port() == 0 {
-                panic!("Tcp address port is 0!");
+            if config.tcp.enabled {
+                let tcp_addr: SocketAddr = config.tcp.address.parse().unwrap();
+                if tcp_addr.port() == 0 {
+                    panic!("Tcp address port is 0!");
+                }
+                self.server_addrs.push(ServerProtocolAddr::RawTcp(tcp_addr));
             }
 
-            let http_addr: SocketAddr = config.http.address.parse().unwrap();
-            if http_addr.port() == 0 {
-                panic!("Http address port is 0!");
+            if config.http.enabled {
+                let http_addr: SocketAddr = 
config.http.address.parse().unwrap();
+                if http_addr.port() == 0 {
+                    panic!("Http address port is 0!");
+                }
+                self.server_addrs
+                    .push(ServerProtocolAddr::HttpTcp(http_addr));
             }
 
-            let websocket_addr: SocketAddr = 
config.websocket.address.parse().unwrap();
-            if websocket_addr.port() == 0 {
-                panic!("WebSocket address port is 0!");
+            if config.websocket.enabled {
+                let websocket_addr: SocketAddr = 
config.websocket.address.parse().unwrap();
+                if websocket_addr.port() == 0 {
+                    panic!("WebSocket address port is 0!");
+                }
+                self.server_addrs
+                    .push(ServerProtocolAddr::WebSocket(websocket_addr));
             }
-
-            self.server_addrs
-                .push(ServerProtocolAddr::QuicUdp(quic_addr));
-            self.server_addrs.push(ServerProtocolAddr::RawTcp(tcp_addr));
-            self.server_addrs
-                .push(ServerProtocolAddr::HttpTcp(http_addr));
-            self.server_addrs
-                .push(ServerProtocolAddr::WebSocket(websocket_addr));
         } else {
             panic!(
                 "Failed to load config from file {config_path} in 
{MAX_PORT_WAIT_DURATION_S} s!"
diff --git 
a/core/integration/tests/data_integrity/verify_after_server_restart.rs 
b/core/integration/tests/data_integrity/verify_after_server_restart.rs
index 17e029e0d..4c4aae1d6 100644
--- a/core/integration/tests/data_integrity/verify_after_server_restart.rs
+++ b/core/integration/tests/data_integrity/verify_after_server_restart.rs
@@ -16,21 +16,25 @@
  * under the License.
  */
 
+use bytes::Bytes;
 use iggy::clients::client::IggyClient;
-use iggy::prelude::{Identifier, IggyByteSize, MessageClient, SystemClient};
-use iggy_common::TransportProtocol;
-use integration::bench_utils::run_bench_and_wait_for_finish;
+use iggy::prelude::*;
 use integration::{
     tcp_client::TcpClientFactory,
     test_server::{ClientFactory, IpAddrKind, SYSTEM_PATH_ENV_VAR, TestServer, 
login_root},
 };
 use serial_test::parallel;
-use std::{collections::HashMap, str::FromStr};
+use std::collections::HashMap;
 use test_case::test_matrix;
 
-/*
- * Helper functions for test matrix parameters
- */
+// Constants matching iggy-bench behavior
+const NUMBER_OF_STREAMS: u32 = 1;
+const TOPIC_ID: u32 = 0;
+const PARTITION_ID: u32 = 0;
+const PARTITIONS_COUNT: u32 = 1;
+const MESSAGE_BATCHES: u64 = 100;
+const MESSAGES_PER_BATCH: u64 = 100;
+const TOTAL_MESSAGES_PER_STREAM: u64 = MESSAGE_BATCHES * MESSAGES_PER_BATCH; 
// 10,000
 
 fn cache_open_segment() -> &'static str {
     "open_segment"
@@ -44,7 +48,6 @@ fn cache_none() -> &'static str {
     "none"
 }
 
-// TODO(numminex) - Move the message generation method from benchmark run to a 
special method.
 #[test_matrix(
     [cache_open_segment(), cache_all(), cache_none()]
 )]
@@ -66,54 +69,29 @@ async fn 
should_fill_data_and_verify_after_restart(cache_setting: &'static str)
     let mut test_server = TestServer::new(Some(env_vars.clone()), false, None, 
IpAddrKind::V4);
     test_server.start();
     let server_addr = test_server.get_raw_tcp_addr().unwrap();
-    let local_data_path = test_server.get_local_data_path().to_owned();
-
-    // 2. Run send bench to fill 5 MB of data
-    let amount_of_data_to_process = IggyByteSize::from_str("5 MB").unwrap();
-    run_bench_and_wait_for_finish(
-        &server_addr,
-        &TransportProtocol::Tcp,
-        "pinned-producer",
-        amount_of_data_to_process,
-    );
 
-    // 3. Run poll bench to check if everything's OK
-    run_bench_and_wait_for_finish(
-        &server_addr,
-        &TransportProtocol::Tcp,
-        "pinned-consumer",
-        amount_of_data_to_process,
-    );
-
-    let default_bench_stream_identifiers: [Identifier; 8] = [
-        Identifier::numeric(3000001).unwrap(),
-        Identifier::numeric(3000002).unwrap(),
-        Identifier::numeric(3000003).unwrap(),
-        Identifier::numeric(3000004).unwrap(),
-        Identifier::numeric(3000005).unwrap(),
-        Identifier::numeric(3000006).unwrap(),
-        Identifier::numeric(3000007).unwrap(),
-        Identifier::numeric(3000008).unwrap(),
-    ];
-
-    // 4. Connect and login to newly started server
+    // 2. Connect, login, and create streams/topics
     let client = TcpClientFactory {
-        server_addr,
+        server_addr: server_addr.clone(),
         ..Default::default()
     }
     .create_client()
     .await;
     let client = IggyClient::create(client, None, None);
     login_root(&client).await;
-    let topic_id = Identifier::numeric(1).unwrap();
-    for stream_id in default_bench_stream_identifiers {
-        client
-            .flush_unsaved_buffer(&stream_id, &topic_id, 1, false)
-            .await
-            .unwrap();
-    }
 
-    // 5. Save stats from the first server
+    create_streams_and_topics(&client).await;
+
+    // 3. Produce messages
+    produce_messages(&client, TOTAL_MESSAGES_PER_STREAM).await;
+
+    // 4. Consume and verify all messages
+    consume_messages(&client, TOTAL_MESSAGES_PER_STREAM).await;
+
+    // 5. Flush unsaved buffers for all streams
+    flush_all_streams(&client).await;
+
+    // 6. Save stats from the first server
     let stats = client.get_stats().await.unwrap();
     let expected_messages_size_bytes = stats.messages_size_bytes;
     let expected_streams_count = stats.streams_count;
@@ -124,33 +102,17 @@ async fn 
should_fill_data_and_verify_after_restart(cache_setting: &'static str)
     let expected_clients_count = stats.clients_count;
     let expected_consumer_groups_count = stats.consumer_groups_count;
 
-    // 6. Stop server
+    // 7. Stop server
     test_server.stop();
     drop(test_server);
 
-    // 7. Restart server with same settings
+    // 8. Restart server with same settings
     let mut test_server = TestServer::new(Some(env_vars.clone()), false, None, 
IpAddrKind::V4);
     test_server.start();
     let server_addr = test_server.get_raw_tcp_addr().unwrap();
 
-    // 8. Run send bench again to add more data
-    run_bench_and_wait_for_finish(
-        &server_addr,
-        &TransportProtocol::Tcp,
-        "pinned-producer",
-        amount_of_data_to_process,
-    );
-
-    // 9. Run poll bench again to check if all data is still there
-    run_bench_and_wait_for_finish(
-        &server_addr,
-        &TransportProtocol::Tcp,
-        "pinned-consumer",
-        IggyByteSize::from(amount_of_data_to_process.as_bytes_u64() * 2),
-    );
-
-    // 10. Connect and login to newly started server
-    let client = IggyClient::create(
+    // 9. Verify stats are preserved after restart (before adding more data)
+    let client_after_restart = IggyClient::create(
         TcpClientFactory {
             server_addr: server_addr.clone(),
             ..Default::default()
@@ -160,60 +122,214 @@ async fn 
should_fill_data_and_verify_after_restart(cache_setting: &'static str)
         None,
         None,
     );
-    login_root(&client).await;
+    login_root(&client_after_restart).await;
 
-    // 11. Save stats from the second server (should have double the data)
-    let stats = client.get_stats().await.unwrap();
-    let actual_messages_size_bytes = stats.messages_size_bytes;
-    let actual_streams_count = stats.streams_count;
-    let actual_topics_count = stats.topics_count;
-    let actual_partitions_count = stats.partitions_count;
-    let actual_segments_count = stats.segments_count;
-    let actual_messages_count = stats.messages_count;
-    let actual_clients_count = stats.clients_count;
-    let actual_consumer_groups_count = stats.consumer_groups_count;
-
-    // 12. Compare stats (expecting double the messages/size after second 
bench run)
+    let stats_after_restart = client_after_restart.get_stats().await.unwrap();
     assert_eq!(
-        expected_messages_size_bytes.as_bytes_usize() * 2,
-        actual_messages_size_bytes.as_bytes_usize(),
-        "Messages size bytes should be doubled"
+        expected_messages_count, stats_after_restart.messages_count,
+        "Messages count should be preserved after restart",
+    );
+    assert_eq!(
+        expected_messages_size_bytes.as_bytes_usize(),
+        stats_after_restart.messages_size_bytes.as_bytes_usize(),
+        "Messages size bytes should be preserved after restart"
+    );
+    assert_eq!(
+        expected_streams_count, stats_after_restart.streams_count,
+        "Streams count should be preserved after restart"
+    );
+    assert_eq!(
+        expected_topics_count, stats_after_restart.topics_count,
+        "Topics count should be preserved after restart"
+    );
+    assert_eq!(
+        expected_partitions_count, stats_after_restart.partitions_count,
+        "Partitions count should be preserved after restart"
+    );
+    assert_eq!(
+        expected_segments_count, stats_after_restart.segments_count,
+        "Segments count should be preserved after restart"
+    );
+
+    // 10. Verify streams and topics still exist after restart
+    for stream_id in 0..NUMBER_OF_STREAMS {
+        let stream = client_after_restart
+            .get_stream(&Identifier::numeric(stream_id).unwrap())
+            .await
+            .unwrap();
+        assert!(
+            stream.is_some(),
+            "Stream {} should exist after restart",
+            stream_id
+        );
+    }
+
+    // 11. Verify we can still consume the first batch after restart
+    consume_messages(&client_after_restart, TOTAL_MESSAGES_PER_STREAM).await;
+
+    // 12. Produce more messages (second batch)
+    produce_messages(&client_after_restart, TOTAL_MESSAGES_PER_STREAM).await;
+
+    // 13. Save stats from the second server (should have double the data)
+    let stats_after_2nd_send = client_after_restart.get_stats().await.unwrap();
+    eprintln!("Stats after restart: {:#?}", stats_after_2nd_send);
+
+    // 14. Compare stats (expecting double the messages/size after the second produce run)
+    assert_eq!(
+        expected_streams_count, stats_after_2nd_send.streams_count,
+        "Streams count should be the same"
+    );
+    assert_eq!(
+        expected_topics_count, stats_after_2nd_send.topics_count,
+        "Topics count should be the same"
     );
     assert_eq!(
-        expected_streams_count, actual_streams_count,
-        "Streams count"
+        expected_partitions_count, stats_after_2nd_send.partitions_count,
+        "Partitions count should be the same"
     );
-    assert_eq!(expected_topics_count, actual_topics_count, "Topics count");
     assert_eq!(
-        expected_partitions_count, actual_partitions_count,
-        "Partitions count"
+        stats_after_2nd_send.segments_count, expected_segments_count,
+        "Segments count should be the same"
     );
-    assert!(
-        actual_segments_count >= expected_segments_count,
-        "Segments count should be at least the same or more"
+    assert_eq!(
+        expected_messages_size_bytes.as_bytes_usize() * 2,
+        stats_after_2nd_send.messages_size_bytes.as_bytes_usize(),
+        "Messages size bytes should be doubled"
     );
     assert_eq!(
         expected_messages_count * 2,
-        actual_messages_count,
+        stats_after_2nd_send.messages_count,
         "Messages count should be doubled"
     );
     assert_eq!(
-        expected_clients_count, actual_clients_count,
-        "Clients count"
+        expected_clients_count, stats_after_2nd_send.clients_count,
+        "Clients count should be the same"
     );
     assert_eq!(
-        expected_consumer_groups_count, actual_consumer_groups_count,
-        "Consumer groups count"
+        expected_consumer_groups_count, 
stats_after_2nd_send.consumer_groups_count,
+        "Consumer groups count should be the same"
     );
 
-    // 13. Run poll bench to check if all data (10MB total) is still there
-    run_bench_and_wait_for_finish(
-        &server_addr,
-        &TransportProtocol::Tcp,
-        "pinned-consumer",
-        IggyByteSize::from(amount_of_data_to_process.as_bytes_u64() * 2),
-    );
+    eprintln!("Consuming all messages after restart");
+
+    // 15. Consume all messages (should be 20K total per stream)
+    consume_messages(&client_after_restart, TOTAL_MESSAGES_PER_STREAM * 
2).await;
+}
+
+/// Create all streams and topics matching iggy-bench behavior
+async fn create_streams_and_topics(client: &IggyClient) {
+    for stream_id in 0..NUMBER_OF_STREAMS {
+        // Create stream
+        client.create_stream(&stream_id.to_string()).await.unwrap();
+
+        // Create topic
+        client
+            .create_topic(
+                &Identifier::numeric(stream_id).unwrap(),
+                &TOPIC_ID.to_string(),
+                PARTITIONS_COUNT,
+                CompressionAlgorithm::default(),
+                None,
+                IggyExpiry::NeverExpire,
+                MaxTopicSize::ServerDefault,
+            )
+            .await
+            .unwrap();
+    }
+}
+
+/// Create messages with the specified payload size
+fn create_messages(message_payload_size: usize, count: u64) -> 
Vec<IggyMessage> {
+    let mut messages = Vec::with_capacity(count as usize);
+    let payload = Bytes::from(vec![0xAB; message_payload_size]);
+
+    for offset in 0..count {
+        let id = (offset + 1) as u128;
+        let message = IggyMessage::builder()
+            .id(id)
+            .payload(payload.clone())
+            .build()
+            .expect("Failed to create message");
+        messages.push(message);
+    }
+    messages
+}
 
-    // 14. Manual cleanup
-    std::fs::remove_dir_all(local_data_path).unwrap();
+/// Produce messages to all streams/topics matching iggy-bench behavior
+async fn produce_messages(client: &IggyClient, messages_per_stream: u64) {
+    const MESSAGE_PAYLOAD_SIZE: usize = 100;
+    for stream_id in 0..NUMBER_OF_STREAMS {
+        let mut messages = create_messages(MESSAGE_PAYLOAD_SIZE, 
messages_per_stream);
+        client
+            .send_messages(
+                &Identifier::numeric(stream_id).unwrap(),
+                &Identifier::numeric(TOPIC_ID).unwrap(),
+                &Partitioning::partition_id(PARTITION_ID),
+                &mut messages,
+            )
+            .await
+            .unwrap();
+    }
+}
+
+/// Consume and verify messages from all streams/topics
+async fn consume_messages(client: &IggyClient, expected_messages_per_stream: 
u64) {
+    for stream_id in 0..NUMBER_OF_STREAMS {
+        let mut total_messages = 0u64;
+        let mut offset = 0u64;
+
+        // Poll messages in batches until we get all expected messages
+        while total_messages < expected_messages_per_stream {
+            let polled_messages = client
+                .poll_messages(
+                    &Identifier::numeric(stream_id).unwrap(),
+                    &Identifier::numeric(TOPIC_ID).unwrap(),
+                    Some(PARTITION_ID),
+                    &Consumer::default(),
+                    &PollingStrategy::offset(offset),
+                    1000, // Poll up to 1000 messages at a time
+                    false,
+                )
+                .await
+                .unwrap();
+
+            if polled_messages.messages.is_empty() {
+                eprintln!(
+                    "Stream {}: No more messages at offset {}, total so far: 
{}/{}",
+                    stream_id, offset, total_messages, 
expected_messages_per_stream
+                );
+                break;
+            }
+
+            let messages_count = polled_messages.messages.len() as u64;
+            total_messages += messages_count;
+            eprintln!(
+                "Stream {}: Polled {} messages at offset {}, total: {}/{}",
+                stream_id, messages_count, offset, total_messages, 
expected_messages_per_stream
+            );
+
+            // Update offset to the last message's offset + 1
+            if let Some(last_msg) = polled_messages.messages.last() {
+                offset = last_msg.header.offset + 1;
+            }
+        }
+
+        assert_eq!(
+            total_messages, expected_messages_per_stream,
+            "Expected {} messages in stream {}, but got {}",
+            expected_messages_per_stream, stream_id, total_messages
+        );
+    }
+}
+
+/// Flush unsaved buffers for all streams
+async fn flush_all_streams(client: &IggyClient) {
+    let topic_id = Identifier::numeric(TOPIC_ID).unwrap();
+    for stream_id in 0..NUMBER_OF_STREAMS {
+        let stream_id = Identifier::numeric(stream_id).unwrap();
+        client
+            .flush_unsaved_buffer(&stream_id, &topic_id, PARTITION_ID, true)
+            .await
+            .unwrap();
+    }
 }
diff --git a/core/integration/tests/mcp/mod.rs 
b/core/integration/tests/mcp/mod.rs
index 635b88990..3ffd8fc51 100644
--- a/core/integration/tests/mcp/mod.rs
+++ b/core/integration/tests/mcp/mod.rs
@@ -179,7 +179,7 @@ async fn mcp_server_should_create_topic() {
         "create_topic",
         Some(json!({ "stream_id": STREAM_NAME, "name": name, 
"partitions_count": 1})),
         |topic| {
-            assert_eq!(topic.id, 2);
+            assert_eq!(topic.id, 1);
             assert_eq!(topic.name, name);
             assert_eq!(topic.partitions_count, 1);
             assert_eq!(topic.messages_count, 0);
@@ -244,7 +244,7 @@ async fn mcp_server_should_delete_partitions() {
 async fn mcp_server_should_delete_segments() {
     assert_empty_response(
         "delete_segments",
-        Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, 
"partition_id": 1, "segments_count": 1 })),
+        Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, 
"partition_id": 0, "segments_count": 1 })),
     )
     .await;
 }
@@ -254,7 +254,7 @@ async fn mcp_server_should_delete_segments() {
 async fn mcp_server_should_poll_messages() {
     assert_response::<PolledMessages>(
         "poll_messages",
-        Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, 
"partition_id": 1, "offset": 0 })),
+        Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, 
"partition_id": 0, "offset": 0 })),
         |messages| {
             assert_eq!(messages.messages.len(), 1);
             let message = &messages.messages[0];
@@ -271,7 +271,7 @@ async fn mcp_server_should_poll_messages() {
 async fn mcp_server_should_send_messages() {
     assert_empty_response(
         "send_messages",
-        Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, 
"partition_id": 1, "messages": [
+        Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, 
"partition_id": 0, "messages": [
             {
                 "payload": "test"
             }
@@ -375,11 +375,11 @@ async fn mcp_server_should_delete_consumer_group() {
 async fn mcp_server_should_return_consumer_offset() {
     assert_response::<Option<ConsumerOffsetInfo>>(
         "get_consumer_offset",
-        Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, 
"partition_id": 1 })),
+        Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, 
"partition_id": 0 })),
         |offset| {
             assert!(offset.is_some());
             let offset = offset.unwrap();
-            assert_eq!(offset.partition_id, 1);
+            assert_eq!(offset.partition_id, 0);
             assert_eq!(offset.stored_offset, 0);
             assert_eq!(offset.current_offset, 0);
         },
@@ -392,7 +392,7 @@ async fn mcp_server_should_return_consumer_offset() {
 async fn mcp_server_should_store_consumer_offset() {
     assert_empty_response(
         "store_consumer_offset",
-        Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, 
"partition_id": 1, "offset": 0 })),
+        Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, 
"partition_id": 0, "offset": 0 })),
     )
     .await;
 }
@@ -402,7 +402,7 @@ async fn mcp_server_should_store_consumer_offset() {
 async fn mcp_server_should_delete_consumer_offset() {
     assert_empty_response(
         "delete_consumer_offset",
-        Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, 
"partition_id": 1, "offset": 0 })),
+        Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, 
"partition_id": 0, "offset": 0 })),
     )
     .await;
 }
@@ -630,7 +630,7 @@ async fn seed_data(iggy_server_address: &str) {
         .send_messages(
             &STREAM_ID,
             &TOPIC_ID,
-            &Partitioning::partition_id(1),
+            &Partitioning::partition_id(0),
             &mut messages,
         )
         .await
@@ -640,7 +640,7 @@ async fn seed_data(iggy_server_address: &str) {
         Consumer::new(Identifier::named(CONSUMER_NAME).expect("Failed to 
create consumer"));
 
     iggy_client
-        .store_consumer_offset(&consumer, &STREAM_ID, &TOPIC_ID, Some(1), 0)
+        .store_consumer_offset(&consumer, &STREAM_ID, &TOPIC_ID, Some(0), 0)
         .await
         .expect("Failed to store consumer offset");
 
diff --git a/core/integration/tests/server/scenarios/encryption_scenario.rs 
b/core/integration/tests/server/scenarios/encryption_scenario.rs
index 88c115851..77a81a6c1 100644
--- a/core/integration/tests/server/scenarios/encryption_scenario.rs
+++ b/core/integration/tests/server/scenarios/encryption_scenario.rs
@@ -130,7 +130,7 @@ async fn 
should_fill_data_with_headers_and_verify_after_restart_using_api(encryp
         .send_messages(
             &Identifier::numeric(stream_id).unwrap(),
             &Identifier::numeric(topic_id).unwrap(),
-            &Partitioning::partition_id(1),
+            &Partitioning::partition_id(0),
             &mut messages_batch_1,
         )
         .await
@@ -160,7 +160,7 @@ async fn 
should_fill_data_with_headers_and_verify_after_restart_using_api(encryp
         .poll_messages(
             &Identifier::numeric(stream_id).unwrap(),
             &Identifier::numeric(topic_id).unwrap(),
-            Some(1),
+            Some(0),
             &consumer,
             &PollingStrategy::offset(0),
             messages_per_batch.try_into().unwrap(),
@@ -275,7 +275,7 @@ async fn 
should_fill_data_with_headers_and_verify_after_restart_using_api(encryp
         .send_messages(
             &Identifier::numeric(stream_id).unwrap(),
             &Identifier::numeric(topic_id).unwrap(),
-            &Partitioning::partition_id(1), // Use specific partition for 
testing
+            &Partitioning::partition_id(0), // Use specific partition for 
testing
             &mut messages_batch_2,
         )
         .await
@@ -300,7 +300,7 @@ async fn 
should_fill_data_with_headers_and_verify_after_restart_using_api(encryp
         .poll_messages(
             &Identifier::numeric(stream_id).unwrap(),
             &Identifier::numeric(topic_id).unwrap(),
-            Some(1),
+            Some(0),
             &consumer,
             &PollingStrategy::offset(0),
             messages_per_batch as u32 * 2,
diff --git a/core/integration/tests/server/scenarios/tcp_tls_scenario.rs 
b/core/integration/tests/server/scenarios/tcp_tls_scenario.rs
index bac2b1858..4aa3e70c1 100644
--- a/core/integration/tests/server/scenarios/tcp_tls_scenario.rs
+++ b/core/integration/tests/server/scenarios/tcp_tls_scenario.rs
@@ -69,7 +69,7 @@ pub async fn run(client: &IggyClient) {
         .poll_messages(
             &Identifier::named(stream_name).unwrap(),
             &Identifier::named(topic_name).unwrap(),
-            Some(1),
+            Some(0),
             &Consumer::default(),
             &PollingStrategy::offset(0),
             1,
diff --git a/core/server/server.http b/core/server/server.http
index cb46d4213..19ceb0861 100644
--- a/core/server/server.http
+++ b/core/server/server.http
@@ -18,7 +18,7 @@
 @url = http://localhost:3000
 @stream_id = 1
 @topic_id = 1
-@partition_id = 1
+@partition_id = 0
 @consumer_group_id = 1
 @consumer_id = 1
 @client_id = 1
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 5beb6f41e..a103e8ae1 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -38,7 +38,7 @@ use crate::{
         streams::stream2,
         topics::{consumer_group2, topic2},
         users::user::User,
-        utils::file::overwrite,
+        utils::{crypto, file::overwrite},
     },
     versioning::SemanticVersion,
 };
@@ -48,8 +48,8 @@ use error_set::ErrContext;
 use iggy_common::{
     ConsumerKind, IggyByteSize, IggyError,
     defaults::{
-        DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME, MAX_PASSWORD_LENGTH, 
MAX_USERNAME_LENGTH,
-        MIN_PASSWORD_LENGTH, MIN_USERNAME_LENGTH,
+        DEFAULT_ROOT_USERNAME, MAX_PASSWORD_LENGTH, MAX_USERNAME_LENGTH, 
MIN_PASSWORD_LENGTH,
+        MIN_USERNAME_LENGTH,
     },
 };
 use std::{collections::HashSet, env, path::Path, sync::Arc};
@@ -289,9 +289,8 @@ pub async fn create_directories(config: &SystemConfig) -> 
Result<(), IggyError>
 }
 
 pub fn create_root_user() -> User {
-    info!("Creating root user...");
-    let username = env::var(IGGY_ROOT_USERNAME_ENV);
-    let password = env::var(IGGY_ROOT_PASSWORD_ENV);
+    let mut username = env::var(IGGY_ROOT_USERNAME_ENV);
+    let mut password = env::var(IGGY_ROOT_PASSWORD_ENV);
     if (username.is_ok() && password.is_err()) || (username.is_err() && 
password.is_ok()) {
         panic!(
             "When providing the custom root user credentials, both username 
and password must be set."
@@ -300,11 +299,15 @@ pub fn create_root_user() -> User {
     if username.is_ok() && password.is_ok() {
         info!("Using the custom root user credentials.");
     } else {
-        info!("Using the default root user credentials.");
+        info!("Using the default root user credentials...");
+        username = Ok(DEFAULT_ROOT_USERNAME.to_string());
+        let generated_password = crypto::generate_secret(20..40);
+        println!("Generated root user password: {generated_password}");
+        password = Ok(generated_password);
     }
 
-    let username = username.unwrap_or(DEFAULT_ROOT_USERNAME.to_string());
-    let password = password.unwrap_or(DEFAULT_ROOT_PASSWORD.to_string());
+    let username = username.expect("Root username is not set.");
+    let password = password.expect("Root password is not set.");
     if username.is_empty() || password.is_empty() {
         panic!("Root user credentials are not set.");
     }
@@ -320,6 +323,7 @@ pub fn create_root_user() -> User {
     if password.len() > MAX_PASSWORD_LENGTH {
         panic!("Root password is too long.");
     }
+
     User::root(&username, &password)
 }
 
diff --git a/core/server/src/quic/quic_server.rs 
b/core/server/src/quic/quic_server.rs
index 1a67d64d8..c5033213a 100644
--- a/core/server/src/quic/quic_server.rs
+++ b/core/server/src/quic/quic_server.rs
@@ -127,6 +127,9 @@ pub async fn spawn_quic_server(
         shard.quic_bound_address.set(Some(actual_addr));
 
         if addr.port() == 0 {
+            // Notify config writer on shard 0
+            let _ = shard.config_writer_notify.try_send(());
+
             // Broadcast to other shards for SO_REUSEPORT binding
             let event = ShardEvent::AddressBound {
                 protocol: iggy_common::TransportProtocol::Quic,
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 0996e1fc1..328578eee 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -135,7 +135,7 @@ pub struct IggyShard {
     pub(crate) quic_bound_address: Cell<Option<SocketAddr>>,
     pub(crate) websocket_bound_address: Cell<Option<SocketAddr>>,
     pub(crate) http_bound_address: Cell<Option<SocketAddr>>,
-    config_writer_notify: async_channel::Sender<()>,
+    pub(crate) config_writer_notify: async_channel::Sender<()>,
     config_writer_receiver: async_channel::Receiver<()>,
     pub(crate) task_registry: Rc<TaskRegistry>,
 }
@@ -847,9 +847,9 @@ impl IggyShard {
                 stream_id,
                 topic_id,
                 partition_id,
-                ..
+                fsync,
             } => {
-                self.flush_unsaved_buffer_bypass_auth(&stream_id, &topic_id, 
partition_id)
+                self.flush_unsaved_buffer_bypass_auth(&stream_id, &topic_id, 
partition_id, fsync)
                     .await?;
                 Ok(())
             }
diff --git a/core/server/src/shard/system/messages.rs 
b/core/server/src/shard/system/messages.rs
index 7c543c3b7..bc04de427 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -281,9 +281,11 @@ impl IggyShard {
         stream_id: &Identifier,
         topic_id: &Identifier,
         partition_id: usize,
-        _fsync: bool,
+        fsync: bool,
     ) -> Result<(), IggyError> {
         self.ensure_authenticated(session)?;
+        self.ensure_partition_exists(stream_id, topic_id, partition_id)?;
+
         let numeric_stream_id = self
             .streams2
             .with_stream_by_id(stream_id, streams::helpers::get_stream_id());
@@ -304,7 +306,7 @@ impl IggyShard {
                 format!("{COMPONENT} (error: {error}) - permission denied to 
append messages for user {} on stream ID: {}, topic ID: {}", 
session.get_user_id(), numeric_stream_id as u32, numeric_topic_id as u32)
             })?;
 
-        self.flush_unsaved_buffer_base(stream_id, topic_id, partition_id)
+        self.flush_unsaved_buffer_base(stream_id, topic_id, partition_id, 
fsync)
             .await?;
         Ok(())
     }
@@ -314,8 +316,10 @@ impl IggyShard {
         stream_id: &Identifier,
         topic_id: &Identifier,
         partition_id: usize,
+        fsync: bool,
     ) -> Result<(), IggyError> {
-        self.flush_unsaved_buffer_base(stream_id, topic_id, partition_id)
+        self.ensure_partition_exists(stream_id, topic_id, partition_id)?;
+        self.flush_unsaved_buffer_base(stream_id, topic_id, partition_id, 
fsync)
             .await
     }
 
@@ -324,6 +328,7 @@ impl IggyShard {
         stream_id: &Identifier,
         topic_id: &Identifier,
         partition_id: usize,
+        fsync: bool,
     ) -> Result<(), IggyError> {
         let batches = self.streams2.with_partition_by_id_mut(
             stream_id,
@@ -342,6 +347,14 @@ impl IggyShard {
                 &self.config.system,
             )
             .await?;
+
+        // Ensure all data is flushed to disk before returning
+        if fsync {
+            self.streams2
+                .fsync_all_messages(stream_id, topic_id, partition_id)
+                .await?;
+        }
+
         Ok(())
     }
 
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index 0ad76809c..7dccec27f 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -1337,9 +1337,16 @@ impl Streams {
         partition_id: usize,
     ) -> Result<(), IggyError> {
         let storage = self.with_partition_by_id(stream_id, topic_id, 
partition_id, |(.., log)| {
-            log.active_storage().clone()
+            if !log.has_segments() {
+                return None;
+            }
+            Some(log.active_storage().clone())
         });
 
+        let Some(storage) = storage else {
+            return Ok(());
+        };
+
         if storage.messages_writer.is_none() || storage.index_writer.is_none() 
{
             return Ok(());
         }
diff --git a/core/server/src/tcp/tcp_listener.rs 
b/core/server/src/tcp/tcp_listener.rs
index 580b7a60e..049c5b7f0 100644
--- a/core/server/src/tcp/tcp_listener.rs
+++ b/core/server/src/tcp/tcp_listener.rs
@@ -103,6 +103,9 @@ pub async fn start(
         shard.tcp_bound_address.set(Some(actual_addr));
 
         if addr.port() == 0 {
+            // Notify config writer on shard 0
+            let _ = shard.config_writer_notify.try_send(());
+
             // Broadcast to other shards for SO_REUSEPORT binding
             let event = ShardEvent::AddressBound {
                 protocol: TransportProtocol::Tcp,
diff --git a/core/server/src/tcp/tcp_tls_listener.rs 
b/core/server/src/tcp/tcp_tls_listener.rs
index 7dc419603..c41233504 100644
--- a/core/server/src/tcp/tcp_tls_listener.rs
+++ b/core/server/src/tcp/tcp_tls_listener.rs
@@ -72,6 +72,9 @@ pub(crate) async fn start(
     if shard.id == 0 {
         shard.tcp_bound_address.set(Some(actual_addr));
         if addr.port() == 0 {
+            // Notify config writer on shard 0
+            let _ = shard.config_writer_notify.try_send(());
+
             let event = ShardEvent::AddressBound {
                 protocol: TransportProtocol::Tcp,
                 address: actual_addr,


Reply via email to