This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch io_uring_tpc_fixes_7
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 1e32b39f9edcb5bfb6c1ed87713f578f4c26a817
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Tue Oct 14 12:38:09 2025 +0200

    fixes
---
 core/bench/src/actors/producer/client/low_level.rs |  2 +-
 .../consumer_offsets/delete_consumer_offset.rs     |  2 +-
 .../consumer_offsets/get_consumer_offset.rs        |  2 +-
 .../consumer_offsets/store_consumer_offset.rs      |  2 +-
 core/common/src/commands/messages/poll_messages.rs |  2 +-
 core/integration/src/test_server.rs                | 48 +++++++-----
 .../data_integrity/verify_after_server_restart.rs  | 91 ++++++++++++++++------
 core/integration/tests/mcp/mod.rs                  | 20 ++---
 .../tests/server/scenarios/encryption_scenario.rs  |  8 +-
 .../tests/server/scenarios/tcp_tls_scenario.rs     |  2 +-
 core/server/server.http                            |  2 +-
 core/server/src/bootstrap.rs                       | 22 +++---
 core/server/src/quic/quic_server.rs                |  3 +
 core/server/src/shard/mod.rs                       |  6 +-
 core/server/src/shard/system/messages.rs           | 19 ++++-
 core/server/src/slab/streams.rs                    |  9 ++-
 core/server/src/tcp/tcp_listener.rs                |  3 +
 core/server/src/tcp/tcp_tls_listener.rs            |  3 +
 18 files changed, 164 insertions(+), 82 deletions(-)

diff --git a/core/bench/src/actors/producer/client/low_level.rs b/core/bench/src/actors/producer/client/low_level.rs
index 86d022648..fc2f483ed 100644
--- a/core/bench/src/actors/producer/client/low_level.rs
+++ b/core/bench/src/actors/producer/client/low_level.rs
@@ -49,7 +49,7 @@ impl LowLevelProducerClient {
             client: None,
             stream_id: Identifier::default(),
             topic_id: Identifier::default(),
-            partitioning: Partitioning::partition_id(1),
+            partitioning: Partitioning::partition_id(0),
         }
     }
 }
diff --git a/core/common/src/commands/consumer_offsets/delete_consumer_offset.rs b/core/common/src/commands/consumer_offsets/delete_consumer_offset.rs
index aab6c096d..8cfeac24d 100644
--- a/core/common/src/commands/consumer_offsets/delete_consumer_offset.rs
+++ b/core/common/src/commands/consumer_offsets/delete_consumer_offset.rs
@@ -54,7 +54,7 @@ impl Default for DeleteConsumerOffset {
             consumer: Consumer::default(),
             stream_id: Identifier::default(),
             topic_id: Identifier::default(),
-            partition_id: Some(1),
+            partition_id: Some(0),
         }
     }
 }
diff --git a/core/common/src/commands/consumer_offsets/get_consumer_offset.rs b/core/common/src/commands/consumer_offsets/get_consumer_offset.rs
index c2a3dda8e..a98ac6c4d 100644
--- a/core/common/src/commands/consumer_offsets/get_consumer_offset.rs
+++ b/core/common/src/commands/consumer_offsets/get_consumer_offset.rs
@@ -67,7 +67,7 @@ impl Command for GetConsumerOffset {
 }
 
 fn default_partition_id() -> Option<u32> {
-    Some(1)
+    Some(0)
 }
 
 impl Validatable<IggyError> for GetConsumerOffset {
diff --git a/core/common/src/commands/consumer_offsets/store_consumer_offset.rs b/core/common/src/commands/consumer_offsets/store_consumer_offset.rs
index ad3c4eae6..9500138d1 100644
--- a/core/common/src/commands/consumer_offsets/store_consumer_offset.rs
+++ b/core/common/src/commands/consumer_offsets/store_consumer_offset.rs
@@ -57,7 +57,7 @@ impl Default for StoreConsumerOffset {
             consumer: Consumer::default(),
             stream_id: Identifier::default(),
             topic_id: Identifier::default(),
-            partition_id: Some(1),
+            partition_id: Some(0),
             offset: 0,
         }
     }
diff --git a/core/common/src/commands/messages/poll_messages.rs b/core/common/src/commands/messages/poll_messages.rs
index 3f4045bbd..e51b03ef6 100644
--- a/core/common/src/commands/messages/poll_messages.rs
+++ b/core/common/src/commands/messages/poll_messages.rs
@@ -24,7 +24,7 @@ use bytes::{BufMut, Bytes, BytesMut};
 use serde::{Deserialize, Serialize};
 use std::fmt::Display;
 
-pub const DEFAULT_PARTITION_ID: u32 = 1;
+pub const DEFAULT_PARTITION_ID: u32 = 0;
 pub const DEFAULT_NUMBER_OF_MESSAGES_TO_POLL: u32 = 10;
 
 /// `PollMessages` command is used to poll messages from a topic in a stream.
diff --git a/core/integration/src/test_server.rs b/core/integration/src/test_server.rs
index 7914c4c2e..160094d26 100644
--- a/core/integration/src/test_server.rs
+++ b/core/integration/src/test_server.rs
@@ -408,33 +408,41 @@ impl TestServer {
         });
 
         if let Some(config) = config {
-            let quic_addr: SocketAddr = config.quic.address.parse().unwrap();
-            if quic_addr.port() == 0 {
-                panic!("Quic address port is 0!");
+            // Only validate and add enabled protocols
+            if config.quic.enabled {
+                let quic_addr: SocketAddr = config.quic.address.parse().unwrap();
+                if quic_addr.port() == 0 {
+                    panic!("Quic address port is 0!");
+                }
+                self.server_addrs
+                    .push(ServerProtocolAddr::QuicUdp(quic_addr));
             }
 
-            let tcp_addr: SocketAddr = config.tcp.address.parse().unwrap();
-            if tcp_addr.port() == 0 {
-                panic!("Tcp address port is 0!");
+            if config.tcp.enabled {
+                let tcp_addr: SocketAddr = config.tcp.address.parse().unwrap();
+                if tcp_addr.port() == 0 {
+                    panic!("Tcp address port is 0!");
+                }
+                self.server_addrs.push(ServerProtocolAddr::RawTcp(tcp_addr));
             }
 
-            let http_addr: SocketAddr = config.http.address.parse().unwrap();
-            if http_addr.port() == 0 {
-                panic!("Http address port is 0!");
+            if config.http.enabled {
+                let http_addr: SocketAddr = config.http.address.parse().unwrap();
+                if http_addr.port() == 0 {
+                    panic!("Http address port is 0!");
+                }
+                self.server_addrs
+                    .push(ServerProtocolAddr::HttpTcp(http_addr));
             }
 
-            let websocket_addr: SocketAddr = config.websocket.address.parse().unwrap();
-            if websocket_addr.port() == 0 {
-                panic!("WebSocket address port is 0!");
+            if config.websocket.enabled {
+                let websocket_addr: SocketAddr = config.websocket.address.parse().unwrap();
+                if websocket_addr.port() == 0 {
+                    panic!("WebSocket address port is 0!");
+                }
+                self.server_addrs
+                    .push(ServerProtocolAddr::WebSocket(websocket_addr));
             }
-
-            self.server_addrs
-                .push(ServerProtocolAddr::QuicUdp(quic_addr));
-            self.server_addrs.push(ServerProtocolAddr::RawTcp(tcp_addr));
-            self.server_addrs
-                .push(ServerProtocolAddr::HttpTcp(http_addr));
-            self.server_addrs
-                .push(ServerProtocolAddr::WebSocket(websocket_addr));
         } else {
             panic!(
                 "Failed to load config from file {config_path} in {MAX_PORT_WAIT_DURATION_S} s!"
diff --git a/core/integration/tests/data_integrity/verify_after_server_restart.rs b/core/integration/tests/data_integrity/verify_after_server_restart.rs
index 17e029e0d..10414364c 100644
--- a/core/integration/tests/data_integrity/verify_after_server_restart.rs
+++ b/core/integration/tests/data_integrity/verify_after_server_restart.rs
@@ -28,10 +28,6 @@ use serial_test::parallel;
 use std::{collections::HashMap, str::FromStr};
 use test_case::test_matrix;
 
-/*
- * Helper functions for test matrix parameters
- */
-
 fn cache_open_segment() -> &'static str {
     "open_segment"
 }
@@ -85,17 +81,6 @@ async fn should_fill_data_and_verify_after_restart(cache_setting: &'static str)
         amount_of_data_to_process,
     );
 
-    let default_bench_stream_identifiers: [Identifier; 8] = [
-        Identifier::numeric(3000001).unwrap(),
-        Identifier::numeric(3000002).unwrap(),
-        Identifier::numeric(3000003).unwrap(),
-        Identifier::numeric(3000004).unwrap(),
-        Identifier::numeric(3000005).unwrap(),
-        Identifier::numeric(3000006).unwrap(),
-        Identifier::numeric(3000007).unwrap(),
-        Identifier::numeric(3000008).unwrap(),
-    ];
-
     // 4. Connect and login to newly started server
     let client = TcpClientFactory {
         server_addr,
@@ -103,12 +88,15 @@ async fn should_fill_data_and_verify_after_restart(cache_setting: &'static str)
     }
     .create_client()
     .await;
+
     let client = IggyClient::create(client, None, None);
     login_root(&client).await;
-    let topic_id = Identifier::numeric(1).unwrap();
-    for stream_id in default_bench_stream_identifiers {
+
+    let topic_id = Identifier::numeric(0).unwrap();
+    for i in 0..7 {
+        let stream_id = Identifier::numeric(i).unwrap();
         client
-            .flush_unsaved_buffer(&stream_id, &topic_id, 1, false)
+            .flush_unsaved_buffer(&stream_id, &topic_id, 0, true)
             .await
             .unwrap();
     }
@@ -133,7 +121,50 @@ async fn should_fill_data_and_verify_after_restart(cache_setting: &'static str)
     test_server.start();
     let server_addr = test_server.get_raw_tcp_addr().unwrap();
 
-    // 8. Run send bench again to add more data
+    // 8. Verify stats are preserved after restart (before adding more data)
+    let client_after_restart = IggyClient::create(
+        TcpClientFactory {
+            server_addr: server_addr.clone(),
+            ..Default::default()
+        }
+        .create_client()
+        .await,
+        None,
+        None,
+    );
+    login_root(&client_after_restart).await;
+
+    let stats_after_restart = client_after_restart.get_stats().await.unwrap();
+    assert_eq!(
+        expected_messages_count, stats_after_restart.messages_count,
+        "Messages count should be preserved after restart (before: {}, after: {})",
+        expected_messages_count, stats_after_restart.messages_count
+    );
+    assert_eq!(
+        expected_messages_size_bytes.as_bytes_usize(),
+        stats_after_restart.messages_size_bytes.as_bytes_usize(),
+        "Messages size bytes should be preserved after restart (before: {}, after: {})",
+        expected_messages_size_bytes.as_bytes_usize(),
+        stats_after_restart.messages_size_bytes.as_bytes_usize()
+    );
+    assert_eq!(
+        expected_streams_count, stats_after_restart.streams_count,
+        "Streams count should be preserved after restart"
+    );
+    assert_eq!(
+        expected_topics_count, stats_after_restart.topics_count,
+        "Topics count should be preserved after restart"
+    );
+    assert_eq!(
+        expected_partitions_count, stats_after_restart.partitions_count,
+        "Partitions count should be preserved after restart"
+    );
+    assert_eq!(
+        expected_segments_count, stats_after_restart.segments_count,
+        "Segments count should be preserved after restart"
+    );
+
+    // 9. Run send bench again to add more data
     run_bench_and_wait_for_finish(
         &server_addr,
         &TransportProtocol::Tcp,
@@ -141,7 +172,7 @@ async fn should_fill_data_and_verify_after_restart(cache_setting: &'static str)
         amount_of_data_to_process,
     );
 
-    // 9. Run poll bench again to check if all data is still there
+    // 10. Run poll bench again to check if all data is still there
     run_bench_and_wait_for_finish(
         &server_addr,
         &TransportProtocol::Tcp,
@@ -149,7 +180,7 @@ async fn should_fill_data_and_verify_after_restart(cache_setting: &'static str)
         IggyByteSize::from(amount_of_data_to_process.as_bytes_u64() * 2),
     );
 
-    // 10. Connect and login to newly started server
+    // 11. Connect and login to newly started server
     let client = IggyClient::create(
         TcpClientFactory {
             server_addr: server_addr.clone(),
@@ -162,7 +193,17 @@ async fn should_fill_data_and_verify_after_restart(cache_setting: &'static str)
     );
     login_root(&client).await;
 
-    // 11. Save stats from the second server (should have double the data)
+    // 12. Flush unsaved buffer
+    let topic_id = Identifier::numeric(0).unwrap();
+    for i in 0..7 {
+        let stream_id = Identifier::numeric(i).unwrap();
+        client
+            .flush_unsaved_buffer(&stream_id, &topic_id, 0, true)
+            .await
+            .unwrap();
+    }
+
+    // 13. Save stats from the second server (should have double the data)
     let stats = client.get_stats().await.unwrap();
     let actual_messages_size_bytes = stats.messages_size_bytes;
     let actual_streams_count = stats.streams_count;
@@ -173,7 +214,7 @@ async fn should_fill_data_and_verify_after_restart(cache_setting: &'static str)
     let actual_clients_count = stats.clients_count;
     let actual_consumer_groups_count = stats.consumer_groups_count;
 
-    // 12. Compare stats (expecting double the messages/size after second bench run)
+    // 14. Compare stats (expecting double the messages/size after second bench run)
     assert_eq!(
         expected_messages_size_bytes.as_bytes_usize() * 2,
         actual_messages_size_bytes.as_bytes_usize(),
@@ -206,7 +247,7 @@ async fn should_fill_data_and_verify_after_restart(cache_setting: &'static str)
         "Consumer groups count"
     );
 
-    // 13. Run poll bench to check if all data (10MB total) is still there
+    // 15. Run poll bench to check if all data (10MB total) is still there
     run_bench_and_wait_for_finish(
         &server_addr,
         &TransportProtocol::Tcp,
@@ -214,6 +255,6 @@ async fn should_fill_data_and_verify_after_restart(cache_setting: &'static str)
         IggyByteSize::from(amount_of_data_to_process.as_bytes_u64() * 2),
     );
 
-    // 14. Manual cleanup
+    // 16. Manual cleanup
     std::fs::remove_dir_all(local_data_path).unwrap();
 }
diff --git a/core/integration/tests/mcp/mod.rs b/core/integration/tests/mcp/mod.rs
index 635b88990..3ffd8fc51 100644
--- a/core/integration/tests/mcp/mod.rs
+++ b/core/integration/tests/mcp/mod.rs
@@ -179,7 +179,7 @@ async fn mcp_server_should_create_topic() {
         "create_topic",
         Some(json!({ "stream_id": STREAM_NAME, "name": name, "partitions_count": 1})),
         |topic| {
-            assert_eq!(topic.id, 2);
+            assert_eq!(topic.id, 1);
             assert_eq!(topic.name, name);
             assert_eq!(topic.partitions_count, 1);
             assert_eq!(topic.messages_count, 0);
@@ -244,7 +244,7 @@ async fn mcp_server_should_delete_partitions() {
 async fn mcp_server_should_delete_segments() {
     assert_empty_response(
         "delete_segments",
-        Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 1, "segments_count": 1 })),
+        Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 0, "segments_count": 1 })),
     )
     .await;
 }
@@ -254,7 +254,7 @@ async fn mcp_server_should_delete_segments() {
 async fn mcp_server_should_poll_messages() {
     assert_response::<PolledMessages>(
         "poll_messages",
-        Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 1, "offset": 0 })),
+        Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 0, "offset": 0 })),
         |messages| {
             assert_eq!(messages.messages.len(), 1);
             let message = &messages.messages[0];
@@ -271,7 +271,7 @@ async fn mcp_server_should_poll_messages() {
 async fn mcp_server_should_send_messages() {
     assert_empty_response(
         "send_messages",
-        Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 1, "messages": [
+        Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 0, "messages": [
             {
                 "payload": "test"
             }
@@ -375,11 +375,11 @@ async fn mcp_server_should_delete_consumer_group() {
 async fn mcp_server_should_return_consumer_offset() {
     assert_response::<Option<ConsumerOffsetInfo>>(
         "get_consumer_offset",
-        Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 1 })),
+        Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 0 })),
         |offset| {
             assert!(offset.is_some());
             let offset = offset.unwrap();
-            assert_eq!(offset.partition_id, 1);
+            assert_eq!(offset.partition_id, 0);
             assert_eq!(offset.stored_offset, 0);
             assert_eq!(offset.current_offset, 0);
         },
@@ -392,7 +392,7 @@ async fn mcp_server_should_return_consumer_offset() {
 async fn mcp_server_should_store_consumer_offset() {
     assert_empty_response(
         "store_consumer_offset",
-        Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 1, "offset": 0 })),
+        Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 0, "offset": 0 })),
     )
     .await;
 }
@@ -402,7 +402,7 @@ async fn mcp_server_should_store_consumer_offset() {
 async fn mcp_server_should_delete_consumer_offset() {
     assert_empty_response(
         "delete_consumer_offset",
-        Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 1, "offset": 0 })),
+        Some(json!({ "stream_id": STREAM_NAME, "topic_id": TOPIC_NAME, "partition_id": 0, "offset": 0 })),
     )
     .await;
 }
@@ -630,7 +630,7 @@ async fn seed_data(iggy_server_address: &str) {
         .send_messages(
             &STREAM_ID,
             &TOPIC_ID,
-            &Partitioning::partition_id(1),
+            &Partitioning::partition_id(0),
             &mut messages,
         )
         .await
@@ -640,7 +640,7 @@ async fn seed_data(iggy_server_address: &str) {
         Consumer::new(Identifier::named(CONSUMER_NAME).expect("Failed to create consumer"));
 
     iggy_client
-        .store_consumer_offset(&consumer, &STREAM_ID, &TOPIC_ID, Some(1), 0)
+        .store_consumer_offset(&consumer, &STREAM_ID, &TOPIC_ID, Some(0), 0)
         .await
         .expect("Failed to store consumer offset");
 
diff --git a/core/integration/tests/server/scenarios/encryption_scenario.rs b/core/integration/tests/server/scenarios/encryption_scenario.rs
index 88c115851..77a81a6c1 100644
--- a/core/integration/tests/server/scenarios/encryption_scenario.rs
+++ b/core/integration/tests/server/scenarios/encryption_scenario.rs
@@ -130,7 +130,7 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp
         .send_messages(
             &Identifier::numeric(stream_id).unwrap(),
             &Identifier::numeric(topic_id).unwrap(),
-            &Partitioning::partition_id(1),
+            &Partitioning::partition_id(0),
             &mut messages_batch_1,
         )
         .await
@@ -160,7 +160,7 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp
         .poll_messages(
             &Identifier::numeric(stream_id).unwrap(),
             &Identifier::numeric(topic_id).unwrap(),
-            Some(1),
+            Some(0),
             &consumer,
             &PollingStrategy::offset(0),
             messages_per_batch.try_into().unwrap(),
@@ -275,7 +275,7 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp
         .send_messages(
             &Identifier::numeric(stream_id).unwrap(),
             &Identifier::numeric(topic_id).unwrap(),
-            &Partitioning::partition_id(1), // Use specific partition for testing
+            &Partitioning::partition_id(0), // Use specific partition for testing
             &mut messages_batch_2,
         )
         .await
@@ -300,7 +300,7 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp
         .poll_messages(
             &Identifier::numeric(stream_id).unwrap(),
             &Identifier::numeric(topic_id).unwrap(),
-            Some(1),
+            Some(0),
             &consumer,
             &PollingStrategy::offset(0),
             messages_per_batch as u32 * 2,
diff --git a/core/integration/tests/server/scenarios/tcp_tls_scenario.rs b/core/integration/tests/server/scenarios/tcp_tls_scenario.rs
index bac2b1858..4aa3e70c1 100644
--- a/core/integration/tests/server/scenarios/tcp_tls_scenario.rs
+++ b/core/integration/tests/server/scenarios/tcp_tls_scenario.rs
@@ -69,7 +69,7 @@ pub async fn run(client: &IggyClient) {
         .poll_messages(
             &Identifier::named(stream_name).unwrap(),
             &Identifier::named(topic_name).unwrap(),
-            Some(1),
+            Some(0),
             &Consumer::default(),
             &PollingStrategy::offset(0),
             1,
diff --git a/core/server/server.http b/core/server/server.http
index cb46d4213..19ceb0861 100644
--- a/core/server/server.http
+++ b/core/server/server.http
@@ -18,7 +18,7 @@
 @url = http://localhost:3000
 @stream_id = 1
 @topic_id = 1
-@partition_id = 1
+@partition_id = 0
 @consumer_group_id = 1
 @consumer_id = 1
 @client_id = 1
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 5beb6f41e..a103e8ae1 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -38,7 +38,7 @@ use crate::{
         streams::stream2,
         topics::{consumer_group2, topic2},
         users::user::User,
-        utils::file::overwrite,
+        utils::{crypto, file::overwrite},
     },
     versioning::SemanticVersion,
 };
@@ -48,8 +48,8 @@ use error_set::ErrContext;
 use iggy_common::{
     ConsumerKind, IggyByteSize, IggyError,
     defaults::{
-        DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME, MAX_PASSWORD_LENGTH, MAX_USERNAME_LENGTH,
-        MIN_PASSWORD_LENGTH, MIN_USERNAME_LENGTH,
+        DEFAULT_ROOT_USERNAME, MAX_PASSWORD_LENGTH, MAX_USERNAME_LENGTH, MIN_PASSWORD_LENGTH,
+        MIN_USERNAME_LENGTH,
     },
 };
 use std::{collections::HashSet, env, path::Path, sync::Arc};
@@ -289,9 +289,8 @@ pub async fn create_directories(config: &SystemConfig) -> Result<(), IggyError>
 }
 
 pub fn create_root_user() -> User {
-    info!("Creating root user...");
-    let username = env::var(IGGY_ROOT_USERNAME_ENV);
-    let password = env::var(IGGY_ROOT_PASSWORD_ENV);
+    let mut username = env::var(IGGY_ROOT_USERNAME_ENV);
+    let mut password = env::var(IGGY_ROOT_PASSWORD_ENV);
     if (username.is_ok() && password.is_err()) || (username.is_err() && password.is_ok()) {
         panic!(
             "When providing the custom root user credentials, both username and password must be set."
@@ -300,11 +299,15 @@ pub fn create_root_user() -> User {
     if username.is_ok() && password.is_ok() {
         info!("Using the custom root user credentials.");
     } else {
-        info!("Using the default root user credentials.");
+        info!("Using the default root user credentials...");
+        username = Ok(DEFAULT_ROOT_USERNAME.to_string());
+        let generated_password = crypto::generate_secret(20..40);
+        println!("Generated root user password: {generated_password}");
+        password = Ok(generated_password);
     }
 
-    let username = username.unwrap_or(DEFAULT_ROOT_USERNAME.to_string());
-    let password = password.unwrap_or(DEFAULT_ROOT_PASSWORD.to_string());
+    let username = username.expect("Root username is not set.");
+    let password = password.expect("Root password is not set.");
     if username.is_empty() || password.is_empty() {
         panic!("Root user credentials are not set.");
     }
@@ -320,6 +323,7 @@ pub fn create_root_user() -> User {
     if password.len() > MAX_PASSWORD_LENGTH {
         panic!("Root password is too long.");
     }
+
     User::root(&username, &password)
 }
 
diff --git a/core/server/src/quic/quic_server.rs b/core/server/src/quic/quic_server.rs
index 1a67d64d8..c5033213a 100644
--- a/core/server/src/quic/quic_server.rs
+++ b/core/server/src/quic/quic_server.rs
@@ -127,6 +127,9 @@ pub async fn spawn_quic_server(
         shard.quic_bound_address.set(Some(actual_addr));
 
         if addr.port() == 0 {
+            // Notify config writer on shard 0
+            let _ = shard.config_writer_notify.try_send(());
+
             // Broadcast to other shards for SO_REUSEPORT binding
             let event = ShardEvent::AddressBound {
                 protocol: iggy_common::TransportProtocol::Quic,
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 0996e1fc1..328578eee 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -135,7 +135,7 @@ pub struct IggyShard {
     pub(crate) quic_bound_address: Cell<Option<SocketAddr>>,
     pub(crate) websocket_bound_address: Cell<Option<SocketAddr>>,
     pub(crate) http_bound_address: Cell<Option<SocketAddr>>,
-    config_writer_notify: async_channel::Sender<()>,
+    pub(crate) config_writer_notify: async_channel::Sender<()>,
     config_writer_receiver: async_channel::Receiver<()>,
     pub(crate) task_registry: Rc<TaskRegistry>,
 }
@@ -847,9 +847,9 @@ impl IggyShard {
                 stream_id,
                 topic_id,
                 partition_id,
-                ..
+                fsync,
             } => {
-                self.flush_unsaved_buffer_bypass_auth(&stream_id, &topic_id, partition_id)
+                self.flush_unsaved_buffer_bypass_auth(&stream_id, &topic_id, partition_id, fsync)
                     .await?;
                 Ok(())
             }
diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs
index 7c543c3b7..bc04de427 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -281,9 +281,11 @@ impl IggyShard {
         stream_id: &Identifier,
         topic_id: &Identifier,
         partition_id: usize,
-        _fsync: bool,
+        fsync: bool,
     ) -> Result<(), IggyError> {
         self.ensure_authenticated(session)?;
+        self.ensure_partition_exists(stream_id, topic_id, partition_id)?;
+
         let numeric_stream_id = self
             .streams2
             .with_stream_by_id(stream_id, streams::helpers::get_stream_id());
@@ -304,7 +306,7 @@ impl IggyShard {
                 format!("{COMPONENT} (error: {error}) - permission denied to append messages for user {} on stream ID: {}, topic ID: {}", session.get_user_id(), numeric_stream_id as u32, numeric_topic_id as u32)
             })?;
 
-        self.flush_unsaved_buffer_base(stream_id, topic_id, partition_id)
+        self.flush_unsaved_buffer_base(stream_id, topic_id, partition_id, fsync)
             .await?;
         Ok(())
     }
@@ -314,8 +316,10 @@ impl IggyShard {
         stream_id: &Identifier,
         topic_id: &Identifier,
         partition_id: usize,
+        fsync: bool,
     ) -> Result<(), IggyError> {
-        self.flush_unsaved_buffer_base(stream_id, topic_id, partition_id)
+        self.ensure_partition_exists(stream_id, topic_id, partition_id)?;
+        self.flush_unsaved_buffer_base(stream_id, topic_id, partition_id, fsync)
             .await
     }
 
@@ -324,6 +328,7 @@ impl IggyShard {
         stream_id: &Identifier,
         topic_id: &Identifier,
         partition_id: usize,
+        fsync: bool,
     ) -> Result<(), IggyError> {
         let batches = self.streams2.with_partition_by_id_mut(
             stream_id,
@@ -342,6 +347,14 @@ impl IggyShard {
                 &self.config.system,
             )
             .await?;
+
+        // Ensure all data is flushed to disk before returning
+        if fsync {
+            self.streams2
+                .fsync_all_messages(stream_id, topic_id, partition_id)
+                .await?;
+        }
+
         Ok(())
     }
 
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index 0ad76809c..7dccec27f 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -1337,9 +1337,16 @@ impl Streams {
         partition_id: usize,
     ) -> Result<(), IggyError> {
         let storage = self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
-            log.active_storage().clone()
+            if !log.has_segments() {
+                return None;
+            }
+            Some(log.active_storage().clone())
         });
 
+        let Some(storage) = storage else {
+            return Ok(());
+        };
+
         if storage.messages_writer.is_none() || storage.index_writer.is_none() {
             return Ok(());
         }
diff --git a/core/server/src/tcp/tcp_listener.rs b/core/server/src/tcp/tcp_listener.rs
index 580b7a60e..049c5b7f0 100644
--- a/core/server/src/tcp/tcp_listener.rs
+++ b/core/server/src/tcp/tcp_listener.rs
@@ -103,6 +103,9 @@ pub async fn start(
         shard.tcp_bound_address.set(Some(actual_addr));
 
         if addr.port() == 0 {
+            // Notify config writer on shard 0
+            let _ = shard.config_writer_notify.try_send(());
+
             // Broadcast to other shards for SO_REUSEPORT binding
             let event = ShardEvent::AddressBound {
                 protocol: TransportProtocol::Tcp,
diff --git a/core/server/src/tcp/tcp_tls_listener.rs b/core/server/src/tcp/tcp_tls_listener.rs
index 7dc419603..c41233504 100644
--- a/core/server/src/tcp/tcp_tls_listener.rs
+++ b/core/server/src/tcp/tcp_tls_listener.rs
@@ -72,6 +72,9 @@ pub(crate) async fn start(
     if shard.id == 0 {
         shard.tcp_bound_address.set(Some(actual_addr));
         if addr.port() == 0 {
+            // Notify config writer on shard 0
+            let _ = shard.config_writer_notify.try_send(());
+
             let event = ShardEvent::AddressBound {
                 protocol: TransportProtocol::Tcp,
                 address: actual_addr,

Reply via email to